Parallel DISTINCT for SimpleAggregate

benjaminwinger committed Feb 19, 2025
1 parent 7d2e7db commit 3df11ac
Showing 10 changed files with 432 additions and 141 deletions.
25 changes: 20 additions & 5 deletions src/include/processor/operator/aggregate/aggregate_hash_table.h
@@ -12,6 +12,9 @@
#include "processor/result/factorized_table_schema.h"

namespace kuzu {
namespace common {
class InMemOverflowBuffer;
}
namespace storage {
class MemoryManager;
}
@@ -292,16 +295,28 @@ struct AggregateHashTableUtils {
storage::MemoryManager& memoryManager,
const std::vector<common::LogicalType>& groupByKeyTypes,
const common::LogicalType& distinctKeyType);

static FactorizedTableSchema getTableSchemaForKeys(
const std::vector<common::LogicalType>& groupByKeyTypes,
const common::LogicalType& distinctKeyType);
};

// NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor): This is a final class.
class HashAggregateSharedState;
// Separate class because SimpleAggregate has multiple top-level destinations for
// partitioning.
class AggregatePartitioningData {
public:
virtual ~AggregatePartitioningData() = default;

virtual void appendTuples(const FactorizedTable& table, ft_col_offset_t hashOffset) = 0;
virtual void appendDistinctTuple(size_t /*distinctFuncIndex*/, std::span<uint8_t> /*tuple*/,
common::hash_t /*hash*/) = 0;
virtual void appendOverflow(common::InMemOverflowBuffer&& overflowBuffer) = 0;
};
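For orientation, here is a minimal sketch of what a SimpleAggregate-side implementation of this interface could look like. Only the three overridden signatures come from the commit; the class name, member layout, and per-function queue structure are illustrative assumptions, not the committed code.

```cpp
// Hypothetical sketch, not code from this commit. A real implementation would,
// like HashAggregateSharedState, also derive from BaseAggregateSharedState so
// that the nested HashTableQueue type is accessible; constructors and that
// plumbing are omitted here.
class SimpleAggregatePartitioningData final : public AggregatePartitioningData {
public:
    void appendTuples(const FactorizedTable& table, ft_col_offset_t hashOffset) override {
        auto numBytesPerTuple = table.getTableSchema()->getNumBytesPerTuple();
        for (ft_tuple_idx_t i = 0; i < table.getNumTuples(); i++) {
            auto* tuple = table.getTuple(i);
            auto hash = *reinterpret_cast<common::hash_t*>(tuple + hashOffset);
            auto& queue = tupleQueues[(hash >> shiftForPartitioning) % tupleQueues.size()];
            queue->appendTuple(std::span(tuple, numBytesPerTuple));
        }
    }

    void appendDistinctTuple(size_t distinctFuncIndex, std::span<uint8_t> tuple,
        common::hash_t hash) override {
        // The reason the interface exists: each distinct aggregate function has
        // its own top-level destination, itself partitioned by hash.
        auto& queues = distinctQueues[distinctFuncIndex];
        queues[(hash >> shiftForPartitioning) % queues.size()]->appendTuple(tuple);
    }

    void appendOverflow(common::InMemOverflowBuffer&& overflowBuffer) override {
        overflow.push(std::make_unique<common::InMemOverflowBuffer>(std::move(overflowBuffer)));
    }

private:
    uint8_t shiftForPartitioning = 0; // illustrative
    std::vector<std::unique_ptr<HashTableQueue>> tupleQueues;
    // One vector of per-partition queues per distinct aggregate function.
    std::vector<std::vector<std::unique_ptr<HashTableQueue>>> distinctQueues;
    common::MPSCQueue<std::unique_ptr<common::InMemOverflowBuffer>> overflow;
};
```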

// Fixed-sized Aggregate hash table that flushes tuples into partitions in the
// HashAggregateSharedState when full
class PartitioningAggregateHashTable final : public AggregateHashTable {
public:
PartitioningAggregateHashTable(HashAggregateSharedState* sharedState,
PartitioningAggregateHashTable(AggregatePartitioningData* partitioningData,
storage::MemoryManager& memoryManager, std::vector<common::LogicalType> keyTypes,
std::vector<common::LogicalType> payloadTypes,
const std::vector<function::AggregateFunction>& aggregateFunctions,
@@ -310,7 +325,7 @@ class PartitioningAggregateHashTable final : public AggregateHashTable {
: AggregateHashTable(memoryManager, std::move(keyTypes), std::move(payloadTypes),
aggregateFunctions, distinctAggKeyTypes,
common::DEFAULT_VECTOR_CAPACITY /*minimum size*/, tableSchema.copy()),
sharedState{sharedState}, tableSchema{std::move(tableSchema)} {}
tableSchema{std::move(tableSchema)}, partitioningData{partitioningData} {}

uint64_t append(const std::vector<common::ValueVector*>& flatKeyVectors,
const std::vector<common::ValueVector*>& unFlatKeyVectors,
@@ -321,8 +336,8 @@ class PartitioningAggregateHashTable final : public AggregateHashTable {
void mergeAll();

private:
HashAggregateSharedState* sharedState;
FactorizedTableSchema tableSchema;
AggregatePartitioningData* partitioningData;
};

} // namespace processor
64 changes: 63 additions & 1 deletion src/include/processor/operator/aggregate/base_aggregate.h
@@ -3,25 +3,87 @@
#include <mutex>

#include "aggregate_input.h"
#include "common/mpsc_queue.h"
#include "function/aggregate_function.h"
#include "processor/operator/sink.h"
#include "processor/result/factorized_table.h"
#include "processor/result/factorized_table_schema.h"

namespace kuzu {
namespace main {
class ClientContext;
}
namespace processor {
class AggregateHashTable;

size_t getNumPartitionsForParallelism(main::ClientContext* context);

class BaseAggregateSharedState {
protected:
explicit BaseAggregateSharedState(
const std::vector<function::AggregateFunction>& aggregateFunctions);
const std::vector<function::AggregateFunction>& aggregateFunctions, size_t numPartitions);

virtual std::pair<uint64_t, uint64_t> getNextRangeToRead() = 0;

~BaseAggregateSharedState() = default;

void finalizeAggregateHashTable(const AggregateHashTable& localHashTable);

class HashTableQueue {
public:
HashTableQueue(storage::MemoryManager* memoryManager, FactorizedTableSchema tableSchema);

std::unique_ptr<HashTableQueue> copy() const {
return std::make_unique<HashTableQueue>(headBlock.load()->table.getMemoryManager(),
headBlock.load()->table.getTableSchema()->copy());
}
~HashTableQueue();

void appendTuple(std::span<uint8_t> tuple);

void mergeInto(AggregateHashTable& hashTable);

bool empty() const {
auto headBlock = this->headBlock.load();
return (headBlock == nullptr || headBlock->numTuplesReserved == 0) &&
queuedTuples.approxSize() == 0;
}

struct TupleBlock {
TupleBlock(storage::MemoryManager* memoryManager, FactorizedTableSchema tableSchema)
: numTuplesReserved{0}, numTuplesWritten{0},
table{memoryManager, std::move(tableSchema)} {
// Start at a fixed capacity of one full block (so that concurrent writes are safe).
// If it is not filled, we resize it to the actual capacity before writing it to the
// hashTable
table.resize(table.getNumTuplesPerBlock());
}
// numTuplesReserved may exceed the capacity of the factorizedTable if threads
// try to write to it while a new block is being allocated, so it should not be
// relied on for anything other than reserving tuples.
std::atomic<uint64_t> numTuplesReserved;
// Set after the tuple has been written to the block.
// Once numTuplesWritten == factorizedTable.getNumTuplesPerBlock() all writes have
// finished
std::atomic<uint64_t> numTuplesWritten;
FactorizedTable table;
};
common::MPSCQueue<TupleBlock*> queuedTuples;
// Tuples are always appended to the headBlock until it is full
// (numTuplesReserved >= factorizedTable.getNumTuplesPerBlock()); the full block is
// then pushed into queuedTuples (at which point numTuplesReserved may not yet
// equal numTuplesWritten).
std::atomic<TupleBlock*> headBlock;
uint64_t numTuplesPerBlock;
};
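The two counters on TupleBlock encode a reserve-then-publish protocol for concurrent appends. The sketch below illustrates the protocol the comments describe; it is an assumed shape, not the committed appendTuple, and it assumes <atomic>, <cstring>, and <span> are available.

```cpp
// Sketch of the reserve-then-publish protocol implied by TupleBlock's counters.
// The real appendTuple must also swap in a fresh head block when this one fills
// and push the full block into queuedTuples, which is elided here.
void appendTupleSketch(std::atomic<TupleBlock*>& headBlock, uint64_t numTuplesPerBlock,
    std::span<uint8_t> tuple) {
    while (true) {
        TupleBlock* block = headBlock.load();
        // Reserve a slot. fetch_add can push the counter past the block's real
        // capacity, which is why numTuplesReserved is only a reservation count.
        auto slot = block->numTuplesReserved.fetch_add(1);
        if (slot >= numTuplesPerBlock) {
            continue; // Block already full; retry once a new head block is installed.
        }
        // Copy the tuple into the reserved slot, then publish the write. Once
        // numTuplesWritten reaches numTuplesPerBlock, all writes have finished.
        std::memcpy(block->table.getTuple(slot), tuple.data(), tuple.size());
        block->numTuplesWritten.fetch_add(1);
        return;
    }
}
```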

protected:
std::mutex mtx;
uint64_t currentOffset;
std::vector<function::AggregateFunction> aggregateFunctions;
std::atomic<size_t> numThreadsFinishedProducing;
std::atomic<size_t> numThreads;
uint8_t shiftForPartitioning;
};

class BaseAggregate : public Sink {
79 changes: 19 additions & 60 deletions src/include/processor/operator/aggregate/hash_aggregate.h
@@ -37,28 +37,34 @@ struct HashAggregateInfo {
};

// NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor): This is a final class.
class HashAggregateSharedState final : public BaseAggregateSharedState {
class HashAggregateSharedState final : public BaseAggregateSharedState,
public AggregatePartitioningData {

public:
explicit HashAggregateSharedState(main::ClientContext* context, HashAggregateInfo hashAggInfo,
const std::vector<function::AggregateFunction>& aggregateFunctions,
std::span<AggregateInfo> aggregateInfos, std::vector<common::LogicalType> keyTypes,
std::vector<common::LogicalType> payloadTypes);

void appendTuple(std::span<uint8_t> tuple, common::hash_t hash) {
auto& partition =
globalPartitions[(hash >> shiftForPartitioning) % globalPartitions.size()];
partition.queue->appendTuple(tuple);
void appendTuples(const FactorizedTable& factorizedTable, ft_col_offset_t hashOffset) override {
auto numBytesPerTuple = factorizedTable.getTableSchema()->getNumBytesPerTuple();
for (ft_tuple_idx_t tupleIdx = 0; tupleIdx < factorizedTable.getNumTuples(); tupleIdx++) {
auto tuple = factorizedTable.getTuple(tupleIdx);
auto hash = *reinterpret_cast<common::hash_t*>(tuple + hashOffset);
auto& partition =
globalPartitions[(hash >> shiftForPartitioning) % globalPartitions.size()];
partition.queue->appendTuple(std::span(tuple, numBytesPerTuple));
}
}

void appendDistinctTuple(size_t distinctFuncIndex, std::span<uint8_t> tuple,
common::hash_t hash) {
common::hash_t hash) override {
auto& partition =
globalPartitions[(hash >> shiftForPartitioning) % globalPartitions.size()];
partition.distinctTableQueues[distinctFuncIndex]->appendTuple(tuple);
}

void appendOverflow(common::InMemOverflowBuffer&& overflowBuffer) {
void appendOverflow(common::InMemOverflowBuffer&& overflowBuffer) override {
overflow.push(std::make_unique<common::InMemOverflowBuffer>(std::move(overflowBuffer)));
}
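Both append paths above choose a partition with `(hash >> shiftForPartitioning) % globalPartitions.size()`. Shifting first means the partition is selected from the high bits of the hash; a plausible reading (an assumption, not stated in the commit) is that this keeps partitioning independent of the low bits that slot lookup inside each AggregateHashTable consumes. A self-contained illustration with made-up values:

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdio>

int main() {
    // All values are hypothetical: a 64-bit tuple hash, 8 partitions, and a
    // shift chosen so only the high bits select the partition.
    uint64_t hash = 0x9E3779B97F4A7C15ull;
    size_t numPartitions = 8;
    unsigned shiftForPartitioning = 56; // illustrative, not the commit's value
    size_t partitionIdx = (hash >> shiftForPartitioning) % numPartitions;
    std::printf("tuple routed to partition %zu of %zu\n", partitionIdx, numPartitions);
    return 0;
}
```

With these values the top eight bits of the hash are 0x9E (158), so the tuple lands in partition 158 % 8 = 6.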

@@ -89,56 +95,6 @@ class HashAggregateSharedState final : public BaseAggregateSharedState {
std::tuple<const FactorizedTable*, common::offset_t> getPartitionForOffset(
common::offset_t offset) const;

public:
HashAggregateInfo aggInfo;
common::MPSCQueue<std::unique_ptr<common::InMemOverflowBuffer>> overflow;
class HashTableQueue {
public:
HashTableQueue(storage::MemoryManager* memoryManager, FactorizedTableSchema tableSchema);

std::unique_ptr<HashTableQueue> copy() const {
return std::make_unique<HashTableQueue>(headBlock.load()->table.getMemoryManager(),
headBlock.load()->table.getTableSchema()->copy());
}
~HashTableQueue();

void appendTuple(std::span<uint8_t> tuple);

void mergeInto(AggregateHashTable& hashTable);

bool empty() const {
auto headBlock = this->headBlock.load();
return (headBlock == nullptr || headBlock->numTuplesReserved == 0) &&
queuedTuples.approxSize() == 0;
}

private:
struct TupleBlock {
TupleBlock(storage::MemoryManager* memoryManager, FactorizedTableSchema tableSchema)
: numTuplesReserved{0}, numTuplesWritten{0},
table{memoryManager, std::move(tableSchema)} {
// Start at a fixed capacity of one full block (so that concurrent writes are safe).
// If it is not filled, we resize it to the actual capacity before writing it to the
// hashTable
table.resize(table.getNumTuplesPerBlock());
}
// numTuplesReserved may be greater than the capacity of the factorizedTable
// if threads try to write to it while a new block is being allocated
// So it should not be relied on for anything other than reserving tuples
std::atomic<uint64_t> numTuplesReserved;
// Set after the tuple has been written to the block.
// Once numTuplesWritten == factorizedTable.getNumTuplesPerBlock() all writes have
// finished
std::atomic<uint64_t> numTuplesWritten;
FactorizedTable table;
};
common::MPSCQueue<TupleBlock*> queuedTuples;
// When queueing tuples, they are always added to the headBlock until the headBlock is full
// (numTuplesReserved >= factorizedTable.getNumTuplesPerBlock()), then pushed into the
// queuedTuples (at which point, the numTuplesReserved may not be equal to the
// numTuplesWritten)
std::atomic<TupleBlock*> headBlock;
};
struct Partition {
std::unique_ptr<AggregateHashTable> hashTable;
std::mutex mtx;
@@ -148,11 +104,14 @@ class HashAggregateSharedState final : public BaseAggregateSharedState {
std::vector<std::unique_ptr<HashTableQueue>> distinctTableQueues;
std::atomic<bool> finalized = false;
};
std::vector<Partition> globalPartitions;

public:
HashAggregateInfo aggInfo;
common::MPSCQueue<std::unique_ptr<common::InMemOverflowBuffer>> overflow;
uint64_t limitNumber;
storage::MemoryManager* memoryManager;
uint8_t shiftForPartitioning;
bool readyForFinalization = false;
std::vector<Partition> globalPartitions;
bool readyForFinalization;
};

struct HashAggregateLocalState {
[Diffs for the remaining 7 changed files are not shown.]
