From 8a2979786fce48f5a5e0a49db05abf6f8880f7d1 Mon Sep 17 00:00:00 2001 From: Royi Luo Date: Fri, 14 Feb 2025 14:31:05 -0500 Subject: [PATCH] Add empty columns to chunked node group if needed during COPY (#4882) --- .../operator/persistent/rel_batch_insert.h | 2 +- .../storage/store/chunked_node_group.h | 3 ++ .../storage/store/csr_chunked_node_group.h | 13 ++++++++ .../storage/store/node_group_collection.h | 4 +-- src/include/storage/store/node_table.h | 2 +- .../operator/persistent/node_batch_insert.cpp | 4 +-- .../operator/persistent/rel_batch_insert.cpp | 33 +++++++++++++------ src/storage/store/chunked_node_group.cpp | 29 ++++++++++++++++ src/storage/store/node_group_collection.cpp | 11 +++++-- src/storage/store/node_table.cpp | 8 +++-- test/test_files/ddl/ddl_empty.test | 33 +++++++++++++++++++ 11 files changed, 121 insertions(+), 21 deletions(-) diff --git a/src/include/processor/operator/persistent/rel_batch_insert.h b/src/include/processor/operator/persistent/rel_batch_insert.h index cfcfe3cece2..c534108efee 100644 --- a/src/include/processor/operator/persistent/rel_batch_insert.h +++ b/src/include/processor/operator/persistent/rel_batch_insert.h @@ -107,7 +107,7 @@ class RelBatchInsert final : public BatchInsert { void updateProgress(const ExecutionContext* context) const; private: - static void appendNodeGroup(transaction::Transaction* transaction, + static void appendNodeGroup(storage::MemoryManager& mm, transaction::Transaction* transaction, storage::CSRNodeGroup& nodeGroup, const RelBatchInsertInfo& relInfo, const RelBatchInsertLocalState& localState, BatchInsertSharedState& sharedState, const PartitionerSharedState& partitionerSharedState); diff --git a/src/include/storage/store/chunked_node_group.h b/src/include/storage/store/chunked_node_group.h index 71647ef6adc..f9e727be403 100644 --- a/src/include/storage/store/chunked_node_group.h +++ b/src/include/storage/store/chunked_node_group.h @@ -37,6 +37,9 @@ class ChunkedNodeGroup { common::row_idx_t startRowIdx, NodeGroupDataFormat format = NodeGroupDataFormat::REGULAR); ChunkedNodeGroup(ChunkedNodeGroup& base, const std::vector& selectedColumns); + ChunkedNodeGroup(MemoryManager& mm, ChunkedNodeGroup& base, + std::span columnTypes, + std::span baseColumnIDs); ChunkedNodeGroup(MemoryManager& mm, const std::vector& columnTypes, bool enableCompression, uint64_t capacity, common::row_idx_t startRowIdx, ResidencyState residencyState, NodeGroupDataFormat format = NodeGroupDataFormat::REGULAR); diff --git a/src/include/storage/store/csr_chunked_node_group.h b/src/include/storage/store/csr_chunked_node_group.h index 6719e9c0827..93688d6842a 100644 --- a/src/include/storage/store/csr_chunked_node_group.h +++ b/src/include/storage/store/csr_chunked_node_group.h @@ -101,6 +101,11 @@ class ChunkedCSRNodeGroup final : public ChunkedNodeGroup { ChunkedCSRNodeGroup(ChunkedCSRNodeGroup& base, const std::vector& selectedColumns) : ChunkedNodeGroup{base, selectedColumns}, csrHeader{std::move(base.csrHeader)} {} + ChunkedCSRNodeGroup(MemoryManager& mm, ChunkedCSRNodeGroup& base, + std::span columnTypes, + std::span baseColumnIDs) + : ChunkedNodeGroup(mm, base, columnTypes, baseColumnIDs), + csrHeader(std::move(base.csrHeader)) {} ChunkedCSRNodeGroup(ChunkedCSRHeader csrHeader, std::vector> chunks, common::row_idx_t startRowIdx) : ChunkedNodeGroup{std::move(chunks), startRowIdx, NodeGroupDataFormat::CSR}, @@ -126,6 +131,14 @@ class ChunkedCSRNodeGroup final : public ChunkedNodeGroup { void flush(FileHandle& dataFH) override; + // this does not override ChunkedNodeGroup::merge() since clang-tidy analyzer + // seems to struggle with detecting the std::move of the header unless this is inlined + void mergeChunkedCSRGroup(ChunkedCSRNodeGroup& base, + const std::vector& columnsToMergeInto) { + ChunkedNodeGroup::merge(base, columnsToMergeInto); + csrHeader = std::move(base.csrHeader); + } + private: ChunkedCSRHeader csrHeader; }; diff --git a/src/include/storage/store/node_group_collection.h b/src/include/storage/store/node_group_collection.h index ff0a49d5f51..d93da764431 100644 --- a/src/include/storage/store/node_group_collection.h +++ b/src/include/storage/store/node_group_collection.h @@ -30,8 +30,8 @@ class NodeGroupCollection { // The returned values are the startOffset and numValuesAppended. // NOTE: This is specially coded to only be used by NodeBatchInsert for now. std::pair appendToLastNodeGroupAndFlushWhenFull( - transaction::Transaction* transaction, const std::vector& columnIDs, - ChunkedNodeGroup& chunkedGroup); + MemoryManager& mm, transaction::Transaction* transaction, + const std::vector& columnIDs, ChunkedNodeGroup& chunkedGroup); common::row_idx_t getNumTotalRows() const; common::node_group_idx_t getNumNodeGroups() const { diff --git a/src/include/storage/store/node_table.h b/src/include/storage/store/node_table.h index db5ec27ad1b..4334caef526 100644 --- a/src/include/storage/store/node_table.h +++ b/src/include/storage/store/node_table.h @@ -172,7 +172,7 @@ class KUZU_API NodeTable final : public Table { return *columns[columnID]; } - std::pair appendToLastNodeGroup( + std::pair appendToLastNodeGroup(MemoryManager& mm, transaction::Transaction* transaction, const std::vector& columnIDs, ChunkedNodeGroup& chunkedGroup); diff --git a/src/processor/operator/persistent/node_batch_insert.cpp b/src/processor/operator/persistent/node_batch_insert.cpp index ccbb9fbd774..d609225c930 100644 --- a/src/processor/operator/persistent/node_batch_insert.cpp +++ b/src/processor/operator/persistent/node_batch_insert.cpp @@ -175,8 +175,8 @@ void NodeBatchInsert::writeAndResetNodeGroup(transaction::Transaction* transacti // we only need to write the main data in the chunked node group, the extra data is only used // during the lifetime of this operator to populate error messages ChunkedNodeGroup sliceToWriteToDisk(*nodeGroup, info->outputDataColumns); - auto [nodeOffset, numRowsWritten] = - nodeTable->appendToLastNodeGroup(transaction, info->insertColumnIDs, sliceToWriteToDisk); + auto [nodeOffset, numRowsWritten] = nodeTable->appendToLastNodeGroup(*mm, transaction, + info->insertColumnIDs, sliceToWriteToDisk); nodeGroup->merge(sliceToWriteToDisk, info->outputDataColumns); if (indexBuilder) { diff --git a/src/processor/operator/persistent/rel_batch_insert.cpp b/src/processor/operator/persistent/rel_batch_insert.cpp index 5d9ea17882a..7192b6aa7f9 100644 --- a/src/processor/operator/persistent/rel_batch_insert.cpp +++ b/src/processor/operator/persistent/rel_batch_insert.cpp @@ -72,13 +72,14 @@ void RelBatchInsert::executeInternal(ExecutionContext* context) { ->getOrCreateNodeGroup(context->clientContext->getTransaction(), relLocalState->nodeGroupIdx, relInfo->direction) ->cast(); - appendNodeGroup(context->clientContext->getTransaction(), nodeGroup, *relInfo, - *relLocalState, *sharedState, *partitionerSharedState); + appendNodeGroup(*context->clientContext->getMemoryManager(), + context->clientContext->getTransaction(), nodeGroup, *relInfo, *relLocalState, + *sharedState, *partitionerSharedState); updateProgress(context); } } -static void appendNewChunkedGroup(transaction::Transaction* transaction, +static void appendNewChunkedGroup(MemoryManager& mm, transaction::Transaction* transaction, const std::vector& columnIDs, ChunkedCSRNodeGroup& chunkedGroup, RelTable& relTable, CSRNodeGroup& nodeGroup, RelDataDirection direction) { const bool isNewNodeGroup = nodeGroup.isEmpty(); @@ -92,15 +93,23 @@ static void appendNewChunkedGroup(transaction::Transaction* transaction, if (isNewNodeGroup) { auto flushedChunkedGroup = chunkedGroup.flushAsNewChunkedNodeGroup(transaction, *relTable.getDataFH()); - nodeGroup.setPersistentChunkedGroup(std::move(flushedChunkedGroup)); + + // If there are deleted columns that haven't been vaccumed yet + // we need to add extra columns to the chunked group + // to ensure that the number of columns is consistent with the rest of the node group + auto persistentChunkedGroup = std::make_unique(mm, + flushedChunkedGroup->cast(), nodeGroup.getDataTypes(), columnIDs); + + nodeGroup.setPersistentChunkedGroup(std::move(persistentChunkedGroup)); } else { nodeGroup.appendChunkedCSRGroup(transaction, columnIDs, chunkedGroup); } } -void RelBatchInsert::appendNodeGroup(transaction::Transaction* transaction, CSRNodeGroup& nodeGroup, - const RelBatchInsertInfo& relInfo, const RelBatchInsertLocalState& localState, - BatchInsertSharedState& sharedState, const PartitionerSharedState& partitionerSharedState) { +void RelBatchInsert::appendNodeGroup(MemoryManager& mm, transaction::Transaction* transaction, + CSRNodeGroup& nodeGroup, const RelBatchInsertInfo& relInfo, + const RelBatchInsertLocalState& localState, BatchInsertSharedState& sharedState, + const PartitionerSharedState& partitionerSharedState) { const auto nodeGroupIdx = localState.nodeGroupIdx; auto partitioningBuffer = partitionerSharedState.getPartitionBuffer(relInfo.partitioningIdx, localState.nodeGroupIdx); @@ -144,9 +153,13 @@ void RelBatchInsert::appendNodeGroup(transaction::Transaction* transaction, CSRN localState.chunkedGroup->finalize(); auto* relTable = sharedState.table->ptrCast(); - appendNewChunkedGroup(transaction, relInfo.insertColumnIDs, - localState.chunkedGroup->cast(), *relTable, nodeGroup, - relInfo.direction); + + ChunkedCSRNodeGroup sliceToWriteToDisk{localState.chunkedGroup->cast(), + relInfo.outputDataColumns}; + appendNewChunkedGroup(mm, transaction, relInfo.insertColumnIDs, sliceToWriteToDisk, *relTable, + nodeGroup, relInfo.direction); + localState.chunkedGroup->cast().mergeChunkedCSRGroup(sliceToWriteToDisk, + relInfo.outputDataColumns); localState.chunkedGroup->resetToEmpty(); } diff --git a/src/storage/store/chunked_node_group.cpp b/src/storage/store/chunked_node_group.cpp index e5c4d906844..463b5029d8f 100644 --- a/src/storage/store/chunked_node_group.cpp +++ b/src/storage/store/chunked_node_group.cpp @@ -53,6 +53,35 @@ ChunkedNodeGroup::ChunkedNodeGroup(MemoryManager& mm, const std::vector columnTypes, + std::span baseColumnIDs) + : format{base.format}, residencyState{base.residencyState}, startRowIdx{base.startRowIdx}, + capacity{base.capacity}, numRows{base.numRows.load()}, + versionInfo(std::move(base.versionInfo)), dataInUse{true} { + bool enableCompression = false; + KU_ASSERT(!baseColumnIDs.empty()); + + chunks.resize(columnTypes.size()); + + KU_ASSERT(base.getNumColumns() == baseColumnIDs.size()); + for (column_id_t i = 0; i < baseColumnIDs.size(); ++i) { + auto baseColumnID = baseColumnIDs[i]; + KU_ASSERT(baseColumnID < chunks.size()); + chunks[baseColumnID] = base.moveColumnChunk(i); + enableCompression = chunks[baseColumnID]->isCompressionEnabled(); + KU_ASSERT(chunks[baseColumnID]->getDataType().getPhysicalType() == + columnTypes[baseColumnID].getPhysicalType()); + } + + for (column_id_t i = 0; i < columnTypes.size(); ++i) { + if (chunks[i] == nullptr) { + chunks[i] = std::make_unique(mm, columnTypes[i].copy(), 0, + enableCompression, ResidencyState::IN_MEMORY); + } + } +} + void ChunkedNodeGroup::merge(ChunkedNodeGroup& base, const std::vector& columnsToMergeInto) { KU_ASSERT(base.getNumColumns() == columnsToMergeInto.size()); diff --git a/src/storage/store/node_group_collection.cpp b/src/storage/store/node_group_collection.cpp index f2787cf0676..669247fbdbb 100644 --- a/src/storage/store/node_group_collection.cpp +++ b/src/storage/store/node_group_collection.cpp @@ -105,7 +105,7 @@ void NodeGroupCollection::append(const Transaction* transaction, } std::pair NodeGroupCollection::appendToLastNodeGroupAndFlushWhenFull( - Transaction* transaction, const std::vector& columnIDs, + MemoryManager& mm, Transaction* transaction, const std::vector& columnIDs, ChunkedNodeGroup& chunkedGroup) { NodeGroup* lastNodeGroup = nullptr; offset_t startOffset = 0; @@ -142,8 +142,15 @@ std::pair NodeGroupCollection::appendToLastNodeGroupAndFlush if (directFlushWhenAppend) { chunkedGroup.finalize(); auto flushedGroup = chunkedGroup.flushAsNewChunkedNodeGroup(transaction, *dataFH); + + // If there are deleted columns that haven't been vaccumed yet + // we need to add extra columns to the chunked group + // to ensure that the number of columns is consistent with the rest of the node group + auto groupToMerge = std::make_unique(mm, *flushedGroup, + lastNodeGroup->getDataTypes(), columnIDs); + KU_ASSERT(lastNodeGroup->getNumChunkedGroups() == 0); - lastNodeGroup->merge(transaction, std::move(flushedGroup)); + lastNodeGroup->merge(transaction, std::move(groupToMerge)); } return {startOffset, numToAppend}; } diff --git a/src/storage/store/node_table.cpp b/src/storage/store/node_table.cpp index e7d94aef076..800c42fb685 100644 --- a/src/storage/store/node_table.cpp +++ b/src/storage/store/node_table.cpp @@ -457,10 +457,12 @@ void NodeTable::addColumn(Transaction* transaction, TableAddColumnState& addColu hasChanges = true; } -std::pair NodeTable::appendToLastNodeGroup(Transaction* transaction, - const std::vector& columnIDs, ChunkedNodeGroup& chunkedGroup) { +std::pair NodeTable::appendToLastNodeGroup(MemoryManager& mm, + Transaction* transaction, const std::vector& columnIDs, + ChunkedNodeGroup& chunkedGroup) { hasChanges = true; - return nodeGroups->appendToLastNodeGroupAndFlushWhenFull(transaction, columnIDs, chunkedGroup); + return nodeGroups->appendToLastNodeGroupAndFlushWhenFull(mm, transaction, columnIDs, + chunkedGroup); } DataChunk NodeTable::constructDataChunkForPKColumn() const { diff --git a/test/test_files/ddl/ddl_empty.test b/test/test_files/ddl/ddl_empty.test index 6c448fde49e..34d00f91558 100644 --- a/test/test_files/ddl/ddl_empty.test +++ b/test/test_files/ddl/ddl_empty.test @@ -89,6 +89,19 @@ Karissa|40 Zhang|50 Noura|25 +-CASE CopyNodeAfterDropAddColNewGroup +-STATEMENT create node table Comment (id int64, creationDate INT64, locationIP STRING, browserUsed STRING, content STRING, length INT32, PRIMARY KEY (id)); +---- ok +-STATEMENT ALTER TABLE Comment DROP length; +---- ok +-STATEMENT ALTER TABLE Comment ADD size INT64; +---- ok +-STATEMENT COPY Comment FROM '${KUZU_ROOT_DIRECTORY}/dataset/ldbc-sf01/Comment.csv'; +---- ok +-STATEMENT MATCH (n:Comment) WHERE n.id = 1030792151057 return n.size; +---- 1 +86 + -CASE CopyRelAfterDropAddCol -STATEMENT CREATE NODE TABLE User(name STRING, age INT64, PRIMARY KEY (name)); ---- ok @@ -111,3 +124,23 @@ Adam|2020|Zhang Karissa|2021|Zhang Zhang|2022|Noura Zhang||Adam + +-CASE CopyRelAfterDropAddColNewGroup +-STATEMENT CREATE NODE TABLE User(name STRING, age INT64, PRIMARY KEY (name)); +---- ok +-STATEMENT CREATE REL TABLE Follows(FROM User TO User, comment STRING); +---- ok +-STATEMENT COPY User FROM '${KUZU_ROOT_DIRECTORY}/dataset/demo-db/csv/user.csv'; +---- ok +-STATEMENT ALTER TABLE Follows DROP comment; +---- ok +-STATEMENT ALTER TABLE Follows ADD since INT64; +---- ok +-STATEMENT COPY Follows FROM '${KUZU_ROOT_DIRECTORY}/dataset/demo-db/csv/follows.csv'; +---- ok +-STATEMENT MATCH (a:User)-[e:Follows]->(b:User) RETURN a.name, e.since, b.name; +---- 4 +Adam|2020|Karissa +Adam|2020|Zhang +Karissa|2021|Zhang +Zhang|2022|Noura