1
2// Copyright Oliver Kowalke 2016.
3// Distributed under the Boost Software License, Version 1.0.
4// (See accompanying file LICENSE_1_0.txt or copy at
5// http://www.boost.org/LICENSE_1_0.txt)
6
7#ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
8#define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
9
10#include <atomic>
11#include <chrono>
12#include <cstddef>
13#include <cstdint>
14#include <memory>
15#include <vector>
16
17#include <boost/config.hpp>
18
19#include <boost/fiber/channel_op_status.hpp>
20#include <boost/fiber/context.hpp>
21#include <boost/fiber/detail/config.hpp>
22#include <boost/fiber/detail/convert.hpp>
23#if defined(BOOST_NO_CXX14_STD_EXCHANGE)
24#include <boost/fiber/detail/exchange.hpp>
25#endif
26#include <boost/fiber/detail/spinlock.hpp>
27#include <boost/fiber/exceptions.hpp>
28#include <boost/fiber/waker.hpp>
29
30#ifdef BOOST_HAS_ABI_HEADERS
31# include BOOST_ABI_PREFIX
32#endif
33
34namespace boost {
35namespace fibers {
36
37template< typename T >
38class unbuffered_channel {
39public:
40 using value_type = typename std::remove_reference<T>::type;
41
42private:
43 struct slot {
44 value_type value;
45 waker w;
46
47 slot( value_type const& value_, waker && w) :
48 value{ value_ },
49 w{ std::move(w) } {
50 }
51
52 slot( value_type && value_, waker && w) :
53 value{ std::move( value_) },
54 w{ std::move(w) } {
55 }
56 };
57
58 // shared cacheline
59 std::atomic< slot * > slot_{ nullptr };
60 // shared cacheline
61 std::atomic_bool closed_{ false };
62 mutable detail::spinlock splk_producers_{};
63 wait_queue waiting_producers_{};
64 mutable detail::spinlock splk_consumers_{};
65 wait_queue waiting_consumers_{};
66 char pad_[cacheline_length];
67
68 bool is_empty_() {
69 return nullptr == slot_.load( std::memory_order_acquire);
70 }
71
72 bool try_push_( slot * own_slot) {
73 for (;;) {
74 slot * s = slot_.load( std::memory_order_acquire);
75 if ( nullptr == s) {
76 if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
77 continue;
78 }
79 return true;
80 }
81 return false;
82 }
83 }
84
85 slot * try_pop_() {
86 slot * nil_slot = nullptr;
87 for (;;) {
88 slot * s = slot_.load( std::memory_order_acquire);
89 if ( nullptr != s) {
90 if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
91 continue;}
92 }
93 return s;
94 }
95 }
96
97public:
98 unbuffered_channel() = default;
99
100 ~unbuffered_channel() {
101 close();
102 }
103
104 unbuffered_channel( unbuffered_channel const&) = delete;
105 unbuffered_channel & operator=( unbuffered_channel const&) = delete;
106
107 bool is_closed() const noexcept {
108 return closed_.load( m: std::memory_order_acquire);
109 }
110
111 void close() noexcept {
112 // set flag
113 if ( ! closed_.exchange( i: true, m: std::memory_order_acquire) ) {
114 // notify current waiting
115 slot * s = slot_.load( std::memory_order_acquire);
116 if ( nullptr != s) {
117 // notify context
118 s->w.wake();
119 }
120 detail::spinlock_lock lk1{ splk_producers_ };
121 waiting_producers_.notify_all();
122
123 detail::spinlock_lock lk2{ splk_consumers_ };
124 waiting_consumers_.notify_all();
125 }
126 }
127
128 channel_op_status push( value_type const& value) {
129 context * active_ctx = context::active();
130 slot s{ value, {} };
131 for (;;) {
132 if ( BOOST_UNLIKELY( is_closed() ) ) {
133 return channel_op_status::closed;
134 }
135 s.w = active_ctx->create_waker();
136 if ( try_push_( own_slot: & s) ) {
137 detail::spinlock_lock lk{ splk_consumers_ };
138 waiting_consumers_.notify_one();
139 // suspend till value has been consumed
140 active_ctx->suspend( lk);
141 // resumed
142 if ( BOOST_UNLIKELY( is_closed() ) ) {
143 // channel was closed before value was consumed
144 return channel_op_status::closed;
145 }
146 // value has been consumed
147 return channel_op_status::success;
148 }
149 detail::spinlock_lock lk{ splk_producers_ };
150 if ( BOOST_UNLIKELY( is_closed() ) ) {
151 return channel_op_status::closed;
152 }
153 if ( is_empty_() ) {
154 continue;
155 }
156
157 waiting_producers_.suspend_and_wait( lk, active_ctx);
158 // resumed, slot mabye free
159 }
160 }
161
162 channel_op_status push( value_type && value) {
163 context * active_ctx = context::active();
164 slot s{ std::move( value), {} };
165 for (;;) {
166 if ( BOOST_UNLIKELY( is_closed() ) ) {
167 return channel_op_status::closed;
168 }
169 s.w = active_ctx->create_waker();
170 if ( try_push_( own_slot: & s) ) {
171 detail::spinlock_lock lk{ splk_consumers_ };
172 waiting_consumers_.notify_one();
173 // suspend till value has been consumed
174 active_ctx->suspend( lk);
175 // resumed
176 if ( BOOST_UNLIKELY( is_closed() ) ) {
177 // channel was closed before value was consumed
178 return channel_op_status::closed;
179 }
180 // value has been consumed
181 return channel_op_status::success;
182 }
183 detail::spinlock_lock lk{ splk_producers_ };
184 if ( BOOST_UNLIKELY( is_closed() ) ) {
185 return channel_op_status::closed;
186 }
187 if ( is_empty_() ) {
188 continue;
189 }
190 waiting_producers_.suspend_and_wait( lk, active_ctx);
191 // resumed, slot mabye free
192 }
193 }
194
195 template< typename Rep, typename Period >
196 channel_op_status push_wait_for( value_type const& value,
197 std::chrono::duration< Rep, Period > const& timeout_duration) {
198 return push_wait_until( value,
199 std::chrono::steady_clock::now() + timeout_duration);
200 }
201
202 template< typename Rep, typename Period >
203 channel_op_status push_wait_for( value_type && value,
204 std::chrono::duration< Rep, Period > const& timeout_duration) {
205 return push_wait_until( std::forward< value_type >( value),
206 std::chrono::steady_clock::now() + timeout_duration);
207 }
208
209 template< typename Clock, typename Duration >
210 channel_op_status push_wait_until( value_type const& value,
211 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
212 context * active_ctx = context::active();
213 slot s{ value, {} };
214 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
215 for (;;) {
216 if ( BOOST_UNLIKELY( is_closed() ) ) {
217 return channel_op_status::closed;
218 }
219 s.w = active_ctx->create_waker();
220 if ( try_push_( own_slot: & s) ) {
221 detail::spinlock_lock lk{ splk_consumers_ };
222 waiting_consumers_.notify_one();
223 // suspend this producer
224 if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
225 // clear slot
226 slot * nil_slot = nullptr, * own_slot = & s;
227 slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
228 // resumed, value has not been consumed
229 return channel_op_status::timeout;
230 }
231 // resumed
232 if ( BOOST_UNLIKELY( is_closed() ) ) {
233 // channel was closed before value was consumed
234 return channel_op_status::closed;
235 }
236 // value has been consumed
237 return channel_op_status::success;
238 }
239 detail::spinlock_lock lk{ splk_producers_ };
240 if ( BOOST_UNLIKELY( is_closed() ) ) {
241 return channel_op_status::closed;
242 }
243 if ( is_empty_() ) {
244 continue;
245 }
246
247 if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
248 {
249 return channel_op_status::timeout;
250 }
251 // resumed, slot maybe free
252 }
253 }
254
255 template< typename Clock, typename Duration >
256 channel_op_status push_wait_until( value_type && value,
257 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
258 context * active_ctx = context::active();
259 slot s{ std::move( value), {} };
260 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
261 for (;;) {
262 if ( BOOST_UNLIKELY( is_closed() ) ) {
263 return channel_op_status::closed;
264 }
265 s.w = active_ctx->create_waker();
266 if ( try_push_( own_slot: & s) ) {
267 detail::spinlock_lock lk{ splk_consumers_ };
268 waiting_consumers_.notify_one();
269 // suspend this producer
270 if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
271 // clear slot
272 slot * nil_slot = nullptr, * own_slot = & s;
273 slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
274 // resumed, value has not been consumed
275 return channel_op_status::timeout;
276 }
277 // resumed
278 if ( BOOST_UNLIKELY( is_closed() ) ) {
279 // channel was closed before value was consumed
280 return channel_op_status::closed;
281 }
282 // value has been consumed
283 return channel_op_status::success;
284 }
285 detail::spinlock_lock lk{ splk_producers_ };
286 if ( BOOST_UNLIKELY( is_closed() ) ) {
287 return channel_op_status::closed;
288 }
289 if ( is_empty_() ) {
290 continue;
291 }
292 if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
293 {
294 return channel_op_status::timeout;
295 }
296 // resumed, slot maybe free
297 }
298 }
299
300 channel_op_status pop( value_type & value) {
301 context * active_ctx = context::active();
302 slot * s = nullptr;
303 for (;;) {
304 if ( nullptr != ( s = try_pop_() ) ) {
305 {
306 detail::spinlock_lock lk{ splk_producers_ };
307 waiting_producers_.notify_one();
308 }
309 value = std::move( s->value);
310 // notify context
311 s->w.wake();
312 return channel_op_status::success;
313 }
314 detail::spinlock_lock lk{ splk_consumers_ };
315 if ( BOOST_UNLIKELY( is_closed() ) ) {
316 return channel_op_status::closed;
317 }
318 if ( ! is_empty_() ) {
319 continue;
320 }
321 waiting_consumers_.suspend_and_wait( lk, active_ctx);
322 // resumed, slot mabye set
323 }
324 }
325
326 value_type value_pop() {
327 context * active_ctx = context::active();
328 slot * s = nullptr;
329 for (;;) {
330 if ( nullptr != ( s = try_pop_() ) ) {
331 {
332 detail::spinlock_lock lk{ splk_producers_ };
333 waiting_producers_.notify_one();
334 }
335 // consume value
336 value_type value = std::move( s->value);
337 // notify context
338 s->w.wake();
339 return std::move( value);
340 }
341 detail::spinlock_lock lk{ splk_consumers_ };
342 if ( BOOST_UNLIKELY( is_closed() ) ) {
343 throw fiber_error{
344 std::make_error_code( e: std::errc::operation_not_permitted),
345 "boost fiber: channel is closed" };
346 }
347 if ( ! is_empty_() ) {
348 continue;
349 }
350 waiting_consumers_.suspend_and_wait( lk, active_ctx);
351 // resumed, slot mabye set
352 }
353 }
354
355 template< typename Rep, typename Period >
356 channel_op_status pop_wait_for( value_type & value,
357 std::chrono::duration< Rep, Period > const& timeout_duration) {
358 return pop_wait_until( value,
359 std::chrono::steady_clock::now() + timeout_duration);
360 }
361
362 template< typename Clock, typename Duration >
363 channel_op_status pop_wait_until( value_type & value,
364 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
365 context * active_ctx = context::active();
366 slot * s = nullptr;
367 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
368 for (;;) {
369 if ( nullptr != ( s = try_pop_() ) ) {
370 {
371 detail::spinlock_lock lk{ splk_producers_ };
372 waiting_producers_.notify_one();
373 }
374 // consume value
375 value = std::move( s->value);
376 // notify context
377 s->w.wake();
378 return channel_op_status::success;
379 }
380 detail::spinlock_lock lk{ splk_consumers_ };
381 if ( BOOST_UNLIKELY( is_closed() ) ) {
382 return channel_op_status::closed;
383 }
384 if ( ! is_empty_() ) {
385 continue;
386 }
387 if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
388 return channel_op_status::timeout;
389 }
390 }
391 }
392
393 class iterator {
394 private:
395 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
396
397 unbuffered_channel * chan_{ nullptr };
398 storage_type storage_;
399
400 void increment_( bool initial = false) {
401 BOOST_ASSERT( nullptr != chan_);
402 try {
403 if ( ! initial) {
404 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
405 }
406 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
407 } catch ( fiber_error const&) {
408 chan_ = nullptr;
409 }
410 }
411
412 public:
413 using iterator_category = std::input_iterator_tag;
414 using difference_type = std::ptrdiff_t;
415 using pointer = value_type *;
416 using reference = value_type &;
417
418 using pointer_t = pointer;
419 using reference_t = reference;
420
421 iterator() = default;
422
423 explicit iterator( unbuffered_channel< T > * chan) noexcept :
424 chan_{ chan } {
425 increment_( initial: true);
426 }
427
428 iterator( iterator const& other) noexcept :
429 chan_{ other.chan_ } {
430 }
431
432 iterator & operator=( iterator const& other) noexcept {
433 if ( this == & other) return * this;
434 chan_ = other.chan_;
435 return * this;
436 }
437
438 bool operator==( iterator const& other) const noexcept {
439 return other.chan_ == chan_;
440 }
441
442 bool operator!=( iterator const& other) const noexcept {
443 return other.chan_ != chan_;
444 }
445
446 iterator & operator++() {
447 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
448 increment_();
449 return * this;
450 }
451
452 const iterator operator++( int) = delete;
453
454 reference_t operator*() noexcept {
455 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
456 }
457
458 pointer_t operator->() noexcept {
459 return reinterpret_cast< value_type * >( std::addressof( storage_) );
460 }
461 };
462
463 friend class iterator;
464};
465
466template< typename T >
467typename unbuffered_channel< T >::iterator
468begin( unbuffered_channel< T > & chan) {
469 return typename unbuffered_channel< T >::iterator( & chan);
470}
471
472template< typename T >
473typename unbuffered_channel< T >::iterator
474end( unbuffered_channel< T > &) {
475 return typename unbuffered_channel< T >::iterator();
476}
477
478}}
479
480#ifdef BOOST_HAS_ABI_HEADERS
481# include BOOST_ABI_SUFFIX
482#endif
483
484#endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H
485

source code of boost/libs/fiber/include/boost/fiber/unbuffered_channel.hpp