1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::{
6 future::Future,
7 io,
8 pin::Pin,
9 task::{Context, Poll},
10 time::Instant,
11 };
12
13 use futures::{
14 future::{select, Either},
15 pin_mut,
16 };
17 use thiserror::Error as ThisError;
18
19 use crate::executor;
20
/// A future that completes once a fixed point in time has been reached.
///
/// The expiry is held as a single [`Instant`], so cloning a `Timer` yields another timer that
/// expires at exactly the same moment.
#[derive(Clone, Debug)]
pub struct Timer {
    /// Instant at which this timer expires.
    deadline: Instant,
}
26
27 impl Timer {
28 /// Start a timer that will expire at `deadline`.
29 ///
30 /// This function only guarantees that the timer will not expire before `deadline`. The actual
31 /// elapsed time may be much longer depending on various factors such as the current load in the
32 /// application as well as the OS scheduler.
33 ///
34 /// If `deadline` is in the future then any tasks await-ing on this `Timer` will only be
35 /// notified if it is created on a thread that is currently running or will run the
36 /// `Executor::run` or `Executor::run_until` methods.
37 ///
38 /// The returned `Timer` can be cheaply cloned and all clones will share the same deadline.
39 ///
40 /// # Examples
41 ///
42 /// Put the current task to sleep for 10 milliseconds.
43 ///
44 /// ```
45 /// # use std::time::{Duration, Instant};
46 /// # use cros_async::{Timer, Executor};
47 /// #
48 /// # async fn sleep() {
49 /// Timer::new(Instant::now() + Duration::from_millis(10)).await;
50 /// # }
51 /// #
52 /// # let ex = Executor::new();
53 /// # let start = Instant::now();
54 /// # ex.run_until(sleep()).unwrap();
55 /// # assert!(start.elapsed() >= Duration::from_millis(10));
56 /// ```
new(deadline: Instant) -> Timer57 pub fn new(deadline: Instant) -> Timer {
58 Timer { deadline }
59 }
60
61 /// Returns the time at which this `Timer` expires.
deadline(&self) -> Instant62 pub fn deadline(&self) -> Instant {
63 self.deadline
64 }
65 }
66
67 impl Future for Timer {
68 type Output = ();
69
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>70 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71 if self.deadline <= Instant::now() {
72 Poll::Ready(())
73 } else {
74 executor::add_timer(self.deadline, cx.waker());
75
76 Poll::Pending
77 }
78 }
79 }
80
81 /// The error returned from `with_deadline` when the deadline expires before the future completes.
82 ///
83 /// # Examples
84 ///
85 /// Convert the `TimedOut` error into an `io::Error`.
86 ///
87 /// ```
88 /// # use std::{
89 /// # future::{pending, Future},
90 /// # io,
91 /// # time::Instant
92 /// # };
93 /// use cros_async::with_deadline;
94 ///
95 /// async fn deadline_with_io_error<F: Future>(deadline: Instant, f: F) -> io::Result<F::Output> {
96 /// with_deadline(deadline, f)
97 /// .await
98 /// .map_err(io::Error::from)
99 /// }
100 /// # let err = cros_async::Executor::new()
101 /// # .run_until(deadline_with_io_error(Instant::now(), pending::<()>()))
102 /// # .unwrap()
103 /// # .unwrap_err();
104 /// # assert_eq!(err.kind(), io::ErrorKind::TimedOut);
105 /// ```
106 #[derive(Debug, ThisError)]
107 #[error("Operation timed out")]
108 pub struct TimedOut;
109
110 impl From<TimedOut> for io::Error {
from(_: TimedOut) -> Self111 fn from(_: TimedOut) -> Self {
112 io::Error::from(io::ErrorKind::TimedOut)
113 }
114 }
115
116 /// Add a deadline to an asynchronous operation.
117 ///
118 /// Returns the output of the asynchronous operation if it completes before the deadline
119 /// expires. Otherwise returns a `TimedOut` error.
120 ///
121 /// If the deadline expires before the asynchronous operation completes then `f` is dropped.
122 /// However, this may not cancel any underlying asynchronous I/O operations registered with the OS.
123 ///
124 /// # Examples
125 ///
126 /// Set a timeout for reading from a data source.
127 ///
128 /// ```
129 /// use std::time::{Duration, Instant};
130 ///
131 /// use cros_async::{with_deadline, File};
132 ///
133 /// async fn read_with_timeout(
134 /// rx: &File,
135 /// buf: &mut [u8],
136 /// timeout: Duration,
137 /// ) -> anyhow::Result<usize> {
138 /// with_deadline(Instant::now() + timeout, rx.read(buf, None)).await?
139 /// }
140 /// #
141 /// # use std::io;
142 /// # use cros_async::{Executor, TimedOut};
143 /// #
144 /// # let ex = Executor::new();
145 /// # let (rx, _tx) = sys_util::pipe(true).unwrap();
146 /// # let rx = cros_async::File::from_std(rx).unwrap();
147 /// # let mut buf = 0u64.to_ne_bytes();
148 /// #
149 /// # let _err = ex
150 /// # .run_until(read_with_timeout(&rx, &mut buf, Duration::from_millis(10)))
151 /// # .unwrap()
152 /// # .unwrap_err()
153 /// # .downcast::<TimedOut>()
154 /// # .unwrap();
155 /// ```
with_deadline<F: Future>(deadline: Instant, f: F) -> Result<F::Output, TimedOut>156 pub async fn with_deadline<F: Future>(deadline: Instant, f: F) -> Result<F::Output, TimedOut> {
157 let timer = Timer::new(deadline);
158 pin_mut!(timer, f);
159 match select(timer, f).await {
160 Either::Left(((), _)) => Err(TimedOut),
161 Either::Right((v, _)) => Ok(v),
162 }
163 }
164
#[cfg(test)]
mod test {
    use super::*;

