1 | // Currently, rust warns when an unsafe fn contains an unsafe {} block. However, |
2 | // in the future, this will change to the reverse. For now, suppress this |
3 | // warning and generally stick with being explicit about unsafety. |
4 | #![allow (unused_unsafe)] |
5 | #![cfg_attr (not(feature = "rt" ), allow(dead_code))] |
6 | |
7 | //! Time driver. |
8 | |
9 | mod entry; |
10 | pub(crate) use entry::TimerEntry; |
11 | use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION}; |
12 | |
13 | mod handle; |
14 | pub(crate) use self::handle::Handle; |
15 | |
16 | mod source; |
17 | pub(crate) use source::TimeSource; |
18 | |
19 | mod wheel; |
20 | |
21 | use crate::loom::sync::atomic::{AtomicBool, Ordering}; |
22 | use crate::loom::sync::Mutex; |
23 | use crate::runtime::driver::{self, IoHandle, IoStack}; |
24 | use crate::time::error::Error; |
25 | use crate::time::{Clock, Duration}; |
26 | |
27 | use std::fmt; |
28 | use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; |
29 | |
30 | /// Time implementation that drives [`Sleep`][sleep], [`Interval`][interval], and [`Timeout`][timeout]. |
31 | /// |
32 | /// A `Driver` instance tracks the state necessary for managing time and |
33 | /// notifying the [`Sleep`][sleep] instances once their deadlines are reached. |
34 | /// |
35 | /// It is expected that a single instance manages many individual [`Sleep`][sleep] |
36 | /// instances. The `Driver` implementation is thread-safe and, as such, is able |
37 | /// to handle callers from across threads. |
38 | /// |
39 | /// After creating the `Driver` instance, the caller must repeatedly call `park` |
40 | /// or `park_timeout`. The time driver will perform no work unless `park` or |
41 | /// `park_timeout` is called repeatedly. |
42 | /// |
/// The driver has a resolution of one millisecond. Any unit of time that falls
/// between milliseconds is rounded up to the next millisecond.
45 | /// |
46 | /// When an instance is dropped, any outstanding [`Sleep`][sleep] instance that has not |
47 | /// elapsed will be notified with an error. At this point, calling `poll` on the |
48 | /// [`Sleep`][sleep] instance will result in panic. |
49 | /// |
50 | /// # Implementation |
51 | /// |
52 | /// The time driver is based on the [paper by Varghese and Lauck][paper]. |
53 | /// |
54 | /// A hashed timing wheel is a vector of slots, where each slot handles a time |
55 | /// slice. As time progresses, the timer walks over the slot for the current |
56 | /// instant, and processes each entry for that slot. When the timer reaches the |
57 | /// end of the wheel, it starts again at the beginning. |
58 | /// |
59 | /// The implementation maintains six wheels arranged in a set of levels. As the |
60 | /// levels go up, the slots of the associated wheel represent larger intervals |
61 | /// of time. At each level, the wheel has 64 slots. Each slot covers a range of |
62 | /// time equal to the wheel at the lower level. At level zero, each slot |
63 | /// represents one millisecond of time. |
64 | /// |
65 | /// The wheels are: |
66 | /// |
67 | /// * Level 0: 64 x 1 millisecond slots. |
68 | /// * Level 1: 64 x 64 millisecond slots. |
69 | /// * Level 2: 64 x ~4 second slots. |
70 | /// * Level 3: 64 x ~4 minute slots. |
71 | /// * Level 4: 64 x ~4 hour slots. |
72 | /// * Level 5: 64 x ~12 day slots. |
73 | /// |
74 | /// When the timer processes entries at level zero, it will notify all the |
75 | /// `Sleep` instances as their deadlines have been reached. For all higher |
76 | /// levels, all entries will be redistributed across the wheel at the next level |
77 | /// down. Eventually, as time progresses, entries with [`Sleep`][sleep] instances will |
78 | /// either be canceled (dropped) or their associated entries will reach level |
79 | /// zero and be notified. |
80 | /// |
81 | /// [paper]: http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf |
82 | /// [sleep]: crate::time::Sleep |
83 | /// [timeout]: crate::time::Timeout |
84 | /// [interval]: crate::time::Interval |
#[derive(Debug)]
pub(crate) struct Driver {
    /// Parker to delegate to.
    ///
    /// Every park request made on the time driver is forwarded to this I/O
    /// stack (with a timeout derived from the next timer expiration when one
    /// exists), and it is shut down after the timers when the driver shuts
    /// down.
    park: IoStack,
}
90 | |
/// Timer state shared between `Driver`, `Handle`, and `Registration`.
struct Inner {
    // The state is split like this so `Handle` can access `is_shutdown` without locking the mutex
    /// Mutex-protected part of the timer state: the timer wheel and the next
    /// promised wake-up time.
    pub(super) state: Mutex<InnerState>,

