diff --git a/tests/tools/test_dist_part.py b/tests/tools/test_dist_part.py
index 80b6419d4938..763019a40d92 100644
--- a/tests/tools/test_dist_part.py
+++ b/tests/tools/test_dist_part.py
@@ -3,6 +3,7 @@
 import tempfile
 
 import dgl
+import dgl.backend as F
 import numpy as np
 import pyarrow.parquet as pq
@@ -19,7 +20,8 @@
 from distpartitioning import array_readwriter
 from distpartitioning.utils import generate_read_list
-from pytest_utils import create_chunked_dataset
+from pytest_utils import chunk_graph, create_chunked_dataset
+from scipy import sparse as spsp
 
 from tools.verification_utils import (
     verify_graph_feats,
@@ -202,6 +204,103 @@ def test_chunk_graph_arbitrary_chunks(
     )
 
 
+def create_mini_chunked_dataset(
+    root_dir,
+    num_chunks,
+    data_fmt,
+    edges_fmt,
+    vector_rows,
+    few_entity="node",
+    **kwargs,
+):
+    num_nodes = {"n1": 1000, "n2": 1010, "n3": 1020}
+    etypes = [
+        ("n1", "r1", "n2"),
+        ("n2", "r1", "n1"),
+        ("n1", "r2", "n3"),
+        ("n2", "r3", "n3"),
+    ]
+    node_items = ["n1", "n2", "n3"]
+    edges_coo = {}
+    for etype in etypes:
+        src_ntype, _, dst_ntype = etype
+        arr = spsp.random(
+            num_nodes[src_ntype],
+            num_nodes[dst_ntype],
+            density=0.001,
+            format="coo",
+            random_state=100,
+        )
+        edges_coo[etype] = (arr.row, arr.col)
+    edge_items = []
+    if few_entity == "edge":
+        edges_coo[("n1", "a0", "n2")] = (
+            torch.tensor([0, 1]),
+            torch.tensor([1, 0]),
+        )
+        edges_coo[("n1", "a1", "n3")] = (
+            torch.tensor([0, 1]),
+            torch.tensor([1, 0]),
+        )
+        edge_items.append(("n1", "a0", "n2"))
+        edge_items.append(("n1", "a1", "n3"))
+    elif few_entity == "node":
+        edges_coo[("n1", "r_few", "n_few")] = (
+            torch.tensor([0, 1]),
+            torch.tensor([1, 0]),
+        )
+        edges_coo[("a0", "a01", "n_1")] = (
+            torch.tensor([0, 1]),
+            torch.tensor([1, 0]),
+        )
+        edge_items.append(("n1", "r_few", "n_few"))
+        edge_items.append(("a0", "a01", "n_1"))
+        node_items.append("n_few")
+        node_items.append("n_1")
+        num_nodes["n_few"] = 2
+        num_nodes["n_1"] = 2
+    g = dgl.heterograph(edges_coo)
+
+    node_data = {}
+    edge_data = {}
+    # save feature
+    input_dir = os.path.join(root_dir, "data_test")
+
+    for ntype in node_items:
+        os.makedirs(os.path.join(input_dir, ntype))
+        feat = np.random.randn(num_nodes[ntype], 3)
+        feat_path = os.path.join(input_dir, f"{ntype}/feat.npy")
+        with open(feat_path, "wb") as f:
+            np.save(f, feat)
+        g.nodes[ntype].data["feat"] = torch.from_numpy(feat)
+        node_data[ntype] = {"feat": feat_path}
+
+    for etype in set(edge_items):
+        os.makedirs(os.path.join(input_dir, etype[1]))
+        num_edge = len(edges_coo[etype][0])
+        feat = np.random.randn(num_edge, 4)
+        feat_path = os.path.join(input_dir, f"{etype[1]}/feat.npy")
+        with open(feat_path, "wb") as f:
+            np.save(f, feat)
+        g.edges[etype].data["feat"] = torch.from_numpy(feat)
+        edge_data[etype] = {"feat": feat_path}
+
+    output_dir = os.path.join(root_dir, "chunked-data")
+    chunk_graph(
+        g,
+        "mag240m",
+        node_data,
+        edge_data,
+        num_chunks=num_chunks,
+        output_path=output_dir,
+        data_fmt=data_fmt,
+        edges_fmt=edges_fmt,
+        vector_rows=vector_rows,
+        **kwargs,
+    )
+    return g
+
+
 def _test_pipeline(
     num_chunks,
     num_parts,
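
Note on the helper above: the synthetic heterograph is built by sampling sparse COO adjacency matrices with SciPy and feeding the row/col index arrays straight into `dgl.heterograph`. Below is a minimal, self-contained sketch of that pattern; the node/edge type names and sizes are illustrative, not part of the patch.

```python
import dgl
import torch
from scipy import sparse as spsp

# Random 0.1%-dense adjacency; row/col arrays are the COO edge endpoints.
arr = spsp.random(100, 100, density=0.001, format="coo", random_state=100)
g = dgl.heterograph(
    {
        ("user", "follows", "user"): (
            torch.from_numpy(arr.row.astype("int64")),
            torch.from_numpy(arr.col.astype("int64")),
        )
    },
    num_nodes_dict={"user": 100},
)
assert g.num_edges("follows") == arr.nnz
```
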
edges_fmt="csv", + vector_rows=False, + num_chunks_nodes=None, + num_chunks_edges=None, + num_chunks_node_data=None, + num_chunks_edge_data=None, +): + with tempfile.TemporaryDirectory() as root_dir: + g = create_mini_chunked_dataset( + root_dir, + num_chunks, + few_entity=few_entity, + data_fmt=data_fmt, + edges_fmt=edges_fmt, + vector_rows=vector_rows, + num_chunks_nodes=num_chunks_nodes, + num_chunks_edges=num_chunks_edges, + num_chunks_node_data=num_chunks_node_data, + num_chunks_edge_data=num_chunks_edge_data, + ) + + # Step1: graph partition + in_dir = os.path.join(root_dir, "chunked-data") + output_dir = os.path.join(root_dir, "parted_data") + os.system( + "python3 tools/partition_algo/random_partition.py " + "--in_dir {} --out_dir {} --num_partitions {}".format( + in_dir, output_dir, num_parts + ) + ) + + # Step2: data dispatch + partition_dir = os.path.join(root_dir, "parted_data") + out_dir = os.path.join(root_dir, "partitioned") + ip_config = os.path.join(root_dir, "ip_config.txt") + with open(ip_config, "w") as f: + for i in range(world_size): + f.write(f"127.0.0.{i + 1}\n") + + cmd = "python3 tools/dispatch_data.py" + cmd += f" --in-dir {in_dir}" + cmd += f" --partitions-dir {partition_dir}" + cmd += f" --out-dir {out_dir}" + cmd += f" --ip-config {ip_config}" + cmd += " --ssh-port 22" + cmd += " --process-group-timeout 60" + cmd += " --save-orig-nids" + cmd += " --save-orig-eids" + cmd += f" --graph-formats {graph_formats}" if graph_formats else "" + os.system(cmd) + + # read original node/edge IDs + def read_orig_ids(fname): + orig_ids = {} + for i in range(num_parts): + ids_path = os.path.join(out_dir, f"part{i}", fname) + part_ids = load_tensors(ids_path) + for type, data in part_ids.items(): + if type not in orig_ids: + orig_ids[type] = data + else: + orig_ids[type] = torch.cat((orig_ids[type], data)) + return orig_ids + + orig_nids = read_orig_ids("orig_nids.dgl") + orig_eids = read_orig_ids("orig_eids.dgl") + + # load partitions and verify + part_config = os.path.join(out_dir, "metadata.json") + for i in range(num_parts): + part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition( + part_config, i + ) + verify_partition_data_types(part_g) + verify_partition_formats(part_g, graph_formats) + verify_graph_feats( + g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids + ) + + def test_utils_generate_read_list(): read_list = generate_read_list(10, 4) assert np.array_equal(read_list[0], np.array([0, 1, 2])) diff --git a/tools/distpartitioning/convert_partition.py b/tools/distpartitioning/convert_partition.py index 5013b6d40f20..d2d70b96e5c5 100644 --- a/tools/distpartitioning/convert_partition.py +++ b/tools/distpartitioning/convert_partition.py @@ -9,6 +9,7 @@ import dgl.graphbolt as gb import numpy as np import torch as th +import torch.distributed as dist from dgl import EID, ETYPE, NID, NTYPE from dgl.distributed.constants import DGL2GB_EID, GB_DST_ID @@ -355,6 +356,34 @@ def _process_partition_gb( return indptr, indices[sorted_idx], edge_ids[sorted_idx] +def _update_node_map(node_map_val, end_ids_per_rank, id_ntypes, prev_last_id): + """this function is modified from the function '_update_node_edge_map' in dgl.distributed.partition""" + # Update the node_map_val to be contiguous. 
diff --git a/tools/distpartitioning/convert_partition.py b/tools/distpartitioning/convert_partition.py
index 5013b6d40f20..d2d70b96e5c5 100644
--- a/tools/distpartitioning/convert_partition.py
+++ b/tools/distpartitioning/convert_partition.py
@@ -9,6 +9,7 @@
 import dgl.graphbolt as gb
 import numpy as np
 import torch as th
+import torch.distributed as dist
 from dgl import EID, ETYPE, NID, NTYPE
 from dgl.distributed.constants import DGL2GB_EID, GB_DST_ID
@@ -355,6 +356,34 @@ def _process_partition_gb(
     return indptr, indices[sorted_idx], edge_ids[sorted_idx]
 
 
+def _update_node_map(node_map_val, end_ids_per_rank, id_ntypes, prev_last_id):
+    """Update ``node_map_val`` so per-type node ID ranges are contiguous.
+
+    Adapted from ``_update_node_edge_map`` in ``dgl.distributed.partition``.
+    """
+    rank = dist.get_rank()
+    prev_end_id = (
+        end_ids_per_rank[rank - 1].item() if rank > 0 else prev_last_id
+    )
+    ntype_ids = {ntype: ntype_id for ntype_id, ntype in enumerate(id_ntypes)}
+    for ntype_id in list(ntype_ids.values()):
+        ntype = id_ntypes[ntype_id]
+        start_id = node_map_val[ntype][0][0]
+        end_id = node_map_val[ntype][0][1]
+        if not (start_id == -1 and end_id == -1):
+            continue
+        prev_ntype_id = (
+            ntype_ids[ntype] - 1
+            if ntype_ids[ntype] > 0
+            else max(ntype_ids.values())
+        )
+        prev_ntype = id_ntypes[prev_ntype_id]
+        if ntype_ids[ntype] == 0:
+            node_map_val[ntype][0][0] = prev_end_id
+        else:
+            node_map_val[ntype][0][0] = node_map_val[prev_ntype][0][1]
+        node_map_val[ntype][0][1] = node_map_val[ntype][0][0]
+    return node_map_val[ntype][0][-1]
+
+
 def create_graph_object(
     tot_node_count,
     tot_edge_count,
@@ -368,6 +397,7 @@ def create_graph_object(
     edgeid_offset,
     node_typecounts,
     edge_typecounts,
+    last_ids={},
     return_orig_nids=False,
     return_orig_eids=False,
     use_graphbolt=False,
@@ -512,12 +542,30 @@ def create_graph_object(
     shuffle_global_nid_range = (shuffle_global_nids[0], shuffle_global_nids[-1])
 
     # Determine the node ID ranges of different node types.
+    prev_last_id = last_ids.get(part_id - 1, 0)
     for ntype_name in global_nid_ranges:
         ntype_id = ntypes_map[ntype_name]
         type_nids = shuffle_global_nids[ntype_ids == ntype_id]
-        node_map_val[ntype_name].append(
-            [int(type_nids[0]), int(type_nids[-1]) + 1]
-        )
+        if len(type_nids) == 0:
+            node_map_val[ntype_name].append([-1, -1])
+        else:
+            node_map_val[ntype_name].append(
+                [int(type_nids[0]), int(type_nids[-1]) + 1]
+            )
+            last_id = th.tensor(
+                [max(prev_last_id, int(type_nids[-1]) + 1)], dtype=th.int64
+            )
+    id_ntypes = list(global_nid_ranges.keys())
+
+    gather_last_ids = [
+        th.zeros(1, dtype=th.int64) for _ in range(dist.get_world_size())
+    ]
+
+    dist.all_gather(gather_last_ids, last_id)
+    prev_last_id = _update_node_map(
+        node_map_val, gather_last_ids, id_ntypes, prev_last_id
+    )
+    last_ids[part_id] = prev_last_id
 
     # process edges
     memory_snapshot("CreateDGLObj_AssignEdgeData: ", part_id)
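
The `dist.all_gather` of `last_id` above is what lets rank r anchor its node ID ranges at the end of rank r-1's. A runnable single-process sketch of that collective; the backend, address, and values are illustrative only:

```python
import torch
import torch.distributed as dist

# One-rank "gloo" group just to make the snippet self-contained.
dist.init_process_group(
    backend="gloo", init_method="tcp://127.0.0.1:29500", rank=0, world_size=1
)
last_id = torch.tensor([42], dtype=torch.int64)  # this rank's last assigned ID
gathered = [
    torch.zeros(1, dtype=torch.int64) for _ in range(dist.get_world_size())
]
dist.all_gather(gathered, last_id)  # gathered[r] holds rank r's last ID
assert gathered[dist.get_rank()].item() == 42
dist.destroy_process_group()
```
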
diff --git a/tools/distpartitioning/data_shuffle.py b/tools/distpartitioning/data_shuffle.py
index 6800064a2b0b..daae0fcbb3fd 100644
--- a/tools/distpartitioning/data_shuffle.py
+++ b/tools/distpartitioning/data_shuffle.py
@@ -489,6 +489,10 @@ def exchange_feature(
             feat_dims_dtype.append(DATA_TYPE_ID[torch.float32])
             feature_dimension = 0
 
+    feature_dimension_tensor = torch.tensor([feature_dimension])
+    dist.all_reduce(feature_dimension_tensor, op=dist.ReduceOp.MAX)
+    feature_dimension = feature_dimension_tensor.item()
+
     logging.debug(f"Sending the feature shape information - {feat_dims_dtype}")
     all_dims_dtype = allgather_sizes(
         feat_dims_dtype, world_size, num_parts, return_sizes=True
@@ -553,7 +557,11 @@ def exchange_feature(
         else:
             cur_features[local_feat_key] = output_feat_list
             cur_global_ids[local_feat_key] = output_id_list
-
+    else:
+        cur_features[local_feat_key] = torch.empty(
+            (0, feature_dimension), dtype=torch.float32
+        )
+        cur_global_ids[local_feat_key] = torch.empty((0,), dtype=torch.int64)
     return cur_features, cur_global_ids
 
 
@@ -1301,6 +1309,7 @@ def prepare_local_data(src_data, local_part_id):
     if params.graph_formats:
         graph_formats = params.graph_formats.split(",")
 
+    prev_last_ids = {}
     for local_part_id in range(params.num_parts // world_size):
         # Synchronize for each local partition of the graph object.
         dist.barrier()
@@ -1340,6 +1349,7 @@ def prepare_local_data(src_data, local_part_id):
                 schema_map[constants.STR_NUM_NODES_PER_TYPE],
             ),
             edge_typecounts,
+            prev_last_ids,
             return_orig_nids=params.save_orig_nids,
             return_orig_eids=params.save_orig_eids,
             use_graphbolt=params.use_graphbolt,
@@ -1390,6 +1400,19 @@ def prepare_local_data(src_data, local_part_id):
             ] = json_metadata
             memory_snapshot("MetadataCreateComplete: ", rank)
 
+        last_id_tensor = torch.tensor(
+            [prev_last_ids[rank + (local_part_id * world_size)]],
+            dtype=torch.int64,
+        )
+        gather_list = [
+            torch.zeros(1, dtype=torch.int64) for _ in range(world_size)
+        ]
+        dist.all_gather(gather_list, last_id_tensor)
+        for rank_id, last_id in enumerate(gather_list):
+            prev_last_ids[
+                rank_id + (local_part_id * world_size)
+            ] = last_id.item()
+
     if rank == 0:
         # get meta-data from all partitions and merge them on rank-0
         metadata_list = gather_metadata_json(output_meta_json, rank, world_size)
diff --git a/tools/distpartitioning/dataset_utils.py b/tools/distpartitioning/dataset_utils.py
index 95ba386853aa..fcff3cd74302 100644
--- a/tools/distpartitioning/dataset_utils.py
+++ b/tools/distpartitioning/dataset_utils.py
@@ -547,6 +547,11 @@ def get_dataset(
                 autogenerate_column_names=True,
             )
             parse_options = pyarrow.csv.ParseOptions(delimiter=" ")
+
+            if os.path.getsize(edge_file) == 0:
+                # An empty edge file means this partition has no edges of
+                # this type; src_ids and dst_ids should remain empty.
+                continue
             with pyarrow.csv.open_csv(
                 edge_file,
                 read_options=read_options,
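
A note on the `all_reduce` with `ReduceOp.MAX` in `exchange_feature`: a rank that receives no rows of a feature computes `feature_dimension = 0` locally, yet its placeholder must still be a `(0, dim)` tensor that concatenates cleanly with the `(n, dim)` shards held by other ranks; taking the max over ranks recovers the true `dim`. A toy illustration with made-up values, simulating the reduce with `max`:

```python
import torch

feature_dimension = 0        # this rank saw no rows of the feature
peer_dims = [0, 4, 4]        # dims reported by other ranks
feature_dimension = max(peer_dims + [feature_dimension])  # all_reduce(MAX)

empty = torch.empty((0, feature_dimension), dtype=torch.float32)
shard = torch.randn(5, feature_dimension)
assert torch.cat([empty, shard]).shape == (5, feature_dimension)
```
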