1 | use async_stream::stream; |
2 | |
3 | use futures_core::stream::{FusedStream, Stream}; |
4 | use futures_util::pin_mut; |
5 | use futures_util::stream::StreamExt; |
6 | use tokio::sync::mpsc; |
7 | use tokio_test::assert_ok; |
8 | |
9 | #[tokio::test ] |
10 | async fn noop_stream() { |
11 | let s = stream! {}; |
12 | pin_mut!(s); |
13 | |
14 | while s.next().await.is_some() { |
15 | unreachable!(); |
16 | } |
17 | } |
18 | |
19 | #[tokio::test ] |
20 | async fn empty_stream() { |
21 | let mut ran = false; |
22 | |
23 | { |
24 | let r = &mut ran; |
25 | let s = stream! { |
26 | *r = true; |
27 | println!("hello world!" ); |
28 | }; |
29 | pin_mut!(s); |
30 | |
31 | while s.next().await.is_some() { |
32 | unreachable!(); |
33 | } |
34 | } |
35 | |
36 | assert!(ran); |
37 | } |
38 | |
39 | #[tokio::test ] |
40 | async fn yield_single_value() { |
41 | let s = stream! { |
42 | yield "hello" ; |
43 | }; |
44 | |
45 | let values: Vec<_> = s.collect().await; |
46 | |
47 | assert_eq!(1, values.len()); |
48 | assert_eq!("hello" , values[0]); |
49 | } |
50 | |
51 | #[tokio::test ] |
52 | async fn fused() { |
53 | let s = stream! { |
54 | yield "hello" ; |
55 | }; |
56 | pin_mut!(s); |
57 | |
58 | assert!(!s.is_terminated()); |
59 | assert_eq!(s.next().await, Some("hello" )); |
60 | assert_eq!(s.next().await, None); |
61 | |
62 | assert!(s.is_terminated()); |
63 | // This should return None from now on |
64 | assert_eq!(s.next().await, None); |
65 | } |
66 | |
67 | #[tokio::test ] |
68 | async fn yield_multi_value() { |
69 | let s = stream! { |
70 | yield "hello" ; |
71 | yield "world" ; |
72 | yield "dizzy" ; |
73 | }; |
74 | |
75 | let values: Vec<_> = s.collect().await; |
76 | |
77 | assert_eq!(3, values.len()); |
78 | assert_eq!("hello" , values[0]); |
79 | assert_eq!("world" , values[1]); |
80 | assert_eq!("dizzy" , values[2]); |
81 | } |
82 | |
83 | #[tokio::test ] |
84 | async fn unit_yield_in_select() { |
85 | use tokio::select; |
86 | |
87 | async fn do_stuff_async() {} |
88 | |
89 | let s = stream! { |
90 | select! { |
91 | _ = do_stuff_async() => yield, |
92 | else => yield, |
93 | } |
94 | }; |
95 | |
96 | let values: Vec<_> = s.collect().await; |
97 | assert_eq!(values.len(), 1); |
98 | } |
99 | |
100 | #[tokio::test ] |
101 | async fn yield_with_select() { |
102 | use tokio::select; |
103 | |
104 | async fn do_stuff_async() {} |
105 | async fn more_async_work() {} |
106 | |
107 | let s = stream! { |
108 | select! { |
109 | _ = do_stuff_async() => yield "hey" , |
110 | _ = more_async_work() => yield "hey" , |
111 | else => yield "hey" , |
112 | } |
113 | }; |
114 | |
115 | let values: Vec<_> = s.collect().await; |
116 | assert_eq!(values, vec!["hey" ]); |
117 | } |
118 | |
119 | #[tokio::test ] |
120 | async fn return_stream() { |
121 | fn build_stream() -> impl Stream<Item = u32> { |
122 | stream! { |
123 | yield 1; |
124 | yield 2; |
125 | yield 3; |
126 | } |
127 | } |
128 | |
129 | let s = build_stream(); |
130 | |
131 | let values: Vec<_> = s.collect().await; |
132 | assert_eq!(3, values.len()); |
133 | assert_eq!(1, values[0]); |
134 | assert_eq!(2, values[1]); |
135 | assert_eq!(3, values[2]); |
136 | } |
137 | |
138 | #[tokio::test ] |
139 | async fn consume_channel() { |
140 | let (tx, mut rx) = mpsc::channel(10); |
141 | |
142 | let s = stream! { |
143 | while let Some(v) = rx.recv().await { |
144 | yield v; |
145 | } |
146 | }; |
147 | |
148 | pin_mut!(s); |
149 | |
150 | for i in 0..3 { |
151 | assert_ok!(tx.send(i).await); |
152 | assert_eq!(Some(i), s.next().await); |
153 | } |
154 | |
155 | drop(tx); |
156 | assert_eq!(None, s.next().await); |
157 | } |
158 | |
159 | #[tokio::test ] |
160 | async fn borrow_self() { |
161 | struct Data(String); |
162 | |
163 | impl Data { |
164 | fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a { |
165 | stream! { |
166 | yield &self.0[..]; |
167 | } |
168 | } |
169 | } |
170 | |
171 | let data = Data("hello" .to_string()); |
172 | let s = data.stream(); |
173 | pin_mut!(s); |
174 | |
175 | assert_eq!(Some("hello" ), s.next().await); |
176 | } |
177 | |
178 | #[tokio::test ] |
179 | async fn stream_in_stream() { |
180 | let s = stream! { |
181 | let s = stream! { |
182 | for i in 0..3 { |
183 | yield i; |
184 | } |
185 | }; |
186 | |
187 | pin_mut!(s); |
188 | while let Some(v) = s.next().await { |
189 | yield v; |
190 | } |
191 | }; |
192 | |
193 | let values: Vec<_> = s.collect().await; |
194 | assert_eq!(3, values.len()); |
195 | } |
196 | |
197 | #[tokio::test ] |
198 | async fn yield_non_unpin_value() { |
199 | let s: Vec<_> = stream! { |
200 | for i in 0..3 { |
201 | yield async move { i }; |
202 | } |
203 | } |
204 | .buffered(1) |
205 | .collect() |
206 | .await; |
207 | |
208 | assert_eq!(s, vec![0, 1, 2]); |
209 | } |
210 | |
211 | #[test] |
212 | fn inner_try_stream() { |
213 | use async_stream::try_stream; |
214 | use tokio::select; |
215 | |
216 | async fn do_stuff_async() {} |
217 | |
218 | let _ = stream! { |
219 | select! { |
220 | _ = do_stuff_async() => { |
221 | let another_s = try_stream! { |
222 | yield; |
223 | }; |
224 | let _: Result<(), ()> = Box::pin(another_s).next().await.unwrap(); |
225 | }, |
226 | else => {}, |
227 | } |
228 | yield |
229 | }; |
230 | } |
231 | |
232 | #[rustversion::attr (not(stable), ignore)] |
233 | #[test] |
234 | fn test() { |
235 | let t = trybuild::TestCases::new(); |
236 | t.compile_fail("tests/ui/*.rs" ); |
237 | } |
238 | |