1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::fmt::Error;
15 use std::mem;
16 use std::mem::MaybeUninit;
17 use std::ptr::{addr_of_mut, NonNull};
18 use std::sync::atomic::AtomicBool;
19 use std::sync::atomic::Ordering::Relaxed;
20 use std::task::Waker;
21 use std::time::Duration;
22
23 use crate::util::linked_list::{Link, LinkedList, Node};
24
25 // In a slots, the number of slot.
26 const SLOTS_NUM: usize = 64;
27
28 // In a levels, the number of level.
29 const LEVELS_NUM: usize = 6;
30
31 // Maximum sleep duration.
32 pub(crate) const MAX_DURATION: u64 = (1 << (6 * LEVELS_NUM)) - 1;
33
34 // Struct for timing and waking up corresponding tasks on the timing wheel.
35 pub(crate) struct Clock {
36 // Expected expiration time.
37 expiration: u64,
38
39 // The level to which the clock will be inserted.
40 level: usize,
41
42 // Elapsed time duration.
43 duration: u64,
44
45 // The result obtained when the corresponding Sleep structure is woken up by
46 // which can be used to determine if the Future is completed correctly.
47 result: AtomicBool,
48
49 // Corresponding waker,
50 // which is used to wake up sleep coroutine.
51 waker: Option<Waker>,
52
53 // Linked_list node.
54 node: Node<Clock>,
55 }
56
57 impl Clock {
58 // Creates a default Clock structure.
new() -> Self59 pub(crate) fn new() -> Self {
60 Self {
61 expiration: 0,
62 level: 0,
63 duration: 0,
64 result: AtomicBool::new(false),
65 waker: None,
66 node: Node::new(),
67 }
68 }
69
70 // Returns the expected expiration time.
expiration(&self) -> u6471 pub(crate) fn expiration(&self) -> u64 {
72 self.expiration
73 }
74
75 // Sets the expected expiration time
set_expiration(&mut self, expiration: u64)76 pub(crate) fn set_expiration(&mut self, expiration: u64) {
77 self.expiration = expiration;
78 }
79
80 // Returns the level to which the clock will be inserted.
level(&self) -> usize81 pub(crate) fn level(&self) -> usize {
82 self.level
83 }
84
85 // Sets the level to which the clock will be inserted.
set_level(&mut self, level: usize)86 pub(crate) fn set_level(&mut self, level: usize) {
87 self.level = level;
88 }
89
duration(&self) -> u6490 pub(crate) fn duration(&self) -> u64 {
91 self.duration
92 }
93
set_duration(&mut self, duration: u64)94 pub(crate) fn set_duration(&mut self, duration: u64) {
95 self.duration = duration;
96 }
97
98 // Returns the corresponding waker.
take_waker(&mut self) -> Option<Waker>99 pub(crate) fn take_waker(&mut self) -> Option<Waker> {
100 self.waker.take()
101 }
102
103 // Sets the corresponding waker.
set_waker(&mut self, waker: Waker)104 pub(crate) fn set_waker(&mut self, waker: Waker) {
105 self.waker = Some(waker);
106 }
107
108 // Returns the result.
result(&self) -> bool109 pub(crate) fn result(&self) -> bool {
110 self.result.load(Relaxed)
111 }
112
113 // Sets the result.
set_result(&mut self, result: bool)114 pub(crate) fn set_result(&mut self, result: bool) {
115 self.result.store(result, Relaxed);
116 }
117 }
118
119 impl Default for Clock {
default() -> Self120 fn default() -> Self {
121 Clock::new()
122 }
123 }
124
125 unsafe impl Link for Clock {
node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>> where Self: Sized,126 unsafe fn node(mut ptr: NonNull<Self>) -> NonNull<Node<Self>>
127 where
128 Self: Sized,
129 {
130 let node_ptr = addr_of_mut!(ptr.as_mut().node);
131 NonNull::new_unchecked(node_ptr)
132 }
133 }
134
135 pub(crate) enum TimeOut {
136 ClockEntry(NonNull<Clock>),
137 Duration(Duration),
138 None,
139 }
140
141 pub(crate) struct Expiration {
142 level: usize,
143 slot: usize,
144 deadline: u64,
145 }
146
147 pub(crate) struct Wheel {
148 // Since the wheel started,
149 // the number of milliseconds elapsed.
150 elapsed: u64,
151
152 // The time wheel levels are similar to a multi-layered dial.
153 //
154 // levels:
155 //
156 // 1 ms slots == 64 ms range
157 // 64 ms slots ~= 4 sec range
158 // 4 sec slots ~= 4 min range
159 // 4 min slots ~= 4 hr range
160 // 4 hr slots ~= 12 day range
161 // 12 day slots ~= 2 yr range
162 levels: Vec<Level>,
163
164 // These corresponding timers have expired,
165 // and are ready to be triggered.
166 trigger: LinkedList<Clock>,
167 }
168
169 impl Wheel {
170 // Creates a new timing wheel.
new() -> Self171 pub(crate) fn new() -> Self {
172 let levels = (0..LEVELS_NUM).map(Level::new).collect();
173
174 Self {
175 elapsed: 0,
176 levels,
177 trigger: Default::default(),
178 }
179 }
180
181 // Return the elapsed.
elapsed(&self) -> u64182 pub(crate) fn elapsed(&self) -> u64 {
183 self.elapsed
184 }
185
186 // Set the elapsed.
set_elapsed(&mut self, elapsed: u64)187 pub(crate) fn set_elapsed(&mut self, elapsed: u64) {
188 self.elapsed = elapsed;
189 }
190
191 // Compare the timing wheel elapsed with the expiration,
192 // from which to decide which level to insert.
find_level(expiration: u64, elapsed: u64) -> usize193 pub(crate) fn find_level(expiration: u64, elapsed: u64) -> usize {
194 // 0011 1111
195 const SLOT_MASK: u64 = (1 << 6) - 1;
196
197 // Use the time difference value to find at which level.
198 // Use XOR to determine the insertion of inspiration currently need to be
199 // inserted into the level, using binary operations to determine which bit has
200 // changed, and further determine which level. If don't use XOR, it will lead to
201 // the re-insertion of the level of the calculation error, and insert to an
202 // incorrect level.
203 let mut masked = (expiration ^ elapsed) | SLOT_MASK;
204 // 1111 1111 1111 1111 1111 1111 1111 1111 1111
205 if masked >= MAX_DURATION {
206 masked = MAX_DURATION - 1;
207 }
208
209 let leading_zeros = masked.leading_zeros() as usize;
210 // Calculate how many valid bits there are.
211 let significant = 63 - leading_zeros;
212
213 // One level per 6 bit,
214 // one slots has 2^6 slots.
215 significant / 6
216 }
217
218 // Insert the corresponding TimerHandle into the specified position in the
219 // timing wheel.
insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error>220 pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error> {
221 let expiration = unsafe { clock_entry.as_ref().expiration() };
222
223 if expiration <= self.elapsed() {
224 // This means that the timeout period has passed,
225 // and the time should be triggered immediately.
226 return Err(Error);
227 }
228
229 let level = Self::find_level(expiration, self.elapsed());
230 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
231 // `Sleep` here does not go into `Ready`.
232 unsafe { clock_entry.as_mut().set_level(level) };
233
234 self.levels[level].insert(clock_entry);
235
236 Ok(expiration)
237 }
238
cancel(&mut self, clock_entry: NonNull<Clock>)239 pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
240 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
241 // `Sleep` here does not go into `Ready`.
242 let level = unsafe { clock_entry.as_ref().level() };
243 self.levels[level].cancel(clock_entry);
244
245 // Caller has unique access to the linked list and the node is not in any other
246 // linked list.
247 unsafe {
248 self.trigger.remove(clock_entry);
249 }
250 }
251
252 // Return where the next expiration is located, and its deadline.
next_expiration(&self) -> Option<Expiration>253 pub(crate) fn next_expiration(&self) -> Option<Expiration> {
254 for level in 0..LEVELS_NUM {
255 if let Some(expiration) = self.levels[level].next_expiration(self.elapsed()) {
256 return Some(expiration);
257 }
258 }
259
260 None
261 }
262
263 // Retrieve the corresponding expired TimerHandle.
process_expiration(&mut self, expiration: &Expiration)264 pub(crate) fn process_expiration(&mut self, expiration: &Expiration) {
265 let mut handles = self.levels[expiration.level].take_slot(expiration.slot);
266 while let Some(mut item) = handles.pop_back() {
267 let expected_expiration = unsafe { item.as_ref().expiration() };
268 if expected_expiration > expiration.deadline {
269 let level = Self::find_level(expected_expiration, expiration.deadline);
270
271 unsafe { item.as_mut().set_level(level) };
272
273 self.levels[level].insert(item);
274 } else {
275 self.trigger.push_front(item);
276 }
277 }
278 }
279
280 // Determine which timers have timed out at the current time.
poll(&mut self, now: u64) -> TimeOut281 pub(crate) fn poll(&mut self, now: u64) -> TimeOut {
282 loop {
283 if let Some(handle) = self.trigger.pop_back() {
284 return TimeOut::ClockEntry(handle);
285 }
286
287 let expiration = self.next_expiration();
288
289 match expiration {
290 Some(ref expiration) if expiration.deadline > now => {
291 return TimeOut::Duration(Duration::from_millis(expiration.deadline - now))
292 }
293 Some(ref expiration) => {
294 self.process_expiration(expiration);
295 self.set_elapsed(expiration.deadline);
296 }
297 None => {
298 self.set_elapsed(now);
299 break;
300 }
301 }
302 }
303
304 match self.trigger.pop_back() {
305 None => TimeOut::None,
306 Some(handle) => TimeOut::ClockEntry(handle),
307 }
308 }
309 }
310
311 // Level in the wheel.
312 // All level contains 64 slots.
313 pub struct Level {
314 // current level
315 level: usize,
316
317 // Determine which slot contains entries based on occupied bit.
318 occupied: u64,
319
320 // slots in a level.
321 slots: [LinkedList<Clock>; SLOTS_NUM],
322 }
323
324 impl Level {
325 // Specify the level and create a Level structure.
new(level: usize) -> Self326 pub(crate) fn new(level: usize) -> Self {
327 let mut slots: [MaybeUninit<LinkedList<Clock>>; SLOTS_NUM] =
328 unsafe { MaybeUninit::uninit().assume_init() };
329
330 for slot in slots.iter_mut() {
331 *slot = MaybeUninit::new(Default::default());
332 }
333
334 unsafe {
335 let slots = mem::transmute::<_, [LinkedList<Clock>; SLOTS_NUM]>(slots);
336 Self {
337 level,
338 occupied: 0,
339 slots,
340 }
341 }
342 }
343
344 // Based on the elapsed which the current time wheel is running,
345 // and the expected expiration time of the clock_entry,
346 // find the corresponding slot and insert it.
insert(&mut self, mut clock_entry: NonNull<Clock>)347 pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>) {
348 // This duration represents how long it takes for the current slot to complete,
349 // at least 0.
350 let duration = unsafe { clock_entry.as_ref().expiration() };
351
352 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
353 // `Sleep` here does not go into `Ready`.
354 unsafe { clock_entry.as_mut().set_duration(duration) };
355
356 let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
357 self.slots[slot].push_front(clock_entry);
358
359 self.occupied |= 1 << slot;
360 }
361
cancel(&mut self, clock_entry: NonNull<Clock>)362 pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
363 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
364 // `Sleep` here does not go into `Ready`.
365 let duration = unsafe { clock_entry.as_ref().duration() };
366
367 let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
368
369 // Caller has unique access to the linked list and the node is not in any other
370 // linked list.
371 unsafe {
372 self.slots[slot].remove(clock_entry);
373 }
374
375 if self.slots[slot].is_empty() {
376 // Unset the bit
377 self.occupied &= !(1 << slot);
378 }
379 }
380
381 // Return where the next expiration is located, and its deadline.
next_expiration(&self, now: u64) -> Option<Expiration>382 pub(crate) fn next_expiration(&self, now: u64) -> Option<Expiration> {
383 let slot = self.next_occupied_slot(now)?;
384
385 let deadline = Self::calculate_deadline(slot, self.level, now);
386
387 Some(Expiration {
388 level: self.level,
389 slot,
390 deadline,
391 })
392 }
393
calculate_deadline(slot: usize, level: usize, now: u64) -> u64394 fn calculate_deadline(slot: usize, level: usize, now: u64) -> u64 {
395 let slot_range = slot_range(level);
396 let level_range = slot_range * SLOTS_NUM as u64;
397 let level_start = now & !(level_range - 1);
398 // Add the time of the last slot at this level to represent a time period.
399 let mut deadline = level_start + slot as u64 * slot_range;
400
401 if deadline <= now {
402 // This only happened when Duration > MAX_DURATION
403 deadline += level_range;
404 }
405
406 deadline
407 }
408
409 // Find the next slot that needs to be executed.
next_occupied_slot(&self, now: u64) -> Option<usize>410 pub(crate) fn next_occupied_slot(&self, now: u64) -> Option<usize> {
411 if self.occupied == 0 {
412 return None;
413 }
414
415 let now_slot = now / slot_range(self.level);
416 let occupied = self.occupied.rotate_right(now_slot as u32);
417 let zeros = occupied.trailing_zeros();
418 let slot = (zeros as u64 + now_slot) % SLOTS_NUM as u64;
419
420 Some(slot as usize)
421 }
422
423 // Fetch all timers in a slot of the corresponding level.
take_slot(&mut self, slot: usize) -> LinkedList<Clock>424 pub(crate) fn take_slot(&mut self, slot: usize) -> LinkedList<Clock> {
425 self.occupied &= !(1 << slot);
426 mem::take(&mut self.slots[slot])
427 }
428 }
429
430 // All the slots before this level add up to approximately.
slot_range(level: usize) -> u64431 fn slot_range(level: usize) -> u64 {
432 SLOTS_NUM.pow(level as u32) as u64
433 }
434
435 #[cfg(test)]
436 mod test {
437 use crate::time::wheel::{Level, Wheel, LEVELS_NUM};
438 cfg_net!(
439 #[cfg(feature = "ffrt")]
440 use crate::time::TimeDriver;
441 use crate::time::{sleep, timeout};
442 use crate::net::UdpSocket;
443 use crate::task::JoinHandle;
444 use std::net::SocketAddr;
445 use std::time::Duration;
446 );
447
448 /// UT test cases for Wheel::new
449 ///
450 /// # Brief
451 /// 1. Use Wheel::new to create a Wheel Struct.
452 /// 2. Verify the data in the Wheel Struct.
453 #[test]
ut_wheel_new_test()454 fn ut_wheel_new_test() {
455 let wheel = Wheel::new();
456 assert_eq!(wheel.elapsed, 0);
457 assert_eq!(wheel.levels.len(), LEVELS_NUM);
458 }
459
460 /// UT test cases for Sleep drop.
461 ///
462 /// # Brief
463 /// 1. Use timeout to create a Timeout Struct.
464 /// 2. Enable the Sleep Struct corresponding to the Timeout Struct to enter
465 /// the Pending state.
466 /// 3. Verify the change of the internal TimerHandle during Sleep Struct
467 /// drop.
468 #[test]
469 #[cfg(feature = "net")]
ut_sleep_drop()470 fn ut_sleep_drop() {
471 async fn udp_sender(sender_addr: SocketAddr, receiver_addr: SocketAddr) {
472 let sender = UdpSocket::bind(sender_addr).await.unwrap();
473 let buf = [2; 10];
474 sleep(Duration::from_secs(1)).await;
475 sender.send_to(buf.as_slice(), receiver_addr).await.unwrap();
476 }
477
478 async fn udp_receiver(receiver_addr: SocketAddr) {
479 let receiver = UdpSocket::bind(receiver_addr).await.unwrap();
480 let mut buf = [0; 10];
481 assert!(
482 timeout(Duration::from_secs(2), receiver.recv_from(&mut buf[..]))
483 .await
484 .is_ok()
485 );
486 }
487
488 let mut tasks: Vec<JoinHandle<()>> = Vec::new();
489 let udp_sender_addr = "127.0.0.1:9093".parse().unwrap();
490 let udp_receiver_addr = "127.0.0.1:9094".parse().unwrap();
491 tasks.push(crate::spawn(udp_sender(udp_sender_addr, udp_receiver_addr)));
492 tasks.push(crate::spawn(udp_receiver(udp_receiver_addr)));
493 for t in tasks {
494 let _ = crate::block_on(t);
495 }
496 #[cfg(feature = "ffrt")]
497 let lock = TimeDriver::get_ref().wheel.lock().unwrap();
498 #[cfg(feature = "ffrt")]
499 for slot in lock.levels[1].slots.iter() {
500 assert!(slot.is_empty());
501 }
502 }
503
504 /// UT test cases for Level::calculate_deadline
505 ///
506 /// # Brief
507 /// 1. Use Level::calculate_deadline() to calculate Level.
508 /// 2. Verify the deadline is right.
509 #[test]
ut_wheel_calculate_deadline()510 fn ut_wheel_calculate_deadline() {
511 let deadline = Level::calculate_deadline(36, 0, 95);
512 assert_eq!(deadline, 100);
513 let deadline = Level::calculate_deadline(1, 1, 63);
514 assert_eq!(deadline, 64);
515 let deadline = Level::calculate_deadline(37, 0, 79);
516 assert_eq!(deadline, 101);
517 let deadline = Level::calculate_deadline(31, 1, 960);
518 assert_eq!(deadline, 1984);
519 let deadline = Level::calculate_deadline(61, 1, 7001);
520 assert_eq!(deadline, 8000);
521 let deadline = Level::calculate_deadline(2, 2, 8001);
522 assert_eq!(deadline, 8192);
523 let deadline = Level::calculate_deadline(12, 1, 8192);
524 assert_eq!(deadline, 8960);
525 let deadline = Level::calculate_deadline(40, 0, 8960);
526 assert_eq!(deadline, 9000);
527 }
528 }
529