1mod level;
2pub(crate) use self::level::Expiration;
3use self::level::Level;
4
5mod stack;
6pub(crate) use self::stack::Stack;
7
8use std::borrow::Borrow;
9use std::fmt::Debug;
10use std::usize;
11
12/// Timing wheel implementation.
13///
14/// This type provides the hashed timing wheel implementation that backs `Timer`
15/// and `DelayQueue`.
16///
17/// The structure is generic over `T: Stack`. This allows handling timeout data
18/// being stored on the heap or in a slab. In order to support the latter case,
19/// the slab must be passed into each function allowing the implementation to
20/// lookup timer entries.
21///
22/// See `Timer` documentation for some implementation notes.
23#[derive(Debug)]
24pub(crate) struct Wheel<T> {
25 /// The number of milliseconds elapsed since the wheel started.
26 elapsed: u64,
27
28 /// Timer wheel.
29 ///
30 /// Levels:
31 ///
32 /// * 1 ms slots / 64 ms range
33 /// * 64 ms slots / ~ 4 sec range
34 /// * ~ 4 sec slots / ~ 4 min range
35 /// * ~ 4 min slots / ~ 4 hr range
36 /// * ~ 4 hr slots / ~ 12 day range
37 /// * ~ 12 day slots / ~ 2 yr range
38 levels: Vec<Level<T>>,
39}
40
41/// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
42/// each, the timer is able to track time up to 2 years into the future with a
43/// precision of 1 millisecond.
44const NUM_LEVELS: usize = 6;
45
46/// The maximum duration of a delay
47const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
48
49#[derive(Debug)]
50pub(crate) enum InsertError {
51 Elapsed,
52 Invalid,
53}
54
55impl<T> Wheel<T>
56where
57 T: Stack,
58{
59 /// Create a new timing wheel
60 pub(crate) fn new() -> Wheel<T> {
61 let levels = (0..NUM_LEVELS).map(Level::new).collect();
62
63 Wheel { elapsed: 0, levels }
64 }
65
66 /// Return the number of milliseconds that have elapsed since the timing
67 /// wheel's creation.
68 pub(crate) fn elapsed(&self) -> u64 {
69 self.elapsed
70 }
71
72 /// Insert an entry into the timing wheel.
73 ///
74 /// # Arguments
75 ///
76 /// * `when`: is the instant at which the entry should be fired. It is
77 /// represented as the number of milliseconds since the creation
78 /// of the timing wheel.
79 ///
80 /// * `item`: The item to insert into the wheel.
81 ///
82 /// * `store`: The slab or `()` when using heap storage.
83 ///
84 /// # Return
85 ///
86 /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
87 ///
88 /// `Err(Elapsed)` indicates that `when` represents an instant that has
89 /// already passed. In this case, the caller should fire the timeout
90 /// immediately.
91 ///
92 /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
93 pub(crate) fn insert(
94 &mut self,
95 when: u64,
96 item: T::Owned,
97 store: &mut T::Store,
98 ) -> Result<(), (T::Owned, InsertError)> {
99 if when <= self.elapsed {
100 return Err((item, InsertError::Elapsed));
101 } else if when - self.elapsed > MAX_DURATION {
102 return Err((item, InsertError::Invalid));
103 }
104
105 // Get the level at which the entry should be stored
106 let level = self.level_for(when);
107
108 self.levels[level].add_entry(when, item, store);
109
110 debug_assert!({
111 self.levels[level]
112 .next_expiration(self.elapsed)
113 .map(|e| e.deadline >= self.elapsed)
114 .unwrap_or(true)
115 });
116
117 Ok(())
118 }
119
120 /// Remove `item` from the timing wheel.
121 #[track_caller]
122 pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
123 let when = T::when(item, store);
124
125 assert!(
126 self.elapsed <= when,
127 "elapsed={}; when={}",
128 self.elapsed,
129 when
130 );
131
132 let level = self.level_for(when);
133
134 self.levels[level].remove_entry(when, item, store);
135 }
136
137 /// Instant at which to poll
138 pub(crate) fn poll_at(&self) -> Option<u64> {
139 self.next_expiration().map(|expiration| expiration.deadline)
140 }
141
142 /// Next key that will expire
143 pub(crate) fn peek(&self) -> Option<T::Owned> {
144 self.next_expiration()
145 .and_then(|expiration| self.peek_entry(&expiration))
146 }
147
148 /// Advances the timer up to the instant represented by `now`.
149 pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
150 loop {
151 let expiration = self.next_expiration().and_then(|expiration| {
152 if expiration.deadline > now {
153 None
154 } else {
155 Some(expiration)
156 }
157 });
158
159 match expiration {
160 Some(ref expiration) => {
161 if let Some(item) = self.poll_expiration(expiration, store) {
162 return Some(item);
163 }
164
165 self.set_elapsed(expiration.deadline);
166 }
167 None => {
168 // in this case the poll did not indicate an expiration
169 // _and_ we were not able to find a next expiration in
170 // the current list of timers. advance to the poll's
171 // current time and do nothing else.
172 self.set_elapsed(now);
173 return None;
174 }
175 }
176 }
177 }
178
179 /// Returns the instant at which the next timeout expires.
180 fn next_expiration(&self) -> Option<Expiration> {
181 // Check all levels
182 for level in 0..NUM_LEVELS {
183 if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
184 // There cannot be any expirations at a higher level that happen
185 // before this one.
186 debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
187
188 return Some(expiration);
189 }
190 }
191
192 None
193 }
194
195 /// Used for debug assertions
196 fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
197 let mut res = true;
198
199 for l2 in start_level..NUM_LEVELS {
200 if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
201 if e2.deadline < before {
202 res = false;
203 }
204 }
205 }
206
207 res
208 }
209
210 /// iteratively find entries that are between the wheel's current
211 /// time and the expiration time. for each in that population either
212 /// return it for notification (in the case of the last level) or tier
213 /// it down to the next level (in all other cases).
214 pub(crate) fn poll_expiration(
215 &mut self,
216 expiration: &Expiration,
217 store: &mut T::Store,
218 ) -> Option<T::Owned> {
219 while let Some(item) = self.pop_entry(expiration, store) {
220 if expiration.level == 0 {
221 debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline);
222
223 return Some(item);
224 } else {
225 let when = T::when(item.borrow(), store);
226
227 let next_level = expiration.level - 1;
228
229 self.levels[next_level].add_entry(when, item, store);
230 }
231 }
232
233 None
234 }
235
236 fn set_elapsed(&mut self, when: u64) {
237 assert!(
238 self.elapsed <= when,
239 "elapsed={:?}; when={:?}",
240 self.elapsed,
241 when
242 );
243
244 if when > self.elapsed {
245 self.elapsed = when;
246 }
247 }
248
249 fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned> {
250 self.levels[expiration.level].pop_entry_slot(expiration.slot, store)
251 }
252
253 fn peek_entry(&self, expiration: &Expiration) -> Option<T::Owned> {
254 self.levels[expiration.level].peek_entry_slot(expiration.slot)
255 }
256
257 fn level_for(&self, when: u64) -> usize {
258 level_for(self.elapsed, when)
259 }
260}
261
262fn level_for(elapsed: u64, when: u64) -> usize {
263 const SLOT_MASK: u64 = (1 << 6) - 1;
264
265 // Mask in the trailing bits ignored by the level calculation in order to cap
266 // the possible leading zeros
267 let mut masked = elapsed ^ when | SLOT_MASK;
268 if masked >= MAX_DURATION {
269 // Fudge the timer into the top level
270 masked = MAX_DURATION - 1;
271 }
272 let leading_zeros = masked.leading_zeros() as usize;
273 let significant = 63 - leading_zeros;
274 significant / 6
275}
276
277#[cfg(all(test, not(loom)))]
278mod test {
279 use super::*;
280
281 #[test]
282 fn test_level_for() {
283 for pos in 0..64 {
284 assert_eq!(
285 0,
286 level_for(0, pos),
287 "level_for({}) -- binary = {:b}",
288 pos,
289 pos
290 );
291 }
292
293 for level in 1..5 {
294 for pos in level..64 {
295 let a = pos * 64_usize.pow(level as u32);
296 assert_eq!(
297 level,
298 level_for(0, a as u64),
299 "level_for({}) -- binary = {:b}",
300 a,
301 a
302 );
303
304 if pos > level {
305 let a = a - 1;
306 assert_eq!(
307 level,
308 level_for(0, a as u64),
309 "level_for({}) -- binary = {:b}",
310 a,
311 a
312 );
313 }
314
315 if pos < 64 {
316 let a = a + 1;
317 assert_eq!(
318 level,
319 level_for(0, a as u64),
320 "level_for({}) -- binary = {:b}",
321 a,
322 a
323 );
324 }
325 }
326 }
327 }
328}
329