1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7 use crate::sync::mpsc::error::SendTimeoutError;
8 use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`] function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
22pub struct Sender<T> {
23 chan: chan::Tx<T, Semaphore>,
24}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// # Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// #[tokio::main]
44/// async fn main() {
45/// let (tx, _rx) = channel::<i32>(15);
46/// let tx_weak = tx.downgrade();
47///
48/// // Upgrading will succeed because `tx` still exists.
49/// assert!(tx_weak.upgrade().is_some());
50///
51/// // If we drop `tx`, then it will fail.
52/// drop(tx);
53/// assert!(tx_weak.clone().upgrade().is_none());
54/// }
55/// ```
56pub struct WeakSender<T> {
57 chan: Arc<chan::Chan<T, Semaphore>>,
58}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
67pub struct Permit<'a, T> {
68 chan: &'a chan::Tx<T, Semaphore>,
69}
70
71/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
72///
73/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
74/// and are used to guarantee channel capacity before generating `n` messages to send.
75///
76/// [`Sender::reserve_many()`]: Sender::reserve_many
77/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
78pub struct PermitIterator<'a, T> {
79 chan: &'a chan::Tx<T, Semaphore>,
80 n: usize,
81}
82
83/// Owned permit to send one value into the channel.
84///
85/// This is identical to the [`Permit`] type, except that it moves the sender
86/// rather than borrowing it.
87///
88/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
89/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
90/// before generating a message to send.
91///
92/// [`Permit`]: Permit
93/// [`Sender::reserve_owned()`]: Sender::reserve_owned
94/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
95pub struct OwnedPermit<T> {
96 chan: Option<chan::Tx<T, Semaphore>>,
97}
98
99/// Receives values from the associated `Sender`.
100///
101/// Instances are created by the [`channel`] function.
102///
103/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
104///
105/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
106pub struct Receiver<T> {
107 /// The channel receiver.
108 chan: chan::Rx<T, Semaphore>,
109}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages. Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0.
131///
132/// # Examples
133///
134/// ```rust
135/// use tokio::sync::mpsc;
136///
137/// #[tokio::main]
138/// async fn main() {
139/// let (tx, mut rx) = mpsc::channel(100);
140///
141/// tokio::spawn(async move {
142/// for i in 0..10 {
143/// if let Err(_) = tx.send(i).await {
144/// println!("receiver dropped");
145/// return;
146/// }
147/// }
148/// });
149///
150/// while let Some(i) = rx.recv().await {
151/// println!("got = {}", i);
152/// }
153/// }
154/// ```
155#[track_caller]
156pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
157 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
158 let semaphore: Semaphore = Semaphore {
159 semaphore: semaphore::Semaphore::new(permits:buffer),
160 bound: buffer,
161 };
162 let (tx: Tx, rx: Rx) = chan::channel(semaphore);
163
164 let tx: Sender = Sender::new(chan:tx);
165 let rx: Receiver = Receiver::new(chan:rx);
166
167 (tx, rx)
168}
169
170/// Channel semaphore is a tuple of the semaphore implementation and a `usize`
171/// representing the channel bound.
172#[derive(Debug)]
173pub(crate) struct Semaphore {
174 pub(crate) semaphore: semaphore::Semaphore,
175 pub(crate) bound: usize,
176}
177
178impl<T> Receiver<T> {
179 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
180 Receiver { chan }
181 }
182
183 /// Receives the next value for this receiver.
184 ///
185 /// This method returns `None` if the channel has been closed and there are
186 /// no remaining messages in the channel's buffer. This indicates that no
187 /// further values can ever be received from this `Receiver`. The channel is
188 /// closed when all senders have been dropped, or when [`close`] is called.
189 ///
190 /// If there are no messages in the channel's buffer, but the channel has
191 /// not yet been closed, this method will sleep until a message is sent or
192 /// the channel is closed. Note that if [`close`] is called, but there are
193 /// still outstanding [`Permits`] from before it was closed, the channel is
194 /// not considered closed by `recv` until the permits are released.
195 ///
196 /// # Cancel safety
197 ///
198 /// This method is cancel safe. If `recv` is used as the event in a
199 /// [`tokio::select!`](crate::select) statement and some other branch
200 /// completes first, it is guaranteed that no messages were received on this
201 /// channel.
202 ///
203 /// [`close`]: Self::close
204 /// [`Permits`]: struct@crate::sync::mpsc::Permit
205 ///
206 /// # Examples
207 ///
208 /// ```
209 /// use tokio::sync::mpsc;
210 ///
211 /// #[tokio::main]
212 /// async fn main() {
213 /// let (tx, mut rx) = mpsc::channel(100);
214 ///
215 /// tokio::spawn(async move {
216 /// tx.send("hello").await.unwrap();
217 /// });
218 ///
219 /// assert_eq!(Some("hello"), rx.recv().await);
220 /// assert_eq!(None, rx.recv().await);
221 /// }
222 /// ```
223 ///
224 /// Values are buffered:
225 ///
226 /// ```
227 /// use tokio::sync::mpsc;
228 ///
229 /// #[tokio::main]
230 /// async fn main() {
231 /// let (tx, mut rx) = mpsc::channel(100);
232 ///
233 /// tx.send("hello").await.unwrap();
234 /// tx.send("world").await.unwrap();
235 ///
236 /// assert_eq!(Some("hello"), rx.recv().await);
237 /// assert_eq!(Some("world"), rx.recv().await);
238 /// }
239 /// ```
240 pub async fn recv(&mut self) -> Option<T> {
241 use crate::future::poll_fn;
242 poll_fn(|cx| self.chan.recv(cx)).await
243 }
244
245 /// Receives the next values for this receiver and extends `buffer`.
246 ///
247 /// This method extends `buffer` by no more than a fixed number of values
248 /// as specified by `limit`. If `limit` is zero, the function immediately
249 /// returns `0`. The return value is the number of values added to `buffer`.
250 ///
251 /// For `limit > 0`, if there are no messages in the channel's queue, but
252 /// the channel has not yet been closed, this method will sleep until a
253 /// message is sent or the channel is closed. Note that if [`close`] is
254 /// called, but there are still outstanding [`Permits`] from before it was
255 /// closed, the channel is not considered closed by `recv_many` until the
256 /// permits are released.
257 ///
258 /// For non-zero values of `limit`, this method will never return `0` unless
259 /// the channel has been closed and there are no remaining messages in the
260 /// channel's queue. This indicates that no further values can ever be
261 /// received from this `Receiver`. The channel is closed when all senders
262 /// have been dropped, or when [`close`] is called.
263 ///
264 /// The capacity of `buffer` is increased as needed.
265 ///
266 /// # Cancel safety
267 ///
268 /// This method is cancel safe. If `recv_many` is used as the event in a
269 /// [`tokio::select!`](crate::select) statement and some other branch
270 /// completes first, it is guaranteed that no messages were received on this
271 /// channel.
272 ///
273 /// [`close`]: Self::close
274 /// [`Permits`]: struct@crate::sync::mpsc::Permit
275 ///
276 /// # Examples
277 ///
278 /// ```
279 /// use tokio::sync::mpsc;
280 ///
281 /// #[tokio::main]
282 /// async fn main() {
283 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
284 /// let limit = 2;
285 /// let (tx, mut rx) = mpsc::channel(100);
286 /// let tx2 = tx.clone();
287 /// tx2.send("first").await.unwrap();
288 /// tx2.send("second").await.unwrap();
289 /// tx2.send("third").await.unwrap();
290 ///
291 /// // Call `recv_many` to receive up to `limit` (2) values.
292 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
293 /// assert_eq!(vec!["first", "second"], buffer);
294 ///
295 /// // If the buffer is full, the next call to `recv_many`
296 /// // reserves additional capacity.
297 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
298 ///
299 /// tokio::spawn(async move {
300 /// tx.send("fourth").await.unwrap();
301 /// });
302 ///
303 /// // 'tx' is dropped, but `recv_many`
304 /// // is guaranteed not to return 0 as the channel
305 /// // is not yet closed.
306 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
307 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
308 ///
309 /// // Once the last sender is dropped, the channel is
310 /// // closed and `recv_many` returns 0, capacity unchanged.
311 /// drop(tx2);
312 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
313 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
314 /// }
315 /// ```
316 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
317 use crate::future::poll_fn;
318 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
319 }
320
321 /// Tries to receive the next value for this receiver.
322 ///
323 /// This method returns the [`Empty`] error if the channel is currently
324 /// empty, but there are still outstanding [senders] or [permits].
325 ///
326 /// This method returns the [`Disconnected`] error if the channel is
327 /// currently empty, and there are no outstanding [senders] or [permits].
328 ///
329 /// Unlike the [`poll_recv`] method, this method will never return an
330 /// [`Empty`] error spuriously.
331 ///
332 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
333 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
334 /// [`poll_recv`]: Self::poll_recv
335 /// [senders]: crate::sync::mpsc::Sender
336 /// [permits]: crate::sync::mpsc::Permit
337 ///
338 /// # Examples
339 ///
340 /// ```
341 /// use tokio::sync::mpsc;
342 /// use tokio::sync::mpsc::error::TryRecvError;
343 ///
344 /// #[tokio::main]
345 /// async fn main() {
346 /// let (tx, mut rx) = mpsc::channel(100);
347 ///
348 /// tx.send("hello").await.unwrap();
349 ///
350 /// assert_eq!(Ok("hello"), rx.try_recv());
351 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
352 ///
353 /// tx.send("hello").await.unwrap();
354 /// // Drop the last sender, closing the channel.
355 /// drop(tx);
356 ///
357 /// assert_eq!(Ok("hello"), rx.try_recv());
358 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
359 /// }
360 /// ```
361 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
362 self.chan.try_recv()
363 }
364
365 /// Blocking receive to call outside of asynchronous contexts.
366 ///
367 /// This method returns `None` if the channel has been closed and there are
368 /// no remaining messages in the channel's buffer. This indicates that no
369 /// further values can ever be received from this `Receiver`. The channel is
370 /// closed when all senders have been dropped, or when [`close`] is called.
371 ///
372 /// If there are no messages in the channel's buffer, but the channel has
373 /// not yet been closed, this method will block until a message is sent or
374 /// the channel is closed.
375 ///
376 /// This method is intended for use cases where you are sending from
377 /// asynchronous code to synchronous code, and will work even if the sender
378 /// is not using [`blocking_send`] to send the message.
379 ///
380 /// Note that if [`close`] is called, but there are still outstanding
381 /// [`Permits`] from before it was closed, the channel is not considered
382 /// closed by `blocking_recv` until the permits are released.
383 ///
384 /// [`close`]: Self::close
385 /// [`Permits`]: struct@crate::sync::mpsc::Permit
386 /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
387 ///
388 /// # Panics
389 ///
390 /// This function panics if called within an asynchronous execution
391 /// context.
392 ///
393 /// # Examples
394 ///
395 /// ```
396 /// use std::thread;
397 /// use tokio::runtime::Runtime;
398 /// use tokio::sync::mpsc;
399 ///
400 /// fn main() {
401 /// let (tx, mut rx) = mpsc::channel::<u8>(10);
402 ///
403 /// let sync_code = thread::spawn(move || {
404 /// assert_eq!(Some(10), rx.blocking_recv());
405 /// });
406 ///
407 /// Runtime::new()
408 /// .unwrap()
409 /// .block_on(async move {
410 /// let _ = tx.send(10).await;
411 /// });
412 /// sync_code.join().unwrap()
413 /// }
414 /// ```
415 #[track_caller]
416 #[cfg(feature = "sync")]
417 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
418 pub fn blocking_recv(&mut self) -> Option<T> {
419 crate::future::block_on(self.recv())
420 }
421
422 /// Closes the receiving half of a channel without dropping it.
423 ///
424 /// This prevents any further messages from being sent on the channel while
425 /// still enabling the receiver to drain messages that are buffered. Any
426 /// outstanding [`Permit`] values will still be able to send messages.
427 ///
428 /// To guarantee that no messages are dropped, after calling `close()`,
429 /// `recv()` must be called until `None` is returned. If there are
430 /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
431 /// not return `None` until those are released.
432 ///
433 /// [`Permit`]: Permit
434 /// [`OwnedPermit`]: OwnedPermit
435 ///
436 /// # Examples
437 ///
438 /// ```
439 /// use tokio::sync::mpsc;
440 ///
441 /// #[tokio::main]
442 /// async fn main() {
443 /// let (tx, mut rx) = mpsc::channel(20);
444 ///
445 /// tokio::spawn(async move {
446 /// let mut i = 0;
447 /// while let Ok(permit) = tx.reserve().await {
448 /// permit.send(i);
449 /// i += 1;
450 /// }
451 /// });
452 ///
453 /// rx.close();
454 ///
455 /// while let Some(msg) = rx.recv().await {
456 /// println!("got {}", msg);
457 /// }
458 ///
459 /// // Channel closed and no messages are lost.
460 /// }
461 /// ```
462 pub fn close(&mut self) {
463 self.chan.close();
464 }
465
466 /// Polls to receive the next message on this channel.
467 ///
468 /// This method returns:
469 ///
470 /// * `Poll::Pending` if no messages are available but the channel is not
471 /// closed, or if a spurious failure happens.
472 /// * `Poll::Ready(Some(message))` if a message is available.
473 /// * `Poll::Ready(None)` if the channel has been closed and all messages
474 /// sent before it was closed have been received.
475 ///
    /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent on any
    /// sender, or when the channel is closed. Note that on multiple calls to
    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
    /// passed to the most recent call is scheduled to receive a wakeup.
