diff --git a/include/ppqsort/parallel/cpp/task_stack.h b/include/ppqsort/parallel/cpp/task_stack.h index 8c3c531..536cc3f 100644 --- a/include/ppqsort/parallel/cpp/task_stack.h +++ b/include/ppqsort/parallel/cpp/task_stack.h @@ -48,6 +48,9 @@ namespace ppqsort::impl::cpp { } private: + FRIEND_TEST(testThreadPool, PushBusyQueues); + FRIEND_TEST(testThreadPool, PopBusyQueues); + std::vector stack_; std::mutex mutex_; }; diff --git a/include/ppqsort/parallel/cpp/thread_pool.h b/include/ppqsort/parallel/cpp/thread_pool.h index 790ae3b..e4dd4c8 100644 --- a/include/ppqsort/parallel/cpp/thread_pool.h +++ b/include/ppqsort/parallel/cpp/thread_pool.h @@ -78,6 +78,9 @@ namespace ppqsort::impl::cpp { } private: + FRIEND_TEST(testThreadPool, PushBusyQueues); + FRIEND_TEST(testThreadPool, PopBusyQueues); + bool get_next_task(const unsigned int id) { bool found = false; diff --git a/test/source/thread_pool.cpp b/test/source/thread_pool.cpp index 013bc74..83a1fd6 100644 --- a/test/source/thread_pool.cpp +++ b/test/source/thread_pool.cpp @@ -7,108 +7,164 @@ #include #include "ppqsort/parallel/cpp/thread_pool.h" -TEST(ThreadPool, TryPushConcurrent) { - using namespace ppqsort::impl::cpp; - ThreadPool pool(2); - std::atomic counter = 0; +namespace ppqsort::impl::cpp { + TEST(testThreadPool, TryPushConcurrent) { + using namespace ppqsort::impl::cpp; + ThreadPool pool(2); + std::atomic counter = 0; + + auto task = [&]() { pool.push_task([&]() {++counter;}); }; + + // Push tasks concurrently in loop and wait for them to finish + std::vector> futures; + for (int i = 0; i < 1000; ++i) { + futures.push_back(std::async(std::launch::async, task)); + } + for (auto& f : futures) { + f.wait(); + } + + pool.wait_and_stop(); + ASSERT_EQ(counter, 1000); + } + + + TEST(testThreadPool, OneThread) { + using namespace ppqsort::impl::cpp; + ThreadPool pool(1); + std::atomic counter = 0; + + auto task = [&]() { pool.push_task([&]() {counter++;}); }; + + std::vector> futures; + for (int i = 0; i < 1000; ++i) { + futures.push_back(std::async(std::launch::async, task)); + } + for (auto& f : futures) { + f.wait(); + } + + pool.wait_and_stop(); + ASSERT_EQ(counter, 1000); + } + + TEST(testThreadPool, StopEmpty) { + using namespace ppqsort::impl::cpp; + ThreadPool pool; + } + + TEST(testThreadPool, PrematureExit) { + using namespace ppqsort::impl::cpp; + ThreadPool<> testPool(2); + + std::thread::id id_task_1, id_end; + + auto end = [&]() { + id_end = std::this_thread::get_id(); + }; + + auto task_2 = [&]() { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + testPool.push_task(end); + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); + }; + + auto task_1 = [&]() { + id_task_1 = std::this_thread::get_id(); + testPool.push_task(task_2); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + }; + + // task_1 pushes task_2 and sleeps, so both threads are busy and no tasks are in queue + // task_1 finishes, no tasks in queue, but task_2 is still running --> task_1 must not exit + // task_2 pushes end_task and sleeps, so the first thread should execute the end_task + // in some other implementations, the first thread was stopped before task_2 finishes - auto task = [&]() { pool.push_task([&]() {++counter;}); }; + testPool.push_task(task_1); + testPool.wait_and_stop(); + ASSERT_EQ(id_task_1, id_end); + } + + + TEST(testThreadPool, PushBusyQueues) { + ThreadPool pool(2); + std::atomic counter{0}; + + pool.threads_queues_[0].mutex_.lock(); + pool.threads_queues_[1].mutex_.lock(); - // Push tasks concurrently in loop and wait for them to finish - std::vector> futures; - for (int i = 0; i < 1000; ++i) { - futures.push_back(std::async(std::launch::async, task)); + // simulate busy queues for some time + std::jthread t1([&](){ + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); + pool.threads_queues_[0].mutex_.unlock(); + pool.threads_queues_[1].mutex_.unlock(); + }); + + // try to push, should fallback to waiting for the first queue + pool.push_task([&](){ ++counter; }); + pool.wait_and_stop(); + + ASSERT_EQ(counter, 1); } - for (auto& f : futures) { - f.wait(); - } - pool.wait_and_stop(); - ASSERT_EQ(counter, 1000); -} + TEST(testThreadPool, PopBusyQueues) { + ThreadPool pool(2); + std::atomic counter{0}; + + // push to all queues manually to avoid waking up the threads + pool.threads_queues_[0].stack_.emplace_back([&](){ ++counter; }); + pool.threads_queues_[1].stack_.emplace_back([&](){ ++counter; }); + pool.total_tasks_ = 2; + pool.pending_tasks_ = 2; + + // lock all queues + pool.threads_queues_[0].mutex_.lock(); + pool.threads_queues_[1].mutex_.lock(); + // simulate busy queues for some time + std::jthread t1([&](){ + std::this_thread::sleep_for(std::chrono::milliseconds(5000)); + pool.threads_queues_[0].mutex_.unlock(); + pool.threads_queues_[1].mutex_.unlock(); + }); -TEST(ThreadPool, OneThread) { - using namespace ppqsort::impl::cpp; - ThreadPool pool(1); - std::atomic counter = 0; + // now push normally to wake up the threads + pool.push_task([&](){ ++counter; }); + pool.wait_and_stop(); - auto task = [&]() { pool.push_task([&]() {counter++;}); }; + ASSERT_EQ(counter, 3); + } + + + /**************************************************** + * TaskStack Tests + ****************************************************/ + + TEST(testTaskStack, PushPop) { + using namespace ppqsort::impl::cpp; + TaskStack stack; + stack.push(1); + stack.push(2); + stack.push(3); + ASSERT_EQ(stack.pop().value(), 3); + ASSERT_EQ(stack.pop().value(), 2); + ASSERT_EQ(stack.pop().value(), 1); + } + + TEST(testTaskStack, PopEmpty) { + using namespace ppqsort::impl::cpp; + TaskStack stack; + ASSERT_EQ(stack.pop().has_value(), false); + } - std::vector> futures; - for (int i = 0; i < 1000; ++i) { - futures.push_back(std::async(std::launch::async, task)); + TEST(testTaskStack, tryPushPop) { + using namespace ppqsort::impl::cpp; + TaskStack stack; + stack.try_push(1); + stack.try_push(2); + stack.try_push(3); + ASSERT_EQ(stack.try_pop().value(), 3); + ASSERT_EQ(stack.try_pop().value(), 2); + ASSERT_EQ(stack.try_pop().value(), 1); } - for (auto& f : futures) { - f.wait(); - } - - pool.wait_and_stop(); - ASSERT_EQ(counter, 1000); -} - -TEST(ThreadPool, StopEmpty) { - using namespace ppqsort::impl::cpp; - ThreadPool pool; -} - -TEST(ThreadPool, PrematureExit) { - using namespace ppqsort::impl::cpp; - ThreadPool<> testPool(2); - - std::thread::id id_task_1, id_task_2, id_task_3, id_end; - - auto end = [&]() { - id_end = std::this_thread::get_id(); - }; - - auto task_2 = [&]() { - id_task_2 = std::this_thread::get_id(); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - testPool.push_task(end); - std::this_thread::sleep_for(std::chrono::milliseconds(5000)); - }; - - auto task_1 = [&]() { - id_task_1 = std::this_thread::get_id(); - testPool.push_task(task_2); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - }; - - // task_1 pushes task_2 and sleeps, so both threads are busy and no tasks are in queue - // task_1 finishes, no tasks in queue, but task_2 is still running --> task_1 must not exit - // task_2 pushes end_task and sleeps, so the first thread should execute the end_task - // in some other implementations, the first thread was stopped before task_2 finishes - - testPool.push_task(task_1); - testPool.wait_and_stop(); - ASSERT_EQ(id_task_1, id_end); -} - -TEST(TaskStack, PushPop) { - using namespace ppqsort::impl::cpp; - TaskStack stack; - stack.push(1); - stack.push(2); - stack.push(3); - ASSERT_EQ(stack.pop().value(), 3); - ASSERT_EQ(stack.pop().value(), 2); - ASSERT_EQ(stack.pop().value(), 1); -} - -TEST(TaskStack, PopEmpty) { - using namespace ppqsort::impl::cpp; - TaskStack stack; - ASSERT_EQ(stack.pop().has_value(), false); -} - -TEST(TaskStack, tryPushPop) { - using namespace ppqsort::impl::cpp; - TaskStack stack; - stack.try_push(1); - stack.try_push(2); - stack.try_push(3); - ASSERT_EQ(stack.try_pop().value(), 3); - ASSERT_EQ(stack.try_pop().value(), 2); - ASSERT_EQ(stack.try_pop().value(), 1); } \ No newline at end of file