• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::future::Future;
6 
7 use async_task::Task;
8 
9 use super::{
10     poll_source::Error as PollError, uring_executor::use_uring, AsyncResult, FdExecutor, IntoAsync,
11     IoSourceExt, PollSource, URingExecutor, UringSource,
12 };
13 
async_uring_from<'a, F: IntoAsync + Send + 'a>( f: F, ex: &URingExecutor, ) -> AsyncResult<Box<dyn IoSourceExt<F> + Send + 'a>>14 pub(crate) fn async_uring_from<'a, F: IntoAsync + Send + 'a>(
15     f: F,
16     ex: &URingExecutor,
17 ) -> AsyncResult<Box<dyn IoSourceExt<F> + Send + 'a>> {
18     Ok(UringSource::new(f, ex).map(|u| Box::new(u) as Box<dyn IoSourceExt<F> + Send>)?)
19 }
20 
21 /// Creates a concrete `IoSourceExt` using the fd_executor.
async_poll_from<'a, F: IntoAsync + Send + 'a>( f: F, ex: &FdExecutor, ) -> AsyncResult<Box<dyn IoSourceExt<F> + Send + 'a>>22 pub(crate) fn async_poll_from<'a, F: IntoAsync + Send + 'a>(
23     f: F,
24     ex: &FdExecutor,
25 ) -> AsyncResult<Box<dyn IoSourceExt<F> + Send + 'a>> {
26     Ok(PollSource::new(f, ex).map(|u| Box::new(u) as Box<dyn IoSourceExt<F> + Send>)?)
27 }
28 
/// An executor for scheduling tasks that poll futures to completion.
///
/// All asynchronous operations must run within an executor, which is capable of spawning futures as
/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
///
/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
/// create a new reference, not a new executor.
///
/// # Examples
///
/// Concurrently wait for multiple files to become readable/writable and then read/write the data.
///
/// ```
/// use std::cmp::min;
/// use std::error::Error;
/// use std::fs::{File, OpenOptions};
///
/// use cros_async::{AsyncResult, Executor, IoSourceExt, complete3};
/// const CHUNK_SIZE: usize = 32;
///
/// // Write all bytes from `data` to `f`.
/// async fn write_file(f: &dyn IoSourceExt<File>, mut data: Vec<u8>) -> AsyncResult<()> {
///     while data.len() > 0 {
///         let (count, mut buf) = f.write_from_vec(None, data).await?;
///
///         data = buf.split_off(count);
///     }
///
///     Ok(())
/// }
///
/// // Transfer `len` bytes of data from `from` to `to`.
/// async fn transfer_data(
///     from: Box<dyn IoSourceExt<File>>,
///     to: Box<dyn IoSourceExt<File>>,
///     len: usize,
/// ) -> AsyncResult<usize> {
///     let mut rem = len;
///
///     while rem > 0 {
///         let buf = vec![0u8; min(rem, CHUNK_SIZE)];
///         let (count, mut data) = from.read_to_vec(None, buf).await?;
///
///         if count == 0 {
///             // End of file. Return the number of bytes transferred.
///             return Ok(len - rem);
///         }
///
///         data.truncate(count);
///         write_file(&*to, data).await?;
///
///         rem = rem.saturating_sub(count);
///     }
///
///     Ok(len)
/// }
///
/// # fn do_it() -> Result<(), Box<dyn Error>> {
///     let ex = Executor::new()?;
///
///     let (rx, tx) = base::pipe(true)?;
///     let zero = File::open("/dev/zero")?;
///     let zero_bytes = CHUNK_SIZE * 7;
///     let zero_to_pipe = transfer_data(
///         ex.async_from(zero)?,
///         ex.async_from(tx.try_clone()?)?,
///         zero_bytes,
///     );
///
///     let rand = File::open("/dev/urandom")?;
///     let rand_bytes = CHUNK_SIZE * 19;
///     let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
///
///     let null = OpenOptions::new().write(true).open("/dev/null")?;
///     let null_bytes = zero_bytes + rand_bytes;
///     let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
///
///     ex.run_until(complete3(
///         async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
///         async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
///         async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
///     ))?;
///
/// #     Ok(())
/// # }
///
/// # do_it().unwrap();
/// ```
#[derive(Clone)]
pub enum Executor {
    /// Backend wrapping a `URingExecutor`; chosen by `Executor::new` when `use_uring()` returns
    /// true.
    Uring(URingExecutor),
    /// Backend wrapping an `FdExecutor`; chosen by `Executor::new` when `use_uring()` returns
    /// false.
    Fd(FdExecutor),
}
123 
124 impl Executor {
125     /// Create a new `Executor`.
new() -> AsyncResult<Self>126     pub fn new() -> AsyncResult<Self> {
127         if use_uring() {
128             Ok(URingExecutor::new().map(Executor::Uring)?)
129         } else {
130             Ok(FdExecutor::new()
131                 .map(Executor::Fd)
132                 .map_err(PollError::Executor)?)
133         }
134     }
135 
136     /// Create a new `Box<dyn IoSourceExt<F>>` associated with `self`. Callers may then use the
137     /// returned `IoSourceExt` to directly start async operations without needing a separate
138     /// reference to the executor.
async_from<'a, F: IntoAsync + Send + 'a>( &self, f: F, ) -> AsyncResult<Box<dyn IoSourceExt<F> + Send + 'a>>139     pub fn async_from<'a, F: IntoAsync + Send + 'a>(
140         &self,
141         f: F,
142     ) -> AsyncResult<Box<dyn IoSourceExt<F> + Send + 'a>> {
143         match self {
144             Executor::Uring(ex) => async_uring_from(f, ex),
145             Executor::Fd(ex) => async_poll_from(f, ex),
146         }
147     }
148 
149     /// Spawn a new future for this executor to run to completion. Callers may use the returned
150     /// `Task` to await on the result of `f`. Dropping the returned `Task` will cancel `f`,
151     /// preventing it from being polled again. To drop a `Task` without canceling the future
152     /// associated with it use `Task::detach`. To cancel a task gracefully and wait until it is
153     /// fully destroyed, use `Task::cancel`.
154     ///
155     /// # Examples
156     ///
157     /// ```
158     /// # use cros_async::AsyncResult;
159     /// # fn example_spawn() -> AsyncResult<()> {
160     /// #      use std::thread;
161     ///
162     /// #      use cros_async::Executor;
163     ///       use futures::executor::block_on;
164     ///
165     /// #      let ex = Executor::new()?;
166     ///
167     /// #      // Spawn a thread that runs the executor.
168     /// #      let ex2 = ex.clone();
169     /// #      thread::spawn(move || ex2.run());
170     ///
171     ///       let task = ex.spawn(async { 7 + 13 });
172     ///
173     ///       let result = block_on(task);
174     ///       assert_eq!(result, 20);
175     /// #     Ok(())
176     /// # }
177     ///
178     /// # example_spawn().unwrap();
179     /// ```
spawn<F>(&self, f: F) -> Task<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,180     pub fn spawn<F>(&self, f: F) -> Task<F::Output>
181     where
182         F: Future + Send + 'static,
183         F::Output: Send + 'static,
184     {
185         match self {
186             Executor::Uring(ex) => ex.spawn(f),
187             Executor::Fd(ex) => ex.spawn(f),
188         }
189     }
190 
191     /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
192     /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
193     /// thread where `run()` or `run_until()` is called.
194     ///
195     /// # Panics
196     ///
197     /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
198     /// added by calling `spawn_local` from a different thread.
199     ///
200     /// # Examples
201     ///
202     /// ```
203     /// # use cros_async::AsyncResult;
204     /// # fn example_spawn_local() -> AsyncResult<()> {
205     /// #      use cros_async::Executor;
206     ///
207     /// #      let ex = Executor::new()?;
208     ///
209     ///       let task = ex.spawn_local(async { 7 + 13 });
210     ///
211     ///       let result = ex.run_until(task)?;
212     ///       assert_eq!(result, 20);
213     /// #     Ok(())
214     /// # }
215     ///
216     /// # example_spawn_local().unwrap();
217     /// ```
spawn_local<F>(&self, f: F) -> Task<F::Output> where F: Future + 'static, F::Output: 'static,218     pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
219     where
220         F: Future + 'static,
221         F::Output: 'static,
222     {
223         match self {
224             Executor::Uring(ex) => ex.spawn_local(f),
225             Executor::Fd(ex) => ex.spawn_local(f),
226         }
227     }
228 
229     /// Run the provided closure on a dedicated thread where blocking is allowed.
230     ///
231     /// Callers may `await` on the returned `Task` to wait for the result of `f`. Dropping or
232     /// canceling the returned `Task` may not cancel the operation if it was already started on a
233     /// worker thread.
234     ///
235     /// # Panics
236     ///
237     /// `await`ing the `Task` after the `Executor` is dropped will panic if the work was not already
238     /// completed.
239     ///
240     /// # Examples
241     ///
242     /// ```edition2018
243     /// # use cros_async::Executor;
244     ///
245     /// # async fn do_it(ex: &Executor) {
246     ///     let res = ex.spawn_blocking(move || {
247     ///         // Do some CPU-intensive or blocking work here.
248     ///
249     ///         42
250     ///     }).await;
251     ///
252     ///     assert_eq!(res, 42);
253     /// # }
254     ///
255     /// # let ex = Executor::new().unwrap();
256     /// # ex.run_until(do_it(&ex)).unwrap();
257     /// ```
spawn_blocking<F, R>(&self, f: F) -> Task<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,258     pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
259     where
260         F: FnOnce() -> R + Send + 'static,
261         R: Send + 'static,
262     {
263         match self {
264             Executor::Uring(ex) => ex.spawn_blocking(f),
265             Executor::Fd(ex) => ex.spawn_blocking(f),
266         }
267     }
268 
269     /// Run the executor indefinitely, driving all spawned futures to completion. This method will
270     /// block the current thread and only return in the case of an error.
271     ///
272     /// # Panics
273     ///
274     /// Once this method has been called on a thread, it may only be called on that thread from that
275     /// point on. Attempting to call it from another thread will panic.
276     ///
277     /// # Examples
278     ///
279     /// ```
280     /// # use cros_async::AsyncResult;
281     /// # fn example_run() -> AsyncResult<()> {
282     ///       use std::thread;
283     ///
284     ///       use cros_async::Executor;
285     ///       use futures::executor::block_on;
286     ///
287     ///       let ex = Executor::new()?;
288     ///
289     ///       // Spawn a thread that runs the executor.
290     ///       let ex2 = ex.clone();
291     ///       thread::spawn(move || ex2.run());
292     ///
293     ///       let task = ex.spawn(async { 7 + 13 });
294     ///
295     ///       let result = block_on(task);
296     ///       assert_eq!(result, 20);
297     /// #     Ok(())
298     /// # }
299     ///
300     /// # example_run().unwrap();
301     /// ```
run(&self) -> AsyncResult<()>302     pub fn run(&self) -> AsyncResult<()> {
303         match self {
304             Executor::Uring(ex) => ex.run()?,
305             Executor::Fd(ex) => ex.run().map_err(PollError::Executor)?,
306         }
307 
308         Ok(())
309     }
310 
311     /// Drive all futures spawned in this executor until `f` completes. This method will block the
312     /// current thread only until `f` is complete and there may still be unfinished futures in the
313     /// executor.
314     ///
315     /// # Panics
316     ///
317     /// Once this method has been called on a thread, from then onwards it may only be called on
318     /// that thread. Attempting to call it from another thread will panic.
319     ///
320     /// # Examples
321     ///
322     /// ```
323     /// # use cros_async::AsyncResult;
324     /// # fn example_run_until() -> AsyncResult<()> {
325     ///       use cros_async::Executor;
326     ///
327     ///       let ex = Executor::new()?;
328     ///
329     ///       let task = ex.spawn_local(async { 7 + 13 });
330     ///
331     ///       let result = ex.run_until(task)?;
332     ///       assert_eq!(result, 20);
333     /// #     Ok(())
334     /// # }
335     ///
336     /// # example_run_until().unwrap();
337     /// ```
run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>338     pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
339         match self {
340             Executor::Uring(ex) => Ok(ex.run_until(f)?),
341             Executor::Fd(ex) => Ok(ex.run_until(f).map_err(PollError::Executor)?),
342         }
343     }
344 }
345