1//! Timer state structures.
2//!
3//! This module contains the heart of the intrusive timer implementation, and as
4//! such the structures inside are full of tricky concurrency and unsafe code.
5//!
6//! # Ground rules
7//!
8//! The heart of the timer implementation here is the [`TimerShared`] structure,
9//! shared between the [`TimerEntry`] and the driver. Generally, we permit access
10//! to [`TimerShared`] ONLY via either 1) a mutable reference to [`TimerEntry`] or
11//! 2) a held driver lock.
12//!
//! It follows from this that any changes made while holding BOTH 1 and 2 will
//! be reliably visible, regardless of ordering. This is because of the acq/rel
//! fences on the driver lock ensuring ordering with 2, and Rust's mutable
//! reference rules for 1 (a mutable reference to an object can't be passed
//! between threads without an acq/rel barrier, and same-thread we have local
//! happens-before ordering).
19//!
20//! # State field
21//!
22//! Each timer has a state field associated with it. This field contains either
23//! the current scheduled time, or a special flag value indicating its state.
24//! This state can either indicate that the timer is on the 'pending' queue (and
25//! thus will be fired with an `Ok(())` result soon) or that it has already been
26//! fired/deregistered.
27//!
28//! This single state field allows for code that is firing the timer to
29//! synchronize with any racing `reset` calls reliably.
30//!
31//! # Cached vs true timeouts
32//!
33//! To allow for the use case of a timeout that is periodically reset before
34//! expiration to be as lightweight as possible, we support optimistically
35//! lock-free timer resets, in the case where a timer is rescheduled to a later
36//! point than it was originally scheduled for.
37//!
//! This is accomplished by lazily rescheduling timers. That is, we update the
//! state field with the true expiration of the timer from the holder of
//! the [`TimerEntry`]. When the driver services timers (i.e., whenever it's
//! walking lists of timers), it checks this "true when" value, and reschedules
//! based on it.
43//!
44//! We do, however, also need to track what the expiration time was when we
45//! originally registered the timer; this is used to locate the right linked
46//! list when the timer is being cancelled. This is referred to as the "cached
47//! when" internally.
48//!
49//! There is of course a race condition between timer reset and timer
50//! expiration. If the driver fails to observe the updated expiration time, it
51//! could trigger expiration of the timer too early. However, because
52//! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and
53//! refuse to mark the timer as pending.
54//!
55//! [mark_pending]: TimerHandle::mark_pending
56
57use crate::loom::cell::UnsafeCell;
58use crate::loom::sync::atomic::AtomicU64;
59use crate::loom::sync::atomic::Ordering;
60
61use crate::runtime::scheduler;
62use crate::sync::AtomicWaker;
63use crate::time::Instant;
64use crate::util::linked_list;
65
66use std::cell::UnsafeCell as StdUnsafeCell;
67use std::task::{Context, Poll, Waker};
68use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
69
70type TimerResult = Result<(), crate::time::error::Error>;
71
72const STATE_DEREGISTERED: u64 = u64::MAX;
73const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
74const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
75/// The largest safe integer to use for ticks.
76///
77/// This value should be updated if any other signal values are added above.
78pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = u64::MAX - 2;
79
/// This structure holds the current shared state of the timer - its scheduled
/// time (if registered), or otherwise the result of the timer completing, as
/// well as the registered waker.
///
/// Generally, the StateCell is only permitted to be accessed from two contexts:
/// Either a thread holding the corresponding &mut TimerEntry, or a thread
/// holding the timer driver lock. The write actions on the StateCell amount to
/// passing "ownership" of the StateCell between these contexts; moving a timer
/// from the TimerEntry to the driver requires _both_ holding the &mut
/// TimerEntry and the driver lock, while moving it back (firing the timer)
/// requires only the driver lock.
pub(super) struct StateCell {
    /// Holds one of:
    /// * the scheduled expiration time for this timer (a value below
    ///   `STATE_MIN_VALUE`),
    /// * `STATE_PENDING_FIRE` (`u64::MAX - 1`) once `mark_pending` has
    ///   succeeded, or
    /// * `STATE_DEREGISTERED` (`u64::MAX`) if the timer has been fired or is
    ///   not registered.
    state: AtomicU64,
    /// If the timer is fired (an Acquire order read on state shows
    /// `u64::MAX`), holds the result that should be returned from
    /// polling the timer. Otherwise, the contents are unspecified and reading
    /// without holding the driver lock is undefined behavior.
    result: UnsafeCell<TimerResult>,
    /// The currently-registered waker. It is registered by `poll` and taken
    /// by `fire`.
    waker: AtomicWaker,
}
103
104impl Default for StateCell {
105 fn default() -> Self {
106 Self::new()
107 }
108}
109
110impl std::fmt::Debug for StateCell {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 write!(f, "StateCell({:?})", self.read_state())
113 }
114}
115
116impl StateCell {
117 fn new() -> Self {
118 Self {
119 state: AtomicU64::new(STATE_DEREGISTERED),
120 result: UnsafeCell::new(Ok(())),
121 waker: AtomicWaker::new(),
122 }
123 }
124
125 fn is_pending(&self) -> bool {
126 self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE
127 }
128
129 /// Returns the current expiration time, or None if not currently scheduled.
130 fn when(&self) -> Option<u64> {
131 let cur_state = self.state.load(Ordering::Relaxed);
132
133 if cur_state == STATE_DEREGISTERED {
134 None
135 } else {
136 Some(cur_state)
137 }
138 }
139
140 /// If the timer is completed, returns the result of the timer. Otherwise,
141 /// returns None and registers the waker.
142 fn poll(&self, waker: &Waker) -> Poll<TimerResult> {
143 // We must register first. This ensures that either `fire` will
144 // observe the new waker, or we will observe a racing fire to have set
145 // the state, or both.
146 self.waker.register_by_ref(waker);
147
148 self.read_state()
149 }
150
151 fn read_state(&self) -> Poll<TimerResult> {
152 let cur_state = self.state.load(Ordering::Acquire);
153
154 if cur_state == STATE_DEREGISTERED {
155 // SAFETY: The driver has fired this timer; this involves writing
156 // the result, and then writing (with release ordering) the state
157 // field.
158 Poll::Ready(unsafe { self.result.with(|p| *p) })
159 } else {
160 Poll::Pending
161 }
162 }
163
164 /// Marks this timer as being moved to the pending list, if its scheduled
165 /// time is not after `not_after`.
166 ///
167 /// If the timer is scheduled for a time after not_after, returns an Err
168 /// containing the current scheduled time.
169 ///
170 /// SAFETY: Must hold the driver lock.
171 unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
172 // Quick initial debug check to see if the timer is already fired. Since
173 // firing the timer can only happen with the driver lock held, we know
174 // we shouldn't be able to "miss" a transition to a fired state, even
175 // with relaxed ordering.
176 let mut cur_state = self.state.load(Ordering::Relaxed);
177
178 loop {
179 // improve the error message for things like
180 // https://github.com/tokio-rs/tokio/issues/3675
181 assert!(
182 cur_state < STATE_MIN_VALUE,
183 "mark_pending called when the timer entry is in an invalid state"
184 );
185
186 if cur_state > not_after {
187 break Err(cur_state);
188 }
189
190 match self.state.compare_exchange(
191 cur_state,
192 STATE_PENDING_FIRE,
193 Ordering::AcqRel,
194 Ordering::Acquire,
195 ) {
196 Ok(_) => {
197 break Ok(());
198 }
199 Err(actual_state) => {
200 cur_state = actual_state;
201 }
202 }
203 }
204 }
205
206 /// Fires the timer, setting the result to the provided result.
207 ///
208 /// Returns:
209 /// * `Some(waker) - if fired and a waker needs to be invoked once the
210 /// driver lock is released
211 /// * `None` - if fired and a waker does not need to be invoked, or if
212 /// already fired
213 ///
214 /// SAFETY: The driver lock must be held.
215 unsafe fn fire(&self, result: TimerResult) -> Option<Waker> {
216 // Quick initial check to see if the timer is already fired. Since
217 // firing the timer can only happen with the driver lock held, we know
218 // we shouldn't be able to "miss" a transition to a fired state, even
219 // with relaxed ordering.
220 let cur_state = self.state.load(Ordering::Relaxed);
221 if cur_state == STATE_DEREGISTERED {
222 return None;
223 }
224
225 // SAFETY: We assume the driver lock is held and the timer is not
226 // fired, so only the driver is accessing this field.
227 //
228 // We perform a release-ordered store to state below, to ensure this
229 // write is visible before the state update is visible.
230 unsafe { self.result.with_mut(|p| *p = result) };
231
232 self.state.store(STATE_DEREGISTERED, Ordering::Release);
233
234 self.waker.take_waker()
235 }
236
237 /// Marks the timer as registered (poll will return None) and sets the
238 /// expiration time.
239 ///
240 /// While this function is memory-safe, it should only be called from a
241 /// context holding both `&mut TimerEntry` and the driver lock.
242 fn set_expiration(&self, timestamp: u64) {
243 debug_assert!(timestamp < STATE_MIN_VALUE);
244
245 // We can use relaxed ordering because we hold the driver lock and will
246 // fence when we release the lock.
247 self.state.store(timestamp, Ordering::Relaxed);
248 }
249
250 /// Attempts to adjust the timer to a new timestamp.
251 ///
252 /// If the timer has already been fired, is pending firing, or the new
253 /// timestamp is earlier than the old timestamp, (or occasionally
254 /// spuriously) returns Err without changing the timer's state. In this
255 /// case, the timer must be deregistered and re-registered.
256 fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> {
257 let mut prior = self.state.load(Ordering::Relaxed);
258 loop {
259 if new_timestamp < prior || prior >= STATE_MIN_VALUE {
260 return Err(());
261 }
262
263 match self.state.compare_exchange_weak(
264 prior,
265 new_timestamp,
266 Ordering::AcqRel,
267 Ordering::Acquire,
268 ) {
269 Ok(_) => {
270 return Ok(());
271 }
272 Err(true_prior) => {
273 prior = true_prior;
274 }
275 }
276 }
277 }
278
279 /// Returns true if the state of this timer indicates that the timer might
280 /// be registered with the driver. This check is performed with relaxed
281 /// ordering, but is conservative - if it returns false, the timer is
282 /// definitely _not_ registered.
283 pub(super) fn might_be_registered(&self) -> bool {
284 self.state.load(Ordering::Relaxed) != u64::MAX
285 }
286}
287
/// A timer entry.
///
/// This is the handle to a timer that is controlled by the requester of the
/// timer. As this participates in intrusive data structures, it must be pinned
/// before polling.
#[derive(Debug)]
pub(crate) struct TimerEntry {
    /// Arc reference to the runtime handle. We can only free the driver after
    /// deregistering everything from their respective timer wheels.
    driver: scheduler::Handle,
    /// Shared inner structure; this is part of an intrusive linked list, and
    /// therefore other references can exist to it while mutable references to
    /// `TimerEntry` exist.
    ///
    /// This is manipulated only under the inner mutex. TODO: Can we use loom
    /// cells for this?
    inner: StdUnsafeCell<TimerShared>,
    /// Deadline for the timer. This is used to register on the first
    /// poll, as we can't register prior to being pinned.
    deadline: Instant,
    /// Whether the deadline has been registered. Starts out `false` and is
    /// overwritten with the `reregister` argument on every `reset`.
    registered: bool,
    /// Ensure the type is !Unpin
    _m: std::marker::PhantomPinned,
}

