1 | //! A channel for sending a single message between asynchronous tasks. |
2 | //! |
3 | //! This is a single-producer, single-consumer channel. |
4 | |
5 | use alloc::sync::Arc; |
6 | use core::fmt; |
7 | use core::pin::Pin; |
8 | use core::sync::atomic::AtomicBool; |
9 | use core::sync::atomic::Ordering::SeqCst; |
10 | use futures_core::future::{FusedFuture, Future}; |
11 | use futures_core::task::{Context, Poll, Waker}; |
12 | |
13 | use crate::lock::Lock; |
14 | |
15 | /// A future for a value that will be provided by another asynchronous task. |
16 | /// |
17 | /// This is created by the [`channel`] function. |
18 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
19 | pub struct Receiver<T> { |
20 | inner: Arc<Inner<T>>, |
21 | } |
22 | |
23 | /// A means of transmitting a single value to another task. |
24 | /// |
25 | /// This is created by the [`channel`] function. |
26 | pub struct Sender<T> { |
27 | inner: Arc<Inner<T>>, |
28 | } |
29 | |
30 | // The channels do not ever project Pin to the inner T |
31 | impl<T> Unpin for Receiver<T> {} |
32 | impl<T> Unpin for Sender<T> {} |
33 | |
34 | /// Internal state of the `Receiver`/`Sender` pair above. This is all used as |
35 | /// the internal synchronization between the two for send/recv operations. |
36 | struct Inner<T> { |
37 | /// Indicates whether this oneshot is complete yet. This is filled in both |
38 | /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it |
39 | /// appropriately. |
40 | /// |
41 | /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is |
42 | /// unlocked and ready to be inspected. |
43 | /// |
44 | /// For `Sender` if this is `true` then the oneshot has gone away and it |
45 | /// can return ready from `poll_canceled`. |
46 | complete: AtomicBool, |
47 | |
48 | /// The actual data being transferred as part of this `Receiver`. This is |
49 | /// filled in by `Sender::complete` and read by `Receiver::poll`. |
50 | /// |
51 | /// Note that this is protected by `Lock`, but it is in theory safe to |
52 | /// replace with an `UnsafeCell` as it's actually protected by `complete` |
53 | /// above. I wouldn't recommend doing this, however, unless someone is |
54 | /// supremely confident in the various atomic orderings here and there. |
55 | data: Lock<Option<T>>, |
56 | |
57 | /// Field to store the task which is blocked in `Receiver::poll`. |
58 | /// |
59 | /// This is filled in when a oneshot is polled but not ready yet. Note that |
60 | /// the `Lock` here, unlike in `data` above, is important to resolve races. |
61 | /// Both the `Receiver` and the `Sender` halves understand that if they |
62 | /// can't acquire the lock then some important interference is happening. |
63 | rx_task: Lock<Option<Waker>>, |
64 | |
65 | /// Like `rx_task` above, except for the task blocked in |
66 | /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`. |
67 | tx_task: Lock<Option<Waker>>, |
68 | } |
69 | |
70 | /// Creates a new one-shot channel for sending a single value across asynchronous tasks. |
71 | /// |
72 | /// The channel works for a spsc (single-producer, single-consumer) scheme. |
73 | /// |
74 | /// This function is similar to Rust's channel constructor found in the standard |
75 | /// library. Two halves are returned, the first of which is a `Sender` handle, |
76 | /// used to signal the end of a computation and provide its value. The second |
77 | /// half is a `Receiver` which implements the `Future` trait, resolving to the |
78 | /// value that was given to the `Sender` handle. |
79 | /// |
80 | /// Each half can be separately owned and sent across tasks. |
81 | /// |
82 | /// # Examples |
83 | /// |
84 | /// ``` |
85 | /// use futures::channel::oneshot; |
86 | /// use std::{thread, time::Duration}; |
87 | /// |
88 | /// let (sender, receiver) = oneshot::channel::<i32>(); |
89 | /// |
90 | /// thread::spawn(|| { |
91 | /// println!("THREAD: sleeping zzz..." ); |
92 | /// thread::sleep(Duration::from_millis(1000)); |
93 | /// println!("THREAD: i'm awake! sending." ); |
94 | /// sender.send(3).unwrap(); |
95 | /// }); |
96 | /// |
97 | /// println!("MAIN: doing some useful stuff" ); |
98 | /// |
99 | /// futures::executor::block_on(async { |
100 | /// println!("MAIN: waiting for msg..." ); |
101 | /// println!("MAIN: got: {:?}" , receiver.await) |
102 | /// }); |
103 | /// ``` |
104 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
105 | let inner = Arc::new(Inner::new()); |
106 | let receiver = Receiver { inner: inner.clone() }; |
107 | let sender = Sender { inner }; |
108 | (sender, receiver) |
109 | } |
110 | |
111 | impl<T> Inner<T> { |
112 | fn new() -> Self { |
113 | Self { |
114 | complete: AtomicBool::new(false), |
115 | data: Lock::new(None), |
116 | rx_task: Lock::new(None), |
117 | tx_task: Lock::new(None), |
118 | } |
119 | } |
120 | |
121 | fn send(&self, t: T) -> Result<(), T> { |
122 | if self.complete.load(SeqCst) { |
123 | return Err(t); |
124 | } |
125 | |
126 | // Note that this lock acquisition may fail if the receiver |
127 | // is closed and sets the `complete` flag to `true`, whereupon |
128 | // the receiver may call `poll()`. |
129 | if let Some(mut slot) = self.data.try_lock() { |
130 | assert!(slot.is_none()); |
131 | *slot = Some(t); |
132 | drop(slot); |
133 | |
134 | // If the receiver called `close()` between the check at the |
135 | // start of the function, and the lock being released, then |
136 | // the receiver may not be around to receive it, so try to |
137 | // pull it back out. |
138 | if self.complete.load(SeqCst) { |
139 | // If lock acquisition fails, then receiver is actually |
140 | // receiving it, so we're good. |
141 | if let Some(mut slot) = self.data.try_lock() { |
142 | if let Some(t) = slot.take() { |
143 | return Err(t); |
144 | } |
145 | } |
146 | } |
147 | Ok(()) |
148 | } else { |
149 | // Must have been closed |
150 | Err(t) |
151 | } |
152 | } |
153 | |
154 | fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> { |
155 | // Fast path up first, just read the flag and see if our other half is |
156 | // gone. This flag is set both in our destructor and the oneshot |
157 | // destructor, but our destructor hasn't run yet so if it's set then the |
158 | // oneshot is gone. |
159 | if self.complete.load(SeqCst) { |
160 | return Poll::Ready(()); |
161 | } |
162 | |
163 | // If our other half is not gone then we need to park our current task |
164 | // and move it into the `tx_task` slot to get notified when it's |
165 | // actually gone. |
166 | // |
167 | // If `try_lock` fails, then the `Receiver` is in the process of using |
168 | // it, so we can deduce that it's now in the process of going away and |
169 | // hence we're canceled. If it succeeds then we just store our handle. |
170 | // |
171 | // Crucially we then check `complete` *again* before we return. |
172 | // While we were storing our handle inside `tx_task` the |
173 | // `Receiver` may have been dropped. The first thing it does is set the |
174 | // flag, and if it fails to acquire the lock it assumes that we'll see |
175 | // the flag later on. So... we then try to see the flag later on! |
176 | let handle = cx.waker().clone(); |
177 | match self.tx_task.try_lock() { |
178 | Some(mut p) => *p = Some(handle), |
179 | None => return Poll::Ready(()), |
180 | } |
181 | if self.complete.load(SeqCst) { |
182 | Poll::Ready(()) |
183 | } else { |
184 | Poll::Pending |
185 | } |
186 | } |
187 | |
188 | fn is_canceled(&self) -> bool { |
189 | self.complete.load(SeqCst) |
190 | } |
191 | |
192 | fn drop_tx(&self) { |
193 | // Flag that we're a completed `Sender` and try to wake up a receiver. |
194 | // Whether or not we actually stored any data will get picked up and |
195 | // translated to either an item or cancellation. |
196 | // |
197 | // Note that if we fail to acquire the `rx_task` lock then that means |
198 | // we're in one of two situations: |
199 | // |
200 | // 1. The receiver is trying to block in `poll` |
201 | // 2. The receiver is being dropped |
202 | // |
203 | // In the first case it'll check the `complete` flag after it's done |
204 | // blocking to see if it succeeded. In the latter case we don't need to |
205 | // wake up anyone anyway. So in both cases it's ok to ignore the `None` |
206 | // case of `try_lock` and bail out. |
207 | // |
208 | // The first case crucially depends on `Lock` using `SeqCst` ordering |
209 | // under the hood. If it instead used `Release` / `Acquire` ordering, |
210 | // then it would not necessarily synchronize with `inner.complete` |
211 | // and deadlock might be possible, as was observed in |
212 | // https://github.com/rust-lang/futures-rs/pull/219. |
213 | self.complete.store(true, SeqCst); |
214 | |
215 | if let Some(mut slot) = self.rx_task.try_lock() { |
216 | if let Some(task) = slot.take() { |
217 | drop(slot); |
218 | task.wake(); |
219 | } |
220 | } |
221 | |
222 | // If we registered a task for cancel notification drop it to reduce |
223 | // spurious wakeups |
224 | if let Some(mut slot) = self.tx_task.try_lock() { |
225 | drop(slot.take()); |
226 | } |
227 | } |
228 | |
229 | fn close_rx(&self) { |
230 | // Flag our completion and then attempt to wake up the sender if it's |
231 | // blocked. See comments in `drop` below for more info |
232 | self.complete.store(true, SeqCst); |
233 | if let Some(mut handle) = self.tx_task.try_lock() { |
234 | if let Some(task) = handle.take() { |
235 | drop(handle); |
236 | task.wake() |
237 | } |
238 | } |
239 | } |
240 | |
241 | fn try_recv(&self) -> Result<Option<T>, Canceled> { |
242 | // If we're complete, either `::close_rx` or `::drop_tx` was called. |
243 | // We can assume a successful send if data is present. |
244 | if self.complete.load(SeqCst) { |
245 | if let Some(mut slot) = self.data.try_lock() { |
246 | if let Some(data) = slot.take() { |
247 | return Ok(Some(data)); |
248 | } |
249 | } |
250 | Err(Canceled) |
251 | } else { |
252 | Ok(None) |
253 | } |
254 | } |
255 | |
256 | fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { |
257 | // Check to see if some data has arrived. If it hasn't then we need to |
258 | // block our task. |
259 | // |
260 | // Note that the acquisition of the `rx_task` lock might fail below, but |
261 | // the only situation where this can happen is during `Sender::drop` |
262 | // when we are indeed completed already. If that's happening then we |
263 | // know we're completed so keep going. |
264 | let done = if self.complete.load(SeqCst) { |
265 | true |
266 | } else { |
267 | let task = cx.waker().clone(); |
268 | match self.rx_task.try_lock() { |
269 | Some(mut slot) => { |
270 | *slot = Some(task); |
271 | false |
272 | } |
273 | None => true, |
274 | } |
275 | }; |
276 | |
277 | // If we're `done` via one of the paths above, then look at the data and |
278 | // figure out what the answer is. If, however, we stored `rx_task` |
279 | // successfully above we need to check again if we're completed in case |
280 | // a message was sent while `rx_task` was locked and couldn't notify us |
281 | // otherwise. |
282 | // |
283 | // If we're not done, and we're not complete, though, then we've |
284 | // successfully blocked our task and we return `Pending`. |
285 | if done || self.complete.load(SeqCst) { |
286 | // If taking the lock fails, the sender will realise that the we're |
287 | // `done` when it checks the `complete` flag on the way out, and |
288 | // will treat the send as a failure. |
289 | if let Some(mut slot) = self.data.try_lock() { |
290 | if let Some(data) = slot.take() { |
291 | return Poll::Ready(Ok(data)); |
292 | } |
293 | } |
294 | Poll::Ready(Err(Canceled)) |
295 | } else { |
296 | Poll::Pending |
297 | } |
298 | } |
299 | |
300 | fn drop_rx(&self) { |
301 | // Indicate to the `Sender` that we're done, so any future calls to |
302 | // `poll_canceled` are weeded out. |
303 | self.complete.store(true, SeqCst); |
304 | |
305 | // If we've blocked a task then there's no need for it to stick around, |
306 | // so we need to drop it. If this lock acquisition fails, though, then |
307 | // it's just because our `Sender` is trying to take the task, so we |
308 | // let them take care of that. |
309 | if let Some(mut slot) = self.rx_task.try_lock() { |
310 | let task = slot.take(); |
311 | drop(slot); |
312 | drop(task); |
313 | } |
314 | |
315 | // Finally, if our `Sender` wants to get notified of us going away, it |
316 | // would have stored something in `tx_task`. Here we try to peel that |
317 | // out and unpark it. |
318 | // |
319 | // Note that the `try_lock` here may fail, but only if the `Sender` is |
320 | // in the process of filling in the task. If that happens then we |
321 | // already flagged `complete` and they'll pick that up above. |
322 | if let Some(mut handle) = self.tx_task.try_lock() { |
323 | if let Some(task) = handle.take() { |
324 | drop(handle); |
325 | task.wake() |
326 | } |
327 | } |
328 | } |
329 | } |
330 | |
331 | impl<T> Sender<T> { |
332 | /// Completes this oneshot with a successful result. |
333 | /// |
334 | /// This function will consume `self` and indicate to the other end, the |
335 | /// [`Receiver`], that the value provided is the result of the computation |
336 | /// this represents. |
337 | /// |
338 | /// If the value is successfully enqueued for the remote end to receive, |
339 | /// then `Ok(())` is returned. If the receiving end was dropped before |
340 | /// this function was called, however, then `Err(t)` is returned. |
341 | pub fn send(self, t: T) -> Result<(), T> { |
342 | self.inner.send(t) |
343 | } |
344 | |
345 | /// Polls this `Sender` half to detect whether its associated |
346 | /// [`Receiver`] has been dropped. |
347 | /// |
348 | /// # Return values |
349 | /// |
350 | /// If `Ready(())` is returned then the associated `Receiver` has been |
351 | /// dropped, which means any work required for sending should be canceled. |
352 | /// |
353 | /// If `Pending` is returned then the associated `Receiver` is still |
354 | /// alive and may be able to receive a message if sent. The current task, |
355 | /// however, is scheduled to receive a notification if the corresponding |
356 | /// `Receiver` goes away. |
357 | pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> { |
358 | self.inner.poll_canceled(cx) |
359 | } |
360 | |
361 | /// Creates a future that resolves when this `Sender`'s corresponding |
362 | /// [`Receiver`] half has hung up. |
363 | /// |
364 | /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled) |
365 | /// to expose a [`Future`]. |
366 | pub fn cancellation(&mut self) -> Cancellation<'_, T> { |
367 | Cancellation { inner: self } |
368 | } |
369 | |
370 | /// Tests to see whether this `Sender`'s corresponding `Receiver` |
371 | /// has been dropped. |
372 | /// |
373 | /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not |
374 | /// enqueue a task for wakeup upon cancellation, but merely reports the |
375 | /// current state, which may be subject to concurrent modification. |
376 | pub fn is_canceled(&self) -> bool { |
377 | self.inner.is_canceled() |
378 | } |
379 | |
380 | /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether |
381 | /// they were created by the same call to `channel`. |
382 | pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool { |
383 | Arc::ptr_eq(&self.inner, &receiver.inner) |
384 | } |
385 | } |
386 | |
387 | impl<T> Drop for Sender<T> { |
388 | fn drop(&mut self) { |
389 | self.inner.drop_tx() |
390 | } |
391 | } |
392 | |
393 | impl<T> fmt::Debug for Sender<T> { |
394 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
395 | f.debug_struct("Sender" ).field("complete" , &self.inner.complete).finish() |
396 | } |
397 | } |
398 | |
399 | /// A future that resolves when the receiving end of a channel has hung up. |
400 | /// |
401 | /// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled). |
402 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
403 | #[derive(Debug)] |
404 | pub struct Cancellation<'a, T> { |
405 | inner: &'a mut Sender<T>, |
406 | } |
407 | |
408 | impl<T> Future for Cancellation<'_, T> { |
409 | type Output = (); |
410 | |
411 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { |
412 | self.inner.poll_canceled(cx) |
413 | } |
414 | } |
415 | |
/// Error produced by a [`Receiver`] when the corresponding [`Sender`] is
/// dropped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Canceled;

