1 | //! Composable asynchronous iteration. |
2 | //! |
3 | //! This module is an async version of [`std::iter`]. |
4 | //! |
5 | //! If you've found yourself with an asynchronous collection of some kind, |
6 | //! and needed to perform an operation on the elements of said collection, |
7 | //! you'll quickly run into 'streams'. Streams are heavily used in idiomatic |
8 | //! asynchronous Rust code, so it's worth becoming familiar with them. |
9 | //! |
10 | //! Before explaining more, let's talk about how this module is structured: |
11 | //! |
12 | //! # Organization |
13 | //! |
14 | //! This module is largely organized by type: |
15 | //! |
16 | //! * [Traits] are the core portion: these traits define what kind of streams |
17 | //! exist and what you can do with them. The methods of these traits are worth |
18 | //! putting some extra study time into. |
19 | //! * [Functions] provide some helpful ways to create some basic streams. |
20 | //! * [Structs] are often the return types of the various methods on this |
21 | //! module's traits. You'll usually want to look at the method that creates |
22 | //! the `struct`, rather than the `struct` itself. For more detail about why, |
23 | //! see '[Implementing Stream](#implementing-stream)'. |
24 | //! |
25 | //! [Traits]: #traits |
26 | //! [Functions]: #functions |
27 | //! [Structs]: #structs |
28 | //! |
29 | //! That's it! Let's dig into streams. |
30 | //! |
31 | //! # Stream |
32 | //! |
33 | //! The heart and soul of this module is the [`Stream`] trait. The core of |
34 | //! [`Stream`] looks like this: |
35 | //! |
36 | //! ``` |
37 | //! #![allow(dead_code)] |
38 | //! # use async_std::task::{Context, Poll}; |
39 | //! # use std::pin::Pin; |
40 | //! pub trait Stream { |
41 | //! type Item; |
42 | //! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; |
43 | //! } |
44 | //! # impl Stream for () { |
45 | //! # type Item = (); |
46 | //! # fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Pending } |
47 | //! # } |
48 | //! ``` |
49 | //! |
50 | //! A stream has a method, `poll_next`, which when called, returns a |
51 | //! [`Poll`]<[`Option`]`<Item>>`. `poll_next` will return `Ready(Some(Item))` |
52 | //! as long as there are elements, and once they've all been exhausted, will |
53 | //! return `Ready(None)` to indicate that iteration is finished. If we're waiting on |
54 | //! something asynchronous to resolve, `Pending` is returned. |
55 | //! |
56 | //! Individual streams may choose to resume iteration, and so calling |
57 | //! `poll_next` again may or may not eventually start returning `Ready(Some(Item))` |
58 | //! again at some point. |
59 | //! |
60 | //! [`Stream`]'s full definition includes a number of other methods as well, |
61 | //! but they are default methods, built on top of `poll_next`, and so you get |
62 | //! them for free. |
63 | //! |
64 | //! Streams are also composable, and it's common to chain them together to do |
65 | //! more complex forms of processing. See the [Adapters](#adapters) section |
66 | //! below for more details. |
67 | //! |
68 | //! [`Poll`]: ../task/enum.Poll.html |
69 | //! [`Stream`]: trait.Stream.html |
70 | //! [`next`]: trait.Stream.html#tymethod.next |
71 | //! [`Option`]: ../../std/option/enum.Option.html |
72 | //! |
73 | //! # The three forms of streaming |
74 | //! |
75 | //! There are three common methods which can create streams from a collection: |
76 | //! |
77 | //! * `stream()`, which iterates over `&T`. |
78 | //! * `stream_mut()`, which iterates over `&mut T`. |
79 | //! * `into_stream()`, which iterates over `T`. |
80 | //! |
81 | //! Various things in async-std may implement one or more of the |
82 | //! three, where appropriate. |
83 | //! |
84 | //! # Implementing Stream |
85 | //! |
86 | //! Creating a stream of your own involves two steps: creating a `struct` to |
87 | //! hold the stream's state, and then `impl`ementing [`Stream`] for that |
88 | //! `struct`. This is why there are so many `struct`s in this module: there is |
89 | //! one for each stream and stream adapter. |
90 | //! |
91 | //! Let's make a stream named `Counter` which counts from `1` to `5`: |
92 | //! |
93 | //! ``` |
94 | //! # use async_std::prelude::*; |
95 | //! # use async_std::task::{Context, Poll}; |
96 | //! # use std::pin::Pin; |
97 | //! // First, the struct: |
98 | //! |
99 | //! /// A stream which counts from one to five |
100 | //! struct Counter { |
101 | //! count: usize, |
102 | //! } |
103 | //! |
104 | //! // we want our count to start at one, so let's add a new() method to help. |
105 | //! // This isn't strictly necessary, but is convenient. Note that we start |
106 | //! // `count` at zero, we'll see why in `poll_next()`'s implementation below. |
107 | //! impl Counter { |
108 | //! fn new() -> Counter { |
109 | //! Counter { count: 0 } |
110 | //! } |
111 | //! } |
112 | //! |
113 | //! // Then, we implement `Stream` for our `Counter`: |
114 | //! |
115 | //! impl Stream for Counter { |
116 | //! // we will be counting with usize |
117 | //! type Item = usize; |
118 | //! |
119 | //! // poll_next() is the only required method |
120 | //! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
121 | //! // Increment our count. This is why we started at zero. |
122 | //! self.count += 1; |
123 | //! |
124 | //! // Check to see if we've finished counting or not. |
125 | //! if self.count < 6 { |
126 | //! Poll::Ready(Some(self.count)) |
127 | //! } else { |
128 | //! Poll::Ready(None) |
129 | //! } |
130 | //! } |
131 | //! } |
132 | //! |
133 | //! // And now we can use it! |
134 | //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
135 | //! # |
136 | //! let mut counter = Counter::new(); |
137 | //! |
138 | //! let x = counter.next().await.unwrap(); |
139 | //! println!("{}" , x); |
140 | //! |
141 | //! let x = counter.next().await.unwrap(); |
142 | //! println!("{}" , x); |
143 | //! |
144 | //! let x = counter.next().await.unwrap(); |
145 | //! println!("{}" , x); |
146 | //! |
147 | //! let x = counter.next().await.unwrap(); |
148 | //! println!("{}" , x); |
149 | //! |
150 | //! let x = counter.next().await.unwrap(); |
151 | //! println!("{}" , x); |
152 | //! # |
153 | //! # Ok(()) }) } |
154 | //! ``` |
155 | //! |
156 | //! This will print `1` through `5`, each on their own line. |
157 | //! |
158 | //! Calling `next().await` this way gets repetitive. Rust has a construct which |
159 | //! can call `next()` on your stream, until it reaches `None`. Let's go over |
160 | //! that next. |
161 | //! |
162 | //! # while let Loops and IntoStream |
163 | //! |
164 | //! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic |
165 | //! example of `while let`: |
166 | //! |
167 | //! ``` |
168 | //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
169 | //! # |
170 | //! # use async_std::prelude::*; |
171 | //! # use async_std::stream; |
172 | //! let mut values = stream::from_iter(1u8..6); |
173 | //! |
174 | //! while let Some(x) = values.next().await { |
175 | //! println!("{}" , x); |
176 | //! } |
177 | //! # |
178 | //! # Ok(()) }) } |
179 | //! ``` |
180 | //! |
181 | //! This will print the numbers one through five, each on their own line. But |
182 | //! you'll notice something here: we had to call `stream::from_iter` on our |
183 | //! range ourselves to produce a stream. What gives? |
184 | //! |
185 | //! There's a trait in this module for converting something into a |
186 | //! stream: [`IntoStream`]. This trait has one method, [`into_stream`], |
187 | //! which converts the thing implementing [`IntoStream`] into a stream. |
188 | //! |
189 | //! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler |
190 | //! support yet. This means that automatic conversions like with `for` loops |
191 | //! don't occur yet, and `into_stream` or `from_iter` as above will always |
192 | //! have to be called manually. |
193 | //! |
194 | //! [`IntoStream`]: trait.IntoStream.html |
195 | //! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream |
196 | //! |
197 | //! # Adapters |
198 | //! |
199 | //! Functions which take a [`Stream`] and return another [`Stream`] are |
200 | //! often called 'stream adapters', as they are a form of the 'adapter |
201 | //! pattern'. |
202 | //! |
203 | //! Common stream adapters include [`map`], [`take`], and [`filter`]. |
204 | //! For more, see their documentation. |
205 | //! |
206 | //! [`map`]: trait.Stream.html#method.map |
207 | //! [`take`]: trait.Stream.html#method.take |
208 | //! [`filter`]: trait.Stream.html#method.filter |
209 | //! |
210 | //! # Laziness |
211 | //! |
212 | //! Streams (and stream [adapters](#adapters)) are *lazy*. This means that |
213 | //! just creating a stream doesn't _do_ a whole lot. Nothing really happens |
214 | //! until you call [`next`]. This is sometimes a source of confusion when |
215 | //! creating a stream solely for its side effects. For example, the [`map`] |
216 | //! method calls a closure on each element it iterates over: |
217 | //! |
218 | //! ``` |
219 | //! # #![allow(unused_must_use)] |
220 | //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
221 | //! # |
222 | //! # use async_std::prelude::*; |
223 | //! # use async_std::stream; |
224 | //! let v = stream::repeat(1u8).take(5); |
225 | //! v.map(|x| println!("{}" , x)); |
226 | //! # |
227 | //! # Ok(()) }) } |
228 | //! ``` |
229 | //! |
230 | //! This will not print any values, as we only created a stream, rather than |
231 | //! using it. The compiler will warn us about this kind of behavior: |
232 | //! |
233 | //! ```text |
234 | //! warning: unused result that must be used: streams are lazy and |
235 | //! do nothing unless consumed |
236 | //! ``` |
237 | //! |
238 | //! The idiomatic way to write a [`map`] for its side effects is to use a |
239 | //! `while let` loop instead: |
240 | //! |
241 | //! ``` |
242 | //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
243 | //! # |
244 | //! # use async_std::prelude::*; |
245 | //! # use async_std::stream; |
246 | //! let mut v = stream::repeat(1u8).take(5); |
247 | //! |
248 | //! while let Some(x) = &v.next().await { |
249 | //! println!("{}" , x); |
250 | //! } |
251 | //! # |
252 | //! # Ok(()) }) } |
253 | //! ``` |
254 | //! |
255 | //! [`map`]: trait.Stream.html#method.map |
256 | //! |
257 | //! The two most common ways to evaluate a stream are to use a `while let` loop |
258 | //! like this, or to use the [`collect`] method to produce a new collection. |
259 | //! |
260 | //! [`collect`]: trait.Stream.html#method.collect |
261 | //! |
262 | //! # Infinity |
263 | //! |
264 | //! Streams do not have to be finite. As an example, a repeat stream is |
265 | //! an infinite stream: |
266 | //! |
267 | //! ``` |
268 | //! # use async_std::stream; |
269 | //! let numbers = stream::repeat(1u8); |
270 | //! ``` |
271 | //! |
272 | //! It is common to use the [`take`] stream adapter to turn an infinite |
273 | //! stream into a finite one: |
274 | //! |
275 | //! ``` |
276 | //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
277 | //! # |
278 | //! # use async_std::prelude::*; |
279 | //! # use async_std::stream; |
280 | //! let numbers = stream::from_iter(0u8..); |
281 | //! let mut five_numbers = numbers.take(5); |
282 | //! |
283 | //! while let Some(number) = five_numbers.next().await { |
284 | //! println!("{}" , number); |
285 | //! } |
286 | //! # |
287 | //! # Ok(()) }) } |
288 | //! ``` |
289 | //! |
290 | //! This will print the numbers `0` through `4`, each on their own line. |
291 | //! |
292 | //! Bear in mind that methods on infinite streams, even those for which a |
293 | //! result can be determined mathematically in finite time, may not terminate. |
294 | //! Specifically, methods such as [`min`], which in the general case require |
295 | //! traversing every element in the stream, are likely not to return |
296 | //! successfully for any infinite streams. |
297 | //! |
298 | //! ```ignore |
299 | //! let ones = async_std::stream::repeat(1); |
300 | //! let least = ones.min().await.unwrap(); // Oh no! An infinite loop! |
301 | //! // `ones.min()` causes an infinite loop, so we won't reach this point! |
302 | //! println!("The smallest number one is {}." , least); |
303 | //! ``` |
304 | //! |
305 | //! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html |
306 | //! [`take`]: trait.Stream.html#method.take |
307 | //! [`min`]: trait.Stream.html#method.min |
308 | |
309 | pub use empty::{empty, Empty}; |
310 | pub use from_fn::{from_fn, FromFn}; |
311 | pub use from_iter::{from_iter, FromIter}; |
312 | pub use once::{once, Once}; |
313 | pub use repeat::{repeat, Repeat}; |
314 | pub use repeat_with::{repeat_with, RepeatWith}; |
315 | pub use stream::*; |
316 | |
317 | pub(crate) mod stream; |
318 | |
319 | mod empty; |
320 | mod from_fn; |
321 | mod from_iter; |
322 | mod once; |
323 | mod repeat; |
324 | mod repeat_with; |
325 | |
326 | cfg_unstable! { |
327 | mod double_ended_stream; |
328 | mod exact_size_stream; |
329 | mod extend; |
330 | mod from_stream; |
331 | mod fused_stream; |
332 | mod interval; |
333 | mod into_stream; |
334 | mod pending; |
335 | mod product; |
336 | mod successors; |
337 | mod sum; |
338 | |
339 | pub use double_ended_stream::DoubleEndedStream; |
340 | pub use exact_size_stream::ExactSizeStream; |
341 | pub use extend::{extend, Extend}; |
342 | pub use from_stream::FromStream; |
343 | pub use fused_stream::FusedStream; |
344 | pub use interval::{interval, Interval}; |
345 | pub use into_stream::IntoStream; |
346 | pub use pending::{pending, Pending}; |
347 | pub use product::Product; |
348 | pub use stream::Merge; |
349 | pub use successors::{successors, Successors}; |
350 | pub use sum::Sum; |
351 | } |
352 | |