From ad2d524196a3bc7bb39934f950df6ed5301f0c40 Mon Sep 17 00:00:00 2001 From: Eric Niebler Date: Wed, 25 Oct 2023 14:15:39 -0700 Subject: [PATCH] the let_* algorithms have a dependent domain, which causes a recursive tree transformation --- include/exec/static_thread_pool.hpp | 40 ++- include/stdexec/__detail/__execution_fwd.hpp | 8 + include/stdexec/concepts.hpp | 3 + include/stdexec/execution.hpp | 358 ++++++++++++------- 4 files changed, 281 insertions(+), 128 deletions(-) diff --git a/include/exec/static_thread_pool.hpp b/include/exec/static_thread_pool.hpp index b87d517de..e77ab23a6 100644 --- a/include/exec/static_thread_pool.hpp +++ b/include/exec/static_thread_pool.hpp @@ -37,6 +37,11 @@ namespace exec { void (*__execute)(task_base*, std::uint32_t tid) noexcept; }; + template + struct not_a_sender { + using is_sender = void; + }; + class static_thread_pool { template class operation; @@ -100,21 +105,44 @@ namespace exec { static_thread_pool& pool_; }; + public: struct domain { // For eager customization template Sender> auto transform_sender(Sender&& sndr) const noexcept { - auto sched = stdexec::get_completion_scheduler( - stdexec::get_env(sndr)); - return stdexec::apply_sender((Sender&&) sndr, transform_bulk{*sched.pool_}); + if constexpr (stdexec::__completes_on) { + auto sched = stdexec::get_completion_scheduler( + stdexec::get_env(sndr)); + return stdexec::apply_sender((Sender&&) sndr, transform_bulk{*sched.pool_}); + } else { + static_assert( + stdexec::__completes_on, + "No static_thread_pool instance can be found in the sender's environment " + "on which to schedule bulk work."); + return not_a_sender>(); + } + STDEXEC_UNREACHABLE(); } // transform the generic bulk sender into a parallel thread-pool bulk sender template Sender, class Env> - requires stdexec::__callable auto transform_sender(Sender&& sndr, const Env& env) const noexcept { - auto sched = stdexec::get_scheduler(env); - return stdexec::apply_sender((Sender&&) sndr, transform_bulk{*sched.pool_}); + if constexpr (stdexec::__completes_on) { + auto sched = stdexec::get_completion_scheduler( + stdexec::get_env(sndr)); + return stdexec::apply_sender((Sender&&) sndr, transform_bulk{*sched.pool_}); + } else if constexpr (stdexec::__starts_on) { + auto sched = stdexec::get_scheduler(env); + return stdexec::apply_sender((Sender&&) sndr, transform_bulk{*sched.pool_}); + } else { + static_assert( // + stdexec::__starts_on + || stdexec::__completes_on, + "No static_thread_pool instance can be found in the sender's or receiver's " + "environment on which to schedule bulk work."); + return not_a_sender>(); + } + STDEXEC_UNREACHABLE(); } }; diff --git a/include/stdexec/__detail/__execution_fwd.hpp b/include/stdexec/__detail/__execution_fwd.hpp index 27dd5c1aa..718d81bdd 100644 --- a/include/stdexec/__detail/__execution_fwd.hpp +++ b/include/stdexec/__detail/__execution_fwd.hpp @@ -175,6 +175,14 @@ namespace stdexec { using __transfer_just::transfer_just_t; extern const transfer_just_t transfer_just; + ////////////////////////////////////////////////////////////////////////////////////////////////// + namespace __bulk { + struct bulk_t; + } + + using __bulk::bulk_t; + extern const bulk_t bulk; + ////////////////////////////////////////////////////////////////////////////////////////////////// namespace __on_v2 { struct on_t; diff --git a/include/stdexec/concepts.hpp b/include/stdexec/concepts.hpp index bff3d879b..1c6b9d732 100644 --- a/include/stdexec/concepts.hpp +++ b/include/stdexec/concepts.hpp @@ -234,6 +234,9 @@ namespace stdexec { template concept __nothrow_decay_copyable = __nothrow_constructible_from<__decay_t<_Ty>, _Ty>; + template + concept __decays_to_derived_from = derived_from<__decay_t<_Ty>, _Up>; + } // namespace stdexec // #if !STDEXEC_HAS_STD_CONCEPTS_HEADER() diff --git a/include/stdexec/execution.hpp b/include/stdexec/execution.hpp index 1b72701fb..1471cb2e4 100644 --- a/include/stdexec/execution.hpp +++ b/include/stdexec/execution.hpp @@ -97,6 +97,9 @@ namespace stdexec { static inline constexpr Tag (*signature)(Sig) = nullptr; }; + struct default_domain; + struct dependent_domain; + namespace __domain { template using __legacy_c11n_for = typename _Tag::__legacy_customizations_t; @@ -121,8 +124,9 @@ namespace stdexec { }; template - concept __has_default_transform = sender_expr<_Sender> && // - __has_transform_member<__tag_of<_Sender>, _Sender, _Env...>; + concept __has_default_transform = // + sender_expr<_Sender> // + && __has_transform_member<__tag_of<_Sender>, _Sender, _Env...>; } // namespace __domain struct default_domain { @@ -626,6 +630,20 @@ namespace stdexec { { get_env(std::as_const(__ep)) } -> __none_of; }; + ///////////////////////////////////////////////////////////////////////////// + template + concept __completes_on = + __decays_to< + __call_result_t, env_of_t<_Sender>>, + _Scheduler>; + + ///////////////////////////////////////////////////////////////////////////// + template + concept __starts_on = + __decays_to< + __call_result_t, + _Scheduler>; + ///////////////////////////////////////////////////////////////////////////// inline constexpr struct __get_env_domain_t { template @@ -654,6 +672,52 @@ namespace stdexec { template using __sender_domain_of_t = __call_result_t<__get_sender_domain_t, _Sender, _Tag>; + namespace __domain { + struct __common_domain_fn { + static default_domain __common_domain() noexcept { + return {}; + } + + template + requires __all_of<_Domain, _OtherDomains...> + static _Domain __common_domain(_Domain __domain, _OtherDomains...) noexcept { + return (_Domain&&) __domain; + } + + template + static auto __common_domain(_Domains...) noexcept // + -> __if_c<__one_of, dependent_domain, __none_such> { + return {}; + } + + auto operator()(__ignore, __ignore, const auto&... __sndrs) const noexcept { + return __common_domain(__get_sender_domain(__sndrs)...); + } + }; + + template + using __common_domain_t = // + __call_result_t<__common_domain_fn, int, int, _Senders...>; + + template + concept __has_common_domain = // + __none_of<__none_such, __common_domain_t<_Senders...>>; + + template + struct __get_env_common_domain { + template _Self> + static auto get_env(const _Self& __self) noexcept { + using _Domain = __call_result_t; + if constexpr (same_as<_Domain, default_domain>) { + return empty_env(); + } else { + return __mkprop(apply_sender(__self, __common_domain_fn()), get_domain); + } + STDEXEC_UNREACHABLE(); + } + }; + } // namespace __domain + ///////////////////////////////////////////////////////////////////////////// // [execution.receivers] namespace __receivers { @@ -1164,23 +1228,86 @@ namespace stdexec { using __debug::__is_debug_env; using __debug::__debug_sender; + ///////////////////////////////////////////////////////////////////////////// + // dependent_domain + struct dependent_domain { + template + requires same_as<__sender_domain_of_t<_Sender>, dependent_domain> + decltype(auto) transform_sender(_Sender&& __sndr, const _Env& __env) const; + + template + _Sender transform_sender(_Sender&& __sndr, const _Env&...) const { + return static_cast<_Sender&&>(__sndr); + } + }; + + namespace __domain { + struct __transform_sender { + template + /*constexpr*/ decltype(auto) + operator()(_Domain __dom, _Sender&& __sndr) const { + if constexpr (__domain::__has_transform_member<_Domain, _Sender>) { + return __dom.transform_sender((_Sender&&) __sndr); + } else { + return default_domain().transform_sender((_Sender&&) __sndr); + } + } + + template + /*constexpr*/ decltype(auto) + operator()(_Domain __dom, _Sender&& __sndr, const _Env& __env) const { + if constexpr (__domain::__has_transform_member<_Domain, _Sender, const _Env&>) { + return __dom.transform_sender((_Sender&&) __sndr, __env); + } else { + return default_domain().transform_sender((_Sender&&) __sndr, __env); + } + } + }; + } // namespace __domain + ///////////////////////////////////////////////////////////////////////////// // [execution.transform_sender] - inline constexpr struct transform_sender_t { - template + inline constexpr struct transform_sender_t : __domain::__transform_sender { + using __domain::__transform_sender::operator(); + + template _Env> + requires same_as<__sender_domain_of_t<_Sender>, dependent_domain> /*constexpr*/ decltype(auto) - operator()(_Domain __dom, _Sender&& __sndr, const _Env&... __env) const { - if constexpr (__domain::__has_transform_member<_Domain, _Sender, const _Env&...>) { - return __dom.transform_sender((_Sender&&) __sndr, __env...); - } else { - return default_domain().transform_sender((_Sender&&) __sndr, __env...); - } + operator()(_Domain __dom, _Sender&& __sndr, const _Env& __env) const { + return (*this)(__dom, dependent_domain().transform_sender((_Sender&&) __sndr, __env), __env); } } transform_sender{}; template using transform_sender_result_t = __call_result_t; + struct _CHILD_SENDERS_WITH_DIFFERENT_DOMAINS_ {}; + + template + requires same_as<__sender_domain_of_t<_Sender>, dependent_domain> + decltype(auto) dependent_domain::transform_sender(_Sender&& __sndr, const _Env& __env) const { + // recursively transform the sender to determine the domain + return apply_sender( + (_Sender&&) __sndr, + [&](_Tag, _Data&& __data, _Childs&&... __childs) { + // TODO: propagate meta-exceptions here: + auto __sndr2 = make_sender_expr<_Tag>( + (_Data&&) __data, + this->transform_sender((_Childs&&) __childs, __env)...); + using _Sender2 = decltype(__sndr2); + + auto __domain2 = apply_sender(__sndr2, __domain::__common_domain_fn()); + using _Domain2 = decltype(__domain2); + + if constexpr (same_as<_Domain2, __none_such>) { + return __mexception<_CHILD_SENDERS_WITH_DIFFERENT_DOMAINS_, _WITH_SENDER_<_Sender2>>(); + } else { + return __detail::transform_sender()(__domain2, std::move(__sndr2), __env); + } + STDEXEC_UNREACHABLE(); + }); + } + ///////////////////////////////////////////////////////////////////////////// // [execution.sndtraits] namespace __get_completion_signatures { @@ -4670,72 +4797,74 @@ namespace stdexec { }; }; - template - struct __sender { - using _Sender = stdexec::__t<_SenderId>; + template + struct __sender_base { using _Set = stdexec::__t<_SetId>; + using is_sender = void; - struct __t { - using __id = __sender; - using is_sender = void; + template + using __operation_t = // + stdexec::__t< + __operation< stdexec::__cvref_id<_Self, _Sender>, stdexec::__id<_Receiver>, _Fun, _Set>>; + template + using __receiver_t = + __receiver< stdexec::__cvref_id<_Self, _Sender>, stdexec::__id<_Receiver>, _Fun, _Set>; - template - using __operation_t = // - stdexec::__t< - __operation< stdexec::__cvref_id<_Self, _Sender>, stdexec::__id<_Receiver>, _Fun, _Set>>; - template - using __receiver_t = - __receiver< stdexec::__cvref_id<_Self, _Sender>, stdexec::__id<_Receiver>, _Fun, _Set>; - - template - using __completion_sched = - __query_result_or_t, env_of_t<_Sender>, __none_such>; - - template - using __completions = // - __minvoke_if_c< - same_as< - __completion_signatures_of_t<_Sender, _Env>, - dependent_completion_signatures>, - __mconst>, - __mbind_front_q< - __mapply, - __transform< - __mbind_front_q<__tfx_signal_t, _Env, _Fun, _Set, __completion_sched<_Sender>>, - __q<__concat_completion_signatures_t> >, - __completion_signatures_of_t<_Sender, _Env>>>; + template + using __completion_sched = + __query_result_or_t, env_of_t<_CvrefSender>, __none_such>; - template <__decays_to<__t> _Self, receiver _Receiver> - requires sender_to<__copy_cvref_t<_Self, _Sender>, __receiver_t<_Self, _Receiver>> - friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr) - -> __operation_t<_Self, _Receiver> { - return __operation_t<_Self, _Receiver>{ - ((_Self&&) __self).__sndr_, (_Receiver&&) __rcvr, ((_Self&&) __self).__fun_}; - } + template + using __completions = // + __minvoke_if_c< + same_as< + __completion_signatures_of_t<_CvrefSender, _Env>, + dependent_completion_signatures>, + __mconst>, + __mbind_front_q< + __mapply, + __transform< + __mbind_front_q<__tfx_signal_t, _Env, _Fun, _Set, __completion_sched<_CvrefSender>>, + __q<__concat_completion_signatures_t> >, + __completion_signatures_of_t<_CvrefSender, _Env>>>; - friend auto tag_invoke(get_env_t, const __t& __self) noexcept -> env_of_t { - return get_env(__self.__sndr_); - } + template <__decays_to_derived_from<__sender_base> _Self, receiver _Receiver> + requires sender_to<__copy_cvref_t<_Self, _Sender>, __receiver_t<_Self, _Receiver>> + friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr) + -> __operation_t<_Self, _Receiver> { + return __operation_t<_Self, _Receiver>{ + ((_Self&&) __self).__sndr_, (_Receiver&&) __rcvr, ((_Self&&) __self).__fun_}; + } - template <__decays_to<__t> _Self, class _Env> - friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&) - -> __completions<__copy_cvref_t<_Self, _Sender>, _Env> { - return {}; - } + template <__decays_to_derived_from<__sender_base> _Self, class _Env> + friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&) + -> __completions<__copy_cvref_t<_Self, _Sender>, _Env> { + return {}; + } - // BUGBUG better would be to port the `let_[value|error|stopped]` algorithms to __sexpr - template - static auto apply(_Self&& __self, _ApplyFun __fun) -> __call_result_t< - _ApplyFun, - _SetId, // Actually one of let_value_t, let_error_t, let_stopped_t - __copy_cvref_t<_Self, _Fun>, - __copy_cvref_t<_Self, _Sender>> { - return ((_ApplyFun&&) __fun)( - _SetId(), ((_Self&&) __self).__fun_, ((_Self&&) __self).__sndr_); - } + // BUGBUG better would be to port the `let_[value|error|stopped]` algorithms to __sexpr + template + static auto apply(_Self&& __self, _ApplyFun __fun) -> __call_result_t< + _ApplyFun, + _SetId, // Actually one of let_value_t, let_error_t, let_stopped_t + __copy_cvref_t<_Self, _Fun>, + __copy_cvref_t<_Self, _Sender>> { + return ((_ApplyFun&&) __fun)( + _SetId(), ((_Self&&) __self).__fun_, ((_Self&&) __self).__sndr_); + } - _Sender __sndr_; - _Fun __fun_; + _Sender __sndr_; + _Fun __fun_; + }; + + template + struct __sender { + struct __t : __sender_base, _Fun, _SetId> { + using __id = __sender; + + friend auto tag_invoke(get_env_t, const __t& __self) noexcept /*-> env_of_t*/ { + return __join_env(__mkprop(_Domain(), get_domain), get_env(__self.__sndr_)); + } }; }; @@ -4752,21 +4881,49 @@ namespace stdexec { tag_invoke_t(_LetTag, _Sender, _Function)>; using __t = _SetTag; - template + template using __sender = - stdexec::__t<__let::__sender>, _Fun, _LetTag>>; + stdexec::__t<__let::__sender>, _Fun, _LetTag, _Domain>>; template auto operator()(_Sender&& __sndr, _Fun __fun) const { auto __domain = __get_sender_domain((_Sender&&) __sndr); return stdexec::transform_sender( - __domain, __sender<_Sender, _Fun>{(_Sender&&) __sndr, (_Fun&&) __fun}); + __domain, __sender<_Sender, _Fun>{{(_Sender&&) __sndr, (_Fun&&) __fun}}); } template __binder_back<_LetTag, _Fun> operator()(_Fun __fun) const { return {{}, {}, {(_Fun&&) __fun}}; } + + static auto get_env(__ignore) noexcept { + return __mkprop(dependent_domain(), get_domain); + } + + template _Sender, __none_of _Env> + requires same_as<__sender_domain_of_t<_Sender>, dependent_domain> + static decltype(auto) transform_sender(_Sender&& __sndr, const _Env& __env) { + return apply_sender( + (_Sender&&) __sndr, + [&] _Child>(__ignore, _Fun&& __fun, _Child&& __child) { + // Compute all the domains of all the result senders and make sure they're all + // the same. + // TODO: propagate errors here + using _Domain = // + __gather_completions_for< + _SetTag, + _Child, + _Env, + __mtry_catch< + __mbind_front_q<__call_result_t, _Fun>, + __on_not_callable<_SetTag>>, + __q<__domain::__common_domain_t>>; + static_assert(__none_of<_Domain, __none_such, dependent_domain>); + + return __sender<_Child, _Fun, _Domain>{{(_Child&&) __child, (_Fun&&) __fun}}; + }); + } }; struct let_value_t : __let::__let_xxx_t { }; @@ -6389,50 +6546,7 @@ namespace stdexec { struct _INVALID_ARGUMENTS_TO_WHEN_ALL_ { }; - struct __common_domain_fn { - template - requires __all_of<_Domain, _OtherDomains...> - static _Domain __common_domain(_Domain __domain, _OtherDomains...) noexcept { - return (_Domain&&) __domain; - } - - template - static __none_such __common_domain(_Domains...) noexcept { - return {}; - } - - static default_domain __common_domain() noexcept { - return {}; - } - - auto operator()(__ignore, __ignore, const auto&... __sndrs) const noexcept { - return __common_domain(__get_sender_domain(__sndrs)...); - } - }; - - template - using __common_domain_t = // - __call_result_t<__common_domain_fn, int, int, _Senders...>; - - template - concept __has_common_domain = // - __none_of<__none_such, __common_domain_t<_Senders...>>; - - template - struct __get_env_common_domain { - template _Self> - static auto get_env(const _Self& __self) noexcept { - using _Domain = __call_result_t; - if constexpr (same_as<_Domain, default_domain>) { - return empty_env(); - } else { - return __mkprop(apply_sender(__self, __common_domain_fn()), get_domain); - } - STDEXEC_UNREACHABLE(); - } - }; - - struct when_all_t : __get_env_common_domain { + struct when_all_t : __domain::__get_env_common_domain { // Used by the default_domain to find legacy customizations: using _Sender = __1; using __legacy_customizations_t = // @@ -6440,9 +6554,9 @@ namespace stdexec { // TODO: improve diagnostic when senders have different domains template - requires __has_common_domain<_Senders...> + requires __domain::__has_common_domain<_Senders...> auto operator()(_Senders&&... __sndrs) const { - auto __domain = __common_domain_t<_Senders...>(); + auto __domain = __domain::__common_domain_t<_Senders...>(); return stdexec::transform_sender( __domain, make_sender_expr(__(), (_Senders&&) __sndrs...)); } @@ -6504,15 +6618,15 @@ namespace stdexec { template using __into_variant_result_t = __result_of; - struct when_all_with_variant_t : __get_env_common_domain { + struct when_all_with_variant_t : __domain::__get_env_common_domain { using _Sender = __1; using __legacy_customizations_t = // __types; template - requires __has_common_domain<_Senders...> + requires __domain::__has_common_domain<_Senders...> auto operator()(_Senders&&... __sndrs) const { - auto __domain = __common_domain_t<_Senders...>(); + auto __domain = __domain::__common_domain_t<_Senders...>(); return stdexec::transform_sender( __domain, make_sender_expr(__(), (_Senders&&) __sndrs...)); } @@ -6544,7 +6658,7 @@ namespace stdexec { _Sender...)>; template - requires __has_common_domain<_Senders...> + requires __domain::__has_common_domain<_Senders...> auto operator()(_Scheduler&& __sched, _Senders&&... __sndrs) const { using _Env = __t<__schedule_from::__environ<__id<__decay_t<_Scheduler>>>>; auto __domain = query_or(get_domain, __sched, default_domain()); @@ -6588,7 +6702,7 @@ namespace stdexec { _Sender...)>; template - requires __has_common_domain<_Senders...> + requires __domain::__has_common_domain<_Senders...> auto operator()(_Scheduler&& __sched, _Senders&&... __sndrs) const { using _Env = __t<__schedule_from::__environ<__id<__decay_t<_Scheduler>>>>; auto __domain = query_or(get_domain, __sched, default_domain());