• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::{
6     future::Future,
7     io,
8     pin::Pin,
9     task::{Context, Poll},
10     time::Instant,
11 };
12 
13 use futures::{
14     future::{select, Either},
15     pin_mut,
16 };
17 use thiserror::Error as ThisError;
18 
19 use crate::executor;
20 
21 /// A timer that expires at a specific time.
22 #[derive(Debug, Clone)]
23 pub struct Timer {
24     deadline: Instant,
25 }
26 
27 impl Timer {
28     /// Start a timer that will expire at `deadline`.
29     ///
30     /// This function only guarantees that the timer will not expire before `deadline`. The actual
31     /// elapsed time may be much longer depending on various factors such as the current load in the
32     /// application as well as the OS scheduler.
33     ///
34     /// If `deadline` is in the future then any tasks await-ing on this `Timer` will only be
35     /// notified if it is created on a thread that is currently running or will run the
36     /// `Executor::run` or `Executor::run_until` methods.
37     ///
38     /// The returned `Timer` can be cheaply cloned and all clones will share the same deadline.
39     ///
40     /// # Examples
41     ///
42     /// Put the current task to sleep for 10 milliseconds.
43     ///
44     /// ```
45     /// # use std::time::{Duration, Instant};
46     /// # use cros_async::{Timer, Executor};
47     /// #
48     /// # async fn sleep() {
49     ///     Timer::new(Instant::now() + Duration::from_millis(10)).await;
50     /// # }
51     /// #
52     /// # let ex = Executor::new();
53     /// # let start = Instant::now();
54     /// # ex.run_until(sleep()).unwrap();
55     /// # assert!(start.elapsed() >= Duration::from_millis(10));
56     /// ```
new(deadline: Instant) -> Timer57     pub fn new(deadline: Instant) -> Timer {
58         Timer { deadline }
59     }
60 
61     /// Returns the time at which this `Timer` expires.
deadline(&self) -> Instant62     pub fn deadline(&self) -> Instant {
63         self.deadline
64     }
65 }
66 
67 impl Future for Timer {
68     type Output = ();
69 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>70     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71         if self.deadline <= Instant::now() {
72             Poll::Ready(())
73         } else {
74             executor::add_timer(self.deadline, cx.waker());
75 
76             Poll::Pending
77         }
78     }
79 }
80 
81 /// The error returned from `with_deadline` when the deadline expires before the future completes.
82 ///
83 /// # Examples
84 ///
85 /// Convert the `TimedOut` error into an `io::Error`.
86 ///
87 /// ```
88 /// # use std::{
89 /// #     future::{pending, Future},
90 /// #     io,
91 /// #     time::Instant
92 /// # };
93 /// use cros_async::with_deadline;
94 ///
95 /// async fn deadline_with_io_error<F: Future>(deadline: Instant, f: F) -> io::Result<F::Output> {
96 ///     with_deadline(deadline, f)
97 ///         .await
98 ///         .map_err(io::Error::from)
99 /// }
100 /// # let err = cros_async::Executor::new()
101 /// #     .run_until(deadline_with_io_error(Instant::now(), pending::<()>()))
102 /// #     .unwrap()
103 /// #     .unwrap_err();
104 /// # assert_eq!(err.kind(), io::ErrorKind::TimedOut);
105 /// ```
106 #[derive(Debug, ThisError)]
107 #[error("Operation timed out")]
108 pub struct TimedOut;
109 
110 impl From<TimedOut> for io::Error {
from(_: TimedOut) -> Self111     fn from(_: TimedOut) -> Self {
112         io::Error::from(io::ErrorKind::TimedOut)
113     }
114 }
115 
116 /// Add a deadline to an asynchronous operation.
117 ///
118 /// Returns the output of the asynchronous operation if it completes before the deadline
119 /// expires. Otherwise returns a `TimedOut` error.
120 ///
121 /// If the deadline expires before the asynchronous operation completes then `f` is dropped.
122 /// However, this may not cancel any underlying asynchronous I/O operations registered with the OS.
123 ///
124 /// # Examples
125 ///
126 /// Set a timeout for reading from a data source.
127 ///
128 /// ```
129 /// use std::time::{Duration, Instant};
130 ///
131 /// use cros_async::{with_deadline, File};
132 ///
133 /// async fn read_with_timeout(
134 ///     rx: &File,
135 ///     buf: &mut [u8],
136 ///     timeout: Duration,
137 /// ) -> anyhow::Result<usize> {
138 ///     with_deadline(Instant::now() + timeout, rx.read(buf, None)).await?
139 /// }
140 /// #
141 /// # use std::io;
142 /// # use cros_async::{Executor, TimedOut};
143 /// #
144 /// # let ex = Executor::new();
145 /// # let (rx, _tx) = sys_util::pipe(true).unwrap();
146 /// # let rx = cros_async::File::from_std(rx).unwrap();
147 /// # let mut buf = 0u64.to_ne_bytes();
148 /// #
149 /// # let _err = ex
150 /// #     .run_until(read_with_timeout(&rx, &mut buf, Duration::from_millis(10)))
151 /// #     .unwrap()
152 /// #     .unwrap_err()
153 /// #     .downcast::<TimedOut>()
154 /// #     .unwrap();
155 /// ```
with_deadline<F: Future>(deadline: Instant, f: F) -> Result<F::Output, TimedOut>156 pub async fn with_deadline<F: Future>(deadline: Instant, f: F) -> Result<F::Output, TimedOut> {
157     let timer = Timer::new(deadline);
158     pin_mut!(timer, f);
159     match select(timer, f).await {
160         Either::Left(((), _)) => Err(TimedOut),
161         Either::Right((v, _)) => Ok(v),
162     }
163 }
164 
165 #[cfg(test)]
166 mod test {
167     use super::*;
168 
169     use std::{mem, sync::Arc, task, thread, time::Duration};
170 
171     use futures::{future::join5, stream::FuturesUnordered, StreamExt};
172     use sync::Mutex;
173 
174     use crate::Executor;
175 
176     #[test]
basic()177     fn basic() {
178         let ex = Executor::new();
179 
180         let dur = Duration::from_millis(5);
181         let start = Instant::now();
182         let sleep = Timer::new(start + dur);
183 
184         ex.run_until(sleep).unwrap();
185 
186         assert!(start.elapsed() >= dur);
187     }
188 
189     #[test]
multiple()190     fn multiple() {
191         let ex = Executor::new();
192 
193         let start = Instant::now();
194         let t1 = Timer::new(start + Duration::from_millis(10));
195         let t2 = Timer::new(start + Duration::from_secs(10));
196 
197         match ex.run_until(select(t1, t2)).unwrap() {
198             Either::Left(_) => {
199                 let elapsed = start.elapsed();
200                 assert!(elapsed >= Duration::from_millis(10));
201                 assert!(elapsed < Duration::from_secs(10));
202             }
203             Either::Right(_) => panic!("Longer deadline finished first"),
204         }
205     }
206 
207     #[test]
run_until_identical_deadline()208     fn run_until_identical_deadline() {
209         let ex = Executor::new();
210 
211         let start = Instant::now();
212         let deadline = start + Duration::from_millis(10);
213         let t1 = Timer::new(deadline);
214         let t2 = Timer::new(deadline);
215         let t3 = Timer::new(deadline);
216         let t4 = Timer::new(deadline);
217         let t5 = Timer::new(deadline);
218 
219         ex.run_until(join5(t1, t2, t3, t4, t5)).unwrap();
220         assert!(deadline <= Instant::now());
221     }
222 
223     #[test]
spawn_identical_deadline()224     fn spawn_identical_deadline() {
225         let ex = Executor::new();
226 
227         let start = Instant::now();
228         let deadline = start + Duration::from_millis(10);
229         let t1 = ex.spawn(Timer::new(deadline));
230         let t2 = ex.spawn(Timer::new(deadline));
231         let t3 = ex.spawn(Timer::new(deadline));
232         let t4 = ex.spawn(Timer::new(deadline));
233         let t5 = ex.spawn(Timer::new(deadline));
234 
235         ex.run_until(join5(t1, t2, t3, t4, t5)).unwrap();
236         assert!(deadline <= Instant::now());
237     }
238 
239     #[derive(Default)]
240     struct QuitShared {
241         wakers: Vec<task::Waker>,
242         should_quit: bool,
243     }
244 
245     #[derive(Clone, Default)]
246     struct Quit {
247         shared: Arc<Mutex<QuitShared>>,
248     }
249 
250     impl Quit {
quit(self)251         fn quit(self) {
252             let wakers = {
253                 let mut shared = self.shared.lock();
254                 shared.should_quit = true;
255                 mem::take(&mut shared.wakers)
256             };
257 
258             for w in wakers {
259                 w.wake();
260             }
261         }
262     }
263 
264     impl Future for Quit {
265         type Output = ();
266 
poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>267         fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
268             let mut shared = self.shared.lock();
269             if shared.should_quit {
270                 return Poll::Ready(());
271             }
272 
273             if shared.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
274                 shared.wakers.push(cx.waker().clone());
275             }
276 
277             Poll::Pending
278         }
279     }
280 
281     #[test]
multiple_threads()282     fn multiple_threads() {
283         const NUM_THREADS: usize = 7;
284         const NUM_TIMERS: usize = 19;
285 
286         let ex = Executor::new();
287         let quit = Quit::default();
288         let mut threads = Vec::with_capacity(NUM_THREADS);
289         for _ in 0..NUM_THREADS {
290             let thread_ex = ex.clone();
291             let thread_quit = quit.clone();
292             threads.push(thread::spawn(move || thread_ex.run_until(thread_quit)));
293         }
294 
295         let start = Instant::now();
296         let timers = FuturesUnordered::new();
297         let deadline = start + Duration::from_millis(10);
298         for _ in 0..NUM_TIMERS {
299             timers.push(ex.spawn(Timer::new(deadline)));
300         }
301 
302         ex.run_until(timers.collect::<Vec<()>>()).unwrap();
303         quit.quit();
304 
305         for t in threads {
306             t.join().unwrap().unwrap();
307         }
308     }
309 }
310