Flow
Documentation for the Flow C++ Library
flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue > Class Template Reference

A work-stealing multiqueue threadpool. Each worker thread has a thread_local task queue to reduce thread contention.

#include <flow_multi_queue_thread_pool.h>

Public Member Functions

 MultiQueueThreadPool (std::size_t threadCount=std::thread::hardware_concurrency() - 1)
std::size_t poolSize () const
template<typename Callable, typename ... Args>
std::future< std::invoke_result_t< Callable, Args &&... > > submit (Callable callable, Args &&... args)
 Submits a task to the calling thread's local queue. If the task is submitted by an external thread (one outside the thread pool), it goes to the pool queue shared by all threads instead.
void runPendingTask ()
 Tries to run a task from the local queue. If the local queue is empty, it tries to run a task from the pool queue. If the pool queue is empty, it tries to steal work from another thread's local queue. If all queues are empty, it yields the thread. This helps resolve thread deadlocks caused by waiting on dependencies.

Private Member Functions

void runTasks (std::stop_token token, std::size_t id)

Private Attributes

flow::Vector< std::jthread > threads_
flow::Vector< LocalQueue > localQueues_
ThreadSafeQueue poolQueue_

Static Private Attributes

static thread_local std::size_t threadId = 0
static thread_local LocalQueue * localQueue = nullptr

Detailed Description

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
class flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >

A work-stealing multiqueue threadpool. Each worker thread has a thread_local task queue to reduce thread contention.

Definition at line 15 of file flow_multi_queue_thread_pool.h.
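
The lookup order described for this pool (own queue first, then the shared pool queue, then other workers' queues) can be sketched with plain standard-library containers. This is only an illustration under stated assumptions: `ToyQueue` and `nextTask` are hypothetical names, tasks are reduced to integer ids, and the stand-in queue is not thread-safe, unlike the queue types the class is parameterized on.

```cpp
#include <cstddef>
#include <deque>
#include <optional>
#include <vector>

// Stand-in queue for illustration only; Flow's ConcurrentQueue and
// WorkStealingQueue are not reproduced here, and this one is not thread-safe.
struct ToyQueue {
    std::deque<int> items;

    // Removes and returns the oldest item, or nothing if the queue is empty.
    std::optional<int> tryPop() {
        if (items.empty()) return std::nullopt;
        int value = items.front();
        items.pop_front();
        return value;
    }
};

// Returns the next task id, checking the caller's own queue first, then the
// shared pool queue, then every other worker's queue in turn.
std::optional<int> nextTask(std::size_t self, std::vector<ToyQueue>& locals, ToyQueue& pool) {
    if (auto task = locals[self].tryPop()) return task;
    if (auto task = pool.tryPop()) return task;
    for (std::size_t i = 1; i < locals.size(); ++i) {
        std::size_t victim = (self + i) % locals.size();
        if (auto task = locals[victim].tryPop()) return task;
    }
    return std::nullopt;
}
```

Keeping the shared queue as the second choice means a worker only touches contended state once its own queue has run dry.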

Constructor & Destructor Documentation

◆ MultiQueueThreadPool()

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >::MultiQueueThreadPool ( std::size_t threadCount = std::thread::hardware_concurrency() - 1)
inline explicit

Definition at line 24 of file flow_multi_queue_thread_pool.h.

    // ... (signature and member initializers not shown in this listing)
    threads_.reserve(threadCount);
    for (std::size_t i = 0; i < threadCount; ++i) {
        // ... (loop body not shown in this listing)
    }
}

References localQueues_, runTasks(), and threads_.
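
The default argument subtracts one from `std::thread::hardware_concurrency()`. The standard allows that function to return 0 when the value cannot be determined, and the unsigned subtraction would then wrap around to a very large thread count. Callers who want to guard against this can pass an explicit count. A minimal sketch, where `workerCountFor` and `defaultWorkerCount` are hypothetical helpers and not part of Flow:

```cpp
#include <cstddef>
#include <thread>

// Hypothetical helper (not part of Flow): derives a worker count from a
// reported hardware thread count, leaving one thread for the caller.
// A report of 0 ("unknown") or 1 is clamped to a single worker.
constexpr std::size_t workerCountFor(std::size_t hardwareThreads) {
    return hardwareThreads > 1 ? hardwareThreads - 1 : 1;
}

// Convenience wrapper that queries the current machine.
inline std::size_t defaultWorkerCount() {
    return workerCountFor(std::thread::hardware_concurrency());
}
```

The result could then be passed as `threadCount` when constructing the pool.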

Member Function Documentation

◆ poolSize()

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
std::size_t flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >::poolSize ( ) const
inline
Returns
The number of worker threads.

Definition at line 34 of file flow_multi_queue_thread_pool.h.

{
    return threads_.size();
}

References threads_.

Referenced by runPendingTask().

◆ runPendingTask()

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
void flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >::runPendingTask ( )
inline

Tries to run a task from the local queue. If the local queue is empty, it tries to run a task from the pool queue. If the pool queue is empty, it tries to steal work from another thread's local queue. If all queues are empty, it yields the thread. This helps resolve thread deadlocks caused by waiting on dependencies.
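
The deadlock in question arises when a task blocks on the result of another task that is still queued behind it. One way to picture the remedy is a wait loop that keeps running queued work until the awaited result is ready. The sketch below assumes a single-threaded stand-in, `ToyPool`, that only borrows the member name `runPendingTask()`; `helpUntilReady` and `parentWaitsOnChild` are likewise hypothetical and not part of Flow.

```cpp
#include <chrono>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <utility>

// Single-threaded stand-in for a pool, for illustration only.
struct ToyPool {
    std::deque<std::function<void()>> tasks;

    // Queues a nullary callable and returns a future for its result.
    template <typename F>
    auto submit(F f) -> std::future<decltype(f())> {
        auto task = std::make_shared<std::packaged_task<decltype(f())()>>(std::move(f));
        auto result = task->get_future();
        tasks.push_back([task] { (*task)(); });
        return result;
    }

    // Runs at most one queued task. The task is removed from the queue
    // before it runs, so it may safely submit further work.
    void runPendingTask() {
        if (tasks.empty()) return;
        auto task = std::move(tasks.front());
        tasks.pop_front();
        task();
    }
};

// Hypothetical helper: instead of blocking on the future, keep running
// queued work until the result is ready.
template <typename Pool, typename T>
T helpUntilReady(Pool& pool, std::future<T>& pending) {
    while (pending.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
        pool.runPendingTask();
    }
    return pending.get();
}

// A parent task that depends on a child task queued behind it.
inline int parentWaitsOnChild() {
    ToyPool pool;
    auto parent = pool.submit([&pool] {
        auto child = pool.submit([] { return 21; });
        return 2 * helpUntilReady(pool, child);
    });
    return helpUntilReady(pool, parent);
}
```

With no other thread available to run the child, calling `child.get()` directly inside the parent would never return; running pending work while waiting lets the parent finish.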

Definition at line 63 of file flow_multi_queue_thread_pool.h.

{
    // Pop from local queue.
    if (localQueue) {
        if (auto task = localQueue->tryPop(); task) {
            task->execute();
            return;
        }
    }

    // Pop from the pool queue.
    if (auto task = poolQueue_.tryPop(); task) {
        task->execute();
        return;
    }

    // Steal from other queue.
    // ... (line missing from this listing; threadCount is defined here)
    for (std::size_t i = 0; i < threadCount; ++i) {
        std::size_t id = (threadId + 1 + i) % threadCount;
        if (auto task = localQueues_[id].trySteal(); task) {
            task->execute();
            return;
        }
    }

    // ... (line missing from this listing; per the description, the thread yields here)
}

References localQueue, localQueues_, poolQueue_, poolSize(), and threadId.

Referenced by runTasks().
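
The victim order produced by the steal loop's index arithmetic, `(threadId + 1 + i) % threadCount`, is easiest to see on a small case. The helper below is a hypothetical illustration that only reproduces that arithmetic; `stealOrder` is not part of Flow.

```cpp
#include <cstddef>
#include <vector>

// Reproduces the index arithmetic of the steal loop:
// victim = (threadId + 1 + i) % threadCount for i in [0, threadCount).
std::vector<std::size_t> stealOrder(std::size_t threadId, std::size_t threadCount) {
    std::vector<std::size_t> order;
    order.reserve(threadCount);
    for (std::size_t i = 0; i < threadCount; ++i) {
        order.push_back((threadId + 1 + i) % threadCount);
    }
    return order;
}
```

For thread 1 in a pool of 4, the order is 2, 3, 0, 1. Each worker starts with its right-hand neighbour, which spreads steal attempts across different victims, and the final probe lands on the caller's own queue. An external thread keeps the default `threadId` of 0 and therefore probes 1, 2, 3, 0.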

◆ runTasks()

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
void flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >::runTasks ( std::stop_token token,
std::size_t id )
inline private

Definition at line 93 of file flow_multi_queue_thread_pool.h.

{
    threadId = id;
    // ... (line missing from this listing)

    while (!token.stop_requested()) {
        // ... (line missing from this listing)
    }
}

References localQueue, localQueues_, runPendingTask(), and threadId.

Referenced by MultiQueueThreadPool().

◆ submit()

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
template<typename Callable, typename ... Args>
std::future< std::invoke_result_t< Callable, Args &&... > > flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >::submit ( Callable callable,
Args &&... args )
inline

Submits a task to the calling thread's local queue. If the task is submitted by an external thread (one outside the thread pool), it goes to the pool queue shared by all threads instead.
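
The routing rule depends on a `thread_local` pointer that is non-null only on threads that have registered a local queue. The sketch below illustrates that idea with hypothetical names (`tlsLocalQueue`, `route`, `routeFromWorker`) and unsynchronized `std::vector` queues; it is not Flow's implementation.

```cpp
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in: each thread sees its own copy of this pointer. It
// stays null on threads that never registered a local queue.
thread_local std::vector<int>* tlsLocalQueue = nullptr;

// Routes an item to the calling thread's local queue when one is registered,
// otherwise to the shared queue. Returns which queue received it.
inline std::string route(int item, std::vector<int>& shared) {
    if (tlsLocalQueue) {
        tlsLocalQueue->push_back(item);
        return "local";
    }
    shared.push_back(item);
    return "shared";
}

// Calls route() once from a "worker" thread that registered a local queue.
inline std::string routeFromWorker(std::vector<int>& shared) {
    std::string where;
    std::thread worker([&] {
        std::vector<int> local;
        tlsLocalQueue = &local;
        where = route(1, shared);
        tlsLocalQueue = nullptr;
    });
    worker.join();
    return where;
}
```

A call from the main thread reports "shared", while the same call made inside the worker reports "local" and leaves the shared queue untouched.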

Parameters
callable    The callable.
...args     The callable arguments.

Definition at line 44 of file flow_multi_queue_thread_pool.h.

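{
    // ... (lines missing from this listing; task is created here)
    auto result = task.get_future();

    if (localQueue) {
        // ... (line missing from this listing)
    } else {
        // ... (line missing from this listing)
    }

    return result;
}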

References localQueue, and poolQueue_.
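
The listing obtains the returned future from `task.get_future()` before the task is queued. One common way to achieve this pattern with the standard library is `std::packaged_task`. The sketch below is an assumption about how such a function could look, not Flow's code: `enqueue` is a hypothetical free function, and a `std::vector` of `std::function<void()>` stands in for the pool's queues and its `ThreadTask` type.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical sketch, not Flow's implementation: captures the arguments,
// wraps the call in a std::packaged_task, stores a type-erased runner in
// `queue`, and hands the matching future back to the caller.
template <typename Callable, typename... Args>
auto enqueue(std::vector<std::function<void()>>& queue, Callable callable, Args&&... args)
    -> std::future<std::invoke_result_t<Callable, Args&&...>> {
    using Result = std::invoke_result_t<Callable, Args&&...>;
    // shared_ptr keeps the move-only packaged_task inside a copyable std::function.
    auto task = std::make_shared<std::packaged_task<Result()>>(
        [fn = std::move(callable),
         bound = std::make_tuple(std::forward<Args>(args)...)]() mutable {
            return std::apply(std::move(fn), std::move(bound));
        });
    auto result = task->get_future();
    queue.push_back([task] { (*task)(); });
    return result;
}
```

The future only becomes ready once something runs the queued entry, which is why a caller that waits on it from inside the pool needs a way to make progress in the meantime.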

Member Data Documentation

◆ localQueue

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
thread_local LocalQueue* flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >::localQueue = nullptr
inline static private

Definition at line 21 of file flow_multi_queue_thread_pool.h.

Referenced by runPendingTask(), runTasks(), and submit().

◆ localQueues_

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
flow::Vector<LocalQueue> flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >::localQueues_
private

Definition at line 17 of file flow_multi_queue_thread_pool.h.

Referenced by MultiQueueThreadPool(), runPendingTask(), and runTasks().

◆ poolQueue_

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
ThreadSafeQueue flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >::poolQueue_
private

Definition at line 18 of file flow_multi_queue_thread_pool.h.

Referenced by runPendingTask(), and submit().

◆ threadId

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
thread_local std::size_t flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >::threadId = 0
inline static private

Definition at line 20 of file flow_multi_queue_thread_pool.h.

Referenced by runPendingTask(), and runTasks().

◆ threads_

template<typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
flow::Vector<std::jthread> flow::MultiQueueThreadPool< ThreadSafeQueue, LocalQueue >::threads_
private

Definition at line 16 of file flow_multi_queue_thread_pool.h.

Referenced by MultiQueueThreadPool(), and poolSize().


The documentation for this class was generated from the following file: