1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7 use crate::sync::mpsc::error::SendTimeoutError;
8 use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
14/// Sends values to the associated `Receiver`.
15///
16/// Instances are created by the [`channel`] function.
17///
18/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
19/// use the [`PollSender`] utility.
20///
21/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
pub struct Sender<T> {
    /// The channel sender: the transmit handle for the underlying channel,
    /// parameterized over the bounded-channel `Semaphore` wrapper.
    chan: chan::Tx<T, Semaphore>,
}
25
26/// A sender that does not prevent the channel from being closed.
27///
28/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
29/// instances remain, the channel is closed.
30///
31/// In order to send messages, the `WeakSender` needs to be upgraded using
32/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
33/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
34///
35/// [`Sender`]: Sender
36/// [`WeakSender::upgrade`]: WeakSender::upgrade
37///
38/// # Examples
39///
40/// ```
41/// use tokio::sync::mpsc::channel;
42///
43/// #[tokio::main]
44/// async fn main() {
45/// let (tx, _rx) = channel::<i32>(15);
46/// let tx_weak = tx.downgrade();
47///
48/// // Upgrading will succeed because `tx` still exists.
49/// assert!(tx_weak.upgrade().is_some());
50///
51/// // If we drop `tx`, then it will fail.
52/// drop(tx);
53/// assert!(tx_weak.clone().upgrade().is_none());
54/// }
55/// ```
pub struct WeakSender<T> {
    /// Shared reference to the channel state itself (a `chan::Chan`, not a
    /// `chan::Tx` transmit handle).
    chan: Arc<chan::Chan<T, Semaphore>>,
}
59
60/// Permits to send one value into the channel.
61///
62/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
63/// and are used to guarantee channel capacity before generating a message to send.
64///
65/// [`Sender::reserve()`]: Sender::reserve
66/// [`Sender::try_reserve()`]: Sender::try_reserve
pub struct Permit<'a, T> {
    /// Borrowed transmit handle; the permit cannot outlive the `'a` borrow.
    chan: &'a chan::Tx<T, Semaphore>,
}
70
71/// Owned permit to send one value into the channel.
72///
73/// This is identical to the [`Permit`] type, except that it moves the sender
74/// rather than borrowing it.
75///
76/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
77/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
78/// before generating a message to send.
79///
80/// [`Permit`]: Permit
81/// [`Sender::reserve_owned()`]: Sender::reserve_owned
82/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
pub struct OwnedPermit<T> {
    /// Owned transmit handle, wrapped in `Option` so that it can be absent.
    // NOTE(review): presumably `None` only after the handle has been moved
    // back out (e.g. when the permit is consumed) — confirm against the impl.
    chan: Option<chan::Tx<T, Semaphore>>,
}
86
87/// Receives values from the associated `Sender`.
88///
89/// Instances are created by the [`channel`] function.
90///
91/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
92///
93/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
pub struct Receiver<T> {
    /// The channel receiver: the receive handle for the underlying channel,
    /// parameterized over the bounded-channel `Semaphore` wrapper.
    chan: chan::Rx<T, Semaphore>,
}
98
99/// Creates a bounded mpsc channel for communicating between asynchronous tasks
100/// with backpressure.
101///
102/// The channel will buffer up to the provided number of messages. Once the
103/// buffer is full, attempts to send new messages will wait until a message is
104/// received from the channel. The provided buffer capacity must be at least 1.
105///
106/// All data sent on `Sender` will become available on `Receiver` in the same
107/// order as it was sent.
108///
109/// The `Sender` can be cloned to `send` to the same channel from multiple code
110/// locations. Only one `Receiver` is supported.
111///
112/// If the `Receiver` is disconnected while trying to `send`, the `send` method
113/// will return a `SendError`. Similarly, if `Sender` is disconnected while
114/// trying to `recv`, the `recv` method will return `None`.
115///
116/// # Panics
117///
118/// Panics if the buffer capacity is 0.
119///
120/// # Examples
121///
122/// ```rust
123/// use tokio::sync::mpsc;
124///
125/// #[tokio::main]
126/// async fn main() {
127/// let (tx, mut rx) = mpsc::channel(100);
128///
129/// tokio::spawn(async move {
130/// for i in 0..10 {
131/// if let Err(_) = tx.send(i).await {
132/// println!("receiver dropped");
133/// return;
134/// }
135/// }
136/// });
137///
138/// while let Some(i) = rx.recv().await {
139/// println!("got = {}", i);
140/// }
141/// }
142/// ```
143#[track_caller]
144pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
145 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
146 let semaphore = Semaphore {
147 semaphore: semaphore::Semaphore::new(buffer),
148 bound: buffer,
149 };
150 let (tx, rx) = chan::channel(semaphore);
151
152 let tx = Sender::new(tx);
153 let rx = Receiver::new(rx);
154
155 (tx, rx)
156}
157
/// Channel semaphore: pairs the semaphore implementation with a `usize`
/// representing the channel bound.
#[derive(Debug)]
pub(crate) struct Semaphore {
    /// The underlying batch semaphore; the send paths in this file acquire
    /// one permit from it per message.
    pub(crate) semaphore: semaphore::Semaphore,
    /// The channel bound, i.e. the `buffer` value passed to `channel`.
    pub(crate) bound: usize,
}
165
166impl<T> Receiver<T> {
    /// Wraps the receive half of an internal channel in a `Receiver`.
    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
        Receiver { chan }
    }
