1//! Asynchronous streams.
2
3use core::ops::DerefMut;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
/// A pinned, heap-allocated, type-erased [`Stream`] that is also `Send`.
///
/// Reach for this alias when the concrete stream type cannot be spelled out
/// statically, or when an extra level of indirection is required.
#[cfg(feature = "alloc")]
pub type BoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + Send + 'a>>;
11
/// The same as [`BoxStream`], except that the boxed stream does not have to
/// be `Send`.
#[cfg(feature = "alloc")]
pub type LocalBoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + 'a>>;
15
/// A source of values that become available asynchronously.
///
/// Where `Future<Output = T>` is the asynchronous counterpart of a plain `T`,
/// `Stream<Item = T>` is the asynchronous counterpart of
/// `Iterator<Item = T>`: it models a series of value-producing events that
/// happen asynchronously with respect to the caller.
///
/// The design follows `Future`, with the difference that `poll_next` may be
/// called again after it has produced a value; it yields `None` once the
/// stream has been completely exhausted.
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
    /// The type of the values this stream yields.
    type Item;

    /// Tries to obtain the next value of this stream.
    ///
    /// When no value is available yet, the current task is registered for
    /// wakeup; when the stream is exhausted, `None` is returned.
    ///
    /// # Return value
    ///
    /// Each possible return value corresponds to a different stream state:
    ///
    /// - `Poll::Pending`: the next value is not ready yet. Implementations
    ///   will ensure that the current task is notified when the next value
    ///   may be ready.
    ///
    /// - `Poll::Ready(Some(val))`: the stream successfully produced the value
    ///   `val`, and later `poll_next` calls may produce further values.
    ///
    /// - `Poll::Ready(None)`: the stream has terminated, and `poll_next`
    ///   should not be invoked again.
    ///
    /// # Panics
    ///
    /// After a stream has finished (that is, `poll_next` returned
    /// `Ready(None)`), calling `poll_next` again may panic, block forever, or
    /// cause other kinds of problems; the `Stream` trait places no
    /// requirements on the effects of such a call. Because `poll_next` is not
    /// marked `unsafe`, however, Rust's usual rules still apply: a call must
    /// never cause undefined behavior (memory corruption, incorrect use of
    /// `unsafe` functions, or the like), whatever state the stream is in.
    ///
    /// When this is hard to guard against, the [`fuse`] adapter can be used
    /// to make sure `poll_next` keeps returning `Ready(None)` on subsequent
    /// calls.
    ///
    /// [`fuse`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.fuse
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    /// Reports bounds on how many items the stream has left.
    ///
    /// The result is a `(lower, upper)` pair: the first element is the lower
    /// bound and the second element is the upper bound.
    ///
    /// The upper bound is an [`Option`]`<`[`usize`]`>`. A [`None`] means
    /// either that no upper bound is known, or that the upper bound does not
    /// fit in a [`usize`].
    ///
    /// # Implementation notes
    ///
    /// Nothing enforces that a stream actually yields the advertised number
    /// of elements. A buggy stream may yield fewer elements than the lower
    /// bound or more elements than the upper bound.
    ///
    /// `size_hint()` is primarily meant for optimizations such as reserving
    /// space for the stream's elements. It must not be trusted to, e.g., omit
    /// bounds checks in unsafe code: an incorrect `size_hint()` should never
    /// lead to memory safety violations.
    ///
    /// Even so, implementations should report a correct estimate, because
    /// doing otherwise violates the trait's protocol.
    ///
    /// The default implementation returns `(0, `[`None`]`)`, which is correct
    /// for any stream.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        // The weakest possible claim: nothing promised below, nothing known above.
        let (lower, upper) = (0, None);
        (lower, upper)
    }
}
99
100impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
101 type Item = S::Item;
102
103 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
104 S::poll_next(Pin::new(&mut **self), cx)
105 }
106
107 fn size_hint(&self) -> (usize, Option<usize>) {
108 (**self).size_hint()
109 }
110}
111
112impl<P> Stream for Pin<P>
113where
114 P: DerefMut + Unpin,
115 P::Target: Stream,
116{
117 type Item = <P::Target as Stream>::Item;
118
119 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
120 self.get_mut().as_mut().poll_next(cx)
121 }
122
123 fn size_hint(&self) -> (usize, Option<usize>) {
124 (**self).size_hint()
125 }
126}
127
128/// A stream which tracks whether or not the underlying stream
129/// should no longer be polled.
130///
131/// `is_terminated` will return `true` if a future should no longer be polled.
132/// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned
133/// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a
134/// stream has become inactive and can no longer make progress and should be
135/// ignored or dropped rather than being polled again.
136pub trait FusedStream: Stream {
137 /// Returns `true` if the stream should no longer be polled.
138 fn is_terminated(&self) -> bool;
139}
140
141impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F {
142 fn is_terminated(&self) -> bool {
143 <F as FusedStream>::is_terminated(&**self)
144 }
145}
146
147impl<P> FusedStream for Pin<P>
148where
149 P: DerefMut + Unpin,
150 P::Target: FusedStream,
151{
152 fn is_terminated(&self) -> bool {
153 <P::Target as FusedStream>::is_terminated(&**self)
154 }
155}
156
157mod private_try_stream {
158 use super::Stream;
159
160 pub trait Sealed {}
161
162 impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {}
163}
164
165/// A convenience for streams that return `Result` values that includes
166/// a variety of adapters tailored to such futures.
167pub trait TryStream: Stream + private_try_stream::Sealed {
168 /// The type of successful values yielded by this future
169 type Ok;
170
171 /// The type of failures yielded by this future
172 type Error;
173
174 /// Poll this `TryStream` as if it were a `Stream`.
175 ///
176 /// This method is a stopgap for a compiler limitation that prevents us from
177 /// directly inheriting from the `Stream` trait; in the future it won't be
178 /// needed.
179 fn try_poll_next(
180 self: Pin<&mut Self>,
181 cx: &mut Context<'_>,
182 ) -> Poll<Option<Result<Self::Ok, Self::Error>>>;
183}
184
185impl<S, T, E> TryStream for S
186where
187 S: ?Sized + Stream<Item = Result<T, E>>,
188{
189 type Ok = T;
190 type Error = E;
191
192 fn try_poll_next(
193 self: Pin<&mut Self>,
194 cx: &mut Context<'_>,
195 ) -> Poll<Option<Result<Self::Ok, Self::Error>>> {
196 self.poll_next(cx)
197 }
198}
199
#[cfg(feature = "alloc")]
mod if_alloc {
    use super::*;
    use alloc::boxed::Box;

    /// A boxed `Unpin` stream is itself a stream; every call is forwarded to
    /// the boxed value.
    impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
        type Item = S::Item;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // `S: Unpin`, so a pinned reference to the boxed value can be
            // created without `unsafe`.
            Pin::new(&mut **self).poll_next(cx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            (**self).size_hint()
        }
    }

    /// `AssertUnwindSafe` is transparent for streams: polling the wrapper
    /// polls the wrapped stream.
    #[cfg(feature = "std")]
    impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
            // SAFETY: this is a pin projection onto the wrapper's only field.
            // The closure returns a reference into `*self`, the inner stream
            // is never moved out of the wrapper here, and `AssertUnwindSafe`
            // is a plain tuple struct in `std` without a `Drop` impl that
            // could move the field, so the field remains pinned for as long
            // as the wrapper is.
            unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.0.size_hint()
        }
    }

    /// A boxed fused stream reports the termination state of the boxed value.
    impl<S: ?Sized + FusedStream + Unpin> FusedStream for Box<S> {
        fn is_terminated(&self) -> bool {
            <S as FusedStream>::is_terminated(&**self)
        }
    }
}
236