Skip to content

Commit

Permalink
zonemap total
Browse files Browse the repository at this point in the history
Signed-off-by: Smith Cruise <chendingchao1@126.com>
  • Loading branch information
Smith-Cruise committed Nov 26, 2024
1 parent 00c97a7 commit 0950aa2
Show file tree
Hide file tree
Showing 20 changed files with 408 additions and 133 deletions.
31 changes: 18 additions & 13 deletions be/src/connector/hive_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,7 @@ Status HiveDataSource::_init_conjunct_ctxs(RuntimeState* state) {
_update_has_any_predicate();

RETURN_IF_ERROR(_decompose_conjunct_ctxs(state));
{
std::vector<ExprContext*> cloned_conjunct_ctxs;
RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _min_max_conjunct_ctxs, &cloned_conjunct_ctxs));
for (auto* ctx : cloned_conjunct_ctxs) {
_all_conjunct_ctxs.emplace_back(ctx);
}

cloned_conjunct_ctxs.clear();
RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _conjunct_ctxs, &cloned_conjunct_ctxs));
for (auto* ctx : cloned_conjunct_ctxs) {
_all_conjunct_ctxs.emplace_back(ctx);
}
}
RETURN_IF_ERROR(_setup_all_conjunct_ctxs(state));
return Status::OK();
}

Expand Down Expand Up @@ -459,6 +447,23 @@ Status HiveDataSource::_decompose_conjunct_ctxs(RuntimeState* state) {
return Status::OK();
}

Status HiveDataSource::_setup_all_conjunct_ctxs(RuntimeState* state) {
// clone conjunct from _min_max_conjunct_ctxs & _conjunct_ctxs
// then we will generate PredicateTree based on _all_conjunct_ctxs
std::vector<ExprContext*> cloned_conjunct_ctxs;
RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _min_max_conjunct_ctxs, &cloned_conjunct_ctxs));
for (auto* ctx : cloned_conjunct_ctxs) {
_all_conjunct_ctxs.emplace_back(ctx);
}

cloned_conjunct_ctxs.clear();
RETURN_IF_ERROR(Expr::clone_if_not_exists(state, &_pool, _conjunct_ctxs, &cloned_conjunct_ctxs));
for (auto* ctx : cloned_conjunct_ctxs) {
_all_conjunct_ctxs.emplace_back(ctx);
}
return Status::OK();
}