170
171 /// Receives the next value for this receiver.
172 ///
173 /// This method returns `None` if the channel has been closed and there are
174 /// no remaining messages in the channel's buffer. This indicates that no
175 /// further values can ever be received from this `Receiver`. The channel is
176 /// closed when all senders have been dropped, or when [`close`] is called.
177 ///
178 /// If there are no messages in the channel's buffer, but the channel has
179 /// not yet been closed, this method will sleep until a message is sent or
180 /// the channel is closed. Note that if [`close`] is called, but there are
181 /// still outstanding [`Permits`] from before it was closed, the channel is
182 /// not considered closed by `recv` until the permits are released.
183 ///
184 /// # Cancel safety
185 ///
186 /// This method is cancel safe. If `recv` is used as the event in a
187 /// [`tokio::select!`](crate::select) statement and some other branch
188 /// completes first, it is guaranteed that no messages were received on this
189 /// channel.
190 ///
191 /// [`close`]: Self::close
192 /// [`Permits`]: struct@crate::sync::mpsc::Permit
193 ///
194 /// # Examples
195 ///
196 /// ```
197 /// use tokio::sync::mpsc;
198 ///
199 /// #[tokio::main]
200 /// async fn main() {
201 /// let (tx, mut rx) = mpsc::channel(100);
202 ///
203 /// tokio::spawn(async move {
204 /// tx.send("hello").await.unwrap();
205 /// });
206 ///
207 /// assert_eq!(Some("hello"), rx.recv().await);
208 /// assert_eq!(None, rx.recv().await);
209 /// }
210 /// ```
211 ///
212 /// Values are buffered:
213 ///
214 /// ```
215 /// use tokio::sync::mpsc;
216 ///
217 /// #[tokio::main]
218 /// async fn main() {
219 /// let (tx, mut rx) = mpsc::channel(100);
220 ///
221 /// tx.send("hello").await.unwrap();
222 /// tx.send("world").await.unwrap();
223 ///
224 /// assert_eq!(Some("hello"), rx.recv().await);
225 /// assert_eq!(Some("world"), rx.recv().await);
226 /// }
227 /// ```
228 pub async fn recv(&mut self) -> Option<T> {
229 use crate::future::poll_fn;
230 poll_fn(|cx| self.chan.recv(cx)).await
231 }
232
233 /// Receives the next values for this receiver and extends `buffer`.
234 ///
235 /// This method extends `buffer` by no more than a fixed number of values
236 /// as specified by `limit`. If `limit` is zero, the function immediately
237 /// returns `0`. The return value is the number of values added to `buffer`.
238 ///
239 /// For `limit > 0`, if there are no messages in the channel's queue, but
240 /// the channel has not yet been closed, this method will sleep until a
241 /// message is sent or the channel is closed. Note that if [`close`] is
242 /// called, but there are still outstanding [`Permits`] from before it was
243 /// closed, the channel is not considered closed by `recv_many` until the
244 /// permits are released.
245 ///
246 /// For non-zero values of `limit`, this method will never return `0` unless
247 /// the channel has been closed and there are no remaining messages in the
248 /// channel's queue. This indicates that no further values can ever be
249 /// received from this `Receiver`. The channel is closed when all senders
250 /// have been dropped, or when [`close`] is called.
251 ///
252 /// The capacity of `buffer` is increased as needed.
253 ///
254 /// # Cancel safety
255 ///
256 /// This method is cancel safe. If `recv_many` is used as the event in a
257 /// [`tokio::select!`](crate::select) statement and some other branch
258 /// completes first, it is guaranteed that no messages were received on this
259 /// channel.
260 ///
261 /// [`close`]: Self::close
262 /// [`Permits`]: struct@crate::sync::mpsc::Permit
263 ///
264 /// # Examples
265 ///
266 /// ```
267 /// use tokio::sync::mpsc;
268 ///
269 /// #[tokio::main]
270 /// async fn main() {
271 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
272 /// let limit = 2;
273 /// let (tx, mut rx) = mpsc::channel(100);
274 /// let tx2 = tx.clone();
275 /// tx2.send("first").await.unwrap();
276 /// tx2.send("second").await.unwrap();
277 /// tx2.send("third").await.unwrap();
278 ///
279 /// // Call `recv_many` to receive up to `limit` (2) values.
280 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
281 /// assert_eq!(vec!["first", "second"], buffer);
282 ///
283 /// // If the buffer is full, the next call to `recv_many`
284 /// // reserves additional capacity.
285 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
286 ///
287 /// tokio::spawn(async move {
288 /// tx.send("fourth").await.unwrap();
289 /// });
290 ///
291 /// // 'tx' is dropped, but `recv_many`
292 /// // is guaranteed not to return 0 as the channel
293 /// // is not yet closed.
294 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
295 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
296 ///
297 /// // Once the last sender is dropped, the channel is
298 /// // closed and `recv_many` returns 0, capacity unchanged.
299 /// drop(tx2);
300 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
301 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
302 /// }
303 /// ```
304 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
305 use crate::future::poll_fn;
306 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
307 }
308
309 /// Tries to receive the next value for this receiver.
310 ///
311 /// This method returns the [`Empty`] error if the channel is currently
312 /// empty, but there are still outstanding [senders] or [permits].
313 ///
314 /// This method returns the [`Disconnected`] error if the channel is
315 /// currently empty, and there are no outstanding [senders] or [permits].
316 ///
317 /// Unlike the [`poll_recv`] method, this method will never return an
318 /// [`Empty`] error spuriously.
319 ///
320 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
321 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
322 /// [`poll_recv`]: Self::poll_recv
323 /// [senders]: crate::sync::mpsc::Sender
324 /// [permits]: crate::sync::mpsc::Permit
325 ///
326 /// # Examples
327 ///
328 /// ```
329 /// use tokio::sync::mpsc;
330 /// use tokio::sync::mpsc::error::TryRecvError;
331 ///
332 /// #[tokio::main]
333 /// async fn main() {
334 /// let (tx, mut rx) = mpsc::channel(100);
335 ///
336 /// tx.send("hello").await.unwrap();
337 ///
338 /// assert_eq!(Ok("hello"), rx.try_recv());
339 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
340 ///
341 /// tx.send("hello").await.unwrap();
342 /// // Drop the last sender, closing the channel.
343 /// drop(tx);
344 ///
345 /// assert_eq!(Ok("hello"), rx.try_recv());
346 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
347 /// }
348 /// ```
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        // Pure delegation: the inner receive handle decides between a value
        // and a `TryRecvError`.
        self.chan.try_recv()
    }
