Flow
Documentation for the Flow C++ Library
Loading...
Searching...
No Matches
flow_work_stealing_queue.h
Go to the documentation of this file.
1#pragma once
2#include <condition_variable>
3#include <deque>
4#include <mutex>
5#include <optional>
6
7namespace flow {
8
/// A mutex-protected work-stealing queue.
///
/// Behaves like a concurrent FIFO queue (push at the back, pop at the front)
/// and additionally offers trySteal(), which removes the *last* element.
/// Every member function takes the internal mutex for the duration of its
/// access to the underlying deque.
template <typename T>
class WorkStealingQueue {
    using allocator_type = typename std::deque<T>::allocator_type;

    mutable std::mutex mux_;            // mutable so const observers can lock it
    std::condition_variable blocked_;   // notified once per inserted element; waitPop() waits on it
    std::deque<T> queue_;

public:
    /// Constructs an empty queue.
    WorkStealingQueue() = default;

    /// Constructs an empty queue whose underlying deque uses @p allocator.
    explicit WorkStealingQueue(const allocator_type& allocator)
        : queue_(allocator) {
    }

    // The queue is neither copyable nor movable.
    WorkStealingQueue(const WorkStealingQueue&) = delete;
    WorkStealingQueue(WorkStealingQueue&&) = delete;
    WorkStealingQueue& operator=(const WorkStealingQueue&) = delete;
    WorkStealingQueue& operator=(WorkStealingQueue&&) = delete;

    /// Checks if the queue is empty.
    /// The result may already be stale when the caller uses it in concurrent code.
    bool empty() const {
        std::lock_guard lock(mux_);
        return queue_.empty();
    }

    /// Returns the number of elements in the queue.
    /// The result may already be stale when the caller uses it in concurrent code.
    std::size_t size() const {
        std::lock_guard lock(mux_);
        return queue_.size();
    }

    /// Pushes a copy of @p value at the back of the queue and wakes one waiter.
    void push(const T& value) {
        {
            std::lock_guard lock(mux_);
            queue_.push_back(value);
        }
        // The lock is released before notifying, so the woken thread can take it immediately.
        blocked_.notify_one();
    }

    /// Moves @p value to the back of the queue and wakes one waiter.
    void push(T&& value) {
        {
            std::lock_guard lock(mux_);
            queue_.push_back(std::move(value));
        }
        blocked_.notify_one();
    }

    /// Constructs a new element in place at the end of the queue from @p args
    /// and wakes one waiter.
    template <typename... Args>
    void emplace(Args&&... args) {
        {
            std::lock_guard lock(mux_);
            queue_.emplace_back(std::forward<Args>(args)...);
        }
        blocked_.notify_one();
    }

    /// Tries to get a copy of the first element without removing it.
    /// @return the copy, or std::nullopt if the queue is empty.
    std::optional<T> tryFront() const {
        std::lock_guard lock(mux_);
        if (queue_.empty()) {
            return std::nullopt;
        }
        return queue_.front();
    }

    /// Tries to pop the first element.
    /// @return the removed element, or std::nullopt if the queue is empty.
    std::optional<T> tryPop() {
        std::lock_guard lock(mux_);
        if (queue_.empty()) {
            return std::nullopt;
        }
        // Move the element out before pop_front() destroys it.
        std::optional<T> value = std::move(queue_.front());

        queue_.pop_front();
        return value;
    }

    /// Blocks until the queue is not empty, then pops and returns the first element.
    T waitPop() {
        std::unique_lock lock(mux_);
        // The predicate form of wait() re-checks emptiness after every wake-up.
        blocked_.wait(lock, [&]() { return !queue_.empty(); });

        T value = std::move(queue_.front());
        queue_.pop_front();
        return value;
    }

    /// Tries to pop the last element (the opposite end from tryPop()).
    /// @return the removed element, or std::nullopt if the queue is empty.
    std::optional<T> trySteal() {
        std::lock_guard lock(mux_);
        if (queue_.empty()) {
            return std::nullopt;
        }
        // Move the element out before pop_back() destroys it.
        std::optional<T> value = std::move(queue_.back());

        queue_.pop_back();
        return value;
    }
};
120
121}
void push(const T &value)
Pushes a new element into the queue.
void emplace(Args &&... args)
Constructs a new element in place at the end of the queue.
WorkStealingQueue(const WorkStealingQueue &)=delete
std::size_t size() const
Returns the number of elements in the queue. The value may already be stale by the time it is used in concurrent code.
T waitPop()
Waits until the queue is not empty, then pops and returns the first element.
WorkStealingQueue & operator=(const WorkStealingQueue &)=delete
WorkStealingQueue(WorkStealingQueue &&)=delete
typename std::deque< T >::allocator_type allocator_type
WorkStealingQueue()=default
Constructs a concurrent FIFO queue.
std::optional< T > tryPop()
Tries to pop front and return the first element.
WorkStealingQueue(const allocator_type &allocator)
Constructs a concurrent FIFO queue whose underlying deque uses the given allocator.
bool empty() const
Checks if the queue is empty. The value may already be stale by the time it is used in concurrent code.
void push(T &&value)
Pushes a new element into the queue.
std::condition_variable blocked_
std::optional< T > tryFront() const
Tries to get a copy of the first element without removing it.
std::optional< T > trySteal()
Tries to pop back and return the last element.
WorkStealingQueue & operator=(WorkStealingQueue &&)=delete