• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::future::Future;
6 use std::pin::Pin;
7 use std::sync::Arc;
8 
9 #[cfg(any(target_os = "android", target_os = "linux"))]
10 use base::warn;
11 #[cfg(any(target_os = "android", target_os = "linux"))]
12 use base::AsRawDescriptors;
13 #[cfg(any(target_os = "android", target_os = "linux"))]
14 use base::RawDescriptor;
15 use once_cell::sync::OnceCell;
16 use serde::Deserialize;
17 use serde_keyvalue::argh::FromArgValue;
18 use serde_keyvalue::ErrorKind;
19 use serde_keyvalue::KeyValueDeserializer;
20 
21 use crate::common_executor;
22 use crate::common_executor::RawExecutor;
23 #[cfg(any(target_os = "android", target_os = "linux"))]
24 use crate::sys::linux;
25 #[cfg(windows)]
26 use crate::sys::windows;
27 use crate::sys::ExecutorKindSys;
28 use crate::AsyncResult;
29 use crate::IntoAsync;
30 use crate::IoSource;
31 
32 cfg_if::cfg_if! {
33     if #[cfg(feature = "tokio")] {
34         use crate::tokio_executor::TokioExecutor;
35         use crate::tokio_executor::TokioTaskHandle;
36     }
37 }
38 
39 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
40 pub enum ExecutorKind {
41     SysVariants(ExecutorKindSys),
42     #[cfg(feature = "tokio")]
43     Tokio,
44 }
45 
46 impl From<ExecutorKindSys> for ExecutorKind {
from(e: ExecutorKindSys) -> ExecutorKind47     fn from(e: ExecutorKindSys) -> ExecutorKind {
48         ExecutorKind::SysVariants(e)
49     }
50 }
51 
52 /// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`.
53 /// If not set, [`ExecutorKind::default()`] returns a statically-chosen default value, and
54 /// [`ExecutorKind::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value.
55 static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKind> = OnceCell::new();
56 
57 impl Default for ExecutorKind {
default() -> Self58     fn default() -> Self {
59         #[cfg(any(target_os = "android", target_os = "linux"))]
60         let default_fn = || ExecutorKindSys::Fd.into();
61         #[cfg(windows)]
62         let default_fn = || ExecutorKindSys::Handle.into();
63         *DEFAULT_EXECUTOR_KIND.get_or_init(default_fn)
64     }
65 }
66 
67 /// The error type for [`Executor::set_default_executor_kind()`].
68 #[derive(thiserror::Error, Debug)]
69 pub enum SetDefaultExecutorKindError {
70     /// The default executor kind is set more than once.
71     #[error("The default executor kind is already set to {0:?}")]
72     SetMoreThanOnce(ExecutorKind),
73 
74     #[cfg(any(target_os = "android", target_os = "linux"))]
75     /// io_uring is unavailable. The reason might be the lack of the kernel support,
76     /// but is not limited to that.
77     #[error("io_uring is unavailable: {0}")]
78     UringUnavailable(linux::uring_executor::Error),
79 }
80 
81 impl FromArgValue for ExecutorKind {
from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String>82     fn from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String> {
83         // `from_arg_value` returns a `String` as error, but our deserializer API defines its own
84         // error type. Perform parsing from a closure so we can easily map returned errors.
85         let builder = move || {
86             let mut des = KeyValueDeserializer::from(value);
87 
88             let kind: ExecutorKind = match (des.parse_identifier()?, des.next_char()) {
89                 #[cfg(any(target_os = "android", target_os = "linux"))]
90                 ("epoll", None) => ExecutorKindSys::Fd.into(),
91                 #[cfg(any(target_os = "android", target_os = "linux"))]
92                 ("uring", None) => ExecutorKindSys::Uring.into(),
93                 #[cfg(windows)]
94                 ("handle", None) => ExecutorKindSys::Handle.into(),
95                 #[cfg(windows)]
96                 ("overlapped", None) => ExecutorKindSys::Overlapped { concurrency: None }.into(),
97                 #[cfg(windows)]
98                 ("overlapped", Some(',')) => {
99                     if des.parse_identifier()? != "concurrency" {
100                         let kind = ErrorKind::SerdeError("expected `concurrency`".to_string());
101                         return Err(des.error_here(kind));
102                     }
103                     if des.next_char() != Some('=') {
104                         return Err(des.error_here(ErrorKind::ExpectedEqual));
105                     }
106                     let concurrency = des.parse_number()?;
107                     ExecutorKindSys::Overlapped {
108                         concurrency: Some(concurrency),
109                     }
110                     .into()
111                 }
112                 #[cfg(feature = "tokio")]
113                 ("tokio", None) => ExecutorKind::Tokio,
114                 (_identifier, _next) => {
115                     let kind = ErrorKind::SerdeError("unexpected kind".to_string());
116                     return Err(des.error_here(kind));
117                 }
118             };
119             des.finish()?;
120             Ok(kind)
121         };
122 
123         builder().map_err(|e| e.to_string())
124     }
125 }
126 
127 impl serde::Serialize for ExecutorKind {
serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer,128     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
129     where
130         S: serde::Serializer,
131     {
132         match self {
133             ExecutorKind::SysVariants(sv) => sv.serialize(serializer),
134             #[cfg(feature = "tokio")]
135             ExecutorKind::Tokio => "tokio".serialize(serializer),
136         }
137     }
138 }
139 
140 impl<'de> Deserialize<'de> for ExecutorKind {
deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error> where D: serde::Deserializer<'de>,141     fn deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error>
142     where
143         D: serde::Deserializer<'de>,
144     {
145         base::error!("ExecutorKind::deserialize");
146         let string = String::deserialize(deserializer)?;
147         ExecutorKind::from_arg_value(&string).map_err(serde::de::Error::custom)
148     }
149 }
150 
151 /// Reference to a task managed by the executor.
152 ///
153 /// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to
154 /// continue running the background.
155 ///
156 /// `await`ing the `TaskHandle` waits for the task to finish and yields its result.
157 pub enum TaskHandle<R> {
158     #[cfg(any(target_os = "android", target_os = "linux"))]
159     Fd(common_executor::RawTaskHandle<linux::EpollReactor, R>),
160     #[cfg(any(target_os = "android", target_os = "linux"))]
161     Uring(common_executor::RawTaskHandle<linux::UringReactor, R>),
162     #[cfg(windows)]
163     Handle(common_executor::RawTaskHandle<windows::HandleReactor, R>),
164     #[cfg(feature = "tokio")]
165     Tokio(TokioTaskHandle<R>),
166 }
167 
168 impl<R: Send + 'static> TaskHandle<R> {
detach(self)169     pub fn detach(self) {
170         match self {
171             #[cfg(any(target_os = "android", target_os = "linux"))]
172             TaskHandle::Fd(f) => f.detach(),
173             #[cfg(any(target_os = "android", target_os = "linux"))]
174             TaskHandle::Uring(u) => u.detach(),
175             #[cfg(windows)]
176             TaskHandle::Handle(h) => h.detach(),
177             #[cfg(feature = "tokio")]
178             TaskHandle::Tokio(t) => t.detach(),
179         }
180     }
181 
182     // Cancel the task and wait for it to stop. Returns the result of the task if it was already
183     // finished.
cancel(self) -> Option<R>184     pub async fn cancel(self) -> Option<R> {
185         match self {
186             #[cfg(any(target_os = "android", target_os = "linux"))]
187             TaskHandle::Fd(f) => f.cancel().await,
188             #[cfg(any(target_os = "android", target_os = "linux"))]
189             TaskHandle::Uring(u) => u.cancel().await,
190             #[cfg(windows)]
191             TaskHandle::Handle(h) => h.cancel().await,
192             #[cfg(feature = "tokio")]
193             TaskHandle::Tokio(t) => t.cancel().await,
194         }
195     }
196 }
197 
198 impl<R: 'static> Future for TaskHandle<R> {
199     type Output = R;
200 
poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output>201     fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
202         match self.get_mut() {
203             #[cfg(any(target_os = "android", target_os = "linux"))]
204             TaskHandle::Fd(f) => Pin::new(f).poll(cx),
205             #[cfg(any(target_os = "android", target_os = "linux"))]
206             TaskHandle::Uring(u) => Pin::new(u).poll(cx),
207             #[cfg(windows)]
208             TaskHandle::Handle(h) => Pin::new(h).poll(cx),
209             #[cfg(feature = "tokio")]
210             TaskHandle::Tokio(t) => Pin::new(t).poll(cx),
211         }
212     }
213 }
214 
215 pub(crate) trait ExecutorTrait {
async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>216     fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>;
217 
spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static218     fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
219     where
220         F: Future + Send + 'static,
221         F::Output: Send + 'static;
222 
spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static223     fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
224     where
225         F: FnOnce() -> R + Send + 'static,
226         R: Send + 'static;
227 
spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static228     fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
229     where
230         F: Future + 'static,
231         F::Output: 'static;
232 
run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>233     fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>;
234 }
235 
236 /// An executor for scheduling tasks that poll futures to completion.
237 ///
238 /// All asynchronous operations must run within an executor, which is capable of spawning futures as
239 /// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
240 ///
241 /// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
242 /// create a new reference, not a new executor.
243 ///
244 /// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
245 /// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
246 /// interface duplication, but as far as we understand that is unavoidable.
247 ///
248 /// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75>
249 /// for further details.
250 ///
251 /// # Examples
252 ///
253 /// Concurrently wait for multiple files to become readable/writable and then read/write the data.
254 ///
255 /// ```
256 /// use std::cmp::min;
257 /// use std::error::Error;
258 /// use std::fs::{File, OpenOptions};
259 ///
260 /// use cros_async::{AsyncResult, Executor, IoSource, complete3};
261 /// const CHUNK_SIZE: usize = 32;
262 ///
263 /// // Write all bytes from `data` to `f`.
264 /// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
265 ///     while data.len() > 0 {
266 ///         let (count, mut buf) = f.write_from_vec(None, data).await?;
267 ///
268 ///         data = buf.split_off(count);
269 ///     }
270 ///
271 ///     Ok(())
272 /// }
273 ///
274 /// // Transfer `len` bytes of data from `from` to `to`.
275 /// async fn transfer_data(
276 ///     from: IoSource<File>,
277 ///     to: IoSource<File>,
278 ///     len: usize,
279 /// ) -> AsyncResult<usize> {
280 ///     let mut rem = len;
281 ///
282 ///     while rem > 0 {
283 ///         let buf = vec![0u8; min(rem, CHUNK_SIZE)];
284 ///         let (count, mut data) = from.read_to_vec(None, buf).await?;
285 ///
286 ///         if count == 0 {
287 ///             // End of file. Return the number of bytes transferred.
288 ///             return Ok(len - rem);
289 ///         }
290 ///
291 ///         data.truncate(count);
292 ///         write_file(&to, data).await?;
293 ///
294 ///         rem = rem.saturating_sub(count);
295 ///     }
296 ///
297 ///     Ok(len)
298 /// }
299 ///
300 /// #[cfg(any(target_os = "android", target_os = "linux"))]
301 /// # fn do_it() -> Result<(), Box<dyn Error>> {
302 ///     let ex = Executor::new()?;
303 ///
304 ///     let (rx, tx) = base::linux::pipe()?;
305 ///     let zero = File::open("/dev/zero")?;
306 ///     let zero_bytes = CHUNK_SIZE * 7;
307 ///     let zero_to_pipe = transfer_data(
308 ///         ex.async_from(zero)?,
309 ///         ex.async_from(tx.try_clone()?)?,
310 ///         zero_bytes,
311 ///     );
312 ///
313 ///     let rand = File::open("/dev/urandom")?;
314 ///     let rand_bytes = CHUNK_SIZE * 19;
315 ///     let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
316 ///
317 ///     let null = OpenOptions::new().write(true).open("/dev/null")?;
318 ///     let null_bytes = zero_bytes + rand_bytes;
319 ///     let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
320 ///
321 ///     ex.run_until(complete3(
322 ///         async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
323 ///         async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
324 ///         async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
325 ///     ))?;
326 ///
327 /// #     Ok(())
328 /// # }
329 /// #[cfg(any(target_os = "android", target_os = "linux"))]
330 /// # do_it().unwrap();
331 /// ```
332 #[derive(Clone)]
333 pub enum Executor {
334     #[cfg(any(target_os = "android", target_os = "linux"))]
335     Fd(Arc<RawExecutor<linux::EpollReactor>>),
336     #[cfg(any(target_os = "android", target_os = "linux"))]
337     Uring(Arc<RawExecutor<linux::UringReactor>>),
338     #[cfg(windows)]
339     Handle(Arc<RawExecutor<windows::HandleReactor>>),
340     #[cfg(windows)]
341     Overlapped(Arc<RawExecutor<windows::HandleReactor>>),
342     #[cfg(feature = "tokio")]
343     Tokio(TokioExecutor),
344 }
345 
346 impl Executor {
347     /// Create a new `Executor`.
new() -> AsyncResult<Self>348     pub fn new() -> AsyncResult<Self> {
349         Executor::with_executor_kind(ExecutorKind::default())
350     }
351 
352     /// Create a new `Executor` of the given `ExecutorKind`.
with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self>353     pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> {
354         Ok(match kind {
355             #[cfg(any(target_os = "android", target_os = "linux"))]
356             ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?),
357             #[cfg(any(target_os = "android", target_os = "linux"))]
358             ExecutorKind::SysVariants(ExecutorKindSys::Uring) => {
359                 Executor::Uring(RawExecutor::new()?)
360             }
361             #[cfg(windows)]
362             ExecutorKind::SysVariants(ExecutorKindSys::Handle) => {
363                 Executor::Handle(RawExecutor::new()?)
364             }
365             #[cfg(windows)]
366             ExecutorKind::SysVariants(ExecutorKindSys::Overlapped { concurrency }) => {
367                 let reactor = match concurrency {
368                     Some(concurrency) => windows::HandleReactor::new_with(concurrency)?,
369                     None => windows::HandleReactor::new()?,
370                 };
371                 Executor::Overlapped(RawExecutor::new_with(reactor)?)
372             }
373             #[cfg(feature = "tokio")]
374             ExecutorKind::Tokio => Executor::Tokio(TokioExecutor::new()?),
375         })
376     }
377 
378     /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.
set_default_executor_kind( executor_kind: ExecutorKind, ) -> Result<(), SetDefaultExecutorKindError>379     pub fn set_default_executor_kind(
380         executor_kind: ExecutorKind,
381     ) -> Result<(), SetDefaultExecutorKindError> {
382         #[cfg(any(target_os = "android", target_os = "linux"))]
383         if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) {
384             linux::uring_executor::check_uring_availability()
385                 .map_err(SetDefaultExecutorKindError::UringUnavailable)?;
386             if !crate::is_uring_stable() {
387                 warn!(
388                     "Enabling io_uring executor on the kernel version where io_uring is unstable"
389                 );
390             }
391         }
392         DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|
393             // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.
394             SetDefaultExecutorKindError::SetMoreThanOnce(
395                 *DEFAULT_EXECUTOR_KIND
396                     .get()
397                     .expect("Failed to get DEFAULT_EXECUTOR_KIND"),
398             ))
399     }
400 
401     /// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned
402     /// `IoSource` to directly start async operations without needing a separate reference to the
403     /// executor.
async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>404     pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
405         match self {
406             #[cfg(any(target_os = "android", target_os = "linux"))]
407             Executor::Fd(ex) => ex.async_from(f),
408             #[cfg(any(target_os = "android", target_os = "linux"))]
409             Executor::Uring(ex) => ex.async_from(f),
410             #[cfg(windows)]
411             Executor::Handle(ex) => ex.async_from(f),
412             #[cfg(windows)]
413             Executor::Overlapped(ex) => ex.async_from(f),
414             #[cfg(feature = "tokio")]
415             Executor::Tokio(ex) => ex.async_from(f),
416         }
417     }
418 
419     /// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the
420     /// If the executor is not overlapped, then Handle source is returned.
421     /// returned `IoSource` to directly start async operations without needing a separate reference
422     /// to the executor.
423     #[cfg(windows)]
async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>424     pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
425         match self {
426             Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new(
427                 f, ex, false,
428             )?)),
429             _ => self.async_from(f),
430         }
431     }
432 
433     /// Spawn a new future for this executor to run to completion. Callers may use the returned
434     /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel
435     /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the
436     /// future associated with it use `TaskHandle::detach`.
437     ///
438     /// # Examples
439     ///
440     /// ```
441     /// # use cros_async::AsyncResult;
442     /// # fn example_spawn() -> AsyncResult<()> {
443     /// #      use std::thread;
444     ///
445     /// #      use cros_async::Executor;
446     ///        use futures::executor::block_on;
447     ///
448     /// #      let ex = Executor::new()?;
449     ///
450     /// #      // Spawn a thread that runs the executor.
451     /// #      let ex2 = ex.clone();
452     /// #      thread::spawn(move || ex2.run());
453     ///
454     ///       let task = ex.spawn(async { 7 + 13 });
455     ///
456     ///       let result = block_on(task);
457     ///       assert_eq!(result, 20);
458     /// #     Ok(())
459     /// # }
460     ///
461     /// # example_spawn().unwrap();
462     /// ```
spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,463     pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
464     where
465         F: Future + Send + 'static,
466         F::Output: Send + 'static,
467     {
468         match self {
469             #[cfg(any(target_os = "android", target_os = "linux"))]
470             Executor::Fd(ex) => ex.spawn(f),
471             #[cfg(any(target_os = "android", target_os = "linux"))]
472             Executor::Uring(ex) => ex.spawn(f),
473             #[cfg(windows)]
474             Executor::Handle(ex) => ex.spawn(f),
475             #[cfg(windows)]
476             Executor::Overlapped(ex) => ex.spawn(f),
477             #[cfg(feature = "tokio")]
478             Executor::Tokio(ex) => ex.spawn(f),
479         }
480     }
481 
482     /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
483     /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
484     /// thread where `run()` or `run_until()` is called.
485     ///
486     /// # Panics
487     ///
488     /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
489     /// added by calling `spawn_local` from a different thread.
490     ///
491     /// # Examples
492     ///
493     /// ```
494     /// # use cros_async::AsyncResult;
495     /// # fn example_spawn_local() -> AsyncResult<()> {
496     /// #      use cros_async::Executor;
497     ///
498     /// #      let ex = Executor::new()?;
499     ///
500     ///        let task = ex.spawn_local(async { 7 + 13 });
501     ///
502     ///        let result = ex.run_until(task)?;
503     ///        assert_eq!(result, 20);
504     ///        Ok(())
505     /// # }
506     ///
507     /// # example_spawn_local().unwrap();
508     /// ```
spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static,509     pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
510     where
511         F: Future + 'static,
512         F::Output: 'static,
513     {
514         match self {
515             #[cfg(any(target_os = "android", target_os = "linux"))]
516             Executor::Fd(ex) => ex.spawn_local(f),
517             #[cfg(any(target_os = "android", target_os = "linux"))]
518             Executor::Uring(ex) => ex.spawn_local(f),
519             #[cfg(windows)]
520             Executor::Handle(ex) => ex.spawn_local(f),
521             #[cfg(windows)]
522             Executor::Overlapped(ex) => ex.spawn_local(f),
523             #[cfg(feature = "tokio")]
524             Executor::Tokio(ex) => ex.spawn_local(f),
525         }
526     }
527 
528     /// Run the provided closure on a dedicated thread where blocking is allowed.
529     ///
530     /// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping
531     /// the returned `TaskHandle` may not cancel the operation if it was already started on a
532     /// worker thread.
533     ///
534     /// # Panics
535     ///
536     /// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not
537     /// already completed.
538     ///
539     /// # Examples
540     ///
541     /// ```edition2018
542     /// # use cros_async::Executor;
543     ///
544     /// # async fn do_it(ex: &Executor) {
545     ///     let res = ex.spawn_blocking(move || {
546     ///         // Do some CPU-intensive or blocking work here.
547     ///
548     ///         42
549     ///     }).await;
550     ///
551     ///     assert_eq!(res, 42);
552     /// # }
553     ///
554     /// # let ex = Executor::new().unwrap();
555     /// # ex.run_until(do_it(&ex)).unwrap();
556     /// ```
spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,557     pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
558     where
559         F: FnOnce() -> R + Send + 'static,
560         R: Send + 'static,
561     {
562         match self {
563             #[cfg(any(target_os = "android", target_os = "linux"))]
564             Executor::Fd(ex) => ex.spawn_blocking(f),
565             #[cfg(any(target_os = "android", target_os = "linux"))]
566             Executor::Uring(ex) => ex.spawn_blocking(f),
567             #[cfg(windows)]
568             Executor::Handle(ex) => ex.spawn_blocking(f),
569             #[cfg(windows)]
570             Executor::Overlapped(ex) => ex.spawn_blocking(f),
571             #[cfg(feature = "tokio")]
572             Executor::Tokio(ex) => ex.spawn_blocking(f),
573         }
574     }
575 
576     /// Run the executor indefinitely, driving all spawned futures to completion. This method will
577     /// block the current thread and only return in the case of an error.
578     ///
579     /// # Panics
580     ///
581     /// Once this method has been called on a thread, it may only be called on that thread from that
582     /// point on. Attempting to call it from another thread will panic.
583     ///
584     /// # Examples
585     ///
586     /// ```
587     /// # use cros_async::AsyncResult;
588     /// # fn example_run() -> AsyncResult<()> {
589     ///       use std::thread;
590     ///
591     ///       use cros_async::Executor;
592     ///       use futures::executor::block_on;
593     ///
594     ///       let ex = Executor::new()?;
595     ///
596     ///       // Spawn a thread that runs the executor.
597     ///       let ex2 = ex.clone();
598     ///       thread::spawn(move || ex2.run());
599     ///
600     ///       let task = ex.spawn(async { 7 + 13 });
601     ///
602     ///       let result = block_on(task);
603     ///       assert_eq!(result, 20);
604     /// #     Ok(())
605     /// # }
606     ///
607     /// # example_run().unwrap();
608     /// ```
run(&self) -> AsyncResult<()>609     pub fn run(&self) -> AsyncResult<()> {
610         self.run_until(std::future::pending())
611     }
612 
613     /// Drive all futures spawned in this executor until `f` completes. This method will block the
614     /// current thread only until `f` is complete and there may still be unfinished futures in the
615     /// executor.
616     ///
617     /// # Panics
618     ///
619     /// Once this method has been called on a thread, from then onwards it may only be called on
620     /// that thread. Attempting to call it from another thread will panic.
621     ///
622     /// # Examples
623     ///
624     /// ```
625     /// # use cros_async::AsyncResult;
626     /// # fn example_run_until() -> AsyncResult<()> {
627     ///       use cros_async::Executor;
628     ///
629     ///       let ex = Executor::new()?;
630     ///
631     ///       let task = ex.spawn_local(async { 7 + 13 });
632     ///
633     ///       let result = ex.run_until(task)?;
634     ///       assert_eq!(result, 20);
635     /// #     Ok(())
636     /// # }
637     ///
638     /// # example_run_until().unwrap();
639     /// ```
run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>640     pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
641         match self {
642             #[cfg(any(target_os = "android", target_os = "linux"))]
643             Executor::Fd(ex) => ex.run_until(f),
644             #[cfg(any(target_os = "android", target_os = "linux"))]
645             Executor::Uring(ex) => ex.run_until(f),
646             #[cfg(windows)]
647             Executor::Handle(ex) => ex.run_until(f),
648             #[cfg(windows)]
649             Executor::Overlapped(ex) => ex.run_until(f),
650             #[cfg(feature = "tokio")]
651             Executor::Tokio(ex) => ex.run_until(f),
652         }
653     }
654 }
655 
656 #[cfg(any(target_os = "android", target_os = "linux"))]
657 impl AsRawDescriptors for Executor {
as_raw_descriptors(&self) -> Vec<RawDescriptor>658     fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
659         match self {
660             Executor::Fd(ex) => ex.as_raw_descriptors(),
661             Executor::Uring(ex) => ex.as_raw_descriptors(),
662             #[cfg(feature = "tokio")]
663             Executor::Tokio(ex) => ex.as_raw_descriptors(),
664         }
665     }
666 }
667