diff --git a/include/exec/sequence.hpp b/include/exec/sequence.hpp index af148b8c7..a0056bd7c 100644 --- a/include/exec/sequence.hpp +++ b/include/exec/sequence.hpp @@ -16,7 +16,8 @@ #pragma once #include -#include +#include +#include namespace exec { namespace _seq { @@ -25,121 +26,121 @@ namespace exec { struct sequence_t { template - Sndr operator()(Sndr sndr) const; + STDEXEC_ATTRIBUTE((nodiscard, host, device)) + Sndr + operator()(Sndr sndr) const; template requires(sizeof...(Sndrs) > 1) && stdexec::__domain::__has_common_domain - _sndr operator()(Sndrs... sndrs) const; + STDEXEC_ATTRIBUTE((nodiscard, host, device)) + _sndr + operator()(Sndrs... sndrs) const; }; - template - struct _ops_tuple; - - template - struct _ops_tuple : _ops_tuple { - explicit _ops_tuple(Sndr&& sndr, Rest&&... rest) - : _ops_tuple{static_cast(rest)...} - , _head{static_cast(sndr)} { - } - - Sndr _head; - - _ops_tuple& _tail() noexcept { - return *this; - } - }; - - template - struct _ops_tuple { - using _rcvr_t = Rcvr; - Rcvr _rcvr; - }; - - template - union _ops_variant { }; - - template + template struct _rcvr { using receiver_concept = stdexec::receiver_t; - using _rcvr_t = typename _ops_tuple::_rcvr_t; - _ops_variant* _self; + using _opstate_t = stdexec::__t; + _opstate_t* _opstate; template - void set_value(Args&&... args) && noexcept { - auto& sndrs = *_self->_head.__get()._sndrs; - try { - if constexpr (sizeof...(Rest) == 1) { - // destroy _head after completing the operation in case the arguments are references - // to objects owned by _head. - stdexec::set_value(static_cast<_rcvr_t&&>(sndrs._rcvr), static_cast(args)...); - _self->_head.__destroy(); - } else { - _self->_head.__destroy(); - _self->_tail.__construct(sndrs._head, sndrs._tail()); // potentially throwing - stdexec::start(_self->_tail.__get()._head.__get()._op); - } - } catch (...) { - stdexec::set_error(static_cast<_rcvr_t&&>(sndrs._rcvr), std::current_exception()); - } + STDEXEC_ATTRIBUTE((always_inline, host, device)) + void + set_value(Args&&... args) && noexcept { + _opstate->_set_value(Index(), static_cast(args)...); } template - void set_error(Error&& err) && noexcept { - stdexec::set_error( - static_cast<_rcvr_t&&>(_self->_head.__get()._sndrs->_rcvr), static_cast(err)); - _self->_head.__destroy(); + STDEXEC_ATTRIBUTE((host, device)) + void + set_error(Error&& err) && noexcept { + stdexec::set_error(static_cast(_opstate->_rcvr), static_cast(err)); } - void set_stopped() && noexcept { - stdexec::set_stopped(static_cast<_rcvr_t&&>(_self->_head.__get()._sndrs->_rcvr)); - _self->_head.__destroy(); + STDEXEC_ATTRIBUTE((host, device)) + void + set_stopped() && noexcept { + stdexec::set_stopped(static_cast(_opstate->_rcvr)); } - stdexec::env_of_t<_rcvr_t> get_env() const noexcept { - return stdexec::get_env(_self->_head.__get()._sndrs->_rcvr); + // TODO: use the predecessor's completion scheduler as the current scheduler here. + STDEXEC_ATTRIBUTE((host, device)) + stdexec::env_of_t + get_env() const noexcept { + return stdexec::get_env(_opstate->_rcvr); } }; - template - requires(sizeof...(Rest) > 0) - union _ops_variant { - explicit _ops_variant(Sndr& sndr, _ops_tuple& sndrs) { - auto connect_fn = [&] { - return stdexec::connect(static_cast(sndr), _rcvr{this}); - }; - _head.__construct(&sndrs, stdexec::__emplace_from{connect_fn}); - } - - ~_ops_variant() { - } - - struct _head_t { - _ops_tuple* _sndrs; - stdexec::connect_result_t> _op; - }; - - stdexec::__manual_lifetime<_head_t> _head; - stdexec::__manual_lifetime<_ops_variant> _tail; - }; - template struct _opstate; - template - struct _opstate { + template + struct _opstate { using operation_state_concept = stdexec::operation_state_t; - _ops_tuple _tupl; - _ops_variant _var; + // We will be connecting the first sender in the opstate constructor, so we don't need to + // store it in the opstate. The use of `stdexec::__ignore` causes the first sender to not + // be stored. + using _senders_tuple_t = stdexec::__tuple_for; + + template + using _rcvr_t = _rcvr, stdexec::__msize_t>; + + template + using _child_opstate_t = stdexec::connect_result_t>>; + + using _mk_child_ops_variant_fn = + stdexec::__mzip_with2, stdexec::__qq>; + + using _ops_variant_t = stdexec::__minvoke< + _mk_child_ops_variant_fn, + stdexec::__tuple_for, + stdexec::__make_indices>; + + template + STDEXEC_ATTRIBUTE((host, device)) + explicit _opstate(Rcvr&& rcvr, CvrefSndrs&& sndrs) + : _rcvr{static_cast(rcvr)} + , _sndrs{_senders_tuple_t::__convert_from(static_cast(sndrs))} + // move all but the first sender into the opstate. + , _ops{} { + // Below, it looks like we are using `sndrs` after it has been moved from. This is not the + // case. `sndrs` is moved into a tuple type that has `__ignore` for the first element. The + // result is that the first sender in `sndrs` is not moved from, but the rest are. + _ops.template emplace_from_at<0>( + stdexec::connect, + stdexec::__tup::get<0>(static_cast(sndrs)), + _rcvr_t<0>{this}); + } - explicit _opstate(Rcvr&& rcvr, Sndr sndr, Rest&&... rest) - : _tupl{static_cast(rest)..., static_cast(rcvr)} - , _var{sndr, _tupl} { + template + STDEXEC_ATTRIBUTE((host, device)) + void + _set_value(Index, [[maybe_unused]] Args&&... args) noexcept { + try { + constexpr size_t Idx = stdexec::__v + 1; + if constexpr (Idx == sizeof...(Sndrs) + 1) { + stdexec::set_value(static_cast(_rcvr), static_cast(args)...); + } else { + auto& sndr = stdexec::__tup::get(_sndrs); + auto& op = _ops.template emplace_from_at( + stdexec::connect, std::move(sndr), _rcvr_t{this}); + stdexec::start(op); + } + } catch (...) { + stdexec::set_error(static_cast(_rcvr), std::current_exception()); + } } - void start() & noexcept { - stdexec::start(_var._head.__get()._op); + STDEXEC_ATTRIBUTE((host, device)) + void + start() & noexcept { + stdexec::start(_ops.template get<0>()); } + + Rcvr _rcvr; + _senders_tuple_t _sndrs; + _ops_variant_t _ops; }; // The completions of the sequence sender are the error and stopped completions of all the @@ -183,7 +184,7 @@ namespace exec { }; template - struct _sndr : stdexec::__tuple_for { + struct _sndr { using sender_concept = stdexec::sender_t; template @@ -191,33 +192,39 @@ namespace exec { template requires(stdexec::__decay_copyable> && ...) - static auto get_completion_signatures(Self&&, Env&&...) -> _completions_t { + STDEXEC_ATTRIBUTE((host, device)) + static auto + get_completion_signatures(Self&&, Env&&...) -> _completions_t { return {}; } template - static auto connect(Self&& self, Rcvr rcvr) { - return self.apply( - [](Rcvr&& rcvr, auto, auto, Sndrs... sndrs) { - return _opstate{ - static_cast(rcvr), static_cast(sndrs)...}; - }, - static_cast(self), - static_cast(rcvr)); + STDEXEC_ATTRIBUTE((host, device)) + static auto + connect(Self&& self, Rcvr rcvr) { + return _opstate{static_cast(rcvr), static_cast(self)._sndrs}; } + + STDEXEC_ATTRIBUTE((no_unique_address, maybe_unused)) + sequence_t _tag; // + STDEXEC_ATTRIBUTE((no_unique_address, maybe_unused)) + stdexec::__ignore _ignore; // + stdexec::__tuple_for _sndrs; }; template - Sndr sequence_t::operator()(Sndr sndr) const { + STDEXEC_ATTRIBUTE((host, device)) + Sndr + sequence_t::operator()(Sndr sndr) const { return sndr; } template requires(sizeof...(Sndrs) > 1) && stdexec::__domain::__has_common_domain - _sndr sequence_t::operator()(Sndrs... sndrs) const { - return _sndr{ - {{}, {}, {static_cast(sndrs)}...} - }; + STDEXEC_ATTRIBUTE((host, device)) + _sndr + sequence_t::operator()(Sndrs... sndrs) const { + return _sndr{{}, {}, {{static_cast(sndrs)}...}}; } } // namespace _seq diff --git a/include/stdexec/__detail/__meta.hpp b/include/stdexec/__detail/__meta.hpp index e62bb86ec..75ff05933 100644 --- a/include/stdexec/__detail/__meta.hpp +++ b/include/stdexec/__detail/__meta.hpp @@ -655,6 +655,18 @@ namespace stdexec { using __f = __minvoke<_Fn, _As...>; }; + template + struct __muncurry_<__pack::__t<_Ns...> *> { + template + using __f = __minvoke<_Fn, __msize_t<_Ns>...>; + }; + + template