Skip to content

Commit

Permalink
Rework thread pool queue depth impl
Browse files Browse the repository at this point in the history
  • Loading branch information
trapexit committed Oct 14, 2023
1 parent 8635456 commit 620cab2
Show file tree
Hide file tree
Showing 20 changed files with 662 additions and 715 deletions.
16 changes: 8 additions & 8 deletions libfuse/include/fuse_dirents.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ int fuse_dirents_add_plus(fuse_dirents_t *d,
const uint64_t namelen,
const fuse_entry_t *entry,
const struct stat *st);
int fuse_dirents_add_linux(fuse_dirents_t *d,
const struct linux_dirent64 *de,
const uint64_t namelen);
int fuse_dirents_add_linux_plus(fuse_dirents_t *d,
const struct linux_dirent64 *de,
const uint64_t namelen,
const fuse_entry_t *entry,
const struct stat *st);
int fuse_dirents_add_linux(fuse_dirents_t *d,
const linux_dirent64_t *de,
const uint64_t namelen);
int fuse_dirents_add_linux_plus(fuse_dirents_t *d,
const linux_dirent64_t *de,
const uint64_t namelen,
const fuse_entry_t *entry,
const struct stat *st);

void *fuse_dirents_find(fuse_dirents_t *d,
const uint64_t ino);
Expand Down
5 changes: 4 additions & 1 deletion libfuse/include/linux_dirent64.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

#include <stdint.h>

struct linux_dirent64
#define DIRENT_NAMELEN(X) ((X)->reclen - offsetof(linux_dirent64_t,name))

typedef struct linux_dirent64_t linux_dirent64_t;
struct linux_dirent64_t
{
uint64_t ino;
int64_t off;
Expand Down
75 changes: 47 additions & 28 deletions libfuse/include/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "moodycamel/blockingconcurrentqueue.h"

#include <algorithm>
#include <atomic>
#include <csignal>
#include <cstring>
Expand All @@ -15,6 +16,7 @@

#include <syslog.h>


struct ThreadPoolTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
static const int MAX_SEMA_SPINS = 1;
Expand All @@ -29,17 +31,20 @@ class ThreadPool

public:
explicit
ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
std::size_t const queue_depth_ = 1,
ThreadPool(unsigned const thread_count_ = std::thread::hardware_concurrency(),
unsigned const max_queue_depth_ = std::thread::hardware_concurrency(),
std::string const name_ = {})
: _queue(queue_depth_,thread_count_,thread_count_),
: _queue(),
_queue_depth(0),
_max_queue_depth(std::max(thread_count_,max_queue_depth_)),
_name(get_thread_name(name_))
{
syslog(LOG_DEBUG,
"threadpool: spawning %zu threads of queue depth %zu named '%s'",
"threadpool (%s): spawning %u threads w/ max queue depth %u%s",
_name.c_str(),
thread_count_,
queue_depth_,
_name.c_str());
_max_queue_depth,
((_max_queue_depth != max_queue_depth_) ? " (adjusted)" : ""));

sigset_t oldset;
sigset_t newset;
Expand All @@ -57,7 +62,8 @@ class ThreadPool
if(rv != 0)
{
syslog(LOG_WARNING,
"threadpool: error spawning thread - %d (%s)",
"threadpool (%s): error spawning thread - %d (%s)",
_name.c_str(),
rv,
strerror(rv));
continue;
Expand All @@ -78,9 +84,9 @@ class ThreadPool
~ThreadPool()
{
syslog(LOG_DEBUG,
"threadpool: destroying %zu threads named '%s'",
_threads.size(),
_name.c_str());
"threadpool (%s): destroying %lu threads",
_name.c_str(),
_threads.size());

auto func = []() { pthread_exit(NULL); };
for(std::size_t i = 0; i < _threads.size(); i++)
Expand Down Expand Up @@ -114,13 +120,16 @@ class ThreadPool
ThreadPool *btp = static_cast<ThreadPool*>(arg_);
ThreadPool::Func func;
ThreadPool::Queue &q = btp->_queue;
std::atomic<unsigned> &queue_depth = btp->_queue_depth;
moodycamel::ConsumerToken ctok(btp->_queue);

while(true)
{
q.wait_dequeue(ctok,func);

func();

queue_depth.fetch_sub(1,std::memory_order_release);
}

return NULL;
Expand All @@ -146,7 +155,8 @@ class ThreadPool
if(rv != 0)
{
syslog(LOG_WARNING,
"threadpool: error spawning thread - %d (%s)",
"threadpool (%s): error spawning thread - %d (%s)",
_name.c_str(),
rv,
strerror(rv));
return -rv;
Expand All @@ -161,7 +171,7 @@ class ThreadPool
}

syslog(LOG_DEBUG,
"threadpool: 1 thread added to pool '%s' named '%s'",
"threadpool (%s): 1 thread added named '%s'",
_name.c_str(),
name.c_str());

Expand Down Expand Up @@ -201,7 +211,7 @@ class ThreadPool
char name[16];
pthread_getname_np(t,name,sizeof(name));
syslog(LOG_DEBUG,
"threadpool: 1 thread removed from pool '%s' named '%s'",
"threadpool (%s): 1 thread removed named '%s'",
_name.c_str(),
name);

Expand Down Expand Up @@ -238,28 +248,32 @@ class ThreadPool
enqueue_work(moodycamel::ProducerToken &ptok_,
FuncType &&f_)
{
timespec ts = {0,10};
while(true)
timespec ts = {0,1000};
for(unsigned i = 0; i < 1000000; i++)
{
if(_queue.try_enqueue(ptok_,f_))
return;
if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth)
break;
::nanosleep(&ts,NULL);
ts.tv_nsec += 10;
}

_queue.enqueue(ptok_,f_);
_queue_depth.fetch_add(1,std::memory_order_release);
}

template<typename FuncType>
void
enqueue_work(FuncType &&f_)
{
timespec ts = {0,10};
while(true)
timespec ts = {0,1000};
for(unsigned i = 0; i < 1000000; i++)
{
if(_queue.try_enqueue(f_))
return;
if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth)
break;
::nanosleep(&ts,NULL);
ts.tv_nsec += 10;
}

_queue.enqueue(f_);
_queue_depth.fetch_add(1,std::memory_order_release);
}

