1//! Async broadcast channel
2//!
3//! An async multi-producer multi-consumer broadcast channel, where each consumer gets a clone of every
4//! message sent on the channel. For obvious reasons, the channel can only be used to broadcast types
5//! that implement [`Clone`].
6//!
7//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
8//! among multiple threads.
9//!
10//! When all `Sender`s or all `Receiver`s are dropped, the channel becomes closed. When a channel is
11//! closed, no more messages can be sent, but remaining messages can still be received.
12//!
13//! The channel can also be closed manually by calling [`Sender::close()`] or [`Receiver::close()`].
14//!
15//! ## Examples
16//!
17//! ```rust
18//! use async_broadcast::{broadcast, TryRecvError};
19//! use futures_lite::{future::block_on, stream::StreamExt};
20//!
21//! block_on(async move {
22//! let (s1, mut r1) = broadcast(2);
23//! let s2 = s1.clone();
24//! let mut r2 = r1.clone();
25//!
26//! // Send 2 messages from two different senders.
27//! s1.broadcast(7).await.unwrap();
28//! s2.broadcast(8).await.unwrap();
29//!
30//! // Channel is now at capacity so sending more messages will result in an error.
31//! assert!(s2.try_broadcast(9).unwrap_err().is_full());
32//! assert!(s1.try_broadcast(10).unwrap_err().is_full());
33//!
//! // We can use either the `recv` method or the `Stream` implementation to receive messages.
35//! assert_eq!(r1.next().await.unwrap(), 7);
36//! assert_eq!(r1.recv().await.unwrap(), 8);
37//! assert_eq!(r2.next().await.unwrap(), 7);
38//! assert_eq!(r2.recv().await.unwrap(), 8);
39//!
//! // All receivers got all messages, so the channel is now empty.
41//! assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
42//! assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
43//!
44//! // Drop both senders, which closes the channel.
45//! drop(s1);
46//! drop(s2);
47//!
48//! assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
49//! assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
50//! })
51//! ```
52//!
53//! ## Difference with `async-channel`
54//!
//! This crate is similar to [`async-channel`] in that both provide an MPMC channel. The main
//! difference is that in `async-channel`, each message sent on the channel is received by only
//! one of the receivers. `async-broadcast`, on the other hand, delivers each message to every
//! receiver (i.e. it broadcasts) by cloning it for each receiver.
59//!
60//! [`async-channel`]: https://crates.io/crates/async-channel
61//!
62//! ## Difference with other broadcast crates
63//!
//! * [`broadcaster`]: The main difference would be that `broadcaster` doesn't have a sender and
//!   receiver split and both sides use clones of the same `BroadcastChannel` instance. The
//!   messages are sent to all channel clones. While this can work for many cases, the lack of a
//!   sender and receiver split means that oftentimes you'll find yourself having to drain the
//!   channel on the sending side yourself.
69//!
70//! * [`postage`]: this crate provides a [broadcast API][pba] similar to `async_broadcast`. However,
71//! it:
72//! - (at the time of this writing) duplicates [futures] API, which isn't ideal.
73//! - Does not support overflow mode nor has the concept of inactive receivers, so a slow or
74//! inactive receiver blocking the whole channel is not a solvable problem.
75//! - Provides all kinds of channels, which is generally good but if you just need a broadcast
76//! channel, `async_broadcast` is probably a better choice.
77//!
78//! * [`tokio::sync`]: Tokio's `sync` module provides a [broadcast channel][tbc] API. The differences
79//! here are:
//!   - While Tokio's implementation does provide [overflow mode][tom], it is the default
//!     behavior and not opt-in.
82//! - There is no equivalent of inactive receivers.
83//! - While it's possible to build tokio with only the `sync` module, it comes with other APIs that
84//! you may not need.
85//!
86//! [`broadcaster`]: https://crates.io/crates/broadcaster
87//! [`postage`]: https://crates.io/crates/postage
88//! [pba]: https://docs.rs/postage/0.4.1/postage/broadcast/fn.channel.html
89//! [futures]: https://crates.io/crates/futures
90//! [`tokio::sync`]: https://docs.rs/tokio/1.6.0/tokio/sync
91//! [tbc]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html
92//! [tom]: https://docs.rs/tokio/1.6.0/tokio/sync/broadcast/index.html#lagging
93//!
94#![forbid(unsafe_code, future_incompatible, rust_2018_idioms)]
95#![deny(missing_debug_implementations, nonstandard_style)]
96#![warn(missing_docs, rustdoc::missing_doc_code_examples, unreachable_pub)]
97
// Pull the README in as documentation tests so that the code examples it contains are compiled
// and run together with the crate's other doctests. The module only exists under
// `cfg(doctest)`, so it does not affect regular builds.
#[cfg(doctest)]
mod doctests {
    doc_comment::doctest!("../README.md");
}
102
103use std::collections::VecDeque;
104use std::convert::TryInto;
105use std::error;
106use std::fmt;
107use std::future::Future;
108use std::pin::Pin;
109use std::sync::{Arc, RwLock};
110use std::task::{Context, Poll};
111
112use event_listener::{Event, EventListener};
113use futures_core::{ready, stream::Stream};
114
115/// Create a new broadcast channel.
116///
117/// The created channel has space to hold at most `cap` messages at a time.
118///
119/// # Panics
120///
121/// Capacity must be a positive number. If `cap` is zero, this function will panic.
122///
123/// # Examples
124///
125/// ```
126/// # futures_lite::future::block_on(async {
127/// use async_broadcast::{broadcast, TryRecvError, TrySendError};
128///
129/// let (s, mut r1) = broadcast(1);
130/// let mut r2 = r1.clone();
131///
132/// assert_eq!(s.broadcast(10).await, Ok(None));
133/// assert_eq!(s.try_broadcast(20), Err(TrySendError::Full(20)));
134///
135/// assert_eq!(r1.recv().await, Ok(10));
136/// assert_eq!(r2.recv().await, Ok(10));
137/// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
138/// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
139/// # });
140/// ```
141pub fn broadcast<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
142 assert!(cap > 0, "capacity cannot be zero");
143
144 let inner = Arc::new(RwLock::new(Inner {
145 queue: VecDeque::with_capacity(cap),
146 capacity: cap,
147 overflow: false,
148 await_active: true,
149 receiver_count: 1,
150 inactive_receiver_count: 0,
151 sender_count: 1,
152 head_pos: 0,
153 is_closed: false,
154 send_ops: Event::new(),
155 recv_ops: Event::new(),
156 }));
157
158 let s = Sender {
159 inner: inner.clone(),
160 };
161 let r = Receiver {
162 inner,
163 pos: 0,
164 listener: None,
165 };
166
167 (s, r)
168}
169
/// Channel state shared by all handles of one channel (stored behind `Arc<RwLock<..>>`).
#[derive(Debug)]
struct Inner<T> {
    /// Buffered messages, oldest first. Each entry pairs a message with the number of receivers
    /// that still have to read it (initialised to `receiver_count` when the message is queued).
    queue: VecDeque<(T, usize)>,
    // We assign the same capacity to the queue but that's just specifying the minimum capacity and
    // the actual capacity could be anything. Hence the need to keep track of our own set capacity.
    capacity: usize,
    /// Number of active receivers.
    receiver_count: usize,
    /// Number of inactive receivers (incremented by `Receiver::deactivate`).
    inactive_receiver_count: usize,
    /// Number of live `Sender` handles; the channel is closed when this drops to zero.
    sender_count: usize,
    /// Send sequence number of the front of the queue
    head_pos: u64,
    /// When `true`, broadcasting to a full channel removes the oldest message instead of failing.
    overflow: bool,
    /// Whether senders wait for active receivers (see `Sender::set_await_active`).
    await_active: bool,

    /// Set once by `close()`; never reset in the code visible here.
    is_closed: bool,

    /// Send operations waiting while the channel is full.
    send_ops: Event,

