Skip to content

Commit

Permalink
moved to more advanced loop handler
Browse files Browse the repository at this point in the history
  • Loading branch information
hscott-yuma committed Dec 5, 2024
1 parent 6ecdd83 commit c1b016c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 48 deletions.
31 changes: 26 additions & 5 deletions precog/utils/general.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import argparse
import asyncio
import re
from typing import Optional

import bittensor as bt
import git
from numpy import argsort, array, concatenate, cumsum, empty_like
import requests

from precog.utils.classes import NestedNamespace
Expand Down Expand Up @@ -88,15 +90,34 @@ def get_version() -> Optional[str]:

def rank(vector):
if vector is None or len(vector) <= 1:
return np.array([0])
return array([0])
else:
# Sort the array and get the indices that would sort it
sorted_indices = np.argsort(vector)
sorted_indices = argsort(vector)
sorted_vector = vector[sorted_indices]
# Create a mask for where each new unique value starts in the sorted array
unique_mask = np.concatenate(([True], sorted_vector[1:] != sorted_vector[:-1]))
unique_mask = concatenate(([True], sorted_vector[1:] != sorted_vector[:-1]))
# Use cumulative sum of the unique mask to get the ranks, then assign back in original order
ranks = np.cumsum(unique_mask) - 1
rank_vector = np.empty_like(vector, dtype=int)
ranks = cumsum(unique_mask) - 1
rank_vector = empty_like(vector, dtype=int)
rank_vector[sorted_indices] = ranks
return rank_vector


async def loop_handler(self, func, sleep_time=120):
try:
while not self.stop_event.is_set():
async with self.lock:
await func()
await asyncio.sleep(sleep_time)
except asyncio.CancelledError:
bt.logging.error(f"{func.__name__} cancelled")
raise
except KeyboardInterrupt:
raise
except Exception as e:
bt.logging.error(f"{func.__name__} raised error: {e}")
raise
finally:
async with self.lock:
self.stop_event.set()
69 changes: 26 additions & 43 deletions precog/validators/weight_setter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from precog.protocol import Challenge
from precog.utils.bittensor import check_uid_availability, print_info, setup_bittensor_objects
from precog.utils.classes import MinerHistory
from precog.utils.general import loop_handler
from precog.utils.timestamp import elapsed_seconds, get_before, get_now, is_query_time, iso8601_to_datetime
from precog.utils.wandb import log_wandb, setup_wandb
from precog.validators.reward import calc_rewards
Expand Down Expand Up @@ -51,10 +52,10 @@ def __init__(self, config=None, loop=None):
self.stop_event = asyncio.Event()
bt.logging.info("Setup complete, starting loop")
self.loop.create_task(
self.loop_handler(self.scheduled_prediction_request, sleep_time=self.config.print_cadence)
loop_handler(self.scheduled_prediction_request, sleep_time=self.config.print_cadence)
)
self.loop.create_task(self.loop_handler(self.resync_metagraph, sleep_time=self.resync_metagraph_rate))
self.loop.create_task(self.loop_handler(self.set_weights, sleep_time=self.set_weights_rate))
self.loop.create_task(loop_handler(self.resync_metagraph, sleep_time=self.resync_metagraph_rate))
self.loop.create_task(loop_handler(self.set_weights, sleep_time=self.set_weights_rate))

def __exit__(self, exc_type, exc_value, traceback):
self.save_state()
Expand All @@ -67,21 +68,6 @@ def __exit__(self, exc_type, exc_value, traceback):
bt.logging.error(f"Error on __exit__ function: {e}")
self.loop.stop()

async def loop_handler(self, func, sleep_time=120):
try:
while not self.stop_event.is_set():
await func()
await asyncio.sleep(sleep_time)
except asyncio.exceptions.CancelledError:
raise
except KeyboardInterrupt:
raise
except Exception:
raise
finally:
async with self.lock:
self.stop_event.set()
self.__exit__(None, None, None)

async def get_available_uids(self):
miner_uids = []
Expand All @@ -93,23 +79,22 @@ async def get_available_uids(self):

async def resync_metagraph(self, force=False):
"""Resyncs the metagraph and updates the hotkeys and moving averages based on the new metagraph."""
async with self.lock:
self.blocks_since_sync = self.current_block - self.last_sync
if self.blocks_since_sync >= self.resync_metagraph_rate or force:
bt.logging.info("Syncing Metagraph...")
self.metagraph.sync(subtensor=self.subtensor)
bt.logging.info("Metagraph updated, re-syncing hotkeys, dendrite pool and moving averages")
# Zero out all hotkeys that have been replaced.
self.available_uids = asyncio.run(self.get_available_uids())
for uid, hotkey in enumerate(self.metagraph.hotkeys):
if (uid not in self.MinerHistory and uid in self.available_uids) or self.hotkeys[uid] != hotkey:
bt.logging.info(f"Replacing hotkey on {uid} with {self.metagraph.hotkeys[uid]}")
self.hotkeys[uid] = hotkey
self.scores[uid] = 0 # hotkey has been replaced
self.MinerHistory[uid] = MinerHistory(uid, timezone=self.timezone)
self.moving_average_scores[uid] = 0
self.last_sync = self.subtensor.get_current_block()
self.save_state()
self.blocks_since_sync = self.current_block - self.last_sync
if self.blocks_since_sync >= self.resync_metagraph_rate or force:
bt.logging.info("Syncing Metagraph...")
self.metagraph.sync(subtensor=self.subtensor)
bt.logging.info("Metagraph updated, re-syncing hotkeys, dendrite pool and moving averages")
# Zero out all hotkeys that have been replaced.
self.available_uids = asyncio.run(self.get_available_uids())
for uid, hotkey in enumerate(self.metagraph.hotkeys):
if (uid not in self.MinerHistory and uid in self.available_uids) or self.hotkeys[uid] != hotkey:
bt.logging.info(f"Replacing hotkey on {uid} with {self.metagraph.hotkeys[uid]}")
self.hotkeys[uid] = hotkey
self.scores[uid] = 0 # hotkey has been replaced
self.MinerHistory[uid] = MinerHistory(uid, timezone=self.timezone)
self.moving_average_scores[uid] = 0
self.last_sync = self.subtensor.get_current_block()
self.save_state()

def query_miners(self):
timestamp = get_now().isoformat()
Expand All @@ -133,9 +118,8 @@ def node_query(self, module, method, params):

async def set_weights(self):
if self.blocks_since_last_update >= self.set_weights_rate:
async with self.lock:
uids = array(self.available_uids)
weights = [self.moving_average_scores[uid] for uid in self.available_uids]
uids = array(self.available_uids)
weights = [self.moving_average_scores[uid] for uid in self.available_uids]
for i, j in zip(weights, self.available_uids):
bt.logging.debug(f"UID: {j} | Weight: {i}")
if sum(weights) == 0:
Expand All @@ -162,11 +146,10 @@ async def set_weights(self):
"Failed to set weights this iteration with message:",
msg,
)
async with self.lock:
self.current_block = self.subtensor.get_current_block()
self.blocks_since_last_update = (
self.current_block - self.node_query("SubtensorModule", "LastUpdate", [self.config.netuid])[self.my_uid]
)
self.current_block = self.subtensor.get_current_block()
self.blocks_since_last_update = (
self.current_block - self.node_query("SubtensorModule", "LastUpdate", [self.config.netuid])[self.my_uid]
)

async def scheduled_prediction_request(self):
if not hasattr(self, "timestamp"):
Expand Down

0 comments on commit c1b016c

Please sign in to comment.