• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::convert::TryInto;
15 use std::future::Future;
16 use std::marker::PhantomPinned;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19 use std::time::{Duration, Instant};
20 
// Fallback sleep length (ten 365-day years) used when the requested deadline
// cannot be represented as an `Instant`.
const TEN_YEARS: Duration = Duration::from_secs(10 * 365 * 24 * 60 * 60);
22 
23 /// Waits until 'instant' has reached.
24 ///
25 /// # Panic
26 /// Calling this method outside of a Ylong Runtime could cause panic, for
27 /// example, outside of an async closure that is passed to ylong_runtime::spawn
28 /// or ylong_runtime::block_on. The async wrapping is necessary since it makes
29 /// the function become lazy in order to get successfully executed on the
30 /// runtime.
sleep_until(instant: Instant) -> Sleep31 pub fn sleep_until(instant: Instant) -> Sleep {
32     Sleep::new_timeout(instant)
33 }
34 
35 /// Waits until 'duration' has elapsed.
36 ///
37 /// # Panic
38 /// Calling this method outside of a Ylong Runtime could cause panic, for
39 /// example, outside of an async closure that is passed to ylong_runtime::spawn
40 /// or ylong_runtime::block_on. The async wrapping is necessary since it makes
41 /// the function become lazy in order to get successfully executed on the
42 /// runtime.
sleep(duration: Duration) -> Sleep43 pub fn sleep(duration: Duration) -> Sleep {
44     // If the time reaches the maximum value,
45     // then set the default timing time to 10 years.
46     match Instant::now().checked_add(duration) {
47         Some(deadline) => Sleep::new_timeout(deadline),
48         None => Sleep::new_timeout(Instant::now() + TEN_YEARS),
49     }
50 }
51 
/// A future that completes once its deadline has been reached. Returned by
/// [`sleep`] and [`sleep_until`].
///
/// [`sleep`]: sleep
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use ylong_runtime::time::sleep;
///
/// async fn sleep_test() {
///     sleep(Duration::from_secs(2)).await;
///     println!("2 secs have elapsed");
/// }
/// ```
pub struct Sleep {
    // Whether the timer still has to be registered on the next poll. Set on
    // creation and on reset, cleared once registration has been attempted, so
    // that repeated polls do not insert the timer again.
    need_insert: bool,

    // The time at which the structure should end.
    deadline: Instant,

    // Timer state; its definition depends on whether the ffrt configuration
    // is active.
    inner: SleepInner,

