Skip to content

Commit

Permalink
[Enhancement] generalize tbb's thread_pool_base (#1403)
Browse files Browse the repository at this point in the history
* Copy implementation of thread_pool_base.

In order to use thread_pool_base later, we need to separate it from
tbb.

* Rename tbbexec to execpools

The goal is to have a common space for all the thread pools that depend
on 3rd party threading libraries, like tbb.

The new name is execpools like execution pools, where all thread pools
are collected.

tbb subdirectory is about separating the different implementation files
to manage easily the library dependencies.

* Create task flow pool implementation.

The goal is to see whether the task_pool_base abstraction is sufficient
or not.

* Add boost thread pool implementation

* Add Standalone implementation for Asio.

The idea is to have an asio thread pool rather than a Boost-specific one.
This way, standalone asio can also be used as an implementation.

Which implementation is used for Asio is controlled by the cmake
parameter STDEXEC_ASIO_IMPLEMENTATION. It can be 'boost' or
'standalone'.

Cmake generates a configuration file that creates a namespace:
execpools::asio_impl. This points to the corresponding implementation.
This way the pool implementation refers only to this
namespace.

The configuration file is generated into the build directory, in order
not to pollute the source tree with generated files.

---------

Co-authored-by: David Eles <eles.david88@gmail.com>
Co-authored-by: Eric Niebler <eniebler@nvidia.com>
  • Loading branch information
