1//! Unbounded channel implemented as a linked list.
2
3use std::boxed::Box;
4use std::cell::UnsafeCell;
5use std::marker::PhantomData;
6use std::mem::MaybeUninit;
7use std::ptr;
8use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
9use std::time::Instant;
10
11use crossbeam_utils::{Backoff, CachePadded};
12
13use crate::context::Context;
14use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
15use crate::select::{Operation, SelectHandle, Selected, Token};
16use crate::waker::SyncWaker;
17
18// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
19// following changes by @kleimkuhler:
20//
21// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
22// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
23
// Bits indicating the state of a slot. They are combined with bitwise OR in `Slot::state`:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold. It is one less than `LAP`: the last offset
// of every lap (`offset == BLOCK_CAP`) has no slot and marks the end of the block.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits of an index are reserved for metadata.
const SHIFT: usize = 1;
// The metadata bit stored in the lowest bit of an index. Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
42
43/// A slot in a block.
44struct Slot<T> {
45 /// The message.
46 msg: UnsafeCell<MaybeUninit<T>>,
47
48 /// The state of the slot.
49 state: AtomicUsize,
50}
51
52impl<T> Slot<T> {
53 const UNINIT: Self = Self {
54 msg: UnsafeCell::new(MaybeUninit::uninit()),
55 state: AtomicUsize::new(0),
56 };
57
58 /// Waits until a message is written into the slot.
59 fn wait_write(&self) {
60 let backoff: Backoff = Backoff::new();
61 while self.state.load(order:Ordering::Acquire) & WRITE == 0 {
62 backoff.snooze();
63 }
64 }
65}
66
67/// A block in a linked list.
68///
69/// Each block in the list can hold up to `BLOCK_CAP` messages.
70struct Block<T> {
71 /// The next block in the linked list.
72 next: AtomicPtr<Block<T>>,
73
74 /// Slots for messages.
75 slots: [Slot<T>; BLOCK_CAP],
76}
77
78impl<T> Block<T> {
79 /// Creates an empty block.
80 fn new() -> Block<T> {
81 Self {
82 next: AtomicPtr::new(ptr::null_mut()),
83 slots: [Slot::UNINIT; BLOCK_CAP],
84 }
85 }
86
87 /// Waits until the next pointer is set.
88 fn wait_next(&self) -> *mut Block<T> {
89 let backoff = Backoff::new();
90 loop {
91 let next = self.next.load(Ordering::Acquire);
92 if !next.is_null() {
93 return next;
94 }
95 backoff.snooze();
96 }
97 }
98
99 /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
100 unsafe fn destroy(this: *mut Block<T>, start: usize) {
101 // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
102 // begun destruction of the block.
103 for i in start..BLOCK_CAP - 1 {
104 let slot = (*this).slots.get_unchecked(i);
105
106 // Mark the `DESTROY` bit if a thread is still using the slot.
107 if slot.state.load(Ordering::Acquire) & READ == 0
108 && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
109 {
110 // If a thread is still using the slot, it will continue destruction of the block.
111 return;
112 }
113 }
114
115 // No thread is using the block, now it is safe to destroy it.
116 drop(Box::from_raw(this));
117 }
118}
119
/// A position in a channel.
///
/// The channel keeps two of these: one for the head and one for the tail.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    ///
    /// The lowest `SHIFT` bits hold metadata (`MARK_BIT`); the remaining bits are the sequence
    /// number.
    index: AtomicUsize,

    /// The block in the linked list. Null until the first block has been installed.
    block: AtomicPtr<Block<T>>,
}
129
/// Token used by the list flavor to hand a reserved slot from a `start_*` call to the matching
/// `read`/`write` call.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// Type-erased pointer to the block that owns the reserved slot, or null if no slot was
    /// reserved.
    block: *const u8,

    /// Index of the reserved slot inside that block.
    offset: usize,
}

impl Default for ListToken {
    /// Creates a token that refers to no slot: a null block and a zero offset.
    #[inline]
    fn default() -> Self {
        let block = ptr::null();
        Self { block, offset: 0 }
    }
}
149
/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
    /// The head of the channel: the position from which the next message is received.
    head: CachePadded<Position<T>>,

    /// The tail of the channel: the position at which the next message is sent.
    /// Its index also carries the "disconnected" flag in `MARK_BIT`.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}