481 ///
482 /// If this method returns `Poll::Pending` due to a spurious failure, then
483 /// the `Waker` will be notified when the situation causing the spurious
484 /// failure has been resolved. Note that receiving such a wakeup does not
485 /// guarantee that the next call will succeed — it could fail with another
486 /// spurious failure.
487 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
488 self.chan.recv(cx)
489 }
490
491 /// Polls to receive multiple messages on this channel, extending the provided buffer.
492 ///
493 /// This method returns:
494 /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
495 /// spurious failure happens.
496 /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
497 /// stored in `buffer`. This can be less than, or equal to, `limit`.
498 /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
499 ///
    /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent on any
    /// sender, or when the channel is closed. Note that on multiple calls to
    /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
    /// passed to the most recent call is scheduled to receive a wakeup.
505 ///
506 /// Note that this method does not guarantee that exactly `limit` messages
507 /// are received. Rather, if at least one message is available, it returns
508 /// as many messages as it can up to the given limit. This method returns
509 /// zero only if the channel is closed (or if `limit` is zero).
510 ///
511 /// # Examples
512 ///
513 /// ```
514 /// use std::task::{Context, Poll};
515 /// use std::pin::Pin;
516 /// use tokio::sync::mpsc;
517 /// use futures::Future;
518 ///
519 /// struct MyReceiverFuture<'a> {
520 /// receiver: mpsc::Receiver<i32>,
521 /// buffer: &'a mut Vec<i32>,
522 /// limit: usize,
523 /// }
524 ///
525 /// impl<'a> Future for MyReceiverFuture<'a> {
526 /// type Output = usize; // Number of messages received
527 ///
528 /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
529 /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
530 ///
531 /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
532 /// match receiver.poll_recv_many(cx, *buffer, *limit) {
533 /// Poll::Pending => Poll::Pending,
534 /// Poll::Ready(count) => Poll::Ready(count),
535 /// }
536 /// }
537 /// }
538 ///
539 /// #[tokio::main]
540 /// async fn main() {
541 /// let (tx, rx) = mpsc::channel(32);
542 /// let mut buffer = Vec::new();
543 ///
544 /// let my_receiver_future = MyReceiverFuture {
545 /// receiver: rx,
546 /// buffer: &mut buffer,
547 /// limit: 3,
548 /// };
549 ///
550 /// for i in 0..10 {
551 /// tx.send(i).await.unwrap();
552 /// }
553 ///
554 /// let count = my_receiver_future.await;
555 /// assert_eq!(count, 3);
556 /// assert_eq!(buffer, vec![0,1,2])
557 /// }
558 /// ```
559 pub fn poll_recv_many(
560 &mut self,
561 cx: &mut Context<'_>,
562 buffer: &mut Vec<T>,
563 limit: usize,
564 ) -> Poll<usize> {
565 self.chan.recv_many(cx, buffer, limit)
566 }
567}
568
569impl<T> fmt::Debug for Receiver<T> {
570 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
571 fmt&mut DebugStruct<'_, '_>.debug_struct("Receiver")
572 .field(name:"chan", &self.chan)
573 .finish()
574 }
575}
576
577impl<T> Unpin for Receiver<T> {}
578
579impl<T> Sender<T> {
580 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
581 Sender { chan }
582 }
583
584 /// Sends a value, waiting until there is capacity.
585 ///
586 /// A successful send occurs when it is determined that the other end of the
587 /// channel has not hung up already. An unsuccessful send would be one where
588 /// the corresponding receiver has already been closed. Note that a return
589 /// value of `Err` means that the data will never be received, but a return
590 /// value of `Ok` does not mean that the data will be received. It is
591 /// possible for the corresponding receiver to hang up immediately after
592 /// this function returns `Ok`.
593 ///
594 /// # Errors
595 ///
596 /// If the receive half of the channel is closed, either due to [`close`]
597 /// being called or the [`Receiver`] handle dropping, the function returns
598 /// an error. The error includes the value passed to `send`.
599 ///
600 /// [`close`]: Receiver::close
601 /// [`Receiver`]: Receiver
602 ///
603 /// # Cancel safety
604 ///
605 /// If `send` is used as the event in a [`tokio::select!`](crate::select)
606 /// statement and some other branch completes first, then it is guaranteed
607 /// that the message was not sent. **However, in that case, the message
608 /// is dropped and will be lost.**
609 ///
610 /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
611 /// capacity, then use the returned [`Permit`] to send the message.
612 ///
613 /// This channel uses a queue to ensure that calls to `send` and `reserve`
614 /// complete in the order they were requested. Cancelling a call to
615 /// `send` makes you lose your place in the queue.
616 ///
617 /// # Examples
618 ///
619 /// In the following example, each call to `send` will block until the
620 /// previously sent value was received.
621 ///
622 /// ```rust
623 /// use tokio::sync::mpsc;
624 ///
625 /// #[tokio::main]
626 /// async fn main() {
627 /// let (tx, mut rx) = mpsc::channel(1);
628 ///
629 /// tokio::spawn(async move {
630 /// for i in 0..10 {
631 /// if let Err(_) = tx.send(i).await {
632 /// println!("receiver dropped");
633 /// return;
634 /// }
635 /// }
636 /// });
637 ///
638 /// while let Some(i) = rx.recv().await {
639 /// println!("got = {}", i);
640 /// }
641 /// }
642 /// ```
643 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
644 match self.reserve().await {
645 Ok(permit) => {
646 permit.send(value);
647 Ok(())
648 }
649 Err(_) => Err(SendError(value)),
650 }
651 }
652
653 /// Completes when the receiver has dropped.
654 ///
655 /// This allows the producers to get notified when interest in the produced
656 /// values is canceled and immediately stop doing work.
657 ///
658 /// # Cancel safety
659 ///
660 /// This method is cancel safe. Once the channel is closed, it stays closed
661 /// forever and all future calls to `closed` will return immediately.
662 ///
663 /// # Examples
664 ///
665 /// ```
666 /// use tokio::sync::mpsc;
667 ///
668 /// #[tokio::main]
669 /// async fn main() {
670 /// let (tx1, rx) = mpsc::channel::<()>(1);
671 /// let tx2 = tx1.clone();
672 /// let tx3 = tx1.clone();
673 /// let tx4 = tx1.clone();
674 /// let tx5 = tx1.clone();
675 /// tokio::spawn(async move {
676 /// drop(rx);
677 /// });
678 ///
679 /// futures::join!(
680 /// tx1.closed(),
681 /// tx2.closed(),
682 /// tx3.closed(),
683 /// tx4.closed(),
684 /// tx5.closed()
685 /// );
686 /// println!("Receiver dropped");
687 /// }
688 /// ```
689 pub async fn closed(&self) {
690 self.chan.closed().await;
691 }
692
    /// Attempts to immediately send a message on this `Sender`.
