From 8115ade2b2eae7a4bcae4669ffc5e06bbb7abfb3 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Sun, 15 Sep 2024 14:00:08 +0100 Subject: [PATCH] Fix race in async_scope In the original logic, when the future receiver completion is called, `__save_completion` and `__dispatch_result` both acquire and release the mutex. If the sequence is * Thread 1 calls `__save_completion`, sets `__data_`, then returns successfully. * Thread 2 calls the `__future` destructor, which acquires the mutex and sees that `__data_` is set, so it returns early, and since it still owns `__state_`, the future state is destroyed. * Thread 1 calls `__dispatch_result`, and attempts to access now freed memory. It's not clear why `__save_completion`/`__dispatch_result` acquire and release the mutex separately, so I combined the calls to both be done under the mutex. Additionally, there appeared to be a bug when setting the value/error into `__data_` raised an exception where `__dispatch_result` was never invoked, leading to the program to never complete. I added a test that waits indefinitely without this fix. Fixes #1413 --- include/exec/async_scope.hpp | 29 ++++++++++----------- test/exec/async_scope/test_spawn_future.cpp | 22 ++++++++++++++++ 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/include/exec/async_scope.hpp b/include/exec/async_scope.hpp index 031493fba..8404fab2d 100644 --- a/include/exec/async_scope.hpp +++ b/include/exec/async_scope.hpp @@ -522,9 +522,8 @@ namespace exec { __future_state_base<_Completions, _Env>* __state_; const __impl* __scope_; - void __dispatch_result_() noexcept { + void __dispatch_result_(std::unique_lock& __guard) noexcept { auto& __state = *__state_; - std::unique_lock __guard{__state.__mutex_}; auto __local_subscribers = std::move(__state.__subscribers_); __state.__forward_scope_ = std::nullopt; if (__state.__no_future_.get() != nullptr) { @@ -543,38 +542,38 @@ namespace exec { } template - bool __save_completion(_Tag, _As&&... __as) noexcept { + void __save_completion(_Tag, _As&&... __as) noexcept { auto& __state = *__state_; try { - std::unique_lock __guard{__state.__mutex_}; using _Tuple = __decayed_std_tuple<_Tag, _As...>; __state.__data_.template emplace<_Tuple>(_Tag(), static_cast<_As&&>(__as)...); - return true; } catch (...) { using _Tuple = std::tuple; __state.__data_.template emplace<_Tuple>(set_error_t(), std::current_exception()); } - return false; } template <__movable_value... _As> void set_value(_As&&... __as) noexcept { - if (__save_completion(set_value_t(), static_cast<_As&&>(__as)...)) { - __dispatch_result_(); - } + auto& __state = *__state_; + std::unique_lock __guard{__state.__mutex_}; + __save_completion(set_value_t(), static_cast<_As&&>(__as)...); + __dispatch_result_(__guard); } template <__movable_value _Error> void set_error(_Error&& __err) noexcept { - if (__save_completion(set_error_t(), static_cast<_Error&&>(__err))) { - __dispatch_result_(); - } + auto& __state = *__state_; + std::unique_lock __guard{__state.__mutex_}; + __save_completion(set_error_t(), static_cast<_Error&&>(__err)); + __dispatch_result_(__guard); } void set_stopped() noexcept { - if (__save_completion(set_stopped_t())) { - __dispatch_result_(); - } + auto& __state = *__state_; + std::unique_lock __guard{__state.__mutex_}; + __save_completion(set_stopped_t()); + __dispatch_result_(__guard); } auto get_env() const noexcept -> const __env_t<_Env>& { diff --git a/test/exec/async_scope/test_spawn_future.cpp b/test/exec/async_scope/test_spawn_future.cpp index 456b2220e..51f3da10f 100644 --- a/test/exec/async_scope/test_spawn_future.cpp +++ b/test/exec/async_scope/test_spawn_future.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include "test_common/schedulers.hpp" #include "test_common/receivers.hpp" #include "test_common/type_helpers.hpp" @@ -134,6 +135,27 @@ namespace { expect_empty(scope); } + TEST_CASE("spawn_future with throwing copy", "[async_scope][spawn_future]") { + async_scope scope; + exec::static_thread_pool pool{2}; + + struct throwing_copy { + throwing_copy() = default; + throwing_copy(const throwing_copy&) { + throw std::logic_error("cannot copy"); + } + }; + + ex::sender auto snd = scope.spawn_future(ex::on(pool.get_scheduler(), ex::just(throwing_copy()))); + try { + sync_wait(std::move(snd)); + FAIL("Exceptions should have been thrown"); + } catch (const std::logic_error& e) { + SUCCEED("correct exception caught"); + } + sync_wait(scope.on_empty()); + } + TEST_CASE( "spawn_future returned sender can be connected but not started", "[async_scope][spawn_future]") {