Skip to content

Commit

Permalink
Merge branch 'main' into continues-starts-on-rename
Browse files Browse the repository at this point in the history
  • Loading branch information
ericniebler authored Sep 19, 2024
2 parents 5c1def1 + 4feb2ed commit 02d3922
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 120 deletions.
10 changes: 6 additions & 4 deletions include/exec/async_scope.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "../stdexec/execution.hpp"
#include "../stdexec/stop_token.hpp"
#include "../stdexec/__detail/__intrusive_queue.hpp"
#include "../stdexec/__detail/__optional.hpp"
#include "env.hpp"

#include <mutex>
Expand Down Expand Up @@ -313,6 +314,7 @@ namespace exec {

void __complete_() noexcept {
try {
__forward_consumer_.reset();
auto __state = std::move(__state_);
STDEXEC_ASSERT(__state != nullptr);
std::unique_lock __guard{__state->__mutex_};
Expand Down Expand Up @@ -354,7 +356,7 @@ namespace exec {
_Receiver __rcvr_;
std::unique_ptr<__future_state<_Sender, _Env>> __state_;
STDEXEC_ATTRIBUTE((no_unique_address))
__forward_consumer __forward_consumer_;
stdexec::__optional<__forward_consumer> __forward_consumer_;

public:
using __id = __future_op;
Expand Down Expand Up @@ -383,7 +385,7 @@ namespace exec {
}}
, __rcvr_(static_cast<_Receiver2&&>(__rcvr))
, __state_(std::move(__state))
, __forward_consumer_(get_stop_token(get_env(__rcvr_)),
, __forward_consumer_(std::in_place, get_stop_token(get_env(__rcvr_)),
__forward_stopped{&__state_->__stop_source_}) {
}

Expand Down Expand Up @@ -503,7 +505,7 @@ namespace exec {
}

inplace_stop_source __stop_source_;
std::optional<inplace_stop_callback<__forward_stopped>> __forward_scope_;
stdexec::__optional<inplace_stop_callback<__forward_stopped>> __forward_scope_;
std::mutex __mutex_;
__future_step __step_ = __future_step::__created;
std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>> __no_future_;
Expand All @@ -525,7 +527,7 @@ namespace exec {
void __dispatch_result_(std::unique_lock<std::mutex>& __guard) noexcept {
auto& __state = *__state_;
auto __local_subscribers = std::move(__state.__subscribers_);
__state.__forward_scope_ = std::nullopt;
__state.__forward_scope_.reset();
if (__state.__no_future_.get() != nullptr) {
// nobody is waiting for the results
// delete this and return
Expand Down
10 changes: 10 additions & 0 deletions test/exec/test_type_async_scope.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,15 @@ namespace {
stdexec::sync_wait(scope.on_empty());
expect_empty(scope);
}

SECTION("request_stop nested spawn_future") {
exec::static_thread_pool ctx{1};
exec::async_scope scope;
ex::sender auto begin = ex::schedule(sch);
ex::sender auto ftr = scope.spawn_future(scope.spawn_future(begin));
scope.request_stop();
stdexec::sync_wait(ex::when_all(scope.on_empty(), std::move(ftr)));
// Verify the program finishes without crashing
}
}
} // namespace
16 changes: 0 additions & 16 deletions test/rrd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,6 @@ all: tests
.PHONY: tests
tests: $(test_exe_files)

$(build_dir)/%.check-result: $(build_dir)/% always-run
@ \
printf '%s%s ...%s\n' $(ansi_term_bold) $(*) $(ansi_term_reset) >&2; \
$(<); \
status="$${?}"; \
printf %d "$${status}" >$(@); \
if [ "$${status}" -eq 0 ]; then \
printf '%s%s %s%s\n' $(ansi_term_green) $(*) OK $(ansi_term_reset); \
else \
printf '%s%s %s%s\n' $(ansi_term_red) $(*) FAIL $(ansi_term_reset); \
fi >&2; \
exit "$${status}"

$(build_dir)/%: $(build_dir)/%.cpp.o
$(LINK.cpp) $(^) -o $(@)

Expand All @@ -53,7 +40,4 @@ $(build_dir)/%.cpp.o: %.cpp
clean:
rm -fr -- $(build_dir)/

.PHONY: always-run
always-run:

-include $(o_files:=.d)
8 changes: 4 additions & 4 deletions test/rrd/README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
## Relacy tests

[Relacy (RRD)](https://www.1024cores.net/home/relacy-race-detector/rrd-introduction)
is a data race detector. It replaces the OS scheduler with an scheduler that
is a data race detector. It replaces the OS scheduler with a scheduler that
explores many different thread interleavings, and logs detected races or assertion
failures. Relacy can also simulate relaxed hardware by simulating old values of a
variable as allowed by the C++11 memory model.

Relacy requires a specialized build. In particular, it is a header only library that
replaces the standard library and pthread APIs at compile time. Since it places some
replaces the standard library and pthread APIs at compile time. Since it replaces some
standard library includes, writing new tests may require working around certain
limitations in terms of what the replacement headers and accompanying runtime can
support. For example, Relacy's atomic replacements cannot support `++x`, so the
Expand All @@ -18,10 +18,10 @@ stdexec library could needs to use `x.fetch_add(1)` to be compatible with Relacy
Run the following commands from within this directory (`./tests/rrd`).

```
# TODO: Merge patches into upstream Relacy @ dvyukov's version
git clone -b stdexec https://github.com/ccotter/relacy
git clone -b stdexec https://github.com/dvyukov/relacy
CXX=g++-11 make -j 4
./build/split
./build/async_scope
```

## Recommended use
Expand Down
182 changes: 131 additions & 51 deletions test/rrd/async_scope.cpp
Original file line number Diff line number Diff line change
@@ -1,51 +1,131 @@
#include "../../relacy/relacy_std.hpp"
#include "../../relacy/relacy_cli.hpp"

#include <stdexec/execution.hpp>
#include <exec/async_scope.hpp>
#include <exec/static_thread_pool.hpp>
#include <test_common/schedulers.hpp>
#include <test_common/type_helpers.hpp>

#include <chrono>
#include <random>
#include <iostream>

using rl::nvar;
using rl::nvolatile;
using rl::mutex;

namespace ex = stdexec;
using exec::async_scope;

struct async_scope_bug : rl::test_suite<async_scope_bug, 1>
{
static size_t const dynamic_thread_count = 2;

void thread(unsigned)
{
exec::static_thread_pool ctx{1};

ex::scheduler auto sch = ctx.get_scheduler();

exec::async_scope scope;
std::atomic_bool produced{false};
ex::sender auto begin = ex::schedule(sch);
{
ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); }));
(void) ftr;
}
stdexec::sync_wait(scope.on_empty() | stdexec::then([&]() {
RL_ASSERT(produced.load());
}));
}
};

int main()
{
rl::test_params p;
p.iteration_count = 50000;
p.execution_depth_limit = 10000;
rl::simulate<async_scope_bug>(p);
return 0;
}
#include "../../relacy/relacy_std.hpp"
#include "../../relacy/relacy_cli.hpp"

#include <stdexec/execution.hpp>
#include <exec/async_scope.hpp>
#include <exec/static_thread_pool.hpp>
#include <exec/single_thread_context.hpp>

#include <optional>
#include <stdexcept>

using rl::nvar;
using rl::nvolatile;
using rl::mutex;

namespace ex = stdexec;
using exec::async_scope;

struct drop_async_scope_future : rl::test_suite<drop_async_scope_future, 1>
{
static size_t const dynamic_thread_count = 1;

void thread(unsigned)
{
exec::single_thread_context ctx;
ex::scheduler auto sch = ctx.get_scheduler();

exec::async_scope scope;
std::atomic_bool produced{false};
ex::sender auto begin = ex::schedule(sch);
{
ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); }));
(void) ftr;
}
stdexec::sync_wait(scope.on_empty() | stdexec::then([&]() {
RL_ASSERT(produced.load());
}));
}
};

