Skip to content

Commit

Permalink
port schedule_from to __sexpr
Browse files Browse the repository at this point in the history
  • Loading branch information
ericniebler committed Dec 13, 2023
1 parent 6fb5039 commit 2cc5147
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 216 deletions.
14 changes: 1 addition & 13 deletions include/stdexec/__detail/__basic_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,7 @@ namespace stdexec {
, __state_(__sexpr_impl<__tag_t>::get_state((_Sexpr&&) __sndr, __rcvr_)) {
}

_Receiver& __rcvr() noexcept {
return __rcvr_;
}

const _Receiver& __rcvr() const noexcept {
_Receiver& __rcvr() & noexcept {
return __rcvr_;
}
};
Expand Down Expand Up @@ -302,14 +298,6 @@ namespace stdexec {
__op_base_t* __base = (__op_base_t*) ((char*) __derived - __offset);
return __base->__rcvr();
}

decltype(auto) __receiver() const noexcept {
using __derived_t = decltype(__op_base_t::__state_);
const __derived_t* __derived = static_cast<const __derived_t*>(this);
constexpr std::size_t __offset = offsetof(__op_base_t, __state_);
const __op_base_t* __base = (const __op_base_t*) ((const char*) __derived - __offset);
return __base->__rcvr();
}
};

