1 | use std::cell::Cell; |
2 | use std::iter; |
3 | use std::pin::Pin; |
4 | use std::rc::Rc; |
5 | use std::sync::Arc; |
6 | use std::task::Context; |
7 | |
8 | use futures::channel::mpsc; |
9 | use futures::executor::block_on; |
10 | use futures::future::{self, Future}; |
11 | use futures::lock::Mutex; |
12 | use futures::sink::SinkExt; |
13 | use futures::stream::{self, StreamExt}; |
14 | use futures::task::Poll; |
15 | use futures::{ready, FutureExt}; |
16 | use futures_core::Stream; |
17 | use futures_executor::ThreadPool; |
18 | use futures_test::task::noop_context; |
19 | |
20 | #[test] |
21 | fn select() { |
22 | fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) { |
23 | let a = stream::iter(a); |
24 | let b = stream::iter(b); |
25 | let vec = block_on(stream::select(a, b).collect::<Vec<_>>()); |
26 | assert_eq!(vec, expected); |
27 | } |
28 | |
29 | select_and_compare(vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]); |
30 | select_and_compare(vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]); |
31 | select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]); |
32 | } |
33 | |
34 | #[test] |
35 | fn flat_map() { |
36 | block_on(async { |
37 | let st = |
38 | stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(0..=2)]); |
39 | |
40 | let values: Vec<_> = |
41 | st.flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))).collect().await; |
42 | |
43 | assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]); |
44 | }); |
45 | } |
46 | |
47 | #[test] |
48 | fn scan() { |
49 | block_on(async { |
50 | let values = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]) |
51 | .scan(1, |state, e| { |
52 | *state += 1; |
53 | futures::future::ready(if e < *state { Some(e) } else { None }) |
54 | }) |
55 | .collect::<Vec<_>>() |
56 | .await; |
57 | |
58 | assert_eq!(values, vec![1u8, 2, 3, 4]); |
59 | }); |
60 | } |
61 | |
62 | #[test] |
63 | fn flatten_unordered() { |
64 | use futures::executor::block_on; |
65 | use futures::stream::*; |
66 | use futures::task::*; |
67 | use std::convert::identity; |
68 | use std::pin::Pin; |
69 | use std::sync::atomic::{AtomicBool, Ordering}; |
70 | use std::thread; |
71 | use std::time::Duration; |
72 | |
73 | struct DataStream { |
74 | data: Vec<u8>, |
75 | polled: bool, |
76 | wake_immediately: bool, |
77 | } |
78 | |
79 | impl Stream for DataStream { |
80 | type Item = u8; |
81 | |
82 | fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { |
83 | if !self.polled { |
84 | if !self.wake_immediately { |
85 | let waker = ctx.waker().clone(); |
86 | let sleep_time = |
87 | Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10); |
88 | thread::spawn(move || { |
89 | thread::sleep(sleep_time); |
90 | waker.wake_by_ref(); |
91 | }); |
92 | } else { |
93 | ctx.waker().wake_by_ref(); |
94 | } |
95 | self.polled = true; |
96 | Poll::Pending |
97 | } else { |
98 | self.polled = false; |
99 | Poll::Ready(self.data.pop()) |
100 | } |
101 | } |
102 | } |
103 | |
104 | struct Interchanger { |
105 | polled: bool, |
106 | base: u8, |
107 | wake_immediately: bool, |
108 | } |
109 | |
110 | impl Stream for Interchanger { |
111 | type Item = DataStream; |
112 | |
113 | fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> { |
114 | if !self.polled { |
115 | self.polled = true; |
116 | if !self.wake_immediately { |
117 | let waker = ctx.waker().clone(); |
118 | let sleep_time = Duration::from_millis(self.base as u64); |
119 | thread::spawn(move || { |
120 | thread::sleep(sleep_time); |
121 | waker.wake_by_ref(); |
122 | }); |
123 | } else { |
124 | ctx.waker().wake_by_ref(); |
125 | } |
126 | Poll::Pending |
127 | } else { |
128 | let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect(); |
129 | self.base += 1; |
130 | self.polled = false; |
131 | Poll::Ready(Some(DataStream { |
132 | polled: false, |
133 | data, |
134 | wake_immediately: self.wake_immediately && self.base % 2 == 0, |
135 | })) |
136 | } |
137 | } |
138 | } |
139 | |
140 | // basic behaviour |
141 | { |
142 | block_on(async { |
143 | let st = stream::iter(vec![ |
144 | stream::iter(0..=4u8), |
145 | stream::iter(6..=10), |
146 | stream::iter(10..=12), |
147 | ]); |
148 | |
149 | let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await; |
150 | |
151 | assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]); |
152 | }); |
153 | |
154 | block_on(async { |
155 | let st = stream::iter(vec![ |
156 | stream::iter(0..=4u8), |
157 | stream::iter(6..=10), |
158 | stream::iter(0..=2), |
159 | ]); |
160 | |
161 | let mut fm_unordered = st |
162 | .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0))) |
163 | .collect::<Vec<_>>() |
164 | .await; |
165 | |
166 | fm_unordered.sort_unstable(); |
167 | |
168 | assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]); |
169 | }); |
170 | } |
171 | |
172 | // wake up immediately |
173 | { |
174 | block_on(async { |
175 | let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true } |
176 | .take(10) |
177 | .map(|s| s.map(identity)) |
178 | .flatten_unordered(10) |
179 | .collect::<Vec<_>>() |
180 | .await; |
181 | |
182 | fl_unordered.sort_unstable(); |
183 | |
184 | assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>()); |
185 | }); |
186 | |
187 | block_on(async { |
188 | let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true } |
189 | .take(10) |
190 | .flat_map_unordered(10, |s| s.map(identity)) |
191 | .collect::<Vec<_>>() |
192 | .await; |
193 | |
194 | fm_unordered.sort_unstable(); |
195 | |
196 | assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); |
197 | }); |
198 | } |
199 | |
200 | // wake up after delay |
201 | { |
202 | block_on(async { |
203 | let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false } |
204 | .take(10) |
205 | .map(|s| s.map(identity)) |
206 | .flatten_unordered(10) |
207 | .collect::<Vec<_>>() |
208 | .await; |
209 | |
210 | fl_unordered.sort_unstable(); |
211 | |
212 | assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>()); |
213 | }); |
214 | |
215 | block_on(async { |
216 | let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false } |
217 | .take(10) |
218 | .flat_map_unordered(10, |s| s.map(identity)) |
219 | .collect::<Vec<_>>() |
220 | .await; |
221 | |
222 | fm_unordered.sort_unstable(); |
223 | |
224 | assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); |
225 | }); |
226 | |
227 | block_on(async { |
228 | let (mut fm_unordered, mut fl_unordered) = futures_util::join!( |
229 | Interchanger { polled: false, base: 0, wake_immediately: false } |
230 | .take(10) |
231 | .flat_map_unordered(10, |s| s.map(identity)) |
232 | .collect::<Vec<_>>(), |
233 | Interchanger { polled: false, base: 0, wake_immediately: false } |
234 | .take(10) |
235 | .map(|s| s.map(identity)) |
236 | .flatten_unordered(10) |
237 | .collect::<Vec<_>>() |
238 | ); |
239 | |
240 | fm_unordered.sort_unstable(); |
241 | fl_unordered.sort_unstable(); |
242 | |
243 | assert_eq!(fm_unordered, fl_unordered); |
244 | assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>()); |
245 | }); |
246 | } |
247 | |
248 | // waker panics |
249 | { |
250 | let stream = Arc::new(Mutex::new( |
251 | Interchanger { polled: false, base: 0, wake_immediately: true } |
252 | .take(10) |
253 | .flat_map_unordered(10, |s| s.map(identity)), |
254 | )); |
255 | |
256 | struct PanicWaker; |
257 | |
258 | impl ArcWake for PanicWaker { |
259 | fn wake_by_ref(_arc_self: &Arc<Self>) { |
260 | panic!("WAKE UP" ); |
261 | } |
262 | } |
263 | |
264 | std::thread::spawn({ |
265 | let stream = stream.clone(); |
266 | move || { |
267 | let mut st = poll_fn(|cx| { |
268 | let mut lock = ready!(stream.lock().poll_unpin(cx)); |
269 | |
270 | let panic_waker = waker(Arc::new(PanicWaker)); |
271 | let mut panic_cx = Context::from_waker(&panic_waker); |
272 | let _ = ready!(lock.poll_next_unpin(&mut panic_cx)); |
273 | |
274 | Poll::Ready(Some(())) |
275 | }); |
276 | |
277 | block_on(st.next()) |
278 | } |
279 | }) |
280 | .join() |
281 | .unwrap_err(); |
282 | |
283 | block_on(async move { |
284 | let mut values: Vec<_> = stream.lock().await.by_ref().collect().await; |
285 | values.sort_unstable(); |
286 | |
287 | assert_eq!(values, (0..60).collect::<Vec<u8>>()); |
288 | }); |
289 | } |
290 | |
291 | // stream panics |
292 | { |
293 | let st = stream::iter(iter::once( |
294 | once(Box::pin(async { panic!("Polled" ) })).left_stream::<DataStream>(), |
295 | )) |
296 | .chain( |
297 | Interchanger { polled: false, base: 0, wake_immediately: true } |
298 | .map(|stream| stream.right_stream()) |
299 | .take(10), |
300 | ); |
301 | |
302 | let stream = Arc::new(Mutex::new(st.flatten_unordered(10))); |
303 | |
304 | std::thread::spawn({ |
305 | let stream = stream.clone(); |
306 | move || { |
307 | let mut st = poll_fn(|cx| { |
308 | let mut lock = ready!(stream.lock().poll_unpin(cx)); |
309 | let data = ready!(lock.poll_next_unpin(cx)); |
310 | |
311 | Poll::Ready(data) |
312 | }); |
313 | |
314 | block_on(st.next()) |
315 | } |
316 | }) |
317 | .join() |
318 | .unwrap_err(); |
319 | |
320 | block_on(async move { |
321 | let mut values: Vec<_> = stream.lock().await.by_ref().collect().await; |
322 | values.sort_unstable(); |
323 | |
324 | assert_eq!(values, (0..60).collect::<Vec<u8>>()); |
325 | }); |
326 | } |
327 | |
328 | fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> { |
329 | let ready = Arc::new(AtomicBool::new(false)); |
330 | let mut spawned = false; |
331 | |
332 | future::poll_fn(move |cx| { |
333 | if !spawned { |
334 | let waker = cx.waker().clone(); |
335 | let ready = ready.clone(); |
336 | |
337 | std::thread::spawn(move || { |
338 | std::thread::sleep(time); |
339 | ready.store(true, Ordering::Release); |
340 | |
341 | waker.wake_by_ref() |
342 | }); |
343 | spawned = true; |
344 | } |
345 | |
346 | if ready.load(Ordering::Acquire) { |
347 | Poll::Ready(value.clone()) |
348 | } else { |
349 | Poll::Pending |
350 | } |
351 | }) |
352 | } |
353 | |
354 | fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin |
355 | where |
356 | S::Item: Clone, |
357 | { |
358 | let inner = st |
359 | .then(|item| timeout(Duration::from_millis(50), item)) |
360 | .enumerate() |
361 | .map(|(idx, value)| { |
362 | stream::once(if idx % 2 == 0 { |
363 | future::ready(value).left_future() |
364 | } else { |
365 | timeout(Duration::from_millis(100), value).right_future() |
366 | }) |
367 | }) |
368 | .flatten_unordered(None); |
369 | |
370 | stream::once(future::ready(inner)).flatten_unordered(None) |
371 | } |
372 | |
373 | // nested `flatten_unordered` |
374 | let te = ThreadPool::new().unwrap(); |
375 | let base_handle = te |
376 | .spawn_with_handle(async move { |
377 | let fu = build_nested_fu(stream::iter(1..=10)); |
378 | |
379 | assert_eq!(fu.count().await, 10); |
380 | }) |
381 | .unwrap(); |
382 | |
383 | block_on(base_handle); |
384 | |
385 | let empty_state_move_handle = te |
386 | .spawn_with_handle(async move { |
387 | let mut fu = build_nested_fu(stream::iter(1..10)); |
388 | { |
389 | let mut cx = noop_context(); |
390 | let _ = fu.poll_next_unpin(&mut cx); |
391 | let _ = fu.poll_next_unpin(&mut cx); |
392 | } |
393 | |
394 | assert_eq!(fu.count().await, 9); |
395 | }) |
396 | .unwrap(); |
397 | |
398 | block_on(empty_state_move_handle); |
399 | } |
400 | |
401 | #[test] |
402 | fn take_until() { |
403 | fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> { |
404 | let mut i = 0; |
405 | future::poll_fn(move |_cx| { |
406 | i += 1; |
407 | if i <= stop_on { |
408 | Poll::Pending |
409 | } else { |
410 | Poll::Ready(()) |
411 | } |
412 | }) |
413 | } |
414 | |
415 | block_on(async { |
416 | // Verify stopping works: |
417 | let stream = stream::iter(1u32..=10); |
418 | let stop_fut = make_stop_fut(5); |
419 | |
420 | let stream = stream.take_until(stop_fut); |
421 | let last = stream.fold(0, |_, i| async move { i }).await; |
422 | assert_eq!(last, 5); |
423 | |
424 | // Verify take_future() works: |
425 | let stream = stream::iter(1..=10); |
426 | let stop_fut = make_stop_fut(5); |
427 | |
428 | let mut stream = stream.take_until(stop_fut); |
429 | |
430 | assert_eq!(stream.next().await, Some(1)); |
431 | assert_eq!(stream.next().await, Some(2)); |
432 | |
433 | stream.take_future(); |
434 | |
435 | let last = stream.fold(0, |_, i| async move { i }).await; |
436 | assert_eq!(last, 10); |
437 | |
438 | // Verify take_future() returns None if stream is stopped: |
439 | let stream = stream::iter(1u32..=10); |
440 | let stop_fut = make_stop_fut(1); |
441 | let mut stream = stream.take_until(stop_fut); |
442 | assert_eq!(stream.next().await, Some(1)); |
443 | assert_eq!(stream.next().await, None); |
444 | assert!(stream.take_future().is_none()); |
445 | |
446 | // Verify TakeUntil is fused: |
447 | let mut i = 0; |
448 | let stream = stream::poll_fn(move |_cx| { |
449 | i += 1; |
450 | match i { |
451 | 1 => Poll::Ready(Some(1)), |
452 | 2 => Poll::Ready(None), |
453 | _ => panic!("TakeUntil not fused" ), |
454 | } |
455 | }); |
456 | |
457 | let stop_fut = make_stop_fut(1); |
458 | let mut stream = stream.take_until(stop_fut); |
459 | assert_eq!(stream.next().await, Some(1)); |
460 | assert_eq!(stream.next().await, None); |
461 | assert_eq!(stream.next().await, None); |
462 | }); |
463 | } |
464 | |
465 | #[test] |
466 | #[should_panic ] |
467 | fn chunks_panic_on_cap_zero() { |
468 | let (_, rx1) = mpsc::channel::<()>(1); |
469 | |
470 | let _ = rx1.chunks(0); |
471 | } |
472 | |
473 | #[test] |
474 | #[should_panic ] |
475 | fn ready_chunks_panic_on_cap_zero() { |
476 | let (_, rx1) = mpsc::channel::<()>(1); |
477 | |
478 | let _ = rx1.ready_chunks(0); |
479 | } |
480 | |
481 | #[test] |
482 | fn ready_chunks() { |
483 | let (mut tx, rx1) = mpsc::channel::<i32>(16); |
484 | |
485 | let mut s = rx1.ready_chunks(2); |
486 | |
487 | let mut cx = noop_context(); |
488 | assert!(s.next().poll_unpin(&mut cx).is_pending()); |
489 | |
490 | block_on(async { |
491 | tx.send(1).await.unwrap(); |
492 | |
493 | assert_eq!(s.next().await.unwrap(), vec![1]); |
494 | tx.send(2).await.unwrap(); |
495 | tx.send(3).await.unwrap(); |
496 | tx.send(4).await.unwrap(); |
497 | assert_eq!(s.next().await.unwrap(), vec![2, 3]); |
498 | assert_eq!(s.next().await.unwrap(), vec![4]); |
499 | }); |
500 | } |
501 | |
502 | struct SlowStream { |
503 | times_should_poll: usize, |
504 | times_polled: Rc<Cell<usize>>, |
505 | } |
506 | impl Stream for SlowStream { |
507 | type Item = usize; |
508 | |
509 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
510 | self.times_polled.set(self.times_polled.get() + 1); |
511 | if self.times_polled.get() % 2 == 0 { |
512 | cx.waker().wake_by_ref(); |
513 | return Poll::Pending; |
514 | } |
515 | if self.times_polled.get() >= self.times_should_poll { |
516 | return Poll::Ready(None); |
517 | } |
518 | Poll::Ready(Some(self.times_polled.get())) |
519 | } |
520 | } |
521 | |
522 | #[test] |
523 | fn select_with_strategy_doesnt_terminate_early() { |
524 | for side in [stream::PollNext::Left, stream::PollNext::Right] { |
525 | let times_should_poll = 10; |
526 | let count = Rc::new(Cell::new(0)); |
527 | let b = stream::iter([10, 20]); |
528 | |
529 | let mut selected = stream::select_with_strategy( |
530 | SlowStream { times_should_poll, times_polled: count.clone() }, |
531 | b, |
532 | |_: &mut ()| side, |
533 | ); |
534 | block_on(async move { while selected.next().await.is_some() {} }); |
535 | assert_eq!(count.get(), times_should_poll + 1); |
536 | } |
537 | } |
538 | |
539 | async fn is_even(number: u8) -> bool { |
540 | number % 2 == 0 |
541 | } |
542 | |
543 | #[test] |
544 | fn all() { |
545 | block_on(async { |
546 | let empty: [u8; 0] = []; |
547 | let st = stream::iter(empty); |
548 | let all = st.all(is_even).await; |
549 | assert!(all); |
550 | |
551 | let st = stream::iter([2, 4, 6, 8]); |
552 | let all = st.all(is_even).await; |
553 | assert!(all); |
554 | |
555 | let st = stream::iter([2, 3, 4]); |
556 | let all = st.all(is_even).await; |
557 | assert!(!all); |
558 | }); |
559 | } |
560 | |
561 | #[test] |
562 | fn any() { |
563 | block_on(async { |
564 | let empty: [u8; 0] = []; |
565 | let st = stream::iter(empty); |
566 | let any = st.any(is_even).await; |
567 | assert!(!any); |
568 | |
569 | let st = stream::iter([1, 2, 3]); |
570 | let any = st.any(is_even).await; |
571 | assert!(any); |
572 | |
573 | let st = stream::iter([1, 3, 5]); |
574 | let any = st.any(is_even).await; |
575 | assert!(!any); |
576 | }); |
577 | } |
578 | |