1 // Copyright 2024 The ChromiumOS Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::future::Future; 6 use std::pin::Pin; 7 use std::sync::Arc; 8 9 #[cfg(any(target_os = "android", target_os = "linux"))] 10 use base::warn; 11 #[cfg(any(target_os = "android", target_os = "linux"))] 12 use base::AsRawDescriptors; 13 #[cfg(any(target_os = "android", target_os = "linux"))] 14 use base::RawDescriptor; 15 use once_cell::sync::OnceCell; 16 use serde::Deserialize; 17 use serde_keyvalue::argh::FromArgValue; 18 use serde_keyvalue::ErrorKind; 19 use serde_keyvalue::KeyValueDeserializer; 20 21 use crate::common_executor; 22 use crate::common_executor::RawExecutor; 23 #[cfg(any(target_os = "android", target_os = "linux"))] 24 use crate::sys::linux; 25 #[cfg(windows)] 26 use crate::sys::windows; 27 use crate::sys::ExecutorKindSys; 28 use crate::AsyncResult; 29 use crate::IntoAsync; 30 use crate::IoSource; 31 32 cfg_if::cfg_if! { 33 if #[cfg(feature = "tokio")] { 34 use crate::tokio_executor::TokioExecutor; 35 use crate::tokio_executor::TokioTaskHandle; 36 } 37 } 38 39 #[derive(Clone, Copy, Debug, PartialEq, Eq)] 40 pub enum ExecutorKind { 41 SysVariants(ExecutorKindSys), 42 #[cfg(feature = "tokio")] 43 Tokio, 44 } 45 46 impl From<ExecutorKindSys> for ExecutorKind { from(e: ExecutorKindSys) -> ExecutorKind47 fn from(e: ExecutorKindSys) -> ExecutorKind { 48 ExecutorKind::SysVariants(e) 49 } 50 } 51 52 /// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`. 53 /// If not set, [`ExecutorKind::default()`] returns a statically-chosen default value, and 54 /// [`ExecutorKind::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value. 55 static DEFAULT_EXECUTOR_KIND: OnceCell<ExecutorKind> = OnceCell::new(); 56 57 impl Default for ExecutorKind { default() -> Self58 fn default() -> Self { 59 #[cfg(any(target_os = "android", target_os = "linux"))] 60 let default_fn = || ExecutorKindSys::Fd.into(); 61 #[cfg(windows)] 62 let default_fn = || ExecutorKindSys::Handle.into(); 63 *DEFAULT_EXECUTOR_KIND.get_or_init(default_fn) 64 } 65 } 66 67 /// The error type for [`Executor::set_default_executor_kind()`]. 68 #[derive(thiserror::Error, Debug)] 69 pub enum SetDefaultExecutorKindError { 70 /// The default executor kind is set more than once. 71 #[error("The default executor kind is already set to {0:?}")] 72 SetMoreThanOnce(ExecutorKind), 73 74 #[cfg(any(target_os = "android", target_os = "linux"))] 75 /// io_uring is unavailable. The reason might be the lack of the kernel support, 76 /// but is not limited to that. 77 #[error("io_uring is unavailable: {0}")] 78 UringUnavailable(linux::uring_executor::Error), 79 } 80 81 impl FromArgValue for ExecutorKind { from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String>82 fn from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String> { 83 // `from_arg_value` returns a `String` as error, but our deserializer API defines its own 84 // error type. Perform parsing from a closure so we can easily map returned errors. 85 let builder = move || { 86 let mut des = KeyValueDeserializer::from(value); 87 88 let kind: ExecutorKind = match (des.parse_identifier()?, des.next_char()) { 89 #[cfg(any(target_os = "android", target_os = "linux"))] 90 ("epoll", None) => ExecutorKindSys::Fd.into(), 91 #[cfg(any(target_os = "android", target_os = "linux"))] 92 ("uring", None) => ExecutorKindSys::Uring.into(), 93 #[cfg(windows)] 94 ("handle", None) => ExecutorKindSys::Handle.into(), 95 #[cfg(windows)] 96 ("overlapped", None) => ExecutorKindSys::Overlapped { concurrency: None }.into(), 97 #[cfg(windows)] 98 ("overlapped", Some(',')) => { 99 if des.parse_identifier()? != "concurrency" { 100 let kind = ErrorKind::SerdeError("expected `concurrency`".to_string()); 101 return Err(des.error_here(kind)); 102 } 103 if des.next_char() != Some('=') { 104 return Err(des.error_here(ErrorKind::ExpectedEqual)); 105 } 106 let concurrency = des.parse_number()?; 107 ExecutorKindSys::Overlapped { 108 concurrency: Some(concurrency), 109 } 110 .into() 111 } 112 #[cfg(feature = "tokio")] 113 ("tokio", None) => ExecutorKind::Tokio, 114 (_identifier, _next) => { 115 let kind = ErrorKind::SerdeError("unexpected kind".to_string()); 116 return Err(des.error_here(kind)); 117 } 118 }; 119 des.finish()?; 120 Ok(kind) 121 }; 122 123 builder().map_err(|e| e.to_string()) 124 } 125 } 126 127 impl serde::Serialize for ExecutorKind { serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer,128 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> 129 where 130 S: serde::Serializer, 131 { 132 match self { 133 ExecutorKind::SysVariants(sv) => sv.serialize(serializer), 134 #[cfg(feature = "tokio")] 135 ExecutorKind::Tokio => "tokio".serialize(serializer), 136 } 137 } 138 } 139 140 impl<'de> Deserialize<'de> for ExecutorKind { deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error> where D: serde::Deserializer<'de>,141 fn deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error> 142 where 143 D: serde::Deserializer<'de>, 144 { 145 base::error!("ExecutorKind::deserialize"); 146 let string = String::deserialize(deserializer)?; 147 ExecutorKind::from_arg_value(&string).map_err(serde::de::Error::custom) 148 } 149 } 150 151 /// Reference to a task managed by the executor. 152 /// 153 /// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to 154 /// continue running the background. 155 /// 156 /// `await`ing the `TaskHandle` waits for the task to finish and yields its result. 157 pub enum TaskHandle<R> { 158 #[cfg(any(target_os = "android", target_os = "linux"))] 159 Fd(common_executor::RawTaskHandle<linux::EpollReactor, R>), 160 #[cfg(any(target_os = "android", target_os = "linux"))] 161 Uring(common_executor::RawTaskHandle<linux::UringReactor, R>), 162 #[cfg(windows)] 163 Handle(common_executor::RawTaskHandle<windows::HandleReactor, R>), 164 #[cfg(feature = "tokio")] 165 Tokio(TokioTaskHandle<R>), 166 } 167 168 impl<R: Send + 'static> TaskHandle<R> { detach(self)169 pub fn detach(self) { 170 match self { 171 #[cfg(any(target_os = "android", target_os = "linux"))] 172 TaskHandle::Fd(f) => f.detach(), 173 #[cfg(any(target_os = "android", target_os = "linux"))] 174 TaskHandle::Uring(u) => u.detach(), 175 #[cfg(windows)] 176 TaskHandle::Handle(h) => h.detach(), 177 #[cfg(feature = "tokio")] 178 TaskHandle::Tokio(t) => t.detach(), 179 } 180 } 181 182 // Cancel the task and wait for it to stop. Returns the result of the task if it was already 183 // finished. cancel(self) -> Option<R>184 pub async fn cancel(self) -> Option<R> { 185 match self { 186 #[cfg(any(target_os = "android", target_os = "linux"))] 187 TaskHandle::Fd(f) => f.cancel().await, 188 #[cfg(any(target_os = "android", target_os = "linux"))] 189 TaskHandle::Uring(u) => u.cancel().await, 190 #[cfg(windows)] 191 TaskHandle::Handle(h) => h.cancel().await, 192 #[cfg(feature = "tokio")] 193 TaskHandle::Tokio(t) => t.cancel().await, 194 } 195 } 196 } 197 198 impl<R: 'static> Future for TaskHandle<R> { 199 type Output = R; 200 poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output>201 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> { 202 match self.get_mut() { 203 #[cfg(any(target_os = "android", target_os = "linux"))] 204 TaskHandle::Fd(f) => Pin::new(f).poll(cx), 205 #[cfg(any(target_os = "android", target_os = "linux"))] 206 TaskHandle::Uring(u) => Pin::new(u).poll(cx), 207 #[cfg(windows)] 208 TaskHandle::Handle(h) => Pin::new(h).poll(cx), 209 #[cfg(feature = "tokio")] 210 TaskHandle::Tokio(t) => Pin::new(t).poll(cx), 211 } 212 } 213 } 214 215 pub(crate) trait ExecutorTrait { async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>216 fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>; 217 spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static218 fn spawn<F>(&self, f: F) -> TaskHandle<F::Output> 219 where 220 F: Future + Send + 'static, 221 F::Output: Send + 'static; 222 spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static223 fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> 224 where 225 F: FnOnce() -> R + Send + 'static, 226 R: Send + 'static; 227 spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static228 fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> 229 where 230 F: Future + 'static, 231 F::Output: 'static; 232 run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>233 fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>; 234 } 235 236 /// An executor for scheduling tasks that poll futures to completion. 237 /// 238 /// All asynchronous operations must run within an executor, which is capable of spawning futures as 239 /// tasks. This executor also provides a mechanism for performing asynchronous I/O operations. 240 /// 241 /// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only 242 /// create a new reference, not a new executor. 243 /// 244 /// Note that language limitations (trait objects can have <=1 non auto trait) require this to be 245 /// represented on the POSIX side as an enum, rather than a trait. This leads to some code & 246 /// interface duplication, but as far as we understand that is unavoidable. 247 /// 248 /// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75> 249 /// for further details. 250 /// 251 /// # Examples 252 /// 253 /// Concurrently wait for multiple files to become readable/writable and then read/write the data. 254 /// 255 /// ``` 256 /// use std::cmp::min; 257 /// use std::error::Error; 258 /// use std::fs::{File, OpenOptions}; 259 /// 260 /// use cros_async::{AsyncResult, Executor, IoSource, complete3}; 261 /// const CHUNK_SIZE: usize = 32; 262 /// 263 /// // Write all bytes from `data` to `f`. 264 /// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> { 265 /// while data.len() > 0 { 266 /// let (count, mut buf) = f.write_from_vec(None, data).await?; 267 /// 268 /// data = buf.split_off(count); 269 /// } 270 /// 271 /// Ok(()) 272 /// } 273 /// 274 /// // Transfer `len` bytes of data from `from` to `to`. 275 /// async fn transfer_data( 276 /// from: IoSource<File>, 277 /// to: IoSource<File>, 278 /// len: usize, 279 /// ) -> AsyncResult<usize> { 280 /// let mut rem = len; 281 /// 282 /// while rem > 0 { 283 /// let buf = vec![0u8; min(rem, CHUNK_SIZE)]; 284 /// let (count, mut data) = from.read_to_vec(None, buf).await?; 285 /// 286 /// if count == 0 { 287 /// // End of file. Return the number of bytes transferred. 288 /// return Ok(len - rem); 289 /// } 290 /// 291 /// data.truncate(count); 292 /// write_file(&to, data).await?; 293 /// 294 /// rem = rem.saturating_sub(count); 295 /// } 296 /// 297 /// Ok(len) 298 /// } 299 /// 300 /// #[cfg(any(target_os = "android", target_os = "linux"))] 301 /// # fn do_it() -> Result<(), Box<dyn Error>> { 302 /// let ex = Executor::new()?; 303 /// 304 /// let (rx, tx) = base::linux::pipe()?; 305 /// let zero = File::open("/dev/zero")?; 306 /// let zero_bytes = CHUNK_SIZE * 7; 307 /// let zero_to_pipe = transfer_data( 308 /// ex.async_from(zero)?, 309 /// ex.async_from(tx.try_clone()?)?, 310 /// zero_bytes, 311 /// ); 312 /// 313 /// let rand = File::open("/dev/urandom")?; 314 /// let rand_bytes = CHUNK_SIZE * 19; 315 /// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes); 316 /// 317 /// let null = OpenOptions::new().write(true).open("/dev/null")?; 318 /// let null_bytes = zero_bytes + rand_bytes; 319 /// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes); 320 /// 321 /// ex.run_until(complete3( 322 /// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) }, 323 /// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) }, 324 /// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) }, 325 /// ))?; 326 /// 327 /// # Ok(()) 328 /// # } 329 /// #[cfg(any(target_os = "android", target_os = "linux"))] 330 /// # do_it().unwrap(); 331 /// ``` 332 #[derive(Clone)] 333 pub enum Executor { 334 #[cfg(any(target_os = "android", target_os = "linux"))] 335 Fd(Arc<RawExecutor<linux::EpollReactor>>), 336 #[cfg(any(target_os = "android", target_os = "linux"))] 337 Uring(Arc<RawExecutor<linux::UringReactor>>), 338 #[cfg(windows)] 339 Handle(Arc<RawExecutor<windows::HandleReactor>>), 340 #[cfg(windows)] 341 Overlapped(Arc<RawExecutor<windows::HandleReactor>>), 342 #[cfg(feature = "tokio")] 343 Tokio(TokioExecutor), 344 } 345 346 impl Executor { 347 /// Create a new `Executor`. new() -> AsyncResult<Self>348 pub fn new() -> AsyncResult<Self> { 349 Executor::with_executor_kind(ExecutorKind::default()) 350 } 351 352 /// Create a new `Executor` of the given `ExecutorKind`. with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self>353 pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> { 354 Ok(match kind { 355 #[cfg(any(target_os = "android", target_os = "linux"))] 356 ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?), 357 #[cfg(any(target_os = "android", target_os = "linux"))] 358 ExecutorKind::SysVariants(ExecutorKindSys::Uring) => { 359 Executor::Uring(RawExecutor::new()?) 360 } 361 #[cfg(windows)] 362 ExecutorKind::SysVariants(ExecutorKindSys::Handle) => { 363 Executor::Handle(RawExecutor::new()?) 364 } 365 #[cfg(windows)] 366 ExecutorKind::SysVariants(ExecutorKindSys::Overlapped { concurrency }) => { 367 let reactor = match concurrency { 368 Some(concurrency) => windows::HandleReactor::new_with(concurrency)?, 369 None => windows::HandleReactor::new()?, 370 }; 371 Executor::Overlapped(RawExecutor::new_with(reactor)?) 372 } 373 #[cfg(feature = "tokio")] 374 ExecutorKind::Tokio => Executor::Tokio(TokioExecutor::new()?), 375 }) 376 } 377 378 /// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once. set_default_executor_kind( executor_kind: ExecutorKind, ) -> Result<(), SetDefaultExecutorKindError>379 pub fn set_default_executor_kind( 380 executor_kind: ExecutorKind, 381 ) -> Result<(), SetDefaultExecutorKindError> { 382 #[cfg(any(target_os = "android", target_os = "linux"))] 383 if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) { 384 linux::uring_executor::check_uring_availability() 385 .map_err(SetDefaultExecutorKindError::UringUnavailable)?; 386 if !crate::is_uring_stable() { 387 warn!( 388 "Enabling io_uring executor on the kernel version where io_uring is unstable" 389 ); 390 } 391 } 392 DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_| 393 // `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set. 394 SetDefaultExecutorKindError::SetMoreThanOnce( 395 *DEFAULT_EXECUTOR_KIND 396 .get() 397 .expect("Failed to get DEFAULT_EXECUTOR_KIND"), 398 )) 399 } 400 401 /// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned 402 /// `IoSource` to directly start async operations without needing a separate reference to the 403 /// executor. async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>404 pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> { 405 match self { 406 #[cfg(any(target_os = "android", target_os = "linux"))] 407 Executor::Fd(ex) => ex.async_from(f), 408 #[cfg(any(target_os = "android", target_os = "linux"))] 409 Executor::Uring(ex) => ex.async_from(f), 410 #[cfg(windows)] 411 Executor::Handle(ex) => ex.async_from(f), 412 #[cfg(windows)] 413 Executor::Overlapped(ex) => ex.async_from(f), 414 #[cfg(feature = "tokio")] 415 Executor::Tokio(ex) => ex.async_from(f), 416 } 417 } 418 419 /// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the 420 /// If the executor is not overlapped, then Handle source is returned. 421 /// returned `IoSource` to directly start async operations without needing a separate reference 422 /// to the executor. 423 #[cfg(windows)] async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>424 pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> { 425 match self { 426 Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new( 427 f, ex, false, 428 )?)), 429 _ => self.async_from(f), 430 } 431 } 432 433 /// Spawn a new future for this executor to run to completion. Callers may use the returned 434 /// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel 435 /// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the 436 /// future associated with it use `TaskHandle::detach`. 437 /// 438 /// # Examples 439 /// 440 /// ``` 441 /// # use cros_async::AsyncResult; 442 /// # fn example_spawn() -> AsyncResult<()> { 443 /// # use std::thread; 444 /// 445 /// # use cros_async::Executor; 446 /// use futures::executor::block_on; 447 /// 448 /// # let ex = Executor::new()?; 449 /// 450 /// # // Spawn a thread that runs the executor. 451 /// # let ex2 = ex.clone(); 452 /// # thread::spawn(move || ex2.run()); 453 /// 454 /// let task = ex.spawn(async { 7 + 13 }); 455 /// 456 /// let result = block_on(task); 457 /// assert_eq!(result, 20); 458 /// # Ok(()) 459 /// # } 460 /// 461 /// # example_spawn().unwrap(); 462 /// ``` spawn<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,463 pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output> 464 where 465 F: Future + Send + 'static, 466 F::Output: Send + 'static, 467 { 468 match self { 469 #[cfg(any(target_os = "android", target_os = "linux"))] 470 Executor::Fd(ex) => ex.spawn(f), 471 #[cfg(any(target_os = "android", target_os = "linux"))] 472 Executor::Uring(ex) => ex.spawn(f), 473 #[cfg(windows)] 474 Executor::Handle(ex) => ex.spawn(f), 475 #[cfg(windows)] 476 Executor::Overlapped(ex) => ex.spawn(f), 477 #[cfg(feature = "tokio")] 478 Executor::Tokio(ex) => ex.spawn(f), 479 } 480 } 481 482 /// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without 483 /// requiring `Send` on `F` or `F::Output`. This method should only be called from the same 484 /// thread where `run()` or `run_until()` is called. 485 /// 486 /// # Panics 487 /// 488 /// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was 489 /// added by calling `spawn_local` from a different thread. 490 /// 491 /// # Examples 492 /// 493 /// ``` 494 /// # use cros_async::AsyncResult; 495 /// # fn example_spawn_local() -> AsyncResult<()> { 496 /// # use cros_async::Executor; 497 /// 498 /// # let ex = Executor::new()?; 499 /// 500 /// let task = ex.spawn_local(async { 7 + 13 }); 501 /// 502 /// let result = ex.run_until(task)?; 503 /// assert_eq!(result, 20); 504 /// Ok(()) 505 /// # } 506 /// 507 /// # example_spawn_local().unwrap(); 508 /// ``` spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> where F: Future + 'static, F::Output: 'static,509 pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output> 510 where 511 F: Future + 'static, 512 F::Output: 'static, 513 { 514 match self { 515 #[cfg(any(target_os = "android", target_os = "linux"))] 516 Executor::Fd(ex) => ex.spawn_local(f), 517 #[cfg(any(target_os = "android", target_os = "linux"))] 518 Executor::Uring(ex) => ex.spawn_local(f), 519 #[cfg(windows)] 520 Executor::Handle(ex) => ex.spawn_local(f), 521 #[cfg(windows)] 522 Executor::Overlapped(ex) => ex.spawn_local(f), 523 #[cfg(feature = "tokio")] 524 Executor::Tokio(ex) => ex.spawn_local(f), 525 } 526 } 527 528 /// Run the provided closure on a dedicated thread where blocking is allowed. 529 /// 530 /// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping 531 /// the returned `TaskHandle` may not cancel the operation if it was already started on a 532 /// worker thread. 533 /// 534 /// # Panics 535 /// 536 /// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not 537 /// already completed. 538 /// 539 /// # Examples 540 /// 541 /// ```edition2018 542 /// # use cros_async::Executor; 543 /// 544 /// # async fn do_it(ex: &Executor) { 545 /// let res = ex.spawn_blocking(move || { 546 /// // Do some CPU-intensive or blocking work here. 547 /// 548 /// 42 549 /// }).await; 550 /// 551 /// assert_eq!(res, 42); 552 /// # } 553 /// 554 /// # let ex = Executor::new().unwrap(); 555 /// # ex.run_until(do_it(&ex)).unwrap(); 556 /// ``` spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,557 pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R> 558 where 559 F: FnOnce() -> R + Send + 'static, 560 R: Send + 'static, 561 { 562 match self { 563 #[cfg(any(target_os = "android", target_os = "linux"))] 564 Executor::Fd(ex) => ex.spawn_blocking(f), 565 #[cfg(any(target_os = "android", target_os = "linux"))] 566 Executor::Uring(ex) => ex.spawn_blocking(f), 567 #[cfg(windows)] 568 Executor::Handle(ex) => ex.spawn_blocking(f), 569 #[cfg(windows)] 570 Executor::Overlapped(ex) => ex.spawn_blocking(f), 571 #[cfg(feature = "tokio")] 572 Executor::Tokio(ex) => ex.spawn_blocking(f), 573 } 574 } 575 576 /// Run the executor indefinitely, driving all spawned futures to completion. This method will 577 /// block the current thread and only return in the case of an error. 578 /// 579 /// # Panics 580 /// 581 /// Once this method has been called on a thread, it may only be called on that thread from that 582 /// point on. Attempting to call it from another thread will panic. 583 /// 584 /// # Examples 585 /// 586 /// ``` 587 /// # use cros_async::AsyncResult; 588 /// # fn example_run() -> AsyncResult<()> { 589 /// use std::thread; 590 /// 591 /// use cros_async::Executor; 592 /// use futures::executor::block_on; 593 /// 594 /// let ex = Executor::new()?; 595 /// 596 /// // Spawn a thread that runs the executor. 597 /// let ex2 = ex.clone(); 598 /// thread::spawn(move || ex2.run()); 599 /// 600 /// let task = ex.spawn(async { 7 + 13 }); 601 /// 602 /// let result = block_on(task); 603 /// assert_eq!(result, 20); 604 /// # Ok(()) 605 /// # } 606 /// 607 /// # example_run().unwrap(); 608 /// ``` run(&self) -> AsyncResult<()>609 pub fn run(&self) -> AsyncResult<()> { 610 self.run_until(std::future::pending()) 611 } 612 613 /// Drive all futures spawned in this executor until `f` completes. This method will block the 614 /// current thread only until `f` is complete and there may still be unfinished futures in the 615 /// executor. 616 /// 617 /// # Panics 618 /// 619 /// Once this method has been called on a thread, from then onwards it may only be called on 620 /// that thread. Attempting to call it from another thread will panic. 621 /// 622 /// # Examples 623 /// 624 /// ``` 625 /// # use cros_async::AsyncResult; 626 /// # fn example_run_until() -> AsyncResult<()> { 627 /// use cros_async::Executor; 628 /// 629 /// let ex = Executor::new()?; 630 /// 631 /// let task = ex.spawn_local(async { 7 + 13 }); 632 /// 633 /// let result = ex.run_until(task)?; 634 /// assert_eq!(result, 20); 635 /// # Ok(()) 636 /// # } 637 /// 638 /// # example_run_until().unwrap(); 639 /// ``` run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>640 pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> { 641 match self { 642 #[cfg(any(target_os = "android", target_os = "linux"))] 643 Executor::Fd(ex) => ex.run_until(f), 644 #[cfg(any(target_os = "android", target_os = "linux"))] 645 Executor::Uring(ex) => ex.run_until(f), 646 #[cfg(windows)] 647 Executor::Handle(ex) => ex.run_until(f), 648 #[cfg(windows)] 649 Executor::Overlapped(ex) => ex.run_until(f), 650 #[cfg(feature = "tokio")] 651 Executor::Tokio(ex) => ex.run_until(f), 652 } 653 } 654 } 655 656 #[cfg(any(target_os = "android", target_os = "linux"))] 657 impl AsRawDescriptors for Executor { as_raw_descriptors(&self) -> Vec<RawDescriptor>658 fn as_raw_descriptors(&self) -> Vec<RawDescriptor> { 659 match self { 660 Executor::Fd(ex) => ex.as_raw_descriptors(), 661 Executor::Uring(ex) => ex.as_raw_descriptors(), 662 #[cfg(feature = "tokio")] 663 Executor::Tokio(ex) => ex.as_raw_descriptors(), 664 } 665 } 666 } 667