Skip to content

Commit

Permalink
Merge pull request #1131 from NVIDIA/execute-domain
Browse files Browse the repository at this point in the history
port the `execute` algorithm to use `apply_sender`
  • Loading branch information
ericniebler authored Nov 2, 2023
2 parents 3a0439c + 5e58038 commit f786cbf
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 79 deletions.
1 change: 1 addition & 0 deletions include/exec/__detail/__bwos_lifo_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <atomic>
#include <bit>
#include <cstdint>
#include <memory>
#include <new>
#include <utility>
Expand Down
7 changes: 7 additions & 0 deletions include/stdexec/__detail/__config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,13 @@
#define STDEXEC_HAS_STD_RANGES() 0
#endif

#if __has_include(<memory_resource>) && \
(defined(__cpp_lib_memory_resource) && __cpp_lib_memory_resource >= 201603L)
#define STDEXEC_HAS_STD_MEMORY_RESOURCE() 1
#else
#define STDEXEC_HAS_STD_MEMORY_RESOURCE() 0
#endif

#ifdef STDEXEC_ASSERT
#error "Redefinition of STDEXEC_ASSERT is not permitted. Define STDEXEC_ASSERT_FN instead."
#endif
Expand Down
141 changes: 62 additions & 79 deletions include/stdexec/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2426,76 +2426,72 @@ namespace stdexec {
using __with_awaitable_senders::__continuation_handle;
#endif

namespace {
inline constexpr auto __ref = []<class _Ty>(_Ty& __ty) noexcept {
return [__ty = &__ty]() noexcept -> decltype(auto) {
return (*__ty);
};
};
}

template <class _Ty>
using __ref_t = decltype(__ref(__declval<_Ty&>()));

/////////////////////////////////////////////////////////////////////////////
// NOT TO SPEC: __submit
namespace __submit_ {
template <class _ReceiverId>
struct __operation_base;

template <class _ReceiverId>
struct __operation_base {
using _Receiver = __t<_ReceiverId>;
_Receiver __rcvr_;
template <class _OpRef>
struct __receiver {
using is_receiver = void;
using __t = __receiver;
using __id = __receiver;

using __delete_fn_t = void(__operation_base<_ReceiverId>*) noexcept;
__delete_fn_t* __delete_;
};
using _Operation = __decay_t<__call_result_t<_OpRef>>;
using _Receiver = stdexec::__t<__mapply<__q<__msecond>, _Operation>>;

template <class _ReceiverId>
struct __receiver {
using _Receiver = stdexec::__t<_ReceiverId>;
_OpRef __opref_;

struct __t {
using is_receiver = void;
using __id = __receiver;
__operation_base<_ReceiverId>* __op_state_;
// Forward all the receiver ops, and delete the operation state.
template <__completion_tag _Tag, class... _As>
requires __callable<_Tag, _Receiver, _As...>
friend void tag_invoke(_Tag __tag, __receiver&& __self, _As&&... __as) noexcept {
__tag((_Receiver&&) __self.__opref_().__rcvr_, (_As&&) __as...);
__self.__delete_op();
}

// Forward all the receiver ops, and delete the operation state.
template <__completion_tag _Tag, class... _As>
requires __callable<_Tag, _Receiver, _As...>
friend void tag_invoke(_Tag __tag, __t&& __self, _As&&... __as) noexcept(
__nothrow_callable<_Tag, _Receiver, _As...>) {
// Delete the state as cleanup:
auto __g = __scope_guard{__self.__op_state_->__delete_, __self.__op_state_};
return __tag((_Receiver&&) __self.__op_state_->__rcvr_, (_As&&) __as...);
void __delete_op() noexcept {
_Operation* __op = &__opref_();
if constexpr (__callable<get_allocator_t, env_of_t<_Receiver>>) {
auto&& __env = get_env(__op->__rcvr_);
auto __alloc = get_allocator(__env);
using _Alloc = decltype(__alloc);
using _OpAlloc = typename std::allocator_traits<_Alloc>::template rebind_alloc<_Operation>;
_OpAlloc __op_alloc{__alloc};
std::allocator_traits<_OpAlloc>::destroy(__op_alloc, __op);
std::allocator_traits<_OpAlloc>::deallocate(__op_alloc, __op, 1);
} else {
delete __op;
}
}

// Forward all receiever queries.
friend auto tag_invoke(get_env_t, const __t& __self) noexcept -> env_of_t<_Receiver> {
return get_env(__self.__op_state_->__rcvr_);
}
};
// Forward all receiever queries.
friend auto tag_invoke(get_env_t, const __receiver& __self) noexcept -> env_of_t<_Receiver&> {
return get_env(__self.__opref_().__rcvr_);
}
};
template <class _ReceiverId>
using __receiver_t = __t<__receiver<_ReceiverId>>;

template <class _SenderId, class _ReceiverId>
struct __operation : __operation_base<_ReceiverId> {
struct __operation {
using _Sender = stdexec::__t<_SenderId>;
using _Receiver = stdexec::__t<_ReceiverId>;
using __receiver_t = __receiver<__ref_t<__operation>>;

connect_result_t<_Sender, __receiver_t<_ReceiverId>> __op_state_;

template <__decays_to<_Receiver> _CvrefReceiver>
__operation(_Sender&& __sndr, _CvrefReceiver&& __rcvr)
: __operation_base<_ReceiverId>{
(_CvrefReceiver&&) __rcvr,
[](__operation_base<_ReceiverId>* __self) noexcept {
__operation* __op = static_cast<__operation*>(__self);
if constexpr (__callable<get_allocator_t, env_of_t<_Receiver>>) {
auto&& __env = get_env(__self->__rcvr_);
auto __alloc = get_allocator(__env);
using _Alloc = decltype(__alloc);
using _Op = __operation;
using _OpAlloc = typename std::allocator_traits<_Alloc>::template rebind_alloc<_Op>;
_OpAlloc __op_alloc{__alloc};
std::allocator_traits<_OpAlloc>::destroy(__op_alloc, __op);
std::allocator_traits<_OpAlloc>::deallocate(__op_alloc, __op, 1);
} else {
delete __op;
}
}}
, __op_state_(connect((_Sender&&) __sndr, __receiver_t<_ReceiverId>{this})) {
STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
connect_result_t<_Sender, __receiver_t> __op_state_;

__operation(_Sender&& __sndr, _Receiver __rcvr)
: __rcvr_((_Receiver&&) __rcvr)
, __op_state_(connect((_Sender&&) __sndr, __receiver_t{__ref(*this)})) {
}
};

Expand Down Expand Up @@ -2640,7 +2636,6 @@ namespace stdexec {
_Sender),
tag_invoke_t(start_detached_t, _Sender)>;

// Default implementation goes here:
template <class _Sender, class _Env = empty_env>
requires sender_to<_Sender, __detached_receiver_t<_Env>>
void apply_sender(_Sender&& __sndr, _Env&& __env = {}) const {
Expand Down Expand Up @@ -2778,18 +2773,20 @@ namespace stdexec {
struct execute_t {
template <scheduler _Scheduler, class _Fun>
requires __callable<_Fun&> && move_constructible<_Fun>
void operator()(_Scheduler&& __sched, _Fun __fun) const //
noexcept(noexcept(
__submit(schedule((_Scheduler&&) __sched), __as_receiver<_Fun>{(_Fun&&) __fun}))) {
(void) __submit(schedule((_Scheduler&&) __sched), __as_receiver<_Fun>{(_Fun&&) __fun});
void operator()(_Scheduler&& __sched, _Fun __fun) const noexcept(false) {
// Look for a legacy customization
if constexpr (tag_invocable<execute_t, _Scheduler, _Fun>) {
tag_invoke(execute_t{}, (_Scheduler&&) __sched, (_Fun&&) __fun);
} else {
auto __domain = query_or(get_domain, __sched, default_domain());
stdexec::apply_sender(__domain, *this, schedule((_Scheduler&&) __sched), (_Fun&&) __fun);
}
}

template <scheduler _Scheduler, class _Fun>
template <sender_of<set_value_t()> _Sender, class _Fun>
requires __callable<_Fun&> && move_constructible<_Fun>
&& tag_invocable<execute_t, _Scheduler, _Fun>
void operator()(_Scheduler&& __sched, _Fun __fun) const
noexcept(nothrow_tag_invocable<execute_t, _Scheduler, _Fun>) {
(void) tag_invoke(execute_t{}, (_Scheduler&&) __sched, (_Fun&&) __fun);
void apply_sender(_Sender&& __sndr, _Fun __fun) const noexcept(false) {
__submit((_Sender&&) __sndr, __as_receiver<_Fun>{(_Fun&&) __fun});
}
};
}
Expand Down Expand Up @@ -7101,20 +7098,6 @@ namespace stdexec {
};
};

struct sync_wait_t;

using _Sender = __0;
template <class _Tag>
using __cust_sigs = __types<
// For legacy reasons:
tag_invoke_t(_Tag, get_completion_scheduler_t<set_value_t>(get_env_t(const _Sender&)), _Sender),
tag_invoke_t(_Tag, __get_sender_domain_t STDEXEC_MSVC((*) )(const _Sender&), _Sender),
tag_invoke_t(_Tag, _Sender)>;

template <class _Tag, class _Sender>
inline constexpr bool __is_sync_wait_customized =
__minvocable<__which<__cust_sigs<_Tag>>, _Sender>;

template <class _Sender>
using __receiver_t = __t<__sync_wait_result_impl<_Sender, __q<__receiver>>>;

Expand Down
50 changes: 50 additions & 0 deletions test/stdexec/algos/consumers/test_start_detached.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

#include <chrono>

#if STDEXEC_HAS_STD_MEMORY_RESOURCE()
#include <memory_resource>
#endif

namespace ex = stdexec;

using namespace std::chrono_literals;
Expand Down Expand Up @@ -160,5 +164,51 @@ namespace {
exec::make_env(exec::with(ex::get_scheduler, custom_scheduler{})));
CHECK_FALSE(called);
}

#if STDEXEC_HAS_STD_MEMORY_RESOURCE() && \
(defined(__cpp_lib_polymorphic_allocator) && __cpp_lib_polymorphic_allocator >= 201902L)

struct counting_resource : std::pmr::memory_resource {
counting_resource() = default;

std::size_t get_count() const noexcept {
return count;
}

std::size_t get_alive() const noexcept {
return alive;
}
private:
void* do_allocate(std::size_t bytes, std::size_t alignment) override {
++count;
++alive;
return std::pmr::new_delete_resource()->allocate(bytes, alignment);
}
void do_deallocate(void* p, std::size_t bytes, std::size_t alignment) override {
--alive;
return std::pmr::new_delete_resource()->deallocate(p, bytes, alignment);
}
bool do_is_equal(const memory_resource& other) const noexcept override {
return this == &other;
}

std::size_t count = 0, alive = 0;
};

// NOT TO SPEC
TEST_CASE("start_detached works with a custom allocator", "[consumers][start_detached]") {
bool called = false;
counting_resource res;
std::pmr::polymorphic_allocator<std::byte> alloc(&res);
ex::start_detached(
ex::just() | ex::then([&] { called = true; }),
exec::make_env(exec::with(ex::get_allocator, alloc)));
CHECK(called);
CHECK(res.get_count() == 1);
CHECK(res.get_alive() == 0);
}
#endif

}

STDEXEC_PRAGMA_POP()

0 comments on commit f786cbf

Please sign in to comment.