Skip to content

Commit

Permalink
Merge pull request #1177 from NVIDIA/fix-optional-env-split-ensure-st…
Browse files Browse the repository at this point in the history
…arted

fix the handling of optional environments passed to `split` and `ensure_started`
  • Loading branch information
ericniebler authored Dec 25, 2023
2 parents d21a430 + 487b720 commit ab2b772
Showing 1 changed file with 32 additions and 52 deletions.
84 changes: 32 additions & 52 deletions include/stdexec/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3072,12 +3072,6 @@ namespace stdexec {
}
};

template <class _Cvref, class... _Tys>
using __set_value_t = completion_signatures<set_value_t(__minvoke<_Cvref, _Tys>...)>;

template <class _Cvref, class _Ty>
using __set_error_t = completion_signatures<set_error_t(__minvoke<_Cvref, _Ty>)>;

template <class _Cvref, class _CvrefSenderId, class _EnvId>
using __completions_t = //
__try_make_completion_signatures<
Expand All @@ -3088,8 +3082,8 @@ namespace stdexec {
completion_signatures<
set_error_t(__minvoke<_Cvref, std::exception_ptr>),
set_stopped_t()>, // NOT TO SPEC
__mbind_front_q<__set_value_t, _Cvref>,
__mbind_front_q<__set_error_t, _Cvref>>;
__transform<_Cvref, __mcompose<__q<completion_signatures>, __qf<set_value_t>>>,
__transform<_Cvref, __mcompose<__q<completion_signatures>, __qf<set_error_t>>>>;

template <class _Ty>
using __clref_t = const __decay_t<_Ty>&;
Expand Down Expand Up @@ -3196,19 +3190,12 @@ namespace stdexec {
struct __split_t { };

struct split_t {
template <sender _Sender>
requires sender_in<_Sender, empty_env> && __decay_copyable<env_of_t<_Sender>>
auto operator()(_Sender&& __sndr) const {
auto __domain = __get_early_domain(__sndr);
return stdexec::transform_sender(__domain, __make_sexpr<split_t>(__(), (_Sender&&) __sndr));
}

template <sender _Sender, class _Env>
template <sender _Sender, class _Env = empty_env>
requires sender_in<_Sender, _Env> && __decay_copyable<env_of_t<_Sender>>
auto operator()(_Sender&& __sndr, _Env&& __env) const {
auto operator()(_Sender&& __sndr, _Env&& __env = {}) const {
auto __domain = __get_late_domain(__sndr, __env);
return stdexec::transform_sender(
__domain, __make_sexpr<split_t>(__(), (_Sender&&) __sndr), (_Env&&) __env);
__domain, __make_sexpr<split_t>((_Env&&) __env, (_Sender&&) __sndr));
}

STDEXEC_ATTRIBUTE((always_inline)) //
Expand All @@ -3228,13 +3215,16 @@ namespace stdexec {
template <class _CvrefSender, class _Env>
using __receiver_t = __t<__meval<__receiver, __cvref_id<_CvrefSender>, __id<_Env>>>;

template <class _Sender, class _Env = empty_env>
requires sender_to<__child_of<_Sender>, __receiver_t<__child_of<_Sender>, _Env>>
static auto transform_sender(_Sender&& __sndr, _Env __env = {}) {
template <class _Sender>
requires sender_to<
__child_of<_Sender>,
__receiver_t<__child_of<_Sender>, __decay_t<__data_of<_Sender>>>>
static auto transform_sender(_Sender&& __sndr) {
return __sexpr_apply(
(_Sender&&) __sndr, [&]<class _Child>(__ignore, __ignore, _Child&& __child) {
auto __state = __make_intrusive<__sh_state<_Child, _Env>>(
(_Child&&) __child, std::move(__env));
(_Sender&&) __sndr,
[&]<class _Env, class _Child>(__ignore, _Env&& __env, _Child&& __child) {
auto __state = __make_intrusive<__sh_state<_Child, __decay_t<_Env>>>(
(_Child&&) __child, (_Env&&) __env);
return __make_sexpr<__split_t>(__data{std::move(__state)});
});
}
Expand All @@ -3256,12 +3246,14 @@ namespace stdexec {
// ensure_started() is called.
template <class _ShState>
struct __data {
explicit __data(__intrusive_ptr<_ShState> __sh_state) noexcept
: __sh_state(std::move(__sh_state)) {
explicit __data(__intrusive_ptr<_ShState> __ptr) noexcept
: __sh_state(std::move(__ptr)) {
// Eagerly launch the async operation.
__sh_state->__start_op();
}

__data(__data&&) = default;
__data& operator=(__data&&) = default;
__data(__data&&) noexcept = default;
__data& operator=(__data&&) noexcept = default;

~__data() {
if (__sh_state != nullptr) {
Expand All @@ -3277,28 +3269,15 @@ namespace stdexec {
struct __ensure_started_t { };

struct ensure_started_t {
template <sender _Sender>
requires sender_in<_Sender, empty_env> && __decay_copyable<env_of_t<_Sender>>
auto operator()(_Sender&& __sndr) const {
if constexpr (sender_expr_for<_Sender, __ensure_started_t>) {
return (_Sender&&) __sndr;
} else {
auto __domain = __get_early_domain(__sndr);
return stdexec::transform_sender(
__domain, __make_sexpr<ensure_started_t>(__(), (_Sender&&) __sndr));
}
STDEXEC_UNREACHABLE();
}

template <sender _Sender, class _Env>
template <sender _Sender, class _Env = empty_env>
requires sender_in<_Sender, _Env> && __decay_copyable<env_of_t<_Sender>>
auto operator()(_Sender&& __sndr, _Env&& __env) const {
auto operator()(_Sender&& __sndr, _Env&& __env = {}) const {
if constexpr (sender_expr_for<_Sender, __ensure_started_t>) {
return (_Sender&&) __sndr;
} else {
auto __domain = __get_late_domain(__sndr, __env);
return stdexec::transform_sender(
__domain, __make_sexpr<ensure_started_t>(__(), (_Sender&&) __sndr), (_Env&&) __env);
__domain, __make_sexpr<ensure_started_t>((_Env&&) __env, (_Sender&&) __sndr));
}
STDEXEC_UNREACHABLE();
}
Expand All @@ -3320,15 +3299,16 @@ namespace stdexec {
template <class _CvrefSender, class _Env>
using __receiver_t = __t<__meval<__receiver, __cvref_id<_CvrefSender>, __id<_Env>>>;

template <class _Sender, class _Env = empty_env>
requires sender_to<__child_of<_Sender>, __receiver_t<__child_of<_Sender>, _Env>>
static auto transform_sender(_Sender&& __sndr, _Env __env = {}) {
template <class _Sender>
requires sender_to<
__child_of<_Sender>,
__receiver_t<__child_of<_Sender>, __decay_t<__data_of<_Sender>>>>
static auto transform_sender(_Sender&& __sndr) {
return __sexpr_apply(
(_Sender&&) __sndr, [&]<class _Child>(__ignore, __ignore, _Child&& __child) {
auto __state = __make_intrusive<__sh_state<_Child, _Env>>(
(_Child&&) __child, std::move(__env));
// Eagerly launch the async operation.
__state->__start_op();
(_Sender&&) __sndr,
[&]<class _Env, class _Child>(__ignore, _Env&& __env, _Child&& __child) {
auto __state = __make_intrusive<__sh_state<_Child, __decay_t<_Env>>>(
(_Child&&) __child, (_Env&&) __env);
return __make_sexpr<__ensure_started_t>(__data{std::move(__state)});
});
}
Expand Down

0 comments on commit ab2b772

Please sign in to comment.