1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::future::Future;
6
7 use async_task::Task;
8
9 use crate::poll_source::Error as PollError;
10 use crate::uring_executor::use_uring;
11 use crate::{
12 AsyncResult, FdExecutor, IntoAsync, IoSourceExt, PollSource, URingExecutor, UringSource,
13 };
14
async_uring_from<'a, F: IntoAsync + 'a>( f: F, ex: &URingExecutor, ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>>15 pub(crate) fn async_uring_from<'a, F: IntoAsync + 'a>(
16 f: F,
17 ex: &URingExecutor,
18 ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>> {
19 Ok(UringSource::new(f, ex).map(|u| Box::new(u) as Box<dyn IoSourceExt<F>>)?)
20 }
21
22 /// Creates a concrete `IoSourceExt` using the fd_executor.
async_poll_from<'a, F: IntoAsync + 'a>( f: F, ex: &FdExecutor, ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>>23 pub(crate) fn async_poll_from<'a, F: IntoAsync + 'a>(
24 f: F,
25 ex: &FdExecutor,
26 ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>> {
27 Ok(PollSource::new(f, ex).map(|u| Box::new(u) as Box<dyn IoSourceExt<F>>)?)
28 }
29
30 /// An executor for scheduling tasks that poll futures to completion.
31 ///
32 /// All asynchronous operations must run within an executor, which is capable of spawning futures as
33 /// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
34 ///
35 /// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
36 /// create a new reference, not a new executor.
37 ///
38 /// # Examples
39 ///
40 /// Concurrently wait for multiple files to become readable/writable and then read/write the data.
41 ///
42 /// ```
43 /// use std::cmp::min;
44 /// use std::error::Error;
45 /// use std::fs::{File, OpenOptions};
46 ///
47 /// use cros_async::{AsyncResult, Executor, IoSourceExt, complete3};
48 /// const CHUNK_SIZE: usize = 32;
49 ///
50 /// // Write all bytes from `data` to `f`.
51 /// async fn write_file(f: &dyn IoSourceExt<File>, mut data: Vec<u8>) -> AsyncResult<()> {
52 /// while data.len() > 0 {
53 /// let (count, mut buf) = f.write_from_vec(0, data).await?;
54 ///
55 /// data = buf.split_off(count);
56 /// }
57 ///
58 /// Ok(())
59 /// }
60 ///
61 /// // Transfer `len` bytes of data from `from` to `to`.
62 /// async fn transfer_data(
63 /// from: Box<dyn IoSourceExt<File>>,
64 /// to: Box<dyn IoSourceExt<File>>,
65 /// len: usize,
66 /// ) -> AsyncResult<usize> {
67 /// let mut rem = len;
68 ///
69 /// while rem > 0 {
70 /// let buf = vec![0u8; min(rem, CHUNK_SIZE)];
71 /// let (count, mut data) = from.read_to_vec(0, buf).await?;
72 ///
73 /// if count == 0 {
74 /// // End of file. Return the number of bytes transferred.
75 /// return Ok(len - rem);
76 /// }
77 ///
78 /// data.truncate(count);
79 /// write_file(&*to, data).await?;
80 ///
81 /// rem = rem.saturating_sub(count);
82 /// }
83 ///
84 /// Ok(len)
85 /// }
86 ///
87 /// # fn do_it() -> Result<(), Box<dyn Error>> {
88 /// let ex = Executor::new()?;
89 ///
90 /// let (rx, tx) = sys_util::pipe(true)?;
91 /// let zero = File::open("/dev/zero")?;
92 /// let zero_bytes = CHUNK_SIZE * 7;
93 /// let zero_to_pipe = transfer_data(
94 /// ex.async_from(zero)?,
95 /// ex.async_from(tx.try_clone()?)?,
96 /// zero_bytes,
97 /// );
98 ///
99 /// let rand = File::open("/dev/urandom")?;
100 /// let rand_bytes = CHUNK_SIZE * 19;
101 /// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
102 ///
103 /// let null = OpenOptions::new().write(true).open("/dev/null")?;
104 /// let null_bytes = zero_bytes + rand_bytes;
105 /// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
106 ///
107 /// ex.run_until(complete3(
108 /// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
109 /// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
110 /// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
111 /// ))?;
112 ///
113 /// # Ok(())
114 /// # }
115 ///
116 /// # do_it().unwrap();
117 /// ```
118
119 #[derive(Clone)]
120 pub enum Executor {
121 Uring(URingExecutor),
122 Fd(FdExecutor),
123 }
124
125 impl Executor {
126 /// Create a new `Executor`.
new() -> AsyncResult<Self>127 pub fn new() -> AsyncResult<Self> {
128 if use_uring() {
129 Ok(URingExecutor::new().map(Executor::Uring)?)
130 } else {
131 Ok(FdExecutor::new()
132 .map(Executor::Fd)
133 .map_err(PollError::Executor)?)
134 }
135 }
136
137 /// Create a new `Box<dyn IoSourceExt<F>>` associated with `self`. Callers may then use the
138 /// returned `IoSourceExt` to directly start async operations without needing a separate
139 /// reference to the executor.
async_from<'a, F: IntoAsync + 'a>( &self, f: F, ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>>140 pub fn async_from<'a, F: IntoAsync + 'a>(
141 &self,
142 f: F,
143 ) -> AsyncResult<Box<dyn IoSourceExt<F> + 'a>> {
144 match self {
145 Executor::Uring(ex) => async_uring_from(f, ex),
146 Executor::Fd(ex) => async_poll_from(f, ex),
147 }
148 }
149
150 /// Spawn a new future for this executor to run to completion. Callers may use the returned
151 /// `Task` to await on the result of `f`. Dropping the returned `Task` will cancel `f`,
152 /// preventing it from being polled again. To drop a `Task` without canceling the future
153 /// associated with it use `Task::detach`. To cancel a task gracefully and wait until it is
154 /// fully destroyed, use `Task::cancel`.
155 ///
156 /// # Examples
157 ///
158 /// ```
159 /// # use cros_async::AsyncResult;
160 /// # fn example_spawn() -> AsyncResult<()> {
161 /// # use std::thread;
162 ///
163 /// # use cros_async::Executor;
164 /// use futures::executor::block_on;
165 ///
166 /// # let ex = Executor::new()?;
167 ///
168 /// # // Spawn a thread that runs the executor.
169 /// # let ex2 = ex.clone();
170 /// # thread::spawn(move || ex2.run());
171 ///
172 /// let task = ex.spawn(async { 7 + 13 });
173 ///
174 /// let result = block_on(task);
175 /// assert_eq!(result, 20);
176 /// # Ok(())
177 /// # }
178 ///
179 /// # example_spawn().unwrap();
180 /// ```
spawn<F>(&self, f: F) -> Task<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,181 pub fn spawn<F>(&self, f: F) -> Task<F::Output>
182 where
183 F: Future + Send + 'static,
184 F::Output: Send + 'static,
185 {
186 match self {
187 Executor::Uring(ex) => ex.spawn(f),
188 Executor::Fd(ex) => ex.spawn(f),
189 }
190 }
191
192 /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
193 /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
194 /// thread where `run()` or `run_until()` is called.
195 ///
196 /// # Panics
197 ///
198 /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
199 /// added by calling `spawn_local` from a different thread.
200 ///
201 /// # Examples
202 ///
203 /// ```
204 /// # use cros_async::AsyncResult;
205 /// # fn example_spawn_local() -> AsyncResult<()> {
206 /// # use cros_async::Executor;
207 ///
208 /// # let ex = Executor::new()?;
209 ///
210 /// let task = ex.spawn_local(async { 7 + 13 });
211 ///
212 /// let result = ex.run_until(task)?;
213 /// assert_eq!(result, 20);
214 /// # Ok(())
215 /// # }
216 ///
217 /// # example_spawn_local().unwrap();
218 /// ```
spawn_local<F>(&self, f: F) -> Task<F::Output> where F: Future + 'static, F::Output: 'static,219 pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
220 where
221 F: Future + 'static,
222 F::Output: 'static,
223 {
224 match self {
225 Executor::Uring(ex) => ex.spawn_local(f),
226 Executor::Fd(ex) => ex.spawn_local(f),
227 }
228 }
229
230 /// Run the executor indefinitely, driving all spawned futures to completion. This method will
231 /// block the current thread and only return in the case of an error.
232 ///
233 /// # Panics
234 ///
235 /// Once this method has been called on a thread, it may only be called on that thread from that
236 /// point on. Attempting to call it from another thread will panic.
237 ///
238 /// # Examples
239 ///
240 /// ```
241 /// # use cros_async::AsyncResult;
242 /// # fn example_run() -> AsyncResult<()> {
243 /// use std::thread;
244 ///
245 /// use cros_async::Executor;
246 /// use futures::executor::block_on;
247 ///
248 /// let ex = Executor::new()?;
249 ///
250 /// // Spawn a thread that runs the executor.
251 /// let ex2 = ex.clone();
252 /// thread::spawn(move || ex2.run());
253 ///
254 /// let task = ex.spawn(async { 7 + 13 });
255 ///
256 /// let result = block_on(task);
257 /// assert_eq!(result, 20);
258 /// # Ok(())
259 /// # }
260 ///
261 /// # example_run().unwrap();
262 /// ```
run(&self) -> AsyncResult<()>263 pub fn run(&self) -> AsyncResult<()> {
264 match self {
265 Executor::Uring(ex) => ex.run()?,
266 Executor::Fd(ex) => ex.run().map_err(PollError::Executor)?,
267 }
268
269 Ok(())
270 }
271
272 /// Drive all futures spawned in this executor until `f` completes. This method will block the
273 /// current thread only until `f` is complete and there may still be unfinished futures in the
274 /// executor.
275 ///
276 /// # Panics
277 ///
278 /// Once this method has been called on a thread, from then onwards it may only be called on
279 /// that thread. Attempting to call it from another thread will panic.
280 ///
281 /// # Examples
282 ///
283 /// ```
284 /// # use cros_async::AsyncResult;
285 /// # fn example_run_until() -> AsyncResult<()> {
286 /// use cros_async::Executor;
287 ///
288 /// let ex = Executor::new()?;
289 ///
290 /// let task = ex.spawn_local(async { 7 + 13 });
291 ///
292 /// let result = ex.run_until(task)?;
293 /// assert_eq!(result, 20);
294 /// # Ok(())
295 /// # }
296 ///
297 /// # example_run_until().unwrap();
298 /// ```
run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>299 pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
300 match self {
301 Executor::Uring(ex) => Ok(ex.run_until(f)?),
302 Executor::Fd(ex) => Ok(ex.run_until(f).map_err(PollError::Executor)?),
303 }
304 }
305 }
306