1 | use crate::loom::sync::{atomic::AtomicUsize, Arc}; |
2 | use crate::sync::mpsc::chan; |
3 | use crate::sync::mpsc::error::{SendError, TryRecvError}; |
4 | |
5 | use std::fmt; |
6 | use std::task::{Context, Poll}; |
7 | |
/// Send values to the associated `UnboundedReceiver`.
///
/// Instances are created by the
/// [`unbounded_channel`](unbounded_channel) function.
pub struct UnboundedSender<T> {
    /// Transmit handle (`chan::Tx`) of the underlying channel.
    chan: chan::Tx<T, Semaphore>,
}
15 | |
/// An unbounded sender that does not prevent the channel from being closed.
///
/// If all [`UnboundedSender`] instances of a channel were dropped and only
/// `WeakUnboundedSender` instances remain, the channel is closed.
///
/// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using
/// [`WeakUnboundedSender::upgrade`], which returns `Option<UnboundedSender>`. It returns `None`
/// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`.
///
/// [`UnboundedSender`]: UnboundedSender
/// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc::unbounded_channel;
///
/// #[tokio::main]
/// async fn main() {
///     let (tx, _rx) = unbounded_channel::<i32>();
///     let tx_weak = tx.downgrade();
///
///     // Upgrading will succeed because `tx` still exists.
///     assert!(tx_weak.upgrade().is_some());
///
///     // If we drop `tx`, then it will fail.
///     drop(tx);
///     assert!(tx_weak.clone().upgrade().is_none());
/// }
/// ```
pub struct WeakUnboundedSender<T> {
    /// Shared handle to the channel state; turned back into a sender by
    /// [`WeakUnboundedSender::upgrade`].
    chan: Arc<chan::Chan<T, Semaphore>>,
}
49 | |
50 | impl<T> Clone for UnboundedSender<T> { |
51 | fn clone(&self) -> Self { |
52 | UnboundedSender { |
53 | chan: self.chan.clone(), |
54 | } |
55 | } |
56 | } |
57 | |
58 | impl<T> fmt::Debug for UnboundedSender<T> { |
59 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
60 | fmt&mut DebugStruct<'_, '_>.debug_struct("UnboundedSender" ) |
61 | .field(name:"chan" , &self.chan) |
62 | .finish() |
63 | } |
64 | } |
65 | |
/// Receive values from the associated `UnboundedSender`.
///
/// Instances are created by the
/// [`unbounded_channel`](unbounded_channel) function.
///
/// This receiver can be turned into a `Stream` using [`UnboundedReceiverStream`].
///
/// [`UnboundedReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.UnboundedReceiverStream.html
pub struct UnboundedReceiver<T> {
    /// The channel receiver
    chan: chan::Rx<T, Semaphore>,
}
78 | |
79 | impl<T> fmt::Debug for UnboundedReceiver<T> { |
80 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
81 | fmt&mut DebugStruct<'_, '_>.debug_struct("UnboundedReceiver" ) |
82 | .field(name:"chan" , &self.chan) |
83 | .finish() |
84 | } |
85 | } |
86 | |
87 | /// Creates an unbounded mpsc channel for communicating between asynchronous |
88 | /// tasks without backpressure. |
89 | /// |
90 | /// A `send` on this channel will always succeed as long as the receive half has |
91 | /// not been closed. If the receiver falls behind, messages will be arbitrarily |
92 | /// buffered. |
93 | /// |
94 | /// **Note** that the amount of available system memory is an implicit bound to |
95 | /// the channel. Using an `unbounded` channel has the ability of causing the |
96 | /// process to run out of memory. In this case, the process will be aborted. |
97 | pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) { |
98 | let (tx: Tx, rx: Rx) = chan::channel(Semaphore(AtomicUsize::new(val:0))); |
99 | |
100 | let tx: UnboundedSender = UnboundedSender::new(chan:tx); |
101 | let rx: UnboundedReceiver = UnboundedReceiver::new(chan:rx); |
102 | |
103 | (tx, rx) |
104 | } |
105 | |
/// No capacity: the semaphore type passed to `chan::channel` by
/// [`unbounded_channel`].
///
/// The wrapped atomic is the counter that `UnboundedSender::inc_num_messages`
/// updates on every send: it advances in steps of 2, and once its lowest bit
/// is set further sends are rejected.
#[derive(Debug)]
pub(crate) struct Semaphore(pub(crate) AtomicUsize);
109 | |
110 | impl<T> UnboundedReceiver<T> { |
111 | pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> UnboundedReceiver<T> { |
112 | UnboundedReceiver { chan } |
113 | } |
114 | |
115 | /// Receives the next value for this receiver. |
116 | /// |
117 | /// This method returns `None` if the channel has been closed and there are |
118 | /// no remaining messages in the channel's buffer. This indicates that no |
119 | /// further values can ever be received from this `Receiver`. The channel is |
120 | /// closed when all senders have been dropped, or when [`close`] is called. |
121 | /// |
122 | /// If there are no messages in the channel's buffer, but the channel has |
123 | /// not yet been closed, this method will sleep until a message is sent or |
124 | /// the channel is closed. |
125 | /// |
126 | /// # Cancel safety |
127 | /// |
128 | /// This method is cancel safe. If `recv` is used as the event in a |
129 | /// [`tokio::select!`](crate::select) statement and some other branch |
130 | /// completes first, it is guaranteed that no messages were received on this |
131 | /// channel. |
132 | /// |
133 | /// [`close`]: Self::close |
134 | /// |
135 | /// # Examples |
136 | /// |
137 | /// ``` |
138 | /// use tokio::sync::mpsc; |
139 | /// |
140 | /// #[tokio::main] |
141 | /// async fn main() { |
142 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
143 | /// |
144 | /// tokio::spawn(async move { |
145 | /// tx.send("hello" ).unwrap(); |
146 | /// }); |
147 | /// |
148 | /// assert_eq!(Some("hello" ), rx.recv().await); |
149 | /// assert_eq!(None, rx.recv().await); |
150 | /// } |
151 | /// ``` |
152 | /// |
153 | /// Values are buffered: |
154 | /// |
155 | /// ``` |
156 | /// use tokio::sync::mpsc; |
157 | /// |
158 | /// #[tokio::main] |
159 | /// async fn main() { |
160 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
161 | /// |
162 | /// tx.send("hello" ).unwrap(); |
163 | /// tx.send("world" ).unwrap(); |
164 | /// |
165 | /// assert_eq!(Some("hello" ), rx.recv().await); |
166 | /// assert_eq!(Some("world" ), rx.recv().await); |
167 | /// } |
168 | /// ``` |
169 | pub async fn recv(&mut self) -> Option<T> { |
170 | use crate::future::poll_fn; |
171 | |
172 | poll_fn(|cx| self.poll_recv(cx)).await |
173 | } |
174 | |
175 | /// Tries to receive the next value for this receiver. |
176 | /// |
177 | /// This method returns the [`Empty`] error if the channel is currently |
178 | /// empty, but there are still outstanding [senders] or [permits]. |
179 | /// |
180 | /// This method returns the [`Disconnected`] error if the channel is |
181 | /// currently empty, and there are no outstanding [senders] or [permits]. |
182 | /// |
183 | /// Unlike the [`poll_recv`] method, this method will never return an |
184 | /// [`Empty`] error spuriously. |
185 | /// |
186 | /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty |
187 | /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected |
188 | /// [`poll_recv`]: Self::poll_recv |
189 | /// [senders]: crate::sync::mpsc::Sender |
190 | /// [permits]: crate::sync::mpsc::Permit |
191 | /// |
192 | /// # Examples |
193 | /// |
194 | /// ``` |
195 | /// use tokio::sync::mpsc; |
196 | /// use tokio::sync::mpsc::error::TryRecvError; |
197 | /// |
198 | /// #[tokio::main] |
199 | /// async fn main() { |
200 | /// let (tx, mut rx) = mpsc::unbounded_channel(); |
201 | /// |
202 | /// tx.send("hello" ).unwrap(); |
203 | /// |
204 | /// assert_eq!(Ok("hello" ), rx.try_recv()); |
205 | /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv()); |
206 | /// |
207 | /// tx.send("hello" ).unwrap(); |
208 | /// // Drop the last sender, closing the channel. |
209 | /// drop(tx); |
210 | /// |
211 | /// assert_eq!(Ok("hello" ), rx.try_recv()); |
212 | /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv()); |
213 | /// } |
214 | /// ``` |
215 | pub fn try_recv(&mut self) -> Result<T, TryRecvError> { |
216 | self.chan.try_recv() |
217 | } |
218 | |
219 | /// Blocking receive to call outside of asynchronous contexts. |
220 | /// |
221 | /// # Panics |
222 | /// |
223 | /// This function panics if called within an asynchronous execution |
224 | /// context. |
225 | /// |
226 | /// # Examples |
227 | /// |
228 | /// ``` |
229 | /// use std::thread; |
230 | /// use tokio::sync::mpsc; |
231 | /// |
232 | /// #[tokio::main] |
233 | /// async fn main() { |
234 | /// let (tx, mut rx) = mpsc::unbounded_channel::<u8>(); |
235 | /// |
236 | /// let sync_code = thread::spawn(move || { |
237 | /// assert_eq!(Some(10), rx.blocking_recv()); |
238 | /// }); |
239 | /// |
240 | /// let _ = tx.send(10); |
241 | /// sync_code.join().unwrap(); |
242 | /// } |
243 | /// ``` |
244 | #[track_caller ] |
245 | #[cfg (feature = "sync" )] |
246 | #[cfg_attr (docsrs, doc(alias = "recv_blocking" ))] |
247 | pub fn blocking_recv(&mut self) -> Option<T> { |
248 | crate::future::block_on(self.recv()) |
249 | } |
250 | |
251 | /// Closes the receiving half of a channel, without dropping it. |
252 | /// |
253 | /// This prevents any further messages from being sent on the channel while |
254 | /// still enabling the receiver to drain messages that are buffered. |
255 | /// |
256 | /// To guarantee that no messages are dropped, after calling `close()`, |
257 | /// `recv()` must be called until `None` is returned. |
258 | pub fn close(&mut self) { |
259 | self.chan.close(); |
260 | } |
261 | |
262 | /// Polls to receive the next message on this channel. |
263 | /// |
264 | /// This method returns: |
265 | /// |
266 | /// * `Poll::Pending` if no messages are available but the channel is not |
267 | /// closed, or if a spurious failure happens. |
268 | /// * `Poll::Ready(Some(message))` if a message is available. |
269 | /// * `Poll::Ready(None)` if the channel has been closed and all messages |
270 | /// sent before it was closed have been received. |
271 | /// |
272 | /// When the method returns `Poll::Pending`, the `Waker` in the provided |
273 | /// `Context` is scheduled to receive a wakeup when a message is sent on any |
274 | /// receiver, or when the channel is closed. Note that on multiple calls to |
275 | /// `poll_recv`, only the `Waker` from the `Context` passed to the most |
276 | /// recent call is scheduled to receive a wakeup. |
277 | /// |
278 | /// If this method returns `Poll::Pending` due to a spurious failure, then |
279 | /// the `Waker` will be notified when the situation causing the spurious |
280 | /// failure has been resolved. Note that receiving such a wakeup does not |
281 | /// guarantee that the next call will succeed — it could fail with another |
282 | /// spurious failure. |
283 | pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { |
284 | self.chan.recv(cx) |
285 | } |
286 | } |
287 | |
288 | impl<T> UnboundedSender<T> { |
289 | pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> UnboundedSender<T> { |
290 | UnboundedSender { chan } |
291 | } |
292 | |
293 | /// Attempts to send a message on this `UnboundedSender` without blocking. |
294 | /// |
295 | /// This method is not marked async because sending a message to an unbounded channel |
296 | /// never requires any form of waiting. Because of this, the `send` method can be |
297 | /// used in both synchronous and asynchronous code without problems. |
298 | /// |
299 | /// If the receive half of the channel is closed, either due to [`close`] |
300 | /// being called or the [`UnboundedReceiver`] having been dropped, this |
301 | /// function returns an error. The error includes the value passed to `send`. |
302 | /// |
303 | /// [`close`]: UnboundedReceiver::close |
304 | /// [`UnboundedReceiver`]: UnboundedReceiver |
305 | pub fn send(&self, message: T) -> Result<(), SendError<T>> { |
306 | if !self.inc_num_messages() { |
307 | return Err(SendError(message)); |
308 | } |
309 | |
310 | self.chan.send(message); |
311 | Ok(()) |
312 | } |
313 | |
314 | fn inc_num_messages(&self) -> bool { |
315 | use std::process; |
316 | use std::sync::atomic::Ordering::{AcqRel, Acquire}; |
317 | |
318 | let mut curr = self.chan.semaphore().0.load(Acquire); |
319 | |
320 | loop { |
321 | if curr & 1 == 1 { |
322 | return false; |
323 | } |
324 | |
325 | if curr == usize::MAX ^ 1 { |
326 | // Overflowed the ref count. There is no safe way to recover, so |
327 | // abort the process. In practice, this should never happen. |
328 | process::abort() |
329 | } |
330 | |
331 | match self |
332 | .chan |
333 | .semaphore() |
334 | .0 |
335 | .compare_exchange(curr, curr + 2, AcqRel, Acquire) |
336 | { |
337 | Ok(_) => return true, |
338 | Err(actual) => { |
339 | curr = actual; |
340 | } |
341 | } |
342 | } |
343 | } |
344 | |
345 | /// Completes when the receiver has dropped. |
346 | /// |
347 | /// This allows the producers to get notified when interest in the produced |
348 | /// values is canceled and immediately stop doing work. |
349 | /// |
350 | /// # Cancel safety |
351 | /// |
352 | /// This method is cancel safe. Once the channel is closed, it stays closed |
353 | /// forever and all future calls to `closed` will return immediately. |
354 | /// |
355 | /// # Examples |
356 | /// |
357 | /// ``` |
358 | /// use tokio::sync::mpsc; |
359 | /// |
360 | /// #[tokio::main] |
361 | /// async fn main() { |
362 | /// let (tx1, rx) = mpsc::unbounded_channel::<()>(); |
363 | /// let tx2 = tx1.clone(); |
364 | /// let tx3 = tx1.clone(); |
365 | /// let tx4 = tx1.clone(); |
366 | /// let tx5 = tx1.clone(); |
367 | /// tokio::spawn(async move { |
368 | /// drop(rx); |
369 | /// }); |
370 | /// |
371 | /// futures::join!( |
372 | /// tx1.closed(), |
373 | /// tx2.closed(), |
374 | /// tx3.closed(), |
375 | /// tx4.closed(), |
376 | /// tx5.closed() |
377 | /// ); |
378 | //// println!("Receiver dropped"); |
379 | /// } |
380 | /// ``` |
381 | pub async fn closed(&self) { |
382 | self.chan.closed().await |
383 | } |
384 | |
385 | /// Checks if the channel has been closed. This happens when the |
386 | /// [`UnboundedReceiver`] is dropped, or when the |
387 | /// [`UnboundedReceiver::close`] method is called. |
388 | /// |
389 | /// [`UnboundedReceiver`]: crate::sync::mpsc::UnboundedReceiver |
390 | /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close |
391 | /// |
392 | /// ``` |
393 | /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); |
394 | /// assert!(!tx.is_closed()); |
395 | /// |
396 | /// let tx2 = tx.clone(); |
397 | /// assert!(!tx2.is_closed()); |
398 | /// |
399 | /// drop(rx); |
400 | /// assert!(tx.is_closed()); |
401 | /// assert!(tx2.is_closed()); |
402 | /// ``` |
403 | pub fn is_closed(&self) -> bool { |
404 | self.chan.is_closed() |
405 | } |
406 | |
407 | /// Returns `true` if senders belong to the same channel. |
408 | /// |
409 | /// # Examples |
410 | /// |
411 | /// ``` |
412 | /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<()>(); |
413 | /// let tx2 = tx.clone(); |
414 | /// assert!(tx.same_channel(&tx2)); |
415 | /// |
416 | /// let (tx3, rx3) = tokio::sync::mpsc::unbounded_channel::<()>(); |
417 | /// assert!(!tx3.same_channel(&tx2)); |
418 | /// ``` |
419 | pub fn same_channel(&self, other: &Self) -> bool { |
420 | self.chan.same_channel(&other.chan) |
421 | } |
422 | |
423 | /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count |
424 | /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the |
425 | /// channel were dropped and only `WeakUnboundedSender` instances remain, |
426 | /// the channel is closed. |
427 | pub fn downgrade(&self) -> WeakUnboundedSender<T> { |
428 | WeakUnboundedSender { |
429 | chan: self.chan.downgrade(), |
430 | } |
431 | } |
432 | } |
433 | |
434 | impl<T> Clone for WeakUnboundedSender<T> { |
435 | fn clone(&self) -> Self { |
436 | WeakUnboundedSender { |
437 | chan: self.chan.clone(), |
438 | } |
439 | } |
440 | } |
441 | |
442 | impl<T> WeakUnboundedSender<T> { |
443 | /// Tries to convert a WeakUnboundedSender into an [`UnboundedSender`]. |
444 | /// This will return `Some` if there are other `Sender` instances alive and |
445 | /// the channel wasn't previously dropped, otherwise `None` is returned. |
446 | pub fn upgrade(&self) -> Option<UnboundedSender<T>> { |
447 | chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new) |
448 | } |
449 | } |
450 | |
451 | impl<T> fmt::Debug for WeakUnboundedSender<T> { |
452 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
453 | fmt.debug_struct(name:"WeakUnboundedSender" ).finish() |
454 | } |
455 | } |
456 | |