diff --git a/Makefile b/Makefile index cee96c2..721c621 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,19 @@ -network = ws://127.0.0.1:9944 -netuid = 1 +## Network Parameters ## +finney = wss://entrypoint-finney.opentensor.ai:443 +testnet = wss://test.finney.opentensor.ai:443 +localnet = ws://127.0.0.1:9944 + +testnet_netuid = 256 +localnet_netuid = 1 logging_level = trace # options= ['info', 'debug', 'trace'] -coldkey = cm-owner + +netuid = $(testnet_netuid) +network = $(testnet) + +## User Parameters +coldkey = default +validator_hotkey = validator +miner_hotkey = miner metagraph: btcli subnet metagraph --netuid $(netuid) --subtensor.chain_endpoint $(network) @@ -16,32 +28,23 @@ validator: python start_validator.py \ --neuron.name validator \ --wallet.name $(coldkey) \ - --wallet.hotkey validator \ + --wallet.hotkey $(validator_hotkey) \ --subtensor.chain_endpoint $(network) \ --axon.port 30335 \ --netuid $(netuid) \ --logging.level $(logging_level) -validator2: - python start_validator.py \ - --neuron.name validator2 \ - --wallet.name $(coldkey) \ - --wallet.hotkey validator2 \ - --subtensor.chain_endpoint $(network) \ - --axon.port 30339 \ - --netuid $(netuid) \ - --logging.level $(logging_level) - miner: python start_miner.py \ --neuron.name miner \ --wallet.name $(coldkey) \ - --wallet.hotkey miner \ + --wallet.hotkey $(miner_hotkey) \ --subtensor.chain_endpoint $(network) \ --axon.port 30336 \ --netuid $(netuid) \ --logging.level $(logging_level) \ --timeout 16 \ + --vpermit_tao_limit 2 \ --forward_function forward miner2: @@ -55,25 +58,3 @@ miner2: --logging.level $(logging_level) \ --timeout 16 \ --forward_function forward_bad - -miner3: - python start_miner.py \ - --neuron.name miner3 \ - --wallet.name $(coldkey) \ - --wallet.hotkey miner3 \ - --subtensor.chain_endpoint $(network) \ - --axon.port 30338 \ - --netuid $(netuid) \ - --logging.level $(logging_level) \ - --timeout 16 \ - --forward_function forward - -setup_local: - btcli wallet faucet 
--wallet.name $(coldkey) --subtensor.chain_endpoint $(network) ;\ - btcli subnet create --wallet.name $(coldkey) --subtensor.chain_endpoint $(network) ;\ - btcli subnet register \ - --wallet.name $(coldkey) \ - --wallet.hotkey validator \ - --netuid $(netuid) - --subtensor.chain_endpoint $(network) ;\ - btcli stake add --wallet.name $(coldkey) --wallet.hotkey validator --amount 1024 ;\ diff --git a/precog/utils/bittensor.py b/precog/utils/bittensor.py index 7903a33..890e390 100644 --- a/precog/utils/bittensor.py +++ b/precog/utils/bittensor.py @@ -13,9 +13,9 @@ def setup_bittensor_objects(self): # if chain endpoint is set, overwrite network arg self.config.subtensor.network = self.config.subtensor.chain_endpoint # Initialize subtensor. - self.subtensor = bt.subtensor(config=self.config, network=self.config.subtensor.network) + self.subtensor = bt.subtensor(config=self.config, network=self.config.subtensor.chain_endpoint) self.metagraph = self.subtensor.metagraph(self.config.netuid) - self.wallet = bt.wallet(name=self.config.wallet.name, hotkey=self.config.wallet.hotkey) + self.wallet = bt.wallet(config=self.config) self.dendrite = bt.dendrite(wallet=self.wallet) self.axon = bt.axon(wallet=self.wallet, config=self.config, port=self.config.axon.port) # Connect the validator to the network. 
@@ -46,7 +46,7 @@ def setup_bittensor_objects(self): def print_info(self) -> None: if self.config.neuron.type == "Validator": - weight_timing = self.set_weights_rate - self.blocks_since_last_update + weight_timing = self.hyperparameters.weights_rate_limit - self.blocks_since_last_update if weight_timing <= 0: weight_timing = "a few" # hashtag aesthetic af log = ( diff --git a/precog/utils/general.py b/precog/utils/general.py index fbe3d5c..11e6485 100644 --- a/precog/utils/general.py +++ b/precog/utils/general.py @@ -1,10 +1,13 @@ import argparse +import asyncio import re -from typing import Optional +import time +from typing import Any, Callable, Optional import bittensor as bt import git import requests +from numpy import argsort, array, concatenate, cumsum, empty_like from precog.utils.classes import NestedNamespace @@ -84,3 +87,54 @@ def get_version() -> Optional[str]: raise Exception("Version information not found") return version_match.group() + + +def rank(vector): + if vector is None or len(vector) <= 1: + return array([0]) + else: + # Sort the array and get the indices that would sort it + sorted_indices = argsort(vector) + sorted_vector = vector[sorted_indices] + # Create a mask for where each new unique value starts in the sorted array + unique_mask = concatenate(([True], sorted_vector[1:] != sorted_vector[:-1])) + # Use cumulative sum of the unique mask to get the ranks, then assign back in original order + ranks = cumsum(unique_mask) - 1 + rank_vector = empty_like(vector, dtype=int) + rank_vector[sorted_indices] = ranks + return rank_vector + + +async def loop_handler(self, func: Callable, sleep_time: float = 120): + try: + while not self.stop_event.is_set(): + async with self.lock: + await func() + await asyncio.sleep(sleep_time) + except asyncio.CancelledError: + bt.logging.error(f"{func.__name__} cancelled") + raise + except KeyboardInterrupt: + raise + except Exception as e: + bt.logging.error(f"{func.__name__} raised error: {e}") + raise e + 
finally: + async with self.lock: + self.stop_event.set() + + +def func_with_retry(func: Callable, max_attempts: int = 3, delay: float = 1, *args, **kwargs) -> Any: + attempt = 0 + while attempt < max_attempts: + try: + result = func(*args, **kwargs) + return result + except Exception as e: + attempt += 1 + bt.logging.debug(f"Function {func} failed: Attempt {attempt} of {max_attempts} with error: {e}") + if attempt == max_attempts: + bt.logging.error(f"Function {func} failed {max_attempts} times, skipping.") + raise + else: + time.sleep(delay) diff --git a/precog/validators/reward.py b/precog/validators/reward.py index 22e14c8..bad11fc 100644 --- a/precog/validators/reward.py +++ b/precog/validators/reward.py @@ -5,6 +5,7 @@ import numpy as np from precog.protocol import Challenge +from precog.utils.general import rank from precog.utils.timestamp import align_timepoints, get_now, mature_dictionary, round_minute_down @@ -49,22 +50,6 @@ def calc_rewards( return rewards -def rank(vector): - if vector is None or len(vector) <= 1: - return np.array([0]) - else: - # Sort the array and get the indices that would sort it - sorted_indices = np.argsort(vector) - sorted_vector = vector[sorted_indices] - # Create a mask for where each new unique value starts in the sorted array - unique_mask = np.concatenate(([True], sorted_vector[1:] != sorted_vector[:-1])) - # Use cumulative sum of the unique mask to get the ranks, then assign back in original order - ranks = np.cumsum(unique_mask) - 1 - rank_vector = np.empty_like(vector, dtype=int) - rank_vector[sorted_indices] = ranks - return rank_vector - - def interval_error(intervals, cm_prices): if intervals is None: return np.array([0]) diff --git a/precog/validators/validator.py b/precog/validators/validator.py index d09c2c2..b8bf9c3 100755 --- a/precog/validators/validator.py +++ b/precog/validators/validator.py @@ -1,8 +1,6 @@ import asyncio from pathlib import Path -import bittensor as bt - from precog.utils.classes import Config 
from precog.utils.general import parse_arguments from precog.validators.weight_setter import weight_setter @@ -22,15 +20,6 @@ def __init__(self): async def main(self): loop = asyncio.get_event_loop() self.weight_setter = weight_setter(config=self.config, loop=loop) - try: - loop.run_forever() - except BrokenPipeError: - bt.logging.error("Recieved a Broken Pipe substrate error") - asyncio.run(self.reset_instance()) - except Exception as e: - bt.logging.error(f"Unhandled exception: {e}") - finally: - bt.logging.info("Exiting Validator") async def reset_instance(self): self.__init__() diff --git a/precog/validators/weight_setter.py b/precog/validators/weight_setter.py index 7fcd768..32d2fb9 100755 --- a/precog/validators/weight_setter.py +++ b/precog/validators/weight_setter.py @@ -3,14 +3,15 @@ import pickle import bittensor as bt +import websocket from numpy import array from pytz import timezone -from substrateinterface import SubstrateInterface from precog import __spec_version__ from precog.protocol import Challenge from precog.utils.bittensor import check_uid_availability, print_info, setup_bittensor_objects from precog.utils.classes import MinerHistory +from precog.utils.general import func_with_retry, loop_handler from precog.utils.timestamp import elapsed_seconds, get_before, get_now, is_query_time, iso8601_to_datetime from precog.utils.wandb import log_wandb, setup_wandb from precog.validators.reward import calc_rewards @@ -23,11 +24,10 @@ def __init__(self, config=None, loop=None): self.lock = asyncio.Lock() setup_bittensor_objects(self) self.timezone = timezone("UTC") - self.prediction_interval = self.config.prediction_interval # in minutes + self.prediction_interval = self.config.prediction_interval # in seconds self.N_TIMEPOINTS = self.config.N_TIMEPOINTS # number of timepoints to predict - self.last_sync = 0 - self.set_weights_rate = 100 # in blocks - self.resync_metagraph_rate = 20 # in blocks + self.hyperparameters = 
func_with_retry(self.subtensor.get_subnet_hyperparameters, netuid=self.config.netuid) + self.resync_metagraph_rate = 600 # in seconds bt.logging.info( f"Running validator for subnet: {self.config.netuid} on network: {self.config.subtensor.network}" ) @@ -40,21 +40,29 @@ def __init__(self, config=None, loop=None): self.save_state() else: self.load_state() - self.node = SubstrateInterface(url=self.config.subtensor.chain_endpoint) self.current_block = self.subtensor.get_current_block() - self.blocks_since_last_update = ( - self.current_block - self.node_query("SubtensorModule", "LastUpdate", [self.config.netuid])[self.my_uid] + self.blocks_since_last_update = self.subtensor.blocks_since_last_update( + netuid=self.config.netuid, uid=self.my_uid ) - self.tempo = self.node_query("SubtensorModule", "Tempo", [self.config.netuid]) if self.config.wandb_on: setup_wandb(self) self.stop_event = asyncio.Event() bt.logging.info("Setup complete, starting loop") self.loop.create_task( - self.loop_handler(self.scheduled_prediction_request, sleep_time=self.config.print_cadence) + loop_handler(self, self.scheduled_prediction_request, sleep_time=self.config.print_cadence) ) - self.loop.create_task(self.loop_handler(self.resync_metagraph, sleep_time=self.resync_metagraph_rate)) - self.loop.create_task(self.loop_handler(self.set_weights, sleep_time=self.set_weights_rate)) + self.loop.create_task(loop_handler(self, self.resync_metagraph, sleep_time=self.resync_metagraph_rate)) + self.loop.create_task(loop_handler(self, self.set_weights, sleep_time=self.hyperparameters.weights_rate_limit)) + try: + self.loop.run_forever() + except websocket._exceptions.WebSocketConnectionClosedException: + # TODO: Exceptions are not being caught in this loop + bt.logging.info("Caught websocket connection closed exception") + self.__reset_instance__() + except Exception as e: + bt.logging.error(f"Error on loop: {e}") + finally: + self.__exit__(None, None, None) def __exit__(self, exc_type, exc_value, 
traceback): self.save_state() @@ -62,26 +70,15 @@ def __exit__(self, exc_type, exc_value, traceback): pending = asyncio.all_tasks(self.loop) for task in pending: task.cancel() - asyncio.gather(*pending) except Exception as e: bt.logging.error(f"Error on __exit__ function: {e}") - self.loop.stop() - - async def loop_handler(self, func, sleep_time=120): - try: - while not self.stop_event.is_set(): - await func() - await asyncio.sleep(sleep_time) - except asyncio.exceptions.CancelledError: - raise - except KeyboardInterrupt: - raise - except Exception: - raise finally: - async with self.lock: - self.stop_event.set() - self.__exit__(None, None, None) + asyncio.gather(*pending, return_exceptions=True) + self.loop.stop() + + def __reset_instance__(self): + self.__exit__(None, None, None) + self.__init__(self.config, self.loop) async def get_available_uids(self): miner_uids = [] @@ -91,25 +88,22 @@ async def get_available_uids(self): miner_uids.append(uid) return miner_uids - async def resync_metagraph(self, force=False): + async def resync_metagraph(self): """Resyncs the metagraph and updates the hotkeys and moving averages based on the new metagraph.""" - async with self.lock: - self.blocks_since_sync = self.current_block - self.last_sync - if self.blocks_since_sync >= self.resync_metagraph_rate or force: - bt.logging.info("Syncing Metagraph...") - self.metagraph.sync(subtensor=self.subtensor) - bt.logging.info("Metagraph updated, re-syncing hotkeys, dendrite pool and moving averages") - # Zero out all hotkeys that have been replaced. 
- self.available_uids = asyncio.run(self.get_available_uids()) - for uid, hotkey in enumerate(self.metagraph.hotkeys): - if (uid not in self.MinerHistory and uid in self.available_uids) or self.hotkeys[uid] != hotkey: - bt.logging.info(f"Replacing hotkey on {uid} with {self.metagraph.hotkeys[uid]}") - self.hotkeys[uid] = hotkey - self.scores[uid] = 0 # hotkey has been replaced - self.MinerHistory[uid] = MinerHistory(uid, timezone=self.timezone) - self.moving_average_scores[uid] = 0 - self.last_sync = self.subtensor.get_current_block() - self.save_state() + self.subtensor = bt.subtensor(config=self.config, network=self.config.subtensor.chain_endpoint) + bt.logging.info("Syncing Metagraph...") + self.metagraph.sync(subtensor=self.subtensor) + bt.logging.info("Metagraph updated, re-syncing hotkeys, dendrite pool and moving averages") + # Zero out all hotkeys that have been replaced. + self.available_uids = await self.get_available_uids() + for uid, hotkey in enumerate(self.metagraph.hotkeys): + if (uid not in self.MinerHistory and uid in self.available_uids) or self.hotkeys[uid] != hotkey: + bt.logging.info(f"Replacing hotkey on {uid} with {self.metagraph.hotkeys[uid]}") + self.hotkeys[uid] = hotkey + self.scores[uid] = 0 # hotkey has been replaced + self.MinerHistory[uid] = MinerHistory(uid, timezone=self.timezone) + self.moving_average_scores[uid] = 0 + self.save_state() def query_miners(self): timestamp = get_now().isoformat() @@ -122,20 +116,17 @@ def query_miners(self): ) return responses, timestamp - def node_query(self, module, method, params): - try: - result = self.node.query(module, method, params).value - except Exception: - # reinitilize node - self.node = SubstrateInterface(url=self.subtensor.chain_endpoint) - result = self.node.query(module, method, params).value - return result - async def set_weights(self): - if self.blocks_since_last_update >= self.set_weights_rate: - async with self.lock: - uids = array(self.available_uids) - weights = 
[self.moving_average_scores[uid] for uid in self.available_uids] + try: + self.blocks_since_last_update = func_with_retry( + self.subtensor.blocks_since_last_update, netuid=self.config.netuid, uid=self.my_uid + ) + self.current_block = func_with_retry(self.subtensor.get_current_block) + except Exception as e: + bt.logging.error(f"Failed to get current block with error {e}, skipping block update") + if self.blocks_since_last_update >= self.hyperparameters.weights_rate_limit: + uids = array(self.available_uids) + weights = [self.moving_average_scores[uid] for uid in self.available_uids] for i, j in zip(weights, self.available_uids): bt.logging.debug(f"UID: {j} | Weight: {i}") if sum(weights) == 0: @@ -152,21 +143,16 @@ async def set_weights(self): uids=uint_uids, weights=uint_weights, wait_for_inclusion=True, - wait_for_finalization=True, version_key=__spec_version__, ) if result: bt.logging.success("✅ Set Weights on chain successfully!") + self.blocks_since_last_update = 0 else: bt.logging.debug( "Failed to set weights this iteration with message:", msg, ) - async with self.lock: - self.current_block = self.subtensor.get_current_block() - self.blocks_since_last_update = ( - self.current_block - self.node_query("SubtensorModule", "LastUpdate", [self.config.netuid])[self.my_uid] - ) async def scheduled_prediction_request(self): if not hasattr(self, "timestamp"): @@ -184,12 +170,11 @@ async def scheduled_prediction_request(self): except Exception as e: bt.logging.error(f"Failed to calculate rewards with error: {e}") # Adjust the scores based on responses from miners and update moving average. 
- async with self.lock: - for i, value in zip(self.available_uids, rewards): - self.moving_average_scores[i] = (1 - self.config.alpha) * self.moving_average_scores[ - i - ] + self.config.alpha * value - self.scores = list(self.moving_average_scores.values()) + for i, value in zip(self.available_uids, rewards): + self.moving_average_scores[i] = (1 - self.config.alpha) * self.moving_average_scores[ + i + ] + self.config.alpha * value + self.scores = list(self.moving_average_scores.values()) if self.config.wandb_on: log_wandb(responses, rewards, self.available_uids) else: