Flow
Documentation for the Flow C++ Library
Loading...
Searching...
No Matches
flow_multi_queue_thread_pool.h
Go to the documentation of this file.
1#pragma once
3#include "flow_thread_task.h"
4#include "flow_vector.h"
6
7#include <future>
8#include <thread>
9
10namespace flow {
11
14 template <typename ThreadSafeQueue = ConcurrentQueue<ThreadTask>, typename LocalQueue = WorkStealingQueue<ThreadTask>>
18 ThreadSafeQueue poolQueue_;
19
20 inline static thread_local std::size_t threadId = 0;
21 inline static thread_local LocalQueue* localQueue = nullptr;
22
23 public:
24 explicit MultiQueueThreadPool(std::size_t threadCount = std::thread::hardware_concurrency() - 1)
25 : localQueues_(threadCount) {
26
27 threads_.reserve(threadCount);
28 for (std::size_t i = 0; i < threadCount; ++i) {
29 threads_.emplaceBack(std::bind(&MultiQueueThreadPool::runTasks, this, std::placeholders::_1, i));
30 }
31 }
32
34 std::size_t poolSize() const {
35 return threads_.size();
36 }
37
43 template <typename Callable, typename ...Args>
44 std::future<std::invoke_result_t<Callable, Args&&...>> submit(Callable callable, Args&&... args) {
45 using ReturnType = std::invoke_result_t<Callable, Args&&...>;
46 std::packaged_task<ReturnType()> task(std::bind(std::move(callable), std::forward<Args>(args)...));
47 auto result = task.get_future();
48
49 if (localQueue) {
50 localQueue->push(ThreadTask(std::move(task)));
51 } else {
52 poolQueue_.push(ThreadTask(std::move(task)));
53 }
54
55 return result;
56 }
57
64
65 // Pop from local queue.
66 if (localQueue) {
67 if (auto task = localQueue->tryPop(); task) {
68 task->execute();
69 return;
70 }
71 }
72
73 // Pop from the pool queue.
74 if (auto task = poolQueue_.tryPop(); task) {
75 task->execute();
76 return;
77 }
78
79 // Steal from other queue.
80 std::size_t threadCount = poolSize();
81 for (std::size_t i = 0; i < threadCount; ++i) {
82 std::size_t id = (threadId + 1 + i) % threadCount;
83 if (auto task = localQueues_[id].trySteal(); task) {
84 task->execute();
85 return;
86 }
87 }
88
89 std::this_thread::yield();
90 }
91
92 private:
93 void runTasks(std::stop_token token, std::size_t id) {
94 threadId = id;
96
97 while (!token.stop_requested()) {
99 }
100 }
101 };
102
103}
static thread_local std::size_t threadId
flow::Vector< LocalQueue > localQueues_
MultiQueueThreadPool(std::size_t threadCount=std::thread::hardware_concurrency() - 1)
flow::Vector< std::jthread > threads_
void runTasks(std::stop_token token, std::size_t id)
std::future< std::invoke_result_t< Callable, Args &&... > > submit(Callable callable, Args &&... args)
Submit a task to the thread-local queue. If the task is submit by an external thread outside of the t...
void runPendingTask()
Try run a task from the local queue. If the local queue is empty, it tries to run a task from the poo...
static thread_local LocalQueue * localQueue
A task that can be execute by a thread. Internally, it uses type erasure to store a std::packaged_tas...