1/*
2Copyright 2018 Google Inc. All Rights Reserved.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS-IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17#ifndef RESONANCE_AUDIO_UTILS_THREADSAFE_FIFO_H_
18#define RESONANCE_AUDIO_UTILS_THREADSAFE_FIFO_H_
19
20#include <atomic>
21#include <chrono>
22#include <condition_variable>
23#include <memory>
24#include <mutex>
25#include <thread>
26#include <vector>
27
28#include "base/logging.h"
29
30namespace vraudio {
31
32// Container to share preallocated data between threads. It is thread-safe for
33// single producer - single consumer FIFO usage.
34//
35// @tparam T Object type that the FIFO handles.
36template <typename T>
37class ThreadsafeFifo {
38 public:
39 // Constructor preallocates the maximum number of objects in the FIFO queue
40 // and defines the maximum waiting period before triggering a buffer underflow
41 // or overflow event. Sleeping is enabled by default and can be disabled via
42 // |EnableBlockingSleepUntilMethods|.
43 //
44 // @param max_objects Maximum number of objects in FIFO queue.
45 explicit ThreadsafeFifo(size_t max_objects);
46
47 // Constructor preallocates the maximum number of objects in the FIFO queue.
48 // Sleeping is enabled by default and can be disabled via
49 // |EnableBlockingSleepUntilMethods|.
50 //
51 // @param max_objects Maximum number of objects in FIFO queue.
52 // @param init Initializer to be assigned to allocated objects.
53 ThreadsafeFifo(size_t max_objects, const T& init);
54
55 // Returns a pointer to an available input object T. If the queue is full, a
56 // nullptr is returned.
57 //
58 // @return Pointer to an available input object. Nullptr if no input object is
59 // available.
60 T* AcquireInputObject();
61
62 // Releases a previously acquired input object to be pushed onto the FIFO
63 // front.
64 void ReleaseInputObject(const T* object);
65
66 // Returns a pointer to an output object T. If the queue is empty, a nullptr
67 // is returned.
68 //
69 // @return Pointer to the output object. Nullptr on empty queue.
70 T* AcquireOutputObject();
71
72 // Releases a previously acquired output object back to the FIFO.
73 void ReleaseOutputObject(const T* object);
74
75 // Blocks until the FIFO queue has an input object available or
76 // |EnableBlockingSleepUntilMethods(false)| is called.
77 //
78 // Returns true if free slot is available.
79 bool SleepUntilInputObjectIsAvailable() const;
80
81 // Blocks until the FIFO queue has an output object available or
82 // |EnableBlockingSleepUntilMethods(false)| is called.
83 //
84 // Returns true if an object is available.
85 bool SleepUntilOutputObjectIsAvailable() const;
86
87 // Allows for unblocking |SleepUntil[Input|Output]ObjectIsAvailable|
88 // method.
89 void EnableBlockingSleepUntilMethods(bool enable);
90
91 // Returns the number of objects in the FIFO queue.
92 size_t Size() const;
93
94 // Returns true if FIFO queue is empty, false otherwise.
95 bool Empty() const;
96
97 // Returns true if FIFO queue is full, false otherwise.
98 bool Full() const;
99
100 // Clears the FIFO queue. This call is only thread-safe if called by the
101 // consumer.
102 void Clear();
103
104 private:
105 // Conditional to signal empty/full queue events.
106 mutable std::mutex fifo_empty_mutex_;
107 mutable std::condition_variable fifo_empty_conditional_;
108
109 mutable std::mutex fifo_full_mutex_;
110 mutable std::condition_variable fifo_full_conditional_;
111
112 // Vector that stores all objects.
113 std::vector<T> fifo_;
114 size_t read_pos_;
115 size_t write_pos_;
116
117 // Atomic counter that reflects the size of |fifo_|.
118 std::atomic<size_t> fifo_size_;
119
120 std::atomic<bool> enable_sleeping_;
121};
122
123template <typename T>
124ThreadsafeFifo<T>::ThreadsafeFifo(size_t max_objects)
125 : fifo_(max_objects),
126 read_pos_(0),
127 write_pos_(0),
128 fifo_size_(0),
129 enable_sleeping_(true) {
130 CHECK_GT(max_objects, 0) << "FIFO size must be greater than zero";
131}
132
133template <typename T>
134ThreadsafeFifo<T>::ThreadsafeFifo(size_t max_objects, const T& init)
135 : ThreadsafeFifo(max_objects) {
136 for (auto& object : fifo_) {
137 object = init;
138 }
139}
140
141template <typename T>
142T* ThreadsafeFifo<T>::AcquireInputObject() {
143 if (Full()) {
144 return nullptr;
145 }
146 CHECK_LT(fifo_size_, fifo_.size());
147
148 // Add object to FIFO queue.
149 return &fifo_[write_pos_];
150}
151
152template <typename T>
153void ThreadsafeFifo<T>::ReleaseInputObject(const T* object) {
154 DCHECK_EQ(object, &fifo_[write_pos_]);
155
156 ++write_pos_;
157 write_pos_ = write_pos_ % fifo_.size();
158 if (fifo_size_.fetch_add(i: 1) == 0) {
159 {
160 // Taking the lock and dropping it immediately assure that the notify
161 // cannot happen between the check of the predicate and wait of the
162 // |fifo_empty_conditional_|.
163 std::lock_guard<std::mutex> lock(fifo_empty_mutex_);
164 }
165 // In case of an empty queue, notify reader.
166 fifo_empty_conditional_.notify_one();
167 }
168}
169
170template <typename T>
171T* ThreadsafeFifo<T>::AcquireOutputObject() {
172 if (Empty()) {
173 return nullptr;
174 }
175 CHECK_GT(fifo_size_, 0);
176 return &fifo_[read_pos_];
177}
178
179template <typename T>
180void ThreadsafeFifo<T>::ReleaseOutputObject(const T* object) {
181 DCHECK_EQ(object, &fifo_[read_pos_]);
182
183 ++read_pos_;
184 read_pos_ = read_pos_ % fifo_.size();
185
186 if (fifo_size_.fetch_sub(i: 1) == fifo_.size()) {
187 {
188 // Taking the lock and dropping it immediately assure that the notify
189 // cannot happen between the check of the predicate and wait of the
190 // |fifo_full_conditional_|.
191 std::lock_guard<std::mutex> lock(fifo_full_mutex_);
192 }
193 // In case of a previously full queue, notify writer.
194 fifo_full_conditional_.notify_one();
195 }
196}
197
198template <typename T>
199bool ThreadsafeFifo<T>::SleepUntilInputObjectIsAvailable() const {
200 // In case of a full queue, wait to allow objects to be popped from the
201 // FIFO queue.
202 std::unique_lock<std::mutex> lock(fifo_full_mutex_);
203 fifo_full_conditional_.wait(lock, [this]() {
204 return fifo_size_.load() < fifo_.size() || !enable_sleeping_.load();
205 });
206 return fifo_size_.load() < fifo_.size();
207}
208
209template <typename T>
210bool ThreadsafeFifo<T>::SleepUntilOutputObjectIsAvailable() const {
211 // In case of an empty queue, wait for new objects to be added.
212 std::unique_lock<std::mutex> lock(fifo_empty_mutex_);
213 fifo_empty_conditional_.wait(lock, [this]() {
214 return fifo_size_.load() > 0 || !enable_sleeping_.load();
215 });
216 return fifo_size_.load() > 0;
217}
218
219template <typename T>
220void ThreadsafeFifo<T>::EnableBlockingSleepUntilMethods(bool enable) {
221 enable_sleeping_ = enable;
222 // Taking the lock and dropping it immediately assure that the notify
223 // cannot happen between the check of the predicate and wait of the
224 // |fifo_empty_conditional_| and |fifo_full_conditional_|.
225 { std::lock_guard<std::mutex> lock(fifo_empty_mutex_); }
226 { std::lock_guard<std::mutex> lock(fifo_full_mutex_); }
227 fifo_empty_conditional_.notify_one();
228 fifo_full_conditional_.notify_one();
229}
230
231template <typename T>
232size_t ThreadsafeFifo<T>::Size() const {
233 return fifo_size_.load();
234}
235
236template <typename T>
237bool ThreadsafeFifo<T>::Empty() const {
238 return fifo_size_.load() == 0;
239}
240
241template <typename T>
242bool ThreadsafeFifo<T>::Full() const {
243 return fifo_size_.load() == fifo_.size();
244}
245
246template <typename T>
247void ThreadsafeFifo<T>::Clear() {
248 while (!Empty()) {
249 T* output = AcquireOutputObject();
250 if (output != nullptr) {
251 ReleaseOutputObject(object: output);
252 }
253 }
254}
255
256} // namespace vraudio
257
258#endif // RESONANCE_AUDIO_UTILS_THREADSAFE_FIFO_H_
259

source code of qtmultimedia/src/3rdparty/resonance-audio/resonance_audio/utils/threadsafe_fifo.h