1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::convert::TryInto;
15 use std::future::Future;
16 use std::marker::PhantomPinned;
17 use std::pin::Pin;
18 use std::task::{Context, Poll};
19 use std::time::{Duration, Instant};
20
// Fallback sleep length used by `sleep` when `now + duration` overflows `Instant`:
// ten years of 365 days each, expressed in seconds.
const TEN_YEARS: Duration = Duration::from_secs(10 * 365 * 24 * 60 * 60);
22
23 /// Waits until 'instant' has reached.
24 ///
25 /// # Panic
26 /// Calling this method outside of a Ylong Runtime could cause panic, for
27 /// example, outside of an async closure that is passed to ylong_runtime::spawn
28 /// or ylong_runtime::block_on. The async wrapping is necessary since it makes
29 /// the function become lazy in order to get successfully executed on the
30 /// runtime.
sleep_until(instant: Instant) -> Sleep31 pub fn sleep_until(instant: Instant) -> Sleep {
32 Sleep::new_timeout(instant)
33 }
34
35 /// Waits until 'duration' has elapsed.
36 ///
37 /// # Panic
38 /// Calling this method outside of a Ylong Runtime could cause panic, for
39 /// example, outside of an async closure that is passed to ylong_runtime::spawn
40 /// or ylong_runtime::block_on. The async wrapping is necessary since it makes
41 /// the function become lazy in order to get successfully executed on the
42 /// runtime.
sleep(duration: Duration) -> Sleep43 pub fn sleep(duration: Duration) -> Sleep {
44 // If the time reaches the maximum value,
45 // then set the default timing time to 10 years.
46 match Instant::now().checked_add(duration) {
47 Some(deadline) => Sleep::new_timeout(deadline),
48 None => Sleep::new_timeout(Instant::now() + TEN_YEARS),
49 }
50 }
51
/// A future that completes once its deadline has been reached. Returned by
/// [`sleep`] and `sleep_until`.
///
/// [`sleep`]: sleep
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// use ylong_runtime::time::sleep;
///
/// async fn sleep_test() {
///     sleep(Duration::from_secs(2)).await;
///     println!("2 secs have elapsed");
/// }
/// ```
pub struct Sleep {
    // `true` while the timer still has to be registered on the next poll;
    // cleared after registration so that polling does not insert it twice.
    need_insert: bool,

    // The instant at which this sleep should complete.
    deadline: Instant,

    // Backend-specific timer state (a different `SleepInner` is defined for
    // the ffrt and the non-ffrt build).
    inner: SleepInner,

