From d0970f3d89e797b309c539e26b7777d058c11cbe Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Wed, 18 Sep 2024 10:55:26 +0100 Subject: [PATCH] Fix async_scope cancel race Destroy the stop callback before destroying the future state during `__complete_`. --- include/exec/async_scope.hpp | 6 +- test/rrd/Makefile | 16 --- test/rrd/README.md | 8 +- test/rrd/async_scope.cpp | 182 +++++++++++++++++++++++++---------- test/rrd/split.cpp | 83 ++++++++-------- 5 files changed, 177 insertions(+), 118 deletions(-) diff --git a/include/exec/async_scope.hpp b/include/exec/async_scope.hpp index 44ed49446..fbdda6506 100644 --- a/include/exec/async_scope.hpp +++ b/include/exec/async_scope.hpp @@ -21,6 +21,7 @@ #include "env.hpp" #include +#include namespace exec { ///////////////////////////////////////////////////////////////////////////// @@ -313,6 +314,7 @@ namespace exec { void __complete_() noexcept { try { + __forward_consumer_.reset(); auto __state = std::move(__state_); STDEXEC_ASSERT(__state != nullptr); std::unique_lock __guard{__state->__mutex_}; @@ -354,7 +356,7 @@ namespace exec { _Receiver __rcvr_; std::unique_ptr<__future_state<_Sender, _Env>> __state_; STDEXEC_ATTRIBUTE((no_unique_address)) - __forward_consumer __forward_consumer_; + std::optional<__forward_consumer> __forward_consumer_; public: using __id = __future_op; @@ -383,7 +385,7 @@ namespace exec { }} , __rcvr_(static_cast<_Receiver2&&>(__rcvr)) , __state_(std::move(__state)) - , __forward_consumer_(get_stop_token(get_env(__rcvr_)), + , __forward_consumer_(std::in_place, get_stop_token(get_env(__rcvr_)), __forward_stopped{&__state_->__stop_source_}) { } diff --git a/test/rrd/Makefile b/test/rrd/Makefile index 347e40aeb..4bc190633 100644 --- a/test/rrd/Makefile +++ b/test/rrd/Makefile @@ -29,19 +29,6 @@ all: tests .PHONY: tests tests: $(test_exe_files) -$(build_dir)/%.check-result: $(build_dir)/% always-run - @ \ - printf '%s%s ...%s\n' $(ansi_term_bold) $(*) $(ansi_term_reset) >&2; \ - $(<); \ - status="$${?}"; \ - printf %d "$${status}" >$(@); \ - if [ "$${status}" -eq 0 ]; then \ - printf '%s%s %s%s\n' $(ansi_term_green) $(*) OK $(ansi_term_reset); \ - else \ - printf '%s%s %s%s\n' $(ansi_term_red) $(*) FAIL $(ansi_term_reset); \ - fi >&2; \ - exit "$${status}" - $(build_dir)/%: $(build_dir)/%.cpp.o $(LINK.cpp) $(^) -o $(@) @@ -53,7 +40,4 @@ $(build_dir)/%.cpp.o: %.cpp clean: rm -fr -- $(build_dir)/ -.PHONY: always-run -always-run: - -include $(o_files:=.d) diff --git a/test/rrd/README.md b/test/rrd/README.md index 1701aae49..59037e121 100644 --- a/test/rrd/README.md +++ b/test/rrd/README.md @@ -1,13 +1,13 @@ ## Relacy tests [Relacy (RRD)](https://www.1024cores.net/home/relacy-race-detector/rrd-introduction) -is a data race detector. It replaces the OS scheduler with an scheduler that +is a data race detector. It replaces the OS scheduler with a scheduler that explores many different thread interleavings, and logs detected races or assertion failures. Relacy can also simulate relaxed hardware by simulating old values of a variable as allowed by the C++11 memory model. Relacy requires a specialized build. In particular, it is a header only library that -replaces the standard library and pthread APIs at compile time. Since it places some +replaces the standard library and pthread APIs at compile time. Since it replaces some standard library includes, writing new tests may require working around certain limitations in terms of what the replacement headers and accompanying runtime can support. For example, Relacy's atomic replacements cannot support `++x`, so the @@ -18,10 +18,10 @@ stdexec library could needs to use `x.fetch_add(1)` to be compatible with Relacy Run the following commands from within this directory (`./tests/rrd`). ``` -# TODO: Merge patches into upstream Relacy @ dvyukov's version -git clone -b stdexec https://github.com/ccotter/relacy +git clone -b stdexec https://github.com/dvyukov/relacy CXX=g++-11 make -j 4 ./build/split +./build/async_scope ``` ## Recommended use diff --git a/test/rrd/async_scope.cpp b/test/rrd/async_scope.cpp index 8dc8d55fc..a9b96078d 100644 --- a/test/rrd/async_scope.cpp +++ b/test/rrd/async_scope.cpp @@ -1,51 +1,131 @@ -#include "../../relacy/relacy_std.hpp" -#include "../../relacy/relacy_cli.hpp" - -#include -#include -#include -#include -#include - -#include -#include -#include - -using rl::nvar; -using rl::nvolatile; -using rl::mutex; - -namespace ex = stdexec; -using exec::async_scope; - -struct async_scope_bug : rl::test_suite -{ - static size_t const dynamic_thread_count = 2; - - void thread(unsigned) - { - exec::static_thread_pool ctx{1}; - - ex::scheduler auto sch = ctx.get_scheduler(); - - exec::async_scope scope; - std::atomic_bool produced{false}; - ex::sender auto begin = ex::schedule(sch); - { - ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); })); - (void) ftr; - } - stdexec::sync_wait(scope.on_empty() | stdexec::then([&]() { - RL_ASSERT(produced.load()); - })); - } -}; - -int main() -{ - rl::test_params p; - p.iteration_count = 50000; - p.execution_depth_limit = 10000; - rl::simulate(p); - return 0; -} +#include "../../relacy/relacy_std.hpp" +#include "../../relacy/relacy_cli.hpp" + +#include +#include +#include +#include + +#include +#include + +using rl::nvar; +using rl::nvolatile; +using rl::mutex; + +namespace ex = stdexec; +using exec::async_scope; + +struct drop_async_scope_future : rl::test_suite +{ + static size_t const dynamic_thread_count = 1; + + void thread(unsigned) + { + exec::single_thread_context ctx; + ex::scheduler auto sch = ctx.get_scheduler(); + + exec::async_scope scope; + std::atomic_bool produced{false}; + ex::sender auto begin = ex::schedule(sch); + { + ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); })); + (void) ftr; + } + stdexec::sync_wait(scope.on_empty() | stdexec::then([&]() { + RL_ASSERT(produced.load()); + })); + } +}; + +struct attach_async_scope_future : rl::test_suite +{ + static size_t const dynamic_thread_count = 1; + + void thread(unsigned) + { + exec::single_thread_context ctx; + ex::scheduler auto sch = ctx.get_scheduler(); + + exec::async_scope scope; + std::atomic_bool produced{false}; + ex::sender auto begin = ex::schedule(sch); + ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); })); + ex::sender auto ftr_then = std::move(ftr) | stdexec::then([&] { + RL_ASSERT(produced.load()); + }); + stdexec::sync_wait(stdexec::when_all(scope.on_empty(), std::move(ftr_then))); + } +}; + +struct async_scope_future_set_result : rl::test_suite +{ + static size_t const dynamic_thread_count = 1; + + void thread(unsigned) + { + struct throwing_copy { + throwing_copy() = default; + throwing_copy(const throwing_copy&) { + throw std::logic_error(""); + } + }; + exec::single_thread_context ctx; + ex::scheduler auto sch = ctx.get_scheduler(); + + exec::async_scope scope; + ex::sender auto begin = ex::schedule(sch); + ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([] { return throwing_copy(); })); + bool threw = false; + try { + stdexec::sync_wait(std::move(ftr)); + RL_ASSERT(false); + } catch (const std::logic_error&) { + threw = true; + } + RL_ASSERT(threw); + stdexec::sync_wait(scope.on_empty()); + } +}; + +template +struct async_scope_request_stop : rl::test_suite, 1> +{ + static size_t const dynamic_thread_count = 1; + + void thread(unsigned) + { + exec::single_thread_context ctx; + ex::scheduler auto sch = ctx.get_scheduler(); + + if constexpr (test_case == 0) { + exec::async_scope scope; + ex::sender auto begin = ex::schedule(sch); + ex::sender auto ftr = scope.spawn_future(scope.spawn_future(begin)); + scope.request_stop(); + stdexec::sync_wait(ex::when_all(scope.on_empty(), std::move(ftr))); + } else { + exec::async_scope scope; + ex::sender auto begin = ex::schedule(sch); + { + // Drop the future on the floor + ex::sender auto ftr = scope.spawn_future(scope.spawn_future(begin)); + } + scope.request_stop(); + stdexec::sync_wait(scope.on_empty()); + } + } +}; + +int main() +{ + rl::test_params p; + p.iteration_count = 100000; + p.execution_depth_limit = 10000; + rl::simulate(p); + rl::simulate(p); + rl::simulate(p); + rl::simulate>(p); + rl::simulate>(p); + return 0; +} diff --git a/test/rrd/split.cpp b/test/rrd/split.cpp index 330ebb39c..8354b4f3b 100644 --- a/test/rrd/split.cpp +++ b/test/rrd/split.cpp @@ -1,45 +1,38 @@ -#include "../../relacy/relacy_std.hpp" -#include "../../relacy/relacy_cli.hpp" - -#include -#include -#include -#include - -#include -#include -#include - -using rl::nvar; -using rl::nvolatile; -using rl::mutex; - -namespace ex = stdexec; -using exec::async_scope; - -struct split_bug : rl::test_suite -{ - static size_t const dynamic_thread_count = 2; - - void thread(unsigned) - { - exec::static_thread_pool pool{1}; - auto split = ex::schedule(pool.get_scheduler()) // - | ex::then([] { - return 42; - }) - | ex::split(); - - auto [val] = ex::sync_wait(split).value(); - RL_ASSERT(val == 42); - } -}; - -int main() -{ - rl::test_params p; - p.iteration_count = 50000; - p.execution_depth_limit = 10000; - rl::simulate(p); - return 0; -} +#include "../../relacy/relacy_std.hpp" +#include "../../relacy/relacy_cli.hpp" + +#include +#include + +using rl::nvar; +using rl::nvolatile; +using rl::mutex; + +namespace ex = stdexec; + +struct split_bug : rl::test_suite +{ + static size_t const dynamic_thread_count = 2; + + void thread(unsigned) + { + exec::static_thread_pool pool{1}; + auto split = ex::schedule(pool.get_scheduler()) // + | ex::then([] { + return 42; + }) + | ex::split(); + + auto [val] = ex::sync_wait(split).value(); + RL_ASSERT(val == 42); + } +}; + +int main() +{ + rl::test_params p; + p.iteration_count = 50000; + p.execution_depth_limit = 10000; + rl::simulate(p); + return 0; +}