1 | #![warn (rust_2018_idioms)] |
2 | #![cfg (feature = "full" )] |
3 | |
4 | use std::pin::Pin; |
5 | use std::task::{Context, Poll}; |
6 | |
7 | use futures::{Stream, StreamExt}; |
8 | use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior}; |
9 | use tokio_test::{assert_pending, assert_ready_eq, task}; |
10 | |
11 | // Takes the `Interval` task, `start` variable, and optional time deltas |
12 | // For each time delta, it polls the `Interval` and asserts that the result is |
13 | // equal to `start` + the specific time delta. Then it asserts that the |
14 | // `Interval` is pending. |
15 | macro_rules! check_interval_poll { |
16 | ($i:ident, $start:ident, $($delta:expr),*$(,)?) => { |
17 | $( |
18 | assert_ready_eq!(poll_next(&mut $i), $start + ms($delta)); |
19 | )* |
20 | assert_pending!(poll_next(&mut $i)); |
21 | }; |
22 | ($i:ident, $start:ident) => { |
23 | check_interval_poll!($i, $start,); |
24 | }; |
25 | } |
26 | |
27 | #[tokio::test ] |
28 | #[should_panic ] |
29 | async fn interval_zero_duration() { |
30 | let _ = time::interval_at(Instant::now(), ms(0)); |
31 | } |
32 | |
33 | // Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | |
34 | // Actual ticks: | work -----| delay | work | work | work -| work -----| |
35 | // Poll behavior: | | | | | | | | |
36 | // | | | | | | | | |
37 | // Ready(s) | | Ready(s + 2p) | | | | |
38 | // Pending | Ready(s + 3p) | | | |
39 | // Ready(s + p) Ready(s + 4p) | | |
40 | // Ready(s + 5p) | |
41 | // Ready(s + 6p) |
42 | #[tokio::test (start_paused = true)] |
43 | async fn burst() { |
44 | let start = Instant::now(); |
45 | |
46 | // This is necessary because the timer is only so granular, and in order for |
47 | // all our ticks to resolve, the time needs to be 1ms ahead of what we |
48 | // expect, so that the runtime will see that it is time to resolve the timer |
49 | time::advance(ms(1)).await; |
50 | |
51 | let mut i = task::spawn(time::interval_at(start, ms(300))); |
52 | |
53 | check_interval_poll!(i, start, 0); |
54 | |
55 | time::advance(ms(100)).await; |
56 | check_interval_poll!(i, start); |
57 | |
58 | time::advance(ms(200)).await; |
59 | check_interval_poll!(i, start, 300); |
60 | |
61 | time::advance(ms(650)).await; |
62 | check_interval_poll!(i, start, 600, 900); |
63 | |
64 | time::advance(ms(200)).await; |
65 | check_interval_poll!(i, start); |
66 | |
67 | time::advance(ms(100)).await; |
68 | check_interval_poll!(i, start, 1200); |
69 | |
70 | time::advance(ms(250)).await; |
71 | check_interval_poll!(i, start, 1500); |
72 | |
73 | time::advance(ms(300)).await; |
74 | check_interval_poll!(i, start, 1800); |
75 | } |
76 | |
77 | // Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | |
78 | // Actual ticks: | work -----| delay | work -----| work -----| work -----| |
79 | // Poll behavior: | | | | | | | | |
80 | // | | | | | | | | |
81 | // Ready(s) | | Ready(s + 2p) | | | | |
82 | // Pending | Pending | | | |
83 | // Ready(s + p) Ready(s + 2p + d) | | |
84 | // Ready(s + 3p + d) | |
85 | // Ready(s + 4p + d) |
86 | #[tokio::test (start_paused = true)] |
87 | async fn delay() { |
88 | let start = Instant::now(); |
89 | |
90 | // This is necessary because the timer is only so granular, and in order for |
91 | // all our ticks to resolve, the time needs to be 1ms ahead of what we |
92 | // expect, so that the runtime will see that it is time to resolve the timer |
93 | time::advance(ms(1)).await; |
94 | |
95 | let mut i = task::spawn(time::interval_at(start, ms(300))); |
96 | i.set_missed_tick_behavior(MissedTickBehavior::Delay); |
97 | |
98 | check_interval_poll!(i, start, 0); |
99 | |
100 | time::advance(ms(100)).await; |
101 | check_interval_poll!(i, start); |
102 | |
103 | time::advance(ms(200)).await; |
104 | check_interval_poll!(i, start, 300); |
105 | |
106 | time::advance(ms(650)).await; |
107 | check_interval_poll!(i, start, 600); |
108 | |
109 | time::advance(ms(100)).await; |
110 | check_interval_poll!(i, start); |
111 | |
112 | // We have to add one here for the same reason as is above. |
113 | // Because `Interval` has reset its timer according to `Instant::now()`, |
114 | // we have to go forward 1 more millisecond than is expected so that the |
115 | // runtime realizes that it's time to resolve the timer. |
116 | time::advance(ms(201)).await; |
117 | // We add one because when using the `Delay` behavior, `Interval` |
118 | // adds the `period` from `Instant::now()`, which will always be off by one |
119 | // because we have to advance time by 1 (see above). |
120 | check_interval_poll!(i, start, 1251); |
121 | |
122 | time::advance(ms(300)).await; |
123 | // Again, we add one. |
124 | check_interval_poll!(i, start, 1551); |
125 | |
126 | time::advance(ms(300)).await; |
127 | check_interval_poll!(i, start, 1851); |
128 | } |
129 | |
130 | // Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 | |
131 | // Actual ticks: | work -----| delay | work ---| work -----| work -----| |
132 | // Poll behavior: | | | | | | | |
133 | // | | | | | | | |
134 | // Ready(s) | | Ready(s + 2p) | | | |
135 | // Pending | Ready(s + 4p) | | |
136 | // Ready(s + p) Ready(s + 5p) | |
137 | // Ready(s + 6p) |
138 | #[tokio::test (start_paused = true)] |
139 | async fn skip() { |
140 | let start = Instant::now(); |
141 | |
142 | // This is necessary because the timer is only so granular, and in order for |
143 | // all our ticks to resolve, the time needs to be 1ms ahead of what we |
144 | // expect, so that the runtime will see that it is time to resolve the timer |
145 | time::advance(ms(1)).await; |
146 | |
147 | let mut i = task::spawn(time::interval_at(start, ms(300))); |
148 | i.set_missed_tick_behavior(MissedTickBehavior::Skip); |
149 | |
150 | check_interval_poll!(i, start, 0); |
151 | |
152 | time::advance(ms(100)).await; |
153 | check_interval_poll!(i, start); |
154 | |
155 | time::advance(ms(200)).await; |
156 | check_interval_poll!(i, start, 300); |
157 | |
158 | time::advance(ms(650)).await; |
159 | check_interval_poll!(i, start, 600); |
160 | |
161 | time::advance(ms(250)).await; |
162 | check_interval_poll!(i, start, 1200); |
163 | |
164 | time::advance(ms(300)).await; |
165 | check_interval_poll!(i, start, 1500); |
166 | |
167 | time::advance(ms(300)).await; |
168 | check_interval_poll!(i, start, 1800); |
169 | } |
170 | |
171 | #[tokio::test (start_paused = true)] |
172 | async fn reset() { |
173 | let start = Instant::now(); |
174 | |
175 | // This is necessary because the timer is only so granular, and in order for |
176 | // all our ticks to resolve, the time needs to be 1ms ahead of what we |
177 | // expect, so that the runtime will see that it is time to resolve the timer |
178 | time::advance(ms(1)).await; |
179 | |
180 | let mut i = task::spawn(time::interval_at(start, ms(300))); |
181 | |
182 | check_interval_poll!(i, start, 0); |
183 | |
184 | time::advance(ms(100)).await; |
185 | check_interval_poll!(i, start); |
186 | |
187 | time::advance(ms(200)).await; |
188 | check_interval_poll!(i, start, 300); |
189 | |
190 | time::advance(ms(100)).await; |
191 | check_interval_poll!(i, start); |
192 | |
193 | i.reset(); |
194 | |
195 | time::advance(ms(250)).await; |
196 | check_interval_poll!(i, start); |
197 | |
198 | time::advance(ms(50)).await; |
199 | // We add one because when using `reset` method, `Interval` adds the |
200 | // `period` from `Instant::now()`, which will always be off by one |
201 | check_interval_poll!(i, start, 701); |
202 | |
203 | time::advance(ms(300)).await; |
204 | check_interval_poll!(i, start, 1001); |
205 | } |
206 | |
207 | #[tokio::test (start_paused = true)] |
208 | async fn reset_immediately() { |
209 | let start = Instant::now(); |
210 | |
211 | // This is necessary because the timer is only so granular, and in order for |
212 | // all our ticks to resolve, the time needs to be 1ms ahead of what we |
213 | // expect, so that the runtime will see that it is time to resolve the timer |
214 | time::advance(ms(1)).await; |
215 | |
216 | let mut i = task::spawn(time::interval_at(start, ms(300))); |
217 | |
218 | check_interval_poll!(i, start, 0); |
219 | |
220 | time::advance(ms(100)).await; |
221 | check_interval_poll!(i, start); |
222 | |
223 | time::advance(ms(200)).await; |
224 | check_interval_poll!(i, start, 300); |
225 | |
226 | time::advance(ms(100)).await; |
227 | check_interval_poll!(i, start); |
228 | |
229 | i.reset_immediately(); |
230 | |
231 | // We add one because when using `reset` method, `Interval` adds the |
232 | // `period` from `Instant::now()`, which will always be off by one |
233 | check_interval_poll!(i, start, 401); |
234 | |
235 | time::advance(ms(100)).await; |
236 | check_interval_poll!(i, start); |
237 | |
238 | time::advance(ms(200)).await; |
239 | check_interval_poll!(i, start, 701); |
240 | } |
241 | |
242 | #[tokio::test (start_paused = true)] |
243 | async fn reset_after() { |
244 | let start = Instant::now(); |
245 | |
246 | // This is necessary because the timer is only so granular, and in order for |
247 | // all our ticks to resolve, the time needs to be 1ms ahead of what we |
248 | // expect, so that the runtime will see that it is time to resolve the timer |
249 | time::advance(ms(1)).await; |
250 | |
251 | let mut i = task::spawn(time::interval_at(start, ms(300))); |
252 | |
253 | check_interval_poll!(i, start, 0); |
254 | |
255 | time::advance(ms(100)).await; |
256 | check_interval_poll!(i, start); |
257 | |
258 | time::advance(ms(200)).await; |
259 | check_interval_poll!(i, start, 300); |
260 | |
261 | time::advance(ms(100)).await; |
262 | check_interval_poll!(i, start); |
263 | |
264 | i.reset_after(Duration::from_millis(20)); |
265 | |
266 | // We add one because when using `reset` method, `Interval` adds the |
267 | // `period` from `Instant::now()`, which will always be off by one |
268 | time::advance(ms(20)).await; |
269 | check_interval_poll!(i, start, 421); |
270 | |
271 | time::advance(ms(100)).await; |
272 | check_interval_poll!(i, start); |
273 | |
274 | time::advance(ms(200)).await; |
275 | check_interval_poll!(i, start, 721); |
276 | } |
277 | |
278 | #[tokio::test (start_paused = true)] |
279 | async fn reset_at() { |
280 | let start = Instant::now(); |
281 | |
282 | // This is necessary because the timer is only so granular, and in order for |
283 | // all our ticks to resolve, the time needs to be 1ms ahead of what we |
284 | // expect, so that the runtime will see that it is time to resolve the timer |
285 | time::advance(ms(1)).await; |
286 | |
287 | let mut i = task::spawn(time::interval_at(start, ms(300))); |
288 | |
289 | check_interval_poll!(i, start, 0); |
290 | |
291 | time::advance(ms(100)).await; |
292 | check_interval_poll!(i, start); |
293 | |
294 | time::advance(ms(200)).await; |
295 | check_interval_poll!(i, start, 300); |
296 | |
297 | time::advance(ms(100)).await; |
298 | check_interval_poll!(i, start); |
299 | |
300 | i.reset_at(Instant::now() + Duration::from_millis(40)); |
301 | |
302 | // We add one because when using `reset` method, `Interval` adds the |
303 | // `period` from `Instant::now()`, which will always be off by one |
304 | time::advance(ms(40)).await; |
305 | check_interval_poll!(i, start, 441); |
306 | |
307 | time::advance(ms(100)).await; |
308 | check_interval_poll!(i, start); |
309 | |
310 | time::advance(ms(200)).await; |
311 | check_interval_poll!(i, start, 741); |
312 | } |
313 | |
314 | #[tokio::test (start_paused = true)] |
315 | async fn reset_at_bigger_than_interval() { |
316 | let start = Instant::now(); |
317 | |
318 | // This is necessary because the timer is only so granular, and in order for |
319 | // all our ticks to resolve, the time needs to be 1ms ahead of what we |
320 | // expect, so that the runtime will see that it is time to resolve the timer |
321 | time::advance(ms(1)).await; |
322 | |
323 | let mut i = task::spawn(time::interval_at(start, ms(300))); |
324 | |
325 | check_interval_poll!(i, start, 0); |
326 | |
327 | time::advance(ms(100)).await; |
328 | check_interval_poll!(i, start); |
329 | |
330 | time::advance(ms(200)).await; |
331 | check_interval_poll!(i, start, 300); |
332 | |
333 | time::advance(ms(100)).await; |
334 | check_interval_poll!(i, start); |
335 | |
336 | i.reset_at(Instant::now() + Duration::from_millis(1000)); |
337 | |
338 | // Validate the interval does not tick until 1000ms have passed |
339 | time::advance(ms(300)).await; |
340 | check_interval_poll!(i, start); |
341 | time::advance(ms(300)).await; |
342 | check_interval_poll!(i, start); |
343 | time::advance(ms(300)).await; |
344 | check_interval_poll!(i, start); |
345 | |
346 | // We add one because when using `reset` method, `Interval` adds the |
347 | // `period` from `Instant::now()`, which will always be off by one |
348 | time::advance(ms(100)).await; |
349 | check_interval_poll!(i, start, 1401); |
350 | |
351 | time::advance(ms(300)).await; |
352 | check_interval_poll!(i, start, 1701); |
353 | } |
354 | |
355 | fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> { |
356 | interval.enter(|cx, mut interval| interval.poll_tick(cx)) |
357 | } |
358 | |
359 | fn ms(n: u64) -> Duration { |
360 | Duration::from_millis(n) |
361 | } |
362 | |
363 | /// Helper struct to test the [tokio::time::Interval::poll_tick()] method. |
364 | /// |
365 | /// `poll_tick()` should register the waker in the context only if it returns |
366 | /// `Poll::Pending`, not when returning `Poll::Ready`. This struct contains an |
367 | /// interval timer and counts up on every tick when used as stream. When the |
368 | /// counter is a multiple of four, it yields the current counter value. |
369 | /// Depending on the value for `wake_on_pending`, it will reschedule itself when |
370 | /// it returns `Poll::Pending` or not. When used with `wake_on_pending=false`, |
371 | /// we expect that the stream stalls because the timer will **not** reschedule |
372 | /// the next wake-up itself once it returned `Poll::Ready`. |
373 | struct IntervalStreamer { |
374 | counter: u32, |
375 | timer: Interval, |
376 | wake_on_pending: bool, |
377 | } |
378 | |
379 | impl Stream for IntervalStreamer { |
380 | type Item = u32; |
381 | |
382 | fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |
383 | let this = Pin::into_inner(self); |
384 | |
385 | if this.counter > 12 { |
386 | return Poll::Ready(None); |
387 | } |
388 | |
389 | match this.timer.poll_tick(cx) { |
390 | Poll::Pending => Poll::Pending, |
391 | Poll::Ready(_) => { |
392 | this.counter += 1; |
393 | if this.counter % 4 == 0 { |
394 | Poll::Ready(Some(this.counter)) |
395 | } else { |
396 | if this.wake_on_pending { |
397 | // Schedule this task for wake-up |
398 | cx.waker().wake_by_ref(); |
399 | } |
400 | Poll::Pending |
401 | } |
402 | } |
403 | } |
404 | } |
405 | } |
406 | |
407 | #[tokio::test (start_paused = true)] |
408 | async fn stream_with_interval_poll_tick_self_waking() { |
409 | let stream = IntervalStreamer { |
410 | counter: 0, |
411 | timer: tokio::time::interval(tokio::time::Duration::from_millis(10)), |
412 | wake_on_pending: true, |
413 | }; |
414 | |
415 | let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12); |
416 | |
417 | // Wrap task in timeout so that it will finish eventually even if the stream |
418 | // stalls. |
419 | tokio::spawn(tokio::time::timeout( |
420 | tokio::time::Duration::from_millis(150), |
421 | async move { |
422 | tokio::pin!(stream); |
423 | |
424 | while let Some(item) = stream.next().await { |
425 | res_tx.send(item).await.ok(); |
426 | } |
427 | }, |
428 | )); |
429 | |
430 | let mut items = Vec::with_capacity(3); |
431 | while let Some(result) = res_rx.recv().await { |
432 | items.push(result); |
433 | } |
434 | |
435 | // We expect the stream to yield normally and thus three items. |
436 | assert_eq!(items, vec![4, 8, 12]); |
437 | } |
438 | |
439 | #[tokio::test (start_paused = true)] |
440 | async fn stream_with_interval_poll_tick_no_waking() { |
441 | let stream = IntervalStreamer { |
442 | counter: 0, |
443 | timer: tokio::time::interval(tokio::time::Duration::from_millis(10)), |
444 | wake_on_pending: false, |
445 | }; |
446 | |
447 | let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12); |
448 | |
449 | // Wrap task in timeout so that it will finish eventually even if the stream |
450 | // stalls. |
451 | tokio::spawn(tokio::time::timeout( |
452 | tokio::time::Duration::from_millis(150), |
453 | async move { |
454 | tokio::pin!(stream); |
455 | |
456 | while let Some(item) = stream.next().await { |
457 | res_tx.send(item).await.ok(); |
458 | } |
459 | }, |
460 | )); |
461 | |
462 | let mut items = Vec::with_capacity(0); |
463 | while let Some(result) = res_rx.recv().await { |
464 | items.push(result); |
465 | } |
466 | |
467 | // We expect the stream to stall because it does not reschedule itself on |
468 | // `Poll::Pending` and neither does [tokio::time::Interval] reschedule the |
469 | // task when returning `Poll::Ready`. |
470 | assert_eq!(items, vec![]); |
471 | } |
472 | |