1use std::cell::Cell;
2use std::iter;
3use std::pin::Pin;
4use std::rc::Rc;
5use std::sync::Arc;
6use std::task::Context;
7
8use futures::channel::mpsc;
9use futures::executor::block_on;
10use futures::future::{self, Future};
11use futures::lock::Mutex;
12use futures::sink::SinkExt;
13use futures::stream::{self, StreamExt};
14use futures::task::Poll;
15use futures::{ready, FutureExt};
16use futures_core::Stream;
17use futures_executor::ThreadPool;
18use futures_test::task::noop_context;
19
20#[test]
21fn select() {
22 fn select_and_compare(a: Vec<u32>, b: Vec<u32>, expected: Vec<u32>) {
23 let a = stream::iter(a);
24 let b = stream::iter(b);
25 let vec = block_on(stream::select(a, b).collect::<Vec<_>>());
26 assert_eq!(vec, expected);
27 }
28
29 select_and_compare(vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]);
30 select_and_compare(vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]);
31 select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]);
32}
33
34#[test]
35fn flat_map() {
36 block_on(async {
37 let st =
38 stream::iter(vec![stream::iter(0..=4u8), stream::iter(6..=10), stream::iter(0..=2)]);
39
40 let values: Vec<_> =
41 st.flat_map(|s| s.filter(|v| futures::future::ready(v % 2 == 0))).collect().await;
42
43 assert_eq!(values, vec![0, 2, 4, 6, 8, 10, 0, 2]);
44 });
45}
46
47#[test]
48fn scan() {
49 block_on(async {
50 let values = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2])
51 .scan(1, |state, e| {
52 *state += 1;
53 futures::future::ready(if e < *state { Some(e) } else { None })
54 })
55 .collect::<Vec<_>>()
56 .await;
57
58 assert_eq!(values, vec![1u8, 2, 3, 4]);
59 });
60}
61
62#[test]
63fn flatten_unordered() {
64 use futures::executor::block_on;
65 use futures::stream::*;
66 use futures::task::*;
67 use std::convert::identity;
68 use std::pin::Pin;
69 use std::sync::atomic::{AtomicBool, Ordering};
70 use std::thread;
71 use std::time::Duration;
72
73 struct DataStream {
74 data: Vec<u8>,
75 polled: bool,
76 wake_immediately: bool,
77 }
78
79 impl Stream for DataStream {
80 type Item = u8;
81
82 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
83 if !self.polled {
84 if !self.wake_immediately {
85 let waker = ctx.waker().clone();
86 let sleep_time =
87 Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10);
88 thread::spawn(move || {
89 thread::sleep(sleep_time);
90 waker.wake_by_ref();
91 });
92 } else {
93 ctx.waker().wake_by_ref();
94 }
95 self.polled = true;
96 Poll::Pending
97 } else {
98 self.polled = false;
99 Poll::Ready(self.data.pop())
100 }
101 }
102 }
103
104 struct Interchanger {
105 polled: bool,
106 base: u8,
107 wake_immediately: bool,
108 }
109
110 impl Stream for Interchanger {
111 type Item = DataStream;
112
113 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
114 if !self.polled {
115 self.polled = true;
116 if !self.wake_immediately {
117 let waker = ctx.waker().clone();
118 let sleep_time = Duration::from_millis(self.base as u64);
119 thread::spawn(move || {
120 thread::sleep(sleep_time);
121 waker.wake_by_ref();
122 });
123 } else {
124 ctx.waker().wake_by_ref();
125 }
126 Poll::Pending
127 } else {
128 let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect();
129 self.base += 1;
130 self.polled = false;
131 Poll::Ready(Some(DataStream {
132 polled: false,
133 data,
134 wake_immediately: self.wake_immediately && self.base % 2 == 0,
135 }))
136 }
137 }
138 }
139
140 // basic behaviour
141 {
142 block_on(async {
143 let st = stream::iter(vec![
144 stream::iter(0..=4u8),
145 stream::iter(6..=10),
146 stream::iter(10..=12),
147 ]);
148
149 let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;
150
151 assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
152 });
153
154 block_on(async {
155 let st = stream::iter(vec![
156 stream::iter(0..=4u8),
157 stream::iter(6..=10),
158 stream::iter(0..=2),
159 ]);
160
161 let mut fm_unordered = st
162 .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
163 .collect::<Vec<_>>()
164 .await;
165
166 fm_unordered.sort_unstable();
167
168 assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
169 });
170 }
171
172 // wake up immediately
173 {
174 block_on(async {
175 let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
176 .take(10)
177 .map(|s| s.map(identity))
178 .flatten_unordered(10)
179 .collect::<Vec<_>>()
180 .await;
181
182 fl_unordered.sort_unstable();
183
184 assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
185 });
186
187 block_on(async {
188 let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
189 .take(10)
190 .flat_map_unordered(10, |s| s.map(identity))
191 .collect::<Vec<_>>()
192 .await;
193
194 fm_unordered.sort_unstable();
195
196 assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
197 });
198 }
199
200 // wake up after delay
201 {
202 block_on(async {
203 let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
204 .take(10)
205 .map(|s| s.map(identity))
206 .flatten_unordered(10)
207 .collect::<Vec<_>>()
208 .await;
209
210 fl_unordered.sort_unstable();
211
212 assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
213 });
214
215 block_on(async {
216 let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
217 .take(10)
218 .flat_map_unordered(10, |s| s.map(identity))
219 .collect::<Vec<_>>()
220 .await;
221
222 fm_unordered.sort_unstable();
223
224 assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
225 });
226
227 block_on(async {
228 let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
229 Interchanger { polled: false, base: 0, wake_immediately: false }
230 .take(10)
231 .flat_map_unordered(10, |s| s.map(identity))
232 .collect::<Vec<_>>(),
233 Interchanger { polled: false, base: 0, wake_immediately: false }
234 .take(10)
235 .map(|s| s.map(identity))
236 .flatten_unordered(10)
237 .collect::<Vec<_>>()
238 );
239
240 fm_unordered.sort_unstable();
241 fl_unordered.sort_unstable();
242
243 assert_eq!(fm_unordered, fl_unordered);
244 assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
245 });
246 }
247
248 // waker panics
249 {
250 let stream = Arc::new(Mutex::new(
251 Interchanger { polled: false, base: 0, wake_immediately: true }
252 .take(10)
253 .flat_map_unordered(10, |s| s.map(identity)),
254 ));
255
256 struct PanicWaker;
257
258 impl ArcWake for PanicWaker {
259 fn wake_by_ref(_arc_self: &Arc<Self>) {
260 panic!("WAKE UP");
261 }
262 }
263
264 std::thread::spawn({
265 let stream = stream.clone();
266 move || {
267 let mut st = poll_fn(|cx| {
268 let mut lock = ready!(stream.lock().poll_unpin(cx));
269
270 let panic_waker = waker(Arc::new(PanicWaker));
271 let mut panic_cx = Context::from_waker(&panic_waker);
272 let _ = ready!(lock.poll_next_unpin(&mut panic_cx));
273
274 Poll::Ready(Some(()))
275 });
276
277 block_on(st.next())
278 }
279 })
280 .join()
281 .unwrap_err();
282
283 block_on(async move {
284 let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
285 values.sort_unstable();
286
287 assert_eq!(values, (0..60).collect::<Vec<u8>>());
288 });
289 }
290
291 // stream panics
292 {
293 let st = stream::iter(iter::once(
294 once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(),
295 ))
296 .chain(
297 Interchanger { polled: false, base: 0, wake_immediately: true }
298 .map(|stream| stream.right_stream())
299 .take(10),
300 );
301
302 let stream = Arc::new(Mutex::new(st.flatten_unordered(10)));
303
304 std::thread::spawn({
305 let stream = stream.clone();
306 move || {
307 let mut st = poll_fn(|cx| {
308 let mut lock = ready!(stream.lock().poll_unpin(cx));
309 let data = ready!(lock.poll_next_unpin(cx));
310
311 Poll::Ready(data)
312 });
313
314 block_on(st.next())
315 }
316 })
317 .join()
318 .unwrap_err();
319
320 block_on(async move {
321 let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
322 values.sort_unstable();
323
324 assert_eq!(values, (0..60).collect::<Vec<u8>>());
325 });
326 }
327
328 fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> {
329 let ready = Arc::new(AtomicBool::new(false));
330 let mut spawned = false;
331
332 future::poll_fn(move |cx| {
333 if !spawned {
334 let waker = cx.waker().clone();
335 let ready = ready.clone();
336
337 std::thread::spawn(move || {
338 std::thread::sleep(time);
339 ready.store(true, Ordering::Release);
340
341 waker.wake_by_ref()
342 });
343 spawned = true;
344 }
345
346 if ready.load(Ordering::Acquire) {
347 Poll::Ready(value.clone())
348 } else {
349 Poll::Pending
350 }
351 })
352 }
353
354 fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin
355 where
356 S::Item: Clone,
357 {
358 let inner = st
359 .then(|item| timeout(Duration::from_millis(50), item))
360 .enumerate()
361 .map(|(idx, value)| {
362 stream::once(if idx % 2 == 0 {
363 future::ready(value).left_future()
364 } else {
365 timeout(Duration::from_millis(100), value).right_future()
366 })
367 })
368 .flatten_unordered(None);
369
370 stream::once(future::ready(inner)).flatten_unordered(None)
371 }
372
373 // nested `flatten_unordered`
374 let te = ThreadPool::new().unwrap();
375 let base_handle = te
376 .spawn_with_handle(async move {
377 let fu = build_nested_fu(stream::iter(1..=10));
378
379 assert_eq!(fu.count().await, 10);
380 })
381 .unwrap();
382
383 block_on(base_handle);
384
385 let empty_state_move_handle = te
386 .spawn_with_handle(async move {
387 let mut fu = build_nested_fu(stream::iter(1..10));
388 {
389 let mut cx = noop_context();
390 let _ = fu.poll_next_unpin(&mut cx);
391 let _ = fu.poll_next_unpin(&mut cx);
392 }
393
394 assert_eq!(fu.count().await, 9);
395 })
396 .unwrap();
397
398 block_on(empty_state_move_handle);
399}
400
401#[test]
402fn take_until() {
403 fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
404 let mut i = 0;
405 future::poll_fn(move |_cx| {
406 i += 1;
407 if i <= stop_on {
408 Poll::Pending
409 } else {
410 Poll::Ready(())
411 }
412 })
413 }
414
415 block_on(async {
416 // Verify stopping works:
417 let stream = stream::iter(1u32..=10);
418 let stop_fut = make_stop_fut(5);
419
420 let stream = stream.take_until(stop_fut);
421 let last = stream.fold(0, |_, i| async move { i }).await;
422 assert_eq!(last, 5);
423
424 // Verify take_future() works:
425 let stream = stream::iter(1..=10);
426 let stop_fut = make_stop_fut(5);
427
428 let mut stream = stream.take_until(stop_fut);
429
430 assert_eq!(stream.next().await, Some(1));
431 assert_eq!(stream.next().await, Some(2));
432
433 stream.take_future();
434
435 let last = stream.fold(0, |_, i| async move { i }).await;
436 assert_eq!(last, 10);
437
438 // Verify take_future() returns None if stream is stopped:
439 let stream = stream::iter(1u32..=10);
440 let stop_fut = make_stop_fut(1);
441 let mut stream = stream.take_until(stop_fut);
442 assert_eq!(stream.next().await, Some(1));
443 assert_eq!(stream.next().await, None);
444 assert!(stream.take_future().is_none());
445
446 // Verify TakeUntil is fused:
447 let mut i = 0;
448 let stream = stream::poll_fn(move |_cx| {
449 i += 1;
450 match i {
451 1 => Poll::Ready(Some(1)),
452 2 => Poll::Ready(None),
453 _ => panic!("TakeUntil not fused"),
454 }
455 });
456
457 let stop_fut = make_stop_fut(1);
458 let mut stream = stream.take_until(stop_fut);
459 assert_eq!(stream.next().await, Some(1));
460 assert_eq!(stream.next().await, None);
461 assert_eq!(stream.next().await, None);
462 });
463}
464
465#[test]
466#[should_panic]
467fn chunks_panic_on_cap_zero() {
468 let (_, rx1) = mpsc::channel::<()>(1);
469
470 let _ = rx1.chunks(0);
471}
472
473#[test]
474#[should_panic]
475fn ready_chunks_panic_on_cap_zero() {
476 let (_, rx1) = mpsc::channel::<()>(1);
477
478 let _ = rx1.ready_chunks(0);
479}
480
481#[test]
482fn ready_chunks() {
483 let (mut tx, rx1) = mpsc::channel::<i32>(16);
484
485 let mut s = rx1.ready_chunks(2);
486
487 let mut cx = noop_context();
488 assert!(s.next().poll_unpin(&mut cx).is_pending());
489
490 block_on(async {
491 tx.send(1).await.unwrap();
492
493 assert_eq!(s.next().await.unwrap(), vec![1]);
494 tx.send(2).await.unwrap();
495 tx.send(3).await.unwrap();
496 tx.send(4).await.unwrap();
497 assert_eq!(s.next().await.unwrap(), vec![2, 3]);
498 assert_eq!(s.next().await.unwrap(), vec![4]);
499 });
500}
501
502struct SlowStream {
503 times_should_poll: usize,
504 times_polled: Rc<Cell<usize>>,
505}
506impl Stream for SlowStream {
507 type Item = usize;
508
509 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
510 self.times_polled.set(self.times_polled.get() + 1);
511 if self.times_polled.get() % 2 == 0 {
512 cx.waker().wake_by_ref();
513 return Poll::Pending;
514 }
515 if self.times_polled.get() >= self.times_should_poll {
516 return Poll::Ready(None);
517 }
518 Poll::Ready(Some(self.times_polled.get()))
519 }
520}
521
522#[test]
523fn select_with_strategy_doesnt_terminate_early() {
524 for side in [stream::PollNext::Left, stream::PollNext::Right] {
525 let times_should_poll = 10;
526 let count = Rc::new(Cell::new(0));
527 let b = stream::iter([10, 20]);
528
529 let mut selected = stream::select_with_strategy(
530 SlowStream { times_should_poll, times_polled: count.clone() },
531 b,
532 |_: &mut ()| side,
533 );
534 block_on(async move { while selected.next().await.is_some() {} });
535 assert_eq!(count.get(), times_should_poll + 1);
536 }
537}
538
539async fn is_even(number: u8) -> bool {
540 number % 2 == 0
541}
542
543#[test]
544fn all() {
545 block_on(async {
546 let empty: [u8; 0] = [];
547 let st = stream::iter(empty);
548 let all = st.all(is_even).await;
549 assert!(all);
550
551 let st = stream::iter([2, 4, 6, 8]);
552 let all = st.all(is_even).await;
553 assert!(all);
554
555 let st = stream::iter([2, 3, 4]);
556 let all = st.all(is_even).await;
557 assert!(!all);
558 });
559}
560
561#[test]
562fn any() {
563 block_on(async {
564 let empty: [u8; 0] = [];
565 let st = stream::iter(empty);
566 let any = st.any(is_even).await;
567 assert!(!any);
568
569 let st = stream::iter([1, 2, 3]);
570 let any = st.any(is_even).await;
571 assert!(any);
572
573 let st = stream::iter([1, 3, 5]);
574 let any = st.any(is_even).await;
575 assert!(!any);
576 });
577}
578