From e2061e372dd5400e718814159d49cd0da7f91109 Mon Sep 17 00:00:00 2001 From: Johannes Kalmbach Date: Fri, 17 Nov 2023 02:43:49 +0100 Subject: [PATCH] Fix bugs regarding query cancellation and server threads (#1149) 1. Fix a bug in the recursive query cancellation (introduced by #1125), which led to enormous runtimes for queries with many operations, like https://qlever.cs.uni-freiburg.de/wikidata/yI0y30 or https://qlever.cs.uni-freiburg.de/osm-planet/8Cn1w5 . 2. Fix a bug in the management of multiple server threads (introduced by #1103), which led to the server effectively not being multi-threaded anymore. 3. Make the thread sanitizer use Clang 17 (with Clang 16 there was a bug when a strand was resumed on another thread). NOTE: The combination of 1+2 made QLever very unstable, since any query with many operations would stall the server. As a side effect, our proxy web server (which currently has a large timeout) was also affected because of unprocessed requests piling up. This was a nerve-racking but very interesting lesson in many respects. 
--- .github/workflows/native-build.yml | 2 +- src/engine/Operation.cpp | 5 ++--- src/util/AsioHelpers.h | 12 ++++++++---- src/util/http/HttpServer.h | 17 ++++++++--------- test/HttpTest.cpp | 30 +++++++++++++++++++++++------- 5 files changed, 42 insertions(+), 24 deletions(-) diff --git a/.github/workflows/native-build.yml b/.github/workflows/native-build.yml index 7d35ff24a8..ab2cf89888 100644 --- a/.github/workflows/native-build.yml +++ b/.github/workflows/native-build.yml @@ -48,7 +48,7 @@ jobs: build-type: Release expensive-tests: false - compiler: clang - compiler-version: 16 + compiler-version: 17 build-type: Debug expensive-tests: false ubsan-flags: " -fsanitize=thread -O1 -g" diff --git a/src/engine/Operation.cpp b/src/engine/Operation.cpp index fe2e36a294..a25b7c1c36 100644 --- a/src/engine/Operation.cpp +++ b/src/engine/Operation.cpp @@ -54,8 +54,7 @@ void Operation::recursivelySetCancellationHandle( SharedCancellationHandle cancellationHandle) { AD_CORRECTNESS_CHECK(cancellationHandle); forAllDescendants([&cancellationHandle](auto child) { - child->getRootOperation()->recursivelySetCancellationHandle( - cancellationHandle); + child->getRootOperation()->cancellationHandle_ = cancellationHandle; }); cancellationHandle_ = std::move(cancellationHandle); } @@ -66,7 +65,7 @@ void Operation::recursivelySetTimeConstraint( std::chrono::steady_clock::time_point deadline) { deadline_ = deadline; forAllDescendants([deadline](auto child) { - child->getRootOperation()->recursivelySetTimeConstraint(deadline); + child->getRootOperation()->deadline_ = deadline; }); } diff --git a/src/util/AsioHelpers.h b/src/util/AsioHelpers.h index dd4c286c90..6b61192eef 100644 --- a/src/util/AsioHelpers.h +++ b/src/util/AsioHelpers.h @@ -6,7 +6,7 @@ #define QLEVER_ASIOHELPERS_H #include -#include +#include #include #include "util/Exception.h" @@ -19,12 +19,16 @@ namespace net = boost::asio; /// this coroutine was co_spawned on. 
/// IMPORTANT: If the coroutine is cancelled, no guarantees are given. Make /// sure to keep that in mind when handling cancellation errors! +// TODO When using `net::dispatch()` instead of `net::post()` +// and the `awaitable` itself dispatches to a strand, then this strand is not +// left in all cases when leaving this function. Further investigate whether we +// lack understanding here or whether this is a bug in `Boost::ASIO`. template inline net::awaitable resumeOnOriginalExecutor(net::awaitable awaitable) { std::exception_ptr exceptionPtr; try { T result = co_await std::move(awaitable); - co_await net::dispatch(net::use_awaitable); + co_await net::post(net::use_awaitable); co_return result; } catch (...) { exceptionPtr = std::current_exception(); @@ -33,7 +37,7 @@ inline net::awaitable resumeOnOriginalExecutor(net::awaitable awaitable) { if (cancellationState.cancelled() == net::cancellation_type::none) { // use_awaitable always resumes the coroutine on the executor the coroutine // was co_spawned on - co_await net::dispatch(net::use_awaitable); + co_await net::post(net::use_awaitable); } AD_CORRECTNESS_CHECK(exceptionPtr); std::rethrow_exception(exceptionPtr); @@ -55,7 +59,7 @@ inline net::awaitable resumeOnOriginalExecutor( net::cancellation_type::none) { // use_awaitable always resumes the coroutine on the executor the coroutine // was co_spawned on - co_await net::dispatch(net::use_awaitable); + co_await net::post(net::use_awaitable); } if (exceptionPtr) { std::rethrow_exception(exceptionPtr); diff --git a/src/util/http/HttpServer.h b/src/util/http/HttpServer.h index a2e65793c5..0510cde87a 100644 --- a/src/util/http/HttpServer.h +++ b/src/util/http/HttpServer.h @@ -161,16 +161,15 @@ class HttpServer { try { // Wait for a request (the code only continues after we have received // and accepted a request). 
- // Note: Although a single session is conceptually single-threaded, we - // still have to manually schedule it on the `coroExecutor` to make the - // thread sanitizer happy. The reason is, that the thread of execution - // specified by the coroutine might change threads as the coroutine is - // suspended and then resumed. - auto coroExecutor = co_await net::this_coro::executor; - auto socket = co_await acceptor_.async_accept( - coroExecutor, boost::asio::use_awaitable); + + // Although a coroutine is conceptually single-threaded we still + // schedule onto an explicit strand because the Websocket implementation + // expects a strand. + auto strand = net::make_strand(ioContext_); + auto socket = + co_await acceptor_.async_accept(strand, boost::asio::use_awaitable); // Schedule the session such that it may run in parallel to this loop. - net::co_spawn(coroExecutor, session(std::move(socket)), net::detached); + net::co_spawn(strand, session(std::move(socket)), net::detached); } catch (const boost::system::system_error& b) { // If the server is shut down this will cause operations to abort. // This will most likely only happen in tests, but could also occur diff --git a/test/HttpTest.cpp b/test/HttpTest.cpp index 9e28886eaf..20edd00b53 100644 --- a/test/HttpTest.cpp +++ b/test/HttpTest.cpp @@ -11,6 +11,7 @@ #include "util/http/HttpClient.h" #include "util/http/HttpServer.h" #include "util/http/HttpUtils.h" +#include "util/jthread.h" using namespace ad_utility::httpUtils; using namespace boost::beast::http; @@ -40,14 +41,29 @@ TEST(HttpServer, HttpTest) { httpServer.runInOwnThread(); // Create a client, and send a GET and a POST request in one session. + // The constants in those test loops can be increased to find threading issues + // using the thread sanitizer. However, these constants can't be higher by + // default because the checks on GitHub actions will run forever if they are. 
{ - HttpClient httpClient("localhost", std::to_string(httpServer.getPort())); - ASSERT_EQ(httpClient.sendRequest(verb::get, "localhost", "target1").str(), - "GET\ntarget1\n"); - ASSERT_EQ( - httpClient.sendRequest(verb::post, "localhost", "target1", "body1") - .str(), - "POST\ntarget1\nbody1"); + std::vector threads; + for (size_t i = 0; i < 2; ++i) { + threads.emplace_back([&]() { + for (size_t j = 0; j < 5; ++j) { + { + HttpClient httpClient("localhost", + std::to_string(httpServer.getPort())); + ASSERT_EQ( + httpClient.sendRequest(verb::get, "localhost", "target1").str(), + "GET\ntarget1\n"); + ASSERT_EQ( + httpClient + .sendRequest(verb::post, "localhost", "target1", "body1") + .str(), + "POST\ntarget1\nbody1"); + } + } + }); + } } // Do the same thing in a second session (to check if everything is still fine