    /// Receive operations waiting while the channel is empty and not closed.
    recv_ops: Event,
}
192
193impl<T> Inner<T> {
194 /// Try receiving at the given position, returning either the element or a reference to it.
195 ///
196 /// Result is used here instead of Cow because we don't have a Clone bound on T.
197 fn try_recv_at(&mut self, pos: &mut u64) -> Result<Result<T, &T>, TryRecvError> {
198 let i = match pos.checked_sub(self.head_pos) {
199 Some(i) => i
200 .try_into()
201 .expect("Head position more than usize::MAX behind a receiver"),
202 None => {
203 let count = self.head_pos - *pos;
204 *pos = self.head_pos;
205 return Err(TryRecvError::Overflowed(count));
206 }
207 };
208
209 let last_waiter;
210 if let Some((_elt, waiters)) = self.queue.get_mut(i) {
211 *pos += 1;
212 *waiters -= 1;
213 last_waiter = *waiters == 0;
214 } else {
215 debug_assert_eq!(i, self.queue.len());
216 if self.is_closed {
217 return Err(TryRecvError::Closed);
218 } else {
219 return Err(TryRecvError::Empty);
220 }
221 }
222
223 // If we read from the front of the queue and this is the last receiver reading it
224 // we can pop the queue instead of cloning the message
225 if last_waiter {
226 // Only the first element of the queue should have 0 waiters
227 assert_eq!(i, 0);
228
229 // Remove the element from the queue, adjust space, and notify senders
230 let elt = self.queue.pop_front().unwrap().0;
231 self.head_pos += 1;
232 if !self.overflow {
233 // Notify 1 awaiting senders that there is now room. If there is still room in the
234 // queue, the notified operation will notify another awaiting sender.
235 self.send_ops.notify(1);
236 }
237
238 Ok(Ok(elt))
239 } else {
240 Ok(Err(&self.queue[i].0))
241 }
242 }
243
244 /// Closes the channel and notifies all waiting operations.
245 ///
246 /// Returns `true` if this call has closed the channel and it was not closed already.
247 fn close(&mut self) -> bool {
248 if self.is_closed {
249 return false;
250 }
251
252 self.is_closed = true;
253 // Notify all waiting senders and receivers.
254 self.send_ops.notify(usize::MAX);
255 self.recv_ops.notify(usize::MAX);
256
257 true
258 }
259
260 /// Set the channel capacity.
261 ///
262 /// There are times when you need to change the channel's capacity after creating it. If the
263 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
264 /// dropped to shrink the channel.
265 fn set_capacity(&mut self, new_cap: usize) {
266 self.capacity = new_cap;
267 if new_cap > self.queue.capacity() {
268 let diff = new_cap - self.queue.capacity();
269 self.queue.reserve(diff);
270 }
271
272 // Ensure queue doesn't have more than `new_cap` messages.
273 if new_cap < self.queue.len() {
274 let diff = self.queue.len() - new_cap;
275 self.queue.drain(0..diff);
276 self.head_pos += diff as u64;
277 }
278 }
279
280 /// Close the channel if there aren't any receivers present anymore
281 fn close_channel(&mut self) {
282 if self.receiver_count == 0 && self.inactive_receiver_count == 0 {
283 self.close();
284 }
285 }
286}
287
288/// The sending side of the broadcast channel.
289///
290/// Senders can be cloned and shared among threads. When all senders associated with a channel are
291/// dropped, the channel becomes closed.
292///
293/// The channel can also be closed manually by calling [`Sender::close()`].
#[derive(Debug)]
pub struct Sender<T> {
    // Channel state shared with every other `Sender` and `Receiver` handle of this channel.
    inner: Arc<RwLock<Inner<T>>>,
}
298
299impl<T> Sender<T> {
300 /// Returns the channel capacity.
301 ///
302 /// # Examples
303 ///
304 /// ```
305 /// use async_broadcast::broadcast;
306 ///
307 /// let (s, r) = broadcast::<i32>(5);
308 /// assert_eq!(s.capacity(), 5);
309 /// ```
310 pub fn capacity(&self) -> usize {
311 self.inner.read().unwrap().capacity
312 }
313
314 /// Set the channel capacity.
315 ///
316 /// There are times when you need to change the channel's capacity after creating it. If the
317 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
318 /// dropped to shrink the channel.
319 ///
320 /// # Examples
321 ///
322 /// ```
323 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
324 ///
325 /// let (mut s, mut r) = broadcast::<i32>(3);
326 /// assert_eq!(s.capacity(), 3);
327 /// s.try_broadcast(1).unwrap();
328 /// s.try_broadcast(2).unwrap();
329 /// s.try_broadcast(3).unwrap();
330 ///
331 /// s.set_capacity(1);
332 /// assert_eq!(s.capacity(), 1);
333 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
334 /// assert_eq!(r.try_recv().unwrap(), 3);
335 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
336 /// s.try_broadcast(1).unwrap();
337 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
338 ///
339 /// s.set_capacity(2);
340 /// assert_eq!(s.capacity(), 2);
341 /// s.try_broadcast(2).unwrap();
342 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
343 /// ```
344 pub fn set_capacity(&mut self, new_cap: usize) {
345 self.inner.write().unwrap().set_capacity(new_cap);
346 }
347
348 /// If overflow mode is enabled on this channel.
349 ///
350 /// # Examples
351 ///
352 /// ```
353 /// use async_broadcast::broadcast;
354 ///
355 /// let (s, r) = broadcast::<i32>(5);
356 /// assert!(!s.overflow());
357 /// ```
358 pub fn overflow(&self) -> bool {
359 self.inner.read().unwrap().overflow
360 }
361
362 /// Set overflow mode on the channel.
363 ///
364 /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
365 /// full. It achieves that by removing the oldest message from the channel.
366 ///
367 /// # Examples
368 ///
369 /// ```
370 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
371 ///
372 /// let (mut s, mut r) = broadcast::<i32>(2);
373 /// s.try_broadcast(1).unwrap();
374 /// s.try_broadcast(2).unwrap();
375 /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
376 /// s.set_overflow(true);
377 /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
378 /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
379 ///
380 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
381 /// assert_eq!(r.try_recv().unwrap(), 3);
382 /// assert_eq!(r.try_recv().unwrap(), 4);
383 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
384 /// ```
385 pub fn set_overflow(&mut self, overflow: bool) {
386 self.inner.write().unwrap().overflow = overflow;
387 }
388
389 /// If sender will wait for active receivers.
390 ///
391 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
392 /// `true`.
393 ///
394 /// # Examples
395 ///
396 /// ```
397 /// use async_broadcast::broadcast;
398 ///
399 /// let (s, _) = broadcast::<i32>(5);
400 /// assert!(s.await_active());
401 /// ```
402 pub fn await_active(&self) -> bool {
403 self.inner.read().unwrap().await_active
404 }
405
406 /// Specify if sender will wait for active receivers.
407 ///
408 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
409 /// `true`.
410 ///
411 /// # Examples
412 ///
413 /// ```
414 /// # futures_lite::future::block_on(async {
415 /// use async_broadcast::broadcast;
416 ///
417 /// let (mut s, mut r) = broadcast::<i32>(2);
418 /// s.broadcast(1).await.unwrap();
419 ///
420 /// let _ = r.deactivate();
421 /// s.set_await_active(false);
422 /// assert!(s.broadcast(2).await.is_err());
423 /// # });
424 /// ```
425 pub fn set_await_active(&mut self, await_active: bool) {
426 self.inner.write().unwrap().await_active = await_active;
427 }
428
429 /// Closes the channel.
430 ///
431 /// Returns `true` if this call has closed the channel and it was not closed already.
432 ///
433 /// The remaining messages can still be received.
434 ///
435 /// # Examples
436 ///
437 /// ```
438 /// # futures_lite::future::block_on(async {
439 /// use async_broadcast::{broadcast, RecvError};
440 ///
441 /// let (s, mut r) = broadcast(1);
442 /// s.broadcast(1).await.unwrap();
443 /// assert!(s.close());
444 ///
445 /// assert_eq!(r.recv().await.unwrap(), 1);
446 /// assert_eq!(r.recv().await, Err(RecvError::Closed));
447 /// # });
448 /// ```
449 pub fn close(&self) -> bool {
450 self.inner.write().unwrap().close()
451 }
452
453 /// Returns `true` if the channel is closed.
454 ///
455 /// # Examples
456 ///
457 /// ```
458 /// # futures_lite::future::block_on(async {
459 /// use async_broadcast::{broadcast, RecvError};
460 ///
461 /// let (s, r) = broadcast::<()>(1);
462 /// assert!(!s.is_closed());
463 ///
464 /// drop(r);
465 /// assert!(s.is_closed());
466 /// # });
467 /// ```
468 pub fn is_closed(&self) -> bool {
469 self.inner.read().unwrap().is_closed
470 }
471
472 /// Returns `true` if the channel is empty.
473 ///
474 /// # Examples
475 ///
476 /// ```
477 /// # futures_lite::future::block_on(async {
478 /// use async_broadcast::broadcast;
479 ///
480 /// let (s, r) = broadcast(1);
481 ///
482 /// assert!(s.is_empty());
483 /// s.broadcast(1).await;
484 /// assert!(!s.is_empty());
485 /// # });
486 /// ```
487 pub fn is_empty(&self) -> bool {
488 self.inner.read().unwrap().queue.is_empty()
489 }
490
491 /// Returns `true` if the channel is full.
492 ///
493 /// # Examples
494 ///
495 /// ```
496 /// # futures_lite::future::block_on(async {
497 /// use async_broadcast::broadcast;
498 ///
499 /// let (s, r) = broadcast(1);
500 ///
501 /// assert!(!s.is_full());
502 /// s.broadcast(1).await;
503 /// assert!(s.is_full());
504 /// # });
505 /// ```
506 pub fn is_full(&self) -> bool {
507 let inner = self.inner.read().unwrap();
508
509 inner.queue.len() == inner.capacity
510 }
511
512 /// Returns the number of messages in the channel.
513 ///
514 /// # Examples
515 ///
516 /// ```
517 /// # futures_lite::future::block_on(async {
518 /// use async_broadcast::broadcast;
519 ///
520 /// let (s, r) = broadcast(2);
521 /// assert_eq!(s.len(), 0);
522 ///
523 /// s.broadcast(1).await;
524 /// s.broadcast(2).await;
525 /// assert_eq!(s.len(), 2);
526 /// # });
527 /// ```
528 pub fn len(&self) -> usize {
529 self.inner.read().unwrap().queue.len()
530 }
531
532 /// Returns the number of receivers for the channel.
533 ///
534 /// This does not include inactive receivers. Use [`Sender::inactive_receiver_count`] if you
535 /// are interested in that.
536 ///
537 /// # Examples
538 ///
539 /// ```
540 /// use async_broadcast::broadcast;
541 ///
542 /// let (s, r) = broadcast::<()>(1);
543 /// assert_eq!(s.receiver_count(), 1);
544 /// let r = r.deactivate();
545 /// assert_eq!(s.receiver_count(), 0);
546 ///
547 /// let r2 = r.activate_cloned();
548 /// assert_eq!(r.receiver_count(), 1);
549 /// assert_eq!(r.inactive_receiver_count(), 1);
550 /// ```
551 pub fn receiver_count(&self) -> usize {
552 self.inner.read().unwrap().receiver_count
553 }
554
555 /// Returns the number of inactive receivers for the channel.
556 ///
557 /// # Examples
558 ///
559 /// ```
560 /// use async_broadcast::broadcast;
561 ///
562 /// let (s, r) = broadcast::<()>(1);
563 /// assert_eq!(s.receiver_count(), 1);
564 /// let r = r.deactivate();
565 /// assert_eq!(s.receiver_count(), 0);
566 ///
567 /// let r2 = r.activate_cloned();
568 /// assert_eq!(r.receiver_count(), 1);
569 /// assert_eq!(r.inactive_receiver_count(), 1);
570 /// ```
571 pub fn inactive_receiver_count(&self) -> usize {
572 self.inner.read().unwrap().inactive_receiver_count
573 }
574
575 /// Returns the number of senders for the channel.
576 ///
577 /// # Examples
578 ///
579 /// ```
580 /// # futures_lite::future::block_on(async {
581 /// use async_broadcast::broadcast;
582 ///
583 /// let (s, r) = broadcast::<()>(1);
584 /// assert_eq!(s.sender_count(), 1);
585 ///
586 /// let s2 = s.clone();
587 /// assert_eq!(s.sender_count(), 2);
588 /// # });
589 /// ```
590 pub fn sender_count(&self) -> usize {
591 self.inner.read().unwrap().sender_count
592 }
593
594 /// Produce a new Receiver for this channel.
595 ///
596 /// The new receiver starts with zero messages available. This will not re-open the channel if
597 /// it was closed due to all receivers being dropped.
598 ///
599 /// # Examples
600 ///
601 /// ```
602 /// # futures_lite::future::block_on(async {
603 /// use async_broadcast::{broadcast, RecvError};
604 ///
605 /// let (s, mut r1) = broadcast(2);
606 ///
607 /// assert_eq!(s.broadcast(1).await, Ok(None));
608 ///
609 /// let mut r2 = s.new_receiver();
610 ///
611 /// assert_eq!(s.broadcast(2).await, Ok(None));
612 /// drop(s);
613 ///
614 /// assert_eq!(r1.recv().await, Ok(1));
615 /// assert_eq!(r1.recv().await, Ok(2));
616 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
617 ///
618 /// assert_eq!(r2.recv().await, Ok(2));
619 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
620 /// # });
621 /// ```
622 pub fn new_receiver(&self) -> Receiver<T> {
623 let mut inner = self.inner.write().unwrap();
624 inner.receiver_count += 1;
625 Receiver {
626 inner: self.inner.clone(),
627 pos: inner.head_pos + inner.queue.len() as u64,
628 listener: None,
629 }
630 }
631}
632
633impl<T: Clone> Sender<T> {
634 /// Broadcasts a message on the channel.
635 ///
636 /// If the channel is full, this method waits until there is space for a message unless:
637 ///
638 /// 1. overflow mode (set through [`Sender::set_overflow`]) is enabled, in which case it removes
639 /// the oldest message from the channel to make room for the new message. The removed message
640 /// is returned to the caller.
641 /// 2. this behavior is disabled using [`Sender::set_await_active`], in which case, it returns
642 /// [`SendError`] immediately.
643 ///
644 /// If the channel is closed, this method returns an error.
645 ///
646 /// # Examples
647 ///
648 /// ```
649 /// # futures_lite::future::block_on(async {
650 /// use async_broadcast::{broadcast, SendError};
651 ///
652 /// let (s, r) = broadcast(1);
653 ///
654 /// assert_eq!(s.broadcast(1).await, Ok(None));
655 /// drop(r);
656 /// assert_eq!(s.broadcast(2).await, Err(SendError(2)));
657 /// # });
658 /// ```
659 pub fn broadcast(&self, msg: T) -> Send<'_, T> {
660 Send {
661 sender: self,
662 listener: None,
663 msg: Some(msg),
664 }
665 }
666
667 /// Attempts to broadcast a message on the channel.
668 ///
669 /// If the channel is full, this method returns an error unless overflow mode (set through
670 /// [`Sender::set_overflow`]) is enabled. If the overflow mode is enabled, it removes the
671 /// oldest message from the channel to make room for the new message. The removed message
672 /// is returned to the caller.
673 ///
674 /// If the channel is closed, this method returns an error.
675 ///
676 /// # Examples
677 ///
678 /// ```
679 /// use async_broadcast::{broadcast, TrySendError};
680 ///
681 /// let (s, r) = broadcast(1);
682 ///
683 /// assert_eq!(s.try_broadcast(1), Ok(None));
684 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
685 ///
686 /// drop(r);
687 /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Closed(3)));
688 /// ```
689 pub fn try_broadcast(&self, msg: T) -> Result<Option<T>, TrySendError<T>> {
690 let mut ret = None;
691 let mut inner = self.inner.write().unwrap();
692
693 if inner.is_closed {
694 return Err(TrySendError::Closed(msg));
695 } else if inner.receiver_count == 0 {
696 assert!(inner.inactive_receiver_count != 0);
697
698 return Err(TrySendError::Inactive(msg));
699 } else if inner.queue.len() == inner.capacity {
700 if inner.overflow {
701 // Make room by popping a message.
702 ret = inner.queue.pop_front().map(|(m, _)| m);
703 } else {
704 return Err(TrySendError::Full(msg));
705 }
706 }
707 let receiver_count = inner.receiver_count;
708 inner.queue.push_back((msg, receiver_count));
709 if ret.is_some() {
710 inner.head_pos += 1;
711 }
712
713 // Notify all awaiting receive operations.
714 inner.recv_ops.notify(usize::MAX);
715
716 Ok(ret)
717 }
718}
719
720impl<T> Drop for Sender<T> {
721 fn drop(&mut self) {
722 let mut inner: RwLockWriteGuard<'_, Inner<…>> = self.inner.write().unwrap();
723
724 inner.sender_count -= 1;
725
726 if inner.sender_count == 0 {
727 inner.close();
728 }
729 }
730}
731
732impl<T> Clone for Sender<T> {
733 fn clone(&self) -> Self {
734 self.inner.write().unwrap().sender_count += 1;
735
736 Sender {
737 inner: self.inner.clone(),
738 }
739 }
740}
741
742/// The receiving side of a channel.
743///
744/// Receivers can be cloned and shared among threads. When all (active) receivers associated with a
745/// channel are dropped, the channel becomes closed. You can deactivate a receiver using
746/// [`Receiver::deactivate`] if you would like the channel to remain open without keeping active
747/// receivers around.
#[derive(Debug)]
pub struct Receiver<T> {
    // Channel state shared with every other handle of this channel.
    inner: Arc<RwLock<Inner<T>>>,
    // Sequence number of the next message for this receiver. `broadcast` initialises it to 0
    // and `Sender::new_receiver` to the position just past the newest queued message.
    pos: u64,

