1 | use crate::loom::sync::{atomic::AtomicUsize, Arc}; |
2 | use crate::sync::mpsc::chan; |
3 | use crate::sync::mpsc::error::{SendError, TryRecvError}; |
4 | |
5 | use std::fmt; |
6 | use std::task::{Context, Poll}; |
7 | |
8 | /// Send values to the associated `UnboundedReceiver`. |
9 | /// |
10 | /// Instances are created by the [`unbounded_channel`] function. |
11 | pub struct UnboundedSender<T> { |
12 | chan: chan::Tx<T, Semaphore>, |
13 | } |
14 | |
15 | /// An unbounded sender that does not prevent the channel from being closed. |
16 | /// |
17 | /// If all [`UnboundedSender`] instances of a channel were dropped and only |
18 | /// `WeakUnboundedSender` instances remain, the channel is closed. |
19 | /// |
20 | /// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using |
21 | /// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None` |
22 | /// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`. |
23 | /// |
24 | /// [`UnboundedSender`]: UnboundedSender |
25 | /// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade |
26 | /// |
27 | /// # Examples |
28 | /// |
29 | /// ``` |
30 | /// use tokio::sync::mpsc::unbounded_channel; |
31 | /// |
32 | /// #[tokio::main] |
33 | /// async fn main() { |
34 | /// let (tx, _rx) = unbounded_channel::<i32>(); |
35 | /// let tx_weak = tx.downgrade(); |
36 | /// |
37 | /// // Upgrading will succeed because `tx` still exists. |
38 | /// assert!(tx_weak.upgrade().is_some()); |
39 | /// |
40 | /// // If we drop `tx`, then it will fail. |
41 | /// drop(tx); |
42 | /// assert!(tx_weak.clone().upgrade().is_none()); |
43 | /// } |
44 | /// ``` |
45 | pub struct WeakUnboundedSender<T> { |
46 | chan: Arc<chan::Chan<T, Semaphore>>, |
47 | } |
48 | |
49 | impl<T> Clone for UnboundedSender<T> { |
50 | fn clone(&self) -> Self { |
51 | UnboundedSender { |
52 | chan: self.chan.clone(), |
53 | } |
54 | } |
55 | } |
56 | |
57 | impl<T> fmt::Debug for UnboundedSender<T> { |
58 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
59 | fmt.debug_struct("UnboundedSender" ) |
60 | .field("chan" , &self.chan) |
61 | .finish() |
62 | } |
63 | } |
64 | |
65 | /// Receive values from the associated `UnboundedSender`. |
66 | /// |
67 | /// Instances are created by the [`unbounded_channel`] function. |
68 | /// |
69 | /// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`]. |
70 | /// |
71 | /// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html |
72 | pub struct UnboundedReceiver<T> { |
73 | /// The channel receiver |
74 | chan: chan::Rx<T, Semaphore>, |
75 | } |
76 | |
77 | impl<T> fmt::Debug for UnboundedReceiver<T> { |
78 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
79 | fmt.debug_struct("UnboundedReceiver" ) |
80 | .field("chan" , &self.chan) |
81 | .finish() |
82 | } |
83 | } |
84 | |
85 | /// Creates an unbounded mpsc channel for communicating between asynchronous |
86 | /// tasks without backpressure. |
87 | /// |
88 | /// A `send` on this channel will always succeed as long as the receive half has |
89 | /// not been closed. If the receiver falls behind, messages will be arbitrarily |
90 | /// buffered. |
91 | /// |
92 | /// **Note** that the amount of available system memory is an implicit bound to |
93 | /// the channel. Using an `unbounded` channel has the ability of causing the |
94 | /// process to run out of memory. In this case, the process will be aborted. |
95 | pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { |
96 | let (tx, rx) = chan::channel(Semaphore(AtomicUsize::new(0))); |
97 | |
98 | let tx = UnboundedSender::new(tx); |
99 | let rx = UnboundedReceiver::new(rx); |
100 | |
101 | (tx, rx) |
102 | } |
103 | |
/// Semaphore for the unbounded channel: it imposes no capacity limit.
///
/// The wrapped counter is manipulated by `UnboundedSender::send`: a send is
/// rejected while the lowest bit is set, and every accepted send adds `2`,
/// so the remaining bits count accepted messages.
#[derive(Debug)]
pub(crate) struct Semaphore(pub(crate) AtomicUsize);
107 | |
108 | impl<T> UnboundedReceiver<T> { |
109 | pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> { |
110 | UnboundedReceiver { chan } |
111 | } |
112 | |
113 | /// Receives the next value for this receiver. |
114 | /// |
115 | /// This method returns `None` if the channel has been closed and there are |
116 | /// no remaining messages in the channel's buffer. This indicates that no |
117 | /// further values can ever be received from this `Receiver`. The channel is |
118 | /// closed when all senders have been dropped, or when [`close`] is called. |
119 | /// |
120 | /// If there are no messages in the channel's buffer, but the channel has |
121 | /// not yet been closed, this method will sleep until a message is sent or |
122 | /// the channel is closed. |
123 | /// |
124 | /// # Cancel safety |
125 | /// |
126 | /// This method is cancel safe. If `recv` is used as the event in a |
127 | /// [`tokio::select!`](crate::select) statement and some other branch |
128 | /// completes first, it is guaranteed that no messages were received on this |
129 | /// channel. |
130 | /// |
131 | /// [`close`]: Self::close |
132 | /// |
133 | /// # Examples |
134 | /// |
135 | /// ``` |
136 | /// use tokio::sync::mpsc; |
137 | /// |
138 | /// #[tokio::main] |
139 | /// async fn main() { |
140 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
141 | /// |
142 | /// tokio::spawn(async move { |
143 | /// tx.send("hello" ).unwrap(); |
144 | /// }); |
145 | /// |
146 | /// assert_eq!(Some("hello" ), rx.recv().await); |
147 | /// assert_eq!(None, rx.recv().await); |
148 | /// } |
149 | /// ``` |
150 | /// |
151 | /// Values are buffered: |
152 | /// |
153 | /// ``` |
154 | /// use tokio::sync::mpsc; |
155 | /// |
156 | /// #[tokio::main] |
157 | /// async fn main() { |
158 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
159 | /// |
160 | /// tx.send("hello" ).unwrap(); |
161 | /// tx.send("world" ).unwrap(); |
162 | /// |
163 | /// assert_eq!(Some("hello" ), rx.recv().await); |
164 | /// assert_eq!(Some("world" ), rx.recv().await); |
165 | /// } |
166 | /// ``` |
167 | pub async fn recv(&mut self) -> Option<T> { |
168 | use crate::future::poll_fn; |
169 | |
170 | poll_fn(|cx| self.poll_recv(cx)).await |
171 | } |
172 | |
173 | /// Receives the next values for this receiver and extends `buffer`. |
174 | /// |
175 | /// This method extends `buffer` by no more than a fixed number of values |
176 | /// as specified by `limit`. If `limit` is zero, the function returns |
177 | /// immediately with `0`. The return value is the number of values added to |
178 | /// `buffer`. |
179 | /// |
180 | /// For `limit > 0`, if there are no messages in the channel's queue, |
181 | /// but the channel has not yet been closed, this method will sleep |
182 | /// until a message is sent or the channel is closed. |
183 | /// |
184 | /// For non-zero values of `limit`, this method will never return `0` unless |
185 | /// the channel has been closed and there are no remaining messages in the |
186 | /// channel's queue. This indicates that no further values can ever be |
187 | /// received from this `Receiver`. The channel is closed when all senders |
188 | /// have been dropped, or when [`close`] is called. |
189 | /// |
190 | /// The capacity of `buffer` is increased as needed. |
191 | /// |
192 | /// # Cancel safety |
193 | /// |
194 | /// This method is cancel safe. If `recv_many` is used as the event in a |
195 | /// [`tokio::select!`](crate::select) statement and some other branch |
196 | /// completes first, it is guaranteed that no messages were received on this |
197 | /// channel. |
198 | /// |
199 | /// [`close`]: Self::close |
200 | /// |
201 | /// # Examples |
202 | /// |
203 | /// ``` |
204 | /// use tokio::sync::mpsc; |
205 | /// |
206 | /// #[tokio::main] |
207 | /// async fn main() { |
208 | /// let mut buffer: Vec<&str> = Vec::with_capacity(2); |
209 | /// let limit = 2; |
210 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
211 | /// let tx2 = tx.clone(); |
212 | /// tx2.send("first" ).unwrap(); |
213 | /// tx2.send("second" ).unwrap(); |
214 | /// tx2.send("third" ).unwrap(); |
215 | /// |
216 | /// // Call `recv_many` to receive up to `limit` (2) values. |
217 | /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await); |
218 | /// assert_eq!(vec!["first" , "second" ], buffer); |
219 | /// |
220 | /// // If the buffer is full, the next call to `recv_many` |
221 | /// // reserves additional capacity. |
222 | /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await); |
223 | /// |
224 | /// tokio::spawn(async move { |
225 | /// tx.send("fourth" ).unwrap(); |
226 | /// }); |
227 | /// |
228 | /// // 'tx' is dropped, but `recv_many` |
229 | /// // is guaranteed not to return 0 as the channel |
230 | /// // is not yet closed. |
231 | /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await); |
232 | /// assert_eq!(vec!["first" , "second" , "third" , "fourth" ], buffer); |
233 | /// |
234 | /// // Once the last sender is dropped, the channel is |
235 | /// // closed and `recv_many` returns 0, capacity unchanged. |
236 | /// drop(tx2); |
237 | /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await); |
238 | /// assert_eq!(vec!["first" , "second" , "third" , "fourth" ], buffer); |
239 | /// } |
240 | /// ``` |
241 | pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize { |
242 | use crate::future::poll_fn; |
243 | poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await |
244 | } |
245 | |
246 | /// Tries to receive the next value for this receiver. |
247 | /// |
248 | /// This method returns the [`Empty`] error if the channel is currently |
249 | /// empty, but there are still outstanding [senders] or [permits]. |
250 | /// |
251 | /// This method returns the [`Disconnected`] error if the channel is |
252 | /// currently empty, and there are no outstanding [senders] or [permits]. |
253 | /// |
254 | /// Unlike the [`poll_recv`] method, this method will never return an |
255 | /// [`Empty`] error spuriously. |
256 | /// |
257 | /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty |
258 | /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected |
259 | /// [`poll_recv`]: Self::poll_recv |
260 | /// [senders]: crate::sync::mpsc::Sender |
261 | /// [permits]: crate::sync::mpsc::Permit |
262 | /// |
263 | /// # Examples |
264 | /// |
265 | /// ``` |
266 | /// use tokio::sync::mpsc; |
267 | /// use tokio::sync::mpsc::error::TryRecvError; |
268 | /// |
269 | /// #[tokio::main] |
270 | /// async fn main() { |
271 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
272 | /// |
273 | /// tx.send("hello" ).unwrap(); |
274 | /// |
275 | /// assert_eq!(Ok("hello" ), rx.try_recv()); |
276 | /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); |
277 | /// |
278 | /// tx.send("hello" ).unwrap(); |
279 | /// // Drop the last sender, closing the channel. |
280 | /// drop(tx); |
281 | /// |
282 | /// assert_eq!(Ok("hello" ), rx.try_recv()); |
283 | /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); |
284 | /// } |
285 | /// ``` |
286 | pub fn try_recv(&mut self) -> Result<T, TryRecvError> { |
287 | self.chan.try_recv() |
288 | } |
289 | |
290 | /// Blocking receive to call outside of asynchronous contexts. |
291 | /// |
292 | /// # Panics |
293 | /// |
294 | /// This function panics if called within an asynchronous execution |
295 | /// context. |
296 | /// |
297 | /// # Examples |
298 | /// |
299 | /// ``` |
300 | /// use std::thread; |
301 | /// use tokio::sync::mpsc; |
302 | /// |
303 | /// #[tokio::main] |
304 | /// async fn main() { |
305 | /// let (tx, mut rx) = mpsc::unbounded_channel::<u8>(); |
306 | /// |
307 | /// let sync_code = thread::spawn(move || { |
308 | /// assert_eq!(Some(10), rx.blocking_recv()); |
309 | /// }); |
310 | /// |
311 | /// let _ = tx.send(10); |
312 | /// sync_code.join().unwrap(); |
313 | /// } |
314 | /// ``` |
315 | #[track_caller ] |
316 | #[cfg (feature = "sync" )] |
317 | #[cfg_attr (docsrs, doc(alias = "recv_blocking" ))] |
318 | pub fn blocking_recv(&mut self) -> Option<T> { |
319 | crate::future::block_on(self.recv()) |
320 | } |
321 | |
322 | /// Closes the receiving half of a channel, without dropping it. |
323 | /// |
324 | /// This prevents any further messages from being sent on the channel while |
325 | /// still enabling the receiver to drain messages that are buffered. |
326 | /// |
327 | /// To guarantee that no messages are dropped, after calling `close()`, |
328 | /// `recv()` must be called until `None` is returned. |
329 | pub fn close(&mut self) { |
330 | self.chan.close(); |
331 | } |
332 | |
333 | /// Polls to receive the next message on this channel. |
334 | /// |
335 | /// This method returns: |
336 | /// |
337 | /// * `Poll::Pending` if no messages are available but the channel is not |
338 | /// closed, or if a spurious failure happens. |
339 | /// * `Poll::Ready(Some(message))` if a message is available. |
340 | /// * `Poll::Ready(None)` if the channel has been closed and all messages |
341 | /// sent before it was closed have been received. |
342 | /// |
343 | /// When the method returns `Poll::Pending`, the `Waker` in the provided |
344 | /// `Context` is scheduled to receive a wakeup when a message is sent on any |
345 | /// receiver, or when the channel is closed. Note that on multiple calls to |
346 | /// `poll_recv`, only the `Waker` from the `Context` passed to the most |
347 | /// recent call is scheduled to receive a wakeup. |
348 | /// |
349 | /// If this method returns `Poll::Pending` due to a spurious failure, then |
350 | /// the `Waker` will be notified when the situation causing the spurious |
351 | /// failure has been resolved. Note that receiving such a wakeup does not |
352 | /// guarantee that the next call will succeed — it could fail with another |
353 | /// spurious failure. |
354 | pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { |
355 | self.chan.recv(cx) |
356 | } |
357 | } |
358 | |
359 | impl<T> UnboundedSender<T> { |
360 | pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> { |
361 | UnboundedSender { chan } |
362 | } |
363 | |
364 | /// Attempts to send a message on this `UnboundedSender` without blocking. |
365 | /// |
366 | /// This method is not marked async because sending a message to an unbounded channel |
367 | /// never requires any form of waiting. Because of this, the `send` method can be |
368 | /// used in both synchronous and asynchronous code without problems. |
369 | /// |
370 | /// If the receive half of the channel is closed, either due to [`close`] |
371 | /// being called or the [`UnboundedReceiver`] having been dropped, this |
372 | /// function returns an error. The error includes the value passed to `send`. |
373 | /// |
374 | /// [`close`]: UnboundedReceiver::close |
375 | /// [`UnboundedReceiver`]: UnboundedReceiver |
376 | pub fn send(&self, message: T) -> Result<(), SendError<T>> { |
377 | if !self.inc_num_messages() { |
378 | return Err(SendError(message)); |
379 | } |
380 | |
381 | self.chan.send(message); |
382 | Ok(()) |
383 | } |
384 | |
385 | fn inc_num_messages(&self) -> bool { |
386 | use std::process; |
387 | use std::sync::atomic::Ordering::{AcqRel, Acquire}; |
388 | |
389 | let mut curr = self.chan.semaphore().0.load(Acquire); |
390 | |
391 | loop { |
392 | if curr & 1 == 1 { |
393 | return false; |
394 | } |
395 | |
396 | if curr == usize::MAX ^ 1 { |
397 | // Overflowed the ref count. There is no safe way to recover, so |
398 | // abort the process. In practice, this should never happen. |
399 | process::abort() |
400 | } |
401 | |
402 | match self |
403 | .chan |
404 | .semaphore() |
405 | .0 |
406 | .compare_exchange(curr, curr + 2, AcqRel, Acquire) |
407 | { |
408 | Ok(_) => return true, |
409 | Err(actual) => { |
410 | curr = actual; |
411 | } |
412 | } |
413 | } |
414 | } |
415 | |
416 | /// Completes when the receiver has dropped. |
417 | /// |
418 | /// This allows the producers to get notified when interest in the produced |
419 | /// values is canceled and immediately stop doing work. |
420 | /// |
421 | /// # Cancel safety |
422 | /// |
423 | /// This method is cancel safe. Once the channel is closed, it stays closed |
424 | /// forever and all future calls to `closed` will return immediately. |
425 | /// |
426 | /// # Examples |
427 | /// |
428 | /// ``` |
429 | /// use tokio::sync::mpsc; |
430 | /// |
431 | /// #[tokio::main] |
432 | /// async fn main() { |
433 | /// let (tx1, rx) = mpsc::unbounded_channel::<()>(); |
434 | /// let tx2 = tx1.clone(); |
435 | /// let tx3 = tx1.clone(); |
436 | /// let tx4 = tx1.clone(); |
437 | /// let tx5 = tx1.clone(); |
438 | /// tokio::spawn(async move { |
439 | /// drop(rx); |
440 | /// }); |
441 | /// |
442 | /// futures::join!( |
443 | /// tx1.closed(), |
444 | /// tx2.closed(), |
445 | /// tx3.closed(), |
446 | /// tx4.closed(), |
447 | /// tx5.closed() |
448 | /// ); |
449 | //// println!("Receiver dropped"); |
450 | /// } |
451 | /// ``` |
452 | pub async fn closed(&self) { |
453 | self.chan.closed().await; |
454 | } |
455 | |
456 | /// Checks if the channel has been closed. This happens when the |
457 | /// [`UnboundedReceiver`] is dropped, or when the |
458 | /// [`UnboundedReceiver::close`] method is called. |
459 | /// |
460 | /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver |
461 | /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close |
462 | /// |
463 | /// ``` |
464 | /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); |
465 | /// assert!(!tx.is_closed()); |
466 | /// |
467 | /// let tx2 = tx.clone(); |
468 | /// assert!(!tx2.is_closed()); |
469 | /// |
470 | /// drop(rx); |
471 | /// assert!(tx.is_closed()); |
472 | /// assert!(tx2.is_closed()); |
473 | /// ``` |
474 | pub fn is_closed(&self) -> bool { |
475 | self.chan.is_closed() |
476 | } |
477 | |
478 | /// Returns `true` if senders belong to the same channel. |
479 | /// |
480 | /// # Examples |
481 | /// |
482 | /// ``` |
483 | /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); |
484 | /// let tx2 = tx.clone(); |
485 | /// assert!(tx.same_channel(&tx2)); |
486 | /// |
487 | /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>(); |
488 | /// assert!(!tx3.same_channel(&tx2)); |
489 | /// ``` |
490 | pub fn same_channel(&self, other: &Self) -> bool { |
491 | self.chan.same_channel(&other.chan) |
492 | } |
493 | |
494 | /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count |
495 | /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the |
496 | /// channel were dropped and only `WeakUnboundedSender` instances remain, |
497 | /// the channel is closed. |
498 | pub fn downgrade(&self) -> WeakUnboundedSender<T> { |
499 | WeakUnboundedSender { |
500 | chan: self.chan.downgrade(), |
501 | } |
502 | } |
503 | } |
504 | |
505 | impl<T> Clone for WeakUnboundedSender<T> { |
506 | fn clone(&self) -> Self { |
507 | WeakUnboundedSender { |
508 | chan: self.chan.clone(), |
509 | } |
510 | } |
511 | } |
512 | |
513 | impl<T> WeakUnboundedSender<T> { |
514 | /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`]. |
515 | /// This will return `Some` if there are other `Sender` instances alive and |
516 | /// the channel wasn't previously dropped, otherwise `None` is returned. |
517 | pub fn upgrade(&self) -> Option<UnboundedSender<T>> { |
518 | chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new) |
519 | } |
520 | } |
521 | |
522 | impl<T> fmt::Debug for WeakUnboundedSender<T> { |
523 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
524 | fmt.debug_struct("WeakUnboundedSender" ).finish() |
525 | } |
526 | } |
527 | |