diff --git a/include/exec/__detail/__system_context_default_impl.hpp b/include/exec/__detail/__system_context_default_impl.hpp index 66b48191f..5e31e3443 100644 --- a/include/exec/__detail/__system_context_default_impl.hpp +++ b/include/exec/__detail/__system_context_default_impl.hpp @@ -15,12 +15,17 @@ */ #pragma once -#include "__system_context_if.h" +#include "__system_context_replaceability_api.hpp" #include "stdexec/execution.hpp" #include "exec/static_thread_pool.hpp" namespace exec::__system_context_default_impl { using namespace stdexec::tags; + using system_context_replaceability::receiver; + using system_context_replaceability::bulk_item_receiver; + using system_context_replaceability::storage; + using system_context_replaceability::system_scheduler; + using system_context_replaceability::__system_context_replaceability; using __pool_scheduler_t = decltype(std::declval().get_scheduler()); @@ -32,28 +37,46 @@ namespace exec::__system_context_default_impl { struct __recv { using receiver_concept = stdexec::receiver_t; - /// The callback to be called. - __exec_system_context_completion_callback_t __cb_; + //! The operation state on the frontend. + receiver* __r_; - /// The data to be passed to the callback. - void* __data_; - - /// The owning operation state, to be destructed when the operation completes. + //! The parent operation state that we will destroy when we complete. __operation<_Sender>* __op_; void set_value() noexcept { - __cb_(__data_, 0, nullptr); + auto __op = __op_; + __r_->set_value(); + __op->__destruct(); } void set_error(std::exception_ptr __ptr) noexcept { - __cb_(__data_, 2, *reinterpret_cast(&__ptr)); + auto __op = __op_; + __r_->set_error(__ptr); + __op->__destruct(); } void set_stopped() noexcept { - __cb_(__data_, 1, nullptr); + auto __op = __op_; + __r_->set_stopped(); + __op->__destruct(); } }; + /// Ensure that `__storage` is aligned to `__alignment`. Shrinks the storage, if needed, to match desired alignment. + inline storage __ensure_alignment(storage __storage, size_t __alignment) noexcept { + auto __pn = reinterpret_cast(__storage.__data); + if (__pn % __alignment == 0) { + return __storage; + } else if (__storage.__size < __alignment) { + return {nullptr, 0}; + } else { + auto __new_pn = (__pn + __alignment - 1) & ~(__alignment - 1); + return { + reinterpret_cast(__new_pn), + static_cast(__storage.__size - (__new_pn - __pn))}; + } + } + template struct __operation { /// The inner operation state, that results out of connecting the underlying sender with the receiver. @@ -62,19 +85,21 @@ namespace exec::__system_context_default_impl { bool __on_heap_; /// Try to construct the operation in the preallocated memory if it fits, otherwise allocate a new operation. - static __operation* __construct_maybe_alloc( - void* __preallocated, - size_t __psize, - _Sender __sndr, - __exec_system_context_completion_callback_t __cb, - void* __data) { - if (__preallocated == nullptr || __psize < sizeof(__operation)) { - return new __operation(std::move(__sndr), __cb, __data, true); + static __operation* + __construct_maybe_alloc(storage __storage, receiver* __completion, _Sender __sndr) { + __storage = __ensure_alignment(__storage, alignof(__operation)); + if (__storage.__data == nullptr || __storage.__size < sizeof(__operation)) { + return new __operation(std::move(__sndr), __completion, true); } else { - return new (__preallocated) __operation(std::move(__sndr), __cb, __data, false); + return new (__storage.__data) __operation(std::move(__sndr), __completion, false); } } + //! Starts the operation that will schedule work on the system scheduler. + void start() noexcept { + stdexec::start(__inner_op_); + } + /// Destructs the operation; frees any allocated memory. void __destruct() { if (__on_heap_) { @@ -85,40 +110,27 @@ namespace exec::__system_context_default_impl { } private: - __operation( - _Sender __sndr, - __exec_system_context_completion_callback_t __cb, - void* __data, - bool __on_heap) - : __inner_op_(stdexec::connect(std::move(__sndr), __recv<_Sender>{__cb, __data, this})) + __operation(_Sender __sndr, receiver* __completion, bool __on_heap) + : __inner_op_(stdexec::connect(std::move(__sndr), __recv<_Sender>{__completion, this})) , __on_heap_(__on_heap) { } }; - struct __system_scheduler_impl : __exec_system_scheduler_interface { - explicit __system_scheduler_impl(exec::static_thread_pool& __pool) - : __pool_scheduler_{__pool.get_scheduler()} { - __forward_progress_guarantee = 1; // parallel - __schedule_operation_size = sizeof(__schedule_operation_t), - __schedule_operation_alignment = alignof(__schedule_operation_t), - __schedule = __schedule_impl; - __destruct_schedule_operation = __destruct_schedule_operation_impl; - __bulk_schedule_operation_size = sizeof(__bulk_schedule_operation_t), - __bulk_schedule_operation_alignment = alignof(__bulk_schedule_operation_t), - __bulk_schedule = __bulk_schedule_impl; - __destruct_bulk_schedule_operation = __destruct_bulk_schedule_operation_impl; + struct __system_scheduler_impl : system_scheduler { + __system_scheduler_impl() + : __pool_scheduler_(__pool_.get_scheduler()) { } - private: - /// Scheduler from the underlying thread pool. + /// The underlying thread pool. + exec::static_thread_pool __pool_; __pool_scheduler_t __pool_scheduler_; + //! Functor called by the `bulk` operation; sends a `start` signal to the frontend. struct __bulk_functor { - __exec_system_context_bulk_item_callback_t __cb_item_; - void* __data_; + bulk_item_receiver* __item_r_; void operator()(unsigned long __idx) const noexcept { - __cb_item_(__data_, __idx); + __item_r_->start(static_cast(__idx)); } }; @@ -127,78 +139,48 @@ namespace exec::__system_context_default_impl { using __bulk_schedule_operation_t = __operation()), - std::declval(), + std::declval(), std::declval<__bulk_functor>()))>; - static void* __schedule_impl( - __exec_system_scheduler_interface* __self, - void* __preallocated, - uint32_t __psize, - __exec_system_context_completion_callback_t __cb, - void* __data) noexcept { - - auto __this = static_cast<__system_scheduler_impl*>(__self); - auto __sndr = stdexec::schedule(__this->__pool_scheduler_); - auto __os = __schedule_operation_t::__construct_maybe_alloc( - __preallocated, __psize, std::move(__sndr), __cb, __data); - stdexec::start(__os->__inner_op_); - return __os; - } - - static void __destruct_schedule_operation_impl( - __exec_system_scheduler_interface* /*__self*/, - void* __operation) noexcept { - auto __op = static_cast<__schedule_operation_t*>(__operation); - __op->__destruct(); + public: + void schedule(storage __storage, receiver* __r) noexcept override { + try { + auto __sndr = stdexec::schedule(__pool_scheduler_); + auto __os = + __schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr)); + __os->start(); + } catch (std::exception& __e) { + __r->set_error(std::current_exception()); + } } - static void* __bulk_schedule_impl( - __exec_system_scheduler_interface* __self, - void* __preallocated, - uint32_t __psize, - __exec_system_context_completion_callback_t __cb, - __exec_system_context_bulk_item_callback_t __cb_item, - void* __data, - unsigned long __size) noexcept { - - auto __this = static_cast<__system_scheduler_impl*>(__self); - auto __sndr = stdexec::bulk( - stdexec::schedule(__this->__pool_scheduler_), __size, __bulk_functor{__cb_item, __data}); - auto __os = __bulk_schedule_operation_t::__construct_maybe_alloc( - __preallocated, __psize, std::move(__sndr), __cb, __data); - stdexec::start(__os->__inner_op_); - return __os; - } - - static void __destruct_bulk_schedule_operation_impl( - __exec_system_scheduler_interface* /*__self*/, - void* __operation) noexcept { - auto __op = static_cast<__bulk_schedule_operation_t*>(__operation); - __op->__destruct(); + void bulk_schedule( + uint32_t __size, + storage __storage, + receiver* __r, + bulk_item_receiver* __item_r) noexcept override { + try { + auto __sndr = + stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{__item_r}); + auto __os = + __bulk_schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr)); + __os->start(); + } catch (std::exception& __e) { + __r->set_error(std::current_exception()); + } } - }; - /// Default implementation of a system context, based on `static_thread_pool` - struct __system_context_impl : __exec_system_context_interface { - __system_context_impl() { - __version = 202402; - __get_scheduler = __get_scheduler_impl; + uint32_t max_concurrency() noexcept override { + uint32_t n = std::thread::hardware_concurrency(); + return n == 0 ? 1 : n; } - private: - /// The underlying thread pool. - exec::static_thread_pool __pool_{}; - - /// The system scheduler implementation. - __system_scheduler_impl __scheduler_{__pool_}; - - static __exec_system_scheduler_interface* - __get_scheduler_impl(__exec_system_context_interface* __self) noexcept { - return &static_cast<__system_context_impl*>(__self)->__scheduler_; + stdexec::forward_progress_guarantee get_forward_progress_guarantee() noexcept override { + return stdexec::forward_progress_guarantee::parallel; } }; - /// Keeps track of the object implementing the system context interface. + /// Keeps track of the object implementing the system context interfaces. struct __instance_holder { /// Get the only instance of this class. @@ -208,22 +190,41 @@ namespace exec::__system_context_default_impl { } /// Get the currently selected system context object. - __exec_system_context_interface* __get_current_instance() const noexcept { + system_scheduler* __get_current_instance() const noexcept { return __current_instance_; } /// Allows changing the currently selected system context object; used for testing. - void __set_current_instance(__exec_system_context_interface* __instance) noexcept { + void __set_current_instance(system_scheduler* __instance) noexcept { __current_instance_ = __instance; } private: __instance_holder() { - static __system_context_impl __default_instance_; + static __system_scheduler_impl __default_instance_; __current_instance_ = &__default_instance_; } - __exec_system_context_interface* __current_instance_; + system_scheduler* __current_instance_; }; + struct __system_context_replaceability_impl : __system_context_replaceability { + //! Globally replaces the system scheduler backend. + //! This needs to be called within `main()` and before the system scheduler is accessed. + void __set_system_scheduler(system_scheduler* __backend) noexcept override { + __instance_holder::__singleton().__set_current_instance(__backend); + } + }; + + void* __default_query_system_context_interface(std::type_index __id) noexcept { + if (__id == typeid(system_scheduler)) { + return __instance_holder::__singleton().__get_current_instance(); + } else if (__id == typeid(__system_context_replaceability)) { + static __system_context_replaceability_impl __impl; + return &__impl; + } + + return nullptr; + } + } // namespace exec::__system_context_default_impl diff --git a/include/exec/__detail/__system_context_default_impl_entry.hpp b/include/exec/__detail/__system_context_default_impl_entry.hpp index db325c6d4..8fc9f9d6a 100644 --- a/include/exec/__detail/__system_context_default_impl_entry.hpp +++ b/include/exec/__detail/__system_context_default_impl_entry.hpp @@ -27,19 +27,12 @@ STDEXEC_PRAGMA_PUSH() STDEXEC_PRAGMA_IGNORE_GNU("-Wattributes") // warning: inline function '[...]' declared weak /// Gets the default system context implementation. -extern "C" STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak)) -__exec_system_context_interface* - __get_exec_system_context_impl() { - return exec::__system_context_default_impl::__instance_holder::__singleton() - .__get_current_instance(); -} - -/// Sets the default system context implementation. -extern "C" STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak)) - void - __set_exec_system_context_impl(__exec_system_context_interface* __instance) { - return exec::__system_context_default_impl::__instance_holder::__singleton() - .__set_current_instance(__instance); +extern + STDEXEC_SYSTEM_CONTEXT_INLINE + STDEXEC_ATTRIBUTE((weak)) + void* + __query_system_context_interface(std::type_index __id) noexcept { + return exec::__system_context_default_impl::__default_query_system_context_interface(__id); } STDEXEC_PRAGMA_POP() diff --git a/include/exec/__detail/__system_context_if.h b/include/exec/__detail/__system_context_if.h deleted file mode 100644 index 5151ae780..000000000 --- a/include/exec/__detail/__system_context_if.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright (c) 2024 Lee Howes, Lucian Radu Teodorescu - * - * Licensed under the Apache License Version 2.0 with LLVM Exceptions - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * https://llvm.org/LICENSE.txt - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef STDEXEC_SYSTEM_CONTEXT_IF_H -#define STDEXEC_SYSTEM_CONTEXT_IF_H - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -struct __exec_system_context_interface; -struct __exec_system_scheduler_interface; - -/// Interface that allows interaction with the system context, allowing scheduling work on the system. -struct __exec_system_context_interface { - /// The supported version of the system context interface, in the form YYYYMM. - uint32_t __version; - - /// Returns an interface to the system scheduler. - struct __exec_system_scheduler_interface* (*__get_scheduler)( - struct __exec_system_context_interface* /*self*/); -}; - -/// Callback to be called by the scheduler when new work can start. -typedef void (*__exec_system_context_completion_callback_t)( - void*, // data pointer passed to scheduler - int, // completion type: 0 for normal completion, 1 for cancellation, 2 for exception - void*); // If completion type is 2, this is the exception pointer. - -/// Callback to be called by the scheduler for each bulk item. -typedef void (*__exec_system_context_bulk_item_callback_t)( - void*, // data pointer passed to scheduler - unsigned long); // the index of the work item that is starting - -struct __exec_system_scheduler_interface { - /// The forward progress guarantee of the scheduler. - /// - /// 0 == concurrent, 1 == parallel, 2 == weakly_parallel - uint32_t __forward_progress_guarantee; - - /// The size of the operation state object on the implementation side. - uint32_t __schedule_operation_size; - /// The alignment of the operation state object on the implementation side. - uint32_t __schedule_operation_alignment; - - /// Schedules new work on the system scheduler, calling `cb` with `data` when the work can start. - /// Returns an object that should be passed to __destruct_schedule_operation when the operation completes. - void* (*__schedule)( - struct __exec_system_scheduler_interface* /*self*/, - void* /*__preallocated*/, - uint32_t /*__psize*/, - __exec_system_context_completion_callback_t /*cb*/, - void* /*data*/); - - /// Destructs the operation state object. - void (*__destruct_schedule_operation)( - struct __exec_system_scheduler_interface* /*self*/, - void* /*operation*/); - - /// The size of the operation state object on the implementation side. - uint32_t __bulk_schedule_operation_size; - /// The alignment of the operation state object on the implementation side. - uint32_t __bulk_schedule_operation_alignment; - - /// Schedules new bulk work of size `size` on the system scheduler, calling `cb_item` with `data` - /// for indices in [0, `size`), and calling `cb` on general completion. - /// Returns the operation state object that should be passed to __destruct_bulk_schedule_operation. - void* (*__bulk_schedule)( - struct __exec_system_scheduler_interface* /*self*/, - void* /*__preallocated*/, - uint32_t /*__psize*/, - __exec_system_context_completion_callback_t /*cb*/, - __exec_system_context_bulk_item_callback_t /*cb_item*/, - void* /*data*/, - unsigned long /*size*/); - - /// Destructs the operation state object for a bulk_schedule. - void (*__destruct_bulk_schedule_operation)( - struct __exec_system_scheduler_interface* /*self*/, - void* /*operation*/); -}; - -#ifdef __cplusplus -} -#endif - - -#endif diff --git a/include/exec/__detail/__system_context_replaceability_api.hpp b/include/exec/__detail/__system_context_replaceability_api.hpp new file mode 100644 index 000000000..263729616 --- /dev/null +++ b/include/exec/__detail/__system_context_replaceability_api.hpp @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2025 Lucian Radu Teodorescu, Lewis Baker + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef STDEXEC_SYSTEM_CONTEXT_REPLACEABILITY_API_H +#define STDEXEC_SYSTEM_CONTEXT_REPLACEABILITY_API_H + +#include "stdexec/__detail/__execution_fwd.hpp" + +#include + + +/// Implementation-defined mechanism of querying a system context interface of type `id`. +extern void* __query_system_context_interface(std::type_index id) noexcept; + +namespace exec::system_context_replaceability { + /// Query the system context for an interface of type `_Interface`. + template + inline _Interface* query_system_context() { + return static_cast<_Interface*>(__query_system_context_interface(typeid(_Interface))); + } + + /// Interface for completing a sender operation. + /// Backend will call frontend though this interface for completing the `schedule` and `schedule_bulk` operations. + struct receiver { + virtual ~receiver() = default; + + /// Called when the system scheduler completes successfully. + virtual void set_value() noexcept = 0; + /// Called when the system scheduler completes with an error. + virtual void set_error(std::exception_ptr) noexcept = 0; + /// Called when the system scheduler was stopped. + virtual void set_stopped() noexcept = 0; + }; + + /// Interface for receiving bulk item signals. + struct bulk_item_receiver { + virtual ~bulk_item_receiver() = default; + + /// Called for each item of a bulk operation, possible on different threads. + virtual void start(uint32_t) noexcept = 0; + }; + + /// Describes a storage space. + /// Used to pass preallocated storage from the frontend to the backend. + struct storage { + void* __data; + uint32_t __size; + }; + + /// Interface for the system scheduler + struct system_scheduler { + virtual ~system_scheduler() = default; + + /// Schedule work on system scheduler, calling `__r` when done and using `__s` for preallocated memory. + virtual void schedule(storage __s, receiver* __r) noexcept = 0; + /// Schedule bulk work of size `__n` on system scheduler, calling `__br` for each item, calling `__r` when done and using `__s` for preallocated memory. + virtual void + bulk_schedule(uint32_t __n, storage __s, receiver* __r, bulk_item_receiver* __br) noexcept = 0; + /// Returns the maximum concurrency supported by the system scheduler. + virtual uint32_t max_concurrency() noexcept = 0; + /// Get the forward progress guarantee promised by the system scheduler. + virtual stdexec::forward_progress_guarantee get_forward_progress_guarantee() noexcept = 0; + }; + + /// Implementation-defined mechanism for replacing the system scheduler backend at run-time. + struct __system_context_replaceability { + /// Globally replaces the system scheduler backend. + /// This needs to be called within `main()` and before the system scheduler is accessed. + virtual void __set_system_scheduler(system_scheduler*) noexcept = 0; + }; + +} // namespace exec::system_context_replaceability + +#endif \ No newline at end of file diff --git a/include/exec/system_context.hpp b/include/exec/system_context.hpp index 91ec1435e..dc0a6a981 100644 --- a/include/exec/system_context.hpp +++ b/include/exec/system_context.hpp @@ -16,7 +16,7 @@ #pragma once #include "stdexec/execution.hpp" -#include "__detail/__system_context_if.h" +#include "__detail/__system_context_replaceability_api.hpp" #ifndef STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE # define STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE 80 @@ -33,51 +33,31 @@ // TODO: make these configurable by providing policy to the system context -/// Gets the default system context implementation. -extern "C" -__exec_system_context_interface* - __get_exec_system_context_impl(); - -/// Sets the default system context implementation. -extern "C" - void - __set_exec_system_context_impl(__exec_system_context_interface* __instance); - namespace exec { namespace __detail { using namespace stdexec::tags; - /// Transforms from a C API signal to the `set_xxx` completion signal. + /// Allows a frontend receiver of type `_Rcvr` to be passed to the backend. template - inline void __pass_to_receiver(int __completion_type, void* __exception, _Rcvr&& __rcvr) { - if (__completion_type == 0) { - stdexec::set_value(std::forward<_Rcvr>(__rcvr)); - } else if (__completion_type == 1) { - stdexec::set_stopped(std::forward<_Rcvr>(__rcvr)); - } else if (__completion_type == 2) { - stdexec::set_error( - std::forward<_Rcvr>(__rcvr), - std::move(*reinterpret_cast(&__exception))); + struct __receiver_adapter : system_context_replaceability::receiver { + explicit __receiver_adapter(_Rcvr&& __rcvr) + : __rcvr_{std::forward<_Rcvr>(__rcvr)} { } - } - /// Same as a above, but allows passing arguments to set_value. - template - inline void __pass_to_receiver_with_args( - int __completion_type, - void* __exception, - _Rcvr&& __rcvr, - _SetValueArgs&&... __setValueArgs) { - if (__completion_type == 0) { - stdexec::set_value(std::forward<_Rcvr>(__rcvr), std::move(__setValueArgs)...); - } else if (__completion_type == 1) { - stdexec::set_stopped(std::forward<_Rcvr>(__rcvr)); - } else if (__completion_type == 2) { - stdexec::set_error( - std::forward<_Rcvr>(__rcvr), - std::move(*reinterpret_cast(&__exception))); + void set_value() noexcept override { + stdexec::set_value(std::forward<_Rcvr>(__rcvr_)); } - } + + void set_error(std::exception_ptr __ex) noexcept override { + stdexec::set_error(std::forward<_Rcvr>(__rcvr_), std::move(__ex)); + } + + void set_stopped() noexcept override { + stdexec::set_stopped(std::forward<_Rcvr>(__rcvr_)); + } + + _Rcvr __rcvr_; + }; /// The type large enough to store the data produced by a sender. template @@ -110,7 +90,7 @@ namespace exec { private: /// The actual implementation of the system context. - __exec_system_context_interface* __impl_{nullptr}; + system_context_replaceability::system_scheduler* __impl_{nullptr}; }; /// The execution domain of the system_scheduler, used for the purposes of customizing @@ -125,7 +105,7 @@ namespace exec { namespace __detail { template - auto __make_system_scheduler_from(T, __exec_system_scheduler_interface*) noexcept; + auto __make_system_scheduler_from(T, system_context_replaceability::system_scheduler*) noexcept; /// Describes the environment of this sender. struct __system_scheduler_env { @@ -136,23 +116,19 @@ namespace exec { } /// The underlying implementation of the scheduler we are using. - __exec_system_scheduler_interface* __scheduler_; + system_context_replaceability::system_scheduler* __scheduler_; }; /// The operation state used to execute the work described by this sender. template struct __system_op { /// Constructs `this` from `__rcvr` and `__scheduler_impl`. - __system_op(_Rcvr&& __rcvr, __exec_system_scheduler_interface* __scheduler_impl) - : __rcvr_{std::move(__rcvr)} + __system_op(_Rcvr&& __rcvr, system_context_replaceability::system_scheduler* __scheduler_impl) + : __rcvr_{std::forward<_Rcvr>(__rcvr)} , __scheduler_{__scheduler_impl} { } - ~__system_op() { - if (__impl_os_ != nullptr) { - __scheduler_->__destruct_schedule_operation(__scheduler_, __impl_os_); - } - } + ~__system_op() = default; __system_op(const __system_op&) = delete; __system_op(__system_op&&) = delete; @@ -161,21 +137,14 @@ namespace exec { /// Starts the work stored in `this`. void start() & noexcept { - __impl_os_ = __scheduler_->__schedule( - __scheduler_, &__preallocated_, sizeof(__preallocated_), __cb, this); - } - - static void __cb(void* __data, int __completion_type, void* __exception) { - auto __self = static_cast<__system_op*>(__data); - __detail::__pass_to_receiver(__completion_type, __exception, std::move(__self->__rcvr_)); + system_context_replaceability::storage __storage{&__preallocated_, sizeof(__preallocated_)}; + __scheduler_->schedule(__storage, &__rcvr_); } /// Object that receives completion from the work described by the sender. - _Rcvr __rcvr_; + __receiver_adapter<_Rcvr> __rcvr_; /// The underlying implementation of the scheduler. - __exec_system_scheduler_interface* __scheduler_{nullptr}; - /// The operating state on the implementation side. - void* __impl_os_{nullptr}; + system_context_replaceability::system_scheduler* __scheduler_; /// Preallocated space for storing the operation state on the implementation size. struct alignas(STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_ALIGN) __preallocated { @@ -196,7 +165,7 @@ namespace exec { stdexec::set_error_t(std::exception_ptr)>; /// Implementation detail. Constructs the sender to wrap `__impl`. - system_sender(__exec_system_scheduler_interface* __impl) + system_sender(system_context_replaceability::system_scheduler* __impl) : __scheduler_{__impl} { } @@ -214,7 +183,7 @@ namespace exec { private: /// The underlying implementation of the system scheduler. - __exec_system_scheduler_interface* __scheduler_{nullptr}; + system_context_replaceability::system_scheduler* __scheduler_; }; /// A scheduler that can add work to the system context. @@ -226,8 +195,8 @@ namespace exec { bool operator==(const system_scheduler&) const noexcept = default; /// Implementation detail. Constructs the scheduler to wrap `__impl`. - system_scheduler(__exec_system_scheduler_interface* __impl) - : __scheduler_(__impl) { + system_scheduler(system_context_replaceability::system_scheduler* __impl) + : __impl_(__impl) { } /// Returns the forward progress guarantee of `this`. @@ -241,7 +210,7 @@ namespace exec { /// Schedules new work, returning the sender that signals the start of the work. system_sender schedule() const noexcept { - return {__scheduler_}; + return {__impl_}; } private: @@ -249,39 +218,101 @@ namespace exec { friend class system_bulk_sender; /// The underlying implementation of the scheduler. - __exec_system_scheduler_interface* __scheduler_; + system_context_replaceability::system_scheduler* __impl_; }; namespace __detail { template - auto __make_system_scheduler_from(T, __exec_system_scheduler_interface* p) noexcept { + auto + __make_system_scheduler_from(T, system_context_replaceability::system_scheduler* p) noexcept { return system_scheduler{p}; } + /// Helper that knows how to store the values sent by `_Previous` and pass them to bulk item calls or to the completion signal. + /// This represents the base class that abstracts the storage of the values sent by the previous sender. + /// Derived class will properly implement the receiver methods. + template + struct __forward_args_receiver + : system_context_replaceability::receiver + , system_context_replaceability::bulk_item_receiver { + using __storage_t = __detail::__sender_data_t<_Previous>; + + /// Pointer to the `__system_bulk_op` object. + void* __state_; + /// Storage for the arguments received from the previous sender. + alignas(__storage_t) unsigned char __arguments_data_[sizeof(__storage_t)]; + }; + + /// Derived class that properly forwards the arguments received from `_Previous` to the receiver methods. + /// Uses the storage defined in the base class. No extra data is added here. + template + struct __typed_forward_args_receiver : __forward_args_receiver<_Previous> { + using __base_t = __forward_args_receiver<_Previous>; + using __rcvr_t = _BulkState::__rcvr_t; + + /// Stores `__state` and `__as` in the base class storage, with the right types. + explicit __typed_forward_args_receiver(_BulkState* __state, _As&&... __as) { + __base_t::__state_ = __state; + static_assert(sizeof(std::tuple<_As...>) <= sizeof(__base_t::__arguments_data_)); + new (__base_t::__arguments_data_) + std::tuple...>{std::move(__as)...}; + } + + /// Calls `set_value()` on the final receiver of the bulk operation, using the values from the previous sender. + void set_value() noexcept override { + auto __state = static_cast<_BulkState*>(__base_t::__state_); + std::apply( + [&](auto&&... __args) { + stdexec::set_value( + std::forward<__rcvr_t>(__state->__rcvr_), std::forward<_As>(__args)...); + }, + *reinterpret_cast*>(__base_t::__arguments_data_)); + } + + /// Calls `set_error()` on the final receiver of the bulk operation, passing `__ex`. + void set_error(std::exception_ptr __ex) noexcept override { + auto __state = static_cast<_BulkState*>(__base_t::__state_); + stdexec::set_error(std::forward<__rcvr_t>(__state->__rcvr_), std::move(__ex)); + } + + /// Calls `set_stopped()` on the final receiver of the bulk operation. + void set_stopped() noexcept override { + auto __state = static_cast<_BulkState*>(__base_t::__state_); + stdexec::set_stopped(std::forward<__rcvr_t>(__state->__rcvr_)); + } + + /// Calls the bulk functor passing `__index` and the values from the previous sender. + void start(uint32_t __index) noexcept override { + auto __state = static_cast<_BulkState*>(__base_t::__state_); + std::apply( + [&](auto&&... __args) { __state->__snd_.__fun_(__index, __args...); }, + *reinterpret_cast*>(__base_t::__arguments_data_)); + } + }; + /// The state needed to execute the bulk sender created from system context. template struct __bulk_state { + using __rcvr_t = _Rcvr; + using __forward_args_helper_t = __forward_args_receiver<_Previous>; + /// The sender object that describes the work to be done. system_bulk_sender<_Previous, _Size, _Fn> __snd_; /// The receiver object that receives completion from the work described by the sender. _Rcvr __rcvr_; - /// The operating state on the implementation side. - void* __impl_os_ = nullptr; - /// Storage for the arguments passed from the previous receiver to the function object of the bulk sender. - alignas(__detail::__sender_data_t<_Previous>) unsigned char __arguments_data_[sizeof( - __detail::__sender_data_t<_Previous>)]{}; + /// Storage for the arguments and the helper needed to pass the arguments from the previous bulk sender to the bulk functor and receiver. + alignas(__forward_args_helper_t) unsigned char __forward_args_helper_[sizeof( + __forward_args_helper_t)]{}; /// Preallocated space for storing the operation state on the implementation size. struct alignas(STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_ALIGN) __preallocated { char __data[STDEXEC_SYSTEM_CONTEXT_BULK_SCHEDULE_OP_SIZE]; } __preallocated_{}; - ~__bulk_state() { - __snd_.__scheduler_->__destruct_bulk_schedule_operation(__snd_.__scheduler_, __impl_os_); - } + ~__bulk_state() = default; }; // namespace __detail - /// Receiver that is used in "bulk" to connect toe the input sender of the bulk operation. + /// Receiver that is used in "bulk" to connect to the input sender of the bulk operation. template struct __bulk_intermediate_receiver { /// Declare that this is a `receiver`. @@ -295,41 +326,18 @@ namespace exec { template void set_value(_As&&... __as) noexcept { - // Store the input data in the shared state, in the preallocated buffer. - static_assert(sizeof(std::tuple<_As...>) <= sizeof(__state_.__arguments_data_)); - new (&__state_.__arguments_data_) - std::tuple...>{std::move(__as)...}; - - // The function that needs to be applied to each item in the bulk operation. - auto __type_erased_item_fn = +[](void* __state_arg, unsigned long __idx) { - auto* __state = static_cast<__bulk_state_t*>(__state_arg); - std::apply( - [&](auto&&... __args) { __state->__snd_.__fun_(__idx, __args...); }, - *reinterpret_cast*>(&__state->__arguments_data_)); - }; - - // The function that needs to be applied when all items are complete. - auto __type_erased_cb_fn = - +[](void* __state_arg, int __completion_type, void* __exception) { - auto __state = static_cast<__bulk_state_t*>(__state_arg); - std::apply( - [&](auto&&... __args) { - __detail::__pass_to_receiver_with_args( - __completion_type, __exception, std::move(__state->__rcvr_), __args...); - }, - *reinterpret_cast*>(&__state->__arguments_data_)); - }; + // Store the input data in the shared state. + using __typed_forward_args_receiver_t = + __typed_forward_args_receiver<_Previous, __bulk_state_t, _As...>; + auto __r = new (&__state_.__forward_args_helper_) + __typed_forward_args_receiver_t(&__state_, std::move(__as)...); // Schedule the bulk work on the system scheduler. - __state_.__impl_os_ = __state_.__snd_.__scheduler_->__bulk_schedule( - __state_.__snd_.__scheduler_, // self - &__state_.__preallocated_, // preallocated - sizeof(__state_.__preallocated_), // psize - __type_erased_cb_fn, // cb - __type_erased_item_fn, // cb_item - &__state_, // data - static_cast(__state_.__snd_.__size_) // size - ); + // This will invoke `start` on our receiver multiple times, and then a completion signal (e.g., `set_value`). + system_context_replaceability::storage __storage{ + &__state_.__preallocated_, sizeof(__state_.__preallocated_)}; + __state_.__snd_.__scheduler_->bulk_schedule( + static_cast(__state_.__snd_.__size_), __storage, __r, __r); } /// Invoked when the previous sender completes with "stopped" to stop the entire work. @@ -399,6 +407,8 @@ namespace exec { friend struct __detail::__bulk_state; template friend struct __detail::__bulk_intermediate_receiver; + template + friend struct __detail::__typed_forward_args_receiver; public: /// Marks this type as being a sender @@ -406,7 +416,7 @@ namespace exec { /// Constructs `this`. system_bulk_sender(system_scheduler __sched, _Previous __previous, _Size __size, _Fn&& __fun) - : __scheduler_{__sched.__scheduler_} + : __scheduler_{__sched.__impl_} , __previous_{std::move(__previous)} , __size_{std::move(__size)} , __fun_{std::move(__fun)} { @@ -437,7 +447,7 @@ namespace exec { private: /// The underlying implementation of the scheduler we are using. - __exec_system_scheduler_interface* __scheduler_{nullptr}; + system_context_replaceability::system_scheduler* __scheduler_{nullptr}; /// The previous sender, the one that produces the input value for the bulk function. _Previous __previous_; /// The size of the bulk operation. @@ -447,30 +457,24 @@ namespace exec { }; inline system_context::system_context() { - __impl_ = __get_exec_system_context_impl(); - // TODO error handling + __impl_ = system_context_replaceability::query_system_context< + system_context_replaceability::system_scheduler>(); + if (!__impl_) { + throw std::runtime_error{"No system context implementation found"}; + } } inline system_scheduler system_context::get_scheduler() { - return system_scheduler{__impl_->__get_scheduler(__impl_)}; + return system_scheduler{__impl_}; } inline size_t system_context::max_concurrency() const noexcept { - return std::thread::hardware_concurrency(); + return __impl_->max_concurrency(); } - auto system_scheduler::query(stdexec::get_forward_progress_guarantee_t) const noexcept + inline auto system_scheduler::query(stdexec::get_forward_progress_guarantee_t) const noexcept -> stdexec::forward_progress_guarantee { - switch (__scheduler_->__forward_progress_guarantee) { - case 0: - return stdexec::forward_progress_guarantee::concurrent; - case 1: - return stdexec::forward_progress_guarantee::parallel; - case 2: - return stdexec::forward_progress_guarantee::weakly_parallel; - default: - return stdexec::forward_progress_guarantee::parallel; - } + return __impl_->get_forward_progress_guarantee(); } struct __transform_system_bulk_sender { diff --git a/test/exec/test_system_context.cpp b/test/exec/test_system_context.cpp index d3d1a9bb0..8f4d6c366 100644 --- a/test/exec/test_system_context.cpp +++ b/test/exec/test_system_context.cpp @@ -237,67 +237,34 @@ TEST_CASE("simple bulk chaining on system context", "[types][system_scheduler]") CHECK(std::get<0>(res.value()) == pool_id); } -struct my_system_scheduler_impl : __exec_system_scheduler_interface { - my_system_scheduler_impl() - : base_{pool_} { - __forward_progress_guarantee = base_.__forward_progress_guarantee; - __schedule_operation_size = base_.__schedule_operation_size; - __schedule_operation_alignment = base_.__schedule_operation_alignment; - __destruct_schedule_operation = base_.__destruct_schedule_operation; - __bulk_schedule_operation_size = base_.__bulk_schedule_operation_size; - __bulk_schedule_operation_alignment = base_.__bulk_schedule_operation_alignment; - __bulk_schedule = base_.__bulk_schedule; - __destruct_bulk_schedule_operation = base_.__destruct_bulk_schedule_operation; - - __schedule = __schedule_impl; // have our own schedule implementation - } +struct my_system_scheduler_impl : exec::__system_context_default_impl::__system_scheduler_impl { + using base_t = exec::__system_context_default_impl::__system_scheduler_impl; + + my_system_scheduler_impl() = default; int num_schedules() const { return count_schedules_; } - private: - exec::static_thread_pool pool_; - exec::__system_context_default_impl::__system_scheduler_impl base_; - int count_schedules_ = 0; - - static void* __schedule_impl( - __exec_system_scheduler_interface* self_arg, - void* preallocated, - uint32_t psize, - __exec_system_context_completion_callback_t callback, - void* data) noexcept { - auto self = static_cast(self_arg); - // increment our counter. - self->count_schedules_++; - // delegate to the base implementation. - return self->base_.__schedule(&self->base_, preallocated, psize, callback, data); - } -}; - -struct my_system_context_impl : __exec_system_context_interface { - my_system_context_impl() { - __version = 202402; - __get_scheduler = __get_scheduler_impl; + void schedule( + exec::__system_context_default_impl::storage __s, + exec::__system_context_default_impl::receiver* __r) noexcept override { + count_schedules_++; + base_t::schedule(__s, __r); } - int num_schedules() const { - return scheduler_.num_schedules(); - } private: - my_system_scheduler_impl scheduler_{}; - - static __exec_system_scheduler_interface* - __get_scheduler_impl(__exec_system_context_interface* __self) noexcept { - return &static_cast(__self)->scheduler_; - } + int count_schedules_ = 0; }; TEST_CASE("can change the implementation of system context", "[types][system_scheduler]") { + using namespace exec::system_context_replaceability; + // Not to spec. - my_system_context_impl ctx_impl; - __set_exec_system_context_impl(&ctx_impl); + my_system_scheduler_impl my_scheduler; + auto scr = query_system_context<__system_context_replaceability>(); + scr->__set_system_scheduler(&my_scheduler); std::thread::id this_id = std::this_thread::get_id(); std::thread::id pool_id{}; @@ -306,9 +273,9 @@ TEST_CASE("can change the implementation of system context", "[types][system_sch auto snd = ex::then(ex::schedule(sched), [&] { pool_id = std::this_thread::get_id(); }); - REQUIRE(ctx_impl.num_schedules() == 0); + REQUIRE(my_scheduler.num_schedules() == 0); ex::sync_wait(std::move(snd)); - REQUIRE(ctx_impl.num_schedules() == 1); + REQUIRE(my_scheduler.num_schedules() == 1); REQUIRE(pool_id != std::thread::id{}); REQUIRE(this_id != pool_id); diff --git a/test/exec/test_system_context_replaceability.cpp b/test/exec/test_system_context_replaceability.cpp index 82d0e4cbd..e100ac853 100644 --- a/test/exec/test_system_context_replaceability.cpp +++ b/test/exec/test_system_context_replaceability.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include namespace ex = stdexec; @@ -26,61 +25,34 @@ namespace { static int count_schedules = 0; - struct my_system_scheduler_impl : __exec_system_scheduler_interface { - my_system_scheduler_impl() - : base_{pool_} { - __forward_progress_guarantee = base_.__forward_progress_guarantee; - __schedule_operation_size = base_.__schedule_operation_size; - __schedule_operation_alignment = base_.__schedule_operation_alignment; - __destruct_schedule_operation = base_.__destruct_schedule_operation; - __bulk_schedule_operation_size = base_.__bulk_schedule_operation_size; - __bulk_schedule_operation_alignment = base_.__bulk_schedule_operation_alignment; - __bulk_schedule = base_.__bulk_schedule; - __destruct_bulk_schedule_operation = base_.__destruct_bulk_schedule_operation; + struct my_system_scheduler_impl : exec::__system_context_default_impl::__system_scheduler_impl { + using base_t = exec::__system_context_default_impl::__system_scheduler_impl; - __schedule = __schedule_impl; // have our own schedule implementation - } - - private: - exec::static_thread_pool pool_; - exec::__system_context_default_impl::__system_scheduler_impl base_; + my_system_scheduler_impl() = default; - static void* __schedule_impl( - __exec_system_scheduler_interface* self_arg, - void* preallocated, - uint32_t psize, - __exec_system_context_completion_callback_t callback, - void* data) noexcept { - printf("Using my_system_scheduler_impl::__schedule_impl\n"); - auto self = static_cast(self_arg); - // increment our counter. + void schedule( + exec::__system_context_default_impl::storage __s, + exec::__system_context_default_impl::receiver* __r) noexcept override { count_schedules++; - // delegate to the base implementation. - return self->base_.__schedule(&self->base_, preallocated, psize, callback, data); + base_t::schedule(__s, __r); } }; - struct my_system_context_impl : __exec_system_context_interface { - my_system_context_impl() { - __version = 202402; - __get_scheduler = __get_scheduler_impl; + void* my_query_system_context_interface(std::type_index id) noexcept { + if (id == typeid(exec::__system_context_default_impl::system_scheduler)) { + static my_system_scheduler_impl instance; + return &instance; } + return nullptr; + } - private: - my_system_scheduler_impl scheduler_{}; - - static __exec_system_scheduler_interface* - __get_scheduler_impl(__exec_system_context_interface* __self) noexcept { - return &static_cast(__self)->scheduler_; - } - }; } // namespace // Should replace the function defined in __system_context_default_impl.hpp -extern "C" STDEXEC_ATTRIBUTE((weak)) __exec_system_context_interface* __get_exec_system_context_impl() { - printf("Using my_system_context_impl\n"); - static my_system_context_impl instance; - return &instance; +extern STDEXEC_ATTRIBUTE((weak)) + void* + __query_system_context_interface(std::type_index id) noexcept { + return my_query_system_context_interface(id); } TEST_CASE(