// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13 
use std::fmt::Error;
use std::mem;
use std::mem::MaybeUninit;
use std::ptr::{addr_of_mut, NonNull};
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::task::Waker;
use std::time::Duration;

use crate::util::linked_list::{Link, LinkedList, Node};
24 
// Number of slots in each level of the wheel.
const SLOTS_NUM: usize = 64;

// Number of levels in the wheel.
const LEVELS_NUM: usize = 6;

// Maximum sleep duration the wheel can represent. Each level resolves 6 bits
// of the expiration (64 slots), so LEVELS_NUM levels cover 2^36 - 1 ticks.
pub(crate) const MAX_DURATION: u64 = (1 << (6 * LEVELS_NUM)) - 1;
33 
34 // Struct for timing and waking up corresponding tasks on the timing wheel.
35 pub(crate) struct Clock {
36     // Expected expiration time.
37     expiration: u64,
38 
39     // The level to which the clock will be inserted.
40     level: usize,
41 
42     // Elapsed time duration.
43     duration: u64,
44 
45     // The result obtained when the corresponding Sleep structure is woken up by
46     // which can be used to determine if the Future is completed correctly.
47     result: AtomicBool,
48 
49     // Corresponding waker,
50     // which is used to wake up sleep coroutine.
51     waker: Option<Waker>,
52 
53     // Linked_list node.
54     node: Node<Clock>,
55 }
56 
57 impl Clock {
58     // Creates a default Clock structure.
new() -> Self59     pub(crate) fn new() -> Self {
60         Self {
61             expiration: 0,
62             level: 0,
63             duration: 0,
64             result: AtomicBool::new(false),
65             waker: None,
66             node: Node::new(),
67         }
68     }
69 
70     // Returns the expected expiration time.
expiration(&self) -> u6471     pub(crate) fn expiration(&self) -> u64 {
72         self.expiration
73     }
74 
75     // Sets the expected expiration time
set_expiration(&mut self, expiration: u64)76     pub(crate) fn set_expiration(&mut self, expiration: u64) {
77         self.expiration = expiration;
78     }
79 
80     // Returns the level to which the clock will be inserted.
level(&self) -> usize81     pub(crate) fn level(&self) -> usize {
82         self.level
83     }
84 
85     // Sets the level to which the clock will be inserted.
set_level(&mut self, level: usize)86     pub(crate) fn set_level(&mut self, level: usize) {
87         self.level = level;
88     }
89 
duration(&self) -> u6490     pub(crate) fn duration(&self) -> u64 {
91         self.duration
92     }
93 
set_duration(&mut self, duration: u64)94     pub(crate) fn set_duration(&mut self, duration: u64) {
95         self.duration = duration;
96     }
97 
98     // Returns the corresponding waker.
take_waker(&mut self) -> Option<Waker>99     pub(crate) fn take_waker(&mut self) -> Option<Waker> {
100         self.waker.take()
101     }
102 
103     // Sets the corresponding waker.
set_waker(&mut self, waker: Waker)104     pub(crate) fn set_waker(&mut self, waker: Waker) {
105         self.waker = Some(waker);
106     }
107 
108     // Returns the result.
result(&self) -> bool109     pub(crate) fn result(&self) -> bool {
110         self.result.load(Relaxed)
111     }
112 
113     // Sets the result.
set_result(&mut self, result: bool)114     pub(crate) fn set_result(&mut self, result: bool) {
115         self.result.store(result, Relaxed);
116     }
117 }
118 
119 impl Default for Clock {
default() -> Self120     fn default() -> Self {
121         Clock::new()
122     }
123 }
124 
125 unsafe impl Link for Clock {
node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>> where Self: Sized,126     unsafe fn node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>>
127     where
128         Self: Sized,
129     {
130         let node_ptr = addr_of_mut!(ptr.as_mut().node);
131         NonNull::new_unchecked(node_ptr)
132     }
133 }
134 
135 pub(crate) enum TimeOut {
136     ClockEntry(NonNull<Clock>),
137     Duration(Duration),
138     None,
139 }
140 
// Location and deadline of the next slot that has to be processed.
pub(crate) struct Expiration {
    // Index of the level that owns the slot.
    level: usize,
    // Index of the slot inside that level.
    slot: usize,
    // Time at which the slot has to be processed.
    deadline: u64,
}
146 
147 pub(crate) struct Wheel {
148     // Since the wheel started,
149     // the number of milliseconds elapsed.
150     elapsed: u64,
151 
152     // The time wheel levels are similar to a multi-layered dial.
153     //
154     // levels:
155     //
156     // 1  ms slots == 64 ms range
157     // 64 ms slots ~= 4 sec range
158     // 4 sec slots ~= 4 min range
159     // 4 min slots ~= 4 hr range
160     // 4 hr slots ~= 12 day range
161     // 12 day slots ~= 2 yr range
162     levels: Vec<Level>,
163 
164     // These corresponding timers have expired,
165     // and are ready to be triggered.
166     trigger: LinkedList<Clock>,
167 }
168 
169 impl Wheel {
170     // Creates a new timing wheel.
new() -> Self171     pub(crate) fn new() -> Self {
172         let levels = (0..LEVELS_NUM).map(Level::new).collect();
173 
174         Self {
175             elapsed: 0,
176             levels,
177             trigger: Default::default(),
178         }
179     }
180 
181     // Return the elapsed.
elapsed(&self) -> u64182     pub(crate) fn elapsed(&self) -> u64 {
183         self.elapsed
184     }
185 
186     // Set the elapsed.
set_elapsed(&mut self, elapsed: u64)187     pub(crate) fn set_elapsed(&mut self, elapsed: u64) {
188         self.elapsed = elapsed;
189     }
190 
191     // Compare the timing wheel elapsed with the expiration,
192     // from which to decide which level to insert.
find_level(expiration: u64, elapsed: u64) -> usize193     pub(crate) fn find_level(expiration: u64, elapsed: u64) -> usize {
194         // 0011 1111
195         const SLOT_MASK: u64 = (1 << 6) - 1;
196 
197         // Use the time difference value to find at which level.
198         // Use XOR to determine the insertion of inspiration currently need to be
199         // inserted into the level, using binary operations to determine which bit has
200         // changed, and further determine which level. If don't use XOR, it will lead to
201         // the re-insertion of the level of the calculation error, and insert to an
202         // incorrect level.
203         let mut masked = (expiration ^ elapsed) | SLOT_MASK;
204         // 1111 1111 1111 1111 1111 1111 1111 1111 1111
205         if masked >= MAX_DURATION {
206             masked = MAX_DURATION - 1;
207         }
208 
209         let leading_zeros = masked.leading_zeros() as usize;
210         // Calculate how many valid bits there are.
211         let significant = 63 - leading_zeros;
212 
213         // One level per 6 bit,
214         // one slots has 2^6 slots.
215         significant / 6
216     }
217 
218     // Insert the corresponding TimerHandle into the specified position in the
219     // timing wheel.
insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error>220     pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error> {
221         let expiration = unsafe { clock_entry.as_ref().expiration() };
222 
223         if expiration <= self.elapsed() {
224             // This means that the timeout period has passed,
225             // and the time should be triggered immediately.
226             return Err(Error);
227         }
228 
229         let level = Self::find_level(expiration, self.elapsed());
230         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
231         // `Sleep` here does not go into `Ready`.
232         unsafe { clock_entry.as_mut().set_level(level) };
233 
234         self.levels[level].insert(clock_entry);
235 
236         Ok(expiration)
237     }
238 
cancel(&mut self, clock_entry: NonNull<Clock>)239     pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
240         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
241         // `Sleep` here does not go into `Ready`.
242         let level = unsafe { clock_entry.as_ref().level() };
243         self.levels[level].cancel(clock_entry);
244 
245         // Caller has unique access to the linked list and the node is not in any other
246         // linked list.
247         unsafe {
248             self.trigger.remove(clock_entry);
249         }
250     }
251 
252     // Return where the next expiration is located, and its deadline.
next_expiration(&self) -> Option<Expiration>253     pub(crate) fn next_expiration(&self) -> Option<Expiration> {
254         for level in 0..LEVELS_NUM {
255             if let Some(expiration) = self.levels[level].next_expiration(self.elapsed()) {
256                 return Some(expiration);
257             }
258         }
259 
260         None
261     }
262 
263     // Retrieve the corresponding expired TimerHandle.
process_expiration(&mut self, expiration: &Expiration)264     pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
265         let mut handles = self.levels[expiration.level].take_slot(expiration.slot);
266         while let Some(mut item) = handles.pop_back() {
267             let expected_expiration = unsafe { item.as_ref().expiration() };
268             if expected_expiration > expiration.deadline {
269                 let level = Self::find_level(expected_expiration, expiration.deadline);
270 
271                 unsafe { item.as_mut().set_level(level) };
272 
273                 self.levels[level].insert(item);
274             } else {
275                 self.trigger.push_front(item);
276             }
277         }
278     }
279 
280     // Determine which timers have timed out at the current time.
poll(&mut self, now: u64) -> TimeOut281     pub(crate) fn poll(&mut self, now: u64) -> TimeOut {
282         loop {
283             if let Some(handle) = self.trigger.pop_back() {
284                 return TimeOut::ClockEntry(handle);
285             }
286 
287             let expiration = self.next_expiration();
288 
289             match expiration {
290                 Some(ref expiration) if expiration.deadline > now => {
291                     return TimeOut::Duration(Duration::from_millis(expiration.deadline - now))
292                 }
293                 Some(ref expiration) => {
294                     self.process_expiration(expiration);
295                     self.set_elapsed(expiration.deadline);
296                 }
297                 None => {
298                     self.set_elapsed(now);
299                     break;
300                 }
301             }
302         }
303 
304         match self.trigger.pop_back() {
305             None => TimeOut::None,
306             Some(handle) => TimeOut::ClockEntry(handle),
307         }
308     }
309 }
310 
311 // Level in the wheel.
312 // All level contains 64 slots.
313 pub struct Level {
314     // current level
315     level: usize,
316 
317     // Determine which slot contains entries based on occupied bit.
318     occupied: u64,
319 
320     // slots in a level.
321     slots: [LinkedList<Clock>; SLOTS_NUM],
322 }
323 
324 impl Level {
325     // Specify the level and create a Level structure.
new(level: usize) -> Self326     pub(crate) fn new(level: usize) -> Self {
327         let mut slots: [MaybeUninit<LinkedList<Clock>>; SLOTS_NUM] =
328             unsafe { MaybeUninit::uninit().assume_init() };
329 
330         for slot in slots.iter_mut() {
331             *slot = MaybeUninit::new(Default::default());
332         }
333 
334         unsafe {
335             let slots = mem::transmute::<_, [LinkedList<Clock>; SLOTS_NUM]>(slots);
336             Self {
337                 level,
338                 occupied: 0,
339                 slots,
340             }
341         }
342     }
343 
344     // Based on the elapsed which the current time wheel is running,
345     // and the expected expiration time of the clock_entry,
346     // find the corresponding slot and insert it.
insert(&mut self, mut clock_entry: NonNull<Clock>)347     pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>) {
348         // This duration represents how long it takes for the current slot to complete,
349         // at least 0.
350         let duration = unsafe { clock_entry.as_ref().expiration() };
351 
352         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
353         // `Sleep` here does not go into `Ready`.
354         unsafe { clock_entry.as_mut().set_duration(duration) };
355 
356         let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
357         self.slots[slot].push_front(clock_entry);
358 
359         self.occupied |= 1 << slot;
360     }
361 
cancel(&mut self, clock_entry: NonNull<Clock>)362     pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
363         // Unsafe access to clock_entry is only unsafe when Sleep Drop,
364         // `Sleep` here does not go into `Ready`.
365         let duration = unsafe { clock_entry.as_ref().duration() };
366 
367         let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
368 
369         // Caller has unique access to the linked list and the node is not in any other
370         // linked list.
371         unsafe {
372             self.slots[slot].remove(clock_entry);
373         }
374 
375         if self.slots[slot].is_empty() {
376             // Unset the bit
377             self.occupied &= !(1 << slot);
378         }
379     }
380 
381     // Return where the next expiration is located, and its deadline.
next_expiration(&self, now: u64) -> Option<Expiration>382     pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
383         let slot = self.next_occupied_slot(now)?;
384 
385         let deadline = Self::calculate_deadline(slot, self.level, now);
386 
387         Some(Expiration {
388             level: self.level,
389             slot,
390             deadline,
391         })
392     }
393 
calculate_deadline(slot: usize, level: usize, now: u64) -> u64394     fn calculate_deadline(slot: usize, level: usize, now: u64) -> u64 {
395         let slot_range = slot_range(level);
396         let level_range = slot_range * SLOTS_NUM as u64;
397         let level_start = now & !(level_range - 1);
398         // Add the time of the last slot at this level to represent a time period.
399         let mut deadline = level_start + slot as u64 * slot_range;
400 
401         if deadline <= now {
402             // This only happened when Duration > MAX_DURATION
403             deadline += level_range;
404         }
405 
406         deadline
407     }
408 
409     // Find the next slot that needs to be executed.
next_occupied_slot(&self, now: u64) -> Option<usize>410     pub(crate) fn next_occupied_slot(&self, now: u64) -> Option<usize> {
411         if self.occupied == 0 {
412             return None;
413         }
414 
415         let now_slot = now / slot_range(self.level);
416         let occupied = self.occupied.rotate_right(now_slot as u32);
417         let zeros = occupied.trailing_zeros();
418         let slot = (zeros as u64 + now_slot) % SLOTS_NUM as u64;
419 
420         Some(slot as usize)
421     }
422 
423     // Fetch all timers in a slot of the corresponding level.
take_slot(&mut self, slot: usize) -> LinkedList<Clock>424     pub(crate) fn take_slot(&mut self, slot: usize) -> LinkedList<Clock> {
425         self.occupied &= !(1 << slot);
426         mem::take(&mut self.slots[slot])
427     }
428 }
429 
430 // All the slots before this level add up to approximately.
slot_range(level: usize) -> u64431 fn slot_range(level: usize) -> u64 {
432     SLOTS_NUM.pow(level as u32) as u64
433 }
434 
#[cfg(test)]
mod test {
    use crate::time::wheel::{Level, Wheel, LEVELS_NUM};
    cfg_net!(
        #[cfg(feature = "ffrt")]
        use crate::time::TimeDriver;
        use crate::time::{sleep, timeout};
        use crate::net::UdpSocket;
        use crate::task::JoinHandle;
        use std::net::SocketAddr;
        use std::time::Duration;
    );

