1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! An Executor and future combinators based on operations that block on file descriptors.
6 //!
7 //! This crate is meant to be used with the `futures-rs` crate that provides further combinators
8 //! and utility functions to combine and manage futures. All futures will run until they block on a
9 //! file descriptor becoming readable or writable. Facilities are provided to register future
10 //! wakers based on such events.
11 //!
12 //! # Running top-level futures.
13 //!
//! Use helper functions based on the desired behavior of your application.
15 //!
16 //! ## Running one future.
17 //!
18 //! If there is only one top-level future to run, use the [`run_one`](fn.run_one.html) function.
19 //!
20 //! ## Completing one of several futures.
21 //!
22 //! If there are several top level tasks that should run until any one completes, use the "select"
23 //! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
24 //! function will return when the first future completes. The uncompleted futures will also be
25 //! returned so they can be run further or otherwise cleaned up. These functions are inspired by
26 //! the `select_all` function from futures-rs, but built to be run inside an FD based executor and
//! to poll only when necessary. See the docs for [`select2`](fn.select2.html),
//! [`select3`](fn.select3.html), [`select4`](fn.select4.html), [`select5`](fn.select5.html),
//! [`select6`](fn.select6.html), and [`select7`](fn.select7.html).
29 //!
30 //! ## Completing all of several futures.
31 //!
32 //! If there are several top level tasks that all need to be completed, use the "complete" family
33 //! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
34 //! function will return only once all the futures passed to it have completed. These functions are
35 //! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based
36 //! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html),
37 //! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and
38 //! [`complete5`](fn.complete5.html).
39 //!
40 //! # Implementing new FD-based futures.
41 //!
//! For URing, implementations should provide an implementation of the `IoSource` trait.
43 //! For the FD executor, new futures can use the existing ability to poll a source to build async
44 //! functionality on top of.
45 //!
46 //! # Implementations
47 //!
//! Currently there are two paths for using the asynchronous IO. One uses a PollContext and drives
//! futures based on the FDs signaling they are ready for the operation. This method will exist so
//! long as kernels < 5.4 are supported.
51 //! The other method submits operations to io_uring and is signaled when they complete. This is more
52 //! efficient, but only supported on kernel 5.4+.
53 //! If `IoSourceExt::new` is used to interface with async IO, then the correct backend will be chosen
54 //! automatically.
55 //!
56 //! # Examples
57 //!
58 //! See the docs for `IoSourceExt` if support for kernels <5.4 is required. Focus on `UringSource` if
59 //! all systems have support for io_uring.
60
61 mod async_types;
62 pub mod audio_streams_async;
63 mod blocking;
64 mod complete;
65 mod event;
66 mod executor;
67 mod fd_executor;
68 mod io_ext;
69 pub mod mem;
70 mod poll_source;
71 mod queue;
72 mod select;
73 pub mod sync;
74 mod timer;
75 mod uring_executor;
76 mod uring_source;
77 mod waker;
78
79 pub use async_types::*;
80 pub use base;
81 pub use blocking::{block_on, BlockingPool};
82 pub use event::EventAsync;
83 pub use executor::Executor;
84 pub use fd_executor::FdExecutor;
85 pub use io_ext::{
86 AsyncWrapper, Error as AsyncError, IntoAsync, IoSourceExt, ReadAsync, Result as AsyncResult,
87 WriteAsync,
88 };
89 pub use mem::{BackingMemory, MemRegion};
90 pub use poll_source::PollSource;
91 pub use select::SelectResult;
92 pub use timer::TimerAsync;
93 pub use uring_executor::URingExecutor;
94 pub use uring_source::UringSource;
95
96 use std::{
97 future::Future,
98 io,
99 marker::PhantomData,
100 pin::Pin,
101 task::{Context, Poll},
102 };
103
104 use remain::sorted;
105 use thiserror::Error as ThisError;
106
107 #[sorted]
108 #[derive(ThisError, Debug)]
109 pub enum Error {
110 /// Error from the FD executor.
111 #[error("Failure in the FD executor: {0}")]
112 FdExecutor(fd_executor::Error),
113 /// Error from TimerFd.
114 #[error("Failure in TimerAsync: {0}")]
115 TimerAsync(AsyncError),
116 /// Error from TimerFd.
117 #[error("Failure in TimerFd: {0}")]
118 TimerFd(base::Error),
119 /// Error from the uring executor.
120 #[error("Failure in the uring executor: {0}")]
121 URingExecutor(uring_executor::Error),
122 }
123 pub type Result<T> = std::result::Result<T, Error>;
124
125 impl From<Error> for io::Error {
from(e: Error) -> Self126 fn from(e: Error) -> Self {
127 use Error::*;
128 match e {
129 FdExecutor(e) => e.into(),
130 URingExecutor(e) => e.into(),
131 TimerFd(e) => e.into(),
132 TimerAsync(e) => e.into(),
133 }
134 }
135 }
136
/// A [`Future`] that never completes: every poll reports [`Poll::Pending`].
///
/// `T` only fixes the advertised `Output` type; no value of `T` is ever stored or produced.
pub struct Empty<T> {
    phantom: PhantomData<T>,
}

