//! Unbounded channel implemented as a linked list.

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
// following changes by @kleimkuhler:
//
// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101

// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
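// (One offset per lap is reserved: an index whose offset equals `BLOCK_CAP` marks the end of a
// block rather than a message slot, and threads stall there until the next block is installed.)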
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
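// An illustrative decomposition of an index (assuming `SHIFT == 1` and `LAP == 32`; this snippet
// is not part of the channel itself):
//
//     let index: usize = 71;                 // 0b100_0111
//     let mark = index & MARK_BIT;           // 1: the metadata bit is set
//     let offset = (index >> SHIFT) % LAP;   // 3: slot within the block
//     let lap = (index >> SHIFT) / LAP;      // 1: the block is the second one in the list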

/// A slot in a block.
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    const UNINIT: Self = Self {
        msg: UnsafeCell::new(MaybeUninit::uninit()),
        state: AtomicUsize::new(0),
    };

    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            slots: [Slot::UNINIT; BLOCK_CAP],
        }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
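        // (Destruction is initiated either by the thread that reads the last slot or by a reader
        // that finds `DESTROY` already set; see `read` below.)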
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, so now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// The token type for the list flavor.
#[derive(Debug)]
pub(crate) struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken {
            block: ptr::null(),
            offset: 0,
        }
    }
}

/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
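///
/// # Example
///
/// An illustrative sketch of this crate-internal API (marked `ignore` because the type is not
/// public, so it cannot run as a doctest):
///
/// ```ignore
/// let chan = Channel::new();
/// chan.try_send(7).unwrap();
/// assert_eq!(chan.try_recv(), Ok(7));
/// ```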
pub(crate) struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

    /// Attempts to reserve a slot for sending a message.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

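            // Advance the sequence number by one message, leaving the metadata bits untouched.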
            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Writes a message into the channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block.cast::<Block<T>>();
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
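                // Make the tail load below part of the global sequentially consistent order, so
                // the emptiness check sees a head/tail pair consistent with the SeqCst tail
                // updates in `start_send`.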
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
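                // (When the mark bit is already set, the whole emptiness check above is skipped:
                // a later block exists, so a message is certainly waiting at this position.)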
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

    /// Reads a message from the channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }

    /// Attempts to send a message into the channel.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

    /// Sends a message into the channel.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
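        // Sending into an unbounded channel never blocks, so `start_send` always returns `true`
        // (even on disconnect, in which case `write` reports the error below).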
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

    /// Returns the current number of messages inside the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
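                // Illustrative arithmetic (with `LAP == 32`): after 40 sends and no receives,
                // `head` is 0 and `tail` is 41 (one extra for the reserved end-of-block offset),
                // so the length is 41 - 0 - 41 / 32 = 40.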
                return tail - head - tail / LAP;
            }
        }
    }

    /// Returns the capacity of the channel.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }

    /// Disconnects senders and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Disconnects receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // If receivers are dropped first, discard all messages to free
            // memory eagerly.
            self.discard_all_messages();
            true
        } else {
            false
        }
    }

    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // New updates to tail will be rejected by `MARK_BIT` and aborted, unless the tail is
            // at a block boundary. We need to wait for those in-flight updates to take effect;
            // otherwise there can be memory leaks.
            backoff.snooze();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        // If we're going to be dropping messages, we need to synchronize with initialization.
        if head >> SHIFT != tail >> SHIFT {
            // The block can be null here only if a sender is in the process of initializing the
            // channel while another sender managed to send a message by inserting it into the
            // semi-initialized channel and advancing the tail.
            // In that case, just wait until it gets initialized.
            while block.is_null() {
                backoff.snooze();
                block = self.head.block.load(Ordering::Acquire);
            }
        }
        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    (*block).wait_next();
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
        head &= !MARK_BIT;
        self.head.block.store(ptr::null_mut(), Ordering::Release);
        self.head.index.store(head, Ordering::Release);
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the channel is full.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut head = *self.head.index.get_mut();
        let mut tail = *self.tail.index.get_mut();
        let mut block = *self.head.block.get_mut();

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = *(*block).next.get_mut();
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<T> SelectHandle for Receiver<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}

impl<T> SelectHandle for Sender<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

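    // A sender on an unbounded channel never has to wait for capacity, so it is always ready.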
    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}
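
#[cfg(test)]
mod tests {
    use super::*;

    // A smoke-test sketch (not from the original file): exercises the send/receive fast paths
    // and the `len` accounting across a block boundary (`BLOCK_CAP` is 31, so 40 messages span
    // two blocks).
    #[test]
    fn send_recv_across_blocks() {
        let chan = Channel::new();
        for i in 0..40 {
            chan.try_send(i).unwrap();
        }
        assert_eq!(chan.len(), 40);
        for i in 0..40 {
            assert_eq!(chan.try_recv(), Ok(i));
        }
        assert_eq!(chan.try_recv(), Err(TryRecvError::Empty));
    }
}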