Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small subresults to union #1735

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
49 changes: 49 additions & 0 deletions src/engine/QueryPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,13 @@
ParsedQuery::GraphPattern* rootPattern) {
QueryPlanner::GraphPatternPlanner optimizer{*this, rootPattern};
for (auto& child : rootPattern->_graphPatterns) {
optimizer.candidatesForUnion.push_back(child);
child.visit([&optimizer](auto& arg) {
return optimizer.graphPatternOperationVisitor(arg);
});
checkCancellation();
}

// one last pass in case the last one was not an optional
// if the last child was not an optional clause we still have unjoined
// candidates. Do one last pass over them.
Expand Down Expand Up @@ -2632,15 +2634,62 @@
}
// _______________________________________________________________

namespace {
template <typename T>
auto is(const auto& gp) {
return std::holds_alternative<T>(gp);
}
} // namespace
namespace pq = parsedQuery;
// _______________________________________________________________
void QueryPlanner::GraphPatternPlanner::optimizeCommutatively() {
auto isOptimizationBarrier = [](const parsedQuery::GraphPatternOperation& g) {
return is<pq::Optional>(g) || is<pq::Minus>(g) || is<pq::Bind>(g);
};
std::erase_if(candidatesForUnion, isOptimizationBarrier);
auto isUnion = [](const parsedQuery::GraphPatternOperation& gp) {
return is<parsedQuery::Union>(gp);
};
ql::ranges::sort(candidatesForUnion, {}, isUnion);
auto beg = ql::ranges::lower_bound(candidatesForUnion, true,
ql::ranges::less{}, isUnion);
AD_CORRECTNESS_CHECK(beg > candidatesForUnion.begin() ||
ql::ranges::all_of(candidatesForUnion, isUnion));
size_t numUnions = candidatesForUnion.end() - beg;
size_t numNonUnions = candidatesForUnion.size() - numUnions;

auto tg = planner_.createTripleGraph(&candidateTriples_);
auto lastRow = planner_
.fillDpTab(tg, rootPattern_->_filters,
rootPattern_->textLimits_, candidatePlans_)
.back();
candidateTriples_._triples.clear();
candidatePlans_.clear();
if (numUnions == 1 && numNonUnions > 0) {
LOG(INFO) << "Recursing for union optimization" << std::endl;
auto parsedUnion =
std::move(std::get<parsedQuery::Union>(candidatesForUnion.back()));
candidatesForUnion.pop_back();

Check warning on line 2672 in src/engine/QueryPlanner.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/QueryPlanner.cpp#L2669-L2672

Added lines #L2669 - L2672 were not covered by tests
for (auto& op : candidatesForUnion) {
parsedUnion._child1._graphPatterns.push_back(op);
parsedUnion._child2._graphPatterns.push_back(op);
}
ql::ranges::copy(rootPattern_->_filters,
std::back_inserter(parsedUnion._child1._filters));
ql::ranges::copy(rootPattern_->_filters,
std::back_inserter(parsedUnion._child2._filters));

Check warning on line 2680 in src/engine/QueryPlanner.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/QueryPlanner.cpp#L2674-L2680

Added lines #L2674 - L2680 were not covered by tests

candidatesForUnion.clear();
visitUnion(parsedUnion);
planner_.checkCancellation();
AD_CORRECTNESS_CHECK(candidatePlans_.size() == 1);

Check warning on line 2685 in src/engine/QueryPlanner.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/QueryPlanner.cpp#L2682-L2685

Added lines #L2682 - L2685 were not covered by tests
if (RuntimeParameters().get<"always-multiply-unions">()) {
lastRow.clear();
}
ql::ranges::move(candidatePlans_.back(), std::back_inserter(lastRow));
candidatePlans_.clear();
}

Check warning on line 2691 in src/engine/QueryPlanner.cpp

View check run for this annotation

Codecov / codecov/patch

src/engine/QueryPlanner.cpp#L2687-L2691

Added lines #L2687 - L2691 were not covered by tests
candidatesForUnion.clear();
candidatePlans_.push_back(std::move(lastRow));
planner_.checkCancellation();
}
Expand Down
3 changes: 3 additions & 0 deletions src/engine/QueryPlanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,9 @@ class QueryPlanner {
// pattern, and plans from different rows can be joined in an arbitrary
// order.
std::vector<std::vector<SubtreePlan>> candidatePlans_{};
// TODO<joka921> Comment
std::vector<parsedQuery::GraphPatternOperation> candidatesForUnion{};
std::vector<SparqlFilter> filtersForUnion{};

// Triples from BasicGraphPatterns that can be joined arbitrarily
// with each other and with the contents of `candidatePlans_`
Expand Down
1 change: 1 addition & 0 deletions src/global/RuntimeParameters.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ inline auto& RuntimeParameters() {
// Control up until which size lazy results should be cached. Caching
// does cause significant overhead for this case.
MemorySizeParameter<"lazy-result-max-cache-size">{5_MB},
Bool<"always-multiply-unions">{false},
Bool<"websocket-updates-enabled">{true},
// When the result of an index scan is smaller than a single block, then
// its size estimate will be the size of the block divided by this
Expand Down
1 change: 1 addition & 0 deletions src/util/http/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ HttpOrHttpsResponse HttpClientImpl<StreamType>::sendRequest(
request.set(http::field::accept, acceptHeader);
request.set(http::field::content_type, contentTypeHeader);
request.set(http::field::content_length, std::to_string(requestBody.size()));
request.set("Query-Id", "1234069183");
request.body() = requestBody;

auto wait = [&client, &handle]<typename T>(
Expand Down
1 change: 1 addition & 0 deletions src/util/http/websocket/WebSocketSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ net::awaitable<void> WebSocketSession::acceptAndWait(
// Experimental operators, see
// https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/overview/composition/cpp20_coroutines.html
// for more information
// TODO<joka921> Debug the aborts in the websocket module...
co_await (waitForServerEvents() && handleClientCommands());
} catch (boost::system::system_error& error) {
if (cancelOnClose_) {
Expand Down
Loading