Skip to content

Commit

Permalink
Add empty columns to chunked node group if needed during COPY (#4882)
Browse files Browse the repository at this point in the history
  • Loading branch information
royi-luo authored Feb 14, 2025
1 parent 0e1c19f commit 8a29797
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class RelBatchInsert final : public BatchInsert {
void updateProgress(const ExecutionContext* context) const;

private:
static void appendNodeGroup(transaction::Transaction* transaction,
static void appendNodeGroup(storage::MemoryManager& mm, transaction::Transaction* transaction,
storage::CSRNodeGroup& nodeGroup, const RelBatchInsertInfo& relInfo,
const RelBatchInsertLocalState& localState, BatchInsertSharedState& sharedState,
const PartitionerSharedState& partitionerSharedState);
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/store/chunked_node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ class ChunkedNodeGroup {
common::row_idx_t startRowIdx, NodeGroupDataFormat format = NodeGroupDataFormat::REGULAR);
ChunkedNodeGroup(ChunkedNodeGroup& base,
const std::vector<common::column_id_t>& selectedColumns);
ChunkedNodeGroup(MemoryManager& mm, ChunkedNodeGroup& base,
std::span<const common::LogicalType> columnTypes,
std::span<const common::column_id_t> baseColumnIDs);
ChunkedNodeGroup(MemoryManager& mm, const std::vector<common::LogicalType>& columnTypes,
bool enableCompression, uint64_t capacity, common::row_idx_t startRowIdx,
ResidencyState residencyState, NodeGroupDataFormat format = NodeGroupDataFormat::REGULAR);
Expand Down
13 changes: 13 additions & 0 deletions src/include/storage/store/csr_chunked_node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ class ChunkedCSRNodeGroup final : public ChunkedNodeGroup {
ChunkedCSRNodeGroup(ChunkedCSRNodeGroup& base,
const std::vector<common::column_id_t>& selectedColumns)
: ChunkedNodeGroup{base, selectedColumns}, csrHeader{std::move(base.csrHeader)} {}
ChunkedCSRNodeGroup(MemoryManager& mm, ChunkedCSRNodeGroup& base,
std::span<const common::LogicalType> columnTypes,
std::span<const common::column_id_t> baseColumnIDs)
: ChunkedNodeGroup(mm, base, columnTypes, baseColumnIDs),
csrHeader(std::move(base.csrHeader)) {}
ChunkedCSRNodeGroup(ChunkedCSRHeader csrHeader,
std::vector<std::unique_ptr<ColumnChunk>> chunks, common::row_idx_t startRowIdx)
: ChunkedNodeGroup{std::move(chunks), startRowIdx, NodeGroupDataFormat::CSR},
Expand All @@ -126,6 +131,14 @@ class ChunkedCSRNodeGroup final : public ChunkedNodeGroup {

void flush(FileHandle& dataFH) override;

// this does not override ChunkedNodeGroup::merge() since clang-tidy analyzer
// seems to struggle with detecting the std::move of the header unless this is inlined
void mergeChunkedCSRGroup(ChunkedCSRNodeGroup& base,
const std::vector<common::column_id_t>& columnsToMergeInto) {
ChunkedNodeGroup::merge(base, columnsToMergeInto);
csrHeader = std::move(base.csrHeader);
}

private:
ChunkedCSRHeader csrHeader;
};
Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/store/node_group_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class NodeGroupCollection {
// The returned values are the startOffset and numValuesAppended.
// NOTE: This is specially coded to only be used by NodeBatchInsert for now.
std::pair<common::offset_t, common::offset_t> appendToLastNodeGroupAndFlushWhenFull(
transaction::Transaction* transaction, const std::vector<common::column_id_t>& columnIDs,
ChunkedNodeGroup& chunkedGroup);
MemoryManager& mm, transaction::Transaction* transaction,
const std::vector<common::column_id_t>& columnIDs, ChunkedNodeGroup& chunkedGroup);

common::row_idx_t getNumTotalRows() const;
common::node_group_idx_t getNumNodeGroups() const {
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ class KUZU_API NodeTable final : public Table {
return *columns[columnID];
}

std::pair<common::offset_t, common::offset_t> appendToLastNodeGroup(
std::pair<common::offset_t, common::offset_t> appendToLastNodeGroup(MemoryManager& mm,
transaction::Transaction* transaction, const std::vector<common::column_id_t>& columnIDs,
ChunkedNodeGroup& chunkedGroup);

Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/persistent/node_batch_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ void NodeBatchInsert::writeAndResetNodeGroup(transaction::Transaction* transacti
// we only need to write the main data in the chunked node group, the extra data is only used
// during the lifetime of this operator to populate error messages
ChunkedNodeGroup sliceToWriteToDisk(*nodeGroup, info->outputDataColumns);
auto [nodeOffset, numRowsWritten] =
nodeTable->appendToLastNodeGroup(transaction, info->insertColumnIDs, sliceToWriteToDisk);
auto [nodeOffset, numRowsWritten] = nodeTable->appendToLastNodeGroup(*mm, transaction,
info->insertColumnIDs, sliceToWriteToDisk);
nodeGroup->merge(sliceToWriteToDisk, info->outputDataColumns);

if (indexBuilder) {
Expand Down
33 changes: 23 additions & 10 deletions src/processor/operator/persistent/rel_batch_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ void RelBatchInsert::executeInternal(ExecutionContext* context) {
->getOrCreateNodeGroup(context->clientContext->getTransaction(),
relLocalState->nodeGroupIdx, relInfo->direction)
->cast<CSRNodeGroup>();
appendNodeGroup(context->clientContext->getTransaction(), nodeGroup, *relInfo,
*relLocalState, *sharedState, *partitionerSharedState);
appendNodeGroup(*context->clientContext->getMemoryManager(),
context->clientContext->getTransaction(), nodeGroup, *relInfo, *relLocalState,
*sharedState, *partitionerSharedState);
updateProgress(context);
}
}

static void appendNewChunkedGroup(transaction::Transaction* transaction,
static void appendNewChunkedGroup(MemoryManager& mm, transaction::Transaction* transaction,
const std::vector<column_id_t>& columnIDs, ChunkedCSRNodeGroup& chunkedGroup,
RelTable& relTable, CSRNodeGroup& nodeGroup, RelDataDirection direction) {
const bool isNewNodeGroup = nodeGroup.isEmpty();
Expand All @@ -92,15 +93,23 @@ static void appendNewChunkedGroup(transaction::Transaction* transaction,
if (isNewNodeGroup) {
auto flushedChunkedGroup =
chunkedGroup.flushAsNewChunkedNodeGroup(transaction, *relTable.getDataFH());
nodeGroup.setPersistentChunkedGroup(std::move(flushedChunkedGroup));

// If there are deleted columns that haven't been vaccumed yet
// we need to add extra columns to the chunked group
// to ensure that the number of columns is consistent with the rest of the node group
auto persistentChunkedGroup = std::make_unique<ChunkedCSRNodeGroup>(mm,
flushedChunkedGroup->cast<ChunkedCSRNodeGroup>(), nodeGroup.getDataTypes(), columnIDs);

nodeGroup.setPersistentChunkedGroup(std::move(persistentChunkedGroup));
} else {
nodeGroup.appendChunkedCSRGroup(transaction, columnIDs, chunkedGroup);
}
}

void RelBatchInsert::appendNodeGroup(transaction::Transaction* transaction, CSRNodeGroup& nodeGroup,
const RelBatchInsertInfo& relInfo, const RelBatchInsertLocalState& localState,
BatchInsertSharedState& sharedState, const PartitionerSharedState& partitionerSharedState) {
void RelBatchInsert::appendNodeGroup(MemoryManager& mm, transaction::Transaction* transaction,
CSRNodeGroup& nodeGroup, const RelBatchInsertInfo& relInfo,
const RelBatchInsertLocalState& localState, BatchInsertSharedState& sharedState,
const PartitionerSharedState& partitionerSharedState) {
const auto nodeGroupIdx = localState.nodeGroupIdx;
auto partitioningBuffer =
partitionerSharedState.getPartitionBuffer(relInfo.partitioningIdx, localState.nodeGroupIdx);
Expand Down Expand Up @@ -144,9 +153,13 @@ void RelBatchInsert::appendNodeGroup(transaction::Transaction* transaction, CSRN
localState.chunkedGroup->finalize();

auto* relTable = sharedState.table->ptrCast<RelTable>();
appendNewChunkedGroup(transaction, relInfo.insertColumnIDs,
localState.chunkedGroup->cast<ChunkedCSRNodeGroup>(), *relTable, nodeGroup,
relInfo.direction);

ChunkedCSRNodeGroup sliceToWriteToDisk{localState.chunkedGroup->cast<ChunkedCSRNodeGroup>(),
relInfo.outputDataColumns};
appendNewChunkedGroup(mm, transaction, relInfo.insertColumnIDs, sliceToWriteToDisk, *relTable,
nodeGroup, relInfo.direction);
localState.chunkedGroup->cast<ChunkedCSRNodeGroup>().mergeChunkedCSRGroup(sliceToWriteToDisk,
relInfo.outputDataColumns);

localState.chunkedGroup->resetToEmpty();
}
Expand Down
29 changes: 29 additions & 0 deletions src/storage/store/chunked_node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,35 @@ ChunkedNodeGroup::ChunkedNodeGroup(MemoryManager& mm, const std::vector<LogicalT
}
}

ChunkedNodeGroup::ChunkedNodeGroup(MemoryManager& mm, ChunkedNodeGroup& base,
std::span<const common::LogicalType> columnTypes,
std::span<const common::column_id_t> baseColumnIDs)
: format{base.format}, residencyState{base.residencyState}, startRowIdx{base.startRowIdx},
capacity{base.capacity}, numRows{base.numRows.load()},
versionInfo(std::move(base.versionInfo)), dataInUse{true} {
bool enableCompression = false;
KU_ASSERT(!baseColumnIDs.empty());

chunks.resize(columnTypes.size());

KU_ASSERT(base.getNumColumns() == baseColumnIDs.size());
for (column_id_t i = 0; i < baseColumnIDs.size(); ++i) {
auto baseColumnID = baseColumnIDs[i];
KU_ASSERT(baseColumnID < chunks.size());
chunks[baseColumnID] = base.moveColumnChunk(i);
enableCompression = chunks[baseColumnID]->isCompressionEnabled();
KU_ASSERT(chunks[baseColumnID]->getDataType().getPhysicalType() ==
columnTypes[baseColumnID].getPhysicalType());
}

for (column_id_t i = 0; i < columnTypes.size(); ++i) {
if (chunks[i] == nullptr) {
chunks[i] = std::make_unique<ColumnChunk>(mm, columnTypes[i].copy(), 0,
enableCompression, ResidencyState::IN_MEMORY);
}
}
}

void ChunkedNodeGroup::merge(ChunkedNodeGroup& base,
const std::vector<column_id_t>& columnsToMergeInto) {
KU_ASSERT(base.getNumColumns() == columnsToMergeInto.size());
Expand Down
11 changes: 9 additions & 2 deletions src/storage/store/node_group_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void NodeGroupCollection::append(const Transaction* transaction,
}

std::pair<offset_t, offset_t> NodeGroupCollection::appendToLastNodeGroupAndFlushWhenFull(
Transaction* transaction, const std::vector<column_id_t>& columnIDs,
MemoryManager& mm, Transaction* transaction, const std::vector<column_id_t>& columnIDs,
ChunkedNodeGroup& chunkedGroup) {
NodeGroup* lastNodeGroup = nullptr;
offset_t startOffset = 0;
Expand Down Expand Up @@ -142,8 +142,15 @@ std::pair<offset_t, offset_t> NodeGroupCollection::appendToLastNodeGroupAndFlush
if (directFlushWhenAppend) {
chunkedGroup.finalize();
auto flushedGroup = chunkedGroup.flushAsNewChunkedNodeGroup(transaction, *dataFH);

// If there are deleted columns that haven't been vaccumed yet
// we need to add extra columns to the chunked group
// to ensure that the number of columns is consistent with the rest of the node group
auto groupToMerge = std::make_unique<ChunkedNodeGroup>(mm, *flushedGroup,
lastNodeGroup->getDataTypes(), columnIDs);

KU_ASSERT(lastNodeGroup->getNumChunkedGroups() == 0);
lastNodeGroup->merge(transaction, std::move(flushedGroup));
lastNodeGroup->merge(transaction, std::move(groupToMerge));
}
return {startOffset, numToAppend};
}
Expand Down
8 changes: 5 additions & 3 deletions src/storage/store/node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,12 @@ void NodeTable::addColumn(Transaction* transaction, TableAddColumnState& addColu
hasChanges = true;
}

std::pair<offset_t, offset_t> NodeTable::appendToLastNodeGroup(Transaction* transaction,
const std::vector<column_id_t>& columnIDs, ChunkedNodeGroup& chunkedGroup) {
std::pair<offset_t, offset_t> NodeTable::appendToLastNodeGroup(MemoryManager& mm,
Transaction* transaction, const std::vector<column_id_t>& columnIDs,
ChunkedNodeGroup& chunkedGroup) {
hasChanges = true;
return nodeGroups->appendToLastNodeGroupAndFlushWhenFull(transaction, columnIDs, chunkedGroup);
return nodeGroups->appendToLastNodeGroupAndFlushWhenFull(mm, transaction, columnIDs,
chunkedGroup);
}

DataChunk NodeTable::constructDataChunkForPKColumn() const {
Expand Down
33 changes: 33 additions & 0 deletions test/test_files/ddl/ddl_empty.test
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,19 @@ Karissa|40
Zhang|50
Noura|25

-CASE CopyNodeAfterDropAddColNewGroup
-STATEMENT create node table Comment (id int64, creationDate INT64, locationIP STRING, browserUsed STRING, content STRING, length INT32, PRIMARY KEY (id));
---- ok
-STATEMENT ALTER TABLE Comment DROP length;
---- ok
-STATEMENT ALTER TABLE Comment ADD size INT64;
---- ok
-STATEMENT COPY Comment FROM '${KUZU_ROOT_DIRECTORY}/dataset/ldbc-sf01/Comment.csv';
---- ok
-STATEMENT MATCH (n:Comment) WHERE n.id = 1030792151057 return n.size;
---- 1
86

-CASE CopyRelAfterDropAddCol
-STATEMENT CREATE NODE TABLE User(name STRING, age INT64, PRIMARY KEY (name));
---- ok
Expand All @@ -111,3 +124,23 @@ Adam|2020|Zhang
Karissa|2021|Zhang
Zhang|2022|Noura
Zhang||Adam

-CASE CopyRelAfterDropAddColNewGroup
-STATEMENT CREATE NODE TABLE User(name STRING, age INT64, PRIMARY KEY (name));
---- ok
-STATEMENT CREATE REL TABLE Follows(FROM User TO User, comment STRING);
---- ok
-STATEMENT COPY User FROM '${KUZU_ROOT_DIRECTORY}/dataset/demo-db/csv/user.csv';
---- ok
-STATEMENT ALTER TABLE Follows DROP comment;
---- ok
-STATEMENT ALTER TABLE Follows ADD since INT64;
---- ok
-STATEMENT COPY Follows FROM '${KUZU_ROOT_DIRECTORY}/dataset/demo-db/csv/follows.csv';
---- ok
-STATEMENT MATCH (a:User)-[e:Follows]->(b:User) RETURN a.name, e.since, b.name;
---- 4
Adam|2020|Karissa
Adam|2020|Zhang
Karissa|2021|Zhang
Zhang|2022|Noura

0 comments on commit 8a29797

Please sign in to comment.