1use crate::stream::StreamExt;
2use core::pin::Pin;
3use futures_core::future::{FusedFuture, Future};
4use futures_core::ready;
5use futures_core::stream::Stream;
6use futures_core::task::{Context, Poll};
7
8/// Future for the [`into_future`](super::StreamExt::into_future) method.
9#[derive(Debug)]
10#[must_use = "futures do nothing unless you `.await` or poll them"]
11pub struct StreamFuture<St> {
12 stream: Option<St>,
13}
14
15impl<St: Stream + Unpin> StreamFuture<St> {
16 pub(super) fn new(stream: St) -> Self {
17 Self { stream: Some(stream) }
18 }
19
20 /// Acquires a reference to the underlying stream that this combinator is
21 /// pulling from.
22 ///
23 /// This method returns an `Option` to account for the fact that `StreamFuture`'s
24 /// implementation of `Future::poll` consumes the underlying stream during polling
25 /// in order to return it to the caller of `Future::poll` if the stream yielded
26 /// an element.
27 pub fn get_ref(&self) -> Option<&St> {
28 self.stream.as_ref()
29 }
30
31 /// Acquires a mutable reference to the underlying stream that this
32 /// combinator is pulling from.
33 ///
34 /// Note that care must be taken to avoid tampering with the state of the
35 /// stream which may otherwise confuse this combinator.
36 ///
37 /// This method returns an `Option` to account for the fact that `StreamFuture`'s
38 /// implementation of `Future::poll` consumes the underlying stream during polling
39 /// in order to return it to the caller of `Future::poll` if the stream yielded
40 /// an element.
41 pub fn get_mut(&mut self) -> Option<&mut St> {
42 self.stream.as_mut()
43 }
44
45 /// Acquires a pinned mutable reference to the underlying stream that this
46 /// combinator is pulling from.
47 ///
48 /// Note that care must be taken to avoid tampering with the state of the
49 /// stream which may otherwise confuse this combinator.
50 ///
51 /// This method returns an `Option` to account for the fact that `StreamFuture`'s
52 /// implementation of `Future::poll` consumes the underlying stream during polling
53 /// in order to return it to the caller of `Future::poll` if the stream yielded
54 /// an element.
55 pub fn get_pin_mut(self: Pin<&mut Self>) -> Option<Pin<&mut St>> {
56 self.get_mut().stream.as_mut().map(Pin::new)
57 }
58
59 /// Consumes this combinator, returning the underlying stream.
60 ///
61 /// Note that this may discard intermediate state of this combinator, so
62 /// care should be taken to avoid losing resources when this is called.
63 ///
64 /// This method returns an `Option` to account for the fact that `StreamFuture`'s
65 /// implementation of `Future::poll` consumes the underlying stream during polling
66 /// in order to return it to the caller of `Future::poll` if the stream yielded
67 /// an element.
68 pub fn into_inner(self) -> Option<St> {
69 self.stream
70 }
71}
72
73impl<St: Stream + Unpin> FusedFuture for StreamFuture<St> {
74 fn is_terminated(&self) -> bool {
75 self.stream.is_none()
76 }
77}
78
79impl<St: Stream + Unpin> Future for StreamFuture<St> {
80 type Output = (Option<St::Item>, St);
81
82 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83 let item = {
84 let s = self.stream.as_mut().expect("polling StreamFuture twice");
85 ready!(s.poll_next_unpin(cx))
86 };
87 let stream = self.stream.take().unwrap();
88 Poll::Ready((item, stream))
89 }
90}
91