// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! The executor runs all given futures to completion. Futures register wakers associated with file
//! descriptors. The wakers will be called when the FD becomes readable or writable depending on
//! the situation.
//!
//! `FdExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
//! utility functions to combine futures.
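//!
//! A minimal usage sketch (spawn a future and drive it to completion); imports and error handling
//! are elided, so the example is marked `ignore`:
//!
//! ```ignore
//! // Create the executor, spawn a future on it, and run until the spawned task finishes.
//! let ex = FdExecutor::new().expect("failed to create executor");
//! let task = ex.spawn(async { 2 + 2 });
//! let result = ex.run_until(task).expect("executor failed");
//! assert_eq!(result, 4);
//! ```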

use std::{
    fs::File,
    future::Future,
    io, mem,
    os::unix::io::{AsRawFd, FromRawFd, RawFd},
    pin::Pin,
    sync::{
        atomic::{AtomicI32, Ordering},
        Arc, Weak,
    },
    task::{Context, Poll, Waker},
};

use async_task::Task;
use base::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents};
use futures::task::noop_waker;
use pin_utils::pin_mut;
use remain::sorted;
use slab::Slab;
use sync::Mutex;
use thiserror::Error as ThisError;

use super::{
    queue::RunnableQueue,
    waker::{new_waker, WakerToken, WeakWake},
    BlockingPool,
};

#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
    /// Failed to clone the EventFd for waking the executor.
    #[error("Failed to clone the EventFd for waking the executor: {0}")]
    CloneEventFd(base::Error),
    /// Failed to create the EventFd for waking the executor.
    #[error("Failed to create the EventFd for waking the executor: {0}")]
    CreateEventFd(base::Error),
    /// Creating a context to wait on FDs failed.
    #[error("An error creating the fd waiting context: {0}")]
    CreatingContext(base::Error),
    /// Failed to copy the FD for the polling context.
    #[error("Failed to copy the FD for the polling context: {0}")]
    DuplicatingFd(base::Error),
    /// The Executor is gone.
    #[error("The FdExecutor is gone")]
    ExecutorGone,
    /// PollContext failure.
    #[error("PollContext failure: {0}")]
    PollContextError(base::Error),
    /// An error occurred when setting the FD non-blocking.
    #[error("An error occurred setting the FD non-blocking: {0}")]
    SettingNonBlocking(base::Error),
    /// Failed to submit the waker to the polling context.
    #[error("An error adding to the poll context: {0}")]
    SubmittingWaker(base::Error),
    /// A Waker was canceled, but the operation isn't running.
    #[error("Unknown waker")]
    UnknownWaker,
}

pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            CloneEventFd(e) => e.into(),
            CreateEventFd(e) => e.into(),
            DuplicatingFd(e) => e.into(),
            ExecutorGone => io::Error::new(io::ErrorKind::Other, e),
            CreatingContext(e) => e.into(),
            PollContextError(e) => e.into(),
            SettingNonBlocking(e) => e.into(),
            SubmittingWaker(e) => e.into(),
            UnknownWaker => io::Error::new(io::ErrorKind::Other, e),
        }
    }
}

// A poll operation that has been submitted and is potentially being waited on.
struct OpData {
    file: File,
    waker: Option<Waker>,
}

// The current status of a submitted operation.
enum OpStatus {
    Pending(OpData),
    Completed,
}

// An IO source previously registered with an FdExecutor. Used to initiate asynchronous IO with the
// associated executor.
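//
// A minimal usage sketch (assumes an `FdExecutor` named `ex` and a pipe read end `read_end`, both
// placeholders), e.g. from inside an async fn:
//
//     let source = ex.register_source(read_end)?;
//     let op = source.wait_readable()?; // returns a PendingOperation future
//     op.await?;                        // resolves once the FD is readable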
pub struct RegisteredSource<F> {
    source: F,
    ex: Weak<RawExecutor>,
}

impl<F: AsRawFd> RegisteredSource<F> {
    // Start an asynchronous operation to wait for this source to become readable. The returned
    // future will not be ready until the source is readable.
    pub fn wait_readable(&self) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;

        let token =
            ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_read())?;

        Ok(PendingOperation {
            token: Some(token),
            ex: self.ex.clone(),
        })
    }

    // Start an asynchronous operation to wait for this source to become writable. The returned
    // future will not be ready until the source is writable.
    pub fn wait_writable(&self) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;

        let token =
            ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_write())?;

        Ok(PendingOperation {
            token: Some(token),
            ex: self.ex.clone(),
        })
    }
}

impl<F> RegisteredSource<F> {
    // Consume this RegisteredSource and return the inner IO source.
    pub fn into_source(self) -> F {
        self.source
    }
}

impl<F> AsRef<F> for RegisteredSource<F> {
    fn as_ref(&self) -> &F {
        &self.source
    }
}

impl<F> AsMut<F> for RegisteredSource<F> {
    fn as_mut(&mut self) -> &mut F {
        &mut self.source
    }
}

/// A token returned from `add_operation` that can be used to cancel the waker before it completes.
/// Used to manage getting the result from the underlying executor for a completed operation.
/// Dropping a `PendingOperation` cancels the operation with the executor if it hasn't already
/// completed.
pub struct PendingOperation {
    token: Option<WakerToken>,
    ex: Weak<RawExecutor>,
}

impl Future for PendingOperation {
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let token = self
            .token
            .as_ref()
            .expect("PendingOperation polled after returning Poll::Ready");
        if let Some(ex) = self.ex.upgrade() {
            if ex.is_ready(token, cx) {
                self.token = None;
                Poll::Ready(Ok(()))
            } else {
                Poll::Pending
            }
        } else {
            Poll::Ready(Err(Error::ExecutorGone))
        }
    }
}

impl Drop for PendingOperation {
    fn drop(&mut self) {
        if let Some(token) = self.token.take() {
            if let Some(ex) = self.ex.upgrade() {
                let _ = ex.cancel_operation(token);
            }
        }
    }
}

// This function exists to guarantee that non-epoll futures will not starve until an epoll future
// is ready to be polled again. The mechanism is very similar to the self-pipe trick used by C
// programs to reliably mix select / poll with signal handling. This is how it works:
//
// * RawExecutor::new creates an eventfd, dupes it, and spawns this async function with the duped
//   fd.
// * The first time notify_task is polled it tries to read from the eventfd and if that fails, waits
//   for the fd to become readable.
// * Meanwhile the RawExecutor keeps the original fd for the eventfd.
// * Whenever RawExecutor::wake is called it will write to the eventfd if it determines that the
//   executor thread is currently blocked in an epoll_wait call. This can happen when a non-epoll
//   future becomes ready to poll.
// * The write to the eventfd causes the fd to become readable, which then allows the epoll() call
//   to return with at least one readable fd.
// * The executor then polls the non-epoll future that became ready, any epoll futures that
//   completed, and the notify_task function, which then queues up another read on the eventfd and
//   the process can repeat.
async fn notify_task(notify: EventFd, raw: Weak<RawExecutor>) {
    add_fd_flags(notify.as_raw_fd(), libc::O_NONBLOCK)
        .expect("Failed to set notify EventFd as non-blocking");

    loop {
        match notify.read() {
            Ok(_) => {}
            Err(e) if e.errno() == libc::EWOULDBLOCK => {}
            Err(e) => panic!("Unexpected error while reading notify EventFd: {}", e),
        }

        if let Some(ex) = raw.upgrade() {
            let token = ex
                .add_operation(notify.as_raw_fd(), WatchingEvents::empty().set_read())
                .expect("Failed to add notify EventFd to PollCtx");

            // We don't want to hold an active reference to the executor in the .await below.
            mem::drop(ex);

            let op = PendingOperation {
                token: Some(token),
                ex: raw.clone(),
            };

            match op.await {
                Ok(()) => {}
                Err(Error::ExecutorGone) => break,
                Err(e) => panic!("Unexpected error while waiting for notify EventFd: {}", e),
            }
        } else {
            // The executor is gone so we should also exit.
            break;
        }
    }
}

// Indicates that the executor is either within or about to make a PollContext::wait() call. When a
// waker sees this value, it will write to the notify EventFd, which will cause the
// PollContext::wait() call to return.
const WAITING: i32 = 0x1d5b_c019u32 as i32;

// Indicates that the executor is processing any futures that are ready to run.
const PROCESSING: i32 = 0xd474_77bcu32 as i32;

// Indicates that one or more futures may be ready to make progress.
const WOKEN: i32 = 0x3e4d_3276u32 as i32;
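
// A rough sketch of how these states interact (see `wake` and `run` below):
//
//     run():  store PROCESSING -> run queued futures -> compare_exchange PROCESSING to WAITING
//             -> block in poll_ctx.wait() until an FD (or the notify EventFd) becomes readable
//     wake(): swap in WOKEN; if the previous state was WAITING, write to the notify EventFd so
//             that the blocked wait() call returns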

struct RawExecutor {
    queue: RunnableQueue,
    poll_ctx: EpollContext<usize>,
    ops: Mutex<Slab<OpStatus>>,
    blocking_pool: BlockingPool,
    state: AtomicI32,
    notify: EventFd,
}

impl RawExecutor {
    fn new(notify: EventFd) -> Result<Self> {
        Ok(RawExecutor {
            queue: RunnableQueue::new(),
            poll_ctx: EpollContext::new().map_err(Error::CreatingContext)?,
            ops: Mutex::new(Slab::with_capacity(64)),
            blocking_pool: Default::default(),
            state: AtomicI32::new(PROCESSING),
            notify,
        })
    }

    fn add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result<WakerToken> {
        let duped_fd = unsafe {
            // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
            // will only be added to the poll loop.
            File::from_raw_fd(dup_fd(fd)?)
        };
        let mut ops = self.ops.lock();
        let entry = ops.vacant_entry();
        let next_token = entry.key();
        self.poll_ctx
            .add_fd_with_events(&duped_fd, events, next_token)
            .map_err(Error::SubmittingWaker)?;
        entry.insert(OpStatus::Pending(OpData {
            file: duped_fd,
            waker: None,
        }));
        Ok(WakerToken(next_token))
    }

    fn wake(&self) {
        let oldstate = self.state.swap(WOKEN, Ordering::Release);
        if oldstate == WAITING {
            if let Err(e) = self.notify.write(1) {
                warn!("Failed to notify executor that a future is ready: {}", e);
            }
        }
    }

    fn spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn(f, schedule);
        runnable.schedule();
        task
    }

    fn spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn_local(f, schedule);
        runnable.schedule();
        task
    }

    fn spawn_blocking<F, R>(self: &Arc<Self>, f: F) -> Task<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.blocking_pool.spawn(f)
    }

    fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
        let events = EpollEvents::new();
        pin_mut!(done);

        loop {
            self.state.store(PROCESSING, Ordering::Release);
            for runnable in self.queue.iter() {
                runnable.run();
            }

            if let Poll::Ready(val) = done.as_mut().poll(cx) {
                return Ok(val);
            }

            let oldstate = self.state.compare_exchange(
                PROCESSING,
                WAITING,
                Ordering::Acquire,
                Ordering::Acquire,
            );
            if let Err(oldstate) = oldstate {
                debug_assert_eq!(oldstate, WOKEN);
                // One or more futures have become runnable.
                continue;
            }

            let events = self
                .poll_ctx
                .wait(&events)
                .map_err(Error::PollContextError)?;

            // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
            // writing to the eventfd.
            self.state.store(PROCESSING, Ordering::Release);
            for e in events.iter() {
                let token = e.token();
                let mut ops = self.ops.lock();

                // The op could have been canceled and removed by another thread so ignore it if it
                // doesn't exist.
                if let Some(op) = ops.get_mut(token) {
                    let (file, waker) = match mem::replace(op, OpStatus::Completed) {
                        OpStatus::Pending(OpData { file, waker }) => (file, waker),
                        OpStatus::Completed => panic!("poll operation completed more than once"),
                    };

                    mem::drop(ops);

                    self.poll_ctx
                        .delete(&file)
                        .map_err(Error::PollContextError)?;

                    if let Some(waker) = waker {
                        waker.wake();
                    }
                }
            }
        }
    }

    fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool {
        let mut ops = self.ops.lock();

        let op = ops
            .get_mut(token.0)
            .expect("`is_ready` called on unknown operation");
        match op {
            OpStatus::Pending(data) => {
                data.waker = Some(cx.waker().clone());
                false
            }
            OpStatus::Completed => {
                ops.remove(token.0);
                true
            }
        }
    }

    // Remove the waker for the given token if it hasn't fired yet.
    fn cancel_operation(&self, token: WakerToken) -> Result<()> {
        match self.ops.lock().remove(token.0) {
            OpStatus::Pending(data) => self
                .poll_ctx
                .delete(&data.file)
                .map_err(Error::PollContextError),
            OpStatus::Completed => Ok(()),
        }
    }
}

impl WeakWake for RawExecutor {
    fn wake_by_ref(weak_self: &Weak<Self>) {
        if let Some(arc_self) = weak_self.upgrade() {
            RawExecutor::wake(&arc_self);
        }
    }
}

impl Drop for RawExecutor {
    fn drop(&mut self) {
        // Wake up the notify_task. We set the state to WAITING here so that wake() will write to
        // the eventfd.
        self.state.store(WAITING, Ordering::Release);
        self.wake();

        // Wake up any futures still waiting on poll operations as they are just going to get an
        // ExecutorGone error now.
        for op in self.ops.get_mut().drain() {
            match op {
                OpStatus::Pending(mut data) => {
                    if let Some(waker) = data.waker.take() {
                        waker.wake();
                    }

                    if let Err(e) = self.poll_ctx.delete(&data.file) {
                        warn!("Failed to remove file from EpollCtx: {}", e);
                    }
                }
                OpStatus::Completed => {}
            }
        }

        // Now run the executor one more time to drive any remaining futures to completion.
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        if let Err(e) = self.run(&mut cx, async {}) {
            warn!("Failed to drive FdExecutor to completion: {}", e);
        }
    }
}
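
/// An executor that waits on file descriptors with epoll and runs futures to completion. Cloning
/// an `FdExecutor` yields another handle to the same underlying executor.
///
/// A minimal sketch of running a blocking closure (`expensive_computation` is a placeholder):
///
/// ```ignore
/// let ex = FdExecutor::new()?;
/// let task = ex.spawn_blocking(|| expensive_computation());
/// let result = ex.run_until(task)?;
/// ```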
#[derive(Clone)]
pub struct FdExecutor {
    raw: Arc<RawExecutor>,
}

impl FdExecutor {
    pub fn new() -> Result<FdExecutor> {
        let notify = EventFd::new().map_err(Error::CreateEventFd)?;
        let raw = notify
            .try_clone()
            .map_err(Error::CloneEventFd)
            .and_then(RawExecutor::new)
            .map(Arc::new)?;

        raw.spawn(notify_task(notify, Arc::downgrade(&raw)))
            .detach();

        Ok(FdExecutor { raw })
    }

    pub fn spawn<F>(&self, f: F) -> Task<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.raw.spawn(f)
    }

    pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        self.raw.spawn_local(f)
    }

    pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.raw.spawn_blocking(f)
    }

    pub fn run(&self) -> Result<()> {
        let waker = new_waker(Arc::downgrade(&self.raw));
        let mut cx = Context::from_waker(&waker);

        self.raw.run(&mut cx, super::empty::<()>())
    }

    pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
        let waker = new_waker(Arc::downgrade(&self.raw));
        let mut ctx = Context::from_waker(&waker);

        self.raw.run(&mut ctx, f)
    }

    pub(crate) fn register_source<F: AsRawFd>(&self, f: F) -> Result<RegisteredSource<F>> {
        add_fd_flags(f.as_raw_fd(), libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
        Ok(RegisteredSource {
            source: f,
            ex: Arc::downgrade(&self.raw),
        })
    }
}

// Used to `dup` the FDs passed to the executor so there is a guarantee they aren't closed while
// the executor's polling context is still waiting on them.
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
    let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
    if ret < 0 {
        Err(Error::DuplicatingFd(base::Error::last()))
    } else {
        Ok(ret)
    }
}

#[cfg(test)]
mod test {
    use std::{
        cell::RefCell,
        io::{Read, Write},
        rc::Rc,
    };

    use futures::future::Either;

    use super::*;

    #[test]
    fn test_it() {
        async fn do_test(ex: &FdExecutor) {
            let (r, _w) = base::pipe(true).unwrap();
            let done = Box::pin(async { 5usize });
            let source = ex.register_source(r).unwrap();
            let pending = source.wait_readable().unwrap();
            match futures::future::select(pending, done).await {
                Either::Right((5, pending)) => std::mem::drop(pending),
                _ => panic!("unexpected select result"),
            }
        }

        let ex = FdExecutor::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();

        // Example of starting the framework and running a future:
        async fn my_async(x: Rc<RefCell<u64>>) {
            x.replace(4);
        }

        let x = Rc::new(RefCell::new(0));
        super::super::run_one_poll(my_async(x.clone())).unwrap();
        assert_eq!(*x.borrow(), 4);
    }

    #[test]
    fn drop_before_completion() {
        const VALUE: u64 = 0x66ae_cb65_12fb_d260;

        async fn write_value(mut tx: File) {
            let buf = VALUE.to_ne_bytes();
            tx.write_all(&buf[..]).expect("Failed to write to pipe");
        }

        async fn check_op(op: PendingOperation) {
            let err = op.await.expect_err("Task completed successfully");
            match err {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from task: {}", e),
            }
        }

        let (mut rx, tx) = base::pipe(true).expect("Pipe failed");

        let ex = FdExecutor::new().unwrap();

        let source = ex.register_source(tx.try_clone().unwrap()).unwrap();
        let op = source.wait_writable().unwrap();

        ex.spawn_local(write_value(tx)).detach();
        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It should still run until the write to the pipe is complete.
        mem::drop(ex);

        let mut buf = 0u64.to_ne_bytes();
        rx.read_exact(&mut buf[..])
            .expect("Failed to read from pipe");

        assert_eq!(u64::from_ne_bytes(buf), VALUE);
    }
}