void HiveDataSource::_init_counter(RuntimeState* state) {
const auto& hdfs_scan_node = _provider->_hdfs_scan_node;

Expand Down
3 changes: 2 additions & 1 deletion be/src/connector/hive_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class HiveDataSource final : public DataSource {
Status _init_conjunct_ctxs(RuntimeState* state);
void _update_has_any_predicate();
Status _decompose_conjunct_ctxs(RuntimeState* state);
Status _setup_all_conjunct_ctxs(RuntimeState* state);
void _init_tuples_and_slots(RuntimeState* state);
void _init_counter(RuntimeState* state);
void _init_rf_counters();
Expand Down Expand Up @@ -119,7 +120,7 @@ class HiveDataSource final : public DataSource {
// ============ conjuncts =================
std::vector<ExprContext*> _min_max_conjunct_ctxs;

// whole conjuncts, used to generate PredicateTree
// contains whole conjuncts, used to generate PredicateTree
std::vector<ExprContext*> _all_conjunct_ctxs{};
// complex conjuncts, such as contains multi slot, are evaled in scanner.
std::vector<ExprContext*> _scanner_conjunct_ctxs;
Expand Down
23 changes: 11 additions & 12 deletions be/src/formats/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,19 @@ class ColumnReader {

virtual void select_offset_index(const SparseRange<uint64_t>& range, const uint64_t rg_first_row) = 0;

virtual Status row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
const uint64_t rg_first_row, const uint64_t rg_num_rows) const {
// not implemented, means select the whole row group
row_ranges->add({rg_first_row, rg_first_row + rg_num_rows});
return Status::OK();
virtual StatusOr<bool> row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
CompoundNodeType pred_relation, const uint64_t rg_first_row,
const uint64_t rg_num_rows) const {
return true;
}

virtual Status page_index_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
const uint64_t rg_first_row, const uint64_t rg_num_rows) {
// not implemented, means select the whole row group
row_ranges->add({rg_first_row, rg_first_row + rg_num_rows});
return Status::OK();
// return true means page index filter happened
// return false means no page index filter happened
virtual StatusOr<bool> page_index_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
SparseRange<uint64_t>* row_ranges, CompoundNodeType pred_relation,
const uint64_t rg_first_row, const uint64_t rg_num_rows) {
DCHECK(row_ranges->empty());
return false;
}

private:
Expand Down
141 changes: 141 additions & 0 deletions be/src/formats/parquet/complex_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

#include "formats/parquet/complex_column_reader.h"

#include <storage/column_expr_predicate.h>

#include "column/array_column.h"
#include "column/map_column.h"
#include "column/struct_column.h"
#include "exprs/subfield_expr.h"
#include "formats/parquet/schema.h"
#include "gutil/casts.h"
#include "gutil/strings/substitute.h"
Expand Down Expand Up @@ -350,5 +353,143 @@ void StructColumnReader::_handle_null_rows(uint8_t* is_nulls, bool* has_null, si
}
}
}
StatusOr<bool> StructColumnReader::row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
CompoundNodeType pred_relation,
const uint64_t rg_first_row,
const uint64_t rg_num_rows) const {
ObjectPool pool;
std::vector<const ColumnPredicate*> rewritten_predicates;
RETURN_IF_ERROR(_rewrite_column_expr_predicate(&pool, predicates, rewritten_predicates));

auto is_satisfied = [&](const ColumnPredicate* predicate) -> bool {
if (!predicate->is_expr_predicate()) {
// for not supported expr, select it by default
return true;
}

const ColumnExprPredicate* expr_predicate = down_cast<const ColumnExprPredicate*>(predicate);
const std::vector<ExprContext*>& expr_contexts = expr_predicate->get_expr_ctxs();
if (expr_contexts.size() != 1) {
// defense code
return true;
}

ExprContext* expr_context = expr_contexts[0];
Expr* root_expr = expr_context->root();
const std::vector<Expr*>& expr_children = root_expr->children();
Expr* subfield_expr = expr_children[0];
// check there must have two children, and the left one is SubfieldExpr
if (expr_children.size() != 2 || subfield_expr->node_type() != TExprNodeType::type::SUBFIELD_EXPR) {
return true;
}
Expr* right_expr = expr_children[1];

// check exprs are monotonic
if (!root_expr->is_monotonic() || !subfield_expr->is_monotonic() || !right_expr->is_monotonic()) {
return true;
}

std::vector<std::vector<std::string>> subfields{};
int num_subfield = subfield_expr->get_subfields(&subfields);
if (num_subfield != 1) {
// must only exist one subfield
return true;
}

if (subfield_expr->children().size() != 1 && !subfield_expr->get_child(0)->is_slotref()) {
return true;
}

Expr* new_slot_expr = Expr::copy(&pool, subfield_expr->get_child(0));
Expr* new_right_expr = Expr::copy(&pool, right_expr);
Expr* new_root_expr = root_expr->clone(&pool);
new_root_expr->set_monotonic(true);
new_root_expr->add_child(new_slot_expr);
new_root_expr->add_child(new_right_expr);

auto expr_rewrite = std::make_unique<ExprContext>(new_root_expr);
auto st = expr_rewrite->prepare(expr_predicate->runtime_state());
if (!st.ok()) {
return true;
}
st = expr_rewrite->open(expr_predicate->runtime_state());
if (!st.ok()) {
return true;
}
auto stv = ColumnExprPredicate::make_column_expr_predicate(
get_type_info(subfield_expr->type().type, subfield_expr->type().precision, subfield_expr->type().scale),
expr_predicate->column_id(), expr_predicate->runtime_state(), expr_rewrite.get(),
expr_predicate->slot_desc());
if (!stv.ok()) {
return true;
}

ColumnPredicate* new_predicate = stv.value();
ColumnReader* column_reader = nullptr;
for (const std::string& subfield : subfields[0]) {
if (column_reader == nullptr) {
column_reader = get_child_column_reader(subfield);
} else {
StructColumnReader* struct_column_reader = down_cast<StructColumnReader*>(column_reader);
column_reader = struct_column_reader->get_child_column_reader(subfield);
}

if (column_reader == nullptr) {
return true;
}
}

if (column_reader == nullptr) {
return true;
}

if (column_reader->get_column_parquet_field()->type != ColumnType::SCALAR) {
return true;
}

std::vector<const ColumnPredicate*> new_predicates;
new_predicates.emplace_back(new_predicate);
auto res = column_reader->row_group_zone_map_filter(new_predicates, pred_relation, rg_first_row, rg_num_rows);
if (!res.ok()) {
return true;
}
return res.value();
};

if (pred_relation == CompoundNodeType::AND) {
return std::ranges::all_of(rewritten_predicates, [&](const auto* pred) { return is_satisfied(pred); });
} else {
return rewritten_predicates.empty() ||
std::ranges::any_of(rewritten_predicates, [&](const auto* pred) { return is_satisfied(pred); });
}

return Status::OK();
}

