Skip to content

Commit

Permalink
fxied deepspeed
Browse files Browse the repository at this point in the history
  • Loading branch information
hariharan-devarajan committed Oct 10, 2024
1 parent 7104043 commit 6b50605
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 16 deletions.
9 changes: 4 additions & 5 deletions dlio_benchmark/configs/workload/megatron_deepspeed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@ dataset:

reader:
data_loader: pytorch
batch_size: 1024
batch_size: 16
read_threads: 1
file_shuffle: seed
sample_shuffle: seed

train:
epochs: 311541
total_training_steps: 64
computation_time: 0.03 # every iteration has 290 steps and each iteration is 8.9 sec.
epochs: 3
computation_time: 2.44 # 2.44 sec per step

checkpoint:
checkpoint_folder: checkpoints/megatron-deepspeed
epochs_between_checkpoints: 1000
steps_between_checkpoints: 1000
model_size: 30102
type: all_ranks
optimization_groups: [1009254400, 865075200, 793600]
Expand Down
34 changes: 23 additions & 11 deletions dlio_benchmark/reader/indexed_binary_mmap_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ class IndexedBinaryMMapReader(FormatReader):
def __init__(self, dataset_type, thread_index, epoch):
super().__init__(dataset_type, thread_index)
self.file_map_ibr = {}
self.load_index()
self.buffer_map = {}
self.load_index()

def index_file_path_off(self, prefix_path):
return prefix_path + '.off.idx'
Expand All @@ -61,6 +61,9 @@ def load_index_file(self, global_sample_idx, filename, sample_index):
bin_buffer_mmap = np.memmap(sz_file, mode='r', order='C')
bin_buffer = memoryview(bin_buffer_mmap)
self.file_map_ibr[filename].append(np.frombuffer(bin_buffer, dtype=np.uint32))
bin_buffer_mmap = np.memmap(filename, mode='r', order='C')
bin_buffer = memoryview(bin_buffer_mmap)
self.buffer_map[filename] = np.frombuffer(bin_buffer, dtype=np.uint8)

@dlp.log
def load_index(self):
Expand All @@ -76,24 +79,20 @@ def load_index(self):

@dlp.log
def open(self, filename):
super().open(filename)
bin_buffer_mmap = np.memmap(filename, mode='r', order='C')
bin_buffer = memoryview(bin_buffer_mmap)
self.buffer_map[filename] = np.frombuffer(bin_buffer, dtype=np.uint8)
return bin_buffer_mmap
super().open(filename)
return self.buffer_map[filename]

@dlp.log
def close(self, filename):
super().close(filename)
self.open_file_map[filename]._mmap.close()


@dlp.log
def get_sample(self, filename, sample_index):
super().get_sample(filename, sample_index)
buffer = self.buffer_map[filename]
offset = self.file_map_ibr[filename][0][sample_index]
size = self.file_map_ibr[filename][1][sample_index]
logging.debug(f"reading sample from offset {offset} of size {size} from file {filename}")
image = buffer[offset:offset+size]
dlp.update(image_size=size)

Expand All @@ -103,14 +102,27 @@ def next(self):

@dlp.log
def read_index(self, image_idx, step):
return super().read_index(image_idx, step)
filename, sample_index = self.global_index_map[image_idx]
self.get_sample(filename, sample_index)
return self._args.resized_image

@dlp.log
def finalize(self):
return super().finalize()
super().finalize()
if self._args.data_loader_sampler == DataLoaderSampler.ITERATIVE:
for global_sample_idx, filename, sample_index in self.file_map[self.thread_index]:
self.buffer_map[filename]._mmap.close()
self.file_map_ibr[filename][0]._mmap.close()
self.file_map_ibr[filename][1]._mmap.close()
elif self._args.data_loader_sampler == DataLoaderSampler.INDEX:
for global_sample_idx, (filename, sample_index) in self.global_index_map.items():
self.buffer_map[filename]._mmap.close()
self.file_map_ibr[filename][0]._mmap.close()
self.file_map_ibr[filename][1]._mmap.close()


def is_index_based(self):
return True

def is_iterator_based(self):
return True
return True

0 comments on commit 6b50605

Please sign in to comment.