diff --git a/include/exec/reduce.hpp b/include/exec/reduce.hpp new file mode 100644 index 000000000..d2cf25f14 --- /dev/null +++ b/include/exec/reduce.hpp @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "../stdexec/execution.hpp" + +#include +#include + +namespace exec { + + namespace __reduce { + + template + struct __receiver { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __data { + _Receiver __rcvr_; + STDEXEC_NO_UNIQUE_ADDRESS _InitT __init_; + STDEXEC_NO_UNIQUE_ADDRESS _RedOp __redop_; + }; + + struct __t { + using is_receiver = void; + using __id = __receiver; + __data* __op_; + + template < + stdexec::__same_as _Tag, + class _Range, + class _Value = stdexec::range_value_t<_Range>> + requires stdexec::invocable<_RedOp, _InitT, _Value> + && stdexec::__receiver_of_invoke_result<_Receiver, _RedOp, _InitT, _Value> + friend void tag_invoke(_Tag, __t&& __self, _Range&& __range) noexcept { + auto result = std::reduce( + std::ranges::begin(__range), std::ranges::end(__range), __self.__op_->__init_, __self.__op_->__redop_); + + stdexec::set_value((_Receiver&&) __self.__op_->__rcvr_, std::move(result)); + } + + template _Tag, class... _As> + requires stdexec::__callable<_Tag, _Receiver, _As...> + friend void tag_invoke(_Tag __tag, __t&& __self, _As&&... __as) noexcept { + __tag((_Receiver&&) __self.__op_->__rcvr_, (_As&&) __as...); + } + + friend auto tag_invoke(stdexec::get_env_t, const __t& __self) noexcept + -> stdexec::env_of_t { + return stdexec::get_env(__self.__op_->__rcvr_); + } + }; + }; + + template + struct __operation { + using _Receiver = stdexec::__t<_ReceiverId>; + using __receiver_id = __receiver<_ReceiverId, _InitT, _RedOp>; + using __receiver_t = stdexec::__t<__receiver_id>; + + struct __t : stdexec::__immovable { + using __id = __operation; + typename __receiver_id::__data __data_; + stdexec::connect_result_t<_Sender, __receiver_t> __op_; + + __t(_Sender&& __sndr, _Receiver __rcvr, _InitT __init, _RedOp __redop) noexcept( + stdexec::__nothrow_decay_copyable<_Receiver> && stdexec::__nothrow_decay_copyable<_RedOp> + && stdexec::__nothrow_connectable<_Sender, __receiver_t>) + : __data_{(_Receiver&&) __rcvr, (_InitT&&) __init, (_RedOp&&) __redop} + , __op_(stdexec::connect((_Sender&&) __sndr, __receiver_t{&__data_})) { + } + + friend void tag_invoke(stdexec::start_t, __t& __self) noexcept { + stdexec::start(__self.__op_); + } + }; + }; + + template + struct __sender { + using _Sender = stdexec::__t<_SenderId>; + template + using __receiver = stdexec::__t<__receiver, _InitT, _RedOp>>; + template + using __operation = stdexec::__t< + __operation, stdexec::__id<_Receiver>, _InitT, _RedOp>>; + + struct __t { + using __id = __sender; + using is_sender = void; + STDEXEC_NO_UNIQUE_ADDRESS _Sender __sndr_; + STDEXEC_NO_UNIQUE_ADDRESS _InitT __init_; + STDEXEC_NO_UNIQUE_ADDRESS _RedOp __redop_; + + template _Self, stdexec::receiver _Receiver> + requires stdexec::sender_to, __receiver<_Receiver>> + friend auto tag_invoke(stdexec::connect_t, _Self&& __self, _Receiver __rcvr) noexcept( + stdexec::__nothrow_constructible_from< + __operation<_Self, _Receiver>, + stdexec::__copy_cvref_t<_Self, _Sender>, + _Receiver&&, + stdexec::__copy_cvref_t<_Self, _InitT>, + stdexec::__copy_cvref_t<_Self, _RedOp>>) -> __operation<_Self, _Receiver> { + return { + ((_Self&&) __self).__sndr_, + (_Receiver&&) __rcvr, + ((_Self&&) __self).__init_, + ((_Self&&) __self).__redop_}; + } + + template _Self, class _Env> + friend auto tag_invoke(stdexec::get_completion_signatures_t, _Self&&, _Env&&) + -> stdexec::dependent_completion_signatures<_Env>; + + template _Self, class _Env> + friend auto tag_invoke(stdexec::get_completion_signatures_t, _Self&&, _Env&&) + -> stdexec::completion_signatures + requires true; + + friend auto tag_invoke(stdexec::get_env_t, const __t& __self) noexcept + -> stdexec::env_of_t { + return get_env(__self.__sndr_); + } + }; + }; + + struct reduce_t { + template + using __sender = + stdexec::__t<__sender>, _InitT, _RedOp>>; + + template + requires stdexec::__tag_invocable_with_completion_scheduler< + reduce_t, + stdexec::set_value_t, + _Sender, + _InitT, + _RedOp> + stdexec::sender auto operator()(_Sender&& __sndr, _InitT __init, _RedOp __redop) const noexcept( + stdexec::nothrow_tag_invocable< + reduce_t, + stdexec::__completion_scheduler_for<_Sender, stdexec::set_value_t>, + _Sender, + _InitT, + _RedOp>) { + auto __sched = stdexec::get_completion_scheduler(stdexec::get_env(__sndr)); + return tag_invoke( + reduce_t{}, std::move(__sched), (_Sender&&) __sndr, (_InitT&&) __init, (_RedOp&&) __redop); + } + + template + requires(!stdexec::__tag_invocable_with_completion_scheduler< + reduce_t, + stdexec::set_value_t, + _Sender, + _InitT, + _RedOp>) + && stdexec::tag_invocable + stdexec::sender auto operator()(_Sender&& __sndr, _InitT __init, _RedOp __redop) const + noexcept(stdexec::nothrow_tag_invocable) { + return tag_invoke(reduce_t{}, (_Sender&&) __sndr, (_InitT&&) __init, (_RedOp&&) __redop); + } + + template + requires(!stdexec::__tag_invocable_with_completion_scheduler< + reduce_t, + stdexec::set_value_t, + _Sender, + _InitT, + _RedOp>) + && (!stdexec::tag_invocable) + STDEXEC_DETAIL_CUDACC_HOST_DEVICE __sender<_Sender, _InitT, _RedOp> + operator()(_Sender&& __sndr, _InitT __init, _RedOp __redop) const { + return __sender<_Sender, _InitT, _RedOp>{(_Sender&&) __sndr, __init, (_RedOp&&) __redop}; + } + + template > + stdexec::__binder_back operator()(_InitT __init, _RedOp __redop = {}) const { + return { + {}, + {}, + {(_InitT&&) __init, (_RedOp&&) __redop} + }; + } + }; + + } + + using __reduce::reduce_t; + inline constexpr reduce_t reduce{}; +} diff --git a/include/exec/static_thread_pool.hpp b/include/exec/static_thread_pool.hpp index 96f46f146..d7eeff357 100644 --- a/include/exec/static_thread_pool.hpp +++ b/include/exec/static_thread_pool.hpp @@ -16,6 +16,8 @@ */ #pragma once +#include "reduce.hpp" + #include "../stdexec/execution.hpp" #include "../stdexec/__detail/__config.hpp" #include "../stdexec/__detail/__intrusive_queue.hpp" @@ -407,10 +409,6 @@ namespace exec { } }; - friend sender tag_invoke(stdexec::schedule_t, const scheduler& s) noexcept { - return s.make_sender_(); - } - template using bulk_sender_t = // bulk_sender< @@ -424,6 +422,264 @@ namespace exec { return bulk_sender_t{*sch.pool_, (S&&) sndr, shape, (Fn&&) fun}; } + template + struct reduce_shared_state { + using Sender = stdexec::__t; + using Receiver = stdexec::__t; + + struct reduction_task : task_base { + reduce_shared_state* sh_state_; + + reduction_task(reduce_shared_state* sh_state) + : sh_state_(sh_state) { + task_base::__execute = [](task_base* t, const std::uint32_t tid) noexcept { + auto& sh_state = *static_cast(t)->sh_state_; + auto total_threads = sh_state.num_threads_available(); + auto& partial = sh_state.partials_[tid]; + + auto reducer = [&](auto& input) { + const auto [first, last] = even_share(std::ranges::size(input), tid, total_threads); + if (first != last) { + auto begin = std::ranges::begin(input) + first; + partial = std::reduce( + std::next(begin), std::ranges::begin(input) + last, *begin, sh_state.redop_); + } + }; + + auto finalize = [&](auto&) { + auto result = std::reduce( + std::ranges::begin(sh_state.partials_), + std::ranges::end(sh_state.partials_), + sh_state.init_, + sh_state.redop_); + // deallocate + std::vector tmp; + sh_state.partials_.swap(tmp); + stdexec::set_value((Receiver&&) sh_state.receiver_, std::move(result)); + }; + + sh_state.apply(reducer); + + const bool is_last_thread = sh_state.finished_threads_.fetch_add(1) + == (total_threads - 1); + if (is_last_thread) { + sh_state.apply(finalize); + } + }; + } + }; + + template + struct dummy_ { + using __t = T; + }; + + using inrange_t = // + stdexec::__decay_t, + stdexec::__q, + stdexec::__q>>>>; + + using reduction_result_t = std::invoke_result_t; + + inrange_t range_; + static_thread_pool& pool_; + Receiver receiver_; + InitT init_; + RedOp redop_; + std::vector partials_; + std::atomic finished_threads_{0}; + std::vector tasks_; + + static std::pair + even_share(std::size_t n, std::size_t rank, std::size_t size) noexcept { + const auto avg_per_thread = n / size; + const auto n_big_share = avg_per_thread + 1; + const auto big_shares = n % size; + const auto is_big_share = rank < big_shares; + const auto begin = is_big_share + ? n_big_share * rank + : n_big_share * big_shares + (rank - big_shares) * avg_per_thread; + const auto end = begin + (is_big_share ? n_big_share : avg_per_thread); + + return std::make_pair(begin, end); + } + + std::uint32_t num_threads_available() const { + return pool_.available_parallelism(); + } + + template + void apply(F f) { + f(range_); + } + + reduce_shared_state(static_thread_pool& pool, Receiver receiver, InitT init, RedOp redop) + : range_{} + , pool_{pool} + , receiver_{(Receiver&&) receiver} + , init_{init} + , redop_{redop} + , partials_(num_threads_available()) + , tasks_{num_threads_available(), {this}} { + } + }; + + template + struct reduce_reciever { + using is_receiver = void; + using Sender = stdexec::__t; + using Receiver = stdexec::__t; + + using shared_state = reduce_shared_state; + + shared_state& shared_state_; + + void enqueue() noexcept { + shared_state_.pool_.bulk_enqueue( + shared_state_.tasks_.data(), shared_state_.num_threads_available()); + } + + template + friend void tag_invoke( + stdexec::same_as auto, + reduce_reciever&& self, + Range&& input) noexcept { + shared_state& state = self.shared_state_; + + const auto n = std::ranges::size(input); + const auto min_parallel_size = 2 * state.num_threads_available(); + if (n >= min_parallel_size) { + state.range_ = (Range&&) input; + self.enqueue(); + } else { + auto result = std::reduce( + std::ranges::begin(input), std::ranges::end(input), state.init_, state.redop_); + state.apply([&](auto&) { + stdexec::set_value(std::move(state.receiver_), std::move(result)); + }); + } + } + + template Tag, class... As> + friend void tag_invoke(Tag tag, reduce_reciever&& self, As&&... as) noexcept { + shared_state& state = self.shared_state_; + tag((Receiver&&) state.receiver_, (As&&) as...); + } + + friend auto tag_invoke(stdexec::get_env_t, const reduce_reciever& self) noexcept + -> stdexec::env_of_t { + return stdexec::get_env(self.shared_state_.receiver_); + } + }; + + template + struct reduce_op_state { + using Sender = stdexec::__t; + using Receiver = stdexec::__t; + + using reduce_rcvr = reduce_reciever; + using shared_state = reduce_shared_state; + using inner_op_state = stdexec::connect_result_t; + + shared_state shared_state_; + + inner_op_state inner_op_; + + friend void tag_invoke(stdexec::start_t, reduce_op_state& op) noexcept { + stdexec::start(op.inner_op_); + } + + reduce_op_state( + static_thread_pool& pool, + InitT init, + RedOp redop, + Sender&& sender, + Receiver receiver) + : shared_state_(pool, (Receiver&&) receiver, init, redop) + , inner_op_{stdexec::connect((Sender&&) sender, reduce_rcvr{shared_state_})} { + } + }; + + template + struct reduce_sender { + using Sender = stdexec::__t; + using RedOp = stdexec::__t; + using is_sender = void; + using reduction_result_t = std::invoke_result_t; + + static_thread_pool& pool_; + Sender sndr_; + InitT init_; + RedOp redop_; + + template + using completion_signatures = + stdexec::completion_signatures; + + template + using reduce_op_state_t = // + reduce_op_state< + stdexec::__x>, + stdexec::__x>, + InitT, + RedOp>; + + template Self, stdexec::receiver Receiver> + requires stdexec:: + receiver_of>> + friend reduce_op_state_t // + tag_invoke(stdexec::connect_t, Self&& self, Receiver&& rcvr) // + noexcept(stdexec::__nothrow_constructible_from< + reduce_op_state_t, + static_thread_pool&, + InitT, + RedOp, + Sender, + Receiver>) { + return reduce_op_state_t{ + self.pool_, self.init_, self.redop_, ((Self&&) self).sndr_, (Receiver&&) rcvr}; + } + + template Self, class Env> + friend auto tag_invoke(stdexec::get_completion_signatures_t, Self&&, Env&&) + -> stdexec::dependent_completion_signatures; + + template Self, class Env> + friend auto tag_invoke(stdexec::get_completion_signatures_t, Self&&, Env&&) + -> completion_signatures + requires true; + + friend auto tag_invoke(stdexec::get_env_t, const reduce_sender& self) noexcept + -> stdexec::env_of_t { + return stdexec::get_env(self.sndr_); + } + }; + + + template + using reduce_sender_t = // + reduce_sender< + stdexec::__x>, + stdexec::__decay_t, + stdexec::__x>>; + + template + friend reduce_sender_t tag_invoke( + exec::reduce_t, + const scheduler& sch, + S&& sndr, + InitT init, + RedOp redop) noexcept { + return reduce_sender_t{ + *sch.pool_, (S&&) sndr, (InitT&&) init, (RedOp&&) redop}; + } + + friend sender tag_invoke(stdexec::schedule_t, const scheduler& s) noexcept { + return s.make_sender_(); + } + friend stdexec::forward_progress_guarantee tag_invoke(stdexec::get_forward_progress_guarantee_t, const static_thread_pool&) noexcept { return stdexec::forward_progress_guarantee::parallel; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index b2d2eeb37..501aa9433 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -69,6 +69,7 @@ set(stdexec_test_sources exec/test_on.cpp exec/test_on2.cpp exec/test_on3.cpp + exec/test_reduce.cpp exec/test_repeat_effect_until.cpp exec/async_scope/test_dtor.cpp exec/async_scope/test_spawn.cpp diff --git a/test/exec/test_reduce.cpp b/test/exec/test_reduce.cpp new file mode 100644 index 000000000..492e08d81 --- /dev/null +++ b/test/exec/test_reduce.cpp @@ -0,0 +1,232 @@ +/* + * Copyright (c) 2023 NVIDIA Corporation + * + * Licensed under the Apache License Version 2.0 with LLVM Exceptions + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +TEST_CASE("exec reduce returns a sender with single input", "[adaptors][reduce]") { + constexpr int N = 2048; + int input[N] = {}; + std::fill_n(input, N, 1); + + exec::static_thread_pool pool{}; + auto task = stdexec::transfer_just(pool.get_scheduler(), std::span{input}) | exec::reduce(0); + + STATIC_REQUIRE(stdexec::sender_of); + + (void) task; +} + +TEST_CASE("exec reduce returns a sender with two inputs", "[adaptors][reduce]") { + constexpr int N = 2048; + int input[N] = {}; + std::fill_n(input, N, 1); + + exec::static_thread_pool pool{}; + auto task = stdexec::transfer_just(pool.get_scheduler(), std::span{input}) + | exec::reduce(0, std::minus<>{}); + + STATIC_REQUIRE(stdexec::sender_of); + + (void) task; +} + +TEST_CASE("exec reduce returns init value when value range is empty", "[adaptors][reduce]") { + constexpr int input[1]{}; + constexpr int init = 47; + std::span range{input, 0}; + exec::static_thread_pool pool{}; + auto task = stdexec::transfer_just(pool.get_scheduler(), range) | exec::reduce(47); + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + REQUIRE(result == init); +} + +TEST_CASE("exec reduce yields correct result for single value in range", "[adaptors][reduce]") { + constexpr int single = 37; + constexpr int init = 47; + + std::vector input{single}; + + exec::static_thread_pool pool{}; + auto task = stdexec::transfer_just(pool.get_scheduler(), std::span{input}) | exec::reduce(init); + + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + REQUIRE(result == init + single); +} + +TEST_CASE("exec reduce uses sum as default operation", "[adaptors][reduce]") { + constexpr int N = 2048; + constexpr int init = 47; + + std::vector input(N); + std::iota(input.begin(), input.end(), 1); + + exec::static_thread_pool pool{}; + auto task = stdexec::transfer_just(pool.get_scheduler(), std::span{input}) | exec::reduce(init); + + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + REQUIRE(result == (N * (N + 1) / 2) + init); +} + +TEST_CASE("exec reduce uses the passed reduction operation", "[adaptors][reduce]") { + constexpr int N = 2048; + constexpr int init = 47; + + std::vector input(N); + std::iota(input.begin(), input.end(), 2); + + auto minFun = [](auto acc, auto value) { + return std::min(acc, value); + }; + + exec::static_thread_pool pool{}; + auto task = stdexec::transfer_just(pool.get_scheduler(), std::span{input}) + | exec::reduce(init, minFun); + + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + REQUIRE(result == 2); +} + +TEST_CASE("exec reduce yields correct result with product as operation", "[adaptors][reduce]") { + constexpr int N = 15; + constexpr int init = 3; + + std::vector input(N, 2); + + exec::static_thread_pool pool{}; + auto task = stdexec::transfer_just(pool.get_scheduler(), std::span{input}) + | exec::reduce(init, std::multiplies<>{}); + + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + REQUIRE(result == 3 * std::pow(2, 15)); +} + +TEST_CASE("exec reduce works with custom value type", "[adaptors][reduce]") { + constexpr int N = 2048; + constexpr int init = 47; + + struct value_t { + int x; + }; + + auto sum = [](value_t acc, value_t val) { + return value_t{acc.x + val.x}; + }; + + std::vector input(N, value_t{1}); + + exec::static_thread_pool pool{}; + auto task = stdexec::transfer_just(pool.get_scheduler(), std::span{input}) + | exec::reduce(value_t{init}, sum); + + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + STATIC_REQUIRE(stdexec::sender_of); + + REQUIRE(result.x == N + init); +} + +TEST_CASE("exec reduce yields correct result if thread pool has 1 thread", "[adaptors][reduce]") { + constexpr int N = 128; + constexpr int init = 47; + + std::vector input(N); + std::iota(input.begin(), input.end(), 1); + + exec::static_thread_pool pool{1}; + auto task = stdexec::transfer_just(pool.get_scheduler(), std::span{input}) | exec::reduce(init); + + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + REQUIRE(result == (N * (N + 1) / 2) + init); +} + +TEST_CASE( + "exec reduce correct if thread pool has more threads than values in range", + "[adaptors][reduce]") { + constexpr int N = 4; + constexpr int init = 47; + + std::vector input(N); + std::iota(input.begin(), input.end(), 1); + + exec::static_thread_pool pool{N + 2}; + auto task = stdexec::transfer_just(pool.get_scheduler(), std::span{input}) | exec::reduce(init); + + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + REQUIRE(result == (N * (N + 1) / 2) + init); +} + +TEST_CASE( + "exec reduce correct if reduction threads are not balanced [adaptors][reduce]") { + constexpr int N = 128; + constexpr int init = 47; + + std::vector input(N); + std::iota(input.begin(), input.end(), 1); + + exec::static_thread_pool pool{3}; // N mod 3 = 2 + auto task = stdexec::transfer_just(pool.get_scheduler(), std::span{input}) | exec::reduce(init); + + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + REQUIRE(result == (N * (N + 1) / 2) + init); +} + +TEST_CASE("exec reduce runs on single_thread_context scheduler", "[adaptors][reduce]") { + constexpr int N = 128; + constexpr int init = 47; + + std::vector input(N); + std::iota(input.begin(), input.end(), 1); + + exec::single_thread_context single{}; + auto task = stdexec::transfer_just(single.get_scheduler(), std::span{input}) | exec::reduce(init); + + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + REQUIRE(result == (N * (N + 1) / 2) + init); +} + +TEST_CASE("exec reduce runs on inline_scheduler", "[adaptors][reduce]") { + constexpr int N = 128; + constexpr int init = 47; + + std::vector input(N); + std::iota(input.begin(), input.end(), 1); + + exec::inline_scheduler sched; + auto task = stdexec::transfer_just(sched, std::span{input}) | exec::reduce(init); + + auto [result] = stdexec::sync_wait(std::move(task)).value(); + + REQUIRE(result == (N * (N + 1) / 2) + init); +}