1#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
2
3use tokio::time::{self, sleep, Duration};
4use tokio_stream::{self, StreamExt};
5use tokio_test::*;
6
7use futures::stream;
8
9async fn maybe_sleep(idx: i32) -> i32 {
10 if idx % 2 == 0 {
11 sleep(ms(200)).await;
12 }
13 idx
14}
15
/// Shorthand for a [`Duration`] lasting `n` milliseconds.
fn ms(n: u64) -> Duration {
    let millis = n;
    Duration::from_millis(millis)
}
19
20#[tokio::test]
21async fn basic_usage() {
22 time::pause();
23
24 // Items 2 and 4 time out. If we run the stream until it completes,
25 // we end up with the following items:
26 //
27 // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)]
28
29 let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100));
30 let mut stream = task::spawn(stream);
31
32 // First item completes immediately
33 assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
34
35 // Second item is delayed 200ms, times out after 100ms
36 assert_pending!(stream.poll_next());
37
38 time::advance(ms(150)).await;
39 let v = assert_ready!(stream.poll_next());
40 assert!(v.unwrap().is_err());
41
42 assert_pending!(stream.poll_next());
43
44 time::advance(ms(100)).await;
45 assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
46
47 // Third item is ready immediately
48 assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
49
50 // Fourth item is delayed 200ms, times out after 100ms
51 assert_pending!(stream.poll_next());
52
53 time::advance(ms(60)).await;
54 assert_pending!(stream.poll_next()); // nothing ready yet
55
56 time::advance(ms(60)).await;
57 let v = assert_ready!(stream.poll_next());
58 assert!(v.unwrap().is_err()); // timeout!
59
60 time::advance(ms(120)).await;
61 assert_ready_eq!(stream.poll_next(), Some(Ok(4)));
62
63 // Done.
64 assert_ready_eq!(stream.poll_next(), None);
65}
66
67#[tokio::test]
68async fn return_elapsed_errors_only_once() {
69 time::pause();
70
71 let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50));
72 let mut stream = task::spawn(stream);
73
74 // First item completes immediately
75 assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
76
77 // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed`
78 // error is returned.
79 assert_pending!(stream.poll_next());
80 //
81 time::advance(ms(51)).await;
82 let v = assert_ready!(stream.poll_next());
83 assert!(v.unwrap().is_err()); // timeout!
84
85 // deadline elapses again, but no error is returned
86 time::advance(ms(50)).await;
87 assert_pending!(stream.poll_next());
88
89 time::advance(ms(100)).await;
90 assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
91 assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
92
93 // Done
94 assert_ready_eq!(stream.poll_next(), None);
95}
96
97#[tokio::test]
98async fn no_timeouts() {
99 let stream = stream::iter(vec![1, 3, 5])
100 .then(maybe_sleep)
101 .timeout(ms(100));
102
103 let mut stream = task::spawn(stream);
104
105 assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
106 assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
107 assert_ready_eq!(stream.poll_next(), Some(Ok(5)));
108 assert_ready_eq!(stream.poll_next(), None);
109}
110