Status StructColumnReader::_rewrite_column_expr_predicate(ObjectPool* pool,
const std::vector<const ColumnPredicate*>& src_preds,
std::vector<const ColumnPredicate*>& dst_preds) const {
DCHECK_NOTNULL(pool);

// try to rewrite EQ expr first
for (const ColumnPredicate* predicate : src_preds) {
if (!predicate->is_expr_predicate()) {
dst_preds.emplace_back(predicate);
continue;
}

const ColumnExprPredicate* expr_predicate = down_cast<const ColumnExprPredicate*>(predicate);
std::vector<const ColumnExprPredicate*> output;
RETURN_IF_ERROR(expr_predicate->try_to_rewrite_for_zone_map_filter(pool, &output));
if (output.size() == 0) {
// no rewrite happened, insert the original predicate
dst_preds.emplace_back(predicate);
} else {
// insert rewritten predicates
dst_preds.insert(dst_preds.end(), output.begin(), output.end());
}
}
return Status::OK();
}

} // namespace starrocks::parquet
16 changes: 16 additions & 0 deletions be/src/formats/parquet/complex_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,23 @@ class StructColumnReader final : public ColumnReader {
}
}

StatusOr<bool> row_group_zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
CompoundNodeType pred_relation, const uint64_t rg_first_row,
const uint64_t rg_num_rows) const override;

ColumnReader* get_child_column_reader(const std::string& subfield) const {
auto it = _child_readers.find(subfield);
if (it == _child_readers.end()) {
return nullptr;
} else {
return it->second.get();
}
}

private:
Status _rewrite_column_expr_predicate(ObjectPool* pool, const std::vector<const ColumnPredicate*>& src_pred,
std::vector<const ColumnPredicate*>& dst_preds) const;

void _handle_null_rows(uint8_t* is_nulls, bool* has_null, size_t num_rows);