    /// Listens for a send or close event to unblock this stream.
    listener: Option<EventListener>,
}
756
757impl<T> Receiver<T> {
758 /// Returns the channel capacity.
759 ///
760 /// # Examples
761 ///
762 /// ```
763 /// use async_broadcast::broadcast;
764 ///
765 /// let (_s, r) = broadcast::<i32>(5);
766 /// assert_eq!(r.capacity(), 5);
767 /// ```
768 pub fn capacity(&self) -> usize {
769 self.inner.read().unwrap().capacity
770 }
771
772 /// Set the channel capacity.
773 ///
774 /// There are times when you need to change the channel's capacity after creating it. If the
775 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
776 /// dropped to shrink the channel.
777 ///
778 /// # Examples
779 ///
780 /// ```
781 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
782 ///
783 /// let (s, mut r) = broadcast::<i32>(3);
784 /// assert_eq!(r.capacity(), 3);
785 /// s.try_broadcast(1).unwrap();
786 /// s.try_broadcast(2).unwrap();
787 /// s.try_broadcast(3).unwrap();
788 ///
789 /// r.set_capacity(1);
790 /// assert_eq!(r.capacity(), 1);
791 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
792 /// assert_eq!(r.try_recv().unwrap(), 3);
793 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
794 /// s.try_broadcast(1).unwrap();
795 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
796 ///
797 /// r.set_capacity(2);
798 /// assert_eq!(r.capacity(), 2);
799 /// s.try_broadcast(2).unwrap();
800 /// assert_eq!(s.try_broadcast(2), Err(TrySendError::Full(2)));
801 /// ```
802 pub fn set_capacity(&mut self, new_cap: usize) {
803 self.inner.write().unwrap().set_capacity(new_cap);
804 }
805
806 /// If overflow mode is enabled on this channel.
807 ///
808 /// # Examples
809 ///
810 /// ```
811 /// use async_broadcast::broadcast;
812 ///
813 /// let (_s, r) = broadcast::<i32>(5);
814 /// assert!(!r.overflow());
815 /// ```
816 pub fn overflow(&self) -> bool {
817 self.inner.read().unwrap().overflow
818 }
819
820 /// Set overflow mode on the channel.
821 ///
822 /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
823 /// full. It achieves that by removing the oldest message from the channel.
824 ///
825 /// # Examples
826 ///
827 /// ```
828 /// use async_broadcast::{broadcast, TrySendError, TryRecvError};
829 ///
830 /// let (s, mut r) = broadcast::<i32>(2);
831 /// s.try_broadcast(1).unwrap();
832 /// s.try_broadcast(2).unwrap();
833 /// assert_eq!(s.try_broadcast(3), Err(TrySendError::Full(3)));
834 /// r.set_overflow(true);
835 /// assert_eq!(s.try_broadcast(3).unwrap(), Some(1));
836 /// assert_eq!(s.try_broadcast(4).unwrap(), Some(2));
837 ///
838 /// assert_eq!(r.try_recv(), Err(TryRecvError::Overflowed(2)));
839 /// assert_eq!(r.try_recv().unwrap(), 3);
840 /// assert_eq!(r.try_recv().unwrap(), 4);
841 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
842 /// ```
843 pub fn set_overflow(&mut self, overflow: bool) {
844 self.inner.write().unwrap().overflow = overflow;
845 }
846
847 /// If sender will wait for active receivers.
848 ///
849 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
850 /// `true`.
851 ///
852 /// # Examples
853 ///
854 /// ```
855 /// use async_broadcast::broadcast;
856 ///
857 /// let (_, r) = broadcast::<i32>(5);
858 /// assert!(r.await_active());
859 /// ```
860 pub fn await_active(&self) -> bool {
861 self.inner.read().unwrap().await_active
862 }
863
864 /// Specify if sender will wait for active receivers.
865 ///
866 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
867 /// `true`.
868 ///
869 /// # Examples
870 ///
871 /// ```
872 /// # futures_lite::future::block_on(async {
873 /// use async_broadcast::broadcast;
874 ///
875 /// let (s, mut r) = broadcast::<i32>(2);
876 /// s.broadcast(1).await.unwrap();
877 ///
878 /// r.set_await_active(false);
879 /// let _ = r.deactivate();
880 /// assert!(s.broadcast(2).await.is_err());
881 /// # });
882 /// ```
883 pub fn set_await_active(&mut self, await_active: bool) {
884 self.inner.write().unwrap().await_active = await_active;
885 }
886
887 /// Closes the channel.
888 ///
889 /// Returns `true` if this call has closed the channel and it was not closed already.
890 ///
891 /// The remaining messages can still be received.
892 ///
893 /// # Examples
894 ///
895 /// ```
896 /// # futures_lite::future::block_on(async {
897 /// use async_broadcast::{broadcast, RecvError};
898 ///
899 /// let (s, mut r) = broadcast(1);
900 /// s.broadcast(1).await.unwrap();
901 /// assert!(s.close());
902 ///
903 /// assert_eq!(r.recv().await.unwrap(), 1);
904 /// assert_eq!(r.recv().await, Err(RecvError::Closed));
905 /// # });
906 /// ```
907 pub fn close(&self) -> bool {
908 self.inner.write().unwrap().close()
909 }
910
911 /// Returns `true` if the channel is closed.
912 ///
913 /// # Examples
914 ///
915 /// ```
916 /// # futures_lite::future::block_on(async {
917 /// use async_broadcast::{broadcast, RecvError};
918 ///
919 /// let (s, r) = broadcast::<()>(1);
920 /// assert!(!s.is_closed());
921 ///
922 /// drop(r);
923 /// assert!(s.is_closed());
924 /// # });
925 /// ```
926 pub fn is_closed(&self) -> bool {
927 self.inner.read().unwrap().is_closed
928 }
929
930 /// Returns `true` if the channel is empty.
931 ///
932 /// # Examples
933 ///
934 /// ```
935 /// # futures_lite::future::block_on(async {
936 /// use async_broadcast::broadcast;
937 ///
938 /// let (s, r) = broadcast(1);
939 ///
940 /// assert!(s.is_empty());
941 /// s.broadcast(1).await;
942 /// assert!(!s.is_empty());
943 /// # });
944 /// ```
945 pub fn is_empty(&self) -> bool {
946 self.inner.read().unwrap().queue.is_empty()
947 }
948
949 /// Returns `true` if the channel is full.
950 ///
951 /// # Examples
952 ///
953 /// ```
954 /// # futures_lite::future::block_on(async {
955 /// use async_broadcast::broadcast;
956 ///
957 /// let (s, r) = broadcast(1);
958 ///
959 /// assert!(!s.is_full());
960 /// s.broadcast(1).await;
961 /// assert!(s.is_full());
962 /// # });
963 /// ```
964 pub fn is_full(&self) -> bool {
965 let inner = self.inner.read().unwrap();
966
967 inner.queue.len() == inner.capacity
968 }
969
970 /// Returns the number of messages in the channel.
971 ///
972 /// # Examples
973 ///
974 /// ```
975 /// # futures_lite::future::block_on(async {
976 /// use async_broadcast::broadcast;
977 ///
978 /// let (s, r) = broadcast(2);
979 /// assert_eq!(s.len(), 0);
980 ///
981 /// s.broadcast(1).await;
982 /// s.broadcast(2).await;
983 /// assert_eq!(s.len(), 2);
984 /// # });
985 /// ```
986 pub fn len(&self) -> usize {
987 self.inner.read().unwrap().queue.len()
988 }
989
990 /// Returns the number of receivers for the channel.
991 ///
992 /// This does not include inactive receivers. Use [`Receiver::inactive_receiver_count`] if you
993 /// are interested in that.
994 ///
995 /// # Examples
996 ///
997 /// ```
998 /// use async_broadcast::broadcast;
999 ///
1000 /// let (s, r) = broadcast::<()>(1);
1001 /// assert_eq!(s.receiver_count(), 1);
1002 /// let r = r.deactivate();
1003 /// assert_eq!(s.receiver_count(), 0);
1004 ///
1005 /// let r2 = r.activate_cloned();
1006 /// assert_eq!(r.receiver_count(), 1);
1007 /// assert_eq!(r.inactive_receiver_count(), 1);
1008 /// ```
1009 pub fn receiver_count(&self) -> usize {
1010 self.inner.read().unwrap().receiver_count
1011 }
1012
1013 /// Returns the number of inactive receivers for the channel.
1014 ///
1015 /// # Examples
1016 ///
1017 /// ```
1018 /// use async_broadcast::broadcast;
1019 ///
1020 /// let (s, r) = broadcast::<()>(1);
1021 /// assert_eq!(s.receiver_count(), 1);
1022 /// let r = r.deactivate();
1023 /// assert_eq!(s.receiver_count(), 0);
1024 ///
1025 /// let r2 = r.activate_cloned();
1026 /// assert_eq!(r.receiver_count(), 1);
1027 /// assert_eq!(r.inactive_receiver_count(), 1);
1028 /// ```
1029 pub fn inactive_receiver_count(&self) -> usize {
1030 self.inner.read().unwrap().inactive_receiver_count
1031 }
1032
1033 /// Returns the number of senders for the channel.
1034 ///
1035 /// # Examples
1036 ///
1037 /// ```
1038 /// # futures_lite::future::block_on(async {
1039 /// use async_broadcast::broadcast;
1040 ///
1041 /// let (s, r) = broadcast::<()>(1);
1042 /// assert_eq!(s.sender_count(), 1);
1043 ///
1044 /// let s2 = s.clone();
1045 /// assert_eq!(s.sender_count(), 2);
1046 /// # });
1047 /// ```
1048 pub fn sender_count(&self) -> usize {
1049 self.inner.read().unwrap().sender_count
1050 }
1051
1052 /// Downgrade to a [`InactiveReceiver`].
1053 ///
1054 /// An inactive receiver is one that can not and does not receive any messages. Its only purpose
1055 /// is keep the associated channel open even when there are no (active) receivers. An inactive
1056 /// receiver can be upgraded into a [`Receiver`] using [`InactiveReceiver::activate`] or
1057 /// [`InactiveReceiver::activate_cloned`].
1058 ///
1059 /// [`Sender::try_broadcast`] will return [`TrySendError::Inactive`] if only inactive
1060 /// receivers exists for the associated channel and [`Sender::broadcast`] will wait until an
1061 /// active receiver is available.
1062 ///
1063 /// # Examples
1064 ///
1065 /// ```
1066 /// # futures_lite::future::block_on(async {
1067 /// use async_broadcast::{broadcast, TrySendError};
1068 ///
1069 /// let (s, r) = broadcast(1);
1070 /// let inactive = r.deactivate();
1071 /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1072 ///
1073 /// let mut r = inactive.activate();
1074 /// assert_eq!(s.broadcast(10).await, Ok(None));
1075 /// assert_eq!(r.recv().await, Ok(10));
1076 /// # });
1077 /// ```
1078 pub fn deactivate(self) -> InactiveReceiver<T> {
1079 // Drop::drop impl of Receiver will take care of `receiver_count`.
1080 self.inner.write().unwrap().inactive_receiver_count += 1;
1081
1082 InactiveReceiver {
1083 inner: self.inner.clone(),
1084 }
1085 }
1086}
1087
1088impl<T: Clone> Receiver<T> {
1089 /// Receives a message from the channel.
1090 ///
1091 /// If the channel is empty, this method waits until there is a message.
1092 ///
1093 /// If the channel is closed, this method receives a message or returns an error if there are
1094 /// no more messages.
1095 ///
1096 /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1097 /// this method returns an error and readjusts its cursor to point to the first available
1098 /// message.
1099 ///
1100 /// # Examples
1101 ///
1102 /// ```
1103 /// # futures_lite::future::block_on(async {
1104 /// use async_broadcast::{broadcast, RecvError};
1105 ///
1106 /// let (s, mut r1) = broadcast(1);
1107 /// let mut r2 = r1.clone();
1108 ///
1109 /// assert_eq!(s.broadcast(1).await, Ok(None));
1110 /// drop(s);
1111 ///
1112 /// assert_eq!(r1.recv().await, Ok(1));
1113 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1114 /// assert_eq!(r2.recv().await, Ok(1));
1115 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1116 /// # });
1117 /// ```
1118 pub fn recv(&mut self) -> Recv<'_, T> {
1119 Recv {
1120 receiver: self,
1121 listener: None,
1122 }
1123 }
1124
1125 /// Attempts to receive a message from the channel.
1126 ///
1127 /// If the channel is empty or closed, this method returns an error.
1128 ///
1129 /// If this receiver has missed a message (only possible if overflow mode is enabled), then
1130 /// this method returns an error and readjusts its cursor to point to the first available
1131 /// message.
1132 ///
1133 /// # Examples
1134 ///
1135 /// ```
1136 /// # futures_lite::future::block_on(async {
1137 /// use async_broadcast::{broadcast, TryRecvError};
1138 ///
1139 /// let (s, mut r1) = broadcast(1);
1140 /// let mut r2 = r1.clone();
1141 /// assert_eq!(s.broadcast(1).await, Ok(None));
1142 ///
1143 /// assert_eq!(r1.try_recv(), Ok(1));
1144 /// assert_eq!(r1.try_recv(), Err(TryRecvError::Empty));
1145 /// assert_eq!(r2.try_recv(), Ok(1));
1146 /// assert_eq!(r2.try_recv(), Err(TryRecvError::Empty));
1147 ///
1148 /// drop(s);
1149 /// assert_eq!(r1.try_recv(), Err(TryRecvError::Closed));
1150 /// assert_eq!(r2.try_recv(), Err(TryRecvError::Closed));
1151 /// # });
1152 /// ```
1153 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
1154 self.inner
1155 .write()
1156 .unwrap()
1157 .try_recv_at(&mut self.pos)
1158 .map(|cow| cow.unwrap_or_else(T::clone))
1159 }
1160
1161 /// Produce a new Sender for this channel.
1162 ///
1163 /// This will not re-open the channel if it was closed due to all senders being dropped.
1164 ///
1165 /// # Examples
1166 ///
1167 /// ```
1168 /// # futures_lite::future::block_on(async {
1169 /// use async_broadcast::{broadcast, RecvError};
1170 ///
1171 /// let (s1, mut r) = broadcast(2);
1172 ///
1173 /// assert_eq!(s1.broadcast(1).await, Ok(None));
1174 ///
1175 /// let mut s2 = r.new_sender();
1176 ///
1177 /// assert_eq!(s2.broadcast(2).await, Ok(None));
1178 /// drop(s1);
1179 /// drop(s2);
1180 ///
1181 /// assert_eq!(r.recv().await, Ok(1));
1182 /// assert_eq!(r.recv().await, Ok(2));
1183 /// assert_eq!(r.recv().await, Err(RecvError::Closed));
1184 /// # });
1185 /// ```
1186 pub fn new_sender(&self) -> Sender<T> {
1187 self.inner.write().unwrap().sender_count += 1;
1188
1189 Sender {
1190 inner: self.inner.clone(),
1191 }
1192 }
1193
1194 /// Produce a new Receiver for this channel.
1195 ///
1196 /// Unlike [`Receiver::clone`], this method creates a new receiver that starts with zero
1197 /// messages available. This is slightly faster than a real clone.
1198 ///
1199 /// # Examples
1200 ///
1201 /// ```
1202 /// # futures_lite::future::block_on(async {
1203 /// use async_broadcast::{broadcast, RecvError};
1204 ///
1205 /// let (s, mut r1) = broadcast(2);
1206 ///
1207 /// assert_eq!(s.broadcast(1).await, Ok(None));
1208 ///
1209 /// let mut r2 = r1.new_receiver();
1210 ///
1211 /// assert_eq!(s.broadcast(2).await, Ok(None));
1212 /// drop(s);
1213 ///
1214 /// assert_eq!(r1.recv().await, Ok(1));
1215 /// assert_eq!(r1.recv().await, Ok(2));
1216 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1217 ///
1218 /// assert_eq!(r2.recv().await, Ok(2));
1219 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1220 /// # });
1221 /// ```
1222 pub fn new_receiver(&self) -> Self {
1223 let mut inner = self.inner.write().unwrap();
1224 inner.receiver_count += 1;
1225 Receiver {
1226 inner: self.inner.clone(),
1227 pos: inner.head_pos + inner.queue.len() as u64,
1228 listener: None,
1229 }
1230 }
1231}
1232
1233impl<T> Drop for Receiver<T> {
1234 fn drop(&mut self) {
1235 let mut inner: RwLockWriteGuard<'_, Inner<…>> = self.inner.write().unwrap();
1236
1237 // Remove ourself from each item's counter
1238 loop {
1239 match inner.try_recv_at(&mut self.pos) {
1240 Ok(_) => continue,
1241 Err(TryRecvError::Overflowed(_)) => continue,
1242 Err(TryRecvError::Closed) => break,
1243 Err(TryRecvError::Empty) => break,
1244 }
1245 }
1246
1247 inner.receiver_count -= 1;
1248
1249 inner.close_channel();
1250 }
1251}
1252
1253impl<T> Clone for Receiver<T> {
1254 /// Produce a clone of this Receiver that has the same messages queued.
1255 ///
1256 /// # Examples
1257 ///
1258 /// ```
1259 /// # futures_lite::future::block_on(async {
1260 /// use async_broadcast::{broadcast, RecvError};
1261 ///
1262 /// let (s, mut r1) = broadcast(1);
1263 ///
1264 /// assert_eq!(s.broadcast(1).await, Ok(None));
1265 /// drop(s);
1266 ///
1267 /// let mut r2 = r1.clone();
1268 ///
1269 /// assert_eq!(r1.recv().await, Ok(1));
1270 /// assert_eq!(r1.recv().await, Err(RecvError::Closed));
1271 /// assert_eq!(r2.recv().await, Ok(1));
1272 /// assert_eq!(r2.recv().await, Err(RecvError::Closed));
1273 /// # });
1274 /// ```
1275 fn clone(&self) -> Self {
1276 let mut inner = self.inner.write().unwrap();
1277 inner.receiver_count += 1;
1278 // increment the waiter count on all items not yet received by this object
1279 let n = self.pos.saturating_sub(inner.head_pos) as usize;
1280 for (_elt, waiters) in inner.queue.iter_mut().skip(n) {
1281 *waiters += 1;
1282 }
1283 Receiver {
1284 inner: self.inner.clone(),
1285 pos: self.pos,
1286 listener: None,
1287 }
1288 }
1289}
1290
/// Stream view of a receiver: yields each received message and ends (`None`) once `try_recv`
/// reports the channel as closed. Overflow errors are not surfaced; the stream simply retries.
impl<T: Clone> Stream for Receiver<T> {
    type Item = T;

