Skip to content

Commit

Permalink
Add wsp path tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Feb 14, 2025
1 parent eb9df66 commit 16ac83f
Show file tree
Hide file tree
Showing 14 changed files with 381 additions and 148 deletions.
1 change: 1 addition & 0 deletions src/function/gds/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
add_library(kuzu_function_algorithm
OBJECT
all_shortest_paths.cpp
bfs_graph.cpp
gds.cpp
gds_frontier.cpp
gds_state.cpp
Expand Down
25 changes: 16 additions & 9 deletions src/function/gds/all_shortest_paths.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "graph/graph.h"
#include "main/client_context.h"
#include "processor/execution_context.h"
#include "binder/expression/node_expression.h"

using namespace kuzu::processor;
using namespace kuzu::common;
Expand Down Expand Up @@ -172,7 +173,7 @@ class AllSPPathsEdgeCompute : public SPEdgeCompute {
public:
AllSPPathsEdgeCompute(SinglePathLengthsFrontierPair* frontiersPair, BFSGraph* bfsGraph)
: SPEdgeCompute{frontiersPair}, bfsGraph{bfsGraph} {
parentPtrsBlock = bfsGraph->addNewBlock();
block = bfsGraph->addNewBlock();
}

std::vector<nodeID_t> edgeCompute(nodeID_t boundNodeID, NbrScanState::Chunk& resultChunk,
Expand All @@ -188,12 +189,11 @@ class AllSPPathsEdgeCompute : public SPEdgeCompute {
auto shouldUpdate =
nbrLen == PathLengths::UNVISITED || nbrLen == frontierPair->getCurrentIter();
if (shouldUpdate) {
if (!parentPtrsBlock->hasSpace()) {
parentPtrsBlock = bfsGraph->addNewBlock();
if (!block->hasSpace()) {
block = bfsGraph->addNewBlock();
}
auto parent = parentPtrsBlock->reserveNext();
parent->store(frontierPair->getCurrentIter(), boundNodeID, edgeID, fwdEdge);
bfsGraph->addParent(parent, nbrNodeID.offset);
bfsGraph->addParent(frontierPair->getCurrentIter(), boundNodeID, edgeID, nbrNodeID,
fwdEdge, block);
}
if (nbrLen == PathLengths::UNVISITED) {
activeNodes.push_back(nbrNodeID);
Expand All @@ -208,7 +208,7 @@ class AllSPPathsEdgeCompute : public SPEdgeCompute {

private:
BFSGraph* bfsGraph;
ObjectBlock<ParentList>* parentPtrsBlock = nullptr;
ObjectBlock<ParentList>* block = nullptr;
};

/**
Expand All @@ -222,7 +222,9 @@ class AllSPDestinationsAlgorithm final : public RJAlgorithm {
AllSPDestinationsAlgorithm(const AllSPDestinationsAlgorithm& other) : RJAlgorithm{other} {}

expression_vector getResultColumns(const function::GDSBindInput& /*bindInput*/) const override {
auto columns = getBaseResultColumns();
expression_vector columns;
columns.push_back(bindData->getNodeInput()->constCast<NodeExpression>().getInternalID());
columns.push_back(bindData->getNodeOutput()->constCast<NodeExpression>().getInternalID());
columns.push_back(bindData->ptrCast<RJBindData>()->lengthExpr);
return columns;
}
Expand Down Expand Up @@ -256,9 +258,14 @@ class AllSPPathsAlgorithm final : public RJAlgorithm {
AllSPPathsAlgorithm(const AllSPPathsAlgorithm& other) : RJAlgorithm{other} {}

expression_vector getResultColumns(const function::GDSBindInput& /*bindInput*/) const override {
auto columns = getBaseResultColumns();
auto rjBindData = bindData->ptrCast<RJBindData>();
expression_vector columns;
columns.push_back(bindData->getNodeInput()->constCast<NodeExpression>().getInternalID());
columns.push_back(bindData->getNodeOutput()->constCast<NodeExpression>().getInternalID());
columns.push_back(rjBindData->lengthExpr);
if (rjBindData->extendDirection == ExtendDirection::BOTH) {
columns.push_back(rjBindData->directionExpr);
}
columns.push_back(rjBindData->pathNodeIDsExpr);
columns.push_back(rjBindData->pathEdgeIDsExpr);
return columns;
Expand Down
67 changes: 67 additions & 0 deletions src/function/gds/bfs_graph.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include "function/gds/bfs_graph.h"

using namespace kuzu::common;

namespace kuzu {
namespace function {

static constexpr uint64_t BFS_GRAPH_BLOCK_SIZE = (std::uint64_t)1 << 19;

ObjectBlock<ParentList>* BFSGraph::addNewBlock() {
std::unique_lock lck{mtx};
auto memBlock = mm->allocateBuffer(false /* init to 0 */, BFS_GRAPH_BLOCK_SIZE);
blocks.push_back(
std::make_unique<ObjectBlock<ParentList>>(std::move(memBlock), BFS_GRAPH_BLOCK_SIZE));
return blocks[blocks.size() - 1].get();
}

void BFSGraph::addParent(uint16_t iter, nodeID_t boundNodeID, relID_t edgeID, nodeID_t nbrNodeID,
bool fwdEdge, ObjectBlock<ParentList>* block) {
auto parent = block->reserveNext();
parent->setNbrInfo(boundNodeID, edgeID, fwdEdge);
parent->setIter(iter);
// Since by default the parentPtr of each node is nullptr, that's what we start with.
ParentList* expected = nullptr;
while (!currParentPtrs[nbrNodeID.offset].compare_exchange_strong(expected, parent))
;
parent->setNextPtr(expected);
}

void BFSGraph::addSingleParent(uint16_t iter, nodeID_t boundNodeID, relID_t edgeID,
nodeID_t nbrNodeID, bool fwdEdge, ObjectBlock<kuzu::function::ParentList>* block) {
auto parent = block->reserveNext();
parent->setNbrInfo(boundNodeID, edgeID, fwdEdge);
parent->setIter(iter);
ParentList* expected = nullptr;
if (currParentPtrs[nbrNodeID.offset].compare_exchange_strong(expected, parent)) {
parent->setNextPtr(expected);
} else {
// Other thread has added the parent. Do NOT add parent and revert reserved slot.
block->revertLast();
}
}

static double getCost(ParentList* parentList) {
return parentList == nullptr ? std::numeric_limits<double>::max() : parentList->getCost();
}

bool BFSGraph::tryAddSingleParentWithWeight(nodeID_t boundNodeID, relID_t edgeID,
nodeID_t nbrNodeID, bool fwdEdge, double weight, ObjectBlock<ParentList>* block) {
ParentList* expected = getParentListHead(nbrNodeID.offset);
auto parent = block->reserveNext();
parent->setNbrInfo(boundNodeID, edgeID, fwdEdge);
parent->setCost(getParentListHead(boundNodeID.offset)->getCost() + weight);
while (parent->getCost() < getCost(expected)) {
if (currParentPtrs[nbrNodeID.offset].compare_exchange_strong(expected, parent)) {
// Since each node can have one parent, set next ptr to nullptr.
parent->setNextPtr(nullptr);
return true;
}
}
// Other thread has added the parent. Do NOT add parent and revert reserved slot.
block->revertLast();
return false;
}

} // namespace function
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/function/gds/output_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ PathsOutputWriter::PathsOutputWriter(main::ClientContext* context,
BFSGraph& bfsGraph)
: RJOutputWriter{context, outputNodeMask, sourceNodeID}, info{info}, bfsGraph{bfsGraph} {
auto mm = context->getMemoryManager();
lengthVector = createVector(LogicalType::UINT16(), mm);
if (info.writeEdgeDirection) {
directionVector = createVector(LogicalType::LIST(LogicalType::BOOL()), mm);
}
lengthVector = createVector(LogicalType::UINT16(), mm);
if (info.writePath) {
pathNodeIDsVector = createVector(LogicalType::LIST(LogicalType::INTERNAL_ID()), mm);
pathEdgeIDsVector = createVector(LogicalType::LIST(LogicalType::INTERNAL_ID()), mm);
Expand Down
23 changes: 0 additions & 23 deletions src/function/gds/rec_joins.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,6 @@ void RJAlgorithm::setToNoPath() {
bindData->ptrCast<RJBindData>()->writePath = false;
}

binder::expression_vector RJAlgorithm::getResultColumnsNoPath() {
expression_vector columns;
auto& inputNode = bindData->getNodeInput()->constCast<NodeExpression>();
columns.push_back(inputNode.getInternalID());
auto& outputNode = bindData->getNodeOutput()->constCast<NodeExpression>();
columns.push_back(outputNode.getInternalID());
columns.push_back(bindData->ptrCast<RJBindData>()->lengthExpr);
return columns;
}

expression_vector RJAlgorithm::getBaseResultColumns() const {
expression_vector columns;
auto& inputNode = bindData->getNodeInput()->constCast<NodeExpression>();
columns.push_back(inputNode.getInternalID());
auto& outputNode = bindData->getNodeOutput()->constCast<NodeExpression>();
columns.push_back(outputNode.getInternalID());
auto rjBindData = bindData->ptrCast<RJBindData>();
if (rjBindData->extendDirection == ExtendDirection::BOTH) {
columns.push_back(rjBindData->directionExpr);
}
return columns;
}

// All recursive join computation have the same vertex compute. This vertex compute writes
// result (could be dst, length or path) from a dst node ID to given source node ID.
class RJVertexCompute : public VertexCompute {
Expand Down
25 changes: 16 additions & 9 deletions src/function/gds/single_shortest_paths.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "function/gds_function.h"
#include "graph/graph.h"
#include "processor/execution_context.h"
#include "binder/expression/node_expression.h"

using namespace kuzu::processor;
using namespace kuzu::common;
Expand Down Expand Up @@ -82,7 +83,7 @@ class SingleSPPathsEdgeCompute : public SPEdgeCompute {
public:
SingleSPPathsEdgeCompute(SinglePathLengthsFrontierPair* frontierPair, BFSGraph* bfsGraph)
: SPEdgeCompute{frontierPair}, bfsGraph{bfsGraph} {
parentPtrsBlock = bfsGraph->addNewBlock();
block = bfsGraph->addNewBlock();
}

std::vector<nodeID_t> edgeCompute(nodeID_t boundNodeID, NbrScanState::Chunk& resultChunk,
Expand All @@ -91,12 +92,11 @@ class SingleSPPathsEdgeCompute : public SPEdgeCompute {
resultChunk.forEach([&](auto nbrNodeID, auto edgeID) {
if (frontierPair->getPathLengths()->getMaskValueFromNextFrontier(nbrNodeID.offset) ==
PathLengths::UNVISITED) {
if (!parentPtrsBlock->hasSpace()) {
parentPtrsBlock = bfsGraph->addNewBlock();
if (!block->hasSpace()) {
block = bfsGraph->addNewBlock();
}
auto parent = parentPtrsBlock->reserveNext();
parent->store(frontierPair->getCurrentIter(), boundNodeID, edgeID, isFwd);
bfsGraph->tryAddSingleParent(parent, nbrNodeID.offset, parentPtrsBlock);
bfsGraph->addSingleParent(frontierPair->getCurrentIter(), boundNodeID, edgeID,
nbrNodeID, isFwd, block);
activeNodes.push_back(nbrNodeID);
}
});
Expand All @@ -109,7 +109,7 @@ class SingleSPPathsEdgeCompute : public SPEdgeCompute {

private:
BFSGraph* bfsGraph;
ObjectBlock<ParentList>* parentPtrsBlock = nullptr;
ObjectBlock<ParentList>* block = nullptr;
};

/**
Expand All @@ -125,7 +125,9 @@ class SingleSPDestinationsAlgorithm : public RJAlgorithm {
: RJAlgorithm{other} {}

expression_vector getResultColumns(const function::GDSBindInput& /*bindInput*/) const override {
auto columns = getBaseResultColumns();
expression_vector columns;
columns.push_back(bindData->getNodeInput()->constCast<NodeExpression>().getInternalID());
columns.push_back(bindData->getNodeOutput()->constCast<NodeExpression>().getInternalID());
columns.push_back(bindData->ptrCast<RJBindData>()->lengthExpr);
return columns;
}
Expand Down Expand Up @@ -155,9 +157,14 @@ class SingleSPPathsAlgorithm : public RJAlgorithm {
SingleSPPathsAlgorithm(const SingleSPPathsAlgorithm& other) : RJAlgorithm{other} {}

expression_vector getResultColumns(const function::GDSBindInput& /*bindInput*/) const override {
auto columns = getBaseResultColumns();
auto rjBindData = bindData->ptrCast<RJBindData>();
expression_vector columns;
columns.push_back(bindData->getNodeInput()->constCast<NodeExpression>().getInternalID());
columns.push_back(bindData->getNodeOutput()->constCast<NodeExpression>().getInternalID());
columns.push_back(rjBindData->lengthExpr);
if (rjBindData->extendDirection == ExtendDirection::BOTH) {
columns.push_back(rjBindData->directionExpr);
}
columns.push_back(rjBindData->pathNodeIDsExpr);
columns.push_back(rjBindData->pathEdgeIDsExpr);
return columns;
Expand Down
27 changes: 16 additions & 11 deletions src/function/gds/variable_length_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "function/gds/rec_joins.h"
#include "graph/graph.h"
#include "processor/execution_context.h"
#include "binder/expression/node_expression.h"

using namespace kuzu::binder;
using namespace kuzu::common;
Expand Down Expand Up @@ -44,25 +45,24 @@ class VarLenPathsOutputWriter final : public PathsOutputWriter {
struct VarLenJoinsEdgeCompute : public EdgeCompute {
DoublePathLengthsFrontierPair* frontierPair;
BFSGraph* bfsGraph;
ObjectBlock<ParentList>* parentPtrsBlock = nullptr;
ObjectBlock<ParentList>* block = nullptr;

VarLenJoinsEdgeCompute(DoublePathLengthsFrontierPair* frontierPair, BFSGraph* bfsGraph)
: frontierPair{frontierPair}, bfsGraph{bfsGraph} {
parentPtrsBlock = bfsGraph->addNewBlock();
block = bfsGraph->addNewBlock();
};

std::vector<nodeID_t> edgeCompute(nodeID_t boundNodeID, graph::NbrScanState::Chunk& chunk,
bool isFwd) override {
bool fwdEdge) override {
std::vector<nodeID_t> activeNodes;
chunk.forEach([&](auto nbrNode, auto edgeID) {
chunk.forEach([&](auto nbrNodeID, auto edgeID) {
// We should always update the nbrID in variable length joins
if (!parentPtrsBlock->hasSpace()) {
parentPtrsBlock = bfsGraph->addNewBlock();
if (!block->hasSpace()) {
block = bfsGraph->addNewBlock();
}
auto parent = parentPtrsBlock->reserveNext();
parent->store(frontierPair->getCurrentIter(), boundNodeID, edgeID, isFwd);
bfsGraph->addParent(parent, nbrNode.offset);
activeNodes.push_back(nbrNode);
bfsGraph->addParent(frontierPair->getCurrentIter(), boundNodeID, edgeID, nbrNodeID,
fwdEdge, block);
activeNodes.push_back(nbrNodeID);
});
return activeNodes;
}
Expand All @@ -84,10 +84,15 @@ class VarLenJoinsAlgorithm final : public RJAlgorithm {

binder::expression_vector getResultColumns(
const function::GDSBindInput& /*bindInput*/) const override {
auto columns = getBaseResultColumns();
auto rjBindData = bindData->ptrCast<RJBindData>();
expression_vector columns;
columns.push_back(bindData->getNodeInput()->constCast<NodeExpression>().getInternalID());
columns.push_back(bindData->getNodeOutput()->constCast<NodeExpression>().getInternalID());
columns.push_back(rjBindData->lengthExpr);
if (rjBindData->writePath) {
if (rjBindData->extendDirection == ExtendDirection::BOTH) {
columns.push_back(rjBindData->directionExpr);
}
columns.push_back(rjBindData->pathNodeIDsExpr);
columns.push_back(rjBindData->pathEdgeIDsExpr);
}
Expand Down
Loading

0 comments on commit 16ac83f

Please sign in to comment.