170
171impl<T> Channel<T> {
172 /// Creates a new unbounded channel.
173 pub(crate) fn new() -> Self {
174 Channel {
175 head: CachePadded::new(Position {
176 block: AtomicPtr::new(ptr::null_mut()),
177 index: AtomicUsize::new(0),
178 }),
179 tail: CachePadded::new(Position {
180 block: AtomicPtr::new(ptr::null_mut()),
181 index: AtomicUsize::new(0),
182 }),
183 receivers: SyncWaker::new(),
184 _marker: PhantomData,
185 }
186 }
187
    /// Returns a receiver handle to the channel.
    ///
    /// The handle merely borrows the channel; it is the type that implements `SelectHandle`
    /// for the receiving side.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }
192
    /// Returns a sender handle to the channel.
    ///
    /// The handle merely borrows the channel; it is the type that implements `SelectHandle`
    /// for the sending side.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }
197
    /// Attempts to reserve a slot for sending a message.
    ///
    /// On return, `token.list` describes the outcome:
    /// * a null `block` means the channel is disconnected and no slot was reserved;
    /// * otherwise `block` and `offset` identify the slot reserved for this sender.
    ///
    /// Every exit path returns `true`; the function retries internally until one of the two
    /// outcomes above is reached.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        // Spare block, allocated lazily and only when this sender may have to install it.
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    // We won the race: publish the same block as the head block too.
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    // Another sender installed the first block. Keep our allocation around as
                    // the spare next block instead of freeing it, then start over.
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        // `next_block` is `Some` here: it was allocated above for this offset.
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        // Step the tail index over the slot-less end-of-block offset.
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        // Link the old block to the new one.
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    // Lost the race (or failed spuriously): retry with the observed tail.
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
280
281 /// Writes a message into the channel.
282 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
283 // If there is no slot, the channel is disconnected.
284 if token.list.block.is_null() {
285 return Err(msg);
286 }
287
288 // Write the message into the slot.
289 let block = token.list.block.cast::<Block<T>>();
290 let offset = token.list.offset;
291 let slot = (*block).slots.get_unchecked(offset);
292 slot.msg.get().write(MaybeUninit::new(msg));
293 slot.state.fetch_or(WRITE, Ordering::Release);
294
295 // Wake a sleeping receiver.
296 self.receivers.notify();
297 Ok(())
298 }
299
    /// Attempts to reserve a slot for receiving a message.
    ///
    /// Returns:
    /// * `false` if the channel is empty and not disconnected (the operation is not ready);
    /// * `true` with a null `token.list.block` if the channel is empty and disconnected;
    /// * `true` with `token.list.block`/`offset` identifying the reserved slot otherwise.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            // Without `MARK_BIT` in head we do not know whether this block is the last one, so
            // the tail has to be consulted.
            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        // Clear the mark and step over the slot-less end-of-block offset.
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        // If the next block already has a successor, it is not the last one.
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    // Lost the race (or failed spuriously): retry with the observed head.
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }
384
385 /// Reads a message from the channel.
386 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
387 if token.list.block.is_null() {
388 // The channel is disconnected.
389 return Err(());
390 }
391
392 // Read the message.
393 let block = token.list.block as *mut Block<T>;
394 let offset = token.list.offset;
395 let slot = (*block).slots.get_unchecked(offset);
396 slot.wait_write();
397 let msg = slot.msg.get().read().assume_init();
398
399 // Destroy the block if we've reached the end, or if another thread wanted to destroy but
400 // couldn't because we were busy reading from the slot.
401 if offset + 1 == BLOCK_CAP {
402 Block::destroy(block, 0);
403 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
404 Block::destroy(block, offset + 1);
405 }
406
407 Ok(msg)
408 }
409
410 /// Attempts to send a message into the channel.
411 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
412 self.send(msg, None).map_err(|err| match err {
413 SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
414 SendTimeoutError::Timeout(_) => unreachable!(),
415 })
416 }
417
418 /// Sends a message into the channel.
419 pub(crate) fn send(
420 &self,
421 msg: T,
422 _deadline: Option<Instant>,
423 ) -> Result<(), SendTimeoutError<T>> {
424 let token = &mut Token::default();
425 assert!(self.start_send(token));
426 unsafe {
427 self.write(token, msg)
428 .map_err(SendTimeoutError::Disconnected)
429 }
430 }
431
432 /// Attempts to receive a message without blocking.
433 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
434 let token = &mut Token::default();
435
436 if self.start_recv(token) {
437 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
438 } else {
439 Err(TryRecvError::Empty)
440 }
441 }
442
443 /// Receives a message from the channel.
444 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
445 let token = &mut Token::default();
446 loop {
447 // Try receiving a message several times.
448 let backoff = Backoff::new();
449 loop {
450 if self.start_recv(token) {
451 unsafe {
452 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
453 }
454 }
455
456 if backoff.is_completed() {
457 break;
458 } else {
459 backoff.snooze();
460 }
461 }
462
463 if let Some(d) = deadline {
464 if Instant::now() >= d {
465 return Err(RecvTimeoutError::Timeout);
466 }
467 }
468
469 // Prepare for blocking until a sender wakes us up.
470 Context::with(|cx| {
471 let oper = Operation::hook(token);
472 self.receivers.register(oper, cx);
473
474 // Has the channel become ready just now?
475 if !self.is_empty() || self.is_disconnected() {
476 let _ = cx.try_select(Selected::Aborted);
477 }
478
479 // Block the current thread.
480 let sel = cx.wait_until(deadline);
481
482 match sel {
483 Selected::Waiting => unreachable!(),
484 Selected::Aborted | Selected::Disconnected => {
485 self.receivers.unregister(oper).unwrap();
486 // If the channel was disconnected, we still have to check for remaining
487 // messages.
488 }
489 Selected::Operation(_) => {}
490 }
491 });
492 }
493 }
494
495 /// Returns the current number of messages inside the channel.
496 pub(crate) fn len(&self) -> usize {
497 loop {
498 // Load the tail index, then load the head index.
499 let mut tail = self.tail.index.load(Ordering::SeqCst);
500 let mut head = self.head.index.load(Ordering::SeqCst);
501
502 // If the tail index didn't change, we've got consistent indices to work with.
503 if self.tail.index.load(Ordering::SeqCst) == tail {
504 // Erase the lower bits.
505 tail &= !((1 << SHIFT) - 1);
506 head &= !((1 << SHIFT) - 1);
507
508 // Fix up indices if they fall onto block ends.
509 if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
510 tail = tail.wrapping_add(1 << SHIFT);
511 }
512 if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
513 head = head.wrapping_add(1 << SHIFT);
514 }
515
516 // Rotate indices so that head falls into the first block.
517 let lap = (head >> SHIFT) / LAP;
518 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
519 head = head.wrapping_sub((lap * LAP) << SHIFT);
520
521 // Remove the lower bits.
522 tail >>= SHIFT;
523 head >>= SHIFT;
524
525 // Return the difference minus the number of blocks between tail and head.
526 return tail - head - tail / LAP;
527 }
528 }
529 }
530
    /// Returns the capacity of the channel.
    ///
    /// Always `None`: this flavor is unbounded.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }
535
536 /// Disconnects senders and wakes up all blocked receivers.
537 ///
538 /// Returns `true` if this call disconnected the channel.
539 pub(crate) fn disconnect_senders(&self) -> bool {
540 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
541
542 if tail & MARK_BIT == 0 {
543 self.receivers.disconnect();
544 true
545 } else {
546 false
547 }
548 }
549
550 /// Disconnects receivers.
551 ///
552 /// Returns `true` if this call disconnected the channel.
553 pub(crate) fn disconnect_receivers(&self) -> bool {
554 let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
555
556 if tail & MARK_BIT == 0 {
557 // If receivers are dropped first, discard all messages to free
558 // memory eagerly.
559 self.discard_all_messages();
560 true
561 } else {
562 false
563 }
564 }
565
566 /// Discards all messages.
567 ///
568 /// This method should only be called when all receivers are dropped.
569 fn discard_all_messages(&self) {
570 let backoff = Backoff::new();
571 let mut tail = self.tail.index.load(Ordering::Acquire);
572 loop {
573 let offset = (tail >> SHIFT) % LAP;
574 if offset != BLOCK_CAP {
575 break;
576 }
577
578 // New updates to tail will be rejected by MARK_BIT and aborted unless it's
579 // at boundary. We need to wait for the updates take affect otherwise there
580 // can be memory leaks.
581 backoff.snooze();
582 tail = self.tail.index.load(Ordering::Acquire);
583 }
584
585 let mut head = self.head.index.load(Ordering::Acquire);
586 let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);
587
588 // If we're going to be dropping messages we need to synchronize with initialization
589 if head >> SHIFT != tail >> SHIFT {
590 // The block can be null here only if a sender is in the process of initializing the
591 // channel while another sender managed to send a message by inserting it into the
592 // semi-initialized channel and advanced the tail.
593 // In that case, just wait until it gets initialized.
594 while block.is_null() {
595 backoff.snooze();
596 block = self.head.block.load(Ordering::Acquire);
597 }
598 }
599
600 unsafe {
601 // Drop all messages between head and tail and deallocate the heap-allocated blocks.
602 while head >> SHIFT != tail >> SHIFT {
603 let offset = (head >> SHIFT) % LAP;
604
605 if offset < BLOCK_CAP {
606 // Drop the message in the slot.
607 let slot = (*block).slots.get_unchecked(offset);
608 slot.wait_write();
609 (*slot.msg.get()).assume_init_drop();
610 } else {
611 (*block).wait_next();
612 // Deallocate the block and move to the next one.
613 let next = (*block).next.load(Ordering::Acquire);
614 drop(Box::from_raw(block));
615 block = next;
616 }
617
618 head = head.wrapping_add(1 << SHIFT);
619 }
620
621 // Deallocate the last remaining block.
622 if !block.is_null() {
623 drop(Box::from_raw(block));
624 }
625 }
626 head &= !MARK_BIT;
627 self.head.index.store(head, Ordering::Release);
628 }
629
630 /// Returns `true` if the channel is disconnected.
631 pub(crate) fn is_disconnected(&self) -> bool {
632 self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
633 }
634
635 /// Returns `true` if the channel is empty.
636 pub(crate) fn is_empty(&self) -> bool {
637 let head = self.head.index.load(Ordering::SeqCst);
638 let tail = self.tail.index.load(Ordering::SeqCst);
639 head >> SHIFT == tail >> SHIFT
640 }
641
    /// Returns `true` if the channel is full.
    ///
    /// Always `false`: this flavor is unbounded.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