// NOTE(review): these impls are asserted manually because of the
// `StdUnsafeCell` field; presumably they rely on the access rules described
// in the module documentation (`&mut TimerEntry` or the driver lock) —
// confirm before changing how `inner` is accessed.
unsafe impl Send for TimerEntry {}
unsafe impl Sync for TimerEntry {}
316
/// A TimerHandle is the (non-enforced) "unique" pointer from the driver to the
/// timer entry. Generally, at most one TimerHandle exists for a timer at a time
/// (enforced by the timer state machine).
///
/// SAFETY: A TimerHandle is essentially a raw pointer, and the usual caveats
/// of pointer safety apply. In particular, TimerHandle does not itself enforce
/// that the timer does still exist; however, normally a TimerHandle is created
/// immediately before registering the timer, and is consumed when firing the
/// timer, to help minimize mistakes. Still, because TimerHandle cannot enforce
/// memory safety, all operations are unsafe.
#[derive(Debug)]
pub(crate) struct TimerHandle {
    /// Raw, non-null pointer to the shared timer state this handle refers to.
    inner: NonNull<TimerShared>,
}
331
332pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>;
333
/// The shared state structure of a timer. This structure is shared between the
/// frontend (`TimerEntry`) and driver backend.
///
/// Note that this structure is located inside the `TimerEntry` structure.
pub(crate) struct TimerShared {
    /// A link within the doubly-linked list of timers on a particular level and
    /// slot. Valid only if state is equal to Registered.
    ///
    /// Only accessed under the entry lock.
    pointers: linked_list::Pointers<TimerShared>,

