• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! An Executor and future combinators based on operations that block on file descriptors.
6 //!
7 //! This crate is meant to be used with the `futures-rs` crate that provides further combinators
8 //! and utility functions to combine and manage futures. All futures will run until they block on a
9 //! file descriptor becoming readable or writable. Facilities are provided to register future
10 //! wakers based on such events.
11 //!
12 //! # Running top-level futures.
13 //!
//! Use helper functions based on the desired behavior of your application.
15 //!
16 //! ## Running one future.
17 //!
18 //! If there is only one top-level future to run, use the [`run_one`](fn.run_one.html) function.
19 //!
20 //! ## Completing one of several futures.
21 //!
22 //! If there are several top level tasks that should run until any one completes, use the "select"
23 //! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
24 //! function will return when the first future completes. The uncompleted futures will also be
25 //! returned so they can be run further or otherwise cleaned up. These functions are inspired by
26 //! the `select_all` function from futures-rs, but built to be run inside an FD based executor and
27 //! to poll only when necessary. See the docs for [`select2`](fn.select2.html),
28 //! [`select3`](fn.select3.html), [`select4`](fn.select4.html), and [`select5`](fn.select5.html).
29 //!
30 //! ## Completing all of several futures.
31 //!
32 //! If there are several top level tasks that all need to be completed, use the "complete" family
33 //! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
34 //! function will return only once all the futures passed to it have completed. These functions are
35 //! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based
36 //! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html),
37 //! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and
38 //! [`complete5`](fn.complete5.html).
39 //!
40 //! # Implementing new FD-based futures.
41 //!
//! For URing, implementations should provide an implementation of the `IoSource` trait.
43 //! For the FD executor, new futures can use the existing ability to poll a source to build async
44 //! functionality on top of.
45 //!
46 //! # Implementations
47 //!
//! Currently there are two paths for using the asynchronous IO. One uses a PollContext and drives
//! futures based on the FDs signaling they are ready for the operation. This method will exist so
//! long as kernels < 5.4 are supported.
51 //! The other method submits operations to io_uring and is signaled when they complete. This is more
52 //! efficient, but only supported on kernel 5.4+.
53 //! If `IoSourceExt::new` is used to interface with async IO, then the correct backend will be chosen
54 //! automatically.
55 //!
56 //! # Examples
57 //!
58 //! See the docs for `IoSourceExt` if support for kernels <5.4 is required. Focus on `UringSource` if
59 //! all systems have support for io_uring.
60 
61 mod async_types;
62 pub mod audio_streams_async;
63 mod blocking;
64 mod complete;
65 mod event;
66 mod executor;
67 mod fd_executor;
68 mod io_ext;
69 pub mod mem;
70 mod poll_source;
71 mod queue;
72 mod select;
73 pub mod sync;
74 mod timer;
75 mod uring_executor;
76 mod uring_source;
77 mod waker;
78 
79 pub use async_types::*;
80 pub use base;
81 pub use blocking::{block_on, BlockingPool};
82 pub use event::EventAsync;
83 pub use executor::Executor;
84 pub use fd_executor::FdExecutor;
85 pub use io_ext::{
86     AsyncWrapper, Error as AsyncError, IntoAsync, IoSourceExt, ReadAsync, Result as AsyncResult,
87     WriteAsync,
88 };
89 pub use mem::{BackingMemory, MemRegion};
90 pub use poll_source::PollSource;
91 pub use select::SelectResult;
92 pub use timer::TimerAsync;
93 pub use uring_executor::URingExecutor;
94 pub use uring_source::UringSource;
95 
96 use std::{
97     future::Future,
98     io,
99     marker::PhantomData,
100     pin::Pin,
101     task::{Context, Poll},
102 };
103 
104 use remain::sorted;
105 use thiserror::Error as ThisError;
106 
107 #[sorted]
108 #[derive(ThisError, Debug)]
109 pub enum Error {
110     /// Error from the FD executor.
111     #[error("Failure in the FD executor: {0}")]
112     FdExecutor(fd_executor::Error),
113     /// Error from TimerFd.
114     #[error("Failure in TimerAsync: {0}")]
115     TimerAsync(AsyncError),
116     /// Error from TimerFd.
117     #[error("Failure in TimerFd: {0}")]
118     TimerFd(base::Error),
119     /// Error from the uring executor.
120     #[error("Failure in the uring executor: {0}")]
121     URingExecutor(uring_executor::Error),
122 }
123 pub type Result<T> = std::result::Result<T, Error>;
124 
125 impl From<Error> for io::Error {
from(e: Error) -> Self126     fn from(e: Error) -> Self {
127         use Error::*;
128         match e {
129             FdExecutor(e) => e.into(),
130             URingExecutor(e) => e.into(),
131             TimerFd(e) => e.into(),
132             TimerAsync(e) => e.into(),
133         }
134     }
135 }
136 
/// A [`Future`] that never completes.
///
/// Every call to `poll` returns [`Poll::Pending`] and ignores the supplied [`Context`], so
/// this future never arranges for its task to be woken. Construct one with [`empty`].
pub struct Empty<T> {
    // Marker tying the otherwise unused output type `T` to the struct.
    phantom: PhantomData<T>,
}

