1//! Bounded channel based on a preallocated array.
2//!
3//! This flavor has a fixed, positive capacity.
4//!
5//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
6//!
7//! Source:
8//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
9//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
10
11use std::cell::UnsafeCell;
12use std::mem::MaybeUninit;
13use std::ptr;
14use std::sync::atomic::{self, AtomicUsize, Ordering};
15use std::time::Instant;
16
17use crossbeam_utils::{Backoff, CachePadded};
18
19use crate::context::Context;
20use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
21use crate::select::{Operation, SelectHandle, Selected, Token};
22use crate::waker::SyncWaker;
23
/// A slot in a channel.
///
/// Each slot pairs a message cell with a stamp. Senders and receivers compare the stamp with
/// the tail or head to decide whether the slot may be written or read.
struct Slot<T> {
    /// The current stamp.
    ///
    /// `start_send` may claim the slot when the stamp equals the tail, and `start_recv` may
    /// claim it when the stamp equals the head plus one.
    stamp: AtomicUsize,

    /// The message in this slot.
    ///
    /// The cell starts out uninitialized; `write` stores a message into it and `read` moves the
    /// message back out.
    msg: UnsafeCell<MaybeUninit<T>>,
}
32
/// The token type for the array flavor.
///
/// A token carries the slot reservation made by `start_send`/`start_recv` over to the follow-up
/// `write`/`read` call.
#[derive(Debug)]
pub(crate) struct ArrayToken {
    /// Slot to read from or write to, or null if no slot has been reserved.
    slot: *const u8,

    /// Stamp to store into the slot after reading or writing.
    stamp: usize,
}

