// Copyright 2020 The Chromium OS Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. //! The executor runs all given futures to completion. Futures register wakers associated with file //! descriptors. The wakers will be called when the FD becomes readable or writable depending on //! the situation. //! //! `FdExecutor` is meant to be used with the `futures-rs` crate that provides combinators and //! utility functions to combine futures. use std::fs::File; use std::future::Future; use std::mem; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use std::pin::Pin; use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::{Arc, Weak}; use std::task::{Context, Poll, Waker}; use async_task::Task; use futures::task::noop_waker; use pin_utils::pin_mut; use slab::Slab; use sync::Mutex; use sys_util::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents}; use thiserror::Error as ThisError; use crate::queue::RunnableQueue; use crate::waker::{new_waker, WakerToken, WeakWake}; #[derive(Debug, ThisError)] pub enum Error { /// Failed to clone the EventFd for waking the executor. #[error("Failed to clone the EventFd for waking the executor: {0}")] CloneEventFd(sys_util::Error), /// Failed to create the EventFd for waking the executor. #[error("Failed to create the EventFd for waking the executor: {0}")] CreateEventFd(sys_util::Error), /// Failed to copy the FD for the polling context. #[error("Failed to copy the FD for the polling context: {0}")] DuplicatingFd(sys_util::Error), /// The Executor is gone. #[error("The FDExecutor is gone")] ExecutorGone, /// Creating a context to wait on FDs failed. #[error("An error creating the fd waiting context: {0}")] CreatingContext(sys_util::Error), /// PollContext failure. #[error("PollContext failure: {0}")] PollContextError(sys_util::Error), /// An error occurred when setting the FD non-blocking. #[error("An error occurred setting the FD non-blocking: {0}.")] SettingNonBlocking(sys_util::Error), /// Failed to submit the waker to the polling context. #[error("An error adding to the Aio context: {0}")] SubmittingWaker(sys_util::Error), /// A Waker was canceled, but the operation isn't running. #[error("Unknown waker")] UnknownWaker, } pub type Result = std::result::Result; // A poll operation that has been submitted and is potentially being waited on. struct OpData { file: File, waker: Option, } // The current status of a submitted operation. enum OpStatus { Pending(OpData), Completed, } // An IO source previously registered with an FdExecutor. Used to initiate asynchronous IO with the // associated executor. pub struct RegisteredSource { source: F, ex: Weak, } impl RegisteredSource { // Start an asynchronous operation to wait for this source to become readable. The returned // future will not be ready until the source is readable. pub fn wait_readable(&self) -> Result { let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?; let token = ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_read())?; Ok(PendingOperation { token: Some(token), ex: self.ex.clone(), }) } // Start an asynchronous operation to wait for this source to become writable. The returned // future will not be ready until the source is writable. pub fn wait_writable(&self) -> Result { let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?; let token = ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_write())?; Ok(PendingOperation { token: Some(token), ex: self.ex.clone(), }) } } impl RegisteredSource { // Consume this RegisteredSource and return the inner IO source. pub fn into_source(self) -> F { self.source } } impl AsRef for RegisteredSource { fn as_ref(&self) -> &F { &self.source } } impl AsMut for RegisteredSource { fn as_mut(&mut self) -> &mut F { &mut self.source } } /// A token returned from `add_operation` that can be used to cancel the waker before it completes. /// Used to manage getting the result from the underlying executor for a completed operation. /// Dropping a `PendingOperation` will get the result from the executor. pub struct PendingOperation { token: Option, ex: Weak, } impl Future for PendingOperation { type Output = Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let token = self .token .as_ref() .expect("PendingOperation polled after returning Poll::Ready"); if let Some(ex) = self.ex.upgrade() { if ex.is_ready(token, cx) { self.token = None; Poll::Ready(Ok(())) } else { Poll::Pending } } else { Poll::Ready(Err(Error::ExecutorGone)) } } } impl Drop for PendingOperation { fn drop(&mut self) { if let Some(token) = self.token.take() { if let Some(ex) = self.ex.upgrade() { let _ = ex.cancel_operation(token); } } } } // This function exists to guarantee that non-epoll futures will not starve until an epoll future is // ready to be polled again. The mechanism is very similar to the self-pipe trick used by C programs // to reliably mix select / poll with signal handling. This is how it works: // // * RawExecutor::new creates an eventfd, dupes it, and spawns this async function with the duped fd. // * The first time notify_task is polled it tries to read from the eventfd and if that fails, waits // for the fd to become readable. // * Meanwhile the RawExecutor keeps the original fd for the eventfd. // * Whenever RawExecutor::wake is called it will write to the eventfd if it determines that the // executor thread is currently blocked inside an io_epoll_enter call. This can happen when a // non-epoll future becomes ready to poll. // * The write to the eventfd causes the fd to become readable, which then allows the epoll() call // to return with at least one readable fd. // * The executor then polls the non-epoll future that became ready, any epoll futures that // completed, and the notify_task function, which then queues up another read on the eventfd and // the process can repeat. async fn notify_task(notify: EventFd, raw: Weak) { add_fd_flags(notify.as_raw_fd(), libc::O_NONBLOCK) .expect("Failed to set notify EventFd as non-blocking"); loop { match notify.read() { Ok(_) => {} Err(e) if e.errno() == libc::EWOULDBLOCK => {} Err(e) => panic!("Unexpected error while reading notify EventFd: {}", e), } if let Some(ex) = raw.upgrade() { let token = ex .add_operation(notify.as_raw_fd(), WatchingEvents::empty().set_read()) .expect("Failed to add notify EventFd to PollCtx"); // We don't want to hold an active reference to the executor in the .await below. mem::drop(ex); let op = PendingOperation { token: Some(token), ex: raw.clone(), }; match op.await { Ok(()) => {} Err(Error::ExecutorGone) => break, Err(e) => panic!("Unexpected error while waiting for notify EventFd: {}", e), } } else { // The executor is gone so we should also exit. break; } } } // Indicates that the executor is either within or about to make a PollContext::wait() call. When a // waker sees this value, it will write to the notify EventFd, which will cause the // PollContext::wait() call to return. const WAITING: i32 = 0x1d5b_c019u32 as i32; // Indicates that the executor is processing any futures that are ready to run. const PROCESSING: i32 = 0xd474_77bcu32 as i32; // Indicates that one or more futures may be ready to make progress. const WOKEN: i32 = 0x3e4d_3276u32 as i32; struct RawExecutor { queue: RunnableQueue, poll_ctx: EpollContext, ops: Mutex>, state: AtomicI32, notify: EventFd, } impl RawExecutor { fn new(notify: EventFd) -> Result { Ok(RawExecutor { queue: RunnableQueue::new(), poll_ctx: EpollContext::new().map_err(Error::CreatingContext)?, ops: Mutex::new(Slab::with_capacity(64)), state: AtomicI32::new(PROCESSING), notify, }) } fn add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result { let duped_fd = unsafe { // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD // will only be added to the poll loop. File::from_raw_fd(dup_fd(fd)?) }; let mut ops = self.ops.lock(); let entry = ops.vacant_entry(); let next_token = entry.key(); self.poll_ctx .add_fd_with_events(&duped_fd, events, next_token) .map_err(Error::SubmittingWaker)?; entry.insert(OpStatus::Pending(OpData { file: duped_fd, waker: None, })); Ok(WakerToken(next_token)) } fn wake(&self) { let oldstate = self.state.swap(WOKEN, Ordering::Release); if oldstate == WAITING { if let Err(e) = self.notify.write(1) { warn!("Failed to notify executor that a future is ready: {}", e); } } } fn spawn(self: &Arc, f: F) -> Task where F: Future + Send + 'static, F::Output: Send + 'static, { let raw = Arc::downgrade(self); let schedule = move |runnable| { if let Some(r) = raw.upgrade() { r.queue.push_back(runnable); r.wake(); } }; let (runnable, task) = async_task::spawn(f, schedule); runnable.schedule(); task } fn spawn_local(self: &Arc, f: F) -> Task where F: Future + 'static, F::Output: 'static, { let raw = Arc::downgrade(self); let schedule = move |runnable| { if let Some(r) = raw.upgrade() { r.queue.push_back(runnable); r.wake(); } }; let (runnable, task) = async_task::spawn_local(f, schedule); runnable.schedule(); task } fn run(&self, cx: &mut Context, done: F) -> Result { let events = EpollEvents::new(); pin_mut!(done); loop { self.state.store(PROCESSING, Ordering::Release); for runnable in self.queue.iter() { runnable.run(); } if let Poll::Ready(val) = done.as_mut().poll(cx) { return Ok(val); } let oldstate = self.state.compare_exchange( PROCESSING, WAITING, Ordering::Acquire, Ordering::Acquire, ); if let Err(oldstate) = oldstate { debug_assert_eq!(oldstate, WOKEN); // One or more futures have become runnable. continue; } let events = self .poll_ctx .wait(&events) .map_err(Error::PollContextError)?; // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from // writing to the eventfd. self.state.store(PROCESSING, Ordering::Release); for e in events.iter() { let token = e.token(); let mut ops = self.ops.lock(); // The op could have been canceled and removed by another thread so ignore it if it // doesn't exist. if let Some(op) = ops.get_mut(token) { let (file, waker) = match mem::replace(op, OpStatus::Completed) { OpStatus::Pending(OpData { file, waker }) => (file, waker), OpStatus::Completed => panic!("poll operation completed more than once"), }; mem::drop(ops); self.poll_ctx .delete(&file) .map_err(Error::PollContextError)?; if let Some(waker) = waker { waker.wake(); } } } } } fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool { let mut ops = self.ops.lock(); let op = ops .get_mut(token.0) .expect("`is_ready` called on unknown operation"); match op { OpStatus::Pending(data) => { data.waker = Some(cx.waker().clone()); false } OpStatus::Completed => { ops.remove(token.0); true } } } // Remove the waker for the given token if it hasn't fired yet. fn cancel_operation(&self, token: WakerToken) -> Result<()> { match self.ops.lock().remove(token.0) { OpStatus::Pending(data) => self .poll_ctx .delete(&data.file) .map_err(Error::PollContextError), OpStatus::Completed => Ok(()), } } } impl WeakWake for RawExecutor { fn wake_by_ref(weak_self: &Weak) { if let Some(arc_self) = weak_self.upgrade() { RawExecutor::wake(&arc_self); } } } impl Drop for RawExecutor { fn drop(&mut self) { // Wake up the notify_task. We set the state to WAITING here so that wake() will write to // the eventfd. self.state.store(WAITING, Ordering::Release); self.wake(); // Wake up any futures still waiting on poll operations as they are just going to get an // ExecutorGone error now. for op in self.ops.get_mut().drain() { match op { OpStatus::Pending(mut data) => { if let Some(waker) = data.waker.take() { waker.wake(); } if let Err(e) = self.poll_ctx.delete(&data.file) { warn!("Failed to remove file from EpollCtx: {}", e); } } OpStatus::Completed => {} } } // Now run the executor one more time to drive any remaining futures to completion. let waker = noop_waker(); let mut cx = Context::from_waker(&waker); if let Err(e) = self.run(&mut cx, async {}) { warn!("Failed to drive FdExecutor to completion: {}", e); } } } #[derive(Clone)] pub struct FdExecutor { raw: Arc, } impl FdExecutor { pub fn new() -> Result { let notify = EventFd::new().map_err(Error::CreateEventFd)?; let raw = notify .try_clone() .map_err(Error::CloneEventFd) .and_then(RawExecutor::new) .map(Arc::new)?; raw.spawn(notify_task(notify, Arc::downgrade(&raw))) .detach(); Ok(FdExecutor { raw }) } pub fn spawn(&self, f: F) -> Task where F: Future + Send + 'static, F::Output: Send + 'static, { self.raw.spawn(f) } pub fn spawn_local(&self, f: F) -> Task where F: Future + 'static, F::Output: 'static, { self.raw.spawn_local(f) } pub fn run(&self) -> Result<()> { let waker = new_waker(Arc::downgrade(&self.raw)); let mut cx = Context::from_waker(&waker); self.raw.run(&mut cx, crate::empty::<()>()) } pub fn run_until(&self, f: F) -> Result { let waker = new_waker(Arc::downgrade(&self.raw)); let mut ctx = Context::from_waker(&waker); self.raw.run(&mut ctx, f) } pub(crate) fn register_source(&self, f: F) -> Result> { add_fd_flags(f.as_raw_fd(), libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?; Ok(RegisteredSource { source: f, ex: Arc::downgrade(&self.raw), }) } } // Used to `dup` the FDs passed to the executor so there is a guarantee they aren't closed while // waiting in TLS to be added to the main polling context. unsafe fn dup_fd(fd: RawFd) -> Result { let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0); if ret < 0 { Err(Error::DuplicatingFd(sys_util::Error::last())) } else { Ok(ret) } } #[cfg(test)] mod test { use std::cell::RefCell; use std::io::{Read, Write}; use std::rc::Rc; use futures::future::Either; use super::*; #[test] fn test_it() { async fn do_test(ex: &FdExecutor) { let (r, _w) = sys_util::pipe(true).unwrap(); let done = Box::pin(async { 5usize }); let source = ex.register_source(r).unwrap(); let pending = source.wait_readable().unwrap(); match futures::future::select(pending, done).await { Either::Right((5, pending)) => std::mem::drop(pending), _ => panic!("unexpected select result"), } } let ex = FdExecutor::new().unwrap(); ex.run_until(do_test(&ex)).unwrap(); // Example of starting the framework and running a future: async fn my_async(x: Rc>) { x.replace(4); } let x = Rc::new(RefCell::new(0)); crate::run_one_poll(my_async(x.clone())).unwrap(); assert_eq!(*x.borrow(), 4); } #[test] fn drop_before_completion() { const VALUE: u64 = 0x66ae_cb65_12fb_d260; async fn write_value(mut tx: File) { let buf = VALUE.to_ne_bytes(); tx.write_all(&buf[..]).expect("Failed to write to pipe"); } async fn check_op(op: PendingOperation) { let err = op.await.expect_err("Task completed successfully"); match err { Error::ExecutorGone => {} e => panic!("Unexpected error from task: {}", e), } } let (mut rx, tx) = sys_util::pipe(true).expect("Pipe failed"); let ex = FdExecutor::new().unwrap(); let source = ex.register_source(tx.try_clone().unwrap()).unwrap(); let op = source.wait_writable().unwrap(); ex.spawn_local(write_value(tx)).detach(); ex.spawn_local(check_op(op)).detach(); // Now drop the executor. It should still run until the write to the pipe is complete. mem::drop(ex); let mut buf = 0u64.to_ne_bytes(); rx.read_exact(&mut buf[..]) .expect("Failed to read from pipe"); assert_eq!(u64::from_ne_bytes(buf), VALUE); } }