template<typename FuncType>
Expand All @@ -272,21 +286,24 @@ class ThreadPool

auto promise = std::make_shared<Promise>();
auto future = promise->get_future();
auto work = [=]()

auto work = [=]()
{
auto rv = f_();
promise->set_value(rv);
};

timespec ts = {0,10};
while(true)
timespec ts = {0,1000};
for(unsigned i = 0; i < 1000000; i++)
{
if(_queue.try_enqueue(work))
if(_queue_depth.load(std::memory_order_acquire) < _max_queue_depth)
break;
::nanosleep(&ts,NULL);
ts.tv_nsec += 10;
}

_queue.enqueue(work);
_queue_depth.fetch_add(1,std::memory_order_release);

return future;
}

Expand All @@ -307,6 +324,8 @@ class ThreadPool

private:
Queue _queue;
std::atomic<unsigned> _queue_depth;
unsigned const _max_queue_depth;

private:
std::string const _name;
Expand Down
16 changes: 8 additions & 8 deletions libfuse/lib/fuse_dirents.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,9 @@ fuse_dirents_add_plus(fuse_dirents_t *d_,
}

int
fuse_dirents_add_linux(fuse_dirents_t *d_,
const struct linux_dirent64 *dirent_,
const uint64_t namelen_)
fuse_dirents_add_linux(fuse_dirents_t *d_,
const linux_dirent64_t *dirent_,
const uint64_t namelen_)
{
fuse_dirent_t *d;

Expand Down Expand Up @@ -336,11 +336,11 @@ fuse_dirents_add_linux(fuse_dirents_t *d_,
}

int
fuse_dirents_add_linux_plus(fuse_dirents_t *d_,
const struct linux_dirent64 *dirent_,
const uint64_t namelen_,
const fuse_entry_t *entry_,
const struct stat *st_)
fuse_dirents_add_linux_plus(fuse_dirents_t *d_,
const linux_dirent64_t *dirent_,
const uint64_t namelen_,
const fuse_entry_t *entry_,
const struct stat *st_)
{
fuse_direntplus_t *d;

Expand Down
7 changes: 5 additions & 2 deletions libfuse/lib/fuse_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,13 @@ fuse_session_loop_mt(struct fuse_session *se_,

if(process_thread_count > 0)
process_tp = std::make_shared<ThreadPool>(process_thread_count,
process_thread_queue_depth,
(process_thread_count *
process_thread_queue_depth),
"fuse.process");

read_tp = std::make_unique<ThreadPool>(read_thread_count,1,"fuse.read");
read_tp = std::make_unique<ThreadPool>(read_thread_count,
read_thread_count,
"fuse.read");
if(process_tp)
{
for(auto i = 0; i < read_thread_count; i++)
Expand Down
Loading

0 comments on commit 620cab2

Please sign in to comment.