// `Empty` never stores a `T`, so moving it after it has been pinned is always fine. Without
// this impl the auto trait would only apply when `T: Unpin`, needlessly keeping `Empty<T>` out
// of APIs that require `Unpin` futures.
impl<T> Unpin for Empty<T> {}

impl<T> Future for Empty<T> {
    type Output = T;

    /// Always reports [`Poll::Pending`]; the context (and therefore its waker) is unused.
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
        Poll::Pending
    }
}

/// Creates a future with output type `T` that never completes.
///
/// See [`Empty`] for details.
pub fn empty<T>() -> Empty<T> {
    Empty {
        phantom: PhantomData,
    }
}
155 
156 /// Creates an Executor that runs one future to completion.
157 ///
158 ///  # Example
159 ///
160 ///    ```
161 ///    use cros_async::run_one;
162 ///
163 ///    let fut = async { 55 };
164 ///    assert_eq!(55, run_one(fut).unwrap());
165 ///    ```
run_one<F: Future>(fut: F) -> Result<F::Output>166 pub fn run_one<F: Future>(fut: F) -> Result<F::Output> {
167     if uring_executor::use_uring() {
168         run_one_uring(fut)
169     } else {
170         run_one_poll(fut)
171     }
172 }
173 
174 /// Creates a URingExecutor that runs one future to completion.
175 ///
176 ///  # Example
177 ///
178 ///    ```
179 ///    use cros_async::run_one_uring;
180 ///
181 ///    let fut = async { 55 };
182 ///    assert_eq!(55, run_one_uring(fut).unwrap());
183 ///    ```
run_one_uring<F: Future>(fut: F) -> Result<F::Output>184 pub fn run_one_uring<F: Future>(fut: F) -> Result<F::Output> {
185     URingExecutor::new()
186         .and_then(|ex| ex.run_until(fut))
187         .map_err(Error::URingExecutor)
188 }
189 
190 /// Creates a FdExecutor that runs one future to completion.
191 ///
192 ///  # Example
193 ///
194 ///    ```
195 ///    use cros_async::run_one_poll;
196 ///
197 ///    let fut = async { 55 };
198 ///    assert_eq!(55, run_one_poll(fut).unwrap());
199 ///    ```
run_one_poll<F: Future>(fut: F) -> Result<F::Output>200 pub fn run_one_poll<F: Future>(fut: F) -> Result<F::Output> {
201     FdExecutor::new()
202         .and_then(|ex| ex.run_until(fut))
203         .map_err(Error::FdExecutor)
204 }
205 
206 // Select helpers to run until any future completes.
207 
208 /// Creates a combinator that runs the two given futures until one completes, returning a tuple
209 /// containing the result of the finished future and the still pending future.
210 ///
211 ///  # Example
212 ///
213 ///    ```
214 ///    use cros_async::{SelectResult, select2, run_one};
215 ///    use futures::future::pending;
216 ///    use futures::pin_mut;
217 ///
218 ///    let first = async {5};
219 ///    let second = async {let () = pending().await;};
220 ///    pin_mut!(first);
221 ///    pin_mut!(second);
222 ///    match run_one(select2(first, second)) {
223 ///        Ok((SelectResult::Finished(5), SelectResult::Pending(_second))) => (),
224 ///        _ => panic!("Select didn't return the first future"),
225 ///    };
226 ///    ```
select2<F1: Future + Unpin, F2: Future + Unpin>( f1: F1, f2: F2, ) -> (SelectResult<F1>, SelectResult<F2>)227 pub async fn select2<F1: Future + Unpin, F2: Future + Unpin>(
228     f1: F1,
229     f2: F2,
230 ) -> (SelectResult<F1>, SelectResult<F2>) {
231     select::Select2::new(f1, f2).await
232 }
233 
234 /// Creates a combinator that runs the three given futures until one or more completes, returning a
235 /// tuple containing the result of the finished future(s) and the still pending future(s).
236 ///
237 ///  # Example
238 ///
239 ///    ```
240 ///    use cros_async::{SelectResult, select3, run_one};
241 ///    use futures::future::pending;
242 ///    use futures::pin_mut;
243 ///
244 ///    let first = async {4};
245 ///    let second = async {let () = pending().await;};
246 ///    let third = async {5};
247 ///    pin_mut!(first);
248 ///    pin_mut!(second);
249 ///    pin_mut!(third);
250 ///    match run_one(select3(first, second, third)) {
251 ///        Ok((SelectResult::Finished(4),
252 ///            SelectResult::Pending(_second),
253 ///            SelectResult::Finished(5))) => (),
254 ///        _ => panic!("Select didn't return the futures"),
255 ///    };
256 ///    ```
select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>( f1: F1, f2: F2, f3: F3, ) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>)257 pub async fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
258     f1: F1,
259     f2: F2,
260     f3: F3,
261 ) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>) {
262     select::Select3::new(f1, f2, f3).await
263 }
264 
265 /// Creates a combinator that runs the four given futures until one or more completes, returning a
266 /// tuple containing the result of the finished future(s) and the still pending future(s).
267 ///
268 ///  # Example
269 ///
270 ///    ```
271 ///    use cros_async::{SelectResult, select4, run_one};
272 ///    use futures::future::pending;
273 ///    use futures::pin_mut;
274 ///
275 ///    let first = async {4};
276 ///    let second = async {let () = pending().await;};
277 ///    let third = async {5};
278 ///    let fourth = async {let () = pending().await;};
279 ///    pin_mut!(first);
280 ///    pin_mut!(second);
281 ///    pin_mut!(third);
282 ///    pin_mut!(fourth);
283 ///    match run_one(select4(first, second, third, fourth)) {
284 ///        Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
285 ///            SelectResult::Finished(5), SelectResult::Pending(_fourth))) => (),
286 ///        _ => panic!("Select didn't return the futures"),
287 ///    };
288 ///    ```
select4< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, )289 pub async fn select4<
290     F1: Future + Unpin,
291     F2: Future + Unpin,
292     F3: Future + Unpin,
293     F4: Future + Unpin,
294 >(
295     f1: F1,
296     f2: F2,
297     f3: F3,
298     f4: F4,
299 ) -> (
300     SelectResult<F1>,
301     SelectResult<F2>,
302     SelectResult<F3>,
303     SelectResult<F4>,
304 ) {
305     select::Select4::new(f1, f2, f3, f4).await
306 }
307 
308 /// Creates a combinator that runs the five given futures until one or more completes, returning a
309 /// tuple containing the result of the finished future(s) and the still pending future(s).
310 ///
311 ///  # Example
312 ///
313 ///    ```
314 ///    use cros_async::{SelectResult, select5, run_one};
315 ///    use futures::future::pending;
316 ///    use futures::pin_mut;
317 ///
318 ///    let first = async {4};
319 ///    let second = async {let () = pending().await;};
320 ///    let third = async {5};
321 ///    let fourth = async {let () = pending().await;};
322 ///    let fifth = async {6};
323 ///    pin_mut!(first);
324 ///    pin_mut!(second);
325 ///    pin_mut!(third);
326 ///    pin_mut!(fourth);
327 ///    pin_mut!(fifth);
328 ///    match run_one(select5(first, second, third, fourth, fifth)) {
329 ///        Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
330 ///            SelectResult::Finished(5), SelectResult::Pending(_fourth),
331 ///            SelectResult::Finished(6))) => (),
332 ///        _ => panic!("Select didn't return the futures"),
333 ///    };
334 ///    ```
select5< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, F5: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, SelectResult<F5>, )335 pub async fn select5<
336     F1: Future + Unpin,
337     F2: Future + Unpin,
338     F3: Future + Unpin,
339     F4: Future + Unpin,
340     F5: Future + Unpin,
341 >(
342     f1: F1,
343     f2: F2,
344     f3: F3,
345     f4: F4,
346     f5: F5,
347 ) -> (
348     SelectResult<F1>,
349     SelectResult<F2>,
350     SelectResult<F3>,
351     SelectResult<F4>,
352     SelectResult<F5>,
353 ) {
354     select::Select5::new(f1, f2, f3, f4, f5).await
355 }
356 
357 /// Creates a combinator that runs the six given futures until one or more completes, returning a
358 /// tuple containing the result of the finished future(s) and the still pending future(s).
359 ///
360 ///  # Example
361 ///
362 ///    ```
363 ///    use cros_async::{SelectResult, select6, run_one};
364 ///    use futures::future::pending;
365 ///    use futures::pin_mut;
366 ///
367 ///    let first = async {1};
368 ///    let second = async {let () = pending().await;};
369 ///    let third = async {3};
370 ///    let fourth = async {let () = pending().await;};
371 ///    let fifth = async {5};
372 ///    let sixth = async {6};
373 ///    pin_mut!(first);
374 ///    pin_mut!(second);
375 ///    pin_mut!(third);
376 ///    pin_mut!(fourth);
377 ///    pin_mut!(fifth);
378 ///    pin_mut!(sixth);
379 ///    match run_one(select6(first, second, third, fourth, fifth, sixth)) {
380 ///        Ok((SelectResult::Finished(1), SelectResult::Pending(_second),
381 ///            SelectResult::Finished(3), SelectResult::Pending(_fourth),
382 ///            SelectResult::Finished(5), SelectResult::Finished(6))) => (),
383 ///        _ => panic!("Select didn't return the futures"),
384 ///    };
385 ///    ```
select6< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, F5: Future + Unpin, F6: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, f6: F6, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, SelectResult<F5>, SelectResult<F6>, )386 pub async fn select6<
387     F1: Future + Unpin,
388     F2: Future + Unpin,
389     F3: Future + Unpin,
390     F4: Future + Unpin,
391     F5: Future + Unpin,
392     F6: Future + Unpin,
393 >(
394     f1: F1,
395     f2: F2,
396     f3: F3,
397     f4: F4,
398     f5: F5,
399     f6: F6,
400 ) -> (
401     SelectResult<F1>,
402     SelectResult<F2>,
403     SelectResult<F3>,
404     SelectResult<F4>,
405     SelectResult<F5>,
406     SelectResult<F6>,
407 ) {
408     select::Select6::new(f1, f2, f3, f4, f5, f6).await
409 }
410 
select7< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, F5: Future + Unpin, F6: Future + Unpin, F7: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, f6: F6, f7: F7, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, SelectResult<F5>, SelectResult<F6>, SelectResult<F7>, )411 pub async fn select7<
412     F1: Future + Unpin,
413     F2: Future + Unpin,
414     F3: Future + Unpin,
415     F4: Future + Unpin,
416     F5: Future + Unpin,
417     F6: Future + Unpin,
418     F7: Future + Unpin,
419 >(
420     f1: F1,
421     f2: F2,
422     f3: F3,
423     f4: F4,
424     f5: F5,
425     f6: F6,
426     f7: F7,
427 ) -> (
428     SelectResult<F1>,
429     SelectResult<F2>,
430     SelectResult<F3>,
431     SelectResult<F4>,
432     SelectResult<F5>,
433     SelectResult<F6>,
434     SelectResult<F7>,
435 ) {
436     select::Select7::new(f1, f2, f3, f4, f5, f6, f7).await
437 }
438 // Combination helpers to run until all futures are complete.
439 
440 /// Creates a combinator that runs the two given futures to completion, returning a tuple of the
441 /// outputs each yields.
442 ///
443 ///  # Example
444 ///
445 ///    ```
446 ///    use cros_async::{complete2, run_one};
447 ///
448 ///    let first = async {5};
449 ///    let second = async {6};
450 ///    assert_eq!(run_one(complete2(first, second)).unwrap_or((0,0)), (5,6));
451 ///    ```
complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output) where F1: Future, F2: Future,452 pub async fn complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output)
453 where
454     F1: Future,
455     F2: Future,
456 {
457     complete::Complete2::new(f1, f2).await
458 }
459 
460 /// Creates a combinator that runs the three given futures to completion, returning a tuple of the
461 /// outputs each yields.
462 ///
463 ///  # Example
464 ///
465 ///    ```
466 ///    use cros_async::{complete3, run_one};
467 ///
468 ///    let first = async {5};
469 ///    let second = async {6};
470 ///    let third = async {7};
471 ///    assert_eq!(run_one(complete3(first, second, third)).unwrap_or((0,0,0)), (5,6,7));
472 ///    ```
complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output) where F1: Future, F2: Future, F3: Future,473 pub async fn complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output)
474 where
475     F1: Future,
476     F2: Future,
477     F3: Future,
478 {
479     complete::Complete3::new(f1, f2, f3).await
480 }
481 
482 /// Creates a combinator that runs the four given futures to completion, returning a tuple of the
483 /// outputs each yields.
484 ///
485 ///  # Example
486 ///
487 ///    ```
488 ///    use cros_async::{complete4, run_one};
489 ///
490 ///    let first = async {5};
491 ///    let second = async {6};
492 ///    let third = async {7};
493 ///    let fourth = async {8};
494 ///    assert_eq!(run_one(complete4(first, second, third, fourth)).unwrap_or((0,0,0,0)), (5,6,7,8));
495 ///    ```
complete4<F1, F2, F3, F4>( f1: F1, f2: F2, f3: F3, f4: F4, ) -> (F1::Output, F2::Output, F3::Output, F4::Output) where F1: Future, F2: Future, F3: Future, F4: Future,496 pub async fn complete4<F1, F2, F3, F4>(
497     f1: F1,
498     f2: F2,
499     f3: F3,
500     f4: F4,
501 ) -> (F1::Output, F2::Output, F3::Output, F4::Output)
502 where
503     F1: Future,
504     F2: Future,
505     F3: Future,
506     F4: Future,
507 {
508     complete::Complete4::new(f1, f2, f3, f4).await
509 }
510 
511 /// Creates a combinator that runs the five given futures to completion, returning a tuple of the
512 /// outputs each yields.
513 ///
514 ///  # Example
515 ///
516 ///    ```
517 ///    use cros_async::{complete5, run_one};
518 ///
519 ///    let first = async {5};
520 ///    let second = async {6};
521 ///    let third = async {7};
522 ///    let fourth = async {8};
523 ///    let fifth = async {9};
524 ///    assert_eq!(run_one(complete5(first, second, third, fourth, fifth)).unwrap_or((0,0,0,0,0)),
525 ///               (5,6,7,8,9));
526 ///    ```
complete5<F1, F2, F3, F4, F5>( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, ) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output) where F1: Future, F2: Future, F3: Future, F4: Future, F5: Future,527 pub async fn complete5<F1, F2, F3, F4, F5>(
528     f1: F1,
529     f2: F2,
530     f3: F3,
531     f4: F4,
532     f5: F5,
533 ) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)
534 where
535     F1: Future,
536     F2: Future,
537     F3: Future,
538     F4: Future,
539     F5: Future,
540 {
541     complete::Complete5::new(f1, f2, f3, f4, f5).await
542 }
543