Skip to content

Commit

Permalink
Merge pull request #1048 from NVIDIA/split-transform-sender-customiza…
Browse files Browse the repository at this point in the history
…tion

port the `split` algorithm over to use `transform_sender` for customization
  • Loading branch information
ericniebler authored Aug 26, 2023
2 parents 968e780 + e12d88e commit 31abdb2
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 90 deletions.
4 changes: 2 additions & 2 deletions include/stdexec/__detail/__meta.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,8 @@ namespace stdexec {
template <class _Ty>
using __id = __minvoke<__id_<__has_id<_Ty>>, _Ty>;

template <class _Ty>
using __cvref_t = __copy_cvref_t<_Ty, __t<std::remove_cvref_t<_Ty>>>;
template <class _From, class _To = __decay_t<_From>>
using __cvref_t = __copy_cvref_t<_From, __t<_To>>;

template <class _From, class _To = __decay_t<_From>>
using __cvref_id = __copy_cvref_t<_From, __id<_To>>;
Expand Down
4 changes: 2 additions & 2 deletions include/stdexec/__detail/__sender_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ namespace stdexec {
return [... __captures = (_Captures&&) __captures]<class _Cvref, class _Fun>(
_Cvref, _Fun && __fun) mutable //
noexcept(__nothrow_callable<_Fun, _Tag, __minvoke<_Cvref, _Captures>...>) //
-> decltype(auto) //
-> __call_result_t<_Fun, _Tag, __minvoke<_Cvref, _Captures>...>
requires __callable<_Fun, _Tag, __minvoke<_Cvref, _Captures>...>
{
return ((_Fun&&) __fun)(
Expand Down Expand Up @@ -291,7 +291,7 @@ namespace stdexec {
extern __lazy_sender_name __name_of_v<__basic_sender<_ImplOf>>;

template <__has_id _Sender>
requires (!same_as<__id<_Sender>, _Sender>)
requires(!same_as<__id<_Sender>, _Sender>)
extern __id_name __name_of_v<_Sender>;
} // namespace __detail

Expand Down
196 changes: 110 additions & 86 deletions include/stdexec/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -993,8 +993,8 @@ namespace stdexec {
struct __invalid_completion {
struct __t {
template <class _CvrefSenderId, class _Env, class... _Sigs>
// BUGBUG this works around a recently (aug 2023) introduced regression in nvc++
requires (!__one_of<_Sig, _Sigs...>)
// BUGBUG this works around a recently (aug 2023) introduced regression in nvc++
requires(!__one_of<_Sig, _Sigs...>)
__t(__debug_receiver<_CvrefSenderId, _Env, completion_signatures<_Sigs...>>&&) noexcept {
using _SenderId = __decay_t<_CvrefSenderId>;
using _Sender = stdexec::__t<_SenderId>;
Expand Down Expand Up @@ -1830,9 +1830,11 @@ namespace stdexec {
std::exception_ptr __eptr;
try {
if constexpr (same_as<__result_t, void>)
co_await (co_await (_Awaitable&&) __awaitable, __co_call(set_value, (_Receiver&&) __rcvr));
co_await (
co_await (_Awaitable&&) __awaitable, __co_call(set_value, (_Receiver&&) __rcvr));
else
co_await __co_call(set_value, (_Receiver&&) __rcvr, co_await (_Awaitable&&) __awaitable);
co_await __co_call(
set_value, (_Receiver&&) __rcvr, co_await (_Awaitable&&) __awaitable);
} catch (...) {
__eptr = std::current_exception();
}
Expand Down Expand Up @@ -2132,7 +2134,7 @@ namespace stdexec {
template <class _Sender, class _Promise>
concept __awaitable_sender =
sender_in<_Sender, env_of_t<_Promise&>> && //
__mvalid<__value_t, _Sender, _Promise> && //
__mvalid<__value_t, _Sender, _Promise> && //
sender_to<_Sender, __receiver_t<_Sender, _Promise>> && //
requires(_Promise& __promise) {
{ __promise.unhandled_stopped() } -> convertible_to<__coro::coroutine_handle<>>;
Expand Down Expand Up @@ -3741,7 +3743,6 @@ namespace stdexec {

using __receiver_ = stdexec::__t<__receiver<_CvrefSenderId, _EnvId>>;

void* const __token_{(void*) 0xDEADBEEF};
in_place_stop_source __stop_source_{};
__variant_t __data_;
std::atomic<void*> __head_{nullptr};
Expand All @@ -3768,6 +3769,7 @@ namespace stdexec {
};

template <class _CvrefSenderId, class _EnvId, class _ReceiverId>
requires sender_to<__cvref_t<_CvrefSenderId>, __t<__receiver<_CvrefSenderId, _EnvId>>>
struct __operation {
using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
using _Env = stdexec::__t<_EnvId>;
Expand Down Expand Up @@ -3822,7 +3824,6 @@ namespace stdexec {
friend void tag_invoke(start_t, __t& __self) noexcept {
stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>* __shared_state =
__self.__shared_state_.get();
STDEXEC_ASSERT(__shared_state->__token_ == (void*) 0xDEADBEEF);
std::atomic<void*>& __head = __shared_state->__head_;
void* const __completion_state = static_cast<void*>(__shared_state);
void* __old = __head.load(std::memory_order_acquire);
Expand Down Expand Up @@ -3860,98 +3861,121 @@ namespace stdexec {
};
};

template <class _CvrefSenderId, class _EnvId>
struct __sender {
using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
using _Env = stdexec::__t<_EnvId>;

template <class _Receiver>
using __operation =
stdexec::__t<__operation<_CvrefSenderId, _EnvId, stdexec::__id<_Receiver>>>;

struct __t {
using __id = __sender;
using is_sender = void;

explicit __t(_CvrefSender&& __sndr, _Env __env)
: __shared_state_{
std::make_shared<__sh_state_>(static_cast<_CvrefSender&&>(__sndr), (_Env&&) __env)} {
}

private:
using __sh_state_ = stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>;

template <class... _Tys>
using __set_value_t = completion_signatures<set_value_t(const __decay_t<_Tys>&...)>;

template <class _Ty>
using __set_error_t = completion_signatures<set_error_t(const __decay_t<_Ty>&)>;

template <class _Self>
using __completions_t = //
__try_make_completion_signatures<
// NOT TO SPEC:
// See https://github.com/brycelelbach/wg21_p2300_execution/issues/26
_CvrefSender,
__env_t<__mfront<_Env, _Self>>,
completion_signatures<
set_error_t(const std::exception_ptr&),
set_stopped_t()>, // NOT TO SPEC
__q<__set_value_t>,
__q<__set_error_t>>;
struct __split_t {
#if STDEXEC_FRIENDSHIP_IS_LEXICAL()
private:
template <class...>
friend struct stdexec::__basic_sender;
#endif

std::shared_ptr<__sh_state_> __shared_state_;
template <class... _Tys>
using __set_value_t = completion_signatures<set_value_t(const __decay_t<_Tys>&...)>;

template <__decays_to<__t> _Self, receiver_of<__completions_t<_Self>> _Receiver>
friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __recvr) //
noexcept(std::is_nothrow_move_constructible_v<_Receiver>) -> __operation<_Receiver> {
return __operation<_Receiver>{(_Receiver&&) __recvr, __self.__shared_state_};
}
template <class _Ty>
using __set_error_t = completion_signatures<set_error_t(const __decay_t<_Ty>&)>;

template <__decays_to<__t> _Self, class _OtherEnv>
friend auto tag_invoke(get_completion_signatures_t, _Self&&, _OtherEnv&&)
-> __completions_t<_Self>;
template <class _CvrefSenderId, class _EnvId>
using __completions_t = //
__try_make_completion_signatures<
// NOT TO SPEC:
// See https://github.com/brycelelbach/wg21_p2300_execution/issues/26
__cvref_t<_CvrefSenderId>,
__env_t<__t<_EnvId>>,
completion_signatures<
set_error_t(const std::exception_ptr&),
set_stopped_t()>, // NOT TO SPEC
__q<__set_value_t>,
__q<__set_error_t>>;

static inline constexpr auto __connect_fn = []<class _Receiver>(_Receiver& __rcvr) noexcept {
return [&]<class _ShState>(auto, std::shared_ptr<_ShState> __sh_state) //
noexcept(__nothrow_decay_copyable<_Receiver>)
-> __t<__mapply<__mbind_back_q<__operation, __id<_Receiver>>, __id<_ShState>>> {
return {(_Receiver&&) __rcvr, std::move(__sh_state)};
};
};
};

struct split_t;

// When looking for user-defined customizations of split, these
// are the signatures to test against, in order:
using _Sender = __0;
using _Env = __1;
using __cust_sigs = //
__types<
tag_invoke_t(split_t, __get_sender_domain_t(const _Sender&), _Sender),
tag_invoke_t(split_t, __get_sender_domain_t(const _Sender&), _Sender, _Env),
tag_invoke_t(split_t, __get_env_domain_t(_Env&, _Sender&), _Sender),
tag_invoke_t(split_t, __get_env_domain_t(_Env&, _Sender&), _Sender, _Env),
tag_invoke_t(split_t, _Sender),
tag_invoke_t(split_t, _Sender, _Env)>;

template <class _Sender, class _Env>
inline constexpr bool __is_split_customized = __minvocable<__which<__cust_sigs>, _Sender, _Env>;

template <class _Sender, class _Env>
using __sender_t = __t<__sender<stdexec::__cvref_id<_Sender>, stdexec::__id<__decay_t<_Env>>>>;
static inline constexpr auto __get_completion_signatures_fn =
[]<class _ShState>(auto, const std::shared_ptr<_ShState>&) //
-> __mapply<__q<__completions_t>, __id<_ShState>> {
return {};
};

template <class _Sender, class _Env>
using __dispatcher_for =
__make_dispatcher<__cust_sigs, __mconstructor_for<__sender_t>, _Sender, _Env>;
template <__lazy_sender_for<__split_t> _Self, class _Receiver>
static auto connect(_Self&& __self, _Receiver __rcvr) noexcept(
__nothrow_callable<
__sender_apply_fn,
_Self,
__call_result_t<__mtypeof<__connect_fn>, _Receiver&>>)
-> __call_result_t<
__sender_apply_fn,
_Self,
__call_result_t<__mtypeof<__connect_fn>, _Receiver&>> {
return __sender_apply((_Self&&) __self, __connect_fn(__rcvr));
}

template <__lazy_sender_for<__split_t> _Self, class _OtherEnv>
static auto get_completion_signatures(_Self&& __self, _OtherEnv&&)
-> __call_result_t<__sender_apply_fn, _Self, __mtypeof<__get_completion_signatures_fn>> {
return {};
}
};

struct split_t {
struct split_t : __split_t {
// For the sake of finding customizations, first build a dummy split
// sender and pass it to the domain's transform_sender. If transform_sender
// changes its type, then use the transformed sender. Otherwise, build
// the real split sender, which might consume the sender.
template <sender _Sender, class _Env = empty_env>
requires(sender_in<_Sender, _Env> && __decay_copyable<env_of_t<_Sender>>)
|| __is_split_customized<_Sender, _Env>
auto operator()(_Sender&& __sndr, _Env&& __env = _Env{}) const
noexcept(__nothrow_callable<__dispatcher_for<_Sender, _Env>, _Sender, _Env>)
-> __call_result_t<__dispatcher_for<_Sender, _Env>, _Sender, _Env> {
return __dispatcher_for<_Sender, _Env>{}((_Sender&&) __sndr, (_Env&&) __env);
requires sender_in<_Sender, _Env> && __decay_copyable<env_of_t<_Sender>>
auto operator()(_Sender&& __sndr, _Env&& __env = _Env{}) const {
// The current domain is the first of:
// 1. The domain in the user-provided environment.
// 2. The domain of the scheduler in the user-provided environment.
// 3. The domain in the sender's environment.
// 4. The domain of the value completion scheduler in the sender's environment.
// 5. The default domain, wrapping the sender's value completion scheduler (if any).
// clang-format off
auto __domain =
query_or(get_domain, __env,
query_or(__compose(get_domain, get_scheduler), __env,
query_or(get_domain, get_env(__sndr),
query_or(__compose(get_domain, get_completion_scheduler<set_value_t>), get_env(__sndr),
__default_domain{query_or(get_completion_scheduler<set_value_t>, get_env(__sndr), __none_such())}))));
// clang-format on

using __split_sender_t = __result_of<__make_basic_sender, split_t, __, _Sender>;
using __tfx_sender_t = //
decltype(__domain.transform_sender(
__declval<__copy_cvref_t<_Sender, __split_sender_t>>(), __env));

// If transforming the sender changes the type, then use the transformed
// sender.
if constexpr (!same_as<__decay_t<__split_sender_t>, __decay_t<__tfx_sender_t>>) {
auto __split_sender = __make_basic_sender(*this, __(), (_Sender&&) __sndr);
return __domain.transform_sender(
const_cast<__copy_cvref_t<_Sender, __split_sender_t>&&>(__split_sender), __env);
} else {
using __sh_state_t = __t<__sh_state<__cvref_id<_Sender>, __id<__decay_t<_Env>>>>;
auto __sh_state = std::make_shared<__sh_state_t>((_Sender&&) __sndr, (_Env&&) __env);
return __make_basic_sender(__split_t(), std::move(__sh_state));
}
}

__binder_back<split_t> operator()() const {
return {{}, {}, {}};
}

using _Sender = __2;
using _Env = __0;
using __legacy_customizations_t = //
__types<
tag_invoke_t(split_t, __get_env_domain_t(_Env&, _Sender&), _Sender),
tag_invoke_t(split_t, __get_env_domain_t(_Env&, _Sender&), _Sender, _Env),
tag_invoke_t(split_t, __get_sender_domain_t(const _Sender&), _Sender),
tag_invoke_t(split_t, __get_sender_domain_t(const _Sender&), _Sender, _Env),
tag_invoke_t(split_t, _Sender),
tag_invoke_t(split_t, _Sender, _Env)>;
};
} // namespace __split

Expand Down

0 comments on commit 31abdb2

Please sign in to comment.