352
353 /// Blocking receive to call outside of asynchronous contexts.
354 ///
355 /// This method returns `None` if the channel has been closed and there are
356 /// no remaining messages in the channel's buffer. This indicates that no
357 /// further values can ever be received from this `Receiver`. The channel is
358 /// closed when all senders have been dropped, or when [`close`] is called.
359 ///
360 /// If there are no messages in the channel's buffer, but the channel has
361 /// not yet been closed, this method will block until a message is sent or
362 /// the channel is closed.
363 ///
364 /// This method is intended for use cases where you are sending from
365 /// asynchronous code to synchronous code, and will work even if the sender
366 /// is not using [`blocking_send`] to send the message.
367 ///
368 /// Note that if [`close`] is called, but there are still outstanding
369 /// [`Permits`] from before it was closed, the channel is not considered
370 /// closed by `blocking_recv` until the permits are released.
371 ///
372 /// [`close`]: Self::close
373 /// [`Permits`]: struct@crate::sync::mpsc::Permit
374 /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
375 ///
376 /// # Panics
377 ///
378 /// This function panics if called within an asynchronous execution
379 /// context.
380 ///
381 /// # Examples
382 ///
383 /// ```
384 /// use std::thread;
385 /// use tokio::runtime::Runtime;
386 /// use tokio::sync::mpsc;
387 ///
388 /// fn main() {
389 /// let (tx, mut rx) = mpsc::channel::<u8>(10);
390 ///
391 /// let sync_code = thread::spawn(move || {
392 /// assert_eq!(Some(10), rx.blocking_recv());
393 /// });
394 ///
395 /// Runtime::new()
396 /// .unwrap()
397 /// .block_on(async move {
398 /// let _ = tx.send(10).await;
399 /// });
400 /// sync_code.join().unwrap()
401 /// }
402 /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
    pub fn blocking_recv(&mut self) -> Option<T> {
        // Runs the async `recv` future to completion through the crate's
        // `block_on` helper, so the result is exactly what `recv` yields.
        crate::future::block_on(self.recv())
    }
409
410 /// Closes the receiving half of a channel without dropping it.
411 ///
412 /// This prevents any further messages from being sent on the channel while
413 /// still enabling the receiver to drain messages that are buffered. Any
414 /// outstanding [`Permit`] values will still be able to send messages.
415 ///
416 /// To guarantee that no messages are dropped, after calling `close()`,
417 /// `recv()` must be called until `None` is returned. If there are
418 /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
419 /// not return `None` until those are released.
420 ///
421 /// [`Permit`]: Permit
422 /// [`OwnedPermit`]: OwnedPermit
423 ///
424 /// # Examples
425 ///
426 /// ```
427 /// use tokio::sync::mpsc;
428 ///
429 /// #[tokio::main]
430 /// async fn main() {
431 /// let (tx, mut rx) = mpsc::channel(20);
432 ///
433 /// tokio::spawn(async move {
434 /// let mut i = 0;
435 /// while let Ok(permit) = tx.reserve().await {
436 /// permit.send(i);
437 /// i += 1;
438 /// }
439 /// });
440 ///
441 /// rx.close();
442 ///
443 /// while let Some(msg) = rx.recv().await {
444 /// println!("got {}", msg);
445 /// }
446 ///
447 /// // Channel closed and no messages are lost.
448 /// }
449 /// ```
    pub fn close(&mut self) {
        // Delegates to the inner receive handle; the `Receiver` itself stays
        // alive and can still be used to call `recv`.
        self.chan.close();
    }
453
454 /// Polls to receive the next message on this channel.
455 ///
456 /// This method returns:
457 ///
458 /// * `Poll::Pending` if no messages are available but the channel is not
459 /// closed, or if a spurious failure happens.
460 /// * `Poll::Ready(Some(message))` if a message is available.
461 /// * `Poll::Ready(None)` if the channel has been closed and all messages
462 /// sent before it was closed have been received.
463 ///
464 /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent by any
    /// sender, or when the channel is closed. Note that on multiple calls to
467 /// `poll_recv`, only the `Waker` from the `Context` passed to the most
468 /// recent call is scheduled to receive a wakeup.
469 ///
470 /// If this method returns `Poll::Pending` due to a spurious failure, then
471 /// the `Waker` will be notified when the situation causing the spurious
472 /// failure has been resolved. Note that receiving such a wakeup does not
473 /// guarantee that the next call will succeed — it could fail with another
474 /// spurious failure.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Same inner call that the async `recv` polls, exposed directly for
        // hand-written poll functions.
        self.chan.recv(cx)
    }
478}
479
480impl<T> fmt::Debug for Receiver<T> {
481 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
482 fmt.debug_struct("Receiver")
483 .field("chan", &self.chan)
484 .finish()
485 }
486}
487
// `Receiver<T>` is declared `Unpin` for every `T`: the impl carries no bound on `T`.
impl<T> Unpin for Receiver<T> {}
489
490impl<T> Sender<T> {
    /// Wraps the transmit half of an internal channel in a `Sender`.
    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
        Sender { chan }
    }