    /// True if the driver is being shutdown.
    ///
    /// Stored outside of `state` so it can be read with a plain atomic load.
    pub(super) is_shutdown: AtomicBool,

    // When `true`, a call to `park_timeout` should immediately return and time
    // should not advance. One reason for this to be `true` is if the task
    // passed to `Runtime::block_on` called `task::yield_now()`.
    //
    // While it may look racy, it only has any effect when the clock is paused
    // and pausing the clock is restricted to a single-threaded runtime.
    #[cfg(feature = "test-util")]
    did_wake: AtomicBool,
}
108 | |
/// Timer state that must only be accessed while holding the `Mutex` stored in
/// `Inner::state`.
struct InnerState {
    /// The earliest time at which we promise to wake up without unparking.
    ///
    /// `None` when the wheel reported no upcoming expiration the last time
    /// this field was refreshed. A reported time of zero is stored as `1`,
    /// because the value has to be non-zero.
    next_wake: Option<NonZeroU64>,

    /// Timer wheel.
    wheel: wheel::Wheel,
}
117 | |
118 | // ===== impl Driver ===== |
119 | |
120 | impl Driver { |
121 | /// Creates a new `Driver` instance that uses `park` to block the current |
122 | /// thread and `time_source` to get the current time and convert to ticks. |
123 | /// |
124 | /// Specifying the source of time is useful when testing. |
125 | pub(crate) fn new(park: IoStack, clock: &Clock) -> (Driver, Handle) { |
126 | let time_source = TimeSource::new(clock); |
127 | |
128 | let handle = Handle { |
129 | time_source, |
130 | inner: Inner { |
131 | state: Mutex::new(InnerState { |
132 | next_wake: None, |
133 | wheel: wheel::Wheel::new(), |
134 | }), |
135 | is_shutdown: AtomicBool::new(false), |
136 | |
137 | #[cfg (feature = "test-util" )] |
138 | did_wake: AtomicBool::new(false), |
139 | }, |
140 | }; |
141 | |
142 | let driver = Driver { park }; |
143 | |
144 | (driver, handle) |
145 | } |
146 | |
147 | pub(crate) fn park(&mut self, handle: &driver::Handle) { |
148 | self.park_internal(handle, None); |
149 | } |
150 | |
151 | pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { |
152 | self.park_internal(handle, Some(duration)); |
153 | } |
154 | |
155 | pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { |
156 | let handle = rt_handle.time(); |
157 | |
158 | if handle.is_shutdown() { |
159 | return; |
160 | } |
161 | |
162 | handle.inner.is_shutdown.store(true, Ordering::SeqCst); |
163 | |
164 | // Advance time forward to the end of time. |
165 | |
166 | handle.process_at_time(u64::MAX); |
167 | |
168 | self.park.shutdown(rt_handle); |
169 | } |
170 | |
171 | fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) { |
172 | let handle = rt_handle.time(); |
173 | let mut lock = handle.inner.state.lock(); |
174 | |
175 | assert!(!handle.is_shutdown()); |
176 | |
177 | let next_wake = lock.wheel.next_expiration_time(); |
178 | lock.next_wake = |
179 | next_wake.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); |
180 | |
181 | drop(lock); |
182 | |
183 | match next_wake { |
184 | Some(when) => { |
185 | let now = handle.time_source.now(rt_handle.clock()); |
186 | // Note that we effectively round up to 1ms here - this avoids |
187 | // very short-duration microsecond-resolution sleeps that the OS |
188 | // might treat as zero-length. |
189 | let mut duration = handle |
190 | .time_source |
191 | .tick_to_duration(when.saturating_sub(now)); |
192 | |
193 | if duration > Duration::from_millis(0) { |
194 | if let Some(limit) = limit { |
195 | duration = std::cmp::min(limit, duration); |
196 | } |
197 | |
198 | self.park_thread_timeout(rt_handle, duration); |
199 | } else { |
200 | self.park.park_timeout(rt_handle, Duration::from_secs(0)); |
201 | } |
202 | } |
203 | None => { |
204 | if let Some(duration) = limit { |
205 | self.park_thread_timeout(rt_handle, duration); |
206 | } else { |
207 | self.park.park(rt_handle); |
208 | } |
209 | } |
210 | } |
211 | |
212 | // Process pending timers after waking up |
213 | handle.process(rt_handle.clock()); |
214 | } |
215 | |
216 | cfg_test_util! { |
217 | fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { |
218 | let handle = rt_handle.time(); |
219 | let clock = rt_handle.clock(); |
220 | |
221 | if clock.can_auto_advance() { |
222 | self.park.park_timeout(rt_handle, Duration::from_secs(0)); |
223 | |
224 | // If the time driver was woken, then the park completed |
225 | // before the "duration" elapsed (usually caused by a |
226 | // yield in `Runtime::block_on`). In this case, we don't |
227 | // advance the clock. |
228 | if !handle.did_wake() { |
229 | // Simulate advancing time |
230 | if let Err(msg) = clock.advance(duration) { |
231 | panic!("{}" , msg); |
232 | } |
233 | } |
234 | } else { |
235 | self.park.park_timeout(rt_handle, duration); |
236 | } |
237 | } |
238 | } |
239 | |
240 | cfg_not_test_util! { |
241 | fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { |
242 | self.park.park_timeout(rt_handle, duration); |
243 | } |
244 | } |
245 | } |
246 | |
247 | impl Handle { |
248 | /// Runs timer related logic, and returns the next wakeup time |
249 | pub(self) fn process(&self, clock: &Clock) { |
250 | let now = self.time_source().now(clock); |
251 | |
252 | self.process_at_time(now); |
253 | } |
254 | |
255 | pub(self) fn process_at_time(&self, mut now: u64) { |
256 | let mut waker_list: [Option<Waker>; 32] = Default::default(); |
257 | let mut waker_idx = 0; |
258 | |
259 | let mut lock = self.inner.lock(); |
260 | |
261 | if now < lock.wheel.elapsed() { |
262 | // Time went backwards! This normally shouldn't happen as the Rust language |
263 | // guarantees that an Instant is monotonic, but can happen when running |
264 | // Linux in a VM on a Windows host due to std incorrectly trusting the |
265 | // hardware clock to be monotonic. |
266 | // |
267 | // See <https://github.com/tokio-rs/tokio/issues/3619> for more information. |
268 | now = lock.wheel.elapsed(); |
269 | } |
270 | |
271 | while let Some(entry) = lock.wheel.poll(now) { |
272 | debug_assert!(unsafe { entry.is_pending() }); |
273 | |
274 | // SAFETY: We hold the driver lock, and just removed the entry from any linked lists. |
275 | if let Some(waker) = unsafe { entry.fire(Ok(())) } { |
276 | waker_list[waker_idx] = Some(waker); |
277 | |
278 | waker_idx += 1; |
279 | |
280 | if waker_idx == waker_list.len() { |
281 | // Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped. |
282 | drop(lock); |
283 | |
284 | for waker in waker_list.iter_mut() { |
285 | waker.take().unwrap().wake(); |
286 | } |
287 | |
288 | waker_idx = 0; |
289 | |
290 | lock = self.inner.lock(); |
291 | } |
292 | } |
293 | } |
294 | |
295 | lock.next_wake = lock |
296 | .wheel |
297 | .poll_at() |
298 | .map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap())); |
299 | |
300 | drop(lock); |
301 | |
302 | for waker in &mut waker_list[0..waker_idx] { |
303 | waker.take().unwrap().wake(); |
304 | } |
305 | } |
306 | |
307 | /// Removes a registered timer from the driver. |
308 | /// |
309 | /// The timer will be moved to the cancelled state. Wakers will _not_ be |
310 | /// invoked. If the timer is already completed, this function is a no-op. |
311 | /// |
312 | /// This function always acquires the driver lock, even if the entry does |
313 | /// not appear to be registered. |
314 | /// |
315 | /// SAFETY: The timer must not be registered with some other driver, and |
316 | /// `add_entry` must not be called concurrently. |
317 | pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) { |
318 | unsafe { |
319 | let mut lock = self.inner.lock(); |
320 | |
321 | if entry.as_ref().might_be_registered() { |
322 | lock.wheel.remove(entry); |
323 | } |
324 | |
325 | entry.as_ref().handle().fire(Ok(())); |
326 | } |
327 | } |
328 | |
329 | /// Removes and re-adds an entry to the driver. |
330 | /// |
331 | /// SAFETY: The timer must be either unregistered, or registered with this |
332 | /// driver. No other threads are allowed to concurrently manipulate the |
333 | /// timer at all (the current thread should hold an exclusive reference to |
334 | /// the `TimerEntry`) |
335 | pub(self) unsafe fn reregister( |
336 | &self, |
337 | unpark: &IoHandle, |
338 | new_tick: u64, |
339 | entry: NonNull<TimerShared>, |
340 | ) { |
341 | let waker = unsafe { |
342 | let mut lock = self.inner.lock(); |
343 | |
344 | // We may have raced with a firing/deregistration, so check before |
345 | // deregistering. |
346 | if unsafe { entry.as_ref().might_be_registered() } { |
347 | lock.wheel.remove(entry); |
348 | } |
349 | |
350 | // Now that we have exclusive control of this entry, mint a handle to reinsert it. |
351 | let entry = entry.as_ref().handle(); |
352 | |
353 | if self.is_shutdown() { |
354 | unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) } |
355 | } else { |
356 | entry.set_expiration(new_tick); |
357 | |
358 | // Note: We don't have to worry about racing with some other resetting |
359 | // thread, because add_entry and reregister require exclusive control of |
360 | // the timer entry. |
361 | match unsafe { lock.wheel.insert(entry) } { |
362 | Ok(when) => { |
363 | if lock |
364 | .next_wake |
365 | .map(|next_wake| when < next_wake.get()) |
366 | .unwrap_or(true) |
367 | { |
368 | unpark.unpark(); |
369 | } |
370 | |
371 | None |
372 | } |
373 | Err((entry, crate::time::error::InsertError::Elapsed)) => unsafe { |
374 | entry.fire(Ok(())) |
375 | }, |
376 | } |
377 | } |
378 | |
379 | // Must release lock before invoking waker to avoid the risk of deadlock. |
380 | }; |
381 | |
382 | // The timer was fired synchronously as a result of the reregistration. |
383 | // Wake the waker; this is needed because we might reset _after_ a poll, |
384 | // and otherwise the task won't be awoken to poll again. |
385 | if let Some(waker) = waker { |
386 | waker.wake(); |
387 | } |
388 | } |
389 | |
390 | cfg_test_util! { |
391 | fn did_wake(&self) -> bool { |
392 | self.inner.did_wake.swap(false, Ordering::SeqCst) |
393 | } |
394 | } |
395 | } |
396 | |
397 | // ===== impl Inner ===== |
398 | |
impl Inner {
    /// Locks the driver's inner structure
    ///
    /// Returns the guard giving access to the mutex-protected `InnerState`.
    pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
        self.state.lock()
    }

    /// Checks whether the driver has been shut down.
    ///
    /// Reads the atomic flag directly; the state mutex is not taken.
    pub(super) fn is_shutdown(&self) -> bool {
        self.is_shutdown.load(Ordering::SeqCst)
    }
}
410 | |
411 | impl fmt::Debug for Inner { |
412 | fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { |
413 | fmt.debug_struct("Inner" ).finish() |
414 | } |
415 | } |
416 | |
417 | #[cfg (test)] |
418 | mod tests; |
419 | |