Flow
Documentation for the Flow C++ Library
Loading...
Searching...
No Matches
flow_concurrent_flex_queue.h
Go to the documentation of this file.
1#pragma once
2#include <condition_variable>
3#include <memory>
4#include <mutex>
5#include <optional>
6
namespace flow {

/// @brief Unbounded FIFO queue guarded by separate head and tail mutexes.
///
/// The queue is a singly linked list that always ends in an empty sentinel
/// node. Producers fill the sentinel and append a fresh one under `tailMux_`;
/// consumers detach the first node under `headMux_`. The queue is empty exactly
/// when `head_` is the sentinel, i.e. `head_.get() == tail_`.
///
/// Locking discipline: `tailMux_` may be acquired while `headMux_` is held, but
/// `headMux_` is never acquired while `tailMux_` is held.
///
/// @tparam T Element type. Only the operations actually called are
///           instantiated, so move-only types work with `push(T&&)`,
///           `emplace`, `tryPop` and `waitPop`.
template <typename T>
class ConcurrentFlexQueue {
    /// One list cell. The last cell in the list is the sentinel and holds no value.
    struct Node {
        std::optional<T> value;
        std::unique_ptr<Node> next;
    };

    std::unique_ptr<Node> head_;       ///< Owns the first node and, through `next`, the whole list.
    Node* tail_;                       ///< Non-owning pointer to the sentinel; written under `tailMux_`.
    mutable std::mutex headMux_;       ///< Guards `head_`; also the mutex `blocked_` waits on.
    mutable std::mutex tailMux_;       ///< Guards `tail_` and the sentinel's contents.
    std::condition_variable blocked_;  ///< Signalled after every successful enqueue.

    /// Reads `tail_` under `tailMux_` so the read is ordered against concurrent
    /// producers (reading it under `headMux_` alone would be a data race).
    const Node* currentTail() const {
        std::lock_guard<std::mutex> lock(tailMux_);
        return tail_;
    }

    /// Unlinks and returns the first node.
    /// Precondition: the caller holds `headMux_` and has checked that the queue is not empty.
    std::unique_ptr<Node> detachHead() {
        auto first = std::move(head_);
        head_ = std::move(first->next);
        return first;
    }

public:
    /// Constructs an empty queue consisting of just the sentinel node.
    ConcurrentFlexQueue()
        : head_(std::make_unique<Node>()), tail_(head_.get()) {
    }

    /// Releases the nodes one at a time. Letting the `unique_ptr` chain destroy
    /// itself would recurse once per element and can overflow the stack for
    /// long queues. Not synchronised: no other thread may use the queue here.
    ~ConcurrentFlexQueue() {
        while (head_) {
            head_ = std::move(head_->next);
        }
    }

    ConcurrentFlexQueue(const ConcurrentFlexQueue&) = delete;
    ConcurrentFlexQueue(ConcurrentFlexQueue&&) = delete;
    ConcurrentFlexQueue& operator=(const ConcurrentFlexQueue&) = delete;
    ConcurrentFlexQueue& operator=(ConcurrentFlexQueue&&) = delete;

    /// Checks if the queue is empty.
    /// @return `true` if no element was stored at the moment of the check.
    bool empty() const {
        std::lock_guard<std::mutex> lock(headMux_);
        return head_.get() == currentTail();
    }

    /// Pushes a copy of @p value onto the end of the queue.
    void push(const T& value) {
        emplace(value);
    }

    /// Pushes @p value onto the end of the queue by moving from it.
    void push(T&& value) {
        emplace(std::move(value));
    }

    /// Constructs a new element in place at the end of the queue.
    /// @param args Arguments forwarded to the constructor of `T`.
    template <typename... Args>
    void emplace(Args&&... args) {
        // Allocate the next sentinel before taking the lock to keep the
        // critical section short.
        auto dummy = std::make_unique<Node>();
        {
            std::lock_guard<std::mutex> lock(tailMux_);
            // If constructing T throws, the sentinel stays empty and the
            // queue is unchanged.
            tail_->value.emplace(std::forward<Args>(args)...);
            tail_->next = std::move(dummy);
            tail_ = tail_->next.get();
        }
        // Consumers wait on headMux_. Passing through it here guarantees that
        // a consumer which has just evaluated its predicate has either seen
        // the new element or is already blocked, so the notification below
        // cannot be lost.
        { std::lock_guard<std::mutex> handshake(headMux_); }
        blocked_.notify_one();
    }

    /// Attempts to retrieve the first element without removing it.
    /// @return A copy of the first element, or `std::nullopt` if the queue is empty.
    std::optional<T> tryFront() const {
        std::lock_guard<std::mutex> lock(headMux_);
        if (head_.get() == currentTail()) {
            return std::nullopt;
        }
        return head_->value;
    }

    /// Attempts to pop and return the first element.
    /// @return The first element, or `std::nullopt` if the queue is empty.
    std::optional<T> tryPop() {
        std::unique_ptr<Node> first;
        {
            std::lock_guard<std::mutex> lock(headMux_);
            if (head_.get() == currentTail()) {
                return std::nullopt;
            }
            first = detachHead();
        }
        // The node is private to this thread now; move the value out without
        // holding the lock.
        return std::move(first->value);
    }

    /// Waits until the queue is not empty, then pops and returns the first element.
    /// @return The removed element, moved out of the queue.
    T waitPop() {
        std::unique_ptr<Node> first;
        {
            std::unique_lock<std::mutex> lock(headMux_);
            blocked_.wait(lock, [this] { return head_.get() != currentTail(); });
            first = detachHead();
        }
        // Rvalue overload of value(): moves instead of copying, and still
        // throws std::bad_optional_access if the invariant were ever broken.
        return std::move(first->value).value();
    }
};
}
ConcurrentFlexQueue(const ConcurrentFlexQueue &)=delete
void emplace(Args &&... args)
Constructs a new element in place at the end of the queue.
bool empty() const
Checks if the queue is empty.
ConcurrentFlexQueue(ConcurrentFlexQueue &&)=delete
void push(T &&value)
Pushes a new element onto the end of the queue by moving from the given value.
std::optional< T > tryPop()
Attempts to pop and return the first element; returns std::nullopt if the queue is empty.
T waitPop()
Waits until the queue is not empty, then pops and returns the first element.
ConcurrentFlexQueue & operator=(const ConcurrentFlexQueue &)=delete
void push(const T &value)
Pushes a copy of the given value onto the end of the queue.
std::optional< T > tryFront() const
Attempts to retrieve the first element without removing it.
ConcurrentFlexQueue & operator=(ConcurrentFlexQueue &&)=delete
ConcurrentFlexQueue()
Constructs an empty queue.