    // Makes `Sleep` `!Unpin`, so it is only polled through `Pin<&mut Self>`.
    _phantom: PhantomPinned,
}
78 
79 cfg_ffrt!(
80     use crate::ffrt::ffrt_timer::FfrtTimerEntry;
81     use std::task::Waker;
82 
83     struct SleepInner {
84         // ffrt timer handle
85         timer: Option<FfrtTimerEntry>,
86         // the waker to wakeup the timer task
87         waker: Option<*mut Waker>,
88     }
89 
90     // FFRT needs this unsafe impl since `Sleep` has a mut pointer in it.
91     // In non-ffrt environment, `Sleep` auto-derives Sync & Send.
92     unsafe impl Send for Sleep {}
93     unsafe impl Sync for Sleep {}
94 
95     impl Sleep {
96         // Creates a Sleep structure based on the given deadline.
97         fn new_timeout(deadline: Instant) -> Self {
98             Self {
99                 need_insert: true,
100                 deadline,
101                 inner: SleepInner {
102                     timer: None,
103                     waker: None,
104                 }
105             }
106         }
107 
108         // Resets the deadline of the Sleep
109         pub(crate) fn reset(&mut self, new_deadline: Instant) {
110             self.need_insert = true;
111             self.deadline = new_deadline;
112 
113             if let Some(waker) = self.inner.waker.take() {
114                 unsafe {
115                     drop(Box::from_raw(waker));
116                 }
117             }
118         }
119 
120         // Cancels the Sleep
121         fn cancel(&mut self) {
122             if let Some(timer) = self.inner.timer.take() {
123                 timer.timer_deregister();
124             }
125             if let Some(waker) = self.inner.waker.take() {
126                 unsafe {
127                     drop(Box::from_raw(waker));
128                 }
129             }
130         }
131     }
132 
133     impl Future for Sleep {
134         type Output = ();
135 
136         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
137             let this = self.get_mut();
138 
139             if this.need_insert {
140                 if let Some(duration) = this.deadline.checked_duration_since(Instant::now()) {
141                     let ms = duration.as_millis()
142                     .try_into()
143                     .unwrap_or(u64::MAX);
144 
145                     let waker = Box::new(cx.waker().clone());
146                     let waker_ptr = Box::into_raw(waker);
147 
148                     if let Some(waker) = this.inner.waker.take() {
149                         unsafe { drop(Box::from_raw(waker)); }
150                     }
151 
152                     this.inner.waker = Some(waker_ptr);
153                     this.inner.timer = Some(FfrtTimerEntry::timer_register(waker_ptr, ms));
154                     this.need_insert = false;
155                 } else {
156                     return Poll::Ready(());
157                 }
158             }
159 
160             // this unwrap is safe since we have already insert the timer into the entry
161             let timer = this.inner.timer.as_ref().unwrap();
162             if timer.result() {
163                 Poll::Ready(())
164             } else {
165                 Poll::Pending
166             }
167         }
168     }
169 );
170 
171 impl Sleep {
172     // Returns the deadline of the Sleep
deadline(&self) -> Instant173     pub(crate) fn deadline(&self) -> Instant {
174         self.deadline
175     }
176 }
177 
178 cfg_not_ffrt!(
179     use crate::executor::driver::Handle;
180     use crate::time::Clock;
181     use std::sync::Arc;
182     use std::cmp;
183     use std::ptr::NonNull;
184 
185     struct SleepInner {
186         // Corresponding Timer structure.
187         timer: Clock,
188         // Timer driver handle
189         handle: Arc<Handle>,
190     }
191 
192     impl Sleep {
193         // Creates a Sleep structure based on the given deadline.
194         fn new_timeout(deadline: Instant) -> Self {
195             let handle = Handle::get_handle().unwrap_or_else(|e| panic!("sleep new out of worker ctx, error: {e}"));
196 
197             let start_time = handle.start_time();
198             let deadline = cmp::max(deadline, start_time);
199 
200             let timer = Clock::new();
201             Self {
202                 need_insert: true,
203                 deadline,
204                 inner: SleepInner {
205                     timer,
206                     handle,
207                 },
208                 _phantom: PhantomPinned,
209             }
210         }
211 
212         // Resets the deadline of the Sleep
213         pub(crate) fn reset(self: Pin<&mut Self>, new_deadline: Instant) {
214             let this = unsafe { self.get_unchecked_mut() };
215             this.need_insert = true;
216             this.deadline = new_deadline;
217             this.inner.timer.set_result(false);
218         }
219 
220         // Cancels the Sleep
221         fn cancel(&mut self) {
222             let driver = &self.inner.handle;
223             driver.timer_cancel(NonNull::from(&self.inner.timer));
224         }
225     }
226 
227     impl Future for Sleep {
228         type Output = ();
229 
230         fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
231             let this = unsafe { self.get_unchecked_mut() };
232             let driver = &this.inner.handle;
233 
234             if this.need_insert {
235                 // the deadline is guaranteed to be later than the start time
236                 let ms = this
237                     .deadline
238                     .checked_duration_since(driver.start_time())
239                     .unwrap()
240                     .as_millis()
241                     .try_into()
242                     .unwrap_or(u64::MAX);
243                 this.inner.timer.set_expiration(ms);
244                 this.inner.timer.set_waker(cx.waker().clone());
245 
246                 match driver.timer_register(NonNull::from(&this.inner.timer)) {
247                     Ok(_) => this.need_insert = false,
248                     Err(_) => {
249                         // Even if the insertion fails, there is no need to insert again here,
250                         // it is a timeout clock and needs to be triggered immediately at the next poll.
251                         this.need_insert = false;
252                         this.inner.timer.set_result(true);
253                     }
254                 }
255             }
256 
257             if this.inner.timer.result() {
258                 Poll::Ready(())
259             } else {
260                 Poll::Pending
261             }
262         }
263     }
264 );
265 
266 impl Drop for Sleep {
drop(&mut self)267     fn drop(&mut self) {
268         // For some uses, for example, Timeout,
269         // `Sleep` enters the `Pending` state first and inserts the `TimerHandle` into
270         // the `DRIVER`, the future of timeout returns `Ready` in advance of the
271         // next polling, as a result, the `TimerHandle` pointer in the `DRIVER`
272         // is invalid. need to cancel the `TimerHandle` operation during `Sleep`
273         // drop.
274         self.cancel()
275     }
276 }
277 
#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::{Duration, Instant};

    use crate::time::{sleep, sleep_until};
    use crate::{block_on, spawn};

    /// UT test cases for new_sleep
    ///
    /// # Brief
    /// 1. Sleeps three times in a row inside block_on and checks that the
    ///    counter is incremented afterwards.
    /// 2. Spawns three tasks that each sleep once, waits for all of them and
    ///    checks the final counter value.
    #[test]
    fn ut_new_timer_sleep() {
        let pause = Duration::from_millis(20);
        let counter = Arc::new(AtomicUsize::new(0));

        let sequential = counter.clone();
        block_on(async move {
            for _ in 0..3 {
                sleep(pause).await;
            }
            sequential.fetch_add(1, Ordering::Relaxed);
        });
        assert_eq!(counter.load(Ordering::Relaxed), 1);

        // All three tasks are spawned before any of them is waited on.
        let handles: Vec<_> = (0..3)
            .map(|_| {
                let shared = counter.clone();
                spawn(async move {
                    sleep(pause).await;
                    shared.fetch_add(1, Ordering::Relaxed);
                })
            })
            .collect();
        for handle in handles {
            block_on(handle).unwrap();
        }
        assert_eq!(counter.load(Ordering::Relaxed), 4);
    }

    /// UT test cases for sleep zero second or sleep until a past instant
    ///
    /// # Brief
    /// 1. Call sleep with a duration of zero, check if the counter is
    ///    successfully incremented.
    /// 2. Call sleep_until with a past instant, check if the counter is
    ///    successfully incremented.
    #[test]
    fn ut_timer_sleep_zero() {
        let earlier = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();

        let mut count = 0;
        let mut count = block_on(async move {
            sleep(Duration::from_secs(0)).await;
            count += 1;
            count
        });
        assert_eq!(count, 1);

        let count = block_on(async move {
            sleep_until(earlier).await;
            count += 1;
            count
        });
        assert_eq!(count, 2);
    }
}
351