impl fmt::Display for Canceled {
    /// Writes the fixed message `oneshot canceled`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("oneshot canceled")
    }
}

#[cfg(feature = "std")]
impl std::error::Error for Canceled {}
429 | |
430 | impl<T> Receiver<T> { |
431 | /// Gracefully close this receiver, preventing any subsequent attempts to |
432 | /// send to it. |
433 | /// |
434 | /// Any `send` operation which happens after this method returns is |
435 | /// guaranteed to fail. After calling this method, you can use |
436 | /// [`Receiver::poll`](core::future::Future::poll) to determine whether a |
437 | /// message had previously been sent. |
438 | pub fn close(&mut self) { |
439 | self.inner.close_rx() |
440 | } |
441 | |
442 | /// Attempts to receive a message outside of the context of a task. |
443 | /// |
444 | /// Does not schedule a task wakeup or have any other side effects. |
445 | /// |
446 | /// A return value of `None` must be considered immediately stale (out of |
447 | /// date) unless [`close`](Receiver::close) has been called first. |
448 | /// |
449 | /// Returns an error if the sender was dropped. |
450 | pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> { |
451 | self.inner.try_recv() |
452 | } |
453 | } |
454 | |
455 | impl<T> Future for Receiver<T> { |
456 | type Output = Result<T, Canceled>; |
457 | |
458 | fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> { |
459 | self.inner.recv(cx) |
460 | } |
461 | } |
462 | |
463 | impl<T> FusedFuture for Receiver<T> { |
464 | fn is_terminated(&self) -> bool { |
465 | if self.inner.complete.load(SeqCst) { |
466 | if let Some(slot) = self.inner.data.try_lock() { |
467 | if slot.is_some() { |
468 | return false; |
469 | } |
470 | } |
471 | true |
472 | } else { |
473 | false |
474 | } |
475 | } |
476 | } |
477 | |
478 | impl<T> Drop for Receiver<T> { |
479 | fn drop(&mut self) { |
480 | self.inner.drop_rx() |
481 | } |
482 | } |
483 | |
484 | impl<T> fmt::Debug for Receiver<T> { |
485 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
486 | f.debug_struct("Receiver" ).field("complete" , &self.inner.complete).finish() |
487 | } |
488 | } |
489 | |