From 389a661d62357b810e011bcf1232559d738e5aac Mon Sep 17 00:00:00 2001 From: Maikel Nadolski Date: Tue, 24 Oct 2023 21:59:33 +0200 Subject: [PATCH] Dont use ranges and fix advance_put_index --- include/exec/__detail/__bwos_lifo_queue.hpp | 52 ++++++++++----------- include/exec/static_thread_pool.hpp | 20 ++------ 2 files changed, 31 insertions(+), 41 deletions(-) diff --git a/include/exec/__detail/__bwos_lifo_queue.hpp b/include/exec/__detail/__bwos_lifo_queue.hpp index 20fdf88e5..3672d1e46 100644 --- a/include/exec/__detail/__bwos_lifo_queue.hpp +++ b/include/exec/__detail/__bwos_lifo_queue.hpp @@ -16,11 +16,12 @@ */ #pragma once +#include "../../stdexec/__detail/__config.hpp" + #include #include #include #include -#include #include #include @@ -108,9 +109,8 @@ namespace exec::bwos { bool push_back(Tp value) noexcept; - template - requires std::convertible_to, Tp> - std::ranges::iterator_t push_back(Range &&range) noexcept; + template + Iterator push_back(Iterator first, Sentinel last) noexcept; std::size_t get_available_capacity() const noexcept; std::size_t get_free_capacity() const noexcept; @@ -133,8 +133,8 @@ namespace exec::bwos { lifo_queue_error_code put(Tp value) noexcept; - template - std::ranges::iterator_t bulk_put(Range &&range) noexcept; + template + Iterator bulk_put(Iterator first, Sentinel last) noexcept; fetch_result get() noexcept; @@ -237,17 +237,14 @@ namespace exec::bwos { } template - template - requires std::convertible_to, Tp> - std::ranges::iterator_t lifo_queue::push_back(Range &&range) noexcept { - auto subrange = std::ranges::subrange(range); + template + Iterator lifo_queue::push_back(Iterator first, Sentinel last) noexcept { do { std::size_t owner_index = owner_block_.load(std::memory_order_relaxed) & mask_; block_type ¤t_block = blocks_[owner_index]; - auto it = current_block.bulk_put(subrange); - subrange = std::ranges::subrange(it, subrange.end()); - } while (!std::ranges::empty(subrange) && advance_put_index()); - return subrange.begin(); + first = current_block.bulk_put(first, last); + } while (first != last && advance_put_index()); + return first; } template @@ -300,6 +297,11 @@ namespace exec::bwos { bool lifo_queue::advance_put_index() noexcept { std::size_t owner_counter = owner_block_.load(std::memory_order_relaxed); std::size_t next_counter = owner_counter + 1ul; + std::size_t thief_counter = thief_block_.load(std::memory_order_relaxed); + STDEXEC_ASSERT(thief_counter < next_counter); + if (next_counter - thief_counter >= blocks_.size()) { + return false; + } std::size_t next_index = next_counter & mask_; block_type &next_block = blocks_[next_index]; if (!next_block.is_writable()) [[unlikely]] { @@ -331,9 +333,9 @@ namespace exec::bwos { template lifo_queue::block_type::block_type(std::size_t block_size, Allocator allocator) - : head_{block_size} - , tail_{block_size} - , steal_head_{block_size} + : head_{0} + , tail_{0} + , steal_head_{0} , steal_tail_{block_size} , ring_buffer_(block_size, allocator) { } @@ -390,19 +392,17 @@ namespace exec::bwos { } template - template - std::ranges::iterator_t - lifo_queue::block_type::bulk_put(Range &&range) noexcept { + template + Iterator + lifo_queue::block_type::bulk_put(Iterator first, Sentinel last) noexcept { std::uint64_t back = tail_.load(std::memory_order_relaxed); - auto it = std::ranges::begin(range); - auto last = std::ranges::end(range); - while (it != last && back < block_size()) { - ring_buffer_[back] = static_cast(*it); + while (first != last && back < block_size()) { + ring_buffer_[back] = static_cast(*first); ++back; - ++it; + ++first; } tail_.store(back, std::memory_order_relaxed); - return it; + return first; } template diff --git a/include/exec/static_thread_pool.hpp b/include/exec/static_thread_pool.hpp index fbf09c946..cb4a929c2 100644 --- a/include/exec/static_thread_pool.hpp +++ b/include/exec/static_thread_pool.hpp @@ -267,6 +267,7 @@ namespace exec { void index(std::uint32_t value) { index_ = value; } + std::uint32_t index() const noexcept { return index_; } @@ -282,7 +283,7 @@ namespace exec { std::atomic wakeup_{false}; std::atomic stopRequested_{false}; std::vector victims_{}; - std::uint32_t index_; + std::uint32_t index_{}; }; void run(std::uint32_t index) noexcept; @@ -313,10 +314,10 @@ namespace exec { threadStates_[index].index(index); } std::vector victims{}; - for (thread_state& state : threadStates_) { + for (thread_state& state: threadStates_) { victims.emplace_back(state.as_victim()); } - for (thread_state& state : threadStates_) { + for (thread_state& state: threadStates_) { state.victims(victims); } threads_.reserve(threadCount); @@ -366,17 +367,6 @@ namespace exec { const std::uint32_t threadCount = static_cast(threads_.size()); const std::uint32_t startIndex = nextThread_.fetch_add(1, std::memory_order_relaxed) % threadCount; - - // First try to enqueue to one of the threads without blocking. - for (std::uint32_t i = 0; i < threadCount; ++i) { - const auto index = - (startIndex + i) < threadCount ? (startIndex + i) : (startIndex + i - threadCount); - if (threadStates_[index].try_push(task)) { - return; - } - } - - // Otherwise, do a blocking enqueue on the selected thread. threadStates_[startIndex].push(task); } @@ -390,7 +380,7 @@ namespace exec { inline void move_pending_to_local( __intrusive_queue<&task_base::next>& pending_queue, bwos::lifo_queue& local_queue) { - auto last = local_queue.push_back(std::views::all(pending_queue)); + auto last = local_queue.push_back(pending_queue.begin(), pending_queue.end()); __intrusive_queue<&task_base::next> tmp{}; tmp.splice(tmp.begin(), pending_queue, pending_queue.begin(), last); tmp.clear();