• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 mod level;
2 pub(crate) use self::level::Expiration;
3 use self::level::Level;
4 
5 mod stack;
6 pub(crate) use self::stack::Stack;
7 
8 use std::borrow::Borrow;
9 use std::fmt::Debug;
10 use std::usize;
11 
12 /// Timing wheel implementation.
13 ///
14 /// This type provides the hashed timing wheel implementation that backs `Timer`
15 /// and `DelayQueue`.
16 ///
17 /// The structure is generic over `T: Stack`. This allows handling timeout data
18 /// being stored on the heap or in a slab. In order to support the latter case,
19 /// the slab must be passed into each function allowing the implementation to
20 /// lookup timer entries.
21 ///
22 /// See `Timer` documentation for some implementation notes.
23 #[derive(Debug)]
24 pub(crate) struct Wheel<T> {
25     /// The number of milliseconds elapsed since the wheel started.
26     elapsed: u64,
27 
28     /// Timer wheel.
29     ///
30     /// Levels:
31     ///
32     /// * 1 ms slots / 64 ms range
33     /// * 64 ms slots / ~ 4 sec range
34     /// * ~ 4 sec slots / ~ 4 min range
35     /// * ~ 4 min slots / ~ 4 hr range
36     /// * ~ 4 hr slots / ~ 12 day range
37     /// * ~ 12 day slots / ~ 2 yr range
38     levels: Vec<Level<T>>,
39 }
40 
41 /// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
42 /// each, the timer is able to track time up to 2 years into the future with a
43 /// precision of 1 millisecond.
44 const NUM_LEVELS: usize = 6;
45 
46 /// The maximum duration of a delay
47 const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
48 
49 #[derive(Debug)]
50 pub(crate) enum InsertError {
51     Elapsed,
52     Invalid,
53 }
54 
55 impl<T> Wheel<T>
56 where
57     T: Stack,
58 {
59     /// Create a new timing wheel
new() -> Wheel<T>60     pub(crate) fn new() -> Wheel<T> {
61         let levels = (0..NUM_LEVELS).map(Level::new).collect();
62 
63         Wheel { elapsed: 0, levels }
64     }
65 
66     /// Return the number of milliseconds that have elapsed since the timing
67     /// wheel's creation.
elapsed(&self) -> u6468     pub(crate) fn elapsed(&self) -> u64 {
69         self.elapsed
70     }
71 
72     /// Insert an entry into the timing wheel.
73     ///
74     /// # Arguments
75     ///
76     /// * `when`: is the instant at which the entry should be fired. It is
77     ///           represented as the number of milliseconds since the creation
78     ///           of the timing wheel.
79     ///
80     /// * `item`: The item to insert into the wheel.
81     ///
82     /// * `store`: The slab or `()` when using heap storage.
83     ///
84     /// # Return
85     ///
86     /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
87     ///
88     /// `Err(Elapsed)` indicates that `when` represents an instant that has
89     /// already passed. In this case, the caller should fire the timeout
90     /// immediately.
91     ///
92     /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
insert( &mut self, when: u64, item: T::Owned, store: &mut T::Store, ) -> Result<(), (T::Owned, InsertError)>93     pub(crate) fn insert(
94         &mut self,
95         when: u64,
96         item: T::Owned,
97         store: &mut T::Store,
98     ) -> Result<(), (T::Owned, InsertError)> {
99         if when <= self.elapsed {
100             return Err((item, InsertError::Elapsed));
101         } else if when - self.elapsed > MAX_DURATION {
102             return Err((item, InsertError::Invalid));
103         }
104 
105         // Get the level at which the entry should be stored
106         let level = self.level_for(when);
107 
108         self.levels[level].add_entry(when, item, store);
109 
110         debug_assert!({
111             self.levels[level]
112                 .next_expiration(self.elapsed)
113                 .map(|e| e.deadline >= self.elapsed)
114                 .unwrap_or(true)
115         });
116 
117         Ok(())
118     }
119 
120     /// Remove `item` from the timing wheel.
121     #[track_caller]
remove(&mut self, item: &T::Borrowed, store: &mut T::Store)122     pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
123         let when = T::when(item, store);
124 
125         assert!(
126             self.elapsed <= when,
127             "elapsed={}; when={}",
128             self.elapsed,
129             when
130         );
131 
132         let level = self.level_for(when);
133 
134         self.levels[level].remove_entry(when, item, store);
135     }
136 
137     /// Instant at which to poll
poll_at(&self) -> Option<u64>138     pub(crate) fn poll_at(&self) -> Option<u64> {
139         self.next_expiration().map(|expiration| expiration.deadline)
140     }
141 
142     /// Next key that will expire
peek(&self) -> Option<T::Owned>143     pub(crate) fn peek(&self) -> Option<T::Owned> {
144         self.next_expiration()
145             .and_then(|expiration| self.peek_entry(&expiration))
146     }
147 
148     /// Advances the timer up to the instant represented by `now`.
poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned>149     pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
150         loop {
151             let expiration = self.next_expiration().and_then(|expiration| {
152                 if expiration.deadline > now {
153                     None
154                 } else {
155                     Some(expiration)
156                 }
157             });
158 
159             match expiration {
160                 Some(ref expiration) => {
161                     if let Some(item) = self.poll_expiration(expiration, store) {
162                         return Some(item);
163                     }
164 
165                     self.set_elapsed(expiration.deadline);
166                 }
167                 None => {
168                     // in this case the poll did not indicate an expiration
169                     // _and_ we were not able to find a next expiration in
170                     // the current list of timers.  advance to the poll's
171                     // current time and do nothing else.
172                     self.set_elapsed(now);
173                     return None;
174                 }
175             }
176         }
177     }
178 
179     /// Returns the instant at which the next timeout expires.
next_expiration(&self) -> Option<Expiration>180     fn next_expiration(&self) -> Option<Expiration> {
181         // Check all levels
182         for level in 0..NUM_LEVELS {
183             if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
184                 // There cannot be any expirations at a higher level that happen
185                 // before this one.
186                 debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
187 
188                 return Some(expiration);
189             }
190         }
191 
192         None
193     }
194 
195     /// Used for debug assertions
no_expirations_before(&self, start_level: usize, before: u64) -> bool196     fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
197         let mut res = true;
198 
199         for l2 in start_level..NUM_LEVELS {
200             if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
201                 if e2.deadline < before {
202                     res = false;
203                 }
204             }
205         }
206 
207         res
208     }
209 
210     /// iteratively find entries that are between the wheel's current
211     /// time and the expiration time.  for each in that population either
212     /// return it for notification (in the case of the last level) or tier
213     /// it down to the next level (in all other cases).
poll_expiration( &mut self, expiration: &Expiration, store: &mut T::Store, ) -> Option<T::Owned>214     pub(crate) fn poll_expiration(
215         &mut self,
216         expiration: &Expiration,
217         store: &mut T::Store,
218     ) -> Option<T::Owned> {
219         while let Some(item) = self.pop_entry(expiration, store) {
220             if expiration.level == 0 {
221                 debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline);
222 
223                 return Some(item);
224             } else {
225                 let when = T::when(item.borrow(), store);
226 
227                 let next_level = expiration.level - 1;
228 
229                 self.levels[next_level].add_entry(when, item, store);
230             }
231         }
232 
233         None
234     }
235 
set_elapsed(&mut self, when: u64)236     fn set_elapsed(&mut self, when: u64) {
237         assert!(
238             self.elapsed <= when,
239             "elapsed={:?}; when={:?}",
240             self.elapsed,
241             when
242         );
243 
244         if when > self.elapsed {
245             self.elapsed = when;
246         }
247     }
248 
pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned>249     fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option<T::Owned> {
250         self.levels[expiration.level].pop_entry_slot(expiration.slot, store)
251     }
252 
peek_entry(&self, expiration: &Expiration) -> Option<T::Owned>253     fn peek_entry(&self, expiration: &Expiration) -> Option<T::Owned> {
254         self.levels[expiration.level].peek_entry_slot(expiration.slot)
255     }
256 
level_for(&self, when: u64) -> usize257     fn level_for(&self, when: u64) -> usize {
258         level_for(self.elapsed, when)
259     }
260 }
261 
level_for(elapsed: u64, when: u64) -> usize262 fn level_for(elapsed: u64, when: u64) -> usize {
263     const SLOT_MASK: u64 = (1 << 6) - 1;
264 
265     // Mask in the trailing bits ignored by the level calculation in order to cap
266     // the possible leading zeros
267     let mut masked = elapsed ^ when | SLOT_MASK;
268     if masked >= MAX_DURATION {
269         // Fudge the timer into the top level
270         masked = MAX_DURATION - 1;
271     }
272     let leading_zeros = masked.leading_zeros() as usize;
273     let significant = 63 - leading_zeros;
274     significant / 6
275 }
276 
277 #[cfg(all(test, not(loom)))]
278 mod test {
279     use super::*;
280 
281     #[test]
test_level_for()282     fn test_level_for() {
283         for pos in 0..64 {
284             assert_eq!(
285                 0,
286                 level_for(0, pos),
287                 "level_for({}) -- binary = {:b}",
288                 pos,
289                 pos
290             );
291         }
292 
293         for level in 1..5 {
294             for pos in level..64 {
295                 let a = pos * 64_usize.pow(level as u32);
296                 assert_eq!(
297                     level,
298                     level_for(0, a as u64),
299                     "level_for({}) -- binary = {:b}",
300                     a,
301                     a
302                 );
303 
304                 if pos > level {
305                     let a = a - 1;
306                     assert_eq!(
307                         level,
308                         level_for(0, a as u64),
309                         "level_for({}) -- binary = {:b}",
310                         a,
311                         a
312                     );
313                 }
314 
315                 if pos < 64 {
316                     let a = a + 1;
317                     assert_eq!(
318                         level,
319                         level_for(0, a as u64),
320                         "level_for({}) -- binary = {:b}",
321                         a,
322                         a
323                     );
324                 }
325             }
326         }
327     }
328 }
329