1use crate::loom::sync::{atomic::AtomicUsize, Arc};
2use crate::sync::mpsc::chan;
3use crate::sync::mpsc::error::{SendError, TryRecvError};
4
5use std::fmt;
6use std::task::{Context, Poll};
7
8/// Send values to the associated `UnboundedReceiver`.
9///
10/// Instances are created by the [`unbounded_channel`] function.
11pub struct UnboundedSender<T> {
12 chan: chan::Tx<T, Semaphore>,
13}
14
15/// An unbounded sender that does not prevent the channel from being closed.
16///
17/// If all [`UnboundedSender`] instances of a channel were dropped and only
18/// `WeakUnboundedSender` instances remain, the channel is closed.
19///
20/// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using
21/// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None`
22/// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`.
23///
24/// [`UnboundedSender`]: UnboundedSender
25/// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade
26///
27/// # Examples
28///
29/// ```
30/// use tokio::sync::mpsc::unbounded_channel;
31///
32/// #[tokio::main]
33/// async fn main() {
34/// let (tx, _rx) = unbounded_channel::<i32>();
35/// let tx_weak = tx.downgrade();
36///
37/// // Upgrading will succeed because `tx` still exists.
38/// assert!(tx_weak.upgrade().is_some());
39///
40/// // If we drop `tx`, then it will fail.
41/// drop(tx);
42/// assert!(tx_weak.clone().upgrade().is_none());
43/// }
44/// ```
45pub struct WeakUnboundedSender<T> {
46 chan: Arc<chan::Chan<T, Semaphore>>,
47}
48
49impl<T> Clone for UnboundedSender<T> {
50 fn clone(&self) -> Self {
51 UnboundedSender {
52 chan: self.chan.clone(),
53 }
54 }
55}
56
57impl<T> fmt::Debug for UnboundedSender<T> {
58 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
59 fmt&mut DebugStruct<'_, '_>.debug_struct("UnboundedSender")
60 .field(name:"chan", &self.chan)
61 .finish()
62 }
63}
64
65/// Receive values from the associated `UnboundedSender`.
66///
67/// Instances are created by the [`unbounded_channel`] function.
68///
69/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`].
70///
71/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html
72pub struct UnboundedReceiver<T> {
73 /// The channel receiver
74 chan: chan::Rx<T, Semaphore>,
75}
76
77impl<T> fmt::Debug for UnboundedReceiver<T> {
78 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
79 fmt&mut DebugStruct<'_, '_>.debug_struct("UnboundedReceiver")
80 .field(name:"chan", &self.chan)
81 .finish()
82 }
83}
84
85/// Creates an unbounded mpsc channel for communicating between asynchronous
86/// tasks without backpressure.
87///
88/// A `send` on this channel will always succeed as long as the receive half has
89/// not been closed. If the receiver falls behind, messages will be arbitrarily
90/// buffered.
91///
92/// **Note** that the amount of available system memory is an implicit bound to
93/// the channel. Using an `unbounded` channel has the ability of causing the
94/// process to run out of memory. In this case, the process will be aborted.
95pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
96 let (tx: Tx, rx: Rx) = chan::channel(Semaphore(AtomicUsize::new(val:0)));
97
98 let tx: UnboundedSender = UnboundedSender::new(chan:tx);
99 let rx: UnboundedReceiver = UnboundedReceiver::new(chan:rx);
100
101 (tx, rx)
102}
103
104/// No capacity
105#[derive(Debug)]
106pub(crate) struct Semaphore(pub(crate) AtomicUsize);
107
108impl<T> UnboundedReceiver<T> {
109 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
110 UnboundedReceiver { chan }
111 }
112
113 /// Receives the next value for this receiver.
114 ///
115 /// This method returns `None` if the channel has been closed and there are
116 /// no remaining messages in the channel's buffer. This indicates that no
117 /// further values can ever be received from this `Receiver`. The channel is
118 /// closed when all senders have been dropped, or when [`close`] is called.
119 ///
120 /// If there are no messages in the channel's buffer, but the channel has
121 /// not yet been closed, this method will sleep until a message is sent or
122 /// the channel is closed.
123 ///
124 /// # Cancel safety
125 ///
126 /// This method is cancel safe. If `recv` is used as the event in a
127 /// [`tokio::select!`](crate::select) statement and some other branch
128 /// completes first, it is guaranteed that no messages were received on this
129 /// channel.
130 ///
131 /// [`close`]: Self::close
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use tokio::sync::mpsc;
137 ///
138 /// #[tokio::main]
139 /// async fn main() {
140 /// let (tx, mut rx) = mpsc::unbounded_channel();
141 ///
142 /// tokio::spawn(async move {
143 /// tx.send("hello").unwrap();
144 /// });
145 ///
146 /// assert_eq!(Some("hello"), rx.recv().await);
147 /// assert_eq!(None, rx.recv().await);
148 /// }
149 /// ```
150 ///
151 /// Values are buffered:
152 ///
153 /// ```
154 /// use tokio::sync::mpsc;
155 ///
156 /// #[tokio::main]
157 /// async fn main() {
158 /// let (tx, mut rx) = mpsc::unbounded_channel();
159 ///
160 /// tx.send("hello").unwrap();
161 /// tx.send("world").unwrap();
162 ///
163 /// assert_eq!(Some("hello"), rx.recv().await);
164 /// assert_eq!(Some("world"), rx.recv().await);
165 /// }
166 /// ```
167 pub async fn recv(&mut self) -> Option<T> {
168 use crate::future::poll_fn;
169
170 poll_fn(|cx| self.poll_recv(cx)).await
171 }
172
173 /// Receives the next values for this receiver and extends `buffer`.
174 ///
175 /// This method extends `buffer` by no more than a fixed number of values
176 /// as specified by `limit`. If `limit` is zero, the function returns
177 /// immediately with `0`. The return value is the number of values added to
178 /// `buffer`.
179 ///
180 /// For `limit > 0`, if there are no messages in the channel's queue,
181 /// but the channel has not yet been closed, this method will sleep
182 /// until a message is sent or the channel is closed.
183 ///
184 /// For non-zero values of `limit`, this method will never return `0` unless
185 /// the channel has been closed and there are no remaining messages in the
186 /// channel's queue. This indicates that no further values can ever be
187 /// received from this `Receiver`. The channel is closed when all senders
188 /// have been dropped, or when [`close`] is called.
189 ///
190 /// The capacity of `buffer` is increased as needed.
191 ///
192 /// # Cancel safety
193 ///
194 /// This method is cancel safe. If `recv_many` is used as the event in a
195 /// [`tokio::select!`](crate::select) statement and some other branch
196 /// completes first, it is guaranteed that no messages were received on this
197 /// channel.
198 ///
199 /// [`close`]: Self::close
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use tokio::sync::mpsc;
205 ///
206 /// #[tokio::main]
207 /// async fn main() {
208 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
209 /// let limit = 2;
210 /// let (tx, mut rx) = mpsc::unbounded_channel();
211 /// let tx2 = tx.clone();
212 /// tx2.send("first").unwrap();
213 /// tx2.send("second").unwrap();
214 /// tx2.send("third").unwrap();
215 ///
216 /// // Call `recv_many` to receive up to `limit` (2) values.
217 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
218 /// assert_eq!(vec!["first", "second"], buffer);
219 ///
220 /// // If the buffer is full, the next call to `recv_many`
221 /// // reserves additional capacity.
222 /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
223 ///
224 /// tokio::spawn(async move {
225 /// tx.send("fourth").unwrap();
226 /// });
227 ///
228 /// // 'tx' is dropped, but `recv_many`
229 /// // is guaranteed not to return 0 as the channel
230 /// // is not yet closed.
231 /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
232 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
233 ///
234 /// // Once the last sender is dropped, the channel is
235 /// // closed and `recv_many` returns 0, capacity unchanged.
236 /// drop(tx2);
237 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
238 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
239 /// }
240 /// ```
241 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
242 use crate::future::poll_fn;
243 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
244 }
245
246 /// Tries to receive the next value for this receiver.
247 ///
248 /// This method returns the [`Empty`] error if the channel is currently
249 /// empty, but there are still outstanding [senders] or [permits].
250 ///
251 /// This method returns the [`Disconnected`] error if the channel is
252 /// currently empty, and there are no outstanding [senders] or [permits].
253 ///
254 /// Unlike the [`poll_recv`] method, this method will never return an
255 /// [`Empty`] error spuriously.
256 ///
257 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
258 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
259 /// [`poll_recv`]: Self::poll_recv
260 /// [senders]: crate::sync::mpsc::Sender
261 /// [permits]: crate::sync::mpsc::Permit
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use tokio::sync::mpsc;
267 /// use tokio::sync::mpsc::error::TryRecvError;
268 ///
269 /// #[tokio::main]
270 /// async fn main() {
271 /// let (tx, mut rx) = mpsc::unbounded_channel();
272 ///
273 /// tx.send("hello").unwrap();
274 ///
275 /// assert_eq!(Ok("hello"), rx.try_recv());
276 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
277 ///
278 /// tx.send("hello").unwrap();
279 /// // Drop the last sender, closing the channel.
280 /// drop(tx);
281 ///
282 /// assert_eq!(Ok("hello"), rx.try_recv());
283 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
284 /// }
285 /// ```
286 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
287 self.chan.try_recv()
288 }
289
290 /// Blocking receive to call outside of asynchronous contexts.
291 ///
292 /// # Panics
293 ///
294 /// This function panics if called within an asynchronous execution
295 /// context.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// use std::thread;
301 /// use tokio::sync::mpsc;
302 ///
303 /// #[tokio::main]
304 /// async fn main() {
305 /// let (tx, mut rx) = mpsc::unbounded_channel::<u8>();
306 ///
307 /// let sync_code = thread::spawn(move || {
308 /// assert_eq!(Some(10), rx.blocking_recv());
309 /// });
310 ///
311 /// let _ = tx.send(10);
312 /// sync_code.join().unwrap();
313 /// }
314 /// ```
315 #[track_caller]
316 #[cfg(feature = "sync")]
317 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
318 pub fn blocking_recv(&mut self) -> Option<T> {
319 crate::future::block_on(self.recv())
320 }
321
322 /// Closes the receiving half of a channel, without dropping it.
323 ///
324 /// This prevents any further messages from being sent on the channel while
325 /// still enabling the receiver to drain messages that are buffered.
326 ///
327 /// To guarantee that no messages are dropped, after calling `close()`,
328 /// `recv()` must be called until `None` is returned.
329 pub fn close(&mut self) {
330 self.chan.close();
331 }
332
333 /// Polls to receive the next message on this channel.
334 ///
335 /// This method returns:
336 ///
337 /// * `Poll::Pending` if no messages are available but the channel is not
338 /// closed, or if a spurious failure happens.
339 /// * `Poll::Ready(Some(message))` if a message is available.
340 /// * `Poll::Ready(None)` if the channel has been closed and all messages
341 /// sent before it was closed have been received.
342 ///
343 /// When the method returns `Poll::Pending`, the `Waker` in the provided
344 /// `Context` is scheduled to receive a wakeup when a message is sent on any
345 /// receiver, or when the channel is closed. Note that on multiple calls to
346 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
347 /// passed to the most recent call is scheduled to receive a wakeup.
348 ///
349 /// If this method returns `Poll::Pending` due to a spurious failure, then
350 /// the `Waker` will be notified when the situation causing the spurious
351 /// failure has been resolved. Note that receiving such a wakeup does not
352 /// guarantee that the next call will succeed — it could fail with another
353 /// spurious failure.
354 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
355 self.chan.recv(cx)
356 }
357
358 /// Polls to receive multiple messages on this channel, extending the provided buffer.
359 ///
360 /// This method returns:
361 /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
362 /// spurious failure happens.
363 /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
364 /// stored in `buffer`. This can be less than, or equal to, `limit`.
365 /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
366 ///
367 /// When the method returns `Poll::Pending`, the `Waker` in the provided
368 /// `Context` is scheduled to receive a wakeup when a message is sent on any
369 /// receiver, or when the channel is closed. Note that on multiple calls to
370 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
371 /// passed to the most recent call is scheduled to receive a wakeup.
372 ///
373 /// Note that this method does not guarantee that exactly `limit` messages
374 /// are received. Rather, if at least one message is available, it returns
375 /// as many messages as it can up to the given limit. This method returns
376 /// zero only if the channel is closed (or if `limit` is zero).
377 ///
378 /// # Examples
379 ///
380 /// ```
381 /// use std::task::{Context, Poll};
382 /// use std::pin::Pin;
383 /// use tokio::sync::mpsc;
384 /// use futures::Future;
385 ///
386 /// struct MyReceiverFuture<'a> {
387 /// receiver: mpsc::UnboundedReceiver<i32>,
388 /// buffer: &'a mut Vec<i32>,
389 /// limit: usize,
390 /// }
391 ///
392 /// impl<'a> Future for MyReceiverFuture<'a> {
393 /// type Output = usize; // Number of messages received
394 ///
395 /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
396 /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
397 ///
398 /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
399 /// match receiver.poll_recv_many(cx, *buffer, *limit) {
400 /// Poll::Pending => Poll::Pending,
401 /// Poll::Ready(count) => Poll::Ready(count),
402 /// }
403 /// }
404 /// }
405 ///
406 /// #[tokio::main]
407 /// async fn main() {
408 /// let (tx, rx) = mpsc::unbounded_channel::<i32>();
409 /// let mut buffer = Vec::new();
410 ///
411 /// let my_receiver_future = MyReceiverFuture {
412 /// receiver: rx,
413 /// buffer: &mut buffer,
414 /// limit: 3,
415 /// };
416 ///
417 /// for i in 0..10 {
418 /// tx.send(i).expect("Unable to send integer");
419 /// }
420 ///
421 /// let count = my_receiver_future.await;
422 /// assert_eq!(count, 3);
423 /// assert_eq!(buffer, vec![0,1,2])
424 /// }
425 /// ```
426 pub fn poll_recv_many(
427 &mut self,
428 cx: &mut Context<'_>,
429 buffer: &mut Vec<T>,
430 limit: usize,
431 ) -> Poll<usize> {
432 self.chan.recv_many(cx, buffer, limit)
433 }
434}
435
436impl<T> UnboundedSender<T> {
437 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
438 UnboundedSender { chan }
439 }
440
441 /// Attempts to send a message on this `UnboundedSender` without blocking.
442 ///
443 /// This method is not marked async because sending a message to an unbounded channel
444 /// never requires any form of waiting. Because of this, the `send` method can be
445 /// used in both synchronous and asynchronous code without problems.
446 ///
447 /// If the receive half of the channel is closed, either due to [`close`]
448 /// being called or the [`UnboundedReceiver`] having been dropped, this
449 /// function returns an error. The error includes the value passed to `send`.
450 ///
451 /// [`close`]: UnboundedReceiver::close
452 /// [`UnboundedReceiver`]: UnboundedReceiver
453 pub fn send(&self, message: T) -> Result<(), SendError<T>> {
454 if !self.inc_num_messages() {
455 return Err(SendError(message));
456 }
457
458 self.chan.send(message);
459 Ok(())
460 }
461
462 fn inc_num_messages(&self) -> bool {
463 use std::process;
464 use std::sync::atomic::Ordering::{AcqRel, Acquire};
465
466 let mut curr = self.chan.semaphore().0.load(Acquire);
467
468 loop {
469 if curr & 1 == 1 {
470 return false;
471 }
472
473 if curr == usize::MAX ^ 1 {
474 // Overflowed the ref count. There is no safe way to recover, so
475 // abort the process. In practice, this should never happen.
476 process::abort()
477 }
478
479 match self
480 .chan
481 .semaphore()
482 .0
483 .compare_exchange(curr, curr + 2, AcqRel, Acquire)
484 {
485 Ok(_) => return true,
486 Err(actual) => {
487 curr = actual;
488 }
489 }
490 }
491 }
492
493 /// Completes when the receiver has dropped.
494 ///
495 /// This allows the producers to get notified when interest in the produced
496 /// values is canceled and immediately stop doing work.
497 ///
498 /// # Cancel safety
499 ///
500 /// This method is cancel safe. Once the channel is closed, it stays closed
501 /// forever and all future calls to `closed` will return immediately.
502 ///
503 /// # Examples
504 ///
505 /// ```
506 /// use tokio::sync::mpsc;
507 ///
508 /// #[tokio::main]
509 /// async fn main() {
510 /// let (tx1, rx) = mpsc::unbounded_channel::<()>();
511 /// let tx2 = tx1.clone();
512 /// let tx3 = tx1.clone();
513 /// let tx4 = tx1.clone();
514 /// let tx5 = tx1.clone();
515 /// tokio::spawn(async move {
516 /// drop(rx);
517 /// });
518 ///
519 /// futures::join!(
520 /// tx1.closed(),
521 /// tx2.closed(),
522 /// tx3.closed(),
523 /// tx4.closed(),
524 /// tx5.closed()
525 /// );
526 //// println!("Receiver dropped");
527 /// }
528 /// ```
529 pub async fn closed(&self) {
530 self.chan.closed().await;
531 }
532
533 /// Checks if the channel has been closed. This happens when the
534 /// [`UnboundedReceiver`] is dropped, or when the
535 /// [`UnboundedReceiver::close`] method is called.
536 ///
537 /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
538 /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
539 ///
540 /// ```
541 /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
542 /// assert!(!tx.is_closed());
543 ///
544 /// let tx2 = tx.clone();
545 /// assert!(!tx2.is_closed());
546 ///
547 /// drop(rx);
548 /// assert!(tx.is_closed());
549 /// assert!(tx2.is_closed());
550 /// ```
551 pub fn is_closed(&self) -> bool {
552 self.chan.is_closed()
553 }
554
555 /// Returns `true` if senders belong to the same channel.
556 ///
557 /// # Examples
558 ///
559 /// ```
560 /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
561 /// let tx2 = tx.clone();
562 /// assert!(tx.same_channel(&tx2));
563 ///
564 /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>();
565 /// assert!(!tx3.same_channel(&tx2));
566 /// ```
567 pub fn same_channel(&self, other: &Self) -> bool {
568 self.chan.same_channel(&other.chan)
569 }
570
571 /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count
572 /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the
573 /// channel were dropped and only `WeakUnboundedSender` instances remain,
574 /// the channel is closed.
575 pub fn downgrade(&self) -> WeakUnboundedSender<T> {
576 WeakUnboundedSender {
577 chan: self.chan.downgrade(),
578 }
579 }
580}
581
582impl<T> Clone for WeakUnboundedSender<T> {
583 fn clone(&self) -> Self {
584 WeakUnboundedSender {
585 chan: self.chan.clone(),
586 }
587 }
588}
589
590impl<T> WeakUnboundedSender<T> {
591 /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`].
592 /// This will return `Some` if there are other `Sender` instances alive and
593 /// the channel wasn't previously dropped, otherwise `None` is returned.
594 pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
595 chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new)
596 }
597}
598
599impl<T> fmt::Debug for WeakUnboundedSender<T> {
600 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
601 fmt.debug_struct(name:"WeakUnboundedSender").finish()
602 }
603}
604