1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::fmt::Error;
15 use std::mem;
16 use std::mem::MaybeUninit;
17 use std::ptr::NonNull;
18 use std::time::Duration;
19
20 use crate::time::Clock;
21 use crate::util::linked_list::LinkedList;
22
23 // In a slots, the number of slot.
24 const SLOTS_NUM: usize = 64;
25
26 // In a levels, the number of level.
27 const LEVELS_NUM: usize = 6;
28
29 // Maximum sleep duration.
30 pub(crate) const MAX_DURATION: u64 = (1 << (6 * LEVELS_NUM)) - 1;
31
32 pub(crate) enum TimeOut {
33 ClockEntry(NonNull<Clock>),
34 Duration(Duration),
35 None,
36 }
37
38 pub(crate) struct Wheel {
39 // Since the wheel started,
40 // the number of milliseconds elapsed.
41 elapsed: u64,
42
43 // Since the time wheel started,
44 // to the end of the last future run,
45 // the number of milliseconds elapsed.
46 last_elapsed: u64,
47
48 // The time wheel levels are similar to a multi-layered dial.
49 //
50 // levels:
51 //
52 // 1 ms slots == 64 ms range
53 // 64 ms slots ~= 4 sec range
54 // 4 sec slots ~= 4 min range
55 // 4 min slots ~= 4 hr range
56 // 4 hr slots ~= 12 day range
57 // 12 day slots ~= 2 yr range
58 levels: Vec<Level>,
59
60 // These corresponding timers have expired,
61 // and are ready to be triggered.
62 trigger: LinkedList<Clock>,
63 }
64
65 impl Wheel {
66 // Creates a new timing wheel.
new() -> Self67 pub(crate) fn new() -> Self {
68 let levels = (0..LEVELS_NUM).map(Level::new).collect();
69
70 Self {
71 elapsed: 0,
72 last_elapsed: 0,
73 levels,
74 trigger: Default::default(),
75 }
76 }
77
78 // Return the elapsed.
elapsed(&self) -> u6479 pub(crate) fn elapsed(&self) -> u64 {
80 self.elapsed
81 }
82
83 // Set the elapsed.
set_elapsed(&mut self, elapsed: u64)84 pub(crate) fn set_elapsed(&mut self, elapsed: u64) {
85 self.elapsed = elapsed;
86 }
87
88 // Return the last_elapsed.
last_elapsed(&self) -> u6489 pub(crate) fn last_elapsed(&self) -> u64 {
90 self.last_elapsed
91 }
92
93 // Set the last_elapsed.
set_last_elapsed(&mut self, last_elapsed: u64)94 pub(crate) fn set_last_elapsed(&mut self, last_elapsed: u64) {
95 self.last_elapsed = last_elapsed;
96 }
97
98 // Compare the timing wheel elapsed with the expiration,
99 // from which to decide which level to insert.
find_level(&self, expiration: u64) -> usize100 pub(crate) fn find_level(&self, expiration: u64) -> usize {
101 // 0011 1111
102 const SLOT_MASK: u64 = (1 << 6) - 1;
103
104 // Use the time difference value to find at which level.
105 let mut masked = (expiration - self.elapsed()) | SLOT_MASK;
106
107 // 1111 1111 1111 1111 1111 1111 1111 1111 1111
108 if masked >= MAX_DURATION {
109 masked = MAX_DURATION - 1;
110 }
111
112 let leading_zeros = masked.leading_zeros() as usize;
113 // Calculate how many valid bits there are.
114 let significant = 63 - leading_zeros;
115
116 // One level per 6 bit,
117 // one slots has 2^6 slots.
118 significant / 6
119 }
120
121 // Insert the corresponding TimerHandle into the specified position in the
122 // timing wheel.
insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error>123 pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>) -> Result<u64, Error> {
124 let expiration = unsafe { clock_entry.as_ref().expiration() };
125
126 if expiration <= self.elapsed() {
127 // This means that the timeout period has passed,
128 // and the time should be triggered immediately.
129 return Err(Error::default());
130 }
131
132 let level = self.find_level(expiration);
133 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
134 // `Sleep` here does not go into `Ready`.
135 unsafe { clock_entry.as_mut().set_level(level) };
136
137 self.levels[level].insert(clock_entry, self.elapsed);
138
139 Ok(expiration)
140 }
141
cancel(&mut self, clock_entry: NonNull<Clock>)142 pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
143 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
144 // `Sleep` here does not go into `Ready`.
145 let level = unsafe { clock_entry.as_ref().level() };
146 self.levels[level].cancel(clock_entry);
147
148 // Caller has unique access to the linked list and the node is not in any other
149 // linked list.
150 unsafe {
151 self.trigger.remove(clock_entry);
152 }
153 }
154
155 // Return where the next expiration is located, and its deadline.
next_expiration(&self) -> Option<(usize, usize, u64)>156 pub(crate) fn next_expiration(&self) -> Option<(usize, usize, u64)> {
157 for level in 0..LEVELS_NUM {
158 if let Some(expiration) =
159 self.levels[level].next_expiration(self.elapsed() - self.last_elapsed())
160 {
161 return Some(expiration);
162 }
163 }
164
165 None
166 }
167
168 // Retrieve the corresponding expired TimerHandle.
process_expiration(&mut self, expiration: &(usize, usize, u64))169 pub(crate) fn process_expiration(&mut self, expiration: &(usize, usize, u64)) {
170 let mut handles = self.levels[expiration.0].take_slot(expiration.1);
171 while let Some(item) = handles.pop_back() {
172 self.trigger.push_front(item);
173 }
174 }
175
176 // Determine which timers have timed out at the current time.
poll(&mut self, now: u64) -> TimeOut177 pub(crate) fn poll(&mut self, now: u64) -> TimeOut {
178 loop {
179 if let Some(handle) = self.trigger.pop_back() {
180 return TimeOut::ClockEntry(handle);
181 }
182
183 let expiration = self.next_expiration();
184
185 match expiration {
186 Some(ref expiration) if expiration.2 > now - self.last_elapsed() => {
187 return TimeOut::Duration(Duration::from_millis(
188 expiration.2 - (now - self.last_elapsed()),
189 ))
190 }
191 Some(ref expiration) => {
192 self.process_expiration(expiration);
193 self.set_elapsed(now);
194 }
195 None => {
196 self.set_elapsed(now);
197 break;
198 }
199 }
200 }
201
202 match self.trigger.pop_back() {
203 None => TimeOut::None,
204 Some(handle) => TimeOut::ClockEntry(handle),
205 }
206 }
207 }
208
209 // Level in the wheel.
210 // All level contains 64 slots.
211 pub struct Level {
212 // current level
213 level: usize,
214
215 // Determine which slot contains entries based on occupied bit.
216 occupied: u64,
217
218 // slots in a level.
219 slots: [LinkedList<Clock>; SLOTS_NUM],
220 }
221
222 impl Level {
223 // Specify the level and create a Level structure.
new(level: usize) -> Self224 pub(crate) fn new(level: usize) -> Self {
225 let mut slots: [MaybeUninit<LinkedList<Clock>>; SLOTS_NUM] =
226 unsafe { MaybeUninit::uninit().assume_init() };
227
228 for slot in slots.iter_mut() {
229 *slot = MaybeUninit::new(Default::default());
230 }
231
232 unsafe {
233 let slots = mem::transmute::<_, [LinkedList<Clock>; SLOTS_NUM]>(slots);
234 Self {
235 level,
236 occupied: 0,
237 slots,
238 }
239 }
240 }
241
242 // Based on the elapsed which the current time wheel is running,
243 // and the expected expiration time of the clock_entry,
244 // find the corresponding slot and insert it.
insert(&mut self, mut clock_entry: NonNull<Clock>, elapsed: u64)245 pub(crate) fn insert(&mut self, mut clock_entry: NonNull<Clock>, elapsed: u64) {
246 let duration = unsafe { clock_entry.as_ref().expiration() } - elapsed;
247 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
248 // `Sleep` here does not go into `Ready`.
249 unsafe { clock_entry.as_mut().set_duration(duration) };
250
251 let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
252
253 self.slots[slot].push_front(clock_entry);
254
255 self.occupied |= 1 << slot;
256 }
257
cancel(&mut self, clock_entry: NonNull<Clock>)258 pub(crate) fn cancel(&mut self, clock_entry: NonNull<Clock>) {
259 // Unsafe access to clock_entry is only unsafe when Sleep Drop,
260 // `Sleep` here does not go into `Ready`.
261 let duration = unsafe { clock_entry.as_ref().duration() };
262
263 let slot = ((duration >> (self.level * LEVELS_NUM)) % SLOTS_NUM as u64) as usize;
264
265 // Caller has unique access to the linked list and the node is not in any other
266 // linked list.
267 unsafe {
268 self.slots[slot].remove(clock_entry);
269 }
270
271 if self.slots[slot].is_empty() {
272 // Unset the bit
273 self.occupied ^= 1 << slot;
274 }
275 }
276
277 // Return where the next expiration is located, and its deadline.
next_expiration(&self, now: u64) -> Option<(usize, usize, u64)>278 pub(crate) fn next_expiration(&self, now: u64) -> Option<(usize, usize, u64)> {
279 let slot = self.next_occupied_slot(now)?;
280
281 let level_range = level_range(self.level);
282 let slot_range = slot_range(self.level);
283
284 // Find the start time at this level for the current point in time.
285 let level_start = now & !(level_range - 1);
286 // Add the time of the last slot at this level to represent a time period.
287 let deadline = level_start + slot as u64 * slot_range;
288
289 Some((self.level, slot, deadline))
290 }
291
292 // Find the next slot that needs to be executed.
next_occupied_slot(&self, now: u64) -> Option<usize>293 pub(crate) fn next_occupied_slot(&self, now: u64) -> Option<usize> {
294 if self.occupied == 0 {
295 return None;
296 }
297
298 let now_slot = now / slot_range(self.level);
299 let occupied = self.occupied.rotate_right(now_slot as u32);
300 let zeros = occupied.trailing_zeros();
301 let slot = (zeros as u64 + now_slot) % SLOTS_NUM as u64;
302
303 Some(slot as usize)
304 }
305
306 // Fetch all timers in a slot of the corresponding level.
take_slot(&mut self, slot: usize) -> LinkedList<Clock>307 pub(crate) fn take_slot(&mut self, slot: usize) -> LinkedList<Clock> {
308 self.occupied &= !(1 << slot);
309 mem::take(&mut self.slots[slot])
310 }
311 }
312
313 // All the slots before this level add up to approximately.
slot_range(level: usize) -> u64314 fn slot_range(level: usize) -> u64 {
315 SLOTS_NUM.pow(level as u32) as u64
316 }
317
318 // All the slots before this level(including this level) add up to
319 // approximately.
level_range(level: usize) -> u64320 fn level_range(level: usize) -> u64 {
321 SLOTS_NUM as u64 * slot_range(level)
322 }
323
324 #[cfg(test)]
325 mod test {
326 use crate::time::wheel::{Wheel, LEVELS_NUM};
327 cfg_net!(
328 #[cfg(feature = "ffrt")]
329 use crate::time::TimeDriver;
330 use crate::time::{sleep, timeout};
331 use crate::net::UdpSocket;
332 use crate::JoinHandle;
333 use std::net::SocketAddr;
334 use std::time::Duration;
335 );
336
337 /// UT test cases for Wheel::new
338 ///
339 /// # Brief
340 /// 1. Use Wheel::new to create a Wheel Struct.
341 /// 2. Verify the data in the Wheel Struct.
342 #[test]
ut_wheel_new_test()343 fn ut_wheel_new_test() {
344 let wheel = Wheel::new();
345 assert_eq!(wheel.elapsed, 0);
346 assert_eq!(wheel.last_elapsed, 0);
347 assert_eq!(wheel.levels.len(), LEVELS_NUM);
348 }
349
350 /// UT test cases for Sleep drop.
351 ///
352 /// # Brief
353 /// 1. Use timeout to create a Timeout Struct.
354 /// 2. Enable the Sleep Struct corresponding to the Timeout Struct to enter
355 /// the Pending state.
356 /// 3. Verify the change of the internal TimerHandle during Sleep Struct
357 /// drop.
358 #[test]
359 #[cfg(feature = "net")]
ut_sleep_drop()360 fn ut_sleep_drop() {
361 async fn udp_sender(sender_addr: SocketAddr, receiver_addr: SocketAddr) {
362 let sender = UdpSocket::bind(sender_addr).await.unwrap();
363 let buf = [2; 10];
364 sleep(Duration::from_secs(1)).await;
365 sender.send_to(buf.as_slice(), receiver_addr).await.unwrap();
366 }
367
368 async fn udp_receiver(receiver_addr: SocketAddr) {
369 let receiver = UdpSocket::bind(receiver_addr).await.unwrap();
370 let mut buf = [0; 10];
371 assert!(
372 timeout(Duration::from_secs(2), receiver.recv_from(&mut buf[..]))
373 .await
374 .is_ok()
375 );
376 }
377
378 let mut tasks: Vec<JoinHandle<()>> = Vec::new();
379 let udp_sender_addr = "127.0.0.1:9093".parse().unwrap();
380 let udp_receiver_addr = "127.0.0.1:9094".parse().unwrap();
381 tasks.push(crate::spawn(udp_sender(udp_sender_addr, udp_receiver_addr)));
382 tasks.push(crate::spawn(udp_receiver(udp_receiver_addr)));
383 for t in tasks {
384 let _ = crate::block_on(t);
385 }
386 #[cfg(feature = "ffrt")]
387 let lock = TimeDriver::get_ref().wheel.lock().unwrap();
388 #[cfg(feature = "ffrt")]
389 for slot in lock.levels[1].slots.iter() {
390 assert!(slot.is_empty());
391 }
392 }
393 }
394