    // Makes `Sleep` `!Unpin`.
    _phantom: PhantomPinned,
}
78
79 cfg_ffrt!(
80 use crate::ffrt::ffrt_timer::FfrtTimerEntry;
81 use std::task::Waker;
82
83 struct SleepInner {
84 // ffrt timer handle
85 timer: Option<FfrtTimerEntry>,
86 // the waker to wakeup the timer task
87 waker: Option<*mut Waker>,
88 }
89
90 // FFRT needs this unsafe impl since `Sleep` has a mut pointer in it.
91 // In non-ffrt environment, `Sleep` auto-derives Sync & Send.
92 unsafe impl Send for Sleep {}
93 unsafe impl Sync for Sleep {}
94
95 impl Sleep {
96 // Creates a Sleep structure based on the given deadline.
97 fn new_timeout(deadline: Instant) -> Self {
98 Self {
99 need_insert: true,
100 deadline,
101 inner: SleepInner {
102 timer: None,
103 waker: None,
104 }
105 }
106 }
107
108 // Resets the deadline of the Sleep
109 pub(crate) fn reset(&mut self, new_deadline: Instant) {
110 self.need_insert = true;
111 self.deadline = new_deadline;
112
113 if let Some(waker) = self.inner.waker.take() {
114 unsafe {
115 drop(Box::from_raw(waker));
116 }
117 }
118 }
119
120 // Cancels the Sleep
121 fn cancel(&mut self) {
122 if let Some(timer) = self.inner.timer.take() {
123 timer.timer_deregister();
124 }
125 if let Some(waker) = self.inner.waker.take() {
126 unsafe {
127 drop(Box::from_raw(waker));
128 }
129 }
130 }
131 }
132
133 impl Future for Sleep {
134 type Output = ();
135
136 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
137 let this = self.get_mut();
138
139 if this.need_insert {
140 if let Some(duration) = this.deadline.checked_duration_since(Instant::now()) {
141 let ms = duration.as_millis()
142 .try_into()
143 .unwrap_or(u64::MAX);
144
145 let waker = Box::new(cx.waker().clone());
146 let waker_ptr = Box::into_raw(waker);
147
148 if let Some(waker) = this.inner.waker.take() {
149 unsafe { drop(Box::from_raw(waker)); }
150 }
151
152 this.inner.waker = Some(waker_ptr);
153 this.inner.timer = Some(FfrtTimerEntry::timer_register(waker_ptr, ms));
154 this.need_insert = false;
155 } else {
156 return Poll::Ready(());
157 }
158 }
159
160 // this unwrap is safe since we have already insert the timer into the entry
161 let timer = this.inner.timer.as_ref().unwrap();
162 if timer.result() {
163 Poll::Ready(())
164 } else {
165 Poll::Pending
166 }
167 }
168 }
169 );
170
171 impl Sleep {
172 // Returns the deadline of the Sleep
deadline(&self) -> Instant173 pub(crate) fn deadline(&self) -> Instant {
174 self.deadline
175 }
176 }
177
178 cfg_not_ffrt!(
179 use crate::executor::driver::Handle;
180 use crate::time::Clock;
181 use std::sync::Arc;
182 use std::cmp;
183 use std::ptr::NonNull;
184
185 struct SleepInner {
186 // Corresponding Timer structure.
187 timer: Clock,
188 // Timer driver handle
189 handle: Arc<Handle>,
190 }
191
192 impl Sleep {
193 // Creates a Sleep structure based on the given deadline.
194 fn new_timeout(deadline: Instant) -> Self {
195 let handle = Handle::get_handle().unwrap_or_else(|e| panic!("sleep new out of worker ctx, error: {e}"));
196
197 let start_time = handle.start_time();
198 let deadline = cmp::max(deadline, start_time);
199
200 let timer = Clock::new();
201 Self {
202 need_insert: true,
203 deadline,
204 inner: SleepInner {
205 timer,
206 handle,
207 },
208 _phantom: PhantomPinned,
209 }
210 }
211
212 // Resets the deadline of the Sleep
213 pub(crate) fn reset(self: Pin<&mut Self>, new_deadline: Instant) {
214 let this = unsafe { self.get_unchecked_mut() };
215 this.need_insert = true;
216 this.deadline = new_deadline;
217 this.inner.timer.set_result(false);
218 }
219
220 // Cancels the Sleep
221 fn cancel(&mut self) {
222 let driver = &self.inner.handle;
223 driver.timer_cancel(NonNull::from(&self.inner.timer));
224 }
225 }
226
227 impl Future for Sleep {
228 type Output = ();
229
230 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
231 let this = unsafe { self.get_unchecked_mut() };
232 let driver = &this.inner.handle;
233
234 if this.need_insert {
235 // the deadline is guaranteed to be later than the start time
236 let ms = this
237 .deadline
238 .checked_duration_since(driver.start_time())
239 .unwrap()
240 .as_millis()
241 .try_into()
242 .unwrap_or(u64::MAX);
243 this.inner.timer.set_expiration(ms);
244 this.inner.timer.set_waker(cx.waker().clone());
245
246 match driver.timer_register(NonNull::from(&this.inner.timer)) {
247 Ok(_) => this.need_insert = false,
248 Err(_) => {
249 // Even if the insertion fails, there is no need to insert again here,
250 // it is a timeout clock and needs to be triggered immediately at the next poll.
251 this.need_insert = false;
252 this.inner.timer.set_result(true);
253 }
254 }
255 }
256
257 if this.inner.timer.result() {
258 Poll::Ready(())
259 } else {
260 Poll::Pending
261 }
262 }
263 }
264 );
265
266 impl Drop for Sleep {
drop(&mut self)267 fn drop(&mut self) {
268 // For some uses, for example, Timeout,
269 // `Sleep` enters the `Pending` state first and inserts the `TimerHandle` into
270 // the `DRIVER`, the future of timeout returns `Ready` in advance of the
271 // next polling, as a result, the `TimerHandle` pointer in the `DRIVER`
272 // is invalid. need to cancel the `TimerHandle` operation during `Sleep`
273 // drop.
274 self.cancel()
275 }
276 }
277
#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::{Duration, Instant};

    use crate::time::{sleep, sleep_until};
    use crate::{block_on, spawn};

    /// UT test cases for new_sleep
    ///
    /// # Brief
    /// 1. Awaits three consecutive 20 ms sleeps inside `block_on` and checks
    ///    that the counter is incremented afterwards.
    /// 2. Spawns three tasks that each sleep for 20 ms, waits for all of them
    ///    and checks that every task incremented the counter.
    #[test]
    fn ut_new_timer_sleep() {
        let pause = Duration::from_millis(20);
        let counter = Arc::new(AtomicUsize::new(0));

        let sequential = Arc::clone(&counter);
        block_on(async move {
            for _ in 0..3 {
                sleep(pause).await;
            }
            sequential.fetch_add(1, Ordering::Relaxed);
        });
        assert_eq!(counter.load(Ordering::Relaxed), 1);

        // All three tasks are spawned before any of them is waited for.
        let handles: Vec<_> = (0..3)
            .map(|_| {
                let shared = Arc::clone(&counter);
                spawn(async move {
                    sleep(pause).await;
                    shared.fetch_add(1, Ordering::Relaxed);
                })
            })
            .collect();
        for handle in handles {
            block_on(handle).unwrap();
        }
        assert_eq!(counter.load(Ordering::Relaxed), 4);
    }

    /// UT test cases for sleep zero second or sleep until a past instant
    ///
    /// # Brief
    /// 1. Sleeps for a zero duration and checks that execution continues past
    ///    the sleep.
    /// 2. Sleeps until an instant one second in the past and checks that
    ///    execution continues past the sleep.
    #[test]
    fn ut_timer_sleep_zero() {
        let one_second_ago = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();

        let mut count = 0;
        let mut count = block_on(async move {
            sleep(Duration::from_secs(0)).await;
            count += 1;
            count
        });
        assert_eq!(count, 1);

        let count = block_on(async move {
            sleep_until(one_second_ago).await;
            count += 1;
            count
        });
        assert_eq!(count, 2);
    }
}
351