From 8aa1d75728667ec2cd61f9db54e2e220c80d57e7 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Fri, 15 Nov 2024 14:52:21 +0000 Subject: [PATCH 1/7] update --- benchmark/efficiency.py | 286 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 286 insertions(+) create mode 100644 benchmark/efficiency.py diff --git a/benchmark/efficiency.py b/benchmark/efficiency.py new file mode 100644 index 0000000..5bb19a7 --- /dev/null +++ b/benchmark/efficiency.py @@ -0,0 +1,286 @@ +"""Example script to run the models in this repository. + +python relbench_example.py --dataset rel-trial --task site-sponsor-run + --model contextgnn --epochs 10 +""" + +import argparse +import json +import os +import warnings +from pathlib import Path +from typing import Dict, List, Tuple, Union + +import numpy as np +import torch +import torch.nn.functional as F +from relbench.base import Dataset, RecommendationTask, TaskType +from relbench.datasets import get_dataset +from relbench.modeling.graph import ( + get_link_train_table_input, + make_pkey_fkey_graph, +) +from relbench.modeling.loader import SparseTensor +from relbench.modeling.utils import get_stype_proposal +from relbench.tasks import get_task +from torch import Tensor +from torch_frame import stype +from torch_frame.config.text_embedder import TextEmbedderConfig +from torch_geometric.loader import NeighborLoader +from torch_geometric.seed import seed_everything +from torch_geometric.typing import NodeType +from torch_geometric.utils.cross_entropy import sparse_cross_entropy +from tqdm import tqdm + +from contextgnn.nn.models import IDGNN, ContextGNN, ShallowRHSGNN +from contextgnn.utils import GloveTextEmbedding, RHSEmbeddingMode + +warnings.filterwarnings("ignore", category=FutureWarning) + + +parser = argparse.ArgumentParser() +parser.add_argument("--dataset", type=str, default="rel-trial") +parser.add_argument("--task", type=str, default="site-sponsor-run") +parser.add_argument("--lr", type=float, default=0.001) +parser.add_argument("--epochs", type=int, default=20) +parser.add_argument("--eval_epochs_interval", type=int, default=1) +parser.add_argument("--batch_size", type=int, default=512) +parser.add_argument("--channels", type=int, default=128) +parser.add_argument("--aggr", type=str, default="sum") +parser.add_argument("--num_layers", type=int, default=4) +parser.add_argument("--num_neighbors", type=int, default=128) +parser.add_argument("--temporal_strategy", type=str, default="last") +parser.add_argument("--max_steps_per_epoch", type=int, default=2000) +parser.add_argument("--num_workers", type=int, default=0) +parser.add_argument("--seed", type=int, default=42) +parser.add_argument("--cache_dir", type=str, + default=os.path.expanduser("~/.cache/relbench_examples")) +args = parser.parse_args() + + +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") +if torch.cuda.is_available(): + torch.set_num_threads(1) +seed_everything(args.seed) + +print("warming up...") +dataset: Dataset = get_dataset(args.dataset, download=True) +task: RecommendationTask = get_task(args.dataset, args.task, download=True) +tune_metric = "link_prediction_map" +assert task.task_type == TaskType.LINK_PREDICTION + +stypes_cache_path = Path(f"{args.cache_dir}/{args.dataset}/stypes.json") +try: + with open(stypes_cache_path, "r") as f: + col_to_stype_dict = json.load(f) + for table, col_to_stype in col_to_stype_dict.items(): + for col, stype_str in col_to_stype.items(): + col_to_stype[col] = stype(stype_str) +except FileNotFoundError: + col_to_stype_dict = 
get_stype_proposal(dataset.get_db()) + Path(stypes_cache_path).parent.mkdir(parents=True, exist_ok=True) + with open(stypes_cache_path, "w") as f: + json.dump(col_to_stype_dict, f, indent=2, default=str) + +data, col_stats_dict = make_pkey_fkey_graph( + dataset.get_db(), + col_to_stype_dict=col_to_stype_dict, + text_embedder_cfg=TextEmbedderConfig( + text_embedder=GloveTextEmbedding(device=device), batch_size=256), + cache_dir=f"{args.cache_dir}/{args.dataset}/materialized", +) +num_neighbors = [ + int(args.num_neighbors // 2**i) for i in range(args.num_layers) +] +loader_dict: Dict[str, NeighborLoader] = {} +dst_nodes_dict: Dict[str, Tuple[NodeType, Tensor]] = {} +num_dst_nodes_dict: Dict[str, int] = {} +for split in ["train", "val", "test"]: + table = task.get_table(split) + table_input = get_link_train_table_input(table, task) + dst_nodes_dict[split] = table_input.dst_nodes + num_dst_nodes_dict[split] = table_input.num_dst_nodes + loader_dict[split] = NeighborLoader( + data, + num_neighbors=num_neighbors, + time_attr="time", + input_nodes=table_input.src_nodes, + input_time=table_input.src_time, + subgraph_type="bidirectional", + batch_size=args.batch_size, + temporal_strategy=args.temporal_strategy, + shuffle=split == "train", + num_workers=args.num_workers, + persistent_workers=args.num_workers > 0, + ) + +num_steps = 1_000 +# num_uniq_batches = 20 +# batches = [] +# for i, batch in enumerate(loader_dict["train"]): +# if i == num_uniq_batches: +# break +# batches.append(batch) + + +def create_model(model_type: str) -> Union[IDGNN, ContextGNN, ShallowRHSGNN]: + if model_type == "idgnn": + return IDGNN( + data=data, + col_stats_dict=col_stats_dict, + num_layers=args.num_layers, + channels=args.channels, + out_channels=1, + aggr=args.aggr, + norm="layer_norm", + torch_frame_model_kwargs={ + "channels": 64, + "num_layers": 4, + }, + ).to(device) + elif model_type == "contextgnn": + return ContextGNN( + data=data, + col_stats_dict=col_stats_dict, + rhs_emb_mode=RHSEmbeddingMode.FUSION, + dst_entity_table=task.dst_entity_table, + num_nodes=num_dst_nodes_dict["train"], + num_layers=args.num_layers, + channels=args.channels, + aggr="sum", + norm="layer_norm", + embedding_dim=64, + torch_frame_model_kwargs={ + "channels": 64, + "num_layers": 4, + }, + ).to(device) + elif model_type == 'shallowrhsgnn': + return ShallowRHSGNN( + data=data, + col_stats_dict=col_stats_dict, + rhs_emb_mode=RHSEmbeddingMode.FUSION, + dst_entity_table=task.dst_entity_table, + num_nodes=num_dst_nodes_dict["train"], + num_layers=args.num_layers, + channels=args.channels, + aggr="sum", + norm="layer_norm", + embedding_dim=64, + torch_frame_model_kwargs={ + "channels": 64, + "num_layers": 4, + }, + ).to(device) + raise ValueError(f"Unsupported model type {model_type}.") + + +sparse_tensor = SparseTensor(dst_nodes_dict["train"][1], device=device) +for model_type in ["contextgnn", "idgnn", "shallowrhsgnn"]: + model = create_model(model_type) + optimizer = torch.optim.Adam(model.parameters(), lr=args.lr) + + def train() -> float: + model.train() + + print("warming up...") + # for i, batch in batches[:10]: # warmup + for i, batch in enumerate(loader_dict["train"]): + batch = batch.to(device) + input_id = batch[task.src_entity_table].input_id + src_batch, dst_index = sparse_tensor[input_id] + optimizer.zero_grad() + if model_type == 'idgnn': + out = model(batch, task.src_entity_table, + task.dst_entity_table).flatten() + batch_size = batch[task.src_entity_table].batch_size + target = torch.isin( + 
batch[task.dst_entity_table].batch + + batch_size * batch[task.dst_entity_table].n_id, + src_batch + batch_size * dst_index, + ).float() + loss = F.binary_cross_entropy_with_logits(out, target) + elif model_type in ['contextgnn', 'shallowrhsgnn']: + logits = model(batch, task.src_entity_table, task.dst_entity_table) + edge_label_index = torch.stack([src_batch, dst_index], dim=0) + loss = sparse_cross_entropy(logits, edge_label_index) + + loss.backward() + optimizer.step() + + if i == 9: + break + + print("benchmarking...") + start = torch.cuda.Event(enable_timing=True) + end = torch.cuda.Event(enable_timing=True) + start.record() + # for i in range(num_steps): + # batch = batches[i % len(batches)] + for i, batch in enumerate(loader_dict["train"]): + batch = batch.to(device) + input_id = batch[task.src_entity_table].input_id + src_batch, dst_index = sparse_tensor[input_id] + optimizer.zero_grad() + if model_type == 'idgnn': + out = model(batch, task.src_entity_table, + task.dst_entity_table).flatten() + batch_size = batch[task.src_entity_table].batch_size + target = torch.isin( + batch[task.dst_entity_table].batch + + batch_size * batch[task.dst_entity_table].n_id, + src_batch + batch_size * dst_index, + ).float() + loss = F.binary_cross_entropy_with_logits(out, target) + elif model_type in ['contextgnn', 'shallowrhsgnn']: + logits = model(batch, task.src_entity_table, task.dst_entity_table) + edge_label_index = torch.stack([src_batch, dst_index], dim=0) + loss = sparse_cross_entropy(logits, edge_label_index) + + loss.backward() + optimizer.step() + + if i == num_steps - 1: + print(f"done at {i}th step") + break + + end.record() # type: ignore + torch.cuda.synchronize() + gpu_time = start.elapsed_time(end) + gpu_time_in_s = gpu_time / 1_000 + print( + f"model: {model_type}, ", + f"total: {gpu_time_in_s} s, " + f"avg: {gpu_time_in_s / num_steps} s/iter, " + f"avg: {num_steps / gpu_time_in_s} iter/s") + + train() + + +@torch.no_grad() +def test(loader: NeighborLoader, desc: str) -> np.ndarray: + model.eval() + + pred_list: List[Tensor] = [] + for batch in tqdm(loader, desc=desc): + batch = batch.to(device) + batch_size = batch[task.src_entity_table].batch_size + + if model_type == "idgnn": + out = (model.forward(batch, task.src_entity_table, + task.dst_entity_table).detach().flatten()) + scores = torch.zeros(batch_size, task.num_dst_nodes, + device=out.device) + scores[batch[task.dst_entity_table].batch, + batch[task.dst_entity_table].n_id] = torch.sigmoid(out) + elif model_type in ['contextgnn', 'shallowrhsgnn']: + out = model(batch, task.src_entity_table, + task.dst_entity_table).detach() + scores = torch.sigmoid(out) + else: + raise ValueError(f"Unsupported model type: {model_type}.") + + _, pred_mini = torch.topk(scores, k=task.eval_k, dim=1) + pred_list.append(pred_mini) + pred = torch.cat(pred_list, dim=0).cpu().numpy() + return pred From 8af2cab082f7b8a54da20b2a5cccc721c2a33257 Mon Sep 17 00:00:00 2001 From: Yiwen Yuan Date: Tue, 19 Nov 2024 06:40:01 +0000 Subject: [PATCH 2/7] incorporate rhs sample size --- benchmark/efficiency.py | 1 + 1 file changed, 1 insertion(+) diff --git a/benchmark/efficiency.py b/benchmark/efficiency.py index 5bb19a7..acbacf1 100644 --- a/benchmark/efficiency.py +++ b/benchmark/efficiency.py @@ -44,6 +44,7 @@ parser.add_argument("--lr", type=float, default=0.001) parser.add_argument("--epochs", type=int, default=20) parser.add_argument("--eval_epochs_interval", type=int, default=1) +parser.add_argument("--rhs_sample_size", type=int, default=10) 
parser.add_argument("--batch_size", type=int, default=512) parser.add_argument("--channels", type=int, default=128) parser.add_argument("--aggr", type=str, default="sum") From d91e36509ddcf1270ad612dbeba29d69ad81f543 Mon Sep 17 00:00:00 2001 From: Yiwen Yuan Date: Tue, 19 Nov 2024 06:40:06 +0000 Subject: [PATCH 3/7] incorporate rhs sample size --- benchmark/efficiency.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/benchmark/efficiency.py b/benchmark/efficiency.py index acbacf1..2726c61 100644 --- a/benchmark/efficiency.py +++ b/benchmark/efficiency.py @@ -37,7 +37,6 @@ warnings.filterwarnings("ignore", category=FutureWarning) - parser = argparse.ArgumentParser() parser.add_argument("--dataset", type=str, default="rel-trial") parser.add_argument("--task", type=str, default="site-sponsor-run") @@ -58,7 +57,6 @@ default=os.path.expanduser("~/.cache/relbench_examples")) args = parser.parse_args() - device = torch.device("cuda" if torch.cuda.is_available() else "cpu") if torch.cuda.is_available(): torch.set_num_threads(1) @@ -202,7 +200,8 @@ def train() -> float: ).float() loss = F.binary_cross_entropy_with_logits(out, target) elif model_type in ['contextgnn', 'shallowrhsgnn']: - logits = model(batch, task.src_entity_table, task.dst_entity_table) + logits = model(batch, task.src_entity_table, + task.dst_entity_table) edge_label_index = torch.stack([src_batch, dst_index], dim=0) loss = sparse_cross_entropy(logits, edge_label_index) @@ -234,7 +233,8 @@ def train() -> float: ).float() loss = F.binary_cross_entropy_with_logits(out, target) elif model_type in ['contextgnn', 'shallowrhsgnn']: - logits = model(batch, task.src_entity_table, task.dst_entity_table) + logits = model(batch, task.src_entity_table, + task.dst_entity_table) edge_label_index = torch.stack([src_batch, dst_index], dim=0) loss = sparse_cross_entropy(logits, edge_label_index) @@ -250,8 +250,7 @@ def train() -> float: gpu_time = start.elapsed_time(end) gpu_time_in_s = gpu_time / 1_000 print( - f"model: {model_type}, ", - f"total: {gpu_time_in_s} s, " + f"model: {model_type}, ", f"total: {gpu_time_in_s} s, " f"avg: {gpu_time_in_s / num_steps} s/iter, " f"avg: {num_steps / gpu_time_in_s} iter/s") From 3fd460efac018c56eb6c37bc8a5cffd9be1ffc49 Mon Sep 17 00:00:00 2001 From: Yiwen Yuan Date: Tue, 19 Nov 2024 06:55:16 +0000 Subject: [PATCH 4/7] fix efficiency --- benchmark/efficiency.py | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/benchmark/efficiency.py b/benchmark/efficiency.py index 2726c61..b3eea0e 100644 --- a/benchmark/efficiency.py +++ b/benchmark/efficiency.py @@ -43,7 +43,7 @@ parser.add_argument("--lr", type=float, default=0.001) parser.add_argument("--epochs", type=int, default=20) parser.add_argument("--eval_epochs_interval", type=int, default=1) -parser.add_argument("--rhs_sample_size", type=int, default=10) +parser.add_argument("--rhs_sample_size", type=int, default=-1) parser.add_argument("--batch_size", type=int, default=512) parser.add_argument("--channels", type=int, default=128) parser.add_argument("--aggr", type=str, default="sum") @@ -141,7 +141,7 @@ def create_model(model_type: str) -> Union[IDGNN, ContextGNN, ShallowRHSGNN]: return ContextGNN( data=data, col_stats_dict=col_stats_dict, - rhs_emb_mode=RHSEmbeddingMode.FUSION, + rhs_emb_mode=RHSEmbeddingMode.LOOKUP, dst_entity_table=task.dst_entity_table, num_nodes=num_dst_nodes_dict["train"], num_layers=args.num_layers, @@ -153,6 +153,8 @@ def create_model(model_type: 
str) -> Union[IDGNN, ContextGNN, ShallowRHSGNN]: "channels": 64, "num_layers": 4, }, + rhs_sample_size=None + if args.rhs_sample_size < 0 else args.rhs_sample_size, ).to(device) elif model_type == 'shallowrhsgnn': return ShallowRHSGNN( @@ -179,7 +181,7 @@ def create_model(model_type: str) -> Union[IDGNN, ContextGNN, ShallowRHSGNN]: model = create_model(model_type) optimizer = torch.optim.Adam(model.parameters(), lr=args.lr) - def train() -> float: + def train(): model.train() print("warming up...") @@ -200,9 +202,18 @@ def train() -> float: ).float() loss = F.binary_cross_entropy_with_logits(out, target) elif model_type in ['contextgnn', 'shallowrhsgnn']: - logits = model(batch, task.src_entity_table, - task.dst_entity_table) - edge_label_index = torch.stack([src_batch, dst_index], dim=0) + if args.rhs_sample_size < 0: + logits = model(batch, task.src_entity_table, + task.dst_entity_table) + edge_label_index = torch.stack([src_batch, dst_index], + dim=0) + else: + (logits, lhs_y_batch, + rhs_y_index) = model.forward_sample_softmax( + batch, task.src_entity_table, task.dst_entity_table, + src_batch, dst_index) + edge_label_index = torch.stack([lhs_y_batch, rhs_y_index], + dim=0) loss = sparse_cross_entropy(logits, edge_label_index) loss.backward() @@ -233,9 +244,18 @@ def train() -> float: ).float() loss = F.binary_cross_entropy_with_logits(out, target) elif model_type in ['contextgnn', 'shallowrhsgnn']: - logits = model(batch, task.src_entity_table, - task.dst_entity_table) - edge_label_index = torch.stack([src_batch, dst_index], dim=0) + if args.rhs_sample_size < 0: + logits = model(batch, task.src_entity_table, + task.dst_entity_table) + edge_label_index = torch.stack([src_batch, dst_index], + dim=0) + else: + (logits, lhs_y_batch, + rhs_y_index) = model.forward_sample_softmax( + batch, task.src_entity_table, task.dst_entity_table, + src_batch, dst_index) + edge_label_index = torch.stack([lhs_y_batch, rhs_y_index], + dim=0) loss = sparse_cross_entropy(logits, edge_label_index) loss.backward() @@ -245,7 +265,7 @@ def train() -> float: print(f"done at {i}th step") break - end.record() # type: ignore + end.record() torch.cuda.synchronize() gpu_time = start.elapsed_time(end) gpu_time_in_s = gpu_time / 1_000 From 325e5a2ed1ba6a612d9bd56dbcb138d092018b37 Mon Sep 17 00:00:00 2001 From: Yiwen Yuan Date: Tue, 19 Nov 2024 09:42:13 +0000 Subject: [PATCH 5/7] fix --- examples/contextgnn_sample_softmax.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/examples/contextgnn_sample_softmax.py b/examples/contextgnn_sample_softmax.py index fa3f4a8..cb7d291 100644 --- a/examples/contextgnn_sample_softmax.py +++ b/examples/contextgnn_sample_softmax.py @@ -35,19 +35,19 @@ from contextgnn.utils import GloveTextEmbedding, RHSEmbeddingMode parser = argparse.ArgumentParser() -parser.add_argument("--dataset", type=str, default="rel-amazon") -parser.add_argument("--task", type=str, default="user-item-purchase") +parser.add_argument("--dataset", type=str, default="rel-trial") +parser.add_argument("--task", type=str, default="site-sponsor-run") parser.add_argument("--lr", type=float, default=0.001) parser.add_argument("--epochs", type=int, default=20) parser.add_argument("--eval_epochs_interval", type=int, default=1) -parser.add_argument("--batch_size", type=int, default=128) +parser.add_argument("--rhs_sample_size", type=int, default=-1) +parser.add_argument("--batch_size", type=int, default=512) parser.add_argument("--channels", type=int, default=128) parser.add_argument("--aggr", 
type=str, default="sum") -parser.add_argument("--num_layers", type=int, default=6) -parser.add_argument("--num_neighbors", type=int, default=64) -parser.add_argument("--rhs_sample_size", type=int, default=1000) +parser.add_argument("--num_layers", type=int, default=4) +parser.add_argument("--num_neighbors", type=int, default=128) parser.add_argument("--temporal_strategy", type=str, default="last") -parser.add_argument("--max_steps_per_epoch", type=int, default=200) +parser.add_argument("--max_steps_per_epoch", type=int, default=2000) parser.add_argument("--num_workers", type=int, default=0) parser.add_argument("--seed", type=int, default=42) parser.add_argument("--cache_dir", type=str, From c41c63ec56c4b6c4ea23f502a323d6eeedfbc47b Mon Sep 17 00:00:00 2001 From: Yiwen Yuan Date: Thu, 21 Nov 2024 12:13:27 +0000 Subject: [PATCH 6/7] add ngcf benchmark script --- benchmark/ngcf_efficiency.py | 366 +++++++++++++++++++++++++++++++++++ 1 file changed, 366 insertions(+) create mode 100644 benchmark/ngcf_efficiency.py diff --git a/benchmark/ngcf_efficiency.py b/benchmark/ngcf_efficiency.py new file mode 100644 index 0000000..7cfb4ef --- /dev/null +++ b/benchmark/ngcf_efficiency.py @@ -0,0 +1,366 @@ + +"""Example script to run the models in this repository. + +python3 ngcf.py --dataset rel-hm --task user-item-purchase --val_loss +python3 ngcf.py --dataset rel-avito --task user-ad-visit --val_loss +""" + +from __future__ import annotations + +import argparse +import json +import os +from pathlib import Path +from typing import Dict, List, Literal + +import numpy as np +import torch +import torch.nn as nn +import torch.nn.functional as F +from relbench.base import Dataset, RecommendationTask, TaskType +from relbench.datasets import get_dataset +from relbench.modeling.graph import ( + get_link_train_table_input, + make_pkey_fkey_graph, +) +from relbench.modeling.utils import get_stype_proposal +from relbench.tasks import get_task +from torch import Tensor +from torch.utils.data import DataLoader +from torch.utils.tensorboard import SummaryWriter +from torch_frame import stype +from torch_frame.config.text_embedder import TextEmbedderConfig +from torch_geometric.nn.conv.gcn_conv import gcn_norm +from torch_geometric.nn.models.lightgcn import BPRLoss +from torch_geometric.seed import seed_everything +from torch_geometric.typing import SparseTensor +from torch_geometric.utils import add_self_loops, coalesce, to_undirected +from tqdm import tqdm + +from hybridgnn.utils import GloveTextEmbedding + +parser = argparse.ArgumentParser() +parser.add_argument("--dataset", type=str, default="rel-trial") +parser.add_argument("--task", type=str, default="site-sponsor-run") +parser.add_argument("--lr", type=float, default=0.0001) +parser.add_argument("--epochs", type=int, default=10) +parser.add_argument("--eval_epochs_interval", type=int, default=1) +parser.add_argument("--batch_size", type=int, default=1024) +parser.add_argument("--channels", type=int, default=64) +parser.add_argument("--num_layers", type=int, default=3) +parser.add_argument("--seed", type=int, default=42) +parser.add_argument("--max_num_train_edges", type=int, default=3000000) +parser.add_argument("--lambda_reg", type=float, default=1e-4) +parser.add_argument("--node_dropout", type=float, default=0.1) +parser.add_argument("--max_steps_per_epoch", type=int, default=2000) +parser.add_argument("--val_loss", default=False, action="store_true") +parser.add_argument("--cache_dir", type=str, + default=os.path.expanduser("~/.cache/relbench_examples")) 
+args = parser.parse_args() + +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") +if torch.cuda.is_available(): + torch.set_num_threads(1) +seed_everything(args.seed) + + +class NGCF(torch.nn.Module): + def __init__( + self, + num_src_nodes: int, + num_dst_nodes: int, + emb_size: int = 64, + num_layers: int = 3, + node_dropout: float = 0.1, + ): + super(NGCF, self).__init__() + self.num_src_nodes = num_src_nodes + self.num_dst_nodes = num_dst_nodes + self.num_total_nodes = num_src_nodes + num_dst_nodes + self.node_dropout = node_dropout + self.emb = nn.Embedding(num_src_nodes + num_dst_nodes, emb_size) + self.gc_layers = nn.ModuleList() + self.bi_layers = nn.ModuleList() + for _ in range(num_layers): + self.gc_layers.append(nn.Linear(emb_size, emb_size)) + self.bi_layers.append(nn.Linear(emb_size, emb_size)) + self.reset_parameters() + + def reset_parameters(self): + nn.init.xavier_uniform_(self.emb.weight) + for lin in self.gc_layers: + nn.init.xavier_uniform_(lin.weight) + for lin in self.bi_layers: + nn.init.xavier_uniform_(lin.weight) + + def sparse_dropout(self, row: Tensor, col: Tensor, value: Tensor, + rate: float, nnz: int) -> SparseTensor: + rand: Tensor = (1 - rate) + torch.rand(nnz) + assert isinstance(rand, Tensor) + dropout_mask = torch.floor(rand).type(torch.bool) + adj = SparseTensor( + row=row[dropout_mask], + col=col[dropout_mask], + value=value[dropout_mask] * (1. / (1 - rate)), + sparse_sizes=(self.num_total_nodes, self.num_total_nodes), + ) + return adj + + def get_embedding(self, norm_adj: SparseTensor, + device=torch.device) -> Tensor: + ego_emb = self.emb.weight + all_embs: List[Tensor] = [ego_emb] + if self.node_dropout > 0 and self.training: + row, col, value = norm_adj.coo() + adj = self.sparse_dropout( + row, + col, + value, + self.node_dropout, + norm_adj.nnz(), + ) + else: + adj = norm_adj + adj = adj.to(device) + for i in range(len(self.gc_layers)): + msg_emb = adj @ ego_emb + aggr_emb = self.gc_layers[i](msg_emb) + bi_emb = torch.mul(ego_emb, msg_emb) + bi_emb = self.bi_layers[i](bi_emb) + ego_emb = F.leaky_relu(aggr_emb + bi_emb, negative_slope=0.2) + ego_emb = F.dropout(ego_emb) + norm_emb = F.normalize(ego_emb, p=2, dim=1) + all_embs += [norm_emb] + res_embs: Tensor = torch.cat(all_embs, 1) + return res_embs + + def recommendation_loss( + self, + pos_edge_rank: Tensor, + neg_edge_rank: Tensor, + node_id: Tensor | None = None, + lambda_reg: float = 1e-4, + **kwargs, + ) -> Tensor: + loss_fn = BPRLoss(lambda_reg, **kwargs) + emb = self.emb.weight + emb = emb if node_id is None else emb[node_id] + return loss_fn(pos_edge_rank, neg_edge_rank, emb) + + def forward(self, edge_label_index: Tensor, norm_adj: Tensor, + device: torch.device) -> Tensor: + all_embs = self.get_embedding(norm_adj, device) + out_src = all_embs[edge_label_index[0]] + out_dst = all_embs[edge_label_index[1]] + return (out_src * out_dst).sum(dim=-1) + + +dataset: Dataset = get_dataset(args.dataset, download=True) +task: RecommendationTask = get_task(args.dataset, args.task, download=True) +tune_metric = "link_prediction_map" +assert task.task_type == TaskType.LINK_PREDICTION + +stypes_cache_path = Path(f"{args.cache_dir}/{args.dataset}/stypes.json") +try: + with open(stypes_cache_path, "r") as f: + col_to_stype_dict = json.load(f) + for table, col_to_stype in col_to_stype_dict.items(): + for col, stype_str in col_to_stype.items(): + col_to_stype[col] = stype(stype_str) +except FileNotFoundError: + col_to_stype_dict = get_stype_proposal(dataset.get_db()) + 
Path(stypes_cache_path).parent.mkdir(parents=True, exist_ok=True) + with open(stypes_cache_path, "w") as f: + json.dump(col_to_stype_dict, f, indent=2, default=str) + +_ = make_pkey_fkey_graph( + dataset.get_db(), + col_to_stype_dict=col_to_stype_dict, + text_embedder_cfg=TextEmbedderConfig( + text_embedder=GloveTextEmbedding(device=device), batch_size=256), + cache_dir=f"{args.cache_dir}/{args.dataset}/materialized", +) + +num_src_nodes = task.num_src_nodes +num_dst_nodes = task.num_dst_nodes +num_total_nodes = num_src_nodes + num_dst_nodes + +split_edge_index_dict: Dict[str, Tensor] = {} +split_edge_weight_dict: Dict[str, Tensor] = {} +n_id_dict: Dict[str, Tensor] = {} +for split in ["train", "val", "test"]: + table = task.get_table(split) + table_input = get_link_train_table_input(table, task) + + # Get n_id for each split ################################################# + src_entities = torch.from_numpy(table.df[task.src_entity_col].to_numpy()) + # Only validation and test need the source entities for prediction + if split != "train": + n_id_dict[split] = src_entities + + # Get message passing edge_index for each split ########################### + dst_csr = table_input.dst_nodes[1] + # Compute counts per row from the CSR matrix + counts_per_row = dst_csr.crow_indices()[1:] - dst_csr.crow_indices()[:-1] + # Get source nodes using row indices + src = table_input.src_nodes[1].repeat_interleave(counts_per_row) + # Get edge_index using src and column indices + edge_index = torch.stack([src, dst_csr.col_indices()], dim=0) + # Convert to bipartite graph + edge_index[1, :] += num_src_nodes + # Remove duplicated edges but use edge weight for message passing + edge_weight = torch.ones(edge_index.size(1)).to(edge_index.device) + edge_index, edge_weight = coalesce(edge_index, edge_attr=edge_weight, + num_nodes=num_total_nodes) + split_edge_index_dict[split] = edge_index + split_edge_weight_dict[split] = edge_weight + +model = NGCF(num_src_nodes, num_dst_nodes, emb_size=args.channels, + num_layers=args.num_layers, + node_dropout=args.node_dropout).to(device) +loss_fn = BPRLoss(lambda_reg=args.lambda_reg) + +train_edge_index = split_edge_index_dict["train"].to("cpu") +train_edge_weight = split_edge_weight_dict["train"].to("cpu") +# Shuffle train edges to avoid only using same edges for supervision each time +perm = torch.randperm(train_edge_index.size(1), device="cpu") +train_edge_index = train_edge_index[:, perm][:, :args.max_num_train_edges] +train_edge_weight = train_edge_weight[perm][:args.max_num_train_edges] +# Convert to undirected graph +train_mp_edge_index_orig, train_mp_edge_weight_orig = to_undirected( + train_edge_index, train_edge_weight) +# Add self loops +train_mp_edge_index, train_mp_edge_weight = add_self_loops( + train_mp_edge_index_orig, train_mp_edge_weight_orig, + num_nodes=num_total_nodes) +# GCN normalized edges +train_mp_edge_index, train_mp_edge_weight = gcn_norm( + train_mp_edge_index, + train_mp_edge_weight, + num_nodes=num_total_nodes, + add_self_loops=False, +) +train_norm_adj = SparseTensor( + row=train_mp_edge_index[0], + col=train_mp_edge_index[1], + value=train_mp_edge_weight, + sparse_sizes=(num_total_nodes, num_total_nodes), +).to("cpu") +val_edge_index = split_edge_index_dict["val"] +val_edge_weight = split_edge_weight_dict["val"] +val_mp_edge_index_orig, val_mp_edge_weight_orig = to_undirected( + val_edge_index, val_edge_weight) +test_mp_edge_index_orig = torch.cat( + [train_mp_edge_index_orig, val_mp_edge_index_orig], dim=1) +test_mp_edge_weight_orig = 
torch.cat( + [train_mp_edge_weight_orig, val_mp_edge_weight_orig], dim=0) +test_mp_edge_index_orig, test_mp_edge_weight_orig = coalesce( + test_mp_edge_index_orig, edge_attr=test_mp_edge_weight_orig, + num_nodes=num_total_nodes) +test_mp_edge_index, test_mp_edge_weight = gcn_norm( + test_mp_edge_index_orig, + test_mp_edge_weight_orig, + num_nodes=num_total_nodes, + add_self_loops=False, +) +test_norm_adj = SparseTensor( + row=test_mp_edge_index[0], + col=test_mp_edge_index[1], + value=test_mp_edge_weight, + sparse_sizes=(num_total_nodes, num_total_nodes), +).to("cpu") + +val_n_ids = n_id_dict["val"].to(device) +test_n_ids = n_id_dict["test"].to(device) +train_loader: DataLoader = DataLoader( + torch.arange(train_edge_index.size(1)), # type: ignore + shuffle=True, + batch_size=args.batch_size) + +optimizer = torch.optim.Adam(model.parameters(), lr=args.lr) +writer = SummaryWriter() + + +def get_edge_label_index(sup_edge_index: Tensor, index: Tensor) -> Tensor: + pos_edge_label_index = sup_edge_index[:, index].to(device) + neg_edge_label_index = torch.stack([ + pos_edge_label_index[0], + torch.randint( + num_src_nodes, + num_total_nodes, + (index.numel(), ), + device=device, + ) + ], dim=0) + edge_label_index = torch.cat([ + pos_edge_label_index, + neg_edge_label_index, + ], dim=1) + return edge_label_index + + +num_steps = 1_000 + +def train(epoch: int) -> float: + model.train() + total_loss = total_examples = 0 + total_steps = min(args.max_steps_per_epoch, len(train_loader)) + print("warming up") + for i, index in enumerate( + train_loader, total=total_steps, desc="Train"): + if i >= args.max_steps_per_epoch: + break + edge_label_index = get_edge_label_index(train_edge_index, index) + optimizer.zero_grad() + pos_rank, neg_rank = model( + edge_label_index, + train_norm_adj, + device=device, + ).chunk(2) + loss = model.recommendation_loss( + pos_rank, + neg_rank, + node_id=edge_label_index.unique(), + lambda_reg=args.lambda_reg, + ) + loss.backward() + optimizer.step() + if i == 9: + break + + + print("benchmarking...") + start = torch.cuda.Event(enable_timing=True) + end = torch.cuda.Event(enable_timing=True) + start.record() + # for i in range(num_steps): + # batch = batches[i % len(batches)] + for i, batch in enumerate(train_loader): + edge_label_index = get_edge_label_index(train_edge_index, index) + optimizer.zero_grad() + pos_rank, neg_rank = model( + edge_label_index, + train_norm_adj, + device=device, + ).chunk(2) + loss = model.recommendation_loss( + pos_rank, + neg_rank, + node_id=edge_label_index.unique(), + lambda_reg=args.lambda_reg, + ) + loss.backward() + optimizer.step() + + if i == num_steps - 1: + print(f"done at {i}th step") + break + + end.record() + torch.cuda.synchronize() + gpu_time = start.elapsed_time(end) + gpu_time_in_s = gpu_time / 1_000 + print( + f"model: ngcf, ", f"total: {gpu_time_in_s} s, " + f"avg: {gpu_time_in_s / num_steps} s/iter, " + f"avg: {num_steps / gpu_time_in_s} iter/s") \ No newline at end of file From dac0265a5a87a35b43a830dea3603c8d321afca9 Mon Sep 17 00:00:00 2001 From: Yiwen Yuan Date: Thu, 21 Nov 2024 12:13:31 +0000 Subject: [PATCH 7/7] add ngcf benchmark script --- benchmark/ngcf_efficiency.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/benchmark/ngcf_efficiency.py b/benchmark/ngcf_efficiency.py index 7cfb4ef..a1de0cd 100644 --- a/benchmark/ngcf_efficiency.py +++ b/benchmark/ngcf_efficiency.py @@ -1,4 +1,3 @@ - """Example script to run the models in this repository. 
python3 ngcf.py --dataset rel-hm --task user-item-purchase --val_loss @@ -301,13 +300,13 @@ def get_edge_label_index(sup_edge_index: Tensor, index: Tensor) -> Tensor: num_steps = 1_000 + def train(epoch: int) -> float: model.train() total_loss = total_examples = 0 total_steps = min(args.max_steps_per_epoch, len(train_loader)) print("warming up") - for i, index in enumerate( - train_loader, total=total_steps, desc="Train"): + for i, index in enumerate(train_loader, total=total_steps, desc="Train"): if i >= args.max_steps_per_epoch: break edge_label_index = get_edge_label_index(train_edge_index, index) @@ -328,26 +327,25 @@ def train(epoch: int) -> float: if i == 9: break - print("benchmarking...") start = torch.cuda.Event(enable_timing=True) end = torch.cuda.Event(enable_timing=True) start.record() - # for i in range(num_steps): - # batch = batches[i % len(batches)] + # for i in range(num_steps): + # batch = batches[i % len(batches)] for i, batch in enumerate(train_loader): edge_label_index = get_edge_label_index(train_edge_index, index) optimizer.zero_grad() pos_rank, neg_rank = model( - edge_label_index, - train_norm_adj, - device=device, + edge_label_index, + train_norm_adj, + device=device, ).chunk(2) loss = model.recommendation_loss( - pos_rank, - neg_rank, - node_id=edge_label_index.unique(), - lambda_reg=args.lambda_reg, + pos_rank, + neg_rank, + node_id=edge_label_index.unique(), + lambda_reg=args.lambda_reg, ) loss.backward() optimizer.step() @@ -363,4 +361,4 @@ def train(epoch: int) -> float: print( f"model: ngcf, ", f"total: {gpu_time_in_s} s, " f"avg: {gpu_time_in_s / num_steps} s/iter, " - f"avg: {num_steps / gpu_time_in_s} iter/s") \ No newline at end of file + f"avg: {num_steps / gpu_time_in_s} iter/s")
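
Both benchmark scripts introduced above (benchmark/efficiency.py and benchmark/ngcf_efficiency.py) measure training throughput the same way: a short warmup pass over the loader (about 10 steps), then roughly 1,000 timed steps bracketed by CUDA events. The sketch below isolates that timing pattern so it can be reused outside these scripts; it is a minimal illustration, not code from the patches, and the names `benchmark_gpu`, `run_step`, `num_steps`, and `warmup` are placeholders chosen here. It assumes a CUDA device is available, as the scripts do when they call torch.cuda.Event.

import torch


def benchmark_gpu(run_step, num_steps: int = 1_000, warmup: int = 10) -> float:
    """Time `run_step` with CUDA events, mirroring the timing loops in the patches above."""
    assert torch.cuda.is_available(), "CUDA event timing requires a GPU"
    for _ in range(warmup):
        run_step()  # warm up kernels, caches, and the CUDA allocator before timing
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    for _ in range(num_steps):
        run_step()
    end.record()
    torch.cuda.synchronize()  # wait for both events; elapsed_time is only valid afterwards
    gpu_time_in_s = start.elapsed_time(end) / 1_000  # elapsed_time reports milliseconds
    print(f"total: {gpu_time_in_s:.2f} s, "
          f"avg: {gpu_time_in_s / num_steps:.4f} s/iter, "
          f"avg: {num_steps / gpu_time_in_s:.2f} iter/s")
    return gpu_time_in_s

In the scripts above, one step corresponds to fetching a batch from the NeighborLoader (or DataLoader), the forward and backward pass, and the optimizer update, so the reported s/iter and iter/s figures include host-side sampling and data-loading overhead, not just GPU compute.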