    /// Polls for the next message.
    ///
    /// Returns `Poll::Ready(Some(msg))` when a message was received, `Poll::Ready(None)` when
    /// `try_recv` returns `TryRecvError::Closed`, and `Poll::Pending` (via `ready!`) while the
    /// stored listener has not been notified yet.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // If this stream is listening for events, first wait for a notification.
            if let Some(listener) = self.listener.as_mut() {
                ready!(Pin::new(listener).poll(cx));
                self.listener = None;
            }

            loop {
                // Attempt to receive a message.
                match self.try_recv() {
                    Ok(msg) => {
                        // The stream is not blocked on an event - drop the listener.
                        self.listener = None;
                        return Poll::Ready(Some(msg));
                    }
                    Err(TryRecvError::Closed) => {
                        // The stream is not blocked on an event - drop the listener.
                        self.listener = None;
                        return Poll::Ready(None);
                    }
                    // Missed messages are skipped silently: retry right away.
                    Err(TryRecvError::Overflowed(_)) => continue,
                    // Nothing available yet: fall through to the listener handling below.
                    Err(TryRecvError::Empty) => {}
                }

                // Receiving failed - now start listening for notifications or wait for one.
                match self.listener.as_mut() {
                    None => {
                        // Start listening and then try receiving again.
                        // NOTE(review): retrying after registering presumably guards against a
                        // message arriving between the failed attempt and `listen()` - confirm.
                        self.listener = {
                            let inner = self.inner.write().unwrap();
                            Some(inner.recv_ops.listen())
                        };
                    }
                    Some(_) => {
                        // Go back to the outer loop to poll the listener.
                        break;
                    }
                }
            }
        }
    }
}
1337
1338impl<T: Clone> futures_core::stream::FusedStream for Receiver<T> {
1339 fn is_terminated(&self) -> bool {
1340 let inner: RwLockReadGuard<'_, Inner<…>> = self.inner.read().unwrap();
1341
1342 inner.is_closed && inner.queue.is_empty()
1343 }
1344}
1345
/// An error returned from [`Sender::broadcast()`].
///
/// Received because the channel is closed or no active receivers were present while `await-active`
/// was set to `false` (See [`Sender::set_await_active`] for details).
///
/// The wrapped value is the message that could not be sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        let SendError(msg) = self;
        msg
    }
}