3 people authored Oct 9, 2024
1 parent 3e41a95 commit 4d8d194
Show file tree
Hide file tree
Showing 20 changed files with 1,361 additions and 549 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ compile_commands.json
/*.cpp
/*.patch
/*.diff
/include/execpools/asio/asio_config.hpp
callgrind.*
*.pbf
*.o
a.out
*.code-workspace
*.code-workspace
88 changes: 83 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -330,18 +330,96 @@ if (STDEXEC_ENABLE_TBB)
INSTALL_EXPORT_SET stdexec-exports
)

file(GLOB_RECURSE tbbexec_sources include/tbbexec/*.hpp)
add_library(tbbexec INTERFACE ${tbbexec_sources})
list(APPEND stdexec_export_targets tbbexec)
add_library(STDEXEC::tbbexec ALIAS tbbexec)
file(GLOB_RECURSE tbbpool_sources include/execpools/tbb/*.hpp)
add_library(tbbpool INTERFACE ${tbbpool_sources})
list(APPEND stdexec_export_targets tbbpool)
add_library(STDEXEC::tbbpool ALIAS tbbpool)

target_link_libraries(tbbexec
target_link_libraries(tbbpool
INTERFACE
STDEXEC::stdexec
TBB::tbb
)
endif()

# Optional Taskflow-backed thread pool target (STDEXEC::taskflow_pool).
option(STDEXEC_ENABLE_TASKFLOW "Enable TaskFlow targets" OFF)

if(STDEXEC_ENABLE_TASKFLOW)
  include(rapids-find)
  rapids_cpm_find(Taskflow 3.7.0
    CPM_ARGS
      GITHUB_REPOSITORY taskflow/taskflow
      GIT_TAG v3.7.0
  )

  # Collect the pool headers and attach them to the interface target.
  # The variable expanded in add_library() must be the one filled by the glob;
  # previously the glob wrote to `taskflow_pool` while add_library() read the
  # never-set `taskflowexec_sources`, so no headers were listed on the target.
  file(GLOB_RECURSE taskflow_pool_sources include/execpools/taskflow/*.hpp)
  add_library(taskflow_pool INTERFACE ${taskflow_pool_sources})
  list(APPEND stdexec_export_targets taskflow_pool)
  add_library(STDEXEC::taskflow_pool ALIAS taskflow_pool)

  target_link_libraries(taskflow_pool
    INTERFACE
      STDEXEC::stdexec
      Taskflow
  )
endif()

# Optional Asio-backed thread pool target (STDEXEC::asio_pool).
# STDEXEC_ASIO_IMPLEMENTATION selects which Asio flavour backs it:
# 'boost' (Boost.Asio) or 'standalone' (chriskohlhoff/asio).
option(STDEXEC_ENABLE_ASIO "Enable Asio targets" OFF)
set(STDEXEC_ASIO_IMPLEMENTATION "boost" CACHE STRING
    "Asio implementation backing the asio thread pool: 'boost' or 'standalone'")
set_property(CACHE STDEXEC_ASIO_IMPLEMENTATION PROPERTY STRINGS boost standalone)

if(STDEXEC_ENABLE_ASIO)
  # Initialise BOTH selector flags so that exactly one of them is TRUE below
  # and neither is ever left undefined when it is tested or substituted.
  set(STDEXEC_ASIO_USES_BOOST FALSE)
  set(STDEXEC_ASIO_USES_STANDALONE FALSE)

  include(rapids-find)
  # Compare via the variable name (no ${} expansion) so an empty value cannot
  # produce a malformed if() expression.
  if(STDEXEC_ASIO_IMPLEMENTATION STREQUAL "boost")
    set(STDEXEC_ASIO_USES_BOOST TRUE)
  elseif(STDEXEC_ASIO_IMPLEMENTATION STREQUAL "standalone")
    set(STDEXEC_ASIO_USES_STANDALONE TRUE)
  else()
    message(FATAL_ERROR "Unknown configuration for ASIO implementation: " ${STDEXEC_ASIO_IMPLEMENTATION})
  endif()

  file(GLOB_RECURSE boost_pool_sources include/execpools/asio/*.hpp)
  # NOTE(review): the generated header is written into the source tree here;
  # confirm whether the build directory was the intended destination.
  set(STDEXEC_ASIO_CONFIG_FILE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/include/execpools/asio)
  configure_file(include/execpools/asio/asio_config.hpp.in ${STDEXEC_ASIO_CONFIG_FILE_PATH}/asio_config.hpp)

  if(STDEXEC_ASIO_USES_BOOST)
    set(BOOST_ENABLE_COMPATIBILITY_TARGETS TRUE)
    rapids_cpm_find(Boost 1.86.0
      CPM_ARGS
        GITHUB_REPOSITORY boostorg/boost
        GIT_TAG boost-1.86.0
    )
    add_library(stdexec_boost_pool INTERFACE ${boost_pool_sources})
    list(APPEND stdexec_export_targets stdexec_boost_pool)
    add_library(STDEXEC::asio_pool ALIAS stdexec_boost_pool)

    target_link_libraries(stdexec_boost_pool
      INTERFACE
        STDEXEC::stdexec
        Boost::boost
    )
  elseif(STDEXEC_ASIO_USES_STANDALONE)
    include(cmake/import_standalone_asio.cmake)
    import_standalone_asio(
      TAG "asio-1-31-0"
      VERSION "1.31.0")

    add_library(stdexec_asio_pool INTERFACE ${boost_pool_sources})
    list(APPEND stdexec_export_targets stdexec_asio_pool)
    add_library(STDEXEC::asio_pool ALIAS stdexec_asio_pool)

    target_link_libraries(stdexec_asio_pool
      INTERFACE
        STDEXEC::stdexec
        asio
    )
  else()
    # Defensive: unreachable while the selection logic above sets one flag.
    message(FATAL_ERROR "ASIO implementation is not configured")
  endif()
endif()

include(CheckIncludeFileCXX)
if (CMAKE_SYSTEM_NAME STREQUAL "Darwin")
CHECK_INCLUDE_FILE_CXX("dispatch/dispatch.h" STDEXEC_FOUND_LIBDISPATCH)
Expand Down
63 changes: 63 additions & 0 deletions cmake/import_standalone_asio.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# - This function imports the standalone version of ASIO
#
# Importing standalone asio can't be done via rapids-cpm, because the library has no cmake
# build setup. But still it can be imported with CPM.
#
# This function is based on the CPM example: https://github.com/cpm-cmake/CPM.cmake/blob/master/examples/asio-standalone/CMakeLists.txt
#
# import_standalone_asio(TAG <github-tag> VERSION <version-string>)
#
# Fetches standalone ASIO through CPM and, when CPM added the package, defines
# an INTERFACE target named `asio` carrying its include path, compile
# definitions and the thread library dependency.
#
# Keyword arguments:
#   TAG      git tag of chriskohlhoff/asio to check out (e.g. "asio-1-31-0")
#   VERSION  version string handed to CPM (e.g. "1.31.0")
function(import_standalone_asio)
  set(options "")
  set(args TAG VERSION)
  set(multi_args "")
  # ${ARGN} must be forwarded: without it cmake_parse_arguments has nothing to
  # parse and IMPORT_STANDALONE_ASIO_TAG / _VERSION stay empty.
  cmake_parse_arguments(IMPORT_STANDALONE_ASIO "${options}" "${args}" "${multi_args}" ${ARGN})

  CPMAddPackage("gh:chriskohlhoff/asio#${IMPORT_STANDALONE_ASIO_TAG}@${IMPORT_STANDALONE_ASIO_VERSION}")

  # ASIO doesn't use CMake, we have to configure it manually. Extra notes for using on Windows:
  #
  # 1) If _WIN32_WINNT is not set, ASIO assumes _WIN32_WINNT=0x0501, i.e. Windows XP target, which is
  # definitely not the platform which most users target.
  #
  # 2) WIN32_LEAN_AND_MEAN is defined to make Winsock2 work.
  if(asio_ADDED)
    add_library(asio INTERFACE)

    target_include_directories(asio SYSTEM INTERFACE ${asio_SOURCE_DIR}/asio/include)

    target_compile_definitions(asio INTERFACE ASIO_STANDALONE ASIO_NO_DEPRECATED)

    # Make sure the Threads::Threads imported target exists before linking it.
    find_package(Threads REQUIRED)
    target_link_libraries(asio INTERFACE Threads::Threads)

    if(WIN32)
      # macro see @ https://stackoverflow.com/a/40217291/1746503
      # Converts CMAKE_SYSTEM_VERSION (e.g. "10.0.x") into the hex form used
      # by _WIN32_WINNT (e.g. "0x0A00") and stores it in `version`.
      macro(get_win32_winnt version)
        if(CMAKE_SYSTEM_VERSION)
          set(ver ${CMAKE_SYSTEM_VERSION})
          string(REGEX MATCH "^([0-9]+).([0-9])" ver ${ver})
          string(REGEX MATCH "^([0-9]+)" verMajor ${ver})
          # Check for Windows 10, b/c we'll need to convert to hex 'A'.
          if("${verMajor}" MATCHES "10")
            set(verMajor "A")
            string(REGEX REPLACE "^([0-9]+)" ${verMajor} ver ${ver})
          endif()
          # Remove all remaining '.' characters.
          string(REPLACE "." "" ver ${ver})
          # Prepend each digit with a zero.
          string(REGEX REPLACE "([0-9A-Z])" "0\\1" ver ${ver})
          set(${version} "0x${ver}")
        endif()
      endmacro()

      # Respect a value the caller already chose; otherwise derive one.
      if(NOT DEFINED _WIN32_WINNT)
        get_win32_winnt(ver)
        set(_WIN32_WINNT ${ver})
      endif()

      message(STATUS "Set _WIN32_WINNT=${_WIN32_WINNT}")

      target_compile_definitions(asio INTERFACE _WIN32_WINNT=${_WIN32_WINNT} WIN32_LEAN_AND_MEAN)
    endif()
  endif()
endfunction()
18 changes: 14 additions & 4 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,21 @@ endif()

if (STDEXEC_ENABLE_TBB)
add_executable(example.benchmark.tbb_thread_pool benchmark/tbb_thread_pool.cpp)
target_link_libraries(example.benchmark.tbb_thread_pool PRIVATE STDEXEC::tbbexec)
target_link_libraries(example.benchmark.tbb_thread_pool PRIVATE STDEXEC::tbbpool)

add_executable(example.benchmark.tbb_thread_pool_nested benchmark/tbb_thread_pool_nested.cpp)
target_link_libraries(example.benchmark.tbb_thread_pool_nested PRIVATE STDEXEC::tbbexec)
target_link_libraries(example.benchmark.tbb_thread_pool_nested PRIVATE STDEXEC::tbbpool)

add_executable(example.benchmark.fibonacci benchmark/fibonacci.cpp)
target_link_libraries(example.benchmark.fibonacci PRIVATE STDEXEC::tbbexec)
endif()
target_link_libraries(example.benchmark.fibonacci PRIVATE STDEXEC::tbbpool)
endif()

# Taskflow thread pool benchmark; only built when STDEXEC_ENABLE_TASKFLOW is ON.
# Links against the STDEXEC::taskflow_pool target.
if(STDEXEC_ENABLE_TASKFLOW)
add_executable(example.benchmark.taskflow_thread_pool benchmark/taskflow_thread_pool.cpp)
target_link_libraries(example.benchmark.taskflow_thread_pool PRIVATE STDEXEC::taskflow_pool)
endif()

# Asio thread pool benchmark; only built when STDEXEC_ENABLE_ASIO is ON.
# Links against the STDEXEC::asio_pool target.
if(STDEXEC_ENABLE_ASIO)
add_executable(example.benchmark.asio_thread_pool benchmark/asio_thread_pool.cpp)
target_link_libraries(example.benchmark.asio_thread_pool PRIVATE STDEXEC::asio_pool)
endif()
89 changes: 89 additions & 0 deletions examples/benchmark/asio_thread_pool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2023 Maikel Nadolski
* Copyright (c) 2023 NVIDIA Corporation
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "./common.hpp"
#include <execpools/asio/asio_thread_pool.hpp>

// Benchmark worker body executed by each driver thread.
//
// Every round is bracketed by two barrier rendezvous. Within a round the
// thread submits its share of `total_scheds` no-op tasks to the pool's
// scheduler and blocks until all of them have run. The loop ends when `stop`
// is observed set right after the first rendezvous of a round.
struct RunThread {
  void operator()(
    execpools::asio_thread_pool& pool,
    std::size_t total_scheds,
    std::size_t tid,
    std::barrier<>& barrier,
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
    std::span<char> buffer,
#endif
    std::atomic<bool>& stop,
    exec::numa_policy numa) {
    // Bind this driver thread to the NUMA node the policy maps `tid` to.
    int node = numa.thread_index_to_node(tid);
    numa.bind_to_node(node);

    auto sched = pool.get_scheduler();
    std::mutex mtx;
    std::condition_variable done_cv;

    for (;;) {
      barrier.arrive_and_wait();
      if (stop.load()) {
        return;
      }

#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
      // Fresh monotonic arena over the caller-supplied buffer for each round;
      // it is handed to the detached operations through the environment.
      pmr::monotonic_buffer_resource resource{
        buffer.data(), buffer.size(), pmr::null_memory_resource()};
      pmr::polymorphic_allocator<char> alloc(&resource);
      auto env = exec::make_env(stdexec::prop{stdexec::get_allocator, alloc});
#endif

      // This thread's slice of the total amount of work.
      auto [first, last] =
        exec::_pool_::even_share(total_scheds, tid, pool.available_parallelism());
      std::size_t pending = last - first;
      std::atomic<std::size_t> remaining{pending};

      // Executed on the pool for every task; the one that takes the counter
      // from 1 to 0 wakes the waiting driver thread.
      auto on_task_done = [&] {
        if (remaining.fetch_sub(1) == 1) {
          std::lock_guard guard{mtx};
          done_cv.notify_one();
        }
      };

      for (; pending != 0; --pending) {
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
        stdexec::start_detached(
          stdexec::schedule(sched) | stdexec::then(on_task_done), env);
#else
        stdexec::start_detached(
          stdexec::schedule(sched) | stdexec::then(on_task_done));
#endif
      }

      // Block until every submitted task has run, then release the lock
      // before the closing rendezvous of this round.
      {
        std::unique_lock guard{mtx};
        done_cv.wait(guard, [&] { return remaining.load() == 0; });
      }
      barrier.arrive_and_wait();
    }
  }
};

int main(int argc, char** argv) {
my_main<execpools::asio_thread_pool, RunThread>(argc, argv);
}
6 changes: 3 additions & 3 deletions examples/benchmark/fibonacci.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <cstdlib>
#include <iostream>

#include <tbbexec/tbb_thread_pool.hpp>
#include <execpools/tbb/tbb_thread_pool.hpp>
#include <exec/static_thread_pool.hpp>

#include <exec/any_sender_of.hpp>
Expand Down Expand Up @@ -100,10 +100,10 @@ int main(int argc, char** argv) {
return -1;
}

std::variant<tbbexec::tbb_thread_pool, exec::static_thread_pool> pool;
std::variant<execpools::tbb_thread_pool, exec::static_thread_pool> pool;

if (argv[4] == std::string_view("tbb")) {
pool.emplace<tbbexec::tbb_thread_pool>(static_cast<int>(std::thread::hardware_concurrency()));
pool.emplace<execpools::tbb_thread_pool>(static_cast<int>(std::thread::hardware_concurrency()));
} else {
pool.emplace<exec::static_thread_pool>(
std::thread::hardware_concurrency(), exec::bwos_params{}, exec::get_numa_policy());
Expand Down
Loading

0 comments on commit 4d8d194

Please sign in to comment.