1 | //! Timer event source |
2 | //! |
3 | //! The [`Timer`] is an event source that will fire its event after a certain amount of time |
4 | //! specified at creation. Its timing is tracked directly by the event loop core logic, and it does |
5 | //! not consume any system resource. |
6 | //! |
7 | //! As of calloop v0.11.0, the event loop always uses high-precision timers. However, the timer |
8 | //! precision varies between operating systems; for instance, the scheduler granularity on Windows |
9 | //! is about 16 milliseconds. If you need to rely on good precision timers in general, you may need |
10 | //! to enable realtime features of your OS to ensure your thread is quickly woken up by the system |
11 | //! scheduler. |
12 | //! |
13 | //! The provided event is an [`Instant`] representing the deadline for which this timer has fired |
14 | //! (which can be earlier than the current time depending on the event loop congestion). |
15 | //! |
16 | //! The callback associated with this event source is expected to return a [`TimeoutAction`], which |
17 | //! can be used to implement self-repeating timers by telling calloop to reprogram the same timer |
18 | //! for a later timeout after it has fired. |
19 | |
20 | /* |
21 | * This module provides two main types: |
22 | * |
23 | * - `Timer` is the user-facing type that represents a timer event source |
24 | * - `TimerWheel` is an internal data structure for tracking registered timeouts, it is used by |
25 | * the polling logic in sys/mod.rs |
26 | */ |
27 | |
28 | use std::{ |
29 | cell::RefCell, |
30 | collections::BinaryHeap, |
31 | rc::Rc, |
32 | task::Waker, |
33 | time::{Duration, Instant}, |
34 | }; |
35 | |
36 | use crate::{EventSource, LoopHandle, Poll, PostAction, Readiness, Token, TokenFactory}; |
37 | |
38 | #[derive (Debug)] |
39 | struct Registration { |
40 | token: Token, |
41 | wheel: Rc<RefCell<TimerWheel>>, |
42 | counter: u32, |
43 | } |
44 | |
45 | /// A timer event source |
46 | /// |
47 | /// When registered to the event loop, it will trigger an event once its deadline is reached. |
48 | /// If the deadline is in the past relative to the moment of its insertion in the event loop, |
49 | /// the `TImer` will trigger an event as soon as the event loop is dispatched. |
50 | #[derive (Debug)] |
51 | pub struct Timer { |
52 | registration: Option<Registration>, |
53 | deadline: Option<Instant>, |
54 | } |
55 | |
56 | impl Timer { |
57 | /// Create a timer that will fire immediately when inserted in the event loop |
58 | pub fn immediate() -> Timer { |
59 | Self::from_deadline(Instant::now()) |
60 | } |
61 | |
62 | /// Create a timer that will fire after a given duration from now |
63 | pub fn from_duration(duration: Duration) -> Timer { |
64 | Self::from_deadline_inner(Instant::now().checked_add(duration)) |
65 | } |
66 | |
67 | /// Create a timer that will fire at a given instant |
68 | pub fn from_deadline(deadline: Instant) -> Timer { |
69 | Self::from_deadline_inner(Some(deadline)) |
70 | } |
71 | |
72 | fn from_deadline_inner(deadline: Option<Instant>) -> Timer { |
73 | Timer { |
74 | registration: None, |
75 | deadline, |
76 | } |
77 | } |
78 | |
79 | /// Changes the deadline of this timer to an [`Instant`] |
80 | /// |
81 | /// If the `Timer` is currently registered in the event loop, it needs to be |
82 | /// re-registered for this change to take effect. |
83 | pub fn set_deadline(&mut self, deadline: Instant) { |
84 | self.deadline = Some(deadline); |
85 | } |
86 | |
87 | /// Changes the deadline of this timer to a [`Duration`] from now |
88 | /// |
89 | /// If the `Timer` is currently registered in the event loop, it needs to be |
90 | /// re-registered for this change to take effect. |
91 | pub fn set_duration(&mut self, duration: Duration) { |
92 | self.deadline = Instant::now().checked_add(duration); |
93 | } |
94 | |
95 | /// Get the current deadline of this `Timer` |
96 | /// |
97 | /// Returns `None` if the timer has overflowed. |
98 | pub fn current_deadline(&self) -> Option<Instant> { |
99 | self.deadline |
100 | } |
101 | } |
102 | |
103 | impl EventSource for Timer { |
104 | type Event = Instant; |
105 | type Metadata = (); |
106 | type Ret = TimeoutAction; |
107 | type Error = std::io::Error; |
108 | |
109 | fn process_events<F>( |
110 | &mut self, |
111 | _: Readiness, |
112 | token: Token, |
113 | mut callback: F, |
114 | ) -> Result<PostAction, Self::Error> |
115 | where |
116 | F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
117 | { |
118 | if let (Some(ref registration), Some(ref deadline)) = (&self.registration, &self.deadline) { |
119 | if registration.token != token { |
120 | return Ok(PostAction::Continue); |
121 | } |
122 | let new_deadline = match callback(*deadline, &mut ()) { |
123 | TimeoutAction::Drop => return Ok(PostAction::Remove), |
124 | TimeoutAction::ToInstant(instant) => instant, |
125 | TimeoutAction::ToDuration(duration) => match Instant::now().checked_add(duration) { |
126 | Some(new_deadline) => new_deadline, |
127 | None => { |
128 | // The timer has overflowed, meaning we have no choice but to drop it. |
129 | self.deadline = None; |
130 | return Ok(PostAction::Remove); |
131 | } |
132 | }, |
133 | }; |
134 | // If we received an event, we MUST have a valid counter value |
135 | registration.wheel.borrow_mut().insert_reuse( |
136 | registration.counter, |
137 | new_deadline, |
138 | registration.token, |
139 | ); |
140 | self.deadline = Some(new_deadline); |
141 | } |
142 | Ok(PostAction::Continue) |
143 | } |
144 | |
145 | fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { |
146 | // Only register a deadline if we haven't overflowed. |
147 | if let Some(deadline) = self.deadline { |
148 | let wheel = poll.timers.clone(); |
149 | let token = token_factory.token(); |
150 | let counter = wheel.borrow_mut().insert(deadline, token); |
151 | self.registration = Some(Registration { |
152 | token, |
153 | wheel, |
154 | counter, |
155 | }); |
156 | } |
157 | |
158 | Ok(()) |
159 | } |
160 | |
161 | fn reregister( |
162 | &mut self, |
163 | poll: &mut Poll, |
164 | token_factory: &mut TokenFactory, |
165 | ) -> crate::Result<()> { |
166 | self.unregister(poll)?; |
167 | self.register(poll, token_factory) |
168 | } |
169 | |
170 | fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { |
171 | if let Some(registration) = self.registration.take() { |
172 | poll.timers.borrow_mut().cancel(registration.counter); |
173 | } |
174 | Ok(()) |
175 | } |
176 | } |
177 | |
/// What should happen to a timer after its callback has run
#[derive(Debug)]
pub enum TimeoutAction {
    /// The timer is done: remove it instead of rescheduling it
    Drop,
    /// Arm the timer again so that it fires at the given [`Instant`]
    ToInstant(Instant),
    /// Arm the timer again so that it fires after the given [`Duration`], counted from now
    ToDuration(Duration),
}
188 | |
189 | // Internal representation of a timeout registered in the TimerWheel |
190 | #[derive (Debug)] |
191 | struct TimeoutData { |
192 | deadline: Instant, |
193 | token: RefCell<Option<Token>>, |
194 | counter: u32, |
195 | } |
196 | |
197 | // A data structure for tracking registered timeouts |
198 | #[derive (Debug)] |
199 | pub(crate) struct TimerWheel { |
200 | heap: BinaryHeap<TimeoutData>, |
201 | counter: u32, |
202 | } |
203 | |
204 | impl TimerWheel { |
205 | pub(crate) fn new() -> TimerWheel { |
206 | TimerWheel { |
207 | heap: BinaryHeap::new(), |
208 | counter: 0, |
209 | } |
210 | } |
211 | |
212 | pub(crate) fn insert(&mut self, deadline: Instant, token: Token) -> u32 { |
213 | self.heap.push(TimeoutData { |
214 | deadline, |
215 | token: RefCell::new(Some(token)), |
216 | counter: self.counter, |
217 | }); |
218 | let ret = self.counter; |
219 | self.counter += 1; |
220 | ret |
221 | } |
222 | |
223 | pub(crate) fn insert_reuse(&mut self, counter: u32, deadline: Instant, token: Token) { |
224 | self.heap.push(TimeoutData { |
225 | deadline, |
226 | token: RefCell::new(Some(token)), |
227 | counter, |
228 | }); |
229 | } |
230 | |
231 | pub(crate) fn cancel(&mut self, counter: u32) { |
232 | self.heap |
233 | .iter() |
234 | .find(|data| data.counter == counter) |
235 | .map(|data| data.token.take()); |
236 | } |
237 | |
238 | pub(crate) fn next_expired(&mut self, now: Instant) -> Option<(u32, Token)> { |
239 | loop { |
240 | // check if there is an expired item |
241 | if let Some(data) = self.heap.peek() { |
242 | if data.deadline > now { |
243 | return None; |
244 | } |
245 | // there is an expired timeout, continue the |
246 | // loop body |
247 | } else { |
248 | return None; |
249 | } |
250 | |
251 | // There is an item in the heap, this unwrap cannot blow |
252 | let data = self.heap.pop().unwrap(); |
253 | if let Some(token) = data.token.into_inner() { |
254 | return Some((data.counter, token)); |
255 | } |
256 | // otherwise this timeout was cancelled, continue looping |
257 | } |
258 | } |
259 | |
260 | pub(crate) fn next_deadline(&self) -> Option<std::time::Instant> { |
261 | self.heap.peek().map(|data| data.deadline) |
262 | } |
263 | } |
264 | |
265 | // trait implementations for TimeoutData |
266 | |
267 | impl std::cmp::Ord for TimeoutData { |
268 | fn cmp(&self, other: &Self) -> std::cmp::Ordering { |
269 | // earlier values have priority |
270 | self.deadline.cmp(&other.deadline).reverse() |
271 | } |
272 | } |
273 | |
274 | impl std::cmp::PartialOrd for TimeoutData { |
275 | fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { |
276 | Some(self.cmp(other)) |
277 | } |
278 | } |
279 | |
280 | // This impl is required for PartialOrd but actually never used |
281 | // and the type is private, so ignore its coverage |
282 | impl std::cmp::PartialEq for TimeoutData { |
283 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
284 | fn eq(&self, other: &Self) -> bool { |
285 | self.deadline == other.deadline |
286 | } |
287 | } |
288 | |
289 | impl std::cmp::Eq for TimeoutData {} |
290 | |
291 | // Logic for timer futures |
292 | |
/// A future that becomes ready once its timeout has expired
pub struct TimeoutFuture {
    // Instant from which the future is ready; `None` if computing it overflowed, in which
    // case the future stays pending.
    deadline: Option<Instant>,
    // Slot shared with the timer callback: `poll` stores the task waker in it and the
    // callback wakes whatever it finds there.
    waker: Rc<RefCell<Option<Waker>>>,
}
298 | |
299 | impl std::fmt::Debug for TimeoutFuture { |
300 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
301 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
302 | f&mut DebugStruct<'_, '_>.debug_struct("TimeoutFuture" ) |
303 | .field(name:"deadline" , &self.deadline) |
304 | .finish_non_exhaustive() |
305 | } |
306 | } |
307 | |
308 | impl TimeoutFuture { |
309 | /// Create a future that resolves after a given duration |
310 | pub fn from_duration<Data>(handle: &LoopHandle<'_, Data>, duration: Duration) -> TimeoutFuture { |
311 | Self::from_deadline_inner(handle, Instant::now().checked_add(duration)) |
312 | } |
313 | |
314 | /// Create a future that resolves at a given instant |
315 | pub fn from_deadline<Data>(handle: &LoopHandle<'_, Data>, deadline: Instant) -> TimeoutFuture { |
316 | Self::from_deadline_inner(handle, Some(deadline)) |
317 | } |
318 | |
319 | /// Create a future that resolves at a given instant |
320 | fn from_deadline_inner<Data>( |
321 | handle: &LoopHandle<'_, Data>, |
322 | deadline: Option<Instant>, |
323 | ) -> TimeoutFuture { |
324 | let timer = Timer::from_deadline_inner(deadline); |
325 | let waker = Rc::new(RefCell::new(None::<Waker>)); |
326 | handle |
327 | .insert_source(timer, { |
328 | let waker = waker.clone(); |
329 | move |_, &mut (), _| { |
330 | if let Some(waker) = waker.borrow_mut().clone() { |
331 | waker.wake() |
332 | } |
333 | TimeoutAction::Drop |
334 | } |
335 | }) |
336 | .unwrap(); |
337 | |
338 | TimeoutFuture { deadline, waker } |
339 | } |
340 | } |
341 | |
342 | impl std::future::Future for TimeoutFuture { |
343 | type Output = (); |
344 | |
345 | fn poll( |
346 | self: std::pin::Pin<&mut Self>, |
347 | cx: &mut std::task::Context<'_>, |
348 | ) -> std::task::Poll<Self::Output> { |
349 | match self.deadline { |
350 | None => return std::task::Poll::Pending, |
351 | |
352 | Some(deadline: Instant) => { |
353 | if Instant::now() >= deadline { |
354 | return std::task::Poll::Ready(()); |
355 | } |
356 | } |
357 | } |
358 | |
359 | *self.waker.borrow_mut() = Some(cx.waker().clone()); |
360 | std::task::Poll::Pending |
361 | } |
362 | } |
363 | |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::*;
    use std::time::Duration;

    // A timer created from a duration does not fire before that duration has elapsed.
    #[test]
    fn simple_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // not yet dispatched
        assert!(!dispatched);

        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        // now dispatched
        assert!(dispatched);
    }

    // Same scenario as `simple_timer`, but the timer is built from an explicit `Instant`
    // so that `Timer::from_deadline` is exercised as well.
    #[test]
    fn simple_timer_instant() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_deadline(std::time::Instant::now() + Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        // now dispatched
        assert!(dispatched);
    }

    // An immediate timer fires on the very first dispatch, even with a zero timeout.
    #[test]
    fn immediate_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(Timer::immediate(), |_, &mut (), dispatched| {
                *dispatched = true;
                TimeoutAction::Drop
            })
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // now dispatched
        assert!(dispatched);
    }

    // We cannot actually test high precision timers, as they are only high precision in release mode
    // This test is here to ensure that the high-precision codepath are executed and work as intended
    // even if we cannot test if they are actually high precision
    #[test]
    fn high_precision_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // not yet dispatched
        assert!(!dispatched);

        event_loop
            .dispatch(Some(Duration::from_micros(10200)), &mut dispatched)
            .unwrap();
        // still not dispatched
        assert!(!dispatched);

        event_loop
            .dispatch(Some(Duration::from_millis(100)), &mut dispatched)
            .unwrap();
        // now dispatched
        assert!(dispatched);
    }

    // Removing the source before its deadline prevents the callback from ever running.
    #[test]
    fn cancel_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = false;

        let token = event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(100)),
                |_, &mut (), dispatched| {
                    *dispatched = true;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        // not yet dispatched
        assert!(!dispatched);

        event_loop.handle().remove(token);

        event_loop
            .dispatch(Some(Duration::from_millis(150)), &mut dispatched)
            .unwrap();
        // still not dispatched
        assert!(!dispatched);
    }

    // Returning `TimeoutAction::ToDuration` re-arms the timer after every firing.
    #[test]
    fn repeating_timer() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(500)),
                |_, &mut (), dispatched| {
                    *dispatched += 1;
                    TimeoutAction::ToDuration(Duration::from_millis(500))
                },
            )
            .unwrap();

        event_loop
            .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 2);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 3);
    }

    // Timeout futures scheduled on the executor complete in deadline order; the one whose
    // deadline overflows never completes.
    #[cfg(feature = "executor")]
    #[test]
    fn timeout_future() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        let timeout_1 =
            TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(500));
        let timeout_2 =
            TimeoutFuture::from_duration(&event_loop.handle(), Duration::from_millis(1500));
        // This one should never go off.
        let timeout_3 = TimeoutFuture::from_duration(&event_loop.handle(), Duration::MAX);

        let (exec, sched) = crate::sources::futures::executor().unwrap();
        event_loop
            .handle()
            .insert_source(exec, move |(), &mut (), got| {
                *got += 1;
            })
            .unwrap();

        sched.schedule(timeout_1).unwrap();
        sched.schedule(timeout_2).unwrap();
        sched.schedule(timeout_3).unwrap();

        // We do a 0-timeout dispatch after every regular dispatch to let the timeout triggers
        // flow back to the executor

        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        event_loop
            .dispatch(Some(Duration::from_millis(1000)), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        event_loop
            .dispatch(Some(Duration::from_millis(1100)), &mut dispatched)
            .unwrap();
        event_loop
            .dispatch(Some(Duration::ZERO), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 2);
    }

    // A timer whose deadline overflows is accepted by the event loop but never fires, and
    // it does not disturb the other timers.
    #[test]
    fn no_overflow() {
        let mut event_loop = EventLoop::try_new().unwrap();

        let mut dispatched = 0;

        event_loop
            .handle()
            .insert_source(
                Timer::from_duration(Duration::from_millis(500)),
                |_, &mut (), dispatched| {
                    *dispatched += 1;
                    TimeoutAction::Drop
                },
            )
            .unwrap();

        event_loop
            .handle()
            .insert_source(Timer::from_duration(Duration::MAX), |_, &mut (), _| {
                panic!("This timer should never go off")
            })
            .unwrap();

        event_loop
            .dispatch(Some(Duration::from_millis(250)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 0);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);

        event_loop
            .dispatch(Some(Duration::from_millis(510)), &mut dispatched)
            .unwrap();
        assert_eq!(dispatched, 1);
    }
}
648 | |