494
495 /// Sends a value, waiting until there is capacity.
496 ///
497 /// A successful send occurs when it is determined that the other end of the
498 /// channel has not hung up already. An unsuccessful send would be one where
499 /// the corresponding receiver has already been closed. Note that a return
500 /// value of `Err` means that the data will never be received, but a return
501 /// value of `Ok` does not mean that the data will be received. It is
502 /// possible for the corresponding receiver to hang up immediately after
503 /// this function returns `Ok`.
504 ///
505 /// # Errors
506 ///
507 /// If the receive half of the channel is closed, either due to [`close`]
508 /// being called or the [`Receiver`] handle dropping, the function returns
509 /// an error. The error includes the value passed to `send`.
510 ///
511 /// [`close`]: Receiver::close
512 /// [`Receiver`]: Receiver
513 ///
514 /// # Cancel safety
515 ///
516 /// If `send` is used as the event in a [`tokio::select!`](crate::select)
517 /// statement and some other branch completes first, then it is guaranteed
518 /// that the message was not sent. **However, in that case, the message
519 /// is dropped and will be lost.**
520 ///
521 /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
522 /// capacity, then use the returned [`Permit`] to send the message.
523 ///
524 /// This channel uses a queue to ensure that calls to `send` and `reserve`
525 /// complete in the order they were requested. Cancelling a call to
526 /// `send` makes you lose your place in the queue.
527 ///
528 /// # Examples
529 ///
530 /// In the following example, each call to `send` will block until the
531 /// previously sent value was received.
532 ///
533 /// ```rust
534 /// use tokio::sync::mpsc;
535 ///
536 /// #[tokio::main]
537 /// async fn main() {
538 /// let (tx, mut rx) = mpsc::channel(1);
539 ///
540 /// tokio::spawn(async move {
541 /// for i in 0..10 {
542 /// if let Err(_) = tx.send(i).await {
543 /// println!("receiver dropped");
544 /// return;
545 /// }
546 /// }
547 /// });
548 ///
549 /// while let Some(i) = rx.recv().await {
550 /// println!("got = {}", i);
551 /// }
552 /// }
553 /// ```
554 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
555 match self.reserve().await {
556 Ok(permit) => {
557 permit.send(value);
558 Ok(())
559 }
560 Err(_) => Err(SendError(value)),
561 }
562 }
563
564 /// Completes when the receiver has dropped.
565 ///
566 /// This allows the producers to get notified when interest in the produced
567 /// values is canceled and immediately stop doing work.
568 ///
569 /// # Cancel safety
570 ///
571 /// This method is cancel safe. Once the channel is closed, it stays closed
572 /// forever and all future calls to `closed` will return immediately.
573 ///
574 /// # Examples
575 ///
576 /// ```
577 /// use tokio::sync::mpsc;
578 ///
579 /// #[tokio::main]
580 /// async fn main() {
581 /// let (tx1, rx) = mpsc::channel::<()>(1);
582 /// let tx2 = tx1.clone();
583 /// let tx3 = tx1.clone();
584 /// let tx4 = tx1.clone();
585 /// let tx5 = tx1.clone();
586 /// tokio::spawn(async move {
587 /// drop(rx);
588 /// });
589 ///
590 /// futures::join!(
591 /// tx1.closed(),
592 /// tx2.closed(),
593 /// tx3.closed(),
594 /// tx4.closed(),
595 /// tx5.closed()
596 /// );
597 /// println!("Receiver dropped");
598 /// }
599 /// ```
    pub async fn closed(&self) {
        // Awaits the inner transmit handle's `closed` future; nothing else
        // is done once it completes.
        self.chan.closed().await;
    }
603
    /// Attempts to immediately send a message on this `Sender`.
605 ///
    /// This method differs from [`send`] by returning immediately if the channel's
    /// buffer is full or the receive half has been closed. Compared with [`send`],
    /// this function has two failure cases instead of one (one for disconnection,
    /// one for a full buffer).
610 ///
611 /// # Errors
612 ///
613 /// If the channel capacity has been reached, i.e., the channel has `n`
614 /// buffered values where `n` is the argument passed to [`channel`], then an
615 /// error is returned.
616 ///
    /// If the receive half of the channel is closed, either due to [`close`]
    /// being called or the [`Receiver`] handle dropping, the function returns
    /// an error. The error includes the value passed to `try_send`.
620 ///
621 /// [`send`]: Sender::send
622 /// [`channel`]: channel
623 /// [`close`]: Receiver::close
624 ///
625 /// # Examples
626 ///
627 /// ```
628 /// use tokio::sync::mpsc;
629 ///
630 /// #[tokio::main]
631 /// async fn main() {
632 /// // Create a channel with buffer size 1
633 /// let (tx1, mut rx) = mpsc::channel(1);
634 /// let tx2 = tx1.clone();
635 ///
636 /// tokio::spawn(async move {
637 /// tx1.send(1).await.unwrap();
638 /// tx1.send(2).await.unwrap();
639 /// // task waits until the receiver receives a value.
640 /// });
641 ///
642 /// tokio::spawn(async move {
643 /// // This will return an error and send
644 /// // no message if the buffer is full
645 /// let _ = tx2.try_send(3);
646 /// });
647 ///
648 /// let mut msg;
649 /// msg = rx.recv().await.unwrap();
650 /// println!("message {} received", msg);
651 ///
652 /// msg = rx.recv().await.unwrap();
653 /// println!("message {} received", msg);
654 ///
655 /// // Third message may have never been sent
656 /// match rx.recv().await {
657 /// Some(msg) => println!("message {} received", msg),
658 /// None => println!("the third message was never sent"),
659 /// }
660 /// }
661 /// ```
662 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
663 match self.chan.semaphore().semaphore.try_acquire(1) {
664 Ok(()) => {}
665 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
666 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
667 }
668
669 // Send the message
670 self.chan.send(message);
671 Ok(())
672 }
673
674 /// Sends a value, waiting until there is capacity, but only for a limited time.
675 ///
676 /// Shares the same success and error conditions as [`send`], adding one more
677 /// condition for an unsuccessful send, which is when the provided timeout has
678 /// elapsed, and there is no capacity available.
679 ///
680 /// [`send`]: Sender::send
681 ///
682 /// # Errors
683 ///
    /// If the receive half of the channel is closed, either due to [`close`]
    /// being called or the [`Receiver`] having been dropped,
    /// the function returns an error. The error includes the value passed to
    /// `send_timeout`.