impl Default for ArrayToken {
    /// Returns a token that holds no reservation: a null slot pointer and a zero stamp.
    #[inline]
    fn default() -> Self {
        Self {
            stamp: 0,
            slot: ptr::null(),
        }
    }
}
52
/// Bounded channel based on a preallocated array.
///
/// The channel is a ring buffer of `cap` slots. Positions in the ring are tracked by two
/// "stamps", `head` and `tail`, each of which packs a slot index, a mark bit, and a lap counter
/// into a single `usize`.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit in the head is always zero.
    ///
    /// Messages are popped from the head of the channel.
    head: CachePadded<AtomicUsize>,

    /// The tail of the channel.
    ///
    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
    /// represent the lap. The mark bit indicates that the channel is disconnected.
    ///
    /// Messages are pushed into the tail of the channel.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    ///
    /// It is allocated once in `with_capacity` and always contains exactly `cap` slots.
    buffer: Box<[Slot<T>]>,

    /// The channel capacity.
    cap: usize,

    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
    ///
    /// Computed as `mark_bit * 2`; adding it to a stamp advances the stamp by one lap.
    one_lap: usize,

    /// If this bit is set in the tail, that means the channel is disconnected.
    ///
    /// Computed as the smallest power of two greater than `cap`, so `mark_bit - 1` is a mask
    /// that extracts the index from a stamp.
    mark_bit: usize,

    /// Senders waiting while the channel is full.
    senders: SyncWaker,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,
}
91
92impl<T> Channel<T> {
93 /// Creates a bounded channel of capacity `cap`.
94 pub(crate) fn with_capacity(cap: usize) -> Self {
95 assert!(cap > 0, "capacity must be positive");
96
97 // Compute constants `mark_bit` and `one_lap`.
98 let mark_bit = (cap + 1).next_power_of_two();
99 let one_lap = mark_bit * 2;
100
101 // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
102 let head = 0;
103 // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
104 let tail = 0;
105
106 // Allocate a buffer of `cap` slots initialized
107 // with stamps.
108 let buffer: Box<[Slot<T>]> = (0..cap)
109 .map(|i| {
110 // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
111 Slot {
112 stamp: AtomicUsize::new(i),
113 msg: UnsafeCell::new(MaybeUninit::uninit()),
114 }
115 })
116 .collect();
117
118 Channel {
119 buffer,
120 cap,
121 one_lap,
122 mark_bit,
123 head: CachePadded::new(AtomicUsize::new(head)),
124 tail: CachePadded::new(AtomicUsize::new(tail)),
125 senders: SyncWaker::new(),
126 receivers: SyncWaker::new(),
127 }
128 }
129
130 /// Returns a receiver handle to the channel.
131 pub(crate) fn receiver(&self) -> Receiver<'_, T> {
132 Receiver(self)
133 }
134
135 /// Returns a sender handle to the channel.
136 pub(crate) fn sender(&self) -> Sender<'_, T> {
137 Sender(self)
138 }
139
140 /// Attempts to reserve a slot for sending a message.
141 fn start_send(&self, token: &mut Token) -> bool {
142 let backoff = Backoff::new();
143 let mut tail = self.tail.load(Ordering::Relaxed);
144
145 loop {
146 // Check if the channel is disconnected.
147 if tail & self.mark_bit != 0 {
148 token.array.slot = ptr::null();
149 token.array.stamp = 0;
150 return true;
151 }
152
153 // Deconstruct the tail.
154 let index = tail & (self.mark_bit - 1);
155 let lap = tail & !(self.one_lap - 1);
156
157 // Inspect the corresponding slot.
158 debug_assert!(index < self.buffer.len());
159 let slot = unsafe { self.buffer.get_unchecked(index) };
160 let stamp = slot.stamp.load(Ordering::Acquire);
161
162 // If the tail and the stamp match, we may attempt to push.
163 if tail == stamp {
164 let new_tail = if index + 1 < self.cap {
165 // Same lap, incremented index.
166 // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
167 tail + 1
168 } else {
169 // One lap forward, index wraps around to zero.
170 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
171 lap.wrapping_add(self.one_lap)
172 };
173
174 // Try moving the tail.
175 match self.tail.compare_exchange_weak(
176 tail,
177 new_tail,
178 Ordering::SeqCst,
179 Ordering::Relaxed,
180 ) {
181 Ok(_) => {
182 // Prepare the token for the follow-up call to `write`.
183 token.array.slot = slot as *const Slot<T> as *const u8;
184 token.array.stamp = tail + 1;
185 return true;
186 }
187 Err(t) => {
188 tail = t;
189 backoff.spin();
190 }
191 }
192 } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
193 atomic::fence(Ordering::SeqCst);
194 let head = self.head.load(Ordering::Relaxed);
195
196 // If the head lags one lap behind the tail as well...
197 if head.wrapping_add(self.one_lap) == tail {
198 // ...then the channel is full.
199 return false;
200 }
201
202 backoff.spin();
203 tail = self.tail.load(Ordering::Relaxed);
204 } else {
205 // Snooze because we need to wait for the stamp to get updated.
206 backoff.snooze();
207 tail = self.tail.load(Ordering::Relaxed);
208 }
209 }
210 }
211
212 /// Writes a message into the channel.
213 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
214 // If there is no slot, the channel is disconnected.
215 if token.array.slot.is_null() {
216 return Err(msg);
217 }
218
219 let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
220
221 // Write the message into the slot and update the stamp.
222 slot.msg.get().write(MaybeUninit::new(msg));
223 slot.stamp.store(token.array.stamp, Ordering::Release);
224
225 // Wake a sleeping receiver.
226 self.receivers.notify();
227 Ok(())
228 }
229
230 /// Attempts to reserve a slot for receiving a message.
231 fn start_recv(&self, token: &mut Token) -> bool {
232 let backoff = Backoff::new();
233 let mut head = self.head.load(Ordering::Relaxed);
234
235 loop {
236 // Deconstruct the head.
237 let index = head & (self.mark_bit - 1);
238 let lap = head & !(self.one_lap - 1);
239
240 // Inspect the corresponding slot.
241 debug_assert!(index < self.buffer.len());
242 let slot = unsafe { self.buffer.get_unchecked(index) };
243 let stamp = slot.stamp.load(Ordering::Acquire);
244
245 // If the the stamp is ahead of the head by 1, we may attempt to pop.
246 if head + 1 == stamp {
247 let new = if index + 1 < self.cap {
248 // Same lap, incremented index.
249 // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
250 head + 1
251 } else {
252 // One lap forward, index wraps around to zero.
253 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
254 lap.wrapping_add(self.one_lap)
255 };
256
257 // Try moving the head.
258 match self.head.compare_exchange_weak(
259 head,
260 new,
261 Ordering::SeqCst,
262 Ordering::Relaxed,
263 ) {
264 Ok(_) => {
265 // Prepare the token for the follow-up call to `read`.
266 token.array.slot = slot as *const Slot<T> as *const u8;
267 token.array.stamp = head.wrapping_add(self.one_lap);
268 return true;
269 }
270 Err(h) => {
271 head = h;
272 backoff.spin();
273 }
274 }
275 } else if stamp == head {
276 atomic::fence(Ordering::SeqCst);
277 let tail = self.tail.load(Ordering::Relaxed);
278
279 // If the tail equals the head, that means the channel is empty.
280 if (tail & !self.mark_bit) == head {
281 // If the channel is disconnected...
282 if tail & self.mark_bit != 0 {
283 // ...then receive an error.
284 token.array.slot = ptr::null();
285 token.array.stamp = 0;
286 return true;
287 } else {
288 // Otherwise, the receive operation is not ready.
289 return false;
290 }
291 }
292
293 backoff.spin();
294 head = self.head.load(Ordering::Relaxed);
295 } else {
296 // Snooze because we need to wait for the stamp to get updated.
297 backoff.snooze();
298 head = self.head.load(Ordering::Relaxed);
299 }
300 }
301 }
302
303 /// Reads a message from the channel.
304 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
305 if token.array.slot.is_null() {
306 // The channel is disconnected.
307 return Err(());
308 }
309
310 let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
311
312 // Read the message from the slot and update the stamp.
313 let msg = slot.msg.get().read().assume_init();
314 slot.stamp.store(token.array.stamp, Ordering::Release);
315
316 // Wake a sleeping sender.
317 self.senders.notify();
318 Ok(msg)
319 }
320
321 /// Attempts to send a message into the channel.
322 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
323 let token = &mut Token::default();
324 if self.start_send(token) {
325 unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
326 } else {
327 Err(TrySendError::Full(msg))
328 }
329 }
330
331 /// Sends a message into the channel.
332 pub(crate) fn send(
333 &self,
334 msg: T,
335 deadline: Option<Instant>,
336 ) -> Result<(), SendTimeoutError<T>> {
337 let token = &mut Token::default();
338 loop {
339 // Try sending a message several times.
340 let backoff = Backoff::new();
341 loop {
342 if self.start_send(token) {
343 let res = unsafe { self.write(token, msg) };
344 return res.map_err(SendTimeoutError::Disconnected);
345 }
346
347 if backoff.is_completed() {
348 break;
349 } else {
350 backoff.snooze();
351 }
352 }
353
354 if let Some(d) = deadline {
355 if Instant::now() >= d {
356 return Err(SendTimeoutError::Timeout(msg));
357 }
358 }
359
360 Context::with(|cx| {
361 // Prepare for blocking until a receiver wakes us up.
362 let oper = Operation::hook(token);
363 self.senders.register(oper, cx);
364
365 // Has the channel become ready just now?
366 if !self.is_full() || self.is_disconnected() {
367 let _ = cx.try_select(Selected::Aborted);
368 }
369
370 // Block the current thread.
371 let sel = cx.wait_until(deadline);
372
373 match sel {
374 Selected::Waiting => unreachable!(),
375 Selected::Aborted | Selected::Disconnected => {
376 self.senders.unregister(oper).unwrap();
377 }
378 Selected::Operation(_) => {}
379 }
380 });
381 }
382 }
383
384 /// Attempts to receive a message without blocking.
385 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
386 let token = &mut Token::default();
387
388 if self.start_recv(token) {
389 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
390 } else {
391 Err(TryRecvError::Empty)
392 }
393 }
394
395 /// Receives a message from the channel.
396 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
397 let token = &mut Token::default();
398 loop {
399 // Try receiving a message several times.
400 let backoff = Backoff::new();
401 loop {
402 if self.start_recv(token) {
403 let res = unsafe { self.read(token) };
404 return res.map_err(|_| RecvTimeoutError::Disconnected);
405 }
406
407 if backoff.is_completed() {
408 break;
409 } else {
410 backoff.snooze();
411 }
412 }
413
414 if let Some(d) = deadline {
415 if Instant::now() >= d {
416 return Err(RecvTimeoutError::Timeout);
417 }
418 }
419
420 Context::with(|cx| {
421 // Prepare for blocking until a sender wakes us up.
422 let oper = Operation::hook(token);
423 self.receivers.register(oper, cx);
424
425 // Has the channel become ready just now?
426 if !self.is_empty() || self.is_disconnected() {
427 let _ = cx.try_select(Selected::Aborted);
428 }
429
430 // Block the current thread.
431 let sel = cx.wait_until(deadline);
432
433 match sel {
434 Selected::Waiting => unreachable!(),
435 Selected::Aborted | Selected::Disconnected => {
436 self.receivers.unregister(oper).unwrap();
437 // If the channel was disconnected, we still have to check for remaining
438 // messages.
439 }
440 Selected::Operation(_) => {}
441 }
442 });
443 }
444 }
445
446 /// Returns the current number of messages inside the channel.
447 pub(crate) fn len(&self) -> usize {
448 loop {
449 // Load the tail, then load the head.
450 let tail = self.tail.load(Ordering::SeqCst);
451 let head = self.head.load(Ordering::SeqCst);
452
453 // If the tail didn't change, we've got consistent values to work with.
454 if self.tail.load(Ordering::SeqCst) == tail {
455 let hix = head & (self.mark_bit - 1);
456 let tix = tail & (self.mark_bit - 1);
457
458 return if hix < tix {
459 tix - hix
460 } else if hix > tix {
461 self.cap - hix + tix
462 } else if (tail & !self.mark_bit) == head {
463 0
464 } else {
465 self.cap
466 };
467 }
468 }
469 }
470
471 /// Returns the capacity of the channel.
472 pub(crate) fn capacity(&self) -> Option<usize> {
473 Some(self.cap)
474 }
475
476 /// Disconnects the channel and wakes up all blocked senders and receivers.
477 ///
478 /// Returns `true` if this call disconnected the channel.
479 pub(crate) fn disconnect(&self) -> bool {
480 let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
481
482 if tail & self.mark_bit == 0 {
483 self.senders.disconnect();
484 self.receivers.disconnect();
485 true
486 } else {
487 false
488 }
489 }
490
491 /// Returns `true` if the channel is disconnected.
492 pub(crate) fn is_disconnected(&self) -> bool {
493 self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
494 }
495
496 /// Returns `true` if the channel is empty.
497 pub(crate) fn is_empty(&self) -> bool {
498 let head = self.head.load(Ordering::SeqCst);
499 let tail = self.tail.load(Ordering::SeqCst);
500
501 // Is the tail equal to the head?
502 //
503 // Note: If the head changes just before we load the tail, that means there was a moment
504 // when the channel was not empty, so it is safe to just return `false`.
505 (tail & !self.mark_bit) == head
506 }
507
508 /// Returns `true` if the channel is full.
509 pub(crate) fn is_full(&self) -> bool {
510 let tail = self.tail.load(Ordering::SeqCst);
511 let head = self.head.load(Ordering::SeqCst);
512
513 // Is the head lagging one lap behind tail?
514 //
515 // Note: If the tail changes just before we load the head, that means there was a moment
516 // when the channel was not full, so it is safe to just return `false`.
517 head.wrapping_add(self.one_lap) == tail & !self.mark_bit
518 }
519}
520
521impl<T> Drop for Channel<T> {
522 fn drop(&mut self) {
523 // Get the index of the head.
524 let head = *self.head.get_mut();
525 let tail = *self.tail.get_mut();
526
527 let hix = head & (self.mark_bit - 1);
528 let tix = tail & (self.mark_bit - 1);
529
530 let len = if hix < tix {
531 tix - hix
532 } else if hix > tix {
533 self.cap - hix + tix
534 } else if (tail & !self.mark_bit) == head {
535 0
536 } else {
537 self.cap
538 };
539
540 // Loop over all slots that hold a message and drop them.
541 for i in 0..len {
542 // Compute the index of the next slot holding a message.
543 let index = if hix + i < self.cap {
544 hix + i
545 } else {
546 hix + i - self.cap
547 };
548
549 unsafe {
550 debug_assert!(index < self.buffer.len());
551 let slot = self.buffer.get_unchecked_mut(index);
552 let msg = &mut *slot.msg.get();
553 msg.as_mut_ptr().drop_in_place();
554 }
555 }
556 }
557}
558
/// Receiver handle to a channel.
///
/// A thin wrapper around a borrowed `Channel` that implements `SelectHandle` for receive
/// operations.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
561
/// Sender handle to a channel.
///
/// A thin wrapper around a borrowed `Channel` that implements `SelectHandle` for send
/// operations.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
564
565impl<T> SelectHandle for Receiver<'_, T> {
566 fn try_select(&self, token: &mut Token) -> bool {
567 self.0.start_recv(token)
568 }
569
570 fn deadline(&self) -> Option<Instant> {
571 None
572 }
573
574 fn register(&self, oper: Operation, cx: &Context) -> bool {
575 self.0.receivers.register(oper, cx);
576 self.is_ready()
577 }
578
579 fn unregister(&self, oper: Operation) {
580 self.0.receivers.unregister(oper);
581 }
582
583 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
584 self.try_select(token)
585 }
586
587 fn is_ready(&self) -> bool {
588 !self.0.is_empty() || self.0.is_disconnected()
589 }
590
591 fn watch(&self, oper: Operation, cx: &Context) -> bool {
592 self.0.receivers.watch(oper, cx);
593 self.is_ready()
594 }
595
596 fn unwatch(&self, oper: Operation) {
597 self.0.receivers.unwatch(oper);
598 }
599}
600
601impl<T> SelectHandle for Sender<'_, T> {
602 fn try_select(&self, token: &mut Token) -> bool {
603 self.0.start_send(token)
604 }
605
606 fn deadline(&self) -> Option<Instant> {
607 None
608 }
609
610 fn register(&self, oper: Operation, cx: &Context) -> bool {
611 self.0.senders.register(oper, cx);
612 self.is_ready()
613 }
614
615 fn unregister(&self, oper: Operation) {
616 self.0.senders.unregister(oper);
617 }
618
619 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
620 self.try_select(token)
621 }
622
623 fn is_ready(&self) -> bool {
624 !self.0.is_full() || self.0.is_disconnected()
625 }
626
627 fn watch(&self, oper: Operation, cx: &Context) -> bool {
628 self.0.senders.watch(oper, cx);
629 self.is_ready()
630 }
631
632 fn unwatch(&self, oper: Operation) {
633 self.0.senders.unwatch(oper);
634 }
635}
636