//! Bounded channel based on a preallocated array.
//!
//! This flavor has a fixed, positive capacity.
//!
//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//!   - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
//!   - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
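//!
//! The head, the tail, and every slot carry a "stamp": a single `usize` packing a lap counter,
//! a mark bit, and an index into the buffer. As a rough sketch of the packing:
//!
//! ```text
//! stamp = lap * one_lap  +  mark * mark_bit  +  index
//!         (upper bits)      (disconnect flag)   (lower bits)
//! ```
//!
//! A slot is ready to be written when its stamp equals the tail, and ready to be read when its
//! stamp equals the head plus one.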

use super::context::Context;
use super::error::*;
use super::select::{Operation, Selected, Token};
use super::utils::{Backoff, CachePadded};
use super::waker::SyncWaker;

use crate::cell::UnsafeCell;
use crate::mem::MaybeUninit;
use crate::ptr;
use crate::sync::atomic::{self, AtomicUsize, Ordering};
use crate::time::Instant;

/// A slot in a channel.
struct Slot<T> {
    /// The current stamp.
    stamp: AtomicUsize,

    /// The message in this slot. Either read out in `read` or dropped through
    /// `discard_all_messages`.
    msg: UnsafeCell<MaybeUninit<T>>,
}

/// The token type for the array flavor.
#[derive(Debug)]
pub(crate) struct ArrayToken {
    /// Slot to read from or write to.
    slot: *const u8,

    /// Stamp to store into the slot after reading or writing.
    stamp: usize,
}

impl Default for ArrayToken {
    #[inline]
    fn default() -> Self {
        ArrayToken { slot: ptr::null(), stamp: 0 }
    }
}

/// Bounded channel based on a preallocated array.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit in the head is always zero.
    ///
    /// Messages are popped from the head of the channel.
    head: CachePadded<AtomicUsize>,

    /// The tail of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit indicates that the channel is disconnected.
    ///
    /// Messages are pushed into the tail of the channel.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    buffer: Box<[Slot<T>]>,

    /// The channel capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
    one_lap: usize,

    /// If this bit is set in the tail, that means the channel is disconnected.
    mark_bit: usize,

    /// Senders waiting while the channel is full.
    senders: SyncWaker,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,
}

impl<T> Channel<T> {
    /// Creates a bounded channel of capacity `cap`.
    pub(crate) fn with_capacity(cap: usize) -> Self {
        assert!(cap > 0, "capacity must be positive");

        // Compute constants `mark_bit` and `one_lap`.
        let mark_bit = (cap + 1).next_power_of_two();
        let one_lap = mark_bit * 2;
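        // For example (hypothetical value): `cap == 3` yields `mark_bit == 4` and
        // `one_lap == 8`, so slot indices occupy the two lowest bits, bit 2 is the
        // disconnect mark, and laps advance in steps of 8.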

        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let head = 0;
        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
        let tail = 0;

        // Allocate a buffer of `cap` slots initialized with stamps.
        let buffer: Box<[Slot<T>]> = (0..cap)
            .map(|i| {
                // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
                Slot { stamp: AtomicUsize::new(i), msg: UnsafeCell::new(MaybeUninit::uninit()) }
            })
            .collect();

        Channel {
            buffer,
            cap,
            one_lap,
            mark_bit,
            head: CachePadded::new(AtomicUsize::new(head)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
            senders: SyncWaker::new(),
            receivers: SyncWaker::new(),
        }
    }

    /// Attempts to reserve a slot for sending a message.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.load(Ordering::Relaxed);

        loop {
            // Check if the channel is disconnected.
            if tail & self.mark_bit != 0 {
                token.array.slot = ptr::null();
                token.array.stamp = 0;
                return true;
            }

            // Deconstruct the tail.
            let index = tail & (self.mark_bit - 1);
            let lap = tail & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the tail and the stamp match, we may attempt to push.
            if tail == stamp {
                let new_tail = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                    tail + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the tail.
                match self.tail.compare_exchange_weak(
                    tail,
                    new_tail,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Prepare the token for the follow-up call to `write`.
                        token.array.slot = slot as *const Slot<T> as *const u8;
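                        // The stamp `tail + 1`, stored into the slot by the follow-up
                        // `write`, marks the slot as full so that a receiver whose head
                        // points at this position can read it.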
                        token.array.stamp = tail + 1;
                        return true;
                    }
                    Err(_) => {
                        backoff.spin_light();
                        tail = self.tail.load(Ordering::Relaxed);
                    }
                }
            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
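                // The slot's stamp lags one lap behind the tail, which means the slot
                // still holds a message from the previous lap and the channel might be
                // full.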
                atomic::fence(Ordering::SeqCst);
                let head = self.head.load(Ordering::Relaxed);

                // If the head lags one lap behind the tail as well...
                if head.wrapping_add(self.one_lap) == tail {
                    // ...then the channel is full.
                    return false;
                }

                backoff.spin_light();
                tail = self.tail.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.spin_heavy();
                tail = self.tail.load(Ordering::Relaxed);
            }
        }
    }

    /// Writes a message into the channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.array.slot.is_null() {
            return Err(msg);
        }

        let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);

        // Write the message into the slot and update the stamp.
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.stamp.store(token.array.stamp, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.load(Ordering::Relaxed);

        loop {
            // Deconstruct the head.
            let index = head & (self.mark_bit - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, we may attempt to pop.
            if head + 1 == stamp {
                let new = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                // Try moving the head.
                match self.head.compare_exchange_weak(
                    head,
                    new,
                    Ordering::SeqCst,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // Prepare the token for the follow-up call to `read`.
                        token.array.slot = slot as *const Slot<T> as *const u8;
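                        // The stamp `head + one_lap`, stored into the slot by the
                        // follow-up `read`, marks the slot as vacant again so that a
                        // sender can reuse it on the next lap.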
                        token.array.stamp = head.wrapping_add(self.one_lap);
                        return true;
                    }
                    Err(_) => {
                        backoff.spin_light();
                        head = self.head.load(Ordering::Relaxed);
                    }
                }
            } else if stamp == head {
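                // The slot's stamp still equals the head, so no message has been written
                // into this slot on the current lap and the channel might be empty.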
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if (tail & !self.mark_bit) == head {
                    // If the channel is disconnected...
                    if tail & self.mark_bit != 0 {
                        // ...then receive an error.
                        token.array.slot = ptr::null();
                        token.array.stamp = 0;
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                backoff.spin_light();
                head = self.head.load(Ordering::Relaxed);
            } else {
                // Snooze because we need to wait for the stamp to get updated.
                backoff.spin_heavy();
                head = self.head.load(Ordering::Relaxed);
            }
        }
    }

    /// Reads a message from the channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.array.slot.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);

        // Read the message from the slot and update the stamp.
        let msg = slot.msg.get().read().assume_init();
        slot.stamp.store(token.array.stamp, Ordering::Release);

        // Wake a sleeping sender.
        self.senders.notify();
        Ok(msg)
    }

    /// Attempts to send a message into the channel.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        let token = &mut Token::default();
        if self.start_send(token) {
            unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
        } else {
            Err(TrySendError::Full(msg))
        }
    }

    /// Sends a message into the channel.
    pub(crate) fn send(
        &self,
        msg: T,
        deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        loop {
            // Try sending a message.
            if self.start_send(token) {
                let res = unsafe { self.write(token, msg) };
                return res.map_err(SendTimeoutError::Disconnected);
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(SendTimeoutError::Timeout(msg));
                }
            }

            Context::with(|cx| {
                // Prepare for blocking until a receiver wakes us up.
                let oper = Operation::hook(token);
                self.senders.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_full() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.senders.unregister(oper).unwrap();
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message.
            if self.start_recv(token) {
                let res = unsafe { self.read(token) };
                return res.map_err(|_| RecvTimeoutError::Disconnected);
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            Context::with(|cx| {
                // Prepare for blocking until a sender wakes us up.
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Returns the current number of messages inside the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail, then load the head.
            let tail = self.tail.load(Ordering::SeqCst);
            let head = self.head.load(Ordering::SeqCst);

            // If the tail didn't change, we've got consistent values to work with.
            if self.tail.load(Ordering::SeqCst) == tail {
                let hix = head & (self.mark_bit - 1);
                let tix = tail & (self.mark_bit - 1);

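                // For example (hypothetical values): with `cap == 4`, `hix == 3`, and
                // `tix == 1`, the tail has wrapped around the end of the buffer while
                // the head has not, so the messages sit at indices 3 and 0 and the
                // length is `cap - hix + tix == 2`.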
                return if hix < tix {
                    tix - hix
                } else if hix > tix {
                    self.cap - hix + tix
                } else if (tail & !self.mark_bit) == head {
                    0
                } else {
                    self.cap
                };
            }
        }
    }

    /// Returns the capacity of the channel.
    #[allow(clippy::unnecessary_wraps)] // This is intentional.
    pub(crate) fn capacity(&self) -> Option<usize> {
        Some(self.cap)
    }

    /// Disconnects senders and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);

        if tail & self.mark_bit == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Disconnects receivers and wakes up all blocked senders.
    ///
    /// Returns `true` if this call disconnected the channel.
    ///
    /// # Safety
    /// May only be called once upon dropping the last receiver. The
    /// destruction of all other receivers must have been observed with acquire
    /// ordering or stronger.
    pub(crate) unsafe fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
        let disconnected = if tail & self.mark_bit == 0 {
            self.senders.disconnect();
            true
        } else {
            false
        };

        self.discard_all_messages(tail);
        disconnected
    }

    /// Discards all messages.
    ///
    /// `tail` should be the current (and therefore last) value of `tail`.
    ///
    /// # Panicking
    /// If a destructor panics, the remaining messages are leaked, matching the
    /// behaviour of the unbounded channel.
    ///
    /// # Safety
    /// This method must only be called when dropping the last receiver. The
    /// destruction of all other receivers must have been observed with acquire
    /// ordering or stronger.
    unsafe fn discard_all_messages(&self, tail: usize) {
        debug_assert!(self.is_disconnected());

        // Only receivers modify `head`, so since we are the last one,
        // this value will not change and will not be observed (since
        // no new messages can be sent after disconnection).
        let mut head = self.head.load(Ordering::Relaxed);
        let tail = tail & !self.mark_bit;

        let backoff = Backoff::new();
        loop {
            // Deconstruct the head.
            let index = head & (self.mark_bit - 1);
            let lap = head & !(self.one_lap - 1);

            // Inspect the corresponding slot.
            debug_assert!(index < self.buffer.len());
            let slot = unsafe { self.buffer.get_unchecked(index) };
            let stamp = slot.stamp.load(Ordering::Acquire);

            // If the stamp is ahead of the head by 1, we may drop the message.
            if head + 1 == stamp {
                head = if index + 1 < self.cap {
                    // Same lap, incremented index.
                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
                    head + 1
                } else {
                    // One lap forward, index wraps around to zero.
                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
                    lap.wrapping_add(self.one_lap)
                };

                unsafe {
                    (*slot.msg.get()).assume_init_drop();
                }
            // If the tail equals the head, that means the channel is empty.
            } else if tail == head {
                return;
            // Otherwise, a sender is about to write into the slot, so we need
            // to wait for it to update the stamp.
            } else {
                backoff.spin_heavy();
            }
        }
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::SeqCst);
        let tail = self.tail.load(Ordering::SeqCst);

        // Is the tail equal to the head?
        //
        // Note: If the head changes just before we load the tail, that means there was a moment
        // when the channel was not empty, so it is safe to just return `false`.
        (tail & !self.mark_bit) == head
    }

    /// Returns `true` if the channel is full.
    pub(crate) fn is_full(&self) -> bool {
        let tail = self.tail.load(Ordering::SeqCst);
        let head = self.head.load(Ordering::SeqCst);

        // Is the head lagging one lap behind tail?
        //
        // Note: If the tail changes just before we load the head, that means there was a moment
        // when the channel was not full, so it is safe to just return `false`.
        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
    }
}
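
// A minimal smoke-test sketch for the non-blocking paths above. This is an
// illustrative addition, assuming the enclosing crate compiles this module with
// a test harness and that `TrySendError`/`TryRecvError` are reachable through
// the parent module's imports.
#[cfg(test)]
mod tests {
    use super::*;

    // Fill the channel to capacity, observe `Full`, then drain it and observe
    // `Empty`.
    #[test]
    fn bounded_fill_and_drain() {
        let chan = Channel::with_capacity(2);

        // The first `cap` sends succeed; the next one reports the channel full.
        assert!(chan.try_send(1).is_ok());
        assert!(chan.try_send(2).is_ok());
        assert!(matches!(chan.try_send(3), Err(TrySendError::Full(3))));
        assert!(chan.is_full());

        // Messages come out in FIFO order, after which the channel is empty.
        assert_eq!(chan.try_recv().unwrap(), 1);
        assert_eq!(chan.try_recv().unwrap(), 2);
        assert!(matches!(chan.try_recv(), Err(TryRecvError::Empty)));
        assert!(chan.is_empty());
    }
}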