Skip to content

Commit

Permalink
Fix race in async_scope
Browse files Browse the repository at this point in the history
In the original logic, when the future receiver completion is called,
`__save_completion` and `__dispatch_result` both acquire and release
the mutex. If the sequence is

 * Thread 1 calls `__save_completion`, sets `__data_`, then returns
   successfully.
 * Thread 2 calls the `__future` destructor, which acquires the mutex
   and sees that `__data_` is set, so it returns early, and since it
   still owns `__state_`, the future state is destroyed.
 * Thread 1 calls `__dispatch_result`, and attempts to access now
   freed memory.

It's not clear why `__save_completion`/`__dispatch_result` acquire and
release the mutex separately, so I combined the calls to both be done
under the mutex.

Additionally, there appeared to be a bug when setting the value/error into
`__data_` raised an exception where `__dispatch_result` was never invoked,
leading to the program to never complete. I added a test that waits
indefinitely without this fix.

Fixes #1413
  • Loading branch information
ccotter committed Sep 15, 2024
1 parent cddcd5a commit 8115ade
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
29 changes: 14 additions & 15 deletions include/exec/async_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,8 @@ namespace exec {
__future_state_base<_Completions, _Env>* __state_;
const __impl* __scope_;

void __dispatch_result_() noexcept {
void __dispatch_result_(std::unique_lock<std::mutex>& __guard) noexcept {
auto& __state = *__state_;
std::unique_lock __guard{__state.__mutex_};
auto __local_subscribers = std::move(__state.__subscribers_);
__state.__forward_scope_ = std::nullopt;
if (__state.__no_future_.get() != nullptr) {
Expand All @@ -543,38 +542,38 @@ namespace exec {
}

template <class _Tag, class... _As>
bool __save_completion(_Tag, _As&&... __as) noexcept {
void __save_completion(_Tag, _As&&... __as) noexcept {
auto& __state = *__state_;
try {
std::unique_lock __guard{__state.__mutex_};
using _Tuple = __decayed_std_tuple<_Tag, _As...>;
__state.__data_.template emplace<_Tuple>(_Tag(), static_cast<_As&&>(__as)...);
return true;
} catch (...) {
using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
__state.__data_.template emplace<_Tuple>(set_error_t(), std::current_exception());
}
return false;
}

template <__movable_value... _As>
void set_value(_As&&... __as) noexcept {
if (__save_completion(set_value_t(), static_cast<_As&&>(__as)...)) {
__dispatch_result_();
}
auto& __state = *__state_;
std::unique_lock __guard{__state.__mutex_};
__save_completion(set_value_t(), static_cast<_As&&>(__as)...);
__dispatch_result_(__guard);
}

template <__movable_value _Error>
void set_error(_Error&& __err) noexcept {
if (__save_completion(set_error_t(), static_cast<_Error&&>(__err))) {
__dispatch_result_();
}
auto& __state = *__state_;
std::unique_lock __guard{__state.__mutex_};
__save_completion(set_error_t(), static_cast<_Error&&>(__err));
__dispatch_result_(__guard);
}

void set_stopped() noexcept {
if (__save_completion(set_stopped_t())) {
__dispatch_result_();
}
auto& __state = *__state_;
std::unique_lock __guard{__state.__mutex_};
__save_completion(set_stopped_t());
__dispatch_result_(__guard);
}

auto get_env() const noexcept -> const __env_t<_Env>& {
Expand Down
22 changes: 22 additions & 0 deletions test/exec/async_scope/test_spawn_future.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <catch2/catch.hpp>
#include <exec/async_scope.hpp>
#include <exec/env.hpp>
#include <exec/static_thread_pool.hpp>
#include "test_common/schedulers.hpp"
#include "test_common/receivers.hpp"
#include "test_common/type_helpers.hpp"
Expand Down Expand Up @@ -134,6 +135,27 @@ namespace {
expect_empty(scope);
}

TEST_CASE("spawn_future with throwing copy", "[async_scope][spawn_future]") {
async_scope scope;
exec::static_thread_pool pool{2};

struct throwing_copy {
throwing_copy() = default;
throwing_copy(const throwing_copy&) {
throw std::logic_error("cannot copy");
}
};

ex::sender auto snd = scope.spawn_future(ex::on(pool.get_scheduler(), ex::just(throwing_copy())));
try {
sync_wait(std::move(snd));
FAIL("Exceptions should have been thrown");
} catch (const std::logic_error& e) {
SUCCEED("correct exception caught");
}
sync_wait(scope.on_empty());
}

TEST_CASE(
"spawn_future returned sender can be connected but not started",
"[async_scope][spawn_future]") {
Expand Down

0 comments on commit 8115ade

Please sign in to comment.