1 | use futures::channel::{mpsc, oneshot}; |
2 | use futures::executor::block_on; |
3 | use futures::future::{self, poll_fn, FutureExt}; |
4 | use futures::sink::SinkExt; |
5 | use futures::stream::StreamExt; |
6 | use futures::task::{Context, Poll}; |
7 | use futures::{ |
8 | join, pending, pin_mut, poll, select, select_biased, stream, stream_select, try_join, |
9 | }; |
10 | use std::mem; |
11 | |
/// A future whose body is a single `pending!()` reports `Pending` on its first
/// poll and `Ready` on the second, as observed through `poll!`.
#[test]
fn poll_and_pending() {
    let yields_once = async { pending!() };
    block_on(async {
        pin_mut!(yields_once);

        // First poll stops at the `pending!()`.
        let first = poll!(&mut yields_once);
        assert_eq!(first, Poll::Pending);

        // Second poll resumes past it and runs to completion.
        let second = poll!(&mut yields_once);
        assert_eq!(second, Poll::Ready(()));
    });
}
21 | |
/// `join!` stays pending until every joined future has resolved, then yields
/// all of their outputs together as a tuple.
#[test]
fn join() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();

    let joined = async {
        let outputs = join!(first_rx, second_rx);
        assert_eq!(outputs, (Ok(1), Ok(2)));
    };

    block_on(async {
        pin_mut!(joined);

        // Nothing has been sent yet.
        assert_eq!(poll!(&mut joined), Poll::Pending);

        // One resolved receiver is not enough to finish the join.
        first_tx.send(1).unwrap();
        assert_eq!(poll!(&mut joined), Poll::Pending);

        // With both values sent the join completes.
        second_tx.send(2).unwrap();
        assert_eq!(poll!(&mut joined), Poll::Ready(()));
    });
}
41 | |
/// `select!` runs the branch of the receiver that already holds a value; the
/// branch of the receiver that never gets one must not run.
#[test]
fn select() {
    let (ready_tx, ready_rx) = oneshot::channel::<i32>();
    // The second sender is bound but never used, so nothing is ever sent on it.
    let (_idle_tx, idle_rx) = oneshot::channel::<i32>();
    ready_tx.send(1).unwrap();

    let mut branch_ran = false;
    block_on(async {
        select! {
            received = ready_rx.fuse() => {
                assert_eq!(received, Ok(1));
                branch_ran = true;
            },
            _ = idle_rx.fuse() => unreachable!(),
        }
    });
    assert!(branch_ran);
}
59 | |
/// `select_biased!` runs the branch of the receiver that already holds a
/// value; the branch of the receiver that never gets one must not run.
#[test]
fn select_biased() {
    let (ready_tx, ready_rx) = oneshot::channel::<i32>();
    // The second sender is bound but never used, so nothing is ever sent on it.
    let (_idle_tx, idle_rx) = oneshot::channel::<i32>();
    ready_tx.send(1).unwrap();

    let mut branch_ran = false;
    block_on(async {
        select_biased! {
            received = ready_rx.fuse() => {
                assert_eq!(received, Ok(1));
                branch_ran = true;
            },
            _ = idle_rx.fuse() => unreachable!(),
        }
    });
    assert!(branch_ran);
}
77 | |
/// Exercises `select!` over two fused channel streams, covering the stream
/// branches as well as the `default` and `complete` branches.
///
/// The first `select!` must take `default`, since nothing has been sent yet.
/// The loop then receives both queued values (2 and 3), takes `default` once
/// to check the sum and drop the senders, and exits through `complete`.
#[test]
fn select_streams() {
    let (mut first_tx, first_rx) = mpsc::channel::<i32>(1);
    let (mut second_tx, second_rx) = mpsc::channel::<i32>(1);
    let mut first_rx = first_rx.fuse();
    let mut second_rx = second_rx.fuse();
    let mut default_ran = false;
    let mut sum = 0;
    block_on(async {
        // Initialised only inside the `default` branch below; every other
        // branch of that `select!` panics.
        let mut first_tx_slot;
        let mut second_tx_slot;
        select! {
            _ = first_rx.next() => panic!(),
            _ = second_rx.next() => panic!(),
            default => {
                first_tx.send(2).await.unwrap();
                second_tx.send(3).await.unwrap();
                first_tx_slot = Some(first_tx);
                second_tx_slot = Some(second_tx);
            }
            complete => panic!(),
        }
        loop {
            select! {
                // taken first, and once more after `default` has run
                item = first_rx.next() => if let Some(n) = item { sum += n; },
                // taken second, and once more after `default` has run
                item = second_rx.next() => if let Some(n) = item { sum += n; },
                // taken third
                default => {
                    assert_eq!(sum, 5);
                    default_ran = true;
                    drop(first_tx_slot.take().unwrap());
                    drop(second_tx_slot.take().unwrap());
                },
                // taken last
                complete => break,
            };
        }
    });
    assert!(default_ran);
}
120 | |
/// The future that `select!` did not complete can still be moved and awaited
/// from inside the branch that did run.
#[test]
fn select_can_move_uncompleted_futures() {
    let (first_tx, first_rx) = oneshot::channel::<i32>();
    let (second_tx, second_rx) = oneshot::channel::<i32>();
    first_tx.send(1).unwrap();
    second_tx.send(2).unwrap();

    let mut first_rx = first_rx.fuse();
    let mut second_rx = second_rx.fuse();
    let mut branch_ran = false;
    block_on(async {
        // Both values were sent up front, so either branch may be chosen;
        // whichever runs awaits the other receiver by value.
        select! {
            got = first_rx => {
                assert_eq!(got, Ok(1));
                assert_eq!(second_rx.await, Ok(2));
                branch_ran = true;
            },
            got = second_rx => {
                assert_eq!(got, Ok(2));
                assert_eq!(first_rx.await, Ok(1));
                branch_ran = true;
            },
        }
    });
    assert!(branch_ran);
}
146 | |
/// A `select!` can appear inside the branch body of another `select!`, and
/// the inner branch can use the value bound by the outer one.
#[test]
fn select_nested() {
    let mut outer = future::ready(1);
    let mut inner = future::ready(2);
    let sum = block_on(async {
        select! {
            a = outer => {
                select! {
                    b = inner => a + b,
                }
            }
        }
    });
    assert_eq!(sum, 3);
}
162 | |
/// Pins the in-memory size of async blocks that `select!` over one and two
/// ready futures.
///
/// Ignored on targets whose pointer width is not 64 bits.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_size() {
    let one_branch = async {
        let mut ready = future::ready(0i32);
        select! {
            _ = ready => {},
        }
    };
    assert_eq!(mem::size_of_val(&one_branch), 24);

    let two_branches = async {
        let mut first = future::ready(0i32);
        let mut second = future::ready(0i32);
        select! {
            _ = first => {},
            _ = second => {},
        }
    };
    assert_eq!(mem::size_of_val(&two_branches), 40);
}
184 | |
/// `select!` works with `!Unpin` futures written as inline expressions in the
/// branch heads.
#[test]
fn select_on_non_unpin_expressions() {
    // Each call produces a fresh `async` block future, which is `!Unpin`.
    let new_non_unpin_fut = || async { 5 };

    let picked = block_on(async {
        // Assigned by whichever branch runs.
        let branch_value;
        select! {
            a = new_non_unpin_fut().fuse() => branch_value = a,
            b = new_non_unpin_fut().fuse() => branch_value = b,
        };
        branch_value
    });
    assert_eq!(picked, 5);
}
200 | |
/// `select!` with a `default` branch works with `!Unpin` futures written as
/// inline expressions; the result must come from a future branch (5), not
/// from `default` (7).
#[test]
fn select_on_non_unpin_expressions_with_default() {
    // Each call produces a fresh `async` block future, which is `!Unpin`.
    let new_non_unpin_fut = || async { 5 };

    let picked = block_on(async {
        // Assigned by whichever branch runs.
        let branch_value;
        select! {
            a = new_non_unpin_fut().fuse() => branch_value = a,
            b = new_non_unpin_fut().fuse() => branch_value = b,
            default => branch_value = 7,
        };
        branch_value
    });
    assert_eq!(picked, 5);
}
217 | |
/// Pins the in-memory size of an async block that `select!`s over two
/// `!Unpin` futures given as inline expressions.
///
/// Ignored on targets whose pointer width is not 64 bits.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn select_on_non_unpin_size() {
    // Each call produces a fresh `async` block future, which is `!Unpin`.
    let new_non_unpin_fut = || async { 5 };

    let selecting = async {
        let branch_value;
        select! {
            a = new_non_unpin_fut().fuse() => branch_value = a,
            b = new_non_unpin_fut().fuse() => branch_value = b,
        };
        branch_value
    };

    assert_eq!(mem::size_of_val(&selecting), 32);
}
235 | |
/// `select!` can be used in expression position: its value is the value of
/// the branch that ran.
#[test]
fn select_can_be_used_as_expression() {
    block_on(async {
        let picked = select! {
            a = future::ready(7) => a,
            b = future::ready(3) => b + 1,
        };
        // Either branch is acceptable: 7 from the first, 3 + 1 from the second.
        assert!(picked == 7 || picked == 4);
    });
}
246 | |
/// In expression position, `select!` evaluates to the `default` branch's
/// value when none of its futures is ready.
#[test]
fn select_with_default_can_be_used_as_expression() {
    /// Poll function that always reports `Pending`.
    fn never_ready<T>(_cx: &mut Context<'_>) -> Poll<T> {
        Poll::Pending
    }

    block_on(async {
        let picked = select! {
            a = poll_fn(never_ready::<i32>).fuse() => a,
            b = poll_fn(never_ready::<i32>).fuse() => b + 1,
            default => 99,
        };
        assert_eq!(picked, 99);
    });
}
262 | |
/// In expression position, `select!` over two `future::pending()` futures
/// evaluates to the `complete` branch's value, even though a `default` branch
/// is also present.
#[test]
fn select_with_complete_can_be_used_as_expression() {
    block_on(async {
        let picked = select! {
            a = future::pending::<i32>() => a,
            b = future::pending::<i32>() => b + 1,
            default => 99,
            complete => 237,
        };
        assert_eq!(picked, 237);
    });
}
275 | |
/// One branch's future may mutably borrow a variable that another branch's
/// body also mutates. This is a compile-time check; there are no assertions.
#[test]
// The write to `counter` is deliberately never read back.
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block() {
    async fn takes_mut_ref(_: &mut i32) {}
    async fn does_nothing() {}

    block_on(async {
        let mut counter = 234;
        select! {
            _ = takes_mut_ref(&mut counter).fuse() => {},
            _ = does_nothing().fuse() => {
                counter += 5;
            },
        }
    });
}
292 | |
/// One branch's future may mutably borrow a variable that another branch's
/// body and the `default` body also mutate. This is a compile-time check;
/// there are no assertions.
#[test]
// The writes to `counter` are deliberately never read back.
#[allow(unused_assignments)]
fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() {
    async fn takes_mut_ref(_: &mut i32) {}
    async fn does_nothing() {}

    block_on(async {
        let mut counter = 234;
        select! {
            _ = takes_mut_ref(&mut counter).fuse() => {},
            _ = does_nothing().fuse() => {
                counter += 5;
            },
            default => {
                counter += 27;
            },
        }
    });
}
312 | |
/// Exercises `stream_select!` in three scenarios:
///
/// 1. an endless stream merged with one that never yields still produces items;
/// 2. two finite streams merged end only after both are exhausted;
/// 3. three endless streams merged are drawn from roughly evenly.
#[test]
// NOTE(review): carried over unchanged; unclear whether anything in this test
// still triggers the lint — confirm before removing.
#[allow(unused_assignments)]
fn stream_select() {
    block_on(async {
        // Endless stream that repeats a single value.
        let endless_ints = |i| stream::iter(vec![i].into_iter().cycle());

        let mut endless_ones = stream_select!(endless_ints(1i32), stream::pending());
        assert_eq!(endless_ones.next().await, Some(1));
        assert_eq!(endless_ones.next().await, Some(1));

        let mut finite_list =
            stream_select!(stream::iter(vec![1].into_iter()), stream::iter(vec![1].into_iter()));
        assert_eq!(finite_list.next().await, Some(1));
        assert_eq!(finite_list.next().await, Some(1));
        assert_eq!(finite_list.next().await, None);

        let endless_mixed = stream_select!(endless_ints(1i32), endless_ints(2), endless_ints(3));
        // Take 1000, and assert a somewhat even distribution of values.
        // The fairness is randomized, but over 1000 samples we should be pretty close to even.
        // This test may be a bit flaky. Feel free to adjust the margins as you see fit.
        //
        // `take(1000)` collects exactly 1000 items; the previous hand-rolled
        // `take_while` counter stopped one short, at 999.
        let results = endless_mixed.take(1000).collect::<Vec<_>>().await;
        assert_eq!(results.len(), 1000);
        for value in 1..=3 {
            let seen = results.iter().filter(|&&x| x == value).count();
            assert!(seen >= 299, "value {} was yielded only {} times out of 1000", value, seen);
        }
    });
}
348 | |
/// Pins the in-memory size of async blocks that `join!` one and two ready
/// futures.
///
/// Ignored on targets whose pointer width is not 64 bits.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn join_size() {
    let single = async {
        let ready = future::ready(0i32);
        join!(ready)
    };
    assert_eq!(mem::size_of_val(&single), 24);

    let pair = async {
        let first = future::ready(0i32);
        let second = future::ready(0i32);
        join!(first, second)
    };
    assert_eq!(mem::size_of_val(&pair), 40);
}
365 | |
/// Pins the in-memory size of async blocks that `try_join!` one and two ready
/// `Result`-producing futures.
///
/// Ignored on targets whose pointer width is not 64 bits.
#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn try_join_size() {
    let single = async {
        let ready = future::ready(Ok::<i32, i32>(0));
        try_join!(ready)
    };
    assert_eq!(mem::size_of_val(&single), 24);

    let pair = async {
        let first = future::ready(Ok::<i32, i32>(0));
        let second = future::ready(Ok::<i32, i32>(0));
        try_join!(first, second)
    };
    assert_eq!(mem::size_of_val(&pair), 48);
}
382 | |
/// Compile-time check that `join!` accepts `async` block futures directly.
/// The resulting future is built and discarded without being polled.
// Discarding the future is the point of the test, hence the lint exemption.
#[allow(clippy::let_underscore_future)]
#[test]
fn join_doesnt_require_unpin() {
    let _ = async {
        join!(async {}, async {})
    };
}
388 | |
/// Compile-time check that `try_join!` accepts `async` block futures directly.
/// The resulting future is built and discarded without being polled.
// Discarding the future is the point of the test, hence the lint exemption.
#[allow(clippy::let_underscore_future)]
#[test]
fn try_join_doesnt_require_unpin() {
    let _ = async {
        try_join!(
            async { Ok::<(), ()>(()) },
            async { Ok::<(), ()>(()) },
        )
    };
}
394 | |