STDEXEC_PRAGMA_POP()
Expand Down
301 changes: 98 additions & 203 deletions include/stdexec/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4151,11 +4151,11 @@ namespace stdexec {
using __make_bind = __mbind_front_q<__make_bind_, _State>;

template <class _Tag>
using __tuple_t = __mbind_front_q<__decayed_tuple, _Tag>;
using __async_results_t = __mbind_front_q<__decayed_tuple, _Tag>;

template <class _Sender, class _Env, class _State, class _Tag>
using __bind_completions_t =
__gather_completions_for<_Tag, _Sender, _Env, __tuple_t<_Tag>, __make_bind<_State>>;
__gather_completions_for<_Tag, _Sender, _Env, __async_results_t<_Tag>, __make_bind<_State>>;

template <class _Sender, class _Env>
using __variant_for_t = //
Expand All @@ -4165,179 +4165,13 @@ namespace stdexec {
set_error_t,
set_stopped_t>>;

template <class _SchedulerId, class _VariantId, class _ReceiverId>
struct __operation1_base;

// This receiver is to be completed on the execution context
// associated with the scheduler. When the source sender
// completes, the completion information is saved off in the
// operation state so that when this receiver completes, it can
// read the completion out of the operation state and forward it
// to the output receiver after transitioning to the scheduler's
// context.
template <class _SchedulerId, class _VariantId, class _ReceiverId>
struct __receiver2 {
using _Receiver = stdexec::__t<_ReceiverId>;

struct __t {
using receiver_concept = receiver_t;
using __id = __receiver2;
__operation1_base<_SchedulerId, _VariantId, _ReceiverId>* __op_state_;

// If the work is successfully scheduled on the new execution
// context and is ready to run, forward the completion signal in
// the operation state
template <same_as<set_value_t> _Tag>
friend void tag_invoke(_Tag, __t&& __self) noexcept {
__self.__op_state_->__complete();
}

template <__one_of<set_error_t, set_stopped_t> _Tag, class... _As>
requires __callable<_Tag, _Receiver, _As...>
friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept {
_Tag{}((_Receiver&&) __self.__op_state_->__rcvr_, (_As&&) __as...);
}

friend auto tag_invoke(get_env_t, const __t& __self) noexcept -> env_of_t<_Receiver> {
return get_env(__self.__op_state_->__rcvr_);
}
};
};

// This receiver is connected to the input sender. When that
// sender completes (on whatever context it completes on), save
// the completion information into the operation state. Then,
// schedule a second operation to __complete on the execution
// context of the scheduler. That second receiver will read the
// completion information out of the operation state and propagate
// it to the output receiver from within the desired context.
template <class _SchedulerId, class _VariantId, class _ReceiverId>
struct __receiver1 {
using _Scheduler = stdexec::__t<_SchedulerId>;
using _Receiver = stdexec::__t<_ReceiverId>;
using __receiver2_t = stdexec::__t<__receiver2<_SchedulerId, _VariantId, _ReceiverId>>;

struct __t {
using __id = __receiver1;
using receiver_concept = receiver_t;
__operation1_base<_SchedulerId, _VariantId, _ReceiverId>* __op_state_;

template <class... _Args>
static constexpr bool __nothrow_complete_ = (__nothrow_decay_copyable<_Args> && ...);

template <class _Tag, class... _Args>
static void __complete_(_Tag, __t&& __self, _Args&&... __args) //
noexcept(__nothrow_complete_<_Args...>) {
// Write the tag and the args into the operation state so that
// we can forward the completion from within the scheduler's
// execution context.
__self.__op_state_->__data_.template emplace<__decayed_tuple<_Tag, _Args...>>(
_Tag{}, (_Args&&) __args...);
// Enqueue the schedule operation so the completion happens
// on the scheduler's execution context.
start(__self.__op_state_->__state2_);
}

template <__completion_tag _Tag, class... _Args>
requires __callable<_Tag, _Receiver, __decay_t<_Args>...>
friend void tag_invoke(_Tag __tag, __t&& __self, _Args&&... __args) noexcept {
__try_call(
(_Receiver&&) __self.__op_state_->__rcvr_,
__function_constant_v<__complete_<_Tag, _Args...>>,
(_Tag&&) __tag,
(__t&&) __self,
(_Args&&) __args...);
}

friend auto tag_invoke(get_env_t, const __t& __self) noexcept -> env_of_t<_Receiver> {
return get_env(__self.__op_state_->__rcvr_);
}
};
};

template <class _SchedulerId, class _VariantId, class _ReceiverId>
struct __operation1_base : __immovable {
using _Scheduler = stdexec::__t<_SchedulerId>;
using _Receiver = stdexec::__t<_ReceiverId>;
using _Variant = stdexec::__t<_VariantId>;
using __receiver2_t = stdexec::__t<__receiver2<_SchedulerId, _VariantId, _ReceiverId>>;

_Scheduler __sched_;
_Receiver __rcvr_;
_Variant __data_;
connect_result_t<schedule_result_t<_Scheduler>, __receiver2_t> __state2_;

__operation1_base(_Scheduler __sched, _Receiver&& __rcvr)
: __sched_((_Scheduler&&) __sched)
, __rcvr_((_Receiver&&) __rcvr)
, __state2_(connect(schedule(__sched_), __receiver2_t{this})) {
}

void __complete() noexcept {
STDEXEC_ASSERT(!__data_.valueless_by_exception());
std::visit(
[this]<class _Tup>(_Tup& __tupl) -> void {
if constexpr (same_as<_Tup, std::monostate>) {
std::terminate(); // reaching this indicates a bug in schedule_from
} else {
__apply(
[&]<class... _Args>(auto __tag, _Args&... __args) -> void {
__tag((_Receiver&&) __rcvr_, (_Args&&) __args...);
},
__tupl);
}
},
__data_);
}
};

template <class _SchedulerId, class _CvrefSenderId, class _ReceiverId>
struct __operation1 {
using _Scheduler = stdexec::__t<_SchedulerId>;
using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
using _Receiver = stdexec::__t<_ReceiverId>;
using __variant_t = __variant_for_t<_CvrefSender, env_of_t<_Receiver>>;
using __receiver1_t =
stdexec::__t<__receiver1<_SchedulerId, stdexec::__id<__variant_t>, _ReceiverId>>;
using __base_t = __operation1_base<_SchedulerId, stdexec::__id<__variant_t>, _ReceiverId>;

struct __t : __base_t {
using __id = __operation1;
connect_result_t<_CvrefSender, __receiver1_t> __state1_;

__t(_Scheduler __sched, _CvrefSender&& __sndr, _Receiver&& __rcvr)
: __base_t{(_Scheduler&&) __sched, (_Receiver&&) __rcvr}
, __state1_(connect((_CvrefSender&&) __sndr, __receiver1_t{this})) {
}

friend void tag_invoke(start_t, __t& __op_state) noexcept {
start(__op_state.__state1_);
}
};
};

template <class _Tp>
using __decay_rvalue_ref = __decay_t<_Tp>&&;

template <class _Tag>
using __decay_signature =
__transform<__q<__decay_rvalue_ref>, __mcompose<__q<completion_signatures>, __qf<_Tag>>>;

template <class _SchedulerId>
struct __environ {
struct __t
: __env::__prop<stdexec::__t<_SchedulerId>(
get_completion_scheduler_t<set_value_t>,
get_completion_scheduler_t<set_stopped_t>)> {
using __id = __environ;

template <same_as<get_domain_t> _Key>
friend auto tag_invoke(_Key, const __t& __self) noexcept {
return query_or(get_domain, __self.__value_, default_domain());
}
};
};

template <class... _Ts>
using __all_nothrow_decay_copyable = __mbool<(__nothrow_decay_copyable<_Ts> && ...)>;

Expand Down Expand Up @@ -4367,6 +4201,82 @@ namespace stdexec {
__decay_signature<set_value_t>,
__decay_signature<set_error_t>>;

template <class _SchedulerId>
struct __environ {
struct __t
: __env::__prop<stdexec::__t<_SchedulerId>(
get_completion_scheduler_t<set_value_t>,
get_completion_scheduler_t<set_stopped_t>)> {
using __id = __environ;

template <same_as<get_domain_t> _Key>
friend auto tag_invoke(_Key, const __t& __self) noexcept {
return query_or(get_domain, __self.__value_, default_domain());
}
};
};

template <class _Scheduler, class _Sexpr, class _Receiver>
struct __state;

// This receiver is to be completed on the execution context
// associated with the scheduler. When the source sender
// completes, the completion information is saved off in the
// operation state so that when this receiver completes, it can
// read the completion out of the operation state and forward it
// to the output receiver after transitioning to the scheduler's
// context.
template <class _Scheduler, class _Sexpr, class _Receiver>
struct __receiver2 : receiver_adaptor<__receiver2<_Scheduler, _Sexpr, _Receiver>> {
explicit __receiver2(__state<_Scheduler, _Sexpr, _Receiver>* __state) noexcept
: __state_{__state} {
}

_Receiver&& base() && noexcept {
return std::move(__state_->__receiver());
}

const _Receiver& base() const & noexcept {
return __state_->__receiver();
}

void set_value() && noexcept {
STDEXEC_ASSERT(!__state_->__data_.valueless_by_exception());
std::visit(
[__state = __state_]<class _Tup>(_Tup& __tupl) noexcept -> void {
if constexpr (same_as<_Tup, std::monostate>) {
std::terminate(); // reaching this indicates a bug in schedule_from
} else {
__apply(
[&]<class... _Args>(auto __tag, _Args&... __args) -> void {
__tag(std::move(__state->__receiver()), (_Args&&) __args...);
},
__tupl);
}
},
__state_->__data_);
}

__state<_Scheduler, _Sexpr, _Receiver>* __state_;
};

template <class _Scheduler, class _Sexpr, class _Receiver>
struct __state : __enable_receiver_from_this<_Sexpr, _Receiver> {
using __variant_t = __variant_for_t<__child_of<_Sexpr>, env_of_t<_Receiver>>;
using __receiver2_t = __receiver2<_Scheduler, _Sexpr, _Receiver>;

__variant_t __data_;
connect_result_t<schedule_result_t<_Scheduler>, __receiver2_t> __state2_;

explicit __state(_Scheduler __sched)
: __data_()
, __state2_(connect(schedule(__sched), __receiver2_t{this})) {
}

void __complete() noexcept {
}
};

struct schedule_from_t {
template <scheduler _Scheduler, sender _Sender>
auto operator()(_Scheduler&& __sched, _Sender&& __sndr) const {
Expand All @@ -4384,29 +4294,9 @@ namespace stdexec {
};

struct __schedule_from_impl : __sexpr_defaults {
template <class, class _Data, class _Child>
using __env_ = __env::__env_join_t<_Data, env_of_t<_Child>>;

template <class _Sender>
using __env_t = __mapply<__q<__env_>, _Sender>;

template <class _Sender>
using __scheduler_t =
__decay_t<__call_result_t<get_completion_scheduler_t<set_value_t>, __env_t<_Sender>>>;

template <class _Sender, class _Receiver>
using __receiver_t = //
stdexec::__t< __receiver1<
stdexec::__id<__scheduler_t<_Sender>>,
stdexec::__id<__variant_for_t<__child_of<_Sender>, env_of_t<_Receiver>>>,
stdexec::__id<_Receiver>>>;

template <class _Sender, class _Receiver>
using __operation_t = //
stdexec::__t< __operation1< //
stdexec::__id<__scheduler_t<_Sender>>,
stdexec::__cvref_id<__child_of<_Sender>>,
stdexec::__id<_Receiver>>>;
__decay_t<__call_result_t<get_completion_scheduler_t<set_value_t>, env_of_t<_Sender>>>;

static constexpr auto get_attrs = //
[]<class _Data, class _Child>(const _Data& __data, const _Child& __child) noexcept {
Expand All @@ -4420,20 +4310,25 @@ namespace stdexec {
return {};
};

static constexpr auto connect = //
[]<class _Sender, receiver _Receiver>(_Sender && __sndr, _Receiver __rcvr) //
-> __operation_t<_Sender, _Receiver> //
requires sender_to<__child_of<_Sender>, __receiver_t<_Sender, _Receiver>>
{
static_assert(sender_expr_for<_Sender, schedule_from_t>);
return __sexpr_apply(
(_Sender&&) __sndr,
[&]<class _Data, class _Child>(
__ignore, _Data&& __data, _Child&& __child) -> __operation_t<_Sender, _Receiver> {
auto __sched = get_completion_scheduler<set_value_t>(__data);
return {__sched, (_Child&&) __child, (_Receiver&&) __rcvr};
});
};
static constexpr auto get_state =
[]<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver& __rcvr) {
static_assert(sender_expr_for<_Sender, schedule_from_t>);
auto __sched = get_completion_scheduler<set_value_t>(stdexec::get_env(__sndr));
using _Scheduler = decltype(__sched);
return __state<_Scheduler, _Sender, _Receiver>{__sched};
};

static constexpr auto complete =
[]<class _Tag, class... _Args>(__ignore, auto& __state, auto& __rcvr, _Tag, _Args&&... __args) noexcept -> void {
// Write the tag and the args into the operation state so that
// we can forward the completion from within the scheduler's
// execution context.
using __async_result = __decayed_tuple<_Tag, _Args...>;
__state.__data_.template emplace<__async_result>(_Tag(), (_Args&&) __args...);
// Enqueue the schedule operation so the completion happens
// on the scheduler's execution context.
stdexec::start(__state.__state2_);
};
};
} // namespace __schedule_from

Expand Down

0 comments on commit 2cc5147

Please sign in to comment.