694 ///
695 /// This method differs from [`send`] by returning immediately if the channel's
696 /// buffer is full or no receiver is waiting to acquire some data. Compared
697 /// with [`send`], this function has two failure cases instead of one (one for
698 /// disconnection, one for a full buffer).
699 ///
700 /// # Errors
701 ///
702 /// If the channel capacity has been reached, i.e., the channel has `n`
703 /// buffered values where `n` is the argument passed to [`channel`], then an
704 /// error is returned.
705 ///
706 /// If the receive half of the channel is closed, either due to [`close`]
707 /// being called or the [`Receiver`] handle dropping, the function returns
708 /// an error. The error includes the value passed to `send`.
709 ///
710 /// [`send`]: Sender::send
711 /// [`channel`]: channel
712 /// [`close`]: Receiver::close
713 ///
714 /// # Examples
715 ///
716 /// ```
717 /// use tokio::sync::mpsc;
718 ///
719 /// #[tokio::main]
720 /// async fn main() {
721 /// // Create a channel with buffer size 1
722 /// let (tx1, mut rx) = mpsc::channel(1);
723 /// let tx2 = tx1.clone();
724 ///
725 /// tokio::spawn(async move {
726 /// tx1.send(1).await.unwrap();
727 /// tx1.send(2).await.unwrap();
728 /// // task waits until the receiver receives a value.
729 /// });
730 ///
731 /// tokio::spawn(async move {
732 /// // This will return an error and send
733 /// // no message if the buffer is full
734 /// let _ = tx2.try_send(3);
735 /// });
736 ///
737 /// let mut msg;
738 /// msg = rx.recv().await.unwrap();
739 /// println!("message {} received", msg);
740 ///
741 /// msg = rx.recv().await.unwrap();
742 /// println!("message {} received", msg);
743 ///
744 /// // Third message may have never been sent
745 /// match rx.recv().await {
746 /// Some(msg) => println!("message {} received", msg),
747 /// None => println!("the third message was never sent"),
748 /// }
749 /// }
750 /// ```
751 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
752 match self.chan.semaphore().semaphore.try_acquire(1) {
753 Ok(()) => {}
754 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
755 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
756 }
757
758 // Send the message
759 self.chan.send(message);
760 Ok(())
761 }
762
763 /// Sends a value, waiting until there is capacity, but only for a limited time.
764 ///
765 /// Shares the same success and error conditions as [`send`], adding one more
766 /// condition for an unsuccessful send, which is when the provided timeout has
767 /// elapsed, and there is no capacity available.
768 ///
769 /// [`send`]: Sender::send
770 ///
771 /// # Errors
772 ///
773 /// If the receive half of the channel is closed, either due to [`close`]
774 /// being called or the [`Receiver`] having been dropped,
775 /// the function returns an error. The error includes the value passed to `send`.
776 ///
777 /// [`close`]: Receiver::close
778 /// [`Receiver`]: Receiver
779 ///
780 /// # Panics
781 ///
782 /// This function panics if it is called outside the context of a Tokio
783 /// runtime [with time enabled](crate::runtime::Builder::enable_time).
784 ///
785 /// # Examples
786 ///
787 /// In the following example, each call to `send_timeout` will block until the
788 /// previously sent value was received, unless the timeout has elapsed.
789 ///
790 /// ```rust
791 /// use tokio::sync::mpsc;
792 /// use tokio::time::{sleep, Duration};
793 ///
794 /// #[tokio::main]
795 /// async fn main() {
796 /// let (tx, mut rx) = mpsc::channel(1);
797 ///
798 /// tokio::spawn(async move {
799 /// for i in 0..10 {
800 /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
801 /// println!("send error: #{:?}", e);
802 /// return;
803 /// }
804 /// }
805 /// });
806 ///
807 /// while let Some(i) = rx.recv().await {
808 /// println!("got = {}", i);
809 /// sleep(Duration::from_millis(200)).await;
810 /// }
811 /// }
812 /// ```
813 #[cfg(feature = "time")]
814 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
815 pub async fn send_timeout(
816 &self,
817 value: T,
818 timeout: Duration,
819 ) -> Result<(), SendTimeoutError<T>> {
820 let permit = match crate::time::timeout(timeout, self.reserve()).await {
821 Err(_) => {
822 return Err(SendTimeoutError::Timeout(value));
823 }
824 Ok(Err(_)) => {
825 return Err(SendTimeoutError::Closed(value));
826 }
827 Ok(Ok(permit)) => permit,
828 };
829
830 permit.send(value);
831 Ok(())
832 }
833
834 /// Blocking send to call outside of asynchronous contexts.
835 ///
836 /// This method is intended for use cases where you are sending from
837 /// synchronous code to asynchronous code, and will work even if the
838 /// receiver is not using [`blocking_recv`] to receive the message.
839 ///
840 /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
841 ///
842 /// # Panics
843 ///
844 /// This function panics if called within an asynchronous execution
845 /// context.
846 ///
847 /// # Examples
848 ///
849 /// ```
850 /// use std::thread;
851 /// use tokio::runtime::Runtime;
852 /// use tokio::sync::mpsc;
853 ///
854 /// fn main() {
855 /// let (tx, mut rx) = mpsc::channel::<u8>(1);
856 ///
857 /// let sync_code = thread::spawn(move || {
858 /// tx.blocking_send(10).unwrap();
859 /// });
860 ///
861 /// Runtime::new().unwrap().block_on(async move {
862 /// assert_eq!(Some(10), rx.recv().await);
863 /// });
864 /// sync_code.join().unwrap()
865 /// }
866 /// ```
867 #[track_caller]
868 #[cfg(feature = "sync")]
869 #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
870 pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
871 crate::future::block_on(self.send(value))
872 }
873
874 /// Checks if the channel has been closed. This happens when the
875 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
876 /// called.
877 ///
878 /// [`Receiver`]: crate::sync::mpsc::Receiver
879 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
880 ///
881 /// ```
882 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
883 /// assert!(!tx.is_closed());
884 ///
885 /// let tx2 = tx.clone();
886 /// assert!(!tx2.is_closed());
887 ///
888 /// drop(rx);
889 /// assert!(tx.is_closed());
890 /// assert!(tx2.is_closed());
891 /// ```
892 pub fn is_closed(&self) -> bool {
893 self.chan.is_closed()
894 }
895
896 /// Waits for channel capacity. Once capacity to send one message is
897 /// available, it is reserved for the caller.
898 ///
899 /// If the channel is full, the function waits for the number of unreceived
900 /// messages to become less than the channel capacity. Capacity to send one
901 /// message is reserved for the caller. A [`Permit`] is returned to track
902 /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
903 /// reserved capacity.
904 ///
905 /// Dropping [`Permit`] without sending a message releases the capacity back
906 /// to the channel.
907 ///
908 /// [`Permit`]: Permit
909 /// [`send`]: Permit::send
910 ///
911 /// # Cancel safety
912 ///
913 /// This channel uses a queue to ensure that calls to `send` and `reserve`
914 /// complete in the order they were requested. Cancelling a call to
915 /// `reserve` makes you lose your place in the queue.
916 ///
917 /// # Examples
918 ///
919 /// ```
920 /// use tokio::sync::mpsc;
921 ///
922 /// #[tokio::main]
923 /// async fn main() {
924 /// let (tx, mut rx) = mpsc::channel(1);
925 ///
926 /// // Reserve capacity
927 /// let permit = tx.reserve().await.unwrap();
928 ///
929 /// // Trying to send directly on the `tx` will fail due to no
930 /// // available capacity.
931 /// assert!(tx.try_send(123).is_err());
932 ///
933 /// // Sending on the permit succeeds
934 /// permit.send(456);
935 ///
936 /// // The value sent on the permit is received
937 /// assert_eq!(rx.recv().await.unwrap(), 456);
938 /// }
939 /// ```
940 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
941 self.reserve_inner(1).await?;
942 Ok(Permit { chan: &self.chan })
943 }
944
945 /// Waits for channel capacity. Once capacity to send `n` messages is
946 /// available, it is reserved for the caller.
947 ///
948 /// If the channel is full or if there are fewer than `n` permits available, the function waits
949 /// for the number of unreceived messages to become `n` less than the channel capacity.
    /// Capacity to send `n` messages is then reserved for the caller.