impl<T> Future for Empty<T> {
    type Output = T;

    // The context is ignored entirely, so no waker is ever registered by this future.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Pending
    }
}
149
empty<T>() -> Empty<T>150 pub fn empty<T>() -> Empty<T> {
151 Empty {
152 phantom: PhantomData,
153 }
154 }
155
156 /// Creates an Executor that runs one future to completion.
157 ///
158 /// # Example
159 ///
160 /// ```
161 /// use cros_async::run_one;
162 ///
163 /// let fut = async { 55 };
164 /// assert_eq!(55, run_one(fut).unwrap());
165 /// ```
run_one<F: Future>(fut: F) -> Result<F::Output>166 pub fn run_one<F: Future>(fut: F) -> Result<F::Output> {
167 if uring_executor::use_uring() {
168 run_one_uring(fut)
169 } else {
170 run_one_poll(fut)
171 }
172 }
173
174 /// Creates a URingExecutor that runs one future to completion.
175 ///
176 /// # Example
177 ///
178 /// ```
179 /// use cros_async::run_one_uring;
180 ///
181 /// let fut = async { 55 };
182 /// assert_eq!(55, run_one_uring(fut).unwrap());
183 /// ```
run_one_uring<F: Future>(fut: F) -> Result<F::Output>184 pub fn run_one_uring<F: Future>(fut: F) -> Result<F::Output> {
185 URingExecutor::new()
186 .and_then(|ex| ex.run_until(fut))
187 .map_err(Error::URingExecutor)
188 }
189
190 /// Creates a FdExecutor that runs one future to completion.
191 ///
192 /// # Example
193 ///
194 /// ```
195 /// use cros_async::run_one_poll;
196 ///
197 /// let fut = async { 55 };
198 /// assert_eq!(55, run_one_poll(fut).unwrap());
199 /// ```
run_one_poll<F: Future>(fut: F) -> Result<F::Output>200 pub fn run_one_poll<F: Future>(fut: F) -> Result<F::Output> {
201 FdExecutor::new()
202 .and_then(|ex| ex.run_until(fut))
203 .map_err(Error::FdExecutor)
204 }
205
206 // Select helpers to run until any future completes.
207
208 /// Creates a combinator that runs the two given futures until one completes, returning a tuple
209 /// containing the result of the finished future and the still pending future.
210 ///
211 /// # Example
212 ///
213 /// ```
214 /// use cros_async::{SelectResult, select2, run_one};
215 /// use futures::future::pending;
216 /// use futures::pin_mut;
217 ///
218 /// let first = async {5};
219 /// let second = async {let () = pending().await;};
220 /// pin_mut!(first);
221 /// pin_mut!(second);
222 /// match run_one(select2(first, second)) {
223 /// Ok((SelectResult::Finished(5), SelectResult::Pending(_second))) => (),
224 /// _ => panic!("Select didn't return the first future"),
225 /// };
226 /// ```
select2<F1: Future + Unpin, F2: Future + Unpin>( f1: F1, f2: F2, ) -> (SelectResult<F1>, SelectResult<F2>)227 pub async fn select2<F1: Future + Unpin, F2: Future + Unpin>(
228 f1: F1,
229 f2: F2,
230 ) -> (SelectResult<F1>, SelectResult<F2>) {
231 select::Select2::new(f1, f2).await
232 }
233
234 /// Creates a combinator that runs the three given futures until one or more completes, returning a
235 /// tuple containing the result of the finished future(s) and the still pending future(s).
236 ///
237 /// # Example
238 ///
239 /// ```
240 /// use cros_async::{SelectResult, select3, run_one};
241 /// use futures::future::pending;
242 /// use futures::pin_mut;
243 ///
244 /// let first = async {4};
245 /// let second = async {let () = pending().await;};
246 /// let third = async {5};
247 /// pin_mut!(first);
248 /// pin_mut!(second);
249 /// pin_mut!(third);
250 /// match run_one(select3(first, second, third)) {
251 /// Ok((SelectResult::Finished(4),
252 /// SelectResult::Pending(_second),
253 /// SelectResult::Finished(5))) => (),
254 /// _ => panic!("Select didn't return the futures"),
255 /// };
256 /// ```
select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>( f1: F1, f2: F2, f3: F3, ) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>)257 pub async fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
258 f1: F1,
259 f2: F2,
260 f3: F3,
261 ) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>) {
262 select::Select3::new(f1, f2, f3).await
263 }
264
265 /// Creates a combinator that runs the four given futures until one or more completes, returning a
266 /// tuple containing the result of the finished future(s) and the still pending future(s).
267 ///
268 /// # Example
269 ///
270 /// ```
271 /// use cros_async::{SelectResult, select4, run_one};
272 /// use futures::future::pending;
273 /// use futures::pin_mut;
274 ///
275 /// let first = async {4};
276 /// let second = async {let () = pending().await;};
277 /// let third = async {5};
278 /// let fourth = async {let () = pending().await;};
279 /// pin_mut!(first);
280 /// pin_mut!(second);
281 /// pin_mut!(third);
282 /// pin_mut!(fourth);
283 /// match run_one(select4(first, second, third, fourth)) {
284 /// Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
285 /// SelectResult::Finished(5), SelectResult::Pending(_fourth))) => (),
286 /// _ => panic!("Select didn't return the futures"),
287 /// };
288 /// ```
select4< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, )289 pub async fn select4<
290 F1: Future + Unpin,
291 F2: Future + Unpin,
292 F3: Future + Unpin,
293 F4: Future + Unpin,
294 >(
295 f1: F1,
296 f2: F2,
297 f3: F3,
298 f4: F4,
299 ) -> (
300 SelectResult<F1>,
301 SelectResult<F2>,
302 SelectResult<F3>,
303 SelectResult<F4>,
304 ) {
305 select::Select4::new(f1, f2, f3, f4).await
306 }
307
308 /// Creates a combinator that runs the five given futures until one or more completes, returning a
309 /// tuple containing the result of the finished future(s) and the still pending future(s).
310 ///
311 /// # Example
312 ///
313 /// ```
314 /// use cros_async::{SelectResult, select5, run_one};
315 /// use futures::future::pending;
316 /// use futures::pin_mut;
317 ///
318 /// let first = async {4};
319 /// let second = async {let () = pending().await;};
320 /// let third = async {5};
321 /// let fourth = async {let () = pending().await;};
322 /// let fifth = async {6};
323 /// pin_mut!(first);
324 /// pin_mut!(second);
325 /// pin_mut!(third);
326 /// pin_mut!(fourth);
327 /// pin_mut!(fifth);
328 /// match run_one(select5(first, second, third, fourth, fifth)) {
329 /// Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
330 /// SelectResult::Finished(5), SelectResult::Pending(_fourth),
331 /// SelectResult::Finished(6))) => (),
332 /// _ => panic!("Select didn't return the futures"),
333 /// };
334 /// ```
select5< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, F5: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, SelectResult<F5>, )335 pub async fn select5<
336 F1: Future + Unpin,
337 F2: Future + Unpin,
338 F3: Future + Unpin,
339 F4: Future + Unpin,
340 F5: Future + Unpin,
341 >(
342 f1: F1,
343 f2: F2,
344 f3: F3,
345 f4: F4,
346 f5: F5,
347 ) -> (
348 SelectResult<F1>,
349 SelectResult<F2>,
350 SelectResult<F3>,
351 SelectResult<F4>,
352 SelectResult<F5>,
353 ) {
354 select::Select5::new(f1, f2, f3, f4, f5).await
355 }
356
357 /// Creates a combinator that runs the six given futures until one or more completes, returning a
358 /// tuple containing the result of the finished future(s) and the still pending future(s).
359 ///
360 /// # Example
361 ///
362 /// ```
363 /// use cros_async::{SelectResult, select6, run_one};
364 /// use futures::future::pending;
365 /// use futures::pin_mut;
366 ///
367 /// let first = async {1};
368 /// let second = async {let () = pending().await;};
369 /// let third = async {3};
370 /// let fourth = async {let () = pending().await;};
371 /// let fifth = async {5};
372 /// let sixth = async {6};
373 /// pin_mut!(first);
374 /// pin_mut!(second);
375 /// pin_mut!(third);
376 /// pin_mut!(fourth);
377 /// pin_mut!(fifth);
378 /// pin_mut!(sixth);
379 /// match run_one(select6(first, second, third, fourth, fifth, sixth)) {
380 /// Ok((SelectResult::Finished(1), SelectResult::Pending(_second),
381 /// SelectResult::Finished(3), SelectResult::Pending(_fourth),
382 /// SelectResult::Finished(5), SelectResult::Finished(6))) => (),
383 /// _ => panic!("Select didn't return the futures"),
384 /// };
385 /// ```
select6< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, F5: Future + Unpin, F6: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, f6: F6, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, SelectResult<F5>, SelectResult<F6>, )386 pub async fn select6<
387 F1: Future + Unpin,
388 F2: Future + Unpin,
389 F3: Future + Unpin,
390 F4: Future + Unpin,
391 F5: Future + Unpin,
392 F6: Future + Unpin,
393 >(
394 f1: F1,
395 f2: F2,
396 f3: F3,
397 f4: F4,
398 f5: F5,
399 f6: F6,
400 ) -> (
401 SelectResult<F1>,
402 SelectResult<F2>,
403 SelectResult<F3>,
404 SelectResult<F4>,
405 SelectResult<F5>,
406 SelectResult<F6>,
407 ) {
408 select::Select6::new(f1, f2, f3, f4, f5, f6).await
409 }
410
select7< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, F5: Future + Unpin, F6: Future + Unpin, F7: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, f6: F6, f7: F7, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, SelectResult<F5>, SelectResult<F6>, SelectResult<F7>, )411 pub async fn select7<
412 F1: Future + Unpin,
413 F2: Future + Unpin,
414 F3: Future + Unpin,
415 F4: Future + Unpin,
416 F5: Future + Unpin,
417 F6: Future + Unpin,
418 F7: Future + Unpin,
419 >(
420 f1: F1,
421 f2: F2,
422 f3: F3,
423 f4: F4,
424 f5: F5,
425 f6: F6,
426 f7: F7,
427 ) -> (
428 SelectResult<F1>,
429 SelectResult<F2>,
430 SelectResult<F3>,
431 SelectResult<F4>,
432 SelectResult<F5>,
433 SelectResult<F6>,
434 SelectResult<F7>,
435 ) {
436 select::Select7::new(f1, f2, f3, f4, f5, f6, f7).await
437 }
438 // Combination helpers to run until all futures are complete.
439
440 /// Creates a combinator that runs the two given futures to completion, returning a tuple of the
441 /// outputs each yields.
442 ///
443 /// # Example
444 ///
445 /// ```
446 /// use cros_async::{complete2, run_one};
447 ///
448 /// let first = async {5};
449 /// let second = async {6};
450 /// assert_eq!(run_one(complete2(first, second)).unwrap_or((0,0)), (5,6));
451 /// ```
complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output) where F1: Future, F2: Future,452 pub async fn complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output)
453 where
454 F1: Future,
455 F2: Future,
456 {
457 complete::Complete2::new(f1, f2).await
458 }
459
460 /// Creates a combinator that runs the three given futures to completion, returning a tuple of the
461 /// outputs each yields.
462 ///
463 /// # Example
464 ///
465 /// ```
466 /// use cros_async::{complete3, run_one};
467 ///
468 /// let first = async {5};
469 /// let second = async {6};
470 /// let third = async {7};
471 /// assert_eq!(run_one(complete3(first, second, third)).unwrap_or((0,0,0)), (5,6,7));
472 /// ```
complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output) where F1: Future, F2: Future, F3: Future,473 pub async fn complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output)
474 where
475 F1: Future,
476 F2: Future,
477 F3: Future,
478 {
479 complete::Complete3::new(f1, f2, f3).await
480 }
481
482 /// Creates a combinator that runs the four given futures to completion, returning a tuple of the
483 /// outputs each yields.
484 ///
485 /// # Example
486 ///
487 /// ```
488 /// use cros_async::{complete4, run_one};
489 ///
490 /// let first = async {5};
491 /// let second = async {6};
492 /// let third = async {7};
493 /// let fourth = async {8};
494 /// assert_eq!(run_one(complete4(first, second, third, fourth)).unwrap_or((0,0,0,0)), (5,6,7,8));
495 /// ```
complete4<F1, F2, F3, F4>( f1: F1, f2: F2, f3: F3, f4: F4, ) -> (F1::Output, F2::Output, F3::Output, F4::Output) where F1: Future, F2: Future, F3: Future, F4: Future,496 pub async fn complete4<F1, F2, F3, F4>(
497 f1: F1,
498 f2: F2,
499 f3: F3,
500 f4: F4,
501 ) -> (F1::Output, F2::Output, F3::Output, F4::Output)
502 where
503 F1: Future,
504 F2: Future,
505 F3: Future,
506 F4: Future,
507 {
508 complete::Complete4::new(f1, f2, f3, f4).await
509 }
510
511 /// Creates a combinator that runs the five given futures to completion, returning a tuple of the
512 /// outputs each yields.
513 ///
514 /// # Example
515 ///
516 /// ```
517 /// use cros_async::{complete5, run_one};
518 ///
519 /// let first = async {5};
520 /// let second = async {6};
521 /// let third = async {7};
522 /// let fourth = async {8};
523 /// let fifth = async {9};
524 /// assert_eq!(run_one(complete5(first, second, third, fourth, fifth)).unwrap_or((0,0,0,0,0)),
525 /// (5,6,7,8,9));
526 /// ```
complete5<F1, F2, F3, F4, F5>( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, ) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output) where F1: Future, F2: Future, F3: Future, F4: Future, F5: Future,527 pub async fn complete5<F1, F2, F3, F4, F5>(
528 f1: F1,
529 f2: F2,
530 f3: F3,
531 f4: F4,
532 f5: F5,
533 ) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)
534 where
535 F1: Future,
536 F2: Future,
537 F3: Future,
538 F4: Future,
539 F5: Future,
540 {
541 complete::Complete5::new(f1, f2, f3, f4, f5).await
542 }
543