1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::future::Future;
6 use std::pin::Pin;
7
8 use base::debug;
9 use base::warn;
10 use base::AsRawDescriptors;
11 use base::RawDescriptor;
12 use once_cell::sync::OnceCell;
13 use serde::Deserialize;
14 use serde::Serialize;
15 use thiserror::Error as ThisError;
16
17 use super::poll_source::Error as PollError;
18 use super::uring_executor::check_uring_availability;
19 use super::uring_executor::is_uring_stable;
20 use super::uring_executor::Error as UringError;
21 use super::FdExecutor;
22 use super::PollSource;
23 use super::URingExecutor;
24 use super::UringSource;
25 use crate::AsyncResult;
26 use crate::IntoAsync;
27 use crate::IoSource;
28
async_uring_from<'a, F: IntoAsync + 'a>( f: F, ex: &URingExecutor, ) -> AsyncResult<IoSource<F>>29 pub(crate) fn async_uring_from<'a, F: IntoAsync + 'a>(
30 f: F,
31 ex: &URingExecutor,
32 ) -> AsyncResult<IoSource<F>> {
33 Ok(IoSource::Uring(UringSource::new(f, ex)?))
34 }
35
36 /// Creates an `IoSource` using the fd_executor.
async_poll_from<'a, F: IntoAsync + 'a>( f: F, ex: &FdExecutor, ) -> AsyncResult<IoSource<F>>37 pub(crate) fn async_poll_from<'a, F: IntoAsync + 'a>(
38 f: F,
39 ex: &FdExecutor,
40 ) -> AsyncResult<IoSource<F>> {
41 Ok(IoSource::Epoll(PollSource::new(f, ex)?))
42 }
43
44 /// An executor for scheduling tasks that poll futures to completion.
45 ///
46 /// All asynchronous operations must run within an executor, which is capable of spawning futures as
47 /// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
48 ///
49 /// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
50 /// create a new reference, not a new executor.
51 ///
52 /// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
53 /// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
54 /// interface duplication, but as far as we understand that is unavoidable.
55 ///
56 /// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75>
57 /// for further details.
58 ///
59 /// # Examples
60 ///
61 /// Concurrently wait for multiple files to become readable/writable and then read/write the data.
62 ///
63 /// ```
64 /// use std::cmp::min;
65 /// use std::error::Error;
66 /// use std::fs::{File, OpenOptions};
67 ///
68 /// use cros_async::{AsyncResult, Executor, IoSource, complete3};
69 /// const CHUNK_SIZE: usize = 32;
70 ///
71 /// // Write all bytes from `data` to `f`.
72 /// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
73 /// while data.len() > 0 {
74 /// let (count, mut buf) = f.write_from_vec(None, data).await?;
75 ///
76 /// data = buf.split_off(count);
77 /// }
78 ///
79 /// Ok(())
80 /// }
81 ///
82 /// // Transfer `len` bytes of data from `from` to `to`.
83 /// async fn transfer_data(
84 /// from: IoSource<File>,
85 /// to: IoSource<File>,
86 /// len: usize,
87 /// ) -> AsyncResult<usize> {
88 /// let mut rem = len;
89 ///
90 /// while rem > 0 {
91 /// let buf = vec![0u8; min(rem, CHUNK_SIZE)];
92 /// let (count, mut data) = from.read_to_vec(None, buf).await?;
93 ///
94 /// if count == 0 {
95 /// // End of file. Return the number of bytes transferred.
96 /// return Ok(len - rem);
97 /// }
98 ///
99 /// data.truncate(count);
100 /// write_file(&to, data).await?;
101 ///
102 /// rem = rem.saturating_sub(count);
103 /// }
104 ///
105 /// Ok(len)
106 /// }
107 ///
108 /// #[cfg(unix)]
109 /// # fn do_it() -> Result<(), Box<dyn Error>> {
110 /// let ex = Executor::new()?;
111 ///
112 /// let (rx, tx) = base::unix::pipe(true)?;
113 /// let zero = File::open("/dev/zero")?;
114 /// let zero_bytes = CHUNK_SIZE * 7;
115 /// let zero_to_pipe = transfer_data(
116 /// ex.async_from(zero)?,
117 /// ex.async_from(tx.try_clone()?)?,
118 /// zero_bytes,
119 /// );
120 ///
121 /// let rand = File::open("/dev/urandom")?;
122 /// let rand_bytes = CHUNK_SIZE * 19;
123 /// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
124 ///
125 /// let null = OpenOptions::new().write(true).open("/dev/null")?;
126 /// let null_bytes = zero_bytes + rand_bytes;
127 /// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
128 ///
129 /// ex.run_until(complete3(
130 /// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
131 /// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
132 /// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
133 /// ))?;
134 ///
135 /// # Ok(())
136 /// # }
137 /// #[cfg(unix)]
138 /// # do_it().unwrap();
139 /// ```
140
141 #[derive(Clone)]
142 pub enum Executor {
143 Uring(URingExecutor),
144 Fd(FdExecutor),
145 }
146
147 /// An enum to express the kind of the backend of `Executor`
148 #[derive(
149 Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, serde_keyvalue::FromKeyValues,
150 )]
151 #[serde(deny_unknown_fields, rename_all = "kebab-case")]
152 pub enum ExecutorKind {
153 Uring,
154 // For command-line parsing, user-friendly "epoll" is chosen instead of fd.
155 #[serde(rename = "epoll")]
156 Fd,
157 }
158
159 /// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`.
160 /// If not set, [`ExecutorKind::default()`] returns a statically-chosen default value, and
161 /// [`ExecutorKind::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value.
162 static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKind> = OnceCell::new();
163
164 impl Default for ExecutorKind {
default() -> Self165 fn default() -> Self {
166 *DEFAULT_EXECUTOR_KIND.get_or_init(|| ExecutorKind::Fd)
167 }
168 }
169
170 /// The error type for [`Executor::set_default_executor_kind()`].
171 #[derive(Debug, ThisError)]
172 pub enum SetDefaultExecutorKindError {
173 /// The default executor kind is set more than once.
174 #[error("The default executor kind is already set to {0:?}")]
175 SetMoreThanOnce(ExecutorKind),
176
177 /// io_uring is unavailable. The reason might be the lack of the kernel support,
178 /// but is not limited to that.
179 #[error("io_uring is unavailable: {0}")]
180 UringUnavailable(UringError),
181 }
182
183 pub enum TaskHandle<R> {
184 Uring(super::UringExecutorTaskHandle<R>),
185 Fd(super::FdExecutorTaskHandle<R>),
186 }
187
188 impl<R: Send + 'static> TaskHandle<R> {
detach(self)189 pub fn detach(self) {
190 match self {
191 TaskHandle::Uring(x) => x.detach(),
192 TaskHandle::Fd(x) => x.detach(),
193 }
194 }
195 }
196
197 impl<R: 'static> Future for TaskHandle<R> {
198 type Output = R;
199
poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output>200 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
201 match self.get_mut() {
202 TaskHandle::Uring(x) => Pin::new(x).poll(cx),
203 TaskHandle::Fd(x) => Pin::new(x).poll(cx),
204 }
205 }
206 }
207
208 impl Executor {
209 /// Create a new `Executor`.
new() -> AsyncResult<Self>210 pub fn new() -> AsyncResult<Self> {
211 Executor::with_executor_kind(ExecutorKind::default())
212 }
213
214 /// Create a new `Executor` of the given `ExecutorKind`.
with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self>215 pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> {
216 match kind {
217 ExecutorKind::Uring => Ok(URingExecutor::new().map(Executor::Uring)?),
218 ExecutorKind::Fd => Ok(FdExecutor::new()
219 .map(Executor::Fd)
220 .map_err(PollError::Executor)?),
221 }
222 }
223
224 /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.
225 /// If a call is the first call, it sets the default, and `set_default_executor_kind`
226 /// returns `Ok(())`. Otherwise, it returns `SetDefaultExecutorKindError::SetMoreThanOnce`
227 /// which contains the existing ExecutorKind value configured by the first call.
set_default_executor_kind( executor_kind: ExecutorKind, ) -> Result<(), SetDefaultExecutorKindError>228 pub fn set_default_executor_kind(
229 executor_kind: ExecutorKind,
230 ) -> Result<(), SetDefaultExecutorKindError> {
231 if executor_kind == ExecutorKind::Uring {
232 check_uring_availability().map_err(SetDefaultExecutorKindError::UringUnavailable)?;
233 if !is_uring_stable() {
234 warn!(
235 "Enabling io_uring executor on the kernel version where io_uring is unstable"
236 );
237 }
238 }
239
240 debug!("setting the default executor to {:?}", executor_kind);
241 DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|
242 // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.
243 SetDefaultExecutorKindError::SetMoreThanOnce(
244 *DEFAULT_EXECUTOR_KIND
245 .get()
246 .expect("Failed to get DEFAULT_EXECUTOR_KIND"),
247 ))
248 }
249
250 /// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned
251 /// `IoSource` to directly start async operations without needing a separate reference to the
252 /// executor.
async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>253 pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
254 match self {
255 Executor::Uring(ex) => async_uring_from(f, ex),
256 Executor::Fd(ex) => async_poll_from(f, ex),
257 }
258 }
259
260 /// Spawn a new future for this executor to run to completion. Callers may use the returned
261 /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel
262 /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the
263 /// future associated with it use `TaskHandle::detach`.
264 ///
265 /// # Examples
266 ///
267 /// ```
268 /// # use cros_async::AsyncResult;
269 /// # fn example_spawn() -> AsyncResult<()> {
270 /// # use std::thread;
271 ///
272 /// # use cros_async::Executor;
273 /// use futures::executor::block_on;
274 ///
275 /// # let ex = Executor::new()?;
276 ///
277 /// # // Spawn a thread that runs the executor.
278 /// # let ex2 = ex.clone();
279 /// # thread::spawn(move || ex2.run());
280 ///
281 /// let task = ex.spawn(async { 7 + 13 });
282 ///
283 /// let result = block_on(task);
284 /// assert_eq!(result, 20);
285 /// # Ok(())
286 /// # }
287 ///
288 /// # example_spawn().unwrap();
289 /// ```
spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,290 pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
291 where
292 F: Future + Send + 'static,
293 F::Output: Send + 'static,
294 {
295 match self {
296 Executor::Uring(ex) => TaskHandle::Uring(ex.spawn(f)),
297 Executor::Fd(ex) => TaskHandle::Fd(ex.spawn(f)),
298 }
299 }
300
301 /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
302 /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
303 /// thread where `run()` or `run_until()` is called.
304 ///
305 /// # Panics
306 ///
307 /// `Executor::run` and `Executor::run_until` will panic if they try to poll a future that was
308 /// added by calling `spawn_local` from a different thread.
309 ///
310 /// # Examples
311 ///
312 /// ```
313 /// # use cros_async::AsyncResult;
314 /// # fn example_spawn_local() -> AsyncResult<()> {
315 /// # use cros_async::Executor;
316 ///
317 /// # let ex = Executor::new()?;
318 ///
319 /// let task = ex.spawn_local(async { 7 + 13 });
320 ///
321 /// let result = ex.run_until(task)?;
322 /// assert_eq!(result, 20);
323 /// # Ok(())
324 /// # }
325 ///
326 /// # example_spawn_local().unwrap();
327 /// ```
spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static,328 pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
329 where
330 F: Future + 'static,
331 F::Output: 'static,
332 {
333 match self {
334 Executor::Uring(ex) => TaskHandle::Uring(ex.spawn_local(f)),
335 Executor::Fd(ex) => TaskHandle::Fd(ex.spawn_local(f)),
336 }
337 }
338
339 /// Run the provided closure on a dedicated thread where blocking is allowed.
340 ///
341 /// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping
342 /// the returned `TaskHandle` may not cancel the operation if it was already started on a
343 /// worker thread.
344 ///
345 /// # Panics
346 ///
347 /// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not
348 /// already completed.
349 ///
350 /// # Examples
351 ///
352 /// ```edition2018
353 /// # use cros_async::Executor;
354 ///
355 /// # async fn do_it(ex: &Executor) {
356 /// let res = ex.spawn_blocking(move || {
357 /// // Do some CPU-intensive or blocking work here.
358 ///
359 /// 42
360 /// }).await;
361 ///
362 /// assert_eq!(res, 42);
363 /// # }
364 ///
365 /// # let ex = Executor::new().unwrap();
366 /// # ex.run_until(do_it(&ex)).unwrap();
367 /// ```
spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,368 pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
369 where
370 F: FnOnce() -> R + Send + 'static,
371 R: Send + 'static,
372 {
373 match self {
374 Executor::Uring(ex) => TaskHandle::Uring(ex.spawn_blocking(f)),
375 Executor::Fd(ex) => TaskHandle::Fd(ex.spawn_blocking(f)),
376 }
377 }
378
379 /// Run the executor indefinitely, driving all spawned futures to completion. This method will
380 /// block the current thread and only return in the case of an error.
381 ///
382 /// # Panics
383 ///
384 /// Once this method has been called on a thread, it may only be called on that thread from that
385 /// point on. Attempting to call it from another thread will panic.
386 ///
387 /// # Examples
388 ///
389 /// ```
390 /// # use cros_async::AsyncResult;
391 /// # fn example_run() -> AsyncResult<()> {
392 /// use std::thread;
393 ///
394 /// use cros_async::Executor;
395 /// use futures::executor::block_on;
396 ///
397 /// let ex = Executor::new()?;
398 ///
399 /// // Spawn a thread that runs the executor.
400 /// let ex2 = ex.clone();
401 /// thread::spawn(move || ex2.run());
402 ///
403 /// let task = ex.spawn(async { 7 + 13 });
404 ///
405 /// let result = block_on(task);
406 /// assert_eq!(result, 20);
407 /// # Ok(())
408 /// # }
409 ///
410 /// # example_run().unwrap();
411 /// ```
run(&self) -> AsyncResult<()>412 pub fn run(&self) -> AsyncResult<()> {
413 self.run_until(std::future::pending())
414 }
415
416 /// Drive all futures spawned in this executor until `f` completes. This method will block the
417 /// current thread only until `f` is complete and there may still be unfinished futures in the
418 /// executor.
419 ///
420 /// # Panics
421 ///
422 /// Once this method has been called on a thread, from then onwards it may only be called on
423 /// that thread. Attempting to call it from another thread will panic.
424 ///
425 /// # Examples
426 ///
427 /// ```
428 /// # use cros_async::AsyncResult;
429 /// # fn example_run_until() -> AsyncResult<()> {
430 /// use cros_async::Executor;
431 ///
432 /// let ex = Executor::new()?;
433 ///
434 /// let task = ex.spawn_local(async { 7 + 13 });
435 ///
436 /// let result = ex.run_until(task)?;
437 /// assert_eq!(result, 20);
438 /// # Ok(())
439 /// # }
440 ///
441 /// # example_run_until().unwrap();
442 /// ```
run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>443 pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
444 match self {
445 Executor::Uring(ex) => Ok(ex.run_until(f)?),
446 Executor::Fd(ex) => Ok(ex.run_until(f).map_err(PollError::Executor)?),
447 }
448 }
449 }
450
451 impl AsRawDescriptors for Executor {
as_raw_descriptors(&self) -> Vec<RawDescriptor>452 fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
453 match self {
454 Executor::Uring(ex) => ex.as_raw_descriptors(),
455 Executor::Fd(ex) => ex.as_raw_descriptors(),
456 }
457 }
458 }
459