struct attach_async_scope_future : rl::test_suite<attach_async_scope_future, 1>
{
static size_t const dynamic_thread_count = 1;

void thread(unsigned)
{
exec::single_thread_context ctx;
ex::scheduler auto sch = ctx.get_scheduler();

exec::async_scope scope;
std::atomic_bool produced{false};
ex::sender auto begin = ex::schedule(sch);
ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([&]() { produced.store(true); }));
ex::sender auto ftr_then = std::move(ftr) | stdexec::then([&] {
RL_ASSERT(produced.load());
});
stdexec::sync_wait(stdexec::when_all(scope.on_empty(), std::move(ftr_then)));
}
};

struct async_scope_future_set_result : rl::test_suite<async_scope_future_set_result, 1>
{
static size_t const dynamic_thread_count = 1;

void thread(unsigned)
{
struct throwing_copy {
throwing_copy() = default;
throwing_copy(const throwing_copy&) {
throw std::logic_error("");
}
};
exec::single_thread_context ctx;
ex::scheduler auto sch = ctx.get_scheduler();

exec::async_scope scope;
ex::sender auto begin = ex::schedule(sch);
ex::sender auto ftr = scope.spawn_future(begin | stdexec::then([] { return throwing_copy(); }));
bool threw = false;
try {
stdexec::sync_wait(std::move(ftr));
RL_ASSERT(false);
} catch (const std::logic_error&) {
threw = true;
}
RL_ASSERT(threw);
stdexec::sync_wait(scope.on_empty());
}
};

