1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (all(feature = "time" , feature = "sync" , feature = "io-util" ))] |
3 | |
4 | use tokio::time; |
5 | use tokio_stream::{self as stream, StreamExt}; |
6 | use tokio_test::assert_pending; |
7 | use tokio_test::task; |
8 | |
9 | use futures::FutureExt; |
10 | use std::time::Duration; |
11 | |
12 | #[tokio::test (start_paused = true)] |
13 | async fn usage() { |
14 | let iter = vec![1, 2, 3].into_iter(); |
15 | let stream0 = stream::iter(iter); |
16 | |
17 | let iter = vec![4].into_iter(); |
18 | let stream1 = |
19 | stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); |
20 | |
21 | let chunk_stream = stream0 |
22 | .chain(stream1) |
23 | .chunks_timeout(4, Duration::from_secs(2)); |
24 | |
25 | let mut chunk_stream = task::spawn(chunk_stream); |
26 | |
27 | assert_pending!(chunk_stream.poll_next()); |
28 | time::advance(Duration::from_secs(2)).await; |
29 | assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); |
30 | |
31 | assert_pending!(chunk_stream.poll_next()); |
32 | time::advance(Duration::from_secs(2)).await; |
33 | assert_eq!(chunk_stream.next().await, Some(vec![4])); |
34 | } |
35 | |
36 | #[tokio::test (start_paused = true)] |
37 | async fn full_chunk_with_timeout() { |
38 | let iter = vec![1, 2].into_iter(); |
39 | let stream0 = stream::iter(iter); |
40 | |
41 | let iter = vec![3].into_iter(); |
42 | let stream1 = |
43 | stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n)); |
44 | |
45 | let iter = vec![4].into_iter(); |
46 | let stream2 = |
47 | stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n)); |
48 | |
49 | let chunk_stream = stream0 |
50 | .chain(stream1) |
51 | .chain(stream2) |
52 | .chunks_timeout(3, Duration::from_secs(2)); |
53 | |
54 | let mut chunk_stream = task::spawn(chunk_stream); |
55 | |
56 | assert_pending!(chunk_stream.poll_next()); |
57 | time::advance(Duration::from_secs(2)).await; |
58 | assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); |
59 | |
60 | assert_pending!(chunk_stream.poll_next()); |
61 | time::advance(Duration::from_secs(2)).await; |
62 | assert_eq!(chunk_stream.next().await, Some(vec![4])); |
63 | } |
64 | |
65 | #[tokio::test ] |
66 | #[ignore ] |
67 | async fn real_time() { |
68 | let iter = vec![1, 2, 3, 4].into_iter(); |
69 | let stream0 = stream::iter(iter); |
70 | |
71 | let iter = vec![5].into_iter(); |
72 | let stream1 = |
73 | stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n)); |
74 | |
75 | let chunk_stream = stream0 |
76 | .chain(stream1) |
77 | .chunks_timeout(3, Duration::from_secs(2)); |
78 | |
79 | let mut chunk_stream = task::spawn(chunk_stream); |
80 | |
81 | assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3])); |
82 | assert_eq!(chunk_stream.next().await, Some(vec![4])); |
83 | assert_eq!(chunk_stream.next().await, Some(vec![5])); |
84 | } |
85 | |