Commit

Fix bugs regarding query cancellation and server threads (#1149)
1. Fix a bug in the recursive query cancellation (introduced by #1125), which led to enormous runtimes for queries with many operations, like https://qlever.cs.uni-freiburg.de/wikidata/yI0y30 or https://qlever.cs.uni-freiburg.de/osm-planet/8Cn1w5 .

2. Fix a bug in the management of multiple server threads (introduced by #1103), which led to the server effectively not being multi-threaded anymore.

3. Make the thread sanitizer use Clang 17 (with Clang 16 there was a bug when a strand was resumed on another thread).

NOTE: The combination of 1+2 made QLever very unstable, since any query with many operations would stall the server. As a side effect, our proxy web server (which currently has a large timeout) was also affected because of unprocessed requests piling up. This was a nerve-racking but very interesting lesson in many respects.
joka921 authored Nov 17, 2023
1 parent ebba447 commit e2061e3
Showing 5 changed files with 42 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/native-build.yml
@@ -48,7 +48,7 @@ jobs:
     build-type: Release
     expensive-tests: false
   - compiler: clang
-    compiler-version: 16
+    compiler-version: 17
     build-type: Debug
     expensive-tests: false
     ubsan-flags: " -fsanitize=thread -O1 -g"
5 changes: 2 additions & 3 deletions src/engine/Operation.cpp
@@ -54,8 +54,7 @@ void Operation::recursivelySetCancellationHandle(
     SharedCancellationHandle cancellationHandle) {
   AD_CORRECTNESS_CHECK(cancellationHandle);
   forAllDescendants([&cancellationHandle](auto child) {
-    child->getRootOperation()->recursivelySetCancellationHandle(
-        cancellationHandle);
+    child->getRootOperation()->cancellationHandle_ = cancellationHandle;
   });
   cancellationHandle_ = std::move(cancellationHandle);
 }
@@ -66,7 +65,7 @@ void Operation::recursivelySetTimeConstraint(
     std::chrono::steady_clock::time_point deadline) {
   deadline_ = deadline;
   forAllDescendants([deadline](auto child) {
-    child->getRootOperation()->recursivelySetTimeConstraint(deadline);
+    child->getRootOperation()->deadline_ = deadline;
   });
 }

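The following is a minimal sketch of why the old pattern in `Operation.cpp` could lead to enormous runtimes. It assumes that `forAllDescendants` already visits every transitive descendant (its name suggests this, but the function itself is not part of this diff). `Node`, `makeChain`, and the counting helpers are hypothetical stand-ins for illustration and are not QLever code.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for one operation in a query execution tree.
struct Node {
  std::vector<std::unique_ptr<Node>> children;
  int deadline = 0;
};

// ASSUMPTION: like the function of the same name in the diff, this visits
// every transitive descendant of `node` exactly once.
template <typename F>
void forAllDescendants(Node& node, F&& f) {
  for (auto& child : node.children) {
    f(*child);
    forAllDescendants(*child, f);
  }
}

// Old pattern: the callback recurses, although the traversal is already
// recursive. Deeper nodes are therefore reached over and over again.
void setDeadlineOld(Node& node, int deadline, std::size_t& assignments) {
  node.deadline = deadline;
  ++assignments;
  forAllDescendants(node, [&](Node& child) {
    setDeadlineOld(child, deadline, assignments);
  });
}

// New pattern: the callback only assigns the member, so each node is
// touched exactly once.
void setDeadlineNew(Node& node, int deadline, std::size_t& assignments) {
  node.deadline = deadline;
  ++assignments;
  forAllDescendants(node, [&](Node& child) {
    child.deadline = deadline;
    ++assignments;
  });
}

// Build a chain of `length` nodes (length >= 1), each with one child.
std::unique_ptr<Node> makeChain(std::size_t length) {
  auto root = std::make_unique<Node>();
  Node* current = root.get();
  for (std::size_t i = 1; i < length; ++i) {
    current->children.push_back(std::make_unique<Node>());
    current = current->children.back().get();
  }
  return root;
}

// Count how many assignments each pattern performs on a chain.
std::size_t countAssignmentsOld(std::size_t length) {
  auto root = makeChain(length);
  std::size_t assignments = 0;
  setDeadlineOld(*root, 42, assignments);
  return assignments;
}

std::size_t countAssignmentsNew(std::size_t length) {
  auto root = makeChain(length);
  std::size_t assignments = 0;
  setDeadlineNew(*root, 42, assignments);
  return assignments;
}
```

Under these assumptions, `countAssignmentsOld(10)` performs 512 assignments (the count doubles with every additional node in the chain), while `countAssignmentsNew(10)` performs 10. This matches the commit message's observation that queries with many operations were affected most.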
12 changes: 8 additions & 4 deletions src/util/AsioHelpers.h
@@ -6,7 +6,7 @@
 #define QLEVER_ASIOHELPERS_H
 
 #include <boost/asio/awaitable.hpp>
-#include <boost/asio/dispatch.hpp>
+#include <boost/asio/post.hpp>
 #include <boost/asio/use_awaitable.hpp>
 
 #include "util/Exception.h"
@@ -19,12 +19,16 @@ namespace net = boost::asio;
 /// this coroutine was co_spawned on.
 /// IMPORTANT: If the coroutine is cancelled, no guarantees are given. Make
 /// sure to keep that in mind when handling cancellation errors!
+// TODO<RobinTF, joka921> When using `net::dispatch()` instead of `net::post()`
+// and the `awaitable` itself dispatches to a strand, then this strand is not
+// left in all cases when leaving this function. Further investigate whether we
+// lack understanding here or whether this is a bug in `Boost::ASIO`.
 template <typename T>
 inline net::awaitable<T> resumeOnOriginalExecutor(net::awaitable<T> awaitable) {
   std::exception_ptr exceptionPtr;
   try {
     T result = co_await std::move(awaitable);
-    co_await net::dispatch(net::use_awaitable);
+    co_await net::post(net::use_awaitable);
     co_return result;
   } catch (...) {
     exceptionPtr = std::current_exception();
@@ -33,7 +37,7 @@ inline net::awaitable<T> resumeOnOriginalExecutor(net::awaitable<T> awaitable) {
   if (cancellationState.cancelled() == net::cancellation_type::none) {
     // use_awaitable always resumes the coroutine on the executor the coroutine
     // was co_spawned on
-    co_await net::dispatch(net::use_awaitable);
+    co_await net::post(net::use_awaitable);
   }
   AD_CORRECTNESS_CHECK(exceptionPtr);
   std::rethrow_exception(exceptionPtr);
@@ -55,7 +59,7 @@ inline net::awaitable<void> resumeOnOriginalExecutor(
       net::cancellation_type::none) {
     // use_awaitable always resumes the coroutine on the executor the coroutine
     // was co_spawned on
-    co_await net::dispatch(net::use_awaitable);
+    co_await net::post(net::use_awaitable);
   }
   if (exceptionPtr) {
     std::rethrow_exception(exceptionPtr);
17 changes: 8 additions & 9 deletions src/util/http/HttpServer.h
@@ -161,16 +161,15 @@ class HttpServer {
       try {
         // Wait for a request (the code only continues after we have received
         // and accepted a request).
-        // Note: Although a single session is conceptually single-threaded, we
-        // still have to manually schedule it on the `coroExecutor` to make the
-        // thread sanitizer happy. The reason is, that the thread of execution
-        // specified by the coroutine might change threads as the coroutine is
-        // suspended and then resumed.
-        auto coroExecutor = co_await net::this_coro::executor;
-        auto socket = co_await acceptor_.async_accept(
-            coroExecutor, boost::asio::use_awaitable);
+
+        // Although a coroutine is conceptually single-threaded we still
+        // schedule onto an explicit strand because the Websocket implementation
+        // expects a strand.
+        auto strand = net::make_strand(ioContext_);
+        auto socket =
+            co_await acceptor_.async_accept(strand, boost::asio::use_awaitable);
         // Schedule the session such that it may run in parallel to this loop.
-        net::co_spawn(coroExecutor, session(std::move(socket)), net::detached);
+        net::co_spawn(strand, session(std::move(socket)), net::detached);
       } catch (const boost::system::system_error& b) {
         // If the server is shut down this will cause operations to abort.
         // This will most likely only happen in tests, but could also occur
30 changes: 23 additions & 7 deletions test/HttpTest.cpp
@@ -11,6 +11,7 @@
 #include "util/http/HttpClient.h"
 #include "util/http/HttpServer.h"
 #include "util/http/HttpUtils.h"
+#include "util/jthread.h"
 
 using namespace ad_utility::httpUtils;
 using namespace boost::beast::http;
@@ -40,14 +41,29 @@ TEST(HttpServer, HttpTest) {
   httpServer.runInOwnThread();
 
   // Create a client, and send a GET and a POST request in one session.
+  // The constants in those test loops can be increased to find threading issues
+  // using the thread sanitizer. However, these constants can't be higher by
+  // default because the checks on GitHub Actions will run forever if they are.
   {
-    HttpClient httpClient("localhost", std::to_string(httpServer.getPort()));
-    ASSERT_EQ(httpClient.sendRequest(verb::get, "localhost", "target1").str(),
-              "GET\ntarget1\n");
-    ASSERT_EQ(
-        httpClient.sendRequest(verb::post, "localhost", "target1", "body1")
-            .str(),
-        "POST\ntarget1\nbody1");
+    std::vector<ad_utility::JThread> threads;
+    for (size_t i = 0; i < 2; ++i) {
+      threads.emplace_back([&]() {
+        for (size_t j = 0; j < 5; ++j) {
+          {
+            HttpClient httpClient("localhost",
+                                  std::to_string(httpServer.getPort()));
+            ASSERT_EQ(
+                httpClient.sendRequest(verb::get, "localhost", "target1").str(),
+                "GET\ntarget1\n");
+            ASSERT_EQ(
+                httpClient
+                    .sendRequest(verb::post, "localhost", "target1", "body1")
+                    .str(),
+                "POST\ntarget1\nbody1");
+          }
+        }
+      });
+    }
   }
 
   // Do the same thing in a second session (to check if everything is still fine
