1 | //! Futures |
2 | //! |
3 | //! This module contains a number of functions for working with `Future`s, |
4 | //! including the `FutureExt` trait which adds methods to `Future` types. |
5 | |
6 | #[cfg (feature = "alloc" )] |
7 | use alloc::boxed::Box; |
8 | use core::pin::Pin; |
9 | |
10 | use crate::fns::{inspect_fn, into_fn, ok_fn, InspectFn, IntoFn, OkFn}; |
11 | use crate::future::{assert_future, Either}; |
12 | use crate::never::Never; |
13 | use crate::stream::assert_stream; |
14 | #[cfg (feature = "alloc" )] |
15 | use futures_core::future::{BoxFuture, LocalBoxFuture}; |
16 | use futures_core::{ |
17 | future::Future, |
18 | stream::Stream, |
19 | task::{Context, Poll}, |
20 | }; |
21 | use pin_utils::pin_mut; |
22 | |
23 | // Combinators |
24 | |
25 | mod flatten; |
26 | mod fuse; |
27 | mod map; |
28 | |
29 | delegate_all!( |
30 | /// Future for the [`flatten`](super::FutureExt::flatten) method. |
31 | Flatten<F>( |
32 | flatten::Flatten<F, <F as Future>::Output> |
33 | ): Debug + Future + FusedFuture + New[|x: F| flatten::Flatten::new(x)] |
34 | where F: Future |
35 | ); |
36 | |
37 | delegate_all!( |
38 | /// Stream for the [`flatten_stream`](FutureExt::flatten_stream) method. |
39 | FlattenStream<F>( |
40 | flatten::Flatten<F, <F as Future>::Output> |
41 | ): Debug + Sink + Stream + FusedStream + New[|x: F| flatten::Flatten::new(x)] |
42 | where F: Future |
43 | ); |
44 | |
45 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
46 | pub use fuse::Fuse; |
47 | |
48 | delegate_all!( |
49 | /// Future for the [`map`](super::FutureExt::map) method. |
50 | Map<Fut, F>( |
51 | map::Map<Fut, F> |
52 | ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, f)] |
53 | ); |
54 | |
55 | delegate_all!( |
56 | /// Stream for the [`into_stream`](FutureExt::into_stream) method. |
57 | IntoStream<F>( |
58 | crate::stream::Once<F> |
59 | ): Debug + Stream + FusedStream + New[|x: F| crate::stream::Once::new(x)] |
60 | ); |
61 | |
62 | delegate_all!( |
63 | /// Future for the [`map_into`](FutureExt::map_into) combinator. |
64 | MapInto<Fut, T>( |
65 | Map<Fut, IntoFn<T>> |
66 | ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, into_fn())] |
67 | ); |
68 | |
69 | delegate_all!( |
70 | /// Future for the [`then`](FutureExt::then) method. |
71 | Then<Fut1, Fut2, F>( |
72 | flatten::Flatten<Map<Fut1, F>, Fut2> |
73 | ): Debug + Future + FusedFuture + New[|x: Fut1, y: F| flatten::Flatten::new(Map::new(x, y))] |
74 | ); |
75 | |
76 | delegate_all!( |
77 | /// Future for the [`inspect`](FutureExt::inspect) method. |
78 | Inspect<Fut, F>( |
79 | map::Map<Fut, InspectFn<F>> |
80 | ): Debug + Future + FusedFuture + New[|x: Fut, f: F| map::Map::new(x, inspect_fn(f))] |
81 | ); |
82 | |
83 | delegate_all!( |
84 | /// Future for the [`never_error`](super::FutureExt::never_error) combinator. |
85 | NeverError<Fut>( |
86 | Map<Fut, OkFn<Never>> |
87 | ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, ok_fn())] |
88 | ); |
89 | |
90 | delegate_all!( |
91 | /// Future for the [`unit_error`](super::FutureExt::unit_error) combinator. |
92 | UnitError<Fut>( |
93 | Map<Fut, OkFn<()>> |
94 | ): Debug + Future + FusedFuture + New[|x: Fut| Map::new(x, ok_fn())] |
95 | ); |
96 | |
97 | #[cfg (feature = "std" )] |
98 | mod catch_unwind; |
99 | #[cfg (feature = "std" )] |
100 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
101 | pub use self::catch_unwind::CatchUnwind; |
102 | |
103 | #[cfg (feature = "channel" )] |
104 | #[cfg_attr (docsrs, doc(cfg(feature = "channel" )))] |
105 | #[cfg (feature = "std" )] |
106 | mod remote_handle; |
107 | #[cfg (feature = "channel" )] |
108 | #[cfg_attr (docsrs, doc(cfg(feature = "channel" )))] |
109 | #[cfg (feature = "std" )] |
110 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
111 | pub use self::remote_handle::{Remote, RemoteHandle}; |
112 | |
113 | #[cfg (feature = "std" )] |
114 | mod shared; |
115 | #[cfg (feature = "std" )] |
116 | #[allow (unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 |
117 | pub use self::shared::{Shared, WeakShared}; |
118 | |
119 | impl<T: ?Sized> FutureExt for T where T: Future {} |
120 | |
121 | /// An extension trait for `Future`s that provides a variety of convenient |
122 | /// adapters. |
123 | pub trait FutureExt: Future { |
124 | /// Map this future's output to a different type, returning a new future of |
125 | /// the resulting type. |
126 | /// |
127 | /// This function is similar to the `Option::map` or `Iterator::map` where |
128 | /// it will change the type of the underlying future. This is useful to |
129 | /// chain along a computation once a future has been resolved. |
130 | /// |
131 | /// Note that this function consumes the receiving future and returns a |
132 | /// wrapped version of it, similar to the existing `map` methods in the |
133 | /// standard library. |
134 | /// |
135 | /// # Examples |
136 | /// |
137 | /// ``` |
138 | /// # futures::executor::block_on(async { |
139 | /// use futures::future::FutureExt; |
140 | /// |
141 | /// let future = async { 1 }; |
142 | /// let new_future = future.map(|x| x + 3); |
143 | /// assert_eq!(new_future.await, 4); |
144 | /// # }); |
145 | /// ``` |
146 | fn map<U, F>(self, f: F) -> Map<Self, F> |
147 | where |
148 | F: FnOnce(Self::Output) -> U, |
149 | Self: Sized, |
150 | { |
151 | assert_future::<U, _>(Map::new(self, f)) |
152 | } |
153 | |
154 | /// Map this future's output to a different type, returning a new future of |
155 | /// the resulting type. |
156 | /// |
157 | /// This function is equivalent to calling `map(Into::into)` but allows naming |
158 | /// the return type. |
159 | fn map_into<U>(self) -> MapInto<Self, U> |
160 | where |
161 | Self::Output: Into<U>, |
162 | Self: Sized, |
163 | { |
164 | assert_future::<U, _>(MapInto::new(self)) |
165 | } |
166 | |
167 | /// Chain on a computation for when a future finished, passing the result of |
168 | /// the future to the provided closure `f`. |
169 | /// |
170 | /// The returned value of the closure must implement the `Future` trait |
171 | /// and can represent some more work to be done before the composed future |
172 | /// is finished. |
173 | /// |
174 | /// The closure `f` is only run *after* successful completion of the `self` |
175 | /// future. |
176 | /// |
177 | /// Note that this function consumes the receiving future and returns a |
178 | /// wrapped version of it. |
179 | /// |
180 | /// # Examples |
181 | /// |
182 | /// ``` |
183 | /// # futures::executor::block_on(async { |
184 | /// use futures::future::FutureExt; |
185 | /// |
186 | /// let future_of_1 = async { 1 }; |
187 | /// let future_of_4 = future_of_1.then(|x| async move { x + 3 }); |
188 | /// assert_eq!(future_of_4.await, 4); |
189 | /// # }); |
190 | /// ``` |
191 | fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> |
192 | where |
193 | F: FnOnce(Self::Output) -> Fut, |
194 | Fut: Future, |
195 | Self: Sized, |
196 | { |
197 | assert_future::<Fut::Output, _>(Then::new(self, f)) |
198 | } |
199 | |
200 | /// Wrap this future in an `Either` future, making it the left-hand variant |
201 | /// of that `Either`. |
202 | /// |
203 | /// This can be used in combination with the `right_future` method to write `if` |
204 | /// statements that evaluate to different futures in different branches. |
205 | /// |
206 | /// # Examples |
207 | /// |
208 | /// ``` |
209 | /// # futures::executor::block_on(async { |
210 | /// use futures::future::FutureExt; |
211 | /// |
212 | /// let x = 6; |
213 | /// let future = if x < 10 { |
214 | /// async { true }.left_future() |
215 | /// } else { |
216 | /// async { false }.right_future() |
217 | /// }; |
218 | /// |
219 | /// assert_eq!(future.await, true); |
220 | /// # }); |
221 | /// ``` |
222 | fn left_future<B>(self) -> Either<Self, B> |
223 | where |
224 | B: Future<Output = Self::Output>, |
225 | Self: Sized, |
226 | { |
227 | assert_future::<Self::Output, _>(Either::Left(self)) |
228 | } |
229 | |
230 | /// Wrap this future in an `Either` future, making it the right-hand variant |
231 | /// of that `Either`. |
232 | /// |
233 | /// This can be used in combination with the `left_future` method to write `if` |
234 | /// statements that evaluate to different futures in different branches. |
235 | /// |
236 | /// # Examples |
237 | /// |
238 | /// ``` |
239 | /// # futures::executor::block_on(async { |
240 | /// use futures::future::FutureExt; |
241 | /// |
242 | /// let x = 6; |
243 | /// let future = if x > 10 { |
244 | /// async { true }.left_future() |
245 | /// } else { |
246 | /// async { false }.right_future() |
247 | /// }; |
248 | /// |
249 | /// assert_eq!(future.await, false); |
250 | /// # }); |
251 | /// ``` |
252 | fn right_future<A>(self) -> Either<A, Self> |
253 | where |
254 | A: Future<Output = Self::Output>, |
255 | Self: Sized, |
256 | { |
257 | assert_future::<Self::Output, _>(Either::Right(self)) |
258 | } |
259 | |
260 | /// Convert this future into a single element stream. |
261 | /// |
262 | /// The returned stream contains single success if this future resolves to |
263 | /// success or single error if this future resolves into error. |
264 | /// |
265 | /// # Examples |
266 | /// |
267 | /// ``` |
268 | /// # futures::executor::block_on(async { |
269 | /// use futures::future::FutureExt; |
270 | /// use futures::stream::StreamExt; |
271 | /// |
272 | /// let future = async { 17 }; |
273 | /// let stream = future.into_stream(); |
274 | /// let collected: Vec<_> = stream.collect().await; |
275 | /// assert_eq!(collected, vec![17]); |
276 | /// # }); |
277 | /// ``` |
278 | fn into_stream(self) -> IntoStream<Self> |
279 | where |
280 | Self: Sized, |
281 | { |
282 | assert_stream::<Self::Output, _>(IntoStream::new(self)) |
283 | } |
284 | |
285 | /// Flatten the execution of this future when the output of this |
286 | /// future is itself another future. |
287 | /// |
288 | /// This can be useful when combining futures together to flatten the |
289 | /// computation out the final result. |
290 | /// |
291 | /// This method is roughly equivalent to `self.then(|x| x)`. |
292 | /// |
293 | /// Note that this function consumes the receiving future and returns a |
294 | /// wrapped version of it. |
295 | /// |
296 | /// # Examples |
297 | /// |
298 | /// ``` |
299 | /// # futures::executor::block_on(async { |
300 | /// use futures::future::FutureExt; |
301 | /// |
302 | /// let nested_future = async { async { 1 } }; |
303 | /// let future = nested_future.flatten(); |
304 | /// assert_eq!(future.await, 1); |
305 | /// # }); |
306 | /// ``` |
307 | fn flatten(self) -> Flatten<Self> |
308 | where |
309 | Self::Output: Future, |
310 | Self: Sized, |
311 | { |
312 | let f = Flatten::new(self); |
313 | assert_future::<<<Self as Future>::Output as Future>::Output, _>(f) |
314 | } |
315 | |
316 | /// Flatten the execution of this future when the successful result of this |
317 | /// future is a stream. |
318 | /// |
319 | /// This can be useful when stream initialization is deferred, and it is |
320 | /// convenient to work with that stream as if stream was available at the |
321 | /// call site. |
322 | /// |
323 | /// Note that this function consumes this future and returns a wrapped |
324 | /// version of it. |
325 | /// |
326 | /// # Examples |
327 | /// |
328 | /// ``` |
329 | /// # futures::executor::block_on(async { |
330 | /// use futures::future::FutureExt; |
331 | /// use futures::stream::{self, StreamExt}; |
332 | /// |
333 | /// let stream_items = vec![17, 18, 19]; |
334 | /// let future_of_a_stream = async { stream::iter(stream_items) }; |
335 | /// |
336 | /// let stream = future_of_a_stream.flatten_stream(); |
337 | /// let list: Vec<_> = stream.collect().await; |
338 | /// assert_eq!(list, vec![17, 18, 19]); |
339 | /// # }); |
340 | /// ``` |
341 | fn flatten_stream(self) -> FlattenStream<Self> |
342 | where |
343 | Self::Output: Stream, |
344 | Self: Sized, |
345 | { |
346 | assert_stream::<<Self::Output as Stream>::Item, _>(FlattenStream::new(self)) |
347 | } |
348 | |
349 | /// Fuse a future such that `poll` will never again be called once it has |
350 | /// completed. This method can be used to turn any `Future` into a |
351 | /// `FusedFuture`. |
352 | /// |
353 | /// Normally, once a future has returned `Poll::Ready` from `poll`, |
354 | /// any further calls could exhibit bad behavior such as blocking |
355 | /// forever, panicking, never returning, etc. If it is known that `poll` |
356 | /// may be called too often then this method can be used to ensure that it |
357 | /// has defined semantics. |
358 | /// |
359 | /// If a `fuse`d future is `poll`ed after having returned `Poll::Ready` |
360 | /// previously, it will return `Poll::Pending`, from `poll` again (and will |
361 | /// continue to do so for all future calls to `poll`). |
362 | /// |
363 | /// This combinator will drop the underlying future as soon as it has been |
364 | /// completed to ensure resources are reclaimed as soon as possible. |
365 | fn fuse(self) -> Fuse<Self> |
366 | where |
367 | Self: Sized, |
368 | { |
369 | let f = Fuse::new(self); |
370 | assert_future::<Self::Output, _>(f) |
371 | } |
372 | |
373 | /// Do something with the output of a future before passing it on. |
374 | /// |
375 | /// When using futures, you'll often chain several of them together. While |
376 | /// working on such code, you might want to check out what's happening at |
377 | /// various parts in the pipeline, without consuming the intermediate |
378 | /// value. To do that, insert a call to `inspect`. |
379 | /// |
380 | /// # Examples |
381 | /// |
382 | /// ``` |
383 | /// # futures::executor::block_on(async { |
384 | /// use futures::future::FutureExt; |
385 | /// |
386 | /// let future = async { 1 }; |
387 | /// let new_future = future.inspect(|&x| println!("about to resolve: {}" , x)); |
388 | /// assert_eq!(new_future.await, 1); |
389 | /// # }); |
390 | /// ``` |
391 | fn inspect<F>(self, f: F) -> Inspect<Self, F> |
392 | where |
393 | F: FnOnce(&Self::Output), |
394 | Self: Sized, |
395 | { |
396 | assert_future::<Self::Output, _>(Inspect::new(self, f)) |
397 | } |
398 | |
399 | /// Catches unwinding panics while polling the future. |
400 | /// |
401 | /// In general, panics within a future can propagate all the way out to the |
402 | /// task level. This combinator makes it possible to halt unwinding within |
403 | /// the future itself. It's most commonly used within task executors. It's |
404 | /// not recommended to use this for error handling. |
405 | /// |
406 | /// Note that this method requires the `UnwindSafe` bound from the standard |
407 | /// library. This isn't always applied automatically, and the standard |
408 | /// library provides an `AssertUnwindSafe` wrapper type to apply it |
409 | /// after-the fact. To assist using this method, the `Future` trait is also |
410 | /// implemented for `AssertUnwindSafe<F>` where `F` implements `Future`. |
411 | /// |
412 | /// This method is only available when the `std` feature of this |
413 | /// library is activated, and it is activated by default. |
414 | /// |
415 | /// # Examples |
416 | /// |
417 | /// ``` |
418 | /// # futures::executor::block_on(async { |
419 | /// use futures::future::{self, FutureExt, Ready}; |
420 | /// |
421 | /// let future = future::ready(2); |
422 | /// assert!(future.catch_unwind().await.is_ok()); |
423 | /// |
424 | /// let future = future::lazy(|_| -> Ready<i32> { |
425 | /// unimplemented!() |
426 | /// }); |
427 | /// assert!(future.catch_unwind().await.is_err()); |
428 | /// # }); |
429 | /// ``` |
430 | #[cfg (feature = "std" )] |
431 | fn catch_unwind(self) -> CatchUnwind<Self> |
432 | where |
433 | Self: Sized + ::std::panic::UnwindSafe, |
434 | { |
435 | assert_future::<Result<Self::Output, Box<dyn std::any::Any + Send>>, _>(CatchUnwind::new( |
436 | self, |
437 | )) |
438 | } |
439 | |
440 | /// Create a cloneable handle to this future where all handles will resolve |
441 | /// to the same result. |
442 | /// |
443 | /// The `shared` combinator method provides a method to convert any future |
444 | /// into a cloneable future. It enables a future to be polled by multiple |
445 | /// threads. |
446 | /// |
447 | /// This method is only available when the `std` feature of this |
448 | /// library is activated, and it is activated by default. |
449 | /// |
450 | /// # Examples |
451 | /// |
452 | /// ``` |
453 | /// # futures::executor::block_on(async { |
454 | /// use futures::future::FutureExt; |
455 | /// |
456 | /// let future = async { 6 }; |
457 | /// let shared1 = future.shared(); |
458 | /// let shared2 = shared1.clone(); |
459 | /// |
460 | /// assert_eq!(6, shared1.await); |
461 | /// assert_eq!(6, shared2.await); |
462 | /// # }); |
463 | /// ``` |
464 | /// |
465 | /// ``` |
466 | /// # futures::executor::block_on(async { |
467 | /// use futures::future::FutureExt; |
468 | /// use futures::executor::block_on; |
469 | /// use std::thread; |
470 | /// |
471 | /// let future = async { 6 }; |
472 | /// let shared1 = future.shared(); |
473 | /// let shared2 = shared1.clone(); |
474 | /// let join_handle = thread::spawn(move || { |
475 | /// assert_eq!(6, block_on(shared2)); |
476 | /// }); |
477 | /// assert_eq!(6, shared1.await); |
478 | /// join_handle.join().unwrap(); |
479 | /// # }); |
480 | /// ``` |
481 | #[cfg (feature = "std" )] |
482 | fn shared(self) -> Shared<Self> |
483 | where |
484 | Self: Sized, |
485 | Self::Output: Clone, |
486 | { |
487 | assert_future::<Self::Output, _>(Shared::new(self)) |
488 | } |
489 | |
490 | /// Turn this future into a future that yields `()` on completion and sends |
491 | /// its output to another future on a separate task. |
492 | /// |
493 | /// This can be used with spawning executors to easily retrieve the result |
494 | /// of a future executing on a separate task or thread. |
495 | /// |
496 | /// This method is only available when the `std` feature of this |
497 | /// library is activated, and it is activated by default. |
498 | #[cfg (feature = "channel" )] |
499 | #[cfg_attr (docsrs, doc(cfg(feature = "channel" )))] |
500 | #[cfg (feature = "std" )] |
501 | fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>) |
502 | where |
503 | Self: Sized, |
504 | { |
505 | let (wrapped, handle) = remote_handle::remote_handle(self); |
506 | (assert_future::<(), _>(wrapped), handle) |
507 | } |
508 | |
509 | /// Wrap the future in a Box, pinning it. |
510 | /// |
511 | /// This method is only available when the `std` or `alloc` feature of this |
512 | /// library is activated, and it is activated by default. |
513 | #[cfg (feature = "alloc" )] |
514 | fn boxed<'a>(self) -> BoxFuture<'a, Self::Output> |
515 | where |
516 | Self: Sized + Send + 'a, |
517 | { |
518 | assert_future::<Self::Output, _>(Box::pin(self)) |
519 | } |
520 | |
521 | /// Wrap the future in a Box, pinning it. |
522 | /// |
523 | /// Similar to `boxed`, but without the `Send` requirement. |
524 | /// |
525 | /// This method is only available when the `std` or `alloc` feature of this |
526 | /// library is activated, and it is activated by default. |
527 | #[cfg (feature = "alloc" )] |
528 | fn boxed_local<'a>(self) -> LocalBoxFuture<'a, Self::Output> |
529 | where |
530 | Self: Sized + 'a, |
531 | { |
532 | assert_future::<Self::Output, _>(Box::pin(self)) |
533 | } |
534 | |
535 | /// Turns a [`Future<Output = T>`](Future) into a |
536 | /// [`TryFuture<Ok = T, Error = ()`>](futures_core::future::TryFuture). |
537 | fn unit_error(self) -> UnitError<Self> |
538 | where |
539 | Self: Sized, |
540 | { |
541 | assert_future::<Result<Self::Output, ()>, _>(UnitError::new(self)) |
542 | } |
543 | |
544 | /// Turns a [`Future<Output = T>`](Future) into a |
545 | /// [`TryFuture<Ok = T, Error = Never`>](futures_core::future::TryFuture). |
546 | fn never_error(self) -> NeverError<Self> |
547 | where |
548 | Self: Sized, |
549 | { |
550 | assert_future::<Result<Self::Output, Never>, _>(NeverError::new(self)) |
551 | } |
552 | |
553 | /// A convenience for calling `Future::poll` on `Unpin` future types. |
554 | fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output> |
555 | where |
556 | Self: Unpin, |
557 | { |
558 | Pin::new(self).poll(cx) |
559 | } |
560 | |
561 | /// Evaluates and consumes the future, returning the resulting output if |
562 | /// the future is ready after the first call to `Future::poll`. |
563 | /// |
564 | /// If `poll` instead returns `Poll::Pending`, `None` is returned. |
565 | /// |
566 | /// This method is useful in cases where immediacy is more important than |
567 | /// waiting for a result. It is also convenient for quickly obtaining |
568 | /// the value of a future that is known to always resolve immediately. |
569 | /// |
570 | /// # Examples |
571 | /// |
572 | /// ``` |
573 | /// # use futures::prelude::*; |
574 | /// use futures::{future::ready, future::pending}; |
575 | /// let future_ready = ready("foobar" ); |
576 | /// let future_pending = pending::<&'static str>(); |
577 | /// |
578 | /// assert_eq!(future_ready.now_or_never(), Some("foobar" )); |
579 | /// assert_eq!(future_pending.now_or_never(), None); |
580 | /// ``` |
581 | /// |
582 | /// In cases where it is absolutely known that a future should always |
583 | /// resolve immediately and never return `Poll::Pending`, this method can |
584 | /// be combined with `expect()`: |
585 | /// |
586 | /// ``` |
587 | /// # use futures::{prelude::*, future::ready}; |
588 | /// let future_ready = ready("foobar" ); |
589 | /// |
590 | /// assert_eq!(future_ready.now_or_never().expect("Future not ready" ), "foobar" ); |
591 | /// ``` |
592 | fn now_or_never(self) -> Option<Self::Output> |
593 | where |
594 | Self: Sized, |
595 | { |
596 | let noop_waker = crate::task::noop_waker(); |
597 | let mut cx = Context::from_waker(&noop_waker); |
598 | |
599 | let this = self; |
600 | pin_mut!(this); |
601 | match this.poll(&mut cx) { |
602 | Poll::Ready(x) => Some(x), |
603 | _ => None, |
604 | } |
605 | } |
606 | } |
607 | |