1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7 use crate::sync::mpsc::error::SendTimeoutError;
8 use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`](channel) function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
22pub struct Sender<T> {
23 chan: chan::Tx<T, Semaphore>,
24}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// #Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// #[tokio::main]
44/// async fn main() {
45/// let (tx, _rx) = channel::<i32>(15);
46/// let tx_weak = tx.downgrade();
47///
48/// // Upgrading will succeed because `tx` still exists.
49/// assert!(tx_weak.upgrade().is_some());
50///
51/// // If we drop `tx`, then it will fail.
52/// drop(tx);
53/// assert!(tx_weak.clone().upgrade().is_none());
54/// }
55/// ```
56pub struct WeakSender<T> {
57 chan: Arc<chan::Chan<T, Semaphore>>,
58}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
67pub struct Permit<'a, T> {
68 chan: &'a chan::Tx<T, Semaphore>,
69}
70
71/// Owned permit to send one value into the channel.
72///
73/// This is identical to the [`Permit`] type, except that it moves the sender
74/// rather than borrowing it.
75///
76/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
77/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
78/// before generating a message to send.
79///
80/// [`Permit`]: Permit
81/// [`Sender::reserve_owned()`]: Sender::reserve_owned
82/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
83pub struct OwnedPermit<T> {
84 chan: Option<chan::Tx<T, Semaphore>>,
85}
86
87/// Receives values from the associated `Sender`.
88///
89/// Instances are created by the [`channel`](channel) function.
90///
91/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
92///
93/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
94pub struct Receiver<T> {
95 /// The channel receiver.
96 chan: chan::Rx<T, Semaphore>,
97}
98
99/// Creates a bounded mpsc channel for communicating between asynchronous tasks
100/// with backpressure.
101///
102/// The channel will buffer up to the provided number of messages. Once the
103/// buffer is full, attempts to send new messages will wait until a message is
104/// received from the channel. The provided buffer capacity must be at least 1.
105///
106/// All data sent on `Sender` will become available on `Receiver` in the same
107/// order as it was sent.
108///
109/// The `Sender` can be cloned to `send` to the same channel from multiple code
110/// locations. Only one `Receiver` is supported.
111///
112/// If the `Receiver` is disconnected while trying to `send`, the `send` method
113/// will return a `SendError`. Similarly, if `Sender` is disconnected while
114/// trying to `recv`, the `recv` method will return `None`.
115///
116/// # Panics
117///
118/// Panics if the buffer capacity is 0.
119///
120/// # Examples
121///
122/// ```rust
123/// use tokio::sync::mpsc;
124///
125/// #[tokio::main]
126/// async fn main() {
127/// let (tx, mut rx) = mpsc::channel(100);
128///
129/// tokio::spawn(async move {
130/// for i in 0..10 {
131/// if let Err(_) = tx.send(i).await {
132/// println!("receiver dropped");
133/// return;
134/// }
135/// }
136/// });
137///
138/// while let Some(i) = rx.recv().await {
139/// println!("got = {}", i);
140/// }
141/// }
142/// ```
143#[track_caller]
144pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
145 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
146 let semaphore: Semaphore = Semaphore {
147 semaphore: semaphore::Semaphore::new(permits:buffer),
148 bound: buffer,
149 };
150 let (tx: Tx, rx: Rx) = chan::channel(semaphore);
151
152 let tx: Sender = Sender::new(chan:tx);
153 let rx: Receiver = Receiver::new(chan:rx);
154
155 (tx, rx)
156}
157
158/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
159/// representing the channel bound.
160#[derive(Debug)]
161pub(crate) struct Semaphore {
162 pub(crate) semaphore: semaphore::Semaphore,
163 pub(crate) bound: usize,
164}
165
166impl<T> Receiver<T> {
167 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
168 Receiver { chan }
169 }
170
171 /// Receives the next value for this receiver.
172 ///
173 /// This method returns `None` if the channel has been closed and there are
174 /// no remaining messages in the channel's buffer. This indicates that no
175 /// further values can ever be received from this `Receiver`. The channel is
176 /// closed when all senders have been dropped, or when [`close`] is called.
177 ///
178 /// If there are no messages in the channel's buffer, but the channel has
179 /// not yet been closed, this method will sleep until a message is sent or
180 /// the channel is closed. Note that if [`close`] is called, but there are
181 /// still outstanding [`Permits`] from before it was closed, the channel is
182 /// not considered closed by `recv` until the permits are released.
183 ///
184 /// # Cancel safety
185 ///
186 /// This method is cancel safe. If `recv` is used as the event in a
187 /// [`tokio::select!`](crate::select) statement and some other branch
188 /// completes first, it is guaranteed that no messages were received on this
189 /// channel.
190 ///
191 /// [`close`]: Self::close
192 /// [`Permits`]: struct@crate::sync::mpsc::Permit
193 ///
194 /// # Examples
195 ///
196 /// ```
197 /// use tokio::sync::mpsc;
198 ///
199 /// #[tokio::main]
200 /// async fn main() {
201 /// let (tx, mut rx) = mpsc::channel(100);
202 ///
203 /// tokio::spawn(async move {
204 /// tx.send("hello").await.unwrap();
205 /// });
206 ///
207 /// assert_eq!(Some("hello"), rx.recv().await);
208 /// assert_eq!(None, rx.recv().await);
209 /// }
210 /// ```
211 ///
212 /// Values are buffered:
213 ///
214 /// ```
215 /// use tokio::sync::mpsc;
216 ///
217 /// #[tokio::main]
218 /// async fn main() {
219 /// let (tx, mut rx) = mpsc::channel(100);
220 ///
221 /// tx.send("hello").await.unwrap();
222 /// tx.send("world").await.unwrap();
223 ///
224 /// assert_eq!(Some("hello"), rx.recv().await);
225 /// assert_eq!(Some("world"), rx.recv().await);
226 /// }
227 /// ```
228 pub async fn recv(&mut self) -> Option<T> {
229 use crate::future::poll_fn;
230 poll_fn(|cx| self.chan.recv(cx)).await
231 }
232
233 /// Tries to receive the next value for this receiver.
234 ///
235 /// This method returns the [`Empty`] error if the channel is currently
236 /// empty, but there are still outstanding [senders] or [permits].
237 ///
238 /// This method returns the [`Disconnected`] error if the channel is
239 /// currently empty, and there are no outstanding [senders] or [permits].
240 ///
241 /// Unlike the [`poll_recv`] method, this method will never return an
242 /// [`Empty`] error spuriously.
243 ///
244 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
245 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
246 /// [`poll_recv`]: Self::poll_recv
247 /// [senders]: crate::sync::mpsc::Sender
248 /// [permits]: crate::sync::mpsc::Permit
249 ///
250 /// # Examples
251 ///
252 /// ```
253 /// use tokio::sync::mpsc;
254 /// use tokio::sync::mpsc::error::TryRecvError;
255 ///
256 /// #[tokio::main]
257 /// async fn main() {
258 /// let (tx, mut rx) = mpsc::channel(100);
259 ///
260 /// tx.send("hello").await.unwrap();
261 ///
262 /// assert_eq!(Ok("hello"), rx.try_recv());
263 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
264 ///
265 /// tx.send("hello").await.unwrap();
266 /// // Drop the last sender, closing the channel.
267 /// drop(tx);
268 ///
269 /// assert_eq!(Ok("hello"), rx.try_recv());
270 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
271 /// }
272 /// ```
273 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
274 self.chan.try_recv()
275 }
276
277 /// Blocking receive to call outside of asynchronous contexts.
278 ///
279 /// This method returns `None` if the channel has been closed and there are
280 /// no remaining messages in the channel's buffer. This indicates that no
281 /// further values can ever be received from this `Receiver`. The channel is
282 /// closed when all senders have been dropped, or when [`close`] is called.
283 ///
284 /// If there are no messages in the channel's buffer, but the channel has
285 /// not yet been closed, this method will block until a message is sent or
286 /// the channel is closed.
287 ///
288 /// This method is intended for use cases where you are sending from
289 /// asynchronous code to synchronous code, and will work even if the sender
290 /// is not using [`blocking_send`] to send the message.
291 ///
292 /// Note that if [`close`] is called, but there are still outstanding
293 /// [`Permits`] from before it was closed, the channel is not considered
294 /// closed by `blocking_recv` until the permits are released.
295 ///
296 /// [`close`]: Self::close
297 /// [`Permits`]: struct@crate::sync::mpsc::Permit
298 /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
299 ///
300 /// # Panics
301 ///
302 /// This function panics if called within an asynchronous execution
303 /// context.
304 ///
305 /// # Examples
306 ///
307 /// ```
308 /// use std::thread;
309 /// use tokio::runtime::Runtime;
310 /// use tokio::sync::mpsc;
311 ///
312 /// fn main() {
313 /// let (tx, mut rx) = mpsc::channel::<u8>(10);
314 ///
315 /// let sync_code = thread::spawn(move || {
316 /// assert_eq!(Some(10), rx.blocking_recv());
317 /// });
318 ///
319 /// Runtime::new()
320 /// .unwrap()
321 /// .block_on(async move {
322 /// let _ = tx.send(10).await;
323 /// });
324 /// sync_code.join().unwrap()
325 /// }
326 /// ```
327 #[track_caller]
328 #[cfg(feature = "sync")]
329 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
330 pub fn blocking_recv(&mut self) -> Option<T> {
331 crate::future::block_on(self.recv())
332 }
333
334 /// Closes the receiving half of a channel without dropping it.
335 ///
336 /// This prevents any further messages from being sent on the channel while
337 /// still enabling the receiver to drain messages that are buffered. Any
338 /// outstanding [`Permit`] values will still be able to send messages.
339 ///
340 /// To guarantee that no messages are dropped, after calling `close()`,
341 /// `recv()` must be called until `None` is returned. If there are
342 /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
343 /// not return `None` until those are released.
344 ///
345 /// [`Permit`]: Permit
346 /// [`OwnedPermit`]: OwnedPermit
347 ///
348 /// # Examples
349 ///
350 /// ```
351 /// use tokio::sync::mpsc;
352 ///
353 /// #[tokio::main]
354 /// async fn main() {
355 /// let (tx, mut rx) = mpsc::channel(20);
356 ///
357 /// tokio::spawn(async move {
358 /// let mut i = 0;
359 /// while let Ok(permit) = tx.reserve().await {
360 /// permit.send(i);
361 /// i += 1;
362 /// }
363 /// });
364 ///
365 /// rx.close();
366 ///
367 /// while let Some(msg) = rx.recv().await {
368 /// println!("got {}", msg);
369 /// }
370 ///
371 /// // Channel closed and no messages are lost.
372 /// }
373 /// ```
374 pub fn close(&mut self) {
375 self.chan.close();
376 }
377
378 /// Polls to receive the next message on this channel.
379 ///
380 /// This method returns:
381 ///
382 /// * `Poll::Pending` if no messages are available but the channel is not
383 /// closed, or if a spurious failure happens.
384 /// * `Poll::Ready(Some(message))` if a message is available.
385 /// * `Poll::Ready(None)` if the channel has been closed and all messages
386 /// sent before it was closed have been received.
387 ///
388 /// When the method returns `Poll::Pending`, the `Waker` in the provided
389 /// `Context` is scheduled to receive a wakeup when a message is sent on any
390 /// receiver, or when the channel is closed. Note that on multiple calls to
391 /// `poll_recv`, only the `Waker` from the `Context` passed to the most
392 /// recent call is scheduled to receive a wakeup.
393 ///
394 /// If this method returns `Poll::Pending` due to a spurious failure, then
395 /// the `Waker` will be notified when the situation causing the spurious
396 /// failure has been resolved. Note that receiving such a wakeup does not
397 /// guarantee that the next call will succeed — it could fail with another
398 /// spurious failure.
399 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
400 self.chan.recv(cx)
401 }
402}
403
404impl<T> fmt::Debug for Receiver<T> {
405 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
406 fmt&mut DebugStruct<'_, '_>.debug_struct("Receiver")
407 .field(name:"chan", &self.chan)
408 .finish()
409 }
410}
411
412impl<T> Unpin for Receiver<T> {}
413
414impl<T> Sender<T> {
415 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
416 Sender { chan }
417 }
418
419 /// Sends a value, waiting until there is capacity.
420 ///
421 /// A successful send occurs when it is determined that the other end of the
422 /// channel has not hung up already. An unsuccessful send would be one where
423 /// the corresponding receiver has already been closed. Note that a return
424 /// value of `Err` means that the data will never be received, but a return
425 /// value of `Ok` does not mean that the data will be received. It is
426 /// possible for the corresponding receiver to hang up immediately after
427 /// this function returns `Ok`.
428 ///
429 /// # Errors
430 ///
431 /// If the receive half of the channel is closed, either due to [`close`]
432 /// being called or the [`Receiver`] handle dropping, the function returns
433 /// an error. The error includes the value passed to `send`.
434 ///
435 /// [`close`]: Receiver::close
436 /// [`Receiver`]: Receiver
437 ///
438 /// # Cancel safety
439 ///
440 /// If `send` is used as the event in a [`tokio::select!`](crate::select)
441 /// statement and some other branch completes first, then it is guaranteed
442 /// that the message was not sent.
443 ///
444 /// This channel uses a queue to ensure that calls to `send` and `reserve`
445 /// complete in the order they were requested. Cancelling a call to
446 /// `send` makes you lose your place in the queue.
447 ///
448 /// # Examples
449 ///
450 /// In the following example, each call to `send` will block until the
451 /// previously sent value was received.
452 ///
453 /// ```rust
454 /// use tokio::sync::mpsc;
455 ///
456 /// #[tokio::main]
457 /// async fn main() {
458 /// let (tx, mut rx) = mpsc::channel(1);
459 ///
460 /// tokio::spawn(async move {
461 /// for i in 0..10 {
462 /// if let Err(_) = tx.send(i).await {
463 /// println!("receiver dropped");
464 /// return;
465 /// }
466 /// }
467 /// });
468 ///
469 /// while let Some(i) = rx.recv().await {
470 /// println!("got = {}", i);
471 /// }
472 /// }
473 /// ```
474 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
475 match self.reserve().await {
476 Ok(permit) => {
477 permit.send(value);
478 Ok(())
479 }
480 Err(_) => Err(SendError(value)),
481 }
482 }
483
484 /// Completes when the receiver has dropped.
485 ///
486 /// This allows the producers to get notified when interest in the produced
487 /// values is canceled and immediately stop doing work.
488 ///
489 /// # Cancel safety
490 ///
491 /// This method is cancel safe. Once the channel is closed, it stays closed
492 /// forever and all future calls to `closed` will return immediately.
493 ///
494 /// # Examples
495 ///
496 /// ```
497 /// use tokio::sync::mpsc;
498 ///
499 /// #[tokio::main]
500 /// async fn main() {
501 /// let (tx1, rx) = mpsc::channel::<()>(1);
502 /// let tx2 = tx1.clone();
503 /// let tx3 = tx1.clone();
504 /// let tx4 = tx1.clone();
505 /// let tx5 = tx1.clone();
506 /// tokio::spawn(async move {
507 /// drop(rx);
508 /// });
509 ///
510 /// futures::join!(
511 /// tx1.closed(),
512 /// tx2.closed(),
513 /// tx3.closed(),
514 /// tx4.closed(),
515 /// tx5.closed()
516 /// );
517 /// println!("Receiver dropped");
518 /// }
519 /// ```
520 pub async fn closed(&self) {
521 self.chan.closed().await
522 }
523
524 /// Attempts to immediately send a message on this `Sender`
525 ///
526 /// This method differs from [`send`] by returning immediately if the channel's
527 /// buffer is full or no receiver is waiting to acquire some data. Compared
528 /// with [`send`], this function has two failure cases instead of one (one for
529 /// disconnection, one for a full buffer).
530 ///
531 /// # Errors
532 ///
533 /// If the channel capacity has been reached, i.e., the channel has `n`
534 /// buffered values where `n` is the argument passed to [`channel`], then an
535 /// error is returned.
536 ///
537 /// If the receive half of the channel is closed, either due to [`close`]
538 /// being called or the [`Receiver`] handle dropping, the function returns
539 /// an error. The error includes the value passed to `send`.
540 ///
541 /// [`send`]: Sender::send
542 /// [`channel`]: channel
543 /// [`close`]: Receiver::close
544 ///
545 /// # Examples
546 ///
547 /// ```
548 /// use tokio::sync::mpsc;
549 ///
550 /// #[tokio::main]
551 /// async fn main() {
552 /// // Create a channel with buffer size 1
553 /// let (tx1, mut rx) = mpsc::channel(1);
554 /// let tx2 = tx1.clone();
555 ///
556 /// tokio::spawn(async move {
557 /// tx1.send(1).await.unwrap();
558 /// tx1.send(2).await.unwrap();
559 /// // task waits until the receiver receives a value.
560 /// });
561 ///
562 /// tokio::spawn(async move {
563 /// // This will return an error and send
564 /// // no message if the buffer is full
565 /// let _ = tx2.try_send(3);
566 /// });
567 ///
568 /// let mut msg;
569 /// msg = rx.recv().await.unwrap();
570 /// println!("message {} received", msg);
571 ///
572 /// msg = rx.recv().await.unwrap();
573 /// println!("message {} received", msg);
574 ///
575 /// // Third message may have never been sent
576 /// match rx.recv().await {
577 /// Some(msg) => println!("message {} received", msg),
578 /// None => println!("the third message was never sent"),
579 /// }
580 /// }
581 /// ```
582 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
583 match self.chan.semaphore().semaphore.try_acquire(1) {
584 Ok(_) => {}
585 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
586 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
587 }
588
589 // Send the message
590 self.chan.send(message);
591 Ok(())
592 }
593
/// Sends a value, waiting until there is capacity, but only for a limited time.
///
/// Shares the same success and error conditions as [`send`], adding one more
/// condition for an unsuccessful send, which is when the provided timeout has
/// elapsed, and there is no capacity available.
///
/// [`send`]: Sender::send
///
/// # Errors
///
/// If the receive half of the channel is closed, either due to [`close`]
/// being called or the [`Receiver`] having been dropped,
/// the function returns an error. The error includes the value passed to `send`.
///
/// [`close`]: Receiver::close
/// [`Receiver`]: Receiver
///
/// # Panics
///
/// This function panics if it is called outside the context of a Tokio
/// runtime [with time enabled](crate::runtime::Builder::enable_time).
///
/// # Examples
///
/// In the following example, each call to `send_timeout` will block until the
/// previously sent value was received, unless the timeout has elapsed.
///
/// ```rust
/// use tokio::sync::mpsc;
/// use tokio::time::{sleep, Duration};
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, mut rx) = mpsc::channel(1);
///
///     tokio::spawn(async move {
///         for i in 0..10 {
///             if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
///                 println!("send error: #{:?}", e);
///                 return;
///             }
///         }
///     });
///
///     while let Some(i) = rx.recv().await {
///         println!("got = {}", i);
///         sleep(Duration::from_millis(200)).await;
///     }
/// }
/// ```
#[cfg(feature = "time")]
#[cfg_attr(docsrs, doc(cfg(feature = "time")))]
pub async fn send_timeout(
    &self,
    value: T,
    timeout: Duration,
) -> Result<(), SendTimeoutError<T>> {
    // Only the reservation is subject to the timeout. The outer `Result`
    // reports the timer, the inner one reports the reservation itself.
    let permit = match crate::time::timeout(timeout, self.reserve()).await {
        Err(_) => {
            return Err(SendTimeoutError::Timeout(value));
        }
        Ok(Err(_)) => {
            return Err(SendTimeoutError::Closed(value));
        }
        Ok(Ok(permit)) => permit,
    };

    permit.send(value);
    Ok(())
}
664
/// Blocking send to call outside of asynchronous contexts.
///
/// This method is intended for use cases where you are sending from
/// synchronous code to asynchronous code, and will work even if the
/// receiver is not using [`blocking_recv`] to receive the message.
///
/// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
///
/// # Panics
///
/// This function panics if called within an asynchronous execution
/// context.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use tokio::runtime::Runtime;
/// use tokio::sync::mpsc;
///
/// fn main() {
///     let (tx, mut rx) = mpsc::channel::<u8>(1);
///
///     let sync_code = thread::spawn(move || {
///         tx.blocking_send(10).unwrap();
///     });
///
///     Runtime::new().unwrap().block_on(async move {
///         assert_eq!(Some(10), rx.recv().await);
///     });
///     sync_code.join().unwrap()
/// }
/// ```
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "send_blocking"))]
pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
    // Drive the async `send` to completion on the current thread.
    crate::future::block_on(self.send(value))
}
704
705 /// Checks if the channel has been closed. This happens when the
706 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
707 /// called.
708 ///
709 /// [`Receiver`]: crate::sync::mpsc::Receiver
710 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
711 ///
712 /// ```
713 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
714 /// assert!(!tx.is_closed());
715 ///
716 /// let tx2 = tx.clone();
717 /// assert!(!tx2.is_closed());
718 ///
719 /// drop(rx);
720 /// assert!(tx.is_closed());
721 /// assert!(tx2.is_closed());
722 /// ```
723 pub fn is_closed(&self) -> bool {
724 self.chan.is_closed()
725 }
726
727 /// Waits for channel capacity. Once capacity to send one message is
728 /// available, it is reserved for the caller.
729 ///
730 /// If the channel is full, the function waits for the number of unreceived
731 /// messages to become less than the channel capacity. Capacity to send one
732 /// message is reserved for the caller. A [`Permit`] is returned to track
733 /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
734 /// reserved capacity.
735 ///
736 /// Dropping [`Permit`] without sending a message releases the capacity back
737 /// to the channel.
738 ///
739 /// [`Permit`]: Permit
740 /// [`send`]: Permit::send
741 ///
742 /// # Cancel safety
743 ///
744 /// This channel uses a queue to ensure that calls to `send` and `reserve`
745 /// complete in the order they were requested. Cancelling a call to
746 /// `reserve` makes you lose your place in the queue.
747 ///
748 /// # Examples
749 ///
750 /// ```
751 /// use tokio::sync::mpsc;
752 ///
753 /// #[tokio::main]
754 /// async fn main() {
755 /// let (tx, mut rx) = mpsc::channel(1);
756 ///
757 /// // Reserve capacity
758 /// let permit = tx.reserve().await.unwrap();
759 ///
760 /// // Trying to send directly on the `tx` will fail due to no
761 /// // available capacity.
762 /// assert!(tx.try_send(123).is_err());
763 ///
764 /// // Sending on the permit succeeds
765 /// permit.send(456);
766 ///
767 /// // The value sent on the permit is received
768 /// assert_eq!(rx.recv().await.unwrap(), 456);
769 /// }
770 /// ```
771 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
772 self.reserve_inner().await?;
773 Ok(Permit { chan: &self.chan })
774 }
775
776 /// Waits for channel capacity, moving the `Sender` and returning an owned
777 /// permit. Once capacity to send one message is available, it is reserved
778 /// for the caller.
779 ///
780 /// This moves the sender _by value_, and returns an owned permit that can
781 /// be used to send a message into the channel. Unlike [`Sender::reserve`],
782 /// this method may be used in cases where the permit must be valid for the
783 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
784 /// essentially a reference count increment, comparable to [`Arc::clone`]),
785 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
786 /// moved, it can be cloned prior to calling `reserve_owned`.
787 ///
788 /// If the channel is full, the function waits for the number of unreceived
789 /// messages to become less than the channel capacity. Capacity to send one
790 /// message is reserved for the caller. An [`OwnedPermit`] is returned to
791 /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
792 /// consumes the reserved capacity.
793 ///
794 /// Dropping the [`OwnedPermit`] without sending a message releases the
795 /// capacity back to the channel.
796 ///
797 /// # Cancel safety
798 ///
799 /// This channel uses a queue to ensure that calls to `send` and `reserve`
800 /// complete in the order they were requested. Cancelling a call to
801 /// `reserve_owned` makes you lose your place in the queue.
802 ///
803 /// # Examples
804 /// Sending a message using an [`OwnedPermit`]:
805 /// ```
806 /// use tokio::sync::mpsc;
807 ///
808 /// #[tokio::main]
809 /// async fn main() {
810 /// let (tx, mut rx) = mpsc::channel(1);
811 ///
812 /// // Reserve capacity, moving the sender.
813 /// let permit = tx.reserve_owned().await.unwrap();
814 ///
815 /// // Send a message, consuming the permit and returning
816 /// // the moved sender.
817 /// let tx = permit.send(123);
818 ///
819 /// // The value sent on the permit is received.
820 /// assert_eq!(rx.recv().await.unwrap(), 123);
821 ///
822 /// // The sender can now be used again.
823 /// tx.send(456).await.unwrap();
824 /// }
825 /// ```
826 ///
827 /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
828 /// by value, it can be inexpensively cloned before calling `reserve_owned`:
829 ///
830 /// ```
831 /// use tokio::sync::mpsc;
832 ///
833 /// #[tokio::main]
834 /// async fn main() {
835 /// let (tx, mut rx) = mpsc::channel(1);
836 ///
837 /// // Clone the sender and reserve capacity.
838 /// let permit = tx.clone().reserve_owned().await.unwrap();
839 ///
840 /// // Trying to send directly on the `tx` will fail due to no
841 /// // available capacity.
842 /// assert!(tx.try_send(123).is_err());
843 ///
844 /// // Sending on the permit succeeds.
845 /// permit.send(456);
846 ///
847 /// // The value sent on the permit is received
848 /// assert_eq!(rx.recv().await.unwrap(), 456);
849 /// }
850 /// ```
851 ///
852 /// [`Sender::reserve`]: Sender::reserve
853 /// [`OwnedPermit`]: OwnedPermit
854 /// [`send`]: OwnedPermit::send
855 /// [`Arc::clone`]: std::sync::Arc::clone
856 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
857 self.reserve_inner().await?;
858 Ok(OwnedPermit {
859 chan: Some(self.chan),
860 })
861 }
862
863 async fn reserve_inner(&self) -> Result<(), SendError<()>> {
864 crate::trace::async_trace_leaf().await;
865
866 match self.chan.semaphore().semaphore.acquire(1).await {
867 Ok(_) => Ok(()),
868 Err(_) => Err(SendError(())),
869 }
870 }
871
872 /// Tries to acquire a slot in the channel without waiting for the slot to become
873 /// available.
874 ///
875 /// If the channel is full this function will return [`TrySendError`], otherwise
876 /// if there is a slot available it will return a [`Permit`] that will then allow you
877 /// to [`send`] on the channel with a guaranteed slot. This function is similar to
878 /// [`reserve`] except it does not await for the slot to become available.
879 ///
880 /// Dropping [`Permit`] without sending a message releases the capacity back
881 /// to the channel.
882 ///
883 /// [`Permit`]: Permit
884 /// [`send`]: Permit::send
885 /// [`reserve`]: Sender::reserve
886 ///
887 /// # Examples
888 ///
889 /// ```
890 /// use tokio::sync::mpsc;
891 ///
892 /// #[tokio::main]
893 /// async fn main() {
894 /// let (tx, mut rx) = mpsc::channel(1);
895 ///
896 /// // Reserve capacity
897 /// let permit = tx.try_reserve().unwrap();
898 ///
899 /// // Trying to send directly on the `tx` will fail due to no
900 /// // available capacity.
901 /// assert!(tx.try_send(123).is_err());
902 ///
903 /// // Trying to reserve an additional slot on the `tx` will
904 /// // fail because there is no capacity.
905 /// assert!(tx.try_reserve().is_err());
906 ///
907 /// // Sending on the permit succeeds
908 /// permit.send(456);
909 ///
910 /// // The value sent on the permit is received
911 /// assert_eq!(rx.recv().await.unwrap(), 456);
912 ///
913 /// }
914 /// ```
915 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
916 match self.chan.semaphore().semaphore.try_acquire(1) {
917 Ok(_) => {}
918 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
919 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
920 }
921
922 Ok(Permit { chan: &self.chan })
923 }
924
925 /// Tries to acquire a slot in the channel without waiting for the slot to become
926 /// available, returning an owned permit.
927 ///
928 /// This moves the sender _by value_, and returns an owned permit that can
929 /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
930 /// this method may be used in cases where the permit must be valid for the
931 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
932 /// essentially a reference count increment, comparable to [`Arc::clone`]),
933 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
934 /// moved, it can be cloned prior to calling `try_reserve_owned`.
935 ///
936 /// If the channel is full this function will return a [`TrySendError`].
937 /// Since the sender is taken by value, the `TrySendError` returned in this
938 /// case contains the sender, so that it may be used again. Otherwise, if
939 /// there is a slot available, this method will return an [`OwnedPermit`]
940 /// that can then be used to [`send`] on the channel with a guaranteed slot.
941 /// This function is similar to [`reserve_owned`] except it does not await
942 /// for the slot to become available.
943 ///
944 /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
945 /// to the channel.
946 ///
947 /// [`OwnedPermit`]: OwnedPermit
948 /// [`send`]: OwnedPermit::send
949 /// [`reserve_owned`]: Sender::reserve_owned
950 /// [`Arc::clone`]: std::sync::Arc::clone
951 ///
952 /// # Examples
953 ///
954 /// ```
955 /// use tokio::sync::mpsc;
956 ///
957 /// #[tokio::main]
958 /// async fn main() {
959 /// let (tx, mut rx) = mpsc::channel(1);
960 ///
961 /// // Reserve capacity
962 /// let permit = tx.clone().try_reserve_owned().unwrap();
963 ///
964 /// // Trying to send directly on the `tx` will fail due to no
965 /// // available capacity.
966 /// assert!(tx.try_send(123).is_err());
967 ///
968 /// // Trying to reserve an additional slot on the `tx` will
969 /// // fail because there is no capacity.
970 /// assert!(tx.try_reserve().is_err());
971 ///
972 /// // Sending on the permit succeeds
973 /// permit.send(456);
974 ///
975 /// // The value sent on the permit is received
976 /// assert_eq!(rx.recv().await.unwrap(), 456);
977 ///
978 /// }
979 /// ```
980 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
981 match self.chan.semaphore().semaphore.try_acquire(1) {
982 Ok(_) => {}
983 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
984 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
985 }
986
987 Ok(OwnedPermit {
988 chan: Some(self.chan),
989 })
990 }
991
992 /// Returns `true` if senders belong to the same channel.
993 ///
994 /// # Examples
995 ///
996 /// ```
997 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
998 /// let tx2 = tx.clone();
999 /// assert!(tx.same_channel(&tx2));
1000 ///
1001 /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1002 /// assert!(!tx3.same_channel(&tx2));
1003 /// ```
1004 pub fn same_channel(&self, other: &Self) -> bool {
1005 self.chan.same_channel(&other.chan)
1006 }
1007
1008 /// Returns the current capacity of the channel.
1009 ///
1010 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1011 /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
1012 /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
1013 /// specified when calling [`channel`]
1014 ///
1015 /// # Examples
1016 ///
1017 /// ```
1018 /// use tokio::sync::mpsc;
1019 ///
1020 /// #[tokio::main]
1021 /// async fn main() {
1022 /// let (tx, mut rx) = mpsc::channel::<()>(5);
1023 ///
1024 /// assert_eq!(tx.capacity(), 5);
1025 ///
1026 /// // Making a reservation drops the capacity by one.
1027 /// let permit = tx.reserve().await.unwrap();
1028 /// assert_eq!(tx.capacity(), 4);
1029 ///
1030 /// // Sending and receiving a value increases the capacity by one.
1031 /// permit.send(());
1032 /// rx.recv().await.unwrap();
1033 /// assert_eq!(tx.capacity(), 5);
1034 /// }
1035 /// ```
1036 ///
1037 /// [`send`]: Sender::send
1038 /// [`reserve`]: Sender::reserve
1039 /// [`channel`]: channel
1040 /// [`max_capacity`]: Sender::max_capacity
1041 pub fn capacity(&self) -> usize {
1042 self.chan.semaphore().semaphore.available_permits()
1043 }
1044
1045 /// Converts the `Sender` to a [`WeakSender`] that does not count
1046 /// towards RAII semantics, i.e. if all `Sender` instances of the
1047 /// channel were dropped and only `WeakSender` instances remain,
1048 /// the channel is closed.
1049 pub fn downgrade(&self) -> WeakSender<T> {
1050 WeakSender {
1051 chan: self.chan.downgrade(),
1052 }
1053 }
1054
1055 /// Returns the maximum buffer capacity of the channel.
1056 ///
1057 /// The maximum capacity is the buffer capacity initially specified when calling
1058 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1059 /// available buffer capacity: as messages are sent and received, the
1060 /// value returned by [`capacity`] will go up or down, whereas the value
1061 /// returned by `max_capacity` will remain constant.
1062 ///
1063 /// # Examples
1064 ///
1065 /// ```
1066 /// use tokio::sync::mpsc;
1067 ///
1068 /// #[tokio::main]
1069 /// async fn main() {
1070 /// let (tx, _rx) = mpsc::channel::<()>(5);
1071 ///
1072 /// // both max capacity and capacity are the same at first
1073 /// assert_eq!(tx.max_capacity(), 5);
1074 /// assert_eq!(tx.capacity(), 5);
1075 ///
1076 /// // Making a reservation doesn't change the max capacity.
1077 /// let permit = tx.reserve().await.unwrap();
1078 /// assert_eq!(tx.max_capacity(), 5);
1079 /// // but drops the capacity by one
1080 /// assert_eq!(tx.capacity(), 4);
1081 /// }
1082 /// ```
1083 ///
1084 /// [`channel`]: channel
1085 /// [`max_capacity`]: Sender::max_capacity
1086 /// [`capacity`]: Sender::capacity
1087 pub fn max_capacity(&self) -> usize {
1088 self.chan.semaphore().bound
1089 }
1090}
1091
1092impl<T> Clone for Sender<T> {
1093 fn clone(&self) -> Self {
1094 Sender {
1095 chan: self.chan.clone(),
1096 }
1097 }
1098}
1099
1100impl<T> fmt::Debug for Sender<T> {
1101 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1102 fmt&mut DebugStruct<'_, '_>.debug_struct("Sender")
1103 .field(name:"chan", &self.chan)
1104 .finish()
1105 }
1106}
1107
1108impl<T> Clone for WeakSender<T> {
1109 fn clone(&self) -> Self {
1110 WeakSender {
1111 chan: self.chan.clone(),
1112 }
1113 }
1114}
1115
1116impl<T> WeakSender<T> {
1117 /// Tries to convert a WeakSender into a [`Sender`]. This will return `Some`
1118 /// if there are other `Sender` instances alive and the channel wasn't
1119 /// previously dropped, otherwise `None` is returned.
1120 pub fn upgrade(&self) -> Option<Sender<T>> {
1121 chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1122 }
1123}
1124
1125impl<T> fmt::Debug for WeakSender<T> {
1126 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1127 fmt.debug_struct(name:"WeakSender").finish()
1128 }
1129}
1130
1131// ===== impl Permit =====
1132
1133impl<T> Permit<'_, T> {
1134 /// Sends a value using the reserved capacity.
1135 ///
1136 /// Capacity for the message has already been reserved. The message is sent
1137 /// to the receiver and the permit is consumed. The operation will succeed
1138 /// even if the receiver half has been closed. See [`Receiver::close`] for
1139 /// more details on performing a clean shutdown.
1140 ///
1141 /// [`Receiver::close`]: Receiver::close
1142 ///
1143 /// # Examples
1144 ///
1145 /// ```
1146 /// use tokio::sync::mpsc;
1147 ///
1148 /// #[tokio::main]
1149 /// async fn main() {
1150 /// let (tx, mut rx) = mpsc::channel(1);
1151 ///
1152 /// // Reserve capacity
1153 /// let permit = tx.reserve().await.unwrap();
1154 ///
1155 /// // Trying to send directly on the `tx` will fail due to no
1156 /// // available capacity.
1157 /// assert!(tx.try_send(123).is_err());
1158 ///
1159 /// // Send a message on the permit
1160 /// permit.send(456);
1161 ///
1162 /// // The value sent on the permit is received
1163 /// assert_eq!(rx.recv().await.unwrap(), 456);
1164 /// }
1165 /// ```
1166 pub fn send(self, value: T) {
1167 use std::mem;
1168
1169 self.chan.send(value);
1170
1171 // Avoid the drop logic
1172 mem::forget(self);
1173 }
1174}
1175
1176impl<T> Drop for Permit<'_, T> {
1177 fn drop(&mut self) {
1178 use chan::Semaphore;
1179
1180 let semaphore: &Semaphore = self.chan.semaphore();
1181
1182 // Add the permit back to the semaphore
1183 semaphore.add_permit();
1184
1185 // If this is the last sender for this channel, wake the receiver so
1186 // that it can be notified that the channel is closed.
1187 if semaphore.is_closed() && semaphore.is_idle() {
1188 self.chan.wake_rx();
1189 }
1190 }
1191}
1192
1193impl<T> fmt::Debug for Permit<'_, T> {
1194 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1195 fmt&mut DebugStruct<'_, '_>.debug_struct("Permit")
1196 .field(name:"chan", &self.chan)
1197 .finish()
1198 }
1199}
1200
// ===== impl OwnedPermit =====
1202
1203impl<T> OwnedPermit<T> {
1204 /// Sends a value using the reserved capacity.
1205 ///
1206 /// Capacity for the message has already been reserved. The message is sent
1207 /// to the receiver and the permit is consumed. The operation will succeed
1208 /// even if the receiver half has been closed. See [`Receiver::close`] for
1209 /// more details on performing a clean shutdown.
1210 ///
1211 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1212 /// the `OwnedPermit` was reserved.
1213 ///
1214 /// [`Receiver::close`]: Receiver::close
1215 ///
1216 /// # Examples
1217 ///
1218 /// ```
1219 /// use tokio::sync::mpsc;
1220 ///
1221 /// #[tokio::main]
1222 /// async fn main() {
1223 /// let (tx, mut rx) = mpsc::channel(1);
1224 ///
1225 /// // Reserve capacity
1226 /// let permit = tx.reserve_owned().await.unwrap();
1227 ///
1228 /// // Send a message on the permit, returning the sender.
1229 /// let tx = permit.send(456);
1230 ///
1231 /// // The value sent on the permit is received
1232 /// assert_eq!(rx.recv().await.unwrap(), 456);
1233 ///
1234 /// // We may now reuse `tx` to send another message.
1235 /// tx.send(789).await.unwrap();
1236 /// }
1237 /// ```
1238 pub fn send(mut self, value: T) -> Sender<T> {
1239 let chan = self.chan.take().unwrap_or_else(|| {
1240 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1241 });
1242 chan.send(value);
1243
1244 Sender { chan }
1245 }
1246
1247 /// Releases the reserved capacity *without* sending a message, returning the
1248 /// [`Sender`].
1249 ///
1250 /// # Examples
1251 ///
1252 /// ```
1253 /// use tokio::sync::mpsc;
1254 ///
1255 /// #[tokio::main]
1256 /// async fn main() {
1257 /// let (tx, rx) = mpsc::channel(1);
1258 ///
1259 /// // Clone the sender and reserve capacity
1260 /// let permit = tx.clone().reserve_owned().await.unwrap();
1261 ///
1262 /// // Trying to send on the original `tx` will fail, since the `permit`
1263 /// // has reserved all the available capacity.
1264 /// assert!(tx.try_send(123).is_err());
1265 ///
1266 /// // Release the permit without sending a message, returning the clone
1267 /// // of the sender.
1268 /// let tx2 = permit.release();
1269 ///
1270 /// // We may now reuse `tx` to send another message.
1271 /// tx.send(789).await.unwrap();
1272 /// # drop(rx); drop(tx2);
1273 /// }
1274 /// ```
1275 ///
1276 /// [`Sender`]: Sender
1277 pub fn release(mut self) -> Sender<T> {
1278 use chan::Semaphore;
1279
1280 let chan = self.chan.take().unwrap_or_else(|| {
1281 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1282 });
1283
1284 // Add the permit back to the semaphore
1285 chan.semaphore().add_permit();
1286 Sender { chan }
1287 }
1288}
1289
1290impl<T> Drop for OwnedPermit<T> {
1291 fn drop(&mut self) {
1292 use chan::Semaphore;
1293
1294 // Are we still holding onto the sender?
1295 if let Some(chan: Tx) = self.chan.take() {
1296 let semaphore: &Semaphore = chan.semaphore();
1297
1298 // Add the permit back to the semaphore
1299 semaphore.add_permit();
1300
1301 // If this `OwnedPermit` is holding the last sender for this
1302 // channel, wake the receiver so that it can be notified that the
1303 // channel is closed.
1304 if semaphore.is_closed() && semaphore.is_idle() {
1305 chan.wake_rx();
1306 }
1307 }
1308
1309 // Otherwise, do nothing.
1310 }
1311}
1312
1313impl<T> fmt::Debug for OwnedPermit<T> {
1314 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1315 fmt&mut DebugStruct<'_, '_>.debug_struct("OwnedPermit")
1316 .field(name:"chan", &self.chan)
1317 .finish()
1318 }
1319}
1320