-
Notifications
You must be signed in to change notification settings - Fork 7
thread_pool.hpp
Not Enough Standards' thread pool utilities are defined in header <nes/thread_pool.hpp>
A thread pool is a common pattern that aims to run multithreaded workloads efficiently. There are many implementations available, but each of them is slightly different in terms of performance and functionality.
This implementation aims to provide an intuitive way to manage workload synchronisation, while maximizing parallelism. The task lists are inspired by modern GPU APIs, such as Vulkan, so if you know how to use these APIs, you will easily understand how to use this thread pool implementation.
How to:
First you create a nes::thread_pool
, you can specify the number of threads you need, or keep the default value of std::thread::hardware_concurrency()
or 8 if std::thread::hardware_concurrency()
is unavailable.
Then you can start submitting work to the pool, either with nes::thread_pool::execute and nes::thread_pool::invoke for single work, or you can use task lists.
To use task lists you need a nes::task_builder
. Once you have a nes::task_builder
you can record your task list, and then build a nes::task_list
.
Your task list can then be submitted to your thread pool. You can optionally get the future returned by the thread pool to get back your task list and reuse it as well as knowing when its execution is done.
nes::thread_pool
is the main class of this header. A thread pool manages a user-defined number of worker threads that will execute batches, either pushed directly to it or pushed through a task list.
nes::task_builder
is the class that generates task lists.
nes::task_result
is a class used to get the result of an invoke, or manipulate barriers and checkpoints. nes::task_checkpoint
is an alias of nes::task_result<void>
.
nes::task_fence
is a class used to put fine-grained synchronization between a nes::task_list
and any other point in the program.
nes::task_list
is an opaque class that stores everything related to a task list. Task lists are reusable: a thread pool takes ownership of the task list when it is pushed in, and gives it back to the user once its job is done, through the future returned by nes::thread_pool::push.
namespace nes
{
//Handle used to get the result of an invoke, or to manipulate barriers and checkpoints.
//T is the type handed back by get().
//The class is move-only: copying is deleted, moving is noexcept.
template<typename T>
class task_result
{
public:
task_result() = default;
~task_result() = default;
task_result(const task_result&) = delete;
task_result& operator=(const task_result&) = delete;
task_result(task_result&& other) noexcept;
task_result& operator=(task_result&& other) noexcept;
//Returns the T associated with this result.
//NOTE(review): presumably blocks until the task is done, like std::future::get - confirm.
T get();
//Tells whether this object can be used; never throws.
//NOTE(review): presumably false for default-constructed and moved-from objects - confirm.
bool valid() const noexcept;
//Waits on the associated task (the example on this page calls it on a checkpoint
//before reading the buffers written by the first dispatch).
void wait() const;
//Timed wait bounded by a relative duration.
//NOTE(review): the meaning of the returned bool is not shown here - presumably true when ready; confirm.
template<class Rep, class Period>
bool wait_for(const std::chrono::duration<Rep, Period>& timeout) const;
//NOTE(review): declared with a std::chrono::duration exactly like wait_for, whereas the standard
//wait_until functions (e.g. std::future::wait_until) take a std::chrono::time_point.
//This looks like a copy-paste slip in this synopsis - check it against the header.
template<class Rep, class Period>
bool wait_until(const std::chrono::duration<Rep, Period>& timeout) const;
};
//A task_result that carries no value (T = void).
using task_checkpoint = task_result<void>;
//Fine-grained synchronization point between a nes::task_list and any other point in the program.
//Obtained from nes::task_builder::fence().
//The class is move-only: copying is deleted, moving is noexcept.
class task_fence
{
public:
task_fence() = default;
~task_fence() = default;
task_fence(const task_fence&) = delete;
task_fence& operator=(const task_fence&) = delete;
task_fence(task_fence&& other) noexcept;
task_fence& operator=(task_fence&& other) noexcept;
//Signals the fence; never throws.
//In the example on this page, the dispatch recorded after the fence has not written its
//output before signal() is called.
//NOTE(review): presumably tasks recorded after the fence stay blocked until this call - confirm.
void signal() noexcept;
};
//Opaque class that stores everything related to a task list; it exposes no operation of its own.
//Task lists are reusable: a thread pool takes ownership of the list passed to
//nes::thread_pool::push and gives it back through the returned future.
//The class is move-only (copying is deleted) and default-constructible in constant expressions.
class task_list
{
public:
constexpr task_list() = default;
~task_list() = default;
task_list(const task_list&) = delete;
task_list& operator=(const task_list&) = delete;
task_list(task_list&&) = default;
task_list& operator=(task_list&&) = default;
};
//Generates task lists: work is recorded through the member functions below,
//then build() produces the nes::task_list.
//The class is move-only: copying is deleted.
class task_builder
{
public:
//thread_count defaults to std::thread::hardware_concurrency().
//NOTE(review): presumably meant to match the thread count of the pool that will run the list - confirm.
explicit task_builder(std::uint32_t thread_count = std::thread::hardware_concurrency());
~task_builder() = default;
task_builder(const task_builder&) = delete;
task_builder& operator=(const task_builder&) = delete;
task_builder(task_builder&&) = default;
task_builder& operator=(task_builder&&) = default;
//Records a single piece of work (func called with args); nothing is returned to observe it.
template<typename Func, typename... Args>
void execute(Func&& func, Args&&... args);
//Records a single piece of work whose outcome is returned; the return value must not be discarded.
//NOTE(review): the deduced return type is presumably a nes::task_result - confirm.
template<typename Func, typename... Args>
[[nodiscard]] auto invoke(Func&& func, Args&&... args);
//Records func over the sizes x, y and z. In the example on this page the callable takes three
//std::uint32_t parameters (x, y, z) and dispatch(32, 1, 1, ...) fills every one of 32 elements.
//NOTE(review): presumably func runs once per (x, y, z) coordinate - confirm.
template<typename Func, typename... Args>
void dispatch(std::uint32_t x, std::uint32_t y, std::uint32_t z, Func&& func, Args&&... args);
//Records a barrier and returns a checkpoint for it; unlike checkpoint(), the result may be discarded.
//NOTE(review): presumably work recorded after the barrier waits for the work recorded before it - confirm.
task_checkpoint barrier();
//Records a checkpoint; the returned object must not be discarded.
//In the example on this page, waiting on it is how the caller learns that the first dispatch is done.
[[nodiscard]] task_checkpoint checkpoint();
//Records a fence and returns the nes::task_fence used to signal it; must not be discarded.
[[nodiscard]] task_fence fence();
//Builds the nes::task_list from what has been recorded.
task_list build();
};
//Main class of this header: manages a user-defined amount of worker threads that execute work
//either pushed directly to it or pushed through a task list.
//The class can be neither copied nor moved.
class thread_pool
{
public:
//thread_count defaults to std::thread::hardware_concurrency(); the documentation on this page
//states that 8 is used when std::thread::hardware_concurrency() is unavailable.
explicit thread_pool(std::size_t thread_count = std::thread::hardware_concurrency());
~thread_pool();
thread_pool(const thread_pool&) = delete;
thread_pool& operator=(const thread_pool&) = delete;
thread_pool(thread_pool&&) = delete;
thread_pool& operator=(thread_pool&&) = delete;
//Submits a single piece of work (func called with args); nothing is returned to observe it.
template<typename Func, typename... Args>
void execute(Func&& func, Args&&... args);
//Submits a single piece of work whose outcome is returned.
//NOTE(review): the deduced return type is presumably a nes::task_result - confirm.
template<typename Func, typename... Args>
auto invoke(Func&& func, Args&&... args);
//Submits a task list. The pool takes ownership of list and gives it back through the returned
//future once its job is done, which also tells when the execution is finished.
std::future<task_list> push(task_list list);
//NOTE(review): presumably blocks until the pool has no work left to run - confirm.
void wait_idle();
//Returns a thread count; never throws.
//NOTE(review): presumably the number of worker threads managed by the pool - confirm.
std::size_t thread_count() const noexcept;
};
}
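Here is an example in which a task list made of two dispatches is run by a thread pool, using a checkpoint to know when the first dispatch is done and a fence to control when the second one starts.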
#include <array>
#include <cstddef>
#include <cstdint>
#include <future>
#include <iostream>
#include <random>
#include <nes/thread_pool.hpp>
int main()
{
static constexpr std::size_t buffer_size{32};
//Some buffers
std::array<std::uint32_t, buffer_size> input{};
std::array<std::uint32_t, buffer_size> temp{};
std::array<std::uint32_t, buffer_size> output{};
const auto print_buffers = [&input, &temp, &output]()
{
const auto print_buffer = [](const std::array<std::uint32_t, buffer_size>& buffer)
{
for(auto value : buffer)
{
std::cout << value << ",";
}
};
std::cout << "input: ";
print_buffer(input);
std::cout << "\ntemp: ";
print_buffer(temp);
std::cout << "\noutput: ";
print_buffer(output);
std::cout << std::endl;
};
//Fill the buffer with random values
std::mt19937 rng{std::random_device{}()};
std::uniform_int_distribution<std::uint32_t> dist{1, 9};
for(auto& input_value : input)
{
input_value = dist(rng);
}
//The task builder
nes::task_builder builder{};
builder.dispatch(buffer_size, 1, 1, [&input, &temp](std::uint32_t x, std::uint32_t y [[maybe_unused]], std::uint32_t z [[maybe_unused]])
{
temp[x] = input[x] * 2u;
});
nes::task_checkpoint checkpoint{builder.checkpoint()};
nes::task_fence fence{builder.fence()};
builder.dispatch(buffer_size, 1, 1, [&input, &temp, &output](std::uint32_t x, std::uint32_t y [[maybe_unused]], std::uint32_t z [[maybe_unused]])
{
for(auto value : temp)
{
output[x] += (value + input[x]);
}
});
//Create a thread pool to run our task list.
nes::thread_pool thread_pool{};
std::cout << "Initial state:" << std::endl;
print_buffers();
std::cout << "Launching first the work..." << std::endl;
std::future<nes::task_list> future{thread_pool.push(builder.build())};
std::cout << "Work started..." << std::endl;
checkpoint.wait();
std::cout << "First dispatch done:" << std::endl;
print_buffers();
std::cout << "Launching second dispatch..." << std::endl;
fence.signal();
std::cout << "Second dispatch started..." << std::endl;
future.wait();
std::cout << "Second dispatch done:" << std::endl;
print_buffers();
}
Initial state:
input: 9,6,6,5,2,8,5,3,1,2,1,7,9,4,2,8,4,5,3,5,7,2,5,8,5,8,7,1,1,7,3,8,
temp: 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
output: 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
Launching first the work...
Work started...
First dispatch done:
input: 9,6,6,5,2,8,5,3,1,2,1,7,9,4,2,8,4,5,3,5,7,2,5,8,5,8,7,1,1,7,3,8,
temp: 18,12,12,10,4,16,10,6,2,4,2,14,18,8,4,16,8,10,6,10,14,4,10,16,10,16,14,2,2,14,6,16,
output: 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
Launching second dispatch...
Second dispatch started...
Second dispatch done:
input: 9,6,6,5,2,8,5,3,1,2,1,7,9,4,2,8,4,5,3,5,7,2,5,8,5,8,7,1,1,7,3,8,
temp: 18,12,12,10,4,16,10,6,2,4,2,14,18,8,4,16,8,10,6,10,14,4,10,16,10,16,14,2,2,14,6,16,
output: 602,506,506,474,378,570,474,410,346,378,346,538,602,442,378,570,442,474,410,474,538,378,474,570,474,570,538,346,346,538,410,570,