Skip to content

Commit

Permalink
Merge pull request #1130 from maikel/numa-affinity
Browse files Browse the repository at this point in the history
Implement Numa affinity for worker threads
  • Loading branch information
ericniebler authored Dec 16, 2023
2 parents e8e27de + 93ebad5 commit e7cd275
Show file tree
Hide file tree
Showing 15 changed files with 742 additions and 122 deletions.
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,13 @@ if (STDEXEC_ENABLE_TBB)
)
endif ()

option (STDEXEC_ENABLE_NUMA "Enable NUMA affinity for static_thread_pool" OFF)
if (STDEXEC_ENABLE_NUMA)
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules")
find_package(numa REQUIRED)
target_link_libraries(stdexec INTERFACE numa::numa)
target_compile_definitions(stdexec INTERFACE STDEXEC_ENABLE_NUMA)
endif()
include(CheckIncludeFileCXX)
CHECK_INCLUDE_FILE_CXX("linux/io_uring.h" STDEXEC_FOUND_IO_URING)
option (STDEXEC_ENABLE_IO_URING_TESTS "Enable io_uring tests" ${STDEXEC_FOUND_IO_URING})
Expand Down
95 changes: 95 additions & 0 deletions cmake/Modules/Findnuma.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#
# Copyright (c) 2023 Maikel Nadolski
# Copyright (c) 2023 NVIDIA Corporation
#
# Licensed under the Apache License Version 2.0 with LLVM Exceptions
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# https://llvm.org/LICENSE.txt
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#[=======================================================================[.rst:
Findnuma
-------
Finds the numa library.
Imported Targets
^^^^^^^^^^^^^^^^
This module provides the following imported targets, if found:
``numa::numa``
The numa library
Result Variables
^^^^^^^^^^^^^^^^
This will define the following variables:
``numa_FOUND``
True if the system has the Foo library.
``numa_VERSION``
The version of the Foo library which was found.
``numa_INCLUDE_DIRS``
Include directories needed to use Foo.
``numa_LIBRARIES``
Libraries needed to link to Foo.
Cache Variables
^^^^^^^^^^^^^^^
The following cache variables may also be set:
``numa_INCLUDE_DIR``
The directory containing ``numa.h``.
``numa_LIBRARY``
The path to the Foo library.
#]=======================================================================]

find_path(numa_INCLUDE_DIR
NAMES numa.h
PATHS ${PC_Foo_INCLUDE_DIRS}
PATH_SUFFIXES numa
)
find_library(numa_LIBRARY
NAMES numa
PATHS ${PC_Foo_LIBRARY_DIRS}
)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(numa
FOUND_VAR numa_FOUND
REQUIRED_VARS
numa_LIBRARY
numa_INCLUDE_DIR
VERSION_VAR numa_VERSION
)

if(numa_FOUND)
set(numa_LIBRARIES ${numa_LIBRARY})
set(numa_INCLUDE_DIRS ${numa_INCLUDE_DIR})
set(numa_DEFINITIONS ${PC_numa_CFLAGS_OTHER})
endif()

if(numa_FOUND AND NOT TARGET numa::numa)
add_library(numa::numa UNKNOWN IMPORTED)
set_target_properties(numa::numa PROPERTIES
IMPORTED_LOCATION "${numa_LIBRARY}"
INTERFACE_COMPILE_OPTIONS "${PC_numa_CFLAGS_OTHER}"
INTERFACE_INCLUDE_DIRECTORIES "${numa_INCLUDE_DIR}"
)
endif()

mark_as_advanced(
numa_INCLUDE_DIR
numa_LIBRARY
)
33 changes: 25 additions & 8 deletions examples/benchmark/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
* limitations under the License.
*/
#include <exec/env.hpp>
#include <exec/__detail/__numa.hpp>
#include <exec/static_thread_pool.hpp>

#include <algorithm>
#include <barrier>
Expand Down Expand Up @@ -87,37 +89,52 @@ statistics_all compute_perf(
return all;
}

struct numa_deleter {
std::size_t size_;
exec::numa_allocator<char> allocator_;
void operator()(char* ptr) noexcept {
allocator_.deallocate(ptr, size_);
}
};