    /// UT test cases for Wheel::new
    ///
    /// # Brief
    /// 1. Use Wheel::new to create a Wheel Struct.
    /// 2. Verify the data in the Wheel Struct.
    #[test]
    fn ut_wheel_new_test() {
        let wheel = Wheel::new();
        assert_eq!(wheel.elapsed, 0);
        assert_eq!(wheel.levels.len(), LEVELS_NUM);
    }

    /// UT test cases for Sleep drop.
    ///
    /// # Brief
    /// 1. Use timeout to create a Timeout Struct.
    /// 2. Enable the Sleep Struct corresponding to the Timeout Struct to enter
    ///    the Pending state.
    /// 3. Verify the change of the internal TimerHandle during Sleep Struct
    ///    drop.
    #[test]
    #[cfg(feature = "net")]
    fn ut_sleep_drop() {
        // Sends a datagram to the receiver after sleeping for one second.
        async fn udp_sender(sender_addr: SocketAddr, receiver_addr: SocketAddr) {
            let sender = UdpSocket::bind(sender_addr).await.unwrap();
            let buf = [2; 10];
            sleep(Duration::from_secs(1)).await;
            sender.send_to(buf.as_slice(), receiver_addr).await.unwrap();
        }

        // Waits for the datagram under a two second timeout; the receive is
        // expected to finish before the timeout fires.
        async fn udp_receiver(receiver_addr: SocketAddr) {
            let receiver = UdpSocket::bind(receiver_addr).await.unwrap();
            let mut buf = [0; 10];
            assert!(
                timeout(Duration::from_secs(2), receiver.recv_from(&mut buf[..]))
                    .await
                    .is_ok()
            );
        }

        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
        let udp_sender_addr = "127.0.0.1:9093".parse().unwrap();
        let udp_receiver_addr = "127.0.0.1:9094".parse().unwrap();
        tasks.push(crate::spawn(udp_sender(udp_sender_addr, udp_receiver_addr)));
        tasks.push(crate::spawn(udp_receiver(udp_receiver_addr)));
        for t in tasks {
            let _ = crate::block_on(t);
        }
        // After both tasks finish, no entry may be left in level 1 of the
        // driver's wheel.
        #[cfg(feature = "ffrt")]
        let lock = TimeDriver::get_ref().wheel.lock().unwrap();
        #[cfg(feature = "ffrt")]
        for slot in lock.levels[1].slots.iter() {
            assert!(slot.is_empty());
        }
    }

    /// UT test cases for Level::calculate_deadline
    ///
    /// # Brief
    /// 1. Use Level::calculate_deadline() to calculate deadlines.
    /// 2. Verify the deadline is right.
    #[test]
    fn ut_wheel_calculate_deadline() {
        let deadline = Level::calculate_deadline(36, 0, 95);
        assert_eq!(deadline, 100);
        let deadline = Level::calculate_deadline(1, 1, 63);
        assert_eq!(deadline, 64);
        let deadline = Level::calculate_deadline(37, 0, 79);
        assert_eq!(deadline, 101);
        let deadline = Level::calculate_deadline(31, 1, 960);
        assert_eq!(deadline, 1984);
        let deadline = Level::calculate_deadline(61, 1, 7001);
        assert_eq!(deadline, 8000);
        let deadline = Level::calculate_deadline(2, 2, 8001);
        assert_eq!(deadline, 8192);
        let deadline = Level::calculate_deadline(12, 1, 8192);
        assert_eq!(deadline, 8960);
        let deadline = Level::calculate_deadline(40, 0, 8960);
        assert_eq!(deadline, 9000);
    }
}
529