1use crate::loom::sync::{atomic::AtomicUsize, Arc};
2use crate::sync::mpsc::chan;
3use crate::sync::mpsc::error::{SendError, TryRecvError};
4
5use std::fmt;
6use std::task::{Context, Poll};
7
8/// Send values to the associated `UnboundedReceiver`.
9///
10/// Instances are created by the [`unbounded_channel`] function.
11pub struct UnboundedSender<T> {
12 chan: chan::Tx<T, Semaphore>,
13}
14
15/// An unbounded sender that does not prevent the channel from being closed.
16///
17/// If all [`UnboundedSender`] instances of a channel were dropped and only
18/// `WeakUnboundedSender` instances remain, the channel is closed.
19///
20/// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using
21/// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None`
22/// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`.
23///
24/// [`UnboundedSender`]: UnboundedSender
25/// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade
26///
27/// # Examples
28///
29/// ```
30/// use tokio::sync::mpsc::unbounded_channel;
31///
32/// #[tokio::main]
33/// async fn main() {
34/// let (tx, _rx) = unbounded_channel::<i32>();
35/// let tx_weak = tx.downgrade();
36///
37/// // Upgrading will succeed because `tx` still exists.
38/// assert!(tx_weak.upgrade().is_some());
39///
40/// // If we drop `tx`, then it will fail.
41/// drop(tx);
42/// assert!(tx_weak.clone().upgrade().is_none());
43/// }
44/// ```
45pub struct WeakUnboundedSender<T> {
46 chan: Arc<chan::Chan<T, Semaphore>>,
47}
48
49impl<T> Clone for UnboundedSender<T> {
50 fn clone(&self) -> Self {
51 UnboundedSender {
52 chan: self.chan.clone(),
53 }
54 }
55}
56
57impl<T> fmt::Debug for UnboundedSender<T> {
58 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
59 fmt.debug_struct("UnboundedSender")
60 .field("chan", &self.chan)
61 .finish()
62 }
63}
64
65/// Receive values from the associated `UnboundedSender`.
66///
67/// Instances are created by the [`unbounded_channel`] function.
68///
69/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`].
70///
71/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html
72pub struct UnboundedReceiver<T> {
73 /// The channel receiver
74 chan: chan::Rx<T, Semaphore>,
75}
76
77impl<T> fmt::Debug for UnboundedReceiver<T> {
78 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
79 fmt.debug_struct("UnboundedReceiver")
80 .field("chan", &self.chan)
81 .finish()
82 }
83}
84
85/// Creates an unbounded mpsc channel for communicating between asynchronous
86/// tasks without backpressure.
87///
88/// A `send` on this channel will always succeed as long as the receive half has
89/// not been closed. If the receiver falls behind, messages will be arbitrarily
90/// buffered.
91///
92/// **Note** that the amount of available system memory is an implicit bound to
93/// the channel. Using an `unbounded` channel has the ability of causing the
94/// process to run out of memory. In this case, the process will be aborted.
95pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
96 let (tx, rx) = chan::channel(Semaphore(AtomicUsize::new(0)));
97
98 let tx = UnboundedSender::new(tx);
99 let rx = UnboundedReceiver::new(rx);
100
101 (tx, rx)
102}
103
104/// No capacity
105#[derive(Debug)]
106pub(crate) struct Semaphore(pub(crate) AtomicUsize);
107
108impl<T> UnboundedReceiver<T> {
109 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> {
110 UnboundedReceiver { chan }
111 }
112
113 /// Receives the next value for this receiver.
114 ///
115 /// This method returns `None` if the channel has been closed and there are
116 /// no remaining messages in the channel's buffer. This indicates that no
117 /// further values can ever be received from this `Receiver`. The channel is
118 /// closed when all senders have been dropped, or when [`close`] is called.
119 ///
120 /// If there are no messages in the channel's buffer, but the channel has
121 /// not yet been closed, this method will sleep until a message is sent or
122 /// the channel is closed.
123 ///
124 /// # Cancel safety
125 ///
126 /// This method is cancel safe. If `recv` is used as the event in a
127 /// [`tokio::select!`](crate::select) statement and some other branch
128 /// completes first, it is guaranteed that no messages were received on this
129 /// channel.
130 ///
131 /// [`close`]: Self::close
132 ///
133 /// # Examples
134 ///
135 /// ```
136 /// use tokio::sync::mpsc;
137 ///
138 /// #[tokio::main]
139 /// async fn main() {
140 /// let (tx, mut rx) = mpsc::unbounded_channel();
141 ///
142 /// tokio::spawn(async move {
143 /// tx.send("hello").unwrap();
144 /// });
145 ///
146 /// assert_eq!(Some("hello"), rx.recv().await);
147 /// assert_eq!(None, rx.recv().await);
148 /// }
149 /// ```
150 ///
151 /// Values are buffered:
152 ///
153 /// ```
154 /// use tokio::sync::mpsc;
155 ///
156 /// #[tokio::main]
157 /// async fn main() {
158 /// let (tx, mut rx) = mpsc::unbounded_channel();
159 ///
160 /// tx.send("hello").unwrap();
161 /// tx.send("world").unwrap();
162 ///
163 /// assert_eq!(Some("hello"), rx.recv().await);
164 /// assert_eq!(Some("world"), rx.recv().await);
165 /// }
166 /// ```
167 pub async fn recv(&mut self) -> Option<T> {
168 use crate::future::poll_fn;
169
170 poll_fn(|cx| self.poll_recv(cx)).await
171 }
172
173 /// Receives the next values for this receiver and extends `buffer`.
174 ///
175 /// This method extends `buffer` by no more than a fixed number of values
176 /// as specified by `limit`. If `limit` is zero, the function returns
177 /// immediately with `0`. The return value is the number of values added to
178 /// `buffer`.
179 ///
180 /// For `limit > 0`, if there are no messages in the channel's queue,
181 /// but the channel has not yet been closed, this method will sleep
182 /// until a message is sent or the channel is closed.
183 ///
184 /// For non-zero values of `limit`, this method will never return `0` unless
185 /// the channel has been closed and there are no remaining messages in the
186 /// channel's queue. This indicates that no further values can ever be
187 /// received from this `Receiver`. The channel is closed when all senders
188 /// have been dropped, or when [`close`] is called.
189 ///
190 /// The capacity of `buffer` is increased as needed.
191 ///
192 /// # Cancel safety
193 ///
194 /// This method is cancel safe. If `recv_many` is used as the event in a
195 /// [`tokio::select!`](crate::select) statement and some other branch
196 /// completes first, it is guaranteed that no messages were received on this
197 /// channel.
198 ///
199 /// [`close`]: Self::close
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// use tokio::sync::mpsc;
205 ///
206 /// #[tokio::main]
207 /// async fn main() {
208 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
209 /// let limit = 2;
210 /// let (tx, mut rx) = mpsc::unbounded_channel();
211 /// let tx2 = tx.clone();
212 /// tx2.send("first").unwrap();
213 /// tx2.send("second").unwrap();
214 /// tx2.send("third").unwrap();
215 ///
216 /// // Call `recv_many` to receive up to `limit` (2) values.
217 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
218 /// assert_eq!(vec!["first", "second"], buffer);
219 ///
220 /// // If the buffer is full, the next call to `recv_many`
221 /// // reserves additional capacity.
222 /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
223 ///
224 /// tokio::spawn(async move {
225 /// tx.send("fourth").unwrap();
226 /// });
227 ///
228 /// // 'tx' is dropped, but `recv_many`
229 /// // is guaranteed not to return 0 as the channel
230 /// // is not yet closed.
231 /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
232 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
233 ///
234 /// // Once the last sender is dropped, the channel is
235 /// // closed and `recv_many` returns 0, capacity unchanged.
236 /// drop(tx2);
237 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
238 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
239 /// }
240 /// ```
241 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
242 use crate::future::poll_fn;
243 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
244 }
245
246 /// Tries to receive the next value for this receiver.
247 ///
248 /// This method returns the [`Empty`] error if the channel is currently
249 /// empty, but there are still outstanding [senders] or [permits].
250 ///
251 /// This method returns the [`Disconnected`] error if the channel is
252 /// currently empty, and there are no outstanding [senders] or [permits].
253 ///
254 /// Unlike the [`poll_recv`] method, this method will never return an
255 /// [`Empty`] error spuriously.
256 ///
257 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
258 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
259 /// [`poll_recv`]: Self::poll_recv
260 /// [senders]: crate::sync::mpsc::Sender
261 /// [permits]: crate::sync::mpsc::Permit
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use tokio::sync::mpsc;
267 /// use tokio::sync::mpsc::error::TryRecvError;
268 ///
269 /// #[tokio::main]
270 /// async fn main() {
271 /// let (tx, mut rx) = mpsc::unbounded_channel();
272 ///
273 /// tx.send("hello").unwrap();
274 ///
275 /// assert_eq!(Ok("hello"), rx.try_recv());
276 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
277 ///
278 /// tx.send("hello").unwrap();
279 /// // Drop the last sender, closing the channel.
280 /// drop(tx);
281 ///
282 /// assert_eq!(Ok("hello"), rx.try_recv());
283 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
284 /// }
285 /// ```
286 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
287 self.chan.try_recv()
288 }
289
290 /// Blocking receive to call outside of asynchronous contexts.
291 ///
292 /// # Panics
293 ///
294 /// This function panics if called within an asynchronous execution
295 /// context.
296 ///
297 /// # Examples
298 ///
299 /// ```
300 /// use std::thread;
301 /// use tokio::sync::mpsc;
302 ///
303 /// #[tokio::main]
304 /// async fn main() {
305 /// let (tx, mut rx) = mpsc::unbounded_channel::<u8>();
306 ///
307 /// let sync_code = thread::spawn(move || {
308 /// assert_eq!(Some(10), rx.blocking_recv());
309 /// });
310 ///
311 /// let _ = tx.send(10);
312 /// sync_code.join().unwrap();
313 /// }
314 /// ```
315 #[track_caller]
316 #[cfg(feature = "sync")]
317 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
318 pub fn blocking_recv(&mut self) -> Option<T> {
319 crate::future::block_on(self.recv())
320 }
321
322 /// Closes the receiving half of a channel, without dropping it.
323 ///
324 /// This prevents any further messages from being sent on the channel while
325 /// still enabling the receiver to drain messages that are buffered.
326 ///
327 /// To guarantee that no messages are dropped, after calling `close()`,
328 /// `recv()` must be called until `None` is returned.
329 pub fn close(&mut self) {
330 self.chan.close();
331 }
332
333 /// Polls to receive the next message on this channel.
334 ///
335 /// This method returns:
336 ///
337 /// * `Poll::Pending` if no messages are available but the channel is not
338 /// closed, or if a spurious failure happens.
339 /// * `Poll::Ready(Some(message))` if a message is available.
340 /// * `Poll::Ready(None)` if the channel has been closed and all messages
341 /// sent before it was closed have been received.
342 ///
343 /// When the method returns `Poll::Pending`, the `Waker` in the provided
344 /// `Context` is scheduled to receive a wakeup when a message is sent on any
345 /// receiver, or when the channel is closed. Note that on multiple calls to
346 /// `poll_recv`, only the `Waker` from the `Context` passed to the most
347 /// recent call is scheduled to receive a wakeup.
348 ///
349 /// If this method returns `Poll::Pending` due to a spurious failure, then
350 /// the `Waker` will be notified when the situation causing the spurious
351 /// failure has been resolved. Note that receiving such a wakeup does not
352 /// guarantee that the next call will succeed — it could fail with another
353 /// spurious failure.
354 pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
355 self.chan.recv(cx)
356 }
357}
358
359impl<T> UnboundedSender<T> {
360 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> {
361 UnboundedSender { chan }
362 }
363
364 /// Attempts to send a message on this `UnboundedSender` without blocking.
365 ///
366 /// This method is not marked async because sending a message to an unbounded channel
367 /// never requires any form of waiting. Because of this, the `send` method can be
368 /// used in both synchronous and asynchronous code without problems.
369 ///
370 /// If the receive half of the channel is closed, either due to [`close`]
371 /// being called or the [`UnboundedReceiver`] having been dropped, this
372 /// function returns an error. The error includes the value passed to `send`.
373 ///
374 /// [`close`]: UnboundedReceiver::close
375 /// [`UnboundedReceiver`]: UnboundedReceiver
376 pub fn send(&self, message: T) -> Result<(), SendError<T>> {
377 if !self.inc_num_messages() {
378 return Err(SendError(message));
379 }
380
381 self.chan.send(message);
382 Ok(())
383 }
384
385 fn inc_num_messages(&self) -> bool {
386 use std::process;
387 use std::sync::atomic::Ordering::{AcqRel, Acquire};
388
389 let mut curr = self.chan.semaphore().0.load(Acquire);
390
391 loop {
392 if curr & 1 == 1 {
393 return false;
394 }
395
396 if curr == usize::MAX ^ 1 {
397 // Overflowed the ref count. There is no safe way to recover, so
398 // abort the process. In practice, this should never happen.
399 process::abort()
400 }
401
402 match self
403 .chan
404 .semaphore()
405 .0
406 .compare_exchange(curr, curr + 2, AcqRel, Acquire)
407 {
408 Ok(_) => return true,
409 Err(actual) => {
410 curr = actual;
411 }
412 }
413 }
414 }
415
416 /// Completes when the receiver has dropped.
417 ///
418 /// This allows the producers to get notified when interest in the produced
419 /// values is canceled and immediately stop doing work.
420 ///
421 /// # Cancel safety
422 ///
423 /// This method is cancel safe. Once the channel is closed, it stays closed
424 /// forever and all future calls to `closed` will return immediately.
425 ///
426 /// # Examples
427 ///
428 /// ```
429 /// use tokio::sync::mpsc;
430 ///
431 /// #[tokio::main]
432 /// async fn main() {
433 /// let (tx1, rx) = mpsc::unbounded_channel::<()>();
434 /// let tx2 = tx1.clone();
435 /// let tx3 = tx1.clone();
436 /// let tx4 = tx1.clone();
437 /// let tx5 = tx1.clone();
438 /// tokio::spawn(async move {
439 /// drop(rx);
440 /// });
441 ///
442 /// futures::join!(
443 /// tx1.closed(),
444 /// tx2.closed(),
445 /// tx3.closed(),
446 /// tx4.closed(),
447 /// tx5.closed()
448 /// );
449 //// println!("Receiver dropped");
450 /// }
451 /// ```
452 pub async fn closed(&self) {
453 self.chan.closed().await;
454 }
455
456 /// Checks if the channel has been closed. This happens when the
457 /// [`UnboundedReceiver`] is dropped, or when the
458 /// [`UnboundedReceiver::close`] method is called.
459 ///
460 /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver
461 /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
462 ///
463 /// ```
464 /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
465 /// assert!(!tx.is_closed());
466 ///
467 /// let tx2 = tx.clone();
468 /// assert!(!tx2.is_closed());
469 ///
470 /// drop(rx);
471 /// assert!(tx.is_closed());
472 /// assert!(tx2.is_closed());
473 /// ```
474 pub fn is_closed(&self) -> bool {
475 self.chan.is_closed()
476 }
477
478 /// Returns `true` if senders belong to the same channel.
479 ///
480 /// # Examples
481 ///
482 /// ```
483 /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>();
484 /// let tx2 = tx.clone();
485 /// assert!(tx.same_channel(&tx2));
486 ///
487 /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>();
488 /// assert!(!tx3.same_channel(&tx2));
489 /// ```
490 pub fn same_channel(&self, other: &Self) -> bool {
491 self.chan.same_channel(&other.chan)
492 }
493
494 /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count
495 /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the
496 /// channel were dropped and only `WeakUnboundedSender` instances remain,
497 /// the channel is closed.
498 pub fn downgrade(&self) -> WeakUnboundedSender<T> {
499 WeakUnboundedSender {
500 chan: self.chan.downgrade(),
501 }
502 }
503}
504
505impl<T> Clone for WeakUnboundedSender<T> {
506 fn clone(&self) -> Self {
507 WeakUnboundedSender {
508 chan: self.chan.clone(),
509 }
510 }
511}
512
513impl<T> WeakUnboundedSender<T> {
514 /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`].
515 /// This will return `Some` if there are other `Sender` instances alive and
516 /// the channel wasn't previously dropped, otherwise `None` is returned.
517 pub fn upgrade(&self) -> Option<UnboundedSender<T>> {
518 chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new)
519 }
520}
521
522impl<T> fmt::Debug for WeakUnboundedSender<T> {
523 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
524 fmt.debug_struct("WeakUnboundedSender").finish()
525 }
526}
527