template <class Pool, class RunThread>
void my_main(int argc, char** argv) {
void my_main(int argc, char** argv, exec::numa_policy* policy = exec::get_numa_policy()) {
int nthreads = std::thread::hardware_concurrency();
if (argc > 1) {
nthreads = std::atoi(argv[1]);
}
std::size_t total_scheds = 10'000'000;
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::vector<std::unique_ptr<char[]>> buffers(nthreads);
std::vector<std::unique_ptr<char, numa_deleter>> buffers;
#endif
Pool pool(nthreads + 1);
std::optional<Pool> pool{};
if constexpr (std::same_as<Pool, exec::static_thread_pool>) {
pool.emplace(nthreads, exec::bwos_params{}, policy);
} else {
pool.emplace(nthreads);
}
std::barrier<> barrier(nthreads + 1);
std::vector<std::thread> threads;
std::atomic<bool> stop{false};
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::size_t buffer_size = 1000 << 20;
for (auto& buf: buffers) {
buf = std::make_unique_for_overwrite<char[]>(buffer_size);
std::size_t buffer_size = 2000 << 20;
for (std::size_t i = 0; i < static_cast<std::size_t>(nthreads); ++i) {
exec::numa_allocator<char> alloc(policy->thread_index_to_node(i));
buffers.push_back(std::unique_ptr<char, numa_deleter>{alloc.allocate(buffer_size), numa_deleter{buffer_size, alloc}});
}
#endif
for (std::size_t i = 0; i < static_cast<std::size_t>(nthreads); ++i) {
threads.emplace_back(
RunThread{},
std::ref(pool),
std::ref(*pool),
total_scheds,
i,
std::ref(barrier),
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::span<char>{buffers[i].get(), buffer_size},
#endif
std::ref(stop));
std::ref(stop),
policy);
}
std::size_t nRuns = 100;
std::size_t warmup = 1;
Expand Down
18 changes: 15 additions & 3 deletions examples/benchmark/static_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@ struct RunThread {
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::span<char> buffer,
#endif
std::atomic<bool>& stop) {
auto scheduler = pool.get_scheduler();
std::atomic<bool>& stop,
exec::numa_policy* numa) {
std::size_t numa_node = numa->thread_index_to_node(tid);
numa->bind_to_node(numa_node);
exec::nodemask mask{};
mask.set(numa_node);
auto scheduler = pool.get_constrained_scheduler(mask);
std::mutex mut;
std::condition_variable cv;
while (true) {
Expand Down Expand Up @@ -81,6 +86,13 @@ struct RunThread {
}
};

struct my_numa_distribution : public exec::default_numa_policy {
std::size_t thread_index_to_node(std::size_t index) override {
return exec::default_numa_policy::thread_index_to_node(2 * index);
}
};

int main(int argc, char** argv) {
my_main<exec::static_thread_pool, RunThread>(argc, argv);
my_numa_distribution numa{};
my_main<exec::static_thread_pool, RunThread>(argc, argv, &numa);
}
14 changes: 12 additions & 2 deletions examples/benchmark/static_thread_pool_bulk_enqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ struct RunThread {
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::span<char> buffer,
#endif
std::atomic<bool>& stop) {
std::atomic<bool>& stop,
exec::numa_policy* numa) {
std::size_t numa_node = numa->thread_index_to_node(tid);
numa->bind_to_node(numa_node);
while (true) {
barrier.arrive_and_wait();
if (stop.load()) {
Expand All @@ -54,8 +57,15 @@ struct RunThread {
}
};

struct my_numa_distribution : public exec::default_numa_policy {
std::size_t thread_index_to_node(std::size_t index) override {
return exec::default_numa_policy::thread_index_to_node(2 * index);
}
};

int main(int argc, char** argv) {
my_main<exec::static_thread_pool, RunThread>(argc, argv);
my_numa_distribution numa{};
my_main<exec::static_thread_pool, RunThread>(argc, argv, &numa);
}
#else
int main() {
Expand Down
7 changes: 5 additions & 2 deletions examples/benchmark/static_thread_pool_bulk_enqueue_nested.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ struct RunThread {
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::span<char> buffer,
#endif
std::atomic<bool>& stop) {
auto scheduler = pool.get_scheduler();
std::atomic<bool>& stop,
exec::numa_policy* numa) {
std::size_t numa_node = numa->thread_index_to_node(tid);
numa->bind_to_node(numa_node);
auto scheduler = pool.get_scheduler_on_thread(tid);
while (true) {
barrier.arrive_and_wait();
if (stop.load()) {
Expand Down
5 changes: 4 additions & 1 deletion examples/benchmark/static_thread_pool_nested.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ struct RunThread {
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::span<char> buffer,
#endif
std::atomic<bool>& stop) {
std::atomic<bool>& stop,
exec::numa_policy* numa) {
std::size_t numa_node = numa->thread_index_to_node(tid);
numa->bind_to_node(numa_node);
auto scheduler = pool.get_scheduler();
std::mutex mut;
std::condition_variable cv;
Expand Down
5 changes: 4 additions & 1 deletion examples/benchmark/static_thread_pool_nested_old.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ struct RunThread {
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::span<char> buffer,
#endif
std::atomic<bool>& stop) {
std::atomic<bool>& stop,
exec::numa_policy* numa) {
std::size_t numa_node = numa->thread_index_to_node(tid);
numa->bind_to_node(numa_node);
auto scheduler = pool.get_scheduler();
std::mutex mut;
std::condition_variable cv;
Expand Down
5 changes: 4 additions & 1 deletion examples/benchmark/static_thread_pool_old.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ struct RunThread {
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::span<char> buffer,
#endif
std::atomic<bool>& stop) {
std::atomic<bool>& stop,
exec::numa_policy* numa) {
std::size_t numa_node = numa->thread_index_to_node(tid);
numa->bind_to_node(numa_node);
auto scheduler = pool.get_scheduler();
std::mutex mut;
std::condition_variable cv;
Expand Down
9 changes: 6 additions & 3 deletions examples/benchmark/tbb_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ struct RunThread {
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
std::span<char> buffer,
#endif
std::atomic<bool>& stop) {
std::atomic<bool>& stop,
exec::numa_policy* numa) {
std::size_t numa_node = numa->thread_index_to_node(tid);
numa->bind_to_node(numa_node);
auto scheduler = pool.get_scheduler();
std::mutex mut;
std::condition_variable cv;
Expand All @@ -39,7 +42,7 @@ struct RunThread {
pmr::monotonic_buffer_resource resource{
buffer.data(), buffer.size(), pmr::null_memory_resource()};
pmr::polymorphic_allocator<char> alloc(&resource);
auto [start, end] = exec::even_share(total_scheds, tid, pool.available_parallelism() - 1);
auto [start, end] = exec::even_share(total_scheds, tid, pool.available_parallelism());
std::size_t scheds = end - start;
std::atomic<std::size_t> counter{scheds};
auto env = exec::make_env(exec::with(stdexec::get_allocator, alloc));
Expand All @@ -57,7 +60,7 @@ struct RunThread {
--scheds;
}
#else
auto [start, end] = exec::even_share(total_scheds, tid, pool.available_parallelism() - 1);
auto [start, end] = exec::even_share(total_scheds, tid, pool.available_parallelism());
std::size_t scheds = end - start;
std::atomic<std::size_t> counter{scheds};
while (scheds) {
Expand Down
5 changes: 4 additions & 1 deletion examples/benchmark/tbb_thread_pool_nested.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ struct RunThread {
#ifndef STDEXEC_NO_MONOTONIC_BUFFER_RESOURCE
[[maybe_unused]] std::span<char> buffer,
#endif
std::atomic<bool>& stop) {
std::atomic<bool>& stop,
exec::numa_policy* numa) {
std::size_t numa_node = numa->thread_index_to_node(tid);
numa->bind_to_node(numa_node);
auto scheduler = pool.get_scheduler();
std::mutex mut;
std::condition_variable cv;
Expand Down
6 changes: 3 additions & 3 deletions include/exec/__detail/__bwos_lifo_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ namespace exec::bwos {
alignas(hardware_destructive_interference_size) std::atomic<std::uint64_t> tail_{};
alignas(hardware_destructive_interference_size) std::atomic<std::uint64_t> steal_head_{};
alignas(hardware_destructive_interference_size) std::atomic<std::uint64_t> steal_tail_{};
std::vector<Tp, Allocator> ring_buffer_{};
std::vector<Tp, Allocator> ring_buffer_;
};

bool advance_get_index() noexcept;
Expand Down Expand Up @@ -339,12 +339,12 @@ namespace exec::bwos {
}

template <class Tp, class Allocator>
lifo_queue<Tp, Allocator>::block_type::block_type(const block_type &other) {
lifo_queue<Tp, Allocator>::block_type::block_type(const block_type &other)
: ring_buffer_(other.ring_buffer_) {
head_.store(other.head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
tail_.store(other.tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_tail_.store(other.steal_tail_.load(std::memory_order_relaxed), std::memory_order_relaxed);
steal_head_.store(other.steal_head_.load(std::memory_order_relaxed), std::memory_order_relaxed);
ring_buffer_ = other.ring_buffer_;
}

template <class Tp, class Allocator>
Expand Down
Loading

0 comments on commit e7cd275

Please sign in to comment.