1 | #![allow (clippy::disallowed_names)] |
2 | #![warn (rust_2018_idioms)] |
3 | #![cfg (feature = "full" )] |
4 | |
5 | use futures::StreamExt; |
6 | use tokio::time::{self, sleep, sleep_until, Duration, Instant}; |
7 | use tokio_test::{assert_pending, assert_ready, task}; |
8 | use tokio_util::time::DelayQueue; |
9 | |
// Polls the spawned queue for its next expired entry: runs `poll_expired`
// inside the task's `enter` closure, using the context that `enter` supplies.
macro_rules! poll {
    ($spawned:ident) => {
        $spawned.enter(|cx, mut inner| inner.poll_expired(cx))
    };
}
15 | |
// Asserts that the poll result is ready *and* carries `Some(value)`, then
// evaluates to that value. Panics with "None" when the ready value is `None`.
macro_rules! assert_ready_some {
    ($poll:expr) => {{
        let ready = assert_ready!($poll);
        ready.unwrap_or_else(|| panic!("None"))
    }};
}
24 | |
25 | #[tokio::test ] |
26 | async fn single_immediate_delay() { |
27 | time::pause(); |
28 | |
29 | let mut queue = task::spawn(DelayQueue::new()); |
30 | let _key = queue.insert_at("foo" , Instant::now()); |
31 | |
32 | // Advance time by 1ms to handle thee rounding |
33 | sleep(ms(1)).await; |
34 | |
35 | assert_ready_some!(poll!(queue)); |
36 | |
37 | let entry = assert_ready!(poll!(queue)); |
38 | assert!(entry.is_none()) |
39 | } |
40 | |
41 | #[tokio::test ] |
42 | async fn multi_immediate_delays() { |
43 | time::pause(); |
44 | |
45 | let mut queue = task::spawn(DelayQueue::new()); |
46 | |
47 | let _k = queue.insert_at("1" , Instant::now()); |
48 | let _k = queue.insert_at("2" , Instant::now()); |
49 | let _k = queue.insert_at("3" , Instant::now()); |
50 | |
51 | sleep(ms(1)).await; |
52 | |
53 | let mut res = vec![]; |
54 | |
55 | while res.len() < 3 { |
56 | let entry = assert_ready_some!(poll!(queue)); |
57 | res.push(entry.into_inner()); |
58 | } |
59 | |
60 | let entry = assert_ready!(poll!(queue)); |
61 | assert!(entry.is_none()); |
62 | |
63 | res.sort_unstable(); |
64 | |
65 | assert_eq!("1" , res[0]); |
66 | assert_eq!("2" , res[1]); |
67 | assert_eq!("3" , res[2]); |
68 | } |
69 | |
70 | #[tokio::test ] |
71 | async fn single_short_delay() { |
72 | time::pause(); |
73 | |
74 | let mut queue = task::spawn(DelayQueue::new()); |
75 | let _key = queue.insert_at("foo" , Instant::now() + ms(5)); |
76 | |
77 | assert_pending!(poll!(queue)); |
78 | |
79 | sleep(ms(1)).await; |
80 | |
81 | assert!(!queue.is_woken()); |
82 | |
83 | sleep(ms(5)).await; |
84 | |
85 | assert!(queue.is_woken()); |
86 | |
87 | let entry = assert_ready_some!(poll!(queue)); |
88 | assert_eq!(*entry.get_ref(), "foo" ); |
89 | |
90 | let entry = assert_ready!(poll!(queue)); |
91 | assert!(entry.is_none()); |
92 | } |
93 | |
94 | #[tokio::test ] |
95 | async fn multi_delay_at_start() { |
96 | time::pause(); |
97 | |
98 | let long = 262_144 + 9 * 4096; |
99 | let delays = &[1000, 2, 234, long, 60, 10]; |
100 | |
101 | let mut queue = task::spawn(DelayQueue::new()); |
102 | |
103 | // Setup the delays |
104 | for &i in delays { |
105 | let _key = queue.insert_at(i, Instant::now() + ms(i)); |
106 | } |
107 | |
108 | assert_pending!(poll!(queue)); |
109 | assert!(!queue.is_woken()); |
110 | |
111 | let start = Instant::now(); |
112 | for elapsed in 0..1200 { |
113 | println!("elapsed: {:?}" , elapsed); |
114 | let elapsed = elapsed + 1; |
115 | tokio::time::sleep_until(start + ms(elapsed)).await; |
116 | |
117 | if delays.contains(&elapsed) { |
118 | assert!(queue.is_woken()); |
119 | assert_ready!(poll!(queue)); |
120 | assert_pending!(poll!(queue)); |
121 | } else if queue.is_woken() { |
122 | let cascade = &[192, 960]; |
123 | assert!( |
124 | cascade.contains(&elapsed), |
125 | "elapsed={} dt={:?}" , |
126 | elapsed, |
127 | Instant::now() - start |
128 | ); |
129 | |
130 | assert_pending!(poll!(queue)); |
131 | } |
132 | } |
133 | println!("finished multi_delay_start" ); |
134 | } |
135 | |
136 | #[tokio::test ] |
137 | async fn insert_in_past_fires_immediately() { |
138 | println!("running insert_in_past_fires_immediately" ); |
139 | time::pause(); |
140 | |
141 | let mut queue = task::spawn(DelayQueue::new()); |
142 | let now = Instant::now(); |
143 | |
144 | sleep(ms(10)).await; |
145 | |
146 | queue.insert_at("foo" , now); |
147 | |
148 | assert_ready!(poll!(queue)); |
149 | println!("finished insert_in_past_fires_immediately" ); |
150 | } |
151 | |
152 | #[tokio::test ] |
153 | async fn remove_entry() { |
154 | time::pause(); |
155 | |
156 | let mut queue = task::spawn(DelayQueue::new()); |
157 | |
158 | let key = queue.insert_at("foo" , Instant::now() + ms(5)); |
159 | |
160 | assert_pending!(poll!(queue)); |
161 | |
162 | let entry = queue.remove(&key); |
163 | assert_eq!(entry.into_inner(), "foo" ); |
164 | |
165 | sleep(ms(10)).await; |
166 | |
167 | let entry = assert_ready!(poll!(queue)); |
168 | assert!(entry.is_none()); |
169 | } |
170 | |
171 | #[tokio::test ] |
172 | async fn reset_entry() { |
173 | time::pause(); |
174 | |
175 | let mut queue = task::spawn(DelayQueue::new()); |
176 | |
177 | let now = Instant::now(); |
178 | let key = queue.insert_at("foo" , now + ms(5)); |
179 | |
180 | assert_pending!(poll!(queue)); |
181 | sleep(ms(1)).await; |
182 | |
183 | queue.reset_at(&key, now + ms(10)); |
184 | |
185 | assert_pending!(poll!(queue)); |
186 | |
187 | sleep(ms(7)).await; |
188 | |
189 | assert!(!queue.is_woken()); |
190 | |
191 | assert_pending!(poll!(queue)); |
192 | |
193 | sleep(ms(3)).await; |
194 | |
195 | assert!(queue.is_woken()); |
196 | |
197 | let entry = assert_ready_some!(poll!(queue)); |
198 | assert_eq!(*entry.get_ref(), "foo" ); |
199 | |
200 | let entry = assert_ready!(poll!(queue)); |
201 | assert!(entry.is_none()) |
202 | } |
203 | |
204 | // Reproduces tokio-rs/tokio#849. |
205 | #[tokio::test ] |
206 | async fn reset_much_later() { |
207 | time::pause(); |
208 | |
209 | let mut queue = task::spawn(DelayQueue::new()); |
210 | |
211 | let now = Instant::now(); |
212 | sleep(ms(1)).await; |
213 | |
214 | let key = queue.insert_at("foo" , now + ms(200)); |
215 | assert_pending!(poll!(queue)); |
216 | |
217 | sleep(ms(3)).await; |
218 | |
219 | queue.reset_at(&key, now + ms(10)); |
220 | |
221 | sleep(ms(20)).await; |
222 | |
223 | assert!(queue.is_woken()); |
224 | } |
225 | |
226 | // Reproduces tokio-rs/tokio#849. |
227 | #[tokio::test ] |
228 | async fn reset_twice() { |
229 | time::pause(); |
230 | |
231 | let mut queue = task::spawn(DelayQueue::new()); |
232 | let now = Instant::now(); |
233 | |
234 | sleep(ms(1)).await; |
235 | |
236 | let key = queue.insert_at("foo" , now + ms(200)); |
237 | |
238 | assert_pending!(poll!(queue)); |
239 | |
240 | sleep(ms(3)).await; |
241 | |
242 | queue.reset_at(&key, now + ms(50)); |
243 | |
244 | sleep(ms(20)).await; |
245 | |
246 | queue.reset_at(&key, now + ms(40)); |
247 | |
248 | sleep(ms(20)).await; |
249 | |
250 | assert!(queue.is_woken()); |
251 | } |
252 | |
253 | /// Regression test: Given an entry inserted with a deadline in the past, so |
254 | /// that it is placed directly on the expired queue, reset the entry to a |
255 | /// deadline in the future. Validate that this leaves the entry and queue in an |
256 | /// internally consistent state by running an additional reset on the entry |
257 | /// before polling it to completion. |
258 | #[tokio::test ] |
259 | async fn repeatedly_reset_entry_inserted_as_expired() { |
260 | time::pause(); |
261 | |
262 | // Instants before the start of the test seem to break in wasm. |
263 | time::sleep(ms(1000)).await; |
264 | |
265 | let mut queue = task::spawn(DelayQueue::new()); |
266 | let now = Instant::now(); |
267 | |
268 | let key = queue.insert_at("foo" , now - ms(100)); |
269 | |
270 | queue.reset_at(&key, now + ms(100)); |
271 | queue.reset_at(&key, now + ms(50)); |
272 | |
273 | assert_pending!(poll!(queue)); |
274 | |
275 | time::sleep_until(now + ms(60)).await; |
276 | |
277 | assert!(queue.is_woken()); |
278 | |
279 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
280 | assert_eq!(entry, "foo" ); |
281 | |
282 | let entry = assert_ready!(poll!(queue)); |
283 | assert!(entry.is_none()); |
284 | } |
285 | |
286 | #[tokio::test ] |
287 | async fn remove_expired_item() { |
288 | time::pause(); |
289 | |
290 | let mut queue = DelayQueue::new(); |
291 | |
292 | let now = Instant::now(); |
293 | |
294 | sleep(ms(10)).await; |
295 | |
296 | let key = queue.insert_at("foo" , now); |
297 | |
298 | let entry = queue.remove(&key); |
299 | assert_eq!(entry.into_inner(), "foo" ); |
300 | } |
301 | |
302 | /// Regression test: it should be possible to remove entries which fall in the |
303 | /// 0th slot of the internal timer wheel — that is, entries whose expiration |
304 | /// (a) falls at the beginning of one of the wheel's hierarchical levels and (b) |
305 | /// is equal to the wheel's current elapsed time. |
306 | #[tokio::test ] |
307 | async fn remove_at_timer_wheel_threshold() { |
308 | time::pause(); |
309 | |
310 | let mut queue = task::spawn(DelayQueue::new()); |
311 | |
312 | let now = Instant::now(); |
313 | |
314 | let key1 = queue.insert_at("foo" , now + ms(64)); |
315 | let key2 = queue.insert_at("bar" , now + ms(64)); |
316 | |
317 | sleep(ms(80)).await; |
318 | |
319 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
320 | |
321 | match entry { |
322 | "foo" => { |
323 | let entry = queue.remove(&key2).into_inner(); |
324 | assert_eq!(entry, "bar" ); |
325 | } |
326 | "bar" => { |
327 | let entry = queue.remove(&key1).into_inner(); |
328 | assert_eq!(entry, "foo" ); |
329 | } |
330 | other => panic!("other: {:?}" , other), |
331 | } |
332 | } |
333 | |
334 | #[tokio::test ] |
335 | async fn expires_before_last_insert() { |
336 | time::pause(); |
337 | |
338 | let mut queue = task::spawn(DelayQueue::new()); |
339 | |
340 | let now = Instant::now(); |
341 | |
342 | queue.insert_at("foo" , now + ms(10_000)); |
343 | |
344 | // Delay should be set to 8.192s here. |
345 | assert_pending!(poll!(queue)); |
346 | |
347 | // Delay should be set to the delay of the new item here |
348 | queue.insert_at("bar" , now + ms(600)); |
349 | |
350 | assert_pending!(poll!(queue)); |
351 | |
352 | sleep(ms(600)).await; |
353 | |
354 | assert!(queue.is_woken()); |
355 | |
356 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
357 | assert_eq!(entry, "bar" ); |
358 | } |
359 | |
360 | #[tokio::test ] |
361 | async fn multi_reset() { |
362 | time::pause(); |
363 | |
364 | let mut queue = task::spawn(DelayQueue::new()); |
365 | |
366 | let now = Instant::now(); |
367 | |
368 | let one = queue.insert_at("one" , now + ms(200)); |
369 | let two = queue.insert_at("two" , now + ms(250)); |
370 | |
371 | assert_pending!(poll!(queue)); |
372 | |
373 | queue.reset_at(&one, now + ms(300)); |
374 | queue.reset_at(&two, now + ms(350)); |
375 | queue.reset_at(&one, now + ms(400)); |
376 | |
377 | sleep(ms(310)).await; |
378 | |
379 | assert_pending!(poll!(queue)); |
380 | |
381 | sleep(ms(50)).await; |
382 | |
383 | let entry = assert_ready_some!(poll!(queue)); |
384 | assert_eq!(*entry.get_ref(), "two" ); |
385 | |
386 | assert_pending!(poll!(queue)); |
387 | |
388 | sleep(ms(50)).await; |
389 | |
390 | let entry = assert_ready_some!(poll!(queue)); |
391 | assert_eq!(*entry.get_ref(), "one" ); |
392 | |
393 | let entry = assert_ready!(poll!(queue)); |
394 | assert!(entry.is_none()) |
395 | } |
396 | |
397 | #[tokio::test ] |
398 | async fn expire_first_key_when_reset_to_expire_earlier() { |
399 | time::pause(); |
400 | |
401 | let mut queue = task::spawn(DelayQueue::new()); |
402 | |
403 | let now = Instant::now(); |
404 | |
405 | let one = queue.insert_at("one" , now + ms(200)); |
406 | queue.insert_at("two" , now + ms(250)); |
407 | |
408 | assert_pending!(poll!(queue)); |
409 | |
410 | queue.reset_at(&one, now + ms(100)); |
411 | |
412 | sleep(ms(100)).await; |
413 | |
414 | assert!(queue.is_woken()); |
415 | |
416 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
417 | assert_eq!(entry, "one" ); |
418 | } |
419 | |
420 | #[tokio::test ] |
421 | async fn expire_second_key_when_reset_to_expire_earlier() { |
422 | time::pause(); |
423 | |
424 | let mut queue = task::spawn(DelayQueue::new()); |
425 | |
426 | let now = Instant::now(); |
427 | |
428 | queue.insert_at("one" , now + ms(200)); |
429 | let two = queue.insert_at("two" , now + ms(250)); |
430 | |
431 | assert_pending!(poll!(queue)); |
432 | |
433 | queue.reset_at(&two, now + ms(100)); |
434 | |
435 | sleep(ms(100)).await; |
436 | |
437 | assert!(queue.is_woken()); |
438 | |
439 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
440 | assert_eq!(entry, "two" ); |
441 | } |
442 | |
443 | #[tokio::test ] |
444 | async fn reset_first_expiring_item_to_expire_later() { |
445 | time::pause(); |
446 | |
447 | let mut queue = task::spawn(DelayQueue::new()); |
448 | |
449 | let now = Instant::now(); |
450 | |
451 | let one = queue.insert_at("one" , now + ms(200)); |
452 | let _two = queue.insert_at("two" , now + ms(250)); |
453 | |
454 | assert_pending!(poll!(queue)); |
455 | |
456 | queue.reset_at(&one, now + ms(300)); |
457 | sleep(ms(250)).await; |
458 | |
459 | assert!(queue.is_woken()); |
460 | |
461 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
462 | assert_eq!(entry, "two" ); |
463 | } |
464 | |
465 | #[tokio::test ] |
466 | async fn insert_before_first_after_poll() { |
467 | time::pause(); |
468 | |
469 | let mut queue = task::spawn(DelayQueue::new()); |
470 | |
471 | let now = Instant::now(); |
472 | |
473 | let _one = queue.insert_at("one" , now + ms(200)); |
474 | |
475 | assert_pending!(poll!(queue)); |
476 | |
477 | let _two = queue.insert_at("two" , now + ms(100)); |
478 | |
479 | sleep(ms(99)).await; |
480 | |
481 | assert_pending!(poll!(queue)); |
482 | |
483 | sleep(ms(1)).await; |
484 | |
485 | assert!(queue.is_woken()); |
486 | |
487 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
488 | assert_eq!(entry, "two" ); |
489 | } |
490 | |
491 | #[tokio::test ] |
492 | async fn insert_after_ready_poll() { |
493 | time::pause(); |
494 | |
495 | let mut queue = task::spawn(DelayQueue::new()); |
496 | |
497 | let now = Instant::now(); |
498 | |
499 | queue.insert_at("1" , now + ms(100)); |
500 | queue.insert_at("2" , now + ms(100)); |
501 | queue.insert_at("3" , now + ms(100)); |
502 | |
503 | assert_pending!(poll!(queue)); |
504 | |
505 | sleep(ms(100)).await; |
506 | |
507 | assert!(queue.is_woken()); |
508 | |
509 | let mut res = vec![]; |
510 | |
511 | while res.len() < 3 { |
512 | let entry = assert_ready_some!(poll!(queue)); |
513 | res.push(entry.into_inner()); |
514 | queue.insert_at("foo" , now + ms(500)); |
515 | } |
516 | |
517 | res.sort_unstable(); |
518 | |
519 | assert_eq!("1" , res[0]); |
520 | assert_eq!("2" , res[1]); |
521 | assert_eq!("3" , res[2]); |
522 | } |
523 | |
524 | #[tokio::test ] |
525 | async fn reset_later_after_slot_starts() { |
526 | time::pause(); |
527 | |
528 | let mut queue = task::spawn(DelayQueue::new()); |
529 | |
530 | let now = Instant::now(); |
531 | |
532 | let foo = queue.insert_at("foo" , now + ms(100)); |
533 | |
534 | assert_pending!(poll!(queue)); |
535 | |
536 | sleep_until(now + Duration::from_millis(80)).await; |
537 | |
538 | assert!(!queue.is_woken()); |
539 | |
540 | // At this point the queue hasn't been polled, so `elapsed` on the wheel |
541 | // for the queue is still at 0 and hence the 1ms resolution slots cover |
542 | // [0-64). Resetting the time on the entry to 120 causes it to get put in |
543 | // the [64-128) slot. As the queue knows that the first entry is within |
544 | // that slot, but doesn't know when, it must wake immediately to advance |
545 | // the wheel. |
546 | queue.reset_at(&foo, now + ms(120)); |
547 | assert!(queue.is_woken()); |
548 | |
549 | assert_pending!(poll!(queue)); |
550 | |
551 | sleep_until(now + Duration::from_millis(119)).await; |
552 | assert!(!queue.is_woken()); |
553 | |
554 | sleep(ms(1)).await; |
555 | assert!(queue.is_woken()); |
556 | |
557 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
558 | assert_eq!(entry, "foo" ); |
559 | } |
560 | |
561 | #[tokio::test ] |
562 | async fn reset_inserted_expired() { |
563 | time::pause(); |
564 | |
565 | // Instants before the start of the test seem to break in wasm. |
566 | time::sleep(ms(1000)).await; |
567 | |
568 | let mut queue = task::spawn(DelayQueue::new()); |
569 | let now = Instant::now(); |
570 | |
571 | let key = queue.insert_at("foo" , now - ms(100)); |
572 | |
573 | // this causes the panic described in #2473 |
574 | queue.reset_at(&key, now + ms(100)); |
575 | |
576 | assert_eq!(1, queue.len()); |
577 | |
578 | sleep(ms(200)).await; |
579 | |
580 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
581 | assert_eq!(entry, "foo" ); |
582 | |
583 | assert_eq!(queue.len(), 0); |
584 | } |
585 | |
586 | #[tokio::test ] |
587 | async fn reset_earlier_after_slot_starts() { |
588 | time::pause(); |
589 | |
590 | let mut queue = task::spawn(DelayQueue::new()); |
591 | |
592 | let now = Instant::now(); |
593 | |
594 | let foo = queue.insert_at("foo" , now + ms(200)); |
595 | |
596 | assert_pending!(poll!(queue)); |
597 | |
598 | sleep_until(now + Duration::from_millis(80)).await; |
599 | |
600 | assert!(!queue.is_woken()); |
601 | |
602 | // At this point the queue hasn't been polled, so `elapsed` on the wheel |
603 | // for the queue is still at 0 and hence the 1ms resolution slots cover |
604 | // [0-64). Resetting the time on the entry to 120 causes it to get put in |
605 | // the [64-128) slot. As the queue knows that the first entry is within |
606 | // that slot, but doesn't know when, it must wake immediately to advance |
607 | // the wheel. |
608 | queue.reset_at(&foo, now + ms(120)); |
609 | assert!(queue.is_woken()); |
610 | |
611 | assert_pending!(poll!(queue)); |
612 | |
613 | sleep_until(now + Duration::from_millis(119)).await; |
614 | assert!(!queue.is_woken()); |
615 | |
616 | sleep(ms(1)).await; |
617 | assert!(queue.is_woken()); |
618 | |
619 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
620 | assert_eq!(entry, "foo" ); |
621 | } |
622 | |
623 | #[tokio::test ] |
624 | async fn insert_in_past_after_poll_fires_immediately() { |
625 | time::pause(); |
626 | |
627 | let mut queue = task::spawn(DelayQueue::new()); |
628 | |
629 | let now = Instant::now(); |
630 | |
631 | queue.insert_at("foo" , now + ms(200)); |
632 | |
633 | assert_pending!(poll!(queue)); |
634 | |
635 | sleep(ms(80)).await; |
636 | |
637 | assert!(!queue.is_woken()); |
638 | queue.insert_at("bar" , now + ms(40)); |
639 | |
640 | assert!(queue.is_woken()); |
641 | |
642 | let entry = assert_ready_some!(poll!(queue)).into_inner(); |
643 | assert_eq!(entry, "bar" ); |
644 | } |
645 | |
646 | #[tokio::test ] |
647 | async fn delay_queue_poll_expired_when_empty() { |
648 | let mut delay_queue = task::spawn(DelayQueue::new()); |
649 | let key = delay_queue.insert(0, std::time::Duration::from_secs(10)); |
650 | assert_pending!(poll!(delay_queue)); |
651 | |
652 | delay_queue.remove(&key); |
653 | assert!(assert_ready!(poll!(delay_queue)).is_none()); |
654 | } |
655 | |
656 | #[tokio::test (start_paused = true)] |
657 | async fn compact_expire_empty() { |
658 | let mut queue = task::spawn(DelayQueue::new()); |
659 | |
660 | let now = Instant::now(); |
661 | |
662 | queue.insert_at("foo1" , now + ms(10)); |
663 | queue.insert_at("foo2" , now + ms(10)); |
664 | |
665 | sleep(ms(10)).await; |
666 | |
667 | let mut res = vec![]; |
668 | while res.len() < 2 { |
669 | let entry = assert_ready_some!(poll!(queue)); |
670 | res.push(entry.into_inner()); |
671 | } |
672 | |
673 | queue.compact(); |
674 | |
675 | assert_eq!(queue.len(), 0); |
676 | assert_eq!(queue.capacity(), 0); |
677 | } |
678 | |
679 | #[tokio::test (start_paused = true)] |
680 | async fn compact_remove_empty() { |
681 | let mut queue = task::spawn(DelayQueue::new()); |
682 | |
683 | let now = Instant::now(); |
684 | |
685 | let key1 = queue.insert_at("foo1" , now + ms(10)); |
686 | let key2 = queue.insert_at("foo2" , now + ms(10)); |
687 | |
688 | queue.remove(&key1); |
689 | queue.remove(&key2); |
690 | |
691 | queue.compact(); |
692 | |
693 | assert_eq!(queue.len(), 0); |
694 | assert_eq!(queue.capacity(), 0); |
695 | } |
696 | |
697 | #[tokio::test (start_paused = true)] |
698 | // Trigger a re-mapping of keys in the slab due to a `compact` call and |
699 | // test removal of re-mapped keys |
700 | async fn compact_remove_remapped_keys() { |
701 | let mut queue = task::spawn(DelayQueue::new()); |
702 | |
703 | let now = Instant::now(); |
704 | |
705 | queue.insert_at("foo1" , now + ms(10)); |
706 | queue.insert_at("foo2" , now + ms(10)); |
707 | |
708 | // should be assigned indices 3 and 4 |
709 | let key3 = queue.insert_at("foo3" , now + ms(20)); |
710 | let key4 = queue.insert_at("foo4" , now + ms(20)); |
711 | |
712 | sleep(ms(10)).await; |
713 | |
714 | let mut res = vec![]; |
715 | while res.len() < 2 { |
716 | let entry = assert_ready_some!(poll!(queue)); |
717 | res.push(entry.into_inner()); |
718 | } |
719 | |
720 | // items corresponding to `foo3` and `foo4` will be assigned |
721 | // new indices here |
722 | queue.compact(); |
723 | |
724 | queue.insert_at("foo5" , now + ms(10)); |
725 | |
726 | // test removal of re-mapped keys |
727 | let expired3 = queue.remove(&key3); |
728 | let expired4 = queue.remove(&key4); |
729 | |
730 | assert_eq!(expired3.into_inner(), "foo3" ); |
731 | assert_eq!(expired4.into_inner(), "foo4" ); |
732 | |
733 | queue.compact(); |
734 | assert_eq!(queue.len(), 1); |
735 | assert_eq!(queue.capacity(), 1); |
736 | } |
737 | |
738 | #[tokio::test (start_paused = true)] |
739 | async fn compact_change_deadline() { |
740 | let mut queue = task::spawn(DelayQueue::new()); |
741 | |
742 | let mut now = Instant::now(); |
743 | |
744 | queue.insert_at("foo1" , now + ms(10)); |
745 | queue.insert_at("foo2" , now + ms(10)); |
746 | |
747 | // should be assigned indices 3 and 4 |
748 | queue.insert_at("foo3" , now + ms(20)); |
749 | let key4 = queue.insert_at("foo4" , now + ms(20)); |
750 | |
751 | sleep(ms(10)).await; |
752 | |
753 | let mut res = vec![]; |
754 | while res.len() < 2 { |
755 | let entry = assert_ready_some!(poll!(queue)); |
756 | res.push(entry.into_inner()); |
757 | } |
758 | |
759 | // items corresponding to `foo3` and `foo4` should be assigned |
760 | // new indices |
761 | queue.compact(); |
762 | |
763 | now = Instant::now(); |
764 | |
765 | queue.insert_at("foo5" , now + ms(10)); |
766 | let key6 = queue.insert_at("foo6" , now + ms(10)); |
767 | |
768 | queue.reset_at(&key4, now + ms(20)); |
769 | queue.reset_at(&key6, now + ms(20)); |
770 | |
771 | // foo3 and foo5 will expire |
772 | sleep(ms(10)).await; |
773 | |
774 | while res.len() < 4 { |
775 | let entry = assert_ready_some!(poll!(queue)); |
776 | res.push(entry.into_inner()); |
777 | } |
778 | |
779 | sleep(ms(10)).await; |
780 | |
781 | while res.len() < 6 { |
782 | let entry = assert_ready_some!(poll!(queue)); |
783 | res.push(entry.into_inner()); |
784 | } |
785 | |
786 | let entry = assert_ready!(poll!(queue)); |
787 | assert!(entry.is_none()); |
788 | } |
789 | |
790 | #[tokio::test (start_paused = true)] |
791 | async fn item_expiry_greater_than_wheel() { |
792 | // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted. |
793 | let mut queue = DelayQueue::new(); |
794 | for _ in 0..2 { |
795 | tokio::time::advance(Duration::from_millis(1 << 35)).await; |
796 | queue.insert(0, Duration::from_millis(0)); |
797 | queue.next().await; |
798 | } |
799 | // This should not panic |
800 | let no_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { |
801 | queue.insert(1, Duration::from_millis(1)); |
802 | })); |
803 | assert!(no_panic.is_ok()); |
804 | } |
805 | |
806 | #[cfg_attr (target_os = "wasi" , ignore = "FIXME: Does not seem to work with WASI" )] |
807 | #[tokio::test (start_paused = true)] |
808 | async fn remove_after_compact() { |
809 | let now = Instant::now(); |
810 | let mut queue = DelayQueue::new(); |
811 | |
812 | let foo_key = queue.insert_at("foo" , now + ms(10)); |
813 | queue.insert_at("bar" , now + ms(20)); |
814 | queue.remove(&foo_key); |
815 | queue.compact(); |
816 | |
817 | let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { |
818 | queue.remove(&foo_key); |
819 | })); |
820 | assert!(panic.is_err()); |
821 | } |
822 | |
823 | #[cfg_attr (target_os = "wasi" , ignore = "FIXME: Does not seem to work with WASI" )] |
824 | #[tokio::test (start_paused = true)] |
825 | async fn remove_after_compact_poll() { |
826 | let now = Instant::now(); |
827 | let mut queue = task::spawn(DelayQueue::new()); |
828 | |
829 | let foo_key = queue.insert_at("foo" , now + ms(10)); |
830 | queue.insert_at("bar" , now + ms(20)); |
831 | |
832 | sleep(ms(10)).await; |
833 | assert_eq!(assert_ready_some!(poll!(queue)).key(), foo_key); |
834 | |
835 | queue.compact(); |
836 | |
837 | let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { |
838 | queue.remove(&foo_key); |
839 | })); |
840 | assert!(panic.is_err()); |
841 | } |
842 | |
843 | #[tokio::test (start_paused = true)] |
844 | async fn peek() { |
845 | let mut queue = task::spawn(DelayQueue::new()); |
846 | |
847 | let now = Instant::now(); |
848 | |
849 | let key = queue.insert_at("foo" , now + ms(5)); |
850 | let key2 = queue.insert_at("bar" , now); |
851 | let key3 = queue.insert_at("baz" , now + ms(10)); |
852 | |
853 | assert_eq!(queue.peek(), Some(key2)); |
854 | |
855 | sleep(ms(6)).await; |
856 | |
857 | assert_eq!(queue.peek(), Some(key2)); |
858 | |
859 | let entry = assert_ready_some!(poll!(queue)); |
860 | assert_eq!(entry.get_ref(), &"bar" ); |
861 | |
862 | assert_eq!(queue.peek(), Some(key)); |
863 | |
864 | let entry = assert_ready_some!(poll!(queue)); |
865 | assert_eq!(entry.get_ref(), &"foo" ); |
866 | |
867 | assert_eq!(queue.peek(), Some(key3)); |
868 | |
869 | assert_pending!(poll!(queue)); |
870 | |
871 | sleep(ms(5)).await; |
872 | |
873 | assert_eq!(queue.peek(), Some(key3)); |
874 | |
875 | let entry = assert_ready_some!(poll!(queue)); |
876 | assert_eq!(entry.get_ref(), &"baz" ); |
877 | |
878 | assert!(queue.peek().is_none()); |
879 | } |
880 | |
/// Shorthand used throughout these tests: builds a [`Duration`] of `n`
/// milliseconds.
fn ms(n: u64) -> Duration {
    Duration::from_millis(n)
}
884 | |