diff --git a/include/exec/async_scope.hpp b/include/exec/async_scope.hpp index 44ed49446..9bdc24f68 100644 --- a/include/exec/async_scope.hpp +++ b/include/exec/async_scope.hpp @@ -18,6 +18,7 @@ #include "../stdexec/execution.hpp" #include "../stdexec/stop_token.hpp" #include "../stdexec/__detail/__intrusive_queue.hpp" +#include "../stdexec/__detail/__optional.hpp" #include "env.hpp" #include @@ -313,6 +314,7 @@ namespace exec { void __complete_() noexcept { try { + __forward_consumer_.reset(); auto __state = std::move(__state_); STDEXEC_ASSERT(__state != nullptr); std::unique_lock __guard{__state->__mutex_}; @@ -354,7 +356,7 @@ namespace exec { _Receiver __rcvr_; std::unique_ptr<__future_state<_Sender, _Env>> __state_; STDEXEC_ATTRIBUTE((no_unique_address)) - __forward_consumer __forward_consumer_; + stdexec::__optional<__forward_consumer> __forward_consumer_; public: using __id = __future_op; @@ -383,7 +385,7 @@ namespace exec { }} , __rcvr_(static_cast<_Receiver2&&>(__rcvr)) , __state_(std::move(__state)) - , __forward_consumer_(get_stop_token(get_env(__rcvr_)), + , __forward_consumer_(std::in_place, get_stop_token(get_env(__rcvr_)), __forward_stopped{&__state_->__stop_source_}) { } @@ -503,7 +505,7 @@ namespace exec { } inplace_stop_source __stop_source_; - std::optional> __forward_scope_; + stdexec::__optional> __forward_scope_; std::mutex __mutex_; __future_step __step_ = __future_step::__created; std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>> __no_future_; @@ -525,7 +527,7 @@ namespace exec { void __dispatch_result_(std::unique_lock& __guard) noexcept { auto& __state = *__state_; auto __local_subscribers = std::move(__state.__subscribers_); - __state.__forward_scope_ = std::nullopt; + __state.__forward_scope_.reset(); if (__state.__no_future_.get() != nullptr) { // nobody is waiting for the results // delete this and return diff --git a/test/exec/test_type_async_scope.cpp b/test/exec/test_type_async_scope.cpp index 53f8d6f9a..abec4f5b8 100644 --- a/test/exec/test_type_async_scope.cpp +++ b/test/exec/test_type_async_scope.cpp @@ -108,5 +108,15 @@ namespace { stdexec::sync_wait(scope.on_empty()); expect_empty(scope); } + + SECTION("request_stop nested spawn_future") { + exec::static_thread_pool ctx{1}; + exec::async_scope scope; + ex::sender auto begin = ex::schedule(sch); + ex::sender auto ftr = scope.spawn_future(scope.spawn_future(begin)); + scope.request_stop(); + stdexec::sync_wait(ex::when_all(scope.on_empty(), std::move(ftr))); + // Verify the program finishes without crashing + } } } // namespace diff --git a/test/rrd/Makefile b/test/rrd/Makefile index 347e40aeb..4bc190633 100644 --- a/test/rrd/Makefile +++ b/test/rrd/Makefile @@ -29,19 +29,6 @@ all: tests .PHONY: tests tests: $(test_exe_files) -$(build_dir)/%.check-result: $(build_dir)/% always-run - @ \ - printf '%s%s ...%s\n' $(ansi_term_bold) $(*) $(ansi_term_reset) >&2; \ - $(<); \ - status="$${?}"; \ - printf %d "$${status}" >$(@); \ - if [ "$${status}" -eq 0 ]; then \ - printf '%s%s %s%s\n' $(ansi_term_green) $(*) OK $(ansi_term_reset); \ - else \ - printf '%s%s %s%s\n' $(ansi_term_red) $(*) FAIL $(ansi_term_reset); \ - fi >&2; \ - exit "$${status}" - $(build_dir)/%: $(build_dir)/%.cpp.o $(LINK.cpp) $(^) -o $(@) @@ -53,7 +40,4 @@ $(build_dir)/%.cpp.o: %.cpp clean: rm -fr -- $(build_dir)/ -.PHONY: always-run -always-run: - -include $(o_files:=.d) diff --git a/test/rrd/README.md b/test/rrd/README.md index 1701aae49..59037e121 100644 --- a/test/rrd/README.md +++ b/test/rrd/README.md @@ -1,13 +1,13 @@ ## Relacy tests [Relacy (RRD)](https://www.1024cores.net/home/relacy-race-detector/rrd-introduction) -is a data race detector. It replaces the OS scheduler with an scheduler that +is a data race detector. It replaces the OS scheduler with a scheduler that explores many different thread interleavings, and logs detected races or assertion failures. Relacy can also simulate relaxed hardware by simulating old values of a variable as allowed by the C++11 memory model. Relacy requires a specialized build. In particular, it is a header only library that -replaces the standard library and pthread APIs at compile time. Since it places some +replaces the standard library and pthread APIs at compile time. Since it replaces some standard library includes, writing new tests may require working around certain limitations in terms of what the replacement headers and accompanying runtime can support. For example, Relacy's atomic replacements cannot support `++x`, so the @@ -18,10 +18,10 @@ stdexec library could needs to use `x.fetch_add(1)` to be compatible with Relacy Run the following commands from within this directory (`./tests/rrd`). ``` -# TODO: Merge patches into upstream Relacy @ dvyukov's version -git clone -b stdexec https://github.com/ccotter/relacy +git clone -b stdexec https://github.com/dvyukov/relacy CXX=g++-11 make -j 4 ./build/split +./build/async_scope ``` ## Recommended use diff --git a/test/rrd/async_scope.cpp b/test/rrd/async_scope.cpp index 8dc8d55fc..a9b96078d 100644 --- a/test/rrd/async_scope.cpp +++ b/test/rrd/async_scope.cpp @@ -1,51 +1,131 @@ -#include "../../relacy/relacy_std.hpp" -#include "../../relacy/relacy_cli.hpp" - -#include -#include -#include -#include -#include - -#include -#include -#include - -using rl::nvar; -using rl::nvolatile; -using rl::mutex; - -namespace ex = stdexec; -using exec::async_scope; - -struct async_scope_bug : rl::test_suite -{ - static size_t const dynamic_thread_count = 2; - - void thread(unsigned) - { - exec::static_thread_pool ctx{1}; - - ex::scheduler auto sch = ctx.get_scheduler(); - - exec::async_scope scope; - std::atomic_bool produced{false}; - ex::sender auto begin = ex::schedule(sch); - { - ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); })); - (void) ftr; - } - stdexec::sync_wait(scope.on_empty() | stdexec::then([&]() { - RL_ASSERT(produced.load()); - })); - } -}; - -int main() -{ - rl::test_params p; - p.iteration_count = 50000; - p.execution_depth_limit = 10000; - rl::simulate(p); - return 0; -} +#include "../../relacy/relacy_std.hpp" +#include "../../relacy/relacy_cli.hpp" + +#include +#include +#include +#include + +#include +#include + +using rl::nvar; +using rl::nvolatile; +using rl::mutex; + +namespace ex = stdexec; +using exec::async_scope; + +struct drop_async_scope_future : rl::test_suite +{ + static size_t const dynamic_thread_count = 1; + + void thread(unsigned) + { + exec::single_thread_context ctx; + ex::scheduler auto sch = ctx.get_scheduler(); + + exec::async_scope scope; + std::atomic_bool produced{false}; + ex::sender auto begin = ex::schedule(sch); + { + ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); })); + (void) ftr; + } + stdexec::sync_wait(scope.on_empty() | stdexec::then([&]() { + RL_ASSERT(produced.load()); + })); + } +}; + +struct attach_async_scope_future : rl::test_suite +{ + static size_t const dynamic_thread_count = 1; + + void thread(unsigned) + { + exec::single_thread_context ctx; + ex::scheduler auto sch = ctx.get_scheduler(); + + exec::async_scope scope; + std::atomic_bool produced{false}; + ex::sender auto begin = ex::schedule(sch); + ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); })); + ex::sender auto ftr_then = std::move(ftr) | stdexec::then([&] { + RL_ASSERT(produced.load()); + }); + stdexec::sync_wait(stdexec::when_all(scope.on_empty(), std::move(ftr_then))); + } +}; + +struct async_scope_future_set_result : rl::test_suite +{ + static size_t const dynamic_thread_count = 1; + + void thread(unsigned) + { + struct throwing_copy { + throwing_copy() = default; + throwing_copy(const throwing_copy&) { + throw std::logic_error(""); + } + }; + exec::single_thread_context ctx; + ex::scheduler auto sch = ctx.get_scheduler(); + + exec::async_scope scope; + ex::sender auto begin = ex::schedule(sch); + ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([] { return throwing_copy(); })); + bool threw = false; + try { + stdexec::sync_wait(std::move(ftr)); + RL_ASSERT(false); + } catch (const std::logic_error&) { + threw = true; + } + RL_ASSERT(threw); + stdexec::sync_wait(scope.on_empty()); + } +}; + +template +struct async_scope_request_stop : rl::test_suite, 1> +{ + static size_t const dynamic_thread_count = 1; + + void thread(unsigned) + { + exec::single_thread_context ctx; + ex::scheduler auto sch = ctx.get_scheduler(); + + if constexpr (test_case == 0) { + exec::async_scope scope; + ex::sender auto begin = ex::schedule(sch); + ex::sender auto ftr = scope.spawn_future(scope.spawn_future(begin)); + scope.request_stop(); + stdexec::sync_wait(ex::when_all(scope.on_empty(), std::move(ftr))); + } else { + exec::async_scope scope; + ex::sender auto begin = ex::schedule(sch); + { + // Drop the future on the floor + ex::sender auto ftr = scope.spawn_future(scope.spawn_future(begin)); + } + scope.request_stop(); + stdexec::sync_wait(scope.on_empty()); + } + } +}; + +int main() +{ + rl::test_params p; + p.iteration_count = 100000; + p.execution_depth_limit = 10000; + rl::simulate(p); + rl::simulate(p); + rl::simulate(p); + rl::simulate>(p); + rl::simulate>(p); + return 0; +} diff --git a/test/rrd/split.cpp b/test/rrd/split.cpp index 330ebb39c..8354b4f3b 100644 --- a/test/rrd/split.cpp +++ b/test/rrd/split.cpp @@ -1,45 +1,38 @@ -#include "../../relacy/relacy_std.hpp" -#include "../../relacy/relacy_cli.hpp" - -#include -#include -#include -#include - -#include -#include -#include - -using rl::nvar; -using rl::nvolatile; -using rl::mutex; - -namespace ex = stdexec; -using exec::async_scope; - -struct split_bug : rl::test_suite -{ - static size_t const dynamic_thread_count = 2; - - void thread(unsigned) - { - exec::static_thread_pool pool{1}; - auto split = ex::schedule(pool.get_scheduler()) // - | ex::then([] { - return 42; - }) - | ex::split(); - - auto [val] = ex::sync_wait(split).value(); - RL_ASSERT(val == 42); - } -}; - -int main() -{ - rl::test_params p; - p.iteration_count = 50000; - p.execution_depth_limit = 10000; - rl::simulate(p); - return 0; -} +#include "../../relacy/relacy_std.hpp" +#include "../../relacy/relacy_cli.hpp" + +#include +#include + +using rl::nvar; +using rl::nvolatile; +using rl::mutex; + +namespace ex = stdexec; + +struct split_bug : rl::test_suite +{ + static size_t const dynamic_thread_count = 2; + + void thread(unsigned) + { + exec::static_thread_pool pool{1}; + auto split = ex::schedule(pool.get_scheduler()) // + | ex::then([] { + return 42; + }) + | ex::split(); + + auto [val] = ex::sync_wait(split).value(); + RL_ASSERT(val == 42); + } +}; + +int main() +{ + rl::test_params p; + p.iteration_count = 50000; + p.execution_depth_limit = 10000; + rl::simulate(p); + return 0; +}