951 ///
952 /// A [`PermitIterator`] is returned to track the reserved capacity.
953 /// You can call this [`Iterator`] until it is exhausted to
954 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
955 /// [`try_reserve_many`] except it awaits for the slots to become available.
956 ///
957 /// If the channel is closed, the function returns a [`SendError`].
958 ///
959 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
960 /// permits back to the channel.
961 ///
962 /// [`PermitIterator`]: PermitIterator
963 /// [`Permit`]: Permit
964 /// [`send`]: Permit::send
965 /// [`try_reserve_many`]: Sender::try_reserve_many
966 ///
967 /// # Cancel safety
968 ///
969 /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
970 /// complete in the order they were requested. Cancelling a call to
971 /// `reserve_many` makes you lose your place in the queue.
972 ///
973 /// # Examples
974 ///
975 /// ```
976 /// use tokio::sync::mpsc;
977 ///
978 /// #[tokio::main]
979 /// async fn main() {
980 /// let (tx, mut rx) = mpsc::channel(2);
981 ///
982 /// // Reserve capacity
983 /// let mut permit = tx.reserve_many(2).await.unwrap();
984 ///
985 /// // Trying to send directly on the `tx` will fail due to no
986 /// // available capacity.
987 /// assert!(tx.try_send(123).is_err());
988 ///
989 /// // Sending with the permit iterator succeeds
990 /// permit.next().unwrap().send(456);
991 /// permit.next().unwrap().send(457);
992 ///
993 /// // The iterator should now be exhausted
994 /// assert!(permit.next().is_none());
995 ///
996 /// // The value sent on the permit is received
997 /// assert_eq!(rx.recv().await.unwrap(), 456);
998 /// assert_eq!(rx.recv().await.unwrap(), 457);
999 /// }
1000 /// ```
1001 pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1002 self.reserve_inner(n).await?;
1003 Ok(PermitIterator {
1004 chan: &self.chan,
1005 n,
1006 })
1007 }
1008
1009 /// Waits for channel capacity, moving the `Sender` and returning an owned
1010 /// permit. Once capacity to send one message is available, it is reserved
1011 /// for the caller.
1012 ///
1013 /// This moves the sender _by value_, and returns an owned permit that can
1014 /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1015 /// this method may be used in cases where the permit must be valid for the
1016 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1017 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1018 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1019 /// moved, it can be cloned prior to calling `reserve_owned`.
1020 ///
1021 /// If the channel is full, the function waits for the number of unreceived
1022 /// messages to become less than the channel capacity. Capacity to send one
1023 /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1024 /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1025 /// consumes the reserved capacity.
1026 ///
1027 /// Dropping the [`OwnedPermit`] without sending a message releases the
1028 /// capacity back to the channel.
1029 ///
1030 /// # Cancel safety
1031 ///
1032 /// This channel uses a queue to ensure that calls to `send` and `reserve`
1033 /// complete in the order they were requested. Cancelling a call to
1034 /// `reserve_owned` makes you lose your place in the queue.
1035 ///
1036 /// # Examples
1037 /// Sending a message using an [`OwnedPermit`]:
1038 /// ```
1039 /// use tokio::sync::mpsc;
1040 ///
1041 /// #[tokio::main]
1042 /// async fn main() {
1043 /// let (tx, mut rx) = mpsc::channel(1);
1044 ///
1045 /// // Reserve capacity, moving the sender.
1046 /// let permit = tx.reserve_owned().await.unwrap();
1047 ///
1048 /// // Send a message, consuming the permit and returning
1049 /// // the moved sender.
1050 /// let tx = permit.send(123);
1051 ///
1052 /// // The value sent on the permit is received.
1053 /// assert_eq!(rx.recv().await.unwrap(), 123);
1054 ///
1055 /// // The sender can now be used again.
1056 /// tx.send(456).await.unwrap();
1057 /// }
1058 /// ```
1059 ///
1060 /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1061 /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1062 ///
1063 /// ```
1064 /// use tokio::sync::mpsc;
1065 ///
1066 /// #[tokio::main]
1067 /// async fn main() {
1068 /// let (tx, mut rx) = mpsc::channel(1);
1069 ///
1070 /// // Clone the sender and reserve capacity.
1071 /// let permit = tx.clone().reserve_owned().await.unwrap();
1072 ///
1073 /// // Trying to send directly on the `tx` will fail due to no
1074 /// // available capacity.
1075 /// assert!(tx.try_send(123).is_err());
1076 ///
1077 /// // Sending on the permit succeeds.
1078 /// permit.send(456);
1079 ///
1080 /// // The value sent on the permit is received
1081 /// assert_eq!(rx.recv().await.unwrap(), 456);
1082 /// }
1083 /// ```
1084 ///
1085 /// [`Sender::reserve`]: Sender::reserve
1086 /// [`OwnedPermit`]: OwnedPermit
1087 /// [`send`]: OwnedPermit::send
1088 /// [`Arc::clone`]: std::sync::Arc::clone
1089 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1090 self.reserve_inner(1).await?;
1091 Ok(OwnedPermit {
1092 chan: Some(self.chan),
1093 })
1094 }
1095
1096 async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1097 crate::trace::async_trace_leaf().await;
1098
1099 if n > self.max_capacity() {
1100 return Err(SendError(()));
1101 }
1102 match self.chan.semaphore().semaphore.acquire(n).await {
1103 Ok(()) => Ok(()),
1104 Err(_) => Err(SendError(())),
1105 }
1106 }
1107
1108 /// Tries to acquire a slot in the channel without waiting for the slot to become
1109 /// available.
1110 ///
1111 /// If the channel is full this function will return [`TrySendError`], otherwise
1112 /// if there is a slot available it will return a [`Permit`] that will then allow you
1113 /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1114 /// [`reserve`] except it does not await for the slot to become available.
1115 ///
1116 /// Dropping [`Permit`] without sending a message releases the capacity back
1117 /// to the channel.
1118 ///
1119 /// [`Permit`]: Permit
1120 /// [`send`]: Permit::send
1121 /// [`reserve`]: Sender::reserve
1122 ///
1123 /// # Examples
1124 ///
1125 /// ```
1126 /// use tokio::sync::mpsc;
1127 ///
1128 /// #[tokio::main]
1129 /// async fn main() {
1130 /// let (tx, mut rx) = mpsc::channel(1);
1131 ///
1132 /// // Reserve capacity
1133 /// let permit = tx.try_reserve().unwrap();
1134 ///
1135 /// // Trying to send directly on the `tx` will fail due to no
1136 /// // available capacity.
1137 /// assert!(tx.try_send(123).is_err());
1138 ///
1139 /// // Trying to reserve an additional slot on the `tx` will
1140 /// // fail because there is no capacity.
1141 /// assert!(tx.try_reserve().is_err());
1142 ///
1143 /// // Sending on the permit succeeds
1144 /// permit.send(456);
1145 ///
1146 /// // The value sent on the permit is received
1147 /// assert_eq!(rx.recv().await.unwrap(), 456);
1148 ///
1149 /// }
1150 /// ```
1151 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1152 match self.chan.semaphore().semaphore.try_acquire(1) {
1153 Ok(()) => {}
1154 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1155 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1156 }
1157
1158 Ok(Permit { chan: &self.chan })
1159 }
1160
1161 /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1162 /// available.
1163 ///
1164 /// A [`PermitIterator`] is returned to track the reserved capacity.
1165 /// You can call this [`Iterator`] until it is exhausted to
1166 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1167 /// [`reserve_many`] except it does not await for the slots to become available.
1168 ///
1169 /// If there are fewer than `n` permits available on the channel, then
1170 /// this function will return a [`TrySendError::Full`]. If the channel is closed
1171 /// this function will return a [`TrySendError::Closed`].
1172 ///
1173 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1174 /// permits back to the channel.
1175 ///
1176 /// [`PermitIterator`]: PermitIterator
1177 /// [`send`]: Permit::send
1178 /// [`reserve_many`]: Sender::reserve_many
1179 ///
1180 /// # Examples
1181 ///
1182 /// ```
1183 /// use tokio::sync::mpsc;
1184 ///
1185 /// #[tokio::main]
1186 /// async fn main() {
1187 /// let (tx, mut rx) = mpsc::channel(2);
1188 ///
1189 /// // Reserve capacity
1190 /// let mut permit = tx.try_reserve_many(2).unwrap();
1191 ///
1192 /// // Trying to send directly on the `tx` will fail due to no
1193 /// // available capacity.
1194 /// assert!(tx.try_send(123).is_err());
1195 ///
1196 /// // Trying to reserve an additional slot on the `tx` will
1197 /// // fail because there is no capacity.
1198 /// assert!(tx.try_reserve().is_err());
1199 ///
1200 /// // Sending with the permit iterator succeeds
1201 /// permit.next().unwrap().send(456);
1202 /// permit.next().unwrap().send(457);
1203 ///
1204 /// // The iterator should now be exhausted
1205 /// assert!(permit.next().is_none());
1206 ///
1207 /// // The value sent on the permit is received
1208 /// assert_eq!(rx.recv().await.unwrap(), 456);
1209 /// assert_eq!(rx.recv().await.unwrap(), 457);
1210 ///
1211 /// // Trying to call try_reserve_many with 0 will return an empty iterator
1212 /// let mut permit = tx.try_reserve_many(0).unwrap();
1213 /// assert!(permit.next().is_none());
1214 ///
1215 /// // Trying to call try_reserve_many with a number greater than the channel
1216 /// // capacity will return an error
1217 /// let permit = tx.try_reserve_many(3);
1218 /// assert!(permit.is_err());
1219 ///
1220 /// // Trying to call try_reserve_many on a closed channel will return an error
1221 /// drop(rx);
1222 /// let permit = tx.try_reserve_many(1);
1223 /// assert!(permit.is_err());
1224 ///
1225 /// let permit = tx.try_reserve_many(0);
1226 /// assert!(permit.is_err());
1227 /// }
1228 /// ```
1229 pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1230 if n > self.max_capacity() {
1231 return Err(TrySendError::Full(()));
1232 }
1233
1234 match self.chan.semaphore().semaphore.try_acquire(n) {
1235 Ok(()) => {}
1236 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1237 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1238 }
1239
1240 Ok(PermitIterator {
1241 chan: &self.chan,
1242 n,
1243 })
1244 }
1245
1246 /// Tries to acquire a slot in the channel without waiting for the slot to become
1247 /// available, returning an owned permit.
1248 ///
1249 /// This moves the sender _by value_, and returns an owned permit that can
1250 /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1251 /// this method may be used in cases where the permit must be valid for the
1252 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1253 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1254 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1255 /// moved, it can be cloned prior to calling `try_reserve_owned`.
1256 ///
1257 /// If the channel is full this function will return a [`TrySendError`].
1258 /// Since the sender is taken by value, the `TrySendError` returned in this
1259 /// case contains the sender, so that it may be used again. Otherwise, if
1260 /// there is a slot available, this method will return an [`OwnedPermit`]
1261 /// that can then be used to [`send`] on the channel with a guaranteed slot.
1262 /// This function is similar to [`reserve_owned`] except it does not await
1263 /// for the slot to become available.
1264 ///
1265 /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1266 /// to the channel.
1267 ///
1268 /// [`OwnedPermit`]: OwnedPermit
1269 /// [`send`]: OwnedPermit::send
1270 /// [`reserve_owned`]: Sender::reserve_owned
1271 /// [`Arc::clone`]: std::sync::Arc::clone
1272 ///
1273 /// # Examples
1274 ///
1275 /// ```
1276 /// use tokio::sync::mpsc;
1277 ///
1278 /// #[tokio::main]
1279 /// async fn main() {
1280 /// let (tx, mut rx) = mpsc::channel(1);
1281 ///
1282 /// // Reserve capacity
1283 /// let permit = tx.clone().try_reserve_owned().unwrap();
1284 ///
1285 /// // Trying to send directly on the `tx` will fail due to no
1286 /// // available capacity.
1287 /// assert!(tx.try_send(123).is_err());
1288 ///
1289 /// // Trying to reserve an additional slot on the `tx` will
1290 /// // fail because there is no capacity.
1291 /// assert!(tx.try_reserve().is_err());
1292 ///
1293 /// // Sending on the permit succeeds
1294 /// permit.send(456);
1295 ///
1296 /// // The value sent on the permit is received
1297 /// assert_eq!(rx.recv().await.unwrap(), 456);
1298 ///
1299 /// }
1300 /// ```
1301 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1302 match self.chan.semaphore().semaphore.try_acquire(1) {
1303 Ok(()) => {}
1304 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1305 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1306 }
1307
1308 Ok(OwnedPermit {
1309 chan: Some(self.chan),
1310 })
1311 }
1312
1313 /// Returns `true` if senders belong to the same channel.
1314 ///
1315 /// # Examples
1316 ///
1317 /// ```
1318 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1319 /// let tx2 = tx.clone();
1320 /// assert!(tx.same_channel(&tx2));
1321 ///
1322 /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1323 /// assert!(!tx3.same_channel(&tx2));
1324 /// ```
1325 pub fn same_channel(&self, other: &Self) -> bool {
1326 self.chan.same_channel(&other.chan)
1327 }
1328
1329 /// Returns the current capacity of the channel.
1330 ///
1331 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1332 /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
    /// This is distinct from [`max_capacity`], which always returns the buffer capacity
    /// initially specified when calling [`channel`].
