• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::runtime::time::{TimerHandle, TimerShared};
2 use crate::time::error::InsertError;
3 
4 mod level;
5 pub(crate) use self::level::Expiration;
6 use self::level::Level;
7 
8 use std::ptr::NonNull;
9 
10 use super::EntryList;
11 
12 /// Timing wheel implementation.
13 ///
14 /// This type provides the hashed timing wheel implementation that backs `Timer`
15 /// and `DelayQueue`.
16 ///
17 /// The structure is generic over `T: Stack`. This allows handling timeout data
18 /// being stored on the heap or in a slab. In order to support the latter case,
19 /// the slab must be passed into each function allowing the implementation to
20 /// lookup timer entries.
21 ///
22 /// See `Timer` documentation for some implementation notes.
23 #[derive(Debug)]
24 pub(crate) struct Wheel {
25     /// The number of milliseconds elapsed since the wheel started.
26     elapsed: u64,
27 
28     /// Timer wheel.
29     ///
30     /// Levels:
31     ///
32     /// * 1 ms slots / 64 ms range
33     /// * 64 ms slots / ~ 4 sec range
34     /// * ~ 4 sec slots / ~ 4 min range
35     /// * ~ 4 min slots / ~ 4 hr range
36     /// * ~ 4 hr slots / ~ 12 day range
37     /// * ~ 12 day slots / ~ 2 yr range
38     levels: Vec<Level>,
39 
40     /// Entries queued for firing
41     pending: EntryList,
42 }
43 
44 /// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots
45 /// each, the timer is able to track time up to 2 years into the future with a
46 /// precision of 1 millisecond.
47 const NUM_LEVELS: usize = 6;
48 
49 /// The maximum duration of a `Sleep`.
50 pub(super) const MAX_DURATION: u64 = (1 << (6 * NUM_LEVELS)) - 1;
51 
52 impl Wheel {
53     /// Creates a new timing wheel.
new() -> Wheel54     pub(crate) fn new() -> Wheel {
55         let levels = (0..NUM_LEVELS).map(Level::new).collect();
56 
57         Wheel {
58             elapsed: 0,
59             levels,
60             pending: EntryList::new(),
61         }
62     }
63 
64     /// Returns the number of milliseconds that have elapsed since the timing
65     /// wheel's creation.
elapsed(&self) -> u6466     pub(crate) fn elapsed(&self) -> u64 {
67         self.elapsed
68     }
69 
70     /// Inserts an entry into the timing wheel.
71     ///
72     /// # Arguments
73     ///
74     /// * `item`: The item to insert into the wheel.
75     ///
76     /// # Return
77     ///
78     /// Returns `Ok` when the item is successfully inserted, `Err` otherwise.
79     ///
80     /// `Err(Elapsed)` indicates that `when` represents an instant that has
81     /// already passed. In this case, the caller should fire the timeout
82     /// immediately.
83     ///
84     /// `Err(Invalid)` indicates an invalid `when` argument as been supplied.
85     ///
86     /// # Safety
87     ///
88     /// This function registers item into an intrusive linked list. The caller
89     /// must ensure that `item` is pinned and will not be dropped without first
90     /// being deregistered.
insert( &mut self, item: TimerHandle, ) -> Result<u64, (TimerHandle, InsertError)>91     pub(crate) unsafe fn insert(
92         &mut self,
93         item: TimerHandle,
94     ) -> Result<u64, (TimerHandle, InsertError)> {
95         let when = item.sync_when();
96 
97         if when <= self.elapsed {
98             return Err((item, InsertError::Elapsed));
99         }
100 
101         // Get the level at which the entry should be stored
102         let level = self.level_for(when);
103 
104         unsafe {
105             self.levels[level].add_entry(item);
106         }
107 
108         debug_assert!({
109             self.levels[level]
110                 .next_expiration(self.elapsed)
111                 .map(|e| e.deadline >= self.elapsed)
112                 .unwrap_or(true)
113         });
114 
115         Ok(when)
116     }
117 
118     /// Removes `item` from the timing wheel.
remove(&mut self, item: NonNull<TimerShared>)119     pub(crate) unsafe fn remove(&mut self, item: NonNull<TimerShared>) {
120         unsafe {
121             let when = item.as_ref().cached_when();
122             if when == u64::MAX {
123                 self.pending.remove(item);
124             } else {
125                 debug_assert!(
126                     self.elapsed <= when,
127                     "elapsed={}; when={}",
128                     self.elapsed,
129                     when
130                 );
131 
132                 let level = self.level_for(when);
133 
134                 self.levels[level].remove_entry(item);
135             }
136         }
137     }
138 
139     /// Instant at which to poll.
poll_at(&self) -> Option<u64>140     pub(crate) fn poll_at(&self) -> Option<u64> {
141         self.next_expiration().map(|expiration| expiration.deadline)
142     }
143 
144     /// Advances the timer up to the instant represented by `now`.
poll(&mut self, now: u64) -> Option<TimerHandle>145     pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
146         loop {
147             if let Some(handle) = self.pending.pop_back() {
148                 return Some(handle);
149             }
150 
151             match self.next_expiration() {
152                 Some(ref expiration) if expiration.deadline <= now => {
153                     self.process_expiration(expiration);
154 
155                     self.set_elapsed(expiration.deadline);
156                 }
157                 _ => {
158                     // in this case the poll did not indicate an expiration
159                     // _and_ we were not able to find a next expiration in
160                     // the current list of timers.  advance to the poll's
161                     // current time and do nothing else.
162                     self.set_elapsed(now);
163                     break;
164                 }
165             }
166         }
167 
168         self.pending.pop_back()
169     }
170 
171     /// Returns the instant at which the next timeout expires.
next_expiration(&self) -> Option<Expiration>172     fn next_expiration(&self) -> Option<Expiration> {
173         if !self.pending.is_empty() {
174             // Expire immediately as we have things pending firing
175             return Some(Expiration {
176                 level: 0,
177                 slot: 0,
178                 deadline: self.elapsed,
179             });
180         }
181 
182         // Check all levels
183         for level in 0..NUM_LEVELS {
184             if let Some(expiration) = self.levels[level].next_expiration(self.elapsed) {
185                 // There cannot be any expirations at a higher level that happen
186                 // before this one.
187                 debug_assert!(self.no_expirations_before(level + 1, expiration.deadline));
188 
189                 return Some(expiration);
190             }
191         }
192 
193         None
194     }
195 
196     /// Returns the tick at which this timer wheel next needs to perform some
197     /// processing, or None if there are no timers registered.
next_expiration_time(&self) -> Option<u64>198     pub(super) fn next_expiration_time(&self) -> Option<u64> {
199         self.next_expiration().map(|ex| ex.deadline)
200     }
201 
202     /// Used for debug assertions
no_expirations_before(&self, start_level: usize, before: u64) -> bool203     fn no_expirations_before(&self, start_level: usize, before: u64) -> bool {
204         let mut res = true;
205 
206         for l2 in start_level..NUM_LEVELS {
207             if let Some(e2) = self.levels[l2].next_expiration(self.elapsed) {
208                 if e2.deadline < before {
209                     res = false;
210                 }
211             }
212         }
213 
214         res
215     }
216 
217     /// iteratively find entries that are between the wheel's current
218     /// time and the expiration time.  for each in that population either
219     /// queue it for notification (in the case of the last level) or tier
220     /// it down to the next level (in all other cases).
process_expiration(&mut self, expiration: &Expiration)221     pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
222         // Note that we need to take _all_ of the entries off the list before
223         // processing any of them. This is important because it's possible that
224         // those entries might need to be reinserted into the same slot.
225         //
226         // This happens only on the highest level, when an entry is inserted
227         // more than MAX_DURATION into the future. When this happens, we wrap
228         // around, and process some entries a multiple of MAX_DURATION before
229         // they actually need to be dropped down a level. We then reinsert them
230         // back into the same position; we must make sure we don't then process
231         // those entries again or we'll end up in an infinite loop.
232         let mut entries = self.take_entries(expiration);
233 
234         while let Some(item) = entries.pop_back() {
235             if expiration.level == 0 {
236                 debug_assert_eq!(unsafe { item.cached_when() }, expiration.deadline);
237             }
238 
239             // Try to expire the entry; this is cheap (doesn't synchronize) if
240             // the timer is not expired, and updates cached_when.
241             match unsafe { item.mark_pending(expiration.deadline) } {
242                 Ok(()) => {
243                     // Item was expired
244                     self.pending.push_front(item);
245                 }
246                 Err(expiration_tick) => {
247                     let level = level_for(expiration.deadline, expiration_tick);
248                     unsafe {
249                         self.levels[level].add_entry(item);
250                     }
251                 }
252             }
253         }
254     }
255 
set_elapsed(&mut self, when: u64)256     fn set_elapsed(&mut self, when: u64) {
257         assert!(
258             self.elapsed <= when,
259             "elapsed={:?}; when={:?}",
260             self.elapsed,
261             when
262         );
263 
264         if when > self.elapsed {
265             self.elapsed = when;
266         }
267     }
268 
269     /// Obtains the list of entries that need processing for the given expiration.
270     ///
take_entries(&mut self, expiration: &Expiration) -> EntryList271     fn take_entries(&mut self, expiration: &Expiration) -> EntryList {
272         self.levels[expiration.level].take_slot(expiration.slot)
273     }
274 
level_for(&self, when: u64) -> usize275     fn level_for(&self, when: u64) -> usize {
276         level_for(self.elapsed, when)
277     }
278 }
279 
level_for(elapsed: u64, when: u64) -> usize280 fn level_for(elapsed: u64, when: u64) -> usize {
281     const SLOT_MASK: u64 = (1 << 6) - 1;
282 
283     // Mask in the trailing bits ignored by the level calculation in order to cap
284     // the possible leading zeros
285     let mut masked = elapsed ^ when | SLOT_MASK;
286 
287     if masked >= MAX_DURATION {
288         // Fudge the timer into the top level
289         masked = MAX_DURATION - 1;
290     }
291 
292     let leading_zeros = masked.leading_zeros() as usize;
293     let significant = 63 - leading_zeros;
294 
295     significant / 6
296 }
297 
298 #[cfg(all(test, not(loom)))]
299 mod test {
300     use super::*;
301 
302     #[test]
test_level_for()303     fn test_level_for() {
304         for pos in 0..64 {
305             assert_eq!(
306                 0,
307                 level_for(0, pos),
308                 "level_for({}) -- binary = {:b}",
309                 pos,
310                 pos
311             );
312         }
313 
314         for level in 1..5 {
315             for pos in level..64 {
316                 let a = pos * 64_usize.pow(level as u32);
317                 assert_eq!(
318                     level,
319                     level_for(0, a as u64),
320                     "level_for({}) -- binary = {:b}",
321                     a,
322                     a
323                 );
324 
325                 if pos > level {
326                     let a = a - 1;
327                     assert_eq!(
328                         level,
329                         level_for(0, a as u64),
330                         "level_for({}) -- binary = {:b}",
331                         a,
332                         a
333                     );
334                 }
335 
336                 if pos < 64 {
337                     let a = a + 1;
338                     assert_eq!(
339                         level,
340                         level_for(0, a as u64),
341                         "level_for({}) -- binary = {:b}",
342                         a,
343                         a
344                     );
345                 }
346             }
347         }
348     }
349 }
350