1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! An Executor and future combinators based on operations that block on file descriptors.
6 //!
7 //! This crate is meant to be used with the `futures-rs` crate that provides further combinators
8 //! and utility functions to combine and manage futures. All futures will run until they block on a
9 //! file descriptor becoming readable or writable. Facilities are provided to register future
10 //! wakers based on such events.
11 //!
12 //! # Running top-level futures.
13 //!
14 //! Use helper functions based the desired behavior of your application.
15 //!
16 //! ## Running one future.
17 //!
18 //! If there is only one top-level future to run, use the [`run_one`](fn.run_one.html) function.
19 //!
20 //! ## Completing one of several futures.
21 //!
22 //! If there are several top level tasks that should run until any one completes, use the "select"
23 //! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
24 //! function will return when the first future completes. The uncompleted futures will also be
25 //! returned so they can be run further or otherwise cleaned up. These functions are inspired by
26 //! the `select_all` function from futures-rs, but built to be run inside an FD based executor and
27 //! to poll only when necessary. See the docs for [`select2`](fn.select2.html),
28 //! [`select3`](fn.select3.html), [`select4`](fn.select4.html), and [`select5`](fn.select5.html).
29 //!
30 //! ## Completing all of several futures.
31 //!
32 //! If there are several top level tasks that all need to be completed, use the "complete" family
33 //! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
34 //! function will return only once all the futures passed to it have completed. These functions are
35 //! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based
36 //! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html),
37 //! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and
38 //! [`complete5`](fn.complete5.html).
39 //!
40 //! # Implementing new FD-based futures.
41 //!
42 //! For URing implementations should provide an implementation of the `IoSource` trait.
43 //! For the FD executor, new futures can use the existing ability to poll a source to build async
44 //! functionality on top of.
45 //!
46 //! # Implementations
47 //!
48 //! Currently there are two paths for using the asynchronous IO. One uses a PollContext and drivers
49 //! futures based on the FDs signaling they are ready for the opteration. This method will exist so
50 //! long as kernels < 5.4 are supported.
51 //! The other method submits operations to io_uring and is signaled when they complete. This is more
52 //! efficient, but only supported on kernel 5.4+.
53 //! If `IoSourceExt::new` is used to interface with async IO, then the correct backend will be chosen
54 //! automatically.
55 //!
56 //! # Examples
57 //!
58 //! See the docs for `IoSourceExt` if support for kernels <5.4 is required. Focus on `UringSource` if
59 //! all systems have support for io_uring.
60
61 mod complete;
62 mod event;
63 mod executor;
64 mod fd_executor;
65 mod io_ext;
66 pub mod mem;
67 mod poll_source;
68 mod queue;
69 mod select;
70 pub mod sync;
71 mod timer;
72 mod uring_executor;
73 mod uring_source;
74 mod waker;
75
76 pub use event::EventAsync;
77 pub use executor::Executor;
78 pub use fd_executor::FdExecutor;
79 pub use io_ext::{
80 Error as AsyncError, IntoAsync, IoSourceExt, ReadAsync, Result as AsyncResult, WriteAsync,
81 };
82 pub use mem::{BackingMemory, MemRegion};
83 pub use poll_source::PollSource;
84 pub use select::SelectResult;
85 pub use timer::TimerAsync;
86 pub use uring_executor::URingExecutor;
87 pub use uring_source::UringSource;
88
89 use std::future::Future;
90 use std::marker::PhantomData;
91 use std::pin::Pin;
92 use std::task::{Context, Poll};
93
94 use thiserror::Error as ThisError;
95
96 #[derive(ThisError, Debug)]
97 pub enum Error {
98 /// Error from the FD executor.
99 #[error("Failure in the FD executor: {0}")]
100 FdExecutor(fd_executor::Error),
101 /// Error from the uring executor.
102 #[error("Failure in the uring executor: {0}")]
103 URingExecutor(uring_executor::Error),
104 }
105 pub type Result<T> = std::result::Result<T, Error>;
106
107 // A Future that never completes.
108 pub struct Empty<T> {
109 phantom: PhantomData<T>,
110 }
111
112 impl<T> Future for Empty<T> {
113 type Output = T;
114
poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<T>115 fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<T> {
116 Poll::Pending
117 }
118 }
119
empty<T>() -> Empty<T>120 pub fn empty<T>() -> Empty<T> {
121 Empty {
122 phantom: PhantomData,
123 }
124 }
125
126 /// Creates an Executor that runs one future to completion.
127 ///
128 /// # Example
129 ///
130 /// ```
131 /// use cros_async::run_one;
132 ///
133 /// let fut = async { 55 };
134 /// assert_eq!(55, run_one(fut).unwrap());
135 /// ```
run_one<F: Future>(fut: F) -> Result<F::Output>136 pub fn run_one<F: Future>(fut: F) -> Result<F::Output> {
137 if uring_executor::use_uring() {
138 run_one_uring(fut)
139 } else {
140 run_one_poll(fut)
141 }
142 }
143
144 /// Creates a URingExecutor that runs one future to completion.
145 ///
146 /// # Example
147 ///
148 /// ```
149 /// use cros_async::run_one_uring;
150 ///
151 /// let fut = async { 55 };
152 /// assert_eq!(55, run_one_uring(fut).unwrap());
153 /// ```
run_one_uring<F: Future>(fut: F) -> Result<F::Output>154 pub fn run_one_uring<F: Future>(fut: F) -> Result<F::Output> {
155 URingExecutor::new()
156 .and_then(|ex| ex.run_until(fut))
157 .map_err(Error::URingExecutor)
158 }
159
160 /// Creates a FdExecutor that runs one future to completion.
161 ///
162 /// # Example
163 ///
164 /// ```
165 /// use cros_async::run_one_poll;
166 ///
167 /// let fut = async { 55 };
168 /// assert_eq!(55, run_one_poll(fut).unwrap());
169 /// ```
run_one_poll<F: Future>(fut: F) -> Result<F::Output>170 pub fn run_one_poll<F: Future>(fut: F) -> Result<F::Output> {
171 FdExecutor::new()
172 .and_then(|ex| ex.run_until(fut))
173 .map_err(Error::FdExecutor)
174 }
175
176 // Select helpers to run until any future completes.
177
178 /// Creates a combinator that runs the two given futures until one completes, returning a tuple
179 /// containing the result of the finished future and the still pending future.
180 ///
181 /// # Example
182 ///
183 /// ```
184 /// use cros_async::{SelectResult, select2, run_one};
185 /// use futures::future::pending;
186 /// use futures::pin_mut;
187 ///
188 /// let first = async {5};
189 /// let second = async {let () = pending().await;};
190 /// pin_mut!(first);
191 /// pin_mut!(second);
192 /// match run_one(select2(first, second)) {
193 /// Ok((SelectResult::Finished(5), SelectResult::Pending(_second))) => (),
194 /// _ => panic!("Select didn't return the first future"),
195 /// };
196 /// ```
select2<F1: Future + Unpin, F2: Future + Unpin>( f1: F1, f2: F2, ) -> (SelectResult<F1>, SelectResult<F2>)197 pub async fn select2<F1: Future + Unpin, F2: Future + Unpin>(
198 f1: F1,
199 f2: F2,
200 ) -> (SelectResult<F1>, SelectResult<F2>) {
201 select::Select2::new(f1, f2).await
202 }
203
204 /// Creates a combinator that runs the three given futures until one or more completes, returning a
205 /// tuple containing the result of the finished future(s) and the still pending future(s).
206 ///
207 /// # Example
208 ///
209 /// ```
210 /// use cros_async::{SelectResult, select3, run_one};
211 /// use futures::future::pending;
212 /// use futures::pin_mut;
213 ///
214 /// let first = async {4};
215 /// let second = async {let () = pending().await;};
216 /// let third = async {5};
217 /// pin_mut!(first);
218 /// pin_mut!(second);
219 /// pin_mut!(third);
220 /// match run_one(select3(first, second, third)) {
221 /// Ok((SelectResult::Finished(4),
222 /// SelectResult::Pending(_second),
223 /// SelectResult::Finished(5))) => (),
224 /// _ => panic!("Select didn't return the futures"),
225 /// };
226 /// ```
select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>( f1: F1, f2: F2, f3: F3, ) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>)227 pub async fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
228 f1: F1,
229 f2: F2,
230 f3: F3,
231 ) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>) {
232 select::Select3::new(f1, f2, f3).await
233 }
234
235 /// Creates a combinator that runs the four given futures until one or more completes, returning a
236 /// tuple containing the result of the finished future(s) and the still pending future(s).
237 ///
238 /// # Example
239 ///
240 /// ```
241 /// use cros_async::{SelectResult, select4, run_one};
242 /// use futures::future::pending;
243 /// use futures::pin_mut;
244 ///
245 /// let first = async {4};
246 /// let second = async {let () = pending().await;};
247 /// let third = async {5};
248 /// let fourth = async {let () = pending().await;};
249 /// pin_mut!(first);
250 /// pin_mut!(second);
251 /// pin_mut!(third);
252 /// pin_mut!(fourth);
253 /// match run_one(select4(first, second, third, fourth)) {
254 /// Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
255 /// SelectResult::Finished(5), SelectResult::Pending(_fourth))) => (),
256 /// _ => panic!("Select didn't return the futures"),
257 /// };
258 /// ```
select4< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, )259 pub async fn select4<
260 F1: Future + Unpin,
261 F2: Future + Unpin,
262 F3: Future + Unpin,
263 F4: Future + Unpin,
264 >(
265 f1: F1,
266 f2: F2,
267 f3: F3,
268 f4: F4,
269 ) -> (
270 SelectResult<F1>,
271 SelectResult<F2>,
272 SelectResult<F3>,
273 SelectResult<F4>,
274 ) {
275 select::Select4::new(f1, f2, f3, f4).await
276 }
277
278 /// Creates a combinator that runs the five given futures until one or more completes, returning a
279 /// tuple containing the result of the finished future(s) and the still pending future(s).
280 ///
281 /// # Example
282 ///
283 /// ```
284 /// use cros_async::{SelectResult, select5, run_one};
285 /// use futures::future::pending;
286 /// use futures::pin_mut;
287 ///
288 /// let first = async {4};
289 /// let second = async {let () = pending().await;};
290 /// let third = async {5};
291 /// let fourth = async {let () = pending().await;};
292 /// let fifth = async {6};
293 /// pin_mut!(first);
294 /// pin_mut!(second);
295 /// pin_mut!(third);
296 /// pin_mut!(fourth);
297 /// pin_mut!(fifth);
298 /// match run_one(select5(first, second, third, fourth, fifth)) {
299 /// Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
300 /// SelectResult::Finished(5), SelectResult::Pending(_fourth),
301 /// SelectResult::Finished(6))) => (),
302 /// _ => panic!("Select didn't return the futures"),
303 /// };
304 /// ```
select5< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, F5: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, SelectResult<F5>, )305 pub async fn select5<
306 F1: Future + Unpin,
307 F2: Future + Unpin,
308 F3: Future + Unpin,
309 F4: Future + Unpin,
310 F5: Future + Unpin,
311 >(
312 f1: F1,
313 f2: F2,
314 f3: F3,
315 f4: F4,
316 f5: F5,
317 ) -> (
318 SelectResult<F1>,
319 SelectResult<F2>,
320 SelectResult<F3>,
321 SelectResult<F4>,
322 SelectResult<F5>,
323 ) {
324 select::Select5::new(f1, f2, f3, f4, f5).await
325 }
326
327 /// Creates a combinator that runs the six given futures until one or more completes, returning a
328 /// tuple containing the result of the finished future(s) and the still pending future(s).
329 ///
330 /// # Example
331 ///
332 /// ```
333 /// use cros_async::{SelectResult, select6, run_one};
334 /// use futures::future::pending;
335 /// use futures::pin_mut;
336 ///
337 /// let first = async {1};
338 /// let second = async {let () = pending().await;};
339 /// let third = async {3};
340 /// let fourth = async {let () = pending().await;};
341 /// let fifth = async {5};
342 /// let sixth = async {6};
343 /// pin_mut!(first);
344 /// pin_mut!(second);
345 /// pin_mut!(third);
346 /// pin_mut!(fourth);
347 /// pin_mut!(fifth);
348 /// pin_mut!(sixth);
349 /// match run_one(select6(first, second, third, fourth, fifth, sixth)) {
350 /// Ok((SelectResult::Finished(1), SelectResult::Pending(_second),
351 /// SelectResult::Finished(3), SelectResult::Pending(_fourth),
352 /// SelectResult::Finished(5), SelectResult::Finished(6))) => (),
353 /// _ => panic!("Select didn't return the futures"),
354 /// };
355 /// ```
select6< F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin, F5: Future + Unpin, F6: Future + Unpin, >( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, f6: F6, ) -> ( SelectResult<F1>, SelectResult<F2>, SelectResult<F3>, SelectResult<F4>, SelectResult<F5>, SelectResult<F6>, )356 pub async fn select6<
357 F1: Future + Unpin,
358 F2: Future + Unpin,
359 F3: Future + Unpin,
360 F4: Future + Unpin,
361 F5: Future + Unpin,
362 F6: Future + Unpin,
363 >(
364 f1: F1,
365 f2: F2,
366 f3: F3,
367 f4: F4,
368 f5: F5,
369 f6: F6,
370 ) -> (
371 SelectResult<F1>,
372 SelectResult<F2>,
373 SelectResult<F3>,
374 SelectResult<F4>,
375 SelectResult<F5>,
376 SelectResult<F6>,
377 ) {
378 select::Select6::new(f1, f2, f3, f4, f5, f6).await
379 }
380
381 // Combination helpers to run until all futures are complete.
382
383 /// Creates a combinator that runs the two given futures to completion, returning a tuple of the
384 /// outputs each yields.
385 ///
386 /// # Example
387 ///
388 /// ```
389 /// use cros_async::{complete2, run_one};
390 ///
391 /// let first = async {5};
392 /// let second = async {6};
393 /// assert_eq!(run_one(complete2(first, second)).unwrap_or((0,0)), (5,6));
394 /// ```
complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output) where F1: Future, F2: Future,395 pub async fn complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output)
396 where
397 F1: Future,
398 F2: Future,
399 {
400 complete::Complete2::new(f1, f2).await
401 }
402
403 /// Creates a combinator that runs the three given futures to completion, returning a tuple of the
404 /// outputs each yields.
405 ///
406 /// # Example
407 ///
408 /// ```
409 /// use cros_async::{complete3, run_one};
410 ///
411 /// let first = async {5};
412 /// let second = async {6};
413 /// let third = async {7};
414 /// assert_eq!(run_one(complete3(first, second, third)).unwrap_or((0,0,0)), (5,6,7));
415 /// ```
complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output) where F1: Future, F2: Future, F3: Future,416 pub async fn complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output)
417 where
418 F1: Future,
419 F2: Future,
420 F3: Future,
421 {
422 complete::Complete3::new(f1, f2, f3).await
423 }
424
425 /// Creates a combinator that runs the four given futures to completion, returning a tuple of the
426 /// outputs each yields.
427 ///
428 /// # Example
429 ///
430 /// ```
431 /// use cros_async::{complete4, run_one};
432 ///
433 /// let first = async {5};
434 /// let second = async {6};
435 /// let third = async {7};
436 /// let fourth = async {8};
437 /// assert_eq!(run_one(complete4(first, second, third, fourth)).unwrap_or((0,0,0,0)), (5,6,7,8));
438 /// ```
complete4<F1, F2, F3, F4>( f1: F1, f2: F2, f3: F3, f4: F4, ) -> (F1::Output, F2::Output, F3::Output, F4::Output) where F1: Future, F2: Future, F3: Future, F4: Future,439 pub async fn complete4<F1, F2, F3, F4>(
440 f1: F1,
441 f2: F2,
442 f3: F3,
443 f4: F4,
444 ) -> (F1::Output, F2::Output, F3::Output, F4::Output)
445 where
446 F1: Future,
447 F2: Future,
448 F3: Future,
449 F4: Future,
450 {
451 complete::Complete4::new(f1, f2, f3, f4).await
452 }
453
454 /// Creates a combinator that runs the five given futures to completion, returning a tuple of the
455 /// outputs each yields.
456 ///
457 /// # Example
458 ///
459 /// ```
460 /// use cros_async::{complete5, run_one};
461 ///
462 /// let first = async {5};
463 /// let second = async {6};
464 /// let third = async {7};
465 /// let fourth = async {8};
466 /// let fifth = async {9};
467 /// assert_eq!(run_one(complete5(first, second, third, fourth, fifth)).unwrap_or((0,0,0,0,0)),
468 /// (5,6,7,8,9));
469 /// ```
complete5<F1, F2, F3, F4, F5>( f1: F1, f2: F2, f3: F3, f4: F4, f5: F5, ) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output) where F1: Future, F2: Future, F3: Future, F4: Future, F5: Future,470 pub async fn complete5<F1, F2, F3, F4, F5>(
471 f1: F1,
472 f2: F2,
473 f3: F3,
474 f4: F4,
475 f5: F5,
476 ) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)
477 where
478 F1: Future,
479 F2: Future,
480 F3: Future,
481 F4: Future,
482 F5: Future,
483 {
484 complete::Complete5::new(f1, f2, f3, f4, f5).await
485 }
486