1 | use crate::stream::StreamExt; |
2 | use core::pin::Pin; |
3 | use futures_core::future::{FusedFuture, Future}; |
4 | use futures_core::ready; |
5 | use futures_core::stream::Stream; |
6 | use futures_core::task::{Context, Poll}; |
7 | |
8 | /// Future for the [`into_future`](super::StreamExt::into_future) method. |
9 | #[derive(Debug)] |
10 | #[must_use = "futures do nothing unless you `.await` or poll them" ] |
11 | pub struct StreamFuture<St> { |
12 | stream: Option<St>, |
13 | } |
14 | |
15 | impl<St: Stream + Unpin> StreamFuture<St> { |
16 | pub(super) fn new(stream: St) -> Self { |
17 | Self { stream: Some(stream) } |
18 | } |
19 | |
20 | /// Acquires a reference to the underlying stream that this combinator is |
21 | /// pulling from. |
22 | /// |
23 | /// This method returns an `Option` to account for the fact that `StreamFuture`'s |
24 | /// implementation of `Future::poll` consumes the underlying stream during polling |
25 | /// in order to return it to the caller of `Future::poll` if the stream yielded |
26 | /// an element. |
27 | pub fn get_ref(&self) -> Option<&St> { |
28 | self.stream.as_ref() |
29 | } |
30 | |
31 | /// Acquires a mutable reference to the underlying stream that this |
32 | /// combinator is pulling from. |
33 | /// |
34 | /// Note that care must be taken to avoid tampering with the state of the |
35 | /// stream which may otherwise confuse this combinator. |
36 | /// |
37 | /// This method returns an `Option` to account for the fact that `StreamFuture`'s |
38 | /// implementation of `Future::poll` consumes the underlying stream during polling |
39 | /// in order to return it to the caller of `Future::poll` if the stream yielded |
40 | /// an element. |
41 | pub fn get_mut(&mut self) -> Option<&mut St> { |
42 | self.stream.as_mut() |
43 | } |
44 | |
45 | /// Acquires a pinned mutable reference to the underlying stream that this |
46 | /// combinator is pulling from. |
47 | /// |
48 | /// Note that care must be taken to avoid tampering with the state of the |
49 | /// stream which may otherwise confuse this combinator. |
50 | /// |
51 | /// This method returns an `Option` to account for the fact that `StreamFuture`'s |
52 | /// implementation of `Future::poll` consumes the underlying stream during polling |
53 | /// in order to return it to the caller of `Future::poll` if the stream yielded |
54 | /// an element. |
55 | pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> { |
56 | self.get_mut().stream.as_mut().map(Pin::new) |
57 | } |
58 | |
59 | /// Consumes this combinator, returning the underlying stream. |
60 | /// |
61 | /// Note that this may discard intermediate state of this combinator, so |
62 | /// care should be taken to avoid losing resources when this is called. |
63 | /// |
64 | /// This method returns an `Option` to account for the fact that `StreamFuture`'s |
65 | /// implementation of `Future::poll` consumes the underlying stream during polling |
66 | /// in order to return it to the caller of `Future::poll` if the stream yielded |
67 | /// an element. |
68 | pub fn into_inner(self) -> Option<St> { |
69 | self.stream |
70 | } |
71 | } |
72 | |
73 | impl<St: Stream + Unpin> FusedFuture for StreamFuture<St> { |
74 | fn is_terminated(&self) -> bool { |
75 | self.stream.is_none() |
76 | } |
77 | } |
78 | |
79 | impl<St: Stream + Unpin> Future for StreamFuture<St> { |
80 | type Output = (Option<St::Item>, St); |
81 | |
82 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { |
83 | let item = { |
84 | let s = self.stream.as_mut().expect("polling StreamFuture twice" ); |
85 | ready!(s.poll_next_unpin(cx)) |
86 | }; |
87 | let stream = self.stream.take().unwrap(); |
88 | Poll::Ready((item, stream)) |
89 | } |
90 | } |
91 | |