1//! Bounded channel based on a preallocated array.
2//!
3//! This flavor has a fixed, positive capacity.
4//!
5//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
6//!
7//! Source:
8//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
9//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
10
11use std::boxed::Box;
12use std::cell::UnsafeCell;
13use std::mem::{self, MaybeUninit};
14use std::ptr;
15use std::sync::atomic::{self, AtomicUsize, Ordering};
16use std::time::Instant;
17
18use crossbeam_utils::{Backoff, CachePadded};
19
20use crate::context::Context;
21use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
22use crate::select::{Operation, SelectHandle, Selected, Token};
23use crate::waker::SyncWaker;
24
25/// A slot in a channel.
26struct Slot<T> {
27 /// The current stamp.
28 stamp: AtomicUsize,
29
30 /// The message in this slot.
31 msg: UnsafeCell<MaybeUninit<T>>,
32}
33
34/// The token type for the array flavor.
35#[derive(Debug)]
36pub(crate) struct ArrayToken {
37 /// Slot to read from or write to.
38 slot: *const u8,
39
40 /// Stamp to store into the slot after reading or writing.
41 stamp: usize,
42}
43
44impl Default for ArrayToken {
45 #[inline]
46 fn default() -> Self {
47 ArrayToken {
48 slot: ptr::null(),
49 stamp: 0,
50 }
51 }
52}
53
54/// Bounded channel based on a preallocated array.
55pub(crate) struct Channel<T> {
56 /// The head of the channel.
57 ///
58 /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
59 /// packed into a single `usize`. The lower bits represent the index, while the upper bits
60 /// represent the lap. The mark bit in the head is always zero.
61 ///
62 /// Messages are popped from the head of the channel.
63 head: CachePadded<AtomicUsize>,
64
65 /// The tail of the channel.
66 ///
67 /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
68 /// packed into a single `usize`. The lower bits represent the index, while the upper bits
69 /// represent the lap. The mark bit indicates that the channel is disconnected.
70 ///
71 /// Messages are pushed into the tail of the channel.
72 tail: CachePadded<AtomicUsize>,
73
74 /// The buffer holding slots.
75 buffer: Box<[Slot<T>]>,
76
77 /// The channel capacity.
78 cap: usize,
79
80 /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
81 one_lap: usize,
82
83 /// If this bit is set in the tail, that means the channel is disconnected.
84 mark_bit: usize,
85
86 /// Senders waiting while the channel is full.
87 senders: SyncWaker,
88
89 /// Receivers waiting while the channel is empty and not disconnected.
90 receivers: SyncWaker,
91}
92
93impl<T> Channel<T> {
94 /// Creates a bounded channel of capacity `cap`.
95 pub(crate) fn with_capacity(cap: usize) -> Self {
96 assert!(cap > 0, "capacity must be positive");
97
98 // Compute constants `mark_bit` and `one_lap`.
99 let mark_bit = (cap + 1).next_power_of_two();
100 let one_lap = mark_bit * 2;
101
102 // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
103 let head = 0;
104 // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
105 let tail = 0;
106
107 // Allocate a buffer of `cap` slots initialized
108 // with stamps.
109 let buffer: Box<[Slot<T>]> = (0..cap)
110 .map(|i| {
111 // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
112 Slot {
113 stamp: AtomicUsize::new(i),
114 msg: UnsafeCell::new(MaybeUninit::uninit()),
115 }
116 })
117 .collect();
118
119 Channel {
120 buffer,
121 cap,
122 one_lap,
123 mark_bit,
124 head: CachePadded::new(AtomicUsize::new(head)),
125 tail: CachePadded::new(AtomicUsize::new(tail)),
126 senders: SyncWaker::new(),
127 receivers: SyncWaker::new(),
128 }
129 }
130
131 /// Returns a receiver handle to the channel.
132 pub(crate) fn receiver(&self) -> Receiver<'_, T> {
133 Receiver(self)
134 }
135
136 /// Returns a sender handle to the channel.
137 pub(crate) fn sender(&self) -> Sender<'_, T> {
138 Sender(self)
139 }
140
141 /// Attempts to reserve a slot for sending a message.
142 fn start_send(&self, token: &mut Token) -> bool {
143 let backoff = Backoff::new();
144 let mut tail = self.tail.load(Ordering::Relaxed);
145
146 loop {
147 // Check if the channel is disconnected.
148 if tail & self.mark_bit != 0 {
149 token.array.slot = ptr::null();
150 token.array.stamp = 0;
151 return true;
152 }
153
154 // Deconstruct the tail.
155 let index = tail & (self.mark_bit - 1);
156 let lap = tail & !(self.one_lap - 1);
157
158 // Inspect the corresponding slot.
159 debug_assert!(index < self.buffer.len());
160 let slot = unsafe { self.buffer.get_unchecked(index) };
161 let stamp = slot.stamp.load(Ordering::Acquire);
162
163 // If the tail and the stamp match, we may attempt to push.
164 if tail == stamp {
165 let new_tail = if index + 1 < self.cap {
166 // Same lap, incremented index.
167 // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
168 tail + 1
169 } else {
170 // One lap forward, index wraps around to zero.
171 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
172 lap.wrapping_add(self.one_lap)
173 };
174
175 // Try moving the tail.
176 match self.tail.compare_exchange_weak(
177 tail,
178 new_tail,
179 Ordering::SeqCst,
180 Ordering::Relaxed,
181 ) {
182 Ok(_) => {
183 // Prepare the token for the follow-up call to `write`.
184 token.array.slot = slot as *const Slot<T> as *const u8;
185 token.array.stamp = tail + 1;
186 return true;
187 }
188 Err(t) => {
189 tail = t;
190 backoff.spin();
191 }
192 }
193 } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
194 atomic::fence(Ordering::SeqCst);
195 let head = self.head.load(Ordering::Relaxed);
196
197 // If the head lags one lap behind the tail as well...
198 if head.wrapping_add(self.one_lap) == tail {
199 // ...then the channel is full.
200 return false;
201 }
202
203 backoff.spin();
204 tail = self.tail.load(Ordering::Relaxed);
205 } else {
206 // Snooze because we need to wait for the stamp to get updated.
207 backoff.snooze();
208 tail = self.tail.load(Ordering::Relaxed);
209 }
210 }
211 }
212
213 /// Writes a message into the channel.
214 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
215 // If there is no slot, the channel is disconnected.
216 if token.array.slot.is_null() {
217 return Err(msg);
218 }
219
220 let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
221
222 // Write the message into the slot and update the stamp.
223 slot.msg.get().write(MaybeUninit::new(msg));
224 slot.stamp.store(token.array.stamp, Ordering::Release);
225
226 // Wake a sleeping receiver.
227 self.receivers.notify();
228 Ok(())
229 }
230
231 /// Attempts to reserve a slot for receiving a message.
232 fn start_recv(&self, token: &mut Token) -> bool {
233 let backoff = Backoff::new();
234 let mut head = self.head.load(Ordering::Relaxed);
235
236 loop {
237 // Deconstruct the head.
238 let index = head & (self.mark_bit - 1);
239 let lap = head & !(self.one_lap - 1);
240
241 // Inspect the corresponding slot.
242 debug_assert!(index < self.buffer.len());
243 let slot = unsafe { self.buffer.get_unchecked(index) };
244 let stamp = slot.stamp.load(Ordering::Acquire);
245
246 // If the the stamp is ahead of the head by 1, we may attempt to pop.
247 if head + 1 == stamp {
248 let new = if index + 1 < self.cap {
249 // Same lap, incremented index.
250 // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
251 head + 1
252 } else {
253 // One lap forward, index wraps around to zero.
254 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
255 lap.wrapping_add(self.one_lap)
256 };
257
258 // Try moving the head.
259 match self.head.compare_exchange_weak(
260 head,
261 new,
262 Ordering::SeqCst,
263 Ordering::Relaxed,
264 ) {
265 Ok(_) => {
266 // Prepare the token for the follow-up call to `read`.
267 token.array.slot = slot as *const Slot<T> as *const u8;
268 token.array.stamp = head.wrapping_add(self.one_lap);
269 return true;
270 }
271 Err(h) => {
272 head = h;
273 backoff.spin();
274 }
275 }
276 } else if stamp == head {
277 atomic::fence(Ordering::SeqCst);
278 let tail = self.tail.load(Ordering::Relaxed);
279
280 // If the tail equals the head, that means the channel is empty.
281 if (tail & !self.mark_bit) == head {
282 // If the channel is disconnected...
283 if tail & self.mark_bit != 0 {
284 // ...then receive an error.
285 token.array.slot = ptr::null();
286 token.array.stamp = 0;
287 return true;
288 } else {
289 // Otherwise, the receive operation is not ready.
290 return false;
291 }
292 }
293
294 backoff.spin();
295 head = self.head.load(Ordering::Relaxed);
296 } else {
297 // Snooze because we need to wait for the stamp to get updated.
298 backoff.snooze();
299 head = self.head.load(Ordering::Relaxed);
300 }
301 }
302 }
303
304 /// Reads a message from the channel.
305 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
306 if token.array.slot.is_null() {
307 // The channel is disconnected.
308 return Err(());
309 }
310
311 let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
312
313 // Read the message from the slot and update the stamp.
314 let msg = slot.msg.get().read().assume_init();
315 slot.stamp.store(token.array.stamp, Ordering::Release);
316
317 // Wake a sleeping sender.
318 self.senders.notify();
319 Ok(msg)
320 }
321
322 /// Attempts to send a message into the channel.
323 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
324 let token = &mut Token::default();
325 if self.start_send(token) {
326 unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
327 } else {
328 Err(TrySendError::Full(msg))
329 }
330 }
331
332 /// Sends a message into the channel.
333 pub(crate) fn send(
334 &self,
335 msg: T,
336 deadline: Option<Instant>,
337 ) -> Result<(), SendTimeoutError<T>> {
338 let token = &mut Token::default();
339 loop {
340 // Try sending a message several times.
341 let backoff = Backoff::new();
342 loop {
343 if self.start_send(token) {
344 let res = unsafe { self.write(token, msg) };
345 return res.map_err(SendTimeoutError::Disconnected);
346 }
347
348 if backoff.is_completed() {
349 break;
350 } else {
351 backoff.snooze();
352 }
353 }
354
355 if let Some(d) = deadline {
356 if Instant::now() >= d {
357 return Err(SendTimeoutError::Timeout(msg));
358 }
359 }
360
361 Context::with(|cx| {
362 // Prepare for blocking until a receiver wakes us up.
363 let oper = Operation::hook(token);
364 self.senders.register(oper, cx);
365
366 // Has the channel become ready just now?
367 if !self.is_full() || self.is_disconnected() {
368 let _ = cx.try_select(Selected::Aborted);
369 }
370
371 // Block the current thread.
372 let sel = cx.wait_until(deadline);
373
374 match sel {
375 Selected::Waiting => unreachable!(),
376 Selected::Aborted | Selected::Disconnected => {
377 self.senders.unregister(oper).unwrap();
378 }
379 Selected::Operation(_) => {}
380 }
381 });
382 }
383 }
384
385 /// Attempts to receive a message without blocking.
386 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
387 let token = &mut Token::default();
388
389 if self.start_recv(token) {
390 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
391 } else {
392 Err(TryRecvError::Empty)
393 }
394 }
395
396 /// Receives a message from the channel.
397 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
398 let token = &mut Token::default();
399 loop {
400 // Try receiving a message several times.
401 let backoff = Backoff::new();
402 loop {
403 if self.start_recv(token) {
404 let res = unsafe { self.read(token) };
405 return res.map_err(|_| RecvTimeoutError::Disconnected);
406 }
407
408 if backoff.is_completed() {
409 break;
410 } else {
411 backoff.snooze();
412 }
413 }
414
415 if let Some(d) = deadline {
416 if Instant::now() >= d {
417 return Err(RecvTimeoutError::Timeout);
418 }
419 }
420
421 Context::with(|cx| {
422 // Prepare for blocking until a sender wakes us up.
423 let oper = Operation::hook(token);
424 self.receivers.register(oper, cx);
425
426 // Has the channel become ready just now?
427 if !self.is_empty() || self.is_disconnected() {
428 let _ = cx.try_select(Selected::Aborted);
429 }
430
431 // Block the current thread.
432 let sel = cx.wait_until(deadline);
433
434 match sel {
435 Selected::Waiting => unreachable!(),
436 Selected::Aborted | Selected::Disconnected => {
437 self.receivers.unregister(oper).unwrap();
438 // If the channel was disconnected, we still have to check for remaining
439 // messages.
440 }
441 Selected::Operation(_) => {}
442 }
443 });
444 }
445 }
446
447 /// Returns the current number of messages inside the channel.
448 pub(crate) fn len(&self) -> usize {
449 loop {
450 // Load the tail, then load the head.
451 let tail = self.tail.load(Ordering::SeqCst);
452 let head = self.head.load(Ordering::SeqCst);
453
454 // If the tail didn't change, we've got consistent values to work with.
455 if self.tail.load(Ordering::SeqCst) == tail {
456 let hix = head & (self.mark_bit - 1);
457 let tix = tail & (self.mark_bit - 1);
458
459 return if hix < tix {
460 tix - hix
461 } else if hix > tix {
462 self.cap - hix + tix
463 } else if (tail & !self.mark_bit) == head {
464 0
465 } else {
466 self.cap
467 };
468 }
469 }
470 }
471
472 /// Returns the capacity of the channel.
473 pub(crate) fn capacity(&self) -> Option<usize> {
474 Some(self.cap)
475 }
476
477 /// Disconnects the channel and wakes up all blocked senders and receivers.
478 ///
479 /// Returns `true` if this call disconnected the channel.
480 pub(crate) fn disconnect(&self) -> bool {
481 let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
482
483 if tail & self.mark_bit == 0 {
484 self.senders.disconnect();
485 self.receivers.disconnect();
486 true
487 } else {
488 false
489 }
490 }
491
492 /// Returns `true` if the channel is disconnected.
493 pub(crate) fn is_disconnected(&self) -> bool {
494 self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
495 }
496
497 /// Returns `true` if the channel is empty.
498 pub(crate) fn is_empty(&self) -> bool {
499 let head = self.head.load(Ordering::SeqCst);
500 let tail = self.tail.load(Ordering::SeqCst);
501
502 // Is the tail equal to the head?
503 //
504 // Note: If the head changes just before we load the tail, that means there was a moment
505 // when the channel was not empty, so it is safe to just return `false`.
506 (tail & !self.mark_bit) == head
507 }
508
509 /// Returns `true` if the channel is full.
510 pub(crate) fn is_full(&self) -> bool {
511 let tail = self.tail.load(Ordering::SeqCst);
512 let head = self.head.load(Ordering::SeqCst);
513
514 // Is the head lagging one lap behind tail?
515 //
516 // Note: If the tail changes just before we load the head, that means there was a moment
517 // when the channel was not full, so it is safe to just return `false`.
518 head.wrapping_add(self.one_lap) == tail & !self.mark_bit
519 }
520}
521
522impl<T> Drop for Channel<T> {
523 fn drop(&mut self) {
524 if mem::needs_drop::<T>() {
525 // Get the index of the head.
526 let head = *self.head.get_mut();
527 let tail = *self.tail.get_mut();
528
529 let hix = head & (self.mark_bit - 1);
530 let tix = tail & (self.mark_bit - 1);
531
532 let len = if hix < tix {
533 tix - hix
534 } else if hix > tix {
535 self.cap - hix + tix
536 } else if (tail & !self.mark_bit) == head {
537 0
538 } else {
539 self.cap
540 };
541
542 // Loop over all slots that hold a message and drop them.
543 for i in 0..len {
544 // Compute the index of the next slot holding a message.
545 let index = if hix + i < self.cap {
546 hix + i
547 } else {
548 hix + i - self.cap
549 };
550
551 unsafe {
552 debug_assert!(index < self.buffer.len());
553 let slot = self.buffer.get_unchecked_mut(index);
554 (*slot.msg.get()).assume_init_drop();
555 }
556 }
557 }
558 }
559}
560
561/// Receiver handle to a channel.
562pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
563
564/// Sender handle to a channel.
565pub(crate) struct Sender<'a, T>(&'a Channel<T>);
566
567impl<T> SelectHandle for Receiver<'_, T> {
568 fn try_select(&self, token: &mut Token) -> bool {
569 self.0.start_recv(token)
570 }
571
572 fn deadline(&self) -> Option<Instant> {
573 None
574 }
575
576 fn register(&self, oper: Operation, cx: &Context) -> bool {
577 self.0.receivers.register(oper, cx);
578 self.is_ready()
579 }
580
581 fn unregister(&self, oper: Operation) {
582 self.0.receivers.unregister(oper);
583 }
584
585 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
586 self.try_select(token)
587 }
588
589 fn is_ready(&self) -> bool {
590 !self.0.is_empty() || self.0.is_disconnected()
591 }
592
593 fn watch(&self, oper: Operation, cx: &Context) -> bool {
594 self.0.receivers.watch(oper, cx);
595 self.is_ready()
596 }
597
598 fn unwatch(&self, oper: Operation) {
599 self.0.receivers.unwatch(oper);
600 }
601}
602
603impl<T> SelectHandle for Sender<'_, T> {
604 fn try_select(&self, token: &mut Token) -> bool {
605 self.0.start_send(token)
606 }
607
608 fn deadline(&self) -> Option<Instant> {
609 None
610 }
611
612 fn register(&self, oper: Operation, cx: &Context) -> bool {
613 self.0.senders.register(oper, cx);
614 self.is_ready()
615 }
616
617 fn unregister(&self, oper: Operation) {
618 self.0.senders.unregister(oper);
619 }
620
621 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
622 self.try_select(token)
623 }
624
625 fn is_ready(&self) -> bool {
626 !self.0.is_full() || self.0.is_disconnected()
627 }
628
629 fn watch(&self, oper: Operation, cx: &Context) -> bool {
630 self.0.senders.watch(oper, cx);
631 self.is_ready()
632 }
633
634 fn unwatch(&self, oper: Operation) {
635 self.0.senders.unwatch(oper);
636 }
637}
638