• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::fmt::Error;
15 use std::mem;
16 use std::mem::MaybeUninit;
17 use std::ptr::NonNull;
18 use std::time::Duration;
19 
20 use crate::time::Clock;
21 use crate::util::linked_list::LinkedList;
22 
23 // In a slots, the number of slot.
24 const SLOTS_NUM: usize = 64;
25 
26 // In a levels, the number of level.
27 const LEVELS_NUM: usize = 6;
28 
29 // Maximum sleep duration.
30 pub(crate) const MAX_DURATION: u64 = (1 << (6 * LEVELS_NUM)) - 1;
31 
32 pub(crate) enum TimeOut {
33     ClockEntry(NonNull<Clock>),
34     Duration(Duration),
35     None,
36 }
37 
38 pub(crate) struct Wheel {
39     // Since the wheel started,
40     // the number of milliseconds elapsed.
41     elapsed: u64,
42 
43     // Since the time wheel started,
44     // to the end of the last future run,
45     // the number of milliseconds elapsed.
46     last_elapsed: u64,
47 
48     // The time wheel levels are similar to a multi-layered dial.
49     //
50     // levels:
51     //
52     // 1  ms slots == 64 ms range
53     // 64 ms slots ~= 4 sec range
54     // 4 sec slots ~= 4 min range
55     // 4 min slots ~= 4 hr range
56     // 4 hr slots ~= 12 day range
57     // 12 day slots ~= 2 yr range
58     levels: Vec<Level>,
59 
60     // These corresponding timers have expired,
61     // and are ready to be triggered.
62     trigger: LinkedList<Clock>,
63 }
64 
65 impl Wheel {
66     // Creates a new timing wheel.
new() -> Self67     pub(crate) fn new() -> Self {
68         let levels = (0..LEVELS_NUM).map(Level::new).collect();
69 
70         Self {
71             elapsed: 0,
72             last_elapsed: 0,
73             levels,
74             trigger: Default::default(),
75         }
76     }
77 
78     // Return the elapsed.
elapsed(&self) -> u6479     pub(crate) fn elapsed(&self) -> u64 {
80         self.elapsed
81     }
82 
83     // Set the elapsed.
set_elapsed(&mut self, elapsed: u64)84     pub(crate) fn set_elapsed(&mut self, elapsed: u64) {
85         self.elapsed = elapsed;
86     }
87 
88     // Return the last_elapsed.
last_elapsed(&self) -> u6489     pub(crate) fn last_elapsed(&self) -> u64 {
90         self.last_elapsed
91     }
92 
93     // Set the last_elapsed.
set_last_elapsed(&mut self, last_elapsed: u64)94     pub(crate) fn set_last_elapsed(&mut self, last_elapsed: u64) {
95         self.last_elapsed = last_elapsed;
96     }
97 
98     // Compare the timing wheel elapsed with the expiration,
99     // from which to decide which level to insert.
find_level(&self, expiration: u64) -> usize100     pub(crate) fn find_level(&self, expiration: u64) -> usize {
101         // 0011 1111
102         const SLOT_MASK: u64 = (1 << 6) - 1;
103 
104         // Use the time difference value to find at which level.
105         let mut masked = (expiration - self.elapsed()) | SLOT_MASK;
106 
107         // 1111 1111 1111 1111 1111 1111 1111 1111 1111
108         if masked >= MAX_DURATION {
109             masked = MAX_DURATION - 1;
110         }
111 
112         let leading_zeros = masked.leading_zeros() as usize;
113         // Calculate how many valid bits there are.
114         let significant = 63 - leading_zeros;
115 
116         // One level per 6 bit,
117         // one slots has 2^6 slots.
118         significant / 6
119     }
120 
121     // Insert the corresponding TimerHandle into the specified position in the
122     // timing wheel.
insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error>123     pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error> {
124         let expiration = unsafe { clock_entry.as_ref().expiration() };
125 
126         if expiration <= self.elapsed() {
127             // This means that the timeout period has passed,
128             // and the time should be triggered immediately.
129             return Err(Error::default());
130         }
131 
132         let level = self.find_level(expiration);
133         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
134         // `Sleep` here does not go into `Ready`.
135         unsafe { clock_entry.as_mut().set_level(level) };
136 
137         self.levels[level].insert(clock_entry, self.elapsed);
138 
139         Ok(expiration)
140     }
141 
cancel(&mut self, clock_entry: NonNull<Clock>)142     pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
143         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
144         // `Sleep` here does not go into `Ready`.
145         let level = unsafe { clock_entry.as_ref().level() };
146         self.levels[level].cancel(clock_entry);
147 
148         // Caller has unique access to the linked list and the node is not in any other
149         // linked list.
150         unsafe {
151             self.trigger.remove(clock_entry);
152         }
153     }
154 
155     // Return where the next expiration is located, and its deadline.
next_expiration(&self) -> Option<(usize, usize, u64)>156     pub(crate) fn next_expiration(&self) -> Option<(usize, usize, u64)> {
157         for level in 0..LEVELS_NUM {
158             if let Some(expiration) =
159                 self.levels[level].next_expiration(self.elapsed() - self.last_elapsed())
160             {
161                 return Some(expiration);
162             }
163         }
164 
165         None
166     }
167 
168     // Retrieve the corresponding expired TimerHandle.
process_expiration(&mut self, expiration: &(usize, usize, u64))169     pub(crate) fn process_expiration(&mut self, expiration: &(usize, usize, u64)) {
170         let mut handles = self.levels[expiration.0].take_slot(expiration.1);
171         while let Some(item) = handles.pop_back() {
172             self.trigger.push_front(item);
173         }
174     }
175 
176     // Determine which timers have timed out at the current time.
poll(&mut self, now: u64) -> TimeOut177     pub(crate) fn poll(&mut self, now: u64) -> TimeOut {
178         loop {
179             if let Some(handle) = self.trigger.pop_back() {
180                 return TimeOut::ClockEntry(handle);
181             }
182 
183             let expiration = self.next_expiration();
184 
185             match expiration {
186                 Some(ref expiration) if expiration.2 > now - self.last_elapsed() => {
187                     return TimeOut::Duration(Duration::from_millis(
188                         expiration.2 - (now - self.last_elapsed()),
189                     ))
190                 }
191                 Some(ref expiration) => {
192                     self.process_expiration(expiration);
193                     self.set_elapsed(now);
194                 }
195                 None => {
196                     self.set_elapsed(now);
197                     break;
198                 }
199             }
200         }
201 
202         match self.trigger.pop_back() {
203             None => TimeOut::None,
204             Some(handle) => TimeOut::ClockEntry(handle),
205         }
206     }
207 }
208 
209 // Level in the wheel.
210 // All level contains 64 slots.
211 pub struct Level {
212     // current level
213     level: usize,
214 
215     // Determine which slot contains entries based on occupied bit.
216     occupied: u64,
217 
218     // slots in a level.
219     slots: [LinkedList<Clock>; SLOTS_NUM],
220 }
221 
222 impl Level {
223     // Specify the level and create a Level structure.
new(level: usize) -> Self224     pub(crate) fn new(level: usize) -> Self {
225         let mut slots: [MaybeUninit<LinkedList<Clock>>; SLOTS_NUM] =
226             unsafe { MaybeUninit::uninit().assume_init() };
227 
228         for slot in slots.iter_mut() {
229             *slot = MaybeUninit::new(Default::default());
230         }
231 
232         unsafe {
233             let slots = mem::transmute::<_, [LinkedList<Clock>; SLOTS_NUM]>(slots);
234             Self {
235                 level,
236                 occupied: 0,
237                 slots,
238             }
239         }
240     }
241 
242     // Based on the elapsed which the current time wheel is running,
243     // and the expected expiration time of the clock_entry,
244     // find the corresponding slot and insert it.
insert(&mut self, mut clock_entry: NonNull<Clock>, elapsed: u64)245     pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>, elapsed: u64) {
246         let duration = unsafe { clock_entry.as_ref().expiration() } - elapsed;
247         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
248         // `Sleep` here does not go into `Ready`.
249         unsafe { clock_entry.as_mut().set_duration(duration) };
250 
251         let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
252 
253         self.slots[slot].push_front(clock_entry);
254 
255         self.occupied |= 1 << slot;
256     }
257 
cancel(&mut self, clock_entry: NonNull<Clock>)258     pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
259         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
260         // `Sleep` here does not go into `Ready`.
261         let duration = unsafe { clock_entry.as_ref().duration() };
262 
263         let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
264 
265         // Caller has unique access to the linked list and the node is not in any other
266         // linked list.
267         unsafe {
268             self.slots[slot].remove(clock_entry);
269         }
270 
271         if self.slots[slot].is_empty() {
272             // Unset the bit
273             self.occupied ^= 1 << slot;
274         }
275     }
276 
277     // Return where the next expiration is located, and its deadline.
next_expiration(&self, now: u64) -> Option<(usize, usize, u64)>278     pub(crate) fn next_expiration(&self, now: u64) -> Option<(usize, usize, u64)> {
279         let slot = self.next_occupied_slot(now)?;
280 
281         let level_range = level_range(self.level);
282         let slot_range = slot_range(self.level);
283 
284         // Find the start time at this level for the current point in time.
285         let level_start = now & !(level_range - 1);
286         // Add the time of the last slot at this level to represent a time period.
287         let deadline = level_start + slot as u64 * slot_range;
288 
289         Some((self.level, slot, deadline))
290     }
291 
292     // Find the next slot that needs to be executed.
next_occupied_slot(&self, now: u64) -> Option<usize>293     pub(crate) fn next_occupied_slot(&self, now: u64) -> Option<usize> {
294         if self.occupied == 0 {
295             return None;
296         }
297 
298         let now_slot = now / slot_range(self.level);
299         let occupied = self.occupied.rotate_right(now_slot as u32);
300         let zeros = occupied.trailing_zeros();
301         let slot = (zeros as u64 + now_slot) % SLOTS_NUM as u64;
302 
303         Some(slot as usize)
304     }
305 
306     // Fetch all timers in a slot of the corresponding level.
take_slot(&mut self, slot: usize) -> LinkedList<Clock>307     pub(crate) fn take_slot(&mut self, slot: usize) -> LinkedList<Clock> {
308         self.occupied &= !(1 << slot);
309         mem::take(&mut self.slots[slot])
310     }
311 }
312 
313 // All the slots before this level add up to approximately.
slot_range(level: usize) -> u64314 fn slot_range(level: usize) -> u64 {
315     SLOTS_NUM.pow(level as u32) as u64
316 }
317 
318 // All the slots before this level(including this level) add up to
319 // approximately.
level_range(level: usize) -> u64320 fn level_range(level: usize) -> u64 {
321     SLOTS_NUM as u64 * slot_range(level)
322 }
323 
324 #[cfg(test)]
325 mod test {
326     use crate::time::wheel::{Wheel, LEVELS_NUM};
327     cfg_net!(
328         #[cfg(feature = "ffrt")]
329         use crate::time::TimeDriver;
330         use crate::time::{sleep, timeout};
331         use crate::net::UdpSocket;
332         use crate::JoinHandle;
333         use std::net::SocketAddr;
334         use std::time::Duration;
335     );
336 
337     /// UT test cases for Wheel::new
338     ///
339     /// # Brief
340     /// 1. Use Wheel::new to create a Wheel Struct.
341     /// 2. Verify the data in the Wheel Struct.
342     #[test]
ut_wheel_new_test()343     fn ut_wheel_new_test() {
344         let wheel = Wheel::new();
345         assert_eq!(wheel.elapsed, 0);
346         assert_eq!(wheel.last_elapsed, 0);
347         assert_eq!(wheel.levels.len(), LEVELS_NUM);
348     }
349 
350     /// UT test cases for Sleep drop.
351     ///
352     /// # Brief
353     /// 1. Use timeout to create a Timeout Struct.
354     /// 2. Enable the Sleep Struct corresponding to the Timeout Struct to enter
355     ///    the Pending state.
356     /// 3. Verify the change of the internal TimerHandle during Sleep Struct
357     ///    drop.
358     #[test]
359     #[cfg(feature = "net")]
ut_sleep_drop()360     fn ut_sleep_drop() {
361         async fn udp_sender(sender_addr: SocketAddr, receiver_addr: SocketAddr) {
362             let sender = UdpSocket::bind(sender_addr).await.unwrap();
363             let buf = [2; 10];
364             sleep(Duration::from_secs(1)).await;
365             sender.send_to(buf.as_slice(), receiver_addr).await.unwrap();
366         }
367 
368         async fn udp_receiver(receiver_addr: SocketAddr) {
369             let receiver = UdpSocket::bind(receiver_addr).await.unwrap();
370             let mut buf = [0; 10];
371             assert!(
372                 timeout(Duration::from_secs(2), receiver.recv_from(&mut buf[..]))
373                     .await
374                     .is_ok()
375             );
376         }
377 
378         let mut tasks: Vec<JoinHandle<()>> = Vec::new();
379         let udp_sender_addr = "127.0.0.1:9093".parse().unwrap();
380         let udp_receiver_addr = "127.0.0.1:9094".parse().unwrap();
381         tasks.push(crate::spawn(udp_sender(udp_sender_addr, udp_receiver_addr)));
382         tasks.push(crate::spawn(udp_receiver(udp_receiver_addr)));
383         for t in tasks {
384             let _ = crate::block_on(t);
385         }
386         #[cfg(feature = "ffrt")]
387         let lock = TimeDriver::get_ref().wheel.lock().unwrap();
388         #[cfg(feature = "ffrt")]
389         for slot in lock.levels[1].slots.iter() {
390             assert!(slot.is_empty());
391         }
392     }
393 }
394