1//! A channel for sending a single message between asynchronous tasks.
2//!
3//! This is a single-producer, single-consumer channel.
4
5use alloc::sync::Arc;
6use core::fmt;
7use core::pin::Pin;
8use core::sync::atomic::AtomicBool;
9use core::sync::atomic::Ordering::SeqCst;
10use futures_core::future::{FusedFuture, Future};
11use futures_core::task::{Context, Poll, Waker};
12
13use crate::lock::Lock;
14
15/// A future for a value that will be provided by another asynchronous task.
16///
17/// This is created by the [`channel`](channel) function.
18#[must_use = "futures do nothing unless you `.await` or poll them"]
19pub struct Receiver<T> {
20 inner: Arc<Inner<T>>,
21}
22
23/// A means of transmitting a single value to another task.
24///
25/// This is created by the [`channel`](channel) function.
26pub struct Sender<T> {
27 inner: Arc<Inner<T>>,
28}
29
30// The channels do not ever project Pin to the inner T
31impl<T> Unpin for Receiver<T> {}
32impl<T> Unpin for Sender<T> {}
33
34/// Internal state of the `Receiver`/`Sender` pair above. This is all used as
35/// the internal synchronization between the two for send/recv operations.
36struct Inner<T> {
37 /// Indicates whether this oneshot is complete yet. This is filled in both
38 /// by `Sender::drop` and by `Receiver::drop`, and both sides interpret it
39 /// appropriately.
40 ///
41 /// For `Receiver`, if this is `true`, then it's guaranteed that `data` is
42 /// unlocked and ready to be inspected.
43 ///
44 /// For `Sender` if this is `true` then the oneshot has gone away and it
45 /// can return ready from `poll_canceled`.
46 complete: AtomicBool,
47
48 /// The actual data being transferred as part of this `Receiver`. This is
49 /// filled in by `Sender::complete` and read by `Receiver::poll`.
50 ///
51 /// Note that this is protected by `Lock`, but it is in theory safe to
52 /// replace with an `UnsafeCell` as it's actually protected by `complete`
53 /// above. I wouldn't recommend doing this, however, unless someone is
54 /// supremely confident in the various atomic orderings here and there.
55 data: Lock<Option<T>>,
56
57 /// Field to store the task which is blocked in `Receiver::poll`.
58 ///
59 /// This is filled in when a oneshot is polled but not ready yet. Note that
60 /// the `Lock` here, unlike in `data` above, is important to resolve races.
61 /// Both the `Receiver` and the `Sender` halves understand that if they
62 /// can't acquire the lock then some important interference is happening.
63 rx_task: Lock<Option<Waker>>,
64
65 /// Like `rx_task` above, except for the task blocked in
66 /// `Sender::poll_canceled`. Additionally, `Lock` cannot be `UnsafeCell`.
67 tx_task: Lock<Option<Waker>>,
68}
69
70/// Creates a new one-shot channel for sending a single value across asynchronous tasks.
71///
72/// The channel works for a spsc (single-producer, single-consumer) scheme.
73///
74/// This function is similar to Rust's channel constructor found in the standard
75/// library. Two halves are returned, the first of which is a `Sender` handle,
76/// used to signal the end of a computation and provide its value. The second
77/// half is a `Receiver` which implements the `Future` trait, resolving to the
78/// value that was given to the `Sender` handle.
79///
80/// Each half can be separately owned and sent across tasks.
81///
82/// # Examples
83///
84/// ```
85/// use futures::channel::oneshot;
86/// use std::{thread, time::Duration};
87///
88/// let (sender, receiver) = oneshot::channel::<i32>();
89///
90/// thread::spawn(|| {
91/// println!("THREAD: sleeping zzz...");
92/// thread::sleep(Duration::from_millis(1000));
93/// println!("THREAD: i'm awake! sending.");
94/// sender.send(3).unwrap();
95/// });
96///
97/// println!("MAIN: doing some useful stuff");
98///
99/// futures::executor::block_on(async {
100/// println!("MAIN: waiting for msg...");
101/// println!("MAIN: got: {:?}", receiver.await)
102/// });
103/// ```
104pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
105 let inner: Arc> = Arc::new(data:Inner::new());
106 let receiver: Receiver = Receiver { inner: inner.clone() };
107 let sender: Sender = Sender { inner };
108 (sender, receiver)
109}
110
111impl<T> Inner<T> {
112 fn new() -> Self {
113 Self {
114 complete: AtomicBool::new(false),
115 data: Lock::new(None),
116 rx_task: Lock::new(None),
117 tx_task: Lock::new(None),
118 }
119 }
120
121 fn send(&self, t: T) -> Result<(), T> {
122 if self.complete.load(SeqCst) {
123 return Err(t);
124 }
125
126 // Note that this lock acquisition may fail if the receiver
127 // is closed and sets the `complete` flag to `true`, whereupon
128 // the receiver may call `poll()`.
129 if let Some(mut slot) = self.data.try_lock() {
130 assert!(slot.is_none());
131 *slot = Some(t);
132 drop(slot);
133
134 // If the receiver called `close()` between the check at the
135 // start of the function, and the lock being released, then
136 // the receiver may not be around to receive it, so try to
137 // pull it back out.
138 if self.complete.load(SeqCst) {
139 // If lock acquisition fails, then receiver is actually
140 // receiving it, so we're good.
141 if let Some(mut slot) = self.data.try_lock() {
142 if let Some(t) = slot.take() {
143 return Err(t);
144 }
145 }
146 }
147 Ok(())
148 } else {
149 // Must have been closed
150 Err(t)
151 }
152 }
153
154 fn poll_canceled(&self, cx: &mut Context<'_>) -> Poll<()> {
155 // Fast path up first, just read the flag and see if our other half is
156 // gone. This flag is set both in our destructor and the oneshot
157 // destructor, but our destructor hasn't run yet so if it's set then the
158 // oneshot is gone.
159 if self.complete.load(SeqCst) {
160 return Poll::Ready(());
161 }
162
163 // If our other half is not gone then we need to park our current task
164 // and move it into the `tx_task` slot to get notified when it's
165 // actually gone.
166 //
167 // If `try_lock` fails, then the `Receiver` is in the process of using
168 // it, so we can deduce that it's now in the process of going away and
169 // hence we're canceled. If it succeeds then we just store our handle.
170 //
171 // Crucially we then check `complete` *again* before we return.
172 // While we were storing our handle inside `tx_task` the
173 // `Receiver` may have been dropped. The first thing it does is set the
174 // flag, and if it fails to acquire the lock it assumes that we'll see
175 // the flag later on. So... we then try to see the flag later on!
176 let handle = cx.waker().clone();
177 match self.tx_task.try_lock() {
178 Some(mut p) => *p = Some(handle),
179 None => return Poll::Ready(()),
180 }
181 if self.complete.load(SeqCst) {
182 Poll::Ready(())
183 } else {
184 Poll::Pending
185 }
186 }
187
188 fn is_canceled(&self) -> bool {
189 self.complete.load(SeqCst)
190 }
191
192 fn drop_tx(&self) {
193 // Flag that we're a completed `Sender` and try to wake up a receiver.
194 // Whether or not we actually stored any data will get picked up and
195 // translated to either an item or cancellation.
196 //
197 // Note that if we fail to acquire the `rx_task` lock then that means
198 // we're in one of two situations:
199 //
200 // 1. The receiver is trying to block in `poll`
201 // 2. The receiver is being dropped
202 //
203 // In the first case it'll check the `complete` flag after it's done
204 // blocking to see if it succeeded. In the latter case we don't need to
205 // wake up anyone anyway. So in both cases it's ok to ignore the `None`
206 // case of `try_lock` and bail out.
207 //
208 // The first case crucially depends on `Lock` using `SeqCst` ordering
209 // under the hood. If it instead used `Release` / `Acquire` ordering,
210 // then it would not necessarily synchronize with `inner.complete`
211 // and deadlock might be possible, as was observed in
212 // https://github.com/rust-lang/futures-rs/pull/219.
213 self.complete.store(true, SeqCst);
214
215 if let Some(mut slot) = self.rx_task.try_lock() {
216 if let Some(task) = slot.take() {
217 drop(slot);
218 task.wake();
219 }
220 }
221
222 // If we registered a task for cancel notification drop it to reduce
223 // spurious wakeups
224 if let Some(mut slot) = self.tx_task.try_lock() {
225 drop(slot.take());
226 }
227 }
228
229 fn close_rx(&self) {
230 // Flag our completion and then attempt to wake up the sender if it's
231 // blocked. See comments in `drop` below for more info
232 self.complete.store(true, SeqCst);
233 if let Some(mut handle) = self.tx_task.try_lock() {
234 if let Some(task) = handle.take() {
235 drop(handle);
236 task.wake()
237 }
238 }
239 }
240
241 fn try_recv(&self) -> Result<Option<T>, Canceled> {
242 // If we're complete, either `::close_rx` or `::drop_tx` was called.
243 // We can assume a successful send if data is present.
244 if self.complete.load(SeqCst) {
245 if let Some(mut slot) = self.data.try_lock() {
246 if let Some(data) = slot.take() {
247 return Ok(Some(data));
248 }
249 }
250 Err(Canceled)
251 } else {
252 Ok(None)
253 }
254 }
255
256 fn recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
257 // Check to see if some data has arrived. If it hasn't then we need to
258 // block our task.
259 //
260 // Note that the acquisition of the `rx_task` lock might fail below, but
261 // the only situation where this can happen is during `Sender::drop`
262 // when we are indeed completed already. If that's happening then we
263 // know we're completed so keep going.
264 let done = if self.complete.load(SeqCst) {
265 true
266 } else {
267 let task = cx.waker().clone();
268 match self.rx_task.try_lock() {
269 Some(mut slot) => {
270 *slot = Some(task);
271 false
272 }
273 None => true,
274 }
275 };
276
277 // If we're `done` via one of the paths above, then look at the data and
278 // figure out what the answer is. If, however, we stored `rx_task`
279 // successfully above we need to check again if we're completed in case
280 // a message was sent while `rx_task` was locked and couldn't notify us
281 // otherwise.
282 //
283 // If we're not done, and we're not complete, though, then we've
284 // successfully blocked our task and we return `Pending`.
285 if done || self.complete.load(SeqCst) {
286 // If taking the lock fails, the sender will realise that the we're
287 // `done` when it checks the `complete` flag on the way out, and
288 // will treat the send as a failure.
289 if let Some(mut slot) = self.data.try_lock() {
290 if let Some(data) = slot.take() {
291 return Poll::Ready(Ok(data));
292 }
293 }
294 Poll::Ready(Err(Canceled))
295 } else {
296 Poll::Pending
297 }
298 }
299
300 fn drop_rx(&self) {
301 // Indicate to the `Sender` that we're done, so any future calls to
302 // `poll_canceled` are weeded out.
303 self.complete.store(true, SeqCst);
304
305 // If we've blocked a task then there's no need for it to stick around,
306 // so we need to drop it. If this lock acquisition fails, though, then
307 // it's just because our `Sender` is trying to take the task, so we
308 // let them take care of that.
309 if let Some(mut slot) = self.rx_task.try_lock() {
310 let task = slot.take();
311 drop(slot);
312 drop(task);
313 }
314
315 // Finally, if our `Sender` wants to get notified of us going away, it
316 // would have stored something in `tx_task`. Here we try to peel that
317 // out and unpark it.
318 //
319 // Note that the `try_lock` here may fail, but only if the `Sender` is
320 // in the process of filling in the task. If that happens then we
321 // already flagged `complete` and they'll pick that up above.
322 if let Some(mut handle) = self.tx_task.try_lock() {
323 if let Some(task) = handle.take() {
324 drop(handle);
325 task.wake()
326 }
327 }
328 }
329}
330
331impl<T> Sender<T> {
332 /// Completes this oneshot with a successful result.
333 ///
334 /// This function will consume `self` and indicate to the other end, the
335 /// [`Receiver`](Receiver), that the value provided is the result of the
336 /// computation this represents.
337 ///
338 /// If the value is successfully enqueued for the remote end to receive,
339 /// then `Ok(())` is returned. If the receiving end was dropped before
340 /// this function was called, however, then `Err(t)` is returned.
341 pub fn send(self, t: T) -> Result<(), T> {
342 self.inner.send(t)
343 }
344
345 /// Polls this `Sender` half to detect whether its associated
346 /// [`Receiver`](Receiver) has been dropped.
347 ///
348 /// # Return values
349 ///
350 /// If `Ready(())` is returned then the associated `Receiver` has been
351 /// dropped, which means any work required for sending should be canceled.
352 ///
353 /// If `Pending` is returned then the associated `Receiver` is still
354 /// alive and may be able to receive a message if sent. The current task,
355 /// however, is scheduled to receive a notification if the corresponding
356 /// `Receiver` goes away.
357 pub fn poll_canceled(&mut self, cx: &mut Context<'_>) -> Poll<()> {
358 self.inner.poll_canceled(cx)
359 }
360
361 /// Creates a future that resolves when this `Sender`'s corresponding
362 /// [`Receiver`](Receiver) half has hung up.
363 ///
364 /// This is a utility wrapping [`poll_canceled`](Sender::poll_canceled)
365 /// to expose a [`Future`](core::future::Future).
366 pub fn cancellation(&mut self) -> Cancellation<'_, T> {
367 Cancellation { inner: self }
368 }
369
370 /// Tests to see whether this `Sender`'s corresponding `Receiver`
371 /// has been dropped.
372 ///
373 /// Unlike [`poll_canceled`](Sender::poll_canceled), this function does not
374 /// enqueue a task for wakeup upon cancellation, but merely reports the
375 /// current state, which may be subject to concurrent modification.
376 pub fn is_canceled(&self) -> bool {
377 self.inner.is_canceled()
378 }
379
380 /// Tests to see whether this `Sender` is connected to the given `Receiver`. That is, whether
381 /// they were created by the same call to `channel`.
382 pub fn is_connected_to(&self, receiver: &Receiver<T>) -> bool {
383 Arc::ptr_eq(&self.inner, &receiver.inner)
384 }
385}
386
387impl<T> Drop for Sender<T> {
388 fn drop(&mut self) {
389 self.inner.drop_tx()
390 }
391}
392
393impl<T> fmt::Debug for Sender<T> {
394 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
395 f.debug_struct("Sender").field(name:"complete", &self.inner.complete).finish()
396 }
397}
398
399/// A future that resolves when the receiving end of a channel has hung up.
400///
401/// This is an `.await`-friendly interface around [`poll_canceled`](Sender::poll_canceled).
402#[must_use = "futures do nothing unless you `.await` or poll them"]
403#[derive(Debug)]
404pub struct Cancellation<'a, T> {
405 inner: &'a mut Sender<T>,
406}
407
408impl<T> Future for Cancellation<'_, T> {
409 type Output = ();
410
411 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
412 self.inner.poll_canceled(cx)
413 }
414}
415
/// Error produced by a [`Receiver`] when the corresponding [`Sender`] is
/// dropped.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;

