1 | use crate::loom::sync::{atomic::AtomicUsize, Arc}; |
2 | use crate::sync::mpsc::chan; |
3 | use crate::sync::mpsc::error::{SendError, TryRecvError}; |
4 | |
5 | use std::fmt; |
6 | use std::task::{Context, Poll}; |
7 | |
8 | /// Send values to the associated `UnboundedReceiver`. |
9 | /// |
10 | /// Instances are created by the [`unbounded_channel`] function. |
pub struct UnboundedSender<T> {
    /// The channel sender; every method on this type forwards to it.
    chan: chan::Tx<T, Semaphore>,
}
14 | |
15 | /// An unbounded sender that does not prevent the channel from being closed. |
16 | /// |
17 | /// If all [`UnboundedSender`] instances of a channel were dropped and only |
18 | /// `WeakUnboundedSender` instances remain, the channel is closed. |
19 | /// |
20 | /// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using |
21 | /// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None` |
22 | /// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`. |
23 | /// |
24 | /// [`UnboundedSender`]: UnboundedSender |
25 | /// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade |
26 | /// |
27 | /// # Examples |
28 | /// |
29 | /// ``` |
30 | /// use tokio::sync::mpsc::unbounded_channel; |
31 | /// |
32 | /// #[tokio::main] |
33 | /// async fn main() { |
34 | /// let (tx, _rx) = unbounded_channel::<i32>(); |
35 | /// let tx_weak = tx.downgrade(); |
36 | /// |
37 | /// // Upgrading will succeed because `tx` still exists. |
38 | /// assert!(tx_weak.upgrade().is_some()); |
39 | /// |
40 | /// // If we drop `tx`, then it will fail. |
41 | /// drop(tx); |
42 | /// assert!(tx_weak.clone().upgrade().is_none()); |
43 | /// } |
44 | /// ``` |
pub struct WeakUnboundedSender<T> {
    /// Shared handle to the channel state. `upgrade` hands a clone of it to
    /// `chan::Tx::upgrade` to try to obtain a real sender.
    chan: Arc<chan::Chan<T, Semaphore>>,
}
48 | |
49 | impl<T> Clone for UnboundedSender<T> { |
50 | fn clone(&self) -> Self { |
51 | UnboundedSender { |
52 | chan: self.chan.clone(), |
53 | } |
54 | } |
55 | } |
56 | |
57 | impl<T> fmt::Debug for UnboundedSender<T> { |
58 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
59 | fmt&mut DebugStruct<'_, '_>.debug_struct("UnboundedSender" ) |
60 | .field(name:"chan" , &self.chan) |
61 | .finish() |
62 | } |
63 | } |
64 | |
65 | /// Receive values from the associated `UnboundedSender`. |
66 | /// |
67 | /// Instances are created by the [`unbounded_channel`] function. |
68 | /// |
69 | /// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`]. |
70 | /// |
71 | /// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html |
pub struct UnboundedReceiver<T> {
    /// The channel receiver; the receive, poll and close methods forward to it.
    chan: chan::Rx<T, Semaphore>,
}
76 | |
77 | impl<T> fmt::Debug for UnboundedReceiver<T> { |
78 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
79 | fmt&mut DebugStruct<'_, '_>.debug_struct("UnboundedReceiver" ) |
80 | .field(name:"chan" , &self.chan) |
81 | .finish() |
82 | } |
83 | } |
84 | |
85 | /// Creates an unbounded mpsc channel for communicating between asynchronous |
86 | /// tasks without backpressure. |
87 | /// |
88 | /// A `send` on this channel will always succeed as long as the receive half has |
89 | /// not been closed. If the receiver falls behind, messages will be arbitrarily |
90 | /// buffered. |
91 | /// |
92 | /// **Note** that the amount of available system memory is an implicit bound to |
93 | /// the channel. Using an `unbounded` channel has the ability of causing the |
94 | /// process to run out of memory. In this case, the process will be aborted. |
95 | pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { |
96 | let (tx: Tx, rx: Rx) = chan::channel(Semaphore(AtomicUsize::new(val:0))); |
97 | |
98 | let tx: UnboundedSender = UnboundedSender::new(chan:tx); |
99 | let rx: UnboundedReceiver = UnboundedReceiver::new(chan:rx); |
100 | |
101 | (tx, rx) |
102 | } |
103 | |
/// No capacity
///
/// Wraps the counter that `UnboundedSender::send` checks and bumps before
/// queueing a message: `send` fails once the lowest bit is set, and each
/// accepted message adds 2 to the value. The code that sets that bit lives
/// outside this file (presumably on channel closure; verify in `chan`).
#[derive (Debug)]
pub(crate) struct Semaphore(pub(crate) AtomicUsize);
107 | |
108 | impl<T> UnboundedReceiver<T> { |
109 | pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> { |
110 | UnboundedReceiver { chan } |
111 | } |
112 | |
113 | /// Receives the next value for this receiver. |
114 | /// |
115 | /// This method returns `None` if the channel has been closed and there are |
116 | /// no remaining messages in the channel's buffer. This indicates that no |
117 | /// further values can ever be received from this `Receiver`. The channel is |
118 | /// closed when all senders have been dropped, or when [`close`] is called. |
119 | /// |
120 | /// If there are no messages in the channel's buffer, but the channel has |
121 | /// not yet been closed, this method will sleep until a message is sent or |
122 | /// the channel is closed. |
123 | /// |
124 | /// # Cancel safety |
125 | /// |
126 | /// This method is cancel safe. If `recv` is used as the event in a |
127 | /// [`tokio::select!`](crate::select) statement and some other branch |
128 | /// completes first, it is guaranteed that no messages were received on this |
129 | /// channel. |
130 | /// |
131 | /// [`close`]: Self::close |
132 | /// |
133 | /// # Examples |
134 | /// |
135 | /// ``` |
136 | /// use tokio::sync::mpsc; |
137 | /// |
138 | /// #[tokio::main] |
139 | /// async fn main() { |
140 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
141 | /// |
142 | /// tokio::spawn(async move { |
143 | /// tx.send("hello" ).unwrap(); |
144 | /// }); |
145 | /// |
146 | /// assert_eq!(Some("hello" ), rx.recv().await); |
147 | /// assert_eq!(None, rx.recv().await); |
148 | /// } |
149 | /// ``` |
150 | /// |
151 | /// Values are buffered: |
152 | /// |
153 | /// ``` |
154 | /// use tokio::sync::mpsc; |
155 | /// |
156 | /// #[tokio::main] |
157 | /// async fn main() { |
158 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
159 | /// |
160 | /// tx.send("hello" ).unwrap(); |
161 | /// tx.send("world" ).unwrap(); |
162 | /// |
163 | /// assert_eq!(Some("hello" ), rx.recv().await); |
164 | /// assert_eq!(Some("world" ), rx.recv().await); |
165 | /// } |
166 | /// ``` |
167 | pub async fn recv(&mut self) -> Option<T> { |
168 | use crate::future::poll_fn; |
169 | |
170 | poll_fn(|cx| self.poll_recv(cx)).await |
171 | } |
172 | |
173 | /// Receives the next values for this receiver and extends `buffer`. |
174 | /// |
175 | /// This method extends `buffer` by no more than a fixed number of values |
176 | /// as specified by `limit`. If `limit` is zero, the function returns |
177 | /// immediately with `0`. The return value is the number of values added to |
178 | /// `buffer`. |
179 | /// |
180 | /// For `limit > 0`, if there are no messages in the channel's queue, |
181 | /// but the channel has not yet been closed, this method will sleep |
182 | /// until a message is sent or the channel is closed. |
183 | /// |
184 | /// For non-zero values of `limit`, this method will never return `0` unless |
185 | /// the channel has been closed and there are no remaining messages in the |
186 | /// channel's queue. This indicates that no further values can ever be |
187 | /// received from this `Receiver`. The channel is closed when all senders |
188 | /// have been dropped, or when [`close`] is called. |
189 | /// |
190 | /// The capacity of `buffer` is increased as needed. |
191 | /// |
192 | /// # Cancel safety |
193 | /// |
194 | /// This method is cancel safe. If `recv_many` is used as the event in a |
195 | /// [`tokio::select!`](crate::select) statement and some other branch |
196 | /// completes first, it is guaranteed that no messages were received on this |
197 | /// channel. |
198 | /// |
199 | /// [`close`]: Self::close |
200 | /// |
201 | /// # Examples |
202 | /// |
203 | /// ``` |
204 | /// use tokio::sync::mpsc; |
205 | /// |
206 | /// #[tokio::main] |
207 | /// async fn main() { |
208 | /// let mut buffer: Vec<&str> = Vec::with_capacity(2); |
209 | /// let limit = 2; |
210 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
211 | /// let tx2 = tx.clone(); |
212 | /// tx2.send("first" ).unwrap(); |
213 | /// tx2.send("second" ).unwrap(); |
214 | /// tx2.send("third" ).unwrap(); |
215 | /// |
216 | /// // Call `recv_many` to receive up to `limit` (2) values. |
217 | /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await); |
218 | /// assert_eq!(vec!["first" , "second" ], buffer); |
219 | /// |
220 | /// // If the buffer is full, the next call to `recv_many` |
221 | /// // reserves additional capacity. |
222 | /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await); |
223 | /// |
224 | /// tokio::spawn(async move { |
225 | /// tx.send("fourth" ).unwrap(); |
226 | /// }); |
227 | /// |
228 | /// // 'tx' is dropped, but `recv_many` |
229 | /// // is guaranteed not to return 0 as the channel |
230 | /// // is not yet closed. |
231 | /// assert_eq!(1, rx.recv_many(&mut buffer, limit).await); |
232 | /// assert_eq!(vec!["first" , "second" , "third" , "fourth" ], buffer); |
233 | /// |
234 | /// // Once the last sender is dropped, the channel is |
235 | /// // closed and `recv_many` returns 0, capacity unchanged. |
236 | /// drop(tx2); |
237 | /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await); |
238 | /// assert_eq!(vec!["first" , "second" , "third" , "fourth" ], buffer); |
239 | /// } |
240 | /// ``` |
241 | pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize { |
242 | use crate::future::poll_fn; |
243 | poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await |
244 | } |
245 | |
246 | /// Tries to receive the next value for this receiver. |
247 | /// |
248 | /// This method returns the [`Empty`] error if the channel is currently |
249 | /// empty, but there are still outstanding [senders] or [permits]. |
250 | /// |
251 | /// This method returns the [`Disconnected`] error if the channel is |
252 | /// currently empty, and there are no outstanding [senders] or [permits]. |
253 | /// |
254 | /// Unlike the [`poll_recv`] method, this method will never return an |
255 | /// [`Empty`] error spuriously. |
256 | /// |
257 | /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty |
258 | /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected |
259 | /// [`poll_recv`]: Self::poll_recv |
260 | /// [senders]: crate::sync::mpsc::Sender |
261 | /// [permits]: crate::sync::mpsc::Permit |
262 | /// |
263 | /// # Examples |
264 | /// |
265 | /// ``` |
266 | /// use tokio::sync::mpsc; |
267 | /// use tokio::sync::mpsc::error::TryRecvError; |
268 | /// |
269 | /// #[tokio::main] |
270 | /// async fn main() { |
271 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
272 | /// |
273 | /// tx.send("hello" ).unwrap(); |
274 | /// |
275 | /// assert_eq!(Ok("hello" ), rx.try_recv()); |
276 | /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); |
277 | /// |
278 | /// tx.send("hello" ).unwrap(); |
279 | /// // Drop the last sender, closing the channel. |
280 | /// drop(tx); |
281 | /// |
282 | /// assert_eq!(Ok("hello" ), rx.try_recv()); |
283 | /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); |
284 | /// } |
285 | /// ``` |
286 | pub fn try_recv(&mut self) -> Result<T, TryRecvError> { |
287 | self.chan.try_recv() |
288 | } |
289 | |
290 | /// Blocking receive to call outside of asynchronous contexts. |
291 | /// |
292 | /// # Panics |
293 | /// |
294 | /// This function panics if called within an asynchronous execution |
295 | /// context. |
296 | /// |
297 | /// # Examples |
298 | /// |
299 | /// ``` |
300 | /// use std::thread; |
301 | /// use tokio::sync::mpsc; |
302 | /// |
303 | /// #[tokio::main] |
304 | /// async fn main() { |
305 | /// let (tx, mut rx) = mpsc::unbounded_channel::<u8>(); |
306 | /// |
307 | /// let sync_code = thread::spawn(move || { |
308 | /// assert_eq!(Some(10), rx.blocking_recv()); |
309 | /// }); |
310 | /// |
311 | /// let _ = tx.send(10); |
312 | /// sync_code.join().unwrap(); |
313 | /// } |
314 | /// ``` |
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
pub fn blocking_recv(&mut self) -> Option<T> {
    // Build the async receive first, then hand it to the crate's `block_on`
    // helper to wait for its result.
    let next_message = self.recv();
    crate::future::block_on(next_message)
}
321 | |
322 | /// Closes the receiving half of a channel, without dropping it. |
323 | /// |
324 | /// This prevents any further messages from being sent on the channel while |
325 | /// still enabling the receiver to drain messages that are buffered. |
326 | /// |
327 | /// To guarantee that no messages are dropped, after calling `close()`, |
328 | /// `recv()` must be called until `None` is returned. |
329 | pub fn close(&mut self) { |
330 | self.chan.close(); |
331 | } |
332 | |
333 | /// Polls to receive the next message on this channel. |
334 | /// |
335 | /// This method returns: |
336 | /// |
337 | /// * `Poll::Pending` if no messages are available but the channel is not |
338 | /// closed, or if a spurious failure happens. |
339 | /// * `Poll::Ready(Some(message))` if a message is available. |
340 | /// * `Poll::Ready(None)` if the channel has been closed and all messages |
341 | /// sent before it was closed have been received. |
342 | /// |
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// sender, or when the channel is closed. Note that on multiple calls to
346 | /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` |
347 | /// passed to the most recent call is scheduled to receive a wakeup. |
348 | /// |
349 | /// If this method returns `Poll::Pending` due to a spurious failure, then |
350 | /// the `Waker` will be notified when the situation causing the spurious |
351 | /// failure has been resolved. Note that receiving such a wakeup does not |
352 | /// guarantee that the next call will succeed — it could fail with another |
353 | /// spurious failure. |
354 | pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { |
355 | self.chan.recv(cx) |
356 | } |
357 | |
358 | /// Polls to receive multiple messages on this channel, extending the provided buffer. |
359 | /// |
360 | /// This method returns: |
361 | /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a |
362 | /// spurious failure happens. |
363 | /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and |
364 | /// stored in `buffer`. This can be less than, or equal to, `limit`. |
365 | /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed. |
366 | /// |
/// When the method returns `Poll::Pending`, the `Waker` in the provided
/// `Context` is scheduled to receive a wakeup when a message is sent on any
/// sender, or when the channel is closed. Note that on multiple calls to
370 | /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context` |
371 | /// passed to the most recent call is scheduled to receive a wakeup. |
372 | /// |
373 | /// Note that this method does not guarantee that exactly `limit` messages |
374 | /// are received. Rather, if at least one message is available, it returns |
375 | /// as many messages as it can up to the given limit. This method returns |
376 | /// zero only if the channel is closed (or if `limit` is zero). |
377 | /// |
378 | /// # Examples |
379 | /// |
380 | /// ``` |
381 | /// use std::task::{Context, Poll}; |
382 | /// use std::pin::Pin; |
383 | /// use tokio::sync::mpsc; |
384 | /// use futures::Future; |
385 | /// |
386 | /// struct MyReceiverFuture<'a> { |
387 | /// receiver: mpsc::UnboundedReceiver<i32>, |
388 | /// buffer: &'a mut Vec<i32>, |
389 | /// limit: usize, |
390 | /// } |
391 | /// |
392 | /// impl<'a> Future for MyReceiverFuture<'a> { |
393 | /// type Output = usize; // Number of messages received |
394 | /// |
395 | /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
396 | /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self; |
397 | /// |
398 | /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied |
399 | /// match receiver.poll_recv_many(cx, *buffer, *limit) { |
400 | /// Poll::Pending => Poll::Pending, |
401 | /// Poll::Ready(count) => Poll::Ready(count), |
402 | /// } |
403 | /// } |
404 | /// } |
405 | /// |
406 | /// #[tokio::main] |
407 | /// async fn main() { |
408 | /// let (tx, rx) = mpsc::unbounded_channel::<i32>(); |
409 | /// let mut buffer = Vec::new(); |
410 | /// |
411 | /// let my_receiver_future = MyReceiverFuture { |
412 | /// receiver: rx, |
413 | /// buffer: &mut buffer, |
414 | /// limit: 3, |
415 | /// }; |
416 | /// |
417 | /// for i in 0..10 { |
418 | /// tx.send(i).expect("Unable to send integer" ); |
419 | /// } |
420 | /// |
421 | /// let count = my_receiver_future.await; |
422 | /// assert_eq!(count, 3); |
423 | /// assert_eq!(buffer, vec![0,1,2]) |
424 | /// } |
425 | /// ``` |
426 | pub fn poll_recv_many( |
427 | &mut self, |
428 | cx: &mut Context<'_>, |
429 | buffer: &mut Vec<T>, |
430 | limit: usize, |
431 | ) -> Poll<usize> { |
432 | self.chan.recv_many(cx, buffer, limit) |
433 | } |
434 | } |
435 | |
436 | impl<T> UnboundedSender<T> { |
437 | pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> { |
438 | UnboundedSender { chan } |
439 | } |
440 | |
441 | /// Attempts to send a message on this `UnboundedSender` without blocking. |
442 | /// |
443 | /// This method is not marked async because sending a message to an unbounded channel |
444 | /// never requires any form of waiting. Because of this, the `send` method can be |
445 | /// used in both synchronous and asynchronous code without problems. |
446 | /// |
447 | /// If the receive half of the channel is closed, either due to [`close`] |
448 | /// being called or the [`UnboundedReceiver`] having been dropped, this |
449 | /// function returns an error. The error includes the value passed to `send`. |
450 | /// |
451 | /// [`close`]: UnboundedReceiver::close |
452 | /// [`UnboundedReceiver`]: UnboundedReceiver |
453 | pub fn send(&self, message: T) -> Result<(), SendError<T>> { |
454 | if !self.inc_num_messages() { |
455 | return Err(SendError(message)); |
456 | } |
457 | |
458 | self.chan.send(message); |
459 | Ok(()) |
460 | } |
461 | |
462 | fn inc_num_messages(&self) -> bool { |
463 | use std::process; |
464 | use std::sync::atomic::Ordering::{AcqRel, Acquire}; |
465 | |
466 | let mut curr = self.chan.semaphore().0.load(Acquire); |
467 | |
468 | loop { |
469 | if curr & 1 == 1 { |
470 | return false; |
471 | } |
472 | |
473 | if curr == usize::MAX ^ 1 { |
474 | // Overflowed the ref count. There is no safe way to recover, so |
475 | // abort the process. In practice, this should never happen. |
476 | process::abort() |
477 | } |
478 | |
479 | match self |
480 | .chan |
481 | .semaphore() |
482 | .0 |
483 | .compare_exchange(curr, curr + 2, AcqRel, Acquire) |
484 | { |
485 | Ok(_) => return true, |
486 | Err(actual) => { |
487 | curr = actual; |
488 | } |
489 | } |
490 | } |
491 | } |
492 | |
493 | /// Completes when the receiver has dropped. |
494 | /// |
495 | /// This allows the producers to get notified when interest in the produced |
496 | /// values is canceled and immediately stop doing work. |
497 | /// |
498 | /// # Cancel safety |
499 | /// |
500 | /// This method is cancel safe. Once the channel is closed, it stays closed |
501 | /// forever and all future calls to `closed` will return immediately. |
502 | /// |
503 | /// # Examples |
504 | /// |
505 | /// ``` |
506 | /// use tokio::sync::mpsc; |
507 | /// |
508 | /// #[tokio::main] |
509 | /// async fn main() { |
510 | /// let (tx1, rx) = mpsc::unbounded_channel::<()>(); |
511 | /// let tx2 = tx1.clone(); |
512 | /// let tx3 = tx1.clone(); |
513 | /// let tx4 = tx1.clone(); |
514 | /// let tx5 = tx1.clone(); |
515 | /// tokio::spawn(async move { |
516 | /// drop(rx); |
517 | /// }); |
518 | /// |
519 | /// futures::join!( |
520 | /// tx1.closed(), |
521 | /// tx2.closed(), |
522 | /// tx3.closed(), |
523 | /// tx4.closed(), |
524 | /// tx5.closed() |
525 | /// ); |
///     println!("Receiver dropped");
527 | /// } |
528 | /// ``` |
529 | pub async fn closed(&self) { |
530 | self.chan.closed().await; |
531 | } |
532 | |
533 | /// Checks if the channel has been closed. This happens when the |
534 | /// [`UnboundedReceiver`] is dropped, or when the |
535 | /// [`UnboundedReceiver::close`] method is called. |
536 | /// |
537 | /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver |
538 | /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close |
539 | /// |
540 | /// ``` |
541 | /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); |
542 | /// assert!(!tx.is_closed()); |
543 | /// |
544 | /// let tx2 = tx.clone(); |
545 | /// assert!(!tx2.is_closed()); |
546 | /// |
547 | /// drop(rx); |
548 | /// assert!(tx.is_closed()); |
549 | /// assert!(tx2.is_closed()); |
550 | /// ``` |
551 | pub fn is_closed(&self) -> bool { |
552 | self.chan.is_closed() |
553 | } |
554 | |
555 | /// Returns `true` if senders belong to the same channel. |
556 | /// |
557 | /// # Examples |
558 | /// |
559 | /// ``` |
560 | /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); |
561 | /// let tx2 = tx.clone(); |
562 | /// assert!(tx.same_channel(&tx2)); |
563 | /// |
564 | /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>(); |
565 | /// assert!(!tx3.same_channel(&tx2)); |
566 | /// ``` |
567 | pub fn same_channel(&self, other: &Self) -> bool { |
568 | self.chan.same_channel(&other.chan) |
569 | } |
570 | |
571 | /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count |
572 | /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the |
573 | /// channel were dropped and only `WeakUnboundedSender` instances remain, |
574 | /// the channel is closed. |
575 | pub fn downgrade(&self) -> WeakUnboundedSender<T> { |
576 | WeakUnboundedSender { |
577 | chan: self.chan.downgrade(), |
578 | } |
579 | } |
580 | } |
581 | |
582 | impl<T> Clone for WeakUnboundedSender<T> { |
583 | fn clone(&self) -> Self { |
584 | WeakUnboundedSender { |
585 | chan: self.chan.clone(), |
586 | } |
587 | } |
588 | } |
589 | |
590 | impl<T> WeakUnboundedSender<T> { |
591 | /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`]. |
592 | /// This will return `Some` if there are other `Sender` instances alive and |
593 | /// the channel wasn't previously dropped, otherwise `None` is returned. |
594 | pub fn upgrade(&self) -> Option<UnboundedSender<T>> { |
595 | chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new) |
596 | } |
597 | } |
598 | |
599 | impl<T> fmt::Debug for WeakUnboundedSender<T> { |
600 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
601 | fmt.debug_struct(name:"WeakUnboundedSender" ).finish() |
602 | } |
603 | } |
604 | |