    /// The expiration time for which this entry is currently registered.
    /// Generally owned by the driver, but is accessed by the entry when not
    /// registered.
    cached_when: AtomicU64,

    /// The true expiration time. Set by the timer future, read by the driver.
    ///
    /// NOTE(review): in the code visible in this file the field is only
    /// initialised to 0 and printed by the `Debug` impl; `true_when()` reads
    /// the state cell instead. Confirm whether this field is still needed.
    true_when: AtomicU64,

    /// Current state. This records whether the timer entry is currently under
    /// the ownership of the driver, and if not, its current state (not
    /// complete, fired, error, etc).
    state: StateCell,

    /// Marks the type as `!Unpin`.
    _p: PhantomPinned,
}

// NOTE(review): asserted manually; presumably justified by the access rules
// in the module documentation (driver lock or `&mut TimerEntry`) — confirm
// before relaxing those rules.
unsafe impl Send for TimerShared {}
unsafe impl Sync for TimerShared {}
363
364impl std::fmt::Debug for TimerShared {
365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
366 f&mut DebugStruct<'_, '_>.debug_struct("TimerShared")
367 .field("when", &self.true_when.load(Ordering::Relaxed))
368 .field("cached_when", &self.cached_when.load(Ordering::Relaxed))
369 .field(name:"state", &self.state)
370 .finish()
371 }
372}
373
// Generates `TimerShared::addr_of_pointers`, which maps a pointer to a
// `TimerShared` onto a pointer to its `pointers` field. It is used by the
// `linked_list::Link` implementation for `TimerShared`.
//
// NOTE(review): presumably the macro performs this projection without
// creating an intermediate reference to the whole struct — confirm against
// the macro definition.
generate_addr_of_methods! {
    impl<> TimerShared {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<TimerShared>> {
            &self.pointers
        }
    }
}
381
382impl TimerShared {
383 pub(super) fn new() -> Self {
384 Self {
385 cached_when: AtomicU64::new(0),
386 true_when: AtomicU64::new(0),
387 pointers: linked_list::Pointers::new(),
388 state: StateCell::default(),
389 _p: PhantomPinned,
390 }
391 }
392
393 /// Gets the cached time-of-expiration value.
394 pub(super) fn cached_when(&self) -> u64 {
395 // Cached-when is only accessed under the driver lock, so we can use relaxed
396 self.cached_when.load(Ordering::Relaxed)
397 }
398
399 /// Gets the true time-of-expiration value, and copies it into the cached
400 /// time-of-expiration value.
401 ///
402 /// SAFETY: Must be called with the driver lock held, and when this entry is
403 /// not in any timer wheel lists.
404 pub(super) unsafe fn sync_when(&self) -> u64 {
405 let true_when = self.true_when();
406
407 self.cached_when.store(true_when, Ordering::Relaxed);
408
409 true_when
410 }
411
412 /// Sets the cached time-of-expiration value.
413 ///
414 /// SAFETY: Must be called with the driver lock held, and when this entry is
415 /// not in any timer wheel lists.
416 unsafe fn set_cached_when(&self, when: u64) {
417 self.cached_when.store(when, Ordering::Relaxed);
418 }
419
420 /// Returns the true time-of-expiration value, with relaxed memory ordering.
421 pub(super) fn true_when(&self) -> u64 {
422 self.state.when().expect("Timer already fired")
423 }
424
425 /// Sets the true time-of-expiration value, even if it is less than the
426 /// current expiration or the timer is deregistered.
427 ///
428 /// SAFETY: Must only be called with the driver lock held and the entry not
429 /// in the timer wheel.
430 pub(super) unsafe fn set_expiration(&self, t: u64) {
431 self.state.set_expiration(t);
432 self.cached_when.store(t, Ordering::Relaxed);
433 }
434
435 /// Sets the true time-of-expiration only if it is after the current.
436 pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> {
437 self.state.extend_expiration(t)
438 }
439
440 /// Returns a TimerHandle for this timer.
441 pub(super) fn handle(&self) -> TimerHandle {
442 TimerHandle {
443 inner: NonNull::from(self),
444 }
445 }
446
447 /// Returns true if the state of this timer indicates that the timer might
448 /// be registered with the driver. This check is performed with relaxed
449 /// ordering, but is conservative - if it returns false, the timer is
450 /// definitely _not_ registered.
451 pub(super) fn might_be_registered(&self) -> bool {
452 self.state.might_be_registered()
453 }
454}
455
456unsafe impl linked_list::Link for TimerShared {
457 type Handle = TimerHandle;
458
459 type Target = TimerShared;
460
461 fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> {
462 handle.inner
463 }
464
465 unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle {
466 TimerHandle { inner: ptr }
467 }
468
469 unsafe fn pointers(
470 target: NonNull<Self::Target>,
471 ) -> NonNull<linked_list::Pointers<Self::Target>> {
472 TimerShared::addr_of_pointers(me:target)
473 }
474}
475
476// ===== impl Entry =====
477
478impl TimerEntry {
479 #[track_caller]
480 pub(crate) fn new(handle: &scheduler::Handle, deadline: Instant) -> Self {
481 // Panic if the time driver is not enabled
482 let _ = handle.driver().time();
483
484 let driver = handle.clone();
485
486 Self {
487 driver,
488 inner: StdUnsafeCell::new(TimerShared::new()),
489 deadline,
490 registered: false,
491 _m: std::marker::PhantomPinned,
492 }
493 }
494
495 fn inner(&self) -> &TimerShared {
496 unsafe { &*self.inner.get() }
497 }
498
499 pub(crate) fn deadline(&self) -> Instant {
500 self.deadline
501 }
502
503 pub(crate) fn is_elapsed(&self) -> bool {
504 !self.inner().state.might_be_registered() && self.registered
505 }
506
507 /// Cancels and deregisters the timer. This operation is irreversible.
508 pub(crate) fn cancel(self: Pin<&mut Self>) {
509 // We need to perform an acq/rel fence with the driver thread, and the
510 // simplest way to do so is to grab the driver lock.
511 //
512 // Why is this necessary? We're about to release this timer's memory for
513 // some other non-timer use. However, we've been doing a bunch of
514 // relaxed (or even non-atomic) writes from the driver thread, and we'll
515 // be doing more from _this thread_ (as this memory is interpreted as
516 // something else).
517 //
518 // It is critical to ensure that, from the point of view of the driver,
519 // those future non-timer writes happen-after the timer is fully fired,
520 // and from the purpose of this thread, the driver's writes all
521 // happen-before we drop the timer. This in turn requires us to perform
522 // an acquire-release barrier in _both_ directions between the driver
523 // and dropping thread.
524 //
525 // The lock acquisition in clear_entry serves this purpose. All of the
526 // driver manipulations happen with the lock held, so we can just take
527 // the lock and be sure that this drop happens-after everything the
528 // driver did so far and happens-before everything the driver does in
529 // the future. While we have the lock held, we also go ahead and
530 // deregister the entry if necessary.
531 unsafe { self.driver().clear_entry(NonNull::from(self.inner())) };
532 }
533
534 pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
535 unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time;
536 unsafe { self.as_mut().get_unchecked_mut() }.registered = reregister;
537
538 let tick = self.driver().time_source().deadline_to_tick(new_time);
539
540 if self.inner().extend_expiration(tick).is_ok() {
541 return;
542 }
543
544 if reregister {
545 unsafe {
546 self.driver()
547 .reregister(&self.driver.driver().io, tick, self.inner().into());
548 }
549 }
550 }
551
552 pub(crate) fn poll_elapsed(
553 mut self: Pin<&mut Self>,
554 cx: &mut Context<'_>,
555 ) -> Poll<Result<(), super::Error>> {
556 if self.driver().is_shutdown() {
557 panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
558 }
559
560 if !self.registered {
561 let deadline = self.deadline;
562 self.as_mut().reset(deadline, true);
563 }
564
565 let this = unsafe { self.get_unchecked_mut() };
566
567 this.inner().state.poll(cx.waker())
568 }
569
570 pub(crate) fn driver(&self) -> &super::Handle {
571 self.driver.driver().time()
572 }
573
574 #[cfg(all(tokio_unstable, feature = "tracing"))]
575 pub(crate) fn clock(&self) -> &super::Clock {
576 self.driver.driver().clock()
577 }
578}
579
580impl TimerHandle {
581 pub(super) unsafe fn cached_when(&self) -> u64 {
582 unsafe { self.inner.as_ref().cached_when() }
583 }
584
585 pub(super) unsafe fn sync_when(&self) -> u64 {
586 unsafe { self.inner.as_ref().sync_when() }
587 }
588
589 pub(super) unsafe fn is_pending(&self) -> bool {
590 unsafe { self.inner.as_ref().state.is_pending() }
591 }
592
593 /// Forcibly sets the true and cached expiration times to the given tick.
594 ///
595 /// SAFETY: The caller must ensure that the handle remains valid, the driver
596 /// lock is held, and that the timer is not in any wheel linked lists.
597 pub(super) unsafe fn set_expiration(&self, tick: u64) {
598 self.inner.as_ref().set_expiration(tick);
599 }
600
601 /// Attempts to mark this entry as pending. If the expiration time is after
602 /// `not_after`, however, returns an Err with the current expiration time.
603 ///
604 /// If an `Err` is returned, the `cached_when` value will be updated to this
605 /// new expiration time.
606 ///
607 /// SAFETY: The caller must ensure that the handle remains valid, the driver
608 /// lock is held, and that the timer is not in any wheel linked lists.
609 /// After returning Ok, the entry must be added to the pending list.
610 pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
611 match self.inner.as_ref().state.mark_pending(not_after) {
612 Ok(()) => {
613 // mark this as being on the pending queue in cached_when
614 self.inner.as_ref().set_cached_when(u64::MAX);
615 Ok(())
616 }
617 Err(tick) => {
618 self.inner.as_ref().set_cached_when(tick);
619 Err(tick)
620 }
621 }
622 }
623
624 /// Attempts to transition to a terminal state. If the state is already a
625 /// terminal state, does nothing.
626 ///
627 /// Because the entry might be dropped after the state is moved to a
628 /// terminal state, this function consumes the handle to ensure we don't
629 /// access the entry afterwards.
630 ///
631 /// Returns the last-registered waker, if any.
632 ///
633 /// SAFETY: The driver lock must be held while invoking this function, and
634 /// the entry must not be in any wheel linked lists.
635 pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> {
636 self.inner.as_ref().state.fire(completed_state)
637 }
638}
639
640impl Drop for TimerEntry {
641 fn drop(&mut self) {
642 unsafe { Pin::new_unchecked(self) }.as_mut().cancel()
643 }
644}
645