// Review notes for this patch. Everything after this comment block is the
// original patch text, unchanged. (Text in front of the first `diff --git`
// header is not part of any hunk.)
//
// Summary of what the visible hunks do:
//
//  * src/engine/QueryPlanner.cpp, `QueryPlanner::optimize`:
//    every child of `rootPattern->_graphPatterns` is additionally copied into
//    `optimizer.candidatesForUnion` (via `push_back(child)`) before the child
//    is visited.
//
//  * src/engine/QueryPlanner.cpp, new file-local helper `is(gp)`:
//    thin wrapper around `std::holds_alternative(gp)`. Also adds the namespace
//    alias `pq = parsedQuery`.
//
//  * src/engine/QueryPlanner.cpp,
//    `QueryPlanner::GraphPatternPlanner::optimizeCommutatively`:
//      1. Erases from `candidatesForUnion` every entry matched by
//         `isOptimizationBarrier`.
//      2. Sorts the remaining entries by the boolean `isUnion`, then uses
//         `lower_bound(..., true, ...)` to find the first union entry and
//         derives `numUnions` / `numNonUnions` from that position.
//         (Presumably `ql::ranges` mirrors `std::ranges`, so `false` sorts
//         before `true` and the unions end up at the back - TODO confirm.)
//      3. After the existing `fillDpTab(...)` call has produced `lastRow`, and
//         only if `numUnions == 1 && numNonUnions > 0`:
//           - moves the union out of `candidatesForUnion.back()`,
//           - appends every remaining candidate to BOTH
//             `_child1._graphPatterns` and `_child2._graphPatterns`,
//           - copies `rootPattern_->_filters` into both children's `_filters`,
//           - calls `visitUnion(parsedUnion)` and checks that exactly one row
//             of candidate plans resulted,
//           - clears `lastRow` first when the runtime parameter
//             "always-multiply-unions" is true, then moves the new plans into
//             `lastRow` in either case.
//      4. Clears `candidatesForUnion` on every path before returning.
//
//  * src/engine/QueryPlanner.h:
//    adds the members `candidatesForUnion` and `filtersForUnion`.
//
//  * src/global/RuntimeParameters.h:
//    adds `Bool<"always-multiply-unions">{false}`.
//
//  * src/util/http/HttpClient.cpp, `HttpClientImpl::sendRequest`:
//    sets a "Query-Id" request header to the fixed value "1234069183".
//
//  * src/util/http/websocket/WebSocketSession.cpp:
//    adds a TODO comment only; no code change.
//
// NOTE(review) - points to resolve before merging:
//
//  - Template argument lists appear to have been lost in this copy of the
//    patch: `template auto is(...)`, `is(g) || is(g) || is(g)`, `is(gp)`,
//    `std::get(candidatesForUnion.back())`, `std::vector candidatesForUnion`,
//    `std::vector> candidatePlans_`. Which alternatives count as optimization
//    barriers or as the union cannot be read from here - check the original.
//  - The two `optimizeCommutatively` hunks are not contiguous: `-2632,8` ends
//    at old line 2639 and `-2641,6` starts at 2641, so one line in the middle
//    of the `fillDpTab(...)` call is not shown.
//  - `request.set("Query-Id", "1234069183")` hard-codes one id for every
//    outgoing request; this looks like a debugging leftover - confirm it is
//    intended to ship.
//  - `filtersForUnion` is declared but not referenced in any visible hunk, and
//    the alias `pq` is likewise unused in the visible hunks.
//  - `// TODO Comment` is a placeholder; the new members still need a real
//    description. Their names also lack the trailing underscore used by the
//    neighbouring `candidatePlans_`.
//  - The comment added in `optimize` ("one last pass in case the last one was
//    not an optional") restates the existing comment that directly follows it.
//  - `LOG(INFO) << ... << std::endl` runs every time the union branch is
//    taken; consider a lower log level and '\n'.
//  - If `ql::ranges::sort` behaves like `std::ranges::sort` it is not stable,
//    so the relative order of the non-union patterns copied into the union
//    children may differ from the query order - verify that this is harmless.
//  - `rootPattern_->_filters` is passed to `fillDpTab` for `lastRow` AND copied
//    into both union children; when "always-multiply-unions" is false both
//    sets of plans end up in `lastRow`. Presumably intended so the planner can
//    pick the cheaper variant - TODO confirm.
diff --git a/src/engine/QueryPlanner.cpp b/src/engine/QueryPlanner.cpp index 43caa71f02..c99b46202f 100644 --- a/src/engine/QueryPlanner.cpp +++ b/src/engine/QueryPlanner.cpp @@ -231,11 +231,13 @@ std::vector QueryPlanner::optimize( ParsedQuery::GraphPattern* rootPattern) { QueryPlanner::GraphPatternPlanner optimizer{*this, rootPattern}; for (auto& child : rootPattern->_graphPatterns) { + optimizer.candidatesForUnion.push_back(child); child.visit([&optimizer](auto& arg) { return optimizer.graphPatternOperationVisitor(arg); }); checkCancellation(); } + // one last pass in case the last one was not an optional // if the last child was not an optional clause we still have unjoined // candidates. Do one last pass over them. @@ -2632,8 +2634,30 @@ void QueryPlanner::GraphPatternPlanner::visitSubquery( } // _______________________________________________________________ +namespace { +template +auto is(const auto& gp) { + return std::holds_alternative(gp); +} +} // namespace +namespace pq = parsedQuery; // _______________________________________________________________ void QueryPlanner::GraphPatternPlanner::optimizeCommutatively() { + auto isOptimizationBarrier = [](const parsedQuery::GraphPatternOperation& g) { + return is(g) || is(g) || is(g); + }; + std::erase_if(candidatesForUnion, isOptimizationBarrier); + auto isUnion = [](const parsedQuery::GraphPatternOperation& gp) { + return is(gp); + }; + ql::ranges::sort(candidatesForUnion, {}, isUnion); + auto beg = ql::ranges::lower_bound(candidatesForUnion, true, + ql::ranges::less{}, isUnion); + AD_CORRECTNESS_CHECK(beg > candidatesForUnion.begin() || + ql::ranges::all_of(candidatesForUnion, isUnion)); + size_t numUnions = candidatesForUnion.end() - beg; + size_t numNonUnions = candidatesForUnion.size() - numUnions; + auto tg = planner_.createTripleGraph(&candidateTriples_); auto lastRow = planner_ .fillDpTab(tg, rootPattern_->_filters, @@ -2641,6 +2665,31 @@ void QueryPlanner::GraphPatternPlanner::optimizeCommutatively() 
{ .back(); candidateTriples_._triples.clear(); candidatePlans_.clear(); + if (numUnions == 1 && numNonUnions > 0) { + LOG(INFO) << "Recursing for union optimization" << std::endl; + auto parsedUnion = + std::move(std::get(candidatesForUnion.back())); + candidatesForUnion.pop_back(); + for (auto& op : candidatesForUnion) { + parsedUnion._child1._graphPatterns.push_back(op); + parsedUnion._child2._graphPatterns.push_back(op); + } + ql::ranges::copy(rootPattern_->_filters, + std::back_inserter(parsedUnion._child1._filters)); + ql::ranges::copy(rootPattern_->_filters, + std::back_inserter(parsedUnion._child2._filters)); + + candidatesForUnion.clear(); + visitUnion(parsedUnion); + planner_.checkCancellation(); + AD_CORRECTNESS_CHECK(candidatePlans_.size() == 1); + if (RuntimeParameters().get<"always-multiply-unions">()) { + lastRow.clear(); + } + ql::ranges::move(candidatePlans_.back(), std::back_inserter(lastRow)); + candidatePlans_.clear(); + } + candidatesForUnion.clear(); candidatePlans_.push_back(std::move(lastRow)); planner_.checkCancellation(); } diff --git a/src/engine/QueryPlanner.h b/src/engine/QueryPlanner.h index b51523baed..5730b3fea0 100644 --- a/src/engine/QueryPlanner.h +++ b/src/engine/QueryPlanner.h @@ -512,6 +512,9 @@ class QueryPlanner { // pattern, and plans from different rows can be joined in an arbitrary // order. std::vector> candidatePlans_{}; + // TODO Comment + std::vector candidatesForUnion{}; + std::vector filtersForUnion{}; // Triples from BasicGraphPatterns that can be joined arbitrarily // with each other and with the contents of `candidatePlans_` diff --git a/src/global/RuntimeParameters.h b/src/global/RuntimeParameters.h index c082b14ca4..fb56e0ee01 100644 --- a/src/global/RuntimeParameters.h +++ b/src/global/RuntimeParameters.h @@ -55,6 +55,7 @@ inline auto& RuntimeParameters() { // Control up until which size lazy results should be cached. Caching // does cause significant overhead for this case. 
MemorySizeParameter<"lazy-result-max-cache-size">{5_MB}, + Bool<"always-multiply-unions">{false}, Bool<"websocket-updates-enabled">{true}, // When the result of an index scan is smaller than a single block, then // its size estimate will be the size of the block divided by this diff --git a/src/util/http/HttpClient.cpp b/src/util/http/HttpClient.cpp index e09808c3c3..c95f5a19c0 100644 --- a/src/util/http/HttpClient.cpp +++ b/src/util/http/HttpClient.cpp @@ -102,6 +102,7 @@ HttpOrHttpsResponse HttpClientImpl::sendRequest( request.set(http::field::accept, acceptHeader); request.set(http::field::content_type, contentTypeHeader); request.set(http::field::content_length, std::to_string(requestBody.size())); + request.set("Query-Id", "1234069183"); request.body() = requestBody; auto wait = [&client, &handle]( diff --git a/src/util/http/websocket/WebSocketSession.cpp b/src/util/http/websocket/WebSocketSession.cpp index 75e6e4402d..dfbf675e1f 100644 --- a/src/util/http/websocket/WebSocketSession.cpp +++ b/src/util/http/websocket/WebSocketSession.cpp @@ -92,6 +92,7 @@ net::awaitable WebSocketSession::acceptAndWait( // Experimental operators, see // https://www.boost.org/doc/libs/1_81_0/doc/html/boost_asio/overview/composition/cpp20_coroutines.html // for more information + // TODO Debug the aborts in the websocket module... co_await (waitForServerEvents() && handleClientCommands()); } catch (boost::system::system_error& error) { if (cancelOnClose_) {