diff --git a/include/stdexec/__detail/__meta.hpp b/include/stdexec/__detail/__meta.hpp index b757f91a5..45d316732 100644 --- a/include/stdexec/__detail/__meta.hpp +++ b/include/stdexec/__detail/__meta.hpp @@ -687,7 +687,7 @@ namespace stdexec { concept __has_id = requires { typename _Ty::__id; }; template - struct _Yp { + struct _Id { using __t = _Ty; // Uncomment the line below to find any code that likely misuses the @@ -707,7 +707,7 @@ namespace stdexec { template <> struct __id_ { template - using __f = _Yp<_Ty>; + using __f = _Id<_Ty>; }; template using __id = __minvoke<__id_<__has_id<_Ty>>, _Ty>; diff --git a/include/tbbexec/tbb_thread_pool.hpp b/include/tbbexec/tbb_thread_pool.hpp index 00c5b0777..bf14ba7dc 100644 --- a/include/tbbexec/tbb_thread_pool.hpp +++ b/include/tbbexec/tbb_thread_pool.hpp @@ -22,12 +22,6 @@ #include namespace tbbexec { - - template - class operation; - - using task_base = exec::static_thread_pool::task_base; - //! This is a P2300-style thread pool wrapping tbb::task_arena, which its docs describe as "A class that represents an //! explicit, user-managed task scheduler arena." //! Once set up, a tbb::task_arena has @@ -37,10 +31,18 @@ namespace tbbexec { //! //! See https://spec.oneapi.io/versions/1.0-rev-3/elements/oneTBB/source/task_scheduler/task_arena/task_arena_cls.html namespace detail { - template // CRTP + template + struct operation { + using Receiver = stdexec::__t; + struct __t; + }; + + using task_base = exec::static_thread_pool::task_base; + + template // CRTP class thread_pool_base { - template - friend class operation; + template + friend struct operation; public: struct scheduler { @@ -49,8 +51,8 @@ namespace tbbexec { bool operator==(const scheduler&) const = default; private: - template - friend class operation; + template + friend struct operation; class sender { public: @@ -61,27 +63,27 @@ namespace tbbexec { stdexec::completion_signatures; private: - template - operation>> - make_operation_(Receiver&& r) const { - return operation>>{ - this->pool_, (Receiver&&) r}; + template + stdexec::__t>> + make_operation_(Receiver rcvr) const { + return stdexec::__t>>{ + this->pool_, (Receiver&&) rcvr}; } template - friend operation>> - tag_invoke(stdexec::connect_t, sender s, Receiver&& r) { - return s.make_operation_(std::forward(r)); + friend stdexec::__t>> + tag_invoke(stdexec::connect_t, sender sndr, Receiver rcvr) { + return sndr.make_operation_(std::move(rcvr)); } template friend typename DerivedPoolType::scheduler - tag_invoke(stdexec::get_completion_scheduler_t, sender s) noexcept { - return s.pool_.get_scheduler(); + tag_invoke(stdexec::get_completion_scheduler_t, sender sndr) noexcept { + return sndr.pool_.get_scheduler(); } - friend const sender& tag_invoke(stdexec::get_env_t, const sender& s) noexcept { - return s; + friend const sender& tag_invoke(stdexec::get_env_t, const sender& sndr) noexcept { + return sndr; } friend struct DerivedPoolType::tbb_thread_pool::scheduler; @@ -218,58 +220,64 @@ namespace tbbexec { } }; - template + template struct bulk_receiver { - using receiver_concept = stdexec::receiver_t; using Sender = stdexec::__t; using Receiver = stdexec::__t; - using shared_state = bulk_shared_state; + struct __t { + using __id = bulk_receiver; + using receiver_concept = stdexec::receiver_t; - shared_state& shared_state_; + using shared_state = bulk_shared_state; - void enqueue() noexcept { - shared_state_.pool_.bulk_enqueue(&shared_state_, shared_state_.num_agents_required()); - } + shared_state& shared_state_; - template - friend void tag_invoke( - stdexec::same_as auto, - bulk_receiver&& self, - As&&... as) noexcept { - using tuple_t = stdexec::__decayed_tuple; + void enqueue() noexcept { + shared_state_.pool_.bulk_enqueue(&shared_state_, shared_state_.num_agents_required()); + } + + template + friend void tag_invoke( + stdexec::same_as auto, + __t&& self, + As&&... as) noexcept { + using tuple_t = stdexec::__decayed_tuple; - shared_state& state = self.shared_state_; + shared_state& state = self.shared_state_; - if constexpr (MayThrow) { - try { + if constexpr (MayThrow) { + try { + state.data_.template emplace((As&&) as...); + } catch (...) { + stdexec::set_error(std::move(state.receiver_), std::current_exception()); + } + } else { state.data_.template emplace((As&&) as...); - } catch (...) { - stdexec::set_error(std::move(state.receiver_), std::current_exception()); } - } else { - state.data_.template emplace((As&&) as...); - } - if (state.shape_) { - self.enqueue(); - } else { - state.apply([&](auto&... args) { - stdexec::set_value(std::move(state.receiver_), std::move(args)...); - }); + if (state.shape_) { + self.enqueue(); + } else { + state.apply([&](auto&... args) { + stdexec::set_value(std::move(state.receiver_), std::move(args)...); + }); + } } - } - template Tag, class... As> - friend void tag_invoke(Tag tag, bulk_receiver&& self, As&&... as) noexcept { - shared_state& state = self.shared_state_; - tag((Receiver&&) state.receiver_, (As&&) as...); - } + template < + stdexec::__one_of Tag, + class... As> + friend void tag_invoke(Tag tag, __t&& self, As&&... as) noexcept { + shared_state& state = self.shared_state_; + tag((Receiver&&) state.receiver_, (As&&) as...); + } - friend auto tag_invoke(stdexec::get_env_t, const bulk_receiver& self) noexcept - -> stdexec::env_of_t { - return stdexec::get_env(self.shared_state_.receiver_); - } + friend auto tag_invoke(stdexec::get_env_t, const __t& self) noexcept + -> stdexec::env_of_t { + return stdexec::get_env(self.shared_state_.receiver_); + } + }; }; template @@ -277,137 +285,138 @@ namespace tbbexec { using Sender = stdexec::__t; using Receiver = stdexec::__t; - static constexpr bool may_throw = !stdexec::__v, - stdexec::__mbind_front_q, - stdexec::__q>>; + struct __t { + using __id = bulk_op_state; + static constexpr bool may_throw = !stdexec::__v, + stdexec::__mbind_front_q, + stdexec::__q>>; - using bulk_rcvr = bulk_receiver; - using shared_state = bulk_shared_state; - using inner_op_state = stdexec::connect_result_t; + using bulk_rcvr = + stdexec::__t>; + using shared_state = bulk_shared_state; + using inner_op_state = stdexec::connect_result_t; - shared_state shared_state_; + shared_state shared_state_; - inner_op_state inner_op_; + inner_op_state inner_op_; - friend void tag_invoke(stdexec::start_t, bulk_op_state& op) noexcept { - stdexec::start(op.inner_op_); - } + friend void tag_invoke(stdexec::start_t, __t& op) noexcept { + stdexec::start(op.inner_op_); + } - bulk_op_state( - DerivedPoolType& pool, - Shape shape, - Fun fn, - Sender&& sender, - Receiver receiver) - : shared_state_(pool, (Receiver&&) receiver, shape, fn) - , inner_op_{stdexec::connect((Sender&&) sender, bulk_rcvr{shared_state_})} { - } + __t(DerivedPoolType& pool, Shape shape, Fun fn, Sender&& sender, Receiver receiver) + : shared_state_(pool, (Receiver&&) receiver, shape, fn) + , inner_op_{stdexec::connect((Sender&&) sender, bulk_rcvr{shared_state_})} { + } + }; }; template using __decay_ref = stdexec::__decay_t<_Ty>&; - template + template struct bulk_sender { - using sender_concept = stdexec::sender_t; using Sender = stdexec::__t; - using Fun = stdexec::__t; - DerivedPoolType& pool_; - Sender sndr_; - Shape shape_; - Fun fun_; + struct __t { + using __id = bulk_sender; + using sender_concept = stdexec::sender_t; - template - using with_error_invoke_t = stdexec::__if_c< - stdexec::__v, - stdexec::__mbind_front_q>, - stdexec::__q>>, - stdexec::completion_signatures<>, - stdexec::__with_exception_ptr>; - - template - using set_value_t = - stdexec::completion_signatures...)>; - - template - using completion_signatures = stdexec::__try_make_completion_signatures< - stdexec::__copy_cvref_t, - Env, - with_error_invoke_t, Env>, - stdexec::__q>; - - template - using bulk_op_state_t = bulk_op_state< - stdexec::__x>, - stdexec::__x>, - Shape, - Fun>; - - template Self, stdexec::receiver Receiver> - requires stdexec:: - receiver_of>> - friend bulk_op_state_t - tag_invoke(stdexec::connect_t, Self&& self, Receiver&& rcvr) noexcept( - stdexec::__nothrow_constructible_from< - bulk_op_state_t, - DerivedPoolType&, - Shape, - Fun, + DerivedPoolType& pool_; + Sender sndr_; + Shape shape_; + Fun fun_; + + template + using with_error_invoke_t = stdexec::__if_c< + stdexec::__v) { - return bulk_op_state_t{ - self.pool_, self.shape_, self.fun_, ((Self&&) self).sndr_, (Receiver&&) rcvr}; - } + Env, + stdexec::__transform< + stdexec::__q<__decay_ref>, + stdexec::__mbind_front_q>, + stdexec::__q>>, + stdexec::completion_signatures<>, + stdexec::__with_exception_ptr>; + + template + using set_value_t = + stdexec::completion_signatures...)>; + + template + using completion_signatures = stdexec::__try_make_completion_signatures< + stdexec::__copy_cvref_t, + Env, + with_error_invoke_t, Env>, + stdexec::__q>; + + template + using bulk_op_state_t = stdexec::__t< + bulk_op_state< stdexec::__cvref_id, stdexec::__id, Shape, Fun>>; + + template Self, stdexec::receiver Receiver> + requires stdexec:: + receiver_of>> + friend bulk_op_state_t + tag_invoke(stdexec::connect_t, Self&& self, Receiver rcvr) noexcept( + stdexec::__nothrow_constructible_from< + bulk_op_state_t, + DerivedPoolType&, + Shape, + Fun, + Sender, + Receiver>) { + return bulk_op_state_t{ + self.pool_, self.shape_, self.fun_, ((Self&&) self).sndr_, (Receiver&&) rcvr}; + } - template Self, class Env> - friend auto tag_invoke(stdexec::get_completion_signatures_t, Self&&, Env&&) - -> completion_signatures { - return {}; - } + template Self, class Env> + friend auto tag_invoke(stdexec::get_completion_signatures_t, Self&&, Env&&) + -> completion_signatures { + return {}; + } - template Tag, class... As> - requires stdexec::__callable - friend auto tag_invoke(Tag tag, const bulk_sender& self, As&&... as) noexcept( - stdexec::__nothrow_callable) - -> stdexec::__call_result_if_t< - stdexec::tag_category, - Tag, - const Sender&, - As...> { - return ((Tag&&) tag)(self.sndr_, (As&&) as...); - } + template Tag, class... As> + requires stdexec::__callable + friend auto tag_invoke(Tag tag, const __t& self, As&&... as) noexcept( + stdexec::__nothrow_callable) + -> stdexec::__call_result_if_t< + stdexec::tag_category, + Tag, + const Sender&, + As...> { + return ((Tag&&) tag)(self.sndr_, (As&&) as...); + } - template Tag> - friend const bulk_sender& tag_invoke(Tag tag, const bulk_sender& self) noexcept { - return self; - } + template Tag> + friend const __t& tag_invoke(Tag tag, const __t& self) noexcept { + return self; + } + }; }; sender make_sender() const { return sender{*pool_}; } - friend sender tag_invoke(stdexec::schedule_t, const scheduler& s) noexcept { - return s.make_sender(); + friend sender tag_invoke(stdexec::schedule_t, const scheduler& sch) noexcept { + return sch.make_sender(); } template - using bulk_sender_t = bulk_sender< - stdexec::__x>, - Shape, - stdexec::__x>>; - - template - friend bulk_sender_t - tag_invoke(stdexec::bulk_t, const scheduler& sch, S&& sndr, Shape shape, Fn fun) noexcept { - return bulk_sender_t{*sch.pool_, (S&&) sndr, shape, (Fn&&) fun}; + using bulk_sender_t = + stdexec::__t>, Shape, Fun>>; + + template + friend bulk_sender_t tag_invoke( + stdexec::bulk_t, + const scheduler& sch, + S&& sndr, + Shape shape, + Fun fun) noexcept { + return bulk_sender_t{*sch.pool_, (S&&) sndr, shape, (Fun&&) fun}; } constexpr stdexec::forward_progress_guarantee forward_progress_guarantee() const noexcept { @@ -457,12 +466,44 @@ namespace tbbexec { } }; + template + struct operation::__t : task_base { + using __id = operation; + friend class thread_pool_base; + + PoolType& pool_; + Receiver receiver_; + + explicit __t(PoolType& pool, Receiver rcvr) + : pool_(pool) + , receiver_(std::move(rcvr)) { + this->__execute = + [](task_base* t, std::uint32_t /* tid What is this needed for? */) noexcept { + auto& op = *static_cast<__t*>(t); + auto stoken = stdexec::get_stop_token(stdexec::get_env(op.receiver_)); + if (stoken.stop_requested()) { + stdexec::set_stopped(std::move(op.receiver_)); + } else { + stdexec::set_value(std::move(op.receiver_)); + } + }; + } + + void enqueue() noexcept { + pool_.enqueue(this); + } + + friend void tag_invoke(stdexec::start_t, __t& op) noexcept { + op.enqueue(); + } + }; } // namespace detail class tbb_thread_pool : public detail::thread_pool_base { public: //! Constructor forwards to tbb::task_arena constructor: - template + template + requires stdexec::constructible_from explicit tbb_thread_pool(Args&&... args) : arena_{std::forward(args)...} { arena_.initialize(); @@ -479,47 +520,13 @@ namespace tbbexec { friend detail::thread_pool_base; - template - friend class operation; + template + friend struct detail::operation; - void enqueue(task_base* task, std::uint32_t tid = 0) noexcept { + void enqueue(detail::task_base* task, std::uint32_t tid = 0) noexcept { arena_.enqueue([task, tid] { task->__execute(task, /*tid=*/tid); }); } tbb::task_arena arena_{tbb::task_arena::attach{}}; }; - - template - class operation : task_base { - using Receiver = stdexec::__t; - friend class detail::thread_pool_base; - - PoolType& pool_; - Receiver receiver_; - - explicit operation(PoolType& pool, Receiver&& r) - : pool_(pool) - , receiver_(std::move(r)) { - this->__execute = - [](task_base* t, std::uint32_t /* tid What is this needed for? */) noexcept { - auto& op = *static_cast(t); - auto stoken = stdexec::get_stop_token(stdexec::get_env(op.receiver_)); - if (stoken.stop_requested()) { - stdexec::set_stopped(std::move(op.receiver_)); - } else { - stdexec::set_value(std::move(op.receiver_)); - } - }; - } - - void enqueue() noexcept { - pool_.enqueue(this); - } - - friend void tag_invoke(stdexec::start_t, operation& op) noexcept { - op.enqueue(); - } - }; - - } // namespace tbbexec