1 | //! Timer event source |
2 | //! |
3 | //! The [`Timer`] is an event source that will fire its event after a certain amount of time |
4 | //! specified at creation. Its timing is tracked directly by the event loop core logic, and it does |
5 | //! not consume any system resource. |
6 | //! |
7 | //! As of calloop v0.11.0, the event loop always uses high-precision timers. However, the timer |
8 | //! precision varies between operating systems; for instance, the scheduler granularity on Windows |
9 | //! is about 16 milliseconds. If you need to rely on good precision timers in general, you may need |
10 | //! to enable realtime features of your OS to ensure your thread is quickly woken up by the system |
11 | //! scheduler. |
12 | //! |
13 | //! The provided event is an [`Instant`] representing the deadline for which this timer has fired |
14 | //! (which can be earlier than the current time depending on the event loop congestion). |
15 | //! |
16 | //! The callback associated with this event source is expected to return a [`TimeoutAction`], which |
17 | //! can be used to implement self-repeating timers by telling calloop to reprogram the same timer |
18 | //! for a later timeout after it has fired. |
19 | |
20 | /* |
21 | * This module provides two main types: |
22 | * |
23 | * - `Timer` is the user-facing type that represents a timer event source |
24 | * - `TimerWheel` is an internal data structure for tracking registered timeouts, it is used by |
25 | * the polling logic in sys/mod.rs |
26 | */ |
27 | |
28 | use std::{ |
29 | cell::RefCell, |
30 | collections::BinaryHeap, |
31 | rc::Rc, |
32 | task::Waker, |
33 | time::{Duration, Instant}, |
34 | }; |
35 | |
36 | use crate::{EventSource, LoopHandle, Poll, PostAction, Readiness, Token, TokenFactory}; |
37 | |
// Bookkeeping kept by a `Timer` for its current registration in an event loop.
#[derive(Debug)]
struct Registration {
    // Token obtained from the `TokenFactory` at registration; `process_events` ignores
    // events that carry a different token.
    token: Token,
    // Shared handle to the timer wheel the timeout was inserted into, used to re-arm the
    // timer after it has fired.
    wheel: Rc<RefCell<TimerWheel>>,
    // Identifier returned by `TimerWheel::insert`, used to re-insert or cancel the timeout.
    counter: u32,
}
44 | |
/// A timer event source
///
/// When registered to the event loop, it will trigger an event once its deadline is reached.
/// If the deadline is in the past relative to the moment of its insertion in the event loop,
/// the `Timer` will trigger an event as soon as the event loop is dispatched.
#[derive(Debug)]
pub struct Timer {
    // Set by `register` (only when a deadline exists) and cleared by `unregister`.
    registration: Option<Registration>,
    // Instant at which the timer should fire; `None` when computing it overflowed.
    deadline: Option<Instant>,
}
55 | |
56 | impl Timer { |
57 | /// Create a timer that will fire immediately when inserted in the event loop |
58 | pub fn immediate() -> Timer { |
59 | Self::from_deadline(Instant::now()) |
60 | } |
61 | |
62 | /// Create a timer that will fire after a given duration from now |
63 | pub fn from_duration(duration: Duration) -> Timer { |
64 | Self::from_deadline_inner(Instant::now().checked_add(duration)) |
65 | } |
66 | |
67 | /// Create a timer that will fire at a given instant |
68 | pub fn from_deadline(deadline: Instant) -> Timer { |
69 | Self::from_deadline_inner(Some(deadline)) |
70 | } |
71 | |
72 | fn from_deadline_inner(deadline: Option<Instant>) -> Timer { |
73 | Timer { |
74 | registration: None, |
75 | deadline, |
76 | } |
77 | } |
78 | |
79 | /// Changes the deadline of this timer to an [`Instant`] |
80 | /// |
81 | /// If the `Timer` is currently registered in the event loop, it needs to be |
82 | /// re-registered for this change to take effect. |
83 | pub fn set_deadline(&mut self, deadline: Instant) { |
84 | self.deadline = Some(deadline); |
85 | } |
86 | |
87 | /// Changes the deadline of this timer to a [`Duration`] from now |
88 | /// |
89 | /// If the `Timer` is currently registered in the event loop, it needs to be |
90 | /// re-registered for this change to take effect. |
91 | pub fn set_duration(&mut self, duration: Duration) { |
92 | self.deadline = Instant::now().checked_add(duration); |
93 | } |
94 | |
95 | /// Get the current deadline of this `Timer` |
96 | /// |
97 | /// Returns `None` if the timer has overflowed. |
98 | pub fn current_deadline(&self) -> Option<Instant> { |
99 | self.deadline |
100 | } |
101 | } |
102 | |
103 | impl EventSource for Timer { |
104 | type Event = Instant; |
105 | type Metadata = (); |
106 | type Ret = TimeoutAction; |
107 | type Error = std::io::Error; |
108 | |
109 | fn process_events<F>( |
110 | &mut self, |
111 | _: Readiness, |
112 | token: Token, |
113 | mut callback: F, |
114 | ) -> Result<PostAction, Self::Error> |
115 | where |
116 | F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
117 | { |
118 | if let (Some(ref registration), Some(ref deadline)) = (&self.registration, &self.deadline) { |
119 | if registration.token != token { |
120 | return Ok(PostAction::Continue); |
121 | } |
122 | let new_deadline = match callback(*deadline, &mut ()) { |
123 | TimeoutAction::Drop => return Ok(PostAction::Remove), |
124 | TimeoutAction::ToInstant(instant) => instant, |
125 | TimeoutAction::ToDuration(duration) => match Instant::now().checked_add(duration) { |
126 | Some(new_deadline) => new_deadline, |
127 | None => { |
128 | // The timer has overflowed, meaning we have no choice but to drop it. |
129 | self.deadline = None; |
130 | return Ok(PostAction::Remove); |
131 | } |
132 | }, |
133 | }; |
134 | // If we received an event, we MUST have a valid counter value |
135 | registration.wheel.borrow_mut().insert_reuse( |
136 | registration.counter, |
137 | new_deadline, |
138 | registration.token, |
139 | ); |
140 | self.deadline = Some(new_deadline); |
141 | } |
142 | Ok(PostAction::Continue) |
143 | } |
144 | |
145 | fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { |
146 | // Only register a deadline if we haven't overflowed. |
147 | if let Some(deadline) = self.deadline { |
148 | let wheel = poll.timers.clone(); |
149 | let token = token_factory.token(); |
150 | let counter = wheel.borrow_mut().insert(deadline, token); |
151 | self.registration = Some(Registration { |
152 | token, |
153 | wheel, |
154 | counter, |
155 | }); |
156 | } |
157 | |
158 | Ok(()) |
159 | } |
160 | |
161 | fn reregister( |
162 | &mut self, |
163 | poll: &mut Poll, |
164 | token_factory: &mut TokenFactory, |
165 | ) -> crate::Result<()> { |
166 | self.unregister(poll)?; |
167 | self.register(poll, token_factory) |
168 | } |
169 | |
170 | fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { |
171 | if let Some(registration) = self.registration.take() { |
172 | poll.timers.borrow_mut().cancel(registration.counter); |
173 | } |
174 | Ok(()) |
175 | } |
176 | } |
177 | |
/// Action to reschedule a timeout if necessary
///
/// This is the value returned by the callback of a [`Timer`] after it has fired.
#[derive(Debug)]
pub enum TimeoutAction {
    /// Don't reschedule this timer
    Drop,
    /// Reschedule this timer to a given [`Instant`]
    ToInstant(Instant),
    /// Reschedule this timer to a given [`Duration`] in the future
    ToDuration(Duration),
}
188 | |
// Internal representation of a timeout registered in the TimerWheel
#[derive(Debug)]
struct TimeoutData {
    // Instant at which this timeout expires; the only field used for ordering and equality.
    deadline: Instant,
    // Token reported when the timeout expires. `TimerWheel::cancel` empties it to mark the
    // entry as cancelled without removing it from the heap, and `TimerWheel::next_expired`
    // skips entries whose token is `None`.
    token: RefCell<Option<Token>>,
    // Identifier of this timeout, as returned by `TimerWheel::insert`.
    counter: u32,
}
196 | |
// A data structure for tracking registered timeouts
#[derive(Debug)]
pub(crate) struct TimerWheel {
    // Pending timeouts. `TimeoutData` orders itself in reverse of its deadline, so the top
    // of this max-heap is the entry with the earliest deadline.
    heap: BinaryHeap<TimeoutData>,
    // Identifier that the next call to `insert` will hand out.
    counter: u32,
}
203 | |
204 | impl TimerWheel { |
205 | pub(crate) fn new() -> TimerWheel { |
206 | TimerWheel { |
207 | heap: BinaryHeap::new(), |
208 | counter: 0, |
209 | } |
210 | } |
211 | |
212 | pub(crate) fn insert(&mut self, deadline: Instant, token: Token) -> u32 { |
213 | self.heap.push(TimeoutData { |
214 | deadline, |
215 | token: RefCell::new(Some(token)), |
216 | counter: self.counter, |
217 | }); |
218 | let ret = self.counter; |
219 | self.counter += 1; |
220 | ret |
221 | } |
222 | |
223 | pub(crate) fn insert_reuse(&mut self, counter: u32, deadline: Instant, token: Token) { |
224 | self.heap.push(TimeoutData { |
225 | deadline, |
226 | token: RefCell::new(Some(token)), |
227 | counter, |
228 | }); |
229 | } |
230 | |
231 | pub(crate) fn cancel(&mut self, counter: u32) { |
232 | if self |
233 | .heap |
234 | .peek() |
235 | .map(|data| data.counter == counter) |
236 | .unwrap_or(false) |
237 | { |
238 | self.heap.pop(); |
239 | return; |
240 | }; |
241 | |
242 | self.heap |
243 | .iter() |
244 | .rev() |
245 | .find(|data| data.counter == counter) |
246 | .map(|data| data.token.take()); |
247 | } |
248 | |
249 | pub(crate) fn next_expired(&mut self, now: Instant) -> Option<(u32, Token)> { |
250 | loop { |
251 | // check if there is an expired item |
252 | if let Some(data) = self.heap.peek() { |
253 | if data.deadline > now { |
254 | return None; |
255 | } |
256 | // there is an expired timeout, continue the |
257 | // loop body |
258 | } else { |
259 | return None; |
260 | } |
261 | |
262 | // There is an item in the heap, this unwrap cannot blow |
263 | let data = self.heap.pop().unwrap(); |
264 | if let Some(token) = data.token.into_inner() { |
265 | return Some((data.counter, token)); |
266 | } |
267 | // otherwise this timeout was cancelled, continue looping |
268 | } |
269 | } |
270 | |
271 | pub(crate) fn next_deadline(&self) -> Option<std::time::Instant> { |
272 | self.heap.peek().map(|data| data.deadline) |
273 | } |
274 | } |
275 | |
276 | // trait implementations for TimeoutData |
277 | |
278 | impl std::cmp::Ord for TimeoutData { |
279 | #[inline ] |
280 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { |
281 | // earlier values have priority |
282 | self.deadline.cmp(&other.deadline).reverse() |
283 | } |
284 | } |
285 | |
286 | impl std::cmp::PartialOrd for TimeoutData { |
287 | #[inline ] |
288 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { |
289 | Some(self.cmp(other)) |
290 | } |
291 | } |
292 | |
293 | // This impl is required for PartialOrd but actually never used |
294 | // and the type is private, so ignore its coverage |
295 | impl std::cmp::PartialEq for TimeoutData { |
296 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
297 | #[inline ] |
298 | fn eq(&self, other: &Self) -> bool { |
299 | self.deadline == other.deadline |
300 | } |
301 | } |
302 | |
303 | impl std::cmp::Eq for TimeoutData {} |
304 | |
305 | // Logic for timer futures |
306 | |
/// A future that resolves once a certain timeout is expired
pub struct TimeoutFuture {
    // Instant after which the future is ready; `None` when computing it overflowed, in
    // which case polling always returns `Pending`.
    deadline: Option<Instant>,
    // Waker of the task that last polled this future. It is shared with the callback of the
    // underlying `Timer`, which wakes the task when the timer fires.
    waker: Rc<RefCell<Option<Waker>>>,
}

