• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::future::Future;
6 use std::pin::Pin;
7 
8 use base::debug;
9 use base::warn;
10 use base::AsRawDescriptors;
11 use base::RawDescriptor;
12 use once_cell::sync::OnceCell;
13 use serde::Deserialize;
14 use serde::Serialize;
15 use thiserror::Error as ThisError;
16 
17 use super::poll_source::Error as PollError;
18 use super::uring_executor::check_uring_availability;
19 use super::uring_executor::is_uring_stable;
20 use super::uring_executor::Error as UringError;
21 use super::FdExecutor;
22 use super::PollSource;
23 use super::URingExecutor;
24 use super::UringSource;
25 use crate::AsyncResult;
26 use crate::IntoAsync;
27 use crate::IoSource;
28 
async_uring_from<'a, F: IntoAsync + 'a>( f: F, ex: &URingExecutor, ) -> AsyncResult<IoSource<F>>29 pub(crate) fn async_uring_from<'a, F: IntoAsync + 'a>(
30     f: F,
31     ex: &URingExecutor,
32 ) -> AsyncResult<IoSource<F>> {
33     Ok(IoSource::Uring(UringSource::new(f, ex)?))
34 }
35 
36 /// Creates an `IoSource` using the fd_executor.
async_poll_from<'a, F: IntoAsync + 'a>( f: F, ex: &FdExecutor, ) -> AsyncResult<IoSource<F>>37 pub(crate) fn async_poll_from<'a, F: IntoAsync + 'a>(
38     f: F,
39     ex: &FdExecutor,
40 ) -> AsyncResult<IoSource<F>> {
41     Ok(IoSource::Epoll(PollSource::new(f, ex)?))
42 }
43 
44 /// An executor for scheduling tasks that poll futures to completion.
45 ///
46 /// All asynchronous operations must run within an executor, which is capable of spawning futures as
47 /// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
48 ///
49 /// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
50 /// create a new reference, not a new executor.
51 ///
52 /// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
53 /// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
54 /// interface duplication, but as far as we understand that is unavoidable.
55 ///
56 /// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75>
57 /// for further details.
58 ///
59 /// # Examples
60 ///
61 /// Concurrently wait for multiple files to become readable/writable and then read/write the data.
62 ///
63 /// ```
64 /// use std::cmp::min;
65 /// use std::error::Error;
66 /// use std::fs::{File, OpenOptions};
67 ///
68 /// use cros_async::{AsyncResult, Executor, IoSource, complete3};
69 /// const CHUNK_SIZE: usize = 32;
70 ///
71 /// // Write all bytes from `data` to `f`.
72 /// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
73 ///     while data.len() > 0 {
74 ///         let (count, mut buf) = f.write_from_vec(None, data).await?;
75 ///
76 ///         data = buf.split_off(count);
77 ///     }
78 ///
79 ///     Ok(())
80 /// }
81 ///
82 /// // Transfer `len` bytes of data from `from` to `to`.
83 /// async fn transfer_data(
84 ///     from: IoSource<File>,
85 ///     to: IoSource<File>,
86 ///     len: usize,
87 /// ) -> AsyncResult<usize> {
88 ///     let mut rem = len;
89 ///
90 ///     while rem > 0 {
91 ///         let buf = vec![0u8; min(rem, CHUNK_SIZE)];
92 ///         let (count, mut data) = from.read_to_vec(None, buf).await?;
93 ///
94 ///         if count == 0 {
95 ///             // End of file. Return the number of bytes transferred.
96 ///             return Ok(len - rem);
97 ///         }
98 ///
99 ///         data.truncate(count);
100 ///         write_file(&to, data).await?;
101 ///
102 ///         rem = rem.saturating_sub(count);
103 ///     }
104 ///
105 ///     Ok(len)
106 /// }
107 ///
108 /// #[cfg(unix)]
109 /// # fn do_it() -> Result<(), Box<dyn Error>> {
110 ///     let ex = Executor::new()?;
111 ///
112 ///     let (rx, tx) = base::unix::pipe(true)?;
113 ///     let zero = File::open("/dev/zero")?;
114 ///     let zero_bytes = CHUNK_SIZE * 7;
115 ///     let zero_to_pipe = transfer_data(
116 ///         ex.async_from(zero)?,
117 ///         ex.async_from(tx.try_clone()?)?,
118 ///         zero_bytes,
119 ///     );
120 ///
121 ///     let rand = File::open("/dev/urandom")?;
122 ///     let rand_bytes = CHUNK_SIZE * 19;
123 ///     let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
124 ///
125 ///     let null = OpenOptions::new().write(true).open("/dev/null")?;
126 ///     let null_bytes = zero_bytes + rand_bytes;
127 ///     let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
128 ///
129 ///     ex.run_until(complete3(
130 ///         async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
131 ///         async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
132 ///         async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
133 ///     ))?;
134 ///
135 /// #     Ok(())
136 /// # }
137 /// #[cfg(unix)]
138 /// # do_it().unwrap();
139 /// ```
140 
141 #[derive(Clone)]
142 pub enum Executor {
143     Uring(URingExecutor),
144     Fd(FdExecutor),
145 }
146 
147 /// An enum to express the kind of the backend of `Executor`
148 #[derive(
149     Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, serde_keyvalue::FromKeyValues,
150 )]
151 #[serde(deny_unknown_fields, rename_all = "kebab-case")]
152 pub enum ExecutorKind {
153     Uring,
154     // For command-line parsing, user-friendly "epoll" is chosen instead of fd.
155     #[serde(rename = "epoll")]
156     Fd,
157 }
158 
159 /// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`.
160 /// If not set, [`ExecutorKind::default()`] returns a statically-chosen default value, and
161 /// [`ExecutorKind::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value.
162 static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKind> = OnceCell::new();
163 
164 impl Default for ExecutorKind {
default() -> Self165     fn default() -> Self {
166         *DEFAULT_EXECUTOR_KIND.get_or_init(|| ExecutorKind::Fd)
167     }
168 }
169 
170 /// The error type for [`Executor::set_default_executor_kind()`].
171 #[derive(Debug, ThisError)]
172 pub enum SetDefaultExecutorKindError {
173     /// The default executor kind is set more than once.
174     #[error("The default executor kind is already set to {0:?}")]
175     SetMoreThanOnce(ExecutorKind),
176 
177     /// io_uring is unavailable. The reason might be the lack of the kernel support,
178     /// but is not limited to that.
179     #[error("io_uring is unavailable: {0}")]
180     UringUnavailable(UringError),
181 }
182 
183 pub enum TaskHandle<R> {
184     Uring(super::UringExecutorTaskHandle<R>),
185     Fd(super::FdExecutorTaskHandle<R>),
186 }
187 
188 impl<R: Send + 'static> TaskHandle<R> {
detach(self)189     pub fn detach(self) {
190         match self {
191             TaskHandle::Uring(x) => x.detach(),
192             TaskHandle::Fd(x) => x.detach(),
193         }
194     }
195 }
196 
197 impl<R: 'static> Future for TaskHandle<R> {
198     type Output = R;
199 
poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output>200     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
201         match self.get_mut() {
202             TaskHandle::Uring(x) => Pin::new(x).poll(cx),
203             TaskHandle::Fd(x) => Pin::new(x).poll(cx),
204         }
205     }
206 }
207 
208 impl Executor {
209     /// Create a new `Executor`.
new() -> AsyncResult<Self>210     pub fn new() -> AsyncResult<Self> {
211         Executor::with_executor_kind(ExecutorKind::default())
212     }
213 
214     /// Create a new `Executor` of the given `ExecutorKind`.
with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self>215     pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> {
216         match kind {
217             ExecutorKind::Uring => Ok(URingExecutor::new().map(Executor::Uring)?),
218             ExecutorKind::Fd => Ok(FdExecutor::new()
219                 .map(Executor::Fd)
220                 .map_err(PollError::Executor)?),
221         }
222     }
223 
224     /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.
225     /// If a call is the first call, it sets the default, and `set_default_executor_kind`
226     /// returns `Ok(())`. Otherwise, it returns `SetDefaultExecutorKindError::SetMoreThanOnce`
227     /// which contains the existing ExecutorKind value configured by the first call.
set_default_executor_kind( executor_kind: ExecutorKind, ) -> Result<(), SetDefaultExecutorKindError>228     pub fn set_default_executor_kind(
229         executor_kind: ExecutorKind,
230     ) -> Result<(), SetDefaultExecutorKindError> {
231         if executor_kind == ExecutorKind::Uring {
232             check_uring_availability().map_err(SetDefaultExecutorKindError::UringUnavailable)?;
233             if !is_uring_stable() {
234                 warn!(
235                     "Enabling io_uring executor on the kernel version where io_uring is unstable"
236                 );
237             }
238         }
239 
240         debug!("setting the default executor to {:?}", executor_kind);
241         DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|
242             // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.
243             SetDefaultExecutorKindError::SetMoreThanOnce(
244                 *DEFAULT_EXECUTOR_KIND
245                     .get()
246                     .expect("Failed to get DEFAULT_EXECUTOR_KIND"),
247             ))
248     }
249 
250     /// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned
251     /// `IoSource` to directly start async operations without needing a separate reference to the
252     /// executor.
async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>253     pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
254         match self {
255             Executor::Uring(ex) => async_uring_from(f, ex),
256             Executor::Fd(ex) => async_poll_from(f, ex),
257         }
258     }
259 
260     /// Spawn a new future for this executor to run to completion. Callers may use the returned
261     /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel
262     /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the
263     /// future associated with it use `TaskHandle::detach`.
264     ///
265     /// # Examples
266     ///
267     /// ```
268     /// # use cros_async::AsyncResult;
269     /// # fn example_spawn() -> AsyncResult<()> {
270     /// #      use std::thread;
271     ///
272     /// #      use cros_async::Executor;
273     ///       use futures::executor::block_on;
274     ///
275     /// #      let ex = Executor::new()?;
276     ///
277     /// #      // Spawn a thread that runs the executor.
278     /// #      let ex2 = ex.clone();
279     /// #      thread::spawn(move || ex2.run());
280     ///
281     ///       let task = ex.spawn(async { 7 + 13 });
282     ///
283     ///       let result = block_on(task);
284     ///       assert_eq!(result, 20);
285     /// #     Ok(())
286     /// # }
287     ///
288     /// # example_spawn().unwrap();
289     /// ```
spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,290     pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
291     where
292         F: Future + Send + 'static,
293         F::Output: Send + 'static,
294     {
295         match self {
296             Executor::Uring(ex) => TaskHandle::Uring(ex.spawn(f)),
297             Executor::Fd(ex) => TaskHandle::Fd(ex.spawn(f)),
298         }
299     }
300 
301     /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
302     /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
303     /// thread where `run()` or `run_until()` is called.
304     ///
305     /// # Panics
306     ///
307     /// `Executor::run` and `Executor::run_until` will panic if they try to poll a future that was
308     /// added by calling `spawn_local` from a different thread.
309     ///
310     /// # Examples
311     ///
312     /// ```
313     /// # use cros_async::AsyncResult;
314     /// # fn example_spawn_local() -> AsyncResult<()> {
315     /// #      use cros_async::Executor;
316     ///
317     /// #      let ex = Executor::new()?;
318     ///
319     ///       let task = ex.spawn_local(async { 7 + 13 });
320     ///
321     ///       let result = ex.run_until(task)?;
322     ///       assert_eq!(result, 20);
323     /// #     Ok(())
324     /// # }
325     ///
326     /// # example_spawn_local().unwrap();
327     /// ```
spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static,328     pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
329     where
330         F: Future + 'static,
331         F::Output: 'static,
332     {
333         match self {
334             Executor::Uring(ex) => TaskHandle::Uring(ex.spawn_local(f)),
335             Executor::Fd(ex) => TaskHandle::Fd(ex.spawn_local(f)),
336         }
337     }
338 
339     /// Run the provided closure on a dedicated thread where blocking is allowed.
340     ///
341     /// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping
342     /// the returned `TaskHandle` may not cancel the operation if it was already started on a
343     /// worker thread.
344     ///
345     /// # Panics
346     ///
347     /// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not
348     /// already completed.
349     ///
350     /// # Examples
351     ///
352     /// ```edition2018
353     /// # use cros_async::Executor;
354     ///
355     /// # async fn do_it(ex: &Executor) {
356     ///     let res = ex.spawn_blocking(move || {
357     ///         // Do some CPU-intensive or blocking work here.
358     ///
359     ///         42
360     ///     }).await;
361     ///
362     ///     assert_eq!(res, 42);
363     /// # }
364     ///
365     /// # let ex = Executor::new().unwrap();
366     /// # ex.run_until(do_it(&ex)).unwrap();
367     /// ```
spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,368     pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
369     where
370         F: FnOnce() -> R + Send + 'static,
371         R: Send + 'static,
372     {
373         match self {
374             Executor::Uring(ex) => TaskHandle::Uring(ex.spawn_blocking(f)),
375             Executor::Fd(ex) => TaskHandle::Fd(ex.spawn_blocking(f)),
376         }
377     }
378 
379     /// Run the executor indefinitely, driving all spawned futures to completion. This method will
380     /// block the current thread and only return in the case of an error.
381     ///
382     /// # Panics
383     ///
384     /// Once this method has been called on a thread, it may only be called on that thread from that
385     /// point on. Attempting to call it from another thread will panic.
386     ///
387     /// # Examples
388     ///
389     /// ```
390     /// # use cros_async::AsyncResult;
391     /// # fn example_run() -> AsyncResult<()> {
392     ///       use std::thread;
393     ///
394     ///       use cros_async::Executor;
395     ///       use futures::executor::block_on;
396     ///
397     ///       let ex = Executor::new()?;
398     ///
399     ///       // Spawn a thread that runs the executor.
400     ///       let ex2 = ex.clone();
401     ///       thread::spawn(move || ex2.run());
402     ///
403     ///       let task = ex.spawn(async { 7 + 13 });
404     ///
405     ///       let result = block_on(task);
406     ///       assert_eq!(result, 20);
407     /// #     Ok(())
408     /// # }
409     ///
410     /// # example_run().unwrap();
411     /// ```
run(&self) -> AsyncResult<()>412     pub fn run(&self) -> AsyncResult<()> {
413         self.run_until(std::future::pending())
414     }
415 
416     /// Drive all futures spawned in this executor until `f` completes. This method will block the
417     /// current thread only until `f` is complete and there may still be unfinished futures in the
418     /// executor.
419     ///
420     /// # Panics
421     ///
422     /// Once this method has been called on a thread, from then onwards it may only be called on
423     /// that thread. Attempting to call it from another thread will panic.
424     ///
425     /// # Examples
426     ///
427     /// ```
428     /// # use cros_async::AsyncResult;
429     /// # fn example_run_until() -> AsyncResult<()> {
430     ///       use cros_async::Executor;
431     ///
432     ///       let ex = Executor::new()?;
433     ///
434     ///       let task = ex.spawn_local(async { 7 + 13 });
435     ///
436     ///       let result = ex.run_until(task)?;
437     ///       assert_eq!(result, 20);
438     /// #     Ok(())
439     /// # }
440     ///
441     /// # example_run_until().unwrap();
442     /// ```
run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>443     pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
444         match self {
445             Executor::Uring(ex) => Ok(ex.run_until(f)?),
446             Executor::Fd(ex) => Ok(ex.run_until(f).map_err(PollError::Executor)?),
447         }
448     }
449 }
450 
451 impl AsRawDescriptors for Executor {
as_raw_descriptors(&self) -> Vec<RawDescriptor>452     fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
453         match self {
454             Executor::Uring(ex) => ex.as_raw_descriptors(),
455             Executor::Fd(ex) => ex.as_raw_descriptors(),
456         }
457     }
458 }
459