impl<T> error::Error for SendError<T> {}

impl<T> fmt::Debug for SendError<T> {
    /// Formats the error without printing the payload, so `T` does not need to be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed, payload-independent description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("sending into a closed channel")
    }
}
1373
/// An error returned from [`Sender::try_broadcast()`].
///
/// Every variant carries the message that could not be sent.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
    /// The channel is full but not closed.
    Full(T),

    /// The channel is closed.
    Closed(T),

    /// There are currently no active receivers, only inactive ones.
    Inactive(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and hands back the message that couldn't be sent.
    pub fn into_inner(self) -> T {
        match self {
            TrySendError::Full(msg) | TrySendError::Closed(msg) | TrySendError::Inactive(msg) => {
                msg
            }
        }
    }

    /// Returns `true` if the channel is full but not closed.
    pub fn is_full(&self) -> bool {
        matches!(self, TrySendError::Full(_))
    }

    /// Returns `true` if the channel is closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TrySendError::Closed(_))
    }

    /// Returns `true` if there are currently no active receivers, only inactive ones.
    pub fn is_disconnected(&self) -> bool {
        matches!(self, TrySendError::Inactive(_))
    }
}

impl<T> error::Error for TrySendError<T> {}

impl<T> fmt::Debug for TrySendError<T> {
    /// Prints only the variant name; the payload is elided so `T` need not be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            TrySendError::Full(_) => "Full(..)",
            TrySendError::Closed(_) => "Closed(..)",
            TrySendError::Inactive(_) => "Inactive(..)",
        };

        f.write_str(label)
    }
}

