From c1b016cf5184fd177b608631fd855a88d68ef420 Mon Sep 17 00:00:00 2001 From: hscott Date: Thu, 5 Dec 2024 12:25:44 -0500 Subject: [PATCH] moved to more advanced loop handler --- precog/utils/general.py | 31 +++++++++++--- precog/validators/weight_setter.py | 69 +++++++++++------------------- 2 files changed, 52 insertions(+), 48 deletions(-) diff --git a/precog/utils/general.py b/precog/utils/general.py index 912b5f1..f0a343f 100644 --- a/precog/utils/general.py +++ b/precog/utils/general.py @@ -1,9 +1,11 @@ import argparse +import asyncio import re from typing import Optional import bittensor as bt import git +from numpy import argsort, array, concatenate, cumsum, empty_like import requests from precog.utils.classes import NestedNamespace @@ -88,15 +90,34 @@ def get_version() -> Optional[str]: def rank(vector): if vector is None or len(vector) <= 1: - return np.array([0]) + return array([0]) else: # Sort the array and get the indices that would sort it - sorted_indices = np.argsort(vector) + sorted_indices = argsort(vector) sorted_vector = vector[sorted_indices] # Create a mask for where each new unique value starts in the sorted array - unique_mask = np.concatenate(([True], sorted_vector[1:] != sorted_vector[:-1])) + unique_mask = concatenate(([True], sorted_vector[1:] != sorted_vector[:-1])) # Use cumulative sum of the unique mask to get the ranks, then assign back in original order - ranks = np.cumsum(unique_mask) - 1 - rank_vector = np.empty_like(vector, dtype=int) + ranks = cumsum(unique_mask) - 1 + rank_vector = empty_like(vector, dtype=int) rank_vector[sorted_indices] = ranks return rank_vector + + +async def loop_handler(self, func, sleep_time=120): + try: + while not self.stop_event.is_set(): + async with self.lock: + await func() + await asyncio.sleep(sleep_time) + except asyncio.CancelledError: + bt.logging.error(f"{func.__name__} cancelled") + raise + except KeyboardInterrupt: + raise + except Exception as e: + bt.logging.error(f"{func.__name__} raised error: {e}") + raise + finally: + async with self.lock: + self.stop_event.set() diff --git a/precog/validators/weight_setter.py b/precog/validators/weight_setter.py index 7fcd768..54e3ab6 100755 --- a/precog/validators/weight_setter.py +++ b/precog/validators/weight_setter.py @@ -11,6 +11,7 @@ from precog.protocol import Challenge from precog.utils.bittensor import check_uid_availability, print_info, setup_bittensor_objects from precog.utils.classes import MinerHistory +from precog.utils.general import loop_handler from precog.utils.timestamp import elapsed_seconds, get_before, get_now, is_query_time, iso8601_to_datetime from precog.utils.wandb import log_wandb, setup_wandb from precog.validators.reward import calc_rewards @@ -51,10 +52,10 @@ def __init__(self, config=None, loop=None): self.stop_event = asyncio.Event() bt.logging.info("Setup complete, starting loop") self.loop.create_task( - self.loop_handler(self.scheduled_prediction_request, sleep_time=self.config.print_cadence) + loop_handler(self.scheduled_prediction_request, sleep_time=self.config.print_cadence) ) - self.loop.create_task(self.loop_handler(self.resync_metagraph, sleep_time=self.resync_metagraph_rate)) - self.loop.create_task(self.loop_handler(self.set_weights, sleep_time=self.set_weights_rate)) + self.loop.create_task(loop_handler(self.resync_metagraph, sleep_time=self.resync_metagraph_rate)) + self.loop.create_task(loop_handler(self.set_weights, sleep_time=self.set_weights_rate)) def __exit__(self, exc_type, exc_value, traceback): self.save_state() @@ -67,21 +68,6 @@ def __exit__(self, exc_type, exc_value, traceback): bt.logging.error(f"Error on __exit__ function: {e}") self.loop.stop() - async def loop_handler(self, func, sleep_time=120): - try: - while not self.stop_event.is_set(): - await func() - await asyncio.sleep(sleep_time) - except asyncio.exceptions.CancelledError: - raise - except KeyboardInterrupt: - raise - except Exception: - raise - finally: - async with self.lock: - self.stop_event.set() - self.__exit__(None, None, None) async def get_available_uids(self): miner_uids = [] @@ -93,23 +79,22 @@ async def get_available_uids(self): async def resync_metagraph(self, force=False): """Resyncs the metagraph and updates the hotkeys and moving averages based on the new metagraph.""" - async with self.lock: - self.blocks_since_sync = self.current_block - self.last_sync - if self.blocks_since_sync >= self.resync_metagraph_rate or force: - bt.logging.info("Syncing Metagraph...") - self.metagraph.sync(subtensor=self.subtensor) - bt.logging.info("Metagraph updated, re-syncing hotkeys, dendrite pool and moving averages") - # Zero out all hotkeys that have been replaced. - self.available_uids = asyncio.run(self.get_available_uids()) - for uid, hotkey in enumerate(self.metagraph.hotkeys): - if (uid not in self.MinerHistory and uid in self.available_uids) or self.hotkeys[uid] != hotkey: - bt.logging.info(f"Replacing hotkey on {uid} with {self.metagraph.hotkeys[uid]}") - self.hotkeys[uid] = hotkey - self.scores[uid] = 0 # hotkey has been replaced - self.MinerHistory[uid] = MinerHistory(uid, timezone=self.timezone) - self.moving_average_scores[uid] = 0 - self.last_sync = self.subtensor.get_current_block() - self.save_state() + self.blocks_since_sync = self.current_block - self.last_sync + if self.blocks_since_sync >= self.resync_metagraph_rate or force: + bt.logging.info("Syncing Metagraph...") + self.metagraph.sync(subtensor=self.subtensor) + bt.logging.info("Metagraph updated, re-syncing hotkeys, dendrite pool and moving averages") + # Zero out all hotkeys that have been replaced. + self.available_uids = asyncio.run(self.get_available_uids()) + for uid, hotkey in enumerate(self.metagraph.hotkeys): + if (uid not in self.MinerHistory and uid in self.available_uids) or self.hotkeys[uid] != hotkey: + bt.logging.info(f"Replacing hotkey on {uid} with {self.metagraph.hotkeys[uid]}") + self.hotkeys[uid] = hotkey + self.scores[uid] = 0 # hotkey has been replaced + self.MinerHistory[uid] = MinerHistory(uid, timezone=self.timezone) + self.moving_average_scores[uid] = 0 + self.last_sync = self.subtensor.get_current_block() + self.save_state() def query_miners(self): timestamp = get_now().isoformat() @@ -133,9 +118,8 @@ def node_query(self, module, method, params): async def set_weights(self): if self.blocks_since_last_update >= self.set_weights_rate: - async with self.lock: - uids = array(self.available_uids) - weights = [self.moving_average_scores[uid] for uid in self.available_uids] + uids = array(self.available_uids) + weights = [self.moving_average_scores[uid] for uid in self.available_uids] for i, j in zip(weights, self.available_uids): bt.logging.debug(f"UID: {j} | Weight: {i}") if sum(weights) == 0: @@ -162,11 +146,10 @@ async def set_weights(self): "Failed to set weights this iteration with message:", msg, ) - async with self.lock: - self.current_block = self.subtensor.get_current_block() - self.blocks_since_last_update = ( - self.current_block - self.node_query("SubtensorModule", "LastUpdate", [self.config.netuid])[self.my_uid] - ) + self.current_block = self.subtensor.get_current_block() + self.blocks_since_last_update = ( + self.current_block - self.node_query("SubtensorModule", "LastUpdate", [self.config.netuid])[self.my_uid] + ) async def scheduled_prediction_request(self): if not hasattr(self, "timestamp"):