Skip to content

Commit

Permalink
Don't use ranges and fix advance_put_index
Browse files Browse the repository at this point in the history
  • Loading branch information
maikel committed Oct 24, 2023
1 parent 03364ba commit 389a661
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 41 deletions.
52 changes: 26 additions & 26 deletions include/exec/__detail/__bwos_lifo_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
*/
#pragma once

#include "../../stdexec/__detail/__config.hpp"

#include <atomic>
#include <bit>
#include <memory>
#include <new>
#include <ranges>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -108,9 +109,8 @@ namespace exec::bwos {

bool push_back(Tp value) noexcept;

template <std::ranges::forward_range Range>
requires std::convertible_to<std::ranges::range_value_t<Range>, Tp>
std::ranges::iterator_t<Range> push_back(Range &&range) noexcept;
template <class Iterator, class Sentinel>
Iterator push_back(Iterator first, Sentinel last) noexcept;

std::size_t get_available_capacity() const noexcept;
std::size_t get_free_capacity() const noexcept;
Expand All @@ -133,8 +133,8 @@ namespace exec::bwos {

lifo_queue_error_code put(Tp value) noexcept;

template <class Range>
std::ranges::iterator_t<Range> bulk_put(Range &&range) noexcept;
template <class Iterator, class Sentinel>
Iterator bulk_put(Iterator first, Sentinel last) noexcept;

fetch_result<Tp> get() noexcept;

Expand Down Expand Up @@ -237,17 +237,14 @@ namespace exec::bwos {
}

template <class Tp, class Allocator>
template <std::ranges::forward_range Range>
requires std::convertible_to<std::ranges::range_value_t<Range>, Tp>
std::ranges::iterator_t<Range> lifo_queue<Tp, Allocator>::push_back(Range &&range) noexcept {
auto subrange = std::ranges::subrange(range);
template <class Iterator, class Sentinel>
Iterator lifo_queue<Tp, Allocator>::push_back(Iterator first, Sentinel last) noexcept {
do {
std::size_t owner_index = owner_block_.load(std::memory_order_relaxed) & mask_;
block_type &current_block = blocks_[owner_index];
auto it = current_block.bulk_put(subrange);
subrange = std::ranges::subrange(it, subrange.end());
} while (!std::ranges::empty(subrange) && advance_put_index());
return subrange.begin();
first = current_block.bulk_put(first, last);
} while (first != last && advance_put_index());
return first;
}

template <class Tp, class Allocator>
Expand Down Expand Up @@ -300,6 +297,11 @@ namespace exec::bwos {
bool lifo_queue<Tp, Allocator>::advance_put_index() noexcept {
std::size_t owner_counter = owner_block_.load(std::memory_order_relaxed);
std::size_t next_counter = owner_counter + 1ul;
std::size_t thief_counter = thief_block_.load(std::memory_order_relaxed);
STDEXEC_ASSERT(thief_counter < next_counter);
if (next_counter - thief_counter >= blocks_.size()) {
return false;
}
std::size_t next_index = next_counter & mask_;
block_type &next_block = blocks_[next_index];
if (!next_block.is_writable()) [[unlikely]] {
Expand Down Expand Up @@ -331,9 +333,9 @@ namespace exec::bwos {

template <class Tp, class Allocator>
lifo_queue<Tp, Allocator>::block_type::block_type(std::size_t block_size, Allocator allocator)
: head_{block_size}
, tail_{block_size}
, steal_head_{block_size}
: head_{0}
, tail_{0}
, steal_head_{0}
, steal_tail_{block_size}
, ring_buffer_(block_size, allocator) {
}
Expand Down Expand Up @@ -390,19 +392,17 @@ namespace exec::bwos {
}

template <class Tp, class Allocator>
template <class Range>
std::ranges::iterator_t<Range>
lifo_queue<Tp, Allocator>::block_type::bulk_put(Range &&range) noexcept {
template <class Iterator, class Sentinel>
Iterator
lifo_queue<Tp, Allocator>::block_type::bulk_put(Iterator first, Sentinel last) noexcept {
std::uint64_t back = tail_.load(std::memory_order_relaxed);
auto it = std::ranges::begin(range);
auto last = std::ranges::end(range);
while (it != last && back < block_size()) {
ring_buffer_[back] = static_cast<Tp &&>(*it);
while (first != last && back < block_size()) {
ring_buffer_[back] = static_cast<Tp &&>(*first);
++back;
++it;
++first;
}
tail_.store(back, std::memory_order_relaxed);
return it;
return first;
}

template <class Tp, class Allocator>
Expand Down
20 changes: 5 additions & 15 deletions include/exec/static_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ namespace exec {
void index(std::uint32_t value) {
index_ = value;
}

std::uint32_t index() const noexcept {
return index_;
}
Expand All @@ -282,7 +283,7 @@ namespace exec {
std::atomic<bool> wakeup_{false};
std::atomic<bool> stopRequested_{false};
std::vector<workstealing_victim> victims_{};
std::uint32_t index_;
std::uint32_t index_{};
};

void run(std::uint32_t index) noexcept;
Expand Down Expand Up @@ -313,10 +314,10 @@ namespace exec {
threadStates_[index].index(index);
}
std::vector<workstealing_victim> victims{};
for (thread_state& state : threadStates_) {
for (thread_state& state: threadStates_) {
victims.emplace_back(state.as_victim());
}
for (thread_state& state : threadStates_) {
for (thread_state& state: threadStates_) {
state.victims(victims);
}
threads_.reserve(threadCount);
Expand Down Expand Up @@ -366,17 +367,6 @@ namespace exec {
const std::uint32_t threadCount = static_cast<std::uint32_t>(threads_.size());
const std::uint32_t startIndex =
nextThread_.fetch_add(1, std::memory_order_relaxed) % threadCount;

// First try to enqueue to one of the threads without blocking.
for (std::uint32_t i = 0; i < threadCount; ++i) {
const auto index =
(startIndex + i) < threadCount ? (startIndex + i) : (startIndex + i - threadCount);
if (threadStates_[index].try_push(task)) {
return;
}
}

// Otherwise, do a blocking enqueue on the selected thread.
threadStates_[startIndex].push(task);
}

Expand All @@ -390,7 +380,7 @@ namespace exec {
inline void move_pending_to_local(
__intrusive_queue<&task_base::next>& pending_queue,
bwos::lifo_queue<task_base*>& local_queue) {
auto last = local_queue.push_back(std::views::all(pending_queue));
auto last = local_queue.push_back(pending_queue.begin(), pending_queue.end());
__intrusive_queue<&task_base::next> tmp{};
tmp.splice(tmp.begin(), pending_queue, pending_queue.begin(), last);
tmp.clear();
Expand Down

0 comments on commit 389a661

Please sign in to comment.