impl<T> fmt::Display for TrySendError<T> {
    /// Writes a fixed, human-readable description for each variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            TrySendError::Full(_) => "sending into a full channel",
            TrySendError::Closed(_) => "sending into a closed channel",
            TrySendError::Inactive(_) => "sending into the void (no active receivers)",
        };

        f.write_str(description)
    }
}
1443
/// An error returned from [`Receiver::recv()`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum RecvError {
    /// The channel has overflowed since the last element was seen. Future recv operations will
    /// succeed, but some messages have been skipped.
    ///
    /// Contains the number of messages missed.
    Overflowed(u64),

    /// The channel is empty and closed.
    Closed,
}

impl error::Error for RecvError {}

impl fmt::Display for RecvError {
    /// Writes a human-readable description; for `Overflowed` it includes the carried count.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Overflowed(n) => write!(f, "receiving skipped {} messages", n),
            Self::Closed => write!(f, "receiving from an empty and closed channel"),
        }
    }
}
1467
/// An error returned from [`Receiver::try_recv()`].
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum TryRecvError {
    /// The channel has overflowed since the last element was seen. Future recv operations will
    /// succeed, but some messages have been skipped.
    ///
    /// Contains the number of messages missed.
    Overflowed(u64),

    /// The channel is empty but not closed.
    Empty,

    /// The channel is empty and closed.
    Closed,
}