impl fmt::Display for Canceled {
    /// Writes the fixed message `oneshot canceled`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("oneshot canceled")
    }
}

#[cfg(feature = "std")]
impl std::error::Error for Canceled {}
429
430impl<T> Receiver<T> {
431 /// Gracefully close this receiver, preventing any subsequent attempts to
432 /// send to it.
433 ///
434 /// Any `send` operation which happens after this method returns is
435 /// guaranteed to fail. After calling this method, you can use
436 /// [`Receiver::poll`](core::future::Future::poll) to determine whether a
437 /// message had previously been sent.
438 pub fn close(&mut self) {
439 self.inner.close_rx()
440 }
441
442 /// Attempts to receive a message outside of the context of a task.
443 ///
444 /// Does not schedule a task wakeup or have any other side effects.
445 ///
446 /// A return value of `None` must be considered immediately stale (out of
447 /// date) unless [`close`](Receiver::close) has been called first.
448 ///
449 /// Returns an error if the sender was dropped.
450 pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
451 self.inner.try_recv()
452 }
453}
454
455impl<T> Future for Receiver<T> {
456 type Output = Result<T, Canceled>;
457
458 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, Canceled>> {
459 self.inner.recv(cx)
460 }
461}
462
463impl<T> FusedFuture for Receiver<T> {
464 fn is_terminated(&self) -> bool {
465 if self.inner.complete.load(order:SeqCst) {
466 if let Some(slot: TryLock<'_, Option>) = self.inner.data.try_lock() {
467 if slot.is_some() {
468 return false;
469 }
470 }
471 true
472 } else {
473 false
474 }
475 }
476}
477
478impl<T> Drop for Receiver<T> {
479 fn drop(&mut self) {
480 self.inner.drop_rx()
481 }
482}
483
484impl<T> fmt::Debug for Receiver<T> {
485 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
486 f.debug_struct("Receiver").field(name:"complete", &self.inner.complete).finish()
487 }
488}
489