1#![allow(clippy::disallowed_names)]
2#![warn(rust_2018_idioms)]
3#![cfg(feature = "full")]
4
5use futures::StreamExt;
6use tokio::time::{self, sleep, sleep_until, Duration, Instant};
7use tokio_test::{assert_pending, assert_ready, task};
8use tokio_util::time::DelayQueue;
9
// Polls the spawned queue bound to the given identifier once: calls `enter`
// on it and forwards the supplied context to `poll_expired`.
macro_rules! poll {
    ($spawned:ident) => {
        $spawned.enter(|cx, mut inner| inner.poll_expired(cx))
    };
}
15
// Asserts (through `assert_ready!`) that the expression is ready, then
// unwraps the resulting `Option`, panicking with "None" when it is empty.
macro_rules! assert_ready_some {
    ($e:expr) => {
        assert_ready!($e).unwrap_or_else(|| panic!("None"))
    };
}
24
25#[tokio::test]
26async fn single_immediate_delay() {
27 time::pause();
28
29 let mut queue = task::spawn(DelayQueue::new());
30 let _key = queue.insert_at("foo", Instant::now());
31
32 // Advance time by 1ms to handle thee rounding
33 sleep(ms(1)).await;
34
35 assert_ready_some!(poll!(queue));
36
37 let entry = assert_ready!(poll!(queue));
38 assert!(entry.is_none())
39}
40
41#[tokio::test]
42async fn multi_immediate_delays() {
43 time::pause();
44
45 let mut queue = task::spawn(DelayQueue::new());
46
47 let _k = queue.insert_at("1", Instant::now());
48 let _k = queue.insert_at("2", Instant::now());
49 let _k = queue.insert_at("3", Instant::now());
50
51 sleep(ms(1)).await;
52
53 let mut res = vec![];
54
55 while res.len() < 3 {
56 let entry = assert_ready_some!(poll!(queue));
57 res.push(entry.into_inner());
58 }
59
60 let entry = assert_ready!(poll!(queue));
61 assert!(entry.is_none());
62
63 res.sort_unstable();
64
65 assert_eq!("1", res[0]);
66 assert_eq!("2", res[1]);
67 assert_eq!("3", res[2]);
68}
69
70#[tokio::test]
71async fn single_short_delay() {
72 time::pause();
73
74 let mut queue = task::spawn(DelayQueue::new());
75 let _key = queue.insert_at("foo", Instant::now() + ms(5));
76
77 assert_pending!(poll!(queue));
78
79 sleep(ms(1)).await;
80
81 assert!(!queue.is_woken());
82
83 sleep(ms(5)).await;
84
85 assert!(queue.is_woken());
86
87 let entry = assert_ready_some!(poll!(queue));
88 assert_eq!(*entry.get_ref(), "foo");
89
90 let entry = assert_ready!(poll!(queue));
91 assert!(entry.is_none());
92}
93
94#[tokio::test]
95async fn multi_delay_at_start() {
96 time::pause();
97
98 let long = 262_144 + 9 * 4096;
99 let delays = &[1000, 2, 234, long, 60, 10];
100
101 let mut queue = task::spawn(DelayQueue::new());
102
103 // Setup the delays
104 for &i in delays {
105 let _key = queue.insert_at(i, Instant::now() + ms(i));
106 }
107
108 assert_pending!(poll!(queue));
109 assert!(!queue.is_woken());
110
111 let start = Instant::now();
112 for elapsed in 0..1200 {
113 println!("elapsed: {:?}", elapsed);
114 let elapsed = elapsed + 1;
115 tokio::time::sleep_until(start + ms(elapsed)).await;
116
117 if delays.contains(&elapsed) {
118 assert!(queue.is_woken());
119 assert_ready!(poll!(queue));
120 assert_pending!(poll!(queue));
121 } else if queue.is_woken() {
122 let cascade = &[192, 960];
123 assert!(
124 cascade.contains(&elapsed),
125 "elapsed={} dt={:?}",
126 elapsed,
127 Instant::now() - start
128 );
129
130 assert_pending!(poll!(queue));
131 }
132 }
133 println!("finished multi_delay_start");
134}
135
136#[tokio::test]
137async fn insert_in_past_fires_immediately() {
138 println!("running insert_in_past_fires_immediately");
139 time::pause();
140
141 let mut queue = task::spawn(DelayQueue::new());
142 let now = Instant::now();
143
144 sleep(ms(10)).await;
145
146 queue.insert_at("foo", now);
147
148 assert_ready!(poll!(queue));
149 println!("finished insert_in_past_fires_immediately");
150}
151
152#[tokio::test]
153async fn remove_entry() {
154 time::pause();
155
156 let mut queue = task::spawn(DelayQueue::new());
157
158 let key = queue.insert_at("foo", Instant::now() + ms(5));
159
160 assert_pending!(poll!(queue));
161
162 let entry = queue.remove(&key);
163 assert_eq!(entry.into_inner(), "foo");
164
165 sleep(ms(10)).await;
166
167 let entry = assert_ready!(poll!(queue));
168 assert!(entry.is_none());
169}
170
171#[tokio::test]
172async fn reset_entry() {
173 time::pause();
174
175 let mut queue = task::spawn(DelayQueue::new());
176
177 let now = Instant::now();
178 let key = queue.insert_at("foo", now + ms(5));
179
180 assert_pending!(poll!(queue));
181 sleep(ms(1)).await;
182
183 queue.reset_at(&key, now + ms(10));
184
185 assert_pending!(poll!(queue));
186
187 sleep(ms(7)).await;
188
189 assert!(!queue.is_woken());
190
191 assert_pending!(poll!(queue));
192
193 sleep(ms(3)).await;
194
195 assert!(queue.is_woken());
196
197 let entry = assert_ready_some!(poll!(queue));
198 assert_eq!(*entry.get_ref(), "foo");
199
200 let entry = assert_ready!(poll!(queue));
201 assert!(entry.is_none())
202}
203
204// Reproduces tokio-rs/tokio#849.
205#[tokio::test]
206async fn reset_much_later() {
207 time::pause();
208
209 let mut queue = task::spawn(DelayQueue::new());
210
211 let now = Instant::now();
212 sleep(ms(1)).await;
213
214 let key = queue.insert_at("foo", now + ms(200));
215 assert_pending!(poll!(queue));
216
217 sleep(ms(3)).await;
218
219 queue.reset_at(&key, now + ms(10));
220
221 sleep(ms(20)).await;
222
223 assert!(queue.is_woken());
224}
225
226// Reproduces tokio-rs/tokio#849.
227#[tokio::test]
228async fn reset_twice() {
229 time::pause();
230
231 let mut queue = task::spawn(DelayQueue::new());
232 let now = Instant::now();
233
234 sleep(ms(1)).await;
235
236 let key = queue.insert_at("foo", now + ms(200));
237
238 assert_pending!(poll!(queue));
239
240 sleep(ms(3)).await;
241
242 queue.reset_at(&key, now + ms(50));
243
244 sleep(ms(20)).await;
245
246 queue.reset_at(&key, now + ms(40));
247
248 sleep(ms(20)).await;
249
250 assert!(queue.is_woken());
251}
252
253/// Regression test: Given an entry inserted with a deadline in the past, so
254/// that it is placed directly on the expired queue, reset the entry to a
255/// deadline in the future. Validate that this leaves the entry and queue in an
256/// internally consistent state by running an additional reset on the entry
257/// before polling it to completion.
258#[tokio::test]
259async fn repeatedly_reset_entry_inserted_as_expired() {
260 time::pause();
261
262 // Instants before the start of the test seem to break in wasm.
263 time::sleep(ms(1000)).await;
264
265 let mut queue = task::spawn(DelayQueue::new());
266 let now = Instant::now();
267
268 let key = queue.insert_at("foo", now - ms(100));
269
270 queue.reset_at(&key, now + ms(100));
271 queue.reset_at(&key, now + ms(50));
272
273 assert_pending!(poll!(queue));
274
275 time::sleep_until(now + ms(60)).await;
276
277 assert!(queue.is_woken());
278
279 let entry = assert_ready_some!(poll!(queue)).into_inner();
280 assert_eq!(entry, "foo");
281
282 let entry = assert_ready!(poll!(queue));
283 assert!(entry.is_none());
284}
285
286#[tokio::test]
287async fn remove_expired_item() {
288 time::pause();
289
290 let mut queue = DelayQueue::new();
291
292 let now = Instant::now();
293
294 sleep(ms(10)).await;
295
296 let key = queue.insert_at("foo", now);
297
298 let entry = queue.remove(&key);
299 assert_eq!(entry.into_inner(), "foo");
300}
301
302/// Regression test: it should be possible to remove entries which fall in the
303/// 0th slot of the internal timer wheel — that is, entries whose expiration
304/// (a) falls at the beginning of one of the wheel's hierarchical levels and (b)
305/// is equal to the wheel's current elapsed time.
306#[tokio::test]
307async fn remove_at_timer_wheel_threshold() {
308 time::pause();
309
310 let mut queue = task::spawn(DelayQueue::new());
311
312 let now = Instant::now();
313
314 let key1 = queue.insert_at("foo", now + ms(64));
315 let key2 = queue.insert_at("bar", now + ms(64));
316
317 sleep(ms(80)).await;
318
319 let entry = assert_ready_some!(poll!(queue)).into_inner();
320
321 match entry {
322 "foo" => {
323 let entry = queue.remove(&key2).into_inner();
324 assert_eq!(entry, "bar");
325 }
326 "bar" => {
327 let entry = queue.remove(&key1).into_inner();
328 assert_eq!(entry, "foo");
329 }
330 other => panic!("other: {:?}", other),
331 }
332}
333
334#[tokio::test]
335async fn expires_before_last_insert() {
336 time::pause();
337
338 let mut queue = task::spawn(DelayQueue::new());
339
340 let now = Instant::now();
341
342 queue.insert_at("foo", now + ms(10_000));
343
344 // Delay should be set to 8.192s here.
345 assert_pending!(poll!(queue));
346
347 // Delay should be set to the delay of the new item here
348 queue.insert_at("bar", now + ms(600));
349
350 assert_pending!(poll!(queue));
351
352 sleep(ms(600)).await;
353
354 assert!(queue.is_woken());
355
356 let entry = assert_ready_some!(poll!(queue)).into_inner();
357 assert_eq!(entry, "bar");
358}
359
360#[tokio::test]
361async fn multi_reset() {
362 time::pause();
363
364 let mut queue = task::spawn(DelayQueue::new());
365
366 let now = Instant::now();
367
368 let one = queue.insert_at("one", now + ms(200));
369 let two = queue.insert_at("two", now + ms(250));
370
371 assert_pending!(poll!(queue));
372
373 queue.reset_at(&one, now + ms(300));
374 queue.reset_at(&two, now + ms(350));
375 queue.reset_at(&one, now + ms(400));
376
377 sleep(ms(310)).await;
378
379 assert_pending!(poll!(queue));
380
381 sleep(ms(50)).await;
382
383 let entry = assert_ready_some!(poll!(queue));
384 assert_eq!(*entry.get_ref(), "two");
385
386 assert_pending!(poll!(queue));
387
388 sleep(ms(50)).await;
389
390 let entry = assert_ready_some!(poll!(queue));
391 assert_eq!(*entry.get_ref(), "one");
392
393 let entry = assert_ready!(poll!(queue));
394 assert!(entry.is_none())
395}
396
397#[tokio::test]
398async fn expire_first_key_when_reset_to_expire_earlier() {
399 time::pause();
400
401 let mut queue = task::spawn(DelayQueue::new());
402
403 let now = Instant::now();
404
405 let one = queue.insert_at("one", now + ms(200));
406 queue.insert_at("two", now + ms(250));
407
408 assert_pending!(poll!(queue));
409
410 queue.reset_at(&one, now + ms(100));
411
412 sleep(ms(100)).await;
413
414 assert!(queue.is_woken());
415
416 let entry = assert_ready_some!(poll!(queue)).into_inner();
417 assert_eq!(entry, "one");
418}
419
420#[tokio::test]
421async fn expire_second_key_when_reset_to_expire_earlier() {
422 time::pause();
423
424 let mut queue = task::spawn(DelayQueue::new());
425
426 let now = Instant::now();
427
428 queue.insert_at("one", now + ms(200));
429 let two = queue.insert_at("two", now + ms(250));
430
431 assert_pending!(poll!(queue));
432
433 queue.reset_at(&two, now + ms(100));
434
435 sleep(ms(100)).await;
436
437 assert!(queue.is_woken());
438
439 let entry = assert_ready_some!(poll!(queue)).into_inner();
440 assert_eq!(entry, "two");
441}
442
443#[tokio::test]
444async fn reset_first_expiring_item_to_expire_later() {
445 time::pause();
446
447 let mut queue = task::spawn(DelayQueue::new());
448
449 let now = Instant::now();
450
451 let one = queue.insert_at("one", now + ms(200));
452 let _two = queue.insert_at("two", now + ms(250));
453
454 assert_pending!(poll!(queue));
455
456 queue.reset_at(&one, now + ms(300));
457 sleep(ms(250)).await;
458
459 assert!(queue.is_woken());
460
461 let entry = assert_ready_some!(poll!(queue)).into_inner();
462 assert_eq!(entry, "two");
463}
464
465#[tokio::test]
466async fn insert_before_first_after_poll() {
467 time::pause();
468
469 let mut queue = task::spawn(DelayQueue::new());
470
471 let now = Instant::now();
472
473 let _one = queue.insert_at("one", now + ms(200));
474
475 assert_pending!(poll!(queue));
476
477 let _two = queue.insert_at("two", now + ms(100));
478
479 sleep(ms(99)).await;
480
481 assert_pending!(poll!(queue));
482
483 sleep(ms(1)).await;
484
485 assert!(queue.is_woken());
486
487 let entry = assert_ready_some!(poll!(queue)).into_inner();
488 assert_eq!(entry, "two");
489}
490
491#[tokio::test]
492async fn insert_after_ready_poll() {
493 time::pause();
494
495 let mut queue = task::spawn(DelayQueue::new());
496
497 let now = Instant::now();
498
499 queue.insert_at("1", now + ms(100));
500 queue.insert_at("2", now + ms(100));
501 queue.insert_at("3", now + ms(100));
502
503 assert_pending!(poll!(queue));
504
505 sleep(ms(100)).await;
506
507 assert!(queue.is_woken());
508
509 let mut res = vec![];
510
511 while res.len() < 3 {
512 let entry = assert_ready_some!(poll!(queue));
513 res.push(entry.into_inner());
514 queue.insert_at("foo", now + ms(500));
515 }
516
517 res.sort_unstable();
518
519 assert_eq!("1", res[0]);
520 assert_eq!("2", res[1]);
521 assert_eq!("3", res[2]);
522}
523
524#[tokio::test]
525async fn reset_later_after_slot_starts() {
526 time::pause();
527
528 let mut queue = task::spawn(DelayQueue::new());
529
530 let now = Instant::now();
531
532 let foo = queue.insert_at("foo", now + ms(100));
533
534 assert_pending!(poll!(queue));
535
536 sleep_until(now + Duration::from_millis(80)).await;
537
538 assert!(!queue.is_woken());
539
540 // At this point the queue hasn't been polled, so `elapsed` on the wheel
541 // for the queue is still at 0 and hence the 1ms resolution slots cover
542 // [0-64). Resetting the time on the entry to 120 causes it to get put in
543 // the [64-128) slot. As the queue knows that the first entry is within
544 // that slot, but doesn't know when, it must wake immediately to advance
545 // the wheel.
546 queue.reset_at(&foo, now + ms(120));
547 assert!(queue.is_woken());
548
549 assert_pending!(poll!(queue));
550
551 sleep_until(now + Duration::from_millis(119)).await;
552 assert!(!queue.is_woken());
553
554 sleep(ms(1)).await;
555 assert!(queue.is_woken());
556
557 let entry = assert_ready_some!(poll!(queue)).into_inner();
558 assert_eq!(entry, "foo");
559}
560
561#[tokio::test]
562async fn reset_inserted_expired() {
563 time::pause();
564
565 // Instants before the start of the test seem to break in wasm.
566 time::sleep(ms(1000)).await;
567
568 let mut queue = task::spawn(DelayQueue::new());
569 let now = Instant::now();
570
571 let key = queue.insert_at("foo", now - ms(100));
572
573 // this causes the panic described in #2473
574 queue.reset_at(&key, now + ms(100));
575
576 assert_eq!(1, queue.len());
577
578 sleep(ms(200)).await;
579
580 let entry = assert_ready_some!(poll!(queue)).into_inner();
581 assert_eq!(entry, "foo");
582
583 assert_eq!(queue.len(), 0);
584}
585
586#[tokio::test]
587async fn reset_earlier_after_slot_starts() {
588 time::pause();
589
590 let mut queue = task::spawn(DelayQueue::new());
591
592 let now = Instant::now();
593
594 let foo = queue.insert_at("foo", now + ms(200));
595
596 assert_pending!(poll!(queue));
597
598 sleep_until(now + Duration::from_millis(80)).await;
599
600 assert!(!queue.is_woken());
601
602 // At this point the queue hasn't been polled, so `elapsed` on the wheel
603 // for the queue is still at 0 and hence the 1ms resolution slots cover
604 // [0-64). Resetting the time on the entry to 120 causes it to get put in
605 // the [64-128) slot. As the queue knows that the first entry is within
606 // that slot, but doesn't know when, it must wake immediately to advance
607 // the wheel.
608 queue.reset_at(&foo, now + ms(120));
609 assert!(queue.is_woken());
610
611 assert_pending!(poll!(queue));
612
613 sleep_until(now + Duration::from_millis(119)).await;
614 assert!(!queue.is_woken());
615
616 sleep(ms(1)).await;
617 assert!(queue.is_woken());
618
619 let entry = assert_ready_some!(poll!(queue)).into_inner();
620 assert_eq!(entry, "foo");
621}
622
623#[tokio::test]
624async fn insert_in_past_after_poll_fires_immediately() {
625 time::pause();
626
627 let mut queue = task::spawn(DelayQueue::new());
628
629 let now = Instant::now();
630
631 queue.insert_at("foo", now + ms(200));
632
633 assert_pending!(poll!(queue));
634
635 sleep(ms(80)).await;
636
637 assert!(!queue.is_woken());
638 queue.insert_at("bar", now + ms(40));
639
640 assert!(queue.is_woken());
641
642 let entry = assert_ready_some!(poll!(queue)).into_inner();
643 assert_eq!(entry, "bar");
644}
645
646#[tokio::test]
647async fn delay_queue_poll_expired_when_empty() {
648 let mut delay_queue = task::spawn(DelayQueue::new());
649 let key = delay_queue.insert(0, std::time::Duration::from_secs(10));
650 assert_pending!(poll!(delay_queue));
651
652 delay_queue.remove(&key);
653 assert!(assert_ready!(poll!(delay_queue)).is_none());
654}
655
656#[tokio::test(start_paused = true)]
657async fn compact_expire_empty() {
658 let mut queue = task::spawn(DelayQueue::new());
659
660 let now = Instant::now();
661
662 queue.insert_at("foo1", now + ms(10));
663 queue.insert_at("foo2", now + ms(10));
664
665 sleep(ms(10)).await;
666
667 let mut res = vec![];
668 while res.len() < 2 {
669 let entry = assert_ready_some!(poll!(queue));
670 res.push(entry.into_inner());
671 }
672
673 queue.compact();
674
675 assert_eq!(queue.len(), 0);
676 assert_eq!(queue.capacity(), 0);
677}
678
679#[tokio::test(start_paused = true)]
680async fn compact_remove_empty() {
681 let mut queue = task::spawn(DelayQueue::new());
682
683 let now = Instant::now();
684
685 let key1 = queue.insert_at("foo1", now + ms(10));
686 let key2 = queue.insert_at("foo2", now + ms(10));
687
688 queue.remove(&key1);
689 queue.remove(&key2);
690
691 queue.compact();
692
693 assert_eq!(queue.len(), 0);
694 assert_eq!(queue.capacity(), 0);
695}
696
697#[tokio::test(start_paused = true)]
698// Trigger a re-mapping of keys in the slab due to a `compact` call and
699// test removal of re-mapped keys
700async fn compact_remove_remapped_keys() {
701 let mut queue = task::spawn(DelayQueue::new());
702
703 let now = Instant::now();
704
705 queue.insert_at("foo1", now + ms(10));
706 queue.insert_at("foo2", now + ms(10));
707
708 // should be assigned indices 3 and 4
709 let key3 = queue.insert_at("foo3", now + ms(20));
710 let key4 = queue.insert_at("foo4", now + ms(20));
711
712 sleep(ms(10)).await;
713
714 let mut res = vec![];
715 while res.len() < 2 {
716 let entry = assert_ready_some!(poll!(queue));
717 res.push(entry.into_inner());
718 }
719
720 // items corresponding to `foo3` and `foo4` will be assigned
721 // new indices here
722 queue.compact();
723
724 queue.insert_at("foo5", now + ms(10));
725
726 // test removal of re-mapped keys
727 let expired3 = queue.remove(&key3);
728 let expired4 = queue.remove(&key4);
729
730 assert_eq!(expired3.into_inner(), "foo3");
731 assert_eq!(expired4.into_inner(), "foo4");
732
733 queue.compact();
734 assert_eq!(queue.len(), 1);
735 assert_eq!(queue.capacity(), 1);
736}
737
738#[tokio::test(start_paused = true)]
739async fn compact_change_deadline() {
740 let mut queue = task::spawn(DelayQueue::new());
741
742 let mut now = Instant::now();
743
744 queue.insert_at("foo1", now + ms(10));
745 queue.insert_at("foo2", now + ms(10));
746
747 // should be assigned indices 3 and 4
748 queue.insert_at("foo3", now + ms(20));
749 let key4 = queue.insert_at("foo4", now + ms(20));
750
751 sleep(ms(10)).await;
752
753 let mut res = vec![];
754 while res.len() < 2 {
755 let entry = assert_ready_some!(poll!(queue));
756 res.push(entry.into_inner());
757 }
758
759 // items corresponding to `foo3` and `foo4` should be assigned
760 // new indices
761 queue.compact();
762
763 now = Instant::now();
764
765 queue.insert_at("foo5", now + ms(10));
766 let key6 = queue.insert_at("foo6", now + ms(10));
767
768 queue.reset_at(&key4, now + ms(20));
769 queue.reset_at(&key6, now + ms(20));
770
771 // foo3 and foo5 will expire
772 sleep(ms(10)).await;
773
774 while res.len() < 4 {
775 let entry = assert_ready_some!(poll!(queue));
776 res.push(entry.into_inner());
777 }
778
779 sleep(ms(10)).await;
780
781 while res.len() < 6 {
782 let entry = assert_ready_some!(poll!(queue));
783 res.push(entry.into_inner());
784 }
785
786 let entry = assert_ready!(poll!(queue));
787 assert!(entry.is_none());
788}
789
790#[tokio::test(start_paused = true)]
791async fn item_expiry_greater_than_wheel() {
792 // This function tests that a delay queue that has existed for at least 2^36 milliseconds won't panic when a new item is inserted.
793 let mut queue = DelayQueue::new();
794 for _ in 0..2 {
795 tokio::time::advance(Duration::from_millis(1 << 35)).await;
796 queue.insert(0, Duration::from_millis(0));
797 queue.next().await;
798 }
799 // This should not panic
800 let no_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
801 queue.insert(1, Duration::from_millis(1));
802 }));
803 assert!(no_panic.is_ok());
804}
805
806#[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
807#[tokio::test(start_paused = true)]
808async fn remove_after_compact() {
809 let now = Instant::now();
810 let mut queue = DelayQueue::new();
811
812 let foo_key = queue.insert_at("foo", now + ms(10));
813 queue.insert_at("bar", now + ms(20));
814 queue.remove(&foo_key);
815 queue.compact();
816
817 let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
818 queue.remove(&foo_key);
819 }));
820 assert!(panic.is_err());
821}
822
823#[cfg_attr(target_os = "wasi", ignore = "FIXME: Does not seem to work with WASI")]
824#[tokio::test(start_paused = true)]
825async fn remove_after_compact_poll() {
826 let now = Instant::now();
827 let mut queue = task::spawn(DelayQueue::new());
828
829 let foo_key = queue.insert_at("foo", now + ms(10));
830 queue.insert_at("bar", now + ms(20));
831
832 sleep(ms(10)).await;
833 assert_eq!(assert_ready_some!(poll!(queue)).key(), foo_key);
834
835 queue.compact();
836
837 let panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
838 queue.remove(&foo_key);
839 }));
840 assert!(panic.is_err());
841}
842
843#[tokio::test(start_paused = true)]
844async fn peek() {
845 let mut queue = task::spawn(DelayQueue::new());
846
847 let now = Instant::now();
848
849 let key = queue.insert_at("foo", now + ms(5));
850 let key2 = queue.insert_at("bar", now);
851 let key3 = queue.insert_at("baz", now + ms(10));
852
853 assert_eq!(queue.peek(), Some(key2));
854
855 sleep(ms(6)).await;
856
857 assert_eq!(queue.peek(), Some(key2));
858
859 let entry = assert_ready_some!(poll!(queue));
860 assert_eq!(entry.get_ref(), &"bar");
861
862 assert_eq!(queue.peek(), Some(key));
863
864 let entry = assert_ready_some!(poll!(queue));
865 assert_eq!(entry.get_ref(), &"foo");
866
867 assert_eq!(queue.peek(), Some(key3));
868
869 assert_pending!(poll!(queue));
870
871 sleep(ms(5)).await;
872
873 assert_eq!(queue.peek(), Some(key3));
874
875 let entry = assert_ready_some!(poll!(queue));
876 assert_eq!(entry.get_ref(), &"baz");
877
878 assert!(queue.peek().is_none());
879}
880
/// Shorthand used throughout these tests: builds a `Duration` of `n`
/// milliseconds.
fn ms(n: u64) -> Duration {
    Duration::from_millis(n)
}
884