Skip to content

Commit

Permalink
Merge pull request #1414 from ccotter/async-scope-race
Browse files Browse the repository at this point in the history
Fix race in async_scope
  • Loading branch information
ericniebler authored Sep 16, 2024
2 parents cddcd5a + 8115ade commit 2de858b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
29 changes: 14 additions & 15 deletions include/exec/async_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,8 @@ namespace exec {
__future_state_base<_Completions, _Env>* __state_;
const __impl* __scope_;

void __dispatch_result_() noexcept {
void __dispatch_result_(std::unique_lock<std::mutex>& __guard) noexcept {
auto& __state = *__state_;
std::unique_lock __guard{__state.__mutex_};
auto __local_subscribers = std::move(__state.__subscribers_);
__state.__forward_scope_ = std::nullopt;
if (__state.__no_future_.get() != nullptr) {
Expand All @@ -543,38 +542,38 @@ namespace exec {
}

template <class _Tag, class... _As>
bool __save_completion(_Tag, _As&&... __as) noexcept {
void __save_completion(_Tag, _As&&... __as) noexcept {
auto& __state = *__state_;
try {
std::unique_lock __guard{__state.__mutex_};
using _Tuple = __decayed_std_tuple<_Tag, _As...>;
__state.__data_.template emplace<_Tuple>(_Tag(), static_cast<_As&&>(__as)...);
return true;
} catch (...) {
using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
__state.__data_.template emplace<_Tuple>(set_error_t(), std::current_exception());
}
return false;
}

template <__movable_value... _As>
void set_value(_As&&... __as) noexcept {
if (__save_completion(set_value_t(), static_cast<_As&&>(__as)...)) {
__dispatch_result_();
}
auto& __state = *__state_;
std::unique_lock __guard{__state.__mutex_};
__save_completion(set_value_t(), static_cast<_As&&>(__as)...);
__dispatch_result_(__guard);
}

template <__movable_value _Error>
void set_error(_Error&& __err) noexcept {
if (__save_completion(set_error_t(), static_cast<_Error&&>(__err))) {
__dispatch_result_();
}
auto& __state = *__state_;
std::unique_lock __guard{__state.__mutex_};
__save_completion(set_error_t(), static_cast<_Error&&>(__err));
__dispatch_result_(__guard);
}

void set_stopped() noexcept {
if (__save_completion(set_stopped_t())) {
__dispatch_result_();
}
auto& __state = *__state_;
std::unique_lock __guard{__state.__mutex_};
__save_completion(set_stopped_t());
__dispatch_result_(__guard);
}

auto get_env() const noexcept -> const __env_t<_Env>& {
Expand Down
22 changes: 22 additions & 0 deletions test/exec/async_scope/test_spawn_future.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <catch2/catch.hpp>
#include <exec/async_scope.hpp>
#include <exec/env.hpp>
#include <exec/static_thread_pool.hpp>
#include "test_common/schedulers.hpp"
#include "test_common/receivers.hpp"
#include "test_common/type_helpers.hpp"
Expand Down Expand Up @@ -134,6 +135,27 @@ namespace {
expect_empty(scope);
}

TEST_CASE("spawn_future with throwing copy", "[async_scope][spawn_future]") {
async_scope scope;
exec::static_thread_pool pool{2};

struct throwing_copy {
throwing_copy() = default;
throwing_copy(const throwing_copy&) {
throw std::logic_error("cannot copy");
}
};

ex::sender auto snd = scope.spawn_future(ex::on(pool.get_scheduler(), ex::just(throwing_copy())));
try {
sync_wait(std::move(snd));
FAIL("Exceptions should have been thrown");
} catch (const std::logic_error& e) {
SUCCEED("correct exception caught");
}
sync_wait(scope.on_empty());
}

TEST_CASE(
"spawn_future returned sender can be connected but not started",
"[async_scope][spawn_future]") {
Expand Down

0 comments on commit 2de858b

Please sign in to comment.