// Currently, Rust warns when an unsafe fn contains an unsafe {} block. However,
// in the future, this will change to the reverse. For now, suppress this
// warning and generally stick with being explicit about unsafety.
#![allow(unused_unsafe)]
#![cfg_attr(not(feature = "rt"), allow(dead_code))]

//! Time driver.

mod entry;
pub(crate) use entry::TimerEntry;
use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION};

mod handle;
pub(crate) use self::handle::Handle;

mod source;
pub(crate) use source::TimeSource;

mod wheel;

use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::Mutex;
use crate::runtime::driver::{self, IoHandle, IoStack};
use crate::time::error::Error;
use crate::time::{Clock, Duration};

use std::fmt;
use std::{num::NonZeroU64, ptr::NonNull, task::Waker};

/// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout].
///
/// A `Driver` instance tracks the state necessary for managing time and
/// notifying the [`Sleep`][sleep] instances once their deadlines are reached.
///
/// It is expected that a single instance manages many individual [`Sleep`][sleep]
/// instances. The `Driver` implementation is thread-safe and, as such, is able
/// to handle callers from across threads.
///
/// After creating the `Driver` instance, the caller must repeatedly call `park`
/// or `park_timeout`. The time driver will perform no work unless `park` or
/// `park_timeout` is called repeatedly.
///
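/// A minimal sketch of that driving loop (hypothetical; in the real runtime
/// the park calls are made by the runtime's own blocking logic, not by user
/// code):
///
/// ```ignore
/// loop {
///     // Blocks until the next timer deadline (or until unparked), then
///     // fires any timers whose deadlines have passed.
///     driver.park(&handle);
/// }
/// ```
///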
/// The driver has a resolution of one millisecond. Any unit of time that falls
/// between milliseconds is rounded up to the next millisecond. For example, a
/// deadline 1.2 ms away is treated as being 2 ms away.
///
/// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not
/// elapsed will be notified with an error. At this point, calling `poll` on the
/// [`Sleep`][sleep] instance will result in a panic.
///
/// # Implementation
///
/// The time driver is based on the [paper by Varghese and Lauck][paper].
///
/// A hashed timing wheel is a vector of slots, where each slot handles a time
/// slice. As time progresses, the timer walks over the slot for the current
/// instant, and processes each entry for that slot. When the timer reaches the
/// end of the wheel, it starts again at the beginning.
///
/// The implementation maintains six wheels arranged in a set of levels. As the
/// levels go up, the slots of the associated wheel represent larger intervals
/// of time. At each level, the wheel has 64 slots. Each slot covers a range of
/// time equal to the entire wheel at the level below it. At level zero, each
/// slot represents one millisecond of time.
///
/// The wheels are:
///
/// * Level 0: 64 x 1 millisecond slots.
/// * Level 1: 64 x 64 millisecond slots.
/// * Level 2: 64 x ~4 second slots.
/// * Level 3: 64 x ~4 minute slots.
/// * Level 4: 64 x ~4 hour slots.
/// * Level 5: 64 x ~12 day slots.
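///
/// As an illustrative sketch (assumed here, not necessarily the exact internal
/// routine), the level for a deadline can be derived from its distance to the
/// current time: each level spans 64 (= 2^6) times the range of the level
/// below it, so the level index is the position of the highest bit in which
/// the deadline differs from the current time, divided by 6.
///
/// ```ignore
/// /// Hypothetical helper: maps a deadline to a wheel level, assuming six
/// /// levels of 64 slots each and one-millisecond ticks.
/// fn level_for(elapsed: u64, deadline: u64) -> usize {
///     // Force the low six bits on so deadlines inside the current level-0
///     // slot still map to level 0. (Clamping deadlines beyond the top
///     // level is omitted in this sketch.)
///     let masked = (elapsed ^ deadline) | 0b11_1111;
///     let significant = 63 - masked.leading_zeros() as usize;
///     significant / 6
/// }
/// ```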
///
/// When the timer processes entries at level zero, it will notify all the
/// `Sleep` instances as their deadlines have been reached. For all higher
/// levels, all entries will be redistributed across the wheel at the next level
/// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will
/// either be canceled (dropped) or their associated entries will reach level
/// zero and be notified. For example, an entry 90 ms out initially lands in
/// level 1; once the wheel advances to its slot, the entry is moved down into
/// the level-0 slot matching its final millisecond.
///
/// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf
/// [sleep]: crate::time::Sleep
/// [timeout]: crate::time::Timeout
/// [interval]: crate::time::Interval
#[derive(Debug)]
pub(crate) struct Driver {
    /// Parker to delegate to.
    park: IoStack,
}

/// Timer state shared between `Driver`, `Handle`, and `Registration`.
struct Inner {
    // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex.
    pub(super) state: Mutex<InnerState>,

    /// True if the driver is being shut down.
    pub(super) is_shutdown: AtomicBool,

    // When `true`, a call to `park_timeout` should immediately return and time
    // should not advance. One reason for this to be `true` is if the task
    // passed to `Runtime::block_on` called `task::yield_now()`.
    //
    // While it may look racy, it only has any effect when the clock is paused
    // and pausing the clock is restricted to a single-threaded runtime.
    #[cfg(feature = "test-util")]
    did_wake: AtomicBool,
}

/// Time state that is shared between threads and must be protected by a `Mutex`.
struct InnerState {
    /// The last published timer `elapsed` value.
    elapsed: u64,

    /// The earliest time at which we promise to wake up without unparking.
    next_wake: Option<NonZeroU64>,

    /// Timer wheel.
    wheel: wheel::Wheel,
}

// ===== impl Driver =====

impl Driver {
    /// Creates a new `Driver` instance that uses `park` to block the current
    /// thread and `time_source` to get the current time and convert to ticks.
    ///
    /// Specifying the source of time is useful when testing.
    pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) {
        let time_source = TimeSource::new(clock);

        let handle = Handle {
            time_source,
            inner: Inner {
                state: Mutex::new(InnerState {
                    elapsed: 0,
                    next_wake: None,
                    wheel: wheel::Wheel::new(),
                }),
                is_shutdown: AtomicBool::new(false),

                #[cfg(feature = "test-util")]
                did_wake: AtomicBool::new(false),
            },
        };

        let driver = Driver { park };

        (driver, handle)
    }

    pub(crate) fn park(&mut self, handle: &driver::Handle) {
        self.park_internal(handle, None)
    }

    pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
        self.park_internal(handle, Some(duration))
    }

    pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
        let handle = rt_handle.time();

        if handle.is_shutdown() {
            return;
        }

        handle.inner.is_shutdown.store(true, Ordering::SeqCst);

        // Advance time forward to the end of time.

        handle.process_at_time(u64::MAX);

        self.park.shutdown(rt_handle);
    }

    fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
        let handle = rt_handle.time();
        let mut lock = handle.inner.state.lock();

        assert!(!handle.is_shutdown());

        let next_wake = lock.wheel.next_expiration_time();
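        // `next_wake` is stored as a `NonZeroU64`, so a computed wake-up time
        // of tick 0 is clamped up to tick 1; `None` means no wake-up is
        // scheduled at all.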
        lock.next_wake =
            next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));

        drop(lock);

        match next_wake {
            Some(when) => {
                let now = handle.time_source.now(rt_handle.clock());
                // Note that we effectively round up to 1ms here - this avoids
                // very short-duration microsecond-resolution sleeps that the OS
                // might treat as zero-length.
                let mut duration = handle
                    .time_source
                    .tick_to_duration(when.saturating_sub(now));

                if duration > Duration::from_millis(0) {
                    if let Some(limit) = limit {
                        duration = std::cmp::min(limit, duration);
                    }

                    self.park_thread_timeout(rt_handle, duration);
                } else {
                    self.park.park_timeout(rt_handle, Duration::from_secs(0));
                }
            }
            None => {
                if let Some(duration) = limit {
                    self.park_thread_timeout(rt_handle, duration);
                } else {
                    self.park.park(rt_handle);
                }
            }
        }

        // Process pending timers after waking up
        handle.process(rt_handle.clock());
    }

    cfg_test_util! {
        fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
            let handle = rt_handle.time();
            let clock = rt_handle.clock();

            if clock.can_auto_advance() {
                self.park.park_timeout(rt_handle, Duration::from_secs(0));

                // If the time driver was woken, then the park completed
                // before the "duration" elapsed (usually caused by a
                // yield in `Runtime::block_on`). In this case, we don't
                // advance the clock.
                if !handle.did_wake() {
                    // Simulate advancing time
                    if let Err(msg) = clock.advance(duration) {
                        panic!("{}", msg);
                    }
                }
            } else {
                self.park.park_timeout(rt_handle, duration);
            }
        }
    }

    cfg_not_test_util! {
        fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
            self.park.park_timeout(rt_handle, duration);
        }
    }
}

impl Handle {
    /// Runs timer-related logic, firing any timers whose deadlines have been
    /// reached.
    pub(self) fn process(&self, clock: &Clock) {
        let now = self.time_source().now(clock);

        self.process_at_time(now)
    }

    pub(self) fn process_at_time(&self, mut now: u64) {
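        // Wakers are collected in fixed-size batches so they can be invoked
        // with the driver lock released; waking a task while holding the lock
        // risks deadlock if the woken task immediately touches the timer.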
        let mut waker_list: [Option<Waker>; 32] = Default::default();
        let mut waker_idx = 0;

        let mut lock = self.inner.lock();

        if now < lock.elapsed {
            // Time went backwards! This normally shouldn't happen, as the Rust
            // standard library guarantees that an `Instant` is monotonic, but it
            // can happen when running Linux in a VM on a Windows host due to std
            // incorrectly trusting the hardware clock to be monotonic.
            //
            // See <https://github.com/tokio-rs/tokio/issues/3619> for more information.
            now = lock.elapsed;
        }

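        // `wheel.poll(now)` advances the wheel's notion of time up to `now`,
        // yielding expired entries one at a time.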
        while let Some(entry) = lock.wheel.poll(now) {
            debug_assert!(unsafe { entry.is_pending() });

            // SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
            if let Some(waker) = unsafe { entry.fire(Ok(())) } {
                waker_list[waker_idx] = Some(waker);

                waker_idx += 1;

                if waker_idx == waker_list.len() {
                    // Wake a batch of wakers. To avoid deadlock, we must do
                    // this with the lock temporarily dropped.
                    drop(lock);

                    for waker in waker_list.iter_mut() {
                        waker.take().unwrap().wake();
                    }

                    waker_idx = 0;

                    lock = self.inner.lock();
                }
            }
        }

        // Update the elapsed cache
        lock.elapsed = lock.wheel.elapsed();
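        // As in `park_internal`, clamp a zero tick up to 1 so it fits in the
        // `NonZeroU64` stored in `next_wake`.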
        lock.next_wake = lock
            .wheel
            .poll_at()
            .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));

        drop(lock);

        for waker in waker_list[0..waker_idx].iter_mut() {
            waker.take().unwrap().wake();
        }
    }

    /// Removes a registered timer from the driver.
    ///
    /// The timer will be moved to the cancelled state. Wakers will _not_ be
    /// invoked. If the timer is already completed, this function is a no-op.
    ///
    /// This function always acquires the driver lock, even if the entry does
    /// not appear to be registered.
    ///
    /// SAFETY: The timer must not be registered with some other driver, and
    /// `add_entry` must not be called concurrently.
    pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
        unsafe {
            let mut lock = self.inner.lock();

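            // The entry may already have fired or been removed; only detach
            // it from the wheel if it still looks registered.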
            if entry.as_ref().might_be_registered() {
                lock.wheel.remove(entry);
            }

            entry.as_ref().handle().fire(Ok(()));
        }
    }

    /// Removes and re-adds an entry to the driver.
    ///
    /// SAFETY: The timer must be either unregistered, or registered with this
    /// driver. No other threads are allowed to concurrently manipulate the
    /// timer at all (the current thread should hold an exclusive reference to
    /// the `TimerEntry`).
    pub(self) unsafe fn reregister(
        &self,
        unpark: &IoHandle,
        new_tick: u64,
        entry: NonNull<TimerShared>,
    ) {
        let waker = unsafe {
            let mut lock = self.inner.lock();

            // We may have raced with a firing/deregistration, so check before
            // deregistering.
            if unsafe { entry.as_ref().might_be_registered() } {
                lock.wheel.remove(entry);
            }

            // Now that we have exclusive control of this entry, mint a handle to reinsert it.
            let entry = entry.as_ref().handle();

            if self.is_shutdown() {
                unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
            } else {
                entry.set_expiration(new_tick);

                // Note: We don't have to worry about racing with some other resetting
                // thread, because add_entry and reregister require exclusive control of
                // the timer entry.
                match unsafe { lock.wheel.insert(entry) } {
                    Ok(when) => {
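                        // The driver promised (via `next_wake`) not to sleep
                        // past a certain tick; if the new deadline is earlier
                        // than that promise, or no wake-up is scheduled at
                        // all, unpark the driver thread so it can recompute
                        // its sleep duration.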
                        if lock
                            .next_wake
                            .map(|next_wake| when < next_wake.get())
                            .unwrap_or(true)
                        {
                            unpark.unpark();
                        }

                        None
                    }
                    Err((entry, crate::time::error::InsertError::Elapsed)) => unsafe {
                        entry.fire(Ok(()))
                    },
                }
            }

            // Must release lock before invoking waker to avoid the risk of deadlock.
        };

        // The timer was fired synchronously as a result of the reregistration.
        // Wake the waker; this is needed because we might reset _after_ a poll,
        // and otherwise the task won't be awoken to poll again.
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    cfg_test_util! {
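        /// Returns whether the driver was woken while parked, clearing the
        /// flag in the same atomic swap.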
        fn did_wake(&self) -> bool {
            self.inner.did_wake.swap(false, Ordering::SeqCst)
        }
    }
}

// ===== impl Inner =====

impl Inner {
    /// Locks the driver's inner structure.
    pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
        self.state.lock()
    }

    /// Checks whether the driver has been shut down.
    pub(super) fn is_shutdown(&self) -> bool {
        self.is_shutdown.load(Ordering::SeqCst)
    }
}

impl fmt::Debug for Inner {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Inner").finish()
    }
}

#[cfg(test)]
mod tests;