Skip to content

Commit

Permalink
lifo queue with some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maikel committed Oct 23, 2023
1 parent a927300 commit bff157c
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 63 deletions.
154 changes: 92 additions & 62 deletions include/exec/__detail/__bwos_lifo_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ namespace exec::bwos {
}
#elif defined(__arm__) || defined(__aarch64__) || defined(_M_ARM64)
namespace exec::bwos {
static inline void spin_loop_pause() noexcept {
static inline void spin_loop_pause() noexcept {
#if ( \
defined(__ARM_ARCH_6K__) || defined(__ARM_ARCH_6Z__) || defined(__ARM_ARCH_6ZK__) \
|| defined(__ARM_ARCH_6T2__) || defined(__ARM_ARCH_7__) || defined(__ARM_ARCH_7A__) \
|| defined(__ARM_ARCH_7R__) || defined(__ARM_ARCH_7M__) || defined(__ARM_ARCH_7S__) \
|| defined(__ARM_ARCH_8A__) || defined(__aarch64__))
asm volatile ("yield" ::: "memory");
asm volatile("yield" ::: "memory");
#elif defined(_M_ARM64)
__yield();
#else
asm volatile ("nop" ::: "memory");
asm volatile("nop" ::: "memory");
#endif
}
}
}
#else
namespace exec::bwos {
Expand Down Expand Up @@ -93,6 +93,21 @@ namespace exec::bwos {
std::size_t back;
};

template <class Allocator>
struct alloc_deleter {
using pointer = typename std::allocator_traits<Allocator>::pointer;

[[no_unique_address]] Allocator allocator_;
std::size_t block_size_;

void operator()(pointer ptr) noexcept {
for (std::size_t i = 0; i < block_size_; ++i) {
std::allocator_traits<Allocator>::destroy(allocator_, ptr + i);
}
std::allocator_traits<Allocator>::deallocate(allocator_, ptr, block_size_);
}
};

template <class Tp, class Allocator = std::allocator<Tp>>
class lifo_queue {
public:
Expand All @@ -101,11 +116,11 @@ namespace exec::bwos {
std::size_t block_size,
Allocator allocator = Allocator());

Tp get() noexcept;
Tp pop_back() noexcept;

Tp steal() noexcept;
Tp steal_front() noexcept;

bool put(Tp value) noexcept;
bool push_back(Tp value) noexcept;

std::size_t get_available_capacity() const noexcept;

Expand All @@ -116,19 +131,10 @@ namespace exec::bwos {
using allocator_of_t = typename std::allocator_traits<Allocator>::template rebind_alloc<Sp>;

struct block_type {
struct alloc_deleter {
[[no_unique_address]] Allocator allocator_;
std::size_t block_size_;

void operator()(Tp *ptr) noexcept {
std::allocator_traits<Allocator>::deallocate(allocator_, ptr, block_size_);
}
};

explicit block_type(std::size_t block_size, Allocator allocator = Allocator());

block_type(const block_type &) = delete;
block_type &operator=(const block_type &) = delete;
block_type(const block_type &);
block_type &operator=(const block_type &);

block_type(block_type &&) noexcept;
block_type &operator=(block_type &&) noexcept;
Expand All @@ -155,7 +161,7 @@ namespace exec::bwos {
alignas(hardware_destructive_interference_size) std::atomic<std::uint64_t> tail_{};
alignas(hardware_destructive_interference_size) std::atomic<std::uint64_t> steal_head_{};
alignas(hardware_destructive_interference_size) std::atomic<std::uint64_t> steal_tail_{};
std::unique_ptr<Tp[], alloc_deleter> ring_buffer_{};
std::vector<Tp, Allocator> ring_buffer_{};
};

bool advance_get_index() noexcept;
Expand All @@ -176,13 +182,15 @@ namespace exec::bwos {
std::size_t num_blocks,
std::size_t block_size,
Allocator allocator)
: blocks_(std::bit_ceil(num_blocks), block_type(block_size, allocator))
, mask_(blocks_.size() - 1)
{
}
: blocks_(
std::max(2ul, std::bit_ceil(num_blocks)),
block_type(block_size, allocator),
allocator_of_t<block_type>(allocator))
, mask_(blocks_.size() - 1) {
}

template <class Tp, class Allocator>
Tp lifo_queue<Tp, Allocator>::get() noexcept {
Tp lifo_queue<Tp, Allocator>::pop_back() noexcept {
do {
std::size_t owner_index = owner_block_.load(std::memory_order_relaxed) & mask_;
block_type &current_block = blocks_[owner_index];
Expand All @@ -198,7 +206,7 @@ namespace exec::bwos {
}

template <class Tp, class Allocator>
Tp lifo_queue<Tp, Allocator>::steal() noexcept {
Tp lifo_queue<Tp, Allocator>::steal_front() noexcept {
std::size_t thief = 0;
do {
thief = thief_block_.load(std::memory_order_relaxed);
Expand All @@ -219,7 +227,7 @@ namespace exec::bwos {
}

template <class Tp, class Allocator>
bool lifo_queue<Tp, Allocator>::put(Tp value) noexcept {
bool lifo_queue<Tp, Allocator>::push_back(Tp value) noexcept {
do {
std::size_t owner_index = owner_block_.load(std::memory_order_relaxed) & mask_;
block_type &current_block = blocks_[owner_index];
Expand All @@ -245,7 +253,7 @@ namespace exec::bwos {

template <class Tp, class Allocator>
std::size_t lifo_queue<Tp, Allocator>::get_block_size() const noexcept {
return blocks_[0].block_size_;
return blocks_[0].block_size();
}

template <class Tp, class Allocator>
Expand Down Expand Up @@ -306,43 +314,60 @@ namespace exec::bwos {
// Implementation of lifo_queue::block_type member methods

template <class Tp, class Allocator>
lifo_queue<Tp, Allocator>::block_type::block_type(
std::size_t block_size, Allocator allocator)
lifo_queue<Tp, Allocator>::block_type::block_type(std::size_t block_size, Allocator allocator)
: steal_tail_{block_size}
, ring_buffer_{std::make_unique<Tp[]>(block_size, alloc_deleter{allocator, block_size})} {
, ring_buffer_(block_size, allocator) {
}

template <class Tp, class Allocator>
lifo_queue<Tp, Allocator>::block_type::block_type(const block_type &other) {
head_.store(other.head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
tail_.store(other.tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_tail_.store(other.steal_tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_head_.store(other.steal_head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
ring_buffer_ = other.ring_buffer_;
}

template <class Tp, class Allocator>
typename lifo_queue<Tp, Allocator>::block_type &
lifo_queue<Tp, Allocator>::block_type::operator=(const block_type &other) {
head_.store(other.head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
tail_.store(other.tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_tail_.store(other.steal_tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_head_.store(other.steal_head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
ring_buffer_ = other.ring_buffer_;
return *this;
}

template <class Tp, class Allocator>
lifo_queue<Tp, Allocator>::block_type::block_type(block_type &&other) noexcept {
head_.store(other.head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
tail_.store(other.tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_tail_.store(other.steal_tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_head_.store(
other.steal_head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
ring_buffer_ = std::exchange(other.ring_buffer_, nullptr);
steal_head_.store(other.steal_head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
ring_buffer_ = std::exchange(std::move(other.ring_buffer_), {});
}

template <class Tp, class Allocator>
lifo_queue<Tp, Allocator>::block_type &
lifo_queue<Tp, Allocator>::block_type::operator=(block_type &&other) noexcept {
typename lifo_queue<Tp, Allocator>::block_type &
lifo_queue<Tp, Allocator>::block_type::operator=(block_type &&other) noexcept {
head_.store(other.head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
tail_.store(other.tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_tail_.store(other.steal_tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_head_.store(
other.steal_head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
ring_buffer_ = std::exchange(other.ring_buffer_, nullptr);
steal_head_.store(other.steal_head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
ring_buffer_ = std::exchange(std::move(other.ring_buffer_), {});
return *this;
}

template <class Tp, class Allocator>
lifo_queue_error_code lifo_queue<Tp, Allocator>::block_type::put(Tp value) noexcept {
std::uint64_t back = tail_.load(std::memory_order_relaxed);
if (back == block_size()) {
return lifo_queue_error_code::full;
if (back < block_size()) [[likely]] {
ring_buffer_[back] = static_cast<Tp &&>(value);
tail_.store(back + 1, std::memory_order_release);
return lifo_queue_error_code::success;
}
ring_buffer_[back] = value;
tail_.store(back + 1, std::memory_order_release);
return lifo_queue_error_code::success;
return lifo_queue_error_code::full;
}

template <class Tp, class Allocator>
Expand All @@ -356,35 +381,40 @@ namespace exec::bwos {
return {lifo_queue_error_code::empty, nullptr};
}
tail_.store(back - 1, std::memory_order_relaxed);
return {lifo_queue_error_code::success, ring_buffer_[back - 1]};
return {lifo_queue_error_code::success, static_cast<Tp &&>(ring_buffer_[back - 1])};
}

template <class Tp, class Allocator>
fetch_result<Tp> lifo_queue<Tp, Allocator>::block_type::steal() noexcept {
std::uint64_t steal = steal_tail_.load(std::memory_order_relaxed);
if (steal == block_size()) {
return {lifo_queue_error_code::done, nullptr};
std::uint64_t spos = steal_tail_.load(std::memory_order_relaxed);
fetch_result<Tp> result{};
if (spos == block_size()) [[unlikely]] {
result.status = lifo_queue_error_code::done;
return result;
}
std::uint64_t back = tail_.load(std::memory_order_relaxed);
if (back == steal) {
return {lifo_queue_error_code::empty, nullptr};
if (spos == back) [[unlikely]] {
result.status = lifo_queue_error_code::empty;
return result;
}
if (!steal_tail_.compare_exchange_strong(steal, steal + 1, std::memory_order_relaxed)) {
return {lifo_queue_error_code::conflict, nullptr};
if (!steal_tail_.compare_exchange_strong(spos, spos + 1, std::memory_order_relaxed)) {
result.status = lifo_queue_error_code::conflict;
return result;
}
void *value = ring_buffer_[steal];
steal_head_.fetch_add(1, std::memory_order_relaxed);
return {lifo_queue_error_code::success, value};
result.value = static_cast<Tp &&>(ring_buffer_[spos]);
steal_head_.fetch_add(1, std::memory_order_release);
result.status = lifo_queue_error_code::success;
return result;
}

template <class Tp, class Allocator>
takeover_result lifo_queue<Tp, Allocator>::block_type::takeover() noexcept {
std::uint64_t sPos = steal_tail_.exchange(block_size(), std::memory_order_relaxed);
if (sPos == block_size()) [[unlikely]] {
std::uint64_t spos = steal_tail_.exchange(block_size(), std::memory_order_relaxed);
if (spos == block_size()) [[unlikely]] {
return {head_.load(std::memory_order_relaxed), tail_.load(std::memory_order_relaxed)};
}
head_.store(sPos, std::memory_order_relaxed);
return {sPos, tail_.load(std::memory_order_relaxed)};
head_.store(spos, std::memory_order_relaxed);
return {spos, tail_.load(std::memory_order_relaxed)};
}

template <class Tp, class Allocator>
Expand All @@ -394,9 +424,9 @@ namespace exec::bwos {
}

template <class Tp, class Allocator>
bool lifo_queue<Tp, Allocator>::block_type::reclaim(std::size_t expectedPos) noexcept {
std::uint64_t sCnt = steal_head_.load(std::memory_order_relaxed);
if (expectedPos == sCnt) {
bool lifo_queue<Tp, Allocator>::block_type::reclaim(std::size_t expected_steals) noexcept {
std::uint64_t steal_counter = steal_head_.load(std::memory_order_acquire);
if (expected_steals == steal_counter) {
head_.store(0, std::memory_order_relaxed);
tail_.store(0, std::memory_order_relaxed);
steal_tail_.store(block_size(), std::memory_order_relaxed);
Expand All @@ -408,7 +438,7 @@ namespace exec::bwos {

template <class Tp, class Allocator>
std::size_t lifo_queue<Tp, Allocator>::block_type::block_size() const noexcept {
return ring_buffer_.get_deleter().block_size_;
return ring_buffer_.size();
}

template <class Tp, class Allocator>
Expand Down
64 changes: 63 additions & 1 deletion test/exec/test_bwos_lifo_queue.cpp
Original file line number Diff line number Diff line change
@@ -1 +1,63 @@
#include "exec/__detail/__bwos_lifo_queue.hpp"
#include "exec/__detail/__bwos_lifo_queue.hpp"

#include <catch2/catch.hpp>

TEST_CASE("exec::bwos::lifo_queue - ", "[bwos]") {
exec::bwos::lifo_queue<int*> queue(8, 2);
int x = 1;
int y = 2;
SECTION("Observers") {
CHECK(queue.get_block_size() == 2);
}
SECTION("Empty Get") {
CHECK(queue.pop_back() == nullptr);
}
SECTION("Empty Steal") {
CHECK(queue.steal_front() == nullptr);
}
SECTION("Put one, get one") {
CHECK(queue.push_back(&x));
CHECK(queue.pop_back() == &x);
CHECK(queue.pop_back() == nullptr);
}
SECTION("Put one, steal none") {
CHECK(queue.push_back(&x));
CHECK(queue.steal_front() == nullptr);
CHECK(queue.pop_back() == &x);
}
SECTION("Put one, get one, put one, get one") {
CHECK(queue.push_back(&x));
CHECK(queue.pop_back() == &x);
CHECK(queue.push_back(&y));
CHECK(queue.pop_back() == &y);
CHECK(queue.pop_back() == nullptr);
}
SECTION("Put two, get two") {
CHECK(queue.push_back(&x));
CHECK(queue.push_back(&y));
CHECK(queue.pop_back() == &y);
CHECK(queue.pop_back() == &x);
CHECK(queue.pop_back() == nullptr);
}
SECTION("Put three, Steal two") {
CHECK(queue.push_back(&x));
CHECK(queue.push_back(&y));
CHECK(queue.push_back(&x));
CHECK(queue.steal_front() == &x);
CHECK(queue.steal_front() == &y);
CHECK(queue.steal_front() == nullptr);
CHECK(queue.pop_back() == &x);
CHECK(queue.pop_back() == nullptr);
}
SECTION("Put 4, Steal 1, Get 3") {
CHECK(queue.push_back(&x));
CHECK(queue.push_back(&y));
CHECK(queue.push_back(&x));
CHECK(queue.push_back(&y));
CHECK(queue.steal_front() == &x);
CHECK(queue.pop_back() == &y);
CHECK(queue.pop_back() == &x);
CHECK(queue.pop_back() == &y);
CHECK(queue.pop_back() == nullptr);
}
}

0 comments on commit bff157c

Please sign in to comment.