template <int test_case>
struct async_scope_request_stop : rl::test_suite<async_scope_request_stop<test_case>, 1>
{
static size_t const dynamic_thread_count = 1;

void thread(unsigned)
{
exec::single_thread_context ctx;
ex::scheduler auto sch = ctx.get_scheduler();

if constexpr (test_case == 0) {
exec::async_scope scope;
ex::sender auto begin = ex::schedule(sch);
ex::sender auto ftr = scope.spawn_future(scope.spawn_future(begin));
scope.request_stop();
stdexec::sync_wait(ex::when_all(scope.on_empty(), std::move(ftr)));
} else {
exec::async_scope scope;
ex::sender auto begin = ex::schedule(sch);
{
// Drop the future on the floor
ex::sender auto ftr = scope.spawn_future(scope.spawn_future(begin));
}
scope.request_stop();
stdexec::sync_wait(scope.on_empty());
}
}
};

int main()
{
rl::test_params p;
p.iteration_count = 100000;
p.execution_depth_limit = 10000;
rl::simulate<drop_async_scope_future>(p);
rl::simulate<attach_async_scope_future>(p);
rl::simulate<async_scope_future_set_result>(p);
rl::simulate<async_scope_request_stop<0>>(p);
rl::simulate<async_scope_request_stop<1>>(p);
return 0;
}
83 changes: 38 additions & 45 deletions test/rrd/split.cpp
Original file line number Diff line number Diff line change
@@ -1,45 +1,38 @@
#include "../../relacy/relacy_std.hpp"
#include "../../relacy/relacy_cli.hpp"

#include <stdexec/execution.hpp>
#include <exec/async_scope.hpp>
#include <exec/static_thread_pool.hpp>
#include <test_common/schedulers.hpp>

#include <chrono>
#include <random>
#include <iostream>

using rl::nvar;
using rl::nvolatile;
using rl::mutex;

namespace ex = stdexec;
using exec::async_scope;

struct split_bug : rl::test_suite<split_bug, 1>
{
static size_t const dynamic_thread_count = 2;

void thread(unsigned)
{
exec::static_thread_pool pool{1};
auto split = ex::schedule(pool.get_scheduler()) //
| ex::then([] {
return 42;
})
| ex::split();

auto [val] = ex::sync_wait(split).value();
RL_ASSERT(val == 42);
}
};

int main()
{
rl::test_params p;
p.iteration_count = 50000;
p.execution_depth_limit = 10000;
rl::simulate<split_bug>(p);
return 0;
}
#include "../../relacy/relacy_std.hpp"
#include "../../relacy/relacy_cli.hpp"

#include <stdexec/execution.hpp>
#include <exec/static_thread_pool.hpp>

using rl::nvar;
using rl::nvolatile;
using rl::mutex;

namespace ex = stdexec;

struct split_bug : rl::test_suite<split_bug, 1>
{
static size_t const dynamic_thread_count = 2;

void thread(unsigned)
{
exec::static_thread_pool pool{1};
auto split = ex::schedule(pool.get_scheduler()) //
| ex::then([] {
return 42;
})
| ex::split();

auto [val] = ex::sync_wait(split).value();
RL_ASSERT(val == 42);
}
};

int main()
{
rl::test_params p;
p.iteration_count = 50000;
p.execution_depth_limit = 10000;
rl::simulate<split_bug>(p);
return 0;
}

0 comments on commit 02d3922

Please sign in to comment.