1 | //! Asynchronous streams. |
2 | |
3 | use core::ops::DerefMut; |
4 | use core::pin::Pin; |
5 | use core::task::{Context, Poll}; |
6 | |
7 | /// An owned dynamically typed [`Stream`] for use in cases where you can't |
8 | /// statically type your result or need to add some indirection. |
9 | #[cfg (feature = "alloc" )] |
10 | pub type BoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + Send + 'a>>; |
11 | |
12 | /// `BoxStream`, but without the `Send` requirement. |
13 | #[cfg (feature = "alloc" )] |
14 | pub type LocalBoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + 'a>>; |
15 | |
16 | /// A stream of values produced asynchronously. |
17 | /// |
18 | /// If `Future<Output = T>` is an asynchronous version of `T`, then `Stream<Item |
19 | /// = T>` is an asynchronous version of `Iterator<Item = T>`. A stream |
20 | /// represents a sequence of value-producing events that occur asynchronously to |
21 | /// the caller. |
22 | /// |
23 | /// The trait is modeled after `Future`, but allows `poll_next` to be called |
24 | /// even after a value has been produced, yielding `None` once the stream has |
25 | /// been fully exhausted. |
26 | #[must_use = "streams do nothing unless polled" ] |
27 | pub trait Stream { |
28 | /// Values yielded by the stream. |
29 | type Item; |
30 | |
31 | /// Attempt to pull out the next value of this stream, registering the |
32 | /// current task for wakeup if the value is not yet available, and returning |
33 | /// `None` if the stream is exhausted. |
34 | /// |
35 | /// # Return value |
36 | /// |
37 | /// There are several possible return values, each indicating a distinct |
38 | /// stream state: |
39 | /// |
40 | /// - `Poll::Pending` means that this stream's next value is not ready |
41 | /// yet. Implementations will ensure that the current task will be notified |
42 | /// when the next value may be ready. |
43 | /// |
44 | /// - `Poll::Ready(Some(val))` means that the stream has successfully |
45 | /// produced a value, `val`, and may produce further values on subsequent |
46 | /// `poll_next` calls. |
47 | /// |
48 | /// - `Poll::Ready(None)` means that the stream has terminated, and |
49 | /// `poll_next` should not be invoked again. |
50 | /// |
51 | /// # Panics |
52 | /// |
53 | /// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its |
54 | /// `poll_next` method again may panic, block forever, or cause other kinds of |
55 | /// problems; the `Stream` trait places no requirements on the effects of |
56 | /// such a call. However, as the `poll_next` method is not marked `unsafe`, |
57 | /// Rust's usual rules apply: calls must never cause undefined behavior |
58 | /// (memory corruption, incorrect use of `unsafe` functions, or the like), |
59 | /// regardless of the stream's state. |
60 | /// |
61 | /// If this is difficult to guard against then the [`fuse`] adapter can be used |
62 | /// to ensure that `poll_next` always returns `Ready(None)` in subsequent |
63 | /// calls. |
64 | /// |
65 | /// [`fuse`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.fuse |
66 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; |
67 | |
68 | /// Returns the bounds on the remaining length of the stream. |
69 | /// |
70 | /// Specifically, `size_hint()` returns a tuple where the first element |
71 | /// is the lower bound, and the second element is the upper bound. |
72 | /// |
73 | /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`. |
74 | /// A [`None`] here means that either there is no known upper bound, or the |
75 | /// upper bound is larger than [`usize`]. |
76 | /// |
77 | /// # Implementation notes |
78 | /// |
79 | /// It is not enforced that a stream implementation yields the declared |
80 | /// number of elements. A buggy stream may yield less than the lower bound |
81 | /// or more than the upper bound of elements. |
82 | /// |
83 | /// `size_hint()` is primarily intended to be used for optimizations such as |
84 | /// reserving space for the elements of the stream, but must not be |
85 | /// trusted to e.g., omit bounds checks in unsafe code. An incorrect |
86 | /// implementation of `size_hint()` should not lead to memory safety |
87 | /// violations. |
88 | /// |
89 | /// That said, the implementation should provide a correct estimation, |
90 | /// because otherwise it would be a violation of the trait's protocol. |
91 | /// |
92 | /// The default implementation returns `(0, `[`None`]`)` which is correct for any |
93 | /// stream. |
94 | #[inline ] |
95 | fn size_hint(&self) -> (usize, Option<usize>) { |
96 | (0, None) |
97 | } |
98 | } |
99 | |
100 | impl<S: ?Sized + Stream + Unpin> Stream for &mut S { |
101 | type Item = S::Item; |
102 | |
103 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
104 | S::poll_next(self:Pin::new(&mut **self), cx) |
105 | } |
106 | |
107 | fn size_hint(&self) -> (usize, Option<usize>) { |
108 | (**self).size_hint() |
109 | } |
110 | } |
111 | |
112 | impl<P> Stream for Pin<P> |
113 | where |
114 | P: DerefMut + Unpin, |
115 | P::Target: Stream, |
116 | { |
117 | type Item = <P::Target as Stream>::Item; |
118 | |
119 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
120 | self.get_mut().as_mut().poll_next(cx) |
121 | } |
122 | |
123 | fn size_hint(&self) -> (usize, Option<usize>) { |
124 | (**self).size_hint() |
125 | } |
126 | } |
127 | |
128 | /// A stream which tracks whether or not the underlying stream |
129 | /// should no longer be polled. |
130 | /// |
131 | /// `is_terminated` will return `true` if a future should no longer be polled. |
132 | /// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned |
133 | /// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a |
134 | /// stream has become inactive and can no longer make progress and should be |
135 | /// ignored or dropped rather than being polled again. |
136 | pub trait FusedStream: Stream { |
137 | /// Returns `true` if the stream should no longer be polled. |
138 | fn is_terminated(&self) -> bool; |
139 | } |
140 | |
141 | impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F { |
142 | fn is_terminated(&self) -> bool { |
143 | <F as FusedStream>::is_terminated(&**self) |
144 | } |
145 | } |
146 | |
147 | impl<P> FusedStream for Pin<P> |
148 | where |
149 | P: DerefMut + Unpin, |
150 | P::Target: FusedStream, |
151 | { |
152 | fn is_terminated(&self) -> bool { |
153 | <P::Target as FusedStream>::is_terminated(&**self) |
154 | } |
155 | } |
156 | |
157 | mod private_try_stream { |
158 | use super::Stream; |
159 | |
160 | pub trait Sealed {} |
161 | |
162 | impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {} |
163 | } |
164 | |
165 | /// A convenience for streams that return `Result` values that includes |
166 | /// a variety of adapters tailored to such futures. |
167 | pub trait TryStream: Stream + private_try_stream::Sealed { |
168 | /// The type of successful values yielded by this future |
169 | type Ok; |
170 | |
171 | /// The type of failures yielded by this future |
172 | type Error; |
173 | |
174 | /// Poll this `TryStream` as if it were a `Stream`. |
175 | /// |
176 | /// This method is a stopgap for a compiler limitation that prevents us from |
177 | /// directly inheriting from the `Stream` trait; in the future it won't be |
178 | /// needed. |
179 | fn try_poll_next( |
180 | self: Pin<&mut Self>, |
181 | cx: &mut Context<'_>, |
182 | ) -> Poll<Option<Result<Self::Ok, Self::Error>>>; |
183 | } |
184 | |
185 | impl<S, T, E> TryStream for S |
186 | where |
187 | S: ?Sized + Stream<Item = Result<T, E>>, |
188 | { |
189 | type Ok = T; |
190 | type Error = E; |
191 | |
192 | fn try_poll_next( |
193 | self: Pin<&mut Self>, |
194 | cx: &mut Context<'_>, |
195 | ) -> Poll<Option<Result<Self::Ok, Self::Error>>> { |
196 | self.poll_next(cx) |
197 | } |
198 | } |
199 | |
200 | #[cfg (feature = "alloc" )] |
201 | mod if_alloc { |
202 | use super::*; |
203 | use alloc::boxed::Box; |
204 | |
205 | impl<S: ?Sized + Stream + Unpin> Stream for Box<S> { |
206 | type Item = S::Item; |
207 | |
208 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
209 | Pin::new(&mut **self).poll_next(cx) |
210 | } |
211 | |
212 | fn size_hint(&self) -> (usize, Option<usize>) { |
213 | (**self).size_hint() |
214 | } |
215 | } |
216 | |
217 | #[cfg (feature = "std" )] |
218 | impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> { |
219 | type Item = S::Item; |
220 | |
221 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |
222 | unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx) |
223 | } |
224 | |
225 | fn size_hint(&self) -> (usize, Option<usize>) { |
226 | self.0.size_hint() |
227 | } |
228 | } |
229 | |
230 | impl<S: ?Sized + FusedStream + Unpin> FusedStream for Box<S> { |
231 | fn is_terminated(&self) -> bool { |
232 | <S as FusedStream>::is_terminated(&**self) |
233 | } |
234 | } |
235 | } |
236 | |