diff --git a/dbms/src/Common/ColumnsHashing.h b/dbms/src/Common/ColumnsHashing.h index 442cb153cce..30f9bddea3f 100644 --- a/dbms/src/Common/ColumnsHashing.h +++ b/dbms/src/Common/ColumnsHashing.h @@ -48,8 +48,10 @@ struct HashMethodOneNumber using Self = HashMethodOneNumber; using Base = columns_hashing_impl::HashMethodBase; using KeyHolderType = FieldType; + using BatchKeyHolderType = KeyHolderType; static constexpr bool is_serialized_key = false; + static constexpr bool can_batch_get_key_holder = false; const FieldType * vec; @@ -110,7 +112,7 @@ class KeyStringBatchHandlerBase const auto row = batch_row_idx + i; const auto last_offset = offsets[row - 1]; // Remove last zero byte. - StringRef key(chars + last_offset, offsets[row] - offsets[row -1 ] - 1); + StringRef key(chars + last_offset, offsets[row] - last_offset - 1); if constexpr (has_collator) key = derived_collator->sortKey(key.data, key.size, sort_key_containers[i]); @@ -121,15 +123,12 @@ class KeyStringBatchHandlerBase void santityCheck() const { - // Make sure init() has called. + // Make sure init() has been called. assert(sort_key_containers.size() == batch_rows.size() && !sort_key_containers.empty()); } protected: - bool inited() const - { - return !sort_key_containers.empty(); - } + bool inited() const { return !sort_key_containers.empty(); } void init(size_t start_row, size_t max_batch_size) { @@ -149,20 +148,20 @@ class KeyStringBatchHandlerBase if likely (collator) { -#define M(VAR_PREFIX, COLLATOR_NAME, IMPL_TYPE, COLLATOR_ID) \ - case (COLLATOR_ID): \ - { \ - return prepareNextBatchType(chars, offsets, cur_batch_size, collator); \ - break; \ - } +#define M(VAR_PREFIX, COLLATOR_NAME, IMPL_TYPE, COLLATOR_ID) \ + case (COLLATOR_ID): \ + { \ + return prepareNextBatchType(chars, offsets, cur_batch_size, collator); \ + break; \ + } switch (collator->getCollatorId()) { APPLY_FOR_COLLATOR_TYPES(M) - default: - { - throw Exception(fmt::format("unexpected collator: {}", collator->getCollatorId())); - } + default: + { + throw Exception(fmt::format("unexpected collator: {}", collator->getCollatorId())); + } }; #undef M } @@ -172,6 +171,7 @@ class KeyStringBatchHandlerBase } } +public: // NOTE: i is the index of mini batch, it's not the row index of Column. ALWAYS_INLINE inline ArenaKeyHolder getKeyHolderBatch(size_t i, Arena * pool) const { @@ -195,6 +195,8 @@ struct HashMethodString using BatchHandlerBase = KeyStringBatchHandlerBase; static constexpr bool is_serialized_key = false; + // todo + static constexpr bool can_batch_get_key_holder = false; const IColumn::Offset * offsets; const UInt8 * chars; @@ -213,11 +215,6 @@ struct HashMethodString collator = collators[0]; } - bool batchGetkeyHolder() override - { - return BatchHandlerBase::inited(); - } - void initBatchHandler(size_t start_row, size_t max_batch_size) { assert(!BatchHandlerBase::inited()); @@ -260,6 +257,7 @@ struct HashMethodStringBin using BatchKeyHolderType = KeyHolderType; static constexpr bool is_serialized_key = false; + static constexpr bool can_batch_get_key_holder = false; const IColumn::Offset * offsets; const UInt8 * chars; @@ -461,6 +459,7 @@ struct HashMethodFastPathTwoKeysSerialized using BatchKeyHolderType = KeyHolderType; static constexpr bool is_serialized_key = true; + static constexpr bool can_batch_get_key_holder = false; Key1Desc key_1_desc; Key2Desc key_2_desc; @@ -499,6 +498,7 @@ struct HashMethodFixedString using BatchKeyHolderType = KeyHolderType; static constexpr bool is_serialized_key = false; + static constexpr bool can_batch_get_key_holder = false; size_t n; const ColumnFixedString::Chars_t * chars; @@ -548,6 +548,7 @@ struct HashMethodKeysFixed using BatchKeyHolderType = KeyHolderType; static constexpr bool is_serialized_key = false; + static constexpr bool can_batch_get_key_holder = false; static constexpr bool has_nullable_keys = has_nullable_keys_; Sizes key_sizes; @@ -713,10 +714,7 @@ class KeySerializedBatchHandlerBase } protected: - bool inited() const - { - return !byte_size.empty(); - } + bool inited() const { return !byte_size.empty(); } void init(size_t start_row, const ColumnRawPtrs & key_columns, const TiDB::TiDBCollators & collators) { @@ -756,7 +754,7 @@ class KeySerializedBatchHandlerBase pos, batch_row_idx, cur_batch_size, - false, + nullptr, collators.empty() ? nullptr : collators[i], &sort_key_container); @@ -768,6 +766,7 @@ class KeySerializedBatchHandlerBase return mem_size; } +public: // NOTE: i is the index of mini batch, it's not the row index of Column. ALWAYS_INLINE inline ArenaKeyHolder getKeyHolderBatch(size_t i, Arena * pool) const { @@ -790,10 +789,12 @@ struct HashMethodSerialized using Self = HashMethodSerialized; using Base = columns_hashing_impl::HashMethodBase; using BatchHandlerBase = KeySerializedBatchHandlerBase; - static constexpr bool is_serialized_key = true; using KeyHolderType = SerializedKeyHolder; using BatchKeyHolderType = ArenaKeyHolder; + static constexpr bool is_serialized_key = true; + static constexpr bool can_batch_get_key_holder = true; + ColumnRawPtrs key_columns; size_t keys_size; TiDB::TiDBCollators collators; @@ -807,11 +808,6 @@ struct HashMethodSerialized , collators(collators_) {} - bool batchGetkeyHolder() override - { - return BatchHandlerBase::inited(); - } - void initBatchHandler(size_t start_row, size_t /* max_batch_size */) { assert(!BatchHandlerBase::inited()); @@ -849,6 +845,7 @@ struct HashMethodHashed using BatchKeyHolderType = KeyHolderType; static constexpr bool is_serialized_key = false; + static constexpr bool can_batch_get_key_holder = false; ColumnRawPtrs key_columns; TiDB::TiDBCollators collators; diff --git a/dbms/src/Common/ColumnsHashingImpl.h b/dbms/src/Common/ColumnsHashingImpl.h index a9a23db14f5..bf130d2bd29 100644 --- a/dbms/src/Common/ColumnsHashingImpl.h +++ b/dbms/src/Common/ColumnsHashingImpl.h @@ -129,11 +129,6 @@ class HashMethodBase using Cache = LastElementCache; using Derived = TDerived; - bool batchGetKeyHolder() const override - { - return false; - } - template ALWAYS_INLINE inline EmplaceResult emplaceKey( Data & data, diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 9bf9402a7ed..c27921c6d99 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -46,6 +46,9 @@ extern const char random_fail_in_resize_callback[]; extern const char force_agg_prefetch[]; } // namespace FailPoints +static constexpr size_t agg_prefetch_step = 16; +static constexpr size_t agg_mini_batch = 256; + #define AggregationMethodName(NAME) AggregatedDataVariants::AggregationMethod_##NAME #define AggregationMethodNameTwoLevel(NAME) AggregatedDataVariants::AggregationMethod_##NAME##_two_level #define AggregationMethodType(NAME) AggregatedDataVariants::Type::NAME @@ -625,7 +628,8 @@ AggregatedDataVariants::Type Aggregator::chooseAggregationMethod() if (params.keys_size == 1 && types_not_null[0]->isFixedString()) return AggregatedDataVariants::Type::key_fixed_string; - return ChooseAggregationMethodFastPath(params.keys_size, types_not_null, params.collators); + // return ChooseAggregationMethodFastPath(params.keys_size, types_not_null, params.collators); + return AggregatedDataVariants::Type::serialized; } @@ -652,13 +656,6 @@ void Aggregator::createAggregateStates(AggregateDataPtr & aggregate_data) const } } -template -concept HasBatchGetKeyHolderMemberFunc = requires -{ - // todo also require initBatchHandler() - {std::declval().getKeyHolderBatch(std::declval(), std::declval())} -> std::same_as; -}; - /** It's interesting - if you remove `noinline`, then gcc for some reason will inline this function, and the performance decreases (~ 10%). * (Probably because after the inline of this function, more internal functions no longer be inlined.) * Inline does not make sense, since the inner loop is entirely inside this function. @@ -684,49 +681,63 @@ void NO_INLINE Aggregator::executeImpl( // For key_serialized, memory allocation and key serialization will be batch-wise. // For key_string, collation decode will be batch-wise. - if constexpr (HasBatchGetKeyHolderMemberFunc) + static constexpr bool batch_get_key_holder = Method::State::can_batch_get_key_holder; + if constexpr (batch_get_key_holder) { - state.initBatchHandler(agg_process_info.start_row); + state.initBatchHandler(agg_process_info.start_row, agg_mini_batch); result.batch_get_key_holder = true; } + using KeyHolderType = typename std::conditional< + batch_get_key_holder, + typename Method::State::BatchKeyHolderType, + typename Method::State::KeyHolderType>::type; if constexpr (Method::Data::is_string_hash_map) { // StringHashMap doesn't support prefetch. - executeImplBatch(method, state, aggregates_pool, agg_process_info); + executeImplBatch< + collect_hit_rate, + only_lookup, + /*enable_prefetch=*/false, + batch_get_key_holder, + KeyHolderType>(method, state, aggregates_pool, agg_process_info); } else { if (disable_prefetch) { - if constexpr (state.batchGetKeyHolder()) - executeImplBatch(method, state, aggregates_pool, agg_process_info); - else - executeImplBatch(method, state, aggregates_pool, agg_process_info); + executeImplBatch< + collect_hit_rate, + only_lookup, + /*enable_prefetch=*/false, + batch_get_key_holder, + KeyHolderType>(method, state, aggregates_pool, agg_process_info); } else { - if constexpr (state.batchGetKeyHolder()) - executeImplBatch(method, state, aggregates_pool, agg_process_info); - else - executeImplBatch(method, state, aggregates_pool, agg_process_info); + executeImplBatch< + collect_hit_rate, + only_lookup, + /*enable_prefetch=*/true, + batch_get_key_holder, + KeyHolderType>(method, state, aggregates_pool, agg_process_info); } } } -template +template std::optional::ResultType> Aggregator::emplaceOrFindKey( Method & method, typename Method::State & state, - typename Method::State::Derived::KeyHolderType && key_holder, + KeyHolderType & key_holder, size_t hashval) const { try { if constexpr (only_lookup) - return state.template findKey(method.data, std::move(key_holder), hashval); + return state.template findKey(method.data, key_holder, hashval); else - return state.template emplaceKey(method.data, std::move(key_holder), hashval); + return state.template emplaceKey(method.data, key_holder, hashval); } catch (ResizeException &) { @@ -755,7 +766,7 @@ std::optional::Res } } -template +template ALWAYS_INLINE inline void setupHashVals( size_t row_idx, size_t batch_size, @@ -766,14 +777,13 @@ ALWAYS_INLINE inline void setupHashVals( Method & method, typename Method::State & state) { - assert(hashvals.size() == key_holders.size() && hashvals.size() == batch_size); + assert(hashvals.size() == key_holders.size() && hashvals.size() >= batch_size); for (size_t i = row_idx, j = 0; i < row_idx + batch_size; ++i, ++j) { - if constexpr (Method::State::batch_get_key_holder) - key_holders[j] = static_cast(&state)->getKeyHolderBatch( - j, - aggregates_pool); + if constexpr (batch_get_key_holder) + key_holders[j] + = static_cast(&state)->getKeyHolderBatch(j, aggregates_pool); else key_holders[j] = static_cast(&state)->getKeyHolder( i, @@ -783,7 +793,13 @@ ALWAYS_INLINE inline void setupHashVals( } } -template +template < + bool collect_hit_rate, + bool only_lookup, + bool enable_prefetch, + bool batch_get_key_holder, + typename KeyHolderType, + typename Method> ALWAYS_INLINE void Aggregator::executeImplBatch( Method & method, typename Method::State & state, @@ -795,11 +811,13 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( /// Optimization for special case when there are no aggregate functions. if (params.aggregates_size == 0) - return handleOneBatch( - method, - state, - agg_process_info, - aggregates_pool); + return handleOneBatch< + collect_hit_rate, + only_lookup, + enable_prefetch, + batch_get_key_holder, + /*compute_agg_data=*/false, + KeyHolderType>(method, state, agg_process_info, aggregates_pool); /// Optimization for special case when aggregating by 8bit key. if constexpr (std::is_same_v) @@ -841,14 +859,23 @@ ALWAYS_INLINE void Aggregator::executeImplBatch( } /// Generic case. - return handleOneBatch( - method, - state, - agg_process_info, - aggregates_pool); -} - -template + return handleOneBatch< + collect_hit_rate, + only_lookup, + enable_prefetch, + batch_get_key_holder, + /*compute_agg_data=*/true, + KeyHolderType>(method, state, agg_process_info, aggregates_pool); +} + +template < + bool collect_hit_rate, + bool only_lookup, + bool enable_prefetch, + bool batch_get_key_holder, + bool compute_agg_data, + typename KeyHolderType, + typename Method> void Aggregator::handleOneBatch( Method & method, typename Method::State & state, @@ -884,23 +911,32 @@ void Aggregator::handleOneBatch( // mini batch will only be used when HashTable is big(a.k.a enable_prefetch is true), // which can reduce cache miss of agg data. mini_batch_size = agg_mini_batch; - hashvals.resize(agg_mini_batch); - key_holders.resize(agg_mini_batch); } // i is the begin row index of each mini batch. while (i < end) { + if unlikely (i + mini_batch_size > end) + mini_batch_size = end - i; + size_t batch_mem_size = 0; - if constexpr (Method::State::batch_get_key_holder) - batch_mem_size = state.prepareNextBatch(mini_batch_size, &temp_batch_pool); + if constexpr (batch_get_key_holder) + batch_mem_size = state.prepareNextBatch(&temp_batch_pool, mini_batch_size); - if constexpr (enable_prefetch) + if constexpr (enable_prefetch || batch_get_key_holder) { - if unlikely (i + mini_batch_size > end) - mini_batch_size = end - i; + hashvals.resize(mini_batch_size); + key_holders.resize(mini_batch_size); - setupHashVals(i, mini_batch_size, hashvals, key_holders, aggregates_pool, sort_key_containers, method, state); + setupHashVals( + i, + mini_batch_size, + hashvals, + key_holders, + aggregates_pool, + sort_key_containers, + method, + state); } const auto cur_batch_end = i + mini_batch_size; @@ -915,13 +951,11 @@ void Aggregator::handleOneBatch( if likely (k + agg_prefetch_step < hashvals.size()) method.data.prefetch(hashvals[k + agg_prefetch_step]); - emplace_result_holder - = emplaceOrFindKey(method, state, std::move(key_holders[k]), hashvals[k]); + emplace_result_holder = emplaceOrFindKey(method, state, key_holders[k], hashvals[k]); } - else if constexpr (Method::State::batch_get_key_holder) + else if constexpr (batch_get_key_holder) { - emplace_result_holder - = emplaceOrFindKey(method, state, std::move(key_holders[k]), hashvals[k]); + emplace_result_holder = emplaceOrFindKey(method, state, key_holders[k], hashvals[k]); } else { @@ -987,7 +1021,8 @@ void Aggregator::handleOneBatch( processed_rows = j; } - if constexpr (Method::State::batch_get_key_holder) + // todo rollback all to avoid wasting align. + if constexpr (batch_get_key_holder) temp_batch_pool.rollback(batch_mem_size); if unlikely (!processed_rows.has_value()) @@ -1193,7 +1228,7 @@ bool Aggregator::executeOnBlockImpl( { \ executeImpl( \ *ToAggregationMethodPtr(NAME, result.aggregation_method_impl), \ - result, \ + result, \ agg_process_info, \ params.collators); \ break; \ @@ -1301,39 +1336,51 @@ Block Aggregator::convertOneBucketToBlock( size_t bucket) const { const bool batch_get_key_holder = data_variants.batch_get_key_holder; -#define FILLER_DEFINE(name, skip_convert_key) \ - auto filler_##name = [bucket, &method, arena, this, batch_get_key_holder]( \ - const Sizes & key_sizes, \ - MutableColumns & key_columns, \ - AggregateColumnsData & aggregate_columns, \ - MutableColumns & final_aggregate_columns, \ - bool final_) { \ - using METHOD_TYPE = std::decay_t; \ - using DATA_TYPE = std::decay_t; \ - if (METHOD_TYPE::State::is_serialized_key && batch_get_key_holder) \ - { \ - convertToBlockImpl( \ - method, \ - method.data.impls[bucket], \ - key_sizes, \ - key_columns, \ - aggregate_columns, \ - final_aggregate_columns, \ - arena, \ - final_); \ - } \ - else \ - { \ - convertToBlockImpl( \ - method, \ - method.data.impls[bucket], \ - key_sizes, \ - key_columns, \ - aggregate_columns, \ - final_aggregate_columns, \ - arena, \ - final_); \ - } \ +#define FILLER_DEFINE(name, skip_convert_key) \ + auto filler_##name = [bucket, &method, arena, this, batch_get_key_holder]( \ + const Sizes & key_sizes, \ + MutableColumns & key_columns, \ + AggregateColumnsData & aggregate_columns, \ + MutableColumns & final_aggregate_columns, \ + bool final_) { \ + (void)batch_get_key_holder; \ + using METHOD_TYPE = std::decay_t; \ + using DATA_TYPE = std::decay_t; \ + if constexpr (METHOD_TYPE::State::is_serialized_key && METHOD_TYPE::State::can_batch_get_key_holder) \ + { \ + if (batch_get_key_holder) \ + convertToBlockImpl( \ + method, \ + method.data.impls[bucket], \ + key_sizes, \ + key_columns, \ + aggregate_columns, \ + final_aggregate_columns, \ + arena, \ + final_); \ + else \ + convertToBlockImpl( \ + method, \ + method.data.impls[bucket], \ + key_sizes, \ + key_columns, \ + aggregate_columns, \ + final_aggregate_columns, \ + arena, \ + final_); \ + } \ + else \ + { \ + convertToBlockImpl( \ + method, \ + method.data.impls[bucket], \ + key_sizes, \ + key_columns, \ + aggregate_columns, \ + final_aggregate_columns, \ + arena, \ + final_); \ + } \ } FILLER_DEFINE(convert_key, false); @@ -1377,37 +1424,49 @@ BlocksList Aggregator::convertOneBucketToBlocks( size_t bucket) const { const auto batch_get_key_holder = data_variants.batch_get_key_holder; -#define FILLER_DEFINE(name, skip_convert_key) \ - auto filler_##name = [bucket, &method, arena, this, batch_deserialize_key]( \ - const Sizes & key_sizes, \ - std::vector & key_columns_vec, \ - std::vector & aggregate_columns_vec, \ - std::vector & final_aggregate_columns_vec, \ - bool final_) { \ - if (Method::State::is_serialized_key && batch_get_key_holder) \ - { \ - convertToBlocksImpl( \ - method, \ - method.data.impls[bucket], \ - key_sizes, \ - key_columns_vec, \ - aggregate_columns_vec, \ - final_aggregate_columns_vec, \ - arena, \ - final_); \ - } \ - else \ - { \ - convertToBlocksImpl( \ - method, \ - method.data.impls[bucket], \ - key_sizes, \ - key_columns_vec, \ - aggregate_columns_vec, \ - final_aggregate_columns_vec, \ - arena, \ - final_); \ - } \ +#define FILLER_DEFINE(name, skip_convert_key) \ + auto filler_##name = [bucket, &method, arena, this, batch_get_key_holder]( \ + const Sizes & key_sizes, \ + std::vector & key_columns_vec, \ + std::vector & aggregate_columns_vec, \ + std::vector & final_aggregate_columns_vec, \ + bool final_) { \ + (void)batch_get_key_holder; \ + if constexpr (Method::State::is_serialized_key && Method::State::can_batch_get_key_holder) \ + { \ + if (batch_get_key_holder) \ + convertToBlocksImpl( \ + method, \ + method.data.impls[bucket], \ + key_sizes, \ + key_columns_vec, \ + aggregate_columns_vec, \ + final_aggregate_columns_vec, \ + arena, \ + final_); \ + else \ + convertToBlocksImpl( \ + method, \ + method.data.impls[bucket], \ + key_sizes, \ + key_columns_vec, \ + aggregate_columns_vec, \ + final_aggregate_columns_vec, \ + arena, \ + final_); \ + } \ + else \ + { \ + convertToBlocksImpl( \ + method, \ + method.data.impls[bucket], \ + key_sizes, \ + key_columns_vec, \ + aggregate_columns_vec, \ + final_aggregate_columns_vec, \ + arena, \ + final_); \ + } \ }; FILLER_DEFINE(convert_key, false); @@ -1819,7 +1878,7 @@ void NO_INLINE Aggregator::convertToBlockImplFinal( if constexpr (batch_deserialize_key) { // Assume key is StringRef, because only key_string and key_serialize can be batch-wise. - key_places.push_back(key.data); + key_places.push_back(const_cast(key.data)); } else { @@ -1921,12 +1980,15 @@ void NO_INLINE Aggregator::convertToBlocksImplFinal( if constexpr (batch_deserialize_key) { // Assume key is StringRef, because only key_string and key_serialize can be batch-wise. - key_places.push_back(key.data); + key_places.push_back(const_cast(key.data)); } else { - agg_keys_helpers[key_columns_vec_index] - ->insertKeyIntoColumns(key, key_columns_vec[key_columns_vec_index], key_sizes_ref, params.collators); + agg_keys_helpers[key_columns_vec_index]->insertKeyIntoColumns( + key, + key_columns_vec[key_columns_vec_index], + key_sizes_ref, + params.collators); } } places[data_index] = mapped; @@ -1934,8 +1996,11 @@ void NO_INLINE Aggregator::convertToBlocksImplFinal( if unlikely (data_index == current_bound) { - method.insertKeyIntoColumnsBatch(key_places, key_columns_vec[key_columns_vec_index]); - key_places.clear(); + if constexpr (!skip_convert_key && batch_deserialize_key) + { + method.insertKeyIntoColumnsBatch(key_places, key_columns_vec[key_columns_vec_index]); + key_places.clear(); + } ++key_columns_vec_index; current_bound += params.max_block_size; @@ -1988,7 +2053,7 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal( } agg_keys_helper.initAggKeys(data.size(), key_columns); } - + // For key_serialized, deserialize will be batch-wise if it's serialized batch-wise. PaddedPODArray key_places; if constexpr (batch_deserialize_key) @@ -2000,7 +2065,7 @@ void NO_INLINE Aggregator::convertToBlockImplNotFinal( if constexpr (batch_deserialize_key) { // Assume key is StringRef. - key_places.push_back(key.data); + key_places.push_back(const_cast(key.data)); } else { @@ -2063,12 +2128,15 @@ void NO_INLINE Aggregator::convertToBlocksImplNotFinal( if constexpr (batch_deserialize_key) { // Assume key is StringRef, because only key_string and key_serialize can be batch-wise. - key_places.push_back(key.data); + key_places.push_back(const_cast(key.data)); } else { - agg_keys_helpers[key_columns_vec_index] - ->insertKeyIntoColumns(key, key_columns_vec[key_columns_vec_index], key_sizes_ref, params.collators); + agg_keys_helpers[key_columns_vec_index]->insertKeyIntoColumns( + key, + key_columns_vec[key_columns_vec_index], + key_sizes_ref, + params.collators); } } @@ -2081,8 +2149,11 @@ void NO_INLINE Aggregator::convertToBlocksImplNotFinal( if unlikely (data_index == current_bound) { - method.insertKeyIntoColumnsBatch(key_places, key_columns_vec[++key_columns_vec_index]); - key_places.clear(); + if constexpr (!skip_convert_key && batch_deserialize_key) + { + method.insertKeyIntoColumnsBatch(key_places, key_columns_vec[++key_columns_vec_index]); + key_places.clear(); + } ++key_columns_vec_index; current_bound += params.max_block_size; @@ -2344,26 +2415,38 @@ BlocksList Aggregator::prepareBlocksAndFillSingleLevel(AggregatedDataVariants & { size_t rows = data_variants.size(); const bool batch_get_key_holder = data_variants.batch_get_key_holder; -#define M(NAME, skip_convert_key) \ - case AggregationMethodType(NAME): \ - { \ - auto & tmp_method = *ToAggregationMethodPtr(NAME, data_variants.aggregation_method_impl); \ - auto & tmp_data = ToAggregationMethodPtr(NAME, data_variants.aggregation_method_impl) -> data; \ - if (decltype(tmp_method)::State::is_serialized_key && batch_get_key_holder) \ - { \ - convertToBlocksImpl( \ - tmp_method, \ - tmp_data, \ - key_sizes, \ - key_columns_vec, \ - aggregate_columns_vec, \ - final_aggregate_columns_vec, \ - data_variants.aggregates_pool, \ - final_); \ - } \ - else \ - { \ - convertToBlocksImpl( \ +#define M(NAME, skip_convert_key) \ + case AggregationMethodType(NAME): \ + { \ + auto & tmp_method = *ToAggregationMethodPtr(NAME, data_variants.aggregation_method_impl); \ + auto & tmp_data = ToAggregationMethodPtr(NAME, data_variants.aggregation_method_impl) -> data; \ + using MethodType = std::decay_t; \ + if constexpr (MethodType::State::is_serialized_key && MethodType::State::can_batch_get_key_holder) \ + { \ + if (batch_get_key_holder) \ + convertToBlocksImpl( \ + tmp_method, \ + tmp_data, \ + key_sizes, \ + key_columns_vec, \ + aggregate_columns_vec, \ + final_aggregate_columns_vec, \ + data_variants.aggregates_pool, \ + final_); \ + else \ + convertToBlocksImpl( \ + tmp_method, \ + tmp_data, \ + key_sizes, \ + key_columns_vec, \ + aggregate_columns_vec, \ + final_aggregate_columns_vec, \ + data_variants.aggregates_pool, \ + final_); \ + } \ + else \ + { \ + convertToBlocksImpl( \ tmp_method, \ tmp_data, \ key_sizes, \ @@ -2372,20 +2455,21 @@ BlocksList Aggregator::prepareBlocksAndFillSingleLevel(AggregatedDataVariants & final_aggregate_columns_vec, \ data_variants.aggregates_pool, \ final_); \ - } \ - break; \ + } \ + break; \ } #define M_skip_convert_key(NAME) M(NAME, true) #define M_convert_key(NAME) M(NAME, false) #define FILLER_DEFINE(name, M_tmp) \ - auto filler_##name = [&data_variants, this, batch_get_key_holder]( \ + auto filler_##name = [&data_variants, this, batch_get_key_holder]( \ const Sizes & key_sizes, \ std::vector & key_columns_vec, \ std::vector & aggregate_columns_vec, \ std::vector & final_aggregate_columns_vec, \ bool final_) { \ + (void)batch_get_key_holder; \ switch (data_variants.type) \ { \ APPLY_FOR_VARIANTS_SINGLE_LEVEL(M_tmp) \ @@ -2576,7 +2660,7 @@ MergingBucketsPtr Aggregator::mergeAndConvertToBlocks( } for (auto & data : non_empty_data) - RUNTIME_CHECK(non_empty_data[0]->batch_get_key_holder == data.batch_get_key_holder); + RUNTIME_CHECK(non_empty_data[0]->batch_get_key_holder == data->batch_get_key_holder); // for single level merge, concurrency must be 1. size_t merge_concurrency = has_at_least_one_two_level ? std::max(max_threads, 1) : 1; diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 5220ed70916..272ee342dfe 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -52,9 +52,6 @@ class IBlockOutputStream; template class AggHashTableToBlocksBlockInputStream; -static constexpr size_t agg_prefetch_step = 16; -static constexpr size_t agg_mini_batch = 256; - /** Different data structures that can be used for aggregation * For efficiency, the aggregation data itself is put into the pool. * Data and pool ownership (states of aggregate functions) @@ -717,12 +714,11 @@ struct AggregationMethodSerialized pos = key_columns[i]->deserializeAndInsertFromArena(pos, collators.empty() ? nullptr : collators[i]); } - static void insertKeyIntoColumnsBatch( - PaddedPODArray & key_places, - std::vector & key_columns) + static void insertKeyIntoColumnsBatch(PaddedPODArray & key_places, std::vector & key_columns) { + // todo: nt optimization for (auto * key_column : key_columns) - key_column->deserializeForCmpAndInsertFromPos(key_places, true); + key_column->deserializeForCmpAndInsertFromPos(key_places, false); } }; @@ -1461,29 +1457,42 @@ class Aggregator template void executeImpl( Method & method, - Arena * aggregates_pool, + AggregatedDataVariants & result, AggProcessInfo & agg_process_info, TiDB::TiDBCollators & collators) const; - template + template < + bool collect_hit_rate, + bool only_loopup, + bool enable_prefetch, + bool batch_get_key_holder, + typename KeyHolderType, + typename Method> void executeImplBatch( Method & method, typename Method::State & state, Arena * aggregates_pool, AggProcessInfo & agg_process_info) const; - template + template < + bool collect_hit_rate, + bool only_lookup, + bool enable_prefetch, + bool batch_get_key_holder, + bool compute_agg_data, + typename KeyHolderType, + typename Method> void handleOneBatch( Method & method, typename Method::State & state, AggProcessInfo & agg_process_info, Arena * aggregates_pool) const; - template + template std::optional::ResultType> emplaceOrFindKey( Method & method, typename Method::State & state, - typename Method::State::Derived::KeyHolderType && key_holder, + KeyHolderType & key_holder, size_t hashval) const; template