From b6d3fd7d2e0a03ad533cafe1057e72dc6b3bd72a Mon Sep 17 00:00:00 2001 From: Sid Date: Tue, 15 Oct 2024 20:42:12 -0700 Subject: [PATCH 01/21] Ensure fineweb loader chooses page starts offset by rows per page. --- pretrain/dataset.py | 168 ++++++++++++++++----------------- tests/pretrain/test_dataset.py | 104 ++++++++++++++------ 2 files changed, 160 insertions(+), 112 deletions(-) diff --git a/pretrain/dataset.py b/pretrain/dataset.py index 86cddc94..289b2076 100644 --- a/pretrain/dataset.py +++ b/pretrain/dataset.py @@ -33,13 +33,14 @@ class SubsetLoader(IterableDataset): # TODO: Make this class abstract """ + def __init__( - self, - batch_size=None, - sequence_length=None, - num_pages=None, - tokenizer: AutoTokenizer=None, - pack_samples: bool=False, + self, + batch_size=None, + sequence_length=None, + num_pages=None, + tokenizer: AutoTokenizer = None, + pack_samples: bool = False, ): self.batch_size = batch_size self.sequence_length = sequence_length @@ -72,7 +73,6 @@ def fetch_data_for_pages(self, pages): for page in self.pages: self._fetch_data_for_page(page) - def _get_pad_size(self, input_ids): """ Get the number of tokens to be padded to the sample to match @@ -85,8 +85,8 @@ def _get_pad_size(self, input_ids): sample_size = len(input_ids) - remainder = (sample_size % self.sequence_length) - pad_size = (self.sequence_length - remainder) + remainder = sample_size % self.sequence_length + pad_size = self.sequence_length - remainder # Apply modulo again to guarantee a pad size of 0 if remainder is 0 pad_size = pad_size % self.sequence_length @@ -99,17 +99,14 @@ def _refill_padded_buffer(self): it to the `self.padded_buffer`. """ - while ( - self.buffer - and len(self.padded_buffer) < self.sequence_length - ): + while self.buffer and len(self.padded_buffer) < self.sequence_length: input_ids = [] # search for EOS token index and cut the buffer at it. EOS_index = self.buffer.index(self.tokenizer.eos_token_id) - input_ids = self.buffer[:EOS_index+1] - self.buffer =self.buffer[EOS_index+1:] + input_ids = self.buffer[: EOS_index + 1] + self.buffer = self.buffer[EOS_index + 1 :] self.used_buffer += input_ids @@ -117,7 +114,9 @@ def _refill_padded_buffer(self): self.padded_buffer += input_ids[:-1] # Pad - self.padded_buffer += [self.tokenizer.eos_token_id] * self._get_pad_size(input_ids=input_ids[:-1]) + self.padded_buffer += [self.tokenizer.eos_token_id] * self._get_pad_size( + input_ids=input_ids[:-1] + ) def __iter__(self): self.buffer = self.used_buffer + self.buffer @@ -152,18 +151,16 @@ class SubsetFineWebEdu2Loader(SubsetLoader): retry_delay: int = 5 # Seconds to wait between retries def __init__( - self, - batch_size=None, - sequence_length=None, - num_pages=None, - tokenizer: AutoTokenizer=None, - pack_samples: bool=False, + self, + batch_size=None, + sequence_length=None, + num_pages=None, + tokenizer: AutoTokenizer = None, + pack_samples: bool = False, ): - super().__init__(batch_size, - sequence_length, - num_pages, - tokenizer, - pack_samples) + super().__init__( + batch_size, sequence_length, num_pages, tokenizer, pack_samples + ) # Get the dataset configs and their row sizes self.configs_data = self.fetch_dataset_configs() @@ -174,7 +171,6 @@ def __init__( if self.num_pages: self._fetch_data_to_buffer(self.num_pages) - def _fetch_data_to_buffer(self, num_pages): """ Randomly sample pages and add their data to the buffer. @@ -188,14 +184,15 @@ def _fetch_data_to_buffer(self, num_pages): while len(self.pages) < num_pages: # randomly sample one page - config_name, page, split = self.get_random_pages(num_pages = 1)[0] + config_name, page, split = self.get_random_pages(num_pages=1)[0] # Create the request parameters - params = dict(dataset=self.name, - config=config_name, - split=split, - offset=page, - limit=self.num_rows_per_page + params = dict( + dataset=self.name, + config=config_name, + split=split, + offset=page, + limit=self.num_rows_per_page, ) try: @@ -241,14 +238,15 @@ def fetch_data_to_rows(self, num_pages): while num_downloaded_pages < num_pages: # randomly sample one page - config_name, page, split = self.get_random_pages(num_pages = 1)[0] + config_name, page, split = self.get_random_pages(num_pages=1)[0] # Create the request parameters - params = dict(dataset=self.name, - config=config_name, - split=split, - offset=page, - limit=self.num_rows_per_page + params = dict( + dataset=self.name, + config=config_name, + split=split, + offset=page, + limit=self.num_rows_per_page, ) try: @@ -275,28 +273,33 @@ def fetch_data_to_rows(self, num_pages): ) raise - return rows def get_random_pages(self, num_pages): """ - Randomly sample one page. - A page is a row number of a given split of a given dataset dump. + Randomly sample num_pages. + A page is a row number of a given split of a given dataset dump offset by num_rows_per_page. """ pages = [] for _ in range(num_pages): - # Choose a random config + # Choose a random config. config_name = random.choice(list(self.configs_data.keys())) - # Choose a random page (row) - page = random.randint(0, - self.configs_data[config_name]['num_rows'] - 1 - self.num_rows_per_page) + # Choose a random page start. + # We do so by chunking the rows of the config data into N pages of length num_rows_per_page. + data_row_count = self.configs_data[config_name]["num_rows"] + # Add 1 to the row count as we are 0 indexed. + data_page_count = (data_row_count + 1) // self.num_rows_per_page + # Select a random page start by taking the randomly selected page and multiplying by num_rows_per_page. + selected_page_start = ( + random.randint(0, data_page_count - 1) * self.num_rows_per_page + ) - split = self.configs_data[config_name]['split'] + split = self.configs_data[config_name]["split"] - pages.append((config_name, page, split)) + pages.append((config_name, selected_page_start, split)) return pages @@ -308,9 +311,11 @@ def get_page_names(self): page_names = [] - if hasattr(self, 'pages'): - page_names = [f'{cfg_name}_{num_rows}_{split}' for - cfg_name, num_rows, split in self.pages] + if hasattr(self, "pages"): + page_names = [ + f"{cfg_name}_{num_rows}_{split}" + for cfg_name, num_rows, split in self.pages + ] return page_names @@ -322,9 +327,7 @@ def fetch_dataset_configs(self) -> typing.Dict[str, typing.Dict]: a dict of the number of rows and the split as values. """ # Request parameters - params = dict( - dataset = self.name - ) + params = dict(dataset=self.name) attempt = 0 while attempt < self.retry_limit: @@ -333,15 +336,18 @@ def fetch_dataset_configs(self) -> typing.Dict[str, typing.Dict]: response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code # Extract the configs dict - configs_dict = response.json()['size']['splits'] + configs_dict = response.json()["size"]["splits"] # Now create a dict with config names (except 'default') as # keys, and the number of rows as values - configs_data = {entry['config']: {'num_rows': entry['num_rows'] , - 'split': entry['split']} - for entry in configs_dict - if entry['config'] != 'default' - } + configs_data = { + entry["config"]: { + "num_rows": entry["num_rows"], + "split": entry["split"], + } + for entry in configs_dict + if entry["config"] != "default" + } return configs_data @@ -367,11 +373,12 @@ def _fetch_data_for_page(self, page): config_name, page, split = page # Create the request parameters - params = dict(dataset=self.name, - config=config_name, - split=split, - offset=page, - limit=self.num_rows_per_page + params = dict( + dataset=self.name, + config=config_name, + split=split, + offset=page, + limit=self.num_rows_per_page, ) try: @@ -408,18 +415,16 @@ class SubsetFalconLoader(SubsetLoader): base_url: str = "https://datasets-server.huggingface.co/rows" def __init__( - self, - batch_size, - sequence_length, - num_pages=None, - tokenizer: AutoTokenizer=None, - pack_samples: bool=False, + self, + batch_size, + sequence_length, + num_pages=None, + tokenizer: AutoTokenizer = None, + pack_samples: bool = False, ): - super().__init__(batch_size, - sequence_length, - num_pages, - tokenizer, - pack_samples) + super().__init__( + batch_size, sequence_length, num_pages, tokenizer, pack_samples + ) self.params = { "dataset": self.name, @@ -435,7 +440,6 @@ def __init__( pages = self._sample_pages() self.fetch_data_for_pages(pages) - def _fetch_data_for_page(self, page): self.params["offset"] = page self.params["limit"] = self.num_rows_per_page @@ -468,14 +472,10 @@ def _sample_pages(self): """ Randomly sample pages to be used in validation """ - pages = [ - random.randint(1, self.max_pages) - for _ in range(self.num_pages) - ] + pages = [random.randint(1, self.max_pages) for _ in range(self.num_pages)] return pages - def get_page_names(self): """ This is a utility function that returns the page names that were used. @@ -483,7 +483,7 @@ def get_page_names(self): """ page_names = [] - if hasattr(self, 'pages'): + if hasattr(self, "pages"): page_names = self.pages return page_names diff --git a/tests/pretrain/test_dataset.py b/tests/pretrain/test_dataset.py index 00132395..9f9a5735 100644 --- a/tests/pretrain/test_dataset.py +++ b/tests/pretrain/test_dataset.py @@ -2,6 +2,7 @@ import numpy as np +from collections import defaultdict from competitions.data import CompetitionId from constants import MODEL_CONSTRAINTS_BY_COMPETITION_ID import pretrain as pt @@ -11,38 +12,85 @@ config = config.validator_config() -def test_FineWeb_loader_page_copy(): - """ - Test that pages can be correctly copied from one FineWeb dataloader - to another - """ - # Some test params - NUM_PAGES = 20 +class TestDataset(unittest.TestCase): + def test_fineweb_loader_page_copy(self): + """ + Test that pages can be correctly copied from one FineWeb dataloader + to another + """ + # Some test params + NUM_PAGES = 20 - # Load a tokenizer - tokenizer = pt.model.load_tokenizer( - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], - cache_dir=config.model_dir, - ) + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + cache_dir=config.model_dir, + ) - # First dataloader - dataloader_1 = pt.dataset.SubsetFineWebEdu2Loader( - batch_size=4, sequence_length=4092, num_pages=NUM_PAGES, tokenizer=tokenizer - ) + # First dataloader + dataloader_1 = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=4, sequence_length=4092, num_pages=NUM_PAGES, tokenizer=tokenizer + ) - # Assert that the number of pages loaded successfully are the one required - assert len(dataloader_1.pages) == NUM_PAGES + # Assert that the number of pages loaded successfully are the one required + assert len(dataloader_1.pages) == NUM_PAGES - # Now create a second loader without automatic page loading - dataloader_2 = pt.dataset.SubsetFineWebEdu2Loader( - batch_size=4, sequence_length=4092, num_pages=None, tokenizer=tokenizer - ) + # Now create a second loader without automatic page loading + dataloader_2 = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=4, sequence_length=4092, num_pages=None, tokenizer=tokenizer + ) - # Copy pages from the first dataloader - dataloader_2.fetch_data_for_pages(pages=dataloader_1.pages) + # Copy pages from the first dataloader + dataloader_2.fetch_data_for_pages(pages=dataloader_1.pages) - # Assert both dataloaders have the same pages - assert set(dataloader_1.pages) == set(dataloader_2.pages) + # Assert both dataloaders have the same pages + assert set(dataloader_1.pages) == set(dataloader_2.pages) - # Assert that both have the same buffers - assert np.array_equal(dataloader_1.buffer, dataloader_2.buffer) + # Assert that both have the same buffers + assert np.array_equal(dataloader_1.buffer, dataloader_2.buffer) + + def test_fineweb_loader_page_offset(self): + """Tests that the fineweb loader will only generate page starts that are num rows per pages apart.""" + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + cache_dir=config.model_dir, + ) + + dataloader = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=0, sequence_length=4092, num_pages=0, tokenizer=tokenizer + ) + + # Ensure we know the num_rows_per_page. + test_num_rows_per_page = 100 + dataloader.num_rows_per_page = test_num_rows_per_page + + # Create a fake configs data with only 599 rows. + dataloader.configs_data = { + "CC-MAIN-2013-20": {"num_rows": 599, "split": "train"} + } + + # Ensure get random pages returns only 0, 100, 200, 300, 400 and 500 + expected_page_starts_1 = {0, 100, 200, 300, 400, 500} + page_starts_1 = defaultdict(int) + for _ in range(1000): + random_pages = dataloader.get_random_pages(1) + _, page_start, _ = random_pages[0] + page_starts_1[page_start] += 1 + + self.assertEqual(set(page_starts_1.keys()), expected_page_starts_1) + + # Create a fake configs data with only 598 rows. + dataloader.configs_data = { + "CC-MAIN-2013-20": {"num_rows": 598, "split": "train"} + } + + # Ensure get random pages returns only 0, 100, 200, 300, and 400 (since 500-598 is not 100 rows) + expected_page_starts_2 = {0, 100, 200, 300, 400} + page_starts_2 = defaultdict(int) + for _ in range(1000): + random_pages = dataloader.get_random_pages(1) + _, page_start, _ = random_pages[0] + page_starts_2[page_start] += 1 + + self.assertEqual(set(page_starts_2.keys()), expected_page_starts_2) From b9e9957c7352bc44668b8b3bf33b3de65df3ab28 Mon Sep 17 00:00:00 2001 From: Sid Date: Tue, 15 Oct 2024 22:33:10 -0700 Subject: [PATCH 02/21] Ensure the fineweb loader only fetches unique pages. --- pretrain/dataset.py | 49 +++++++++++++++++++++----- tests/pretrain/test_dataset.py | 63 ++++++++++++++++++++++++++++++++-- 2 files changed, 100 insertions(+), 12 deletions(-) diff --git a/pretrain/dataset.py b/pretrain/dataset.py index 289b2076..0378d663 100644 --- a/pretrain/dataset.py +++ b/pretrain/dataset.py @@ -49,6 +49,7 @@ def __init__( self.pack_samples = pack_samples self.num_rows_per_page = 100 + self.duplicate_page_threshold = 100 # Buffer to hold pages loaded from the api self.buffer = [] @@ -173,25 +174,39 @@ def __init__( def _fetch_data_to_buffer(self, num_pages): """ - Randomly sample pages and add their data to the buffer. + Randomly sample unique pages and add their data to the buffer. If a page is inaccessible, another one is sampled. this method sets the `pages` property """ self.pages = [] attempts = 0 + duplicates = 0 while len(self.pages) < num_pages: # randomly sample one page - config_name, page, split = self.get_random_pages(num_pages=1)[0] + page = self.get_random_pages(num_pages=1)[0] + + # skip the page if we already have it + if page in self.pages: + duplicates += 1 + if duplicates >= self.duplicate_page_threshold: + bt.logging.debug( + f"Hit duplicate page threshold of {self.duplicate_page_threshold}. Stopping early at: {len(self.pages)} pages." + ) + break + else: + continue + + config_name, page_row_start, split = page # Create the request parameters params = dict( dataset=self.name, config=config_name, split=split, - offset=page, + offset=page_row_start, limit=self.num_rows_per_page, ) @@ -201,7 +216,7 @@ def _fetch_data_to_buffer(self, num_pages): response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code # Add the page since the request was successful - self.pages.append((config_name, page, split)) + self.pages.append(page) for row in response.json()["rows"]: content = row["row"]["text"] @@ -231,21 +246,37 @@ def _fetch_data_to_buffer(self, num_pages): def fetch_data_to_rows(self, num_pages): + # This explicitly does not set the pages property, simply keeping track for duplicates. + # We also use a set here as the order does not matter. + downloaded_pages = set() rows = [] attempts = 0 - num_downloaded_pages = 0 + duplicates = 0 - while num_downloaded_pages < num_pages: + while len(downloaded_pages) < num_pages: # randomly sample one page - config_name, page, split = self.get_random_pages(num_pages=1)[0] + page = self.get_random_pages(num_pages=1)[0] + + # skip the page if we already have it + if page in downloaded_pages: + duplicates += 1 + if duplicates >= self.duplicate_page_threshold: + bt.logging.debug( + f"Hit duplicate page threshold of {self.duplicate_page_threshold}. Stopping early at: {len(downloaded_pages)} pages." + ) + break + else: + continue + + config_name, page_row_start, split = page # Create the request parameters params = dict( dataset=self.name, config=config_name, split=split, - offset=page, + offset=page_row_start, limit=self.num_rows_per_page, ) @@ -254,7 +285,7 @@ def fetch_data_to_rows(self, num_pages): response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - num_downloaded_pages += 1 + downloaded_pages.add(page) for row in response.json()["rows"]: rows.append(row["row"]["text"]) diff --git a/tests/pretrain/test_dataset.py b/tests/pretrain/test_dataset.py index 9f9a5735..e7e6f133 100644 --- a/tests/pretrain/test_dataset.py +++ b/tests/pretrain/test_dataset.py @@ -33,7 +33,7 @@ def test_fineweb_loader_page_copy(self): ) # Assert that the number of pages loaded successfully are the one required - assert len(dataloader_1.pages) == NUM_PAGES + self.assertEqual(len(dataloader_1.pages), NUM_PAGES) # Now create a second loader without automatic page loading dataloader_2 = pt.dataset.SubsetFineWebEdu2Loader( @@ -44,10 +44,67 @@ def test_fineweb_loader_page_copy(self): dataloader_2.fetch_data_for_pages(pages=dataloader_1.pages) # Assert both dataloaders have the same pages - assert set(dataloader_1.pages) == set(dataloader_2.pages) + self.assertEqual(set(dataloader_1.pages), set(dataloader_2.pages)) # Assert that both have the same buffers - assert np.array_equal(dataloader_1.buffer, dataloader_2.buffer) + self.assertTrue(np.array_equal(dataloader_1.buffer, dataloader_2.buffer)) + + def test_fineweb_loader_unique_pages(self): + """Tests that the fineweb loader only loads unique pages.""" + # Ensure we get all 5 possible pages of the aritificially shortened data. + NUM_PAGES = 5 + CONFIG_DATA = {"CC-MAIN-2013-20": {"num_rows": 499, "split": "train"}} + + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + cache_dir=config.model_dir, + ) + + dataloader = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=None, sequence_length=4092, num_pages=None, tokenizer=tokenizer + ) + + dataloader.configs_data = CONFIG_DATA + + # Only fetch these once for performance, although for better correctness should consider running in a loop. + dataloader._fetch_data_to_buffer(NUM_PAGES) + self.assertEqual(len(dataloader.pages), NUM_PAGES) + self.assertEqual(len(set(dataloader.pages)), NUM_PAGES) + + rows = dataloader.fetch_data_to_rows(NUM_PAGES) + self.assertEqual(len(rows), NUM_PAGES * dataloader.num_rows_per_page) + self.assertEqual(len(set(rows)), NUM_PAGES * dataloader.num_rows_per_page) + + def test_fineweb_loader_duplicate_threshold(self): + """Tests that the fineweb loader stops loading after hitting too many duplciate pages.""" + # Try to get 6 pages from a set that only contains 5 pages worth. + NUM_PAGES = 6 + NUM_PAGES_ACTUAL = 5 + CONFIG_DATA = {"CC-MAIN-2013-20": {"num_rows": 499, "split": "train"}} + + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + cache_dir=config.model_dir, + ) + + dataloader = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=None, sequence_length=4092, num_pages=None, tokenizer=tokenizer + ) + + dataloader.configs_data = CONFIG_DATA + + # Only fetch these once for performance, although for better correctness should consider running in a loop. + dataloader._fetch_data_to_buffer(NUM_PAGES) + self.assertEqual(len(dataloader.pages), NUM_PAGES_ACTUAL) + self.assertEqual(len(set(dataloader.pages)), NUM_PAGES_ACTUAL) + + rows = dataloader.fetch_data_to_rows(NUM_PAGES) + self.assertEqual(len(rows), NUM_PAGES_ACTUAL * dataloader.num_rows_per_page) + self.assertEqual( + len(set(rows)), NUM_PAGES_ACTUAL * dataloader.num_rows_per_page + ) def test_fineweb_loader_page_offset(self): """Tests that the fineweb loader will only generate page starts that are num rows per pages apart.""" From fa181f9c6dc1e8f6d2f4ebb71e7707aec18934e9 Mon Sep 17 00:00:00 2001 From: Sid Date: Thu, 24 Oct 2024 09:10:32 -0700 Subject: [PATCH 03/21] Also use a random initial offset for the page starts. --- pretrain/dataset.py | 18 ++++-- tests/pretrain/test_dataset.py | 106 ++++++++++++++++++++++++++++----- 2 files changed, 102 insertions(+), 22 deletions(-) diff --git a/pretrain/dataset.py b/pretrain/dataset.py index 0378d663..45b19267 100644 --- a/pretrain/dataset.py +++ b/pretrain/dataset.py @@ -183,10 +183,13 @@ def _fetch_data_to_buffer(self, num_pages): attempts = 0 duplicates = 0 + # Choose a consistent initial offset for the random pages so we do not overlap on each page get. + initial_offset = random.randint(0, self.num_rows_per_page - 1) + while len(self.pages) < num_pages: # randomly sample one page - page = self.get_random_pages(num_pages=1)[0] + page = self.get_random_pages(num_pages=1, initial_offset=initial_offset)[0] # skip the page if we already have it if page in self.pages: @@ -252,11 +255,13 @@ def fetch_data_to_rows(self, num_pages): rows = [] attempts = 0 duplicates = 0 + # Choose a consistent initial offset for the random pages so we do not overlap on each page get. + initial_offset = random.randint(0, self.num_rows_per_page - 1) while len(downloaded_pages) < num_pages: # randomly sample one page - page = self.get_random_pages(num_pages=1)[0] + page = self.get_random_pages(num_pages=1, initial_offset=initial_offset)[0] # skip the page if we already have it if page in downloaded_pages: @@ -306,9 +311,9 @@ def fetch_data_to_rows(self, num_pages): return rows - def get_random_pages(self, num_pages): + def get_random_pages(self, num_pages, initial_offset): """ - Randomly sample num_pages. + Randomly sample num_pages with an intiial offset. A page is a row number of a given split of a given dataset dump offset by num_rows_per_page. """ pages = [] @@ -320,11 +325,12 @@ def get_random_pages(self, num_pages): # Choose a random page start. # We do so by chunking the rows of the config data into N pages of length num_rows_per_page. - data_row_count = self.configs_data[config_name]["num_rows"] + # We remove the initial offset from the total rows in doing this calculation to ensure we don't go over. + data_row_count = self.configs_data[config_name]["num_rows"] - initial_offset # Add 1 to the row count as we are 0 indexed. data_page_count = (data_row_count + 1) // self.num_rows_per_page # Select a random page start by taking the randomly selected page and multiplying by num_rows_per_page. - selected_page_start = ( + selected_page_start = initial_offset + ( random.randint(0, data_page_count - 1) * self.num_rows_per_page ) diff --git a/tests/pretrain/test_dataset.py b/tests/pretrain/test_dataset.py index e7e6f133..2a4a1b69 100644 --- a/tests/pretrain/test_dataset.py +++ b/tests/pretrain/test_dataset.py @@ -51,7 +51,7 @@ def test_fineweb_loader_page_copy(self): def test_fineweb_loader_unique_pages(self): """Tests that the fineweb loader only loads unique pages.""" - # Ensure we get all 5 possible pages of the aritificially shortened data. + # Ensure we get all the possible pages of the aritificially shortened data. NUM_PAGES = 5 CONFIG_DATA = {"CC-MAIN-2013-20": {"num_rows": 499, "split": "train"}} @@ -68,16 +68,29 @@ def test_fineweb_loader_unique_pages(self): dataloader.configs_data = CONFIG_DATA # Only fetch these once for performance, although for better correctness should consider running in a loop. + # We check for max pages or max pages - 1 to handle the random offset. dataloader._fetch_data_to_buffer(NUM_PAGES) - self.assertEqual(len(dataloader.pages), NUM_PAGES) - self.assertEqual(len(set(dataloader.pages)), NUM_PAGES) + self.assertIn(len(dataloader.pages), [NUM_PAGES, NUM_PAGES - 1]) + self.assertIn(len(set(dataloader.pages)), [NUM_PAGES, NUM_PAGES - 1]) rows = dataloader.fetch_data_to_rows(NUM_PAGES) - self.assertEqual(len(rows), NUM_PAGES * dataloader.num_rows_per_page) - self.assertEqual(len(set(rows)), NUM_PAGES * dataloader.num_rows_per_page) + self.assertIn( + len(rows), + [ + NUM_PAGES * dataloader.num_rows_per_page, + (NUM_PAGES - 1) * dataloader.num_rows_per_page, + ], + ) + self.assertIn( + len(set(rows)), + [ + NUM_PAGES * dataloader.num_rows_per_page, + (NUM_PAGES - 1) * dataloader.num_rows_per_page, + ], + ) def test_fineweb_loader_duplicate_threshold(self): - """Tests that the fineweb loader stops loading after hitting too many duplciate pages.""" + """Tests that the fineweb loader stops loading after hitting too many duplicate pages.""" # Try to get 6 pages from a set that only contains 5 pages worth. NUM_PAGES = 6 NUM_PAGES_ACTUAL = 5 @@ -96,19 +109,32 @@ def test_fineweb_loader_duplicate_threshold(self): dataloader.configs_data = CONFIG_DATA # Only fetch these once for performance, although for better correctness should consider running in a loop. + # We check for actual pages or actual pages - 1 to handle the random offset. dataloader._fetch_data_to_buffer(NUM_PAGES) - self.assertEqual(len(dataloader.pages), NUM_PAGES_ACTUAL) - self.assertEqual(len(set(dataloader.pages)), NUM_PAGES_ACTUAL) + self.assertIn(len(dataloader.pages), [NUM_PAGES_ACTUAL, NUM_PAGES_ACTUAL - 1]) + self.assertIn( + len(set(dataloader.pages)), [NUM_PAGES_ACTUAL, NUM_PAGES_ACTUAL - 1] + ) rows = dataloader.fetch_data_to_rows(NUM_PAGES) - self.assertEqual(len(rows), NUM_PAGES_ACTUAL * dataloader.num_rows_per_page) - self.assertEqual( - len(set(rows)), NUM_PAGES_ACTUAL * dataloader.num_rows_per_page + self.assertIn( + len(rows), + [ + NUM_PAGES_ACTUAL * dataloader.num_rows_per_page, + (NUM_PAGES_ACTUAL - 1) * dataloader.num_rows_per_page, + ], + ) + self.assertIn( + len(set(rows)), + [ + NUM_PAGES_ACTUAL * dataloader.num_rows_per_page, + (NUM_PAGES_ACTUAL - 1) * dataloader.num_rows_per_page, + ], ) def test_fineweb_loader_page_offset(self): """Tests that the fineweb loader will only generate page starts that are num rows per pages apart.""" - # Load a tokenizer + # Load a tokenizer. tokenizer = pt.model.load_tokenizer( MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], cache_dir=config.model_dir, @@ -127,11 +153,11 @@ def test_fineweb_loader_page_offset(self): "CC-MAIN-2013-20": {"num_rows": 599, "split": "train"} } - # Ensure get random pages returns only 0, 100, 200, 300, 400 and 500 + # Ensure get random pages returns only 0, 100, 200, 300, 400 and 500. expected_page_starts_1 = {0, 100, 200, 300, 400, 500} page_starts_1 = defaultdict(int) for _ in range(1000): - random_pages = dataloader.get_random_pages(1) + random_pages = dataloader.get_random_pages(1, initial_offset=0) _, page_start, _ = random_pages[0] page_starts_1[page_start] += 1 @@ -142,11 +168,59 @@ def test_fineweb_loader_page_offset(self): "CC-MAIN-2013-20": {"num_rows": 598, "split": "train"} } - # Ensure get random pages returns only 0, 100, 200, 300, and 400 (since 500-598 is not 100 rows) + # Ensure get random pages returns only 0, 100, 200, 300, and 400 (since 500-598 is not 100 rows). expected_page_starts_2 = {0, 100, 200, 300, 400} page_starts_2 = defaultdict(int) for _ in range(1000): - random_pages = dataloader.get_random_pages(1) + random_pages = dataloader.get_random_pages(1, initial_offset=0) + _, page_start, _ = random_pages[0] + page_starts_2[page_start] += 1 + + self.assertEqual(set(page_starts_2.keys()), expected_page_starts_2) + + def test_fineweb_loader_page_initial_offset(self): + """Tests that the fineweb loader correctly applies an initial offset to the page starts.""" + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + cache_dir=config.model_dir, + ) + + dataloader = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=0, sequence_length=4092, num_pages=0, tokenizer=tokenizer + ) + + # Ensure we know the num_rows_per_page. + test_num_rows_per_page = 100 + dataloader.num_rows_per_page = test_num_rows_per_page + + # Create a fake configs data with only 599 rows. + dataloader.configs_data = { + "CC-MAIN-2013-20": {"num_rows": 599, "split": "train"} + } + + # Define initial offset of 50. + initial_offset = 50 + # Ensure get random pages returns only 50, 150, 250, 350, and 450. + expected_page_starts_1 = {50, 150, 250, 350, 450} + page_starts_1 = defaultdict(int) + for _ in range(1000): + random_pages = dataloader.get_random_pages(1, initial_offset=initial_offset) + _, page_start, _ = random_pages[0] + page_starts_1[page_start] += 1 + + self.assertEqual(set(page_starts_1.keys()), expected_page_starts_1) + + # Create a fake configs data with only 548 rows. + dataloader.configs_data = { + "CC-MAIN-2013-20": {"num_rows": 548, "split": "train"} + } + + # Ensure get random pages returns only 50, 150, 250, and 350 (since 450-548 is not 100 rows) + expected_page_starts_2 = {50, 150, 250, 350} + page_starts_2 = defaultdict(int) + for _ in range(1000): + random_pages = dataloader.get_random_pages(1, initial_offset=initial_offset) _, page_start, _ = random_pages[0] page_starts_2[page_start] += 1 From 63386692dfcab394e0cd1b1f2f158573af375d0e Mon Sep 17 00:00:00 2001 From: Sid Date: Wed, 23 Oct 2024 10:42:12 -0700 Subject: [PATCH 04/21] Use a seed based on sync block for data loading. --- constants/__init__.py | 244 ++------------------------------- neurons/validator.py | 25 ++-- pretrain/dataset.py | 11 +- requirements.txt | 2 +- scripts/upload_model.py | 5 +- tests/pretrain/test_dataset.py | 92 ++++++++++++- tests/pretrain/test_mining.py | 2 +- 7 files changed, 125 insertions(+), 256 deletions(-) diff --git a/constants/__init__.py b/constants/__init__.py index 052f3c26..c7cdcea3 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -99,131 +99,20 @@ CompetitionId.B14_MODEL: pt.dataset.SubsetFineWebEdu2Loader, } -# Defined model constraints by competition id to ensure they are constant across blocks. -MODEL_CONSTRAINTS_BY_COMPETITION_ID: Dict[CompetitionId, ModelConstraints] = { - CompetitionId.M772_MODEL: ModelConstraints( - max_model_parameter_size=772_000_000, - min_model_parameter_size=572_000_000, - sequence_length=1024, - allowed_architectures=ALLOWED_MODEL_TYPES_1, - tokenizer="distilgpt2", - eval_block_delay=0, - epsilon_func=FixedEpsilon(0.005), - max_bytes=5 * 1024 * 1024 * 1024, - ), - CompetitionId.B7_MODEL: ModelConstraints( - max_model_parameter_size=6_900_000_000, - min_model_parameter_size=6_700_000_000, - sequence_length=4096, - allowed_architectures=ALLOWED_MODEL_TYPES_2, - tokenizer="Xenova/gpt-4", - kwargs={ - "torch_dtype": torch.bfloat16, - "attn_implementation": "flash_attention_2", - }, - eval_block_delay=0, - epsilon_func=FixedEpsilon(0.005), - max_bytes=15 * 1024 * 1024 * 1024, - ), - CompetitionId.B3_MODEL: ModelConstraints( - max_model_parameter_size=3_400_000_000, - min_model_parameter_size=3_200_000_000, - sequence_length=4096, - allowed_architectures=ALLOWED_MODEL_TYPES_2, - tokenizer="Xenova/gpt-4", - kwargs={ - "torch_dtype": torch.bfloat16, - "attn_implementation": "flash_attention_2", - }, - eval_block_delay=0, - epsilon_func=FixedEpsilon(0.005), - max_bytes=15 * 1024 * 1024 * 1024, - ), - CompetitionId.B14_MODEL: ModelConstraints( - max_model_parameter_size=13_900_000_000, - min_model_parameter_size=13_700_000_000, - sequence_length=4096, - allowed_architectures=ALLOWED_MODEL_TYPES_2, - tokenizer="Xenova/gpt-4", - kwargs={ - "torch_dtype": torch.bfloat16, - "attn_implementation": "flash_attention_2", - }, - eval_block_delay=0, - epsilon_func=FixedEpsilon(0.005), - max_bytes=29 * 1024 * 1024 * 1024, - ), -} +# Synchronize on blocks roughly every 30 minutes. +SYNC_BLOCK_CADENCE = 150 +# Delay at least as long as the sync block cadence with an additional buffer. +EVAL_BLOCK_DELAY = SYNC_BLOCK_CADENCE + 100 # Defined model constraints by competition id with decaying epsilon -MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY: Dict[ - CompetitionId, ModelConstraints -] = { - CompetitionId.M772_MODEL: ModelConstraints( - max_model_parameter_size=772_000_000, - min_model_parameter_size=572_000_000, - sequence_length=1024, - allowed_architectures=ALLOWED_MODEL_TYPES_1, - tokenizer="distilgpt2", - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.001, 50400), - max_bytes=5 * 1024 * 1024 * 1024, - ), - CompetitionId.B3_MODEL: ModelConstraints( - max_model_parameter_size=3_400_000_000, - min_model_parameter_size=3_200_000_000, - sequence_length=4096, - allowed_architectures=ALLOWED_MODEL_TYPES_2, - tokenizer="Xenova/gpt-4", - kwargs={ - "torch_dtype": torch.bfloat16, - "attn_implementation": "flash_attention_2", - }, - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.001, 50400), - max_bytes=15 * 1024 * 1024 * 1024, - ), - CompetitionId.B7_MODEL: ModelConstraints( - max_model_parameter_size=6_900_000_000, - min_model_parameter_size=6_700_000_000, - sequence_length=4096, - allowed_architectures=ALLOWED_MODEL_TYPES_2, - tokenizer="Xenova/gpt-4", - kwargs={ - "torch_dtype": torch.bfloat16, - "attn_implementation": "flash_attention_2", - }, - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.001, 50400), - max_bytes=15 * 1024 * 1024 * 1024, - ), - CompetitionId.B14_MODEL: ModelConstraints( - max_model_parameter_size=13_900_000_000, - min_model_parameter_size=13_700_000_000, - sequence_length=4096, - allowed_architectures=ALLOWED_MODEL_TYPES_2, - tokenizer="Xenova/gpt-4", - kwargs={ - "torch_dtype": torch.bfloat16, - "attn_implementation": "flash_attention_2", - }, - eval_block_delay=0, - epsilon_func=LinearDecay(0.005, 0.001, 100800), - max_bytes=29 * 1024 * 1024 * 1024, - ), -} - -# Defined model constraints by competition id with decaying epsilon -MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY_2: Dict[ - CompetitionId, ModelConstraints -] = { +MODEL_CONSTRAINTS_BY_COMPETITION_ID: Dict[CompetitionId, ModelConstraints] = { CompetitionId.M772_MODEL: ModelConstraints( max_model_parameter_size=772_000_000, min_model_parameter_size=572_000_000, sequence_length=1024, allowed_architectures=ALLOWED_MODEL_TYPES_1, tokenizer="distilgpt2", - eval_block_delay=0, + eval_block_delay=EVAL_BLOCK_DELAY, epsilon_func=LinearDecay(0.005, 0.0001, 50400), max_bytes=5 * 1024 * 1024 * 1024, ), @@ -237,7 +126,7 @@ "torch_dtype": torch.bfloat16, "attn_implementation": "flash_attention_2", }, - eval_block_delay=0, + eval_block_delay=EVAL_BLOCK_DELAY, epsilon_func=LinearDecay(0.005, 0.0001, 50400), max_bytes=15 * 1024 * 1024 * 1024, ), @@ -251,42 +140,16 @@ "torch_dtype": torch.bfloat16, "attn_implementation": "flash_attention_2", }, - eval_block_delay=0, + eval_block_delay=EVAL_BLOCK_DELAY, epsilon_func=LinearDecay(0.005, 0.0001, 72000), max_bytes=29 * 1024 * 1024 * 1024, ), } - # Schedule of competitions by block. COMPETITION_SCHEDULE_BY_BLOCK: List[Tuple[int, List[Competition]]] = [ ( 0, - [ - Competition( - CompetitionId.B7_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], - 1.0, - ) - ], - ), - ( - 3_565_190, - [ - Competition( - CompetitionId.M772_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.M772_MODEL], - 0.35, - ), - Competition( - CompetitionId.B7_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], - 0.65, - ), - ], - ), - ( - BLOCK_3B_7BSTAR_UNPACK, [ Competition( CompetitionId.M772_MODEL, @@ -298,94 +161,9 @@ MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B3_MODEL], 0.29, ), - Competition( - CompetitionId.B7_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], - 0.57, - ), - ], - ), - ( - 3_750_683, - [ - Competition( - CompetitionId.M772_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.M772_MODEL - ], - 0.14, - ), - Competition( - CompetitionId.B3_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.B3_MODEL - ], - 0.29, - ), - Competition( - CompetitionId.B7_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.B7_MODEL - ], - 0.15, - ), - Competition( - CompetitionId.B14_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.B14_MODEL - ], - 0.42, - ), - ], - ), - ( - 3_849_722, - [ - Competition( - CompetitionId.M772_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.M772_MODEL - ], - 0.14, - ), - Competition( - CompetitionId.B3_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.B3_MODEL - ], - 0.29, - ), Competition( CompetitionId.B14_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY[ - CompetitionId.B14_MODEL - ], - 0.57, - ), - ], - ), - ( - BLOCK_SAMPLE_PACK, - [ - Competition( - CompetitionId.M772_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY_2[ - CompetitionId.M772_MODEL - ], - 0.14, - ), - Competition( - CompetitionId.B3_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY_2[ - CompetitionId.B3_MODEL - ], - 0.29, - ), - Competition( - CompetitionId.B14_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY_2[ - CompetitionId.B14_MODEL - ], + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], 0.57, ), ], @@ -427,9 +205,7 @@ sample_min = 5 # Max number of uids that can be either pending eval or currently being evaluated. # We allow the sample_min per competition + 10 additional models to be held at any one time. -updated_models_limit = ( - sample_min * len(MODEL_CONSTRAINTS_BY_COMPETITION_ID_LINEAR_DECAY_2) + 10 -) +updated_models_limit = sample_min * len(MODEL_CONSTRAINTS_BY_COMPETITION_ID) + 10 # time required between updates to the chain. chain_update_cadence = dt.timedelta(minutes=20) # Number of blocks required between retrying evaluation of a model. diff --git a/neurons/validator.py b/neurons/validator.py index 4681e477..94a1aa7d 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -800,22 +800,26 @@ async def run_step(self): competition.constraints, cache_dir=self.config.model_dir ) - if cur_block >= constants.sample_pack_block: - pack_samples = True - pages_per_eval = constants.pages_per_eval_pack - else: - pack_samples = False - pages_per_eval = constants.pages_per_eval_unpack + # Use sample packing. + pack_samples = True - # If the option is set in the config, override + # If the option is set in the config, override. pages_per_eval = ( self.config.pages_per_eval if self.config.pages_per_eval is not None - else pages_per_eval + else constants.pages_per_eval_pack ) - bt.logging.debug(f"Sample packing is set to: {pack_samples}.") - bt.logging.debug(f"Number of pages per evaluation step is: {pages_per_eval}") + # Try to synchronize the data used by validators. + try: + seed = metagraph_utils.get_hash_of_sync_block( + self.subtensor, constants.SYNC_BLOCK_CADENCE + ) + except: + seed = None + + bt.logging.debug(f"Number of pages per evaluation step is: {pages_per_eval}.") + bt.logging.debug(f"Seed used for loading data is: {seed}.") dataloader = SubsetDataLoader( batch_size=constants.batch_size, @@ -823,6 +827,7 @@ async def run_step(self): num_pages=pages_per_eval, tokenizer=tokenizer, pack_samples=pack_samples, + seed=seed, ) batches = list(dataloader) diff --git a/pretrain/dataset.py b/pretrain/dataset.py index 45b19267..61bbbfb7 100644 --- a/pretrain/dataset.py +++ b/pretrain/dataset.py @@ -41,6 +41,7 @@ def __init__( num_pages=None, tokenizer: AutoTokenizer = None, pack_samples: bool = False, + random_seed: typing.Optional[int] = None, ): self.batch_size = batch_size self.sequence_length = sequence_length @@ -48,6 +49,10 @@ def __init__( self.tokenizer = tokenizer self.pack_samples = pack_samples + # Initialize with seed if provided. + if random_seed is not None: + random.seed(random_seed) + self.num_rows_per_page = 100 self.duplicate_page_threshold = 100 @@ -158,9 +163,10 @@ def __init__( num_pages=None, tokenizer: AutoTokenizer = None, pack_samples: bool = False, + random_seed: typing.Optional[int] = None, ): super().__init__( - batch_size, sequence_length, num_pages, tokenizer, pack_samples + batch_size, sequence_length, num_pages, tokenizer, pack_samples, random_seed ) # Get the dataset configs and their row sizes @@ -458,9 +464,10 @@ def __init__( num_pages=None, tokenizer: AutoTokenizer = None, pack_samples: bool = False, + random_seed: typing.Optional[int] = None, ): super().__init__( - batch_size, sequence_length, num_pages, tokenizer, pack_samples + batch_size, sequence_length, num_pages, tokenizer, pack_samples, random_seed ) self.params = { diff --git a/requirements.txt b/requirements.txt index 4725d2c6..22130ec6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,4 @@ transformers==4.44.1 wandb datasets flash-attn -taoverse==1.0.6 +taoverse==1.0.8 diff --git a/scripts/upload_model.py b/scripts/upload_model.py index ebe299b6..5d197e0f 100644 --- a/scripts/upload_model.py +++ b/scripts/upload_model.py @@ -78,7 +78,7 @@ def get_config(): async def main(config: bt.config): # Create bittensor objects. bt.logging(config=config) - + wallet = bt.wallet(config=config) subtensor = bt.subtensor(config=config) metagraph = subtensor.metagraph(config.netuid) @@ -108,7 +108,7 @@ async def main(config: bt.config): await pt.mining.push( model, config.hf_repo_id, - wallet, + wallet, config.competition_id, metadata_store=chain_metadata_store, ) @@ -122,4 +122,3 @@ async def main(config: bt.config): else: print(config) asyncio.run(main(config)) - diff --git a/tests/pretrain/test_dataset.py b/tests/pretrain/test_dataset.py index 2a4a1b69..a11d54f5 100644 --- a/tests/pretrain/test_dataset.py +++ b/tests/pretrain/test_dataset.py @@ -23,7 +23,7 @@ def test_fineweb_loader_page_copy(self): # Load a tokenizer tokenizer = pt.model.load_tokenizer( - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], cache_dir=config.model_dir, ) @@ -57,7 +57,7 @@ def test_fineweb_loader_unique_pages(self): # Load a tokenizer tokenizer = pt.model.load_tokenizer( - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], cache_dir=config.model_dir, ) @@ -98,7 +98,7 @@ def test_fineweb_loader_duplicate_threshold(self): # Load a tokenizer tokenizer = pt.model.load_tokenizer( - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], cache_dir=config.model_dir, ) @@ -136,7 +136,7 @@ def test_fineweb_loader_page_offset(self): """Tests that the fineweb loader will only generate page starts that are num rows per pages apart.""" # Load a tokenizer. tokenizer = pt.model.load_tokenizer( - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], cache_dir=config.model_dir, ) @@ -182,7 +182,7 @@ def test_fineweb_loader_page_initial_offset(self): """Tests that the fineweb loader correctly applies an initial offset to the page starts.""" # Load a tokenizer tokenizer = pt.model.load_tokenizer( - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B7_MODEL], + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], cache_dir=config.model_dir, ) @@ -225,3 +225,85 @@ def test_fineweb_loader_page_initial_offset(self): page_starts_2[page_start] += 1 self.assertEqual(set(page_starts_2.keys()), expected_page_starts_2) + + def test_fineweb_loader_seed(self): + """Tests that the fineweb data loader fetches the same data with the same seed (barring retries).""" + + # Use the same seed for each loader. + RANDOM_SEED = 1 + # Fetch just two pages to keep the test reasonably fast. + NUM_PAGES = 2 + + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + cache_dir=config.model_dir, + ) + + # First dataloader. + dataloader_1 = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=4, + sequence_length=4092, + num_pages=NUM_PAGES, + tokenizer=tokenizer, + random_seed=RANDOM_SEED, + ) + + # Assert that the number of pages requested were loaded. + self.assertEqual(len(dataloader_1.pages), NUM_PAGES) + + # Now create a second loader with the same seed. + dataloader_2 = pt.dataset.SubsetFineWebEdu2Loader( + batch_size=4, + sequence_length=4092, + num_pages=NUM_PAGES, + tokenizer=tokenizer, + random_seed=RANDOM_SEED, + ) + + # Assert both dataloaders have the same pages + self.assertEqual(set(dataloader_1.pages), set(dataloader_2.pages)) + + # Assert that both have the same buffers + self.assertTrue(np.array_equal(dataloader_1.buffer, dataloader_2.buffer)) + + def test_falcon_loader_seed(self): + """Tests that the falcon data loader fetches the same data with the same seed.""" + + # Use the same seed for each loader. + RANDOM_SEED = 1 + # Fetch just two pages to keep the test reasonably fast. + NUM_PAGES = 2 + + # Load a tokenizer + tokenizer = pt.model.load_tokenizer( + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + cache_dir=config.model_dir, + ) + + # First dataloader. + dataloader_1 = pt.dataset.SubsetFalconLoader( + batch_size=4, + sequence_length=4092, + num_pages=NUM_PAGES, + tokenizer=tokenizer, + random_seed=RANDOM_SEED, + ) + + # Assert that the number of pages requested were loaded. + self.assertEqual(len(dataloader_1.pages), NUM_PAGES) + + # Now create a second loader with the same seed. + dataloader_2 = pt.dataset.SubsetFalconLoader( + batch_size=4, + sequence_length=4092, + num_pages=NUM_PAGES, + tokenizer=tokenizer, + random_seed=RANDOM_SEED, + ) + + # Assert both dataloaders have the same pages + self.assertEqual(set(dataloader_1.pages), set(dataloader_2.pages)) + + # Assert that both have the same buffers + self.assertTrue(np.array_equal(dataloader_1.buffer, dataloader_2.buffer)) diff --git a/tests/pretrain/test_mining.py b/tests/pretrain/test_mining.py index e2a83188..279aca07 100644 --- a/tests/pretrain/test_mining.py +++ b/tests/pretrain/test_mining.py @@ -45,7 +45,7 @@ def _test_push(self, min_expected_block: int = 1): pt.mining.push( model=self.tiny_model, wallet=self.wallet, - competition_id=CompetitionId.B7_MODEL, + competition_id=CompetitionId.B14_MODEL, repo="namespace/name", retry_delay_secs=1, metadata_store=self.metadata_store, From 6350eade4531592df1c2a493fe98198267fcf29d Mon Sep 17 00:00:00 2001 From: Sid Date: Fri, 25 Oct 2024 14:15:46 -0700 Subject: [PATCH 05/21] Use correct argument name. --- neurons/validator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neurons/validator.py b/neurons/validator.py index 94a1aa7d..163e41eb 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -827,7 +827,7 @@ async def run_step(self): num_pages=pages_per_eval, tokenizer=tokenizer, pack_samples=pack_samples, - seed=seed, + random_seed=seed, ) batches = list(dataloader) From 798bcb008dff3ce9f8436d07dae90c5c4dc1918e Mon Sep 17 00:00:00 2001 From: Sid Date: Wed, 30 Oct 2024 16:24:13 -0700 Subject: [PATCH 06/21] Log RevisionNotFound like RepositoryNotFound errors. --- neurons/validator.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/neurons/validator.py b/neurons/validator.py index 163e41eb..e3a3c0d3 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -40,7 +40,7 @@ import torch import wandb -from huggingface_hub.utils import RepositoryNotFoundError +from huggingface_hub.utils import RepositoryNotFoundError, RevisionNotFoundError from rich.console import Console from rich.table import Table from taoverse.metagraph import utils as metagraph_utils @@ -491,6 +491,8 @@ def update_models(self): except RepositoryNotFoundError as e: bt.logging.trace(e) + except RevisionNotFoundError as e: + bt.logging.trace(e) except MinerMisconfiguredError as e: bt.logging.trace(e) except Exception as e: From c1cae423df43d6e8fdc6916dad9663e83a7fe1af Mon Sep 17 00:00:00 2001 From: Sid Date: Wed, 30 Oct 2024 16:46:36 -0700 Subject: [PATCH 07/21] Combine except for both Repo and Rev. --- neurons/validator.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index e3a3c0d3..68ff8ce9 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -489,9 +489,7 @@ def update_models(self): f"Failed to find metadata for uid {next_uid} with hotkey {hotkey}" ) - except RepositoryNotFoundError as e: - bt.logging.trace(e) - except RevisionNotFoundError as e: + except (RepositoryNotFoundError, RevisionNotFoundError) as e: bt.logging.trace(e) except MinerMisconfiguredError as e: bt.logging.trace(e) From 8335b4afa993099f4320af7815f52a8a00914faf Mon Sep 17 00:00:00 2001 From: Sid Date: Thu, 31 Oct 2024 09:41:13 -0700 Subject: [PATCH 08/21] Add argument to make repo public after model upload. --- pretrain/mining.py | 12 ++++++++++++ scripts/upload_model.py | 6 ++++++ 2 files changed, 18 insertions(+) diff --git a/pretrain/mining.py b/pretrain/mining.py index 1f0eb668..64c19d6a 100644 --- a/pretrain/mining.py +++ b/pretrain/mining.py @@ -23,6 +23,7 @@ import constants import bittensor as bt +import huggingface_hub import pretrain as pt from dataclasses import replace @@ -60,6 +61,7 @@ async def push( wallet: bt.wallet, competition_id: CompetitionId, retry_delay_secs: int = 60, + update_repo_visibility: bool = False, metadata_store: Optional[ModelMetadataStore] = None, remote_model_store: Optional[RemoteModelStore] = None, ): @@ -71,6 +73,7 @@ async def push( competition_id (CompetitionId): The competition the miner is participating in. wallet (bt.wallet): The wallet of the Miner uploading the model. retry_delay_secs (int): The number of seconds to wait before retrying to push the model to the chain. + update_repo_visibility (bool): Whether to make the repo public after pushing the model. metadata_store (Optional[ModelMetadataStore]): The metadata store. If None, defaults to writing to the chain. remote_model_store (Optional[RemoteModelStore]): The remote model store. If None, defaults to writing to HuggingFace @@ -141,6 +144,15 @@ async def push( bt.logging.error(f"Retrying in {retry_delay_secs} seconds...") time.sleep(retry_delay_secs) + if update_repo_visibility: + bt.logging.debug("Making repo public.") + huggingface_hub.update_repo_visibility( + repo, + private=False, + token=HuggingFaceModelStore.assert_access_token_exists(), + ) + bt.logging.success("Model set to public") + def save(model: PreTrainedModel, model_dir: str): """Saves a model to the provided directory""" diff --git a/scripts/upload_model.py b/scripts/upload_model.py index 5d197e0f..315c5bd9 100644 --- a/scripts/upload_model.py +++ b/scripts/upload_model.py @@ -63,6 +63,11 @@ def get_config(): parser.add_argument( "--list_competitions", action="store_true", help="Print out all competitions" ) + parser.add_argument( + "--update_repo_visibility", + action="store_true", + help="If true, the repo will be made public after uploading.", + ) # Include wallet and logging arguments from bittensor bt.wallet.add_args(parser) @@ -111,6 +116,7 @@ async def main(config: bt.config): wallet, config.competition_id, metadata_store=chain_metadata_store, + update_repo_visibility=config.update_repo_visibility, ) From 25ae02ea66a42a4f434318510470549d395842d8 Mon Sep 17 00:00:00 2001 From: Sid Date: Thu, 31 Oct 2024 09:41:55 -0700 Subject: [PATCH 09/21] Organize mining imports. --- pretrain/mining.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/pretrain/mining.py b/pretrain/mining.py index 64c19d6a..d9533d0b 100644 --- a/pretrain/mining.py +++ b/pretrain/mining.py @@ -17,21 +17,13 @@ # DEALINGS IN THE SOFTWARE. import os -import sys import time -import torch -import constants +from dataclasses import replace +from typing import Any, Dict, Optional import bittensor as bt import huggingface_hub -import pretrain as pt - -from dataclasses import replace -from typing import Optional, Dict, Any - -from transformers import PreTrainedModel, AutoModelForCausalLM from safetensors.torch import load_model - from taoverse.model import utils as model_utils from taoverse.model.data import Model, ModelId from taoverse.model.storage.chain.chain_model_metadata_store import ( @@ -43,8 +35,10 @@ from taoverse.model.storage.model_metadata_store import ModelMetadataStore from taoverse.model.storage.remote_model_store import RemoteModelStore from taoverse.model.utils import get_hash_of_two_strings +from transformers import AutoModelForCausalLM, PreTrainedModel - +import constants +import pretrain as pt from competitions.data import CompetitionId From 9d85c7e7951cf91576c92909eb9cef6aedcd6348 Mon Sep 17 00:00:00 2001 From: Sid Date: Thu, 31 Oct 2024 09:45:41 -0700 Subject: [PATCH 10/21] Update test arguments (this is not coverage). --- tests/pretrain/test_mining.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/pretrain/test_mining.py b/tests/pretrain/test_mining.py index 279aca07..4b2579c3 100644 --- a/tests/pretrain/test_mining.py +++ b/tests/pretrain/test_mining.py @@ -48,6 +48,7 @@ def _test_push(self, min_expected_block: int = 1): competition_id=CompetitionId.B14_MODEL, repo="namespace/name", retry_delay_secs=1, + update_repo_visibility=False, metadata_store=self.metadata_store, remote_model_store=self.remote_store, ) From f3d83059111185e7d10116cad002f8a834d08b4e Mon Sep 17 00:00:00 2001 From: Sid Date: Thu, 31 Oct 2024 19:19:06 -0700 Subject: [PATCH 11/21] Also update docs. --- docs/miner.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/miner.md b/docs/miner.md index fca251ab..1b918130 100644 --- a/docs/miner.md +++ b/docs/miner.md @@ -125,6 +125,8 @@ You can manually upload with the following command: python scripts/upload_model.py --load_model_dir --competition_id 0 --hf_repo_id my-username/my-project --wallet.name coldkey --wallet.hotkey hotkey ``` +Note: We recommend keeping your hugging face repo private until after you have committed your metadata to the chain. This ensures other miners are unable to upload your model as their own until a later block. Adding the `--update_repo_visibility` flag will also automatically attempt to update the hugging face repo visibility to public after committing to the chain. + Note: If you are not sure about the competition ID, you can add the `--list_competitions` flag to get a list of all competitions. You can also check out competition IDs in [competitions/data.py](https://github.com/macrocosm-os/pretraining/blob/main/competitions/data.py). ## Running a custom Miner From 4265e3b0465e4db9540d6a8d7e6f4b5dfcaa901e Mon Sep 17 00:00:00 2001 From: roel Date: Mon, 4 Nov 2024 22:10:14 +0000 Subject: [PATCH 12/21] added stack v1 dedup dataloader. Requires huggingface token --- pretrain/dataset.py | 96 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/pretrain/dataset.py b/pretrain/dataset.py index 61bbbfb7..81814af6 100644 --- a/pretrain/dataset.py +++ b/pretrain/dataset.py @@ -26,6 +26,10 @@ from datasets import load_dataset from pprint import pprint +import os +from dotenv import load_dotenv +load_dotenv() + class SubsetLoader(IterableDataset): """ @@ -146,6 +150,98 @@ def __next__(self): raise StopIteration +class SubsetStackV1DedupLoader(SubsetLoader): + max_pages: int = 236655813 + name: str = "bigcode/the-stack-dedup" + base_url: str = "https://datasets-server.huggingface.co/rows" + + def __init__( + self, + batch_size, + sequence_length, + num_pages=None, + tokenizer: AutoTokenizer=None, + pack_samples: bool=False, + ): + super().__init__(batch_size, + sequence_length, + num_pages, + tokenizer, + pack_samples) + + self.params = { + "dataset": self.name, + "config": "default", + "split": "train", + } + + self.retry_limit = 10 # Number of retries + self.retry_delay = 5 # Seconds to wait between retries + + # Get HF token from environment + self.hf_token = os.getenv("HF_TOKEN") + if not self.hf_token: + raise ValueError("HF_TOKEN environment variable not found") + + # Fetch pages only if the number of pages is specified + if self.num_pages: + pages = self._sample_pages() + self.fetch_data_for_pages(pages) + + def _fetch_data_for_page(self, page): + self.params["offset"] = page + self.params["limit"] = self.num_rows_per_page + attempt = 0 + while attempt < self.retry_limit: + try: + # Add authorization header + headers = {"Authorization": f"Bearer {self.hf_token}"} + response = requests.get(self.base_url, params=self.params, headers=headers) + response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code + for row in response.json()["rows"]: + content = row["row"]["content"] + input_ids = self.tokenizer(content, truncation=True)["input_ids"] + self.buffer += input_ids + self.buffer += [self.tokenizer.eos_token_id] + + break # If the request was successful, break out of the retry loop + except requests.exceptions.RequestException as e: + attempt += 1 + bt.logging.warning( + f"Failed to fetch data for page {page}, retrying. Attempt {attempt}/{self.retry_limit}" + ) + if attempt < self.retry_limit: + time.sleep(self.retry_delay) # Wait before the next retry + else: + bt.logging.error( + "Maximum retry limit reached. Unable to fetch data." + ) + raise + + def _sample_pages(self): + """ + Randomly sample pages to be used in validation + """ + pages = [ + random.randint(1, self.max_pages) + for _ in range(self.num_pages) + ] + + return pages + + + def get_page_names(self): + """ + This is a utility function that returns the page names that were used. + Each page as a single string instead of a tuple + """ + page_names = [] + + if hasattr(self, 'pages'): + page_names = self.pages + + return page_names + class SubsetFineWebEdu2Loader(SubsetLoader): From a9a7036d0340396ca9e720bc4ac7396f0da8ddad Mon Sep 17 00:00:00 2001 From: roel Date: Wed, 6 Nov 2024 17:28:27 +0000 Subject: [PATCH 13/21] added basic laion/Pes2oX-fulltext dataloader --- pretrain/dataset.py | 87 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/pretrain/dataset.py b/pretrain/dataset.py index 81814af6..8d76e827 100644 --- a/pretrain/dataset.py +++ b/pretrain/dataset.py @@ -150,6 +150,93 @@ def __next__(self): raise StopIteration +class SubsetPes2oXLoader(SubsetLoader): + max_pages: int = 8242000 + name: str = "laion/Pes2oX-fulltext" + base_url: str = "https://datasets-server.huggingface.co/rows" + + def __init__( + self, + batch_size, + sequence_length, + num_pages=None, + tokenizer: AutoTokenizer=None, + pack_samples: bool=True, + ): + super().__init__(batch_size, + sequence_length, + num_pages, + tokenizer, + pack_samples) + + self.params = { + "dataset": self.name, + "config": "pes2ov2", + "split": "train", + } + + self.retry_limit = 10 # Number of retries + self.retry_delay = 5 # Seconds to wait between retries + + # Fetch pages only if the number of pages is specified + if self.num_pages: + pages = self._sample_pages() + self.fetch_data_for_pages(pages) + + def _fetch_data_for_page(self, page): + self.params["offset"] = page + self.params["limit"] = self.num_rows_per_page + attempt = 0 + while attempt < self.retry_limit: + try: + # Add authorization header + response = requests.get(self.base_url, params=self.params) + response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code + for row in response.json()["rows"]: + content = row["row"]["text"] + input_ids = self.tokenizer(content, truncation=True)["input_ids"] + self.buffer += input_ids + self.buffer += [self.tokenizer.eos_token_id] + + break # If the request was successful, break out of the retry loop + except requests.exceptions.RequestException as e: + attempt += 1 + bt.logging.warning( + f"Failed to fetch data for page {page}, retrying. Attempt {attempt}/{self.retry_limit}" + ) + if attempt < self.retry_limit: + time.sleep(self.retry_delay) # Wait before the next retry + else: + bt.logging.error( + "Maximum retry limit reached. Unable to fetch data." + ) + raise + + def _sample_pages(self): + """ + Randomly sample pages to be used in validation + """ + pages = [ + random.randint(1, self.max_pages) + for _ in range(self.num_pages) + ] + + return pages + + + def get_page_names(self): + """ + This is a utility function that returns the page names that were used. + Each page as a single string instead of a tuple + """ + page_names = [] + + if hasattr(self, 'pages'): + page_names = self.pages + + return page_names + + class SubsetStackV1DedupLoader(SubsetLoader): max_pages: int = 236655813 name: str = "bigcode/the-stack-dedup" From 9ebce5945758496d43c252eded4e02302a5d6c71 Mon Sep 17 00:00:00 2001 From: roel Date: Wed, 6 Nov 2024 17:44:18 +0000 Subject: [PATCH 14/21] new SubsetLoader class to handle different configs, auth requirements, etc --- pretrain/dataset.py | 736 +++++++++++--------------------------------- 1 file changed, 175 insertions(+), 561 deletions(-) diff --git a/pretrain/dataset.py b/pretrain/dataset.py index 8d76e827..d78288eb 100644 --- a/pretrain/dataset.py +++ b/pretrain/dataset.py @@ -1,20 +1,3 @@ -# The MIT License (MIT) -# Copyright © 2023 Yuma Rao -# Copyright © 2023 const - -# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated -# documentation files (the “Software”), to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, -# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: - -# The above copyright notice and this permission notice shall be included in all copies or substantial portions of -# the Software. - -# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO -# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -# DEALINGS IN THE SOFTWARE. import typing import random import time @@ -30,14 +13,14 @@ from dotenv import load_dotenv load_dotenv() - class SubsetLoader(IterableDataset): - """ - Base class for data-specific subset loader classes. - - # TODO: Make this class abstract - """ - + """Base class for data-specific subset loader classes.""" + + name: str = None # Dataset name + base_url: str = "https://datasets-server.huggingface.co/rows" + size_base_url: str = "https://datasets-server.huggingface.co/size" + max_pages: int = None + def __init__( self, batch_size=None, @@ -46,84 +29,162 @@ def __init__( tokenizer: AutoTokenizer = None, pack_samples: bool = False, random_seed: typing.Optional[int] = None, + config: str = "default", + split: str = "train", + requires_auth: bool = False, ): self.batch_size = batch_size self.sequence_length = sequence_length self.num_pages = num_pages self.tokenizer = tokenizer self.pack_samples = pack_samples + self.config = config + self.split = split + self.requires_auth = requires_auth - # Initialize with seed if provided. + # Initialize with seed if provided if random_seed is not None: random.seed(random_seed) self.num_rows_per_page = 100 self.duplicate_page_threshold = 100 + self.retry_limit = 10 + self.retry_delay = 5 - # Buffer to hold pages loaded from the api + # Buffers self.buffer = [] - - # Buffer to hold pages already loaded into a batch self.used_buffer = [] - - # Buffer to hold padded pages self.padded_buffer = [] - def fetch_data_for_pages(self, pages): - """ - Set the pages to be used to fill the buffer. Then fetch the page data - to the buffer. - """ + # Get HF token if needed + self.hf_token = None + if self.requires_auth: + self.hf_token = os.getenv("HF_TOKEN") + if not self.hf_token: + raise ValueError("HF_TOKEN environment variable not found") - self.pages = pages + # Initialize request params + self.params = self._get_default_params() - # Empty the buffer if it is not. - self.buffer = [] + # Fetch pages if specified + if self.num_pages: + self._initialize_pages() + + def _get_default_params(self): + """Get default request parameters. Override if needed.""" + return { + "dataset": self.name, + "config": self.config, + "split": self.split, + } + def _get_request_headers(self): + """Get request headers. Override if needed.""" + headers = {} + if self.requires_auth: + headers["Authorization"] = f"Bearer {self.hf_token}" + return headers + + def _initialize_pages(self): + """Initialize pages based on loader type""" + if hasattr(self, 'fetch_dataset_configs'): + # For FineWebEdu2 style loaders + self.configs_data = self.fetch_dataset_configs() + self._fetch_data_to_buffer(self.num_pages) + else: + # For simple page-based loaders + pages = self._sample_pages() + self.fetch_data_for_pages(pages) + + def fetch_data_for_pages(self, pages): + """Set the pages and fetch their data to the buffer.""" + self.pages = pages + self.buffer = [] for page in self.pages: self._fetch_data_for_page(page) - def _get_pad_size(self, input_ids): - """ - Get the number of tokens to be padded to the sample to match - the max allowed sequence length. - If sample packing is activated, then return 1 - """ + def _fetch_data_for_page(self, page): + """Fetch data for a single page""" + # Handle different page types (tuple vs int) + if isinstance(page, tuple): + config_name, page_num, split = page + self.params.update({ + "config": config_name, + "split": split, + "offset": page_num, + }) + else: + self.params["offset"] = page + + self.params["limit"] = self.num_rows_per_page + + attempt = 0 + while attempt < self.retry_limit: + try: + response = requests.get( + self.base_url, + params=self.params, + headers=self._get_request_headers() + ) + response.raise_for_status() + + for row in response.json()["rows"]: + content = self._get_content_from_row(row) + input_ids = self.tokenizer(content, truncation=True)["input_ids"] + self.buffer += input_ids + self.buffer += [self.tokenizer.eos_token_id] + + break + + except requests.exceptions.RequestException as e: + attempt += 1 + bt.logging.warning( + f"Failed to fetch data for page {page}, retrying. Attempt {attempt}/{self.retry_limit}" + ) + if attempt < self.retry_limit: + time.sleep(self.retry_delay) + else: + bt.logging.error("Maximum retry limit reached. Unable to fetch data.") + raise + + def _get_content_from_row(self, row): + """Extract content from row based on dataset format. Override if needed.""" + return row["row"].get("text", row["row"].get("content")) + + def _sample_pages(self): + """Sample random pages. Override for custom sampling logic.""" + return [random.randint(1, self.max_pages) for _ in range(self.num_pages)] + + def get_page_names(self): + """Get page names in consistent format""" + if not hasattr(self, 'pages'): + return [] + + if isinstance(self.pages[0], tuple): + return [f"{cfg_name}_{num_rows}_{split}" + for cfg_name, num_rows, split in self.pages] + return self.pages + def _get_pad_size(self, input_ids): + """Get padding size for input tokens.""" if self.pack_samples: return 1 sample_size = len(input_ids) - remainder = sample_size % self.sequence_length pad_size = self.sequence_length - remainder - - # Apply modulo again to guarantee a pad size of 0 if remainder is 0 pad_size = pad_size % self.sequence_length - return pad_size def _refill_padded_buffer(self): - """ - This methods pulls one page from `self.buffer`, pads it and pushs - it to the `self.padded_buffer`. - """ - + """Refill the padded buffer from the main buffer.""" while self.buffer and len(self.padded_buffer) < self.sequence_length: - input_ids = [] - - # search for EOS token index and cut the buffer at it. EOS_index = self.buffer.index(self.tokenizer.eos_token_id) input_ids = self.buffer[: EOS_index + 1] self.buffer = self.buffer[EOS_index + 1 :] - self.used_buffer += input_ids - - # Add to padded buffer without the EOS token. self.padded_buffer += input_ids[:-1] - - # Pad self.padded_buffer += [self.tokenizer.eos_token_id] * self._get_pad_size( input_ids=input_ids[:-1] ) @@ -131,586 +192,139 @@ def _refill_padded_buffer(self): def __iter__(self): self.buffer = self.used_buffer + self.buffer self.padded_buffer = [] - - # Pad and prepare one page for batching self._refill_padded_buffer() - return self def __next__(self): batch = [] - while len(self.padded_buffer) >= self.sequence_length: batch.append(self.padded_buffer[: self.sequence_length]) self.padded_buffer = self.padded_buffer[self.sequence_length :] self._refill_padded_buffer() - if len(batch) == self.batch_size: return np.stack(batch) - raise StopIteration + class SubsetPes2oXLoader(SubsetLoader): max_pages: int = 8242000 name: str = "laion/Pes2oX-fulltext" - base_url: str = "https://datasets-server.huggingface.co/rows" - - def __init__( - self, - batch_size, - sequence_length, - num_pages=None, - tokenizer: AutoTokenizer=None, - pack_samples: bool=True, - ): - super().__init__(batch_size, - sequence_length, - num_pages, - tokenizer, - pack_samples) - - self.params = { - "dataset": self.name, - "config": "pes2ov2", - "split": "train", - } - self.retry_limit = 10 # Number of retries - self.retry_delay = 5 # Seconds to wait between retries - - # Fetch pages only if the number of pages is specified - if self.num_pages: - pages = self._sample_pages() - self.fetch_data_for_pages(pages) - - def _fetch_data_for_page(self, page): - self.params["offset"] = page - self.params["limit"] = self.num_rows_per_page - attempt = 0 - while attempt < self.retry_limit: - try: - # Add authorization header - response = requests.get(self.base_url, params=self.params) - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - for row in response.json()["rows"]: - content = row["row"]["text"] - input_ids = self.tokenizer(content, truncation=True)["input_ids"] - self.buffer += input_ids - self.buffer += [self.tokenizer.eos_token_id] - - break # If the request was successful, break out of the retry loop - except requests.exceptions.RequestException as e: - attempt += 1 - bt.logging.warning( - f"Failed to fetch data for page {page}, retrying. Attempt {attempt}/{self.retry_limit}" - ) - if attempt < self.retry_limit: - time.sleep(self.retry_delay) # Wait before the next retry - else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) - raise - - def _sample_pages(self): - """ - Randomly sample pages to be used in validation - """ - pages = [ - random.randint(1, self.max_pages) - for _ in range(self.num_pages) - ] - - return pages - - - def get_page_names(self): - """ - This is a utility function that returns the page names that were used. - Each page as a single string instead of a tuple - """ - page_names = [] - - if hasattr(self, 'pages'): - page_names = self.pages - - return page_names + def __init__(self, **kwargs): + super().__init__(config="pes2ov2", **kwargs) class SubsetStackV1DedupLoader(SubsetLoader): max_pages: int = 236655813 name: str = "bigcode/the-stack-dedup" - base_url: str = "https://datasets-server.huggingface.co/rows" - def __init__( - self, - batch_size, - sequence_length, - num_pages=None, - tokenizer: AutoTokenizer=None, - pack_samples: bool=False, - ): - super().__init__(batch_size, - sequence_length, - num_pages, - tokenizer, - pack_samples) - - self.params = { - "dataset": self.name, - "config": "default", - "split": "train", - } + def __init__(self, **kwargs): + super().__init__(requires_auth=True, **kwargs) - self.retry_limit = 10 # Number of retries - self.retry_delay = 5 # Seconds to wait between retries - # Get HF token from environment - self.hf_token = os.getenv("HF_TOKEN") - if not self.hf_token: - raise ValueError("HF_TOKEN environment variable not found") +class SubsetFalconLoader(SubsetLoader): + max_pages: int = 968000015 + name: str = "tiiuae/falcon-refinedweb" - # Fetch pages only if the number of pages is specified - if self.num_pages: - pages = self._sample_pages() - self.fetch_data_for_pages(pages) - def _fetch_data_for_page(self, page): - self.params["offset"] = page - self.params["limit"] = self.num_rows_per_page +class SubsetFineWebEdu2Loader(SubsetLoader): + name: str = "HuggingFaceFW/fineweb-edu-score-2" + + def fetch_dataset_configs(self) -> typing.Dict[str, typing.Dict]: + """ + Fetch dataset configs and their metadata. + Returns a dictionary with config names as keys and metadata as values. + """ + params = dict(dataset=self.name) + attempt = 0 while attempt < self.retry_limit: try: - # Add authorization header - headers = {"Authorization": f"Bearer {self.hf_token}"} - response = requests.get(self.base_url, params=self.params, headers=headers) - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - for row in response.json()["rows"]: - content = row["row"]["content"] - input_ids = self.tokenizer(content, truncation=True)["input_ids"] - self.buffer += input_ids - self.buffer += [self.tokenizer.eos_token_id] + response = requests.get(self.size_base_url, params=params) + response.raise_for_status() + + configs_dict = response.json()["size"]["splits"] + configs_data = { + entry["config"]: { + "num_rows": entry["num_rows"], + "split": entry["split"], + } + for entry in configs_dict + if entry["config"] != "default" + } + + return configs_data - break # If the request was successful, break out of the retry loop except requests.exceptions.RequestException as e: attempt += 1 bt.logging.warning( - f"Failed to fetch data for page {page}, retrying. Attempt {attempt}/{self.retry_limit}" + f"Failed to fetch dataset configs, retrying. Attempt {attempt}/{self.retry_limit}" ) if attempt < self.retry_limit: - time.sleep(self.retry_delay) # Wait before the next retry + time.sleep(self.retry_delay) else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) + bt.logging.error("Maximum retry limit reached. Unable to fetch data.") raise - def _sample_pages(self): - """ - Randomly sample pages to be used in validation - """ - pages = [ - random.randint(1, self.max_pages) - for _ in range(self.num_pages) - ] - - return pages - - - def get_page_names(self): - """ - This is a utility function that returns the page names that were used. - Each page as a single string instead of a tuple - """ - page_names = [] - - if hasattr(self, 'pages'): - page_names = self.pages - - return page_names - - -class SubsetFineWebEdu2Loader(SubsetLoader): - - name: str = "HuggingFaceFW/fineweb-edu-score-2" - rows_base_url: str = "https://datasets-server.huggingface.co/rows" - size_base_url: str = "https://datasets-server.huggingface.co/size" - - retry_limit: int = 10 # Number of retries - retry_delay: int = 5 # Seconds to wait between retries - - def __init__( - self, - batch_size=None, - sequence_length=None, - num_pages=None, - tokenizer: AutoTokenizer = None, - pack_samples: bool = False, - random_seed: typing.Optional[int] = None, - ): - super().__init__( - batch_size, sequence_length, num_pages, tokenizer, pack_samples, random_seed - ) - - # Get the dataset configs and their row sizes - self.configs_data = self.fetch_dataset_configs() - - # We first need to fetch the data and fill the loader buffer. - # Since some sample files are broken, we first try to find `num_pages` - # responsive samples, then we add them to the found pages `self.pages` - if self.num_pages: - self._fetch_data_to_buffer(self.num_pages) - def _fetch_data_to_buffer(self, num_pages): - """ - Randomly sample unique pages and add their data to the buffer. - If a page is inaccessible, another one is sampled. - this method sets the `pages` property - """ - + """Fetch data to buffer with support for multiple configs.""" self.pages = [] attempts = 0 duplicates = 0 - - # Choose a consistent initial offset for the random pages so we do not overlap on each page get. initial_offset = random.randint(0, self.num_rows_per_page - 1) while len(self.pages) < num_pages: - - # randomly sample one page page = self.get_random_pages(num_pages=1, initial_offset=initial_offset)[0] - # skip the page if we already have it if page in self.pages: duplicates += 1 if duplicates >= self.duplicate_page_threshold: bt.logging.debug( - f"Hit duplicate page threshold of {self.duplicate_page_threshold}. Stopping early at: {len(self.pages)} pages." + f"Hit duplicate page threshold of {self.duplicate_page_threshold}. " + f"Stopping early at: {len(self.pages)} pages." ) break - else: - continue + continue config_name, page_row_start, split = page - - # Create the request parameters - params = dict( - dataset=self.name, - config=config_name, - split=split, - offset=page_row_start, - limit=self.num_rows_per_page, - ) + params = { + "dataset": self.name, + "config": config_name, + "split": split, + "offset": page_row_start, + "limit": self.num_rows_per_page, + } try: - response = requests.get(self.rows_base_url, params=params) - - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - - # Add the page since the request was successful + response = requests.get(self.base_url, params=params) + response.raise_for_status() self.pages.append(page) for row in response.json()["rows"]: content = row["row"]["text"] - - # get the tokenized and encoded sample input_ids = self.tokenizer(content, truncation=True)["input_ids"] self.buffer += input_ids self.buffer += [self.tokenizer.eos_token_id] - response.close() - - except requests.exceptions.RequestException as e: - - response.close() - attempts += 1 - bt.logging.warning( - f"Failed to fetch data, retrying with a newly sampled page. Attempt {attempts}/{self.retry_limit * num_pages}" - ) - if attempts < num_pages * self.retry_limit: - pass - - else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) - raise - - def fetch_data_to_rows(self, num_pages): - - # This explicitly does not set the pages property, simply keeping track for duplicates. - # We also use a set here as the order does not matter. - downloaded_pages = set() - rows = [] - attempts = 0 - duplicates = 0 - # Choose a consistent initial offset for the random pages so we do not overlap on each page get. - initial_offset = random.randint(0, self.num_rows_per_page - 1) - - while len(downloaded_pages) < num_pages: - - # randomly sample one page - page = self.get_random_pages(num_pages=1, initial_offset=initial_offset)[0] - - # skip the page if we already have it - if page in downloaded_pages: - duplicates += 1 - if duplicates >= self.duplicate_page_threshold: - bt.logging.debug( - f"Hit duplicate page threshold of {self.duplicate_page_threshold}. Stopping early at: {len(downloaded_pages)} pages." - ) - break - else: - continue - - config_name, page_row_start, split = page - - # Create the request parameters - params = dict( - dataset=self.name, - config=config_name, - split=split, - offset=page_row_start, - limit=self.num_rows_per_page, - ) - - try: - response = requests.get(self.rows_base_url, params=params) - - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - - downloaded_pages.add(page) - - for row in response.json()["rows"]: - rows.append(row["row"]["text"]) - except requests.exceptions.RequestException as e: attempts += 1 bt.logging.warning( - f"Failed to fetch data, retrying with a newly sampled page. Attempt {attempts}/{self.retry_limit * num_pages}" + f"Failed to fetch data, retrying. Attempt {attempts}/{self.retry_limit * num_pages}" ) - if attempts < num_pages * self.retry_limit: - pass - - else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) + if attempts >= num_pages * self.retry_limit: + bt.logging.error("Maximum retry limit reached. Unable to fetch data.") raise - return rows - def get_random_pages(self, num_pages, initial_offset): - """ - Randomly sample num_pages with an intiial offset. - A page is a row number of a given split of a given dataset dump offset by num_rows_per_page. - """ + """Get random pages across different configs.""" pages = [] - for _ in range(num_pages): - - # Choose a random config. config_name = random.choice(list(self.configs_data.keys())) - - # Choose a random page start. - # We do so by chunking the rows of the config data into N pages of length num_rows_per_page. - # We remove the initial offset from the total rows in doing this calculation to ensure we don't go over. data_row_count = self.configs_data[config_name]["num_rows"] - initial_offset - # Add 1 to the row count as we are 0 indexed. data_page_count = (data_row_count + 1) // self.num_rows_per_page - # Select a random page start by taking the randomly selected page and multiplying by num_rows_per_page. selected_page_start = initial_offset + ( random.randint(0, data_page_count - 1) * self.num_rows_per_page ) - split = self.configs_data[config_name]["split"] - pages.append((config_name, selected_page_start, split)) - - return pages - - def get_page_names(self): - """ - This is a utility function that returns the page names that were used. - Each page as a single string instead of a tuple - """ - - page_names = [] - - if hasattr(self, "pages"): - page_names = [ - f"{cfg_name}_{num_rows}_{split}" - for cfg_name, num_rows, split in self.pages - ] - - return page_names - - def fetch_dataset_configs(self) -> typing.Dict[str, typing.Dict]: - """ - Fetch the different dump names, aka configs, aka samples, of the - dataset. - The returned value is a dictionary with dump names as keys and - a dict of the number of rows and the split as values. - """ - # Request parameters - params = dict(dataset=self.name) - - attempt = 0 - while attempt < self.retry_limit: - try: - response = requests.get(self.size_base_url, params=params) - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - - # Extract the configs dict - configs_dict = response.json()["size"]["splits"] - - # Now create a dict with config names (except 'default') as - # keys, and the number of rows as values - configs_data = { - entry["config"]: { - "num_rows": entry["num_rows"], - "split": entry["split"], - } - for entry in configs_dict - if entry["config"] != "default" - } - - return configs_data - - except requests.exceptions.RequestException as e: - attempt += 1 - bt.logging.warning( - f"Failed to fetch dataset configs, retrying. Attempt {attempt}/{self.retry_limit}" - ) - if attempt < self.retry_limit: - time.sleep(self.retry_delay) # Wait before the next retry - else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) - raise - - def _fetch_data_for_page(self, page): - - retry_limit = 10 - - attempt = 0 - while attempt < retry_limit: - config_name, page, split = page - - # Create the request parameters - params = dict( - dataset=self.name, - config=config_name, - split=split, - offset=page, - limit=self.num_rows_per_page, - ) - - try: - - response = requests.get(self.rows_base_url, params=params) - - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - - for row in response.json()["rows"]: - content = row["row"]["text"] - input_ids = self.tokenizer(content, truncation=True)["input_ids"] - self.buffer += input_ids - self.buffer += [self.tokenizer.eos_token_id] - - break # If the request was successful, break out of the retry loop - - except requests.exceptions.RequestException as e: - attempt += 1 - bt.logging.warning( - f"Failed to fetch data for page {page}, retrying. Attempt {attempt}/{self.retry_limit}" - ) - if attempt < self.retry_limit: - time.sleep(self.retry_delay) # Wait before the next retry - else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) - raise - - -class SubsetFalconLoader(SubsetLoader): - max_pages: int = 968000015 - name: str = "tiiuae/falcon-refinedweb" - base_url: str = "https://datasets-server.huggingface.co/rows" - - def __init__( - self, - batch_size, - sequence_length, - num_pages=None, - tokenizer: AutoTokenizer = None, - pack_samples: bool = False, - random_seed: typing.Optional[int] = None, - ): - super().__init__( - batch_size, sequence_length, num_pages, tokenizer, pack_samples, random_seed - ) - - self.params = { - "dataset": self.name, - "config": "default", - "split": "train", - } - - self.retry_limit = 10 # Number of retries - self.retry_delay = 5 # Seconds to wait between retries - - # Fetch pages only if the number of pages is specified - if self.num_pages: - pages = self._sample_pages() - self.fetch_data_for_pages(pages) - - def _fetch_data_for_page(self, page): - self.params["offset"] = page - self.params["limit"] = self.num_rows_per_page - attempt = 0 - while attempt < self.retry_limit: - try: - response = requests.get(self.base_url, params=self.params) - response.raise_for_status() # This will raise an HTTPError if the HTTP request returned an unsuccessful status code - for row in response.json()["rows"]: - content = row["row"]["content"] - input_ids = self.tokenizer(content, truncation=True)["input_ids"] - self.buffer += input_ids - self.buffer += [self.tokenizer.eos_token_id] - - break # If the request was successful, break out of the retry loop - except requests.exceptions.RequestException as e: - attempt += 1 - bt.logging.warning( - f"Failed to fetch data for page {page}, retrying. Attempt {attempt}/{self.retry_limit}" - ) - if attempt < self.retry_limit: - time.sleep(self.retry_delay) # Wait before the next retry - else: - bt.logging.error( - "Maximum retry limit reached. Unable to fetch data." - ) - raise - - def _sample_pages(self): - """ - Randomly sample pages to be used in validation - """ - pages = [random.randint(1, self.max_pages) for _ in range(self.num_pages)] - - return pages - - def get_page_names(self): - """ - This is a utility function that returns the page names that were used. - Each page as a single string instead of a tuple - """ - page_names = [] - - if hasattr(self, "pages"): - page_names = self.pages - - return page_names + return pages \ No newline at end of file From e693661874570000dd1ee912054112b0cfdf3fc3 Mon Sep 17 00:00:00 2001 From: roel Date: Fri, 8 Nov 2024 10:32:53 +0000 Subject: [PATCH 15/21] added fetch data to rows and renames base_url to rows_base_url --- pretrain/dataset.py | 58 +++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/pretrain/dataset.py b/pretrain/dataset.py index d78288eb..4c456195 100644 --- a/pretrain/dataset.py +++ b/pretrain/dataset.py @@ -17,7 +17,7 @@ class SubsetLoader(IterableDataset): """Base class for data-specific subset loader classes.""" name: str = None # Dataset name - base_url: str = "https://datasets-server.huggingface.co/rows" + rows_base_url: str = "https://datasets-server.huggingface.co/rows" size_base_url: str = "https://datasets-server.huggingface.co/size" max_pages: int = None @@ -122,7 +122,7 @@ def _fetch_data_for_page(self, page): while attempt < self.retry_limit: try: response = requests.get( - self.base_url, + self.rows_base_url, params=self.params, headers=self._get_request_headers() ) @@ -296,7 +296,7 @@ def _fetch_data_to_buffer(self, num_pages): } try: - response = requests.get(self.base_url, params=params) + response = requests.get(self.rows_base_url, params=params) response.raise_for_status() self.pages.append(page) @@ -327,4 +327,54 @@ def get_random_pages(self, num_pages, initial_offset): ) split = self.configs_data[config_name]["split"] pages.append((config_name, selected_page_start, split)) - return pages \ No newline at end of file + return pages + + def fetch_data_to_rows(self, num_pages): + """Fetch data and return raw text rows instead of adding to buffer.""" + downloaded_pages = set() + rows = [] + attempts = 0 + duplicates = 0 + initial_offset = random.randint(0, self.num_rows_per_page - 1) + + while len(downloaded_pages) < num_pages: + page = self.get_random_pages(num_pages=1, initial_offset=initial_offset)[0] + + if page in downloaded_pages: + duplicates += 1 + if duplicates >= self.duplicate_page_threshold: + bt.logging.debug( + f"Hit duplicate page threshold of {self.duplicate_page_threshold}. " + f"Stopping early at: {len(downloaded_pages)} pages." + ) + break + continue + + config_name, page_row_start, split = page + params = { + "dataset": self.name, + "config": config_name, + "split": split, + "offset": page_row_start, + "limit": self.num_rows_per_page, + } + + try: + response = requests.get(self.rows_base_url, params=params) + response.raise_for_status() + downloaded_pages.add(page) + + for row in response.json()["rows"]: + rows.append(row["row"]["text"]) + + except requests.exceptions.RequestException as e: + attempts += 1 + bt.logging.warning( + f"Failed to fetch data, retrying with a newly sampled page. " + f"Attempt {attempts}/{self.retry_limit * num_pages}" + ) + if attempts >= num_pages * self.retry_limit: + bt.logging.error("Maximum retry limit reached. Unable to fetch data.") + raise + + return rows \ No newline at end of file From 1908899cc50dc4c22f128a260bca63dadb3a1679 Mon Sep 17 00:00:00 2001 From: rlpa Date: Fri, 8 Nov 2024 14:25:19 +0100 Subject: [PATCH 16/21] updated validator docs --- docs/validator.md | 60 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/docs/validator.md b/docs/validator.md index 755569a5..598925d0 100644 --- a/docs/validator.md +++ b/docs/validator.md @@ -1,8 +1,9 @@ -# Validator +# Validator Validators download the models from hugging face for each miner based on the Bittensor chain metadata and continuously evaluate them, setting weights based on the performance of each model against a dataset for each competition. They also log results to [wandb](https://wandb.ai/macrocosmos/pretraining-validators). You can view the entire validation system by reading the code in `neurons/validator.py`. Pseudocode for the validation system is as follows: + ```python weights = zeros(256) while True: @@ -51,7 +52,7 @@ It is important to note that this affects the game theoretics of the incentive l # System Requirements -Validators will need enough disk space to store the models of miners being evaluated. Each model has a max size by block defined in [constants/__init__.py](https://github.com/macrocosm-os/pretraining/blob/main/constants/__init__.py#L57) and the validator has cleanup logic to remove old models. It is recommended to have at least 2 TB of disk space and 80GB of system memory. +Validators will need enough disk space to store the models of miners being evaluated. Each model has a max size by block defined in [constants/**init**.py](https://github.com/macrocosm-os/pretraining/blob/main/constants/__init__.py#L57) and the validator has cleanup logic to remove old models. It is recommended to have at least 2 TB of disk space and 80GB of system memory. Validators will need enough processing power to evaluate their model. As of Sept 2nd, 2024, an upgrade to the Nvidia A100 GPU with 80GB of VRAM is required. This GPU's high throughput and FLOPs enable the running of 14B models without impacting the speed of the validation cycle. Although only 40GB of VRAM is necessary, we have observed that A100 GPUs with 80GB are more readily available and are offered at a comparable price to the 40GB variants. The additional VRAM provided by this GPU will allows more flexibility for optimization in future releases, enabling larger validation batch sizes to enhance the stability of the validation process by reducing scoring variance. @@ -68,12 +69,14 @@ git clone https://github.com/macrocosm-os/pretraining.git 2. Setup your python [virtual environment](https://docs.python.org/3/library/venv.html) or [Conda environment](https://conda.io/projects/conda/en/latest/user-guide/tasks/manage-environments.html#creating-an-environment-with-commands). 3. Install the requirements. From your virtual environment, run + ```shell cd pretraining python -m pip install -e . ``` Note: flash-attn may not have their dependencies set up correctly. If you run into issues try installing those requirements separately first: + ```shell pip install packaging pip install wheel @@ -84,39 +87,75 @@ pip install torch 5. (Optional) Run a Subtensor instance: -Your node will run better if you are connecting to a local Bittensor chain entrypoint node rather than using Opentensor's. -We recommend running a local node as follows and passing the ```--subtensor.network local``` flag to your running miners/validators. +Your node will run better if you are connecting to a local Bittensor chain entrypoint node rather than using Opentensor's. +We recommend running a local node as follows and passing the ```--subtensor.network local``` flag to your running miners/validators. To install and run a local subtensor node follow the commands below with Docker and Docker-Compose previously installed. + ```bash git clone https://github.com/opentensor/subtensor.git cd subtensor docker compose up --detach ``` ---- -# Running the Validator +## Obtaining your Hugging Face token + +The dataset for code, `The Stack V1-dedup`, requires a **Hugging Face access token**. Follow these steps to obtain and configure one: + +### Step 1: Get Your Hugging Face Access Token + +1. Go to the [Hugging Face website](https://huggingface.co/). +2. If you don’t already have an account, create one. Otherwise, log in. +3. Go to the [dataset's website](https://huggingface.co/datasets/bigcode/the-stack-dedup) and agree to their terms of use. You should immediately gain access to their dataset. +4. Click on your profile icon in the top-right corner, and select **Settings**. +5. In the settings menu, locate and click on **Access Tokens**. +6. Under the Access Tokens section, click **New token** and generate a token with write permissions. +7. Copy the generated token. + +### Step 2: Create a `.env` File in the `pretraining` Directory + +1. Navigate to your `pretraining` directory where you want to save the environment file. +2. Create a new file named `.env` in this directory (if it doesn’t already exist). You can do this from the command line using: + + ```bash + touch .env + ``` + +3. Open the `.env` file with your preferred text editor and add the following line, replacing `YOUR_HF_TOKEN_HERE` with your actual Hugging Face token: -## With auto-updates + ```bash + HF_TOKEN=YOUR_HF_TOKEN_HERE + ``` + +4. Save and close the file. + +This `.env` file now securely holds your Hugging Face token, allowing scripts in the `pretraining` directory to load it automatically if they’re set up to read environment variables. + +## Running the Validator + +### With auto-updates We highly recommend running the validator with auto-updates. This will help ensure your validator is always running the latest release, helping to maintain a high vtrust. Prerequisites: + 1. To run with auto-update, you will need to have [pm2](https://pm2.keymetrics.io/) installed. 2. Make sure your virtual environment is activated. This is important because the auto-updater will automatically update the package dependencies with pip. 3. Make sure you're using the main branch: `git checkout main`. From the pretraining folder: + ```shell pm2 start --name net9-vali-updater --interpreter python scripts/start_validator.py -- --pm2_name net9-vali --wallet.name coldkey --wallet.hotkey hotkey [other vali flags] ``` This will start a process called `net9-vali-updater`. This process periodically checks for a new git commit on the current branch. When one is found, it performs a `pip install` for the latest packages, and restarts the validator process (who's name is given by the `--pm2_name` flag) -## Without auto-updates +### Without auto-updates If you'd prefer to manage your own validator updates... From the pretraining folder: + ```shell pm2 start python -- ./neurons/validator.py --wallet.name coldkey --wallet.hotkey hotkey ``` @@ -128,6 +167,7 @@ pm2 start python -- ./neurons/validator.py --wallet.name coldkey --wallet.hotkey The Validator offers some flags to customize properties, such as the device to evaluate on and the number of models to evaluate each step. You can view the full set of flags by running + ```shell python ./neurons/validator.py -h ``` @@ -135,6 +175,7 @@ python ./neurons/validator.py -h ## Test Running Validation Test running validation: + ```shell python neurons/validator.py --wallet.name YOUR_WALLET_NAME @@ -143,4 +184,5 @@ python neurons/validator.py --wandb.off --offline ``` ---- \ No newline at end of file + +--- From 23bf67875b8faab35f96238608eef91d0ab361db Mon Sep 17 00:00:00 2001 From: Sid Date: Mon, 28 Oct 2024 16:35:31 -0700 Subject: [PATCH 17/21] Add new 14B* competition that also uses the falcon dataset. --- competitions/data.py | 2 + constants/__init__.py | 33 ++++ neurons/validator.py | 355 +++++++++++++++++++++++++++++++++--------- requirements.txt | 2 +- 4 files changed, 316 insertions(+), 76 deletions(-) diff --git a/competitions/data.py b/competitions/data.py index 10a9e433..f2ac7852 100644 --- a/competitions/data.py +++ b/competitions/data.py @@ -14,6 +14,8 @@ class CompetitionId(IntEnum): B14_MODEL = 4 + B14_MODEL_MULTI_DATASET = 5 + # Overwrite the default __repr__, which doesn't work with # bt.logging for some unknown reason. def __repr__(self) -> str: diff --git a/constants/__init__.py b/constants/__init__.py index c7cdcea3..d37d079a 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -64,6 +64,10 @@ # Starting block for activating sample unpacking BLOCK_SAMPLE_PACK = 4_001_017 +# Starting block for 14B* (multi dataset experiment). +# TODO: Update starting block as needed. +BLOCK_14B_STAR = 4_210_361 + # Minimum percent of weight on a vali for a miner to be considered a top miner. # Since there can be multiple competitions at different reward percentages we can't just check biggest. WEIGHT_SYNC_MINER_MIN_PERCENT = 0.05 @@ -97,6 +101,8 @@ CompetitionId.B3_MODEL: pt.dataset.SubsetFalconLoader, CompetitionId.B7_MODEL: pt.dataset.SubsetFineWebEdu2Loader, CompetitionId.B14_MODEL: pt.dataset.SubsetFineWebEdu2Loader, + # B14 model multi dataset adds the following dataset to the baseline b14 competition. + CompetitionId.B14_MODEL_MULTI_DATASET: pt.dataset.SubsetFalconLoader, } # Synchronize on blocks roughly every 30 minutes. @@ -168,6 +174,33 @@ ), ], ), + ( + BLOCK_14B_STAR, + [ + # TODO confirm if we are removing the other 2 competitions. + Competition( + CompetitionId.M772_MODEL, + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.M772_MODEL], + 0.14, + ), + Competition( + CompetitionId.B3_MODEL, + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B3_MODEL], + 0.29, + ), + Competition( + CompetitionId.B14_MODEL, + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + 0.52, + ), + # TODO decide specific weight. + Competition( + CompetitionId.B14_MODEL_MULTI_DATASET, + MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], + 0.05, + ), + ], + ), ] for block_and_competitions in COMPETITION_SCHEDULE_BY_BLOCK: diff --git a/neurons/validator.py b/neurons/validator.py index 68ff8ce9..e7c23b96 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -86,12 +86,27 @@ class PerUIDEvalState: # The hugging face repo name. repo_name: str = "Unknown" - # The losses per batch. - losses: typing.List[float] = dataclasses.field(default=None) + # The losses per batch per dataset. + losses: typing.Dict[str, typing.List[float]] = dataclasses.field( + default_factory=lambda: defaultdict(list) + ) def avg_loss(self) -> float: - """Safely computes the average loss from a list of losses.""" - return sum(self.losses) / len(self.losses) if self.losses else math.inf + """Safely computes the average loss across all dataset loss lists.""" + all_losses = [loss for loss_list in self.losses.values() for loss in loss_list] + return sum(all_losses) / len(all_losses) if all_losses else math.inf + + def avg_dataset_loss(self, dataset_name: str) -> float: + """Safely computes the average loss from a list of losses for a specific dataset.""" + return ( + sum(self.losses[dataset_name]) / len(self.losses[dataset_name]) + if self.losses[dataset_name] + else math.inf + ) + + def avg_loss_per_dataset(self) -> typing.Dict[str, float]: + """Safely computes the average loss per dataset.""" + return {k: (sum(v) / len(v) if v else math.inf) for k, v in self.losses.items()} class Validator: @@ -160,7 +175,6 @@ def __init__(self): self.weights = torch.zeros_like(torch.tensor(self.metagraph.S)) self.global_step = 0 self.last_epoch = self.metagraph.block.item() - self.last_wandb_step = 0 self.uids_to_eval: typing.Dict[CompetitionId, typing.Set] = defaultdict(set) @@ -437,7 +451,9 @@ def update_models(self): ) if competition is not None and not is_queued_for_eval: eval_history = ( - self.model_tracker.get_eval_results_for_miner_hotkey(hotkey) + self.model_tracker.get_eval_results_for_miner_hotkey( + hotkey, competition.id + ) ) force_sync = should_retry_model( competition.constraints.epsilon_func, @@ -449,6 +465,32 @@ def update_models(self): f"Force downloading model for UID {next_uid} because it should be retried. Eval_history={eval_history}" ) + # Special case for 14B*. We want to retry if the model is competitive in either 14B or 14B*. + if competition.id == CompetitionId.B14_MODEL: + # Check that 14B* is currently running. + competition_14b = ( + competition_utils.get_competition_for_block( + CompetitionId.B14_MODEL_MULTI_DATASET, + curr_block, + constants.COMPETITION_SCHEDULE_BY_BLOCK, + ) + ) + if competition_14b is not None: + eval_history_14b_star = self.model_tracker.get_eval_results_for_miner_hotkey( + hotkey, CompetitionId.B14_MODEL_MULTI_DATASET + ) + force_sync_14b_star = should_retry_model( + competition_14b.constraints.epsilon_func, + curr_block, + eval_history_14b_star, + ) + if force_sync_14b_star: + # Even if it is already logging for 14B, also log for 14B*. + bt.logging.debug( + f"Force downloading model for UID {next_uid} because it should be retried for 14B*. Eval_history={eval_history_14b_star}" + ) + force_sync = True + # Compare metadata and tracker, syncing new model from remote store to local if necessary. try: updated = asyncio.run( @@ -462,6 +504,7 @@ def update_models(self): except MinerMisconfiguredError as e: self.model_tracker.on_model_evaluated( hotkey, + 0, EvalResult( block=curr_block, score=math.inf, @@ -544,8 +587,7 @@ def _queue_top_models_for_eval(self) -> None: for uid in uids_to_add: # Check when we last evaluated this model. hotkey = metagraph.hotkeys[uid] - eval_history = self.model_tracker.get_eval_results_for_miner_hotkey(hotkey) - last_eval_block = eval_history[-1].block if eval_history else 0 + last_eval_block = self.model_tracker.get_block_last_evaluated(hotkey) or 0 curr_block = self._get_current_block() if curr_block - last_eval_block >= constants.model_retry_cadence: try: @@ -563,6 +605,7 @@ def _queue_top_models_for_eval(self) -> None: except MinerMisconfiguredError as e: self.model_tracker.on_model_evaluated( hotkey, + 0, EvalResult( block=curr_block, score=math.inf, @@ -762,8 +805,28 @@ async def run_step(self): # Every validator step should pick a single competition in a round-robin fashion competition = competition_schedule[self.global_step % len(competition_schedule)] + # If the competition is 14b* we skip it since we run it concurrently with 14b instead. + if competition.id == CompetitionId.B14_MODEL_MULTI_DATASET: + bt.logging.info( + "Skipping step for B14* competition. It will be evaluated as part of the regular B14 competition." + ) + return + bt.logging.info("Starting evaluation for competition: " + str(competition.id)) + running_14b_star = competition.id == CompetitionId.B14_MODEL and any( + comp.id == CompetitionId.B14_MODEL_MULTI_DATASET + for comp in competition_schedule + ) + + if running_14b_star: + competition_14b_star = competition_utils.get_competition_for_block( + comp_id=CompetitionId.B14_MODEL_MULTI_DATASET, + block=cur_block, + schedule_by_block=constants.COMPETITION_SCHEDULE_BY_BLOCK, + ) + bt.logging.info("Additionally running competition 14B* in parallel to 14B.") + # Add uids with newly updated models to the upcoming batch of evaluations. with self.pending_uids_to_eval_lock: self.uids_to_eval[competition.id].update( @@ -795,6 +858,18 @@ async def run_step(self): SubsetDataLoader = constants.DATASET_BY_COMPETITION_ID[competition.id] bt.logging.trace(f"Dataset in use: {SubsetDataLoader.name}.") + if running_14b_star: + # This will be set to a copy of 14b later with the additional eval losses appended. + uid_to_state_14b_star = defaultdict(PerUIDEvalState) + + # Also get the dataloader for 14b_star. + SubsetDataLoader_14b_star = constants.DATASET_BY_COMPETITION_ID[ + CompetitionId.B14_MODEL_MULTI_DATASET + ] + bt.logging.trace( + f"Additionally using 14b* dataset: {SubsetDataLoader_14b_star.name}." + ) + # Get the tokenizer tokenizer = pt.model.load_tokenizer( competition.constraints, cache_dir=self.config.model_dir @@ -837,19 +912,46 @@ async def run_step(self): # This is useful for logging to wandb pages = dataloader.get_page_names() + bt.logging.debug(f"Competition {competition.id} | Computing losses on {uids}") + bt.logging.debug(f"Pages used are {pages}") + + if running_14b_star: + dataloader_14b_star = SubsetDataLoader_14b_star( + batch_size=constants.batch_size, + sequence_length=competition_14b_star.constraints.sequence_length, + num_pages=pages_per_eval, + tokenizer=tokenizer, + pack_samples=pack_samples, + random_seed=seed, + ) + + batches_14b_star = list(dataloader_14b_star) + bt.logging.debug( + f"Number of 14b* validation batches is {len(batches_14b_star)}" + ) + bt.logging.debug(f"14b* Batch size is {len(batches_14b_star[0])}") + + # This is useful for logging to wandb + pages_14b_star = dataloader_14b_star.get_page_names() + + bt.logging.debug(f"14b* Pages used are {pages_14b_star}") + + compute_loss_perf_14b_star = PerfMonitor("Eval: Compute loss 14b star") + # Prepare evaluation. kwargs = competition.constraints.kwargs.copy() kwargs["use_cache"] = True - bt.logging.debug(f"Competition {competition.id} | Computing losses on {uids}") - bt.logging.debug(f"Pages used are {pages}") - load_model_perf = PerfMonitor("Eval: Load model") compute_loss_perf = PerfMonitor("Eval: Compute loss") for uid_i in uids: # This variable should be overwritten below if the model has metadata. losses: typing.List[float] = [math.inf for _ in range(len(batches))] + if running_14b_star: + losses_14b_star: typing.List[float] = [ + math.inf for _ in range(len(batches_14b_star)) + ] bt.logging.trace(f"Getting metadata for uid: {uid_i}.") @@ -899,7 +1001,25 @@ async def run_step(self): ttl=430, mode="spawn", ) - + if running_14b_star: + with compute_loss_perf_14b_star.sample(): + try: + losses_14b_star = utils.run_in_subprocess( + functools.partial( + pt.validation.compute_losses, + model_i.pt_model, + batches_14b_star, + self.config.device, + tokenizer.eos_token_id, + pack_samples, + ), + ttl=430, + mode="spawn", + ) + except Exception as e: + bt.logging.error( + f"Error in eval loop: {e}. Setting 14b* losses for uid: {uid_i} to infinity." + ) del model_i except Exception as e: @@ -911,60 +1031,38 @@ async def run_step(self): f"Unable to load the model for {uid_i} or it belongs to another competition. Setting loss to inifinity for this competition." ) - uid_to_state[uid_i].losses = losses - average_model_loss = sum(losses) / len(losses) + uid_to_state[uid_i].losses[SubsetDataLoader.name] = losses bt.logging.trace( - f"Computed model losses for uid:{uid_i} with average loss: {average_model_loss}" + f"Computed model losses for uid:{uid_i} with average loss: {uid_to_state[uid_i].avg_loss()}" ) - # Compute wins and win rates per uid. - # Take the average loss across all batches for comparison of best model. - uid_to_average_loss = { - uid: state.avg_loss() for uid, state in uid_to_state.items() - } - uid_to_block = {uid: state.block for uid, state in uid_to_state.items()} - - # Filter to the list of uids that may at one point be a top model. - competitive_uids = pt.validation.compute_competitive_uids( - uid_to_average_loss, uid_to_block, competition.constraints.epsilon_func - ) - - # Log which models got dropped for the second pass. - dropped_uids = [uid for uid in uids if uid not in competitive_uids] - if dropped_uids: - bt.logging.info( - f"The following uids were not included in the win rate calculation because they did not beat the fully decayed loss of any previously submitted model in this eval batch: {dropped_uids}." - ) + if running_14b_star: + # Make a deep copy of the current uid_to_state and append the additional losses. + uid_to_state_14b_star[uid_i] = copy.deepcopy(uid_to_state[uid_i]) + uid_to_state_14b_star[uid_i].losses[ + SubsetDataLoader_14b_star.name + ] = losses_14b_star + bt.logging.trace( + f"Computed 14b* model losses for uid:{uid_i} with average loss: {uid_to_state_14b_star[uid_i].avg_loss()}. Details: {uid_to_state_14b_star[uid_i].avg_loss_per_dataset()}" + ) # Calculate new wins and win_rate with only the competitive uids considered. - wins, win_rate = pt.validation.compute_wins( - competitive_uids, - uid_to_average_loss, - uid_to_block, - competition.constraints.epsilon_func, - cur_block, - ) - - top_uid = max(win_rate, key=win_rate.get) - self._record_eval_results(top_uid, cur_block, uid_to_state) - - # Compute softmaxed weights based on win rate. - model_weights = torch.tensor( - [win_rate.get(uid, 0) for uid in uids], dtype=torch.float32 + wins, win_rate = self._compute_and_set_competition_weights( + cur_block=cur_block, + uids=uids, + uid_to_state=uid_to_state, + competition=competition, ) - step_weights = torch.softmax(model_weights / constants.temperature, dim=0) - - # Fill in metagraph sized tensor with the step weights of the evaluated models. - with self.metagraph_lock: - competition_weights = torch.zeros_like(self.metagraph.S) - for i, uid_i in enumerate(uids): - competition_weights[uid_i] = step_weights[i] - - # Record weights for the current competition. - self.competition_tracker.record_competition_weights( - competition.id, competition_weights - ) + if running_14b_star: + wins_14b_star, win_rate_14b_star = ( + self._compute_and_set_competition_weights( + cur_block=cur_block, + uids=uids, + uid_to_state=uid_to_state_14b_star, + competition=competition_14b_star, + ) + ) # Get ids for all competitions in the schedule. active_competition_ids = set([comp.id for comp in competition_schedule]) @@ -980,6 +1078,12 @@ async def run_step(self): tracker_competition_weights = self.competition_tracker.get_competition_weights( competition.id ) + if running_14b_star: + # Get the weights as if only the 14b and 14b* competition existed for purposes of model prioritization. + tracker_competition_weights = self.competition_tracker.get_subnet_weights( + [competition, competition_14b_star] + ) + model_prioritization = { uid: ( # Add 1 to ensure it is always greater than a win rate. @@ -995,6 +1099,12 @@ async def run_step(self): : self.config.sample_min ] ) + + # Note when breaking ties of 0 weight models we use the primary dataset in all cases. + uid_to_average_loss = { + uid: state.avg_loss() for uid, state in uid_to_state.items() + } + # Make sure we always keep around sample_min number of models to maintain previous behavior. if len(models_to_keep) < self.config.sample_min: for uid in sorted(uid_to_average_loss, key=uid_to_average_loss.get): @@ -1022,15 +1132,104 @@ async def run_step(self): uid_to_state, self._get_uids_to_competition_ids(), pages, - model_weights, wins, win_rate, load_model_perf, compute_loss_perf, ) - # Increment the number of completed run steps by 1 - self.run_step_count += 1 + if running_14b_star: + bt.logging.debug(compute_loss_perf_14b_star.summary_str()) + + uids_to_competition_ids_14b_star = { + k: ( + CompetitionId.B14_MODEL_MULTI_DATASET.value + if v == CompetitionId.B14_MODEL + else v + ) + for k, v in self._get_uids_to_competition_ids().items() + } + + self.log_step( + CompetitionId.B14_MODEL_MULTI_DATASET, + competition.constraints.epsilon_func, + cur_block, + uids, + uid_to_state_14b_star, + uids_to_competition_ids_14b_star, + pages_14b_star, + wins_14b_star, + win_rate_14b_star, + load_model_perf, + compute_loss_perf_14b_star, + ) + + def _compute_and_set_competition_weights( + self, + cur_block: int, + uids: typing.List[int], + uid_to_state: typing.Dict[int, PerUIDEvalState], + competition: Competition, + ) -> typing.Tuple[typing.Dict[int, int], typing.Dict[int, float]]: + """Computes competition weights including checks for competitiveness and records them internally. + + Args: + cur_block (int): The current block. + uids (typing.List[int]): All uids being considered during the current evaluation. + uid_to_state (typing.Dict[int, PerUIDEvalState]): Evaluation information for each uid. + competition (Competition): The current competition being evaluated. + + Returns: + tuple: A tuple containing two dictionaries, one for wins and one for win rates. + """ + uid_to_average_loss = { + uid: state.avg_loss() for uid, state in uid_to_state.items() + } + uid_to_block = {uid: state.block for uid, state in uid_to_state.items()} + + # Filter to the list of uids that may at one point be a top model. + competitive_uids = pt.validation.compute_competitive_uids( + uid_to_average_loss, uid_to_block, competition.constraints.epsilon_func + ) + + # Log which models got dropped for the second pass. + dropped_uids = [uid for uid in uids if uid not in competitive_uids] + if dropped_uids: + bt.logging.info( + f"The following uids were not included in the win rate calculation because they did not beat the fully decayed loss of any previously submitted model in this eval batch: {dropped_uids}." + ) + + # Calculate new wins and win_rate with only the competitive uids considered. + wins, win_rate = pt.validation.compute_wins( + competitive_uids, + uid_to_average_loss, + uid_to_block, + competition.constraints.epsilon_func, + cur_block, + ) + + top_uid = max(win_rate, key=win_rate.get) + self._record_eval_results(top_uid, cur_block, uid_to_state, competition.id) + + # Compute softmaxed weights based on win rate. + model_weights = torch.tensor( + [win_rate.get(uid, 0) for uid in uids], dtype=torch.float32 + ) + step_weights = torch.softmax(model_weights / constants.temperature, dim=0) + + # Fill in metagraph sized tensor with the step weights of the evaluated models. + with self.metagraph_lock: + competition_weights = torch.zeros_like(self.metagraph.S) + + for i, uid_i in enumerate(uids): + competition_weights[uid_i] = step_weights[i] + + # Record weights for the current competition. + self.competition_tracker.record_competition_weights( + competition.id, competition_weights + ) + + return wins, win_rate def _update_uids_to_eval( self, @@ -1066,6 +1265,7 @@ def _record_eval_results( top_uid: int, curr_block: int, uid_to_state: typing.Dict[int, PerUIDEvalState], + competition_id: CompetitionId, ) -> None: """Records the results of the evaluation step to the model tracker. @@ -1078,6 +1278,7 @@ def _record_eval_results( for _, state in uid_to_state.items(): self.model_tracker.on_model_evaluated( state.hotkey, + competition_id, EvalResult( block=curr_block, score=state.avg_loss(), @@ -1095,7 +1296,6 @@ def log_step( uid_to_state: typing.Dict[int, PerUIDEvalState], uid_to_competition_id: typing.Dict[int, typing.Optional[int]], pages: typing.List[str], - model_weights: typing.List[float], wins: typing.Dict[int, int], win_rate: typing.Dict[int, float], load_model_perf: PerfMonitor, @@ -1112,12 +1312,12 @@ def log_step( } # The sub-competition weights - sub_competition_weights = torch.softmax( - model_weights / constants.temperature, dim=0 + sub_competition_weights = self.competition_tracker.get_competition_weights( + competition_id ) # All uids in the competition step log are from the same competition. - for idx, uid in enumerate(uids): + for uid in uids: step_log["uid_data"][str(uid)] = { "uid": uid, "block": uid_to_state[uid].block, @@ -1131,8 +1331,14 @@ def log_step( "win_rate": win_rate[uid] if uid in win_rate else 0, "win_total": wins[uid] if uid in wins else 0, "weight": self.weights[uid].item(), - "norm_weight": sub_competition_weights[idx].item(), + "norm_weight": sub_competition_weights[uid].item(), } + for dataset_name, avg_loss in ( + uid_to_state[uid].avg_loss_per_dataset().items() + ): + step_log["uid_data"][str(uid)][ + f"{dataset_name}_average_loss" + ] = avg_loss table = Table(title="Step", expand=True) table.add_column("uid", justify="right", style="cyan", no_wrap=True) table.add_column("hf", style="magenta", overflow="fold") @@ -1144,7 +1350,7 @@ def log_step( table.add_column("comp_weight", style="magenta", overflow="fold") table.add_column("block", style="magenta", overflow="fold") table.add_column("comp", style="magenta", overflow="fold") - for idx, uid in enumerate(uids): + for uid in uids: try: table.add_row( str(uid), @@ -1154,7 +1360,7 @@ def log_step( str(round(step_log["uid_data"][str(uid)]["win_rate"], 4)), str(step_log["uid_data"][str(uid)]["win_total"]), str(round(self.weights[uid].item(), 4)), - str(round(sub_competition_weights[idx].item(), 4)), + str(round(sub_competition_weights[uid].item(), 4)), str(step_log["uid_data"][str(uid)]["block"]), str(step_log["uid_data"][str(uid)]["competition_id"]), ) @@ -1216,8 +1422,7 @@ def log_step( }, "weight_data": {str(uid): self.weights[uid].item() for uid in uids}, "competition_weight_data": { - str(uid): sub_competition_weights[i].item() - for i, uid in enumerate(uids) + str(uid): sub_competition_weights[uid].item() for uid in uids }, "competition_id": {str(uid): int(competition_id)}, "load_model_perf": { @@ -1235,11 +1440,11 @@ def log_step( } bt.logging.trace("Logging to Wandb") self.wandb_run.log( - {**graphed_data, "original_format_json": original_format_json}, - step=self.last_wandb_step, + {**graphed_data, "original_format_json": original_format_json} ) - self.last_wandb_step += 1 + # Increment the number of completed run steps by 1 + self.run_step_count += 1 def _get_uids_to_competition_ids( self, @@ -1265,7 +1470,7 @@ async def run(self): while True: try: # First run a step. - await self.try_run_step(ttl=60 * 60) + await self.try_run_step(ttl=120 * 60) self.global_step += 1 block = self._get_current_block() diff --git a/requirements.txt b/requirements.txt index 22130ec6..8c2a5fc6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,4 +11,4 @@ transformers==4.44.1 wandb datasets flash-attn -taoverse==1.0.8 +taoverse==1.0.9 From 29bb861f24791e0a3d12fd59df77df9fd12d98f8 Mon Sep 17 00:00:00 2001 From: Sid Date: Tue, 29 Oct 2024 20:24:23 -0700 Subject: [PATCH 18/21] Bump the release and validator schema version. --- constants/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/constants/__init__.py b/constants/__init__.py index d37d079a..a67f353a 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -34,10 +34,10 @@ # --------------------------------- # Release -__version__ = "4.5.3" +__version__ = "4.5.4" # Validator schema version -__validator_version__ = "3.4.0" +__validator_version__ = "4.5.4" version_split = __validator_version__.split(".") __spec_version__ = ( (1000 * int(version_split[0])) From 864a89d67973934671492dbdb0e3a9d1293c244a Mon Sep 17 00:00:00 2001 From: Sid Date: Thu, 31 Oct 2024 18:59:19 -0700 Subject: [PATCH 19/21] Adjust ratio of 14b* tokens to ~1/4 of total tokens. --- constants/__init__.py | 9 +++++++++ neurons/validator.py | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/constants/__init__.py b/constants/__init__.py index a67f353a..0e489c95 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -105,6 +105,15 @@ CompetitionId.B14_MODEL_MULTI_DATASET: pt.dataset.SubsetFalconLoader, } +# In a future release we will update the loaders to be able to load a certain number of tokens rather than pages. +# Until then we can use this ratio to approximate the correct ratio for B14*. Token counts using Xenova/gpt-4. +ESTIMATED_TOKENS_PER_PAGE_FALCON = 55419 +ESTIMATED_TOKENS_PER_PAGE_FINEWEB = 87490 +# To make the additional data be 1/4 of the total we need a 1:3 ratio. +PAGE_RATIO_14B_STAR = ( + 0.33 * ESTIMATED_TOKENS_PER_PAGE_FINEWEB / ESTIMATED_TOKENS_PER_PAGE_FALCON +) + # Synchronize on blocks roughly every 30 minutes. SYNC_BLOCK_CADENCE = 150 # Delay at least as long as the sync block cadence with an additional buffer. diff --git a/neurons/validator.py b/neurons/validator.py index e7c23b96..266dd316 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -919,7 +919,7 @@ async def run_step(self): dataloader_14b_star = SubsetDataLoader_14b_star( batch_size=constants.batch_size, sequence_length=competition_14b_star.constraints.sequence_length, - num_pages=pages_per_eval, + num_pages=int(pages_per_eval * constants.PAGE_RATIO_14B_STAR), tokenizer=tokenizer, pack_samples=pack_samples, random_seed=seed, From f49ecda743c5b676859bf451d1eed794ccf254d2 Mon Sep 17 00:00:00 2001 From: Sid Date: Fri, 1 Nov 2024 09:31:20 -0700 Subject: [PATCH 20/21] Remove 772M and adjust start block. --- constants/__init__.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/constants/__init__.py b/constants/__init__.py index 0e489c95..6afce8a6 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -65,8 +65,7 @@ BLOCK_SAMPLE_PACK = 4_001_017 # Starting block for 14B* (multi dataset experiment). -# TODO: Update starting block as needed. -BLOCK_14B_STAR = 4_210_361 +BLOCK_14B_STAR = 4_252_646 # Minimum percent of weight on a vali for a miner to be considered a top miner. # Since there can be multiple competitions at different reward percentages we can't just check biggest. @@ -186,12 +185,6 @@ ( BLOCK_14B_STAR, [ - # TODO confirm if we are removing the other 2 competitions. - Competition( - CompetitionId.M772_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.M772_MODEL], - 0.14, - ), Competition( CompetitionId.B3_MODEL, MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B3_MODEL], @@ -200,7 +193,7 @@ Competition( CompetitionId.B14_MODEL, MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], - 0.52, + 0.66, ), # TODO decide specific weight. Competition( From 46d18fb8ab8a73123462afdf3e857150fd011b00 Mon Sep 17 00:00:00 2001 From: cryptal-mc Date: Fri, 8 Nov 2024 18:25:35 +0000 Subject: [PATCH 21/21] Added support for The Stack V1 Dedup in multiset 14B --- constants/__init__.py | 75 +++++++++++++++++++++++++--------- neurons/config.py | 6 --- neurons/validator.py | 23 +++++------ pretrain/dataset.py | 10 ++--- tests/pretrain/test_dataset.py | 2 + 5 files changed, 72 insertions(+), 44 deletions(-) diff --git a/constants/__init__.py b/constants/__init__.py index 6afce8a6..ec75ffd2 100644 --- a/constants/__init__.py +++ b/constants/__init__.py @@ -34,10 +34,10 @@ # --------------------------------- # Release -__version__ = "4.5.4" +__version__ = "4.6.0" # Validator schema version -__validator_version__ = "4.5.4" +__validator_version__ = "4.6.0" version_split = __validator_version__.split(".") __spec_version__ = ( (1000 * int(version_split[0])) @@ -101,18 +101,9 @@ CompetitionId.B7_MODEL: pt.dataset.SubsetFineWebEdu2Loader, CompetitionId.B14_MODEL: pt.dataset.SubsetFineWebEdu2Loader, # B14 model multi dataset adds the following dataset to the baseline b14 competition. - CompetitionId.B14_MODEL_MULTI_DATASET: pt.dataset.SubsetFalconLoader, + CompetitionId.B14_MODEL_MULTI_DATASET: pt.dataset.SubsetStackV1DedupLoader, } -# In a future release we will update the loaders to be able to load a certain number of tokens rather than pages. -# Until then we can use this ratio to approximate the correct ratio for B14*. Token counts using Xenova/gpt-4. -ESTIMATED_TOKENS_PER_PAGE_FALCON = 55419 -ESTIMATED_TOKENS_PER_PAGE_FINEWEB = 87490 -# To make the additional data be 1/4 of the total we need a 1:3 ratio. -PAGE_RATIO_14B_STAR = ( - 0.33 * ESTIMATED_TOKENS_PER_PAGE_FINEWEB / ESTIMATED_TOKENS_PER_PAGE_FALCON -) - # Synchronize on blocks roughly every 30 minutes. SYNC_BLOCK_CADENCE = 150 # Delay at least as long as the sync block cadence with an additional buffer. @@ -160,6 +151,47 @@ ), } +MODEL_CONSTRAINTS_BY_COMPETITION_ID_2: Dict[CompetitionId, ModelConstraints] = { + CompetitionId.M772_MODEL: ModelConstraints( + max_model_parameter_size=772_000_000, + min_model_parameter_size=572_000_000, + sequence_length=1024, + allowed_architectures=ALLOWED_MODEL_TYPES_1, + tokenizer="distilgpt2", + eval_block_delay=EVAL_BLOCK_DELAY, + epsilon_func=LinearDecay(0.005, 0.0005, 50400), + max_bytes=5 * 1024 * 1024 * 1024, + ), + CompetitionId.B3_MODEL: ModelConstraints( + max_model_parameter_size=3_400_000_000, + min_model_parameter_size=3_200_000_000, + sequence_length=4096, + allowed_architectures=ALLOWED_MODEL_TYPES_2, + tokenizer="Xenova/gpt-4", + kwargs={ + "torch_dtype": torch.bfloat16, + "attn_implementation": "flash_attention_2", + }, + eval_block_delay=EVAL_BLOCK_DELAY, + epsilon_func=LinearDecay(0.005, 0.0005, 50400), + max_bytes=15 * 1024 * 1024 * 1024, + ), + CompetitionId.B14_MODEL: ModelConstraints( + max_model_parameter_size=13_900_000_000, + min_model_parameter_size=13_700_000_000, + sequence_length=4096, + allowed_architectures=ALLOWED_MODEL_TYPES_2, + tokenizer="Xenova/gpt-4", + kwargs={ + "torch_dtype": torch.bfloat16, + "attn_implementation": "flash_attention_2", + }, + eval_block_delay=EVAL_BLOCK_DELAY, + epsilon_func=LinearDecay(0.005, 0.0005, 50400), + max_bytes=29 * 1024 * 1024 * 1024, + ), +} + # Schedule of competitions by block. COMPETITION_SCHEDULE_BY_BLOCK: List[Tuple[int, List[Competition]]] = [ ( @@ -187,19 +219,18 @@ [ Competition( CompetitionId.B3_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B3_MODEL], + MODEL_CONSTRAINTS_BY_COMPETITION_ID_2[CompetitionId.B3_MODEL], 0.29, ), Competition( CompetitionId.B14_MODEL, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], - 0.66, + MODEL_CONSTRAINTS_BY_COMPETITION_ID_2[CompetitionId.B14_MODEL], + 0.57, ), - # TODO decide specific weight. Competition( CompetitionId.B14_MODEL_MULTI_DATASET, - MODEL_CONSTRAINTS_BY_COMPETITION_ID[CompetitionId.B14_MODEL], - 0.05, + MODEL_CONSTRAINTS_BY_COMPETITION_ID_2[CompetitionId.B14_MODEL], + 0.14, ), ], ), @@ -231,8 +262,12 @@ sample_pack_block = BLOCK_SAMPLE_PACK # validators number of pages to eval over miners on each step. -pages_per_eval_unpack = 5 # With sample unpacking -pages_per_eval_pack = 11 +pages_per_eval_unpack = 10 # With sample unpacking +pages_per_eval_pack = 22 + +# In a future release we will update the loaders to be able to load a certain number of tokens rather than pages. +# Until then we need to set this manually +pages_per_eval_14bstar_pack = 1 # validator eval batch size. batch_size = 1 diff --git a/neurons/config.py b/neurons/config.py index eeb58d1f..b1c4f03d 100644 --- a/neurons/config.py +++ b/neurons/config.py @@ -32,12 +32,6 @@ def validator_config(): default=50, help="Number of blocks to wait before setting weights.", ) - parser.add_argument( - "--pages_per_eval", - type=int, - default=None, - help="Number of pages used to eval each step. If not specified, it will be automatically set.", - ) parser.add_argument( "--sample_min", type=int, diff --git a/neurons/validator.py b/neurons/validator.py index 266dd316..48f935db 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -877,14 +877,8 @@ async def run_step(self): # Use sample packing. pack_samples = True - - # If the option is set in the config, override. - pages_per_eval = ( - self.config.pages_per_eval - if self.config.pages_per_eval is not None - else constants.pages_per_eval_pack - ) - + pages_per_eval = constants.pages_per_eval_pack + # Try to synchronize the data used by validators. try: seed = metagraph_utils.get_hash_of_sync_block( @@ -919,7 +913,7 @@ async def run_step(self): dataloader_14b_star = SubsetDataLoader_14b_star( batch_size=constants.batch_size, sequence_length=competition_14b_star.constraints.sequence_length, - num_pages=int(pages_per_eval * constants.PAGE_RATIO_14B_STAR), + num_pages=constants.pages_per_eval_14bstar_pack, tokenizer=tokenizer, pack_samples=pack_samples, random_seed=seed, @@ -931,7 +925,7 @@ async def run_step(self): ) bt.logging.debug(f"14b* Batch size is {len(batches_14b_star[0])}") - # This is useful for logging to wandb + #This is useful for logging to wandb pages_14b_star = dataloader_14b_star.get_page_names() bt.logging.debug(f"14b* Pages used are {pages_14b_star}") @@ -1332,13 +1326,16 @@ def log_step( "win_total": wins[uid] if uid in wins else 0, "weight": self.weights[uid].item(), "norm_weight": sub_competition_weights[uid].item(), + "dataset_perf": {}, } + + # Log performance per dataset for dataset_name, avg_loss in ( uid_to_state[uid].avg_loss_per_dataset().items() ): - step_log["uid_data"][str(uid)][ - f"{dataset_name}_average_loss" - ] = avg_loss + step_log["uid_data"][str(uid)]["dataset_perf"][ + f"{dataset_name}" + ] = {'average_loss': avg_loss} table = Table(title="Step", expand=True) table.add_column("uid", justify="right", style="cyan", no_wrap=True) table.add_column("hf", style="magenta", overflow="fold") diff --git a/pretrain/dataset.py b/pretrain/dataset.py index 4c456195..14bd80c8 100644 --- a/pretrain/dataset.py +++ b/pretrain/dataset.py @@ -46,7 +46,7 @@ def __init__( if random_seed is not None: random.seed(random_seed) - self.num_rows_per_page = 100 + self.num_rows_per_page = 50 self.duplicate_page_threshold = 100 self.retry_limit = 10 self.retry_delay = 5 @@ -116,7 +116,7 @@ def _fetch_data_for_page(self, page): else: self.params["offset"] = page - self.params["limit"] = self.num_rows_per_page + self.params["length"] = self.num_rows_per_page attempt = 0 while attempt < self.retry_limit: @@ -292,7 +292,7 @@ def _fetch_data_to_buffer(self, num_pages): "config": config_name, "split": split, "offset": page_row_start, - "limit": self.num_rows_per_page, + "length": self.num_rows_per_page, } try: @@ -356,7 +356,7 @@ def fetch_data_to_rows(self, num_pages): "config": config_name, "split": split, "offset": page_row_start, - "limit": self.num_rows_per_page, + "length": self.num_rows_per_page, } try: @@ -377,4 +377,4 @@ def fetch_data_to_rows(self, num_pages): bt.logging.error("Maximum retry limit reached. Unable to fetch data.") raise - return rows \ No newline at end of file + return rows diff --git a/tests/pretrain/test_dataset.py b/tests/pretrain/test_dataset.py index a11d54f5..f62e88b2 100644 --- a/tests/pretrain/test_dataset.py +++ b/tests/pretrain/test_dataset.py @@ -94,6 +94,7 @@ def test_fineweb_loader_duplicate_threshold(self): # Try to get 6 pages from a set that only contains 5 pages worth. NUM_PAGES = 6 NUM_PAGES_ACTUAL = 5 + NUM_ROWS_PER_PAGE = 100 CONFIG_DATA = {"CC-MAIN-2013-20": {"num_rows": 499, "split": "train"}} # Load a tokenizer @@ -107,6 +108,7 @@ def test_fineweb_loader_duplicate_threshold(self): ) dataloader.configs_data = CONFIG_DATA + dataloader.num_rows_per_page = NUM_ROWS_PER_PAGE # Only fetch these once for performance, although for better correctness should consider running in a loop. # We check for actual pages or actual pages - 1 to handle the random offset.