Skip to content

Commit

Permalink
error handling of max_result_set_writers
Browse files Browse the repository at this point in the history
  • Loading branch information
YoshiakiNishimura committed Feb 21, 2025
1 parent a5378e5 commit ecc61e7
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 49 deletions.
37 changes: 26 additions & 11 deletions src/jogasaki/plan/compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,13 @@ void preprocess(
});
}

std::shared_ptr<mirror_container> preprocess_mirror(
std::pair<status, std::shared_ptr<mirror_container>> preprocess_mirror(
maybe_shared_ptr<statement::statement> const& statement,
std::shared_ptr<::yugawara::variable::configurable_provider> const& provider,
compiled_info info
compiled_info info, compiler_context &ctx
) {
auto container = std::make_shared<mirror_container>();
status final_status = status::ok;
switch(statement->kind()) {
case statement::statement_kind::execute:
container->work_level().set_minimum(statement_work_level_kind::key_operation);
Expand Down Expand Up @@ -281,15 +282,27 @@ std::shared_ptr<mirror_container> preprocess_mirror(
);
takatori::plan::enumerate_bottom(
unsafe_downcast<takatori::statement::execute>(*statement).execution_plan(),
[&container](takatori::plan::step const& s) {
[&final_status, &container](takatori::plan::step const& s) {
if (s.kind() == takatori::plan::step_kind::process) {
if (impl::has_emit_operator(s)) {
container->set_partitions(impl::calculate_partition(s));
auto [calculate_status, calculate] = impl::calculate_partition(s);
final_status = calculate_status;
container->set_partitions(calculate);
}
} else {
VLOG_LP(log_error) << "The bottom of graph_type must be process.";
}
});
if (final_status != status::ok) {
auto msg = string_builder{}
<< "The result calculated by calculate_partition("
<< container->get_partitions() << ") exceeded max_result_set_writers("
<< global::config_pool()->max_result_set_writers() << ")"
<< string_builder::to_string;
set_compile_error(
ctx, error_code::sql_execution_exception, msg, status::err_unsupported);
return {final_status, nullptr};
}
break;
case statement::statement_kind::write:
container->work_level().set_minimum(statement_work_level_kind::simple_write);
Expand All @@ -312,7 +325,7 @@ std::shared_ptr<mirror_container> preprocess_mirror(
throw_exception(std::logic_error{""});
}
container->host_variable_info(create_host_variable_info(provider, info));
return container;
return {status::ok, container};
}

template <mizugaki::analyzer::sql_analyzer_result_kind Kind>
Expand Down Expand Up @@ -467,11 +480,15 @@ status create_prepared_statement(
auto stmt = result.release_statement();
stmt->runtime_hint() = sp->result();
auto s = std::shared_ptr<::takatori::statement::statement>(std::move(stmt));
auto [mirror_status, mirror] = preprocess_mirror(s, provider, result.info(), ctx);
if (mirror_status != status::ok) {
return mirror_status;
}
out = std::make_shared<plan::prepared_statement>(
s,
result.info(),
provider,
preprocess_mirror(s, provider, result.info()),
mirror,
ctx.sql_text()
);
return status::ok;
Expand Down Expand Up @@ -1213,7 +1230,7 @@ size_t intermediate_calculate_partition(takatori::plan::step const& s) noexcept
}
return sum;
}
size_t calculate_partition(takatori::plan::step const& s) noexcept {
std::pair<status, size_t> calculate_partition(takatori::plan::step const& s) noexcept {
auto& process = unsafe_downcast<takatori::plan::process>(s);
auto partition = global::config_pool()->default_partitions();
if (!process.downstreams().empty()) {
Expand All @@ -1222,11 +1239,9 @@ size_t calculate_partition(takatori::plan::step const& s) noexcept {
partition = intermediate_calculate_partition(s);
}
if (partition > global::config_pool()->max_result_set_writers()) {
LOG_LP(ERROR) << "The result calculated by calculate_partition(" << partition
<< ") exceeded max_result_set_writers("
<< global::config_pool()->max_result_set_writers() << ")";
return {status::err_compiler_error, partition};
}
return partition;
return {status::ok, partition};
}

} // namespace impl
Expand Down
9 changes: 5 additions & 4 deletions src/jogasaki/plan/compiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ void preprocess(
std::shared_ptr<mirror_container> const& container
);

[[nodiscard]] std::shared_ptr<mirror_container> preprocess_mirror(
[[nodiscard]] std::pair<status, std::shared_ptr<mirror_container>> preprocess_mirror(
maybe_shared_ptr<takatori::statement::statement> const& statement,
std::shared_ptr<::yugawara::variable::configurable_provider> const& provider,
compiled_info info
compiled_info info,
compiler_context &ctx
);

[[nodiscard]] executor::process::step create(
Expand Down Expand Up @@ -99,9 +100,9 @@ std::shared_ptr<executor::process::impl::variable_table> create_host_variables(
/**
* @brief calculate partition
* @param s the plan of step
* @return size of the partition
* @return a pair containing the status of the calculation and the computed partition size.
*/
[[nodiscard]] size_t calculate_partition(takatori::plan::step const& s) noexcept;
[[nodiscard]] std::pair<status, size_t> calculate_partition(takatori::plan::step const& s) noexcept;

/**
* @brief determine whether to stop calculating the partition.
Expand Down
102 changes: 68 additions & 34 deletions test/jogasaki/plan/partition_calculation_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ TEST_F(partition_calculation_test, simple_query_rtx) {
EXPECT_TRUE(impl::has_emit_operator(p0));
EXPECT_EQ(parallel, impl::terminal_calculate_partition(p0));
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p0));
EXPECT_EQ(parallel, impl::calculate_partition(p0));
auto [ignore, size_result] = impl::calculate_partition(p0);
EXPECT_EQ(parallel, size_result);
}
TEST_F(partition_calculation_test, simple_query_no_rtx) {
std::string sql = "select * from T0";
Expand All @@ -206,7 +207,8 @@ TEST_F(partition_calculation_test, simple_query_no_rtx) {
EXPECT_TRUE(impl::has_emit_operator(p0));
EXPECT_EQ(1, impl::terminal_calculate_partition(p0));
EXPECT_EQ(1, impl::intermediate_calculate_partition(p0));
EXPECT_EQ(1, impl::calculate_partition(p0));
auto [ignore, size_result] = impl::calculate_partition(p0);
EXPECT_EQ(1, size_result);
}

TEST_F(partition_calculation_test, simple_query2_rtx) {
Expand All @@ -229,7 +231,8 @@ TEST_F(partition_calculation_test, simple_query2_rtx) {
EXPECT_TRUE(impl::has_emit_operator(p0));
EXPECT_EQ(parallel, impl::terminal_calculate_partition(p0));
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p0));
EXPECT_EQ(parallel, impl::calculate_partition(p0));
auto [ignore, size_result] = impl::calculate_partition(p0);
EXPECT_EQ(parallel, size_result);
}
TEST_F(partition_calculation_test, simple_query2_no_rtx) {
std::string sql = "select * from T0 where C1 = 1.0;";
Expand All @@ -251,7 +254,8 @@ TEST_F(partition_calculation_test, simple_query2_no_rtx) {
EXPECT_TRUE(impl::has_emit_operator(p0));
EXPECT_EQ(1, impl::terminal_calculate_partition(p0));
EXPECT_EQ(1, impl::intermediate_calculate_partition(p0));
EXPECT_EQ(1, impl::calculate_partition(p0));
auto [ignore, size_result] = impl::calculate_partition(p0);
EXPECT_EQ(1, size_result);
}

TEST_F(partition_calculation_test, project_filter_rtx) {
Expand Down Expand Up @@ -306,10 +310,14 @@ TEST_F(partition_calculation_test, project_filter_rtx) {
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p1));
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p2));
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p3));
EXPECT_EQ(parallel, impl::calculate_partition(p0));
EXPECT_EQ(parallel, impl::calculate_partition(p1));
EXPECT_EQ(parallel, impl::calculate_partition(p2));
EXPECT_EQ(parallel, impl::calculate_partition(p3));
auto [ignore, size_result0] = impl::calculate_partition(p0);
auto [ignore1, size_result1] = impl::calculate_partition(p1);
auto [ignore2, size_result2] = impl::calculate_partition(p2);
auto [ignore3, size_result3] = impl::calculate_partition(p3);
EXPECT_EQ(parallel, size_result0);
EXPECT_EQ(parallel, size_result1);
EXPECT_EQ(parallel, size_result2);
EXPECT_EQ(parallel, size_result3);
}
TEST_F(partition_calculation_test, project_filter_no_rtx) {
std::string sql = "select C1+C0, C0, C1 from T0 where C1=1.0";
Expand Down Expand Up @@ -363,10 +371,14 @@ TEST_F(partition_calculation_test, project_filter_no_rtx) {
EXPECT_EQ(1, impl::intermediate_calculate_partition(p1));
EXPECT_EQ(1, impl::intermediate_calculate_partition(p2));
EXPECT_EQ(1, impl::intermediate_calculate_partition(p3));
EXPECT_EQ(1, impl::calculate_partition(p0));
EXPECT_EQ(1, impl::calculate_partition(p1));
EXPECT_EQ(1, impl::calculate_partition(p2));
EXPECT_EQ(1, impl::calculate_partition(p3));
auto [ignore, size_result0] = impl::calculate_partition(p0);
auto [ignore1, size_result1] = impl::calculate_partition(p1);
auto [ignore2, size_result2] = impl::calculate_partition(p2);
auto [ignore3, size_result3] = impl::calculate_partition(p3);
EXPECT_EQ(1, size_result0);
EXPECT_EQ(1, size_result1);
EXPECT_EQ(1, size_result2);
EXPECT_EQ(1, size_result3);
}

TEST_F(partition_calculation_test, left_outer_join_rtx) {
Expand Down Expand Up @@ -401,8 +413,10 @@ TEST_F(partition_calculation_test, left_outer_join_rtx) {
EXPECT_EQ(parallel, impl::terminal_calculate_partition(p1));
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p0));
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p1));
EXPECT_EQ(partition, impl::calculate_partition(p0));
EXPECT_EQ(partition, impl::calculate_partition(p1));
auto [ignore, size_result0] = impl::calculate_partition(p0);
auto [ignore1, size_result1] = impl::calculate_partition(p1);
EXPECT_EQ(partition, size_result0);
EXPECT_EQ(partition, size_result1);
}
auto& b2 = next_top(c.execution_plan(), b);
auto&& graph2 = takatori::util::downcast<takatori::plan::process>(b2).operators();
Expand All @@ -419,8 +433,10 @@ TEST_F(partition_calculation_test, left_outer_join_rtx) {
EXPECT_EQ(parallel, impl::terminal_calculate_partition(p1));
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p0));
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p1));
EXPECT_EQ(partition, impl::calculate_partition(p0));
EXPECT_EQ(partition, impl::calculate_partition(p1));
auto [ignore, size_result0] = impl::calculate_partition(p0);
auto [ignore1, size_result1] = impl::calculate_partition(p1);
EXPECT_EQ(partition, size_result0);
EXPECT_EQ(partition, size_result1);
}
auto& grp1 = b.downstreams()[0];
auto& grp2 = b2.downstreams()[0];
Expand Down Expand Up @@ -457,10 +473,14 @@ TEST_F(partition_calculation_test, left_outer_join_rtx) {
EXPECT_EQ(partition, impl::intermediate_calculate_partition(p1));
EXPECT_EQ(partition, impl::intermediate_calculate_partition(p2));
EXPECT_EQ(partition, impl::intermediate_calculate_partition(p3));
EXPECT_EQ(partition, impl::calculate_partition(p0));
EXPECT_EQ(partition, impl::calculate_partition(p1));
EXPECT_EQ(partition, impl::calculate_partition(p2));
EXPECT_EQ(partition, impl::calculate_partition(p3));
auto [ignore, size_result0] = impl::calculate_partition(p0);
auto [ignore1, size_result1] = impl::calculate_partition(p1);
auto [ignore2, size_result2] = impl::calculate_partition(p2);
auto [ignore3, size_result3] = impl::calculate_partition(p3);
EXPECT_EQ(partition, size_result0);
EXPECT_EQ(partition, size_result1);
EXPECT_EQ(partition, size_result2);
EXPECT_EQ(partition, size_result3);
}
}

Expand Down Expand Up @@ -496,8 +516,10 @@ TEST_F(partition_calculation_test, left_outer_join_no_rtx) {
EXPECT_EQ(1, impl::terminal_calculate_partition(p1));
EXPECT_EQ(1, impl::intermediate_calculate_partition(p0));
EXPECT_EQ(1, impl::intermediate_calculate_partition(p1));
EXPECT_EQ(partition, impl::calculate_partition(p0));
EXPECT_EQ(partition, impl::calculate_partition(p1));
auto [ignore, size_result0] = impl::calculate_partition(p0);
auto [ignore1, size_result1] = impl::calculate_partition(p1);
EXPECT_EQ(partition, size_result0);
EXPECT_EQ(partition, size_result1);
}
auto& b2 = next_top(c.execution_plan(), b);
auto&& graph2 = takatori::util::downcast<takatori::plan::process>(b2).operators();
Expand All @@ -514,8 +536,10 @@ TEST_F(partition_calculation_test, left_outer_join_no_rtx) {
EXPECT_EQ(1, impl::terminal_calculate_partition(p1));
EXPECT_EQ(1, impl::intermediate_calculate_partition(p0));
EXPECT_EQ(1, impl::intermediate_calculate_partition(p1));
EXPECT_EQ(partition, impl::calculate_partition(p0));
EXPECT_EQ(partition, impl::calculate_partition(p1));
auto [ignore, size_result0] = impl::calculate_partition(p0);
auto [ignore1, size_result1] = impl::calculate_partition(p1);
EXPECT_EQ(partition, size_result0);
EXPECT_EQ(partition, size_result1);
}
auto& grp1 = b.downstreams()[0];
auto& grp2 = b2.downstreams()[0];
Expand Down Expand Up @@ -552,10 +576,14 @@ TEST_F(partition_calculation_test, left_outer_join_no_rtx) {
EXPECT_EQ(partition, impl::intermediate_calculate_partition(p1));
EXPECT_EQ(partition, impl::intermediate_calculate_partition(p2));
EXPECT_EQ(partition, impl::intermediate_calculate_partition(p3));
EXPECT_EQ(partition, impl::calculate_partition(p0));
EXPECT_EQ(partition, impl::calculate_partition(p1));
EXPECT_EQ(partition, impl::calculate_partition(p2));
EXPECT_EQ(partition, impl::calculate_partition(p3));
auto [ignore, size_result0] = impl::calculate_partition(p0);
auto [ignore1, size_result1] = impl::calculate_partition(p1);
auto [ignore2, size_result2] = impl::calculate_partition(p2);
auto [ignore3, size_result3] = impl::calculate_partition(p3);
EXPECT_EQ(partition, size_result0);
EXPECT_EQ(partition, size_result1);
EXPECT_EQ(partition, size_result2);
EXPECT_EQ(partition, size_result3);
}
}

Expand All @@ -579,12 +607,14 @@ TEST_F(partition_calculation_test, union_all_rtx) {
EXPECT_FALSE(impl::has_emit_operator(p0));
EXPECT_EQ(parallel, impl::terminal_calculate_partition(p0));
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p0));
EXPECT_EQ(partition, impl::calculate_partition(p0));
auto [ignore0, size_result0] = impl::calculate_partition(p0);
EXPECT_EQ(partition, size_result0);
auto& p1 = next_top(c.execution_plan(), p0);
EXPECT_FALSE(impl::has_emit_operator(p1));
EXPECT_EQ(parallel, impl::terminal_calculate_partition(p1));
EXPECT_EQ(parallel, impl::intermediate_calculate_partition(p1));
EXPECT_EQ(partition, impl::calculate_partition(p1));
auto [ignore1, size_result1] = impl::calculate_partition(p1);
EXPECT_EQ(partition, size_result1);

auto& grp1 = p0.downstreams()[0];
auto& b3 = grp1.downstreams()[0];
Expand All @@ -594,7 +624,8 @@ TEST_F(partition_calculation_test, union_all_rtx) {
EXPECT_TRUE(impl::has_emit_operator(p2));
EXPECT_EQ(partition, impl::terminal_calculate_partition(p2));
EXPECT_EQ(parallel * 2, impl::intermediate_calculate_partition(p2));
EXPECT_EQ(parallel * 2, impl::calculate_partition(p2));
auto [ignore2, size_result2] = impl::calculate_partition(p2);
EXPECT_EQ(parallel * 2, size_result2);
}

TEST_F(partition_calculation_test, union_all_no_rtx) {
Expand All @@ -617,12 +648,14 @@ TEST_F(partition_calculation_test, union_all_no_rtx) {
EXPECT_FALSE(impl::has_emit_operator(p0));
EXPECT_EQ(1, impl::terminal_calculate_partition(p0));
EXPECT_EQ(1, impl::intermediate_calculate_partition(p0));
EXPECT_EQ(partition, impl::calculate_partition(p0));
auto [ignore0, size_result0] = impl::calculate_partition(p0);
EXPECT_EQ(partition, size_result0);
auto& p1 = next_top(c.execution_plan(), p0);
EXPECT_FALSE(impl::has_emit_operator(p1));
EXPECT_EQ(1, impl::terminal_calculate_partition(p1));
EXPECT_EQ(1, impl::intermediate_calculate_partition(p1));
EXPECT_EQ(partition, impl::calculate_partition(p1));
auto [ignore1, size_result1] = impl::calculate_partition(p1);
EXPECT_EQ(partition, size_result1);

auto& grp1 = p0.downstreams()[0];
auto& b3 = grp1.downstreams()[0];
Expand All @@ -632,7 +665,8 @@ TEST_F(partition_calculation_test, union_all_no_rtx) {
EXPECT_TRUE(impl::has_emit_operator(p2));
EXPECT_EQ(partition, impl::terminal_calculate_partition(p2));
EXPECT_EQ(2, impl::intermediate_calculate_partition(p2));
EXPECT_EQ(2, impl::calculate_partition(p2));
auto [ignore2, size_result2] = impl::calculate_partition(p2);
EXPECT_EQ(2, size_result2);
}

} // namespace jogasaki::plan

0 comments on commit ecc61e7

Please sign in to comment.