1#![warn(rust_2018_idioms)]
2#![cfg(feature = "full")]
3
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures::{Stream, StreamExt};
8use tokio::time::{self, Duration, Instant, Interval, MissedTickBehavior};
9use tokio_test::{assert_pending, assert_ready_eq, task};
10
11// Takes the `Interval` task, `start` variable, and optional time deltas
12// For each time delta, it polls the `Interval` and asserts that the result is
13// equal to `start` + the specific time delta. Then it asserts that the
14// `Interval` is pending.
15macro_rules! check_interval_poll {
16 ($i:ident, $start:ident, $($delta:expr),*$(,)?) => {
17 $(
18 assert_ready_eq!(poll_next(&mut $i), $start + ms($delta));
19 )*
20 assert_pending!(poll_next(&mut $i));
21 };
22 ($i:ident, $start:ident) => {
23 check_interval_poll!($i, $start,);
24 };
25}
26
27#[tokio::test]
28#[should_panic]
29async fn interval_zero_duration() {
30 let _ = time::interval_at(Instant::now(), ms(0));
31}
32
33// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
34// Actual ticks: | work -----| delay | work | work | work -| work -----|
35// Poll behavior: | | | | | | | |
36// | | | | | | | |
37// Ready(s) | | Ready(s + 2p) | | | |
38// Pending | Ready(s + 3p) | | |
39// Ready(s + p) Ready(s + 4p) | |
40// Ready(s + 5p) |
41// Ready(s + 6p)
42#[tokio::test(start_paused = true)]
43async fn burst() {
44 let start = Instant::now();
45
46 // This is necessary because the timer is only so granular, and in order for
47 // all our ticks to resolve, the time needs to be 1ms ahead of what we
48 // expect, so that the runtime will see that it is time to resolve the timer
49 time::advance(ms(1)).await;
50
51 let mut i = task::spawn(time::interval_at(start, ms(300)));
52
53 check_interval_poll!(i, start, 0);
54
55 time::advance(ms(100)).await;
56 check_interval_poll!(i, start);
57
58 time::advance(ms(200)).await;
59 check_interval_poll!(i, start, 300);
60
61 time::advance(ms(650)).await;
62 check_interval_poll!(i, start, 600, 900);
63
64 time::advance(ms(200)).await;
65 check_interval_poll!(i, start);
66
67 time::advance(ms(100)).await;
68 check_interval_poll!(i, start, 1200);
69
70 time::advance(ms(250)).await;
71 check_interval_poll!(i, start, 1500);
72
73 time::advance(ms(300)).await;
74 check_interval_poll!(i, start, 1800);
75}
76
77// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
78// Actual ticks: | work -----| delay | work -----| work -----| work -----|
79// Poll behavior: | | | | | | | |
80// | | | | | | | |
81// Ready(s) | | Ready(s + 2p) | | | |
82// Pending | Pending | | |
83// Ready(s + p) Ready(s + 2p + d) | |
84// Ready(s + 3p + d) |
85// Ready(s + 4p + d)
86#[tokio::test(start_paused = true)]
87async fn delay() {
88 let start = Instant::now();
89
90 // This is necessary because the timer is only so granular, and in order for
91 // all our ticks to resolve, the time needs to be 1ms ahead of what we
92 // expect, so that the runtime will see that it is time to resolve the timer
93 time::advance(ms(1)).await;
94
95 let mut i = task::spawn(time::interval_at(start, ms(300)));
96 i.set_missed_tick_behavior(MissedTickBehavior::Delay);
97
98 check_interval_poll!(i, start, 0);
99
100 time::advance(ms(100)).await;
101 check_interval_poll!(i, start);
102
103 time::advance(ms(200)).await;
104 check_interval_poll!(i, start, 300);
105
106 time::advance(ms(650)).await;
107 check_interval_poll!(i, start, 600);
108
109 time::advance(ms(100)).await;
110 check_interval_poll!(i, start);
111
112 // We have to add one here for the same reason as is above.
113 // Because `Interval` has reset its timer according to `Instant::now()`,
114 // we have to go forward 1 more millisecond than is expected so that the
115 // runtime realizes that it's time to resolve the timer.
116 time::advance(ms(201)).await;
117 // We add one because when using the `Delay` behavior, `Interval`
118 // adds the `period` from `Instant::now()`, which will always be off by one
119 // because we have to advance time by 1 (see above).
120 check_interval_poll!(i, start, 1251);
121
122 time::advance(ms(300)).await;
123 // Again, we add one.
124 check_interval_poll!(i, start, 1551);
125
126 time::advance(ms(300)).await;
127 check_interval_poll!(i, start, 1851);
128}
129
130// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
131// Actual ticks: | work -----| delay | work ---| work -----| work -----|
132// Poll behavior: | | | | | | |
133// | | | | | | |
134// Ready(s) | | Ready(s + 2p) | | |
135// Pending | Ready(s + 4p) | |
136// Ready(s + p) Ready(s + 5p) |
137// Ready(s + 6p)
138#[tokio::test(start_paused = true)]
139async fn skip() {
140 let start = Instant::now();
141
142 // This is necessary because the timer is only so granular, and in order for
143 // all our ticks to resolve, the time needs to be 1ms ahead of what we
144 // expect, so that the runtime will see that it is time to resolve the timer
145 time::advance(ms(1)).await;
146
147 let mut i = task::spawn(time::interval_at(start, ms(300)));
148 i.set_missed_tick_behavior(MissedTickBehavior::Skip);
149
150 check_interval_poll!(i, start, 0);
151
152 time::advance(ms(100)).await;
153 check_interval_poll!(i, start);
154
155 time::advance(ms(200)).await;
156 check_interval_poll!(i, start, 300);
157
158 time::advance(ms(650)).await;
159 check_interval_poll!(i, start, 600);
160
161 time::advance(ms(250)).await;
162 check_interval_poll!(i, start, 1200);
163
164 time::advance(ms(300)).await;
165 check_interval_poll!(i, start, 1500);
166
167 time::advance(ms(300)).await;
168 check_interval_poll!(i, start, 1800);
169}
170
171#[tokio::test(start_paused = true)]
172async fn reset() {
173 let start = Instant::now();
174
175 // This is necessary because the timer is only so granular, and in order for
176 // all our ticks to resolve, the time needs to be 1ms ahead of what we
177 // expect, so that the runtime will see that it is time to resolve the timer
178 time::advance(ms(1)).await;
179
180 let mut i = task::spawn(time::interval_at(start, ms(300)));
181
182 check_interval_poll!(i, start, 0);
183
184 time::advance(ms(100)).await;
185 check_interval_poll!(i, start);
186
187 time::advance(ms(200)).await;
188 check_interval_poll!(i, start, 300);
189
190 time::advance(ms(100)).await;
191 check_interval_poll!(i, start);
192
193 i.reset();
194
195 time::advance(ms(250)).await;
196 check_interval_poll!(i, start);
197
198 time::advance(ms(50)).await;
199 // We add one because when using `reset` method, `Interval` adds the
200 // `period` from `Instant::now()`, which will always be off by one
201 check_interval_poll!(i, start, 701);
202
203 time::advance(ms(300)).await;
204 check_interval_poll!(i, start, 1001);
205}
206
207#[tokio::test(start_paused = true)]
208async fn reset_immediately() {
209 let start = Instant::now();
210
211 // This is necessary because the timer is only so granular, and in order for
212 // all our ticks to resolve, the time needs to be 1ms ahead of what we
213 // expect, so that the runtime will see that it is time to resolve the timer
214 time::advance(ms(1)).await;
215
216 let mut i = task::spawn(time::interval_at(start, ms(300)));
217
218 check_interval_poll!(i, start, 0);
219
220 time::advance(ms(100)).await;
221 check_interval_poll!(i, start);
222
223 time::advance(ms(200)).await;
224 check_interval_poll!(i, start, 300);
225
226 time::advance(ms(100)).await;
227 check_interval_poll!(i, start);
228
229 i.reset_immediately();
230
231 // We add one because when using `reset` method, `Interval` adds the
232 // `period` from `Instant::now()`, which will always be off by one
233 check_interval_poll!(i, start, 401);
234
235 time::advance(ms(100)).await;
236 check_interval_poll!(i, start);
237
238 time::advance(ms(200)).await;
239 check_interval_poll!(i, start, 701);
240}
241
242#[tokio::test(start_paused = true)]
243async fn reset_after() {
244 let start = Instant::now();
245
246 // This is necessary because the timer is only so granular, and in order for
247 // all our ticks to resolve, the time needs to be 1ms ahead of what we
248 // expect, so that the runtime will see that it is time to resolve the timer
249 time::advance(ms(1)).await;
250
251 let mut i = task::spawn(time::interval_at(start, ms(300)));
252
253 check_interval_poll!(i, start, 0);
254
255 time::advance(ms(100)).await;
256 check_interval_poll!(i, start);
257
258 time::advance(ms(200)).await;
259 check_interval_poll!(i, start, 300);
260
261 time::advance(ms(100)).await;
262 check_interval_poll!(i, start);
263
264 i.reset_after(Duration::from_millis(20));
265
266 // We add one because when using `reset` method, `Interval` adds the
267 // `period` from `Instant::now()`, which will always be off by one
268 time::advance(ms(20)).await;
269 check_interval_poll!(i, start, 421);
270
271 time::advance(ms(100)).await;
272 check_interval_poll!(i, start);
273
274 time::advance(ms(200)).await;
275 check_interval_poll!(i, start, 721);
276}
277
278#[tokio::test(start_paused = true)]
279async fn reset_at() {
280 let start = Instant::now();
281
282 // This is necessary because the timer is only so granular, and in order for
283 // all our ticks to resolve, the time needs to be 1ms ahead of what we
284 // expect, so that the runtime will see that it is time to resolve the timer
285 time::advance(ms(1)).await;
286
287 let mut i = task::spawn(time::interval_at(start, ms(300)));
288
289 check_interval_poll!(i, start, 0);
290
291 time::advance(ms(100)).await;
292 check_interval_poll!(i, start);
293
294 time::advance(ms(200)).await;
295 check_interval_poll!(i, start, 300);
296
297 time::advance(ms(100)).await;
298 check_interval_poll!(i, start);
299
300 i.reset_at(Instant::now() + Duration::from_millis(40));
301
302 // We add one because when using `reset` method, `Interval` adds the
303 // `period` from `Instant::now()`, which will always be off by one
304 time::advance(ms(40)).await;
305 check_interval_poll!(i, start, 441);
306
307 time::advance(ms(100)).await;
308 check_interval_poll!(i, start);
309
310 time::advance(ms(200)).await;
311 check_interval_poll!(i, start, 741);
312}
313
314#[tokio::test(start_paused = true)]
315async fn reset_at_bigger_than_interval() {
316 let start = Instant::now();
317
318 // This is necessary because the timer is only so granular, and in order for
319 // all our ticks to resolve, the time needs to be 1ms ahead of what we
320 // expect, so that the runtime will see that it is time to resolve the timer
321 time::advance(ms(1)).await;
322
323 let mut i = task::spawn(time::interval_at(start, ms(300)));
324
325 check_interval_poll!(i, start, 0);
326
327 time::advance(ms(100)).await;
328 check_interval_poll!(i, start);
329
330 time::advance(ms(200)).await;
331 check_interval_poll!(i, start, 300);
332
333 time::advance(ms(100)).await;
334 check_interval_poll!(i, start);
335
336 i.reset_at(Instant::now() + Duration::from_millis(1000));
337
338 // Validate the interval does not tick until 1000ms have passed
339 time::advance(ms(300)).await;
340 check_interval_poll!(i, start);
341 time::advance(ms(300)).await;
342 check_interval_poll!(i, start);
343 time::advance(ms(300)).await;
344 check_interval_poll!(i, start);
345
346 // We add one because when using `reset` method, `Interval` adds the
347 // `period` from `Instant::now()`, which will always be off by one
348 time::advance(ms(100)).await;
349 check_interval_poll!(i, start, 1401);
350
351 time::advance(ms(300)).await;
352 check_interval_poll!(i, start, 1701);
353}
354
355fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
356 interval.enter(|cx, mut interval| interval.poll_tick(cx))
357}
358
359fn ms(n: u64) -> Duration {
360 Duration::from_millis(n)
361}
362
363/// Helper struct to test the [tokio::time::Interval::poll_tick()] method.
364///
365/// `poll_tick()` should register the waker in the context only if it returns
366/// `Poll::Pending`, not when returning `Poll::Ready`. This struct contains an
367/// interval timer and counts up on every tick when used as stream. When the
368/// counter is a multiple of four, it yields the current counter value.
369/// Depending on the value for `wake_on_pending`, it will reschedule itself when
370/// it returns `Poll::Pending` or not. When used with `wake_on_pending=false`,
371/// we expect that the stream stalls because the timer will **not** reschedule
372/// the next wake-up itself once it returned `Poll::Ready`.
373struct IntervalStreamer {
374 counter: u32,
375 timer: Interval,
376 wake_on_pending: bool,
377}
378
379impl Stream for IntervalStreamer {
380 type Item = u32;
381
382 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
383 let this = Pin::into_inner(self);
384
385 if this.counter > 12 {
386 return Poll::Ready(None);
387 }
388
389 match this.timer.poll_tick(cx) {
390 Poll::Pending => Poll::Pending,
391 Poll::Ready(_) => {
392 this.counter += 1;
393 if this.counter % 4 == 0 {
394 Poll::Ready(Some(this.counter))
395 } else {
396 if this.wake_on_pending {
397 // Schedule this task for wake-up
398 cx.waker().wake_by_ref();
399 }
400 Poll::Pending
401 }
402 }
403 }
404 }
405}
406
407#[tokio::test(start_paused = true)]
408async fn stream_with_interval_poll_tick_self_waking() {
409 let stream = IntervalStreamer {
410 counter: 0,
411 timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
412 wake_on_pending: true,
413 };
414
415 let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);
416
417 // Wrap task in timeout so that it will finish eventually even if the stream
418 // stalls.
419 tokio::spawn(tokio::time::timeout(
420 tokio::time::Duration::from_millis(150),
421 async move {
422 tokio::pin!(stream);
423
424 while let Some(item) = stream.next().await {
425 res_tx.send(item).await.ok();
426 }
427 },
428 ));
429
430 let mut items = Vec::with_capacity(3);
431 while let Some(result) = res_rx.recv().await {
432 items.push(result);
433 }
434
435 // We expect the stream to yield normally and thus three items.
436 assert_eq!(items, vec![4, 8, 12]);
437}
438
439#[tokio::test(start_paused = true)]
440async fn stream_with_interval_poll_tick_no_waking() {
441 let stream = IntervalStreamer {
442 counter: 0,
443 timer: tokio::time::interval(tokio::time::Duration::from_millis(10)),
444 wake_on_pending: false,
445 };
446
447 let (res_tx, mut res_rx) = tokio::sync::mpsc::channel(12);
448
449 // Wrap task in timeout so that it will finish eventually even if the stream
450 // stalls.
451 tokio::spawn(tokio::time::timeout(
452 tokio::time::Duration::from_millis(150),
453 async move {
454 tokio::pin!(stream);
455
456 while let Some(item) = stream.next().await {
457 res_tx.send(item).await.ok();
458 }
459 },
460 ));
461
462 let mut items = Vec::with_capacity(0);
463 while let Some(result) = res_rx.recv().await {
464 items.push(result);
465 }
466
467 // We expect the stream to stall because it does not reschedule itself on
468 // `Poll::Pending` and neither does [tokio::time::Interval] reschedule the
469 // task when returning `Poll::Ready`.
470 assert_eq!(items, vec![]);
471}
472