// _children_readers order is the same as TypeDescriptor children order.
Expand Down
8 changes: 6 additions & 2 deletions be/src/formats/parquet/file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
#include "exprs/runtime_filter.h"
#include "exprs/runtime_filter_bank.h"
#include "formats/parquet/column_converter.h"
#include "formats/parquet/encoding_plain.h"
#include "formats/parquet/metadata.h"
#include "formats/parquet/scalar_column_reader.h"
#include "formats/parquet/schema.h"
#include "formats/parquet/statistics_helper.h"
#include "formats/parquet/utils.h"
Expand Down Expand Up @@ -326,14 +326,15 @@ bool FileReader::_filter_group_with_more_filter(const GroupReaderPtr& group_read
// when doing row group filter, there maybe some error, but we'd better just ignore it instead of returning the error
// status and lead to the query failed.
bool FileReader::_filter_group(const GroupReaderPtr& group_reader) {
if (_scanner_ctx->conjuncts_manager != nullptr) {
if (config::parquet_advance_zonemap_filter) {
auto res = _scanner_ctx->predicate_tree.visit(
ZoneMapEvaluator<FilterLevel::ROW_GROUP>{_scanner_ctx->predicate_tree, group_reader.get()});
if (!res.ok()) {
LOG(WARNING) << "filter row group failed: " << res.status().message();
return false;
}
if (res.value().has_value() && res.value()->empty()) {
// no rows selected, the whole row group can be filtered
return true;
}
return false;
Expand Down Expand Up @@ -447,6 +448,9 @@ Status FileReader::_init_group_readers() {
_group_reader_param.file_metadata = _file_metadata.get();
_group_reader_param.case_sensitive = fd_scanner_ctx.case_sensitive;
_group_reader_param.lazy_column_coalesce_counter = fd_scanner_ctx.lazy_column_coalesce_counter;
_group_reader_param.partition_columns = &fd_scanner_ctx.partition_columns;
_group_reader_param.partition_values = &fd_scanner_ctx.partition_values;
_group_reader_param.not_existed_slots = &fd_scanner_ctx.not_existed_slots;
// for pageIndex
_group_reader_param.min_max_conjunct_ctxs = fd_scanner_ctx.min_max_conjunct_ctxs;
_group_reader_param.predicate_tree = &fd_scanner_ctx.predicate_tree;
Expand Down
30 changes: 25 additions & 5 deletions be/src/formats/parquet/group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
#include "exec/hdfs_scanner.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "formats/parquet/zone_map_filter_evaluator.h"
#include "formats/parquet/column_reader_factory.h"
#include "formats/parquet/metadata.h"
#include "formats/parquet/page_index_reader.h"
#include "formats/parquet/scalar_column_reader.h"
#include "formats/parquet/schema.h"
#include "formats/parquet/zone_map_filter_evaluator.h"
#include "gutil/strings/substitute.h"
#include "runtime/types.h"
#include "simd/simd.h"
Expand Down Expand Up @@ -84,10 +85,10 @@ Status GroupReader::_deal_with_pageindex() {
*_param.predicate_tree, this}));
if (sparse_range.has_value()) {
if (sparse_range.value().empty()) {
// the whole row group has been filtered
_is_group_filtered = true;
} else if (sparse_range.value() != _range) {
// todo should think about _range
// some pages has been filtered
} else if (sparse_range->span_size() < _row_group_metadata->num_rows) {
// some pages have been filtered
_range = sparse_range.value();
for (const auto& pair : _column_readers) {
pair.second->select_offset_index(_range, _row_group_first_row);
Expand Down Expand Up @@ -342,6 +343,24 @@ Status GroupReader::_create_column_readers() {
ASSIGN_OR_RETURN(ColumnReaderPtr column_reader, _create_column_reader(column));
_column_readers[column.slot_id()] = std::move(column_reader);
}

// create for partition values
if (_param.partition_columns != nullptr && _param.partition_values != nullptr) {
for (size_t i = 0; i < _param.partition_columns->size(); i++) {
const auto& column = (*_param.partition_columns)[i];
const auto* slot_desc = column.slot_desc;
const auto value = (*_param.partition_values)[i];
_column_readers.emplace(slot_desc->id(), std::make_unique<FixedValueColumnReader>(value->get(0)));
}
}

// create for not existed column
if (_param.not_existed_slots != nullptr) {
for (size_t i = 0; i < _param.not_existed_slots->size(); i++) {
const auto* slot = (*_param.not_existed_slots)[i];
_column_readers.emplace(slot->id(), std::make_unique<FixedValueColumnReader>(kNullDatum));
}
}
return Status::OK();
}

Expand Down Expand Up @@ -369,7 +388,8 @@ Status GroupReader::_prepare_column_readers() const {
SCOPED_RAW_TIMER(&_param.stats->column_reader_init_ns);
for (const auto& [slot_id, column_reader] : _column_readers) {
RETURN_IF_ERROR(column_reader->prepare());
if (column_reader->get_column_parquet_field()->is_complex_type()) {
if (column_reader->get_column_parquet_field() != nullptr &&
column_reader->get_column_parquet_field()->is_complex_type()) {
// For complex type columns, we need parse def & rep levels.
// For OptionalColumnReader, by default, we will not parse it's def level for performance. But if
// column is a complex type, we have to parse def level to calculate nullability.
Expand Down
8 changes: 7 additions & 1 deletion be/src/formats/parquet/group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,14 @@ struct GroupReaderParam {

// used for pageIndex
std::vector<ExprContext*> min_max_conjunct_ctxs;

const PredicateTree* predicate_tree = nullptr;

// partition column
const std::vector<HdfsScannerContext::ColumnInfo>* partition_columns = nullptr;
// partition column value which read from hdfs file path
const std::vector<ColumnPtr>* partition_values = nullptr;
// not existed column
const std::vector<SlotDescriptor*>* not_existed_slots = nullptr;
};

class PageIndexReader;
Expand Down
Loading

0 comments on commit 0950aa2

Please sign in to comment.