1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//! let (tx, mut rx1) = broadcast::channel(16);
79//! let mut rx2 = tx.subscribe();
80//!
81//! tokio::spawn(async move {
82//! assert_eq!(rx1.recv().await.unwrap(), 10);
83//! assert_eq!(rx1.recv().await.unwrap(), 20);
84//! });
85//!
86//! tokio::spawn(async move {
87//! assert_eq!(rx2.recv().await.unwrap(), 10);
88//! assert_eq!(rx2.recv().await.unwrap(), 20);
89//! });
90//!
91//! tx.send(10).unwrap();
92//! tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//! let (tx, mut rx) = broadcast::channel(2);
104//!
105//! tx.send(10).unwrap();
106//! tx.send(20).unwrap();
107//! tx.send(30).unwrap();
108//!
109//! // The receiver lagged behind
110//! assert!(rx.recv().await.is_err());
111//!
112//! // At this point, we can abort or continue with lost messages
113//!
114//! assert_eq!(20, rx.recv().await.unwrap());
115//! assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::AtomicUsize;
121use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
122use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
123use crate::util::WakeList;
124
125use std::fmt;
126use std::future::Future;
127use std::marker::PhantomPinned;
128use std::pin::Pin;
129use std::ptr::NonNull;
130use std::sync::atomic::Ordering::SeqCst;
131use std::task::{Context, Poll, Waker};
132use std::usize;
133
134/// Sending-half of the [`broadcast`] channel.
135///
136/// May be used from many threads. Messages can be sent with
137/// [`send`][Sender::send].
138///
139/// # Examples
140///
141/// ```
142/// use tokio::sync::broadcast;
143///
144/// #[tokio::main]
145/// async fn main() {
146/// let (tx, mut rx1) = broadcast::channel(16);
147/// let mut rx2 = tx.subscribe();
148///
149/// tokio::spawn(async move {
150/// assert_eq!(rx1.recv().await.unwrap(), 10);
151/// assert_eq!(rx1.recv().await.unwrap(), 20);
152/// });
153///
154/// tokio::spawn(async move {
155/// assert_eq!(rx2.recv().await.unwrap(), 10);
156/// assert_eq!(rx2.recv().await.unwrap(), 20);
157/// });
158///
159/// tx.send(10).unwrap();
160/// tx.send(20).unwrap();
161/// }
162/// ```
163///
164/// [`broadcast`]: crate::sync::broadcast
165pub struct Sender<T> {
166 shared: Arc<Shared<T>>,
167}
168
169/// Receiving-half of the [`broadcast`] channel.
170///
171/// Must not be used concurrently. Messages may be retrieved using
172/// [`recv`][Receiver::recv].
173///
174/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
175/// wrapper.
176///
177/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
178///
179/// # Examples
180///
181/// ```
182/// use tokio::sync::broadcast;
183///
184/// #[tokio::main]
185/// async fn main() {
186/// let (tx, mut rx1) = broadcast::channel(16);
187/// let mut rx2 = tx.subscribe();
188///
189/// tokio::spawn(async move {
190/// assert_eq!(rx1.recv().await.unwrap(), 10);
191/// assert_eq!(rx1.recv().await.unwrap(), 20);
192/// });
193///
194/// tokio::spawn(async move {
195/// assert_eq!(rx2.recv().await.unwrap(), 10);
196/// assert_eq!(rx2.recv().await.unwrap(), 20);
197/// });
198///
199/// tx.send(10).unwrap();
200/// tx.send(20).unwrap();
201/// }
202/// ```
203///
204/// [`broadcast`]: crate::sync::broadcast
205pub struct Receiver<T> {
206 /// State shared with all receivers and senders.
207 shared: Arc<Shared<T>>,
208
209 /// Next position to read from
210 next: u64,
211}
212
213pub mod error {
214 //! Broadcast error types
215
216 use std::fmt;
217
218 /// Error returned by the [`send`] function on a [`Sender`].
219 ///
220 /// A **send** operation can only fail if there are no active receivers,
221 /// implying that the message could never be received. The error contains the
222 /// message being sent as a payload so it can be recovered.
223 ///
224 /// [`send`]: crate::sync::broadcast::Sender::send
225 /// [`Sender`]: crate::sync::broadcast::Sender
226 #[derive(Debug)]
227 pub struct SendError<T>(pub T);
228
229 impl<T> fmt::Display for SendError<T> {
230 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231 write!(f, "channel closed")
232 }
233 }
234
235 impl<T: fmt::Debug> std::error::Error for SendError<T> {}
236
237 /// An error returned from the [`recv`] function on a [`Receiver`].
238 ///
239 /// [`recv`]: crate::sync::broadcast::Receiver::recv
240 /// [`Receiver`]: crate::sync::broadcast::Receiver
241 #[derive(Debug, PartialEq, Eq, Clone)]
242 pub enum RecvError {
243 /// There are no more active senders implying no further messages will ever
244 /// be sent.
245 Closed,
246
247 /// The receiver lagged too far behind. Attempting to receive again will
248 /// return the oldest message still retained by the channel.
249 ///
250 /// Includes the number of skipped messages.
251 Lagged(u64),
252 }
253
254 impl fmt::Display for RecvError {
255 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256 match self {
257 RecvError::Closed => write!(f, "channel closed"),
258 RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
259 }
260 }
261 }
262
263 impl std::error::Error for RecvError {}
264
265 /// An error returned from the [`try_recv`] function on a [`Receiver`].
266 ///
267 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
268 /// [`Receiver`]: crate::sync::broadcast::Receiver
269 #[derive(Debug, PartialEq, Eq, Clone)]
270 pub enum TryRecvError {
271 /// The channel is currently empty. There are still active
272 /// [`Sender`] handles, so data may yet become available.
273 ///
274 /// [`Sender`]: crate::sync::broadcast::Sender
275 Empty,
276
277 /// There are no more active senders implying no further messages will ever
278 /// be sent.
279 Closed,
280
281 /// The receiver lagged too far behind and has been forcibly disconnected.
282 /// Attempting to receive again will return the oldest message still
283 /// retained by the channel.
284 ///
285 /// Includes the number of skipped messages.
286 Lagged(u64),
287 }
288
289 impl fmt::Display for TryRecvError {
290 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
291 match self {
292 TryRecvError::Empty => write!(f, "channel empty"),
293 TryRecvError::Closed => write!(f, "channel closed"),
294 TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
295 }
296 }
297 }
298
299 impl std::error::Error for TryRecvError {}
300}
301
302use self::error::{RecvError, SendError, TryRecvError};
303
304/// Data shared between senders and receivers.
305struct Shared<T> {
306 /// slots in the channel.
307 buffer: Box<[RwLock<Slot<T>>]>,
308
309 /// Mask a position -> index.
310 mask: usize,
311
312 /// Tail of the queue. Includes the rx wait list.
313 tail: Mutex<Tail>,
314
315 /// Number of outstanding Sender handles.
316 num_tx: AtomicUsize,
317}
318
319/// Next position to write a value.
320struct Tail {
321 /// Next position to write to.
322 pos: u64,
323
324 /// Number of active receivers.
325 rx_cnt: usize,
326
327 /// True if the channel is closed.
328 closed: bool,
329
330 /// Receivers waiting for a value.
331 waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
332}
333
334/// Slot in the buffer.
335struct Slot<T> {
336 /// Remaining number of receivers that are expected to see this value.
337 ///
338 /// When this goes to zero, the value is released.
339 ///
340 /// An atomic is used as it is mutated concurrently with the slot read lock
341 /// acquired.
342 rem: AtomicUsize,
343
344 /// Uniquely identifies the `send` stored in the slot.
345 pos: u64,
346
347 /// The value being broadcast.
348 ///
349 /// The value is set by `send` when the write lock is held. When a reader
350 /// drops, `rem` is decremented. When it hits zero, the value is dropped.
351 val: UnsafeCell<Option<T>>,
352}
353
354/// An entry in the wait queue.
355struct Waiter {
356 /// True if queued.
357 queued: bool,
358
359 /// Task waiting on the broadcast channel.
360 waker: Option<Waker>,
361
362 /// Intrusive linked-list pointers.
363 pointers: linked_list::Pointers<Waiter>,
364
365 /// Should not be `Unpin`.
366 _p: PhantomPinned,
367}
368
369impl Waiter {
370 fn new() -> Self {
371 Self {
372 queued: false,
373 waker: None,
374 pointers: linked_list::Pointers::new(),
375 _p: PhantomPinned,
376 }
377 }
378}
379
380generate_addr_of_methods! {
381 impl<> Waiter {
382 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
383 &self.pointers
384 }
385 }
386}
387
388struct RecvGuard<'a, T> {
389 slot: RwLockReadGuard<'a, Slot<T>>,
390}
391
392/// Receive a value future.
393struct Recv<'a, T> {
394 /// Receiver being waited on.
395 receiver: &'a mut Receiver<T>,
396
397 /// Entry in the waiter `LinkedList`.
398 waiter: UnsafeCell<Waiter>,
399}
400
401unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
402unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
403
404/// Max number of receivers. Reserve space to lock.
405const MAX_RECEIVERS: usize = usize::MAX >> 2;
406
407/// Create a bounded, multi-producer, multi-consumer channel where each sent
408/// value is broadcasted to all active receivers.
409///
410/// **Note:** The actual capacity may be greater than the provided `capacity`.
411///
412/// All data sent on [`Sender`] will become available on every active
413/// [`Receiver`] in the same order as it was sent.
414///
415/// The `Sender` can be cloned to `send` to the same channel from multiple
416/// points in the process or it can be used concurrently from an `Arc`. New
417/// `Receiver` handles are created by calling [`Sender::subscribe`].
418///
419/// If all [`Receiver`] handles are dropped, the `send` method will return a
420/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
421/// method will return a [`RecvError`].
422///
423/// [`Sender`]: crate::sync::broadcast::Sender
424/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
425/// [`Receiver`]: crate::sync::broadcast::Receiver
426/// [`recv`]: crate::sync::broadcast::Receiver::recv
427/// [`SendError`]: crate::sync::broadcast::error::SendError
428/// [`RecvError`]: crate::sync::broadcast::error::RecvError
429///
430/// # Examples
431///
432/// ```
433/// use tokio::sync::broadcast;
434///
435/// #[tokio::main]
436/// async fn main() {
437/// let (tx, mut rx1) = broadcast::channel(16);
438/// let mut rx2 = tx.subscribe();
439///
440/// tokio::spawn(async move {
441/// assert_eq!(rx1.recv().await.unwrap(), 10);
442/// assert_eq!(rx1.recv().await.unwrap(), 20);
443/// });
444///
445/// tokio::spawn(async move {
446/// assert_eq!(rx2.recv().await.unwrap(), 10);
447/// assert_eq!(rx2.recv().await.unwrap(), 20);
448/// });
449///
450/// tx.send(10).unwrap();
451/// tx.send(20).unwrap();
452/// }
453/// ```
454///
455/// # Panics
456///
457/// This will panic if `capacity` is equal to `0` or larger
458/// than `usize::MAX / 2`.
459#[track_caller]
460pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
461 // SAFETY: In the line below we are creating one extra receiver, so there will be 1 in total.
462 let tx = unsafe { Sender::new_with_receiver_count(1, capacity) };
463 let rx = Receiver {
464 shared: tx.shared.clone(),
465 next: 0,
466 };
467 (tx, rx)
468}
469
470unsafe impl<T: Send> Send for Sender<T> {}
471unsafe impl<T: Send> Sync for Sender<T> {}
472
473unsafe impl<T: Send> Send for Receiver<T> {}
474unsafe impl<T: Send> Sync for Receiver<T> {}
475
476impl<T> Sender<T> {
477 /// Creates the sending-half of the [`broadcast`] channel.
478 ///
479 /// See the documentation of [`broadcast::channel`] for more information on this method.
480 ///
481 /// [`broadcast`]: crate::sync::broadcast
482 /// [`broadcast::channel`]: crate::sync::broadcast::channel
483 #[track_caller]
484 pub fn new(capacity: usize) -> Self {
485 // SAFETY: We don't create extra receivers, so there are 0.
486 unsafe { Self::new_with_receiver_count(0, capacity) }
487 }
488
489 /// Creates the sending-half of the [`broadcast`](self) channel, and provide the receiver
490 /// count.
491 ///
492 /// See the documentation of [`broadcast::channel`](self::channel) for more errors when
493 /// calling this function.
494 ///
495 /// # Safety:
496 ///
497 /// The caller must ensure that the amount of receivers for this Sender is correct before
498 /// the channel functionalities are used, the count is zero by default, as this function
499 /// does not create any receivers by itself.
500 #[track_caller]
501 unsafe fn new_with_receiver_count(receiver_count: usize, mut capacity: usize) -> Self {
502 assert!(capacity > 0, "broadcast channel capacity cannot be zero");
503 assert!(
504 capacity <= usize::MAX >> 1,
505 "broadcast channel capacity exceeded `usize::MAX / 2`"
506 );
507
508 // Round to a power of two
509 capacity = capacity.next_power_of_two();
510
511 let mut buffer = Vec::with_capacity(capacity);
512
513 for i in 0..capacity {
514 buffer.push(RwLock::new(Slot {
515 rem: AtomicUsize::new(0),
516 pos: (i as u64).wrapping_sub(capacity as u64),
517 val: UnsafeCell::new(None),
518 }));
519 }
520
521 let shared = Arc::new(Shared {
522 buffer: buffer.into_boxed_slice(),
523 mask: capacity - 1,
524 tail: Mutex::new(Tail {
525 pos: 0,
526 rx_cnt: receiver_count,
527 closed: false,
528 waiters: LinkedList::new(),
529 }),
530 num_tx: AtomicUsize::new(1),
531 });
532
533 Sender { shared }
534 }
535
536 /// Attempts to send a value to all active [`Receiver`] handles, returning
537 /// it back if it could not be sent.
538 ///
539 /// A successful send occurs when there is at least one active [`Receiver`]
540 /// handle. An unsuccessful send would be one where all associated
541 /// [`Receiver`] handles have already been dropped.
542 ///
543 /// # Return
544 ///
545 /// On success, the number of subscribed [`Receiver`] handles is returned.
546 /// This does not mean that this number of receivers will see the message as
547 /// a receiver may drop or lag ([see lagging](self#lagging)) before receiving
548 /// the message.
549 ///
550 /// # Note
551 ///
552 /// A return value of `Ok` **does not** mean that the sent value will be
553 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
554 /// handles may be dropped before receiving the sent message.
555 ///
556 /// A return value of `Err` **does not** mean that future calls to `send`
557 /// will fail. New [`Receiver`] handles may be created by calling
558 /// [`subscribe`].
559 ///
560 /// [`Receiver`]: crate::sync::broadcast::Receiver
561 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
562 ///
563 /// # Examples
564 ///
565 /// ```
566 /// use tokio::sync::broadcast;
567 ///
568 /// #[tokio::main]
569 /// async fn main() {
570 /// let (tx, mut rx1) = broadcast::channel(16);
571 /// let mut rx2 = tx.subscribe();
572 ///
573 /// tokio::spawn(async move {
574 /// assert_eq!(rx1.recv().await.unwrap(), 10);
575 /// assert_eq!(rx1.recv().await.unwrap(), 20);
576 /// });
577 ///
578 /// tokio::spawn(async move {
579 /// assert_eq!(rx2.recv().await.unwrap(), 10);
580 /// assert_eq!(rx2.recv().await.unwrap(), 20);
581 /// });
582 ///
583 /// tx.send(10).unwrap();
584 /// tx.send(20).unwrap();
585 /// }
586 /// ```
587 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
588 let mut tail = self.shared.tail.lock();
589
590 if tail.rx_cnt == 0 {
591 return Err(SendError(value));
592 }
593
594 // Position to write into
595 let pos = tail.pos;
596 let rem = tail.rx_cnt;
597 let idx = (pos & self.shared.mask as u64) as usize;
598
599 // Update the tail position
600 tail.pos = tail.pos.wrapping_add(1);
601
602 // Get the slot
603 let mut slot = self.shared.buffer[idx].write().unwrap();
604
605 // Track the position
606 slot.pos = pos;
607
608 // Set remaining receivers
609 slot.rem.with_mut(|v| *v = rem);
610
611 // Write the value
612 slot.val = UnsafeCell::new(Some(value));
613
614 // Release the slot lock before notifying the receivers.
615 drop(slot);
616
617 // Notify and release the mutex. This must happen after the slot lock is
618 // released, otherwise the writer lock bit could be cleared while another
619 // thread is in the critical section.
620 self.shared.notify_rx(tail);
621
622 Ok(rem)
623 }
624
625 /// Creates a new [`Receiver`] handle that will receive values sent **after**
626 /// this call to `subscribe`.
627 ///
628 /// # Examples
629 ///
630 /// ```
631 /// use tokio::sync::broadcast;
632 ///
633 /// #[tokio::main]
634 /// async fn main() {
635 /// let (tx, _rx) = broadcast::channel(16);
636 ///
637 /// // Will not be seen
638 /// tx.send(10).unwrap();
639 ///
640 /// let mut rx = tx.subscribe();
641 ///
642 /// tx.send(20).unwrap();
643 ///
644 /// let value = rx.recv().await.unwrap();
645 /// assert_eq!(20, value);
646 /// }
647 /// ```
648 pub fn subscribe(&self) -> Receiver<T> {
649 let shared = self.shared.clone();
650 new_receiver(shared)
651 }
652
653 /// Returns the number of queued values.
654 ///
655 /// A value is queued until it has either been seen by all receivers that were alive at the time
656 /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
657 /// queue's capacity.
658 ///
659 /// # Note
660 ///
661 /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
662 /// have been evicted from the queue before being seen by all receivers.
663 ///
664 /// # Examples
665 ///
666 /// ```
667 /// use tokio::sync::broadcast;
668 ///
669 /// #[tokio::main]
670 /// async fn main() {
671 /// let (tx, mut rx1) = broadcast::channel(16);
672 /// let mut rx2 = tx.subscribe();
673 ///
674 /// tx.send(10).unwrap();
675 /// tx.send(20).unwrap();
676 /// tx.send(30).unwrap();
677 ///
678 /// assert_eq!(tx.len(), 3);
679 ///
680 /// rx1.recv().await.unwrap();
681 ///
682 /// // The len is still 3 since rx2 hasn't seen the first value yet.
683 /// assert_eq!(tx.len(), 3);
684 ///
685 /// rx2.recv().await.unwrap();
686 ///
687 /// assert_eq!(tx.len(), 2);
688 /// }
689 /// ```
690 pub fn len(&self) -> usize {
691 let tail = self.shared.tail.lock();
692
693 let base_idx = (tail.pos & self.shared.mask as u64) as usize;
694 let mut low = 0;
695 let mut high = self.shared.buffer.len();
696 while low < high {
697 let mid = low + (high - low) / 2;
698 let idx = base_idx.wrapping_add(mid) & self.shared.mask;
699 if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 {
700 low = mid + 1;
701 } else {
702 high = mid;
703 }
704 }
705
706 self.shared.buffer.len() - low
707 }
708
709 /// Returns true if there are no queued values.
710 ///
711 /// # Examples
712 ///
713 /// ```
714 /// use tokio::sync::broadcast;
715 ///
716 /// #[tokio::main]
717 /// async fn main() {
718 /// let (tx, mut rx1) = broadcast::channel(16);
719 /// let mut rx2 = tx.subscribe();
720 ///
721 /// assert!(tx.is_empty());
722 ///
723 /// tx.send(10).unwrap();
724 ///
725 /// assert!(!tx.is_empty());
726 ///
727 /// rx1.recv().await.unwrap();
728 ///
729 /// // The queue is still not empty since rx2 hasn't seen the value.
730 /// assert!(!tx.is_empty());
731 ///
732 /// rx2.recv().await.unwrap();
733 ///
734 /// assert!(tx.is_empty());
735 /// }
736 /// ```
737 pub fn is_empty(&self) -> bool {
738 let tail = self.shared.tail.lock();
739
740 let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
741 self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0
742 }
743
744 /// Returns the number of active receivers
745 ///
746 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
747 /// [`subscribe`]. These are the handles that will receive values sent on
748 /// this [`Sender`].
749 ///
750 /// # Note
751 ///
752 /// It is not guaranteed that a sent message will reach this number of
753 /// receivers. Active receivers may never call [`recv`] again before
754 /// dropping.
755 ///
756 /// [`recv`]: crate::sync::broadcast::Receiver::recv
757 /// [`Receiver`]: crate::sync::broadcast::Receiver
758 /// [`Sender`]: crate::sync::broadcast::Sender
759 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
760 /// [`channel`]: crate::sync::broadcast::channel
761 ///
762 /// # Examples
763 ///
764 /// ```
765 /// use tokio::sync::broadcast;
766 ///
767 /// #[tokio::main]
768 /// async fn main() {
769 /// let (tx, _rx1) = broadcast::channel(16);
770 ///
771 /// assert_eq!(1, tx.receiver_count());
772 ///
773 /// let mut _rx2 = tx.subscribe();
774 ///
775 /// assert_eq!(2, tx.receiver_count());
776 ///
777 /// tx.send(10).unwrap();
778 /// }
779 /// ```
780 pub fn receiver_count(&self) -> usize {
781 let tail = self.shared.tail.lock();
782 tail.rx_cnt
783 }
784
785 /// Returns `true` if senders belong to the same channel.
786 ///
787 /// # Examples
788 ///
789 /// ```
790 /// use tokio::sync::broadcast;
791 ///
792 /// #[tokio::main]
793 /// async fn main() {
794 /// let (tx, _rx) = broadcast::channel::<()>(16);
795 /// let tx2 = tx.clone();
796 ///
797 /// assert!(tx.same_channel(&tx2));
798 ///
799 /// let (tx3, _rx3) = broadcast::channel::<()>(16);
800 ///
801 /// assert!(!tx3.same_channel(&tx2));
802 /// }
803 /// ```
804 pub fn same_channel(&self, other: &Self) -> bool {
805 Arc::ptr_eq(&self.shared, &other.shared)
806 }
807
808 fn close_channel(&self) {
809 let mut tail = self.shared.tail.lock();
810 tail.closed = true;
811
812 self.shared.notify_rx(tail);
813 }
814}
815
816/// Create a new `Receiver` which reads starting from the tail.
817fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
818 let mut tail = shared.tail.lock();
819
820 assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");
821
822 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
823
824 let next = tail.pos;
825
826 drop(tail);
827
828 Receiver { shared, next }
829}
830
831/// List used in `Shared::notify_rx`. It wraps a guarded linked list
832/// and gates the access to it on the `Shared.tail` mutex. It also empties
833/// the list on drop.
834struct WaitersList<'a, T> {
835 list: GuardedLinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
836 is_empty: bool,
837 shared: &'a Shared<T>,
838}
839
840impl<'a, T> Drop for WaitersList<'a, T> {
841 fn drop(&mut self) {
842 // If the list is not empty, we unlink all waiters from it.
843 // We do not wake the waiters to avoid double panics.
844 if !self.is_empty {
845 let _lock_guard = self.shared.tail.lock();
846 while self.list.pop_back().is_some() {}
847 }
848 }
849}
850
851impl<'a, T> WaitersList<'a, T> {
852 fn new(
853 unguarded_list: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
854 guard: Pin<&'a Waiter>,
855 shared: &'a Shared<T>,
856 ) -> Self {
857 let guard_ptr = NonNull::from(guard.get_ref());
858 let list = unguarded_list.into_guarded(guard_ptr);
859 WaitersList {
860 list,
861 is_empty: false,
862 shared,
863 }
864 }
865
866 /// Removes the last element from the guarded list. Modifying this list
867 /// requires an exclusive access to the main list in `Notify`.
868 fn pop_back_locked(&mut self, _tail: &mut Tail) -> Option<NonNull<Waiter>> {
869 let result = self.list.pop_back();
870 if result.is_none() {
871 // Save information about emptiness to avoid waiting for lock
872 // in the destructor.
873 self.is_empty = true;
874 }
875 result
876 }
877}
878
879impl<T> Shared<T> {
880 fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
881 // It is critical for `GuardedLinkedList` safety that the guard node is
882 // pinned in memory and is not dropped until the guarded list is dropped.
883 let guard = Waiter::new();
884 pin!(guard);
885
886 // We move all waiters to a secondary list. It uses a `GuardedLinkedList`
887 // underneath to allow every waiter to safely remove itself from it.
888 //
889 // * This list will be still guarded by the `waiters` lock.
890 // `NotifyWaitersList` wrapper makes sure we hold the lock to modify it.
891 // * This wrapper will empty the list on drop. It is critical for safety
892 // that we will not leave any list entry with a pointer to the local
893 // guard node after this function returns / panics.
894 let mut list = WaitersList::new(std::mem::take(&mut tail.waiters), guard.as_ref(), self);
895
896 let mut wakers = WakeList::new();
897 'outer: loop {
898 while wakers.can_push() {
899 match list.pop_back_locked(&mut tail) {
900 Some(mut waiter) => {
901 // Safety: `tail` lock is still held.
902 let waiter = unsafe { waiter.as_mut() };
903
904 assert!(waiter.queued);
905 waiter.queued = false;
906
907 if let Some(waker) = waiter.waker.take() {
908 wakers.push(waker);
909 }
910 }
911 None => {
912 break 'outer;
913 }
914 }
915 }
916
917 // Release the lock before waking.
918 drop(tail);
919
920 // Before we acquire the lock again all sorts of things can happen:
921 // some waiters may remove themselves from the list and new waiters
922 // may be added. This is fine since at worst we will unnecessarily
923 // wake up waiters which will then queue themselves again.
924
925 wakers.wake_all();
926
927 // Acquire the lock again.
928 tail = self.tail.lock();
929 }
930
931 // Release the lock before waking.
932 drop(tail);
933
934 wakers.wake_all();
935 }
936}
937
938impl<T> Clone for Sender<T> {
939 fn clone(&self) -> Sender<T> {
940 let shared = self.shared.clone();
941 shared.num_tx.fetch_add(1, SeqCst);
942
943 Sender { shared }
944 }
945}
946
947impl<T> Drop for Sender<T> {
948 fn drop(&mut self) {
949 if 1 == self.shared.num_tx.fetch_sub(1, SeqCst) {
950 self.close_channel();
951 }
952 }
953}
954
955impl<T> Receiver<T> {
956 /// Returns the number of messages that were sent into the channel and that
957 /// this [`Receiver`] has yet to receive.
958 ///
959 /// If the returned value from `len` is larger than the next largest power of 2
960 /// of the capacity of the channel any call to [`recv`] will return an
961 /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
962 /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
963 /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
964 /// values larger than 16.
965 ///
966 /// [`Receiver`]: crate::sync::broadcast::Receiver
967 /// [`recv`]: crate::sync::broadcast::Receiver::recv
968 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
969 ///
970 /// # Examples
971 ///
972 /// ```
973 /// use tokio::sync::broadcast;
974 ///
975 /// #[tokio::main]
976 /// async fn main() {
977 /// let (tx, mut rx1) = broadcast::channel(16);
978 ///
979 /// tx.send(10).unwrap();
980 /// tx.send(20).unwrap();
981 ///
982 /// assert_eq!(rx1.len(), 2);
983 /// assert_eq!(rx1.recv().await.unwrap(), 10);
984 /// assert_eq!(rx1.len(), 1);
985 /// assert_eq!(rx1.recv().await.unwrap(), 20);
986 /// assert_eq!(rx1.len(), 0);
987 /// }
988 /// ```
989 pub fn len(&self) -> usize {
990 let next_send_pos = self.shared.tail.lock().pos;
991 (next_send_pos - self.next) as usize
992 }
993
994 /// Returns true if there aren't any messages in the channel that the [`Receiver`]
995 /// has yet to receive.
996 ///
997 /// [`Receiver]: create::sync::broadcast::Receiver
998 ///
999 /// # Examples
1000 ///
1001 /// ```
1002 /// use tokio::sync::broadcast;
1003 ///
1004 /// #[tokio::main]
1005 /// async fn main() {
1006 /// let (tx, mut rx1) = broadcast::channel(16);
1007 ///
1008 /// assert!(rx1.is_empty());
1009 ///
1010 /// tx.send(10).unwrap();
1011 /// tx.send(20).unwrap();
1012 ///
1013 /// assert!(!rx1.is_empty());
1014 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1015 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1016 /// assert!(rx1.is_empty());
1017 /// }
1018 /// ```
1019 pub fn is_empty(&self) -> bool {
1020 self.len() == 0
1021 }
1022
1023 /// Returns `true` if receivers belong to the same channel.
1024 ///
1025 /// # Examples
1026 ///
1027 /// ```
1028 /// use tokio::sync::broadcast;
1029 ///
1030 /// #[tokio::main]
1031 /// async fn main() {
1032 /// let (tx, rx) = broadcast::channel::<()>(16);
1033 /// let rx2 = tx.subscribe();
1034 ///
1035 /// assert!(rx.same_channel(&rx2));
1036 ///
1037 /// let (_tx3, rx3) = broadcast::channel::<()>(16);
1038 ///
1039 /// assert!(!rx3.same_channel(&rx2));
1040 /// }
1041 /// ```
1042 pub fn same_channel(&self, other: &Self) -> bool {
1043 Arc::ptr_eq(&self.shared, &other.shared)
1044 }
1045
1046 /// Locks the next value if there is one.
1047 fn recv_ref(
1048 &mut self,
1049 waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
1050 ) -> Result<RecvGuard<'_, T>, TryRecvError> {
1051 let idx = (self.next & self.shared.mask as u64) as usize;
1052
1053 // The slot holding the next value to read
1054 let mut slot = self.shared.buffer[idx].read().unwrap();
1055
1056 if slot.pos != self.next {
1057 // Release the `slot` lock before attempting to acquire the `tail`
1058 // lock. This is required because `send2` acquires the tail lock
1059 // first followed by the slot lock. Acquiring the locks in reverse
1060 // order here would result in a potential deadlock: `recv_ref`
1061 // acquires the `slot` lock and attempts to acquire the `tail` lock
1062 // while `send2` acquired the `tail` lock and attempts to acquire
1063 // the slot lock.
1064 drop(slot);
1065
1066 let mut old_waker = None;
1067
1068 let mut tail = self.shared.tail.lock();
1069
1070 // Acquire slot lock again
1071 slot = self.shared.buffer[idx].read().unwrap();
1072
1073 // Make sure the position did not change. This could happen in the
1074 // unlikely event that the buffer is wrapped between dropping the
1075 // read lock and acquiring the tail lock.
1076 if slot.pos != self.next {
1077 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
1078
1079 if next_pos == self.next {
1080 // At this point the channel is empty for *this* receiver. If
1081 // it's been closed, then that's what we return, otherwise we
1082 // set a waker and return empty.
1083 if tail.closed {
1084 return Err(TryRecvError::Closed);
1085 }
1086
1087 // Store the waker
1088 if let Some((waiter, waker)) = waiter {
1089 // Safety: called while locked.
1090 unsafe {
1091 // Only queue if not already queued
1092 waiter.with_mut(|ptr| {
1093 // If there is no waker **or** if the currently
1094 // stored waker references a **different** task,
1095 // track the tasks' waker to be notified on
1096 // receipt of a new value.
1097 match (*ptr).waker {
1098 Some(ref w) if w.will_wake(waker) => {}
1099 _ => {
1100 old_waker = std::mem::replace(
1101 &mut (*ptr).waker,
1102 Some(waker.clone()),
1103 );
1104 }
1105 }
1106
1107 if !(*ptr).queued {
1108 (*ptr).queued = true;
1109 tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1110 }
1111 });
1112 }
1113 }
1114
1115 // Drop the old waker after releasing the locks.
1116 drop(slot);
1117 drop(tail);
1118 drop(old_waker);
1119
1120 return Err(TryRecvError::Empty);
1121 }
1122
1123 // At this point, the receiver has lagged behind the sender by
1124 // more than the channel capacity. The receiver will attempt to
1125 // catch up by skipping dropped messages and setting the
1126 // internal cursor to the **oldest** message stored by the
1127 // channel.
1128 let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1129
1130 let missed = next.wrapping_sub(self.next);
1131
1132 drop(tail);
1133
1134 // The receiver is slow but no values have been missed
1135 if missed == 0 {
1136 self.next = self.next.wrapping_add(1);
1137
1138 return Ok(RecvGuard { slot });
1139 }
1140
1141 self.next = next;
1142
1143 return Err(TryRecvError::Lagged(missed));
1144 }
1145 }
1146
1147 self.next = self.next.wrapping_add(1);
1148
1149 Ok(RecvGuard { slot })
1150 }
1151}
1152
1153impl<T: Clone> Receiver<T> {
1154 /// Re-subscribes to the channel starting from the current tail element.
1155 ///
1156 /// This [`Receiver`] handle will receive a clone of all values sent
1157 /// **after** it has resubscribed. This will not include elements that are
1158 /// in the queue of the current receiver. Consider the following example.
1159 ///
1160 /// # Examples
1161 ///
1162 /// ```
1163 /// use tokio::sync::broadcast;
1164 ///
1165 /// #[tokio::main]
1166 /// async fn main() {
1167 /// let (tx, mut rx) = broadcast::channel(2);
1168 ///
1169 /// tx.send(1).unwrap();
1170 /// let mut rx2 = rx.resubscribe();
1171 /// tx.send(2).unwrap();
1172 ///
1173 /// assert_eq!(rx2.recv().await.unwrap(), 2);
1174 /// assert_eq!(rx.recv().await.unwrap(), 1);
1175 /// }
1176 /// ```
1177 pub fn resubscribe(&self) -> Self {
1178 let shared = self.shared.clone();
1179 new_receiver(shared)
1180 }
1181 /// Receives the next value for this receiver.
1182 ///
1183 /// Each [`Receiver`] handle will receive a clone of all values sent
1184 /// **after** it has subscribed.
1185 ///
1186 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1187 /// dropped, indicating that no further values can be sent on the channel.
1188 ///
1189 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1190 /// sent values will overwrite old values. At this point, a call to [`recv`]
1191 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1192 /// internal cursor is updated to point to the oldest value still held by
1193 /// the channel. A subsequent call to [`recv`] will return this value
1194 /// **unless** it has been since overwritten.
1195 ///
1196 /// # Cancel safety
1197 ///
1198 /// This method is cancel safe. If `recv` is used as the event in a
1199 /// [`tokio::select!`](crate::select) statement and some other branch
1200 /// completes first, it is guaranteed that no messages were received on this
1201 /// channel.
1202 ///
1203 /// [`Receiver`]: crate::sync::broadcast::Receiver
1204 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1205 ///
1206 /// # Examples
1207 ///
1208 /// ```
1209 /// use tokio::sync::broadcast;
1210 ///
1211 /// #[tokio::main]
1212 /// async fn main() {
1213 /// let (tx, mut rx1) = broadcast::channel(16);
1214 /// let mut rx2 = tx.subscribe();
1215 ///
1216 /// tokio::spawn(async move {
1217 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1218 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1219 /// });
1220 ///
1221 /// tokio::spawn(async move {
1222 /// assert_eq!(rx2.recv().await.unwrap(), 10);
1223 /// assert_eq!(rx2.recv().await.unwrap(), 20);
1224 /// });
1225 ///
1226 /// tx.send(10).unwrap();
1227 /// tx.send(20).unwrap();
1228 /// }
1229 /// ```
1230 ///
1231 /// Handling lag
1232 ///
1233 /// ```
1234 /// use tokio::sync::broadcast;
1235 ///
1236 /// #[tokio::main]
1237 /// async fn main() {
1238 /// let (tx, mut rx) = broadcast::channel(2);
1239 ///
1240 /// tx.send(10).unwrap();
1241 /// tx.send(20).unwrap();
1242 /// tx.send(30).unwrap();
1243 ///
1244 /// // The receiver lagged behind
1245 /// assert!(rx.recv().await.is_err());
1246 ///
1247 /// // At this point, we can abort or continue with lost messages
1248 ///
1249 /// assert_eq!(20, rx.recv().await.unwrap());
1250 /// assert_eq!(30, rx.recv().await.unwrap());
1251 /// }
1252 /// ```
1253 pub async fn recv(&mut self) -> Result<T, RecvError> {
1254 let fut = Recv::new(self);
1255 fut.await
1256 }
1257
1258 /// Attempts to return a pending value on this receiver without awaiting.
1259 ///
1260 /// This is useful for a flavor of "optimistic check" before deciding to
1261 /// await on a receiver.
1262 ///
1263 /// Compared with [`recv`], this function has three failure cases instead of two
1264 /// (one for closed, one for an empty buffer, one for a lagging receiver).
1265 ///
1266 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1267 /// dropped, indicating that no further values can be sent on the channel.
1268 ///
1269 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1270 /// sent values will overwrite old values. At this point, a call to [`recv`]
1271 /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1272 /// internal cursor is updated to point to the oldest value still held by
1273 /// the channel. A subsequent call to [`try_recv`] will return this value
1274 /// **unless** it has been since overwritten. If there are no values to
1275 /// receive, `Err(TryRecvError::Empty)` is returned.
1276 ///
1277 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1278 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1279 /// [`Receiver`]: crate::sync::broadcast::Receiver
1280 ///
1281 /// # Examples
1282 ///
1283 /// ```
1284 /// use tokio::sync::broadcast;
1285 ///
1286 /// #[tokio::main]
1287 /// async fn main() {
1288 /// let (tx, mut rx) = broadcast::channel(16);
1289 ///
1290 /// assert!(rx.try_recv().is_err());
1291 ///
1292 /// tx.send(10).unwrap();
1293 ///
1294 /// let value = rx.try_recv().unwrap();
1295 /// assert_eq!(10, value);
1296 /// }
1297 /// ```
1298 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1299 let guard = self.recv_ref(None)?;
1300 guard.clone_value().ok_or(TryRecvError::Closed)
1301 }
1302
1303 /// Blocking receive to call outside of asynchronous contexts.
1304 ///
1305 /// # Panics
1306 ///
1307 /// This function panics if called within an asynchronous execution
1308 /// context.
1309 ///
1310 /// # Examples
1311 /// ```
1312 /// use std::thread;
1313 /// use tokio::sync::broadcast;
1314 ///
1315 /// #[tokio::main]
1316 /// async fn main() {
1317 /// let (tx, mut rx) = broadcast::channel(16);
1318 ///
1319 /// let sync_code = thread::spawn(move || {
1320 /// assert_eq!(rx.blocking_recv(), Ok(10));
1321 /// });
1322 ///
1323 /// let _ = tx.send(10);
1324 /// sync_code.join().unwrap();
1325 /// }
1326 /// ```
1327 pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1328 crate::future::block_on(self.recv())
1329 }
1330}
1331
1332impl<T> Drop for Receiver<T> {
1333 fn drop(&mut self) {
1334 let mut tail = self.shared.tail.lock();
1335
1336 tail.rx_cnt -= 1;
1337 let until = tail.pos;
1338
1339 drop(tail);
1340
1341 while self.next < until {
1342 match self.recv_ref(None) {
1343 Ok(_) => {}
1344 // The channel is closed
1345 Err(TryRecvError::Closed) => break,
1346 // Ignore lagging, we will catch up
1347 Err(TryRecvError::Lagged(..)) => {}
1348 // Can't be empty
1349 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1350 }
1351 }
1352 }
1353}
1354
1355impl<'a, T> Recv<'a, T> {
1356 fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1357 Recv {
1358 receiver,
1359 waiter: UnsafeCell::new(Waiter {
1360 queued: false,
1361 waker: None,
1362 pointers: linked_list::Pointers::new(),
1363 _p: PhantomPinned,
1364 }),
1365 }
1366 }
1367
1368 /// A custom `project` implementation is used in place of `pin-project-lite`
1369 /// as a custom drop implementation is needed.
1370 fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1371 unsafe {
1372 // Safety: Receiver is Unpin
1373 is_unpin::<&mut Receiver<T>>();
1374
1375 let me = self.get_unchecked_mut();
1376 (me.receiver, &me.waiter)
1377 }
1378 }
1379}
1380
1381impl<'a, T> Future for Recv<'a, T>
1382where
1383 T: Clone,
1384{
1385 type Output = Result<T, RecvError>;
1386
1387 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1388 ready!(crate::trace::trace_leaf(cx));
1389
1390 let (receiver, waiter) = self.project();
1391
1392 let guard = match receiver.recv_ref(Some((waiter, cx.waker()))) {
1393 Ok(value) => value,
1394 Err(TryRecvError::Empty) => return Poll::Pending,
1395 Err(TryRecvError::Lagged(n)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1396 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1397 };
1398
1399 Poll::Ready(guard.clone_value().ok_or(RecvError::Closed))
1400 }
1401}
1402
1403impl<'a, T> Drop for Recv<'a, T> {
1404 fn drop(&mut self) {
1405 // Acquire the tail lock. This is required for safety before accessing
1406 // the waiter node.
1407 let mut tail = self.receiver.shared.tail.lock();
1408
1409 // safety: tail lock is held
1410 let queued = self.waiter.with(|ptr| unsafe { (*ptr).queued });
1411
1412 if queued {
1413 // Remove the node
1414 //
1415 // safety: tail lock is held and the wait node is verified to be in
1416 // the list.
1417 unsafe {
1418 self.waiter.with_mut(|ptr| {
1419 tail.waiters.remove((&mut *ptr).into());
1420 });
1421 }
1422 }
1423 }
1424}
1425
1426/// # Safety
1427///
1428/// `Waiter` is forced to be !Unpin.
1429unsafe impl linked_list::Link for Waiter {
1430 type Handle = NonNull<Waiter>;
1431 type Target = Waiter;
1432
1433 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1434 *handle
1435 }
1436
1437 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1438 ptr
1439 }
1440
1441 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1442 Waiter::addr_of_pointers(target)
1443 }
1444}
1445
1446impl<T> fmt::Debug for Sender<T> {
1447 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1448 write!(fmt, "broadcast::Sender")
1449 }
1450}
1451
1452impl<T> fmt::Debug for Receiver<T> {
1453 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1454 write!(fmt, "broadcast::Receiver")
1455 }
1456}
1457
1458impl<'a, T> RecvGuard<'a, T> {
1459 fn clone_value(&self) -> Option<T>
1460 where
1461 T: Clone,
1462 {
1463 self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1464 }
1465}
1466
1467impl<'a, T> Drop for RecvGuard<'a, T> {
1468 fn drop(&mut self) {
1469 // Decrement the remaining counter
1470 if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1471 // Safety: Last receiver, drop the value
1472 self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1473 }
1474 }
1475}
1476
1477fn is_unpin<T: Unpin>() {}
1478
1479#[cfg(not(loom))]
1480#[cfg(test)]
1481mod tests {
1482 use super::*;
1483
1484 #[test]
1485 fn receiver_count_on_sender_constructor() {
1486 let sender = Sender::<i32>::new(16);
1487 assert_eq!(sender.receiver_count(), 0);
1488
1489 let rx_1 = sender.subscribe();
1490 assert_eq!(sender.receiver_count(), 1);
1491
1492 let rx_2 = rx_1.resubscribe();
1493 assert_eq!(sender.receiver_count(), 2);
1494
1495 let rx_3 = sender.subscribe();
1496 assert_eq!(sender.receiver_count(), 3);
1497
1498 drop(rx_3);
1499 drop(rx_1);
1500 assert_eq!(sender.receiver_count(), 1);
1501
1502 drop(rx_2);
1503 assert_eq!(sender.receiver_count(), 0);
1504 }
1505
1506 #[cfg(not(loom))]
1507 #[test]
1508 fn receiver_count_on_channel_constructor() {
1509 let (sender, rx) = channel::<i32>(16);
1510 assert_eq!(sender.receiver_count(), 1);
1511
1512 let _rx_2 = rx.resubscribe();
1513 assert_eq!(sender.receiver_count(), 2);
1514 }
1515}
1516