1 | use crate::loom::sync::{atomic::AtomicUsize, Arc}; |
2 | use crate::sync::mpsc::chan; |
3 | use crate::sync::mpsc::error::{SendError, TryRecvError}; |
4 | |
5 | use std::fmt; |
6 | use std::task::{Context, Poll}; |
7 | |
8 | /// Send values to the associated `UnboundedReceiver`. |
9 | /// |
10 | /// Instances are created by the [`unbounded_channel`] function. |
11 | pub struct UnboundedSender<T> { |
12 | chan: chan::Tx<T, Semaphore>, |
13 | } |
14 | |
15 | /// An unbounded sender that does not prevent the channel from being closed. |
16 | /// |
17 | /// If all [`UnboundedSender`] instances of a channel were dropped and only |
18 | /// `WeakUnboundedSender` instances remain, the channel is closed. |
19 | /// |
20 | /// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using |
21 | /// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None` |
22 | /// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`. |
23 | /// |
24 | /// [`UnboundedSender`]: UnboundedSender |
25 | /// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade |
26 | /// |
27 | /// # Examples |
28 | /// |
29 | /// ``` |
30 | /// use tokio::sync::mpsc::unbounded_channel; |
31 | /// |
32 | /// #[tokio::main] |
33 | /// async fn main() { |
34 | /// let (tx, _rx) = unbounded_channel::<i32>(); |
35 | /// let tx_weak = tx.downgrade(); |
36 | /// |
37 | /// // Upgrading will succeed because `tx` still exists. |
38 | /// assert!(tx_weak.upgrade().is_some()); |
39 | /// |
40 | /// // If we drop `tx`, then it will fail. |
41 | /// drop(tx); |
42 | /// assert!(tx_weak.clone().upgrade().is_none()); |
43 | /// } |
44 | /// ``` |
45 | pub struct WeakUnboundedSender<T> { |
46 | chan: Arc<chan::Chan<T, Semaphore>>, |
47 | } |
48 | |
49 | impl<T> Clone for UnboundedSender<T> { |
50 | fn clone(&self) -> Self { |
51 | UnboundedSender { |
52 | chan: self.chan.clone(), |
53 | } |
54 | } |
55 | } |
56 | |
57 | impl<T> fmt::Debug for UnboundedSender<T> { |
58 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
59 | fmt&mut DebugStruct<'_, '_>.debug_struct("UnboundedSender" ) |
60 | .field(name:"chan" , &self.chan) |
61 | .finish() |
62 | } |
63 | } |
64 | |
65 | /// Receive values from the associated `UnboundedSender`. |
66 | /// |
67 | /// Instances are created by the [`unbounded_channel`] function. |
68 | /// |
69 | /// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`]. |
70 | /// |
71 | /// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html |
72 | pub struct UnboundedReceiver<T> { |
73 | /// The channel receiver |
74 | chan: chan::Rx<T, Semaphore>, |
75 | } |
76 | |
77 | impl<T> fmt::Debug for UnboundedReceiver<T> { |
78 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
79 | fmt&mut DebugStruct<'_, '_>.debug_struct("UnboundedReceiver" ) |
80 | .field(name:"chan" , &self.chan) |
81 | .finish() |
82 | } |
83 | } |
84 | |
85 | /// Creates an unbounded mpsc channel for communicating between asynchronous |
86 | /// tasks without backpressure. |
87 | /// |
88 | /// A `send` on this channel will always succeed as long as the receive half has |
89 | /// not been closed. If the receiver falls behind, messages will be arbitrarily |
90 | /// buffered. |
91 | /// |
92 | /// **Note** that the amount of available system memory is an implicit bound to |
93 | /// the channel. Using an `unbounded` channel has the ability of causing the |
94 | /// process to run out of memory. In this case, the process will be aborted. |
95 | pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { |
96 | let (tx: Tx, rx: Rx) = chan::channel(Semaphore(AtomicUsize::new(val:0))); |
97 | |
98 | let tx: UnboundedSender = UnboundedSender::new(chan:tx); |
99 | let rx: UnboundedReceiver = UnboundedReceiver::new(chan:rx); |
100 | |
101 | (tx, rx) |
102 | } |
103 | |
/// Semaphore used by unbounded channels: it imposes no capacity limit.
///
/// The wrapped counter is updated by `UnboundedSender::send`, which adds two
/// for every message and treats a set low bit as "sending is refused".
#[derive(Debug)]
pub(crate) struct Semaphore(pub(crate) AtomicUsize);
107 | |
108 | impl<T> UnboundedReceiver<T> { |
109 | pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> { |
110 | UnboundedReceiver { chan } |
111 | } |
112 | |
113 | /// Receives the next value for this receiver. |
114 | /// |
115 | /// This method returns `None` if the channel has been closed and there are |
116 | /// no remaining messages in the channel's buffer. This indicates that no |
117 | /// further values can ever be received from this `Receiver`. The channel is |
118 | /// closed when all senders have been dropped, or when [`close`] is called. |
119 | /// |
120 | /// If there are no messages in the channel's buffer, but the channel has |
121 | /// not yet been closed, this method will sleep until a message is sent or |
122 | /// the channel is closed. |
123 | /// |
124 | /// # Cancel safety |
125 | /// |
126 | /// This method is cancel safe. If `recv` is used as the event in a |
127 | /// [`tokio::select!`](crate::select) statement and some other branch |
128 | /// completes first, it is guaranteed that no messages were received on this |
129 | /// channel. |
130 | /// |
131 | /// [`close`]: Self::close |
132 | /// |
133 | /// # Examples |
134 | /// |
135 | /// ``` |
136 | /// use tokio::sync::mpsc; |
137 | /// |
138 | /// #[tokio::main] |
139 | /// async fn main() { |
140 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
141 | /// |
142 | /// tokio::spawn(async move { |
143 | /// tx.send("hello" ).unwrap(); |
144 | /// }); |
145 | /// |
146 | /// assert_eq!(Some("hello" ), rx.recv().await); |
147 | /// assert_eq!(None, rx.recv().await); |
148 | /// } |
149 | /// ``` |
150 | /// |
151 | /// Values are buffered: |
152 | /// |
153 | /// ``` |
154 | /// use tokio::sync::mpsc; |
155 | /// |
156 | /// #[tokio::main] |
157 | /// async fn main() { |
158 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
159 | /// |
160 | /// tx.send("hello" ).unwrap(); |
161 | /// tx.send("world" ).unwrap(); |
162 | /// |
163 | /// assert_eq!(Some("hello" ), rx.recv().await); |
164 | /// assert_eq!(Some("world" ), rx.recv().await); |
165 | /// } |
166 | /// ``` |
167 | pub async fn recv(&mut self) -> Option<T> { |
168 | use std::future::poll_fn; |
169 | |
170 | poll_fn(|cx| self.poll_recv(cx)).await |
171 | } |
172 | |
173 | /// Receives the next values for this receiver and extends `buffer`. |
174 | /// |
175 | /// This method extends `buffer` by no more than a fixed number of values |
176 | /// as specified by `limit`. If `limit` is zero, the function returns |
177 | /// immediately with `0`. The return value is the number of values added to |
178 | /// `buffer`. |
179 | /// |
180 | /// For `limit > 0`, if there are no messages in the channel's queue, |
181 | /// but the channel has not yet been closed, this method will sleep |
182 | /// until a message is sent or the channel is closed. |
183 | /// |
184 | /// For non-zero values of `limit`, this method will never return `0` unless |
185 | /// the channel has been closed and there are no remaining messages in the |
186 | /// channel's queue. This indicates that no further values can ever be |
187 | /// received from this `Receiver`. The channel is closed when all senders |
188 | /// have been dropped, or when [`close`] is called. |
189 | /// |
190 | /// The capacity of `buffer` is increased as needed. |
191 | /// |
192 | /// # Cancel safety |
193 | /// |
194 | /// This method is cancel safe. If `recv_many` is used as the event in a |
195 | /// [`tokio::select!`](crate::select) statement and some other branch |
196 | /// completes first, it is guaranteed that no messages were received on this |
197 | /// channel. |
198 | /// |
199 | /// [`close`]: Self::close |
200 | /// |
201 | /// # Examples |
202 | /// |
203 | /// ``` |
204 | /// use tokio::sync::mpsc; |
205 | /// |
206 | /// #[tokio::main] |
207 | /// async fn main() { |
208 | /// let mut buffer: Vec<&str> = Vec::with_capacity(2); |
209 | /// let limit = 2; |
210 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
211 | /// let tx2 = tx.clone(); |
212 | /// tx2.send("first" ).unwrap(); |
213 | /// tx2.send("second" ).unwrap(); |
214 | /// tx2.send("third" ).unwrap(); |
215 | /// |
216 | /// // Call `recv_many` to receive up to `limit` (2) values. |
217 | /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await); |
218 | /// assert_eq!(vec!["first" , "second" ], buffer); |
219 | /// |
220 | /// // If the buffer is full, the next call to `recv_many` |
221 | /// // reserves additional capacity. |
222 | /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await); |
223 | /// |
224 | /// tokio::spawn(async move { |
225 | /// tx.send("fourth" ).unwrap(); |
226 | /// }); |
227 | /// |
228 | /// // 'tx' is dropped, but `recv_many` |
229 | /// // is guaranteed not to return 0 as the channel |
230 | /// // is not yet closed. |
231 | /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await); |
232 | /// assert_eq!(vec!["first" , "second" , "third" , "fourth" ], buffer); |
233 | /// |
234 | /// // Once the last sender is dropped, the channel is |
235 | /// // closed and `recv_many` returns 0, capacity unchanged. |
236 | /// drop(tx2); |
237 | /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await); |
238 | /// assert_eq!(vec!["first" , "second" , "third" , "fourth" ], buffer); |
239 | /// } |
240 | /// ``` |
241 | pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize { |
242 | use std::future::poll_fn; |
243 | poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await |
244 | } |
245 | |
246 | /// Tries to receive the next value for this receiver. |
247 | /// |
248 | /// This method returns the [`Empty`] error if the channel is currently |
249 | /// empty, but there are still outstanding [senders] or [permits]. |
250 | /// |
251 | /// This method returns the [`Disconnected`] error if the channel is |
252 | /// currently empty, and there are no outstanding [senders] or [permits]. |
253 | /// |
254 | /// Unlike the [`poll_recv`] method, this method will never return an |
255 | /// [`Empty`] error spuriously. |
256 | /// |
257 | /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty |
258 | /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected |
259 | /// [`poll_recv`]: Self::poll_recv |
260 | /// [senders]: crate::sync::mpsc::Sender |
261 | /// [permits]: crate::sync::mpsc::Permit |
262 | /// |
263 | /// # Examples |
264 | /// |
265 | /// ``` |
266 | /// use tokio::sync::mpsc; |
267 | /// use tokio::sync::mpsc::error::TryRecvError; |
268 | /// |
269 | /// #[tokio::main] |
270 | /// async fn main() { |
271 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
272 | /// |
273 | /// tx.send("hello" ).unwrap(); |
274 | /// |
275 | /// assert_eq!(Ok("hello" ), rx.try_recv()); |
276 | /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); |
277 | /// |
278 | /// tx.send("hello" ).unwrap(); |
279 | /// // Drop the last sender, closing the channel. |
280 | /// drop(tx); |
281 | /// |
282 | /// assert_eq!(Ok("hello" ), rx.try_recv()); |
283 | /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); |
284 | /// } |
285 | /// ``` |
286 | pub fn try_recv(&mut self) -> Result<T, TryRecvError> { |
287 | self.chan.try_recv() |
288 | } |
289 | |
290 | /// Blocking receive to call outside of asynchronous contexts. |
291 | /// |
292 | /// # Panics |
293 | /// |
294 | /// This function panics if called within an asynchronous execution |
295 | /// context. |
296 | /// |
297 | /// # Examples |
298 | /// |
299 | /// ``` |
300 | /// use std::thread; |
301 | /// use tokio::sync::mpsc; |
302 | /// |
303 | /// #[tokio::main] |
304 | /// async fn main() { |
305 | /// let (tx, mut rx) = mpsc::unbounded_channel::<u8>(); |
306 | /// |
307 | /// let sync_code = thread::spawn(move || { |
308 | /// assert_eq!(Some(10), rx.blocking_recv()); |
309 | /// }); |
310 | /// |
311 | /// let _ = tx.send(10); |
312 | /// sync_code.join().unwrap(); |
313 | /// } |
314 | /// ``` |
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
pub fn blocking_recv(&mut self) -> Option<T> {
    // Drive the async `recv` future to completion on the current thread.
    let pending = self.recv();
    crate::future::block_on(pending)
}
321 | |
322 | /// Variant of [`Self::recv_many`] for blocking contexts. |
323 | /// |
324 | /// The same conditions as in [`Self::blocking_recv`] apply. |
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
    // Drive the async `recv_many` future to completion on the current thread.
    let pending = self.recv_many(buffer, limit);
    crate::future::block_on(pending)
}
331 | |
332 | /// Closes the receiving half of a channel, without dropping it. |
333 | /// |
334 | /// This prevents any further messages from being sent on the channel while |
335 | /// still enabling the receiver to drain messages that are buffered. |
336 | /// |
337 | /// To guarantee that no messages are dropped, after calling `close()`, |
338 | /// `recv()` must be called until `None` is returned. |
339 | pub fn close(&mut self) { |
340 | self.chan.close(); |
341 | } |
342 | |
343 | /// Checks if a channel is closed. |
344 | /// |
345 | /// This method returns `true` if the channel has been closed. The channel is closed |
346 | /// when all [`UnboundedSender`] have been dropped, or when [`UnboundedReceiver::close`] is called. |
347 | /// |
348 | /// [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender |
349 | /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close |
350 | /// |
351 | /// # Examples |
352 | /// ``` |
353 | /// use tokio::sync::mpsc; |
354 | /// |
355 | /// #[tokio::main] |
356 | /// async fn main() { |
357 | /// let (_tx, mut rx) = mpsc::unbounded_channel::<()>(); |
358 | /// assert!(!rx.is_closed()); |
359 | /// |
360 | /// rx.close(); |
361 | /// |
362 | /// assert!(rx.is_closed()); |
363 | /// } |
364 | /// ``` |
365 | pub fn is_closed(&self) -> bool { |
366 | self.chan.is_closed() |
367 | } |
368 | |
369 | /// Checks if a channel is empty. |
370 | /// |
371 | /// This method returns `true` if the channel has no messages. |
372 | /// |
373 | /// # Examples |
374 | /// ``` |
375 | /// use tokio::sync::mpsc; |
376 | /// |
377 | /// #[tokio::main] |
378 | /// async fn main() { |
379 | /// let (tx, rx) = mpsc::unbounded_channel(); |
380 | /// assert!(rx.is_empty()); |
381 | /// |
382 | /// tx.send(0).unwrap(); |
383 | /// assert!(!rx.is_empty()); |
384 | /// } |
385 | /// |
386 | /// ``` |
387 | pub fn is_empty(&self) -> bool { |
388 | self.chan.is_empty() |
389 | } |
390 | |
391 | /// Returns the number of messages in the channel. |
392 | /// |
393 | /// # Examples |
394 | /// ``` |
395 | /// use tokio::sync::mpsc; |
396 | /// |
397 | /// #[tokio::main] |
398 | /// async fn main() { |
399 | /// let (tx, rx) = mpsc::unbounded_channel(); |
400 | /// assert_eq!(0, rx.len()); |
401 | /// |
402 | /// tx.send(0).unwrap(); |
403 | /// assert_eq!(1, rx.len()); |
404 | /// } |
405 | /// ``` |
406 | pub fn len(&self) -> usize { |
407 | self.chan.len() |
408 | } |
409 | |
410 | /// Polls to receive the next message on this channel. |
411 | /// |
412 | /// This method returns: |
413 | /// |
414 | /// * `Poll::Pending` if no messages are available but the channel is not |
415 | /// closed, or if a spurious failure happens. |
416 | /// * `Poll::Ready(Some(message))` if a message is available. |
417 | /// * `Poll::Ready(None)` if the channel has been closed and all messages |
418 | /// sent before it was closed have been received. |
419 | /// |
420 | /// When the method returns `Poll::Pending`, the `Waker` in the provided |
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// sender, or when the channel is closed. Note that on multiple calls to
423 | /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` |
424 | /// passed to the most recent call is scheduled to receive a wakeup. |
425 | /// |
426 | /// If this method returns `Poll::Pending` due to a spurious failure, then |
427 | /// the `Waker` will be notified when the situation causing the spurious |
428 | /// failure has been resolved. Note that receiving such a wakeup does not |
429 | /// guarantee that the next call will succeed — it could fail with another |
430 | /// spurious failure. |
431 | pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { |
432 | self.chan.recv(cx) |
433 | } |
434 | |
435 | /// Polls to receive multiple messages on this channel, extending the provided buffer. |
436 | /// |
437 | /// This method returns: |
438 | /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a |
439 | /// spurious failure happens. |
440 | /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and |
441 | /// stored in `buffer`. This can be less than, or equal to, `limit`. |
442 | /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed. |
443 | /// |
444 | /// When the method returns `Poll::Pending`, the `Waker` in the provided |
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// sender, or when the channel is closed. Note that on multiple calls to
447 | /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` |
448 | /// passed to the most recent call is scheduled to receive a wakeup. |
449 | /// |
450 | /// Note that this method does not guarantee that exactly `limit` messages |
451 | /// are received. Rather, if at least one message is available, it returns |
452 | /// as many messages as it can up to the given limit. This method returns |
453 | /// zero only if the channel is closed (or if `limit` is zero). |
454 | /// |
455 | /// # Examples |
456 | /// |
457 | /// ``` |
458 | /// use std::task::{Context, Poll}; |
459 | /// use std::pin::Pin; |
460 | /// use tokio::sync::mpsc; |
461 | /// use futures::Future; |
462 | /// |
463 | /// struct MyReceiverFuture<'a> { |
464 | /// receiver: mpsc::UnboundedReceiver<i32>, |
465 | /// buffer: &'a mut Vec<i32>, |
466 | /// limit: usize, |
467 | /// } |
468 | /// |
469 | /// impl<'a> Future for MyReceiverFuture<'a> { |
470 | /// type Output = usize; // Number of messages received |
471 | /// |
472 | /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
473 | /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self; |
474 | /// |
475 | /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied |
476 | /// match receiver.poll_recv_many(cx, *buffer, *limit) { |
477 | /// Poll::Pending => Poll::Pending, |
478 | /// Poll::Ready(count) => Poll::Ready(count), |
479 | /// } |
480 | /// } |
481 | /// } |
482 | /// |
483 | /// #[tokio::main] |
484 | /// async fn main() { |
485 | /// let (tx, rx) = mpsc::unbounded_channel::<i32>(); |
486 | /// let mut buffer = Vec::new(); |
487 | /// |
488 | /// let my_receiver_future = MyReceiverFuture { |
489 | /// receiver: rx, |
490 | /// buffer: &mut buffer, |
491 | /// limit: 3, |
492 | /// }; |
493 | /// |
494 | /// for i in 0..10 { |
495 | /// tx.send(i).expect("Unable to send integer" ); |
496 | /// } |
497 | /// |
498 | /// let count = my_receiver_future.await; |
499 | /// assert_eq!(count, 3); |
500 | /// assert_eq!(buffer, vec![0,1,2]) |
501 | /// } |
502 | /// ``` |
503 | pub fn poll_recv_many( |
504 | &mut self, |
505 | cx: &mut Context<'_>, |
506 | buffer: &mut Vec<T>, |
507 | limit: usize, |
508 | ) -> Poll<usize> { |
509 | self.chan.recv_many(cx, buffer, limit) |
510 | } |
511 | |
512 | /// Returns the number of [`UnboundedSender`] handles. |
513 | pub fn sender_strong_count(&self) -> usize { |
514 | self.chan.sender_strong_count() |
515 | } |
516 | |
517 | /// Returns the number of [`WeakUnboundedSender`] handles. |
518 | pub fn sender_weak_count(&self) -> usize { |
519 | self.chan.sender_weak_count() |
520 | } |
521 | } |
522 | |
523 | impl<T> UnboundedSender<T> { |
524 | pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> { |
525 | UnboundedSender { chan } |
526 | } |
527 | |
528 | /// Attempts to send a message on this `UnboundedSender` without blocking. |
529 | /// |
530 | /// This method is not marked async because sending a message to an unbounded channel |
531 | /// never requires any form of waiting. Because of this, the `send` method can be |
532 | /// used in both synchronous and asynchronous code without problems. |
533 | /// |
534 | /// If the receive half of the channel is closed, either due to [`close`] |
535 | /// being called or the [`UnboundedReceiver`] having been dropped, this |
536 | /// function returns an error. The error includes the value passed to `send`. |
537 | /// |
538 | /// [`close`]: UnboundedReceiver::close |
539 | /// [`UnboundedReceiver`]: UnboundedReceiver |
540 | pub fn send(&self, message: T) -> Result<(), SendError<T>> { |
541 | if !self.inc_num_messages() { |
542 | return Err(SendError(message)); |
543 | } |
544 | |
545 | self.chan.send(message); |
546 | Ok(()) |
547 | } |
548 | |
549 | fn inc_num_messages(&self) -> bool { |
550 | use std::process; |
551 | use std::sync::atomic::Ordering::{AcqRel, Acquire}; |
552 | |
553 | let mut curr = self.chan.semaphore().0.load(Acquire); |
554 | |
555 | loop { |
556 | if curr & 1 == 1 { |
557 | return false; |
558 | } |
559 | |
560 | if curr == usize::MAX ^ 1 { |
561 | // Overflowed the ref count. There is no safe way to recover, so |
562 | // abort the process. In practice, this should never happen. |
563 | process::abort() |
564 | } |
565 | |
566 | match self |
567 | .chan |
568 | .semaphore() |
569 | .0 |
570 | .compare_exchange(curr, curr + 2, AcqRel, Acquire) |
571 | { |
572 | Ok(_) => return true, |
573 | Err(actual) => { |
574 | curr = actual; |
575 | } |
576 | } |
577 | } |
578 | } |
579 | |
580 | /// Completes when the receiver has dropped. |
581 | /// |
582 | /// This allows the producers to get notified when interest in the produced |
583 | /// values is canceled and immediately stop doing work. |
584 | /// |
585 | /// # Cancel safety |
586 | /// |
587 | /// This method is cancel safe. Once the channel is closed, it stays closed |
588 | /// forever and all future calls to `closed` will return immediately. |
589 | /// |
590 | /// # Examples |
591 | /// |
592 | /// ``` |
593 | /// use tokio::sync::mpsc; |
594 | /// |
595 | /// #[tokio::main] |
596 | /// async fn main() { |
597 | /// let (tx1, rx) = mpsc::unbounded_channel::<()>(); |
598 | /// let tx2 = tx1.clone(); |
599 | /// let tx3 = tx1.clone(); |
600 | /// let tx4 = tx1.clone(); |
601 | /// let tx5 = tx1.clone(); |
602 | /// tokio::spawn(async move { |
603 | /// drop(rx); |
604 | /// }); |
605 | /// |
606 | /// futures::join!( |
607 | /// tx1.closed(), |
608 | /// tx2.closed(), |
609 | /// tx3.closed(), |
610 | /// tx4.closed(), |
611 | /// tx5.closed() |
612 | /// ); |
///     println!("Receiver dropped");
614 | /// } |
615 | /// ``` |
616 | pub async fn closed(&self) { |
617 | self.chan.closed().await; |
618 | } |
619 | |
620 | /// Checks if the channel has been closed. This happens when the |
621 | /// [`UnboundedReceiver`] is dropped, or when the |
622 | /// [`UnboundedReceiver::close`] method is called. |
623 | /// |
624 | /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver |
625 | /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close |
626 | /// |
627 | /// ``` |
628 | /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); |
629 | /// assert!(!tx.is_closed()); |
630 | /// |
631 | /// let tx2 = tx.clone(); |
632 | /// assert!(!tx2.is_closed()); |
633 | /// |
634 | /// drop(rx); |
635 | /// assert!(tx.is_closed()); |
636 | /// assert!(tx2.is_closed()); |
637 | /// ``` |
638 | pub fn is_closed(&self) -> bool { |
639 | self.chan.is_closed() |
640 | } |
641 | |
642 | /// Returns `true` if senders belong to the same channel. |
643 | /// |
644 | /// # Examples |
645 | /// |
646 | /// ``` |
647 | /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); |
648 | /// let tx2 = tx.clone(); |
649 | /// assert!(tx.same_channel(&tx2)); |
650 | /// |
651 | /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>(); |
652 | /// assert!(!tx3.same_channel(&tx2)); |
653 | /// ``` |
654 | pub fn same_channel(&self, other: &Self) -> bool { |
655 | self.chan.same_channel(&other.chan) |
656 | } |
657 | |
658 | /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count |
659 | /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the |
660 | /// channel were dropped and only `WeakUnboundedSender` instances remain, |
661 | /// the channel is closed. |
662 | #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender." ] |
663 | pub fn downgrade(&self) -> WeakUnboundedSender<T> { |
664 | WeakUnboundedSender { |
665 | chan: self.chan.downgrade(), |
666 | } |
667 | } |
668 | |
669 | /// Returns the number of [`UnboundedSender`] handles. |
670 | pub fn strong_count(&self) -> usize { |
671 | self.chan.strong_count() |
672 | } |
673 | |
674 | /// Returns the number of [`WeakUnboundedSender`] handles. |
675 | pub fn weak_count(&self) -> usize { |
676 | self.chan.weak_count() |
677 | } |
678 | } |
679 | |
680 | impl<T> Clone for WeakUnboundedSender<T> { |
681 | fn clone(&self) -> Self { |
682 | self.chan.increment_weak_count(); |
683 | |
684 | WeakUnboundedSender { |
685 | chan: self.chan.clone(), |
686 | } |
687 | } |
688 | } |
689 | |
690 | impl<T> Drop for WeakUnboundedSender<T> { |
691 | fn drop(&mut self) { |
692 | self.chan.decrement_weak_count(); |
693 | } |
694 | } |
695 | |
696 | impl<T> WeakUnboundedSender<T> { |
697 | /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`]. |
698 | /// This will return `Some` if there are other `Sender` instances alive and |
699 | /// the channel wasn't previously dropped, otherwise `None` is returned. |
700 | pub fn upgrade(&self) -> Option<UnboundedSender<T>> { |
701 | chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new) |
702 | } |
703 | |
704 | /// Returns the number of [`UnboundedSender`] handles. |
705 | pub fn strong_count(&self) -> usize { |
706 | self.chan.strong_count() |
707 | } |
708 | |
709 | /// Returns the number of [`WeakUnboundedSender`] handles. |
710 | pub fn weak_count(&self) -> usize { |
711 | self.chan.weak_count() |
712 | } |
713 | } |
714 | |
715 | impl<T> fmt::Debug for WeakUnboundedSender<T> { |
716 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
717 | fmt.debug_struct(name:"WeakUnboundedSender" ).finish() |
718 | } |
719 | } |
720 | |