687 ///
688 /// [`close`]: Receiver::close
689 /// [`Receiver`]: Receiver
690 ///
691 /// # Panics
692 ///
693 /// This function panics if it is called outside the context of a Tokio
694 /// runtime [with time enabled](crate::runtime::Builder::enable_time).
695 ///
696 /// # Examples
697 ///
698 /// In the following example, each call to `send_timeout` will block until the
699 /// previously sent value was received, unless the timeout has elapsed.
700 ///
701 /// ```rust
702 /// use tokio::sync::mpsc;
703 /// use tokio::time::{sleep, Duration};
704 ///
705 /// #[tokio::main]
706 /// async fn main() {
707 /// let (tx, mut rx) = mpsc::channel(1);
708 ///
709 /// tokio::spawn(async move {
710 /// for i in 0..10 {
711 /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
712 /// println!("send error: #{:?}", e);
713 /// return;
714 /// }
715 /// }
716 /// });
717 ///
718 /// while let Some(i) = rx.recv().await {
719 /// println!("got = {}", i);
720 /// sleep(Duration::from_millis(200)).await;
721 /// }
722 /// }
723 /// ```
724 #[cfg(feature = "time")]
725 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
726 pub async fn send_timeout(
727 &self,
728 value: T,
729 timeout: Duration,
730 ) -> Result<(), SendTimeoutError<T>> {
731 let permit = match crate::time::timeout(timeout, self.reserve()).await {
732 Err(_) => {
733 return Err(SendTimeoutError::Timeout(value));
734 }
735 Ok(Err(_)) => {
736 return Err(SendTimeoutError::Closed(value));
737 }
738 Ok(Ok(permit)) => permit,
739 };
740
741 permit.send(value);
742 Ok(())
743 }
744
745 /// Blocking send to call outside of asynchronous contexts.
746 ///
747 /// This method is intended for use cases where you are sending from
748 /// synchronous code to asynchronous code, and will work even if the
749 /// receiver is not using [`blocking_recv`] to receive the message.
750 ///
751 /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
752 ///
753 /// # Panics
754 ///
755 /// This function panics if called within an asynchronous execution
756 /// context.
757 ///
758 /// # Examples
759 ///
760 /// ```
761 /// use std::thread;
762 /// use tokio::runtime::Runtime;
763 /// use tokio::sync::mpsc;
764 ///
765 /// fn main() {
766 /// let (tx, mut rx) = mpsc::channel::<u8>(1);
767 ///
768 /// let sync_code = thread::spawn(move || {
769 /// tx.blocking_send(10).unwrap();
770 /// });
771 ///
772 /// Runtime::new().unwrap().block_on(async move {
773 /// assert_eq!(Some(10), rx.recv().await);
774 /// });
775 /// sync_code.join().unwrap()
776 /// }
777 /// ```
    #[track_caller]
    #[cfg(feature = "sync")]
    #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
    pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
        // Runs the async `send` future to completion through the crate's
        // `block_on` helper, so the result is exactly what `send` yields.
        crate::future::block_on(self.send(value))
    }
784
785 /// Checks if the channel has been closed. This happens when the
786 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
787 /// called.
788 ///
789 /// [`Receiver`]: crate::sync::mpsc::Receiver
790 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
791 ///
792 /// ```
793 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
794 /// assert!(!tx.is_closed());
795 ///
796 /// let tx2 = tx.clone();
797 /// assert!(!tx2.is_closed());
798 ///
799 /// drop(rx);
800 /// assert!(tx.is_closed());
801 /// assert!(tx2.is_closed());
802 /// ```
    pub fn is_closed(&self) -> bool {
        // Pure delegation to the inner transmit handle.
        self.chan.is_closed()
    }
