1#![warn(rust_2018_idioms)]
2#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
3
4use tokio::time;
5use tokio_stream::{self as stream, StreamExt};
6use tokio_test::assert_pending;
7use tokio_test::task;
8
9use futures::FutureExt;
10use std::time::Duration;
11
12#[tokio::test(start_paused = true)]
13async fn usage() {
14 let iter = vec![1, 2, 3].into_iter();
15 let stream0 = stream::iter(iter);
16
17 let iter = vec![4].into_iter();
18 let stream1 =
19 stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
20
21 let chunk_stream = stream0
22 .chain(stream1)
23 .chunks_timeout(4, Duration::from_secs(2));
24
25 let mut chunk_stream = task::spawn(chunk_stream);
26
27 assert_pending!(chunk_stream.poll_next());
28 time::advance(Duration::from_secs(2)).await;
29 assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
30
31 assert_pending!(chunk_stream.poll_next());
32 time::advance(Duration::from_secs(2)).await;
33 assert_eq!(chunk_stream.next().await, Some(vec![4]));
34}
35
36#[tokio::test(start_paused = true)]
37async fn full_chunk_with_timeout() {
38 let iter = vec![1, 2].into_iter();
39 let stream0 = stream::iter(iter);
40
41 let iter = vec![3].into_iter();
42 let stream1 =
43 stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));
44
45 let iter = vec![4].into_iter();
46 let stream2 =
47 stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
48
49 let chunk_stream = stream0
50 .chain(stream1)
51 .chain(stream2)
52 .chunks_timeout(3, Duration::from_secs(2));
53
54 let mut chunk_stream = task::spawn(chunk_stream);
55
56 assert_pending!(chunk_stream.poll_next());
57 time::advance(Duration::from_secs(2)).await;
58 assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
59
60 assert_pending!(chunk_stream.poll_next());
61 time::advance(Duration::from_secs(2)).await;
62 assert_eq!(chunk_stream.next().await, Some(vec![4]));
63}
64
65#[tokio::test]
66#[ignore]
67async fn real_time() {
68 let iter = vec![1, 2, 3, 4].into_iter();
69 let stream0 = stream::iter(iter);
70
71 let iter = vec![5].into_iter();
72 let stream1 =
73 stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
74
75 let chunk_stream = stream0
76 .chain(stream1)
77 .chunks_timeout(3, Duration::from_secs(2));
78
79 let mut chunk_stream = task::spawn(chunk_stream);
80
81 assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
82 assert_eq!(chunk_stream.next().await, Some(vec![4]));
83 assert_eq!(chunk_stream.next().await, Some(vec![5]));
84}
85