From 8f9e6393b3f4bfcb8e2c5c18b78c65ddeaa17ef2 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 14 Aug 2024 02:35:23 +0000 Subject: [PATCH 01/17] change a variable --- python/dgl/distributed/partition.py | 93 ++++++++++++++++++----------- 1 file changed, 57 insertions(+), 36 deletions(-) diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py index 73ea48959597..fd0fcae9d9c2 100644 --- a/python/dgl/distributed/partition.py +++ b/python/dgl/distributed/partition.py @@ -1105,7 +1105,7 @@ def get_homogeneous(g, balance_ntypes): inner_node_mask = _get_inner_node_mask(parts[i], ntype_id) val.append( F.as_scalar(F.sum(F.astype(inner_node_mask, F.int64), 0)) - ) + )#note inner_node_mask(tensor[n,bool])->tensor[n,int64]->sum->scalar, compute the num of one partition inner_nids = F.boolean_mask( parts[i].ndata[NID], inner_node_mask ) @@ -1115,7 +1115,7 @@ def get_homogeneous(g, balance_ntypes): int(F.as_scalar(inner_nids[-1])) + 1, ] ) - val = np.cumsum(val).tolist() + val = np.cumsum(val).tolist()# note computing the cumulative sum of array elements. assert val[-1] == g.num_nodes(ntype) for etype in g.canonical_etypes: etype_id = g.get_etype_id(etype) @@ -1135,7 +1135,7 @@ def get_homogeneous(g, balance_ntypes): [int(inner_eids[0]), int(inner_eids[-1]) + 1] ) val = np.cumsum(val).tolist() - assert val[-1] == g.num_edges(etype) + assert val[-1] == g.num_edges(etype)# note assure the tot graph can be used else: node_map_val = {} edge_map_val = {} @@ -1305,32 +1305,52 @@ def get_homogeneous(g, balance_ntypes): part_dir = os.path.join(out_path, "part" + str(part_id)) node_feat_file = os.path.join(part_dir, "node_feat.dgl") edge_feat_file = os.path.join(part_dir, "edge_feat.dgl") - part_graph_file = os.path.join(part_dir, "graph.dgl") - part_metadata["part-{}".format(part_id)] = { - "node_feats": os.path.relpath(node_feat_file, out_path), - "edge_feats": os.path.relpath(edge_feat_file, out_path), - "part_graph": os.path.relpath(part_graph_file, out_path), - } + os.makedirs(part_dir, mode=0o775, exist_ok=True) save_tensors(node_feat_file, node_feats) save_tensors(edge_feat_file, edge_feats) - sort_etypes = len(g.etypes) > 1 - _save_graphs( - part_graph_file, - [part], - formats=graph_formats, - sort_etypes=sort_etypes, + #save + if use_graphbolt: + part_metadata["part-{}".format(part_id)] = { + "node_feats": os.path.relpath(node_feat_file, out_path), + "edge_feats": os.path.relpath(edge_feat_file, out_path), + } + else: + part_graph_file = os.path.join(part_dir, "graph.dgl") + + part_metadata["part-{}".format(part_id)] = { + "node_feats": os.path.relpath(node_feat_file, out_path), + "edge_feats": os.path.relpath(edge_feat_file, out_path), + "part_graph": os.path.relpath(part_graph_file, out_path), + } + sort_etypes = len(g.etypes) > 1 + _save_graphs( + part_graph_file, + [part], + formats=graph_formats, + sort_etypes=sort_etypes, + ) + + + part_config = os.path.join(out_path, graph_name + ".json") + if use_graphbolt: + kwargs["graph_formats"] = graph_formats + dgl_partition_to_graphbolt( + part_config, + parts=parts, + part_meta=part_metadata, + **kwargs, ) + else: + _dump_part_config(part_config, part_metadata) + print( "Save partitions: {:.3f} seconds, peak memory: {:.3f} GB".format( time.time() - start, get_peak_mem() ) ) - part_config = os.path.join(out_path, graph_name + ".json") - _dump_part_config(part_config, part_metadata) - num_cuts = sim_g.num_edges() - tot_num_inner_edges if num_parts == 1: num_cuts = 0 @@ -1340,13 +1360,6 @@ def get_homogeneous(g, 
balance_ntypes): ) ) - if use_graphbolt: - kwargs["graph_formats"] = graph_formats - dgl_partition_to_graphbolt( - part_config, - **kwargs, - ) - if return_mapping: return orig_nids, orig_eids @@ -1392,9 +1405,9 @@ def init_type_per_edge(graph, gpb): etype_ids = gpb.map_to_per_etype(graph.edata[EID])[0] return etype_ids - -def gb_convert_single_dgl_partition( +def gb_convert_single_dgl_partition(# TODO change this part_id, + parts, graph_formats, part_config, store_eids, @@ -1427,14 +1440,18 @@ def gb_convert_single_dgl_partition( "Running in debug mode which means all attributes of DGL partitions" " will be saved to the new format." ) - + part_meta = _load_part_config(part_config) num_parts = part_meta["num_parts"] - graph, _, _, gpb, _, _, _ = load_partition( - part_config, part_id, load_feats=False - ) - _, _, ntypes, etypes = load_partition_book(part_config, part_id) + if parts!=None: + assert len(parts)==num_parts + graph=parts[part_id] + else: + graph, _, _, gpb, _, _, _ = load_partition( + part_config, part_id, load_feats=False + ) + gpb, _, ntypes, etypes = load_partition_book(part_config, part_id) is_homo = is_homogeneous(ntypes, etypes) node_type_to_id = ( None if is_homo else {ntype: ntid for ntid, ntype in enumerate(ntypes)} @@ -1503,7 +1520,7 @@ def gb_convert_single_dgl_partition( indptr, dtype=indices.dtype ) - # Cast various data to minimum dtype. + # Cast various data to minimum dtype.#note convert to minimun dtype # Cast 1: indptr. indptr = _cast_to_minimum_dtype(graph.num_edges(), indptr) # Cast 2: indices. @@ -1552,7 +1569,6 @@ def gb_convert_single_dgl_partition( return os.path.relpath(csc_graph_path, os.path.dirname(part_config)) # Update graph path. - def dgl_partition_to_graphbolt( part_config, *, @@ -1561,7 +1577,10 @@ def dgl_partition_to_graphbolt( store_inner_edge=False, graph_formats=None, n_jobs=1, -): + parts=None, + part_meta=None +):# note + """Convert partitions of dgl to FusedCSCSamplingGraph of GraphBolt. This API converts `DGLGraph` partitions to `FusedCSCSamplingGraph` which is @@ -1598,7 +1617,8 @@ def dgl_partition_to_graphbolt( "Running in debug mode which means all attributes of DGL partitions" " will be saved to the new format." 
) - part_meta = _load_part_config(part_config) + if part_meta==None: + part_meta = _load_part_config(part_config) new_part_meta = copy.deepcopy(part_meta) num_parts = part_meta["num_parts"] @@ -1615,6 +1635,7 @@ def dgl_partition_to_graphbolt( convert_with_format = partial( gb_convert_single_dgl_partition, graph_formats=graph_formats, + parts=parts, part_config=part_config, store_eids=store_eids, store_inner_node=store_inner_node, From e0313ad7cd7da0ac86522e37b6e572e162506cc1 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Fri, 11 Oct 2024 05:51:13 +0000 Subject: [PATCH 02/17] create few node branch --- tests/tools/test_dist_part.py | 207 +++++++++++++++++++++++++++++++- tools/distpartitioning/utils.py | 2 + 2 files changed, 205 insertions(+), 4 deletions(-) diff --git a/tests/tools/test_dist_part.py b/tests/tools/test_dist_part.py index 80b6419d4938..9416e91e8eb9 100644 --- a/tests/tools/test_dist_part.py +++ b/tests/tools/test_dist_part.py @@ -6,6 +6,8 @@ import numpy as np import pyarrow.parquet as pq +from scipy import sparse as spsp +import dgl.backend as F import pytest import torch from dgl.data.utils import load_graphs, load_tensors @@ -19,7 +21,10 @@ from distpartitioning import array_readwriter from distpartitioning.utils import generate_read_list -from pytest_utils import create_chunked_dataset +from pytest_utils import ( + create_chunked_dataset, + chunk_graph, +) from tools.verification_utils import ( verify_graph_feats, @@ -202,6 +207,111 @@ def test_chunk_graph_arbitrary_chunks( ) +def create_mini_chunked_dataset(root_dir, num_chunks, data_fmt, edges_fmt, vector_rows, **kwargs): + num_nodes = {"n1": 1000, "n2": 1010, "n3": 1020} + etypes = [ + ("n1", "r1", "n2"), + ("n2", "r1", "n1"), + ("n1", "r2", "n3"), + ("n2", "r3", "n3"), + ] + edges_coo = {} + for etype in etypes: + src_ntype, _, dst_ntype = etype + arr = spsp.random( + num_nodes[src_ntype], + num_nodes[dst_ntype], + density=0.001, + format="coo", + random_state=100, + ) + edges_coo[etype] = (arr.row, arr.col) + edges_coo[("n1", "a0", "n2")] = (torch.tensor([0, 1]), torch.tensor([1, 0])) + edges_coo[("n1", "a1", "n3")] = (torch.tensor([0, 1]), torch.tensor([1, 0])) + g=dgl.heterograph(edges_coo) + + # save feature + n1_feats = np.random.randn(num_nodes["n1"], 3) + n2_feats = np.random.randn(num_nodes["n2"], 3) + n3_feats = np.random.randn(num_nodes["n3"], 3) + + n1_label = np.random.choice(4, 1000) + n2_label = np.random.choice(4, 1010) + n3_label = np.random.choice(4, 1020) + + a0_feat = np.random.randn(2, 3) + a1_feat = np.random.randn(2, 3) + + input_dir = os.path.join(root_dir, "data_test") + os.makedirs(input_dir) + for sub_d in ["n1", "n2", "n3", "a0", "a1"]: + os.makedirs(os.path.join(input_dir, sub_d)) + + n1_feat_path = os.path.join(input_dir, "n1/feat.npy") + with open(n1_feat_path, "wb") as f: + np.save(f, n1_feats) + g.nodes["n1"].data["feat"] = torch.from_numpy(n1_feats) + + n2_feat_path = os.path.join(input_dir, "n2/feat.npy") + with open(n2_feat_path, "wb") as f: + np.save(f, n2_feats) + g.nodes["n2"].data["feat"] = torch.from_numpy(n2_feats) + + n3_feat_path = os.path.join(input_dir, "n3/feat.npy") + with open(n3_feat_path, "wb") as f: + np.save(f, n3_feats) + g.nodes["n3"].data["feat"] = torch.from_numpy(n3_feats) + + n1_label_path = os.path.join(input_dir, "n1/label.npy") + with open(n1_label_path, "wb") as f: + np.save(f, n1_label) + g.nodes["n1"].data["label"] = torch.from_numpy(n1_label) + + n2_label_path = os.path.join(input_dir, "n2/label.npy") + with open(n2_label_path, "wb") as f: + np.save(f, 
n2_label) + g.nodes["n2"].data["label"] = torch.from_numpy(n2_label) + + n3_label_path = os.path.join(input_dir, "n3/label.npy") + with open(n3_label_path, "wb") as f: + np.save(f, n3_label) + g.nodes["n3"].data["label"] = torch.from_numpy(n3_label) + + a0_feat_path = os.path.join(input_dir, "a0/feat.npy") + with open(a0_feat_path, "wb") as f: + np.save(f, a0_feat) + g.edges["n1", "a0", "n2"].data["feat"] = torch.from_numpy(a0_feat) + + a1_feat_path = os.path.join(input_dir, "a1/feat.npy") + with open(a1_feat_path, "wb") as f: + np.save(f, a1_feat) + g.edges["n1", "a1", "n3"].data["feat"] = torch.from_numpy(a1_feat) + + node_data={ + "n1": {"feat": n1_feat_path, "label": n1_label_path}, + "n2": {"feat": n2_feat_path, "label": n2_label_path}, + "n3": {"feat": n3_feat_path, "label": n3_label_path}, + } + edge_data={ + ("n1", "a0", "n2"): {"feat": a0_feat_path}, + ("n1", "a1", "n3"): {"feat": a1_feat_path}, + } + output_dir = os.path.join(root_dir, "chunked-data") + chunk_graph( + g, + "mag240m", + node_data, + edge_data, + num_chunks=num_chunks, + output_path=output_dir, + data_fmt=data_fmt, + edges_fmt=edges_fmt, + vector_rows=vector_rows, + **kwargs, + ) + return g + + def _test_pipeline( num_chunks, num_parts, @@ -234,7 +344,7 @@ def _test_pipeline( in_dir = os.path.join(root_dir, "chunked-data") output_dir = os.path.join(root_dir, "parted_data") os.system( - "python3 tools/partition_algo/random_partition.py " + "/opt/conda/envs/pytorch/bin/python tools/partition_algo/random_partition.py " "--in_dir {} --out_dir {} --num_partitions {}".format( in_dir, output_dir, num_parts ) @@ -253,7 +363,7 @@ def _test_pipeline( for i in range(world_size): f.write(f"127.0.0.{i + 1}\n") - cmd = "python3 tools/dispatch_data.py" + cmd = "/opt/conda/envs/pytorch/bin/python tools/dispatch_data.py" cmd += f" --in-dir {in_dir}" cmd += f" --partitions-dir {partition_dir}" cmd += f" --out-dir {out_dir}" @@ -267,7 +377,7 @@ def _test_pipeline( # check if verify_partitions.py is used for validation. 
if use_verify_partitions: - cmd = "python3 tools/verify_partitions.py " + cmd = "/opt/conda/envs/pytorch/bin/python tools/verify_partitions.py " cmd += f" --orig-dataset-dir {in_dir}" cmd += f" --part-graph {out_dir}" cmd += f" --partitions-dir {output_dir}" @@ -373,9 +483,98 @@ def test_pipeline_feature_format(data_fmt): _test_pipeline(4, 4, 4, data_fmt=data_fmt) +def test_partition_hetero_few_nodes( + num_chunks, + num_parts, + world_size, + graph_formats=None, + data_fmt="numpy", + edges_fmt="csv", + vector_rows=False, + num_chunks_nodes=None, + num_chunks_edges=None, + num_chunks_node_data=None, + num_chunks_edge_data=None, +): + with tempfile.TemporaryDirectory() as root_dir: + g = create_mini_chunked_dataset( + root_dir, + num_chunks, + data_fmt=data_fmt, + edges_fmt=edges_fmt, + vector_rows=vector_rows, + num_chunks_nodes=num_chunks_nodes, + num_chunks_edges=num_chunks_edges, + num_chunks_node_data=num_chunks_node_data, + num_chunks_edge_data=num_chunks_edge_data, + ) + + # Step1: graph partition + in_dir = os.path.join(root_dir, "chunked-data") + output_dir = os.path.join(root_dir, "parted_data") + os.system( + "/opt/conda/envs/pytorch/bin/python tools/partition_algo/random_partition.py " + "--in_dir {} --out_dir {} --num_partitions {}".format( + in_dir, output_dir, num_parts + ) + ) + + # Step2: data dispatch + partition_dir = os.path.join(root_dir, "parted_data") + out_dir = os.path.join(root_dir, "partitioned") + ip_config = os.path.join(root_dir, "ip_config.txt") + with open(ip_config, "w") as f: + for i in range(world_size): + f.write(f"127.0.0.{i + 1}\n") + + cmd = "/opt/conda/envs/pytorch/bin/python tools/dispatch_data.py" + cmd += f" --in-dir {in_dir}" + cmd += f" --partitions-dir {partition_dir}" + cmd += f" --out-dir {out_dir}" + cmd += f" --ip-config {ip_config}" + cmd += " --ssh-port 22" + cmd += " --process-group-timeout 60" + cmd += " --save-orig-nids" + cmd += " --save-orig-eids" + cmd += f" --graph-formats {graph_formats}" if graph_formats else "" + os.system(cmd) + + # read original node/edge IDs + def read_orig_ids(fname): + orig_ids = {} + for i in range(num_parts): + ids_path = os.path.join(out_dir, f"part{i}", fname) + part_ids = load_tensors(ids_path) + for type, data in part_ids.items(): + if type not in orig_ids: + orig_ids[type] = data + else: + orig_ids[type] = torch.cat((orig_ids[type], data)) + return orig_ids + + orig_nids = read_orig_ids("orig_nids.dgl") + orig_eids = read_orig_ids("orig_eids.dgl") + + # load partitions and verify + part_config = os.path.join(out_dir, "metadata.json") + for i in range(num_parts): + part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition( + part_config, i + ) + verify_partition_data_types(part_g) + verify_partition_formats(part_g, graph_formats) + verify_graph_feats( + g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids + ) + + def test_utils_generate_read_list(): read_list = generate_read_list(10, 4) assert np.array_equal(read_list[0], np.array([0, 1, 2])) assert np.array_equal(read_list[1], np.array([3, 4, 5])) assert np.array_equal(read_list[2], np.array([6, 7])) assert np.array_equal(read_list[3], np.array([8, 9])) + +if __name__=='__main__': + test_partition_hetero_few_nodes(2,4,4) + # test_pipeline_basics(4,4,4,) \ No newline at end of file diff --git a/tools/distpartitioning/utils.py b/tools/distpartitioning/utils.py index 32292a843bc5..b1411be27548 100644 --- a/tools/distpartitioning/utils.py +++ b/tools/distpartitioning/utils.py @@ -3,6 +3,8 @@ import os from itertools import cycle +import sys 
+sys.path.append('/home/ubuntu/workspace/dgl/tools/distpartitioning') import constants import dgl From 78ff94cc51103de95fdb03f8a22361264ffafa02 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 14 Oct 2024 03:10:22 +0000 Subject: [PATCH 03/17] fix few entity id's issue --- tests/tools/test_dist_part.py | 162 ++++++++++---------- tools/distpartitioning/convert_partition.py | 53 ++++++- tools/distpartitioning/data_shuffle.py | 55 +++++-- tools/distpartitioning/dataset_utils.py | 27 ++-- 4 files changed, 182 insertions(+), 115 deletions(-) diff --git a/tests/tools/test_dist_part.py b/tests/tools/test_dist_part.py index 9416e91e8eb9..c4d2eeed3fac 100644 --- a/tests/tools/test_dist_part.py +++ b/tests/tools/test_dist_part.py @@ -3,11 +3,10 @@ import tempfile import dgl +import dgl.backend as F import numpy as np import pyarrow.parquet as pq -from scipy import sparse as spsp -import dgl.backend as F import pytest import torch from dgl.data.utils import load_graphs, load_tensors @@ -21,10 +20,8 @@ from distpartitioning import array_readwriter from distpartitioning.utils import generate_read_list -from pytest_utils import ( - create_chunked_dataset, - chunk_graph, -) +from pytest_utils import chunk_graph, create_chunked_dataset +from scipy import sparse as spsp from tools.verification_utils import ( verify_graph_feats, @@ -207,7 +204,15 @@ def test_chunk_graph_arbitrary_chunks( ) -def create_mini_chunked_dataset(root_dir, num_chunks, data_fmt, edges_fmt, vector_rows, **kwargs): +def create_mini_chunked_dataset( + root_dir, + num_chunks, + data_fmt, + edges_fmt, + vector_rows, + few_entity="node", + **kwargs, +): num_nodes = {"n1": 1000, "n2": 1010, "n3": 1020} etypes = [ ("n1", "r1", "n2"), @@ -215,6 +220,7 @@ def create_mini_chunked_dataset(root_dir, num_chunks, data_fmt, edges_fmt, vecto ("n1", "r2", "n3"), ("n2", "r3", "n3"), ] + node_items = ["n1", "n2", "n3"] edges_coo = {} for etype in etypes: src_ntype, _, dst_ntype = etype @@ -226,76 +232,59 @@ def create_mini_chunked_dataset(root_dir, num_chunks, data_fmt, edges_fmt, vecto random_state=100, ) edges_coo[etype] = (arr.row, arr.col) - edges_coo[("n1", "a0", "n2")] = (torch.tensor([0, 1]), torch.tensor([1, 0])) - edges_coo[("n1", "a1", "n3")] = (torch.tensor([0, 1]), torch.tensor([1, 0])) - g=dgl.heterograph(edges_coo) - + edge_items = [] + if few_entity == "edge": + edges_coo[("n1", "a0", "n2")] = ( + torch.tensor([0, 1]), + torch.tensor([1, 0]), + ) + edges_coo[("n1", "a1", "n3")] = ( + torch.tensor([0, 1]), + torch.tensor([1, 0]), + ) + edge_items.append(("n1", "a0", "n2")) + edge_items.append(("n1", "a1", "n3")) + elif few_entity == "node": + edges_coo[("n1", "r_few", "n_few")] = ( + torch.tensor([0, 1]), + torch.tensor([1, 0]), + ) + edges_coo[("a0", "a01", "n_1")] = ( + torch.tensor([0, 1]), + torch.tensor([1, 0]), + ) + edge_items.append(("n1", "r_few", "n_few")) + edge_items.append(("a0", "a01", "n_1")) + node_items.append("n_few") + node_items.append("n_1") + num_nodes["n_few"] = 2 + num_nodes["n_1"] = 2 + g = dgl.heterograph(edges_coo) + + node_data = {} + edge_data = {} # save feature - n1_feats = np.random.randn(num_nodes["n1"], 3) - n2_feats = np.random.randn(num_nodes["n2"], 3) - n3_feats = np.random.randn(num_nodes["n3"], 3) - - n1_label = np.random.choice(4, 1000) - n2_label = np.random.choice(4, 1010) - n3_label = np.random.choice(4, 1020) + input_dir = os.path.join(root_dir, "data_test") - a0_feat = np.random.randn(2, 3) - a1_feat = np.random.randn(2, 3) + for ntype in node_items: + os.makedirs(os.path.join(input_dir, ntype)) 
+ feat = np.random.randn(num_nodes[ntype], 3) + feat_path = os.path.join(input_dir, f"{ntype}/feat.npy") + with open(feat_path, "wb") as f: + np.save(f, feat) + g.nodes[ntype].data["feat"] = torch.from_numpy(feat) + node_data[ntype] = {"feat": feat_path} + + for etype in set(edge_items): + os.makedirs(os.path.join(input_dir, etype[1])) + num_edge = len(edges_coo[etype][0]) + feat = np.random.randn(num_edge, 4) + feat_path = os.path.join(input_dir, f"{etype[1]}/feat.npy") + with open(feat_path, "wb") as f: + np.save(f, feat) + g.edges[etype].data["feat"] = torch.from_numpy(feat) + edge_data[etype] = {"feat": feat_path} - input_dir = os.path.join(root_dir, "data_test") - os.makedirs(input_dir) - for sub_d in ["n1", "n2", "n3", "a0", "a1"]: - os.makedirs(os.path.join(input_dir, sub_d)) - - n1_feat_path = os.path.join(input_dir, "n1/feat.npy") - with open(n1_feat_path, "wb") as f: - np.save(f, n1_feats) - g.nodes["n1"].data["feat"] = torch.from_numpy(n1_feats) - - n2_feat_path = os.path.join(input_dir, "n2/feat.npy") - with open(n2_feat_path, "wb") as f: - np.save(f, n2_feats) - g.nodes["n2"].data["feat"] = torch.from_numpy(n2_feats) - - n3_feat_path = os.path.join(input_dir, "n3/feat.npy") - with open(n3_feat_path, "wb") as f: - np.save(f, n3_feats) - g.nodes["n3"].data["feat"] = torch.from_numpy(n3_feats) - - n1_label_path = os.path.join(input_dir, "n1/label.npy") - with open(n1_label_path, "wb") as f: - np.save(f, n1_label) - g.nodes["n1"].data["label"] = torch.from_numpy(n1_label) - - n2_label_path = os.path.join(input_dir, "n2/label.npy") - with open(n2_label_path, "wb") as f: - np.save(f, n2_label) - g.nodes["n2"].data["label"] = torch.from_numpy(n2_label) - - n3_label_path = os.path.join(input_dir, "n3/label.npy") - with open(n3_label_path, "wb") as f: - np.save(f, n3_label) - g.nodes["n3"].data["label"] = torch.from_numpy(n3_label) - - a0_feat_path = os.path.join(input_dir, "a0/feat.npy") - with open(a0_feat_path, "wb") as f: - np.save(f, a0_feat) - g.edges["n1", "a0", "n2"].data["feat"] = torch.from_numpy(a0_feat) - - a1_feat_path = os.path.join(input_dir, "a1/feat.npy") - with open(a1_feat_path, "wb") as f: - np.save(f, a1_feat) - g.edges["n1", "a1", "n3"].data["feat"] = torch.from_numpy(a1_feat) - - node_data={ - "n1": {"feat": n1_feat_path, "label": n1_label_path}, - "n2": {"feat": n2_feat_path, "label": n2_label_path}, - "n3": {"feat": n3_feat_path, "label": n3_label_path}, - } - edge_data={ - ("n1", "a0", "n2"): {"feat": a0_feat_path}, - ("n1", "a1", "n3"): {"feat": a1_feat_path}, - } output_dir = os.path.join(root_dir, "chunked-data") chunk_graph( g, @@ -344,7 +333,7 @@ def _test_pipeline( in_dir = os.path.join(root_dir, "chunked-data") output_dir = os.path.join(root_dir, "parted_data") os.system( - "/opt/conda/envs/pytorch/bin/python tools/partition_algo/random_partition.py " + "python tools/partition_algo/random_partition.py " "--in_dir {} --out_dir {} --num_partitions {}".format( in_dir, output_dir, num_parts ) @@ -363,7 +352,7 @@ def _test_pipeline( for i in range(world_size): f.write(f"127.0.0.{i + 1}\n") - cmd = "/opt/conda/envs/pytorch/bin/python tools/dispatch_data.py" + cmd = "python tools/dispatch_data.py" cmd += f" --in-dir {in_dir}" cmd += f" --partitions-dir {partition_dir}" cmd += f" --out-dir {out_dir}" @@ -377,7 +366,9 @@ def _test_pipeline( # check if verify_partitions.py is used for validation. 
if use_verify_partitions: - cmd = "/opt/conda/envs/pytorch/bin/python tools/verify_partitions.py " + cmd = ( + "/opt/conda/envs/pytorch/bin/python tools/verify_partitions.py " + ) cmd += f" --orig-dataset-dir {in_dir}" cmd += f" --part-graph {out_dir}" cmd += f" --partitions-dir {output_dir}" @@ -483,10 +474,16 @@ def test_pipeline_feature_format(data_fmt): _test_pipeline(4, 4, 4, data_fmt=data_fmt) -def test_partition_hetero_few_nodes( +@pytest.mark.parametrize( + "num_chunks, num_parts, world_size", + [[4, 4, 4], [8, 4, 2], [8, 4, 4], [9, 6, 3], [11, 11, 1], [11, 4, 1]], +) +@pytest.mark.parametrize("few_entity", ["node", "edge"]) +def test_partition_hetero_few_entity( num_chunks, num_parts, world_size, + few_entity, graph_formats=None, data_fmt="numpy", edges_fmt="csv", @@ -500,6 +497,7 @@ def test_partition_hetero_few_nodes( g = create_mini_chunked_dataset( root_dir, num_chunks, + few_entity=few_entity, data_fmt=data_fmt, edges_fmt=edges_fmt, vector_rows=vector_rows, @@ -513,7 +511,7 @@ def test_partition_hetero_few_nodes( in_dir = os.path.join(root_dir, "chunked-data") output_dir = os.path.join(root_dir, "parted_data") os.system( - "/opt/conda/envs/pytorch/bin/python tools/partition_algo/random_partition.py " + "python tools/partition_algo/random_partition.py " "--in_dir {} --out_dir {} --num_partitions {}".format( in_dir, output_dir, num_parts ) @@ -527,7 +525,7 @@ def test_partition_hetero_few_nodes( for i in range(world_size): f.write(f"127.0.0.{i + 1}\n") - cmd = "/opt/conda/envs/pytorch/bin/python tools/dispatch_data.py" + cmd = "python tools/dispatch_data.py" cmd += f" --in-dir {in_dir}" cmd += f" --partitions-dir {partition_dir}" cmd += f" --out-dir {out_dir}" @@ -574,7 +572,3 @@ def test_utils_generate_read_list(): assert np.array_equal(read_list[1], np.array([3, 4, 5])) assert np.array_equal(read_list[2], np.array([6, 7])) assert np.array_equal(read_list[3], np.array([8, 9])) - -if __name__=='__main__': - test_partition_hetero_few_nodes(2,4,4) - # test_pipeline_basics(4,4,4,) \ No newline at end of file diff --git a/tools/distpartitioning/convert_partition.py b/tools/distpartitioning/convert_partition.py index 5013b6d40f20..903b84540914 100644 --- a/tools/distpartitioning/convert_partition.py +++ b/tools/distpartitioning/convert_partition.py @@ -9,6 +9,7 @@ import dgl.graphbolt as gb import numpy as np import torch as th +import torch.distributed as dist from dgl import EID, ETYPE, NID, NTYPE from dgl.distributed.constants import DGL2GB_EID, GB_DST_ID @@ -355,6 +356,33 @@ def _process_partition_gb( return indptr, indices[sorted_idx], edge_ids[sorted_idx] +def update_node_map(node_map_val, end_ids_per_rank, id_ntypes, prev_last_id): + # Update the node_map_val to be contiguous. 
+ rank = dist.get_rank() + prev_end_id = ( + end_ids_per_rank[rank - 1].item() if rank > 0 else prev_last_id + ) + ntype_ids = {ntype: ntype_id for ntype_id, ntype in enumerate(id_ntypes)} + for ntype_id in list(ntype_ids.values()): + ntype = id_ntypes[ntype_id] + start_id = node_map_val[ntype][0][0] + end_id = node_map_val[ntype][0][1] + if not (start_id == -1 and end_id == -1): + continue + prev_ntype_id = ( + ntype_ids[ntype] - 1 + if ntype_ids[ntype] > 0 + else max(ntype_ids.values()) + ) + prev_ntype = id_ntypes[prev_ntype_id] + if ntype_ids[ntype] == 0: + node_map_val[ntype][0][0] = prev_end_id + else: + node_map_val[ntype][0][0] = node_map_val[prev_ntype][0][1] + node_map_val[ntype][0][1] = node_map_val[ntype][0][0] + return node_map_val[ntype][0][-1] + + def create_graph_object( tot_node_count, tot_edge_count, @@ -368,6 +396,7 @@ def create_graph_object( edgeid_offset, node_typecounts, edge_typecounts, + last_ids={}, return_orig_nids=False, return_orig_eids=False, use_graphbolt=False, @@ -512,12 +541,30 @@ def create_graph_object( shuffle_global_nid_range = (shuffle_global_nids[0], shuffle_global_nids[-1]) # Determine the node ID ranges of different node types. + prev_last_id = last_ids.get(part_id - 1, 0) for ntype_name in global_nid_ranges: ntype_id = ntypes_map[ntype_name] type_nids = shuffle_global_nids[ntype_ids == ntype_id] - node_map_val[ntype_name].append( - [int(type_nids[0]), int(type_nids[-1]) + 1] - ) + if len(type_nids) == 0: + node_map_val[ntype_name].append([-1, -1]) + else: + node_map_val[ntype_name].append( + [int(type_nids[0]), int(type_nids[-1]) + 1] + ) + last_id = th.tensor( + [max(prev_last_id, int(type_nids[-1]) + 1)], dtype=th.int64 + ) + id_ntypes = list(global_nid_ranges.keys()) + + gather_last_ids = [ + th.zeros(1, dtype=th.int64) for _ in range(dist.get_world_size()) + ] + + dist.all_gather(gather_last_ids, last_id) + prev_last_id = update_node_map( + node_map_val, gather_last_ids, id_ntypes, prev_last_id + ) + last_ids[part_id] = prev_last_id # process edges memory_snapshot("CreateDGLObj_AssignEdgeData: ", part_id) diff --git a/tools/distpartitioning/data_shuffle.py b/tools/distpartitioning/data_shuffle.py index 6800064a2b0b..fc4ee0283069 100644 --- a/tools/distpartitioning/data_shuffle.py +++ b/tools/distpartitioning/data_shuffle.py @@ -285,21 +285,21 @@ def exchange_edge_data(rank, world_size, num_parts, edge_data, id_lookup): local_etype_ids.append(rcvd_edge_data[:, 3]) local_eids.append(rcvd_edge_data[:, 4]) - edge_data[ - constants.GLOBAL_SRC_ID + "/" + str(local_part_id) - ] = np.concatenate(local_src_ids) - edge_data[ - constants.GLOBAL_DST_ID + "/" + str(local_part_id) - ] = np.concatenate(local_dst_ids) - edge_data[ - constants.GLOBAL_TYPE_EID + "/" + str(local_part_id) - ] = np.concatenate(local_type_eids) - edge_data[ - constants.ETYPE_ID + "/" + str(local_part_id) - ] = np.concatenate(local_etype_ids) - edge_data[ - constants.GLOBAL_EID + "/" + str(local_part_id) - ] = np.concatenate(local_eids) + edge_data[constants.GLOBAL_SRC_ID + "/" + str(local_part_id)] = ( + np.concatenate(local_src_ids) + ) + edge_data[constants.GLOBAL_DST_ID + "/" + str(local_part_id)] = ( + np.concatenate(local_dst_ids) + ) + edge_data[constants.GLOBAL_TYPE_EID + "/" + str(local_part_id)] = ( + np.concatenate(local_type_eids) + ) + edge_data[constants.ETYPE_ID + "/" + str(local_part_id)] = ( + np.concatenate(local_etype_ids) + ) + edge_data[constants.GLOBAL_EID + "/" + str(local_part_id)] = ( + np.concatenate(local_eids) + ) # Check if the data was exchanged 
correctly local_edge_count = 0 @@ -489,6 +489,10 @@ def exchange_feature( feat_dims_dtype.append(DATA_TYPE_ID[torch.float32]) feature_dimension = 0 + feature_dimension_tensor = torch.tensor([feature_dimension]) + dist.all_reduce(feature_dimension_tensor, op=dist.ReduceOp.MAX) + feature_dimension = feature_dimension_tensor.item() + logging.debug(f"Sending the feature shape information - {feat_dims_dtype}") all_dims_dtype = allgather_sizes( feat_dims_dtype, world_size, num_parts, return_sizes=True @@ -553,7 +557,11 @@ def exchange_feature( else: cur_features[local_feat_key] = output_feat_list cur_global_ids[local_feat_key] = output_id_list - + else: + cur_features[local_feat_key] = torch.empty( + (0, feature_dimension), dtype=torch.float32 + ) + cur_global_ids[local_feat_key] = torch.empty((0,), dtype=torch.int64) return cur_features, cur_global_ids @@ -1301,6 +1309,7 @@ def prepare_local_data(src_data, local_part_id): if params.graph_formats: graph_formats = params.graph_formats.split(",") + prev_last_ids = {} for local_part_id in range(params.num_parts // world_size): # Synchronize for each local partition of the graph object. dist.barrier() @@ -1340,6 +1349,7 @@ def prepare_local_data(src_data, local_part_id): schema_map[constants.STR_NUM_NODES_PER_TYPE], ), edge_typecounts, + prev_last_ids, return_orig_nids=params.save_orig_nids, return_orig_eids=params.save_orig_eids, use_graphbolt=params.use_graphbolt, @@ -1390,6 +1400,19 @@ def prepare_local_data(src_data, local_part_id): ] = json_metadata memory_snapshot("MetadataCreateComplete: ", rank) + last_id_tensor = torch.tensor( + [prev_last_ids[rank + (local_part_id * world_size)]], + dtype=torch.int64, + ) + gather_list = [ + torch.zeros(1, dtype=torch.int64) for _ in range(world_size) + ] + dist.all_gather(gather_list, last_id_tensor) + for rank_id, last_id in enumerate(gather_list): + prev_last_ids[rank_id + (local_part_id * world_size)] = ( + last_id.item() + ) + if rank == 0: # get meta-data from all partitions and merge them on rank-0 metadata_list = gather_metadata_json(output_meta_json, rank, world_size) diff --git a/tools/distpartitioning/dataset_utils.py b/tools/distpartitioning/dataset_utils.py index 95ba386853aa..f6b542f34b53 100644 --- a/tools/distpartitioning/dataset_utils.py +++ b/tools/distpartitioning/dataset_utils.py @@ -547,18 +547,21 @@ def get_dataset( autogenerate_column_names=True, ) parse_options = pyarrow.csv.ParseOptions(delimiter=" ") - with pyarrow.csv.open_csv( - edge_file, - read_options=read_options, - parse_options=parse_options, - ) as reader: - for next_chunk in reader: - if next_chunk is None: - break - - next_table = pyarrow.Table.from_batches([next_chunk]) - src_ids.append(next_table["f0"].to_numpy()) - dst_ids.append(next_table["f1"].to_numpy()) + if os.path.getsize(edge_file) != 0: + with pyarrow.csv.open_csv( + edge_file, + read_options=read_options, + parse_options=parse_options, + ) as reader: + for next_chunk in reader: + if next_chunk is None: + break + + next_table = pyarrow.Table.from_batches( + [next_chunk] + ) + src_ids.append(next_table["f0"].to_numpy()) + dst_ids.append(next_table["f1"].to_numpy()) elif ( etype_info[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_PARQUET From 83b37e01e77c3b0e12b3e14e60191aa2290c2b52 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 14 Oct 2024 03:21:15 +0000 Subject: [PATCH 04/17] fix format --- python/dgl/distributed/partition.py | 4 ++- tools/distpartitioning/data_shuffle.py | 36 +++++++++++++------------- tools/distpartitioning/utils.py | 2 -- 3 
files changed, 21 insertions(+), 21 deletions(-) diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py index 2c2f40e9b17b..42ace8a7f4d7 100644 --- a/python/dgl/distributed/partition.py +++ b/python/dgl/distributed/partition.py @@ -1268,7 +1268,9 @@ def get_homogeneous(g, balance_ntypes): int(F.as_scalar(inner_nids[-1])) + 1, ] ) - val = np.cumsum(val).tolist()# note computing the cumulative sum of array elements. + val = np.cumsum( + val + ).tolist()# note computing the cumulative sum of array elements. assert val[-1] == g.num_nodes(ntype) for etype in g.canonical_etypes: etype_id = g.get_etype_id(etype) diff --git a/tools/distpartitioning/data_shuffle.py b/tools/distpartitioning/data_shuffle.py index fc4ee0283069..6aba4fb59b3a 100644 --- a/tools/distpartitioning/data_shuffle.py +++ b/tools/distpartitioning/data_shuffle.py @@ -285,21 +285,21 @@ def exchange_edge_data(rank, world_size, num_parts, edge_data, id_lookup): local_etype_ids.append(rcvd_edge_data[:, 3]) local_eids.append(rcvd_edge_data[:, 4]) - edge_data[constants.GLOBAL_SRC_ID + "/" + str(local_part_id)] = ( - np.concatenate(local_src_ids) - ) - edge_data[constants.GLOBAL_DST_ID + "/" + str(local_part_id)] = ( - np.concatenate(local_dst_ids) - ) - edge_data[constants.GLOBAL_TYPE_EID + "/" + str(local_part_id)] = ( - np.concatenate(local_type_eids) - ) - edge_data[constants.ETYPE_ID + "/" + str(local_part_id)] = ( - np.concatenate(local_etype_ids) - ) - edge_data[constants.GLOBAL_EID + "/" + str(local_part_id)] = ( - np.concatenate(local_eids) - ) + edge_data[ + constants.GLOBAL_SRC_ID + "/" + str(local_part_id) + ] = np.concatenate(local_src_ids) + edge_data[ + constants.GLOBAL_DST_ID + "/" + str(local_part_id) + ] = np.concatenate(local_dst_ids) + edge_data[ + constants.GLOBAL_TYPE_EID + "/" + str(local_part_id) + ] = np.concatenate(local_type_eids) + edge_data[ + constants.ETYPE_ID + "/" + str(local_part_id) + ] = np.concatenate(local_etype_ids) + edge_data[ + constants.GLOBAL_EID + "/" + str(local_part_id) + ] = np.concatenate(local_eids) # Check if the data was exchanged correctly local_edge_count = 0 @@ -1409,9 +1409,9 @@ def prepare_local_data(src_data, local_part_id): ] dist.all_gather(gather_list, last_id_tensor) for rank_id, last_id in enumerate(gather_list): - prev_last_ids[rank_id + (local_part_id * world_size)] = ( - last_id.item() - ) + prev_last_ids[ + rank_id + (local_part_id * world_size) + ] = last_id.item() if rank == 0: # get meta-data from all partitions and merge them on rank-0 diff --git a/tools/distpartitioning/utils.py b/tools/distpartitioning/utils.py index b1411be27548..32292a843bc5 100644 --- a/tools/distpartitioning/utils.py +++ b/tools/distpartitioning/utils.py @@ -3,8 +3,6 @@ import os from itertools import cycle -import sys -sys.path.append('/home/ubuntu/workspace/dgl/tools/distpartitioning') import constants import dgl From 0e9cf4c80969964406520a670164f4e43180c11c Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 14 Oct 2024 03:23:23 +0000 Subject: [PATCH 05/17] fix format --- tests/tools/test_dist_part.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tests/tools/test_dist_part.py b/tests/tools/test_dist_part.py index c4d2eeed3fac..763019a40d92 100644 --- a/tests/tools/test_dist_part.py +++ b/tests/tools/test_dist_part.py @@ -333,7 +333,7 @@ def _test_pipeline( in_dir = os.path.join(root_dir, "chunked-data") output_dir = os.path.join(root_dir, "parted_data") os.system( - "python tools/partition_algo/random_partition.py " + "python3 
tools/partition_algo/random_partition.py " "--in_dir {} --out_dir {} --num_partitions {}".format( in_dir, output_dir, num_parts ) @@ -352,7 +352,7 @@ def _test_pipeline( for i in range(world_size): f.write(f"127.0.0.{i + 1}\n") - cmd = "python tools/dispatch_data.py" + cmd = "python3 tools/dispatch_data.py" cmd += f" --in-dir {in_dir}" cmd += f" --partitions-dir {partition_dir}" cmd += f" --out-dir {out_dir}" @@ -366,9 +366,7 @@ def _test_pipeline( # check if verify_partitions.py is used for validation. if use_verify_partitions: - cmd = ( - "/opt/conda/envs/pytorch/bin/python tools/verify_partitions.py " - ) + cmd = "python3 tools/verify_partitions.py " cmd += f" --orig-dataset-dir {in_dir}" cmd += f" --part-graph {out_dir}" cmd += f" --partitions-dir {output_dir}" @@ -511,7 +509,7 @@ def test_partition_hetero_few_entity( in_dir = os.path.join(root_dir, "chunked-data") output_dir = os.path.join(root_dir, "parted_data") os.system( - "python tools/partition_algo/random_partition.py " + "python3 tools/partition_algo/random_partition.py " "--in_dir {} --out_dir {} --num_partitions {}".format( in_dir, output_dir, num_parts ) @@ -525,7 +523,7 @@ def test_partition_hetero_few_entity( for i in range(world_size): f.write(f"127.0.0.{i + 1}\n") - cmd = "python tools/dispatch_data.py" + cmd = "python3 tools/dispatch_data.py" cmd += f" --in-dir {in_dir}" cmd += f" --partitions-dir {partition_dir}" cmd += f" --out-dir {out_dir}" From beab715313d6fda8efe76245316966cce8135066 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 14 Oct 2024 03:26:11 +0000 Subject: [PATCH 06/17] change format --- python/dgl/distributed/partition.py | 2 +- tools/distpartitioning/data_shuffle.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py index 42ace8a7f4d7..0bf6deaaba49 100644 --- a/python/dgl/distributed/partition.py +++ b/python/dgl/distributed/partition.py @@ -1270,7 +1270,7 @@ def get_homogeneous(g, balance_ntypes): ) val = np.cumsum( val - ).tolist()# note computing the cumulative sum of array elements. + ).tolist() # note computing the cumulative sum of array elements. assert val[-1] == g.num_nodes(ntype) for etype in g.canonical_etypes: etype_id = g.get_etype_id(etype) diff --git a/tools/distpartitioning/data_shuffle.py b/tools/distpartitioning/data_shuffle.py index 6aba4fb59b3a..daae0fcbb3fd 100644 --- a/tools/distpartitioning/data_shuffle.py +++ b/tools/distpartitioning/data_shuffle.py @@ -1411,7 +1411,7 @@ def prepare_local_data(src_data, local_part_id): for rank_id, last_id in enumerate(gather_list): prev_last_ids[ rank_id + (local_part_id * world_size) - ] = last_id.item() + ] = last_id.item() if rank == 0: # get meta-data from all partitions and merge them on rank-0 From 43df3cab716571e3513b64b4e3f201557593dd5b Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 14 Oct 2024 12:03:23 +0000 Subject: [PATCH 07/17] add comment --- python/dgl/distributed/partition.py | 4 +--- tools/distpartitioning/dataset_utils.py | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/dgl/distributed/partition.py b/python/dgl/distributed/partition.py index 0bf6deaaba49..48005ffb4d27 100644 --- a/python/dgl/distributed/partition.py +++ b/python/dgl/distributed/partition.py @@ -1268,9 +1268,7 @@ def get_homogeneous(g, balance_ntypes): int(F.as_scalar(inner_nids[-1])) + 1, ] ) - val = np.cumsum( - val - ).tolist() # note computing the cumulative sum of array elements. 
+ val = np.cumsum(val).tolist() assert val[-1] == g.num_nodes(ntype) for etype in g.canonical_etypes: etype_id = g.get_etype_id(etype) diff --git a/tools/distpartitioning/dataset_utils.py b/tools/distpartitioning/dataset_utils.py index f6b542f34b53..8567988f192f 100644 --- a/tools/distpartitioning/dataset_utils.py +++ b/tools/distpartitioning/dataset_utils.py @@ -547,6 +547,8 @@ def get_dataset( autogenerate_column_names=True, ) parse_options = pyarrow.csv.ParseOptions(delimiter=" ") + # if getsize() == 0, the file is empty, indicating that the partition doesn't have this attribute. + # The src_ids and dst_ids should remain empty. if os.path.getsize(edge_file) != 0: with pyarrow.csv.open_csv( edge_file, From 94030de4e0e841c247ce15cb4c4e0a8dde4e6113 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 15 Oct 2024 01:38:45 +0000 Subject: [PATCH 08/17] add comment --- tools/distpartitioning/convert_partition.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tools/distpartitioning/convert_partition.py b/tools/distpartitioning/convert_partition.py index 903b84540914..de1e48e14e08 100644 --- a/tools/distpartitioning/convert_partition.py +++ b/tools/distpartitioning/convert_partition.py @@ -352,11 +352,14 @@ def _process_partition_gb( sorted_idx = ( th.repeat_interleave(indptr[:-1], split_size, dim=0) + sorted_idx ) + else: + sorted_idxs=th.arange(len(edge_ids)) return indptr, indices[sorted_idx], edge_ids[sorted_idx] -def update_node_map(node_map_val, end_ids_per_rank, id_ntypes, prev_last_id): +def _update_node_map(node_map_val, end_ids_per_rank, id_ntypes, prev_last_id): + """this function is modified from the function '_update_node_edge_map' in dgl.distributed.partition """ # Update the node_map_val to be contiguous. rank = dist.get_rank() prev_end_id = ( @@ -561,7 +564,7 @@ def create_graph_object( ] dist.all_gather(gather_last_ids, last_id) - prev_last_id = update_node_map( + prev_last_id = _update_node_map( node_map_val, gather_last_ids, id_ntypes, prev_last_id ) last_ids[part_id] = prev_last_id From 41a6b4ae5d72c0d33053f18a0188259f414e88cb Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 15 Oct 2024 01:41:41 +0000 Subject: [PATCH 09/17] change convert_partition.py --- tools/distpartitioning/convert_partition.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tools/distpartitioning/convert_partition.py b/tools/distpartitioning/convert_partition.py index de1e48e14e08..a719793997d1 100644 --- a/tools/distpartitioning/convert_partition.py +++ b/tools/distpartitioning/convert_partition.py @@ -352,8 +352,6 @@ def _process_partition_gb( sorted_idx = ( th.repeat_interleave(indptr[:-1], split_size, dim=0) + sorted_idx ) - else: - sorted_idxs=th.arange(len(edge_ids)) return indptr, indices[sorted_idx], edge_ids[sorted_idx] From 7bf402f23cdb66661aa4ea566d692e3248bb0073 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 15 Oct 2024 01:55:21 +0000 Subject: [PATCH 10/17] change format --- tools/distpartitioning/convert_partition.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/distpartitioning/convert_partition.py b/tools/distpartitioning/convert_partition.py index a719793997d1..d2d70b96e5c5 100644 --- a/tools/distpartitioning/convert_partition.py +++ b/tools/distpartitioning/convert_partition.py @@ -357,7 +357,7 @@ def _process_partition_gb( def _update_node_map(node_map_val, end_ids_per_rank, id_ntypes, prev_last_id): - """this function is modified from the function '_update_node_edge_map' in dgl.distributed.partition """ + """this function is modified 
from the function '_update_node_edge_map' in dgl.distributed.partition""" # Update the node_map_val to be contiguous. rank = dist.get_rank() prev_end_id = ( From 13d2dac44416b598b98a602cb9f878041594c8da Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 15 Oct 2024 10:39:22 +0000 Subject: [PATCH 11/17] change dataset_utils --- tools/distpartitioning/dataset_utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tools/distpartitioning/dataset_utils.py b/tools/distpartitioning/dataset_utils.py index 8567988f192f..c5853d6479d2 100644 --- a/tools/distpartitioning/dataset_utils.py +++ b/tools/distpartitioning/dataset_utils.py @@ -547,9 +547,11 @@ def get_dataset( autogenerate_column_names=True, ) parse_options = pyarrow.csv.ParseOptions(delimiter=" ") + if os.path.getsize(edge_file) == 0: # if getsize() == 0, the file is empty, indicating that the partition doesn't have this attribute. # The src_ids and dst_ids should remain empty. - if os.path.getsize(edge_file) != 0: + continue + else: with pyarrow.csv.open_csv( edge_file, read_options=read_options, From ca656abb00819a011addb27c43848ea549529224 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 15 Oct 2024 10:39:58 +0000 Subject: [PATCH 12/17] change dataset_utils --- tools/distpartitioning/dataset_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/distpartitioning/dataset_utils.py b/tools/distpartitioning/dataset_utils.py index c5853d6479d2..588e3d60dd58 100644 --- a/tools/distpartitioning/dataset_utils.py +++ b/tools/distpartitioning/dataset_utils.py @@ -548,8 +548,8 @@ def get_dataset( ) parse_options = pyarrow.csv.ParseOptions(delimiter=" ") if os.path.getsize(edge_file) == 0: - # if getsize() == 0, the file is empty, indicating that the partition doesn't have this attribute. - # The src_ids and dst_ids should remain empty. + # if getsize() == 0, the file is empty, indicating that the partition doesn't have this attribute. + # The src_ids and dst_ids should remain empty. continue else: with pyarrow.csv.open_csv( From 8f55d30cc34f156813179726f620ebb3024b0a35 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 16 Oct 2024 03:01:59 +0000 Subject: [PATCH 13/17] change dataset_utils --- tools/distpartitioning/dataset_utils.py | 29 ++++++++++++------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/tools/distpartitioning/dataset_utils.py b/tools/distpartitioning/dataset_utils.py index 588e3d60dd58..6ecbf6e89edb 100644 --- a/tools/distpartitioning/dataset_utils.py +++ b/tools/distpartitioning/dataset_utils.py @@ -551,21 +551,20 @@ def get_dataset( # if getsize() == 0, the file is empty, indicating that the partition doesn't have this attribute. # The src_ids and dst_ids should remain empty. 
continue - else: - with pyarrow.csv.open_csv( - edge_file, - read_options=read_options, - parse_options=parse_options, - ) as reader: - for next_chunk in reader: - if next_chunk is None: - break - - next_table = pyarrow.Table.from_batches( - [next_chunk] - ) - src_ids.append(next_table["f0"].to_numpy()) - dst_ids.append(next_table["f1"].to_numpy()) + with pyarrow.csv.open_csv( + edge_file, + read_options=read_options, + parse_options=parse_options, + ) as reader: + for next_chunk in reader: + if next_chunk is None: + break + + next_table = pyarrow.Table.from_batches( + [next_chunk] + ) + src_ids.append(next_table["f0"].to_numpy()) + dst_ids.append(next_table["f1"].to_numpy()) elif ( etype_info[constants.STR_FORMAT][constants.STR_NAME] == constants.STR_PARQUET From d77b911318530d797708a325f057a3c27cf6e863 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 16 Oct 2024 03:02:15 +0000 Subject: [PATCH 14/17] change dataset_utils --- tools/distpartitioning/dataset_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/distpartitioning/dataset_utils.py b/tools/distpartitioning/dataset_utils.py index 6ecbf6e89edb..1ca937f5a3b8 100644 --- a/tools/distpartitioning/dataset_utils.py +++ b/tools/distpartitioning/dataset_utils.py @@ -547,9 +547,9 @@ def get_dataset( autogenerate_column_names=True, ) parse_options = pyarrow.csv.ParseOptions(delimiter=" ") + # if getsize() == 0, the file is empty, indicating that the partition doesn't have this attribute. + # The src_ids and dst_ids should remain empty. if os.path.getsize(edge_file) == 0: - # if getsize() == 0, the file is empty, indicating that the partition doesn't have this attribute. - # The src_ids and dst_ids should remain empty. continue with pyarrow.csv.open_csv( edge_file, From 4573ea209ddbd3a54a5844f4a9f89864fd23919e Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 16 Oct 2024 04:54:04 +0000 Subject: [PATCH 15/17] change format --- tools/distpartitioning/dataset_utils.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tools/distpartitioning/dataset_utils.py b/tools/distpartitioning/dataset_utils.py index 1ca937f5a3b8..b258d13a3093 100644 --- a/tools/distpartitioning/dataset_utils.py +++ b/tools/distpartitioning/dataset_utils.py @@ -560,9 +560,7 @@ def get_dataset( if next_chunk is None: break - next_table = pyarrow.Table.from_batches( - [next_chunk] - ) + next_table = pyarrow.Table.from_batches([next_chunk]) src_ids.append(next_table["f0"].to_numpy()) dst_ids.append(next_table["f1"].to_numpy()) elif ( From 72ecb68e8dc92aac3fd957c369b29eecea9389e7 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 16 Oct 2024 08:01:29 +0000 Subject: [PATCH 16/17] change dataset_utils --- tools/distpartitioning/dataset_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/distpartitioning/dataset_utils.py b/tools/distpartitioning/dataset_utils.py index b258d13a3093..3c7fcfb6b8fd 100644 --- a/tools/distpartitioning/dataset_utils.py +++ b/tools/distpartitioning/dataset_utils.py @@ -547,6 +547,7 @@ def get_dataset( autogenerate_column_names=True, ) parse_options = pyarrow.csv.ParseOptions(delimiter=" ") + # if getsize() == 0, the file is empty, indicating that the partition doesn't have this attribute. # The src_ids and dst_ids should remain empty. 
if os.path.getsize(edge_file) == 0: From b47898fe53b089aa63850a1b6a40755e89d8936d Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Thu, 17 Oct 2024 01:01:04 +0000 Subject: [PATCH 17/17] change dataset_utils --- tools/distpartitioning/dataset_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/distpartitioning/dataset_utils.py b/tools/distpartitioning/dataset_utils.py index 3c7fcfb6b8fd..fcff3cd74302 100644 --- a/tools/distpartitioning/dataset_utils.py +++ b/tools/distpartitioning/dataset_utils.py @@ -548,9 +548,9 @@ def get_dataset( ) parse_options = pyarrow.csv.ParseOptions(delimiter=" ") - # if getsize() == 0, the file is empty, indicating that the partition doesn't have this attribute. - # The src_ids and dst_ids should remain empty. if os.path.getsize(edge_file) == 0: + # if getsize() == 0, the file is empty, indicating that the partition doesn't have this attribute. + # The src_ids and dst_ids should remain empty. continue with pyarrow.csv.open_csv( edge_file,
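
Note on the few-entity fix (patches 03-08): when a node type ends up with zero nodes in a partition, convert_partition.py records its range as [-1, -1] in node_map_val, and _update_node_map later rewrites those placeholders so that per-type node-ID ranges stay contiguous across partitions. Below is a minimal, self-contained sketch of that idea only; fill_empty_ranges and the sample node_map are illustrative, not part of the DGL API.

    def fill_empty_ranges(node_map, id_ntypes, prev_last_id):
        # node_map: ntype -> [start, end) for this partition; [-1, -1] marks a
        # type with zero nodes here.  id_ntypes: node types in type-ID order.
        # prev_last_id: last node ID assigned by the previous partition.
        last_id = prev_last_id
        for i, ntype in enumerate(id_ntypes):
            start, end = node_map[ntype]
            if start == -1 and end == -1:
                # Empty type: give it a zero-length range that starts where the
                # previous type (or the previous partition) left off.
                anchor = node_map[id_ntypes[i - 1]][1] if i > 0 else prev_last_id
                node_map[ntype] = [anchor, anchor]
            last_id = max(last_id, node_map[ntype][1])
        return last_id

    # Example: "n_few" has no nodes in this partition.
    node_map = {"n1": [0, 500], "n2": [500, 900], "n_few": [-1, -1]}
    print(fill_empty_ranges(node_map, ["n1", "n2", "n_few"], prev_last_id=0))
    # -> 900; node_map["n_few"] is now [900, 900]

The real implementation additionally all_gathers each rank's last assigned ID (and all_reduces the feature width in exchange_feature) so that a rank whose partition lacks a given type can still start its ranges where the previous rank ended and allocate correctly shaped empty feature tensors.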