1 | #![warn ( |
2 | missing_debug_implementations, |
3 | missing_docs, |
4 | rust_2018_idioms, |
5 | unreachable_pub |
6 | )] |
7 | #![doc (test(no_crate_inject, attr(deny(rust_2018_idioms))))] |
8 | |
9 | //! Asynchronous stream of elements. |
10 | //! |
11 | //! Provides two macros, `stream!` and `try_stream!`, allowing the caller to |
12 | //! define asynchronous streams of elements. These are implemented using `async` |
13 | //! & `await` notation. This crate works without unstable features. |
14 | //! |
15 | //! The `stream!` macro returns an anonymous type implementing the [`Stream`] |
16 | //! trait. The `Item` associated type is the type of the values yielded from the |
17 | //! stream. The `try_stream!` also returns an anonymous type implementing the |
18 | //! [`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The |
19 | //! `try_stream!` macro supports using `?` notation as part of the |
20 | //! implementation. |
21 | //! |
22 | //! # Usage |
23 | //! |
24 | //! A basic stream yielding numbers. Values are yielded using the `yield` |
25 | //! keyword. The stream block must return `()`. |
26 | //! |
27 | //! ```rust |
28 | //! use async_stream::stream; |
29 | //! |
30 | //! use futures_util::pin_mut; |
31 | //! use futures_util::stream::StreamExt; |
32 | //! |
33 | //! #[tokio::main] |
34 | //! async fn main() { |
35 | //! let s = stream! { |
36 | //! for i in 0..3 { |
37 | //! yield i; |
38 | //! } |
39 | //! }; |
40 | //! |
41 | //! pin_mut!(s); // needed for iteration |
42 | //! |
43 | //! while let Some(value) = s.next().await { |
44 | //! println!("got {}" , value); |
45 | //! } |
46 | //! } |
47 | //! ``` |
48 | //! |
49 | //! Streams may be returned by using `impl Stream<Item = T>`: |
50 | //! |
51 | //! ```rust |
52 | //! use async_stream::stream; |
53 | //! |
54 | //! use futures_core::stream::Stream; |
55 | //! use futures_util::pin_mut; |
56 | //! use futures_util::stream::StreamExt; |
57 | //! |
58 | //! fn zero_to_three() -> impl Stream<Item = u32> { |
59 | //! stream! { |
60 | //! for i in 0..3 { |
61 | //! yield i; |
62 | //! } |
63 | //! } |
64 | //! } |
65 | //! |
66 | //! #[tokio::main] |
67 | //! async fn main() { |
68 | //! let s = zero_to_three(); |
69 | //! pin_mut!(s); // needed for iteration |
70 | //! |
71 | //! while let Some(value) = s.next().await { |
72 | //! println!("got {}" , value); |
73 | //! } |
74 | //! } |
75 | //! ``` |
76 | //! |
77 | //! Streams may be implemented in terms of other streams - `async-stream` provides `for await` |
78 | //! syntax to assist with this: |
79 | //! |
80 | //! ```rust |
81 | //! use async_stream::stream; |
82 | //! |
83 | //! use futures_core::stream::Stream; |
84 | //! use futures_util::pin_mut; |
85 | //! use futures_util::stream::StreamExt; |
86 | //! |
87 | //! fn zero_to_three() -> impl Stream<Item = u32> { |
88 | //! stream! { |
89 | //! for i in 0..3 { |
90 | //! yield i; |
91 | //! } |
92 | //! } |
93 | //! } |
94 | //! |
95 | //! fn double<S: Stream<Item = u32>>(input: S) |
96 | //! -> impl Stream<Item = u32> |
97 | //! { |
98 | //! stream! { |
99 | //! for await value in input { |
100 | //! yield value * 2; |
101 | //! } |
102 | //! } |
103 | //! } |
104 | //! |
105 | //! #[tokio::main] |
106 | //! async fn main() { |
107 | //! let s = double(zero_to_three()); |
108 | //! pin_mut!(s); // needed for iteration |
109 | //! |
110 | //! while let Some(value) = s.next().await { |
111 | //! println!("got {}" , value); |
112 | //! } |
113 | //! } |
114 | //! ``` |
115 | //! |
116 | //! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item` |
117 | //! of the returned stream is `Result` with `Ok` being the value yielded and |
118 | //! `Err` the error type returned by `?`. |
119 | //! |
120 | //! ```rust |
121 | //! use tokio::net::{TcpListener, TcpStream}; |
122 | //! |
123 | //! use async_stream::try_stream; |
124 | //! use futures_core::stream::Stream; |
125 | //! |
126 | //! use std::io; |
127 | //! use std::net::SocketAddr; |
128 | //! |
129 | //! fn bind_and_accept(addr: SocketAddr) |
130 | //! -> impl Stream<Item = io::Result<TcpStream>> |
131 | //! { |
132 | //! try_stream! { |
133 | //! let mut listener = TcpListener::bind(addr).await?; |
134 | //! |
135 | //! loop { |
136 | //! let (stream, addr) = listener.accept().await?; |
137 | //! println!("received on {:?}" , addr); |
138 | //! yield stream; |
139 | //! } |
140 | //! } |
141 | //! } |
142 | //! ``` |
143 | //! |
144 | //! # Implementation |
145 | //! |
146 | //! The `stream!` and `try_stream!` macros are implemented using proc macros. |
147 | //! The macro searches the syntax tree for instances of `yield $expr` and |
148 | //! transforms them into `sender.send($expr).await`. |
149 | //! |
150 | //! The stream uses a lightweight sender to send values from the stream |
151 | //! implementation to the caller. When entering the stream, an `Option<T>` is |
152 | //! stored on the stack. A pointer to the cell is stored in a thread local and |
153 | //! `poll` is called on the async block. When `poll` returns. |
154 | //! `sender.send(value)` stores the value that cell and yields back to the |
155 | //! caller. |
156 | //! |
157 | //! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html |
158 | |
159 | mod async_stream; |
160 | mod next; |
161 | mod yielder; |
162 | |
163 | /// Asynchronous stream |
164 | /// |
165 | /// See [crate](index.html) documentation for more details. |
166 | /// |
167 | /// # Examples |
168 | /// |
169 | /// ``` |
170 | /// use async_stream::stream; |
171 | /// |
172 | /// use futures_util::pin_mut; |
173 | /// use futures_util::stream::StreamExt; |
174 | /// |
175 | /// #[tokio::main] |
176 | /// async fn main() { |
177 | /// let s = stream! { |
178 | /// for i in 0..3 { |
179 | /// yield i; |
180 | /// } |
181 | /// }; |
182 | /// |
183 | /// pin_mut!(s); // needed for iteration |
184 | /// |
185 | /// while let Some(value) = s.next().await { |
186 | /// println!("got {}" , value); |
187 | /// } |
188 | /// } |
189 | /// ``` |
190 | #[macro_export ] |
191 | macro_rules! stream { |
192 | ($($tt:tt)*) => { |
193 | $crate::__private::stream_inner!(($crate) $($tt)*) |
194 | } |
195 | } |
196 | |
197 | /// Asynchronous fallible stream |
198 | /// |
199 | /// See [crate](index.html) documentation for more details. |
200 | /// |
201 | /// # Examples |
202 | /// |
203 | /// ``` |
204 | /// use tokio::net::{TcpListener, TcpStream}; |
205 | /// |
206 | /// use async_stream::try_stream; |
207 | /// use futures_core::stream::Stream; |
208 | /// |
209 | /// use std::io; |
210 | /// use std::net::SocketAddr; |
211 | /// |
212 | /// fn bind_and_accept(addr: SocketAddr) |
213 | /// -> impl Stream<Item = io::Result<TcpStream>> |
214 | /// { |
215 | /// try_stream! { |
216 | /// let mut listener = TcpListener::bind(addr).await?; |
217 | /// |
218 | /// loop { |
219 | /// let (stream, addr) = listener.accept().await?; |
220 | /// println!("received on {:?}" , addr); |
221 | /// yield stream; |
222 | /// } |
223 | /// } |
224 | /// } |
225 | /// ``` |
226 | #[macro_export ] |
227 | macro_rules! try_stream { |
228 | ($($tt:tt)*) => { |
229 | $crate::__private::try_stream_inner!(($crate) $($tt)*) |
230 | } |
231 | } |
232 | |
233 | // Not public API. |
234 | #[doc (hidden)] |
235 | pub mod __private { |
236 | pub use crate::async_stream::AsyncStream; |
237 | pub use crate::next::next; |
238 | pub use async_stream_impl::{stream_inner, try_stream_inner}; |
239 | pub mod yielder { |
240 | pub use crate::yielder::pair; |
241 | } |
242 | } |
243 | |