impl std::fmt::Debug for TimeoutFuture {
    // Only the deadline is printed; the waker slot is elided with `..`.
    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TimeoutFuture")
            .field("deadline", &self.deadline)
            .finish_non_exhaustive()
    }
}
321 | |
322 | impl TimeoutFuture { |
323 | /// Create a future that resolves after a given duration |
324 | pub fn from_duration<Data>(handle: &LoopHandle<'_, Data>, duration: Duration) -> TimeoutFuture { |
325 | Self::from_deadline_inner(handle, Instant::now().checked_add(duration)) |
326 | } |
327 | |
328 | /// Create a future that resolves at a given instant |
329 | pub fn from_deadline<Data>(handle: &LoopHandle<'_, Data>, deadline: Instant) -> TimeoutFuture { |
330 | Self::from_deadline_inner(handle, Some(deadline)) |
331 | } |
332 | |
333 | /// Create a future that resolves at a given instant |
334 | fn from_deadline_inner<Data>( |
335 | handle: &LoopHandle<'_, Data>, |
336 | deadline: Option<Instant>, |
337 | ) -> TimeoutFuture { |
338 | let timer = Timer::from_deadline_inner(deadline); |
339 | let waker = Rc::new(RefCell::new(None::<Waker>)); |
340 | handle |
341 | .insert_source(timer, { |
342 | let waker = waker.clone(); |
343 | move |_, &mut (), _| { |
344 | if let Some(waker) = waker.borrow_mut().clone() { |
345 | waker.wake() |
346 | } |
347 | TimeoutAction::Drop |
348 | } |
349 | }) |
350 | .unwrap(); |
351 | |
352 | TimeoutFuture { deadline, waker } |
353 | } |
354 | } |
355 | |
356 | impl std::future::Future for TimeoutFuture { |
357 | type Output = (); |
358 | |
359 | fn poll( |
360 | self: std::pin::Pin<&mut Self>, |
361 | cx: &mut std::task::Context<'_>, |
362 | ) -> std::task::Poll<Self::Output> { |
363 | match self.deadline { |
364 | None => return std::task::Poll::Pending, |
365 | |
366 | Some(deadline: Instant) => { |
367 | if Instant::now() >= deadline { |
368 | return std::task::Poll::Ready(()); |
369 | } |
370 | } |
371 | } |
372 | |
373 | *self.waker.borrow_mut() = Some(cx.waker().clone()); |
374 | std::task::Poll::Pending |
375 | } |
376 | } |
377 | |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::*;
    use std::time::Duration;

    // A timer created from a duration does not fire before its deadline and fires once the
    // deadline has passed.
    #[test]
    fn simple_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // not yet dispatched
        assert!(!dispatched);

        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        // now dispatched
        assert!(dispatched);
    }

    // Same as `simple_timer`, but the timer is built from an explicit `Instant` so that
    // `Timer::from_deadline` is exercised as well.
    #[test]
    fn simple_timer_instant() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_deadline(std::time::Instant::now() + Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        // now dispatched
        assert!(dispatched);
    }

    // An immediate timer fires on the very first dispatch, even with a zero timeout.
    #[test]
    fn immediate_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(Timer::immediate(), |_, &mut (), dispatched| {
                *dispatched = true;
                TimeoutAction::Drop
            })
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // now dispatched
        assert!(dispatched);
    }

    // We cannot actually test high precision timers, as they are only high precision in
    // release mode. This test is here to ensure that the high-precision code paths are
    // executed and work as intended, even if we cannot test whether they are actually high
    // precision.
    #[test]
    fn high_precision_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // not yet dispatched
        assert!(!dispatched);

        event_loop
            .dispatch(Some(Duration::from_micros(10200)), &mut dispatched)
            .unwrap();
        // still not dispatched
        assert!(!dispatched);

        event_loop
            .dispatch(Some(Duration::from_millis(100)), &mut dispatched)
            .unwrap();
        // now dispatched
        assert!(dispatched);
    }

    // A timer removed from the event loop before its deadline never fires.
    #[test]
    fn cancel_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        let token = event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // not yet dispatched
        assert!(!dispatched);

        event_loop.handle().remove(token);

        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        // still not dispatched
        assert!(!dispatched);
    }

    // A callback returning `ToDuration` re-arms the timer, which then fires once per period.
    #[test]
    fn repeating_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(500)),
                |_, &mut (), dispatched| {
                    *dispatched += 1;
                    TimeoutAction::ToDuration(Duration::from_millis(500))
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 2);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 3);
    }

    // Timeout futures scheduled on the executor complete in deadline order, and a future
    // whose deadline overflows never completes.
    #[cfg(feature = "executor")]
    #[test]
    fn timeout_future() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        let timeout_1 =
            TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(500));
        let timeout_2 =
            TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(1500));
        // This one should never go off.
        let timeout_3 = TimeoutFuture::from_duration(&event_loop.handle(), Duration::MAX);

        let (exec, sched) = crate::sources::futures::executor().unwrap();
        event_loop
            .handle()
            .insert_source(exec, move |(), &mut (), got| {
                *got += 1;
            })
            .unwrap();

        sched.schedule(timeout_1).unwrap();
        sched.schedule(timeout_2).unwrap();
        sched.schedule(timeout_3).unwrap();

        // We do a 0-timeout dispatch after every regular dispatch to let the timeout triggers
        // flow back to the executor

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        event_loop
            .dispatch(Some(Duration::from_millis(1000)), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        event_loop
            .dispatch(Some(Duration::from_millis(1100)), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 2);
    }

    // A timer whose duration overflows `Instant` never fires and does not disturb the other
    // timers of the event loop.
    #[test]
    fn no_overflow() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(500)),
                |_, &mut (), dispatched| {
                    *dispatched += 1;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .handle()
            .insert_source(Timer::from_duration(Duration::MAX), |_, &mut (), _| {
                panic!("This timer should never go off")
            })
            .unwrap();

        event_loop
            .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);
    }
}
662 | |