Skip to content

Commit

Permalink
Add test cases for busy queues
Browse files Browse the repository at this point in the history
  • Loading branch information
GabTux committed Mar 27, 2024
1 parent 88b321d commit 9ebd7ed
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 95 deletions.
3 changes: 3 additions & 0 deletions include/ppqsort/parallel/cpp/task_stack.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ namespace ppqsort::impl::cpp {
}

private:
FRIEND_TEST(testThreadPool, PushBusyQueues);
FRIEND_TEST(testThreadPool, PopBusyQueues);

std::vector<taskType> stack_;
std::mutex mutex_;
};
Expand Down
3 changes: 3 additions & 0 deletions include/ppqsort/parallel/cpp/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ namespace ppqsort::impl::cpp {
}

private:
FRIEND_TEST(testThreadPool, PushBusyQueues);
FRIEND_TEST(testThreadPool, PopBusyQueues);


bool get_next_task(const unsigned int id) {
bool found = false;
Expand Down
246 changes: 151 additions & 95 deletions test/source/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,108 +7,164 @@
#include <future>
#include "ppqsort/parallel/cpp/thread_pool.h"

TEST(ThreadPool, TryPushConcurrent) {
using namespace ppqsort::impl::cpp;
ThreadPool pool(2);
std::atomic<int> counter = 0;
namespace ppqsort::impl::cpp {
TEST(testThreadPool, TryPushConcurrent) {
using namespace ppqsort::impl::cpp;
ThreadPool pool(2);
std::atomic<int> counter = 0;

auto task = [&]() { pool.push_task([&]() {++counter;}); };

// Push tasks concurrently in loop and wait for them to finish
std::vector<std::future<void>> futures;
for (int i = 0; i < 1000; ++i) {
futures.push_back(std::async(std::launch::async, task));
}
for (auto& f : futures) {
f.wait();
}

pool.wait_and_stop();
ASSERT_EQ(counter, 1000);
}


TEST(testThreadPool, OneThread) {
using namespace ppqsort::impl::cpp;
ThreadPool pool(1);
std::atomic<int> counter = 0;

auto task = [&]() { pool.push_task([&]() {counter++;}); };

std::vector<std::future<void>> futures;
for (int i = 0; i < 1000; ++i) {
futures.push_back(std::async(std::launch::async, task));
}
for (auto& f : futures) {
f.wait();
}

pool.wait_and_stop();
ASSERT_EQ(counter, 1000);
}

TEST(testThreadPool, StopEmpty) {
using namespace ppqsort::impl::cpp;
ThreadPool pool;
}

TEST(testThreadPool, PrematureExit) {
using namespace ppqsort::impl::cpp;
ThreadPool<> testPool(2);

std::thread::id id_task_1, id_end;

auto end = [&]() {
id_end = std::this_thread::get_id();
};

auto task_2 = [&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
testPool.push_task(end);
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
};

auto task_1 = [&]() {
id_task_1 = std::this_thread::get_id();
testPool.push_task(task_2);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
};

// task_1 pushes task_2 and sleeps, so both threads are busy and no tasks are in queue
// task_1 finishes, no tasks in queue, but task_2 is still running --> task_1 must not exit
// task_2 pushes end_task and sleeps, so the first thread should execute the end_task
// in some other implementations, the first thread was stopped before task_2 finishes

auto task = [&]() { pool.push_task([&]() {++counter;}); };
testPool.push_task(task_1);
testPool.wait_and_stop();
ASSERT_EQ(id_task_1, id_end);
}


TEST(testThreadPool, PushBusyQueues) {
ThreadPool pool(2);
std::atomic<int> counter{0};

pool.threads_queues_[0].mutex_.lock();
pool.threads_queues_[1].mutex_.lock();

// Push tasks concurrently in loop and wait for them to finish
std::vector<std::future<void>> futures;
for (int i = 0; i < 1000; ++i) {
futures.push_back(std::async(std::launch::async, task));
// simulate busy queues for some time
std::jthread t1([&](){
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
pool.threads_queues_[0].mutex_.unlock();
pool.threads_queues_[1].mutex_.unlock();
});

// try to push, should fallback to waiting for the first queue
pool.push_task([&](){ ++counter; });
pool.wait_and_stop();

ASSERT_EQ(counter, 1);
}
for (auto& f : futures) {
f.wait();
}

pool.wait_and_stop();
ASSERT_EQ(counter, 1000);
}
TEST(testThreadPool, PopBusyQueues) {
ThreadPool pool(2);
std::atomic<int> counter{0};

// push to all queues manually to avoid waking up the threads
pool.threads_queues_[0].stack_.emplace_back([&](){ ++counter; });
pool.threads_queues_[1].stack_.emplace_back([&](){ ++counter; });
pool.total_tasks_ = 2;
pool.pending_tasks_ = 2;

// lock all queues
pool.threads_queues_[0].mutex_.lock();
pool.threads_queues_[1].mutex_.lock();

// simulate busy queues for some time
std::jthread t1([&](){
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
pool.threads_queues_[0].mutex_.unlock();
pool.threads_queues_[1].mutex_.unlock();
});

TEST(ThreadPool, OneThread) {
using namespace ppqsort::impl::cpp;
ThreadPool pool(1);
std::atomic<int> counter = 0;
// now push normally to wake up the threads
pool.push_task([&](){ ++counter; });
pool.wait_and_stop();

auto task = [&]() { pool.push_task([&]() {counter++;}); };
ASSERT_EQ(counter, 3);
}


/****************************************************
* TaskStack Tests
****************************************************/

TEST(testTaskStack, PushPop) {
using namespace ppqsort::impl::cpp;
TaskStack<int> stack;
stack.push(1);
stack.push(2);
stack.push(3);
ASSERT_EQ(stack.pop().value(), 3);
ASSERT_EQ(stack.pop().value(), 2);
ASSERT_EQ(stack.pop().value(), 1);
}

TEST(testTaskStack, PopEmpty) {
using namespace ppqsort::impl::cpp;
TaskStack<int> stack;
ASSERT_EQ(stack.pop().has_value(), false);
}

std::vector<std::future<void>> futures;
for (int i = 0; i < 1000; ++i) {
futures.push_back(std::async(std::launch::async, task));
TEST(testTaskStack, tryPushPop) {
using namespace ppqsort::impl::cpp;
TaskStack<int> stack;
stack.try_push(1);
stack.try_push(2);
stack.try_push(3);
ASSERT_EQ(stack.try_pop().value(), 3);
ASSERT_EQ(stack.try_pop().value(), 2);
ASSERT_EQ(stack.try_pop().value(), 1);
}
for (auto& f : futures) {
f.wait();
}

pool.wait_and_stop();
ASSERT_EQ(counter, 1000);
}

TEST(ThreadPool, StopEmpty) {
using namespace ppqsort::impl::cpp;
ThreadPool pool;
}

TEST(ThreadPool, PrematureExit) {
using namespace ppqsort::impl::cpp;
ThreadPool<> testPool(2);

std::thread::id id_task_1, id_task_2, id_task_3, id_end;

auto end = [&]() {
id_end = std::this_thread::get_id();
};

auto task_2 = [&]() {
id_task_2 = std::this_thread::get_id();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
testPool.push_task(end);
std::this_thread::sleep_for(std::chrono::milliseconds(5000));
};

auto task_1 = [&]() {
id_task_1 = std::this_thread::get_id();
testPool.push_task(task_2);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
};

// task_1 pushes task_2 and sleeps, so both threads are busy and no tasks are in queue
// task_1 finishes, no tasks in queue, but task_2 is still running --> task_1 must not exit
// task_2 pushes end_task and sleeps, so the first thread should execute the end_task
// in some other implementations, the first thread was stopped before task_2 finishes

testPool.push_task(task_1);
testPool.wait_and_stop();
ASSERT_EQ(id_task_1, id_end);
}

TEST(TaskStack, PushPop) {
using namespace ppqsort::impl::cpp;
TaskStack<int> stack;
stack.push(1);
stack.push(2);
stack.push(3);
ASSERT_EQ(stack.pop().value(), 3);
ASSERT_EQ(stack.pop().value(), 2);
ASSERT_EQ(stack.pop().value(), 1);
}

TEST(TaskStack, PopEmpty) {
using namespace ppqsort::impl::cpp;
TaskStack<int> stack;
ASSERT_EQ(stack.pop().has_value(), false);
}

TEST(TaskStack, tryPushPop) {
using namespace ppqsort::impl::cpp;
TaskStack<int> stack;
stack.try_push(1);
stack.try_push(2);
stack.try_push(3);
ASSERT_EQ(stack.try_pop().value(), 3);
ASSERT_EQ(stack.try_pop().value(), 2);
ASSERT_EQ(stack.try_pop().value(), 1);
}

0 comments on commit 9ebd7ed

Please sign in to comment.