impl TryRecvError {
    /// Returns `true` if the channel is empty but not closed.
    pub fn is_empty(&self) -> bool {
        matches!(self, TryRecvError::Empty)
    }

    /// Returns `true` if the channel is empty and closed.
    pub fn is_closed(&self) -> bool {
        matches!(self, TryRecvError::Closed)
    }

    /// Returns `true` if this error indicates the receiver missed messages.
    pub fn is_overflowed(&self) -> bool {
        matches!(self, TryRecvError::Overflowed(_))
    }
}

impl error::Error for TryRecvError {}

impl fmt::Display for TryRecvError {
    /// Writes a human-readable description; for `Overflowed` it includes the carried count.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TryRecvError::Empty => write!(f, "receiving from an empty channel"),
            TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
            TryRecvError::Overflowed(n) => {
                write!(f, "receiving operation observed {} lost messages", n)
            }
        }
    }
}
1524
/// A future returned by [`Sender::broadcast()`].
///
/// Resolves to `Ok(..)` with whatever [`Sender::try_broadcast`] returned once the message was
/// accepted, or to a [`SendError`] carrying the message back when sending is not possible.
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Send<'a, T> {
    // The sender this future broadcasts through.
    sender: &'a Sender<T>,
    // Listener on the channel's `send_ops` event; set after a failed attempt, cleared once it
    // has fired.
    listener: Option<EventListener>,
    // The message still to be sent. Taken for each attempt and put back if the attempt has to
    // be retried.
    msg: Option<T>,
}

// Declared `Unpin` unconditionally, i.e. regardless of `T`.
impl<'a, T> Unpin for Send<'a, T> {}
1535
impl<'a, T: Clone> Future for Send<'a, T> {
    type Output = Result<Option<T>, SendError<T>>;

    /// Drives the send to completion.
    ///
    /// Each iteration tries `try_broadcast` with the stored message:
    /// - on success the result is returned (after possibly waking one more waiting sender);
    /// - on `Closed`, or on `Inactive` while `await_active` is `false`, the message is handed
    ///   back inside a [`SendError`];
    /// - on `Full`, or on `Inactive` while `await_active` is `true`, the message is kept and the
    ///   future waits on a `send_ops` listener before trying again.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = Pin::new(self);

        loop {
            // The message is always present here: it is restored below whenever we loop again.
            let msg = this.msg.take().unwrap();
            let inner = &this.sender.inner;

            // Attempt to send a message.
            match this.sender.try_broadcast(msg) {
                Ok(msg) => {
                    let inner = inner.write().unwrap();

                    if inner.queue.len() < inner.capacity {
                        // Not full still, so notify the next awaiting sender.
                        inner.send_ops.notify(1);
                    }

                    return Poll::Ready(Ok(msg));
                }
                Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
                // Keep the message for the retry.
                Err(TrySendError::Full(m)) => this.msg = Some(m),
                // No active receivers: only keep waiting when the channel is configured to.
                Err(TrySendError::Inactive(m)) if inner.read().unwrap().await_active => {
                    this.msg = Some(m)
                }
                Err(TrySendError::Inactive(m)) => return Poll::Ready(Err(SendError(m))),
            }

            // Sending failed - now start listening for notifications or wait for one.
            match &mut this.listener {
                None => {
                    // Start listening and then try sending again.
                    let inner = inner.write().unwrap();
                    this.listener = Some(inner.send_ops.listen());
                }
                Some(l) => {
                    // Wait for a notification.
                    ready!(Pin::new(l).poll(cx));
                    this.listener = None;
                }
            }
        }
    }
}
1582
/// A future returned by [`Receiver::recv()`].
///
/// Resolves to the received message, or to a [`RecvError`] when the channel is closed and empty
/// or when this receiver has missed messages.
#[derive(Debug)]
#[must_use = "futures do nothing unless .awaited"]
pub struct Recv<'a, T> {
    // The receiver this future receives on; borrowed mutably for the future's lifetime.
    receiver: &'a mut Receiver<T>,
    // Listener on the channel's `recv_ops` event; set after a failed attempt, cleared once it
    // has fired.
    listener: Option<EventListener>,
}

