diff --git a/competitions/data.py b/competitions/data.py index 10a9e433..f2ac7852 100644 --- a/competitions/data.py +++ b/competitions/data.py @@ -14,6 +14,8 @@ class CompetitionId(IntEnum): B14_MODEL = 4 + B14_MODEL_MULTI_DATASET = 5 + # Overwrite the default __repr__, which doesn't work with # bt.logging for some unknown reason. def __repr__(self) -> str: diff --git a/constants/__init__.py b/constants/__init__.py index 052f3c26..ec75ffd2 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -34,10 +34,10 @@ # --------------------------------- # Release -__version__ = "4.5.3" +__version__ = "4.6.0" # Validator schema version -__validator_version__ = "3.4.0" +__validator_version__ = "4.6.0" version_split = __validator_version__.split(".") __spec_version__ = ( (1000 * int(version_split[0])) @@ -64,6 +64,9 @@ # Starting block for activating sample unpacking BLOCK_SAMPLE_PACK = 4_001_017 +# Starting block for 14B* (multi dataset experiment). +BLOCK_14B_STAR = 4_252_646 + # Minimum percent of weight on a vali for a miner to be considered a top miner. # Since there can be multiple competitions at different reward percentages we can't just check biggest. WEIGHT_SYNC_MINER_MIN_PERCENT = 0.05 @@ -97,76 +100,25 @@ CompetitionId.B3_MODEL: pt.dataset.SubsetFalconLoader, CompetitionId.B7_MODEL: pt.dataset.SubsetFineWebEdu2Loader, CompetitionId.B14_MODEL: pt.dataset.SubsetFineWebEdu2Loader, + # B14 model multi dataset adds the following dataset to the baseline b14 competition. + CompetitionId.B14_MODEL_MULTI_DATASET: pt.dataset.SubsetStackV1DedupLoader, } -# Defined model constraints by competition id to ensure they are constant across blocks. -MODEL_CONSTRAINTS_BY_COMPETITION_ID: Dict[CompetitionId, ModelConstraints] = { - CompetitionId.M772_MODEL: ModelConstraints( - max_model_parameter_size=772_000_000, - min_model_parameter_size=572_000_000, - sequence_length=1024, - allowed_architectures=ALLOWED_MODEL_TYPES_1, - tokenizer="distilgpt2", - eval_block_delay=0, - epsilon_func=FixedEpsilon(0.005), - max_bytes=5 * 1024 * 1024 * 1024, - ), - CompetitionId.B7_MODEL: ModelConstraints( - max_model_parameter_size=6_900_000_000, - min_model_parameter_size=6_700_000_000, - sequence_length=4096, - allowed_architectures=ALLOWED_MODEL_TYPES_2, - tokenizer="Xenova/gpt-4", - kwargs={ - "torch_dtype": torch.bfloat16, - "attn_implementation": "flash_attention_2", - }, - eval_block_delay=0, - epsilon_func=FixedEpsilon(0.005), - max_bytes=15 * 1024 * 1024 * 1024, - ), - CompetitionId.B3_MODEL: ModelConstraints( - max_model_parameter_size=3_400_000_000, - min_model_parameter_size=3_200_000_000, - sequence_length=4096, - allowed_architectures=ALLOWED_MODEL_TYPES_2, - tokenizer="Xenova/gpt-4", - kwargs={ - "torch_dtype": torch.bfloat16, - "attn_implementation": "flash_attention_2", - }, - eval_block_delay=0, - epsilon_func=FixedEpsilon(0.005), - max_bytes=15 * 1024 * 1024 * 1024, - ), - CompetitionId.B14_MODEL: ModelConstraints( - max_model_parameter_size=13_900_000_000, - min_model_parameter_size=13_700_000_000, - sequence_length=4096, - allowed_architectures=ALLOWED_MODEL_TYPES_2, - tokenizer="Xenova/gpt-4", - kwargs={ - "torch_dtype": torch.bfloat16, - "attn_implementation": "flash_attention_2", - }, - eval_block_delay=0, - epsilon_func=FixedEpsilon(0.005), - max_bytes=29 * 1024 * 1024 * 1024, - ), -} +# Synchronize on blocks roughly every 30 minutes. +SYNC_BLOCK_CADENCE = 150 +# Delay at least as long as the sync block cadence with an additional buffer. 
+EVAL_BLOCK_DELAY = SYNC_BLOCK_CADENCE + 100 # Defined model constraints by competition id with decaying epsilon -MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY: Dict[ - CompetitionId, ModelConstraints -] = { +MODEL_CONSTRAINTS_BY_COMPETITION_ID: Dict[CompetitionId, ModelConstraints] = { CompetitionId.M772_MODEL: ModelConstraints( max_model_parameter_size=772_000_000, min_model_parameter_size=572_000_000, sequence_length=1024, allowed_architectures=ALLOWED_MODEL_TYPES_1, tokenizer="distilgpt2", - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.001, 50400), + eval_block_delay=EVAL_BLOCK_DELAY, + epsilon_func=LinearDecay(0.005, 0.0001, 50400), max_bytes=5 * 1024 * 1024 * 1024, ), CompetitionId.B3_MODEL: ModelConstraints( @@ -179,22 +131,8 @@ "torch_dtype": torch.bfloat16, "attn_implementation": "flash_attention_2", }, - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.001, 50400), - max_bytes=15 * 1024 * 1024 * 1024, - ), - CompetitionId.B7_MODEL: ModelConstraints( - max_model_parameter_size=6_900_000_000, - min_model_parameter_size=6_700_000_000, - sequence_length=4096, - allowed_architectures=ALLOWED_MODEL_TYPES_2, - tokenizer="Xenova/gpt-4", - kwargs={ - "torch_dtype": torch.bfloat16, - "attn_implementation": "flash_attention_2", - }, - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.001, 50400), + eval_block_delay=EVAL_BLOCK_DELAY, + epsilon_func=LinearDecay(0.005, 0.0001, 50400), max_bytes=15 * 1024 * 1024 * 1024, ), CompetitionId.B14_MODEL: ModelConstraints( @@ -207,24 +145,21 @@ "torch_dtype": torch.bfloat16, "attn_implementation": "flash_attention_2", }, - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.001, 100800), + eval_block_delay=EVAL_BLOCK_DELAY, + epsilon_func=LinearDecay(0.005, 0.0001, 72000), max_bytes=29 * 1024 * 1024 * 1024, ), } -# Defined model constraints by competition id with decaying epsilon -MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY_2: Dict[ - CompetitionId, ModelConstraints -] = { +MODEL_CONSTRAINTS_BY_COMPETITION_ID_2: Dict[CompetitionId, ModelConstraints] = { CompetitionId.M772_MODEL: ModelConstraints( max_model_parameter_size=772_000_000, min_model_parameter_size=572_000_000, sequence_length=1024, allowed_architectures=ALLOWED_MODEL_TYPES_1, tokenizer="distilgpt2", - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.0001, 50400), + eval_block_delay=EVAL_BLOCK_DELAY, + epsilon_func=LinearDecay(0.005, 0.0005, 50400), max_bytes=5 * 1024 * 1024 * 1024, ), CompetitionId.B3_MODEL: ModelConstraints( @@ -237,8 +172,8 @@ "torch_dtype": torch.bfloat16, "attn_implementation": "flash_attention_2", }, - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.0001, 50400), + eval_block_delay=EVAL_BLOCK_DELAY, + epsilon_func=LinearDecay(0.005, 0.0005, 50400), max_bytes=15 * 1024 * 1024 * 1024, ), CompetitionId.B14_MODEL: ModelConstraints( @@ -251,42 +186,16 @@ "torch_dtype": torch.bfloat16, "attn_implementation": "flash_attention_2", }, - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.0001, 72000), + eval_block_delay=EVAL_BLOCK_DELAY, + epsilon_func=LinearDecay(0.005, 0.0005, 50400), max_bytes=29 * 1024 * 1024 * 1024, ), } - # Schedule of competitions by block. 
COMPETITION_SCHEDULE_BY_BLOCK: List[Tuple[int, List[Competition]]] = [ ( 0, - [ - Competition( - CompetitionId.B7_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], - 1.0, - ) - ], - ), - ( - 3_565_190, - [ - Competition( - CompetitionId.M772_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.M772_MODEL], - 0.35, - ), - Competition( - CompetitionId.B7_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], - 0.65, - ), - ], - ), - ( - BLOCK_3B_7BSTAR_UNPACK, [ Competition( CompetitionId.M772_MODEL, @@ -298,96 +207,31 @@ MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B3_MODEL], 0.29, ), - Competition( - CompetitionId.B7_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], - 0.57, - ), - ], - ), - ( - 3_750_683, - [ - Competition( - CompetitionId.M772_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.M772_MODEL - ], - 0.14, - ), - Competition( - CompetitionId.B3_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.B3_MODEL - ], - 0.29, - ), - Competition( - CompetitionId.B7_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.B7_MODEL - ], - 0.15, - ), Competition( CompetitionId.B14_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.B14_MODEL - ], - 0.42, + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + 0.57, ), ], ), ( - 3_849_722, + BLOCK_14B_STAR, [ - Competition( - CompetitionId.M772_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.M772_MODEL - ], - 0.14, - ), Competition( CompetitionId.B3_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.B3_MODEL - ], + MODEL_CONSTRAINTS_BY_COMPETITION_ID_2[CompetitionId.B3_MODEL], 0.29, ), Competition( CompetitionId.B14_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.B14_MODEL - ], + MODEL_CONSTRAINTS_BY_COMPETITION_ID_2[CompetitionId.B14_MODEL], 0.57, ), - ], - ), - ( - BLOCK_SAMPLE_PACK, - [ Competition( - CompetitionId.M772_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY_2[ - CompetitionId.M772_MODEL - ], + CompetitionId.B14_MODEL_MULTI_DATASET, + MODEL_CONSTRAINTS_BY_COMPETITION_ID_2[CompetitionId.B14_MODEL], 0.14, ), - Competition( - CompetitionId.B3_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY_2[ - CompetitionId.B3_MODEL - ], - 0.29, - ), - Competition( - CompetitionId.B14_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY_2[ - CompetitionId.B14_MODEL - ], - 0.57, - ), ], ), ] @@ -418,8 +262,12 @@ sample_pack_block = BLOCK_SAMPLE_PACK # validators number of pages to eval over miners on each step. -pages_per_eval_unpack = 5 # With sample unpacking -pages_per_eval_pack = 11 +pages_per_eval_unpack = 10 # With sample unpacking +pages_per_eval_pack = 22 + +# In a future release we will update the loaders to be able to load a certain number of tokens rather than pages. +# Until then we need to set this manually +pages_per_eval_14bstar_pack = 1 # validator eval batch size. batch_size = 1 @@ -427,9 +275,7 @@ sample_min = 5 # Max number of uids that can be either pending eval or currently being evaluated. # We allow the sample_min per competition + 10 additional models to be held at any one time. -updated_models_limit = ( - sample_min * len(MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY_2) + 10 -) +updated_models_limit = sample_min * len(MODEL_CONSTRAINTS_BY_COMPETITION_ID) + 10 # time required between updates to the chain. 
chain_update_cadence = dt.timedelta(minutes=20) # Number of blocks required between retrying evaluation of a model. diff --git a/docs/miner.md b/docs/miner.md index fca251ab..1b918130 100644 --- a/docs/miner.md +++ b/docs/miner.md @@ -125,6 +125,8 @@ You can manually upload with the following command: python scripts/upload_model.py --load_model_dir --competition_id 0 --hf_repo_id my-username/my-project --wallet.name coldkey --wallet.hotkey hotkey ``` +Note: We recommend keeping your Hugging Face repo private until after you have committed your metadata to the chain. This ensures other miners are unable to upload your model as their own until a later block. Adding the `--update_repo_visibility` flag will also automatically attempt to update the Hugging Face repo visibility to public after committing to the chain. + Note: If you are not sure about the competition ID, you can add the `--list_competitions` flag to get a list of all competitions. You can also check out competition IDs in [competitions/data.py](https://github.com/macrocosm-os/pretraining/blob/main/competitions/data.py). ## Running a custom Miner diff --git a/docs/validator.md b/docs/validator.md index 755569a5..598925d0 100644 --- a/docs/validator.md +++ b/docs/validator.md @@ -1,8 +1,9 @@ -# Validator +# Validator Validators download the models from Hugging Face for each miner based on the Bittensor chain metadata and continuously evaluate them, setting weights based on the performance of each model against a dataset for each competition. They also log results to [wandb](https://wandb.ai/macrocosmos/pretraining-validators). You can view the entire validation system by reading the code in `neurons/validator.py`. Pseudocode for the validation system is as follows: + ```python weights = zeros(256) while True: @@ -51,7 +52,7 @@ It is important to note that this affects the game theoretics of the incentive l # System Requirements -Validators will need enough disk space to store the models of miners being evaluated. Each model has a max size by block defined in [constants/__init__.py](https://github.com/macrocosm-os/pretraining/blob/main/constants/__init__.py#L57) and the validator has cleanup logic to remove old models. It is recommended to have at least 2 TB of disk space and 80GB of system memory. +Validators will need enough disk space to store the models of miners being evaluated. Each model has a max size by block defined in [constants/\_\_init\_\_.py](https://github.com/macrocosm-os/pretraining/blob/main/constants/__init__.py#L57) and the validator has cleanup logic to remove old models. It is recommended to have at least 2 TB of disk space and 80GB of system memory. Validators will need enough processing power to evaluate their model. As of Sept 2nd, 2024, an upgrade to the Nvidia A100 GPU with 80GB of VRAM is required. This GPU's high throughput and FLOPs enable the running of 14B models without impacting the speed of the validation cycle. Although only 40GB of VRAM is necessary, we have observed that A100 GPUs with 80GB are more readily available and are offered at a comparable price to the 40GB variants. The additional VRAM provided by this GPU will allow more flexibility for optimization in future releases, enabling larger validation batch sizes to enhance the stability of the validation process by reducing scoring variance. @@ -68,12 +69,14 @@ git clone https://github.com/macrocosm-os/pretraining.git 2. 
Set up your Python [virtual environment](https://docs.python.org/3/library/venv.html) or [Conda environment](https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-environments.html#creating-an-environment-with-commands). 3. Install the requirements. From your virtual environment, run + ```shell cd pretraining python -m pip install -e . ``` Note: flash-attn may not have its dependencies set up correctly. If you run into issues, try installing those requirements separately first: + ```shell pip install packaging pip install wheel pip install torch ``` 5. (Optional) Run a Subtensor instance: -Your node will run better if you are connecting to a local Bittensor chain entrypoint node rather than using Opentensor's. -We recommend running a local node as follows and passing the ```--subtensor.network local``` flag to your running miners/validators. +Your node will run better if you are connecting to a local Bittensor chain entrypoint node rather than using Opentensor's. +We recommend running a local node as follows and passing the ```--subtensor.network local``` flag to your running miners/validators. To install and run a local subtensor node, follow the commands below with Docker and Docker Compose already installed. + ```bash git clone https://github.com/opentensor/subtensor.git cd subtensor docker compose up --detach ``` ---- -# Running the Validator +## Obtaining your Hugging Face token + +The code dataset, `The Stack V1-dedup`, requires a **Hugging Face access token**. Follow these steps to obtain and configure one: + +### Step 1: Get Your Hugging Face Access Token + +1. Go to the [Hugging Face website](https://huggingface.co/). +2. If you don’t already have an account, create one. Otherwise, log in. +3. Go to the [dataset's website](https://huggingface.co/datasets/bigcode/the-stack-dedup) and agree to its terms of use. You should immediately gain access to the dataset. +4. Click on your profile icon in the top-right corner, and select **Settings**. +5. In the settings menu, locate and click on **Access Tokens**. +6. Under the Access Tokens section, click **New token** and generate a token with write permissions. +7. Copy the generated token. + +### Step 2: Create a `.env` File in the `pretraining` Directory + +1. Navigate to your `pretraining` directory where you want to save the environment file. +2. Create a new file named `.env` in this directory (if it doesn’t already exist). You can do this from the command line using: + + ```bash + touch .env + ``` + +3. Open the `.env` file with your preferred text editor and add the following line, replacing `YOUR_HF_TOKEN_HERE` with your actual Hugging Face token: -## With auto-updates + ```bash + HF_TOKEN=YOUR_HF_TOKEN_HERE + ``` + +4. Save and close the file. + +This `.env` file now securely holds your Hugging Face token, allowing scripts in the `pretraining` directory to load it automatically if they’re set up to read environment variables. + +## Running the Validator + +### With auto-updates We highly recommend running the validator with auto-updates. This will help ensure your validator is always running the latest release, helping to maintain a high vtrust. Prerequisites: + 1. To run with auto-update, you will need to have [pm2](https://pm2.keymetrics.io/) installed. 2. Make sure your virtual environment is activated. This is important because the auto-updater will automatically update the package dependencies with pip. 3. Make sure you're using the main branch: `git checkout main`. 
From the pretraining folder: + ```shell pm2 start --name net9-vali-updater --interpreter python scripts/start_validator.py -- --pm2_name net9-vali --wallet.name coldkey --wallet.hotkey hotkey [other vali flags] ``` This will start a process called `net9-vali-updater`. This process periodically checks for a new git commit on the current branch. When one is found, it performs a `pip install` for the latest packages, and restarts the validator process (whose name is given by the `--pm2_name` flag). -## Without auto-updates +### Without auto-updates If you'd prefer to manage your own validator updates... From the pretraining folder: + ```shell pm2 start python -- ./neurons/validator.py --wallet.name coldkey --wallet.hotkey hotkey ``` @@ -128,6 +167,7 @@ pm2 start python -- ./neurons/validator.py --wallet.name coldkey --wallet.hotkey The Validator offers some flags to customize properties, such as the device to evaluate on and the number of models to evaluate each step. You can view the full set of flags by running + ```shell python ./neurons/validator.py -h ``` @@ -135,6 +175,7 @@ python ./neurons/validator.py -h ## Test Running Validation Test running validation: + ```shell python neurons/validator.py --wallet.name YOUR_WALLET_NAME @@ -143,4 +184,5 @@ python neurons/validator.py --wandb.off --offline ``` ---- \ No newline at end of file + +--- diff --git a/neurons/config.py b/neurons/config.py index eeb58d1f..b1c4f03d 100644 --- a/neurons/config.py +++ b/neurons/config.py @@ -32,12 +32,6 @@ def validator_config(): default=50, help="Number of blocks to wait before setting weights.", ) - parser.add_argument( - "--pages_per_eval", - type=int, - default=None, - help="Number of pages used to eval each step. If not specified, it will be automatically set.", - ) parser.add_argument( "--sample_min", type=int, diff --git a/neurons/validator.py b/neurons/validator.py index 4681e477..48f935db 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -40,7 +40,7 @@ import torch import wandb -from huggingface_hub.utils import RepositoryNotFoundError +from huggingface_hub.utils import RepositoryNotFoundError, RevisionNotFoundError from rich.console import Console from rich.table import Table from taoverse.metagraph import utils as metagraph_utils @@ -86,12 +86,27 @@ class PerUIDEvalState: # The hugging face repo name. repo_name: str = "Unknown" - # The losses per batch. - losses: typing.List[float] = dataclasses.field(default=None) + # The losses per batch per dataset. 
+ losses: typing.Dict[str, typing.List[float]] = dataclasses.field( + default_factory=lambda: defaultdict(list) + ) def avg_loss(self) -> float: - """Safely computes the average loss from a list of losses.""" - return sum(self.losses) / len(self.losses) if self.losses else math.inf + """Safely computes the average loss across all dataset loss lists.""" + all_losses = [loss for loss_list in self.losses.values() for loss in loss_list] + return sum(all_losses) / len(all_losses) if all_losses else math.inf + + def avg_dataset_loss(self, dataset_name: str) -> float: + """Safely computes the average loss from a list of losses for a specific dataset.""" + return ( + sum(self.losses[dataset_name]) / len(self.losses[dataset_name]) + if self.losses[dataset_name] + else math.inf + ) + + def avg_loss_per_dataset(self) -> typing.Dict[str, float]: + """Safely computes the average loss per dataset.""" + return {k: (sum(v) / len(v) if v else math.inf) for k, v in self.losses.items()} class Validator: @@ -160,7 +175,6 @@ def __init__(self): self.weights = torch.zeros_like(torch.tensor(self.metagraph.S)) self.global_step = 0 self.last_epoch = self.metagraph.block.item() - self.last_wandb_step = 0 self.uids_to_eval: typing.Dict[CompetitionId, typing.Set] = defaultdict(set) @@ -437,7 +451,9 @@ def update_models(self): ) if competition is not None and not is_queued_for_eval: eval_history = ( - self.model_tracker.get_eval_results_for_miner_hotkey(hotkey) + self.model_tracker.get_eval_results_for_miner_hotkey( + hotkey, competition.id + ) ) force_sync = should_retry_model( competition.constraints.epsilon_func, @@ -449,6 +465,32 @@ def update_models(self): f"Force downloading model for UID {next_uid} because it should be retried. Eval_history={eval_history}" ) + # Special case for 14B*. We want to retry if the model is competitive in either 14B or 14B*. + if competition.id == CompetitionId.B14_MODEL: + # Check that 14B* is currently running. + competition_14b = ( + competition_utils.get_competition_for_block( + CompetitionId.B14_MODEL_MULTI_DATASET, + curr_block, + constants.COMPETITION_SCHEDULE_BY_BLOCK, + ) + ) + if competition_14b is not None: + eval_history_14b_star = self.model_tracker.get_eval_results_for_miner_hotkey( + hotkey, CompetitionId.B14_MODEL_MULTI_DATASET + ) + force_sync_14b_star = should_retry_model( + competition_14b.constraints.epsilon_func, + curr_block, + eval_history_14b_star, + ) + if force_sync_14b_star: + # Even if it is already logging for 14B, also log for 14B*. + bt.logging.debug( + f"Force downloading model for UID {next_uid} because it should be retried for 14B*. Eval_history={eval_history_14b_star}" + ) + force_sync = True + # Compare metadata and tracker, syncing new model from remote store to local if necessary. try: updated = asyncio.run( @@ -462,6 +504,7 @@ def update_models(self): except MinerMisconfiguredError as e: self.model_tracker.on_model_evaluated( hotkey, + 0, EvalResult( block=curr_block, score=math.inf, @@ -489,7 +532,7 @@ def update_models(self): f"Failed to find metadata for uid {next_uid} with hotkey {hotkey}" ) - except RepositoryNotFoundError as e: + except (RepositoryNotFoundError, RevisionNotFoundError) as e: bt.logging.trace(e) except MinerMisconfiguredError as e: bt.logging.trace(e) @@ -544,8 +587,7 @@ def _queue_top_models_for_eval(self) -> None: for uid in uids_to_add: # Check when we last evaluated this model. 
hotkey = metagraph.hotkeys[uid] - eval_history = self.model_tracker.get_eval_results_for_miner_hotkey(hotkey) - last_eval_block = eval_history[-1].block if eval_history else 0 + last_eval_block = self.model_tracker.get_block_last_evaluated(hotkey) or 0 curr_block = self._get_current_block() if curr_block - last_eval_block >= constants.model_retry_cadence: try: @@ -563,6 +605,7 @@ def _queue_top_models_for_eval(self) -> None: except MinerMisconfiguredError as e: self.model_tracker.on_model_evaluated( hotkey, + 0, EvalResult( block=curr_block, score=math.inf, @@ -762,8 +805,28 @@ async def run_step(self): # Every validator step should pick a single competition in a round-robin fashion competition = competition_schedule[self.global_step % len(competition_schedule)] + # If the competition is 14b* we skip it since we run it concurrently with 14b instead. + if competition.id == CompetitionId.B14_MODEL_MULTI_DATASET: + bt.logging.info( + "Skipping step for B14* competition. It will be evaluated as part of the regular B14 competition." + ) + return + bt.logging.info("Starting evaluation for competition: " + str(competition.id)) + running_14b_star = competition.id == CompetitionId.B14_MODEL and any( + comp.id == CompetitionId.B14_MODEL_MULTI_DATASET + for comp in competition_schedule + ) + + if running_14b_star: + competition_14b_star = competition_utils.get_competition_for_block( + comp_id=CompetitionId.B14_MODEL_MULTI_DATASET, + block=cur_block, + schedule_by_block=constants.COMPETITION_SCHEDULE_BY_BLOCK, + ) + bt.logging.info("Additionally running competition 14B* in parallel to 14B.") + # Add uids with newly updated models to the upcoming batch of evaluations. with self.pending_uids_to_eval_lock: self.uids_to_eval[competition.id].update( @@ -795,27 +858,37 @@ async def run_step(self): SubsetDataLoader = constants.DATASET_BY_COMPETITION_ID[competition.id] bt.logging.trace(f"Dataset in use: {SubsetDataLoader.name}.") + if running_14b_star: + # This will be set to a copy of 14b later with the additional eval losses appended. + uid_to_state_14b_star = defaultdict(PerUIDEvalState) + + # Also get the dataloader for 14b_star. + SubsetDataLoader_14b_star = constants.DATASET_BY_COMPETITION_ID[ + CompetitionId.B14_MODEL_MULTI_DATASET + ] + bt.logging.trace( + f"Additionally using 14b* dataset: {SubsetDataLoader_14b_star.name}." + ) + # Get the tokenizer tokenizer = pt.model.load_tokenizer( competition.constraints, cache_dir=self.config.model_dir ) - if cur_block >= constants.sample_pack_block: - pack_samples = True - pages_per_eval = constants.pages_per_eval_pack - else: - pack_samples = False - pages_per_eval = constants.pages_per_eval_unpack - - # If the option is set in the config, override - pages_per_eval = ( - self.config.pages_per_eval - if self.config.pages_per_eval is not None - else pages_per_eval - ) + # Use sample packing. + pack_samples = True + pages_per_eval = constants.pages_per_eval_pack + + # Try to synchronize the data used by validators. 
+ try: + seed = metagraph_utils.get_hash_of_sync_block( + self.subtensor, constants.SYNC_BLOCK_CADENCE + ) + except Exception: + seed = None - bt.logging.debug(f"Sample packing is set to: {pack_samples}.") - bt.logging.debug(f"Number of pages per evaluation step is: {pages_per_eval}") + bt.logging.debug(f"Number of pages per evaluation step is: {pages_per_eval}.") + bt.logging.debug(f"Seed used for loading data is: {seed}.") dataloader = SubsetDataLoader( batch_size=constants.batch_size, @@ -823,6 +896,7 @@ async def run_step(self): num_pages=pages_per_eval, tokenizer=tokenizer, pack_samples=pack_samples, + random_seed=seed, ) batches = list(dataloader) @@ -832,19 +906,46 @@ async def run_step(self): # This is useful for logging to wandb pages = dataloader.get_page_names() + bt.logging.debug(f"Competition {competition.id} | Computing losses on {uids}") + bt.logging.debug(f"Pages used are {pages}") + + if running_14b_star: + dataloader_14b_star = SubsetDataLoader_14b_star( + batch_size=constants.batch_size, + sequence_length=competition_14b_star.constraints.sequence_length, + num_pages=constants.pages_per_eval_14bstar_pack, + tokenizer=tokenizer, + pack_samples=pack_samples, + random_seed=seed, + ) + + batches_14b_star = list(dataloader_14b_star) + bt.logging.debug( + f"Number of 14b* validation batches is {len(batches_14b_star)}" + ) + bt.logging.debug(f"14b* Batch size is {len(batches_14b_star[0])}") + + # This is useful for logging to wandb + pages_14b_star = dataloader_14b_star.get_page_names() + + bt.logging.debug(f"14b* Pages used are {pages_14b_star}") + + compute_loss_perf_14b_star = PerfMonitor("Eval: Compute loss 14b star") + # Prepare evaluation. kwargs = competition.constraints.kwargs.copy() kwargs["use_cache"] = True - bt.logging.debug(f"Competition {competition.id} | Computing losses on {uids}") - bt.logging.debug(f"Pages used are {pages}") - load_model_perf = PerfMonitor("Eval: Load model") compute_loss_perf = PerfMonitor("Eval: Compute loss") for uid_i in uids: # This variable should be overwritten below if the model has metadata. losses: typing.List[float] = [math.inf for _ in range(len(batches))] + if running_14b_star: + losses_14b_star: typing.List[float] = [ + math.inf for _ in range(len(batches_14b_star)) + ] bt.logging.trace(f"Getting metadata for uid: {uid_i}.") @@ -894,7 +995,25 @@ async def run_step(self): ttl=430, mode="spawn", ) - + if running_14b_star: + with compute_loss_perf_14b_star.sample(): + try: + losses_14b_star = utils.run_in_subprocess( + functools.partial( + pt.validation.compute_losses, + model_i.pt_model, + batches_14b_star, + self.config.device, + tokenizer.eos_token_id, + pack_samples, + ), + ttl=430, + mode="spawn", + ) + except Exception as e: + bt.logging.error( + f"Error in eval loop: {e}. Setting 14b* losses for uid: {uid_i} to infinity." + ) del model_i except Exception as e: @@ -906,60 +1025,38 @@ async def run_step(self): f"Unable to load the model for {uid_i} or it belongs to another competition. Setting loss to infinity for this competition." ) - uid_to_state[uid_i].losses = losses - average_model_loss = sum(losses) / len(losses) + uid_to_state[uid_i].losses[SubsetDataLoader.name] = losses bt.logging.trace( - f"Computed model losses for uid:{uid_i} with average loss: {average_model_loss}" + f"Computed model losses for uid:{uid_i} with average loss: {uid_to_state[uid_i].avg_loss()}" ) - # Compute wins and win rates per uid. - # Take the average loss across all batches for comparison of best model. 
- uid_to_average_loss = { - uid: state.avg_loss() for uid, state in uid_to_state.items() - } - uid_to_block = {uid: state.block for uid, state in uid_to_state.items()} - - # Filter to the list of uids that may at one point be a top model. - competitive_uids = pt.validation.compute_competitive_uids( - uid_to_average_loss, uid_to_block, competition.constraints.epsilon_func - ) - - # Log which models got dropped for the second pass. - dropped_uids = [uid for uid in uids if uid not in competitive_uids] - if dropped_uids: - bt.logging.info( - f"The following uids were not included in the win rate calculation because they did not beat the fully decayed loss of any previously submitted model in this eval batch: {dropped_uids}." - ) + if running_14b_star: + # Make a deep copy of the current uid_to_state and append the additional losses. + uid_to_state_14b_star[uid_i] = copy.deepcopy(uid_to_state[uid_i]) + uid_to_state_14b_star[uid_i].losses[ + SubsetDataLoader_14b_star.name + ] = losses_14b_star + bt.logging.trace( + f"Computed 14b* model losses for uid:{uid_i} with average loss: {uid_to_state_14b_star[uid_i].avg_loss()}. Details: {uid_to_state_14b_star[uid_i].avg_loss_per_dataset()}" + ) # Calculate new wins and win_rate with only the competitive uids considered. - wins, win_rate = pt.validation.compute_wins( - competitive_uids, - uid_to_average_loss, - uid_to_block, - competition.constraints.epsilon_func, - cur_block, + wins, win_rate = self._compute_and_set_competition_weights( + cur_block=cur_block, + uids=uids, + uid_to_state=uid_to_state, + competition=competition, ) - top_uid = max(win_rate, key=win_rate.get) - self._record_eval_results(top_uid, cur_block, uid_to_state) - - # Compute softmaxed weights based on win rate. - model_weights = torch.tensor( - [win_rate.get(uid, 0) for uid in uids], dtype=torch.float32 - ) - step_weights = torch.softmax(model_weights / constants.temperature, dim=0) - - # Fill in metagraph sized tensor with the step weights of the evaluated models. - with self.metagraph_lock: - competition_weights = torch.zeros_like(self.metagraph.S) - - for i, uid_i in enumerate(uids): - competition_weights[uid_i] = step_weights[i] - - # Record weights for the current competition. - self.competition_tracker.record_competition_weights( - competition.id, competition_weights - ) + if running_14b_star: + wins_14b_star, win_rate_14b_star = ( + self._compute_and_set_competition_weights( + cur_block=cur_block, + uids=uids, + uid_to_state=uid_to_state_14b_star, + competition=competition_14b_star, + ) + ) # Get ids for all competitions in the schedule. active_competition_ids = set([comp.id for comp in competition_schedule]) @@ -975,6 +1072,12 @@ async def run_step(self): tracker_competition_weights = self.competition_tracker.get_competition_weights( competition.id ) + if running_14b_star: + # Get the weights as if only the 14b and 14b* competition existed for purposes of model prioritization. + tracker_competition_weights = self.competition_tracker.get_subnet_weights( + [competition, competition_14b_star] + ) + model_prioritization = { uid: ( # Add 1 to ensure it is always greater than a win rate. @@ -990,6 +1093,12 @@ async def run_step(self): : self.config.sample_min ] ) + + # Note when breaking ties of 0 weight models we use the primary dataset in all cases. + uid_to_average_loss = { + uid: state.avg_loss() for uid, state in uid_to_state.items() + } + # Make sure we always keep around sample_min number of models to maintain previous behavior. 
if len(models_to_keep) < self.config.sample_min: for uid in sorted(uid_to_average_loss, key=uid_to_average_loss.get): @@ -1017,15 +1126,104 @@ async def run_step(self): uid_to_state, self._get_uids_to_competition_ids(), pages, - model_weights, wins, win_rate, load_model_perf, compute_loss_perf, ) - # Increment the number of completed run steps by 1 - self.run_step_count += 1 + if running_14b_star: + bt.logging.debug(compute_loss_perf_14b_star.summary_str()) + + uids_to_competition_ids_14b_star = { + k: ( + CompetitionId.B14_MODEL_MULTI_DATASET.value + if v == CompetitionId.B14_MODEL + else v + ) + for k, v in self._get_uids_to_competition_ids().items() + } + + self.log_step( + CompetitionId.B14_MODEL_MULTI_DATASET, + competition.constraints.epsilon_func, + cur_block, + uids, + uid_to_state_14b_star, + uids_to_competition_ids_14b_star, + pages_14b_star, + wins_14b_star, + win_rate_14b_star, + load_model_perf, + compute_loss_perf_14b_star, + ) + + def _compute_and_set_competition_weights( + self, + cur_block: int, + uids: typing.List[int], + uid_to_state: typing.Dict[int, PerUIDEvalState], + competition: Competition, + ) -> typing.Tuple[typing.Dict[int, int], typing.Dict[int, float]]: + """Computes competition weights including checks for competitiveness and records them internally. + + Args: + cur_block (int): The current block. + uids (typing.List[int]): All uids being considered during the current evaluation. + uid_to_state (typing.Dict[int, PerUIDEvalState]): Evaluation information for each uid. + competition (Competition): The current competition being evaluated. + + Returns: + tuple: A tuple containing two dictionaries, one for wins and one for win rates. + """ + uid_to_average_loss = { + uid: state.avg_loss() for uid, state in uid_to_state.items() + } + uid_to_block = {uid: state.block for uid, state in uid_to_state.items()} + + # Filter to the list of uids that may at one point be a top model. + competitive_uids = pt.validation.compute_competitive_uids( + uid_to_average_loss, uid_to_block, competition.constraints.epsilon_func + ) + + # Log which models got dropped for the second pass. + dropped_uids = [uid for uid in uids if uid not in competitive_uids] + if dropped_uids: + bt.logging.info( + f"The following uids were not included in the win rate calculation because they did not beat the fully decayed loss of any previously submitted model in this eval batch: {dropped_uids}." + ) + + # Calculate new wins and win_rate with only the competitive uids considered. + wins, win_rate = pt.validation.compute_wins( + competitive_uids, + uid_to_average_loss, + uid_to_block, + competition.constraints.epsilon_func, + cur_block, + ) + + top_uid = max(win_rate, key=win_rate.get) + self._record_eval_results(top_uid, cur_block, uid_to_state, competition.id) + + # Compute softmaxed weights based on win rate. + model_weights = torch.tensor( + [win_rate.get(uid, 0) for uid in uids], dtype=torch.float32 + ) + step_weights = torch.softmax(model_weights / constants.temperature, dim=0) + + # Fill in metagraph sized tensor with the step weights of the evaluated models. + with self.metagraph_lock: + competition_weights = torch.zeros_like(self.metagraph.S) + + for i, uid_i in enumerate(uids): + competition_weights[uid_i] = step_weights[i] + + # Record weights for the current competition. 
+ self.competition_tracker.record_competition_weights( + competition.id, competition_weights + ) + + return wins, win_rate def _update_uids_to_eval( self, @@ -1061,6 +1259,7 @@ def _record_eval_results( top_uid: int, curr_block: int, uid_to_state: typing.Dict[int, PerUIDEvalState], + competition_id: CompetitionId, ) -> None: """Records the results of the evaluation step to the model tracker. @@ -1073,6 +1272,7 @@ def _record_eval_results( for _, state in uid_to_state.items(): self.model_tracker.on_model_evaluated( state.hotkey, + competition_id, EvalResult( block=curr_block, score=state.avg_loss(), @@ -1090,7 +1290,6 @@ def log_step( uid_to_state: typing.Dict[int, PerUIDEvalState], uid_to_competition_id: typing.Dict[int, typing.Optional[int]], pages: typing.List[str], - model_weights: typing.List[float], wins: typing.Dict[int, int], win_rate: typing.Dict[int, float], load_model_perf: PerfMonitor, @@ -1107,12 +1306,12 @@ def log_step( } # The sub-competition weights - sub_competition_weights = torch.softmax( - model_weights / constants.temperature, dim=0 + sub_competition_weights = self.competition_tracker.get_competition_weights( + competition_id ) # All uids in the competition step log are from the same competition. - for idx, uid in enumerate(uids): + for uid in uids: step_log["uid_data"][str(uid)] = { "uid": uid, "block": uid_to_state[uid].block, @@ -1126,8 +1325,17 @@ def log_step( "win_rate": win_rate[uid] if uid in win_rate else 0, "win_total": wins[uid] if uid in wins else 0, "weight": self.weights[uid].item(), - "norm_weight": sub_competition_weights[idx].item(), + "norm_weight": sub_competition_weights[uid].item(), + "dataset_perf": {}, } + + # Log performance per dataset + for dataset_name, avg_loss in ( + uid_to_state[uid].avg_loss_per_dataset().items() + ): + step_log["uid_data"][str(uid)]["dataset_perf"][ + f"{dataset_name}" + ] = {'average_loss': avg_loss} table = Table(title="Step", expand=True) table.add_column("uid", justify="right", style="cyan", no_wrap=True) table.add_column("hf", style="magenta", overflow="fold") @@ -1139,7 +1347,7 @@ def log_step( table.add_column("comp_weight", style="magenta", overflow="fold") table.add_column("block", style="magenta", overflow="fold") table.add_column("comp", style="magenta", overflow="fold") - for idx, uid in enumerate(uids): + for uid in uids: try: table.add_row( str(uid), @@ -1149,7 +1357,7 @@ def log_step( str(round(step_log["uid_data"][str(uid)]["win_rate"], 4)), str(step_log["uid_data"][str(uid)]["win_total"]), str(round(self.weights[uid].item(), 4)), - str(round(sub_competition_weights[idx].item(), 4)), + str(round(sub_competition_weights[uid].item(), 4)), str(step_log["uid_data"][str(uid)]["block"]), str(step_log["uid_data"][str(uid)]["competition_id"]), ) @@ -1211,8 +1419,7 @@ def log_step( }, "weight_data": {str(uid): self.weights[uid].item() for uid in uids}, "competition_weight_data": { - str(uid): sub_competition_weights[i].item() - for i, uid in enumerate(uids) + str(uid): sub_competition_weights[uid].item() for uid in uids }, "competition_id": {str(uid): int(competition_id)}, "load_model_perf": { @@ -1230,11 +1437,11 @@ def log_step( } bt.logging.trace("Logging to Wandb") self.wandb_run.log( - {**graphed_data, "original_format_json": original_format_json}, - step=self.last_wandb_step, + {**graphed_data, "original_format_json": original_format_json} ) - self.last_wandb_step += 1 + # Increment the number of completed run steps by 1 + self.run_step_count += 1 def _get_uids_to_competition_ids( self, @@ -1260,7 +1467,7 @@ 
async def run(self): while True: try: # First run a step. - await self.try_run_step(ttl=60 * 60) + await self.try_run_step(ttl=120 * 60) self.global_step += 1 block = self._get_current_block() diff --git a/pretrain/dataset.py b/pretrain/dataset.py index 86cddc94..14bd80c8 100644 --- a/pretrain/dataset.py +++ b/pretrain/dataset.py @@ -1,20 +1,3 @@ -# The MIT License (MIT) -# Copyright © 2023 Yuma Rao -# Copyright © 2023 const - -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the “Software”), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, -# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: - -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of -# the Software. - -# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO -# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -# DEALINGS IN THE SOFTWARE. import typing import random import time @@ -26,322 +9,249 @@ from datasets import load_dataset from pprint import pprint +import os +from dotenv import load_dotenv +load_dotenv() class SubsetLoader(IterableDataset): - """ - Base class for data-specific subset loader classes. - - # TODO: Make this class abstract - """ + """Base class for data-specific subset loader classes.""" + + name: str = None # Dataset name + rows_base_url: str = "https://datasets-server.huggingface.co/rows" + size_base_url: str = "https://datasets-server.huggingface.co/size" + max_pages: int = None + def __init__( - self, - batch_size=None, - sequence_length=None, - num_pages=None, - tokenizer: AutoTokenizer=None, - pack_samples: bool=False, + self, + batch_size=None, + sequence_length=None, + num_pages=None, + tokenizer: AutoTokenizer = None, + pack_samples: bool = False, + random_seed: typing.Optional[int] = None, + config: str = "default", + split: str = "train", + requires_auth: bool = False, ): self.batch_size = batch_size self.sequence_length = sequence_length self.num_pages = num_pages self.tokenizer = tokenizer self.pack_samples = pack_samples + self.config = config + self.split = split + self.requires_auth = requires_auth - self.num_rows_per_page = 100 + # Initialize with seed if provided + if random_seed is not None: + random.seed(random_seed) - # Buffer to hold pages loaded from the api - self.buffer = [] + self.num_rows_per_page = 50 + self.duplicate_page_threshold = 100 + self.retry_limit = 10 + self.retry_delay = 5 - # Buffer to hold pages already loaded into a batch + # Buffers + self.buffer = [] self.used_buffer = [] - - # Buffer to hold padded pages self.padded_buffer = [] - def fetch_data_for_pages(self, pages): - """ - Set the pages to be used to fill the buffer. Then fetch the page data - to the buffer. 
- """ + # Get HF token if needed + self.hf_token = None + if self.requires_auth: + self.hf_token = os.getenv("HF_TOKEN") + if not self.hf_token: + raise ValueError("HF_TOKEN environment variable not found") - self.pages = pages + # Initialize request params + self.params = self._get_default_params() - # Empty the buffer if it is not. - self.buffer = [] + # Fetch pages if specified + if self.num_pages: + self._initialize_pages() + + def _get_default_params(self): + """Get default request parameters. Override if needed.""" + return { + "dataset": self.name, + "config": self.config, + "split": self.split, + } + def _get_request_headers(self): + """Get request headers. Override if needed.""" + headers = {} + if self.requires_auth: + headers["Authorization"] = f"Bearer {self.hf_token}" + return headers + + def _initialize_pages(self): + """Initialize pages based on loader type""" + if hasattr(self, 'fetch_dataset_configs'): + # For FineWebEdu2 style loaders + self.configs_data = self.fetch_dataset_configs() + self._fetch_data_to_buffer(self.num_pages) + else: + # For simple page-based loaders + pages = self._sample_pages() + self.fetch_data_for_pages(pages) + + def fetch_data_for_pages(self, pages): + """Set the pages and fetch their data to the buffer.""" + self.pages = pages + self.buffer = [] for page in self.pages: self._fetch_data_for_page(page) + def _fetch_data_for_page(self, page): + """Fetch data for a single page""" + # Handle different page types (tuple vs int) + if isinstance(page, tuple): + config_name, page_num, split = page + self.params.update({ + "config": config_name, + "split": split, + "offset": page_num, + }) + else: + self.params["offset"] = page + + self.params["length"] = self.num_rows_per_page + + attempt = 0 + while attempt < self.retry_limit: + try: + response = requests.get( + self.rows_base_url, + params=self.params, + headers=self._get_request_headers() + ) + response.raise_for_status() - def _get_pad_size(self, input_ids): - """ - Get the number of tokens to be padded to the sample to match - the max allowed sequence length. - If sample packing is activated, then return 1 - """ + for row in response.json()["rows"]: + content = self._get_content_from_row(row) + input_ids = self.tokenizer(content, truncation=True)["input_ids"] + self.buffer += input_ids + self.buffer += [self.tokenizer.eos_token_id] + + break + + except requests.exceptions.RequestException as e: + attempt += 1 + bt.logging.warning( + f"Failed to fetch data for page {page}, retrying. Attempt {attempt}/{self.retry_limit}" + ) + if attempt < self.retry_limit: + time.sleep(self.retry_delay) + else: + bt.logging.error("Maximum retry limit reached. Unable to fetch data.") + raise + + def _get_content_from_row(self, row): + """Extract content from row based on dataset format. Override if needed.""" + return row["row"].get("text", row["row"].get("content")) + def _sample_pages(self): + """Sample random pages. 
Override for custom sampling logic.""" + return [random.randint(1, self.max_pages) for _ in range(self.num_pages)] + + def get_page_names(self): + """Get page names in consistent format""" + if not hasattr(self, 'pages'): + return [] + + if isinstance(self.pages[0], tuple): + return [f"{cfg_name}_{num_rows}_{split}" + for cfg_name, num_rows, split in self.pages] + return self.pages + + def _get_pad_size(self, input_ids): + """Get padding size for input tokens.""" if self.pack_samples: return 1 sample_size = len(input_ids) - - remainder = (sample_size % self.sequence_length) - pad_size = (self.sequence_length - remainder) - - # Apply modulo again to guarantee a pad size of 0 if remainder is 0 + remainder = sample_size % self.sequence_length + pad_size = self.sequence_length - remainder pad_size = pad_size % self.sequence_length - return pad_size def _refill_padded_buffer(self): - """ - This methods pulls one page from `self.buffer`, pads it and pushs - it to the `self.padded_buffer`. - """ - - while ( - self.buffer - and len(self.padded_buffer) < self.sequence_length - ): - + """Refill the padded buffer from the main buffer.""" + while self.buffer and len(self.padded_buffer) < self.sequence_length: input_ids = [] - - # search for EOS token index and cut the buffer at it. EOS_index = self.buffer.index(self.tokenizer.eos_token_id) - input_ids = self.buffer[:EOS_index+1] - self.buffer =self.buffer[EOS_index+1:] - + input_ids = self.buffer[: EOS_index + 1] + self.buffer = self.buffer[EOS_index + 1 :] self.used_buffer += input_ids - - # Add to padded buffer without the EOS token. self.padded_buffer += input_ids[:-1] - - # Pad - self.padded_buffer += [self.tokenizer.eos_token_id] * self._get_pad_size(input_ids=input_ids[:-1]) + self.padded_buffer += [self.tokenizer.eos_token_id] * self._get_pad_size( + input_ids=input_ids[:-1] + ) def __iter__(self): self.buffer = self.used_buffer + self.buffer self.padded_buffer = [] - - # Pad and prepare one page for batching self._refill_padded_buffer() - return self def __next__(self): batch = [] - while len(self.padded_buffer) >= self.sequence_length: batch.append(self.padded_buffer[: self.sequence_length]) self.padded_buffer = self.padded_buffer[self.sequence_length :] self._refill_padded_buffer() - if len(batch) == self.batch_size: return np.stack(batch) - raise StopIteration -class SubsetFineWebEdu2Loader(SubsetLoader): - - name: str = "HuggingFaceFW/fineweb-edu-score-2" - rows_base_url: str = "https://datasets-server.huggingface.co/rows" - size_base_url: str = "https://datasets-server.huggingface.co/size" - - retry_limit: int = 10 # Number of retries - retry_delay: int = 5 # Seconds to wait between retries - - def __init__( - self, - batch_size=None, - sequence_length=None, - num_pages=None, - tokenizer: AutoTokenizer=None, - pack_samples: bool=False, - ): - super().__init__(batch_size, - sequence_length, - num_pages, - tokenizer, - pack_samples) - - # Get the dataset configs and their row sizes - self.configs_data = self.fetch_dataset_configs() - - # We first need to fetch the data and fill the loader buffer. - # Since some sample files are broken, we first try to find `num_pages` - # responsive samples, then we add them to the found pages `self.pages` - if self.num_pages: - self._fetch_data_to_buffer(self.num_pages) - +class SubsetPes2oXLoader(SubsetLoader): + max_pages: int = 8242000 + name: str = "laion/Pes2oX-fulltext" - def _fetch_data_to_buffer(self, num_pages): - """ - Randomly sample pages and add their data to the buffer. 
- If a page is inaccessible, another one is sampled. - this method sets the `pages` property - """ + def __init__(self, **kwargs): + super().__init__(config="pes2ov2", **kwargs) - self.pages = [] - attempts = 0 - - while len(self.pages) < num_pages: - # randomly sample one page - config_name, page, split = self.get_random_pages(num_pages = 1)[0] +class SubsetStackV1DedupLoader(SubsetLoader): + max_pages: int = 236655813 + name: str = "bigcode/the-stack-dedup" - # Create the request parameters - params = dict(dataset=self.name, - config=config_name, - split=split, - offset=page, - limit=self.num_rows_per_page - ) - - try: - response = requests.get(self.rows_base_url, params=params) + def __init__(self, **kwargs): + super().__init__(requires_auth=True, **kwargs) - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - # Add the page since the request was successful - self.pages.append((config_name, page, split)) - - for row in response.json()["rows"]: - content = row["row"]["text"] - - # get the tokenized and encoded sample - input_ids = self.tokenizer(content, truncation=True)["input_ids"] - self.buffer += input_ids - self.buffer += [self.tokenizer.eos_token_id] - - response.close() - - except requests.exceptions.RequestException as e: - - response.close() - attempts += 1 - bt.logging.warning( - f"Failed to fetch data, retrying with a newly sampled page. Attempt {attempts}/{self.retry_limit * num_pages}" - ) - if attempts < num_pages * self.retry_limit: - pass - - else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) - raise - - def fetch_data_to_rows(self, num_pages): - - rows = [] - attempts = 0 - num_downloaded_pages = 0 - - while num_downloaded_pages < num_pages: - - # randomly sample one page - config_name, page, split = self.get_random_pages(num_pages = 1)[0] - - # Create the request parameters - params = dict(dataset=self.name, - config=config_name, - split=split, - offset=page, - limit=self.num_rows_per_page - ) - - try: - response = requests.get(self.rows_base_url, params=params) - - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - - num_downloaded_pages += 1 - - for row in response.json()["rows"]: - rows.append(row["row"]["text"]) - - except requests.exceptions.RequestException as e: - attempts += 1 - bt.logging.warning( - f"Failed to fetch data, retrying with a newly sampled page. Attempt {attempts}/{self.retry_limit * num_pages}" - ) - if attempts < num_pages * self.retry_limit: - pass - - else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) - raise - - - return rows - - def get_random_pages(self, num_pages): - """ - Randomly sample one page. - A page is a row number of a given split of a given dataset dump. - """ - pages = [] - - for _ in range(num_pages): - - # Choose a random config - config_name = random.choice(list(self.configs_data.keys())) - - # Choose a random page (row) - page = random.randint(0, - self.configs_data[config_name]['num_rows'] - 1 - self.num_rows_per_page) - - split = self.configs_data[config_name]['split'] - - pages.append((config_name, page, split)) - - return pages - - def get_page_names(self): - """ - This is a utility function that returns the page names that were used. 
- Each page as a single string instead of a tuple - """ - - page_names = [] - - if hasattr(self, 'pages'): - page_names = [f'{cfg_name}_{num_rows}_{split}' for - cfg_name, num_rows, split in self.pages] +class SubsetFalconLoader(SubsetLoader): + max_pages: int = 968000015 + name: str = "tiiuae/falcon-refinedweb" - return page_names +class SubsetFineWebEdu2Loader(SubsetLoader): + name: str = "HuggingFaceFW/fineweb-edu-score-2" + def fetch_dataset_configs(self) -> typing.Dict[str, typing.Dict]: """ - Fetch the different dump names, aka configs, aka samples, of the - dataset. - The returned value is a dictionary with dump names as keys and - a dict of the number of rows and the split as values. + Fetch dataset configs and their metadata. + Returns a dictionary with config names as keys and metadata as values. """ - # Request parameters - params = dict( - dataset = self.name - ) - + params = dict(dataset=self.name) + attempt = 0 while attempt < self.retry_limit: try: response = requests.get(self.size_base_url, params=params) - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - - # Extract the configs dict - configs_dict = response.json()['size']['splits'] - - # Now create a dict with config names (except 'default') as - # keys, and the number of rows as values - configs_data = {entry['config']: {'num_rows': entry['num_rows'] , - 'split': entry['split']} - for entry in configs_dict - if entry['config'] != 'default' - } + response.raise_for_status() + + configs_dict = response.json()["size"]["splits"] + configs_data = { + entry["config"]: { + "num_rows": entry["num_rows"], + "split": entry["split"], + } + for entry in configs_dict + if entry["config"] != "default" + } return configs_data @@ -351,34 +261,44 @@ def fetch_dataset_configs(self) -> typing.Dict[str, typing.Dict]: f"Failed to fetch dataset configs, retrying. Attempt {attempt}/{self.retry_limit}" ) if attempt < self.retry_limit: - time.sleep(self.retry_delay) # Wait before the next retry + time.sleep(self.retry_delay) else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) + bt.logging.error("Maximum retry limit reached. Unable to fetch data.") raise - def _fetch_data_for_page(self, page): - - retry_limit = 10 + def _fetch_data_to_buffer(self, num_pages): + """Fetch data to buffer with support for multiple configs.""" + self.pages = [] + attempts = 0 + duplicates = 0 + initial_offset = random.randint(0, self.num_rows_per_page - 1) - attempt = 0 - while attempt < retry_limit: - config_name, page, split = page - - # Create the request parameters - params = dict(dataset=self.name, - config=config_name, - split=split, - offset=page, - limit=self.num_rows_per_page - ) + while len(self.pages) < num_pages: + page = self.get_random_pages(num_pages=1, initial_offset=initial_offset)[0] + + if page in self.pages: + duplicates += 1 + if duplicates >= self.duplicate_page_threshold: + bt.logging.debug( + f"Hit duplicate page threshold of {self.duplicate_page_threshold}. " + f"Stopping early at: {len(self.pages)} pages." 
+ ) + break + continue + + config_name, page_row_start, split = page + params = { + "dataset": self.name, + "config": config_name, + "split": split, + "offset": page_row_start, + "length": self.num_rows_per_page, + } try: - response = requests.get(self.rows_base_url, params=params) - - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code + response = requests.get(self.rows_base_url, params=params) + response.raise_for_status() + self.pages.append(page) for row in response.json()["rows"]: content = row["row"]["text"] @@ -386,104 +306,75 @@ def _fetch_data_for_page(self, page): self.buffer += input_ids self.buffer += [self.tokenizer.eos_token_id] - break # If the request was successful, break out of the retry loop - except requests.exceptions.RequestException as e: - attempt += 1 + attempts += 1 bt.logging.warning( - f"Failed to fetch data for page {page}, retrying. Attempt {attempt}/{self.retry_limit}" + f"Failed to fetch data, retrying. Attempt {attempts}/{self.retry_limit * num_pages}" ) - if attempt < self.retry_limit: - time.sleep(self.retry_delay) # Wait before the next retry - else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) + if attempts >= num_pages * self.retry_limit: + bt.logging.error("Maximum retry limit reached. Unable to fetch data.") raise + def get_random_pages(self, num_pages, initial_offset): + """Get random pages across different configs.""" + pages = [] + for _ in range(num_pages): + config_name = random.choice(list(self.configs_data.keys())) + data_row_count = self.configs_data[config_name]["num_rows"] - initial_offset + data_page_count = (data_row_count + 1) // self.num_rows_per_page + selected_page_start = initial_offset + ( + random.randint(0, data_page_count - 1) * self.num_rows_per_page + ) + split = self.configs_data[config_name]["split"] + pages.append((config_name, selected_page_start, split)) + return pages + + def fetch_data_to_rows(self, num_pages): + """Fetch data and return raw text rows instead of adding to buffer.""" + downloaded_pages = set() + rows = [] + attempts = 0 + duplicates = 0 + initial_offset = random.randint(0, self.num_rows_per_page - 1) + + while len(downloaded_pages) < num_pages: + page = self.get_random_pages(num_pages=1, initial_offset=initial_offset)[0] + + if page in downloaded_pages: + duplicates += 1 + if duplicates >= self.duplicate_page_threshold: + bt.logging.debug( + f"Hit duplicate page threshold of {self.duplicate_page_threshold}. " + f"Stopping early at: {len(downloaded_pages)} pages." 
+ ) + break + continue + + config_name, page_row_start, split = page + params = { + "dataset": self.name, + "config": config_name, + "split": split, + "offset": page_row_start, + "length": self.num_rows_per_page, + } -class SubsetFalconLoader(SubsetLoader): - max_pages: int = 968000015 - name: str = "tiiuae/falcon-refinedweb" - base_url: str = "https://datasets-server.huggingface.co/rows" - - def __init__( - self, - batch_size, - sequence_length, - num_pages=None, - tokenizer: AutoTokenizer=None, - pack_samples: bool=False, - ): - super().__init__(batch_size, - sequence_length, - num_pages, - tokenizer, - pack_samples) - - self.params = { - "dataset": self.name, - "config": "default", - "split": "train", - } - - self.retry_limit = 10 # Number of retries - self.retry_delay = 5 # Seconds to wait between retries - - # Fetch pages only if the number of pages is specified - if self.num_pages: - pages = self._sample_pages() - self.fetch_data_for_pages(pages) - - - def _fetch_data_for_page(self, page): - self.params["offset"] = page - self.params["limit"] = self.num_rows_per_page - attempt = 0 - while attempt < self.retry_limit: try: - response = requests.get(self.base_url, params=self.params) - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code + response = requests.get(self.rows_base_url, params=params) + response.raise_for_status() + downloaded_pages.add(page) + for row in response.json()["rows"]: - content = row["row"]["content"] - input_ids = self.tokenizer(content, truncation=True)["input_ids"] - self.buffer += input_ids - self.buffer += [self.tokenizer.eos_token_id] + rows.append(row["row"]["text"]) - break # If the request was successful, break out of the retry loop except requests.exceptions.RequestException as e: - attempt += 1 + attempts += 1 bt.logging.warning( - f"Failed to fetch data for page {page}, retrying. Attempt {attempt}/{self.retry_limit}" + f"Failed to fetch data, retrying with a newly sampled page. " + f"Attempt {attempts}/{self.retry_limit * num_pages}" ) - if attempt < self.retry_limit: - time.sleep(self.retry_delay) # Wait before the next retry - else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) + if attempts >= num_pages * self.retry_limit: + bt.logging.error("Maximum retry limit reached. Unable to fetch data.") raise - def _sample_pages(self): - """ - Randomly sample pages to be used in validation - """ - pages = [ - random.randint(1, self.max_pages) - for _ in range(self.num_pages) - ] - - return pages - - - def get_page_names(self): - """ - This is a utility function that returns the page names that were used. - Each page as a single string instead of a tuple - """ - page_names = [] - - if hasattr(self, 'pages'): - page_names = self.pages - - return page_names + return rows diff --git a/pretrain/mining.py b/pretrain/mining.py index 1f0eb668..d9533d0b 100644 --- a/pretrain/mining.py +++ b/pretrain/mining.py @@ -17,20 +17,13 @@ # DEALINGS IN THE SOFTWARE. 
import os -import sys import time -import torch -import constants - -import bittensor as bt -import pretrain as pt - from dataclasses import replace -from typing import Optional, Dict, Any +from typing import Any, Dict, Optional -from transformers import PreTrainedModel, AutoModelForCausalLM +import bittensor as bt +import huggingface_hub from safetensors.torch import load_model - from taoverse.model import utils as model_utils from taoverse.model.data import Model, ModelId from taoverse.model.storage.chain.chain_model_metadata_store import ( @@ -42,8 +35,10 @@ from taoverse.model.storage.model_metadata_store import ModelMetadataStore from taoverse.model.storage.remote_model_store import RemoteModelStore from taoverse.model.utils import get_hash_of_two_strings +from transformers import AutoModelForCausalLM, PreTrainedModel - +import constants +import pretrain as pt from competitions.data import CompetitionId @@ -60,6 +55,7 @@ async def push( wallet: bt.wallet, competition_id: CompetitionId, retry_delay_secs: int = 60, + update_repo_visibility: bool = False, metadata_store: Optional[ModelMetadataStore] = None, remote_model_store: Optional[RemoteModelStore] = None, ): @@ -71,6 +67,7 @@ async def push( competition_id (CompetitionId): The competition the miner is participating in. wallet (bt.wallet): The wallet of the Miner uploading the model. retry_delay_secs (int): The number of seconds to wait before retrying to push the model to the chain. + update_repo_visibility (bool): Whether to make the repo public after pushing the model. metadata_store (Optional[ModelMetadataStore]): The metadata store. If None, defaults to writing to the chain. remote_model_store (Optional[RemoteModelStore]): The remote model store. If None, defaults to writing to HuggingFace @@ -141,6 +138,15 @@ async def push( bt.logging.error(f"Retrying in {retry_delay_secs} seconds...") time.sleep(retry_delay_secs) + if update_repo_visibility: + bt.logging.debug("Making repo public.") + huggingface_hub.update_repo_visibility( + repo, + private=False, + token=HuggingFaceModelStore.assert_access_token_exists(), + ) + bt.logging.success("Model set to public") + def save(model: PreTrainedModel, model_dir: str): """Saves a model to the provided directory""" diff --git a/requirements.txt b/requirements.txt index 4725d2c6..8c2a5fc6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,4 @@ transformers==4.44.1 wandb datasets flash-attn -taoverse==1.0.6 +taoverse==1.0.9 diff --git a/scripts/upload_model.py b/scripts/upload_model.py index ebe299b6..315c5bd9 100644 --- a/scripts/upload_model.py +++ b/scripts/upload_model.py @@ -63,6 +63,11 @@ def get_config(): parser.add_argument( "--list_competitions", action="store_true", help="Print out all competitions" ) + parser.add_argument( + "--update_repo_visibility", + action="store_true", + help="If true, the repo will be made public after uploading.", + ) # Include wallet and logging arguments from bittensor bt.wallet.add_args(parser) @@ -78,7 +83,7 @@ def get_config(): async def main(config: bt.config): # Create bittensor objects. 
     bt.logging(config=config)
-
+
     wallet = bt.wallet(config=config)
     subtensor = bt.subtensor(config=config)
     metagraph = subtensor.metagraph(config.netuid)
@@ -108,9 +113,10 @@ async def main(config: bt.config):
     await pt.mining.push(
         model,
         config.hf_repo_id,
-        wallet,
+        wallet,
         config.competition_id,
         metadata_store=chain_metadata_store,
+        update_repo_visibility=config.update_repo_visibility,
     )
 
 
@@ -122,4 +128,3 @@ async def main(config: bt.config):
     else:
         print(config)
         asyncio.run(main(config))
-
diff --git a/tests/pretrain/test_dataset.py b/tests/pretrain/test_dataset.py
index 00132395..f62e88b2 100644
--- a/tests/pretrain/test_dataset.py
+++ b/tests/pretrain/test_dataset.py
@@ -2,6 +2,7 @@
 
 import numpy as np
 
+from collections import defaultdict
 from competitions.data import CompetitionId
 from constants import MODEL_CONSTRAINTS_BY_COMPETITION_ID
 import pretrain as pt
@@ -11,38 +12,300 @@
 
 config = config.validator_config()
 
 
-def test_FineWeb_loader_page_copy():
-    """
-    Test that pages can be correctly copied from one FineWeb dataloader
-    to another
-    """
-    # Some test params
-    NUM_PAGES = 20
+class TestDataset(unittest.TestCase):
+    def test_fineweb_loader_page_copy(self):
+        """
+        Test that pages can be correctly copied from one FineWeb dataloader
+        to another
+        """
+        # Some test params
+        NUM_PAGES = 20
 
-    # Load a tokenizer
-    tokenizer = pt.model.load_tokenizer(
-        MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL],
-        cache_dir=config.model_dir,
-    )
+        # Load a tokenizer
+        tokenizer = pt.model.load_tokenizer(
+            MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL],
+            cache_dir=config.model_dir,
+        )
 
-    # First dataloader
-    dataloader_1 = pt.dataset.SubsetFineWebEdu2Loader(
-        batch_size=4, sequence_length=4092, num_pages=NUM_PAGES, tokenizer=tokenizer
-    )
+        # First dataloader
+        dataloader_1 = pt.dataset.SubsetFineWebEdu2Loader(
+            batch_size=4, sequence_length=4092, num_pages=NUM_PAGES, tokenizer=tokenizer
+        )
 
-    # Assert that the number of pages loaded successfully are the one required
-    assert len(dataloader_1.pages) == NUM_PAGES
+        # Assert that the number of pages loaded matches the number requested.
+        self.assertEqual(len(dataloader_1.pages), NUM_PAGES)
 
-    # Now create a second loader without automatic page loading
-    dataloader_2 = pt.dataset.SubsetFineWebEdu2Loader(
-        batch_size=4, sequence_length=4092, num_pages=None, tokenizer=tokenizer
-    )
+        # Now create a second loader without automatic page loading
+        dataloader_2 = pt.dataset.SubsetFineWebEdu2Loader(
+            batch_size=4, sequence_length=4092, num_pages=None, tokenizer=tokenizer
+        )
 
-    # Copy pages from the first dataloader
-    dataloader_2.fetch_data_for_pages(pages=dataloader_1.pages)
+        # Copy pages from the first dataloader
+        dataloader_2.fetch_data_for_pages(pages=dataloader_1.pages)
 
-    # Assert both dataloaders have the same pages
-    assert set(dataloader_1.pages) == set(dataloader_2.pages)
+        # Assert both dataloaders have the same pages
+        self.assertEqual(set(dataloader_1.pages), set(dataloader_2.pages))
 
-    # Assert that both have the same buffers
-    assert np.array_equal(dataloader_1.buffer, dataloader_2.buffer)
+        # Assert that both have the same buffers
+        self.assertTrue(np.array_equal(dataloader_1.buffer, dataloader_2.buffer))
+
+    def test_fineweb_loader_unique_pages(self):
+        """Tests that the fineweb loader only loads unique pages."""
+        # Ensure we get all the possible pages of the artificially shortened data.
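+        # CONFIG_DATA below caps the config at 499 rows, so the loader can only produce a
+        # handful of distinct page starts and must not return any page more than once.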
+ NUM_PAGES = 5 + CONFIG_DATA = {"CC-MAIN-2013-20": {"num_rows": 499, "split": "train"}} + + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + cache_dir=config.model_dir, + ) + + dataloader = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=None, sequence_length=4092, num_pages=None, tokenizer=tokenizer + ) + + dataloader.configs_data = CONFIG_DATA + + # Only fetch these once for performance, although for better correctness should consider running in a loop. + # We check for max pages or max pages - 1 to handle the random offset. + dataloader._fetch_data_to_buffer(NUM_PAGES) + self.assertIn(len(dataloader.pages), [NUM_PAGES, NUM_PAGES - 1]) + self.assertIn(len(set(dataloader.pages)), [NUM_PAGES, NUM_PAGES - 1]) + + rows = dataloader.fetch_data_to_rows(NUM_PAGES) + self.assertIn( + len(rows), + [ + NUM_PAGES * dataloader.num_rows_per_page, + (NUM_PAGES - 1) * dataloader.num_rows_per_page, + ], + ) + self.assertIn( + len(set(rows)), + [ + NUM_PAGES * dataloader.num_rows_per_page, + (NUM_PAGES - 1) * dataloader.num_rows_per_page, + ], + ) + + def test_fineweb_loader_duplicate_threshold(self): + """Tests that the fineweb loader stops loading after hitting too many duplicate pages.""" + # Try to get 6 pages from a set that only contains 5 pages worth. + NUM_PAGES = 6 + NUM_PAGES_ACTUAL = 5 + NUM_ROWS_PER_PAGE = 100 + CONFIG_DATA = {"CC-MAIN-2013-20": {"num_rows": 499, "split": "train"}} + + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + cache_dir=config.model_dir, + ) + + dataloader = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=None, sequence_length=4092, num_pages=None, tokenizer=tokenizer + ) + + dataloader.configs_data = CONFIG_DATA + dataloader.num_rows_per_page = NUM_ROWS_PER_PAGE + + # Only fetch these once for performance, although for better correctness should consider running in a loop. + # We check for actual pages or actual pages - 1 to handle the random offset. + dataloader._fetch_data_to_buffer(NUM_PAGES) + self.assertIn(len(dataloader.pages), [NUM_PAGES_ACTUAL, NUM_PAGES_ACTUAL - 1]) + self.assertIn( + len(set(dataloader.pages)), [NUM_PAGES_ACTUAL, NUM_PAGES_ACTUAL - 1] + ) + + rows = dataloader.fetch_data_to_rows(NUM_PAGES) + self.assertIn( + len(rows), + [ + NUM_PAGES_ACTUAL * dataloader.num_rows_per_page, + (NUM_PAGES_ACTUAL - 1) * dataloader.num_rows_per_page, + ], + ) + self.assertIn( + len(set(rows)), + [ + NUM_PAGES_ACTUAL * dataloader.num_rows_per_page, + (NUM_PAGES_ACTUAL - 1) * dataloader.num_rows_per_page, + ], + ) + + def test_fineweb_loader_page_offset(self): + """Tests that the fineweb loader will only generate page starts that are num rows per pages apart.""" + # Load a tokenizer. + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + cache_dir=config.model_dir, + ) + + dataloader = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=0, sequence_length=4092, num_pages=0, tokenizer=tokenizer + ) + + # Ensure we know the num_rows_per_page. + test_num_rows_per_page = 100 + dataloader.num_rows_per_page = test_num_rows_per_page + + # Create a fake configs data with only 599 rows. + dataloader.configs_data = { + "CC-MAIN-2013-20": {"num_rows": 599, "split": "train"} + } + + # Ensure get random pages returns only 0, 100, 200, 300, 400 and 500. 
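+        # With num_rows_per_page = 100 and 599 rows, get_random_pages computes
+        # (599 + 1) // 100 = 6 candidate pages, so the only valid starts are 0 through 500.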
+ expected_page_starts_1 = {0, 100, 200, 300, 400, 500} + page_starts_1 = defaultdict(int) + for _ in range(1000): + random_pages = dataloader.get_random_pages(1, initial_offset=0) + _, page_start, _ = random_pages[0] + page_starts_1[page_start] += 1 + + self.assertEqual(set(page_starts_1.keys()), expected_page_starts_1) + + # Create a fake configs data with only 598 rows. + dataloader.configs_data = { + "CC-MAIN-2013-20": {"num_rows": 598, "split": "train"} + } + + # Ensure get random pages returns only 0, 100, 200, 300, and 400 (since 500-598 is not 100 rows). + expected_page_starts_2 = {0, 100, 200, 300, 400} + page_starts_2 = defaultdict(int) + for _ in range(1000): + random_pages = dataloader.get_random_pages(1, initial_offset=0) + _, page_start, _ = random_pages[0] + page_starts_2[page_start] += 1 + + self.assertEqual(set(page_starts_2.keys()), expected_page_starts_2) + + def test_fineweb_loader_page_initial_offset(self): + """Tests that the fineweb loader correctly applies an initial offset to the page starts.""" + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + cache_dir=config.model_dir, + ) + + dataloader = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=0, sequence_length=4092, num_pages=0, tokenizer=tokenizer + ) + + # Ensure we know the num_rows_per_page. + test_num_rows_per_page = 100 + dataloader.num_rows_per_page = test_num_rows_per_page + + # Create a fake configs data with only 599 rows. + dataloader.configs_data = { + "CC-MAIN-2013-20": {"num_rows": 599, "split": "train"} + } + + # Define initial offset of 50. + initial_offset = 50 + # Ensure get random pages returns only 50, 150, 250, 350, and 450. + expected_page_starts_1 = {50, 150, 250, 350, 450} + page_starts_1 = defaultdict(int) + for _ in range(1000): + random_pages = dataloader.get_random_pages(1, initial_offset=initial_offset) + _, page_start, _ = random_pages[0] + page_starts_1[page_start] += 1 + + self.assertEqual(set(page_starts_1.keys()), expected_page_starts_1) + + # Create a fake configs data with only 548 rows. + dataloader.configs_data = { + "CC-MAIN-2013-20": {"num_rows": 548, "split": "train"} + } + + # Ensure get random pages returns only 50, 150, 250, and 350 (since 450-548 is not 100 rows) + expected_page_starts_2 = {50, 150, 250, 350} + page_starts_2 = defaultdict(int) + for _ in range(1000): + random_pages = dataloader.get_random_pages(1, initial_offset=initial_offset) + _, page_start, _ = random_pages[0] + page_starts_2[page_start] += 1 + + self.assertEqual(set(page_starts_2.keys()), expected_page_starts_2) + + def test_fineweb_loader_seed(self): + """Tests that the fineweb data loader fetches the same data with the same seed (barring retries).""" + + # Use the same seed for each loader. + RANDOM_SEED = 1 + # Fetch just two pages to keep the test reasonably fast. + NUM_PAGES = 2 + + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + cache_dir=config.model_dir, + ) + + # First dataloader. + dataloader_1 = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=4, + sequence_length=4092, + num_pages=NUM_PAGES, + tokenizer=tokenizer, + random_seed=RANDOM_SEED, + ) + + # Assert that the number of pages requested were loaded. + self.assertEqual(len(dataloader_1.pages), NUM_PAGES) + + # Now create a second loader with the same seed. 
+ dataloader_2 = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=4, + sequence_length=4092, + num_pages=NUM_PAGES, + tokenizer=tokenizer, + random_seed=RANDOM_SEED, + ) + + # Assert both dataloaders have the same pages + self.assertEqual(set(dataloader_1.pages), set(dataloader_2.pages)) + + # Assert that both have the same buffers + self.assertTrue(np.array_equal(dataloader_1.buffer, dataloader_2.buffer)) + + def test_falcon_loader_seed(self): + """Tests that the falcon data loader fetches the same data with the same seed.""" + + # Use the same seed for each loader. + RANDOM_SEED = 1 + # Fetch just two pages to keep the test reasonably fast. + NUM_PAGES = 2 + + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + cache_dir=config.model_dir, + ) + + # First dataloader. + dataloader_1 = pt.dataset.SubsetFalconLoader( + batch_size=4, + sequence_length=4092, + num_pages=NUM_PAGES, + tokenizer=tokenizer, + random_seed=RANDOM_SEED, + ) + + # Assert that the number of pages requested were loaded. + self.assertEqual(len(dataloader_1.pages), NUM_PAGES) + + # Now create a second loader with the same seed. + dataloader_2 = pt.dataset.SubsetFalconLoader( + batch_size=4, + sequence_length=4092, + num_pages=NUM_PAGES, + tokenizer=tokenizer, + random_seed=RANDOM_SEED, + ) + + # Assert both dataloaders have the same pages + self.assertEqual(set(dataloader_1.pages), set(dataloader_2.pages)) + + # Assert that both have the same buffers + self.assertTrue(np.array_equal(dataloader_1.buffer, dataloader_2.buffer)) diff --git a/tests/pretrain/test_mining.py b/tests/pretrain/test_mining.py index e2a83188..4b2579c3 100644 --- a/tests/pretrain/test_mining.py +++ b/tests/pretrain/test_mining.py @@ -45,9 +45,10 @@ def _test_push(self, min_expected_block: int = 1): pt.mining.push( model=self.tiny_model, wallet=self.wallet, - competition_id=CompetitionId.B7_MODEL, + competition_id=CompetitionId.B14_MODEL, repo="namespace/name", retry_delay_secs=1, + update_repo_visibility=False, metadata_store=self.metadata_store, remote_model_store=self.remote_store, )