Skip to content

Commit

Permalink
change the system_context to be replaceable at runtime
Browse files Browse the repository at this point in the history
also, fix static initialization order fiasco issues wrt the system context at shutdown.

this design for the system context has a few nice properties:

* lazily constructed at first use
* accessible by default-constructing a system_context object, which grabs a ref-count to the current global system context in its ctor and releases it in its dtor.
* is usable at all points during the execution of the program, even before/after main, with no Static Initialization Order Fiasco.
* is replaceable at runtime as many times as you like.
* replacing the global system context does not affect any existing `system_context` objects.
* shuts down cleanly with no leaks
* this commit changes the ABI of the system context interfaces, so it bumps the version number.
* it also reserves space in the `system_context_interface` and `system_scheduler_interface` for additional "virtual" functions.

adds two helper types: `system_context_base` and `system_scheduler_base`, which automate the population of the `system_context_interface` and `system_scheduler_interface` "vtables", respectively.

it also adds `static_system_context_instance` to make it simple to safely put a custom system context into static storage.
  • Loading branch information
ericniebler committed Jul 20, 2024
1 parent bfc2610 commit e5945b5
Show file tree
Hide file tree
Showing 7 changed files with 623 additions and 326 deletions.
187 changes: 74 additions & 113 deletions include/exec/__detail/__system_context_default_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,41 @@
*/
#pragma once

#include "__system_context_if.h"
#include "stdexec/execution.hpp"
#include "exec/static_thread_pool.hpp"

