1#![warn(
2 missing_debug_implementations,
3 missing_docs,
4 rust_2018_idioms,
5 unreachable_pub
6)]
7#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
8
9//! Asynchronous stream of elements.
10//!
11//! Provides two macros, `stream!` and `try_stream!`, allowing the caller to
12//! define asynchronous streams of elements. These are implemented using `async`
13//! & `await` notation. This crate works without unstable features.
14//!
15//! The `stream!` macro returns an anonymous type implementing the [`Stream`]
16//! trait. The `Item` associated type is the type of the values yielded from the
//! stream. The `try_stream!` macro also returns an anonymous type implementing
//! the [`Stream`] trait, but the `Item` associated type is `Result<T, Error>`.
//! The `try_stream!` macro supports using `?` notation as part of the
//! implementation.
21//!
22//! # Usage
23//!
24//! A basic stream yielding numbers. Values are yielded using the `yield`
25//! keyword. The stream block must return `()`.
26//!
27//! ```rust
28//! use async_stream::stream;
29//!
30//! use futures_util::pin_mut;
31//! use futures_util::stream::StreamExt;
32//!
33//! #[tokio::main]
34//! async fn main() {
35//! let s = stream! {
36//! for i in 0..3 {
37//! yield i;
38//! }
39//! };
40//!
41//! pin_mut!(s); // needed for iteration
42//!
43//! while let Some(value) = s.next().await {
44//! println!("got {}", value);
45//! }
46//! }
47//! ```
48//!
49//! Streams may be returned by using `impl Stream<Item = T>`:
50//!
51//! ```rust
52//! use async_stream::stream;
53//!
54//! use futures_core::stream::Stream;
55//! use futures_util::pin_mut;
56//! use futures_util::stream::StreamExt;
57//!
58//! fn zero_to_three() -> impl Stream<Item = u32> {
59//! stream! {
60//! for i in 0..3 {
61//! yield i;
62//! }
63//! }
64//! }
65//!
66//! #[tokio::main]
67//! async fn main() {
68//! let s = zero_to_three();
69//! pin_mut!(s); // needed for iteration
70//!
71//! while let Some(value) = s.next().await {
72//! println!("got {}", value);
73//! }
74//! }
75//! ```
76//!
77//! Streams may be implemented in terms of other streams - `async-stream` provides `for await`
78//! syntax to assist with this:
79//!
80//! ```rust
81//! use async_stream::stream;
82//!
83//! use futures_core::stream::Stream;
84//! use futures_util::pin_mut;
85//! use futures_util::stream::StreamExt;
86//!
87//! fn zero_to_three() -> impl Stream<Item = u32> {
88//! stream! {
89//! for i in 0..3 {
90//! yield i;
91//! }
92//! }
93//! }
94//!
95//! fn double<S: Stream<Item = u32>>(input: S)
96//! -> impl Stream<Item = u32>
97//! {
98//! stream! {
99//! for await value in input {
100//! yield value * 2;
101//! }
102//! }
103//! }
104//!
105//! #[tokio::main]
106//! async fn main() {
107//! let s = double(zero_to_three());
108//! pin_mut!(s); // needed for iteration
109//!
110//! while let Some(value) = s.next().await {
111//! println!("got {}", value);
112//! }
113//! }
114//! ```
115//!
116//! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item`
117//! of the returned stream is `Result` with `Ok` being the value yielded and
118//! `Err` the error type returned by `?`.
119//!
120//! ```rust
121//! use tokio::net::{TcpListener, TcpStream};
122//!
123//! use async_stream::try_stream;
124//! use futures_core::stream::Stream;
125//!
126//! use std::io;
127//! use std::net::SocketAddr;
128//!
129//! fn bind_and_accept(addr: SocketAddr)
130//! -> impl Stream<Item = io::Result<TcpStream>>
131//! {
132//! try_stream! {
133//! let mut listener = TcpListener::bind(addr).await?;
134//!
135//! loop {
136//! let (stream, addr) = listener.accept().await?;
137//! println!("received on {:?}", addr);
138//! yield stream;
139//! }
140//! }
141//! }
142//! ```
143//!
144//! # Implementation
145//!
146//! The `stream!` and `try_stream!` macros are implemented using proc macros.
147//! The macro searches the syntax tree for instances of `yield $expr` and
148//! transforms them into `sender.send($expr).await`.
149//!
150//! The stream uses a lightweight sender to send values from the stream
151//! implementation to the caller. When entering the stream, an `Option<T>` is
152//! stored on the stack. A pointer to the cell is stored in a thread local and
//! `poll` is called on the async block. Inside the block,
//! `sender.send(value)` stores the value in that cell and yields back to the
//! caller, at which point `poll` returns.
156//!
157//! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html
158
159mod async_stream;
160mod next;
161mod yielder;
162
/// Builds an asynchronous stream from a block of code.
///
/// Values are emitted from inside the block with the `yield` keyword, and the
/// macro evaluates to an anonymous type implementing the `Stream` trait.
///
/// See [crate](index.html) documentation for more details.
///
/// # Examples
///
/// ```
/// use async_stream::stream;
///
/// use futures_util::pin_mut;
/// use futures_util::stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
///     let s = stream! {
///         for i in 0..3 {
///             yield i;
///         }
///     };
///
///     pin_mut!(s); // needed for iteration
///
///     while let Some(value) = s.next().await {
///         println!("got {}", value);
///     }
/// }
/// ```
#[macro_export]
macro_rules! stream {
    // Every token of the invocation is handed, untouched, to the proc macro.
    // The leading `($crate)` group passes along the path of this crate
    // (presumably so the generated code can name items under `__private`).
    ($($body:tt)*) => {
        $crate::__private::stream_inner!(($crate) $($body)*)
    };
}
196
/// Builds an asynchronous stream whose body may fail.
///
/// Works like `stream!`, except that the `?` operator may be used inside the
/// block and the `Item` of the resulting stream is a `Result`: `Ok` carries
/// each yielded value and `Err` carries the error returned by `?`.
///
/// See [crate](index.html) documentation for more details.
///
/// # Examples
///
/// ```
/// use tokio::net::{TcpListener, TcpStream};
///
/// use async_stream::try_stream;
/// use futures_core::stream::Stream;
///
/// use std::io;
/// use std::net::SocketAddr;
///
/// fn bind_and_accept(addr: SocketAddr)
///     -> impl Stream<Item = io::Result<TcpStream>>
/// {
///     try_stream! {
///         let mut listener = TcpListener::bind(addr).await?;
///
///         loop {
///             let (stream, addr) = listener.accept().await?;
///             println!("received on {:?}", addr);
///             yield stream;
///         }
///     }
/// }
/// ```
#[macro_export]
macro_rules! try_stream {
    // Every token of the invocation is handed, untouched, to the proc macro.
    // The leading `($crate)` group passes along the path of this crate
    // (presumably so the generated code can name items under `__private`).
    ($($body:tt)*) => {
        $crate::__private::try_stream_inner!(($crate) $($body)*)
    };
}
232
233// Not public API.
234#[doc(hidden)]
235pub mod __private {
236 pub use crate::async_stream::AsyncStream;
237 pub use crate::next::next;
238 pub use async_stream_impl::{stream_inner, try_stream_inner};
239 pub mod yielder {
240 pub use crate::yielder::pair;
241 }
242}
243