806
807 /// Waits for channel capacity. Once capacity to send one message is
808 /// available, it is reserved for the caller.
809 ///
810 /// If the channel is full, the function waits for the number of unreceived
811 /// messages to become less than the channel capacity. Capacity to send one
812 /// message is reserved for the caller. A [`Permit`] is returned to track
813 /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
814 /// reserved capacity.
815 ///
816 /// Dropping [`Permit`] without sending a message releases the capacity back
817 /// to the channel.
818 ///
819 /// [`Permit`]: Permit
820 /// [`send`]: Permit::send
821 ///
822 /// # Cancel safety
823 ///
824 /// This channel uses a queue to ensure that calls to `send` and `reserve`
825 /// complete in the order they were requested. Cancelling a call to
826 /// `reserve` makes you lose your place in the queue.
827 ///
828 /// # Examples
829 ///
830 /// ```
831 /// use tokio::sync::mpsc;
832 ///
833 /// #[tokio::main]
834 /// async fn main() {
835 /// let (tx, mut rx) = mpsc::channel(1);
836 ///
837 /// // Reserve capacity
838 /// let permit = tx.reserve().await.unwrap();
839 ///
840 /// // Trying to send directly on the `tx` will fail due to no
841 /// // available capacity.
842 /// assert!(tx.try_send(123).is_err());
843 ///
844 /// // Sending on the permit succeeds
845 /// permit.send(456);
846 ///
847 /// // The value sent on the permit is received
848 /// assert_eq!(rx.recv().await.unwrap(), 456);
849 /// }
850 /// ```
851 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
852 self.reserve_inner().await?;
853 Ok(Permit { chan: &self.chan })
854 }
855
856 /// Waits for channel capacity, moving the `Sender` and returning an owned
857 /// permit. Once capacity to send one message is available, it is reserved
858 /// for the caller.
859 ///
860 /// This moves the sender _by value_, and returns an owned permit that can
861 /// be used to send a message into the channel. Unlike [`Sender::reserve`],
862 /// this method may be used in cases where the permit must be valid for the
863 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
864 /// essentially a reference count increment, comparable to [`Arc::clone`]),
865 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
866 /// moved, it can be cloned prior to calling `reserve_owned`.
867 ///
868 /// If the channel is full, the function waits for the number of unreceived
869 /// messages to become less than the channel capacity. Capacity to send one
870 /// message is reserved for the caller. An [`OwnedPermit`] is returned to
871 /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
872 /// consumes the reserved capacity.
873 ///
874 /// Dropping the [`OwnedPermit`] without sending a message releases the
875 /// capacity back to the channel.
876 ///
877 /// # Cancel safety
878 ///
879 /// This channel uses a queue to ensure that calls to `send` and `reserve`
880 /// complete in the order they were requested. Cancelling a call to
881 /// `reserve_owned` makes you lose your place in the queue.
882 ///
883 /// # Examples
884 /// Sending a message using an [`OwnedPermit`]:
885 /// ```
886 /// use tokio::sync::mpsc;
887 ///
888 /// #[tokio::main]
889 /// async fn main() {
890 /// let (tx, mut rx) = mpsc::channel(1);
891 ///
892 /// // Reserve capacity, moving the sender.
893 /// let permit = tx.reserve_owned().await.unwrap();
894 ///
895 /// // Send a message, consuming the permit and returning
896 /// // the moved sender.
897 /// let tx = permit.send(123);
898 ///
899 /// // The value sent on the permit is received.
900 /// assert_eq!(rx.recv().await.unwrap(), 123);
901 ///
902 /// // The sender can now be used again.
903 /// tx.send(456).await.unwrap();
904 /// }
905 /// ```
906 ///
907 /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
908 /// by value, it can be inexpensively cloned before calling `reserve_owned`:
909 ///
910 /// ```
911 /// use tokio::sync::mpsc;
912 ///
913 /// #[tokio::main]
914 /// async fn main() {
915 /// let (tx, mut rx) = mpsc::channel(1);
916 ///
917 /// // Clone the sender and reserve capacity.
918 /// let permit = tx.clone().reserve_owned().await.unwrap();
919 ///
920 /// // Trying to send directly on the `tx` will fail due to no
921 /// // available capacity.
922 /// assert!(tx.try_send(123).is_err());
923 ///
924 /// // Sending on the permit succeeds.
925 /// permit.send(456);
926 ///
927 /// // The value sent on the permit is received
928 /// assert_eq!(rx.recv().await.unwrap(), 456);
929 /// }
930 /// ```
931 ///
932 /// [`Sender::reserve`]: Sender::reserve
933 /// [`OwnedPermit`]: OwnedPermit
934 /// [`send`]: OwnedPermit::send
935 /// [`Arc::clone`]: std::sync::Arc::clone
936 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
937 self.reserve_inner().await?;
938 Ok(OwnedPermit {
939 chan: Some(self.chan),
940 })
941 }
942
943 async fn reserve_inner(&self) -> Result<(), SendError<()>> {
944 crate::trace::async_trace_leaf().await;
945
946 match self.chan.semaphore().semaphore.acquire(1).await {
947 Ok(()) => Ok(()),
948 Err(_) => Err(SendError(())),
949 }
950 }
951
952 /// Tries to acquire a slot in the channel without waiting for the slot to become
953 /// available.
954 ///
955 /// If the channel is full this function will return [`TrySendError`], otherwise
956 /// if there is a slot available it will return a [`Permit`] that will then allow you
957 /// to [`send`] on the channel with a guaranteed slot. This function is similar to
958 /// [`reserve`] except it does not await for the slot to become available.
959 ///
960 /// Dropping [`Permit`] without sending a message releases the capacity back
961 /// to the channel.
962 ///
963 /// [`Permit`]: Permit
964 /// [`send`]: Permit::send
965 /// [`reserve`]: Sender::reserve
966 ///
967 /// # Examples
968 ///
969 /// ```
970 /// use tokio::sync::mpsc;
971 ///
972 /// #[tokio::main]
973 /// async fn main() {
974 /// let (tx, mut rx) = mpsc::channel(1);
975 ///
976 /// // Reserve capacity
977 /// let permit = tx.try_reserve().unwrap();
978 ///
979 /// // Trying to send directly on the `tx` will fail due to no
980 /// // available capacity.
981 /// assert!(tx.try_send(123).is_err());
982 ///
983 /// // Trying to reserve an additional slot on the `tx` will
984 /// // fail because there is no capacity.
985 /// assert!(tx.try_reserve().is_err());
986 ///
987 /// // Sending on the permit succeeds
988 /// permit.send(456);
989 ///
990 /// // The value sent on the permit is received
991 /// assert_eq!(rx.recv().await.unwrap(), 456);
992 ///
993 /// }
994 /// ```
995 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
996 match self.chan.semaphore().semaphore.try_acquire(1) {
997 Ok(()) => {}
998 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
999 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1000 }
1001
1002 Ok(Permit { chan: &self.chan })
1003 }
1004
1005 /// Tries to acquire a slot in the channel without waiting for the slot to become
1006 /// available, returning an owned permit.
1007 ///
1008 /// This moves the sender _by value_, and returns an owned permit that can
1009 /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1010 /// this method may be used in cases where the permit must be valid for the
1011 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1012 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1013 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1014 /// moved, it can be cloned prior to calling `try_reserve_owned`.
1015 ///
1016 /// If the channel is full this function will return a [`TrySendError`].
1017 /// Since the sender is taken by value, the `TrySendError` returned in this
1018 /// case contains the sender, so that it may be used again. Otherwise, if
1019 /// there is a slot available, this method will return an [`OwnedPermit`]
1020 /// that can then be used to [`send`] on the channel with a guaranteed slot.
1021 /// This function is similar to [`reserve_owned`] except it does not await
1022 /// for the slot to become available.
1023 ///
1024 /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1025 /// to the channel.
1026 ///
1027 /// [`OwnedPermit`]: OwnedPermit
1028 /// [`send`]: OwnedPermit::send
1029 /// [`reserve_owned`]: Sender::reserve_owned
1030 /// [`Arc::clone`]: std::sync::Arc::clone
1031 ///
1032 /// # Examples
1033 ///
1034 /// ```
1035 /// use tokio::sync::mpsc;
1036 ///
1037 /// #[tokio::main]
1038 /// async fn main() {
1039 /// let (tx, mut rx) = mpsc::channel(1);
1040 ///
1041 /// // Reserve capacity
1042 /// let permit = tx.clone().try_reserve_owned().unwrap();
1043 ///
1044 /// // Trying to send directly on the `tx` will fail due to no
1045 /// // available capacity.
1046 /// assert!(tx.try_send(123).is_err());
1047 ///
1048 /// // Trying to reserve an additional slot on the `tx` will
1049 /// // fail because there is no capacity.
1050 /// assert!(tx.try_reserve().is_err());
1051 ///
1052 /// // Sending on the permit succeeds
1053 /// permit.send(456);
1054 ///
1055 /// // The value sent on the permit is received
1056 /// assert_eq!(rx.recv().await.unwrap(), 456);
1057 ///
1058 /// }
1059 /// ```
1060 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1061 match self.chan.semaphore().semaphore.try_acquire(1) {
1062 Ok(()) => {}
1063 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1064 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1065 }
1066
1067 Ok(OwnedPermit {
1068 chan: Some(self.chan),
1069 })
1070 }
1071
1072 /// Returns `true` if senders belong to the same channel.
1073 ///
1074 /// # Examples
1075 ///
1076 /// ```
1077 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1078 /// let tx2 = tx.clone();
1079 /// assert!(tx.same_channel(&tx2));
1080 ///
1081 /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1082 /// assert!(!tx3.same_channel(&tx2));
1083 /// ```
1084 pub fn same_channel(&self, other: &Self) -> bool {
1085 self.chan.same_channel(&other.chan)
1086 }
1087
1088 /// Returns the current capacity of the channel.
1089 ///
1090 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1091 /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
1092 /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
1093 /// specified when calling [`channel`]
1094 ///
1095 /// # Examples
1096 ///
1097 /// ```
1098 /// use tokio::sync::mpsc;
1099 ///
1100 /// #[tokio::main]
1101 /// async fn main() {
1102 /// let (tx, mut rx) = mpsc::channel::<()>(5);
1103 ///
1104 /// assert_eq!(tx.capacity(), 5);
1105 ///
1106 /// // Making a reservation drops the capacity by one.
1107 /// let permit = tx.reserve().await.unwrap();
1108 /// assert_eq!(tx.capacity(), 4);
1109 ///
1110 /// // Sending and receiving a value increases the capacity by one.
1111 /// permit.send(());
1112 /// rx.recv().await.unwrap();
1113 /// assert_eq!(tx.capacity(), 5);
1114 /// }
1115 /// ```
1116 ///
1117 /// [`send`]: Sender::send
1118 /// [`reserve`]: Sender::reserve
1119 /// [`channel`]: channel
1120 /// [`max_capacity`]: Sender::max_capacity
1121 pub fn capacity(&self) -> usize {
1122 self.chan.semaphore().semaphore.available_permits()
1123 }
1124
1125 /// Converts the `Sender` to a [`WeakSender`] that does not count
1126 /// towards RAII semantics, i.e. if all `Sender` instances of the
1127 /// channel were dropped and only `WeakSender` instances remain,
1128 /// the channel is closed.
1129 pub fn downgrade(&self) -> WeakSender<T> {
1130 WeakSender {
1131 chan: self.chan.downgrade(),
1132 }
1133 }
1134
1135 /// Returns the maximum buffer capacity of the channel.
1136 ///
1137 /// The maximum capacity is the buffer capacity initially specified when calling
1138 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1139 /// available buffer capacity: as messages are sent and received, the
1140 /// value returned by [`capacity`] will go up or down, whereas the value
1141 /// returned by `max_capacity` will remain constant.
1142 ///
1143 /// # Examples
1144 ///
1145 /// ```
1146 /// use tokio::sync::mpsc;
1147 ///
1148 /// #[tokio::main]
1149 /// async fn main() {
1150 /// let (tx, _rx) = mpsc::channel::<()>(5);
1151 ///
1152 /// // both max capacity and capacity are the same at first
1153 /// assert_eq!(tx.max_capacity(), 5);
1154 /// assert_eq!(tx.capacity(), 5);
1155 ///
1156 /// // Making a reservation doesn't change the max capacity.
1157 /// let permit = tx.reserve().await.unwrap();
1158 /// assert_eq!(tx.max_capacity(), 5);
1159 /// // but drops the capacity by one
1160 /// assert_eq!(tx.capacity(), 4);
1161 /// }
1162 /// ```
1163 ///
1164 /// [`channel`]: channel
1165 /// [`max_capacity`]: Sender::max_capacity
1166 /// [`capacity`]: Sender::capacity
1167 pub fn max_capacity(&self) -> usize {
1168 self.chan.semaphore().bound
1169 }
1170}
1171
1172impl<T> Clone for Sender<T> {
1173 fn clone(&self) -> Self {
1174 Sender {
1175 chan: self.chan.clone(),
1176 }
1177 }
1178}
1179
1180impl<T> fmt::Debug for Sender<T> {
1181 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1182 fmt.debug_struct("Sender")
1183 .field("chan", &self.chan)
1184 .finish()
1185 }
1186}
1187
1188impl<T> Clone for WeakSender<T> {
1189 fn clone(&self) -> Self {
1190 WeakSender {
1191 chan: self.chan.clone(),
1192 }
1193 }
1194}
1195
1196impl<T> WeakSender<T> {
1197 /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1198 /// if there are other `Sender` instances alive and the channel wasn't
1199 /// previously dropped, otherwise `None` is returned.
1200 pub fn upgrade(&self) -> Option<Sender<T>> {
1201 chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1202 }
1203}
1204
1205impl<T> fmt::Debug for WeakSender<T> {
1206 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1207 fmt.debug_struct("WeakSender").finish()
1208 }
1209}
1210
1211// ===== impl Permit =====
1212
1213impl<T> Permit<'_, T> {
1214 /// Sends a value using the reserved capacity.
1215 ///
1216 /// Capacity for the message has already been reserved. The message is sent
1217 /// to the receiver and the permit is consumed. The operation will succeed
1218 /// even if the receiver half has been closed. See [`Receiver::close`] for
1219 /// more details on performing a clean shutdown.
1220 ///
1221 /// [`Receiver::close`]: Receiver::close
1222 ///
1223 /// # Examples
1224 ///
1225 /// ```
1226 /// use tokio::sync::mpsc;
1227 ///
1228 /// #[tokio::main]
1229 /// async fn main() {
1230 /// let (tx, mut rx) = mpsc::channel(1);
1231 ///
1232 /// // Reserve capacity
1233 /// let permit = tx.reserve().await.unwrap();
1234 ///
1235 /// // Trying to send directly on the `tx` will fail due to no
1236 /// // available capacity.
1237 /// assert!(tx.try_send(123).is_err());
1238 ///
1239 /// // Send a message on the permit
1240 /// permit.send(456);
1241 ///
1242 /// // The value sent on the permit is received
1243 /// assert_eq!(rx.recv().await.unwrap(), 456);
1244 /// }
1245 /// ```
1246 pub fn send(self, value: T) {
1247 use std::mem;
1248
1249 self.chan.send(value);
1250
1251 // Avoid the drop logic
1252 mem::forget(self);
1253 }
1254}
1255
1256impl<T> Drop for Permit<'_, T> {
1257 fn drop(&mut self) {
1258 use chan::Semaphore;
1259
1260 let semaphore = self.chan.semaphore();
1261
1262 // Add the permit back to the semaphore
1263 semaphore.add_permit();
1264
1265 // If this is the last sender for this channel, wake the receiver so
1266 // that it can be notified that the channel is closed.
1267 if semaphore.is_closed() && semaphore.is_idle() {
1268 self.chan.wake_rx();
1269 }
1270 }
1271}
1272
1273impl<T> fmt::Debug for Permit<'_, T> {
1274 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1275 fmt.debug_struct("Permit")
1276 .field("chan", &self.chan)
1277 .finish()
1278 }
1279}
1280
// ===== impl OwnedPermit =====
1282
1283impl<T> OwnedPermit<T> {
1284 /// Sends a value using the reserved capacity.
1285 ///
1286 /// Capacity for the message has already been reserved. The message is sent
1287 /// to the receiver and the permit is consumed. The operation will succeed
1288 /// even if the receiver half has been closed. See [`Receiver::close`] for
1289 /// more details on performing a clean shutdown.
1290 ///
1291 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1292 /// the `OwnedPermit` was reserved.
1293 ///
1294 /// [`Receiver::close`]: Receiver::close
1295 ///
1296 /// # Examples
1297 ///
1298 /// ```
1299 /// use tokio::sync::mpsc;
1300 ///
1301 /// #[tokio::main]
1302 /// async fn main() {
1303 /// let (tx, mut rx) = mpsc::channel(1);
1304 ///
1305 /// // Reserve capacity
1306 /// let permit = tx.reserve_owned().await.unwrap();
1307 ///
1308 /// // Send a message on the permit, returning the sender.
1309 /// let tx = permit.send(456);
1310 ///
1311 /// // The value sent on the permit is received
1312 /// assert_eq!(rx.recv().await.unwrap(), 456);
1313 ///
1314 /// // We may now reuse `tx` to send another message.
1315 /// tx.send(789).await.unwrap();
1316 /// }
1317 /// ```
1318 pub fn send(mut self, value: T) -> Sender<T> {
1319 let chan = self.chan.take().unwrap_or_else(|| {
1320 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1321 });
1322 chan.send(value);
1323
1324 Sender { chan }
1325 }
1326
1327 /// Releases the reserved capacity *without* sending a message, returning the
1328 /// [`Sender`].
1329 ///
1330 /// # Examples
1331 ///
1332 /// ```
1333 /// use tokio::sync::mpsc;
1334 ///
1335 /// #[tokio::main]
1336 /// async fn main() {
1337 /// let (tx, rx) = mpsc::channel(1);
1338 ///
1339 /// // Clone the sender and reserve capacity
1340 /// let permit = tx.clone().reserve_owned().await.unwrap();
1341 ///
1342 /// // Trying to send on the original `tx` will fail, since the `permit`
1343 /// // has reserved all the available capacity.
1344 /// assert!(tx.try_send(123).is_err());
1345 ///
1346 /// // Release the permit without sending a message, returning the clone
1347 /// // of the sender.
1348 /// let tx2 = permit.release();
1349 ///
1350 /// // We may now reuse `tx` to send another message.
1351 /// tx.send(789).await.unwrap();
1352 /// # drop(rx); drop(tx2);
1353 /// }
1354 /// ```
1355 ///
1356 /// [`Sender`]: Sender
1357 pub fn release(mut self) -> Sender<T> {
1358 use chan::Semaphore;
1359
1360 let chan = self.chan.take().unwrap_or_else(|| {
1361 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1362 });
1363
1364 // Add the permit back to the semaphore
1365 chan.semaphore().add_permit();
1366 Sender { chan }
1367 }
1368}
1369
1370impl<T> Drop for OwnedPermit<T> {
1371 fn drop(&mut self) {
1372 use chan::Semaphore;
1373
1374 // Are we still holding onto the sender?
1375 if let Some(chan) = self.chan.take() {
1376 let semaphore = chan.semaphore();
1377
1378 // Add the permit back to the semaphore
1379 semaphore.add_permit();
1380
1381 // If this `OwnedPermit` is holding the last sender for this
1382 // channel, wake the receiver so that it can be notified that the
1383 // channel is closed.
1384 if semaphore.is_closed() && semaphore.is_idle() {
1385 chan.wake_rx();
1386 }
1387 }
1388
1389 // Otherwise, do nothing.
1390 }
1391}
1392
1393impl<T> fmt::Debug for OwnedPermit<T> {
1394 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1395 fmt.debug_struct("OwnedPermit")
1396 .field("chan", &self.chan)
1397 .finish()
1398 }
1399}
1400