1 | use core::fmt; |
2 | use core::pin::Pin; |
3 | use futures_core::future::Future; |
4 | use futures_core::ready; |
5 | use futures_core::stream::{FusedStream, Stream}; |
6 | use futures_core::task::{Context, Poll}; |
7 | #[cfg (feature = "sink" )] |
8 | use futures_sink::Sink; |
9 | use pin_project_lite::pin_project; |
10 | |
11 | // FIXME: docs, tests |
12 | |
13 | pin_project! { |
14 | /// Stream for the [`take_until`](super::StreamExt::take_until) method. |
15 | #[must_use = "streams do nothing unless polled" ] |
16 | pub struct TakeUntil<St: Stream, Fut: Future> { |
17 | #[pin] |
18 | stream: St, |
19 | // Contains the inner Future on start and None once the inner Future is resolved |
20 | // or taken out by the user. |
21 | #[pin] |
22 | fut: Option<Fut>, |
23 | // Contains fut's return value once fut is resolved |
24 | fut_result: Option<Fut::Output>, |
25 | // Whether the future was taken out by the user. |
26 | free: bool, |
27 | } |
28 | } |
29 | |
30 | impl<St, Fut> fmt::Debug for TakeUntil<St, Fut> |
31 | where |
32 | St: Stream + fmt::Debug, |
33 | St::Item: fmt::Debug, |
34 | Fut: Future + fmt::Debug, |
35 | { |
36 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
37 | f.debug_struct("TakeUntil" ).field("stream" , &self.stream).field("fut" , &self.fut).finish() |
38 | } |
39 | } |
40 | |
41 | impl<St, Fut> TakeUntil<St, Fut> |
42 | where |
43 | St: Stream, |
44 | Fut: Future, |
45 | { |
46 | pub(super) fn new(stream: St, fut: Fut) -> Self { |
47 | Self { stream, fut: Some(fut), fut_result: None, free: false } |
48 | } |
49 | |
50 | delegate_access_inner!(stream, St, ()); |
51 | |
52 | /// Extract the stopping future out of the combinator. |
53 | /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet. |
54 | /// Taking out the future means the combinator will be yielding |
55 | /// elements from the wrapped stream without ever stopping it. |
56 | pub fn take_future(&mut self) -> Option<Fut> { |
57 | if self.fut.is_some() { |
58 | self.free = true; |
59 | } |
60 | |
61 | self.fut.take() |
62 | } |
63 | |
64 | /// Once the stopping future is resolved, this method can be used |
65 | /// to extract the value returned by the stopping future. |
66 | /// |
67 | /// This may be used to retrieve arbitrary data from the stopping |
68 | /// future, for example a reason why the stream was stopped. |
69 | /// |
70 | /// This method will return `None` if the future isn't resolved yet, |
71 | /// or if the result was already taken out. |
72 | /// |
73 | /// # Examples |
74 | /// |
75 | /// ``` |
76 | /// # futures::executor::block_on(async { |
77 | /// use futures::future; |
78 | /// use futures::stream::{self, StreamExt}; |
79 | /// use futures::task::Poll; |
80 | /// |
81 | /// let stream = stream::iter(1..=10); |
82 | /// |
83 | /// let mut i = 0; |
84 | /// let stop_fut = future::poll_fn(|_cx| { |
85 | /// i += 1; |
86 | /// if i <= 5 { |
87 | /// Poll::Pending |
88 | /// } else { |
89 | /// Poll::Ready("reason" ) |
90 | /// } |
91 | /// }); |
92 | /// |
93 | /// let mut stream = stream.take_until(stop_fut); |
94 | /// let _ = stream.by_ref().collect::<Vec<_>>().await; |
95 | /// |
96 | /// let result = stream.take_result().unwrap(); |
97 | /// assert_eq!(result, "reason" ); |
98 | /// # }); |
99 | /// ``` |
100 | pub fn take_result(&mut self) -> Option<Fut::Output> { |
101 | self.fut_result.take() |
102 | } |
103 | |
104 | /// Whether the stream was stopped yet by the stopping future |
105 | /// being resolved. |
106 | pub fn is_stopped(&self) -> bool { |
107 | !self.free && self.fut.is_none() |
108 | } |
109 | } |
110 | |
111 | impl<St, Fut> Stream for TakeUntil<St, Fut> |
112 | where |
113 | St: Stream, |
114 | Fut: Future, |
115 | { |
116 | type Item = St::Item; |
117 | |
118 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> { |
119 | let mut this = self.project(); |
120 | |
121 | if let Some(f) = this.fut.as_mut().as_pin_mut() { |
122 | if let Poll::Ready(result) = f.poll(cx) { |
123 | this.fut.set(None); |
124 | *this.fut_result = Some(result); |
125 | } |
126 | } |
127 | |
128 | if !*this.free && this.fut.is_none() { |
129 | // Future resolved, inner stream stopped |
130 | Poll::Ready(None) |
131 | } else { |
132 | // Future either not resolved yet or taken out by the user |
133 | let item = ready!(this.stream.poll_next(cx)); |
134 | if item.is_none() { |
135 | this.fut.set(None); |
136 | } |
137 | Poll::Ready(item) |
138 | } |
139 | } |
140 | |
141 | fn size_hint(&self) -> (usize, Option<usize>) { |
142 | if self.is_stopped() { |
143 | return (0, Some(0)); |
144 | } |
145 | |
146 | self.stream.size_hint() |
147 | } |
148 | } |
149 | |
150 | impl<St, Fut> FusedStream for TakeUntil<St, Fut> |
151 | where |
152 | St: Stream, |
153 | Fut: Future, |
154 | { |
155 | fn is_terminated(&self) -> bool { |
156 | self.is_stopped() |
157 | } |
158 | } |
159 | |
160 | // Forwarding impl of Sink from the underlying stream |
161 | #[cfg (feature = "sink" )] |
162 | impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut> |
163 | where |
164 | S: Stream + Sink<Item>, |
165 | Fut: Future, |
166 | { |
167 | type Error = S::Error; |
168 | |
169 | delegate_sink!(stream, Item); |
170 | } |
171 | |