Flow
Documentation for the Flow C++ Library
Loading...
Searching...
No Matches
flow_concurrent_queue.h
Go to the documentation of this file.
1#pragma once
2#include <condition_variable>
3#include <deque>
4#include <mutex>
5#include <optional>
6#include <queue>
7
8namespace flow {
9 template <typename T, typename Container = std::deque<T>>
11 using allocator_type = typename Container::allocator_type;
12
13 mutable std::mutex mux_;
14 std::condition_variable blocked_;
15 std::queue<T, Container> queue_;
16
17 public:
19 ConcurrentQueue() = default;
20
22 explicit ConcurrentQueue(const allocator_type& allocator)
23 : queue_(allocator) {
24 }
25
30
33 bool empty() const {
34 std::lock_guard lock(mux_);
35 return queue_.empty();
36 }
37
39 std::size_t size() const {
40 std::lock_guard lock(mux_);
41 return queue_.size();
42 }
43
45 void push(const T& value) {
46 {
47 std::lock_guard lock(mux_);
48 queue_.push(value);
49 }
50 blocked_.notify_one();
51 }
52
54 void push(T&& value) {
55 {
56 std::lock_guard lock(mux_);
57 queue_.push(std::move(value));
58 }
59 blocked_.notify_one();
60 }
61
63 template <typename... Args>
64 void emplace(Args&&... args) {
65 {
66 std::lock_guard lock(mux_);
67 queue_.emplace(std::forward<Args>(args)...);
68 }
69 blocked_.notify_one();
70 }
71
74 std::optional<T> tryFront() const {
75 std::lock_guard lock(mux_);
76 if (queue_.empty()) {
77 return std::nullopt;
78 }
79 return queue_.front();
80 }
81
84 std::optional<T> tryPop() {
85 std::lock_guard lock(mux_);
86 if (queue_.empty()) {
87 return std::nullopt;
88 }
89 std::optional<T> value = std::move(queue_.front());
90
91 queue_.pop();
92 return value;
93 }
94
97 T waitPop() {
98 std::unique_lock lock(mux_);
99 blocked_.wait(lock, [&]() { return !queue_.empty(); });
100
101 T value = std::move(queue_.front());
102 queue_.pop();
103 return value;
104 }
105 };
106}
T waitPop()
Waits until the queue is not empty, then pops and returns the first element.
std::optional< T > tryFront() const
Tries to get a copy of the first element without removing it.
ConcurrentQueue & operator=(const ConcurrentQueue &)=delete
ConcurrentQueue(const allocator_type &allocator)
Constructs a concurrent FIFO queue.
std::optional< T > tryPop()
Tries to pop and return the first element.
ConcurrentQueue()=default
Constructs a concurrent FIFO queue.
void emplace(Args &&... args)
Constructs a new element in place at the end of the queue.
typename Container::allocator_type allocator_type
ConcurrentQueue(ConcurrentQueue &&)=delete
ConcurrentQueue & operator=(ConcurrentQueue &&)=delete
ConcurrentQueue(const ConcurrentQueue &)=delete
std::queue< T, Container > queue_
void push(T &&value)
Pushes a new element into the queue.
std::size_t size() const
Returns the number of elements in the queue. The value may already be out of date by the time it is returned if other threads are modifying the queue concurrently.
std::condition_variable blocked_
void push(const T &value)
Pushes a new element into the queue.
bool empty() const
Checks if the queue is empty. The result may already be out of date by the time it is returned if other threads are modifying the queue concurrently.