1335 ///
1336 /// # Examples
1337 ///
1338 /// ```
1339 /// use tokio::sync::mpsc;
1340 ///
1341 /// #[tokio::main]
1342 /// async fn main() {
1343 /// let (tx, mut rx) = mpsc::channel::<()>(5);
1344 ///
1345 /// assert_eq!(tx.capacity(), 5);
1346 ///
1347 /// // Making a reservation drops the capacity by one.
1348 /// let permit = tx.reserve().await.unwrap();
1349 /// assert_eq!(tx.capacity(), 4);
1350 ///
1351 /// // Sending and receiving a value increases the capacity by one.
1352 /// permit.send(());
1353 /// rx.recv().await.unwrap();
1354 /// assert_eq!(tx.capacity(), 5);
1355 /// }
1356 /// ```
1357 ///
1358 /// [`send`]: Sender::send
1359 /// [`reserve`]: Sender::reserve
1360 /// [`channel`]: channel
1361 /// [`max_capacity`]: Sender::max_capacity
1362 pub fn capacity(&self) -> usize {
1363 self.chan.semaphore().semaphore.available_permits()
1364 }
1365
1366 /// Converts the `Sender` to a [`WeakSender`] that does not count
1367 /// towards RAII semantics, i.e. if all `Sender` instances of the
1368 /// channel were dropped and only `WeakSender` instances remain,
1369 /// the channel is closed.
1370 pub fn downgrade(&self) -> WeakSender<T> {
1371 WeakSender {
1372 chan: self.chan.downgrade(),
1373 }
1374 }
1375
1376 /// Returns the maximum buffer capacity of the channel.
1377 ///
1378 /// The maximum capacity is the buffer capacity initially specified when calling
1379 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1380 /// available buffer capacity: as messages are sent and received, the
1381 /// value returned by [`capacity`] will go up or down, whereas the value
1382 /// returned by `max_capacity` will remain constant.
1383 ///
1384 /// # Examples
1385 ///
1386 /// ```
1387 /// use tokio::sync::mpsc;
1388 ///
1389 /// #[tokio::main]
1390 /// async fn main() {
1391 /// let (tx, _rx) = mpsc::channel::<()>(5);
1392 ///
1393 /// // both max capacity and capacity are the same at first
1394 /// assert_eq!(tx.max_capacity(), 5);
1395 /// assert_eq!(tx.capacity(), 5);
1396 ///
1397 /// // Making a reservation doesn't change the max capacity.
1398 /// let permit = tx.reserve().await.unwrap();
1399 /// assert_eq!(tx.max_capacity(), 5);
1400 /// // but drops the capacity by one
1401 /// assert_eq!(tx.capacity(), 4);
1402 /// }
1403 /// ```
1404 ///
1405 /// [`channel`]: channel
1406 /// [`max_capacity`]: Sender::max_capacity
1407 /// [`capacity`]: Sender::capacity
1408 pub fn max_capacity(&self) -> usize {
1409 self.chan.semaphore().bound
1410 }
1411}
1412
1413impl<T> Clone for Sender<T> {
1414 fn clone(&self) -> Self {
1415 Sender {
1416 chan: self.chan.clone(),
1417 }
1418 }
1419}
1420
1421impl<T> fmt::Debug for Sender<T> {
1422 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1423 fmt&mut DebugStruct<'_, '_>.debug_struct("Sender")
1424 .field(name:"chan", &self.chan)
1425 .finish()
1426 }
1427}
1428
1429impl<T> Clone for WeakSender<T> {
1430 fn clone(&self) -> Self {
1431 WeakSender {
1432 chan: self.chan.clone(),
1433 }
1434 }
1435}
1436
1437impl<T> WeakSender<T> {
1438 /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1439 /// if there are other `Sender` instances alive and the channel wasn't
1440 /// previously dropped, otherwise `None` is returned.
1441 pub fn upgrade(&self) -> Option<Sender<T>> {
1442 chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1443 }
1444}
1445
1446impl<T> fmt::Debug for WeakSender<T> {
1447 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1448 fmt.debug_struct(name:"WeakSender").finish()
1449 }
1450}
1451
1452// ===== impl Permit =====
1453
1454impl<T> Permit<'_, T> {
1455 /// Sends a value using the reserved capacity.
1456 ///
1457 /// Capacity for the message has already been reserved. The message is sent
1458 /// to the receiver and the permit is consumed. The operation will succeed
1459 /// even if the receiver half has been closed. See [`Receiver::close`] for
1460 /// more details on performing a clean shutdown.
1461 ///
1462 /// [`Receiver::close`]: Receiver::close
1463 ///
1464 /// # Examples
1465 ///
1466 /// ```
1467 /// use tokio::sync::mpsc;
1468 ///
1469 /// #[tokio::main]
1470 /// async fn main() {
1471 /// let (tx, mut rx) = mpsc::channel(1);
1472 ///
1473 /// // Reserve capacity
1474 /// let permit = tx.reserve().await.unwrap();
1475 ///
1476 /// // Trying to send directly on the `tx` will fail due to no
1477 /// // available capacity.
1478 /// assert!(tx.try_send(123).is_err());
1479 ///
1480 /// // Send a message on the permit
1481 /// permit.send(456);
1482 ///
1483 /// // The value sent on the permit is received
1484 /// assert_eq!(rx.recv().await.unwrap(), 456);
1485 /// }
1486 /// ```
1487 pub fn send(self, value: T) {
1488 use std::mem;
1489
1490 self.chan.send(value);
1491
1492 // Avoid the drop logic
1493 mem::forget(self);
1494 }
1495}
1496
1497impl<T> Drop for Permit<'_, T> {
1498 fn drop(&mut self) {
1499 use chan::Semaphore;
1500
1501 let semaphore: &Semaphore = self.chan.semaphore();
1502
1503 // Add the permit back to the semaphore
1504 semaphore.add_permit();
1505
1506 // If this is the last sender for this channel, wake the receiver so
1507 // that it can be notified that the channel is closed.
1508 if semaphore.is_closed() && semaphore.is_idle() {
1509 self.chan.wake_rx();
1510 }
1511 }
1512}
1513
1514impl<T> fmt::Debug for Permit<'_, T> {
1515 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1516 fmt&mut DebugStruct<'_, '_>.debug_struct("Permit")
1517 .field(name:"chan", &self.chan)
1518 .finish()
1519 }
1520}
1521
1522// ===== impl PermitIterator =====
1523
1524impl<'a, T> Iterator for PermitIterator<'a, T> {
1525 type Item = Permit<'a, T>;
1526
1527 fn next(&mut self) -> Option<Self::Item> {
1528 if self.n == 0 {
1529 return None;
1530 }
1531
1532 self.n -= 1;
1533 Some(Permit { chan: self.chan })
1534 }
1535
1536 fn size_hint(&self) -> (usize, Option<usize>) {
1537 let n: usize = self.n;
1538 (n, Some(n))
1539 }
1540}
// `size_hint` reports `(n, Some(n))`, i.e. an exact remaining length.
impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
// `next` returns `None` whenever `n == 0` and only ever decrements `n`.
impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1543
1544impl<T> Drop for PermitIterator<'_, T> {
1545 fn drop(&mut self) {
1546 use chan::Semaphore;
1547
1548 if self.n == 0 {
1549 return;
1550 }
1551
1552 let semaphore: &Semaphore = self.chan.semaphore();
1553
1554 // Add the remaining permits back to the semaphore
1555 semaphore.add_permits(self.n);
1556
1557 // If this is the last sender for this channel, wake the receiver so
1558 // that it can be notified that the channel is closed.
1559 if semaphore.is_closed() && semaphore.is_idle() {
1560 self.chan.wake_rx();
1561 }
1562 }
1563}
1564
1565impl<T> fmt::Debug for PermitIterator<'_, T> {
1566 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1567 fmt&mut DebugStruct<'_, '_>.debug_struct("PermitIterator")
1568 .field("chan", &self.chan)
1569 .field(name:"capacity", &self.n)
1570 .finish()
1571 }
1572}
1573
// ===== impl OwnedPermit =====
1575
1576impl<T> OwnedPermit<T> {
1577 /// Sends a value using the reserved capacity.
1578 ///
1579 /// Capacity for the message has already been reserved. The message is sent
1580 /// to the receiver and the permit is consumed. The operation will succeed
1581 /// even if the receiver half has been closed. See [`Receiver::close`] for
1582 /// more details on performing a clean shutdown.
1583 ///
1584 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1585 /// the `OwnedPermit` was reserved.
1586 ///
1587 /// [`Receiver::close`]: Receiver::close
1588 ///
1589 /// # Examples
1590 ///
1591 /// ```
1592 /// use tokio::sync::mpsc;
1593 ///
1594 /// #[tokio::main]
1595 /// async fn main() {
1596 /// let (tx, mut rx) = mpsc::channel(1);
1597 ///
1598 /// // Reserve capacity
1599 /// let permit = tx.reserve_owned().await.unwrap();
1600 ///
1601 /// // Send a message on the permit, returning the sender.
1602 /// let tx = permit.send(456);
1603 ///
1604 /// // The value sent on the permit is received
1605 /// assert_eq!(rx.recv().await.unwrap(), 456);
1606 ///
1607 /// // We may now reuse `tx` to send another message.
1608 /// tx.send(789).await.unwrap();
1609 /// }
1610 /// ```
1611 pub fn send(mut self, value: T) -> Sender<T> {
1612 let chan = self.chan.take().unwrap_or_else(|| {
1613 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1614 });
1615 chan.send(value);
1616
1617 Sender { chan }
1618 }
1619
1620 /// Releases the reserved capacity *without* sending a message, returning the
1621 /// [`Sender`].
1622 ///
1623 /// # Examples
1624 ///
1625 /// ```
1626 /// use tokio::sync::mpsc;
1627 ///
1628 /// #[tokio::main]
1629 /// async fn main() {
1630 /// let (tx, rx) = mpsc::channel(1);
1631 ///
1632 /// // Clone the sender and reserve capacity
1633 /// let permit = tx.clone().reserve_owned().await.unwrap();
1634 ///
1635 /// // Trying to send on the original `tx` will fail, since the `permit`
1636 /// // has reserved all the available capacity.
1637 /// assert!(tx.try_send(123).is_err());
1638 ///
1639 /// // Release the permit without sending a message, returning the clone
1640 /// // of the sender.
1641 /// let tx2 = permit.release();
1642 ///
1643 /// // We may now reuse `tx` to send another message.
1644 /// tx.send(789).await.unwrap();
1645 /// # drop(rx); drop(tx2);
1646 /// }
1647 /// ```
1648 ///
1649 /// [`Sender`]: Sender
1650 pub fn release(mut self) -> Sender<T> {
1651 use chan::Semaphore;
1652
1653 let chan = self.chan.take().unwrap_or_else(|| {
1654 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1655 });
1656
1657 // Add the permit back to the semaphore
1658 chan.semaphore().add_permit();
1659 Sender { chan }
1660 }
1661}
1662
1663impl<T> Drop for OwnedPermit<T> {
1664 fn drop(&mut self) {
1665 use chan::Semaphore;
1666
1667 // Are we still holding onto the sender?
1668 if let Some(chan: Tx) = self.chan.take() {
1669 let semaphore: &Semaphore = chan.semaphore();
1670
1671 // Add the permit back to the semaphore
1672 semaphore.add_permit();
1673
1674 // If this `OwnedPermit` is holding the last sender for this
1675 // channel, wake the receiver so that it can be notified that the
1676 // channel is closed.
1677 if semaphore.is_closed() && semaphore.is_idle() {
1678 chan.wake_rx();
1679 }
1680 }
1681
1682 // Otherwise, do nothing.
1683 }
1684}
1685
1686impl<T> fmt::Debug for OwnedPermit<T> {
1687 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1688 fmt&mut DebugStruct<'_, '_>.debug_struct("OwnedPermit")
1689 .field(name:"chan", &self.chan)
1690 .finish()
1691 }
1692}
1693