diff --git a/include/exec/static_thread_pool.hpp b/include/exec/static_thread_pool.hpp index 6bbc42ade..477f96884 100644 --- a/include/exec/static_thread_pool.hpp +++ b/include/exec/static_thread_pool.hpp @@ -72,6 +72,11 @@ namespace exec { } #endif + template + struct not_a_sender { + using is_sender = void; + }; + struct task_base { task_base* next; void (*__execute)(task_base*, std::uint32_t tid) noexcept; @@ -215,21 +220,44 @@ namespace exec { }; #endif + public: struct domain { // For eager customization template Sender> auto transform_sender(Sender&& sndr) const noexcept { - auto sched = stdexec::get_completion_scheduler( - stdexec::get_env(sndr)); - return stdexec::__sexpr_apply((Sender&&) sndr, transform_bulk{*sched.pool_}); + if constexpr (stdexec::__completes_on) { + auto sched = stdexec::get_completion_scheduler( + stdexec::get_env(sndr)); + return stdexec::__sexpr_apply((Sender&&) sndr, transform_bulk{*sched.pool_}); + } else { + static_assert( + stdexec::__completes_on, + "No static_thread_pool instance can be found in the sender's environment " + "on which to schedule bulk work."); + return not_a_sender>(); + } + STDEXEC_UNREACHABLE(); } // transform the generic bulk sender into a parallel thread-pool bulk sender template Sender, class Env> - requires stdexec::__callable auto transform_sender(Sender&& sndr, const Env& env) const noexcept { - auto sched = stdexec::get_scheduler(env); - return stdexec::__sexpr_apply((Sender&&) sndr, transform_bulk{*sched.pool_}); + if constexpr (stdexec::__completes_on) { + auto sched = stdexec::get_completion_scheduler( + stdexec::get_env(sndr)); + return stdexec::__sexpr_apply((Sender&&) sndr, transform_bulk{*sched.pool_}); + } else if constexpr (stdexec::__starts_on) { + auto sched = stdexec::get_scheduler(env); + return stdexec::__sexpr_apply((Sender&&) sndr, transform_bulk{*sched.pool_}); + } else { + static_assert( // + stdexec::__starts_on + || stdexec::__completes_on, + "No static_thread_pool instance can be found in the sender's or receiver's " + "environment on which to schedule bulk work."); + return not_a_sender>(); + } + STDEXEC_UNREACHABLE(); } #if STDEXEC_HAS_STD_RANGES() diff --git a/include/stdexec/__detail/__config.hpp b/include/stdexec/__detail/__config.hpp index 19804f44d..a0a73b048 100644 --- a/include/stdexec/__detail/__config.hpp +++ b/include/stdexec/__detail/__config.hpp @@ -176,7 +176,7 @@ #define STDEXEC_ATTR_WHICH_4(_ATTR) __forceinline #elif STDEXEC_CLANG() #define STDEXEC_ATTR_WHICH_4(_ATTR) \ - __attribute__((__always_inline__, __artificial__, __nodebug__)) inline + [[gnu::__always_inline__, gnu::__artificial__, gnu::__nodebug__]] inline #elif defined(__GNUC__) #define STDEXEC_ATTR_WHICH_4(_ATTR) __attribute__((__always_inline__, __artificial__)) inline #else diff --git a/include/stdexec/__detail/__execution_fwd.hpp b/include/stdexec/__detail/__execution_fwd.hpp index 27dd5c1aa..718d81bdd 100644 --- a/include/stdexec/__detail/__execution_fwd.hpp +++ b/include/stdexec/__detail/__execution_fwd.hpp @@ -175,6 +175,14 @@ namespace stdexec { using __transfer_just::transfer_just_t; extern const transfer_just_t transfer_just; + ////////////////////////////////////////////////////////////////////////////////////////////////// + namespace __bulk { + struct bulk_t; + } + + using __bulk::bulk_t; + extern const bulk_t bulk; + ////////////////////////////////////////////////////////////////////////////////////////////////// namespace __on_v2 { struct on_t; diff --git a/include/stdexec/concepts.hpp b/include/stdexec/concepts.hpp index bff3d879b..1c6b9d732 100644 --- a/include/stdexec/concepts.hpp +++ b/include/stdexec/concepts.hpp @@ -234,6 +234,9 @@ namespace stdexec { template concept __nothrow_decay_copyable = __nothrow_constructible_from<__decay_t<_Ty>, _Ty>; + template + concept __decays_to_derived_from = derived_from<__decay_t<_Ty>, _Up>; + } // namespace stdexec // #if !STDEXEC_HAS_STD_CONCEPTS_HEADER() diff --git a/include/stdexec/execution.hpp b/include/stdexec/execution.hpp index ea9cbf1e9..034eac161 100644 --- a/include/stdexec/execution.hpp +++ b/include/stdexec/execution.hpp @@ -60,6 +60,9 @@ namespace stdexec { static inline constexpr Tag (*signature)(Sig) = nullptr; }; + struct default_domain; + struct dependent_domain; + namespace __domain { template using __legacy_c11n_for = typename _Tag::__legacy_customizations_t; @@ -82,22 +85,37 @@ namespace stdexec { }; template - concept __has_transform_member = + concept __has_transform_sender = requires(_DomainOrTag __tag, _Sender&& __sender, const _Env&... __env) { __tag.transform_sender((_Sender&&) __sender, __env...); }; template - concept __has_default_transform = // + concept __has_default_transform_sender = // sender_expr<_Sender> // - && __has_transform_member<__tag_of<_Sender>, _Sender, _Env...>; + && __has_transform_sender<__tag_of<_Sender>, _Sender, _Env...>; + + template + concept __has_transform_env = + requires(_Type __obj, _Sender&& __sender, _Env&& __env) { + __obj.transform_env((_Sender&&) __sender, (_Env&&) __env); + }; + + template + concept __has_default_transform_env = // + sender_expr<_Sender> // + && __has_transform_env<__tag_of<_Sender>, _Sender, _Env>; template - concept __has_apply_member = requires(_DomainOrTag __tag, _Args&&... __args) { + concept __has_apply_sender = requires(_DomainOrTag __tag, _Args&&... __args) { __tag.apply_sender((_Args&&) __args...); }; } // namespace __domain + namespace __write_ { + struct __write_t; + } + struct default_domain { default_domain() = default; @@ -108,7 +126,7 @@ namespace stdexec { // Look for a legacy customization for the given tag, and if found, apply it. if constexpr (__callable<__sexpr_apply_t, _Sender, __domain::__legacy_customization>) { return stdexec::__sexpr_apply((_Sender&&) __sndr, __domain::__legacy_customization()); - } else if constexpr (__domain::__has_default_transform<_Sender>) { + } else if constexpr (__domain::__has_default_transform_sender<_Sender>) { return __tag_of<_Sender>().transform_sender((_Sender&&) __sndr); } else { return static_cast<_Sender>((_Sender&&) __sndr); @@ -120,7 +138,7 @@ namespace stdexec { template STDEXEC_ATTRIBUTE((always_inline)) decltype(auto) transform_sender(_Sender&& __sndr, const _Env& __env) const { - if constexpr (__domain::__has_default_transform<_Sender, _Env>) { + if constexpr (__domain::__has_default_transform_sender<_Sender, _Env>) { return __tag_of<_Sender>().transform_sender((_Sender&&) __sndr, __env); } else { return static_cast<_Sender>((_Sender&&) __sndr); @@ -130,7 +148,7 @@ namespace stdexec { template requires __domain::__has_legacy_c11n<_Tag, _Sender, _Args...> - || __domain::__has_apply_member<_Tag, _Sender, _Args...> + || __domain::__has_apply_sender<_Tag, _Sender, _Args...> STDEXEC_ATTRIBUTE((always_inline)) decltype(auto) apply_sender(_Tag, _Sender&& __sndr, _Args&&... __args) const { // Look for a legacy customization for the given tag, and if found, apply it. @@ -142,6 +160,15 @@ namespace stdexec { } STDEXEC_UNREACHABLE(); } + + template + decltype(auto) transform_env(_Sender&& __sndr, _Env&& __env) const noexcept { + if constexpr (__domain::__has_default_transform_env<_Sender, _Env>) { + return __tag_of<_Sender>().transform_env((_Sender&&) __sndr, (_Env&&) __env); + } else { + return static_cast<_Env>((_Env&&) __env); + } + } }; ////////////////////////////////////////////////////////////////////////////////////////////////// @@ -593,6 +620,20 @@ namespace stdexec { { get_env(std::as_const(__ep)) } -> queryable; }; + ///////////////////////////////////////////////////////////////////////////// + template + concept __completes_on = + __decays_to< + __call_result_t, env_of_t<_Sender>>, + _Scheduler>; + + ///////////////////////////////////////////////////////////////////////////// + template + concept __starts_on = + __decays_to< + __call_result_t, + _Scheduler>; + ///////////////////////////////////////////////////////////////////////////// inline constexpr struct __get_env_domain_t { template @@ -621,6 +662,52 @@ namespace stdexec { template using __sender_domain_of_t = __call_result_t<__get_sender_domain_t, _Sender, _Tag, _Default>; + namespace __domain { + struct __common_domain_fn { + static default_domain __common_domain() noexcept { + return {}; + } + + template + requires __all_of<_Domain, _OtherDomains...> + static _Domain __common_domain(_Domain __domain, _OtherDomains...) noexcept { + return (_Domain&&) __domain; + } + + template + static auto __common_domain(_Domains...) noexcept // + -> __if_c<__one_of, dependent_domain, __none_such> { + return {}; + } + + auto operator()(__ignore, __ignore, const auto&... __sndrs) const noexcept { + return __common_domain(__get_sender_domain(__sndrs)...); + } + }; + + template + using __common_domain_t = // + __call_result_t<__common_domain_fn, int, int, _Senders...>; + + template + concept __has_common_domain = // + __none_of<__none_such, __common_domain_t<_Senders...>>; + + template + struct __get_env_common_domain { + template _Self> + static auto get_env(const _Self& __self) noexcept { + using _Domain = __call_result_t<__sexpr_apply_t, const _Self&, __common_domain_fn>; + if constexpr (same_as<_Domain, default_domain>) { + return empty_env(); + } else { + return __mkprop(__sexpr_apply(__self, __common_domain_fn()), get_domain); + } + STDEXEC_UNREACHABLE(); + } + }; + } // namespace __domain + ///////////////////////////////////////////////////////////////////////////// // [execution.receivers] namespace __receivers { @@ -1109,33 +1196,133 @@ namespace stdexec { using __debug::__debug_sender; ///////////////////////////////////////////////////////////////////////////// - // [execution.transform_sender] - inline constexpr struct transform_sender_t { - template + // dependent_domain + struct dependent_domain { + template + requires same_as<__sender_domain_of_t<_Sender>, dependent_domain> STDEXEC_ATTRIBUTE((always_inline)) - /*constexpr*/ decltype(auto) - operator()(_Domain __dom, _Sender&& __sndr, const _Env&... __env) const { - if constexpr (__domain::__has_transform_member<_Domain, _Sender, const _Env&...>) { - return __dom.transform_sender((_Sender&&) __sndr, __env...); - } else { - return default_domain().transform_sender((_Sender&&) __sndr, __env...); + decltype(auto) transform_sender(_Sender&& __sndr, const _Env& __env) const; + }; + + ///////////////////////////////////////////////////////////////////////////// + // [execution.transform_sender] + namespace __domain { + struct __transform_sender { + template + STDEXEC_ATTRIBUTE((always_inline)) + /*constexpr*/ decltype(auto) + operator()(_Domain __dom, _Sender&& __sndr) const { + if constexpr (__domain::__has_transform_sender<_Domain, _Sender>) { + return __dom.transform_sender((_Sender&&) __sndr); + } else { + return default_domain().transform_sender((_Sender&&) __sndr); + } + } + + template + STDEXEC_ATTRIBUTE((always_inline)) + /*constexpr*/ decltype(auto) + operator()(_Domain __dom, _Sender&& __sndr, const _Env& __env) const { + if constexpr (__domain::__has_transform_sender<_Domain, _Sender, _Env>) { + return __dom.transform_sender((_Sender&&) __sndr, __env); + } else { + return default_domain().transform_sender((_Sender&&) __sndr, __env); + } + } + }; + + struct __transform_env { + template + STDEXEC_ATTRIBUTE((always_inline)) + /*constexpr*/ decltype(auto) + operator()(_Domain __dom, _Sender&& __sndr, _Env&& __env) const noexcept { + if constexpr (__domain::__has_transform_env<_Domain, _Sender, _Env>) { + return __dom.transform_env((_Sender&&) __sndr, (_Env&&) __env); + } else { + return default_domain().transform_env((_Sender&&) __sndr, (_Env&&) __env); + } } + }; + } // namespace __domain + + ///////////////////////////////////////////////////////////////////////////// + // [execution.transform_sender] + inline constexpr struct transform_sender_t : __domain::__transform_sender { + using __domain::__transform_sender::operator(); + + // If we are doing a lazy customization of a type whose domain is value-dependent (e.g., + // let_value), first transform the sender to determine the domain. Then continue transforming + // the sender with the requested domain. + template + requires same_as<__sender_domain_of_t<_Sender>, dependent_domain> + /*constexpr*/ decltype(auto) + operator()(_Domain __dom, _Sender&& __sndr, const _Env& __env) const { + static_assert(__none_of<_Domain, dependent_domain>); + return (*this)(__dom, dependent_domain().transform_sender((_Sender&&) __sndr, __env), __env); } } transform_sender{}; template using transform_sender_result_t = __call_result_t; + inline constexpr __domain::__transform_env transform_env{}; + + struct _CHILD_SENDERS_WITH_DIFFERENT_DOMAINS_ {}; + + template + requires same_as<__sender_domain_of_t<_Sender>, dependent_domain> + decltype(auto) dependent_domain::transform_sender(_Sender&& __sndr, const _Env& __env) const { + // apply any algorithm-specific transformation to the environment + const auto& __env2 = transform_env(*this, (_Sender&&) __sndr, __env); + + // recursively transform the sender to determine the domain + return __sexpr_apply( + (_Sender&&) __sndr, + [&](_Tag, _Data&& __data, _Childs&&... __childs) { + // TODO: propagate meta-exceptions here: + auto __sndr2 = __make_sexpr<_Tag>( + (_Data&&) __data, + __domain::__transform_sender()(*this, (_Childs&&) __childs, __env2)...); + using _Sender2 = decltype(__sndr2); + + auto __domain2 = __sexpr_apply(__sndr2, __domain::__common_domain_fn()); + using _Domain2 = decltype(__domain2); + //print<_Domain2>(); + + if constexpr (same_as<_Domain2, __none_such>) { + return __mexception<_CHILD_SENDERS_WITH_DIFFERENT_DOMAINS_, _WITH_SENDER_<_Sender2>>(); + } else { + return __domain::__transform_sender()(__domain2, std::move(__sndr2), __env); + } + STDEXEC_UNREACHABLE(); + }); + } + + // A helper for use when building sender trees where each node must be transformed. + template + auto __make_transformer(_Domain, const _Env& __env) { + return [&](_Tag) { + return [&](_Args&&... __args) -> decltype(auto) { + return stdexec::transform_sender(_Domain(), _Tag()((_Args&&) __args...), __env); + }; + }; + } + + ///////////////////////////////////////////////////////////////////////////// + template + concept __has_implementation_for = + __domain::__has_apply_sender<_Domain, _Tag, _Sender, _Args...> + || __domain::__has_apply_sender; + ///////////////////////////////////////////////////////////////////////////// // [execution.apply_sender] inline constexpr struct apply_sender_t { template - requires __domain::__has_apply_member<_Domain, _Tag, _Sender, _Args...> - || __domain::__has_apply_member + requires __has_implementation_for<_Tag, _Domain, _Sender, _Args...> STDEXEC_ATTRIBUTE((always_inline)) /*constexpr*/ decltype(auto) operator()(_Domain __dom, _Tag, _Sender&& __sndr, _Args&&... __args) const { - if constexpr (__domain::__has_apply_member<_Domain, _Tag, _Sender, _Args...>) { + if constexpr (__domain::__has_apply_sender<_Domain, _Tag, _Sender, _Args...>) { return __dom.apply_sender(_Tag(), (_Sender&&) __sndr, (_Args&&) __args...); } else { return default_domain().apply_sender(_Tag(), (_Sender&&) __sndr, (_Args&&) __args...); @@ -3098,7 +3285,7 @@ namespace stdexec { struct then_t : __with_default_get_env { template auto operator()(_Sender&& __sndr, _Fun __fun) const { - auto __domain = __get_sender_domain((_Sender&&) __sndr); + auto __domain = __get_sender_domain(__sndr); return stdexec::transform_sender( __domain, __make_sexpr((_Fun&&) __fun, (_Sender&&) __sndr)); } @@ -4238,7 +4425,7 @@ namespace stdexec { } }; - struct ensure_started_t { + struct ensure_started_t : __with_default_get_env { template requires sender_in<_Sender, empty_env> && __decay_copyable> auto operator()(_Sender&& __sndr) const { @@ -4382,14 +4569,18 @@ namespace stdexec { __q<__decay_ref>, __mbind_front<__mtry_catch_q<__call_result_t, __on_not_callable<_Set>>, _Fun>>; + // FUTURE: when we have a scheduler query for "always completes inline", + // then we can use that instead of hard-coding `__inln::__scheduler` here. + template + concept __unknown_context = + __one_of<_Scheduler, __none_such, __inln::__scheduler>; + // The receiver that gets connected to the result sender is the input receiver, // possibly augmented with the input sender's completion scheduler (which is // where the result sender will be started). template using __result_receiver_t = __if_c< - // FUTURE: when we have a scheduler query for "always completes inline", - // then we can use that instead of hard-coding `__inln::__scheduler` here. - __one_of<_Scheduler, __none_such, __inln::__scheduler>, + __unknown_context<_Scheduler>, _Receiver, __t<__detail::__receiver_with< &__operation_base_base_<__id<_Receiver>, __id<_Scheduler>>::__rcvr_, @@ -4400,9 +4591,7 @@ namespace stdexec { // in the environment seen by the result sender. template using __result_env_t = __if_c< - // FUTURE: when we have a scheduler query for "always completes inline", - // then we can use that instead of hard-coding `__inln::__scheduler` here. - __one_of<_Scheduler, __none_such, __inln::__scheduler>, + __unknown_context<_Scheduler>, _Env, __env::__env_join_t< __env::__prop<_Scheduler(get_scheduler_t)>, @@ -4561,66 +4750,68 @@ namespace stdexec { }; }; - template - struct __sender { - using _Sender = stdexec::__t<_SenderId>; + template + struct __sender_base { using _Set = stdexec::__t<_SetId>; + using is_sender = void; - struct __t { - using __id = __sender; - using is_sender = void; - - template - using __operation_t = // - stdexec::__t< - __operation< stdexec::__cvref_id<_Self, _Sender>, stdexec::__id<_Receiver>, _Fun, _Set>>; - template - using __receiver_t = - __receiver< stdexec::__cvref_id<_Self, _Sender>, stdexec::__id<_Receiver>, _Fun, _Set>; + template + using __operation_t = // + stdexec::__t< + __operation< stdexec::__cvref_id<_Self, _Sender>, stdexec::__id<_Receiver>, _Fun, _Set>>; + template + using __receiver_t = + __receiver< stdexec::__cvref_id<_Self, _Sender>, stdexec::__id<_Receiver>, _Fun, _Set>; - template - using __completion_sched = - __query_result_or_t, env_of_t<_Sender>, __none_such>; + template + using __completion_sched = + __query_result_or_t, env_of_t<_CvrefSender>, __none_such>; - template - using __completions = // + template + using __completions = // __mapply< __transform< __mbind_front_q<__tfx_signal_t, _Env, _Fun, _Set, __completion_sched<_Sender>>, __q<__concat_completion_signatures_t> >, __completion_signatures_of_t<_Sender, _Env>>; - template <__decays_to<__t> _Self, receiver _Receiver> - requires sender_to<__copy_cvref_t<_Self, _Sender>, __receiver_t<_Self, _Receiver>> - friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr) - -> __operation_t<_Self, _Receiver> { - return __operation_t<_Self, _Receiver>{ - ((_Self&&) __self).__sndr_, (_Receiver&&) __rcvr, ((_Self&&) __self).__fun_}; - } + template <__decays_to_derived_from<__sender_base> _Self, receiver _Receiver> + requires sender_to<__copy_cvref_t<_Self, _Sender>, __receiver_t<_Self, _Receiver>> + friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr) + -> __operation_t<_Self, _Receiver> { + return __operation_t<_Self, _Receiver>{ + ((_Self&&) __self).__sndr_, (_Receiver&&) __rcvr, ((_Self&&) __self).__fun_}; + } - friend auto tag_invoke(get_env_t, const __t& __self) noexcept -> env_of_t { - return get_env(__self.__sndr_); - } + template <__decays_to_derived_from<__sender_base> _Self, class _Env> + friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&) + -> __completions<__copy_cvref_t<_Self, _Sender>, _Env> { + return {}; + } - template <__decays_to<__t> _Self, class _Env> - friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&) - -> __completions<__copy_cvref_t<_Self, _Sender>, _Env> { - return {}; - } + // BUGBUG better would be to port the `let_[value|error|stopped]` algorithms to __sexpr + template + static auto apply(_Self&& __self, _ApplyFun __fun) -> __call_result_t< + _ApplyFun, + _SetId, // Actually one of let_value_t, let_error_t, let_stopped_t + __copy_cvref_t<_Self, _Fun>, + __copy_cvref_t<_Self, _Sender>> { + return ((_ApplyFun&&) __fun)( + _SetId(), ((_Self&&) __self).__fun_, ((_Self&&) __self).__sndr_); + } - // BUGBUG better would be to port the `let_[value|error|stopped]` algorithms to __sexpr - template - static auto apply(_Self&& __self, _ApplyFun __fun) -> __call_result_t< - _ApplyFun, - _SetId, // Actually one of let_value_t, let_error_t, let_stopped_t - __copy_cvref_t<_Self, _Fun>, - __copy_cvref_t<_Self, _Sender>> { - return ((_ApplyFun&&) __fun)( - _SetId(), ((_Self&&) __self).__fun_, ((_Self&&) __self).__sndr_); - } + _Sender __sndr_; + _Fun __fun_; + }; - _Sender __sndr_; - _Fun __fun_; + template + struct __sender { + struct __t : __sender_base, _Fun, _SetId> { + using __id = __sender; + + friend auto tag_invoke(get_env_t, const __t& __self) noexcept /*-> env_of_t*/ { + return __join_env(__mkprop(_Domain(), get_domain), get_env(__self.__sndr_)); + } }; }; @@ -4637,15 +4828,15 @@ namespace stdexec { tag_invoke_t(_LetTag, _Sender, _Function)>; using __t = _SetTag; - template + template using __sender = - stdexec::__t<__let::__sender>, _Fun, _LetTag>>; + stdexec::__t<__let::__sender>, _Fun, _LetTag, _Domain>>; template auto operator()(_Sender&& __sndr, _Fun __fun) const { auto __domain = __get_sender_domain((_Sender&&) __sndr); return stdexec::transform_sender( - __domain, __sender<_Sender, _Fun>{(_Sender&&) __sndr, (_Fun&&) __fun}); + __domain, __sender<_Sender, _Fun>{{(_Sender&&) __sndr, (_Fun&&) __fun}}); } template @@ -4653,6 +4844,54 @@ namespace stdexec { __binder_back<_LetTag, _Fun> operator()(_Fun __fun) const { return {{}, {}, {(_Fun&&) __fun}}; } + + // Compute all the domains of all the result senders and make sure they're all the same + template + using __result_domain_t = + __gather_completions_for< + _SetTag, + _Child, + _Env, + __mtry_catch< + __mbind_front_q<__call_result_t, _Fun>, + __on_not_callable<_SetTag>>, + __q<__domain::__common_domain_t>>; + + static auto get_env(__ignore) noexcept { + return __mkprop(dependent_domain(), get_domain); + } + + template _Sender, class _Env> + static decltype(auto) transform_env(_Sender&& __sndr, const _Env& __env) { + return __sexpr_apply( + (_Sender&&) __sndr, + [&] _Child>(__ignore, _Fun&&, _Child&& __child) -> decltype(auto) { + using _Scheduler = __completion_sched<_Child, _SetTag>; + if constexpr (__unknown_context<_Scheduler>) { + return (__env); + } else { + return __join_env( + __mkprop(get_completion_scheduler<_SetTag>(stdexec::get_env(__child)), get_scheduler), + __mkprop(get_domain), + __env); + } + STDEXEC_UNREACHABLE(); + } + ); + } + + template _Sender, class _Env> + requires same_as<__sender_domain_of_t<_Sender>, dependent_domain> + static decltype(auto) transform_sender(_Sender&& __sndr, const _Env& __env) { + return __sexpr_apply( + (_Sender&&) __sndr, + [&] _Child>(__ignore, _Fun&& __fun, _Child&& __child) { + // TODO: propagate errors here + using _Domain = __result_domain_t<_Child, _Fun, _Env>; + static_assert(__none_of<_Domain, __none_such, dependent_domain>); + return __sender<_Child, _Fun, _Domain>{{(_Child&&) __child, (_Fun&&) __fun}}; + }); + } }; struct let_value_t : __let::__let_xxx_t { }; @@ -4750,64 +4989,63 @@ namespace stdexec { }; }; - template - struct __sender { - using _Sender = stdexec::__t<_SenderId>; - - struct __t { - using __id = __sender; - using is_sender = void; - - template - using __operation_t = - stdexec::__t<__operation, stdexec::__id<_Receiver>>>; - template - using __receiver_t = - stdexec::__t<__receiver, stdexec::__id<_Receiver>>>; + struct stopped_as_optional_t : __with_default_get_env { + template + auto operator()(_Sender&& __sndr) const { + return __make_sexpr(__(), (_Sender&&) __sndr); + } - template <__decays_to<__t> _Self, receiver _Receiver> - requires __single_typed_sender<__copy_cvref_t<_Self, _Sender>, env_of_t<_Receiver>> - && sender_to<__copy_cvref_t<_Self, _Sender>, __receiver_t<_Self, _Receiver>> - friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr) - -> __operation_t<_Self, _Receiver> { - return {((_Self&&) __self).__sndr_, (_Receiver&&) __rcvr}; - } + STDEXEC_ATTRIBUTE((always_inline)) // + __binder_back operator()() const noexcept { + return {}; + } - friend auto tag_invoke(get_env_t, const __t& __self) noexcept -> env_of_t { - return get_env(__self.__sndr_); - } +#if STDEXEC_FRIENDSHIP_IS_LEXICAL() + private: + template + friend struct stdexec::__sexpr; +#endif - template - requires(sizeof...(_Tys) == 1) - using __set_value_t = completion_signatures>...)>; + template + using __operation_t = + stdexec::__t<__operation, stdexec::__id<_Receiver>>>; + template + using __receiver_t = + stdexec::__t<__receiver, stdexec::__id<_Receiver>>>; - template - using __set_error_t = completion_signatures; + template + struct __connect_fn { + _Receiver __rcvr_; - template <__decays_to<__t> _Self, class _Env> - friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&) - -> make_completion_signatures< - __copy_cvref_t<_Self, _Sender>, - _Env, - completion_signatures, - __set_value_t, - __set_error_t, - completion_signatures<>> { - return {}; + template + __operation_t<_Child, _Receiver> operator()(stopped_as_optional_t, __, _Child&& __child) { + return {(_Child&&) __child, std::move(__rcvr_)}; } - - _Sender __sndr_; }; - }; - struct stopped_as_optional_t { - template - auto operator()(_Sender&& __sndr) const -> __t<__sender>>> { - return {(_Sender&&) __sndr}; + template _Self, receiver _Receiver> + requires __single_typed_sender<__child_of<_Self>, env_of_t<_Receiver>> + && sender_to<__child_of<_Self>, __receiver_t<__child_of<_Self>, _Receiver>> + static __operation_t<__child_of<_Self>, _Receiver> connect(_Self&& __self, _Receiver __rcvr) { + return __sexpr_apply((_Self&&) __self, __connect_fn<_Receiver>{(_Receiver&&) __rcvr}); } - STDEXEC_ATTRIBUTE((always_inline)) // - __binder_back operator()() const noexcept { + template + requires(sizeof...(_Tys) == 1) + using __set_value_t = completion_signatures>...)>; + + template + using __set_error_t = completion_signatures; + + template _Self, class _Env> + static auto get_completion_signatures(_Self&&, _Env&&) + -> make_completion_signatures< + __child_of<_Self>, + _Env, + completion_signatures, + __set_value_t, + __set_error_t, + completion_signatures<>> { return {}; } }; @@ -5451,12 +5689,8 @@ namespace stdexec { auto __make_transform_fn(const _Env& __env) { return [&](_Scheduler&& __sched, _Values&&... __vals) { auto __domain = __get_env_domain(__env); - return stdexec::transform_sender( - __domain, - transfer( - stdexec::transform_sender(__domain, just((_Values&&) __vals...), __env), - (_Scheduler&&) __sched), - __env); + auto __tfx = __make_transformer(__domain, __env); + return __tfx(transfer)(__tfx(just)((_Values&&) __vals...), (_Scheduler&&) __sched); }; } @@ -5534,9 +5768,7 @@ namespace stdexec { struct __write_t : __with_default_get_env<__write_t> { template auto operator()(_Sender&& __sndr, _Envs... __envs) const { - auto __domain = __get_sender_domain(__sndr); - return stdexec::transform_sender( - __domain, __make_sexpr<__write_t>(__join_env(std::move(__envs)...), (_Sender&&) __sndr)); + return __make_sexpr<__write_t>(__join_env(std::move(__envs)...), (_Sender&&) __sndr); } template @@ -5545,6 +5777,19 @@ namespace stdexec { return {{}, {}, {std::move(__envs)...}}; } + template + STDEXEC_ATTRIBUTE((always_inline)) + static auto __transform_env_fn(_Env&& __env) noexcept { + return [&](__ignore, const auto& __data, __ignore) noexcept { + return __join_env(__data, (_Env&&) __env); + }; + } + + template _Self, class _Env> + static auto transform_env(const _Self& __self, _Env&& __env) noexcept { + return __sexpr_apply(__self, __transform_env_fn((_Env&&) __env)); + } + #if STDEXEC_FRIENDSHIP_IS_LEXICAL() private: template @@ -5574,6 +5819,7 @@ namespace stdexec { -> stdexec::__completion_signatures_of_t< __child_of<_Self>, __env::__env_join_t>&, _Env>> { + return {}; } }; } // namespace __write_ @@ -6276,50 +6522,7 @@ namespace stdexec { struct _INVALID_ARGUMENTS_TO_WHEN_ALL_ { }; - struct __common_domain_fn { - template - requires __all_of<_Domain, _OtherDomains...> - static _Domain __common_domain(_Domain __domain, _OtherDomains...) noexcept { - return (_Domain&&) __domain; - } - - template - static __none_such __common_domain(_Domains...) noexcept { - return {}; - } - - static default_domain __common_domain() noexcept { - return {}; - } - - auto operator()(__ignore, __ignore, const auto&... __sndrs) const noexcept { - return __common_domain(__get_sender_domain(__sndrs)...); - } - }; - - template - using __common_domain_t = // - __call_result_t<__common_domain_fn, int, int, _Senders...>; - - template - concept __has_common_domain = // - __none_of<__none_such, __common_domain_t<_Senders...>>; - - template - struct __get_env_common_domain { - template _Self> - static auto get_env(const _Self& __self) noexcept { - using _Domain = __call_result_t<__sexpr_apply_t, const _Self&, __common_domain_fn>; - if constexpr (same_as<_Domain, default_domain>) { - return empty_env(); - } else { - return __mkprop(__sexpr_apply(__self, __common_domain_fn()), get_domain); - } - STDEXEC_UNREACHABLE(); - } - }; - - struct when_all_t : __get_env_common_domain { + struct when_all_t : __domain::__get_env_common_domain { // Used by the default_domain to find legacy customizations: using _Sender = __1; using __legacy_customizations_t = // @@ -6327,9 +6530,9 @@ namespace stdexec { // TODO: improve diagnostic when senders have different domains template - requires __has_common_domain<_Senders...> + requires __domain::__has_common_domain<_Senders...> auto operator()(_Senders&&... __sndrs) const { - auto __domain = __common_domain_t<_Senders...>(); + auto __domain = __domain::__common_domain_t<_Senders...>(); return stdexec::transform_sender( __domain, __make_sexpr(__(), (_Senders&&) __sndrs...)); } @@ -6388,15 +6591,15 @@ namespace stdexec { } }; - struct when_all_with_variant_t : __get_env_common_domain { + struct when_all_with_variant_t : __domain::__get_env_common_domain { using _Sender = __1; using __legacy_customizations_t = // __types; template - requires __has_common_domain<_Senders...> + requires __domain::__has_common_domain<_Senders...> auto operator()(_Senders&&... __sndrs) const { - auto __domain = __common_domain_t<_Senders...>(); + auto __domain = __domain::__common_domain_t<_Senders...>(); return stdexec::transform_sender( __domain, __make_sexpr(__(), (_Senders&&) __sndrs...)); } @@ -6409,11 +6612,8 @@ namespace stdexec { return __sexpr_apply( (_Sender&&) __sndr, [&](__ignore, __ignore, _Child&&... __child) { auto __domain = __get_env_domain(__env); - return stdexec::transform_sender( - __domain, - when_all_t()( - stdexec::transform_sender(__domain, into_variant((_Child&&) __child), __env)...), - __env); + auto __tfx = __make_transformer(__domain, __env); + return __tfx(when_all_t())(__tfx(into_variant)((_Child&&) __child)...); }); } }; @@ -6428,7 +6628,7 @@ namespace stdexec { _Sender...)>; template - requires __has_common_domain<_Senders...> + requires __domain::__has_common_domain<_Senders...> auto operator()(_Scheduler&& __sched, _Senders&&... __sndrs) const { using _Env = __t<__schedule_from::__environ<__id<__decay_t<_Scheduler>>>>; auto __domain = query_or(get_domain, __sched, default_domain()); @@ -6452,12 +6652,10 @@ namespace stdexec { (_Sender&&) __sndr, [&](__ignore, _Data&& __data, _Child&&... __child) { auto __domain = __get_env_domain(__env); - return stdexec::transform_sender( - __domain, - transfer( - stdexec::transform_sender(__domain, when_all_t()((_Child&&) __child...), __env), - get_completion_scheduler(__data)), - __env); + auto __tfx = __make_transformer(__domain, __env); + return __tfx(transfer)( + __tfx(when_all_t())((_Child&&) __child...), + get_completion_scheduler(__data)); }); } }; @@ -6472,7 +6670,7 @@ namespace stdexec { _Sender...)>; template - requires __has_common_domain<_Senders...> + requires __domain::__has_common_domain<_Senders...> auto operator()(_Scheduler&& __sched, _Senders&&... __sndrs) const { using _Env = __t<__schedule_from::__environ<__id<__decay_t<_Scheduler>>>>; auto __domain = query_or(get_domain, __sched, default_domain()); @@ -6496,12 +6694,10 @@ namespace stdexec { (_Sender&&) __sndr, [&](__ignore, _Data&& __data, _Child&&... __child) { auto __domain = __get_env_domain(__env); - return stdexec::transform_sender( - __domain, - transfer_when_all_t()( + auto __tfx = __make_transformer(__domain, __env); + return __tfx(transfer_when_all_t())( get_completion_scheduler((_Data&&) __data), - stdexec::transform_sender(__domain, into_variant((_Child&&) __child), __env)...), - __env); + __tfx(into_variant)((_Child&&) __child)...); }); } }; @@ -6691,6 +6887,8 @@ namespace stdexec { __mstring _Details = __no_scheduler_details> struct _CANNOT_RESTORE_EXECUTION_CONTEXT_AFTER_ON_ { }; + struct on_t; + STDEXEC_PRAGMA_PUSH() STDEXEC_PRAGMA_IGNORE_GNU("-Wunused-local-typedefs") @@ -6698,7 +6896,6 @@ namespace stdexec { // Issue a custom diagnostic if the environment doesn't provide a scheduler. template static auto transform_sender(_Sender&&, const _Env&) { - struct __no_scheduler_in_environment { using is_sender = void; using completion_signatures = // @@ -6723,6 +6920,9 @@ namespace stdexec { } }; + template + __always(_Ty) -> __always<_Ty>; + struct on_t : __with_default_get_env , __no_scheduler_in_environment { @@ -6734,23 +6934,43 @@ namespace stdexec { __domain, __make_sexpr((_Scheduler&&) __sched, (_Sender&&) __sndr)); } + template + static auto __mkenv(_Scheduler __sched) { + auto __env = __join_env(__mkprop(__sched, get_scheduler), __mkprop(get_domain)); + struct __env_t : decltype(__env) { }; + return __env_t{__env}; + } + + template + STDEXEC_ATTRIBUTE((always_inline)) + static auto __transform_env_fn(_Env&& __env) noexcept { + return [&](__ignore, auto __sched, __ignore) noexcept { + return __join_env(__mkenv(__sched), (_Env&&) __env); + }; + } + + template + static auto transform_env(const _Sender& __sndr, _Env&& __env) noexcept { + return __sexpr_apply(__sndr, __transform_env_fn((_Env&&) __env)); + } + using __no_scheduler_in_environment::transform_sender; template requires __callable static auto transform_sender(_Sender&& __sndr, const _Env& __env) { - auto __domain = __get_env_domain(__env); - auto __old = get_scheduler(__env); - return stdexec::transform_sender( - __domain, - __sexpr_apply( - (_Sender&&) __sndr, - [&](__ignore, _Scheduler __sched, _Child&& __child) { - return transfer( - let_value(transfer_just(std::move(__sched)), __always<_Child>{(_Child&&) __child}), + return __sexpr_apply( + (_Sender&&) __sndr, + [&](__ignore, _Scheduler __sched, _Child&& __child) { + auto __domain = __get_env_domain(__env); + auto __old = get_scheduler(__env); + auto __tfx = __make_transformer(__domain, __env); + return __tfx(transfer)( + __tfx(let_value)( + __tfx(transfer_just)(std::move(__sched)), + __always{(_Child&&) __child}), std::move(__old)); - }), - __env); + }); } }; @@ -6788,10 +7008,7 @@ namespace stdexec { template static auto __mkenv(_Scheduler __sched) { auto __env = __join_env(__mkprop(__sched, get_scheduler), __mkprop(get_domain)); - using _Env = decltype(__env); - - struct __env_t : _Env { }; - + struct __env_t : decltype(__env) { }; return __env_t{__env}; } @@ -6809,10 +7026,11 @@ namespace stdexec { [&](__ignore, _Data&& __data, _Child&& __child) { auto&& [__sched, __clsur] = (_Data&&) __data; using _Closure = decltype(__clsur); + auto __tfx = __make_transformer(__domain, __env); return __write( - transfer( + __tfx(transfer)( ((_Closure&&) __clsur)( - transfer(__write((_Child&&) __child, __mkenv(__old)), __sched)), + __tfx(transfer)(__write((_Child&&) __child, __mkenv(__old)), __sched)), __old), __mkenv(__sched)); }), @@ -6841,6 +7059,7 @@ namespace stdexec { } struct __env : __result_of<__make_env, run_loop&> { + __env(); __env(run_loop& __loop) noexcept : __result_of<__make_env, run_loop&>{__sync_wait::__make_env(__loop)} { } @@ -7002,10 +7221,12 @@ namespace stdexec { struct sync_wait_t { template _Sender> requires __valid_sync_wait_argument<_Sender> - && __callable, sync_wait_t, _Sender> + && __has_implementation_for, _Sender> auto operator()(_Sender&& __sndr) const -> std::optional<__value_tuple_for_t<_Sender>> { - auto __domain = __get_sender_domain(__sndr); - return stdexec::apply_sender(__domain, *this, (_Sender&&) __sndr); + using _SD = __sender_domain_of_t<_Sender>; + constexpr bool __has_custom_impl = __callable; + using _Domain = __if_c<__has_custom_impl, _SD, default_domain>; + return stdexec::apply_sender(_Domain(), *this, (_Sender&&) __sndr); } #if STDEXEC_NVHPC() diff --git a/test/exec/test_on2.cpp b/test/exec/test_on2.cpp index 7af217369..5d4026622 100644 --- a/test/exec/test_on2.cpp +++ b/test/exec/test_on2.cpp @@ -52,14 +52,14 @@ namespace { CHECK_FALSE(called); // Tell sched1 to start executing one task - sched1.start_next(); + REQUIRE(sched1.try_start_next()); // Now the base sender is called, and execution is transfered to sched2 CHECK(called); CHECK(recv_value == 0); // Tell sched2 to start executing one task - sched2.start_next(); + REQUIRE(sched2.try_start_next()); // Now the base sender is called, and a value is sent to the receiver CHECK(recv_value == 19); @@ -86,14 +86,14 @@ namespace { CHECK_FALSE(called); // Tell sched1 to start executing one task - sched1.start_next(); + REQUIRE(sched1.try_start_next()); // Now the base sender is called, and execution is transfered to sched2 CHECK(called); CHECK(recv_error == 0); // Tell sched2 to start executing one task - sched2.start_next(); + REQUIRE(sched2.try_start_next()); // Now the base sender is called, and an error is sent to the receiver CHECK(recv_error == 19); @@ -122,14 +122,14 @@ namespace { // Tell sched1 to start executing one task. This will post // work to sched2 - sched1.start_next(); + REQUIRE(sched1.try_start_next()); // The base sender shouldn't be started CHECK_FALSE(called); // Tell sched2 to start executing one task. This will execute // the base sender and post work back to sched1 - sched2.start_next(); + REQUIRE(sched2.try_start_next()); // Now the base sender is called, and execution is transfered back // to sched1 @@ -138,14 +138,14 @@ namespace { // Tell sched1 to start executing one task. This will post work to // sched3 - sched1.start_next(); + REQUIRE(sched1.try_start_next()); // The final receiver still hasn't been called CHECK(recv_value == 0); // Tell sched3 to start executing one task. It should call the // final receiver - sched3.start_next(); + REQUIRE(sched3.try_start_next()); // Now the value is sent to the receiver CHECK(recv_value == 19); @@ -174,14 +174,14 @@ namespace { // Tell sched1 to start executing one task. This will post // work to sched2 - sched1.start_next(); + REQUIRE(sched1.try_start_next()); // The base sender shouldn't be started CHECK_FALSE(called); // Tell sched2 to start executing one task. This will execute // the base sender and post work back to sched1 - sched2.start_next(); + REQUIRE(sched2.try_start_next()); // Now the base sender is called, and execution is transfered back // to sched1 @@ -190,14 +190,14 @@ namespace { // Tell sched1 to start executing one task. This will post work to // sched3 - sched1.start_next(); + REQUIRE(sched1.try_start_next()); // The final receiver still hasn't been called CHECK(recv_error == 0); // Tell sched3 to start executing one task. It should call the // final receiver - sched3.start_next(); + REQUIRE(sched3.try_start_next()); // Now the error is sent to the receiver CHECK(recv_error == 19); @@ -223,14 +223,14 @@ namespace { CHECK_FALSE(called); // Tell sched1 to start executing one task - sched1.start_next(); + REQUIRE(sched1.try_start_next()); // Now the closure is called, and execution is transfered to sched2 CHECK(called); CHECK(recv_value == 0); // Tell sched2 to start executing one task - sched2.start_next(); + REQUIRE(sched2.try_start_next()); // Now the closure is called, and a value is sent to the receiver CHECK(recv_value == 19); @@ -257,14 +257,14 @@ namespace { CHECK_FALSE(called); // Tell sched1 to start executing one task - sched1.start_next(); + REQUIRE(sched1.try_start_next()); // Now the closure is called, and execution is transfered to sched2 CHECK(called); CHECK(recv_error == 0); // Tell sched2 to start executing one task - sched2.start_next(); + REQUIRE(sched2.try_start_next()); // Now the closure is called, and a error is sent to the receiver CHECK(recv_error == 19); @@ -293,21 +293,21 @@ namespace { // Tell sched1 to start executing one task. This will post // work to sched3 - sched1.start_next(); + REQUIRE(sched1.try_start_next()); // The closure shouldn't be started CHECK_FALSE(called); // Tell sched3 to start executing one task. This post work to // sched2. - sched3.start_next(); + REQUIRE(sched3.try_start_next()); // The closure shouldn't be started CHECK_FALSE(called); // Tell sched2 to start executing one task. This will execute // the closure and post work back to sched3 - sched2.start_next(); + REQUIRE(sched2.try_start_next()); // Now the closure is called, and execution is transfered back // to sched3 @@ -316,7 +316,7 @@ namespace { // Tell sched3 to start executing one task. This will call the // receiver - sched3.start_next(); + REQUIRE(sched3.try_start_next()); // Now the value is sent to the receiver CHECK(recv_value == 19); @@ -345,21 +345,21 @@ namespace { // Tell sched1 to start executing one task. This will post // work to sched3 - sched1.start_next(); + REQUIRE(sched1.try_start_next()); // The closure shouldn't be started CHECK_FALSE(called); // Tell sched3 to start executing one task. This post work to // sched2. - sched3.start_next(); + REQUIRE(sched3.try_start_next()); // The closure shouldn't be started CHECK_FALSE(called); // Tell sched2 to start executing one task. This will execute // the closure and post work back to sched3 - sched2.start_next(); + REQUIRE(sched2.try_start_next()); // Now the closure is called, and execution is transfered back // to sched3 @@ -368,7 +368,7 @@ namespace { // Tell sched3 to start executing one task. This will call the // receiver - sched3.start_next(); + REQUIRE(sched3.try_start_next()); // Now the error is sent to the receiver CHECK(recv_error == 19); diff --git a/test/test_common/schedulers.hpp b/test/test_common/schedulers.hpp index 9746f1bc3..678a194c0 100644 --- a/test/test_common/schedulers.hpp +++ b/test/test_common/schedulers.hpp @@ -47,6 +47,10 @@ namespace { using cmd_vec_t = std::vector; struct data { + explicit data(int id) + : id_(id) { + } + int id_; cmd_vec_t all_commands_; std::mutex mutex_; std::condition_variable cv_; @@ -108,11 +112,37 @@ namespace { using __t = impulse_scheduler; impulse_scheduler() - : shared_data_(std::make_shared()) { + : shared_data_(std::make_shared(0)) { + } + + explicit impulse_scheduler(int id) + : shared_data_(std::make_shared(id)) { } ~impulse_scheduler() = default; + //! Actually start the command from the last started operation_state + //! Returns immediately if no command registered (i.e., no operation state started) + bool try_start_next() { + // Wait for a command that we can execute + std::unique_lock lock{shared_data_->mutex_}; + + // If there are no commands in the queue, return false + if (shared_data_->all_commands_.empty()) { + return false; + } + + // Pop one command from the queue + auto cmd = std::move(shared_data_->all_commands_.front()); + shared_data_->all_commands_.erase(shared_data_->all_commands_.begin()); + // Exit the lock before executing the command + lock.unlock(); + // Execute the command, i.e., send an impulse to the connected sender + cmd(); + // Return true to signal that we started a command + return true; + } + //! Actually start the command from the last started operation_state //! Blocks if no command registered (i.e., no operation state started) void start_next() {