1use core::fmt;
2use core::pin::Pin;
3use futures_core::future::Future;
4use futures_core::ready;
5use futures_core::stream::{FusedStream, Stream};
6use futures_core::task::{Context, Poll};
7#[cfg(feature = "sink")]
8use futures_sink::Sink;
9use pin_project_lite::pin_project;
10
11// FIXME: docs, tests
12
13pin_project! {
14 /// Stream for the [`take_until`](super::StreamExt::take_until) method.
15 #[must_use = "streams do nothing unless polled"]
16 pub struct TakeUntil<St: Stream, Fut: Future> {
17 #[pin]
18 stream: St,
19 // Contains the inner Future on start and None once the inner Future is resolved
20 // or taken out by the user.
21 #[pin]
22 fut: Option<Fut>,
23 // Contains fut's return value once fut is resolved
24 fut_result: Option<Fut::Output>,
25 // Whether the future was taken out by the user.
26 free: bool,
27 }
28}
29
30impl<St, Fut> fmt::Debug for TakeUntil<St, Fut>
31where
32 St: Stream + fmt::Debug,
33 St::Item: fmt::Debug,
34 Fut: Future + fmt::Debug,
35{
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 f.debug_struct("TakeUntil").field("stream", &self.stream).field("fut", &self.fut).finish()
38 }
39}
40
41impl<St, Fut> TakeUntil<St, Fut>
42where
43 St: Stream,
44 Fut: Future,
45{
46 pub(super) fn new(stream: St, fut: Fut) -> Self {
47 Self { stream, fut: Some(fut), fut_result: None, free: false }
48 }
49
50 delegate_access_inner!(stream, St, ());
51
52 /// Extract the stopping future out of the combinator.
53 /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
54 /// Taking out the future means the combinator will be yielding
55 /// elements from the wrapped stream without ever stopping it.
56 pub fn take_future(&mut self) -> Option<Fut> {
57 if self.fut.is_some() {
58 self.free = true;
59 }
60
61 self.fut.take()
62 }
63
64 /// Once the stopping future is resolved, this method can be used
65 /// to extract the value returned by the stopping future.
66 ///
67 /// This may be used to retrieve arbitrary data from the stopping
68 /// future, for example a reason why the stream was stopped.
69 ///
70 /// This method will return `None` if the future isn't resolved yet,
71 /// or if the result was already taken out.
72 ///
73 /// # Examples
74 ///
75 /// ```
76 /// # futures::executor::block_on(async {
77 /// use futures::future;
78 /// use futures::stream::{self, StreamExt};
79 /// use futures::task::Poll;
80 ///
81 /// let stream = stream::iter(1..=10);
82 ///
83 /// let mut i = 0;
84 /// let stop_fut = future::poll_fn(|_cx| {
85 /// i += 1;
86 /// if i <= 5 {
87 /// Poll::Pending
88 /// } else {
89 /// Poll::Ready("reason")
90 /// }
91 /// });
92 ///
93 /// let mut stream = stream.take_until(stop_fut);
94 /// let _ = stream.by_ref().collect::<Vec<_>>().await;
95 ///
96 /// let result = stream.take_result().unwrap();
97 /// assert_eq!(result, "reason");
98 /// # });
99 /// ```
100 pub fn take_result(&mut self) -> Option<Fut::Output> {
101 self.fut_result.take()
102 }
103
104 /// Whether the stream was stopped yet by the stopping future
105 /// being resolved.
106 pub fn is_stopped(&self) -> bool {
107 !self.free && self.fut.is_none()
108 }
109}
110
111impl<St, Fut> Stream for TakeUntil<St, Fut>
112where
113 St: Stream,
114 Fut: Future,
115{
116 type Item = St::Item;
117
118 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
119 let mut this = self.project();
120
121 if let Some(f) = this.fut.as_mut().as_pin_mut() {
122 if let Poll::Ready(result) = f.poll(cx) {
123 this.fut.set(None);
124 *this.fut_result = Some(result);
125 }
126 }
127
128 if !*this.free && this.fut.is_none() {
129 // Future resolved, inner stream stopped
130 Poll::Ready(None)
131 } else {
132 // Future either not resolved yet or taken out by the user
133 let item = ready!(this.stream.poll_next(cx));
134 if item.is_none() {
135 this.fut.set(None);
136 }
137 Poll::Ready(item)
138 }
139 }
140
141 fn size_hint(&self) -> (usize, Option<usize>) {
142 if self.is_stopped() {
143 return (0, Some(0));
144 }
145
146 self.stream.size_hint()
147 }
148}
149
150impl<St, Fut> FusedStream for TakeUntil<St, Fut>
151where
152 St: Stream,
153 Fut: Future,
154{
155 fn is_terminated(&self) -> bool {
156 self.is_stopped()
157 }
158}
159
// When the wrapped stream is also a sink, expose that sink through the
// combinator by forwarding every `Sink` method to the `stream` field.
#[cfg(feature = "sink")]
impl<S, Fut, Item> Sink<Item> for TakeUntil<S, Fut>
where
    S: Stream + Sink<Item>,
    Fut: Future,
{
    type Error = <S as Sink<Item>>::Error;

    delegate_sink!(stream, Item);
}
171