1//! A multi-producer, multi-consumer broadcast queue. Each sent value is seen by
2//! all consumers.
3//!
4//! A [`Sender`] is used to broadcast values to **all** connected [`Receiver`]
5//! values. [`Sender`] handles are clone-able, allowing concurrent send and
6//! receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as
7//! long as `T` is `Send`.
8//!
9//! When a value is sent, **all** [`Receiver`] handles are notified and will
10//! receive the value. The value is stored once inside the channel and cloned on
11//! demand for each receiver. Once all receivers have received a clone of the
12//! value, the value is released from the channel.
13//!
14//! A channel is created by calling [`channel`], specifying the maximum number
15//! of messages the channel can retain at any given time.
16//!
17//! New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The
18//! returned [`Receiver`] will receive values sent **after** the call to
19//! `subscribe`.
20//!
21//! This channel is also suitable for the single-producer multi-consumer
22//! use-case, where a single sender broadcasts values to many receivers.
23//!
24//! ## Lagging
25//!
26//! As sent messages must be retained until **all** [`Receiver`] handles receive
27//! a clone, broadcast channels are susceptible to the "slow receiver" problem.
28//! In this case, all but one receiver are able to receive values at the rate
29//! they are sent. Because one receiver is stalled, the channel starts to fill
30//! up.
31//!
32//! This broadcast channel implementation handles this case by setting a hard
33//! upper bound on the number of values the channel may retain at any given
34//! time. This upper bound is passed to the [`channel`] function as an argument.
35//!
36//! If a value is sent when the channel is at capacity, the oldest value
37//! currently held by the channel is released. This frees up space for the new
38//! value. Any receiver that has not yet seen the released value will return
39//! [`RecvError::Lagged`] the next time [`recv`] is called.
40//!
41//! Once [`RecvError::Lagged`] is returned, the lagging receiver's position is
42//! updated to the oldest value contained by the channel. The next call to
43//! [`recv`] will return this value.
44//!
45//! This behavior enables a receiver to detect when it has lagged so far behind
46//! that data has been dropped. The caller may decide how to respond to this:
47//! either by aborting its task or by tolerating lost messages and resuming
48//! consumption of the channel.
49//!
50//! ## Closing
51//!
52//! When **all** [`Sender`] handles have been dropped, no new values may be
53//! sent. At this point, the channel is "closed". Once a receiver has received
54//! all values retained by the channel, the next call to [`recv`] will return
55//! with [`RecvError::Closed`].
56//!
57//! When a [`Receiver`] handle is dropped, any messages not read by the receiver
58//! will be marked as read. If this receiver was the only one not to have read
59//! that message, the message will be dropped at this point.
60//!
61//! [`Sender`]: crate::sync::broadcast::Sender
62//! [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
63//! [`Receiver`]: crate::sync::broadcast::Receiver
64//! [`channel`]: crate::sync::broadcast::channel
65//! [`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
66//! [`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
67//! [`recv`]: crate::sync::broadcast::Receiver::recv
68//!
69//! # Examples
70//!
71//! Basic usage
72//!
73//! ```
74//! use tokio::sync::broadcast;
75//!
76//! #[tokio::main]
77//! async fn main() {
78//! let (tx, mut rx1) = broadcast::channel(16);
79//! let mut rx2 = tx.subscribe();
80//!
81//! tokio::spawn(async move {
82//! assert_eq!(rx1.recv().await.unwrap(), 10);
83//! assert_eq!(rx1.recv().await.unwrap(), 20);
84//! });
85//!
86//! tokio::spawn(async move {
87//! assert_eq!(rx2.recv().await.unwrap(), 10);
88//! assert_eq!(rx2.recv().await.unwrap(), 20);
89//! });
90//!
91//! tx.send(10).unwrap();
92//! tx.send(20).unwrap();
93//! }
94//! ```
95//!
96//! Handling lag
97//!
98//! ```
99//! use tokio::sync::broadcast;
100//!
101//! #[tokio::main]
102//! async fn main() {
103//! let (tx, mut rx) = broadcast::channel(2);
104//!
105//! tx.send(10).unwrap();
106//! tx.send(20).unwrap();
107//! tx.send(30).unwrap();
108//!
109//! // The receiver lagged behind
110//! assert!(rx.recv().await.is_err());
111//!
112//! // At this point, we can abort or continue with lost messages
113//!
114//! assert_eq!(20, rx.recv().await.unwrap());
115//! assert_eq!(30, rx.recv().await.unwrap());
116//! }
117//! ```
118
119use crate::loom::cell::UnsafeCell;
120use crate::loom::sync::atomic::AtomicUsize;
121use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
122use crate::util::linked_list::{self, LinkedList};
123use crate::util::WakeList;
124
125use std::fmt;
126use std::future::Future;
127use std::marker::PhantomPinned;
128use std::pin::Pin;
129use std::ptr::NonNull;
130use std::sync::atomic::Ordering::SeqCst;
131use std::task::{Context, Poll, Waker};
132use std::usize;
133
134/// Sending-half of the [`broadcast`] channel.
135///
136/// May be used from many threads. Messages can be sent with
137/// [`send`][Sender::send].
138///
139/// # Examples
140///
141/// ```
142/// use tokio::sync::broadcast;
143///
144/// #[tokio::main]
145/// async fn main() {
146/// let (tx, mut rx1) = broadcast::channel(16);
147/// let mut rx2 = tx.subscribe();
148///
149/// tokio::spawn(async move {
150/// assert_eq!(rx1.recv().await.unwrap(), 10);
151/// assert_eq!(rx1.recv().await.unwrap(), 20);
152/// });
153///
154/// tokio::spawn(async move {
155/// assert_eq!(rx2.recv().await.unwrap(), 10);
156/// assert_eq!(rx2.recv().await.unwrap(), 20);
157/// });
158///
159/// tx.send(10).unwrap();
160/// tx.send(20).unwrap();
161/// }
162/// ```
163///
164/// [`broadcast`]: crate::sync::broadcast
165pub struct Sender<T> {
166 shared: Arc<Shared<T>>,
167}
168
169/// Receiving-half of the [`broadcast`] channel.
170///
171/// Must not be used concurrently. Messages may be retrieved using
172/// [`recv`][Receiver::recv].
173///
174/// To turn this receiver into a `Stream`, you can use the [`BroadcastStream`]
175/// wrapper.
176///
177/// [`BroadcastStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.BroadcastStream.html
178///
179/// # Examples
180///
181/// ```
182/// use tokio::sync::broadcast;
183///
184/// #[tokio::main]
185/// async fn main() {
186/// let (tx, mut rx1) = broadcast::channel(16);
187/// let mut rx2 = tx.subscribe();
188///
189/// tokio::spawn(async move {
190/// assert_eq!(rx1.recv().await.unwrap(), 10);
191/// assert_eq!(rx1.recv().await.unwrap(), 20);
192/// });
193///
194/// tokio::spawn(async move {
195/// assert_eq!(rx2.recv().await.unwrap(), 10);
196/// assert_eq!(rx2.recv().await.unwrap(), 20);
197/// });
198///
199/// tx.send(10).unwrap();
200/// tx.send(20).unwrap();
201/// }
202/// ```
203///
204/// [`broadcast`]: crate::sync::broadcast
205pub struct Receiver<T> {
206 /// State shared with all receivers and senders.
207 shared: Arc<Shared<T>>,
208
209 /// Next position to read from
210 next: u64,
211}
212
213pub mod error {
214 //! Broadcast error types
215
216 use std::fmt;
217
218 /// Error returned by from the [`send`] function on a [`Sender`].
219 ///
220 /// A **send** operation can only fail if there are no active receivers,
221 /// implying that the message could never be received. The error contains the
222 /// message being sent as a payload so it can be recovered.
223 ///
224 /// [`send`]: crate::sync::broadcast::Sender::send
225 /// [`Sender`]: crate::sync::broadcast::Sender
226 #[derive(Debug)]
227 pub struct SendError<T>(pub T);
228
229 impl<T> fmt::Display for SendError<T> {
230 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
231 write!(f, "channel closed")
232 }
233 }
234
235 impl<T: fmt::Debug> std::error::Error for SendError<T> {}
236
237 /// An error returned from the [`recv`] function on a [`Receiver`].
238 ///
239 /// [`recv`]: crate::sync::broadcast::Receiver::recv
240 /// [`Receiver`]: crate::sync::broadcast::Receiver
241 #[derive(Debug, PartialEq, Eq, Clone)]
242 pub enum RecvError {
243 /// There are no more active senders implying no further messages will ever
244 /// be sent.
245 Closed,
246
247 /// The receiver lagged too far behind. Attempting to receive again will
248 /// return the oldest message still retained by the channel.
249 ///
250 /// Includes the number of skipped messages.
251 Lagged(u64),
252 }
253
254 impl fmt::Display for RecvError {
255 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256 match self {
257 RecvError::Closed => write!(f, "channel closed"),
258 RecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
259 }
260 }
261 }
262
263 impl std::error::Error for RecvError {}
264
265 /// An error returned from the [`try_recv`] function on a [`Receiver`].
266 ///
267 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
268 /// [`Receiver`]: crate::sync::broadcast::Receiver
269 #[derive(Debug, PartialEq, Eq, Clone)]
270 pub enum TryRecvError {
271 /// The channel is currently empty. There are still active
272 /// [`Sender`] handles, so data may yet become available.
273 ///
274 /// [`Sender`]: crate::sync::broadcast::Sender
275 Empty,
276
277 /// There are no more active senders implying no further messages will ever
278 /// be sent.
279 Closed,
280
281 /// The receiver lagged too far behind and has been forcibly disconnected.
282 /// Attempting to receive again will return the oldest message still
283 /// retained by the channel.
284 ///
285 /// Includes the number of skipped messages.
286 Lagged(u64),
287 }
288
289 impl fmt::Display for TryRecvError {
290 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
291 match self {
292 TryRecvError::Empty => write!(f, "channel empty"),
293 TryRecvError::Closed => write!(f, "channel closed"),
294 TryRecvError::Lagged(amt) => write!(f, "channel lagged by {}", amt),
295 }
296 }
297 }
298
299 impl std::error::Error for TryRecvError {}
300}
301
302use self::error::*;
303
304/// Data shared between senders and receivers.
305struct Shared<T> {
306 /// slots in the channel.
307 buffer: Box<[RwLock<Slot<T>>]>,
308
309 /// Mask a position -> index.
310 mask: usize,
311
312 /// Tail of the queue. Includes the rx wait list.
313 tail: Mutex<Tail>,
314
315 /// Number of outstanding Sender handles.
316 num_tx: AtomicUsize,
317}
318
319/// Next position to write a value.
320struct Tail {
321 /// Next position to write to.
322 pos: u64,
323
324 /// Number of active receivers.
325 rx_cnt: usize,
326
327 /// True if the channel is closed.
328 closed: bool,
329
330 /// Receivers waiting for a value.
331 waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
332}
333
334/// Slot in the buffer.
335struct Slot<T> {
336 /// Remaining number of receivers that are expected to see this value.
337 ///
338 /// When this goes to zero, the value is released.
339 ///
340 /// An atomic is used as it is mutated concurrently with the slot read lock
341 /// acquired.
342 rem: AtomicUsize,
343
344 /// Uniquely identifies the `send` stored in the slot.
345 pos: u64,
346
347 /// The value being broadcast.
348 ///
349 /// The value is set by `send` when the write lock is held. When a reader
350 /// drops, `rem` is decremented. When it hits zero, the value is dropped.
351 val: UnsafeCell<Option<T>>,
352}
353
354/// An entry in the wait queue.
355struct Waiter {
356 /// True if queued.
357 queued: bool,
358
359 /// Task waiting on the broadcast channel.
360 waker: Option<Waker>,
361
362 /// Intrusive linked-list pointers.
363 pointers: linked_list::Pointers<Waiter>,
364
365 /// Should not be `Unpin`.
366 _p: PhantomPinned,
367}
368
369generate_addr_of_methods! {
370 impl<> Waiter {
371 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
372 &self.pointers
373 }
374 }
375}
376
377struct RecvGuard<'a, T> {
378 slot: RwLockReadGuard<'a, Slot<T>>,
379}
380
381/// Receive a value future.
382struct Recv<'a, T> {
383 /// Receiver being waited on.
384 receiver: &'a mut Receiver<T>,
385
386 /// Entry in the waiter `LinkedList`.
387 waiter: UnsafeCell<Waiter>,
388}
389
390unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
391unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
392
393/// Max number of receivers. Reserve space to lock.
394const MAX_RECEIVERS: usize = usize::MAX >> 2;
395
396/// Create a bounded, multi-producer, multi-consumer channel where each sent
397/// value is broadcasted to all active receivers.
398///
399/// All data sent on [`Sender`] will become available on every active
400/// [`Receiver`] in the same order as it was sent.
401///
402/// The `Sender` can be cloned to `send` to the same channel from multiple
403/// points in the process or it can be used concurrently from an `Arc`. New
404/// `Receiver` handles are created by calling [`Sender::subscribe`].
405///
406/// If all [`Receiver`] handles are dropped, the `send` method will return a
407/// [`SendError`]. Similarly, if all [`Sender`] handles are dropped, the [`recv`]
408/// method will return a [`RecvError`].
409///
410/// [`Sender`]: crate::sync::broadcast::Sender
411/// [`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
412/// [`Receiver`]: crate::sync::broadcast::Receiver
413/// [`recv`]: crate::sync::broadcast::Receiver::recv
414/// [`SendError`]: crate::sync::broadcast::error::SendError
415/// [`RecvError`]: crate::sync::broadcast::error::RecvError
416///
417/// # Examples
418///
419/// ```
420/// use tokio::sync::broadcast;
421///
422/// #[tokio::main]
423/// async fn main() {
424/// let (tx, mut rx1) = broadcast::channel(16);
425/// let mut rx2 = tx.subscribe();
426///
427/// tokio::spawn(async move {
428/// assert_eq!(rx1.recv().await.unwrap(), 10);
429/// assert_eq!(rx1.recv().await.unwrap(), 20);
430/// });
431///
432/// tokio::spawn(async move {
433/// assert_eq!(rx2.recv().await.unwrap(), 10);
434/// assert_eq!(rx2.recv().await.unwrap(), 20);
435/// });
436///
437/// tx.send(10).unwrap();
438/// tx.send(20).unwrap();
439/// }
440/// ```
441///
442/// # Panics
443///
444/// This will panic if `capacity` is equal to `0` or larger
445/// than `usize::MAX / 2`.
446#[track_caller]
447pub fn channel<T: Clone>(mut capacity: usize) -> (Sender<T>, Receiver<T>) {
448 assert!(capacity > 0, "capacity is empty");
449 assert!(capacity <= usize::MAX >> 1, "requested capacity too large");
450
451 // Round to a power of two
452 capacity = capacity.next_power_of_two();
453
454 let mut buffer = Vec::with_capacity(capacity);
455
456 for i in 0..capacity {
457 buffer.push(RwLock::new(Slot {
458 rem: AtomicUsize::new(0),
459 pos: (i as u64).wrapping_sub(capacity as u64),
460 val: UnsafeCell::new(None),
461 }));
462 }
463
464 let shared = Arc::new(Shared {
465 buffer: buffer.into_boxed_slice(),
466 mask: capacity - 1,
467 tail: Mutex::new(Tail {
468 pos: 0,
469 rx_cnt: 1,
470 closed: false,
471 waiters: LinkedList::new(),
472 }),
473 num_tx: AtomicUsize::new(1),
474 });
475
476 let rx = Receiver {
477 shared: shared.clone(),
478 next: 0,
479 };
480
481 let tx = Sender { shared };
482
483 (tx, rx)
484}
485
486unsafe impl<T: Send> Send for Sender<T> {}
487unsafe impl<T: Send> Sync for Sender<T> {}
488
489unsafe impl<T: Send> Send for Receiver<T> {}
490unsafe impl<T: Send> Sync for Receiver<T> {}
491
492impl<T> Sender<T> {
493 /// Attempts to send a value to all active [`Receiver`] handles, returning
494 /// it back if it could not be sent.
495 ///
496 /// A successful send occurs when there is at least one active [`Receiver`]
497 /// handle. An unsuccessful send would be one where all associated
498 /// [`Receiver`] handles have already been dropped.
499 ///
500 /// # Return
501 ///
502 /// On success, the number of subscribed [`Receiver`] handles is returned.
503 /// This does not mean that this number of receivers will see the message as
504 /// a receiver may drop before receiving the message.
505 ///
506 /// # Note
507 ///
508 /// A return value of `Ok` **does not** mean that the sent value will be
509 /// observed by all or any of the active [`Receiver`] handles. [`Receiver`]
510 /// handles may be dropped before receiving the sent message.
511 ///
512 /// A return value of `Err` **does not** mean that future calls to `send`
513 /// will fail. New [`Receiver`] handles may be created by calling
514 /// [`subscribe`].
515 ///
516 /// [`Receiver`]: crate::sync::broadcast::Receiver
517 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
518 ///
519 /// # Examples
520 ///
521 /// ```
522 /// use tokio::sync::broadcast;
523 ///
524 /// #[tokio::main]
525 /// async fn main() {
526 /// let (tx, mut rx1) = broadcast::channel(16);
527 /// let mut rx2 = tx.subscribe();
528 ///
529 /// tokio::spawn(async move {
530 /// assert_eq!(rx1.recv().await.unwrap(), 10);
531 /// assert_eq!(rx1.recv().await.unwrap(), 20);
532 /// });
533 ///
534 /// tokio::spawn(async move {
535 /// assert_eq!(rx2.recv().await.unwrap(), 10);
536 /// assert_eq!(rx2.recv().await.unwrap(), 20);
537 /// });
538 ///
539 /// tx.send(10).unwrap();
540 /// tx.send(20).unwrap();
541 /// }
542 /// ```
543 pub fn send(&self, value: T) -> Result<usize, SendError<T>> {
544 let mut tail = self.shared.tail.lock();
545
546 if tail.rx_cnt == 0 {
547 return Err(SendError(value));
548 }
549
550 // Position to write into
551 let pos = tail.pos;
552 let rem = tail.rx_cnt;
553 let idx = (pos & self.shared.mask as u64) as usize;
554
555 // Update the tail position
556 tail.pos = tail.pos.wrapping_add(1);
557
558 // Get the slot
559 let mut slot = self.shared.buffer[idx].write().unwrap();
560
561 // Track the position
562 slot.pos = pos;
563
564 // Set remaining receivers
565 slot.rem.with_mut(|v| *v = rem);
566
567 // Write the value
568 slot.val = UnsafeCell::new(Some(value));
569
570 // Release the slot lock before notifying the receivers.
571 drop(slot);
572
573 // Notify and release the mutex. This must happen after the slot lock is
574 // released, otherwise the writer lock bit could be cleared while another
575 // thread is in the critical section.
576 self.shared.notify_rx(tail);
577
578 Ok(rem)
579 }
580
581 /// Creates a new [`Receiver`] handle that will receive values sent **after**
582 /// this call to `subscribe`.
583 ///
584 /// # Examples
585 ///
586 /// ```
587 /// use tokio::sync::broadcast;
588 ///
589 /// #[tokio::main]
590 /// async fn main() {
591 /// let (tx, _rx) = broadcast::channel(16);
592 ///
593 /// // Will not be seen
594 /// tx.send(10).unwrap();
595 ///
596 /// let mut rx = tx.subscribe();
597 ///
598 /// tx.send(20).unwrap();
599 ///
600 /// let value = rx.recv().await.unwrap();
601 /// assert_eq!(20, value);
602 /// }
603 /// ```
604 pub fn subscribe(&self) -> Receiver<T> {
605 let shared = self.shared.clone();
606 new_receiver(shared)
607 }
608
609 /// Returns the number of queued values.
610 ///
611 /// A value is queued until it has either been seen by all receivers that were alive at the time
612 /// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
613 /// queue's capacity.
614 ///
615 /// # Note
616 ///
617 /// In contrast to [`Receiver::len`], this method only reports queued values and not values that
618 /// have been evicted from the queue before being seen by all receivers.
619 ///
620 /// # Examples
621 ///
622 /// ```
623 /// use tokio::sync::broadcast;
624 ///
625 /// #[tokio::main]
626 /// async fn main() {
627 /// let (tx, mut rx1) = broadcast::channel(16);
628 /// let mut rx2 = tx.subscribe();
629 ///
630 /// tx.send(10).unwrap();
631 /// tx.send(20).unwrap();
632 /// tx.send(30).unwrap();
633 ///
634 /// assert_eq!(tx.len(), 3);
635 ///
636 /// rx1.recv().await.unwrap();
637 ///
638 /// // The len is still 3 since rx2 hasn't seen the first value yet.
639 /// assert_eq!(tx.len(), 3);
640 ///
641 /// rx2.recv().await.unwrap();
642 ///
643 /// assert_eq!(tx.len(), 2);
644 /// }
645 /// ```
646 pub fn len(&self) -> usize {
647 let tail = self.shared.tail.lock();
648
649 let base_idx = (tail.pos & self.shared.mask as u64) as usize;
650 let mut low = 0;
651 let mut high = self.shared.buffer.len();
652 while low < high {
653 let mid = low + (high - low) / 2;
654 let idx = base_idx.wrapping_add(mid) & self.shared.mask;
655 if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 {
656 low = mid + 1;
657 } else {
658 high = mid;
659 }
660 }
661
662 self.shared.buffer.len() - low
663 }
664
665 /// Returns true if there are no queued values.
666 ///
667 /// # Examples
668 ///
669 /// ```
670 /// use tokio::sync::broadcast;
671 ///
672 /// #[tokio::main]
673 /// async fn main() {
674 /// let (tx, mut rx1) = broadcast::channel(16);
675 /// let mut rx2 = tx.subscribe();
676 ///
677 /// assert!(tx.is_empty());
678 ///
679 /// tx.send(10).unwrap();
680 ///
681 /// assert!(!tx.is_empty());
682 ///
683 /// rx1.recv().await.unwrap();
684 ///
685 /// // The queue is still not empty since rx2 hasn't seen the value.
686 /// assert!(!tx.is_empty());
687 ///
688 /// rx2.recv().await.unwrap();
689 ///
690 /// assert!(tx.is_empty());
691 /// }
692 /// ```
693 pub fn is_empty(&self) -> bool {
694 let tail = self.shared.tail.lock();
695
696 let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
697 self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0
698 }
699
700 /// Returns the number of active receivers
701 ///
702 /// An active receiver is a [`Receiver`] handle returned from [`channel`] or
703 /// [`subscribe`]. These are the handles that will receive values sent on
704 /// this [`Sender`].
705 ///
706 /// # Note
707 ///
708 /// It is not guaranteed that a sent message will reach this number of
709 /// receivers. Active receivers may never call [`recv`] again before
710 /// dropping.
711 ///
712 /// [`recv`]: crate::sync::broadcast::Receiver::recv
713 /// [`Receiver`]: crate::sync::broadcast::Receiver
714 /// [`Sender`]: crate::sync::broadcast::Sender
715 /// [`subscribe`]: crate::sync::broadcast::Sender::subscribe
716 /// [`channel`]: crate::sync::broadcast::channel
717 ///
718 /// # Examples
719 ///
720 /// ```
721 /// use tokio::sync::broadcast;
722 ///
723 /// #[tokio::main]
724 /// async fn main() {
725 /// let (tx, _rx1) = broadcast::channel(16);
726 ///
727 /// assert_eq!(1, tx.receiver_count());
728 ///
729 /// let mut _rx2 = tx.subscribe();
730 ///
731 /// assert_eq!(2, tx.receiver_count());
732 ///
733 /// tx.send(10).unwrap();
734 /// }
735 /// ```
736 pub fn receiver_count(&self) -> usize {
737 let tail = self.shared.tail.lock();
738 tail.rx_cnt
739 }
740
741 /// Returns `true` if senders belong to the same channel.
742 ///
743 /// # Examples
744 ///
745 /// ```
746 /// use tokio::sync::broadcast;
747 ///
748 /// #[tokio::main]
749 /// async fn main() {
750 /// let (tx, _rx) = broadcast::channel::<()>(16);
751 /// let tx2 = tx.clone();
752 ///
753 /// assert!(tx.same_channel(&tx2));
754 ///
755 /// let (tx3, _rx3) = broadcast::channel::<()>(16);
756 ///
757 /// assert!(!tx3.same_channel(&tx2));
758 /// }
759 /// ```
760 pub fn same_channel(&self, other: &Self) -> bool {
761 Arc::ptr_eq(&self.shared, &other.shared)
762 }
763
764 fn close_channel(&self) {
765 let mut tail = self.shared.tail.lock();
766 tail.closed = true;
767
768 self.shared.notify_rx(tail);
769 }
770}
771
772/// Create a new `Receiver` which reads starting from the tail.
773fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
774 let mut tail: MutexGuard<'_, Tail> = shared.tail.lock();
775
776 if tail.rx_cnt == MAX_RECEIVERS {
777 panic!("max receivers");
778 }
779
780 tail.rx_cnt = tail.rx_cnt.checked_add(1).expect(msg:"overflow");
781
782 let next: u64 = tail.pos;
783
784 drop(tail);
785
786 Receiver { shared, next }
787}
788
789impl<T> Shared<T> {
790 fn notify_rx<'a, 'b: 'a>(&'b self, mut tail: MutexGuard<'a, Tail>) {
791 let mut wakers = WakeList::new();
792 'outer: loop {
793 while wakers.can_push() {
794 match tail.waiters.pop_back() {
795 Some(mut waiter) => {
796 // Safety: `tail` lock is still held.
797 let waiter = unsafe { waiter.as_mut() };
798
799 assert!(waiter.queued);
800 waiter.queued = false;
801
802 if let Some(waker) = waiter.waker.take() {
803 wakers.push(waker);
804 }
805 }
806 None => {
807 break 'outer;
808 }
809 }
810 }
811
812 // Release the lock before waking.
813 drop(tail);
814
815 // Before we acquire the lock again all sorts of things can happen:
816 // some waiters may remove themselves from the list and new waiters
817 // may be added. This is fine since at worst we will unnecessarily
818 // wake up waiters which will then queue themselves again.
819
820 wakers.wake_all();
821
822 // Acquire the lock again.
823 tail = self.tail.lock();
824 }
825
826 // Release the lock before waking.
827 drop(tail);
828
829 wakers.wake_all();
830 }
831}
832
833impl<T> Clone for Sender<T> {
834 fn clone(&self) -> Sender<T> {
835 let shared: Arc> = self.shared.clone();
836 shared.num_tx.fetch_add(val:1, order:SeqCst);
837
838 Sender { shared }
839 }
840}
841
842impl<T> Drop for Sender<T> {
843 fn drop(&mut self) {
844 if 1 == self.shared.num_tx.fetch_sub(val:1, order:SeqCst) {
845 self.close_channel();
846 }
847 }
848}
849
850impl<T> Receiver<T> {
851 /// Returns the number of messages that were sent into the channel and that
852 /// this [`Receiver`] has yet to receive.
853 ///
854 /// If the returned value from `len` is larger than the next largest power of 2
855 /// of the capacity of the channel any call to [`recv`] will return an
856 /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an
857 /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10,
858 /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns
859 /// values larger than 16.
860 ///
861 /// [`Receiver`]: crate::sync::broadcast::Receiver
862 /// [`recv`]: crate::sync::broadcast::Receiver::recv
863 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
864 ///
865 /// # Examples
866 ///
867 /// ```
868 /// use tokio::sync::broadcast;
869 ///
870 /// #[tokio::main]
871 /// async fn main() {
872 /// let (tx, mut rx1) = broadcast::channel(16);
873 ///
874 /// tx.send(10).unwrap();
875 /// tx.send(20).unwrap();
876 ///
877 /// assert_eq!(rx1.len(), 2);
878 /// assert_eq!(rx1.recv().await.unwrap(), 10);
879 /// assert_eq!(rx1.len(), 1);
880 /// assert_eq!(rx1.recv().await.unwrap(), 20);
881 /// assert_eq!(rx1.len(), 0);
882 /// }
883 /// ```
884 pub fn len(&self) -> usize {
885 let next_send_pos = self.shared.tail.lock().pos;
886 (next_send_pos - self.next) as usize
887 }
888
889 /// Returns true if there aren't any messages in the channel that the [`Receiver`]
890 /// has yet to receive.
891 ///
892 /// [`Receiver]: create::sync::broadcast::Receiver
893 ///
894 /// # Examples
895 ///
896 /// ```
897 /// use tokio::sync::broadcast;
898 ///
899 /// #[tokio::main]
900 /// async fn main() {
901 /// let (tx, mut rx1) = broadcast::channel(16);
902 ///
903 /// assert!(rx1.is_empty());
904 ///
905 /// tx.send(10).unwrap();
906 /// tx.send(20).unwrap();
907 ///
908 /// assert!(!rx1.is_empty());
909 /// assert_eq!(rx1.recv().await.unwrap(), 10);
910 /// assert_eq!(rx1.recv().await.unwrap(), 20);
911 /// assert!(rx1.is_empty());
912 /// }
913 /// ```
914 pub fn is_empty(&self) -> bool {
915 self.len() == 0
916 }
917
918 /// Returns `true` if receivers belong to the same channel.
919 ///
920 /// # Examples
921 ///
922 /// ```
923 /// use tokio::sync::broadcast;
924 ///
925 /// #[tokio::main]
926 /// async fn main() {
927 /// let (tx, rx) = broadcast::channel::<()>(16);
928 /// let rx2 = tx.subscribe();
929 ///
930 /// assert!(rx.same_channel(&rx2));
931 ///
932 /// let (_tx3, rx3) = broadcast::channel::<()>(16);
933 ///
934 /// assert!(!rx3.same_channel(&rx2));
935 /// }
936 /// ```
937 pub fn same_channel(&self, other: &Self) -> bool {
938 Arc::ptr_eq(&self.shared, &other.shared)
939 }
940
941 /// Locks the next value if there is one.
942 fn recv_ref(
943 &mut self,
944 waiter: Option<(&UnsafeCell<Waiter>, &Waker)>,
945 ) -> Result<RecvGuard<'_, T>, TryRecvError> {
946 let idx = (self.next & self.shared.mask as u64) as usize;
947
948 // The slot holding the next value to read
949 let mut slot = self.shared.buffer[idx].read().unwrap();
950
951 if slot.pos != self.next {
952 // Release the `slot` lock before attempting to acquire the `tail`
953 // lock. This is required because `send2` acquires the tail lock
954 // first followed by the slot lock. Acquiring the locks in reverse
955 // order here would result in a potential deadlock: `recv_ref`
956 // acquires the `slot` lock and attempts to acquire the `tail` lock
957 // while `send2` acquired the `tail` lock and attempts to acquire
958 // the slot lock.
959 drop(slot);
960
961 let mut old_waker = None;
962
963 let mut tail = self.shared.tail.lock();
964
965 // Acquire slot lock again
966 slot = self.shared.buffer[idx].read().unwrap();
967
968 // Make sure the position did not change. This could happen in the
969 // unlikely event that the buffer is wrapped between dropping the
970 // read lock and acquiring the tail lock.
971 if slot.pos != self.next {
972 let next_pos = slot.pos.wrapping_add(self.shared.buffer.len() as u64);
973
974 if next_pos == self.next {
975 // At this point the channel is empty for *this* receiver. If
976 // it's been closed, then that's what we return, otherwise we
977 // set a waker and return empty.
978 if tail.closed {
979 return Err(TryRecvError::Closed);
980 }
981
982 // Store the waker
983 if let Some((waiter, waker)) = waiter {
984 // Safety: called while locked.
985 unsafe {
986 // Only queue if not already queued
987 waiter.with_mut(|ptr| {
988 // If there is no waker **or** if the currently
989 // stored waker references a **different** task,
990 // track the tasks' waker to be notified on
991 // receipt of a new value.
992 match (*ptr).waker {
993 Some(ref w) if w.will_wake(waker) => {}
994 _ => {
995 old_waker = std::mem::replace(
996 &mut (*ptr).waker,
997 Some(waker.clone()),
998 );
999 }
1000 }
1001
1002 if !(*ptr).queued {
1003 (*ptr).queued = true;
1004 tail.waiters.push_front(NonNull::new_unchecked(&mut *ptr));
1005 }
1006 });
1007 }
1008 }
1009
1010 // Drop the old waker after releasing the locks.
1011 drop(slot);
1012 drop(tail);
1013 drop(old_waker);
1014
1015 return Err(TryRecvError::Empty);
1016 }
1017
1018 // At this point, the receiver has lagged behind the sender by
1019 // more than the channel capacity. The receiver will attempt to
1020 // catch up by skipping dropped messages and setting the
1021 // internal cursor to the **oldest** message stored by the
1022 // channel.
1023 let next = tail.pos.wrapping_sub(self.shared.buffer.len() as u64);
1024
1025 let missed = next.wrapping_sub(self.next);
1026
1027 drop(tail);
1028
1029 // The receiver is slow but no values have been missed
1030 if missed == 0 {
1031 self.next = self.next.wrapping_add(1);
1032
1033 return Ok(RecvGuard { slot });
1034 }
1035
1036 self.next = next;
1037
1038 return Err(TryRecvError::Lagged(missed));
1039 }
1040 }
1041
1042 self.next = self.next.wrapping_add(1);
1043
1044 Ok(RecvGuard { slot })
1045 }
1046}
1047
1048impl<T: Clone> Receiver<T> {
1049 /// Re-subscribes to the channel starting from the current tail element.
1050 ///
1051 /// This [`Receiver`] handle will receive a clone of all values sent
1052 /// **after** it has resubscribed. This will not include elements that are
1053 /// in the queue of the current receiver. Consider the following example.
1054 ///
1055 /// # Examples
1056 ///
1057 /// ```
1058 /// use tokio::sync::broadcast;
1059 ///
1060 /// #[tokio::main]
1061 /// async fn main() {
1062 /// let (tx, mut rx) = broadcast::channel(2);
1063 ///
1064 /// tx.send(1).unwrap();
1065 /// let mut rx2 = rx.resubscribe();
1066 /// tx.send(2).unwrap();
1067 ///
1068 /// assert_eq!(rx2.recv().await.unwrap(), 2);
1069 /// assert_eq!(rx.recv().await.unwrap(), 1);
1070 /// }
1071 /// ```
1072 pub fn resubscribe(&self) -> Self {
1073 let shared = self.shared.clone();
1074 new_receiver(shared)
1075 }
1076 /// Receives the next value for this receiver.
1077 ///
1078 /// Each [`Receiver`] handle will receive a clone of all values sent
1079 /// **after** it has subscribed.
1080 ///
1081 /// `Err(RecvError::Closed)` is returned when all `Sender` halves have
1082 /// dropped, indicating that no further values can be sent on the channel.
1083 ///
1084 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1085 /// sent values will overwrite old values. At this point, a call to [`recv`]
1086 /// will return with `Err(RecvError::Lagged)` and the [`Receiver`]'s
1087 /// internal cursor is updated to point to the oldest value still held by
1088 /// the channel. A subsequent call to [`recv`] will return this value
1089 /// **unless** it has been since overwritten.
1090 ///
1091 /// # Cancel safety
1092 ///
1093 /// This method is cancel safe. If `recv` is used as the event in a
1094 /// [`tokio::select!`](crate::select) statement and some other branch
1095 /// completes first, it is guaranteed that no messages were received on this
1096 /// channel.
1097 ///
1098 /// [`Receiver`]: crate::sync::broadcast::Receiver
1099 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1100 ///
1101 /// # Examples
1102 ///
1103 /// ```
1104 /// use tokio::sync::broadcast;
1105 ///
1106 /// #[tokio::main]
1107 /// async fn main() {
1108 /// let (tx, mut rx1) = broadcast::channel(16);
1109 /// let mut rx2 = tx.subscribe();
1110 ///
1111 /// tokio::spawn(async move {
1112 /// assert_eq!(rx1.recv().await.unwrap(), 10);
1113 /// assert_eq!(rx1.recv().await.unwrap(), 20);
1114 /// });
1115 ///
1116 /// tokio::spawn(async move {
1117 /// assert_eq!(rx2.recv().await.unwrap(), 10);
1118 /// assert_eq!(rx2.recv().await.unwrap(), 20);
1119 /// });
1120 ///
1121 /// tx.send(10).unwrap();
1122 /// tx.send(20).unwrap();
1123 /// }
1124 /// ```
1125 ///
1126 /// Handling lag
1127 ///
1128 /// ```
1129 /// use tokio::sync::broadcast;
1130 ///
1131 /// #[tokio::main]
1132 /// async fn main() {
1133 /// let (tx, mut rx) = broadcast::channel(2);
1134 ///
1135 /// tx.send(10).unwrap();
1136 /// tx.send(20).unwrap();
1137 /// tx.send(30).unwrap();
1138 ///
1139 /// // The receiver lagged behind
1140 /// assert!(rx.recv().await.is_err());
1141 ///
1142 /// // At this point, we can abort or continue with lost messages
1143 ///
1144 /// assert_eq!(20, rx.recv().await.unwrap());
1145 /// assert_eq!(30, rx.recv().await.unwrap());
1146 /// }
1147 /// ```
1148 pub async fn recv(&mut self) -> Result<T, RecvError> {
1149 let fut = Recv::new(self);
1150 fut.await
1151 }
1152
1153 /// Attempts to return a pending value on this receiver without awaiting.
1154 ///
1155 /// This is useful for a flavor of "optimistic check" before deciding to
1156 /// await on a receiver.
1157 ///
1158 /// Compared with [`recv`], this function has three failure cases instead of two
1159 /// (one for closed, one for an empty buffer, one for a lagging receiver).
1160 ///
1161 /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have
1162 /// dropped, indicating that no further values can be sent on the channel.
1163 ///
1164 /// If the [`Receiver`] handle falls behind, once the channel is full, newly
1165 /// sent values will overwrite old values. At this point, a call to [`recv`]
1166 /// will return with `Err(TryRecvError::Lagged)` and the [`Receiver`]'s
1167 /// internal cursor is updated to point to the oldest value still held by
1168 /// the channel. A subsequent call to [`try_recv`] will return this value
1169 /// **unless** it has been since overwritten. If there are no values to
1170 /// receive, `Err(TryRecvError::Empty)` is returned.
1171 ///
1172 /// [`recv`]: crate::sync::broadcast::Receiver::recv
1173 /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv
1174 /// [`Receiver`]: crate::sync::broadcast::Receiver
1175 ///
1176 /// # Examples
1177 ///
1178 /// ```
1179 /// use tokio::sync::broadcast;
1180 ///
1181 /// #[tokio::main]
1182 /// async fn main() {
1183 /// let (tx, mut rx) = broadcast::channel(16);
1184 ///
1185 /// assert!(rx.try_recv().is_err());
1186 ///
1187 /// tx.send(10).unwrap();
1188 ///
1189 /// let value = rx.try_recv().unwrap();
1190 /// assert_eq!(10, value);
1191 /// }
1192 /// ```
1193 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1194 let guard = self.recv_ref(None)?;
1195 guard.clone_value().ok_or(TryRecvError::Closed)
1196 }
1197
1198 /// Blocking receive to call outside of asynchronous contexts.
1199 ///
1200 /// # Panics
1201 ///
1202 /// This function panics if called within an asynchronous execution
1203 /// context.
1204 ///
1205 /// # Examples
1206 /// ```
1207 /// use std::thread;
1208 /// use tokio::sync::broadcast;
1209 ///
1210 /// #[tokio::main]
1211 /// async fn main() {
1212 /// let (tx, mut rx) = broadcast::channel(16);
1213 ///
1214 /// let sync_code = thread::spawn(move || {
1215 /// assert_eq!(rx.blocking_recv(), Ok(10));
1216 /// });
1217 ///
1218 /// let _ = tx.send(10);
1219 /// sync_code.join().unwrap();
1220 /// }
1221 pub fn blocking_recv(&mut self) -> Result<T, RecvError> {
1222 crate::future::block_on(self.recv())
1223 }
1224}
1225
1226impl<T> Drop for Receiver<T> {
1227 fn drop(&mut self) {
1228 let mut tail: MutexGuard<'_, Tail> = self.shared.tail.lock();
1229
1230 tail.rx_cnt -= 1;
1231 let until: u64 = tail.pos;
1232
1233 drop(tail);
1234
1235 while self.next < until {
1236 match self.recv_ref(waiter:None) {
1237 Ok(_) => {}
1238 // The channel is closed
1239 Err(TryRecvError::Closed) => break,
1240 // Ignore lagging, we will catch up
1241 Err(TryRecvError::Lagged(..)) => {}
1242 // Can't be empty
1243 Err(TryRecvError::Empty) => panic!("unexpected empty broadcast channel"),
1244 }
1245 }
1246 }
1247}
1248
1249impl<'a, T> Recv<'a, T> {
1250 fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
1251 Recv {
1252 receiver,
1253 waiter: UnsafeCell::new(Waiter {
1254 queued: false,
1255 waker: None,
1256 pointers: linked_list::Pointers::new(),
1257 _p: PhantomPinned,
1258 }),
1259 }
1260 }
1261
1262 /// A custom `project` implementation is used in place of `pin-project-lite`
1263 /// as a custom drop implementation is needed.
1264 fn project(self: Pin<&mut Self>) -> (&mut Receiver<T>, &UnsafeCell<Waiter>) {
1265 unsafe {
1266 // Safety: Receiver is Unpin
1267 is_unpin::<&mut Receiver<T>>();
1268
1269 let me = self.get_unchecked_mut();
1270 (me.receiver, &me.waiter)
1271 }
1272 }
1273}
1274
1275impl<'a, T> Future for Recv<'a, T>
1276where
1277 T: Clone,
1278{
1279 type Output = Result<T, RecvError>;
1280
1281 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
1282 ready!(crate::trace::trace_leaf(cx));
1283
1284 let (receiver: &mut Receiver, waiter: &UnsafeCell) = self.project();
1285
1286 let guard: RecvGuard<'_, T> = match receiver.recv_ref(waiter:Some((waiter, cx.waker()))) {
1287 Ok(value: RecvGuard<'_, T>) => value,
1288 Err(TryRecvError::Empty) => return Poll::Pending,
1289 Err(TryRecvError::Lagged(n: u64)) => return Poll::Ready(Err(RecvError::Lagged(n))),
1290 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1291 };
1292
1293 Poll::Ready(guard.clone_value().ok_or(err:RecvError::Closed))
1294 }
1295}
1296
1297impl<'a, T> Drop for Recv<'a, T> {
1298 fn drop(&mut self) {
1299 // Acquire the tail lock. This is required for safety before accessing
1300 // the waiter node.
1301 let mut tail: MutexGuard<'_, Tail> = self.receiver.shared.tail.lock();
1302
1303 // safety: tail lock is held
1304 let queued: bool = self.waiter.with(|ptr: *const Waiter| unsafe { (*ptr).queued });
1305
1306 if queued {
1307 // Remove the node
1308 //
1309 // safety: tail lock is held and the wait node is verified to be in
1310 // the list.
1311 unsafe {
1312 self.waiter.with_mut(|ptr: *mut Waiter| {
1313 tail.waiters.remove((&mut *ptr).into());
1314 });
1315 }
1316 }
1317 }
1318}
1319
1320/// # Safety
1321///
1322/// `Waiter` is forced to be !Unpin.
1323unsafe impl linked_list::Link for Waiter {
1324 type Handle = NonNull<Waiter>;
1325 type Target = Waiter;
1326
1327 fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
1328 *handle
1329 }
1330
1331 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
1332 ptr
1333 }
1334
1335 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
1336 Waiter::addr_of_pointers(me:target)
1337 }
1338}
1339
1340impl<T> fmt::Debug for Sender<T> {
1341 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1342 write!(fmt, "broadcast::Sender")
1343 }
1344}
1345
1346impl<T> fmt::Debug for Receiver<T> {
1347 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1348 write!(fmt, "broadcast::Receiver")
1349 }
1350}
1351
1352impl<'a, T> RecvGuard<'a, T> {
1353 fn clone_value(&self) -> Option<T>
1354 where
1355 T: Clone,
1356 {
1357 self.slot.val.with(|ptr: *const Option| unsafe { (*ptr).clone() })
1358 }
1359}
1360
1361impl<'a, T> Drop for RecvGuard<'a, T> {
1362 fn drop(&mut self) {
1363 // Decrement the remaining counter
1364 if 1 == self.slot.rem.fetch_sub(val:1, order:SeqCst) {
1365 // Safety: Last receiver, drop the value
1366 self.slot.val.with_mut(|ptr: *mut Option| unsafe { *ptr = None });
1367 }
1368 }
1369}
1370
1371fn is_unpin<T: Unpin>() {}
1372