namespace exec::__system_context_default_impl {
using namespace stdexec::tags;
STDEXEC_PRAGMA_PUSH()
// "warning: empty struct has size 0 in C, size 1 in C++"
// The intended use of these two structs is as a base class for other structs.
// When used like that, the size occupied by these structs is 0 because of the
// empty base class optimization.
STDEXEC_PRAGMA_IGNORE_GNU("-Wextern-c-compat")

#include "__system_context_if.h"

using __pool_scheduler_t = decltype(std::declval<exec::static_thread_pool>().get_scheduler());
STDEXEC_PRAGMA_POP()

namespace exec::__detail {
using stdexec::__declval;
using __pool_scheduler_t =
stdexec::__decay_t<decltype(__declval<exec::static_thread_pool&>().get_scheduler())>;

/// Receiver that calls the callback when the operation completes.
template <class _Sender>
template <class _Sender, bool _IsBulk>
struct __operation;

template <class _Sender>
template <class _Sender, bool _IsBulk>
struct __recv {
using receiver_concept = stdexec::receiver_t;

/// The callback to be called.
__exec_system_context_completion_callback_t __cb_;
system_context_completion_callback __cb_;

/// The data to be passed to the callback.
void* __data_;

/// The owning operation state, to be destructed when the operation completes.
__operation<_Sender>* __op_;
__operation<_Sender, _IsBulk>* __op_;

void set_value() noexcept {
__cb_(__data_, 0, nullptr);
Expand All @@ -54,10 +64,11 @@ namespace exec::__system_context_default_impl {
}
};

template <typename _Sender>
struct __operation {
template <class _Sender, bool _IsBulk>
struct __operation
: stdexec::__if_c<_IsBulk, system_bulk_operation_state, system_operation_state> {
/// The inner operation state, that results out of connecting the underlying sender with the receiver.
stdexec::connect_result_t<_Sender, __recv<_Sender>> __inner_op_;
stdexec::connect_result_t<_Sender, __recv<_Sender, _IsBulk>> __inner_op_;
/// True if the operation is on the heap, false if it is in the preallocated space.
bool __on_heap_;

Expand All @@ -66,123 +77,107 @@ namespace exec::__system_context_default_impl {
void* __preallocated,
size_t __psize,
_Sender __sndr,
__exec_system_context_completion_callback_t __cb,
system_context_completion_callback __cb,
void* __data) {
if (__preallocated == nullptr || __psize < sizeof(__operation)) {
return new __operation(std::move(__sndr), __cb, __data, true);
return ::new __operation(std::move(__sndr), __cb, __data, true);
} else {
return new (__preallocated) __operation(std::move(__sndr), __cb, __data, false);
return ::new (__preallocated) __operation(std::move(__sndr), __cb, __data, false);
}
}

/// Destructs the operation; frees any allocated memory.
void __destruct() {
if (__on_heap_) {
delete this;
} else {
std::destroy_at(this);
static void operator delete(__operation* __self, std::destroying_delete_t) noexcept {
auto on_heap = __self->__on_heap_;
std::destroy_at(__self);
if (on_heap) {
::operator delete(__self);
}
}

private:
__operation(
_Sender __sndr,
__exec_system_context_completion_callback_t __cb,
system_context_completion_callback __cb,
void* __data,
bool __on_heap)
: __inner_op_(stdexec::connect(std::move(__sndr), __recv<_Sender>{__cb, __data, this}))
bool __on_heap) noexcept(stdexec::__nothrow_connectable<_Sender, __recv<_Sender, _IsBulk>>) //
: __inner_op_(
stdexec::connect(std::move(__sndr), __recv<_Sender, _IsBulk>{__cb, __data, this}))
, __on_heap_(__on_heap) {
}
};

struct __system_scheduler_impl : __exec_system_scheduler_interface {
explicit __system_scheduler_impl(exec::static_thread_pool& __pool)
: __pool_scheduler_{__pool.get_scheduler()} {
__forward_progress_guarantee = 1; // parallel
__schedule_operation_size = sizeof(__schedule_operation_t),
__schedule_operation_alignment = alignof(__schedule_operation_t),
__schedule = __schedule_impl;
__destruct_schedule_operation = __destruct_schedule_operation_impl;
__bulk_schedule_operation_size = sizeof(__bulk_schedule_operation_t),
__bulk_schedule_operation_alignment = alignof(__bulk_schedule_operation_t),
__bulk_schedule = __bulk_schedule_impl;
__destruct_bulk_schedule_operation = __destruct_bulk_schedule_operation_impl;
}

struct __system_scheduler_impl : exec::system_scheduler_base {
private:
/// Scheduler from the underlying thread pool.
__pool_scheduler_t __pool_scheduler_;

struct __bulk_functor {
__exec_system_context_bulk_item_callback_t __cb_item_;
system_context_bulk_item_callback __cb_item_;
void* __data_;

void operator()(unsigned long __idx) const noexcept {
void operator()(uint64_t __idx) const noexcept {
__cb_item_(__data_, __idx);
}
};

using __schedule_operation_t =
__operation<decltype(stdexec::schedule(std::declval<__pool_scheduler_t>()))>;
public:
static constexpr stdexec::forward_progress_guarantee forward_progress_guarantee =
stdexec::forward_progress_guarantee::parallel;

explicit __system_scheduler_impl(exec::static_thread_pool& __pool) noexcept
: exec::system_scheduler_base(this)
, __pool_scheduler_{__pool.get_scheduler()} {
}

using __bulk_schedule_operation_t = __operation<decltype(stdexec::bulk(
stdexec::schedule(std::declval<__pool_scheduler_t>()),
std::declval<unsigned long>(),
std::declval<__bulk_functor>()))>;
using schedule_operation = //
__operation<stdexec::__result_of<stdexec::schedule, __pool_scheduler_t>, false>;

static void* __schedule_impl(
__exec_system_scheduler_interface* __self,
using bulk_schedule_operation = //
__operation<
stdexec::__result_of<
stdexec::bulk,
stdexec::__result_of<stdexec::schedule, __pool_scheduler_t&>,
uint64_t,
__bulk_functor>,
true>;

system_operation_state* schedule(
void* __preallocated,
uint32_t __psize,
__exec_system_context_completion_callback_t __cb,
system_context_completion_callback __cb,
void* __data) noexcept {

auto __this = static_cast<__system_scheduler_impl*>(__self);
auto __sndr = stdexec::schedule(__this->__pool_scheduler_);
auto __os = __schedule_operation_t::__construct_maybe_alloc(
auto __sndr = stdexec::schedule(__pool_scheduler_);
auto __os = schedule_operation::__construct_maybe_alloc(
__preallocated, __psize, std::move(__sndr), __cb, __data);
stdexec::start(__os->__inner_op_);
return __os;
}

static void __destruct_schedule_operation_impl(
__exec_system_scheduler_interface* /*__self*/,
void* __operation) noexcept {
auto __op = static_cast<__schedule_operation_t*>(__operation);
__op->__destruct();
}

static void* __bulk_schedule_impl(
__exec_system_scheduler_interface* __self,
system_bulk_operation_state* bulk_schedule(
void* __preallocated,
uint32_t __psize,
__exec_system_context_completion_callback_t __cb,
__exec_system_context_bulk_item_callback_t __cb_item,
system_context_completion_callback __cb,
system_context_bulk_item_callback __cb_item,
void* __data,
unsigned long __size) noexcept {

auto __this = static_cast<__system_scheduler_impl*>(__self);
uint64_t __size) noexcept {
auto __sndr = stdexec::bulk(
stdexec::schedule(__this->__pool_scheduler_), __size, __bulk_functor{__cb_item, __data});
auto __os = __bulk_schedule_operation_t::__construct_maybe_alloc(
stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{__cb_item, __data});
auto __os = bulk_schedule_operation::__construct_maybe_alloc(
__preallocated, __psize, std::move(__sndr), __cb, __data);
stdexec::start(__os->__inner_op_);
return __os;
}

static void __destruct_bulk_schedule_operation_impl(
__exec_system_scheduler_interface* /*__self*/,
void* __operation) noexcept {
auto __op = static_cast<__bulk_schedule_operation_t*>(__operation);
__op->__destruct();
}
};

/// Default implementation of a system context, based on `static_thread_pool`
struct __system_context_impl : __exec_system_context_interface {
__system_context_impl() {
__version = 202402;
__get_scheduler = __get_scheduler_impl;
struct __system_context_impl : exec::system_context_base {
__system_context_impl() noexcept
: exec::system_context_base(this) {
}

system_scheduler_interface* get_scheduler() noexcept {
return &__scheduler_;
}

private:
Expand All @@ -191,39 +186,5 @@ namespace exec::__system_context_default_impl {

/// The system scheduler implementation.
__system_scheduler_impl __scheduler_{__pool_};

static __exec_system_scheduler_interface*
__get_scheduler_impl(__exec_system_context_interface* __self) noexcept {
return &static_cast<__system_context_impl*>(__self)->__scheduler_;
}
};

/// Keeps track of the object implementing the system context interface.
struct __instance_holder {

/// Get the only instance of this class.
static __instance_holder& __singleton() {
static __instance_holder __this_instance_;
return __this_instance_;
}

/// Get the currently selected system context object.
__exec_system_context_interface* __get_current_instance() const noexcept {
return __current_instance_;
}

/// Allows changing the currently selected system context object; used for testing.
void __set_current_instance(__exec_system_context_interface* __instance) noexcept {
__current_instance_ = __instance;
}

private:
__instance_holder() {
static __system_context_impl __default_instance_;
__current_instance_ = &__default_instance_;
}

__exec_system_context_interface* __current_instance_;
};

} // namespace exec::__system_context_default_impl
} // namespace exec::__detail
Loading

0 comments on commit e5945b5

Please sign in to comment.