1 | #![cfg (all(feature = "time" , feature = "sync" , feature = "io-util" ))] |
2 | |
3 | use tokio::time::{self, sleep, Duration}; |
4 | use tokio_stream::{self, StreamExt}; |
5 | use tokio_test::*; |
6 | |
7 | use futures::stream; |
8 | |
9 | async fn maybe_sleep(idx: i32) -> i32 { |
10 | if idx % 2 == 0 { |
11 | sleep(ms(200)).await; |
12 | } |
13 | idx |
14 | } |
15 | |
/// Shorthand for building a [`Duration`] of `n` milliseconds.
fn ms(n: u64) -> Duration {
    Duration::from_millis(n)
}
19 | |
20 | #[tokio::test ] |
21 | async fn basic_usage() { |
22 | time::pause(); |
23 | |
24 | // Items 2 and 4 time out. If we run the stream until it completes, |
25 | // we end up with the following items: |
26 | // |
27 | // [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)] |
28 | |
29 | let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100)); |
30 | let mut stream = task::spawn(stream); |
31 | |
32 | // First item completes immediately |
33 | assert_ready_eq!(stream.poll_next(), Some(Ok(1))); |
34 | |
35 | // Second item is delayed 200ms, times out after 100ms |
36 | assert_pending!(stream.poll_next()); |
37 | |
38 | time::advance(ms(150)).await; |
39 | let v = assert_ready!(stream.poll_next()); |
40 | assert!(v.unwrap().is_err()); |
41 | |
42 | assert_pending!(stream.poll_next()); |
43 | |
44 | time::advance(ms(100)).await; |
45 | assert_ready_eq!(stream.poll_next(), Some(Ok(2))); |
46 | |
47 | // Third item is ready immediately |
48 | assert_ready_eq!(stream.poll_next(), Some(Ok(3))); |
49 | |
50 | // Fourth item is delayed 200ms, times out after 100ms |
51 | assert_pending!(stream.poll_next()); |
52 | |
53 | time::advance(ms(60)).await; |
54 | assert_pending!(stream.poll_next()); // nothing ready yet |
55 | |
56 | time::advance(ms(60)).await; |
57 | let v = assert_ready!(stream.poll_next()); |
58 | assert!(v.unwrap().is_err()); // timeout! |
59 | |
60 | time::advance(ms(120)).await; |
61 | assert_ready_eq!(stream.poll_next(), Some(Ok(4))); |
62 | |
63 | // Done. |
64 | assert_ready_eq!(stream.poll_next(), None); |
65 | } |
66 | |
67 | #[tokio::test ] |
68 | async fn return_elapsed_errors_only_once() { |
69 | time::pause(); |
70 | |
71 | let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50)); |
72 | let mut stream = task::spawn(stream); |
73 | |
74 | // First item completes immediately |
75 | assert_ready_eq!(stream.poll_next(), Some(Ok(1))); |
76 | |
77 | // Second item is delayed 200ms, times out after 50ms. Only one `Elapsed` |
78 | // error is returned. |
79 | assert_pending!(stream.poll_next()); |
80 | // |
81 | time::advance(ms(51)).await; |
82 | let v = assert_ready!(stream.poll_next()); |
83 | assert!(v.unwrap().is_err()); // timeout! |
84 | |
85 | // deadline elapses again, but no error is returned |
86 | time::advance(ms(50)).await; |
87 | assert_pending!(stream.poll_next()); |
88 | |
89 | time::advance(ms(100)).await; |
90 | assert_ready_eq!(stream.poll_next(), Some(Ok(2))); |
91 | assert_ready_eq!(stream.poll_next(), Some(Ok(3))); |
92 | |
93 | // Done |
94 | assert_ready_eq!(stream.poll_next(), None); |
95 | } |
96 | |
97 | #[tokio::test ] |
98 | async fn no_timeouts() { |
99 | let stream = stream::iter(vec![1, 3, 5]) |
100 | .then(maybe_sleep) |
101 | .timeout(ms(100)); |
102 | |
103 | let mut stream = task::spawn(stream); |
104 | |
105 | assert_ready_eq!(stream.poll_next(), Some(Ok(1))); |
106 | assert_ready_eq!(stream.poll_next(), Some(Ok(3))); |
107 | assert_ready_eq!(stream.poll_next(), Some(Ok(5))); |
108 | assert_ready_eq!(stream.poll_next(), None); |
109 | } |
110 | |