| 1 | //! Timer state structures. |
| 2 | //! |
| 3 | //! This module contains the heart of the intrusive timer implementation, and as |
| 4 | //! such the structures inside are full of tricky concurrency and unsafe code. |
| 5 | //! |
| 6 | //! # Ground rules |
| 7 | //! |
| 8 | //! The heart of the timer implementation here is the [`TimerShared`] structure, |
| 9 | //! shared between the [`TimerEntry`] and the driver. Generally, we permit access |
| 10 | //! to [`TimerShared`] ONLY via either 1) a mutable reference to [`TimerEntry`] or |
| 11 | //! 2) a held driver lock. |
| 12 | //! |
| 13 | //! It follows from this that any changes made while holding BOTH 1 and 2 will |
| 14 | //! be reliably visible, regardless of ordering. This is because of the `acq/rel` |
| 15 | //! fences on the driver lock ensuring ordering with 2, and rust mutable |
| 16 | //! reference rules for 1 (a mutable reference to an object can't be passed |
| 17 | //! between threads without an `acq/rel` barrier, and same-thread we have local |
| 18 | //! happens-before ordering). |
| 19 | //! |
| 20 | //! # State field |
| 21 | //! |
| 22 | //! Each timer has a state field associated with it. This field contains either |
| 23 | //! the current scheduled time, or a special flag value indicating its state. |
| 24 | //! This state can either indicate that the timer is on the 'pending' queue (and |
| 25 | //! thus will be fired with an `Ok(())` result soon) or that it has already been |
| 26 | //! fired/deregistered. |
| 27 | //! |
| 28 | //! This single state field allows for code that is firing the timer to |
| 29 | //! synchronize with any racing `reset` calls reliably. |
| 30 | //! |
| 31 | //! # Cached vs true timeouts |
| 32 | //! |
| 33 | //! To allow for the use case of a timeout that is periodically reset before |
| 34 | //! expiration to be as lightweight as possible, we support optimistically |
| 35 | //! lock-free timer resets, in the case where a timer is rescheduled to a later |
| 36 | //! point than it was originally scheduled for. |
| 37 | //! |
| 38 | //! This is accomplished by lazily rescheduling timers. That is, we update the |
| 39 | //! state field with the true expiration of the timer from the holder of |
| 40 | //! the [`TimerEntry`]. When the driver services timers (i.e., whenever it's |
| 41 | //! walking lists of timers), it checks this "true when" value, and reschedules |
| 42 | //! based on it. |
| 43 | //! |
| 44 | //! We do, however, also need to track what the expiration time was when we |
| 45 | //! originally registered the timer; this is used to locate the right linked |
| 46 | //! list when the timer is being cancelled. This is referred to as the "cached |
| 47 | //! when" internally. |
| 48 | //! |
| 49 | //! There is of course a race condition between timer reset and timer |
| 50 | //! expiration. If the driver fails to observe the updated expiration time, it |
| 51 | //! could trigger expiration of the timer too early. However, because |
| 52 | //! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and |
| 53 | //! refuse to mark the timer as pending. |
| 54 | //! |
| 55 | //! [mark_pending]: TimerHandle::mark_pending |
| 56 | |
| 57 | use crate::loom::cell::UnsafeCell; |
| 58 | use crate::loom::sync::atomic::AtomicU64; |
| 59 | use crate::loom::sync::atomic::Ordering; |
| 60 | |
| 61 | use crate::runtime::context; |
| 62 | use crate::runtime::scheduler; |
| 63 | use crate::sync::AtomicWaker; |
| 64 | use crate::time::Instant; |
| 65 | use crate::util::linked_list; |
| 66 | |
| 67 | use std::cell::UnsafeCell as StdUnsafeCell; |
| 68 | use std::task::{Context, Poll, Waker}; |
| 69 | use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull}; |
| 70 | |
| 71 | type TimerResult = Result<(), crate::time::error::Error>; |
| 72 | |
| 73 | const STATE_DEREGISTERED: u64 = u64::MAX; |
| 74 | const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1; |
| 75 | const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE; |
| 76 | /// The largest safe integer to use for ticks. |
| 77 | /// |
| 78 | /// This value should be updated if any other signal values are added above. |
| 79 | pub(super) const MAX_SAFE_MILLIS_DURATION: u64 = STATE_MIN_VALUE - 1; |
| 80 | |
| 81 | /// This structure holds the current shared state of the timer - its scheduled |
| 82 | /// time (if registered), or otherwise the result of the timer completing, as |
| 83 | /// well as the registered waker. |
| 84 | /// |
| 85 | /// Generally, the `StateCell` is only permitted to be accessed from two contexts: |
| 86 | /// Either a thread holding the corresponding `&mut TimerEntry`, or a thread |
| 87 | /// holding the timer driver lock. The write actions on the `StateCell` amount to |
| 88 | /// passing "ownership" of the `StateCell` between these contexts; moving a timer |
| 89 | /// from the `TimerEntry` to the driver requires _both_ holding the `&mut |
| 90 | /// TimerEntry` and the driver lock, while moving it back (firing the timer) |
| 91 | /// requires only the driver lock. |
| 92 | pub(super) struct StateCell { |
| 93 | /// Holds either the scheduled expiration time for this timer, or (if the |
| 94 | /// timer has been fired and is unregistered), `u64::MAX`. |
| 95 | state: AtomicU64, |
| 96 | /// If the timer is fired (an Acquire order read on state shows |
| 97 | /// `u64::MAX`), holds the result that should be returned from |
| 98 | /// polling the timer. Otherwise, the contents are unspecified and reading |
| 99 | /// without holding the driver lock is undefined behavior. |
| 100 | result: UnsafeCell<TimerResult>, |
| 101 | /// The currently-registered waker |
| 102 | waker: AtomicWaker, |
| 103 | } |
| 104 | |
| 105 | impl Default for StateCell { |
| 106 | fn default() -> Self { |
| 107 | Self::new() |
| 108 | } |
| 109 | } |
| 110 | |
| 111 | impl std::fmt::Debug for StateCell { |
| 112 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| 113 | write!(f, "StateCell( {:?})" , self.read_state()) |
| 114 | } |
| 115 | } |
| 116 | |
| 117 | impl StateCell { |
| 118 | fn new() -> Self { |
| 119 | Self { |
| 120 | state: AtomicU64::new(STATE_DEREGISTERED), |
| 121 | result: UnsafeCell::new(Ok(())), |
| 122 | waker: AtomicWaker::new(), |
| 123 | } |
| 124 | } |
| 125 | |
| 126 | fn is_pending(&self) -> bool { |
| 127 | self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE |
| 128 | } |
| 129 | |
| 130 | /// Returns the current expiration time, or None if not currently scheduled. |
| 131 | fn when(&self) -> Option<u64> { |
| 132 | let cur_state = self.state.load(Ordering::Relaxed); |
| 133 | |
| 134 | if cur_state == STATE_DEREGISTERED { |
| 135 | None |
| 136 | } else { |
| 137 | Some(cur_state) |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | /// If the timer is completed, returns the result of the timer. Otherwise, |
| 142 | /// returns None and registers the waker. |
| 143 | fn poll(&self, waker: &Waker) -> Poll<TimerResult> { |
| 144 | // We must register first. This ensures that either `fire` will |
| 145 | // observe the new waker, or we will observe a racing fire to have set |
| 146 | // the state, or both. |
| 147 | self.waker.register_by_ref(waker); |
| 148 | |
| 149 | self.read_state() |
| 150 | } |
| 151 | |
| 152 | fn read_state(&self) -> Poll<TimerResult> { |
| 153 | let cur_state = self.state.load(Ordering::Acquire); |
| 154 | |
| 155 | if cur_state == STATE_DEREGISTERED { |
| 156 | // SAFETY: The driver has fired this timer; this involves writing |
| 157 | // the result, and then writing (with release ordering) the state |
| 158 | // field. |
| 159 | Poll::Ready(unsafe { self.result.with(|p| *p) }) |
| 160 | } else { |
| 161 | Poll::Pending |
| 162 | } |
| 163 | } |
| 164 | |
| 165 | /// Marks this timer as being moved to the pending list, if its scheduled |
| 166 | /// time is not after `not_after`. |
| 167 | /// |
| 168 | /// If the timer is scheduled for a time after `not_after`, returns an Err |
| 169 | /// containing the current scheduled time. |
| 170 | /// |
| 171 | /// SAFETY: Must hold the driver lock. |
| 172 | unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { |
| 173 | // Quick initial debug check to see if the timer is already fired. Since |
| 174 | // firing the timer can only happen with the driver lock held, we know |
| 175 | // we shouldn't be able to "miss" a transition to a fired state, even |
| 176 | // with relaxed ordering. |
| 177 | let mut cur_state = self.state.load(Ordering::Relaxed); |
| 178 | |
| 179 | loop { |
| 180 | // improve the error message for things like |
| 181 | // https://github.com/tokio-rs/tokio/issues/3675 |
| 182 | assert!( |
| 183 | cur_state < STATE_MIN_VALUE, |
| 184 | "mark_pending called when the timer entry is in an invalid state" |
| 185 | ); |
| 186 | |
| 187 | if cur_state > not_after { |
| 188 | break Err(cur_state); |
| 189 | } |
| 190 | |
| 191 | match self.state.compare_exchange_weak( |
| 192 | cur_state, |
| 193 | STATE_PENDING_FIRE, |
| 194 | Ordering::AcqRel, |
| 195 | Ordering::Acquire, |
| 196 | ) { |
| 197 | Ok(_) => break Ok(()), |
| 198 | Err(actual_state) => cur_state = actual_state, |
| 199 | } |
| 200 | } |
| 201 | } |
| 202 | |
| 203 | /// Fires the timer, setting the result to the provided result. |
| 204 | /// |
| 205 | /// Returns: |
| 206 | /// * `Some(waker)` - if fired and a waker needs to be invoked once the |
| 207 | /// driver lock is released |
| 208 | /// * `None` - if fired and a waker does not need to be invoked, or if |
| 209 | /// already fired |
| 210 | /// |
| 211 | /// SAFETY: The driver lock must be held. |
| 212 | unsafe fn fire(&self, result: TimerResult) -> Option<Waker> { |
| 213 | // Quick initial check to see if the timer is already fired. Since |
| 214 | // firing the timer can only happen with the driver lock held, we know |
| 215 | // we shouldn't be able to "miss" a transition to a fired state, even |
| 216 | // with relaxed ordering. |
| 217 | let cur_state = self.state.load(Ordering::Relaxed); |
| 218 | if cur_state == STATE_DEREGISTERED { |
| 219 | return None; |
| 220 | } |
| 221 | |
| 222 | // SAFETY: We assume the driver lock is held and the timer is not |
| 223 | // fired, so only the driver is accessing this field. |
| 224 | // |
| 225 | // We perform a release-ordered store to state below, to ensure this |
| 226 | // write is visible before the state update is visible. |
| 227 | unsafe { self.result.with_mut(|p| *p = result) }; |
| 228 | |
| 229 | self.state.store(STATE_DEREGISTERED, Ordering::Release); |
| 230 | |
| 231 | self.waker.take_waker() |
| 232 | } |
| 233 | |
| 234 | /// Marks the timer as registered (poll will return None) and sets the |
| 235 | /// expiration time. |
| 236 | /// |
| 237 | /// While this function is memory-safe, it should only be called from a |
| 238 | /// context holding both `&mut TimerEntry` and the driver lock. |
| 239 | fn set_expiration(&self, timestamp: u64) { |
| 240 | debug_assert!(timestamp < STATE_MIN_VALUE); |
| 241 | |
| 242 | // We can use relaxed ordering because we hold the driver lock and will |
| 243 | // fence when we release the lock. |
| 244 | self.state.store(timestamp, Ordering::Relaxed); |
| 245 | } |
| 246 | |
| 247 | /// Attempts to adjust the timer to a new timestamp. |
| 248 | /// |
| 249 | /// If the timer has already been fired, is pending firing, or the new |
| 250 | /// timestamp is earlier than the old timestamp, (or occasionally |
| 251 | /// spuriously) returns Err without changing the timer's state. In this |
| 252 | /// case, the timer must be deregistered and re-registered. |
| 253 | fn extend_expiration(&self, new_timestamp: u64) -> Result<(), ()> { |
| 254 | let mut prior = self.state.load(Ordering::Relaxed); |
| 255 | loop { |
| 256 | if new_timestamp < prior || prior >= STATE_MIN_VALUE { |
| 257 | return Err(()); |
| 258 | } |
| 259 | |
| 260 | match self.state.compare_exchange_weak( |
| 261 | prior, |
| 262 | new_timestamp, |
| 263 | Ordering::AcqRel, |
| 264 | Ordering::Acquire, |
| 265 | ) { |
| 266 | Ok(_) => return Ok(()), |
| 267 | Err(true_prior) => prior = true_prior, |
| 268 | } |
| 269 | } |
| 270 | } |
| 271 | |
| 272 | /// Returns true if the state of this timer indicates that the timer might |
| 273 | /// be registered with the driver. This check is performed with relaxed |
| 274 | /// ordering, but is conservative - if it returns false, the timer is |
| 275 | /// definitely _not_ registered. |
| 276 | pub(super) fn might_be_registered(&self) -> bool { |
| 277 | self.state.load(Ordering::Relaxed) != u64::MAX |
| 278 | } |
| 279 | } |
| 280 | |
| 281 | /// A timer entry. |
| 282 | /// |
| 283 | /// This is the handle to a timer that is controlled by the requester of the |
| 284 | /// timer. As this participates in intrusive data structures, it must be pinned |
| 285 | /// before polling. |
| 286 | #[derive (Debug)] |
| 287 | pub(crate) struct TimerEntry { |
| 288 | /// Arc reference to the runtime handle. We can only free the driver after |
| 289 | /// deregistering everything from their respective timer wheels. |
| 290 | driver: scheduler::Handle, |
| 291 | /// Shared inner structure; this is part of an intrusive linked list, and |
| 292 | /// therefore other references can exist to it while mutable references to |
| 293 | /// Entry exist. |
| 294 | /// |
| 295 | /// This is manipulated only under the inner mutex. TODO: Can we use loom |
| 296 | /// cells for this? |
| 297 | inner: StdUnsafeCell<Option<TimerShared>>, |
| 298 | /// Deadline for the timer. This is used to register on the first |
| 299 | /// poll, as we can't register prior to being pinned. |
| 300 | deadline: Instant, |
| 301 | /// Whether the deadline has been registered. |
| 302 | registered: bool, |
| 303 | /// Ensure the type is !Unpin |
| 304 | _m: std::marker::PhantomPinned, |
| 305 | } |
| 306 | |
| 307 | unsafe impl Send for TimerEntry {} |
| 308 | unsafe impl Sync for TimerEntry {} |
| 309 | |
| 310 | /// An `TimerHandle` is the (non-enforced) "unique" pointer from the driver to the |
| 311 | /// timer entry. Generally, at most one `TimerHandle` exists for a timer at a time |
| 312 | /// (enforced by the timer state machine). |
| 313 | /// |
| 314 | /// SAFETY: An `TimerHandle` is essentially a raw pointer, and the usual caveats |
| 315 | /// of pointer safety apply. In particular, `TimerHandle` does not itself enforce |
| 316 | /// that the timer does still exist; however, normally an `TimerHandle` is created |
| 317 | /// immediately before registering the timer, and is consumed when firing the |
| 318 | /// timer, to help minimize mistakes. Still, because `TimerHandle` cannot enforce |
| 319 | /// memory safety, all operations are unsafe. |
| 320 | #[derive (Debug)] |
| 321 | pub(crate) struct TimerHandle { |
| 322 | inner: NonNull<TimerShared>, |
| 323 | } |
| 324 | |
| 325 | pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, TimerShared>; |
| 326 | |
| 327 | /// The shared state structure of a timer. This structure is shared between the |
| 328 | /// frontend (`Entry`) and driver backend. |
| 329 | /// |
| 330 | /// Note that this structure is located inside the `TimerEntry` structure. |
| 331 | pub(crate) struct TimerShared { |
| 332 | /// The shard id. We should never change it. |
| 333 | shard_id: u32, |
| 334 | /// A link within the doubly-linked list of timers on a particular level and |
| 335 | /// slot. Valid only if state is equal to Registered. |
| 336 | /// |
| 337 | /// Only accessed under the entry lock. |
| 338 | pointers: linked_list::Pointers<TimerShared>, |
| 339 | |
| 340 | /// The expiration time for which this entry is currently registered. |
| 341 | /// Generally owned by the driver, but is accessed by the entry when not |
| 342 | /// registered. |
| 343 | cached_when: AtomicU64, |
| 344 | |
| 345 | /// Current state. This records whether the timer entry is currently under |
| 346 | /// the ownership of the driver, and if not, its current state (not |
| 347 | /// complete, fired, error, etc). |
| 348 | state: StateCell, |
| 349 | |
| 350 | _p: PhantomPinned, |
| 351 | } |
| 352 | |
| 353 | unsafe impl Send for TimerShared {} |
| 354 | unsafe impl Sync for TimerShared {} |
| 355 | |
| 356 | impl std::fmt::Debug for TimerShared { |
| 357 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| 358 | f&mut DebugStruct<'_, '_>.debug_struct("TimerShared" ) |
| 359 | .field("cached_when" , &self.cached_when.load(Ordering::Relaxed)) |
| 360 | .field(name:"state" , &self.state) |
| 361 | .finish() |
| 362 | } |
| 363 | } |
| 364 | |
| 365 | generate_addr_of_methods! { |
| 366 | impl<> TimerShared { |
| 367 | unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<TimerShared>> { |
| 368 | &self.pointers |
| 369 | } |
| 370 | } |
| 371 | } |
| 372 | |
| 373 | impl TimerShared { |
| 374 | pub(super) fn new(shard_id: u32) -> Self { |
| 375 | Self { |
| 376 | shard_id, |
| 377 | cached_when: AtomicU64::new(0), |
| 378 | pointers: linked_list::Pointers::new(), |
| 379 | state: StateCell::default(), |
| 380 | _p: PhantomPinned, |
| 381 | } |
| 382 | } |
| 383 | |
| 384 | /// Gets the cached time-of-expiration value. |
| 385 | pub(super) fn cached_when(&self) -> u64 { |
| 386 | // Cached-when is only accessed under the driver lock, so we can use relaxed |
| 387 | self.cached_when.load(Ordering::Relaxed) |
| 388 | } |
| 389 | |
| 390 | /// Gets the true time-of-expiration value, and copies it into the cached |
| 391 | /// time-of-expiration value. |
| 392 | /// |
| 393 | /// SAFETY: Must be called with the driver lock held, and when this entry is |
| 394 | /// not in any timer wheel lists. |
| 395 | pub(super) unsafe fn sync_when(&self) -> u64 { |
| 396 | let true_when = self.true_when(); |
| 397 | |
| 398 | self.cached_when.store(true_when, Ordering::Relaxed); |
| 399 | |
| 400 | true_when |
| 401 | } |
| 402 | |
| 403 | /// Sets the cached time-of-expiration value. |
| 404 | /// |
| 405 | /// SAFETY: Must be called with the driver lock held, and when this entry is |
| 406 | /// not in any timer wheel lists. |
| 407 | unsafe fn set_cached_when(&self, when: u64) { |
| 408 | self.cached_when.store(when, Ordering::Relaxed); |
| 409 | } |
| 410 | |
| 411 | /// Returns the true time-of-expiration value, with relaxed memory ordering. |
| 412 | pub(super) fn true_when(&self) -> u64 { |
| 413 | self.state.when().expect("Timer already fired" ) |
| 414 | } |
| 415 | |
| 416 | /// Sets the true time-of-expiration value, even if it is less than the |
| 417 | /// current expiration or the timer is deregistered. |
| 418 | /// |
| 419 | /// SAFETY: Must only be called with the driver lock held and the entry not |
| 420 | /// in the timer wheel. |
| 421 | pub(super) unsafe fn set_expiration(&self, t: u64) { |
| 422 | self.state.set_expiration(t); |
| 423 | self.cached_when.store(t, Ordering::Relaxed); |
| 424 | } |
| 425 | |
| 426 | /// Sets the true time-of-expiration only if it is after the current. |
| 427 | pub(super) fn extend_expiration(&self, t: u64) -> Result<(), ()> { |
| 428 | self.state.extend_expiration(t) |
| 429 | } |
| 430 | |
| 431 | /// Returns a `TimerHandle` for this timer. |
| 432 | pub(super) fn handle(&self) -> TimerHandle { |
| 433 | TimerHandle { |
| 434 | inner: NonNull::from(self), |
| 435 | } |
| 436 | } |
| 437 | |
| 438 | /// Returns true if the state of this timer indicates that the timer might |
| 439 | /// be registered with the driver. This check is performed with relaxed |
| 440 | /// ordering, but is conservative - if it returns false, the timer is |
| 441 | /// definitely _not_ registered. |
| 442 | pub(super) fn might_be_registered(&self) -> bool { |
| 443 | self.state.might_be_registered() |
| 444 | } |
| 445 | |
| 446 | /// Gets the shard id. |
| 447 | pub(super) fn shard_id(&self) -> u32 { |
| 448 | self.shard_id |
| 449 | } |
| 450 | } |
| 451 | |
| 452 | unsafe impl linked_list::Link for TimerShared { |
| 453 | type Handle = TimerHandle; |
| 454 | |
| 455 | type Target = TimerShared; |
| 456 | |
| 457 | fn as_raw(handle: &Self::Handle) -> NonNull<Self::Target> { |
| 458 | handle.inner |
| 459 | } |
| 460 | |
| 461 | unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle { |
| 462 | TimerHandle { inner: ptr } |
| 463 | } |
| 464 | |
| 465 | unsafe fn pointers( |
| 466 | target: NonNull<Self::Target>, |
| 467 | ) -> NonNull<linked_list::Pointers<Self::Target>> { |
| 468 | TimerShared::addr_of_pointers(me:target) |
| 469 | } |
| 470 | } |
| 471 | |
| 472 | // ===== impl TimerEntry ===== |
| 473 | |
| 474 | impl TimerEntry { |
| 475 | #[track_caller ] |
| 476 | pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self { |
| 477 | // Panic if the time driver is not enabled |
| 478 | let _ = handle.driver().time(); |
| 479 | |
| 480 | Self { |
| 481 | driver: handle, |
| 482 | inner: StdUnsafeCell::new(None), |
| 483 | deadline, |
| 484 | registered: false, |
| 485 | _m: std::marker::PhantomPinned, |
| 486 | } |
| 487 | } |
| 488 | |
| 489 | fn is_inner_init(&self) -> bool { |
| 490 | unsafe { &*self.inner.get() }.is_some() |
| 491 | } |
| 492 | |
| 493 | // This lazy initialization is for performance purposes. |
| 494 | fn inner(&self) -> &TimerShared { |
| 495 | let inner = unsafe { &*self.inner.get() }; |
| 496 | if inner.is_none() { |
| 497 | let shard_size = self.driver.driver().time().inner.get_shard_size(); |
| 498 | let shard_id = generate_shard_id(shard_size); |
| 499 | unsafe { |
| 500 | *self.inner.get() = Some(TimerShared::new(shard_id)); |
| 501 | } |
| 502 | } |
| 503 | return inner.as_ref().unwrap(); |
| 504 | } |
| 505 | |
| 506 | pub(crate) fn deadline(&self) -> Instant { |
| 507 | self.deadline |
| 508 | } |
| 509 | |
| 510 | pub(crate) fn is_elapsed(&self) -> bool { |
| 511 | self.is_inner_init() && !self.inner().state.might_be_registered() && self.registered |
| 512 | } |
| 513 | |
| 514 | /// Cancels and deregisters the timer. This operation is irreversible. |
| 515 | pub(crate) fn cancel(self: Pin<&mut Self>) { |
| 516 | // Avoid calling the `clear_entry` method, because it has not been initialized yet. |
| 517 | if !self.is_inner_init() { |
| 518 | return; |
| 519 | } |
| 520 | // We need to perform an acq/rel fence with the driver thread, and the |
| 521 | // simplest way to do so is to grab the driver lock. |
| 522 | // |
| 523 | // Why is this necessary? We're about to release this timer's memory for |
| 524 | // some other non-timer use. However, we've been doing a bunch of |
| 525 | // relaxed (or even non-atomic) writes from the driver thread, and we'll |
| 526 | // be doing more from _this thread_ (as this memory is interpreted as |
| 527 | // something else). |
| 528 | // |
| 529 | // It is critical to ensure that, from the point of view of the driver, |
| 530 | // those future non-timer writes happen-after the timer is fully fired, |
| 531 | // and from the purpose of this thread, the driver's writes all |
| 532 | // happen-before we drop the timer. This in turn requires us to perform |
| 533 | // an acquire-release barrier in _both_ directions between the driver |
| 534 | // and dropping thread. |
| 535 | // |
| 536 | // The lock acquisition in clear_entry serves this purpose. All of the |
| 537 | // driver manipulations happen with the lock held, so we can just take |
| 538 | // the lock and be sure that this drop happens-after everything the |
| 539 | // driver did so far and happens-before everything the driver does in |
| 540 | // the future. While we have the lock held, we also go ahead and |
| 541 | // deregister the entry if necessary. |
| 542 | unsafe { self.driver().clear_entry(NonNull::from(self.inner())) }; |
| 543 | } |
| 544 | |
| 545 | pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) { |
| 546 | let this = unsafe { self.as_mut().get_unchecked_mut() }; |
| 547 | this.deadline = new_time; |
| 548 | this.registered = reregister; |
| 549 | |
| 550 | let tick = self.driver().time_source().deadline_to_tick(new_time); |
| 551 | |
| 552 | if self.inner().extend_expiration(tick).is_ok() { |
| 553 | return; |
| 554 | } |
| 555 | |
| 556 | if reregister { |
| 557 | unsafe { |
| 558 | self.driver() |
| 559 | .reregister(&self.driver.driver().io, tick, self.inner().into()); |
| 560 | } |
| 561 | } |
| 562 | } |
| 563 | |
| 564 | pub(crate) fn poll_elapsed( |
| 565 | mut self: Pin<&mut Self>, |
| 566 | cx: &mut Context<'_>, |
| 567 | ) -> Poll<Result<(), super::Error>> { |
| 568 | assert!( |
| 569 | !self.driver().is_shutdown(), |
| 570 | "{}" , |
| 571 | crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR |
| 572 | ); |
| 573 | |
| 574 | if !self.registered { |
| 575 | let deadline = self.deadline; |
| 576 | self.as_mut().reset(deadline, true); |
| 577 | } |
| 578 | |
| 579 | self.inner().state.poll(cx.waker()) |
| 580 | } |
| 581 | |
| 582 | pub(crate) fn driver(&self) -> &super::Handle { |
| 583 | self.driver.driver().time() |
| 584 | } |
| 585 | |
| 586 | #[cfg (all(tokio_unstable, feature = "tracing" ))] |
| 587 | pub(crate) fn clock(&self) -> &super::Clock { |
| 588 | self.driver.driver().clock() |
| 589 | } |
| 590 | } |
| 591 | |
| 592 | impl TimerHandle { |
| 593 | pub(super) unsafe fn cached_when(&self) -> u64 { |
| 594 | unsafe { self.inner.as_ref().cached_when() } |
| 595 | } |
| 596 | |
| 597 | pub(super) unsafe fn sync_when(&self) -> u64 { |
| 598 | unsafe { self.inner.as_ref().sync_when() } |
| 599 | } |
| 600 | |
| 601 | pub(super) unsafe fn is_pending(&self) -> bool { |
| 602 | unsafe { self.inner.as_ref().state.is_pending() } |
| 603 | } |
| 604 | |
| 605 | /// Forcibly sets the true and cached expiration times to the given tick. |
| 606 | /// |
| 607 | /// SAFETY: The caller must ensure that the handle remains valid, the driver |
| 608 | /// lock is held, and that the timer is not in any wheel linked lists. |
| 609 | pub(super) unsafe fn set_expiration(&self, tick: u64) { |
| 610 | self.inner.as_ref().set_expiration(tick); |
| 611 | } |
| 612 | |
| 613 | /// Attempts to mark this entry as pending. If the expiration time is after |
| 614 | /// `not_after`, however, returns an Err with the current expiration time. |
| 615 | /// |
| 616 | /// If an `Err` is returned, the `cached_when` value will be updated to this |
| 617 | /// new expiration time. |
| 618 | /// |
| 619 | /// SAFETY: The caller must ensure that the handle remains valid, the driver |
| 620 | /// lock is held, and that the timer is not in any wheel linked lists. |
| 621 | /// After returning Ok, the entry must be added to the pending list. |
| 622 | pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> { |
| 623 | match self.inner.as_ref().state.mark_pending(not_after) { |
| 624 | Ok(()) => { |
| 625 | // mark this as being on the pending queue in cached_when |
| 626 | self.inner.as_ref().set_cached_when(u64::MAX); |
| 627 | Ok(()) |
| 628 | } |
| 629 | Err(tick) => { |
| 630 | self.inner.as_ref().set_cached_when(tick); |
| 631 | Err(tick) |
| 632 | } |
| 633 | } |
| 634 | } |
| 635 | |
| 636 | /// Attempts to transition to a terminal state. If the state is already a |
| 637 | /// terminal state, does nothing. |
| 638 | /// |
| 639 | /// Because the entry might be dropped after the state is moved to a |
| 640 | /// terminal state, this function consumes the handle to ensure we don't |
| 641 | /// access the entry afterwards. |
| 642 | /// |
| 643 | /// Returns the last-registered waker, if any. |
| 644 | /// |
| 645 | /// SAFETY: The driver lock must be held while invoking this function, and |
| 646 | /// the entry must not be in any wheel linked lists. |
| 647 | pub(super) unsafe fn fire(self, completed_state: TimerResult) -> Option<Waker> { |
| 648 | self.inner.as_ref().state.fire(completed_state) |
| 649 | } |
| 650 | } |
| 651 | |
| 652 | impl Drop for TimerEntry { |
| 653 | fn drop(&mut self) { |
| 654 | unsafe { Pin::new_unchecked(self) }.as_mut().cancel(); |
| 655 | } |
| 656 | } |
| 657 | |
| 658 | // Generates a shard id. If current thread is a worker thread, we use its worker index as a shard id. |
| 659 | // Otherwise, we use a random number generator to obtain the shard id. |
| 660 | cfg_rt! { |
| 661 | fn generate_shard_id(shard_size: u32) -> u32 { |
| 662 | let id = context::with_scheduler(|ctx| match ctx { |
| 663 | Some(scheduler::Context::CurrentThread(_ctx)) => 0, |
| 664 | #[cfg (feature = "rt-multi-thread" )] |
| 665 | Some(scheduler::Context::MultiThread(ctx)) => ctx.get_worker_index() as u32, |
| 666 | #[cfg (all(tokio_unstable, feature = "rt-multi-thread" ))] |
| 667 | Some(scheduler::Context::MultiThreadAlt(ctx)) => ctx.get_worker_index() as u32, |
| 668 | None => context::thread_rng_n(shard_size), |
| 669 | }); |
| 670 | id % shard_size |
| 671 | } |
| 672 | } |
| 673 | |
| 674 | cfg_not_rt! { |
| 675 | fn generate_shard_id(shard_size: u32) -> u32 { |
| 676 | context::thread_rng_n(shard_size) |
| 677 | } |
| 678 | } |
| 679 | |