1//! An async multi-producer multi-consumer channel, where each message can be received by only
2//! one of all existing consumers.
3//!
4//! There are two kinds of channels:
5//!
6//! 1. [Bounded][`bounded()`] channel with limited capacity.
7//! 2. [Unbounded][`unbounded()`] channel with unlimited capacity.
8//!
9//! A channel has the [`Sender`] and [`Receiver`] side. Both sides are cloneable and can be shared
10//! among multiple threads.
11//!
12//! When all [`Sender`]s or all [`Receiver`]s are dropped, the channel becomes closed. When a
13//! channel is closed, no more messages can be sent, but remaining messages can still be received.
14//!
15//! The channel can also be closed manually by calling [`Sender::close()`] or
16//! [`Receiver::close()`].
17//!
18//! # Examples
19//!
20//! ```
21//! # futures_lite::future::block_on(async {
22//! let (s, r) = async_channel::unbounded();
23//!
24//! assert_eq!(s.send("Hello").await, Ok(()));
25//! assert_eq!(r.recv().await, Ok("Hello"));
26//! # });
27//! ```
28
29#![cfg_attr(not(feature = "std"), no_std)]
30#![forbid(unsafe_code)]
31#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
32#![doc(
33 html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
34)]
35#![doc(
36 html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
37)]
38
39extern crate alloc;
40
41use core::fmt;
42use core::future::Future;
43use core::marker::PhantomPinned;
44use core::pin::Pin;
45use core::sync::atomic::{AtomicUsize, Ordering};
46use core::task::{Context, Poll};
47use core::usize;
48
49use alloc::sync::Arc;
50
51use concurrent_queue::{ConcurrentQueue, PopError, PushError};
52use event_listener::{Event, EventListener};
53use event_listener_strategy::{easy_wrapper, EventListenerFuture, Strategy};
54use futures_core::ready;
55use futures_core::stream::Stream;
56use pin_project_lite::pin_project;
57
58struct Channel<T> {
59 /// Inner message queue.
60 queue: ConcurrentQueue<T>,
61
62 /// Send operations waiting while the channel is full.
63 send_ops: Event,
64
65 /// Receive operations waiting while the channel is empty and not closed.
66 recv_ops: Event,
67
68 /// Stream operations while the channel is empty and not closed.
69 stream_ops: Event,
70
71 /// The number of currently active `Sender`s.
72 sender_count: AtomicUsize,
73
74 /// The number of currently active `Receivers`s.
75 receiver_count: AtomicUsize,
76}
77
78impl<T> Channel<T> {
79 /// Closes the channel and notifies all blocked operations.
80 ///
81 /// Returns `true` if this call has closed the channel and it was not closed already.
82 fn close(&self) -> bool {
83 if self.queue.close() {
84 // Notify all send operations.
85 self.send_ops.notify(usize::MAX);
86
87 // Notify all receive and stream operations.
88 self.recv_ops.notify(usize::MAX);
89 self.stream_ops.notify(usize::MAX);
90
91 true
92 } else {
93 false
94 }
95 }
96}
97
98/// Creates a bounded channel.
99///
100/// The created channel has space to hold at most `cap` messages at a time.
101///
102/// # Panics
103///
104/// Capacity must be a positive number. If `cap` is zero, this function will panic.
105///
106/// # Examples
107///
108/// ```
109/// # futures_lite::future::block_on(async {
110/// use async_channel::{bounded, TryRecvError, TrySendError};
111///
112/// let (s, r) = bounded(1);
113///
114/// assert_eq!(s.send(10).await, Ok(()));
115/// assert_eq!(s.try_send(20), Err(TrySendError::Full(20)));
116///
117/// assert_eq!(r.recv().await, Ok(10));
118/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
119/// # });
120/// ```
121pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
122 assert!(cap > 0, "capacity cannot be zero");
123
124 let channel: Arc> = Arc::new(data:Channel {
125 queue: ConcurrentQueue::bounded(cap),
126 send_ops: Event::new(),
127 recv_ops: Event::new(),
128 stream_ops: Event::new(),
129 sender_count: AtomicUsize::new(1),
130 receiver_count: AtomicUsize::new(1),
131 });
132
133 let s: Sender = Sender {
134 channel: channel.clone(),
135 };
136 let r: Receiver = Receiver {
137 listener: None,
138 channel,
139 _pin: PhantomPinned,
140 };
141 (s, r)
142}
143
144/// Creates an unbounded channel.
145///
146/// The created channel can hold an unlimited number of messages.
147///
148/// # Examples
149///
150/// ```
151/// # futures_lite::future::block_on(async {
152/// use async_channel::{unbounded, TryRecvError};
153///
154/// let (s, r) = unbounded();
155///
156/// assert_eq!(s.send(10).await, Ok(()));
157/// assert_eq!(s.send(20).await, Ok(()));
158///
159/// assert_eq!(r.recv().await, Ok(10));
160/// assert_eq!(r.recv().await, Ok(20));
161/// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
162/// # });
163/// ```
164pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
165 let channel: Arc> = Arc::new(data:Channel {
166 queue: ConcurrentQueue::unbounded(),
167 send_ops: Event::new(),
168 recv_ops: Event::new(),
169 stream_ops: Event::new(),
170 sender_count: AtomicUsize::new(1),
171 receiver_count: AtomicUsize::new(1),
172 });
173
174 let s: Sender = Sender {
175 channel: channel.clone(),
176 };
177 let r: Receiver = Receiver {
178 listener: None,
179 channel,
180 _pin: PhantomPinned,
181 };
182 (s, r)
183}
184
185/// The sending side of a channel.
186///
187/// Senders can be cloned and shared among threads. When all senders associated with a channel are
188/// dropped, the channel becomes closed.
189///
190/// The channel can also be closed manually by calling [`Sender::close()`].
191pub struct Sender<T> {
192 /// Inner channel state.
193 channel: Arc<Channel<T>>,
194}
195
196impl<T> Sender<T> {
197 /// Attempts to send a message into the channel.
198 ///
199 /// If the channel is full or closed, this method returns an error.
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use async_channel::{bounded, TrySendError};
205 ///
206 /// let (s, r) = bounded(1);
207 ///
208 /// assert_eq!(s.try_send(1), Ok(()));
209 /// assert_eq!(s.try_send(2), Err(TrySendError::Full(2)));
210 ///
211 /// drop(r);
212 /// assert_eq!(s.try_send(3), Err(TrySendError::Closed(3)));
213 /// ```
214 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
215 match self.channel.queue.push(msg) {
216 Ok(()) => {
217 // Notify a blocked receive operation. If the notified operation gets canceled,
218 // it will notify another blocked receive operation.
219 self.channel.recv_ops.notify_additional(1);
220
221 // Notify all blocked streams.
222 self.channel.stream_ops.notify(usize::MAX);
223
224 Ok(())
225 }
226 Err(PushError::Full(msg)) => Err(TrySendError::Full(msg)),
227 Err(PushError::Closed(msg)) => Err(TrySendError::Closed(msg)),
228 }
229 }
230
231 /// Sends a message into the channel.
232 ///
233 /// If the channel is full, this method waits until there is space for a message.
234 ///
235 /// If the channel is closed, this method returns an error.
236 ///
237 /// # Examples
238 ///
239 /// ```
240 /// # futures_lite::future::block_on(async {
241 /// use async_channel::{unbounded, SendError};
242 ///
243 /// let (s, r) = unbounded();
244 ///
245 /// assert_eq!(s.send(1).await, Ok(()));
246 /// drop(r);
247 /// assert_eq!(s.send(2).await, Err(SendError(2)));
248 /// # });
249 /// ```
250 pub fn send(&self, msg: T) -> Send<'_, T> {
251 Send::_new(SendInner {
252 sender: self,
253 msg: Some(msg),
254 listener: None,
255 _pin: PhantomPinned,
256 })
257 }
258
259 /// Sends a message into this channel using the blocking strategy.
260 ///
261 /// If the channel is full, this method will block until there is room.
262 /// If the channel is closed, this method returns an error.
263 ///
264 /// # Blocking
265 ///
266 /// Rather than using asynchronous waiting, like the [`send`](Self::send) method,
267 /// this method will block the current thread until the message is sent.
268 ///
269 /// This method should not be used in an asynchronous context. It is intended
270 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
271 /// Calling this method in an asynchronous context may result in deadlocks.
272 ///
273 /// # Examples
274 ///
275 /// ```
276 /// use async_channel::{unbounded, SendError};
277 ///
278 /// let (s, r) = unbounded();
279 ///
280 /// assert_eq!(s.send_blocking(1), Ok(()));
281 /// drop(r);
282 /// assert_eq!(s.send_blocking(2), Err(SendError(2)));
283 /// ```
284 #[cfg(all(feature = "std", not(target_family = "wasm")))]
285 pub fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
286 self.send(msg).wait()
287 }
288
289 /// Closes the channel.
290 ///
291 /// Returns `true` if this call has closed the channel and it was not closed already.
292 ///
293 /// The remaining messages can still be received.
294 ///
295 /// # Examples
296 ///
297 /// ```
298 /// # futures_lite::future::block_on(async {
299 /// use async_channel::{unbounded, RecvError};
300 ///
301 /// let (s, r) = unbounded();
302 /// assert_eq!(s.send(1).await, Ok(()));
303 /// assert!(s.close());
304 ///
305 /// assert_eq!(r.recv().await, Ok(1));
306 /// assert_eq!(r.recv().await, Err(RecvError));
307 /// # });
308 /// ```
309 pub fn close(&self) -> bool {
310 self.channel.close()
311 }
312
313 /// Returns `true` if the channel is closed.
314 ///
315 /// # Examples
316 ///
317 /// ```
318 /// # futures_lite::future::block_on(async {
319 /// use async_channel::{unbounded, RecvError};
320 ///
321 /// let (s, r) = unbounded::<()>();
322 /// assert!(!s.is_closed());
323 ///
324 /// drop(r);
325 /// assert!(s.is_closed());
326 /// # });
327 /// ```
328 pub fn is_closed(&self) -> bool {
329 self.channel.queue.is_closed()
330 }
331
332 /// Returns `true` if the channel is empty.
333 ///
334 /// # Examples
335 ///
336 /// ```
337 /// # futures_lite::future::block_on(async {
338 /// use async_channel::unbounded;
339 ///
340 /// let (s, r) = unbounded();
341 ///
342 /// assert!(s.is_empty());
343 /// s.send(1).await;
344 /// assert!(!s.is_empty());
345 /// # });
346 /// ```
347 pub fn is_empty(&self) -> bool {
348 self.channel.queue.is_empty()
349 }
350
351 /// Returns `true` if the channel is full.
352 ///
353 /// Unbounded channels are never full.
354 ///
355 /// # Examples
356 ///
357 /// ```
358 /// # futures_lite::future::block_on(async {
359 /// use async_channel::bounded;
360 ///
361 /// let (s, r) = bounded(1);
362 ///
363 /// assert!(!s.is_full());
364 /// s.send(1).await;
365 /// assert!(s.is_full());
366 /// # });
367 /// ```
368 pub fn is_full(&self) -> bool {
369 self.channel.queue.is_full()
370 }
371
372 /// Returns the number of messages in the channel.
373 ///
374 /// # Examples
375 ///
376 /// ```
377 /// # futures_lite::future::block_on(async {
378 /// use async_channel::unbounded;
379 ///
380 /// let (s, r) = unbounded();
381 /// assert_eq!(s.len(), 0);
382 ///
383 /// s.send(1).await;
384 /// s.send(2).await;
385 /// assert_eq!(s.len(), 2);
386 /// # });
387 /// ```
388 pub fn len(&self) -> usize {
389 self.channel.queue.len()
390 }
391
392 /// Returns the channel capacity if it's bounded.
393 ///
394 /// # Examples
395 ///
396 /// ```
397 /// use async_channel::{bounded, unbounded};
398 ///
399 /// let (s, r) = bounded::<i32>(5);
400 /// assert_eq!(s.capacity(), Some(5));
401 ///
402 /// let (s, r) = unbounded::<i32>();
403 /// assert_eq!(s.capacity(), None);
404 /// ```
405 pub fn capacity(&self) -> Option<usize> {
406 self.channel.queue.capacity()
407 }
408
409 /// Returns the number of receivers for the channel.
410 ///
411 /// # Examples
412 ///
413 /// ```
414 /// # futures_lite::future::block_on(async {
415 /// use async_channel::unbounded;
416 ///
417 /// let (s, r) = unbounded::<()>();
418 /// assert_eq!(s.receiver_count(), 1);
419 ///
420 /// let r2 = r.clone();
421 /// assert_eq!(s.receiver_count(), 2);
422 /// # });
423 /// ```
424 pub fn receiver_count(&self) -> usize {
425 self.channel.receiver_count.load(Ordering::SeqCst)
426 }
427
428 /// Returns the number of senders for the channel.
429 ///
430 /// # Examples
431 ///
432 /// ```
433 /// # futures_lite::future::block_on(async {
434 /// use async_channel::unbounded;
435 ///
436 /// let (s, r) = unbounded::<()>();
437 /// assert_eq!(s.sender_count(), 1);
438 ///
439 /// let s2 = s.clone();
440 /// assert_eq!(s.sender_count(), 2);
441 /// # });
442 /// ```
443 pub fn sender_count(&self) -> usize {
444 self.channel.sender_count.load(Ordering::SeqCst)
445 }
446
447 /// Downgrade the sender to a weak reference.
448 pub fn downgrade(&self) -> WeakSender<T> {
449 WeakSender {
450 channel: self.channel.clone(),
451 }
452 }
453}
454
455impl<T> Drop for Sender<T> {
456 fn drop(&mut self) {
457 // Decrement the sender count and close the channel if it drops down to zero.
458 if self.channel.sender_count.fetch_sub(val:1, order:Ordering::AcqRel) == 1 {
459 self.channel.close();
460 }
461 }
462}
463
464impl<T> fmt::Debug for Sender<T> {
465 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
466 write!(f, "Sender {{ .. }}")
467 }
468}
469
470impl<T> Clone for Sender<T> {
471 fn clone(&self) -> Sender<T> {
472 let count: usize = self.channel.sender_count.fetch_add(val:1, order:Ordering::Relaxed);
473
474 // Make sure the count never overflows, even if lots of sender clones are leaked.
475 if count > usize::MAX / 2 {
476 abort();
477 }
478
479 Sender {
480 channel: self.channel.clone(),
481 }
482 }
483}
484
485pin_project! {
486 /// The receiving side of a channel.
487 ///
488 /// Receivers can be cloned and shared among threads. When all receivers associated with a channel
489 /// are dropped, the channel becomes closed.
490 ///
491 /// The channel can also be closed manually by calling [`Receiver::close()`].
492 ///
493 /// Receivers implement the [`Stream`] trait.
494 pub struct Receiver<T> {
495 // Inner channel state.
496 channel: Arc<Channel<T>>,
497
498 // Listens for a send or close event to unblock this stream.
499 listener: Option<EventListener>,
500
501 // Keeping this type `!Unpin` enables future optimizations.
502 #[pin]
503 _pin: PhantomPinned
504 }
505
506 impl<T> PinnedDrop for Receiver<T> {
507 fn drop(this: Pin<&mut Self>) {
508 let this = this.project();
509
510 // Decrement the receiver count and close the channel if it drops down to zero.
511 if this.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
512 this.channel.close();
513 }
514 }
515 }
516}
517
518impl<T> Receiver<T> {
519 /// Attempts to receive a message from the channel.
520 ///
521 /// If the channel is empty, or empty and closed, this method returns an error.
522 ///
523 /// # Examples
524 ///
525 /// ```
526 /// # futures_lite::future::block_on(async {
527 /// use async_channel::{unbounded, TryRecvError};
528 ///
529 /// let (s, r) = unbounded();
530 /// assert_eq!(s.send(1).await, Ok(()));
531 ///
532 /// assert_eq!(r.try_recv(), Ok(1));
533 /// assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
534 ///
535 /// drop(s);
536 /// assert_eq!(r.try_recv(), Err(TryRecvError::Closed));
537 /// # });
538 /// ```
539 pub fn try_recv(&self) -> Result<T, TryRecvError> {
540 match self.channel.queue.pop() {
541 Ok(msg) => {
542 // Notify a blocked send operation. If the notified operation gets canceled, it
543 // will notify another blocked send operation.
544 self.channel.send_ops.notify_additional(1);
545
546 Ok(msg)
547 }
548 Err(PopError::Empty) => Err(TryRecvError::Empty),
549 Err(PopError::Closed) => Err(TryRecvError::Closed),
550 }
551 }
552
553 /// Receives a message from the channel.
554 ///
555 /// If the channel is empty, this method waits until there is a message.
556 ///
557 /// If the channel is closed, this method receives a message or returns an error if there are
558 /// no more messages.
559 ///
560 /// # Examples
561 ///
562 /// ```
563 /// # futures_lite::future::block_on(async {
564 /// use async_channel::{unbounded, RecvError};
565 ///
566 /// let (s, r) = unbounded();
567 ///
568 /// assert_eq!(s.send(1).await, Ok(()));
569 /// drop(s);
570 ///
571 /// assert_eq!(r.recv().await, Ok(1));
572 /// assert_eq!(r.recv().await, Err(RecvError));
573 /// # });
574 /// ```
575 pub fn recv(&self) -> Recv<'_, T> {
576 Recv::_new(RecvInner {
577 receiver: self,
578 listener: None,
579 _pin: PhantomPinned,
580 })
581 }
582
583 /// Receives a message from the channel using the blocking strategy.
584 ///
585 /// If the channel is empty, this method waits until there is a message.
586 /// If the channel is closed, this method receives a message or returns an error if there are
587 /// no more messages.
588 ///
589 /// # Blocking
590 ///
591 /// Rather than using asynchronous waiting, like the [`recv`](Self::recv) method,
592 /// this method will block the current thread until the message is sent.
593 ///
594 /// This method should not be used in an asynchronous context. It is intended
595 /// to be used such that a channel can be used in both asynchronous and synchronous contexts.
596 /// Calling this method in an asynchronous context may result in deadlocks.
597 ///
598 /// # Examples
599 ///
600 /// ```
601 /// use async_channel::{unbounded, RecvError};
602 ///
603 /// let (s, r) = unbounded();
604 ///
605 /// assert_eq!(s.send_blocking(1), Ok(()));
606 /// drop(s);
607 ///
608 /// assert_eq!(r.recv_blocking(), Ok(1));
609 /// assert_eq!(r.recv_blocking(), Err(RecvError));
610 /// ```
611 #[cfg(all(feature = "std", not(target_family = "wasm")))]
612 pub fn recv_blocking(&self) -> Result<T, RecvError> {
613 self.recv().wait()
614 }
615
616 /// Closes the channel.
617 ///
618 /// Returns `true` if this call has closed the channel and it was not closed already.
619 ///
620 /// The remaining messages can still be received.
621 ///
622 /// # Examples
623 ///
624 /// ```
625 /// # futures_lite::future::block_on(async {
626 /// use async_channel::{unbounded, RecvError};
627 ///
628 /// let (s, r) = unbounded();
629 /// assert_eq!(s.send(1).await, Ok(()));
630 ///
631 /// assert!(r.close());
632 /// assert_eq!(r.recv().await, Ok(1));
633 /// assert_eq!(r.recv().await, Err(RecvError));
634 /// # });
635 /// ```
636 pub fn close(&self) -> bool {
637 self.channel.close()
638 }
639
640 /// Returns `true` if the channel is closed.
641 ///
642 /// # Examples
643 ///
644 /// ```
645 /// # futures_lite::future::block_on(async {
646 /// use async_channel::{unbounded, RecvError};
647 ///
648 /// let (s, r) = unbounded::<()>();
649 /// assert!(!r.is_closed());
650 ///
651 /// drop(s);
652 /// assert!(r.is_closed());
653 /// # });
654 /// ```
655 pub fn is_closed(&self) -> bool {
656 self.channel.queue.is_closed()
657 }
658
659 /// Returns `true` if the channel is empty.
660 ///
661 /// # Examples
662 ///
663 /// ```
664 /// # futures_lite::future::block_on(async {
665 /// use async_channel::unbounded;
666 ///
667 /// let (s, r) = unbounded();
668 ///
669 /// assert!(s.is_empty());
670 /// s.send(1).await;
671 /// assert!(!s.is_empty());
672 /// # });
673 /// ```
674 pub fn is_empty(&self) -> bool {
675 self.channel.queue.is_empty()
676 }
677
678 /// Returns `true` if the channel is full.
679 ///
680 /// Unbounded channels are never full.
681 ///
682 /// # Examples
683 ///
684 /// ```
685 /// # futures_lite::future::block_on(async {
686 /// use async_channel::bounded;
687 ///
688 /// let (s, r) = bounded(1);
689 ///
690 /// assert!(!r.is_full());
691 /// s.send(1).await;
692 /// assert!(r.is_full());
693 /// # });
694 /// ```
695 pub fn is_full(&self) -> bool {
696 self.channel.queue.is_full()
697 }
698
699 /// Returns the number of messages in the channel.
700 ///
701 /// # Examples
702 ///
703 /// ```
704 /// # futures_lite::future::block_on(async {
705 /// use async_channel::unbounded;
706 ///
707 /// let (s, r) = unbounded();
708 /// assert_eq!(r.len(), 0);
709 ///
710 /// s.send(1).await;
711 /// s.send(2).await;
712 /// assert_eq!(r.len(), 2);
713 /// # });
714 /// ```
715 pub fn len(&self) -> usize {
716 self.channel.queue.len()
717 }
718
719 /// Returns the channel capacity if it's bounded.
720 ///
721 /// # Examples
722 ///
723 /// ```
724 /// use async_channel::{bounded, unbounded};
725 ///
726 /// let (s, r) = bounded::<i32>(5);
727 /// assert_eq!(r.capacity(), Some(5));
728 ///
729 /// let (s, r) = unbounded::<i32>();
730 /// assert_eq!(r.capacity(), None);
731 /// ```
732 pub fn capacity(&self) -> Option<usize> {
733 self.channel.queue.capacity()
734 }
735
736 /// Returns the number of receivers for the channel.
737 ///
738 /// # Examples
739 ///
740 /// ```
741 /// # futures_lite::future::block_on(async {
742 /// use async_channel::unbounded;
743 ///
744 /// let (s, r) = unbounded::<()>();
745 /// assert_eq!(r.receiver_count(), 1);
746 ///
747 /// let r2 = r.clone();
748 /// assert_eq!(r.receiver_count(), 2);
749 /// # });
750 /// ```
751 pub fn receiver_count(&self) -> usize {
752 self.channel.receiver_count.load(Ordering::SeqCst)
753 }
754
755 /// Returns the number of senders for the channel.
756 ///
757 /// # Examples
758 ///
759 /// ```
760 /// # futures_lite::future::block_on(async {
761 /// use async_channel::unbounded;
762 ///
763 /// let (s, r) = unbounded::<()>();
764 /// assert_eq!(r.sender_count(), 1);
765 ///
766 /// let s2 = s.clone();
767 /// assert_eq!(r.sender_count(), 2);
768 /// # });
769 /// ```
770 pub fn sender_count(&self) -> usize {
771 self.channel.sender_count.load(Ordering::SeqCst)
772 }
773
774 /// Downgrade the receiver to a weak reference.
775 pub fn downgrade(&self) -> WeakReceiver<T> {
776 WeakReceiver {
777 channel: self.channel.clone(),
778 }
779 }
780}
781
782impl<T> fmt::Debug for Receiver<T> {
783 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
784 write!(f, "Receiver {{ .. }}")
785 }
786}
787
788impl<T> Clone for Receiver<T> {
789 fn clone(&self) -> Receiver<T> {
790 let count: usize = self.channel.receiver_count.fetch_add(val:1, order:Ordering::Relaxed);
791
792 // Make sure the count never overflows, even if lots of receiver clones are leaked.
793 if count > usize::MAX / 2 {
794 abort();
795 }
796
797 Receiver {
798 channel: self.channel.clone(),
799 listener: None,
800 _pin: PhantomPinned,
801 }
802 }
803}
804
805impl<T> Stream for Receiver<T> {
806 type Item = T;
807
808 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
809 loop {
810 // If this stream is listening for events, first wait for a notification.
811 {
812 let this = self.as_mut().project();
813 if let Some(listener) = this.listener.as_mut() {
814 ready!(Pin::new(listener).poll(cx));
815 *this.listener = None;
816 }
817 }
818
819 loop {
820 // Attempt to receive a message.
821 match self.try_recv() {
822 Ok(msg) => {
823 // The stream is not blocked on an event - drop the listener.
824 let this = self.as_mut().project();
825 *this.listener = None;
826 return Poll::Ready(Some(msg));
827 }
828 Err(TryRecvError::Closed) => {
829 // The stream is not blocked on an event - drop the listener.
830 let this = self.as_mut().project();
831 *this.listener = None;
832 return Poll::Ready(None);
833 }
834 Err(TryRecvError::Empty) => {}
835 }
836
837 // Receiving failed - now start listening for notifications or wait for one.
838 let this = self.as_mut().project();
839 if this.listener.is_some() {
840 // Go back to the outer loop to wait for a notification.
841 break;
842 } else {
843 *this.listener = Some(this.channel.stream_ops.listen());
844 }
845 }
846 }
847 }
848}
849
850impl<T> futures_core::stream::FusedStream for Receiver<T> {
851 fn is_terminated(&self) -> bool {
852 self.channel.queue.is_closed() && self.channel.queue.is_empty()
853 }
854}
855
856/// A [`Sender`] that prevents the channel from not being closed.
857///
858/// This is created through the [`Sender::downgrade`] method. In order to use it, it needs
859/// to be upgraded into a [`Sender`] through the `upgrade` method.
860pub struct WeakSender<T> {
861 channel: Arc<Channel<T>>,
862}
863
864impl<T> WeakSender<T> {
865 /// Upgrade the [`WeakSender`] into a [`Sender`].
866 pub fn upgrade(&self) -> Option<Sender<T>> {
867 if self.channel.queue.is_closed() {
868 None
869 } else {
870 match self.channel.sender_count.fetch_update(
871 set_order:Ordering::Relaxed,
872 fetch_order:Ordering::Relaxed,
873 |count: usize| if count == 0 { None } else { Some(count + 1) },
874 ) {
875 Err(_) => None,
876 Ok(new_value: usize) if new_value > usize::MAX / 2 => {
877 // Make sure the count never overflows, even if lots of sender clones are leaked.
878 abort();
879 }
880 Ok(_) => Some(Sender {
881 channel: self.channel.clone(),
882 }),
883 }
884 }
885 }
886}
887
888impl<T> Clone for WeakSender<T> {
889 fn clone(&self) -> Self {
890 WeakSender {
891 channel: self.channel.clone(),
892 }
893 }
894}
895
896impl<T> fmt::Debug for WeakSender<T> {
897 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
898 write!(f, "WeakSender {{ .. }}")
899 }
900}
901
902/// A [`Receiver`] that prevents the channel from not being closed.
903///
904/// This is created through the [`Receiver::downgrade`] method. In order to use it, it needs
905/// to be upgraded into a [`Receiver`] through the `upgrade` method.
906pub struct WeakReceiver<T> {
907 channel: Arc<Channel<T>>,
908}
909
910impl<T> WeakReceiver<T> {
911 /// Upgrade the [`WeakReceiver`] into a [`Receiver`].
912 pub fn upgrade(&self) -> Option<Receiver<T>> {
913 if self.channel.queue.is_closed() {
914 None
915 } else {
916 match self.channel.receiver_count.fetch_update(
917 Ordering::Relaxed,
918 Ordering::Relaxed,
919 |count| if count == 0 { None } else { Some(count + 1) },
920 ) {
921 Err(_) => None,
922 Ok(new_value) if new_value > usize::MAX / 2 => {
923 // Make sure the count never overflows, even if lots of receiver clones are leaked.
924 abort();
925 }
926 Ok(_) => Some(Receiver {
927 channel: self.channel.clone(),
928 listener: None,
929 _pin: PhantomPinned,
930 }),
931 }
932 }
933 }
934}
935
936impl<T> Clone for WeakReceiver<T> {
937 fn clone(&self) -> Self {
938 WeakReceiver {
939 channel: self.channel.clone(),
940 }
941 }
942}
943
944impl<T> fmt::Debug for WeakReceiver<T> {
945 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
946 write!(f, "WeakReceiver {{ .. }}")
947 }
948}
949
950/// An error returned from [`Sender::send()`].
951///
952/// Received because the channel is closed.
953#[derive(PartialEq, Eq, Clone, Copy)]
954pub struct SendError<T>(pub T);
955
956impl<T> SendError<T> {
957 /// Unwraps the message that couldn't be sent.
958 pub fn into_inner(self) -> T {
959 self.0
960 }
961}
962
963#[cfg(feature = "std")]
964impl<T> std::error::Error for SendError<T> {}
965
966impl<T> fmt::Debug for SendError<T> {
967 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
968 write!(f, "SendError(..)")
969 }
970}
971
972impl<T> fmt::Display for SendError<T> {
973 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
974 write!(f, "sending into a closed channel")
975 }
976}
977
978/// An error returned from [`Sender::try_send()`].
979#[derive(PartialEq, Eq, Clone, Copy)]
980pub enum TrySendError<T> {
981 /// The channel is full but not closed.
982 Full(T),
983
984 /// The channel is closed.
985 Closed(T),
986}
987
988impl<T> TrySendError<T> {
989 /// Unwraps the message that couldn't be sent.
990 pub fn into_inner(self) -> T {
991 match self {
992 TrySendError::Full(t) => t,
993 TrySendError::Closed(t) => t,
994 }
995 }
996
997 /// Returns `true` if the channel is full but not closed.
998 pub fn is_full(&self) -> bool {
999 match self {
1000 TrySendError::Full(_) => true,
1001 TrySendError::Closed(_) => false,
1002 }
1003 }
1004
1005 /// Returns `true` if the channel is closed.
1006 pub fn is_closed(&self) -> bool {
1007 match self {
1008 TrySendError::Full(_) => false,
1009 TrySendError::Closed(_) => true,
1010 }
1011 }
1012}
1013
1014#[cfg(feature = "std")]
1015impl<T> std::error::Error for TrySendError<T> {}
1016
1017impl<T> fmt::Debug for TrySendError<T> {
1018 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1019 match *self {
1020 TrySendError::Full(..) => write!(f, "Full(..)"),
1021 TrySendError::Closed(..) => write!(f, "Closed(..)"),
1022 }
1023 }
1024}
1025
1026impl<T> fmt::Display for TrySendError<T> {
1027 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1028 match *self {
1029 TrySendError::Full(..) => write!(f, "sending into a full channel"),
1030 TrySendError::Closed(..) => write!(f, "sending into a closed channel"),
1031 }
1032 }
1033}
1034
1035/// An error returned from [`Receiver::recv()`].
1036///
1037/// Received because the channel is empty and closed.
1038#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1039pub struct RecvError;
1040
1041#[cfg(feature = "std")]
1042impl std::error::Error for RecvError {}
1043
1044impl fmt::Display for RecvError {
1045 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1046 write!(f, "receiving from an empty and closed channel")
1047 }
1048}
1049
1050/// An error returned from [`Receiver::try_recv()`].
1051#[derive(PartialEq, Eq, Clone, Copy, Debug)]
1052pub enum TryRecvError {
1053 /// The channel is empty but not closed.
1054 Empty,
1055
1056 /// The channel is empty and closed.
1057 Closed,
1058}
1059
1060impl TryRecvError {
1061 /// Returns `true` if the channel is empty but not closed.
1062 pub fn is_empty(&self) -> bool {
1063 match self {
1064 TryRecvError::Empty => true,
1065 TryRecvError::Closed => false,
1066 }
1067 }
1068
1069 /// Returns `true` if the channel is empty and closed.
1070 pub fn is_closed(&self) -> bool {
1071 match self {
1072 TryRecvError::Empty => false,
1073 TryRecvError::Closed => true,
1074 }
1075 }
1076}
1077
1078#[cfg(feature = "std")]
1079impl std::error::Error for TryRecvError {}
1080
1081impl fmt::Display for TryRecvError {
1082 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1083 match *self {
1084 TryRecvError::Empty => write!(f, "receiving from an empty channel"),
1085 TryRecvError::Closed => write!(f, "receiving from an empty and closed channel"),
1086 }
1087 }
1088}
1089
1090easy_wrapper! {
1091 /// A future returned by [`Sender::send()`].
1092 #[derive(Debug)]
1093 #[must_use = "futures do nothing unless you `.await` or poll them"]
1094 pub struct Send<'a, T>(SendInner<'a, T> => Result<(), SendError<T>>);
1095 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1096 pub(crate) wait();
1097}
1098
1099pin_project! {
1100 #[derive(Debug)]
1101 #[project(!Unpin)]
1102 struct SendInner<'a, T> {
1103 // Reference to the original sender.
1104 sender: &'a Sender<T>,
1105
1106 // The message to send.
1107 msg: Option<T>,
1108
1109 // Listener waiting on the channel.
1110 listener: Option<EventListener>,
1111
1112 // Keeping this type `!Unpin` enables future optimizations.
1113 #[pin]
1114 _pin: PhantomPinned
1115 }
1116}
1117
1118impl<'a, T> EventListenerFuture for SendInner<'a, T> {
1119 type Output = Result<(), SendError<T>>;
1120
1121 /// Run this future with the given `Strategy`.
1122 fn poll_with_strategy<'x, S: Strategy<'x>>(
1123 self: Pin<&mut Self>,
1124 strategy: &mut S,
1125 context: &mut S::Context,
1126 ) -> Poll<Result<(), SendError<T>>> {
1127 let this = self.project();
1128
1129 loop {
1130 let msg = this.msg.take().unwrap();
1131 // Attempt to send a message.
1132 match this.sender.try_send(msg) {
1133 Ok(()) => return Poll::Ready(Ok(())),
1134 Err(TrySendError::Closed(msg)) => return Poll::Ready(Err(SendError(msg))),
1135 Err(TrySendError::Full(m)) => *this.msg = Some(m),
1136 }
1137
1138 // Sending failed - now start listening for notifications or wait for one.
1139 if this.listener.is_some() {
1140 // Poll using the given strategy
1141 ready!(S::poll(strategy, &mut *this.listener, context));
1142 } else {
1143 *this.listener = Some(this.sender.channel.send_ops.listen());
1144 }
1145 }
1146 }
1147}
1148
1149easy_wrapper! {
1150 /// A future returned by [`Receiver::recv()`].
1151 #[derive(Debug)]
1152 #[must_use = "futures do nothing unless you `.await` or poll them"]
1153 pub struct Recv<'a, T>(RecvInner<'a, T> => Result<T, RecvError>);
1154 #[cfg(all(feature = "std", not(target_family = "wasm")))]
1155 pub(crate) wait();
1156}
1157
1158pin_project! {
1159 #[derive(Debug)]
1160 #[project(!Unpin)]
1161 struct RecvInner<'a, T> {
1162 // Reference to the receiver.
1163 receiver: &'a Receiver<T>,
1164
1165 // Listener waiting on the channel.
1166 listener: Option<EventListener>,
1167
1168 // Keeping this type `!Unpin` enables future optimizations.
1169 #[pin]
1170 _pin: PhantomPinned
1171 }
1172}
1173
1174impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
1175 type Output = Result<T, RecvError>;
1176
1177 /// Run this future with the given `Strategy`.
1178 fn poll_with_strategy<'x, S: Strategy<'x>>(
1179 self: Pin<&mut Self>,
1180 strategy: &mut S,
1181 cx: &mut S::Context,
1182 ) -> Poll<Result<T, RecvError>> {
1183 let this = self.project();
1184
1185 loop {
1186 // Attempt to receive a message.
1187 match this.receiver.try_recv() {
1188 Ok(msg) => return Poll::Ready(Ok(msg)),
1189 Err(TryRecvError::Closed) => return Poll::Ready(Err(RecvError)),
1190 Err(TryRecvError::Empty) => {}
1191 }
1192
1193 // Receiving failed - now start listening for notifications or wait for one.
1194 if this.listener.is_some() {
1195 // Poll using the given strategy
1196 ready!(S::poll(strategy, &mut *this.listener, cx));
1197 } else {
1198 *this.listener = Some(this.receiver.channel.recv_ops.listen());
1199 }
1200 }
1201 }
1202}
1203
1204#[cfg(feature = "std")]
1205use std::process::abort;
1206
1207#[cfg(not(feature = "std"))]
1208fn abort() -> ! {
1209 struct PanicOnDrop;
1210
1211 impl Drop for PanicOnDrop {
1212 fn drop(&mut self) {
1213 panic!("Panic while panicking to abort");
1214 }
1215 }
1216
1217 let _bomb = PanicOnDrop;
1218 panic!("Panic while panicking to abort")
1219}
1220