// Declared `Unpin` unconditionally, i.e. regardless of `T`.
impl<'a, T> Unpin for Recv<'a, T> {}
1592
1593impl<'a, T: Clone> Future for Recv<'a, T> {
1594 type Output = Result<T, RecvError>;
1595
1596 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1597 let mut this = Pin::new(self);
1598
1599 loop {
1600 // Attempt to receive a message.
1601 match this.receiver.try_recv() {
1602 Ok(msg) => return Poll::Ready(Ok(msg)),
1603 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError::Closed)),
1604 Err(TryRecvError::Overflowed(n)) => {
1605 return Poll::Ready(Err(RecvError::Overflowed(n)));
1606 }
1607 Err(TryRecvError::Empty) => {}
1608 }
1609
1610 // Receiving failed - now start listening for notifications or wait for one.
1611 match &mut this.listener {
1612 None => {
1613 // Start listening and then try receiving again.
1614 this.listener = {
1615 let inner = this.receiver.inner.write().unwrap();
1616 Some(inner.recv_ops.listen())
1617 };
1618 }
1619 Some(l) => {
1620 // Wait for a notification.
1621 ready!(Pin::new(l).poll(cx));
1622 this.listener = None;
1623 }
1624 }
1625 }
1626 }
1627}
1628
/// An inactive receiver.
///
/// An inactive receiver is a receiver that is unable to receive messages. It's only useful for
/// keeping a channel open even when no associated active receivers exist.
///
/// It can be turned back into an active [`Receiver`] with [`InactiveReceiver::activate`] or
/// [`InactiveReceiver::activate_cloned`].
#[derive(Debug)]
pub struct InactiveReceiver<T> {
    // Channel state shared with the other handles of this channel.
    inner: Arc<RwLock<Inner<T>>>,
}
1637
1638impl<T> InactiveReceiver<T> {
1639 /// Convert to an activate [`Receiver`].
1640 ///
1641 /// Consumes `self`. Use [`InactiveReceiver::activate_cloned`] if you want to keep `self`.
1642 ///
1643 /// # Examples
1644 ///
1645 /// ```
1646 /// use async_broadcast::{broadcast, TrySendError};
1647 ///
1648 /// let (s, r) = broadcast(1);
1649 /// let inactive = r.deactivate();
1650 /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1651 ///
1652 /// let mut r = inactive.activate();
1653 /// assert_eq!(s.try_broadcast(10), Ok(None));
1654 /// assert_eq!(r.try_recv(), Ok(10));
1655 /// ```
1656 pub fn activate(self) -> Receiver<T> {
1657 self.activate_cloned()
1658 }
1659
1660 /// Create an activate [`Receiver`] for the associated channel.
1661 ///
1662 /// # Examples
1663 ///
1664 /// ```
1665 /// use async_broadcast::{broadcast, TrySendError};
1666 ///
1667 /// let (s, r) = broadcast(1);
1668 /// let inactive = r.deactivate();
1669 /// assert_eq!(s.try_broadcast(10), Err(TrySendError::Inactive(10)));
1670 ///
1671 /// let mut r = inactive.activate_cloned();
1672 /// assert_eq!(s.try_broadcast(10), Ok(None));
1673 /// assert_eq!(r.try_recv(), Ok(10));
1674 /// ```
1675 pub fn activate_cloned(&self) -> Receiver<T> {
1676 let mut inner = self.inner.write().unwrap();
1677 inner.receiver_count += 1;
1678
1679 if inner.receiver_count == 1 {
1680 // Notify 1 awaiting senders that there is now a receiver. If there is still room in the
1681 // queue, the notified operation will notify another awaiting sender.
1682 inner.send_ops.notify(1);
1683 }
1684
1685 Receiver {
1686 inner: self.inner.clone(),
1687 pos: inner.head_pos + inner.queue.len() as u64,
1688 listener: None,
1689 }
1690 }
1691
1692 /// Returns the channel capacity.
1693 ///
1694 /// See [`Receiver::capacity`] documentation for examples.
1695 pub fn capacity(&self) -> usize {
1696 self.inner.read().unwrap().capacity
1697 }
1698
1699 /// Set the channel capacity.
1700 ///
1701 /// There are times when you need to change the channel's capacity after creating it. If the
1702 /// `new_cap` is less than the number of messages in the channel, the oldest messages will be
1703 /// dropped to shrink the channel.
1704 ///
1705 /// See [`Receiver::set_capacity`] documentation for examples.
1706 pub fn set_capacity(&mut self, new_cap: usize) {
1707 self.inner.write().unwrap().set_capacity(new_cap);
1708 }
1709
1710 /// If overflow mode is enabled on this channel.
1711 ///
1712 /// See [`Receiver::overflow`] documentation for examples.
1713 pub fn overflow(&self) -> bool {
1714 self.inner.read().unwrap().overflow
1715 }
1716
1717 /// Set overflow mode on the channel.
1718 ///
1719 /// When overflow mode is set, broadcasting to the channel will succeed even if the channel is
1720 /// full. It achieves that by removing the oldest message from the channel.
1721 ///
1722 /// See [`Receiver::set_overflow`] documentation for examples.
1723 pub fn set_overflow(&mut self, overflow: bool) {
1724 self.inner.write().unwrap().overflow = overflow;
1725 }
1726
1727 /// If sender will wait for active receivers.
1728 ///
1729 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1730 /// `true`.
1731 ///
1732 /// # Examples
1733 ///
1734 /// ```
1735 /// use async_broadcast::broadcast;
1736 ///
1737 /// let (_, r) = broadcast::<i32>(5);
1738 /// let r = r.deactivate();
1739 /// assert!(r.await_active());
1740 /// ```
1741 pub fn await_active(&self) -> bool {
1742 self.inner.read().unwrap().await_active
1743 }
1744
1745 /// Specify if sender will wait for active receivers.
1746 ///
1747 /// If set to `false`, [`Send`] will resolve immediately with a [`SendError`]. Defaults to
1748 /// `true`.
1749 ///
1750 /// # Examples
1751 ///
1752 /// ```
1753 /// # futures_lite::future::block_on(async {
1754 /// use async_broadcast::broadcast;
1755 ///
1756 /// let (s, r) = broadcast::<i32>(2);
1757 /// s.broadcast(1).await.unwrap();
1758 ///
1759 /// let mut r = r.deactivate();
1760 /// r.set_await_active(false);
1761 /// assert!(s.broadcast(2).await.is_err());
1762 /// # });
1763 /// ```
1764 pub fn set_await_active(&mut self, await_active: bool) {
1765 self.inner.write().unwrap().await_active = await_active;
1766 }
1767
1768 /// Closes the channel.
1769 ///
1770 /// Returns `true` if this call has closed the channel and it was not closed already.
1771 ///
1772 /// The remaining messages can still be received.
1773 ///
1774 /// See [`Receiver::close`] documentation for examples.
1775 pub fn close(&self) -> bool {
1776 self.inner.write().unwrap().close()
1777 }
1778
1779 /// Returns `true` if the channel is closed.
1780 ///
1781 /// See [`Receiver::is_closed`] documentation for examples.
1782 pub fn is_closed(&self) -> bool {
1783 self.inner.read().unwrap().is_closed
1784 }
1785
1786 /// Returns `true` if the channel is empty.
1787 ///
1788 /// See [`Receiver::is_empty`] documentation for examples.
1789 pub fn is_empty(&self) -> bool {
1790 self.inner.read().unwrap().queue.is_empty()
1791 }
1792
1793 /// Returns `true` if the channel is full.
1794 ///
1795 /// See [`Receiver::is_full`] documentation for examples.
1796 pub fn is_full(&self) -> bool {
1797 let inner = self.inner.read().unwrap();
1798
1799 inner.queue.len() == inner.capacity
1800 }
1801
1802 /// Returns the number of messages in the channel.
1803 ///
1804 /// See [`Receiver::len`] documentation for examples.
1805 pub fn len(&self) -> usize {
1806 self.inner.read().unwrap().queue.len()
1807 }
1808
1809 /// Returns the number of receivers for the channel.
1810 ///
1811 /// This does not include inactive receivers. Use [`InactiveReceiver::inactive_receiver_count`]
1812 /// if you're interested in that.
1813 ///
1814 /// # Examples
1815 ///
1816 /// ```
1817 /// use async_broadcast::broadcast;
1818 ///
1819 /// let (s, r) = broadcast::<()>(1);
1820 /// assert_eq!(s.receiver_count(), 1);
1821 /// let r = r.deactivate();
1822 /// assert_eq!(s.receiver_count(), 0);
1823 ///
1824 /// let r2 = r.activate_cloned();
1825 /// assert_eq!(r.receiver_count(), 1);
1826 /// assert_eq!(r.inactive_receiver_count(), 1);
1827 /// ```
1828 pub fn receiver_count(&self) -> usize {
1829 self.inner.read().unwrap().receiver_count
1830 }
1831
1832 /// Returns the number of inactive receivers for the channel.
1833 ///
1834 /// # Examples
1835 ///
1836 /// ```
1837 /// use async_broadcast::broadcast;
1838 ///
1839 /// let (s, r) = broadcast::<()>(1);
1840 /// assert_eq!(s.receiver_count(), 1);
1841 /// let r = r.deactivate();
1842 /// assert_eq!(s.receiver_count(), 0);
1843 ///
1844 /// let r2 = r.activate_cloned();
1845 /// assert_eq!(r.receiver_count(), 1);
1846 /// assert_eq!(r.inactive_receiver_count(), 1);
1847 /// ```
1848 pub fn inactive_receiver_count(&self) -> usize {
1849 self.inner.read().unwrap().inactive_receiver_count
1850 }
1851
1852 /// Returns the number of senders for the channel.
1853 ///
1854 /// See [`Receiver::sender_count`] documentation for examples.
1855 pub fn sender_count(&self) -> usize {
1856 self.inner.read().unwrap().sender_count
1857 }
1858}
1859
1860impl<T> Clone for InactiveReceiver<T> {
1861 fn clone(&self) -> Self {
1862 self.inner.write().unwrap().inactive_receiver_count += 1;
1863
1864 InactiveReceiver {
1865 inner: self.inner.clone(),
1866 }
1867 }
1868}
1869
1870impl<T> Drop for InactiveReceiver<T> {
1871 fn drop(&mut self) {
1872 let mut inner: RwLockWriteGuard<'_, Inner<…>> = self.inner.write().unwrap();
1873
1874 inner.inactive_receiver_count -= 1;
1875 inner.close_channel();
1876 }
1877}
1878