| 1 | //! Composable asynchronous iteration. |
| 2 | //! |
| 3 | //! This module is an async version of [`std::iter`]. |
| 4 | //! |
| 5 | //! If you've found yourself with an asynchronous collection of some kind, |
| 6 | //! and needed to perform an operation on the elements of said collection, |
| 7 | //! you'll quickly run into 'streams'. Streams are heavily used in idiomatic |
| 8 | //! asynchronous Rust code, so it's worth becoming familiar with them. |
| 9 | //! |
| 10 | //! Before explaining more, let's talk about how this module is structured: |
| 11 | //! |
| 12 | //! # Organization |
| 13 | //! |
| 14 | //! This module is largely organized by type: |
| 15 | //! |
| 16 | //! * [Traits] are the core portion: these traits define what kind of streams |
| 17 | //! exist and what you can do with them. The methods of these traits are worth |
| 18 | //! putting some extra study time into. |
| 19 | //! * [Functions] provide some helpful ways to create some basic streams. |
| 20 | //! * [Structs] are often the return types of the various methods on this |
| 21 | //! module's traits. You'll usually want to look at the method that creates |
| 22 | //! the `struct`, rather than the `struct` itself. For more detail about why, |
| 23 | //! see '[Implementing Stream](#implementing-stream)'. |
| 24 | //! |
| 25 | //! [Traits]: #traits |
| 26 | //! [Functions]: #functions |
| 27 | //! [Structs]: #structs |
| 28 | //! |
| 29 | //! That's it! Let's dig into streams. |
| 30 | //! |
| 31 | //! # Stream |
| 32 | //! |
| 33 | //! The heart and soul of this module is the [`Stream`] trait. The core of |
| 34 | //! [`Stream`] looks like this: |
| 35 | //! |
| 36 | //! ``` |
| 37 | //! #![allow(dead_code)] |
| 38 | //! # use async_std::task::{Context, Poll}; |
| 39 | //! # use std::pin::Pin; |
| 40 | //! pub trait Stream { |
| 41 | //! type Item; |
| 42 | //! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; |
| 43 | //! } |
| 44 | //! # impl Stream for () { |
| 45 | //! # type Item = (); |
| 46 | //! # fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { Poll::Pending } |
| 47 | //! # } |
| 48 | //! ``` |
| 49 | //! |
| 50 | //! A stream has a method, [`next`], which when called, returns a |
| 51 | //! [`Poll`]<[`Option`]`<Item>>`. [`next`] will return `Ready(Some(Item))` |
| 52 | //! as long as there are elements, and once they've all been exhausted, will |
| 53 | //! return `Ready(None)` to indicate that iteration is finished. If we're |
| 54 | //! waiting on something asynchronous to resolve, `Pending` is returned. |
| 55 | //! |
| 56 | //! Individual streams may choose to resume iteration, and so calling |
| 57 | //! [`next`] again may or may not eventually start returning `Ready(Some(Item))` |
| 58 | //! again at some point. |
| 59 | //! |
| 60 | //! [`Stream`]'s full definition includes a number of other methods as well, |
| 61 | //! but they are default methods, built on top of [`next`], and so you get |
| 62 | //! them for free. |
| 63 | //! |
| 64 | //! Streams are also composable, and it's common to chain them together to do |
| 65 | //! more complex forms of processing. See the [Adapters](#adapters) section |
| 66 | //! below for more details. |
| 67 | //! |
| 68 | //! [`Poll`]: ../task/enum.Poll.html |
| 69 | //! [`Stream`]: trait.Stream.html |
| 70 | //! [`next`]: trait.Stream.html#tymethod.next |
| 71 | //! [`Option`]: ../../std/option/enum.Option.html |
| 72 | //! |
| 73 | //! # The three forms of streaming |
| 74 | //! |
| 75 | //! There are three common methods which can create streams from a collection: |
| 76 | //! |
| 77 | //! * `stream()`, which iterates over `&T`. |
| 78 | //! * `stream_mut()`, which iterates over `&mut T`. |
| 79 | //! * `into_stream()`, which iterates over `T`. |
| 80 | //! |
| 81 | //! Various things in async-std may implement one or more of the |
| 82 | //! three, where appropriate. |
| 83 | //! |
| 84 | //! # Implementing Stream |
| 85 | //! |
| 86 | //! Creating a stream of your own involves two steps: creating a `struct` to |
| 87 | //! hold the stream's state, and then `impl`ementing [`Stream`] for that |
| 88 | //! `struct`. This is why there are so many `struct`s in this module: there is |
| 89 | //! one for each stream and stream adapter. |
| 90 | //! |
| 91 | //! Let's make a stream named `Counter` which counts from `1` to `5`: |
| 92 | //! |
| 93 | //! ``` |
| 94 | //! # use async_std::prelude::*; |
| 95 | //! # use async_std::task::{Context, Poll}; |
| 96 | //! # use std::pin::Pin; |
| 97 | //! // First, the struct: |
| 98 | //! |
| 99 | //! /// A stream which counts from one to five |
| 100 | //! struct Counter { |
| 101 | //! count: usize, |
| 102 | //! } |
| 103 | //! |
| 104 | //! // We want our count to start at one, so let's add a new() method to help. |
| 105 | //! // This isn't strictly necessary, but is convenient. Note that we start |
| 106 | //! // `count` at zero; we'll see why in `poll_next()`'s implementation below. |
| 107 | //! impl Counter { |
| 108 | //! fn new() -> Counter { |
| 109 | //! Counter { count: 0 } |
| 110 | //! } |
| 111 | //! } |
| 112 | //! |
| 113 | //! // Then, we implement `Stream` for our `Counter`: |
| 114 | //! |
| 115 | //! impl Stream for Counter { |
| 116 | //! // we will be counting with usize |
| 117 | //! type Item = usize; |
| 118 | //! |
| 119 | //! // poll_next() is the only required method |
| 120 | //! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
| 121 | //! // Increment our count. This is why we started at zero. |
| 122 | //! self.count += 1; |
| 123 | //! |
| 124 | //! // Check to see if we've finished counting or not. |
| 125 | //! if self.count < 6 { |
| 126 | //! Poll::Ready(Some(self.count)) |
| 127 | //! } else { |
| 128 | //! Poll::Ready(None) |
| 129 | //! } |
| 130 | //! } |
| 131 | //! } |
| 132 | //! |
| 133 | //! // And now we can use it! |
| 134 | //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 135 | //! # |
| 136 | //! let mut counter = Counter::new(); |
| 137 | //! |
| 138 | //! let x = counter.next().await.unwrap(); |
| 139 | //! println!("{}" , x); |
| 140 | //! |
| 141 | //! let x = counter.next().await.unwrap(); |
| 142 | //! println!("{}" , x); |
| 143 | //! |
| 144 | //! let x = counter.next().await.unwrap(); |
| 145 | //! println!("{}" , x); |
| 146 | //! |
| 147 | //! let x = counter.next().await.unwrap(); |
| 148 | //! println!("{}" , x); |
| 149 | //! |
| 150 | //! let x = counter.next().await.unwrap(); |
| 151 | //! println!("{}" , x); |
| 152 | //! # |
| 153 | //! # Ok(()) }) } |
| 154 | //! ``` |
| 155 | //! |
| 156 | //! This will print `1` through `5`, each on their own line. |
| 157 | //! |
| 158 | //! Calling `next().await` this way gets repetitive. Rust has a construct which |
| 159 | //! can call `next()` on your stream, until it reaches `None`. Let's go over |
| 160 | //! that next. |
| 161 | //! |
| 162 | //! # while let Loops and IntoStream |
| 163 | //! |
| 164 | //! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic |
| 165 | //! example of `while let`: |
| 166 | //! |
| 167 | //! ``` |
| 168 | //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 169 | //! # |
| 170 | //! # use async_std::prelude::*; |
| 171 | //! # use async_std::stream; |
| 172 | //! let mut values = stream::from_iter(1u8..6); |
| 173 | //! |
| 174 | //! while let Some(x) = values.next().await { |
| 175 | //! println!("{}" , x); |
| 176 | //! } |
| 177 | //! # |
| 178 | //! # Ok(()) }) } |
| 179 | //! ``` |
| 180 | //! |
| 181 | //! This will print the numbers one through five, each on their own line. But |
| 182 | //! you'll notice something here: we had to call `stream::from_iter` on our |
| 183 | //! range to produce a stream. What gives? |
| 184 | //! |
| 185 | //! There's a trait in this module for converting something into a |
| 186 | //! stream: [`IntoStream`]. This trait has one method, [`into_stream`], |
| 187 | //! which converts the thing implementing [`IntoStream`] into a stream. |
| 188 | //! |
| 189 | //! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler |
| 190 | //! support yet. This means that automatic conversions, like those done by |
| 191 | //! `for` loops, don't occur yet, and `into_stream` or `from_iter` as above |
| 192 | //! will always have to be called manually. |
| 193 | //! |
| 194 | //! [`IntoStream`]: trait.IntoStream.html |
| 195 | //! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream |
| 196 | //! |
| 197 | //! # Adapters |
| 198 | //! |
| 199 | //! Functions which take a [`Stream`] and return another [`Stream`] are |
| 200 | //! often called 'stream adapters', as they are a form of the 'adapter |
| 201 | //! pattern'. |
| 202 | //! |
| 203 | //! Common stream adapters include [`map`], [`take`], and [`filter`]. |
| 204 | //! For more, see their documentation. |
| 205 | //! |
| 206 | //! [`map`]: trait.Stream.html#method.map |
| 207 | //! [`take`]: trait.Stream.html#method.take |
| 208 | //! [`filter`]: trait.Stream.html#method.filter |
| 209 | //! |
| 210 | //! # Laziness |
| 211 | //! |
| 212 | //! Streams (and stream [adapters](#adapters)) are *lazy*. This means that |
| 213 | //! just creating a stream doesn't _do_ a whole lot. Nothing really happens |
| 214 | //! until you call [`next`]. This is sometimes a source of confusion when |
| 215 | //! creating a stream solely for its side effects. For example, the [`map`] |
| 216 | //! method calls a closure on each element it iterates over: |
| 217 | //! |
| 218 | //! ``` |
| 219 | //! # #![allow(unused_must_use)] |
| 220 | //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 221 | //! # |
| 222 | //! # use async_std::prelude::*; |
| 223 | //! # use async_std::stream; |
| 224 | //! let v = stream::repeat(1u8).take(5); |
| 225 | //! v.map(|x| println!("{}" , x)); |
| 226 | //! # |
| 227 | //! # Ok(()) }) } |
| 228 | //! ``` |
| 229 | //! |
| 230 | //! This will not print any values, as we only created a stream, rather than |
| 231 | //! using it. The compiler will warn us about this kind of behavior: |
| 232 | //! |
| 233 | //! ```text |
| 234 | //! warning: unused result that must be used: streams are lazy and |
| 235 | //! do nothing unless consumed |
| 236 | //! ``` |
| 237 | //! |
| 238 | //! The idiomatic way to write a [`map`] for its side effects is to use a |
| 239 | //! `while let` loop instead: |
| 240 | //! |
| 241 | //! ``` |
| 242 | //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 243 | //! # |
| 244 | //! # use async_std::prelude::*; |
| 245 | //! # use async_std::stream; |
| 246 | //! let mut v = stream::repeat(1u8).take(5); |
| 247 | //! |
| 248 | //! while let Some(x) = &v.next().await { |
| 249 | //! println!("{}" , x); |
| 250 | //! } |
| 251 | //! # |
| 252 | //! # Ok(()) }) } |
| 253 | //! ``` |
| 254 | //! |
| 255 | //! [`map`]: trait.Stream.html#method.map |
| 256 | //! |
| 257 | //! The two most common ways to evaluate a stream are to use a `while let` loop |
| 258 | //! like this, or to use the [`collect`] method to produce a new collection. |
| 259 | //! |
| 260 | //! [`collect`]: trait.Stream.html#method.collect |
| 261 | //! |
| 262 | //! # Infinity |
| 263 | //! |
| 264 | //! Streams do not have to be finite. As an example, a repeat stream is |
| 265 | //! an infinite stream: |
| 266 | //! |
| 267 | //! ``` |
| 268 | //! # use async_std::stream; |
| 269 | //! let numbers = stream::repeat(1u8); |
| 270 | //! ``` |
| 271 | //! |
| 272 | //! It is common to use the [`take`] stream adapter to turn an infinite |
| 273 | //! stream into a finite one: |
| 274 | //! |
| 275 | //! ``` |
| 276 | //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { |
| 277 | //! # |
| 278 | //! # use async_std::prelude::*; |
| 279 | //! # use async_std::stream; |
| 280 | //! let numbers = stream::from_iter(0u8..); |
| 281 | //! let mut five_numbers = numbers.take(5); |
| 282 | //! |
| 283 | //! while let Some(number) = five_numbers.next().await { |
| 284 | //! println!("{}" , number); |
| 285 | //! } |
| 286 | //! # |
| 287 | //! # Ok(()) }) } |
| 288 | //! ``` |
| 289 | //! |
| 290 | //! This will print the numbers `0` through `4`, each on their own line. |
| 291 | //! |
| 292 | //! Bear in mind that methods on infinite streams, even those for which a |
| 293 | //! result can be determined mathematically in finite time, may not terminate. |
| 294 | //! Specifically, methods such as [`min`], which in the general case require |
| 295 | //! traversing every element in the stream, are likely not to return |
| 296 | //! successfully for any infinite streams. |
| 297 | //! |
| 298 | //! ```ignore |
| 299 | //! let ones = async_std::stream::repeat(1); |
| 300 | //! let least = ones.min().await.unwrap(); // Oh no! An infinite loop! |
| 301 | //! // `ones.min()` causes an infinite loop, so we won't reach this point! |
| 302 | //! println!("The smallest number one is {}." , least); |
| 303 | //! ``` |
| 304 | //! |
| 305 | //! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html |
| 306 | //! [`take`]: trait.Stream.html#method.take |
| 307 | //! [`min`]: trait.Stream.html#method.min |
| 308 | |
| 309 | pub use empty::{empty, Empty}; |
| 310 | pub use from_fn::{from_fn, FromFn}; |
| 311 | pub use from_iter::{from_iter, FromIter}; |
| 312 | pub use once::{once, Once}; |
| 313 | pub use repeat::{repeat, Repeat}; |
| 314 | pub use repeat_with::{repeat_with, RepeatWith}; |
| 315 | pub use stream::*; |
| 316 | |
| 317 | pub(crate) mod stream; |
| 318 | |
| 319 | mod empty; |
| 320 | mod from_fn; |
| 321 | mod from_iter; |
| 322 | mod once; |
| 323 | mod repeat; |
| 324 | mod repeat_with; |
| 325 | |
| 326 | cfg_unstable! { |
| 327 | mod double_ended_stream; |
| 328 | mod exact_size_stream; |
| 329 | mod extend; |
| 330 | mod from_stream; |
| 331 | mod fused_stream; |
| 332 | mod interval; |
| 333 | mod into_stream; |
| 334 | mod pending; |
| 335 | mod product; |
| 336 | mod successors; |
| 337 | mod sum; |
| 338 | |
| 339 | pub use double_ended_stream::DoubleEndedStream; |
| 340 | pub use exact_size_stream::ExactSizeStream; |
| 341 | pub use extend::{extend, Extend}; |
| 342 | pub use from_stream::FromStream; |
| 343 | pub use fused_stream::FusedStream; |
| 344 | pub use interval::{interval, Interval}; |
| 345 | pub use into_stream::IntoStream; |
| 346 | pub use pending::{pending, Pending}; |
| 347 | pub use product::Product; |
| 348 | pub use stream::Merge; |
| 349 | pub use successors::{successors, Successors}; |
| 350 | pub use sum::Sum; |
| 351 | } |
| 352 | |