Fix async_scope cancel race
Destroy the stop callback before destroying the future state during
`__complete_`.
ccotter committed Sep 18, 2024
1 parent fe8718b commit d0970f3
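
For context, here is a minimal, self-contained sketch of the hazard being fixed, using hypothetical stand-in names (`future_state`, `forward_stopped`, and a plain `std::stop_callback` in place of stdexec's `__forward_consumer`). If the stop callback outlives the future state it points at, a concurrent `request_stop()` can touch freed memory; holding the callback in `std::optional` lets completion deregister it first, which is the ordering the diff below establishes.

```cpp
#include <memory>
#include <optional>
#include <stop_token>

// Hypothetical stand-in for stdexec's __future_state.
struct future_state {
    std::stop_source stop_source_;  // external stop requests are forwarded here
    // ... mutex, stored result, continuation, and so on
};

// Hypothetical stand-in for stdexec's __forward_stopped.
struct forward_stopped {
    future_state* state_;
    void operator()() noexcept { state_->stop_source_.request_stop(); }
};

struct future_op {
    std::unique_ptr<future_state> state_;
    // Held in std::optional so completion can destroy it explicitly,
    // *before* state_ is released.
    std::optional<std::stop_callback<forward_stopped>> callback_;

    void complete() noexcept {
        callback_.reset();               // 1. deregister: no further forwarding
        auto state = std::move(state_);  // 2. only now tear down the state
        // ... deliver the result, notify the scope, and so on
    }
};

int main() {
    std::stop_source external;
    future_op op{std::make_unique<future_state>(), std::nullopt};
    op.callback_.emplace(external.get_token(),
                         forward_stopped{op.state_.get()});
    external.request_stop();  // forwarded into the state while it is still alive
    op.complete();            // callback destroyed before the state
}
```

Constructing the callback with `std::in_place` (as the diff does) keeps it engaged for the operation's entire lifetime; only `__complete_` disengages it early, restoring a safe destruction order.
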
Showing 5 changed files with 177 additions and 118 deletions.
6 changes: 4 additions & 2 deletions include/exec/async_scope.hpp
@@ -21,6 +21,7 @@
#include "env.hpp"

#include <mutex>
+#include <optional>

namespace exec {
/////////////////////////////////////////////////////////////////////////////
@@ -313,6 +314,7 @@ namespace exec {

void __complete_() noexcept {
try {
+__forward_consumer_.reset();
auto __state = std::move(__state_);
STDEXEC_ASSERT(__state != nullptr);
std::unique_lock __guard{__state->__mutex_};
@@ -354,7 +356,7 @@ namespace exec {
_Receiver __rcvr_;
std::unique_ptr<__future_state<_Sender, _Env>> __state_;
STDEXEC_ATTRIBUTE((no_unique_address))
-__forward_consumer __forward_consumer_;
+std::optional<__forward_consumer> __forward_consumer_;

public:
using __id = __future_op;
@@ -383,7 +385,7 @@
}}
, __rcvr_(static_cast<_Receiver2&&>(__rcvr))
, __state_(std::move(__state))
-, __forward_consumer_(get_stop_token(get_env(__rcvr_)),
+, __forward_consumer_(std::in_place, get_stop_token(get_env(__rcvr_)),
__forward_stopped{&__state_->__stop_source_}) {
}

16 changes: 0 additions & 16 deletions test/rrd/Makefile
@@ -29,19 +29,6 @@ all: tests
.PHONY: tests
tests: $(test_exe_files)

-$(build_dir)/%.check-result: $(build_dir)/% always-run
-@ \
-printf '%s%s ...%s\n' $(ansi_term_bold) $(*) $(ansi_term_reset) >&2; \
-$(<); \
-status="$${?}"; \
-printf %d "$${status}" >$(@); \
-if [ "$${status}" -eq 0 ]; then \
-printf '%s%s %s%s\n' $(ansi_term_green) $(*) OK $(ansi_term_reset); \
-else \
-printf '%s%s %s%s\n' $(ansi_term_red) $(*) FAIL $(ansi_term_reset); \
-fi >&2; \
-exit "$${status}"
-
$(build_dir)/%: $(build_dir)/%.cpp.o
$(LINK.cpp) $(^) -o $(@)

@@ -53,7 +40,4 @@ $(build_dir)/%.cpp.o: %.cpp
clean:
rm -fr -- $(build_dir)/

-.PHONY: always-run
-always-run:
-
-include $(o_files:=.d)
8 changes: 4 additions & 4 deletions test/rrd/README.md
@@ -1,13 +1,13 @@
## Relacy tests

[Relacy (RRD)](https://www.1024cores.net/home/relacy-race-detector/rrd-introduction)
-is a data race detector. It replaces the OS scheduler with an scheduler that
+is a data race detector. It replaces the OS scheduler with a scheduler that
explores many different thread interleavings, and logs detected races or assertion
failures. Relacy can also simulate relaxed hardware by simulating old values of a
variable as allowed by the C++11 memory model.

Relacy requires a specialized build. In particular, it is a header only library that
-replaces the standard library and pthread APIs at compile time. Since it places some
+replaces the standard library and pthread APIs at compile time. Since it replaces some
standard library includes, writing new tests may require working around certain
limitations in terms of what the replacement headers and accompanying runtime can
support. For example, Relacy's atomic replacements cannot support `++x`, so the
@@ -18,10 +18,10 @@ stdexec library needs to use `x.fetch_add(1)` to be compatible with Relacy
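
As a concrete illustration of that limitation, a minimal sketch in plain standard C++ (under a Relacy build the same spelling goes through its replacement `<atomic>` header, so only the `fetch_add` form is portable to both builds):

```cpp
#include <atomic>

int main() {
    std::atomic<int> x{0};
    // Relacy's atomic replacement does not provide operator++,
    // so portable test code spells the increment out explicitly:
    x.fetch_add(1, std::memory_order_relaxed);
    return x.load() == 1 ? 0 : 1;  // exit status 0 on success
}
```
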
Run the following commands from within this directory (`./tests/rrd`).

```
# TODO: Merge patches into upstream Relacy @ dvyukov's version
-git clone -b stdexec https://github.com/ccotter/relacy
+git clone -b stdexec https://github.com/dvyukov/relacy
CXX=g++-11 make -j 4
./build/split
./build/async_scope
```

## Recommended use
182 changes: 131 additions & 51 deletions test/rrd/async_scope.cpp
@@ -1,51 +1,131 @@
#include "../../relacy/relacy_std.hpp"
#include "../../relacy/relacy_cli.hpp"

#include <stdexec/execution.hpp>
#include <exec/async_scope.hpp>
#include <exec/static_thread_pool.hpp>
#include <test_common/schedulers.hpp>
#include <test_common/type_helpers.hpp>

#include <chrono>
#include <random>
#include <iostream>

using rl::nvar;
using rl::nvolatile;
using rl::mutex;

namespace ex = stdexec;
using exec::async_scope;

struct async_scope_bug : rl::test_suite<async_scope_bug, 1>
{
static size_t const dynamic_thread_count = 2;

void thread(unsigned)
{
exec::static_thread_pool ctx{1};

ex::scheduler auto sch = ctx.get_scheduler();

exec::async_scope scope;
std::atomic_bool produced{false};
ex::sender auto begin = ex::schedule(sch);
{
ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); }));
(void) ftr;
}
stdexec::sync_wait(scope.on_empty() | stdexec::then([&]() {
RL_ASSERT(produced.load());
}));
}
};

int main()
{
rl::test_params p;
p.iteration_count = 50000;
p.execution_depth_limit = 10000;
rl::simulate<async_scope_bug>(p);
return 0;
}
#include "../../relacy/relacy_std.hpp"
#include "../../relacy/relacy_cli.hpp"

#include <stdexec/execution.hpp>
#include <exec/async_scope.hpp>
#include <exec/static_thread_pool.hpp>
#include <exec/single_thread_context.hpp>

#include <optional>
#include <stdexcept>

using rl::nvar;
using rl::nvolatile;
using rl::mutex;

namespace ex = stdexec;
using exec::async_scope;

struct drop_async_scope_future : rl::test_suite<drop_async_scope_future, 1>
{
static size_t const dynamic_thread_count = 1;

void thread(unsigned)
{
exec::single_thread_context ctx;
ex::scheduler auto sch = ctx.get_scheduler();

exec::async_scope scope;
std::atomic_bool produced{false};
ex::sender auto begin = ex::schedule(sch);
{
ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); }));
(void) ftr;
}
stdexec::sync_wait(scope.on_empty() | stdexec::then([&]() {
RL_ASSERT(produced.load());
}));
}
};

struct attach_async_scope_future : rl::test_suite<attach_async_scope_future, 1>
{
static size_t const dynamic_thread_count = 1;

void thread(unsigned)
{
exec::single_thread_context ctx;
ex::scheduler auto sch = ctx.get_scheduler();

exec::async_scope scope;
std::atomic_bool produced{false};
ex::sender auto begin = ex::schedule(sch);
ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); }));
ex::sender auto ftr_then = std::move(ftr) | stdexec::then([&] {
RL_ASSERT(produced.load());
});
stdexec::sync_wait(stdexec::when_all(scope.on_empty(), std::move(ftr_then)));
}
};

struct async_scope_future_set_result : rl::test_suite<async_scope_future_set_result, 1>
{
static size_t const dynamic_thread_count = 1;

void thread(unsigned)
{
struct throwing_copy {
throwing_copy() = default;
throwing_copy(const throwing_copy&) {
throw std::logic_error("");
}
};
exec::single_thread_context ctx;
ex::scheduler auto sch = ctx.get_scheduler();

exec::async_scope scope;
ex::sender auto begin = ex::schedule(sch);
ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([] { return throwing_copy(); }));
bool threw = false;
try {
stdexec::sync_wait(std::move(ftr));
RL_ASSERT(false);
} catch (const std::logic_error&) {
threw = true;
}
RL_ASSERT(threw);
stdexec::sync_wait(scope.on_empty());
}
};

template <int test_case>
struct async_scope_request_stop : rl::test_suite<async_scope_request_stop<test_case>, 1>
{
static size_t const dynamic_thread_count = 1;

void thread(unsigned)
{
exec::single_thread_context ctx;
ex::scheduler auto sch = ctx.get_scheduler();

if constexpr (test_case == 0) {
exec::async_scope scope;
ex::sender auto begin = ex::schedule(sch);
ex::sender auto ftr = scope.spawn_future(scope.spawn_future(begin));
scope.request_stop();
stdexec::sync_wait(ex::when_all(scope.on_empty(), std::move(ftr)));
} else {
exec::async_scope scope;
ex::sender auto begin = ex::schedule(sch);
{
// Drop the future on the floor
ex::sender auto ftr = scope.spawn_future(scope.spawn_future(begin));
}
scope.request_stop();
stdexec::sync_wait(scope.on_empty());
}
}
};

int main()
{
rl::test_params p;
p.iteration_count = 100000;
p.execution_depth_limit = 10000;
rl::simulate<drop_async_scope_future>(p);
rl::simulate<attach_async_scope_future>(p);
rl::simulate<async_scope_future_set_result>(p);
rl::simulate<async_scope_request_stop<0>>(p);
rl::simulate<async_scope_request_stop<1>>(p);
return 0;
}
83 changes: 38 additions & 45 deletions test/rrd/split.cpp
@@ -1,45 +1,38 @@
#include "../../relacy/relacy_std.hpp"
#include "../../relacy/relacy_cli.hpp"

#include <stdexec/execution.hpp>
#include <exec/async_scope.hpp>
#include <exec/static_thread_pool.hpp>
#include <test_common/schedulers.hpp>

#include <chrono>
#include <random>
#include <iostream>

using rl::nvar;
using rl::nvolatile;
using rl::mutex;

namespace ex = stdexec;
using exec::async_scope;

struct split_bug : rl::test_suite<split_bug, 1>
{
static size_t const dynamic_thread_count = 2;

void thread(unsigned)
{
exec::static_thread_pool pool{1};
auto split = ex::schedule(pool.get_scheduler()) //
| ex::then([] {
return 42;
})
| ex::split();

auto [val] = ex::sync_wait(split).value();
RL_ASSERT(val == 42);
}
};

int main()
{
rl::test_params p;
p.iteration_count = 50000;
p.execution_depth_limit = 10000;
rl::simulate<split_bug>(p);
return 0;
}
#include "../../relacy/relacy_std.hpp"
#include "../../relacy/relacy_cli.hpp"

#include <stdexec/execution.hpp>
#include <exec/static_thread_pool.hpp>

using rl::nvar;
using rl::nvolatile;
using rl::mutex;

namespace ex = stdexec;

struct split_bug : rl::test_suite<split_bug, 1>
{
static size_t const dynamic_thread_count = 2;

void thread(unsigned)
{
exec::static_thread_pool pool{1};
auto split = ex::schedule(pool.get_scheduler()) //
| ex::then([] {
return 42;
})
| ex::split();

auto [val] = ex::sync_wait(split).value();
RL_ASSERT(val == 42);
}
};

int main()
{
rl::test_params p;
p.iteration_count = 50000;
p.execution_depth_limit = 10000;
rl::simulate<split_bug>(p);
return 0;
}
