diff --git a/include/exec/__detail/__system_context_default_impl.hpp b/include/exec/__detail/__system_context_default_impl.hpp index 66b48191f..7ed109737 100644 --- a/include/exec/__detail/__system_context_default_impl.hpp +++ b/include/exec/__detail/__system_context_default_impl.hpp @@ -15,12 +15,17 @@ */ #pragma once -#include "__system_context_if.h" +#include "__system_context_replaceability_api.hpp" #include "stdexec/execution.hpp" #include "exec/static_thread_pool.hpp" namespace exec::__system_context_default_impl { using namespace stdexec::tags; + using system_context_replaceability::receiver; + using system_context_replaceability::bulk_item_receiver; + using system_context_replaceability::storage; + using system_context_replaceability::system_scheduler; + using system_context_replaceability::__system_context_replaceability; using __pool_scheduler_t = decltype(std::declval().get_scheduler()); @@ -28,32 +33,79 @@ namespace exec::__system_context_default_impl { template struct __operation; + /* + Storage needed for a backend operation-state: + + schedule: + - __recv::__r_ (receiver*) -- 8 + - __recv::__op_ (__operation*) -- 8 + - __operation::__inner_op_ (stdexec::connect_result_t<_Sender, __recv<_Sender>>) -- 56 (when connected with an empty receiver) + - __operation::__on_heap_ (bool) -- optimized away + --------------------- + Total: 72; extra 16 bytes compared to internal operation state. + + extra for bulk: + - __recv::__r_ (receiver*) -- 8 + - __recv::__op_ (__operation*) -- 8 + - __operation::__inner_op_ (stdexec::connect_result_t<_Sender, __recv<_Sender>>) -- 128 (when connected with an empty receiver & fun) + - __operation::__on_heap_ (bool) -- optimized away + - __bulk_functor::__r_ (bulk_item_receiver*) - 8 + --------------------- + Total: 152; extra 24 bytes compared to internal operation state. + + [*] sizes taken on an Apple M2 Pro arm64 arch. They may differ on other architectures, or with different implementations. + */ + template struct __recv { using receiver_concept = stdexec::receiver_t; - /// The callback to be called. - __exec_system_context_completion_callback_t __cb_; - - /// The data to be passed to the callback. - void* __data_; + //! The operation state on the frontend. + receiver* __r_; - /// The owning operation state, to be destructed when the operation completes. + //! The parent operation state that we will destroy when we complete. __operation<_Sender>* __op_; void set_value() noexcept { - __cb_(__data_, 0, nullptr); + auto __op = __op_; + auto __r = __r_; + __op->__destruct(); // destroys the operation, including `this`. + __r->set_value(); + // Note: when calling a completion signal, the parent operation might complete, making the + // static storage passed to this operation invalid. Thus, we need to ensure that we are not + // using the operation state after the completion signal. } void set_error(std::exception_ptr __ptr) noexcept { - __cb_(__data_, 2, *reinterpret_cast(&__ptr)); + auto __op = __op_; + auto __r = __r_; + __op->__destruct(); // destroys the operation, including `this`. + __r->set_error(__ptr); } void set_stopped() noexcept { - __cb_(__data_, 1, nullptr); + auto __op = __op_; + auto __r = __r_; + __op->__destruct(); // destroys the operation, including `this`. + __r->set_stopped(); } }; + /// Ensure that `__storage` is aligned to `__alignment`. Shrinks the storage, if needed, to match desired alignment. + inline storage __ensure_alignment(storage __storage, size_t __alignment) noexcept { + auto __pn = reinterpret_cast(__storage.__data); + if (__pn % __alignment == 0) { + return __storage; + } else if (__storage.__size < __alignment) { + return {nullptr, 0}; + } else { + auto __new_pn = (__pn + __alignment - 1) & ~(__alignment - 1); + return { + reinterpret_cast(__new_pn), + static_cast(__storage.__size - (__new_pn - __pn))}; + } + } + template struct __operation { /// The inner operation state, that results out of connecting the underlying sender with the receiver. @@ -62,19 +114,21 @@ namespace exec::__system_context_default_impl { bool __on_heap_; /// Try to construct the operation in the preallocated memory if it fits, otherwise allocate a new operation. - static __operation* __construct_maybe_alloc( - void* __preallocated, - size_t __psize, - _Sender __sndr, - __exec_system_context_completion_callback_t __cb, - void* __data) { - if (__preallocated == nullptr || __psize < sizeof(__operation)) { - return new __operation(std::move(__sndr), __cb, __data, true); + static __operation* + __construct_maybe_alloc(storage __storage, receiver* __completion, _Sender __sndr) { + __storage = __ensure_alignment(__storage, alignof(__operation)); + if (__storage.__data == nullptr || __storage.__size < sizeof(__operation)) { + return new __operation(std::move(__sndr), __completion, true); } else { - return new (__preallocated) __operation(std::move(__sndr), __cb, __data, false); + return new (__storage.__data) __operation(std::move(__sndr), __completion, false); } } + //! Starts the operation that will schedule work on the system scheduler. + void start() noexcept { + stdexec::start(__inner_op_); + } + /// Destructs the operation; frees any allocated memory. void __destruct() { if (__on_heap_) { @@ -85,40 +139,27 @@ namespace exec::__system_context_default_impl { } private: - __operation( - _Sender __sndr, - __exec_system_context_completion_callback_t __cb, - void* __data, - bool __on_heap) - : __inner_op_(stdexec::connect(std::move(__sndr), __recv<_Sender>{__cb, __data, this})) + __operation(_Sender __sndr, receiver* __completion, bool __on_heap) + : __inner_op_(stdexec::connect(std::move(__sndr), __recv<_Sender>{__completion, this})) , __on_heap_(__on_heap) { } }; - struct __system_scheduler_impl : __exec_system_scheduler_interface { - explicit __system_scheduler_impl(exec::static_thread_pool& __pool) - : __pool_scheduler_{__pool.get_scheduler()} { - __forward_progress_guarantee = 1; // parallel - __schedule_operation_size = sizeof(__schedule_operation_t), - __schedule_operation_alignment = alignof(__schedule_operation_t), - __schedule = __schedule_impl; - __destruct_schedule_operation = __destruct_schedule_operation_impl; - __bulk_schedule_operation_size = sizeof(__bulk_schedule_operation_t), - __bulk_schedule_operation_alignment = alignof(__bulk_schedule_operation_t), - __bulk_schedule = __bulk_schedule_impl; - __destruct_bulk_schedule_operation = __destruct_bulk_schedule_operation_impl; + struct __system_scheduler_impl : system_scheduler { + __system_scheduler_impl() + : __pool_scheduler_(__pool_.get_scheduler()) { } - private: - /// Scheduler from the underlying thread pool. + /// The underlying thread pool. + exec::static_thread_pool __pool_; __pool_scheduler_t __pool_scheduler_; + //! Functor called by the `bulk` operation; sends a `start` signal to the frontend. struct __bulk_functor { - __exec_system_context_bulk_item_callback_t __cb_item_; - void* __data_; + bulk_item_receiver* __r_; void operator()(unsigned long __idx) const noexcept { - __cb_item_(__data_, __idx); + __r_->start(static_cast(__idx)); } }; @@ -127,78 +168,36 @@ namespace exec::__system_context_default_impl { using __bulk_schedule_operation_t = __operation()), - std::declval(), + std::declval(), std::declval<__bulk_functor>()))>; - static void* __schedule_impl( - __exec_system_scheduler_interface* __self, - void* __preallocated, - uint32_t __psize, - __exec_system_context_completion_callback_t __cb, - void* __data) noexcept { - - auto __this = static_cast<__system_scheduler_impl*>(__self); - auto __sndr = stdexec::schedule(__this->__pool_scheduler_); - auto __os = __schedule_operation_t::__construct_maybe_alloc( - __preallocated, __psize, std::move(__sndr), __cb, __data); - stdexec::start(__os->__inner_op_); - return __os; - } - - static void __destruct_schedule_operation_impl( - __exec_system_scheduler_interface* /*__self*/, - void* __operation) noexcept { - auto __op = static_cast<__schedule_operation_t*>(__operation); - __op->__destruct(); - } - - static void* __bulk_schedule_impl( - __exec_system_scheduler_interface* __self, - void* __preallocated, - uint32_t __psize, - __exec_system_context_completion_callback_t __cb, - __exec_system_context_bulk_item_callback_t __cb_item, - void* __data, - unsigned long __size) noexcept { - - auto __this = static_cast<__system_scheduler_impl*>(__self); - auto __sndr = stdexec::bulk( - stdexec::schedule(__this->__pool_scheduler_), __size, __bulk_functor{__cb_item, __data}); - auto __os = __bulk_schedule_operation_t::__construct_maybe_alloc( - __preallocated, __psize, std::move(__sndr), __cb, __data); - stdexec::start(__os->__inner_op_); - return __os; - } - - static void __destruct_bulk_schedule_operation_impl( - __exec_system_scheduler_interface* /*__self*/, - void* __operation) noexcept { - auto __op = static_cast<__bulk_schedule_operation_t*>(__operation); - __op->__destruct(); - } - }; - - /// Default implementation of a system context, based on `static_thread_pool` - struct __system_context_impl : __exec_system_context_interface { - __system_context_impl() { - __version = 202402; - __get_scheduler = __get_scheduler_impl; + public: + void schedule(storage __storage, receiver* __r) noexcept override { + try { + auto __sndr = stdexec::schedule(__pool_scheduler_); + auto __os = + __schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr)); + __os->start(); + } catch (std::exception& __e) { + __r->set_error(std::current_exception()); + } } - private: - /// The underlying thread pool. - exec::static_thread_pool __pool_{}; - - /// The system scheduler implementation. - __system_scheduler_impl __scheduler_{__pool_}; - - static __exec_system_scheduler_interface* - __get_scheduler_impl(__exec_system_context_interface* __self) noexcept { - return &static_cast<__system_context_impl*>(__self)->__scheduler_; + void + bulk_schedule(uint32_t __size, storage __storage, bulk_item_receiver* __r) noexcept override { + try { + auto __sndr = + stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{__r}); + auto __os = + __bulk_schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr)); + __os->start(); + } catch (std::exception& __e) { + __r->set_error(std::current_exception()); + } } }; - /// Keeps track of the object implementing the system context interface. + /// Keeps track of the object implementing the system context interfaces. struct __instance_holder { /// Get the only instance of this class. @@ -208,22 +207,41 @@ namespace exec::__system_context_default_impl { } /// Get the currently selected system context object. - __exec_system_context_interface* __get_current_instance() const noexcept { + system_scheduler* __get_current_instance() const noexcept { return __current_instance_; } /// Allows changing the currently selected system context object; used for testing. - void __set_current_instance(__exec_system_context_interface* __instance) noexcept { + void __set_current_instance(system_scheduler* __instance) noexcept { __current_instance_ = __instance; } private: __instance_holder() { - static __system_context_impl __default_instance_; + static __system_scheduler_impl __default_instance_; __current_instance_ = &__default_instance_; } - __exec_system_context_interface* __current_instance_; + system_scheduler* __current_instance_; }; + struct __system_context_replaceability_impl : __system_context_replaceability { + //! Globally replaces the system scheduler backend. + //! This needs to be called within `main()` and before the system scheduler is accessed. + void __set_system_scheduler(system_scheduler* __backend) noexcept override { + __instance_holder::__singleton().__set_current_instance(__backend); + } + }; + + void* __default_query_system_context_interface(const __uuid& __id) noexcept { + if (__id == system_scheduler::__interface_identifier) { + return __instance_holder::__singleton().__get_current_instance(); + } else if (__id == __system_context_replaceability::__interface_identifier) { + static __system_context_replaceability_impl __impl; + return &__impl; + } + + return nullptr; + } + } // namespace exec::__system_context_default_impl diff --git a/include/exec/__detail/__system_context_default_impl_entry.hpp b/include/exec/__detail/__system_context_default_impl_entry.hpp index db325c6d4..9027e8274 100644 --- a/include/exec/__detail/__system_context_default_impl_entry.hpp +++ b/include/exec/__detail/__system_context_default_impl_entry.hpp @@ -27,19 +27,12 @@ STDEXEC_PRAGMA_PUSH() STDEXEC_PRAGMA_IGNORE_GNU("-Wattributes") // warning: inline function '[...]' declared weak /// Gets the default system context implementation. -extern "C" STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak)) -__exec_system_context_interface* - __get_exec_system_context_impl() { - return exec::__system_context_default_impl::__instance_holder::__singleton() - .__get_current_instance(); -} - -/// Sets the default system context implementation. -extern "C" STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak)) - void - __set_exec_system_context_impl(__exec_system_context_interface* __instance) { - return exec::__system_context_default_impl::__instance_holder::__singleton() - .__set_current_instance(__instance); +extern + STDEXEC_SYSTEM_CONTEXT_INLINE + STDEXEC_ATTRIBUTE((weak)) + void* + __query_system_context_interface(const __uuid& __id) noexcept { + return exec::__system_context_default_impl::__default_query_system_context_interface(__id); } STDEXEC_PRAGMA_POP() diff --git a/include/exec/__detail/__system_context_if.h b/include/exec/__detail/__system_context_if.h deleted file mode 100644 index 5151ae780..000000000 --- a/include/exec/__detail/__system_context_if.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (c) 2024 Lee Howes, Lucian Radu Teodorescu - * - * Licensed under the Apache License Version 2.0 with LLVM Exceptions - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://llvm.org/LICENSE.txt - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef STDEXEC_SYSTEM_CONTEXT_IF_H -#define STDEXEC_SYSTEM_CONTEXT_IF_H - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -struct __exec_system_context_interface; -struct __exec_system_scheduler_interface; - -/// Interface that allows interaction with the system context, allowing scheduling work on the system. -struct __exec_system_context_interface { - /// The supported version of the system context interface, in the form YYYYMM. - uint32_t __version; - - /// Returns an interface to the system scheduler. - struct __exec_system_scheduler_interface* (*__get_scheduler)( - struct __exec_system_context_interface* /*self*/); -}; - -/// Callback to be called by the scheduler when new work can start. -typedef void (*__exec_system_context_completion_callback_t)( - void*, // data pointer passed to scheduler - int, // completion type: 0 for normal completion, 1 for cancellation, 2 for exception - void*); // If completion type is 2, this is the exception pointer. - -/// Callback to be called by the scheduler for each bulk item. -typedef void (*__exec_system_context_bulk_item_callback_t)( - void*, // data pointer passed to scheduler - unsigned long); // the index of the work item that is starting - -struct __exec_system_scheduler_interface { - /// The forward progress guarantee of the scheduler. - /// - /// 0 == concurrent, 1 == parallel, 2 == weakly_parallel - uint32_t __forward_progress_guarantee; - - /// The size of the operation state object on the implementation side. - uint32_t __schedule_operation_size; - /// The alignment of the operation state object on the implementation side. - uint32_t __schedule_operation_alignment; - - /// Schedules new work on the system scheduler, calling `cb` with `data` when the work can start. - /// Returns an object that should be passed to __destruct_schedule_operation when the operation completes. - void* (*__schedule)( - struct __exec_system_scheduler_interface* /*self*/, - void* /*__preallocated*/, - uint32_t /*__psize*/, - __exec_system_context_completion_callback_t /*cb*/, - void* /*data*/); - - /// Destructs the operation state object. - void (*__destruct_schedule_operation)( - struct __exec_system_scheduler_interface* /*self*/, - void* /*operation*/); - - /// The size of the operation state object on the implementation side. - uint32_t __bulk_schedule_operation_size; - /// The alignment of the operation state object on the implementation side. - uint32_t __bulk_schedule_operation_alignment; - - /// Schedules new bulk work of size `size` on the system scheduler, calling `cb_item` with `data` - /// for indices in [0, `size`), and calling `cb` on general completion. - /// Returns the operation state object that should be passed to __destruct_bulk_schedule_operation. - void* (*__bulk_schedule)( - struct __exec_system_scheduler_interface* /*self*/, - void* /*__preallocated*/, - uint32_t /*__psize*/, - __exec_system_context_completion_callback_t /*cb*/, - __exec_system_context_bulk_item_callback_t /*cb_item*/, - void* /*data*/, - unsigned long /*size*/); - - /// Destructs the operation state object for a bulk_schedule. - void (*__destruct_bulk_schedule_operation)( - struct __exec_system_scheduler_interface* /*self*/, - void* /*operation*/); -}; - -#ifdef __cplusplus -} -#endif - - -#endif diff --git a/include/exec/__detail/__system_context_replaceability_api.hpp b/include/exec/__detail/__system_context_replaceability_api.hpp new file mode 100644 index 000000000..3beb5618c --- /dev/null +++ b/include/exec/__detail/__system_context_replaceability_api.hpp @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2025 Lucian Radu Teodorescu, Lewis Baker + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef STDEXEC_SYSTEM_CONTEXT_REPLACEABILITY_API_H +#define STDEXEC_SYSTEM_CONTEXT_REPLACEABILITY_API_H + +#include "stdexec/__detail/__execution_fwd.hpp" + +#include + +struct __uuid { + uint64_t __parts1; + uint64_t __parts2; + + friend bool operator==(__uuid, __uuid) noexcept = default; +}; + +/// Implementation-defined mechanism of querying a system context interface identified by `__id`. +extern void* __query_system_context_interface(const __uuid& __id) noexcept; + +namespace exec::system_context_replaceability { + + //! Helper for the `__queryable_interface` concept. + template <__uuid X> + using __check_constexpr_uuid = void; + + //! Concept for a queryable interface. Ensures that the interface has a `__interface_identifier` member. + template + concept __queryable_interface = + requires() { typename __check_constexpr_uuid<_T::__interface_identifier>; }; + + /// Query the system context for an interface of type `_Interface`. + template <__queryable_interface _Interface> + inline _Interface* query_system_context() { + return static_cast<_Interface*>( + __query_system_context_interface(_Interface::__interface_identifier)); + } + + /// Interface for completing a sender operation. + /// Backend will call frontend though this interface for completing the `schedule` and `schedule_bulk` operations. + struct receiver { + virtual ~receiver() = default; + + /// Called when the system scheduler completes successfully. + virtual void set_value() noexcept = 0; + /// Called when the system scheduler completes with an error. + virtual void set_error(std::exception_ptr) noexcept = 0; + /// Called when the system scheduler was stopped. + virtual void set_stopped() noexcept = 0; + }; + + /// Receiver for bulk sheduling operations. + struct bulk_item_receiver : receiver { + /// Called for each item of a bulk operation, possible on different threads. + virtual void start(uint32_t) noexcept = 0; + }; + + /// Describes a storage space. + /// Used to pass preallocated storage from the frontend to the backend. + struct storage { + void* __data; + uint32_t __size; + }; + + /// Interface for the system scheduler + struct system_scheduler { + static constexpr __uuid __interface_identifier{0x5ee9202498c4bd4f, 0xa1df2508ffcd9d7e}; + + virtual ~system_scheduler() = default; + + /// Schedule work on system scheduler, calling `__r` when done and using `__s` for preallocated memory. + virtual void schedule(storage __s, receiver* __r) noexcept = 0; + /// Schedule bulk work of size `__n` on system scheduler, calling `__r` for each item and then when done, and using `__s` for preallocated memory. + virtual void bulk_schedule(uint32_t __n, storage __s, bulk_item_receiver* __r) noexcept = 0; + }; + + /// Implementation-defined mechanism for replacing the system scheduler backend at run-time. + struct __system_context_replaceability { + static constexpr __uuid __interface_identifier{0xc008a3be3bb9284b, 0xb98edb3a740ee02c}; + + /// Globally replaces the system scheduler backend. + /// This needs to be called within `main()` and before the system scheduler is accessed. + virtual void __set_system_scheduler(system_scheduler*) noexcept = 0; + }; + +} // namespace exec::system_context_replaceability + +#endif \ No newline at end of file diff --git a/include/exec/system_context.hpp b/include/exec/system_context.hpp index 91ec1435e..6804cd8a3 100644 --- a/include/exec/system_context.hpp +++ b/include/exec/system_context.hpp @@ -16,16 +16,16 @@ #pragma once #include "stdexec/execution.hpp" -#include "__detail/__system_context_if.h" +#include "__detail/__system_context_replaceability_api.hpp" #ifndef STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE -# define STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE 80 +# define STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE 72 #endif #ifndef STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN # define STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN 8 #endif #ifndef STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE -# define STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE 168 +# define STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE 152 #endif #ifndef STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN # define STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN 8 @@ -33,51 +33,32 @@ // TODO: make these configurable by providing policy to the system context -/// Gets the default system context implementation. -extern "C" -__exec_system_context_interface* - __get_exec_system_context_impl(); - -/// Sets the default system context implementation. -extern "C" - void - __set_exec_system_context_impl(__exec_system_context_interface* __instance); - namespace exec { namespace __detail { using namespace stdexec::tags; - /// Transforms from a C API signal to the `set_xxx` completion signal. + /// Allows a frontend receiver of type `_Rcvr` to be passed to the backend. template - inline void __pass_to_receiver(int __completion_type, void* __exception, _Rcvr&& __rcvr) { - if (__completion_type == 0) { - stdexec::set_value(std::forward<_Rcvr>(__rcvr)); - } else if (__completion_type == 1) { - stdexec::set_stopped(std::forward<_Rcvr>(__rcvr)); - } else if (__completion_type == 2) { - stdexec::set_error( - std::forward<_Rcvr>(__rcvr), - std::move(*reinterpret_cast(&__exception))); + struct __receiver_adapter : system_context_replaceability::receiver { + explicit __receiver_adapter(_Rcvr&& __rcvr) + : __rcvr_{std::forward<_Rcvr>(__rcvr)} { } - } - /// Same as a above, but allows passing arguments to set_value. - template - inline void __pass_to_receiver_with_args( - int __completion_type, - void* __exception, - _Rcvr&& __rcvr, - _SetValueArgs&&... __setValueArgs) { - if (__completion_type == 0) { - stdexec::set_value(std::forward<_Rcvr>(__rcvr), std::move(__setValueArgs)...); - } else if (__completion_type == 1) { - stdexec::set_stopped(std::forward<_Rcvr>(__rcvr)); - } else if (__completion_type == 2) { - stdexec::set_error( - std::forward<_Rcvr>(__rcvr), - std::move(*reinterpret_cast(&__exception))); + void set_value() noexcept override { + stdexec::set_value(std::forward<_Rcvr>(__rcvr_)); } - } + + void set_error(std::exception_ptr __ex) noexcept override { + stdexec::set_error(std::forward<_Rcvr>(__rcvr_), std::move(__ex)); + } + + void set_stopped() noexcept override { + stdexec::set_stopped(std::forward<_Rcvr>(__rcvr_)); + } + + [[no_unique_address]] + _Rcvr __rcvr_; + }; /// The type large enough to store the data produced by a sender. template @@ -110,7 +91,7 @@ namespace exec { private: /// The actual implementation of the system context. - __exec_system_context_interface* __impl_{nullptr}; + system_context_replaceability::system_scheduler* __impl_{nullptr}; }; /// The execution domain of the system_scheduler, used for the purposes of customizing @@ -125,7 +106,7 @@ namespace exec { namespace __detail { template - auto __make_system_scheduler_from(T, __exec_system_scheduler_interface*) noexcept; + auto __make_system_scheduler_from(T, system_context_replaceability::system_scheduler*) noexcept; /// Describes the environment of this sender. struct __system_scheduler_env { @@ -136,23 +117,67 @@ namespace exec { } /// The underlying implementation of the scheduler we are using. - __exec_system_scheduler_interface* __scheduler_; + system_context_replaceability::system_scheduler* __scheduler_; + }; + + template + struct __aligned_storage { + alignas(_Align) unsigned char __data_[_Size]; + + system_context_replaceability::storage __as_storage() noexcept { + return {__data_, _Size}; + } + + template + _T& __as() noexcept { + static_assert(alignof(_T) <= _Align); + return *reinterpret_cast<_T*>(__data_); + } + + void* __as_ptr() noexcept { + return __data_; + } }; + /* + Storage needed for a frontend operation-state: + + schedule: + - __receiver_adapter::__vtable -- 8 + - __receiver_adapter::__rcvr_ (Rcvr) -- assuming 0 + - __system_op::__preallocated_ (__preallocated) -- 72 + --------------------- + Total: 80; extra 8 bytes compared to backend needs. + + for bulk: + - __bulk_state_base::__fun_ (_Fn) -- 0 (assuming empty function) + - __bulk_state_base::__rcvr_ (_Rcvr) -- 0 (assuming empty receiver) + - __forward_args_receiver::__vtable -- 8 + - __forward_args_receiver::__arguments_data_ (array of bytes) -- 8 (depending on previous sender) + - __bulk_state_base::__prepare_storage_for_backend (fun ptr) -- 8 + - __bulk_state::__preallocated_ (__preallocated_) -- 152 + - __previous_operation_state_ (__inner_op_state) -- 104 + - __bulk_intermediate_receiver::__state_ (__state_&) -- 8 + - __bulk_intermediate_receiver::__scheduler_ (system_scheduler*) -- 8 + - __bulk_intermediate_receiver::__size_ (_Size) -- 4 + --------------------- + Total: 176; extra 24 bytes compared to backend needs. + + [*] sizes taken on an Apple M2 Pro arm64 arch. They may differ on other architectures, or with different implementations. + */ + /// The operation state used to execute the work described by this sender. template struct __system_op { /// Constructs `this` from `__rcvr` and `__scheduler_impl`. - __system_op(_Rcvr&& __rcvr, __exec_system_scheduler_interface* __scheduler_impl) - : __rcvr_{std::move(__rcvr)} - , __scheduler_{__scheduler_impl} { + __system_op(_Rcvr&& __rcvr, system_context_replaceability::system_scheduler* __scheduler_impl) + : __rcvr_{std::forward<_Rcvr>(__rcvr)} { + // Before the operation starts, we store the scheduelr implementation in __preallocated_. + // After the operation starts, we don't need this pointer anymore, and the storage can be used by the backend + __preallocated_.__as() = __scheduler_impl; } - ~__system_op() { - if (__impl_os_ != nullptr) { - __scheduler_->__destruct_schedule_operation(__scheduler_, __impl_os_); - } - } + ~__system_op() = default; __system_op(const __system_op&) = delete; __system_op(__system_op&&) = delete; @@ -161,26 +186,20 @@ namespace exec { /// Starts the work stored in `this`. void start() & noexcept { - __impl_os_ = __scheduler_->__schedule( - __scheduler_, &__preallocated_, sizeof(__preallocated_), __cb, this); - } - - static void __cb(void* __data, int __completion_type, void* __exception) { - auto __self = static_cast<__system_op*>(__data); - __detail::__pass_to_receiver(__completion_type, __exception, std::move(__self->__rcvr_)); + auto* __scheduler_impl = + __preallocated_.__as(); + __scheduler_impl->schedule(__preallocated_.__as_storage(), &__rcvr_); } /// Object that receives completion from the work described by the sender. - _Rcvr __rcvr_; - /// The underlying implementation of the scheduler. - __exec_system_scheduler_interface* __scheduler_{nullptr}; - /// The operating state on the implementation side. - void* __impl_os_{nullptr}; + __receiver_adapter<_Rcvr> __rcvr_; /// Preallocated space for storing the operation state on the implementation size. - struct alignas(STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN) __preallocated { - char __data[STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE]; - } __preallocated_; + /// We also store here the backend interface for the scheduler before we actually start the operation. + __aligned_storage< + STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE, + STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN> + __preallocated_; }; } // namespace __detail @@ -196,7 +215,7 @@ namespace exec { stdexec::set_error_t(std::exception_ptr)>; /// Implementation detail. Constructs the sender to wrap `__impl`. - system_sender(__exec_system_scheduler_interface* __impl) + system_sender(system_context_replaceability::system_scheduler* __impl) : __scheduler_{__impl} { } @@ -214,7 +233,7 @@ namespace exec { private: /// The underlying implementation of the system scheduler. - __exec_system_scheduler_interface* __scheduler_{nullptr}; + system_context_replaceability::system_scheduler* __scheduler_; }; /// A scheduler that can add work to the system context. @@ -226,8 +245,8 @@ namespace exec { bool operator==(const system_scheduler&) const noexcept = default; /// Implementation detail. Constructs the scheduler to wrap `__impl`. - system_scheduler(__exec_system_scheduler_interface* __impl) - : __scheduler_(__impl) { + system_scheduler(system_context_replaceability::system_scheduler* __impl) + : __impl_(__impl) { } /// Returns the forward progress guarantee of `this`. @@ -241,7 +260,7 @@ namespace exec { /// Schedules new work, returning the sender that signals the start of the work. system_sender schedule() const noexcept { - return {__scheduler_}; + return {__impl_}; } private: @@ -249,87 +268,136 @@ namespace exec { friend class system_bulk_sender; /// The underlying implementation of the scheduler. - __exec_system_scheduler_interface* __scheduler_; + system_context_replaceability::system_scheduler* __impl_; }; + ////////////////////////////////////////////////////////////////////////////////////////////////// + // bulk + namespace __detail { template - auto __make_system_scheduler_from(T, __exec_system_scheduler_interface* p) noexcept { + auto + __make_system_scheduler_from(T, system_context_replaceability::system_scheduler* p) noexcept { return system_scheduler{p}; } - /// The state needed to execute the bulk sender created from system context. - template - struct __bulk_state { - /// The sender object that describes the work to be done. - system_bulk_sender<_Previous, _Size, _Fn> __snd_; + /// Helper that knows how to store the values sent by `_Previous` and pass them to bulk item calls or to the completion signal. + /// This represents the base class that abstracts the storage of the values sent by the previous sender. + /// Derived class will properly implement the receiver methods. + template + struct __forward_args_receiver : system_context_replaceability::bulk_item_receiver { + using __storage_t = __detail::__sender_data_t<_Previous>; + + /// Storage for the arguments received from the previous sender. + alignas(__storage_t) unsigned char __arguments_data_[sizeof(__storage_t)]; + }; + + /// Derived class that properly forwards the arguments received from `_Previous` to the receiver methods. + /// Uses the storage defined in the base class. No extra data is added here. + template + struct __typed_forward_args_receiver : __forward_args_receiver<_Previous> { + using __base_t = __forward_args_receiver<_Previous>; + using __rcvr_t = typename _BulkState::__rcvr_t; + + /// Stores `__as` in the base class storage, with the right types. + explicit __typed_forward_args_receiver(_As&&... __as) { + static_assert(sizeof(std::tuple<_As...>) <= sizeof(__base_t::__arguments_data_)); + new (__base_t::__arguments_data_) + std::tuple...>{std::move(__as)...}; + } + + /// Calls `set_value()` on the final receiver of the bulk operation, using the values from the previous sender. + void set_value() noexcept override { + auto __state = reinterpret_cast<_BulkState*>(this); + std::apply( + [&](auto&&... __args) { + stdexec::set_value( + std::forward<__rcvr_t>(__state->__rcvr_), std::forward<_As>(__args)...); + }, + *reinterpret_cast*>(__base_t::__arguments_data_)); + } + + /// Calls `set_error()` on the final receiver of the bulk operation, passing `__ex`. + void set_error(std::exception_ptr __ex) noexcept override { + auto __state = reinterpret_cast<_BulkState*>(this); + stdexec::set_error(std::forward<__rcvr_t>(__state->__rcvr_), std::move(__ex)); + } + + /// Calls `set_stopped()` on the final receiver of the bulk operation. + void set_stopped() noexcept override { + auto __state = reinterpret_cast<_BulkState*>(this); + stdexec::set_stopped(std::forward<__rcvr_t>(__state->__rcvr_)); + } + + /// Calls the bulk functor passing `__index` and the values from the previous sender. + void start(uint32_t __index) noexcept override { + auto __state = reinterpret_cast<_BulkState*>(this); + std::apply( + [&](auto&&... __args) { __state->__fun_(__index, __args...); }, + *reinterpret_cast*>(__base_t::__arguments_data_)); + } + }; + + /// The state needed to execute the bulk sender created from system context, minus the preallocates space. + /// The preallocated space is obtained by calling the `__prepare_storage_for_backend` function pointer. + template + struct __bulk_state_base { + using __rcvr_t = _Rcvr; + using __forward_args_helper_t = __forward_args_receiver<_Previous>; + + /// Storage for the arguments and the helper needed to pass the arguments from the previous bulk sender to the bulk functor and receiver. + /// Needs to be the first member, to easier the convertion between `__forward_args_helper_` and `this`. + alignas(__forward_args_helper_t) unsigned char __forward_args_helper_[sizeof( + __forward_args_helper_t)]{}; + + /// The function to be executed to perform the bulk work. + [[no_unique_address]] + _Fn __fun_; /// The receiver object that receives completion from the work described by the sender. + [[no_unique_address]] _Rcvr __rcvr_; - /// The operating state on the implementation side. - void* __impl_os_ = nullptr; - /// Storage for the arguments passed from the previous receiver to the function object of the bulk sender. - alignas(__detail::__sender_data_t<_Previous>) unsigned char __arguments_data_[sizeof( - __detail::__sender_data_t<_Previous>)]{}; - /// Preallocated space for storing the operation state on the implementation size. - struct alignas(STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN) __preallocated { - char __data[STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE]; - } __preallocated_{}; + /// Function that prepares the preallocated storage for calling the backend. + system_context_replaceability::storage (*__prepare_storage_for_backend)(__bulk_state_base*){ + nullptr}; - ~__bulk_state() { - __snd_.__scheduler_->__destruct_bulk_schedule_operation(__snd_.__scheduler_, __impl_os_); + __bulk_state_base(_Fn&& __fun, _Rcvr&& __rcvr) + : __fun_{std::move(__fun)} + , __rcvr_{std::move(__rcvr)} { + static_assert(offsetof(__bulk_state_base, __forward_args_helper_) == 0); } - }; // namespace __detail + }; - /// Receiver that is used in "bulk" to connect toe the input sender of the bulk operation. - template + /// Receiver that is used in "bulk" to connect to the input sender of the bulk operation. + template struct __bulk_intermediate_receiver { /// Declare that this is a `receiver`. using receiver_concept = stdexec::receiver_t; - /// The type of the object that holds relevant data for the entire bulk operation. - using __bulk_state_t = __bulk_state<_Previous, _Size, _Fn, _Rcvr>; - /// Object that holds the relevant data for the entire bulk operation. - __bulk_state_t& __state_; + _BulkState& __state_; + /// The underlying implementation of the scheduler we are using. + system_context_replaceability::system_scheduler* __scheduler_{nullptr}; + /// The size of the bulk operation. + _Size __size_; template void set_value(_As&&... __as) noexcept { - // Store the input data in the shared state, in the preallocated buffer. - static_assert(sizeof(std::tuple<_As...>) <= sizeof(__state_.__arguments_data_)); - new (&__state_.__arguments_data_) - std::tuple...>{std::move(__as)...}; + // Store the input data in the shared state. + using __typed_forward_args_receiver_t = + __typed_forward_args_receiver<_Previous, _BulkState, _As...>; + auto __r = new (&__state_.__forward_args_helper_) + __typed_forward_args_receiver_t(std::forward<_As>(__as)...); + + auto __scheduler = __scheduler_; + auto __size = static_cast(__size_); - // The function that needs to be applied to each item in the bulk operation. - auto __type_erased_item_fn = +[](void* __state_arg, unsigned long __idx) { - auto* __state = static_cast<__bulk_state_t*>(__state_arg); - std::apply( - [&](auto&&... __args) { __state->__snd_.__fun_(__idx, __args...); }, - *reinterpret_cast*>(&__state->__arguments_data_)); - }; - - // The function that needs to be applied when all items are complete. - auto __type_erased_cb_fn = - +[](void* __state_arg, int __completion_type, void* __exception) { - auto __state = static_cast<__bulk_state_t*>(__state_arg); - std::apply( - [&](auto&&... __args) { - __detail::__pass_to_receiver_with_args( - __completion_type, __exception, std::move(__state->__rcvr_), __args...); - }, - *reinterpret_cast*>(&__state->__arguments_data_)); - }; + auto __storage = __state_.__prepare_storage_for_backend(&__state_); + // This might destroy the `this` object. // Schedule the bulk work on the system scheduler. - __state_.__impl_os_ = __state_.__snd_.__scheduler_->__bulk_schedule( - __state_.__snd_.__scheduler_, // self - &__state_.__preallocated_, // preallocated - sizeof(__state_.__preallocated_), // psize - __type_erased_cb_fn, // cb - __type_erased_item_fn, // cb_item - &__state_, // data - static_cast(__state_.__snd_.__size_) // size - ); + // This will invoke `start` on our receiver multiple times, and then a completion signal (e.g., `set_value`). + __scheduler->bulk_schedule(__size, __storage, __r); } /// Invoked when the previous sender completes with "stopped" to stop the entire work. @@ -350,10 +418,35 @@ namespace exec { /// The operation state object for the system bulk sender. template - struct __system_bulk_op { - /// The inner operation state, which is the result of connecting the previous sender to the bulk intermediate receiver. - using __inner_op_state = stdexec:: - connect_result_t<_Previous, __bulk_intermediate_receiver<_Previous, _Size, _Fn, _Rcvr>>; + struct __system_bulk_op : __bulk_state_base<_Previous, _Fn, _Rcvr> { + + /// The type that holds the state of the bulk operation. + using __bulk_state_base_t = __bulk_state_base<_Previous, _Fn, _Rcvr>; + + /// The type of the receiver that will be connected to the previous sender. + using __intermediate_receiver_t = + __bulk_intermediate_receiver<__bulk_state_base_t, _Previous, _Size>; + + /// The type of inner operation state, which is the result of connecting the previous sender to the bulk intermediate receiver. + using __inner_op_state = stdexec::connect_result_t<_Previous, __intermediate_receiver_t>; + + static constexpr size_t _PreallocatedSize = + std::max(size_t(STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE), sizeof(__inner_op_state)); + static constexpr size_t _PreallocatedAlign = + std::max(size_t(STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN), alignof(__inner_op_state)); + + /// Preallocated space for storing the inner operation state, and then storage space for the backend call. + __aligned_storage<_PreallocatedSize, _PreallocatedAlign> __preallocated_; + + /// Destroys the inner operation state object, and returns the preallocated storage for it to be used by the backend. + static system_context_replaceability::storage + __prepare_storage_for_backend_impl(__bulk_state_base_t* __base) { + auto* __self = static_cast<__system_bulk_op*>(__base); + // We don't need anymore the storage for the previous operation state. + __self->__preallocated_.template __as<__inner_op_state>().~__inner_op_state(); + // Reuse the preallocated storage for the backend. + return __self->__preallocated_.__as_storage(); + } /// Constructs `this` from `__snd` and `__rcvr`, using the object returned by `__initFunc` to start the operation. /// @@ -364,8 +457,13 @@ namespace exec { system_bulk_sender<_Previous, _Size, _Fn>&& __snd, _Rcvr&& __rcvr, _InitF&& __initFunc) - : __state_{std::move(__snd), std::move(__rcvr)} - , __previous_operation_state_{__initFunc(*this)} { + : __bulk_state_base_t{std::move(__snd.__fun_), std::move(__rcvr)} { + // Write the function that prepares the storage for the backend. + __bulk_state_base_t::__prepare_storage_for_backend = + &__system_bulk_op::__prepare_storage_for_backend_impl; + + // Start using the preallocated buffer to store the inner operation state. + new (__preallocated_.__as_ptr()) __inner_op_state(__initFunc(*this)); } __system_bulk_op(const __system_bulk_op&) = delete; @@ -377,13 +475,8 @@ namespace exec { void start() & noexcept { // Start previous operation state. // Bulk operation will be started when the previous sender completes. - stdexec::start(__previous_operation_state_); + stdexec::start(__preallocated_.template __as<__inner_op_state>()); } - - /// The state of this bulk operation. - __bulk_state<_Previous, _Size, _Fn, _Rcvr> __state_; - /// The operation state object of the previous computation. - __inner_op_state __previous_operation_state_; }; } // namespace __detail @@ -395,10 +488,9 @@ namespace exec { using __completions_t = stdexec::transform_completion_signatures< // stdexec::__completion_signatures_of_t, _Env...>, // stdexec::completion_signatures>; + template - friend struct __detail::__bulk_state; - template - friend struct __detail::__bulk_intermediate_receiver; + friend struct __detail::__system_bulk_op; public: /// Marks this type as being a sender @@ -406,7 +498,7 @@ namespace exec { /// Constructs `this`. system_bulk_sender(system_scheduler __sched, _Previous __previous, _Size __size, _Fn&& __fun) - : __scheduler_{__sched.__scheduler_} + : __scheduler_{__sched.__impl_} , __previous_{std::move(__previous)} , __size_{std::move(__size)} , __fun_{std::move(__fun)} { @@ -421,11 +513,13 @@ namespace exec { template auto connect(_Rcvr __rcvr) && noexcept(stdexec::__nothrow_move_constructible<_Rcvr>) // -> __detail::__system_bulk_op<_Previous, _Size, _Fn, _Rcvr> { - using __receiver_t = __detail::__bulk_intermediate_receiver<_Previous, _Size, _Fn, _Rcvr>; - return {std::move(*this), std::move(__rcvr), [](auto& __op) { + using __res_t = __detail::__system_bulk_op<_Previous, _Size, _Fn, _Rcvr>; + using __receiver_t = typename __res_t::__intermediate_receiver_t; + return {std::move(*this), std::move(__rcvr), [this](auto& __op) { // Connect bulk input receiver with the previous operation and store in the operating state. return stdexec::connect( - std::move(__op.__state_.__snd_.__previous_), __receiver_t{__op.__state_}); + std::move(this->__previous_), + __receiver_t{__op, this->__scheduler_, this->__size_}); }}; } @@ -437,40 +531,35 @@ namespace exec { private: /// The underlying implementation of the scheduler we are using. - __exec_system_scheduler_interface* __scheduler_{nullptr}; + system_context_replaceability::system_scheduler* __scheduler_{nullptr}; /// The previous sender, the one that produces the input value for the bulk function. _Previous __previous_; /// The size of the bulk operation. _Size __size_; /// The function to be executed to perform the bulk work. + [[no_unique_address]] _Fn __fun_; }; inline system_context::system_context() { - __impl_ = __get_exec_system_context_impl(); - // TODO error handling + __impl_ = system_context_replaceability::query_system_context< + system_context_replaceability::system_scheduler>(); + if (!__impl_) { + throw std::runtime_error{"No system context implementation found"}; + } } inline system_scheduler system_context::get_scheduler() { - return system_scheduler{__impl_->__get_scheduler(__impl_)}; + return system_scheduler{__impl_}; } inline size_t system_context::max_concurrency() const noexcept { return std::thread::hardware_concurrency(); } - auto system_scheduler::query(stdexec::get_forward_progress_guarantee_t) const noexcept + inline auto system_scheduler::query(stdexec::get_forward_progress_guarantee_t) const noexcept -> stdexec::forward_progress_guarantee { - switch (__scheduler_->__forward_progress_guarantee) { - case 0: - return stdexec::forward_progress_guarantee::concurrent; - case 1: - return stdexec::forward_progress_guarantee::parallel; - case 2: - return stdexec::forward_progress_guarantee::weakly_parallel; - default: - return stdexec::forward_progress_guarantee::parallel; - } + return stdexec::forward_progress_guarantee::parallel; } struct __transform_system_bulk_sender { diff --git a/test/exec/test_system_context.cpp b/test/exec/test_system_context.cpp index d3d1a9bb0..8f4d6c366 100644 --- a/test/exec/test_system_context.cpp +++ b/test/exec/test_system_context.cpp @@ -237,67 +237,34 @@ TEST_CASE("simple bulk chaining on system context", "[types][system_scheduler]") CHECK(std::get<0>(res.value()) == pool_id); } -struct my_system_scheduler_impl : __exec_system_scheduler_interface { - my_system_scheduler_impl() - : base_{pool_} { - __forward_progress_guarantee = base_.__forward_progress_guarantee; - __schedule_operation_size = base_.__schedule_operation_size; - __schedule_operation_alignment = base_.__schedule_operation_alignment; - __destruct_schedule_operation = base_.__destruct_schedule_operation; - __bulk_schedule_operation_size = base_.__bulk_schedule_operation_size; - __bulk_schedule_operation_alignment = base_.__bulk_schedule_operation_alignment; - __bulk_schedule = base_.__bulk_schedule; - __destruct_bulk_schedule_operation = base_.__destruct_bulk_schedule_operation; - - __schedule = __schedule_impl; // have our own schedule implementation - } +struct my_system_scheduler_impl : exec::__system_context_default_impl::__system_scheduler_impl { + using base_t = exec::__system_context_default_impl::__system_scheduler_impl; + + my_system_scheduler_impl() = default; int num_schedules() const { return count_schedules_; } - private: - exec::static_thread_pool pool_; - exec::__system_context_default_impl::__system_scheduler_impl base_; - int count_schedules_ = 0; - - static void* __schedule_impl( - __exec_system_scheduler_interface* self_arg, - void* preallocated, - uint32_t psize, - __exec_system_context_completion_callback_t callback, - void* data) noexcept { - auto self = static_cast(self_arg); - // increment our counter. - self->count_schedules_++; - // delegate to the base implementation. - return self->base_.__schedule(&self->base_, preallocated, psize, callback, data); - } -}; - -struct my_system_context_impl : __exec_system_context_interface { - my_system_context_impl() { - __version = 202402; - __get_scheduler = __get_scheduler_impl; + void schedule( + exec::__system_context_default_impl::storage __s, + exec::__system_context_default_impl::receiver* __r) noexcept override { + count_schedules_++; + base_t::schedule(__s, __r); } - int num_schedules() const { - return scheduler_.num_schedules(); - } private: - my_system_scheduler_impl scheduler_{}; - - static __exec_system_scheduler_interface* - __get_scheduler_impl(__exec_system_context_interface* __self) noexcept { - return &static_cast(__self)->scheduler_; - } + int count_schedules_ = 0; }; TEST_CASE("can change the implementation of system context", "[types][system_scheduler]") { + using namespace exec::system_context_replaceability; + // Not to spec. - my_system_context_impl ctx_impl; - __set_exec_system_context_impl(&ctx_impl); + my_system_scheduler_impl my_scheduler; + auto scr = query_system_context<__system_context_replaceability>(); + scr->__set_system_scheduler(&my_scheduler); std::thread::id this_id = std::this_thread::get_id(); std::thread::id pool_id{}; @@ -306,9 +273,9 @@ TEST_CASE("can change the implementation of system context", "[types][system_sch auto snd = ex::then(ex::schedule(sched), [&] { pool_id = std::this_thread::get_id(); }); - REQUIRE(ctx_impl.num_schedules() == 0); + REQUIRE(my_scheduler.num_schedules() == 0); ex::sync_wait(std::move(snd)); - REQUIRE(ctx_impl.num_schedules() == 1); + REQUIRE(my_scheduler.num_schedules() == 1); REQUIRE(pool_id != std::thread::id{}); REQUIRE(this_id != pool_id); diff --git a/test/exec/test_system_context_replaceability.cpp b/test/exec/test_system_context_replaceability.cpp index 82d0e4cbd..8f6c7335b 100644 --- a/test/exec/test_system_context_replaceability.cpp +++ b/test/exec/test_system_context_replaceability.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include namespace ex = stdexec; @@ -26,61 +25,34 @@ namespace { static int count_schedules = 0; - struct my_system_scheduler_impl : __exec_system_scheduler_interface { - my_system_scheduler_impl() - : base_{pool_} { - __forward_progress_guarantee = base_.__forward_progress_guarantee; - __schedule_operation_size = base_.__schedule_operation_size; - __schedule_operation_alignment = base_.__schedule_operation_alignment; - __destruct_schedule_operation = base_.__destruct_schedule_operation; - __bulk_schedule_operation_size = base_.__bulk_schedule_operation_size; - __bulk_schedule_operation_alignment = base_.__bulk_schedule_operation_alignment; - __bulk_schedule = base_.__bulk_schedule; - __destruct_bulk_schedule_operation = base_.__destruct_bulk_schedule_operation; + struct my_system_scheduler_impl : exec::__system_context_default_impl::__system_scheduler_impl { + using base_t = exec::__system_context_default_impl::__system_scheduler_impl; - __schedule = __schedule_impl; // have our own schedule implementation - } - - private: - exec::static_thread_pool pool_; - exec::__system_context_default_impl::__system_scheduler_impl base_; + my_system_scheduler_impl() = default; - static void* __schedule_impl( - __exec_system_scheduler_interface* self_arg, - void* preallocated, - uint32_t psize, - __exec_system_context_completion_callback_t callback, - void* data) noexcept { - printf("Using my_system_scheduler_impl::__schedule_impl\n"); - auto self = static_cast(self_arg); - // increment our counter. + void schedule( + exec::__system_context_default_impl::storage __s, + exec::__system_context_default_impl::receiver* __r) noexcept override { count_schedules++; - // delegate to the base implementation. - return self->base_.__schedule(&self->base_, preallocated, psize, callback, data); + base_t::schedule(__s, __r); } }; - struct my_system_context_impl : __exec_system_context_interface { - my_system_context_impl() { - __version = 202402; - __get_scheduler = __get_scheduler_impl; + void* my_query_system_context_interface(__uuid id) noexcept { + if (id == exec::__system_context_default_impl::system_scheduler::__interface_identifier) { + static my_system_scheduler_impl instance; + return &instance; } + return nullptr; + } - private: - my_system_scheduler_impl scheduler_{}; - - static __exec_system_scheduler_interface* - __get_scheduler_impl(__exec_system_context_interface* __self) noexcept { - return &static_cast(__self)->scheduler_; - } - }; } // namespace // Should replace the function defined in __system_context_default_impl.hpp -extern "C" STDEXEC_ATTRIBUTE((weak)) __exec_system_context_interface* __get_exec_system_context_impl() { - printf("Using my_system_context_impl\n"); - static my_system_context_impl instance; - return &instance; +extern STDEXEC_ATTRIBUTE((weak)) + void* + __query_system_context_interface(const __uuid& id) noexcept { + return my_query_system_context_interface(id); } TEST_CASE(