646}
647
648impl<T> Drop for Channel<T> {
649 fn drop(&mut self) {
650 let mut head = *self.head.index.get_mut();
651 let mut tail = *self.tail.index.get_mut();
652 let mut block = *self.head.block.get_mut();
653
654 // Erase the lower bits.
655 head &= !((1 << SHIFT) - 1);
656 tail &= !((1 << SHIFT) - 1);
657
658 unsafe {
659 // Drop all messages between head and tail and deallocate the heap-allocated blocks.
660 while head != tail {
661 let offset = (head >> SHIFT) % LAP;
662
663 if offset < BLOCK_CAP {
664 // Drop the message in the slot.
665 let slot = (*block).slots.get_unchecked(offset);
666 (*slot.msg.get()).assume_init_drop();
667 } else {
668 // Deallocate the block and move to the next one.
669 let next = *(*block).next.get_mut();
670 drop(Box::from_raw(block));
671 block = next;
672 }
673
674 head = head.wrapping_add(1 << SHIFT);
675 }
676
677 // Deallocate the last remaining block.
678 if !block.is_null() {
679 drop(Box::from_raw(block));
680 }
681 }
682 }
683}
684
/// Receiver handle to a channel.
///
/// A thin wrapper around a borrowed `Channel<T>`.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
687
/// Sender handle to a channel.
///
/// A thin wrapper around a borrowed `Channel<T>`.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
690
691impl<T> SelectHandle for Receiver<'_, T> {
692 fn try_select(&self, token: &mut Token) -> bool {
693 self.0.start_recv(token)
694 }
695
696 fn deadline(&self) -> Option<Instant> {
697 None
698 }
699
700 fn register(&self, oper: Operation, cx: &Context) -> bool {
701 self.0.receivers.register(oper, cx);
702 self.is_ready()
703 }
704
705 fn unregister(&self, oper: Operation) {
706 self.0.receivers.unregister(oper);
707 }
708
709 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
710 self.try_select(token)
711 }
712
713 fn is_ready(&self) -> bool {
714 !self.0.is_empty() || self.0.is_disconnected()
715 }
716
717 fn watch(&self, oper: Operation, cx: &Context) -> bool {
718 self.0.receivers.watch(oper, cx);
719 self.is_ready()
720 }
721
722 fn unwatch(&self, oper: Operation) {
723 self.0.receivers.unwatch(oper);
724 }
725}
726
727impl<T> SelectHandle for Sender<'_, T> {
728 fn try_select(&self, token: &mut Token) -> bool {
729 self.0.start_send(token)
730 }
731
732 fn deadline(&self) -> Option<Instant> {
733 None
734 }
735
736 fn register(&self, _oper: Operation, _cx: &Context) -> bool {
737 self.is_ready()
738 }
739
740 fn unregister(&self, _oper: Operation) {}
741
742 fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
743 self.try_select(token)
744 }
745
746 fn is_ready(&self) -> bool {
747 true
748 }
749
750 fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
751 self.is_ready()
752 }
753
754 fn unwatch(&self, _oper: Operation) {}
755}
756