    use std::{mem, sync::Arc, task, thread, time::Duration};

    use futures::{future::join5, stream::FuturesUnordered, StreamExt};
    use sync::Mutex;

    use crate::Executor;

    /// A single timer driven by `run_until` must not complete before its deadline.
    #[test]
    fn basic() {
        let ex = Executor::new();

        let delay = Duration::from_millis(5);
        let began = Instant::now();

        ex.run_until(Timer::new(began + delay)).unwrap();

        assert!(began.elapsed() >= delay);
    }

    /// Of two timers raced against each other, the one with the shorter deadline must win.
    #[test]
    fn multiple() {
        let ex = Executor::new();

        let short = Duration::from_millis(10);
        let long = Duration::from_secs(10);
        let began = Instant::now();

        let winner = ex
            .run_until(select(Timer::new(began + short), Timer::new(began + long)))
            .unwrap();
        if let Either::Right(_) = winner {
            panic!("Longer deadline finished first");
        }

        let elapsed = began.elapsed();
        assert!(elapsed >= short);
        assert!(elapsed < long);
    }

    /// Five timers sharing one deadline, all awaited directly inside `run_until`.
    #[test]
    fn run_until_identical_deadline() {
        let ex = Executor::new();

        let deadline = Instant::now() + Duration::from_millis(10);
        let timer = || Timer::new(deadline);

        ex.run_until(join5(timer(), timer(), timer(), timer(), timer()))
            .unwrap();
        assert!(Instant::now() >= deadline);
    }

    /// Five timers sharing one deadline, each spawned as its own task on the executor.
    #[test]
    fn spawn_identical_deadline() {
        let ex = Executor::new();

        let deadline = Instant::now() + Duration::from_millis(10);
        let task = || ex.spawn(Timer::new(deadline));

        ex.run_until(join5(task(), task(), task(), task(), task()))
            .unwrap();
        assert!(Instant::now() >= deadline);
    }

    /// State shared between every clone of a `Quit` future.
    #[derive(Default)]
    struct QuitShared {
        /// Wakers of the tasks currently waiting for the quit signal.
        wakers: Vec<task::Waker>,
        /// Set once `Quit::quit` has been called.
        should_quit: bool,
    }

    /// Future that stays pending until `quit` is called on one of its clones.
    #[derive(Clone, Default)]
    struct Quit {
        shared: Arc<Mutex<QuitShared>>,
    }

    impl Quit {
        /// Raises the quit flag and wakes every task that registered a waker.
        fn quit(self) {
            let mut shared = self.shared.lock();
            shared.should_quit = true;
            let parked = mem::take(&mut shared.wakers);
            // Release the lock before waking anything.
            drop(shared);

            parked.into_iter().for_each(task::Waker::wake);
        }
    }

    impl Future for Quit {
        type Output = ();

        /// Ready once the quit flag is set; otherwise remembers the caller's waker, skipping
        /// wakers that an already-stored waker would wake anyway.
        fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
            let mut shared = self.shared.lock();
            if shared.should_quit {
                return Poll::Ready(());
            }

            let already_registered = shared.wakers.iter().any(|w| w.will_wake(cx.waker()));
            if !already_registered {
                shared.wakers.push(cx.waker().clone());
            }

            Poll::Pending
        }
    }

    /// Timers spawned on an executor that is also being run by several other threads must all
    /// complete, after which the helper threads are told to quit and joined.
    #[test]
    fn multiple_threads() {
        const NUM_THREADS: usize = 7;
        const NUM_TIMERS: usize = 19;

        let ex = Executor::new();
        let quit = Quit::default();
        let workers: Vec<_> = (0..NUM_THREADS)
            .map(|_| {
                let thread_ex = ex.clone();
                let thread_quit = quit.clone();
                thread::spawn(move || thread_ex.run_until(thread_quit))
            })
            .collect();

        let deadline = Instant::now() + Duration::from_millis(10);
        let timers = FuturesUnordered::new();
        (0..NUM_TIMERS).for_each(|_| timers.push(ex.spawn(Timer::new(deadline))));

        ex.run_until(timers.collect::<Vec<()>>()).unwrap();
        quit.quit();

        for worker in workers {
            worker.join().unwrap().unwrap();
        }
    }
}
310