1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 //! The executor runs all given futures to completion. Futures register wakers associated with file
6 //! descriptors. The wakers will be called when the FD becomes readable or writable depending on
7 //! the situation.
8 //!
9 //! `FdExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
10 //! utility functions to combine futures.
11
12 use std::fs::File;
13 use std::future::Future;
14 use std::mem;
15 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
16 use std::pin::Pin;
17 use std::sync::atomic::{AtomicI32, Ordering};
18 use std::sync::{Arc, Weak};
19 use std::task::{Context, Poll, Waker};
20
21 use async_task::Task;
22 use futures::task::noop_waker;
23 use pin_utils::pin_mut;
24 use slab::Slab;
25 use sync::Mutex;
26 use sys_util::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents};
27 use thiserror::Error as ThisError;
28
29 use crate::queue::RunnableQueue;
30 use crate::waker::{new_waker, WakerToken, WeakWake};
31
32 #[derive(Debug, ThisError)]
33 pub enum Error {
34 /// Failed to clone the EventFd for waking the executor.
35 #[error("Failed to clone the EventFd for waking the executor: {0}")]
36 CloneEventFd(sys_util::Error),
37 /// Failed to create the EventFd for waking the executor.
38 #[error("Failed to create the EventFd for waking the executor: {0}")]
39 CreateEventFd(sys_util::Error),
40 /// Failed to copy the FD for the polling context.
41 #[error("Failed to copy the FD for the polling context: {0}")]
42 DuplicatingFd(sys_util::Error),
43 /// The Executor is gone.
44 #[error("The FDExecutor is gone")]
45 ExecutorGone,
46 /// Creating a context to wait on FDs failed.
47 #[error("An error creating the fd waiting context: {0}")]
48 CreatingContext(sys_util::Error),
49 /// PollContext failure.
50 #[error("PollContext failure: {0}")]
51 PollContextError(sys_util::Error),
52 /// An error occurred when setting the FD non-blocking.
53 #[error("An error occurred setting the FD non-blocking: {0}.")]
54 SettingNonBlocking(sys_util::Error),
55 /// Failed to submit the waker to the polling context.
56 #[error("An error adding to the Aio context: {0}")]
57 SubmittingWaker(sys_util::Error),
58 /// A Waker was canceled, but the operation isn't running.
59 #[error("Unknown waker")]
60 UnknownWaker,
61 }
62 pub type Result<T> = std::result::Result<T, Error>;
63
// Bookkeeping for a poll operation that has been submitted and may have a task waiting on it.
struct OpData {
    // The file that was registered with the poll context for this operation.
    file: File,
    // Waker of the task waiting on the operation, once one has been recorded.
    waker: Option<Waker>,
}

// Where a submitted operation currently stands.
enum OpStatus {
    // The operation is still outstanding; carries its bookkeeping data.
    Pending(OpData),
    // An event was reported for the operation and its bookkeeping data has been released.
    Completed,
}
75
76 // An IO source previously registered with an FdExecutor. Used to initiate asynchronous IO with the
77 // associated executor.
78 pub struct RegisteredSource<F> {
79 source: F,
80 ex: Weak<RawExecutor>,
81 }
82
83 impl<F: AsRawFd> RegisteredSource<F> {
84 // Start an asynchronous operation to wait for this source to become readable. The returned
85 // future will not be ready until the source is readable.
wait_readable(&self) -> Result<PendingOperation>86 pub fn wait_readable(&self) -> Result<PendingOperation> {
87 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
88
89 let token =
90 ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_read())?;
91
92 Ok(PendingOperation {
93 token: Some(token),
94 ex: self.ex.clone(),
95 })
96 }
97
98 // Start an asynchronous operation to wait for this source to become writable. The returned
99 // future will not be ready until the source is writable.
wait_writable(&self) -> Result<PendingOperation>100 pub fn wait_writable(&self) -> Result<PendingOperation> {
101 let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
102
103 let token =
104 ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_write())?;
105
106 Ok(PendingOperation {
107 token: Some(token),
108 ex: self.ex.clone(),
109 })
110 }
111 }
112
113 impl<F> RegisteredSource<F> {
114 // Consume this RegisteredSource and return the inner IO source.
into_source(self) -> F115 pub fn into_source(self) -> F {
116 self.source
117 }
118 }
119
120 impl<F> AsRef<F> for RegisteredSource<F> {
as_ref(&self) -> &F121 fn as_ref(&self) -> &F {
122 &self.source
123 }
124 }
125
126 impl<F> AsMut<F> for RegisteredSource<F> {
as_mut(&mut self) -> &mut F127 fn as_mut(&mut self) -> &mut F {
128 &mut self.source
129 }
130 }
131
132 /// A token returned from `add_operation` that can be used to cancel the waker before it completes.
133 /// Used to manage getting the result from the underlying executor for a completed operation.
134 /// Dropping a `PendingOperation` will get the result from the executor.
135 pub struct PendingOperation {
136 token: Option<WakerToken>,
137 ex: Weak<RawExecutor>,
138 }
139
140 impl Future for PendingOperation {
141 type Output = Result<()>;
142
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>143 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
144 let token = self
145 .token
146 .as_ref()
147 .expect("PendingOperation polled after returning Poll::Ready");
148 if let Some(ex) = self.ex.upgrade() {
149 if ex.is_ready(token, cx) {
150 self.token = None;
151 Poll::Ready(Ok(()))
152 } else {
153 Poll::Pending
154 }
155 } else {
156 Poll::Ready(Err(Error::ExecutorGone))
157 }
158 }
159 }
160
161 impl Drop for PendingOperation {
drop(&mut self)162 fn drop(&mut self) {
163 if let Some(token) = self.token.take() {
164 if let Some(ex) = self.ex.upgrade() {
165 let _ = ex.cancel_operation(token);
166 }
167 }
168 }
169 }
170
171 // This function exists to guarantee that non-epoll futures will not starve until an epoll future is
172 // ready to be polled again. The mechanism is very similar to the self-pipe trick used by C programs
173 // to reliably mix select / poll with signal handling. This is how it works:
174 //
175 // * RawExecutor::new creates an eventfd, dupes it, and spawns this async function with the duped fd.
176 // * The first time notify_task is polled it tries to read from the eventfd and if that fails, waits
177 // for the fd to become readable.
178 // * Meanwhile the RawExecutor keeps the original fd for the eventfd.
179 // * Whenever RawExecutor::wake is called it will write to the eventfd if it determines that the
180 // executor thread is currently blocked inside an io_epoll_enter call. This can happen when a
181 // non-epoll future becomes ready to poll.
182 // * The write to the eventfd causes the fd to become readable, which then allows the epoll() call
183 // to return with at least one readable fd.
184 // * The executor then polls the non-epoll future that became ready, any epoll futures that
185 // completed, and the notify_task function, which then queues up another read on the eventfd and
186 // the process can repeat.
notify_task(notify: EventFd, raw: Weak<RawExecutor>)187 async fn notify_task(notify: EventFd, raw: Weak<RawExecutor>) {
188 add_fd_flags(notify.as_raw_fd(), libc::O_NONBLOCK)
189 .expect("Failed to set notify EventFd as non-blocking");
190
191 loop {
192 match notify.read() {
193 Ok(_) => {}
194 Err(e) if e.errno() == libc::EWOULDBLOCK => {}
195 Err(e) => panic!("Unexpected error while reading notify EventFd: {}", e),
196 }
197
198 if let Some(ex) = raw.upgrade() {
199 let token = ex
200 .add_operation(notify.as_raw_fd(), WatchingEvents::empty().set_read())
201 .expect("Failed to add notify EventFd to PollCtx");
202
203 // We don't want to hold an active reference to the executor in the .await below.
204 mem::drop(ex);
205
206 let op = PendingOperation {
207 token: Some(token),
208 ex: raw.clone(),
209 };
210
211 match op.await {
212 Ok(()) => {}
213 Err(Error::ExecutorGone) => break,
214 Err(e) => panic!("Unexpected error while waiting for notify EventFd: {}", e),
215 }
216 } else {
217 // The executor is gone so we should also exit.
218 break;
219 }
220 }
221 }
222
// Executor state: the executor is inside, or is about to enter, a PollContext::wait() call. A
// waker that observes this value writes to the notify EventFd so that the wait() call returns.
const WAITING: i32 = 0x1d5b_c019u32 as i32;

// Executor state: the executor is busy processing the futures that are ready to run.
const PROCESSING: i32 = 0xd474_77bcu32 as i32;

// Executor state: one or more futures may be ready to make progress.
const WOKEN: i32 = 0x3e4d_3276u32 as i32;
233
234 struct RawExecutor {
235 queue: RunnableQueue,
236 poll_ctx: EpollContext<usize>,
237 ops: Mutex<Slab<OpStatus>>,
238 state: AtomicI32,
239 notify: EventFd,
240 }
241
242 impl RawExecutor {
new(notify: EventFd) -> Result<Self>243 fn new(notify: EventFd) -> Result<Self> {
244 Ok(RawExecutor {
245 queue: RunnableQueue::new(),
246 poll_ctx: EpollContext::new().map_err(Error::CreatingContext)?,
247 ops: Mutex::new(Slab::with_capacity(64)),
248 state: AtomicI32::new(PROCESSING),
249 notify,
250 })
251 }
252
add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result<WakerToken>253 fn add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result<WakerToken> {
254 let duped_fd = unsafe {
255 // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
256 // will only be added to the poll loop.
257 File::from_raw_fd(dup_fd(fd)?)
258 };
259 let mut ops = self.ops.lock();
260 let entry = ops.vacant_entry();
261 let next_token = entry.key();
262 self.poll_ctx
263 .add_fd_with_events(&duped_fd, events, next_token)
264 .map_err(Error::SubmittingWaker)?;
265 entry.insert(OpStatus::Pending(OpData {
266 file: duped_fd,
267 waker: None,
268 }));
269 Ok(WakerToken(next_token))
270 }
271
wake(&self)272 fn wake(&self) {
273 let oldstate = self.state.swap(WOKEN, Ordering::Release);
274 if oldstate == WAITING {
275 if let Err(e) = self.notify.write(1) {
276 warn!("Failed to notify executor that a future is ready: {}", e);
277 }
278 }
279 }
280
spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,281 fn spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
282 where
283 F: Future + Send + 'static,
284 F::Output: Send + 'static,
285 {
286 let raw = Arc::downgrade(self);
287 let schedule = move |runnable| {
288 if let Some(r) = raw.upgrade() {
289 r.queue.push_back(runnable);
290 r.wake();
291 }
292 };
293 let (runnable, task) = async_task::spawn(f, schedule);
294 runnable.schedule();
295 task
296 }
297
spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output> where F: Future + 'static, F::Output: 'static,298 fn spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
299 where
300 F: Future + 'static,
301 F::Output: 'static,
302 {
303 let raw = Arc::downgrade(self);
304 let schedule = move |runnable| {
305 if let Some(r) = raw.upgrade() {
306 r.queue.push_back(runnable);
307 r.wake();
308 }
309 };
310 let (runnable, task) = async_task::spawn_local(f, schedule);
311 runnable.schedule();
312 task
313 }
314
run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output>315 fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
316 let events = EpollEvents::new();
317 pin_mut!(done);
318
319 loop {
320 self.state.store(PROCESSING, Ordering::Release);
321 for runnable in self.queue.iter() {
322 runnable.run();
323 }
324
325 if let Poll::Ready(val) = done.as_mut().poll(cx) {
326 return Ok(val);
327 }
328
329 let oldstate = self.state.compare_exchange(
330 PROCESSING,
331 WAITING,
332 Ordering::Acquire,
333 Ordering::Acquire,
334 );
335 if let Err(oldstate) = oldstate {
336 debug_assert_eq!(oldstate, WOKEN);
337 // One or more futures have become runnable.
338 continue;
339 }
340
341 let events = self
342 .poll_ctx
343 .wait(&events)
344 .map_err(Error::PollContextError)?;
345
346 // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
347 // writing to the eventfd.
348 self.state.store(PROCESSING, Ordering::Release);
349 for e in events.iter() {
350 let token = e.token();
351 let mut ops = self.ops.lock();
352
353 // The op could have been canceled and removed by another thread so ignore it if it
354 // doesn't exist.
355 if let Some(op) = ops.get_mut(token) {
356 let (file, waker) = match mem::replace(op, OpStatus::Completed) {
357 OpStatus::Pending(OpData { file, waker }) => (file, waker),
358 OpStatus::Completed => panic!("poll operation completed more than once"),
359 };
360
361 mem::drop(ops);
362
363 self.poll_ctx
364 .delete(&file)
365 .map_err(Error::PollContextError)?;
366
367 if let Some(waker) = waker {
368 waker.wake();
369 }
370 }
371 }
372 }
373 }
374
is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool375 fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool {
376 let mut ops = self.ops.lock();
377
378 let op = ops
379 .get_mut(token.0)
380 .expect("`is_ready` called on unknown operation");
381 match op {
382 OpStatus::Pending(data) => {
383 data.waker = Some(cx.waker().clone());
384 false
385 }
386 OpStatus::Completed => {
387 ops.remove(token.0);
388 true
389 }
390 }
391 }
392
393 // Remove the waker for the given token if it hasn't fired yet.
cancel_operation(&self, token: WakerToken) -> Result<()>394 fn cancel_operation(&self, token: WakerToken) -> Result<()> {
395 match self.ops.lock().remove(token.0) {
396 OpStatus::Pending(data) => self
397 .poll_ctx
398 .delete(&data.file)
399 .map_err(Error::PollContextError),
400 OpStatus::Completed => Ok(()),
401 }
402 }
403 }
404
405 impl WeakWake for RawExecutor {
wake_by_ref(weak_self: &Weak<Self>)406 fn wake_by_ref(weak_self: &Weak<Self>) {
407 if let Some(arc_self) = weak_self.upgrade() {
408 RawExecutor::wake(&arc_self);
409 }
410 }
411 }
412
413 impl Drop for RawExecutor {
drop(&mut self)414 fn drop(&mut self) {
415 // Wake up the notify_task. We set the state to WAITING here so that wake() will write to
416 // the eventfd.
417 self.state.store(WAITING, Ordering::Release);
418 self.wake();
419
420 // Wake up any futures still waiting on poll operations as they are just going to get an
421 // ExecutorGone error now.
422 for op in self.ops.get_mut().drain() {
423 match op {
424 OpStatus::Pending(mut data) => {
425 if let Some(waker) = data.waker.take() {
426 waker.wake();
427 }
428
429 if let Err(e) = self.poll_ctx.delete(&data.file) {
430 warn!("Failed to remove file from EpollCtx: {}", e);
431 }
432 }
433 OpStatus::Completed => {}
434 }
435 }
436
437 // Now run the executor one more time to drive any remaining futures to completion.
438 let waker = noop_waker();
439 let mut cx = Context::from_waker(&waker);
440 if let Err(e) = self.run(&mut cx, async {}) {
441 warn!("Failed to drive FdExecutor to completion: {}", e);
442 }
443 }
444 }
445
446 #[derive(Clone)]
447 pub struct FdExecutor {
448 raw: Arc<RawExecutor>,
449 }
450
451 impl FdExecutor {
new() -> Result<FdExecutor>452 pub fn new() -> Result<FdExecutor> {
453 let notify = EventFd::new().map_err(Error::CreateEventFd)?;
454 let raw = notify
455 .try_clone()
456 .map_err(Error::CloneEventFd)
457 .and_then(RawExecutor::new)
458 .map(Arc::new)?;
459
460 raw.spawn(notify_task(notify, Arc::downgrade(&raw)))
461 .detach();
462
463 Ok(FdExecutor { raw })
464 }
465
spawn<F>(&self, f: F) -> Task<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,466 pub fn spawn<F>(&self, f: F) -> Task<F::Output>
467 where
468 F: Future + Send + 'static,
469 F::Output: Send + 'static,
470 {
471 self.raw.spawn(f)
472 }
473
spawn_local<F>(&self, f: F) -> Task<F::Output> where F: Future + 'static, F::Output: 'static,474 pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
475 where
476 F: Future + 'static,
477 F::Output: 'static,
478 {
479 self.raw.spawn_local(f)
480 }
481
run(&self) -> Result<()>482 pub fn run(&self) -> Result<()> {
483 let waker = new_waker(Arc::downgrade(&self.raw));
484 let mut cx = Context::from_waker(&waker);
485
486 self.raw.run(&mut cx, crate::empty::<()>())
487 }
488
run_until<F: Future>(&self, f: F) -> Result<F::Output>489 pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
490 let waker = new_waker(Arc::downgrade(&self.raw));
491 let mut ctx = Context::from_waker(&waker);
492
493 self.raw.run(&mut ctx, f)
494 }
495
register_source<F: AsRawFd>(&self, f: F) -> Result<RegisteredSource<F>>496 pub(crate) fn register_source<F: AsRawFd>(&self, f: F) -> Result<RegisteredSource<F>> {
497 add_fd_flags(f.as_raw_fd(), libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
498 Ok(RegisteredSource {
499 source: f,
500 ex: Arc::downgrade(&self.raw),
501 })
502 }
503 }
504
505 // Used to `dup` the FDs passed to the executor so there is a guarantee they aren't closed while
506 // waiting in TLS to be added to the main polling context.
dup_fd(fd: RawFd) -> Result<RawFd>507 unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
508 let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
509 if ret < 0 {
510 Err(Error::DuplicatingFd(sys_util::Error::last()))
511 } else {
512 Ok(ret)
513 }
514 }
515
#[cfg(test)]
mod test {
    use std::cell::RefCell;
    use std::io::{Read, Write};
    use std::rc::Rc;

    use futures::future::Either;

    use super::*;

    #[test]
    fn test_it() {
        // Races a wait on a pipe that nothing is written to against a future that is ready
        // right away; the ready future has to win and the pending wait is dropped.
        async fn do_test(ex: &FdExecutor) {
            let done = Box::pin(async { 5usize });
            let (r, _w) = sys_util::pipe(true).unwrap();
            let source = ex.register_source(r).unwrap();
            let pending = source.wait_readable().unwrap();
            let outcome = futures::future::select(pending, done).await;
            match outcome {
                Either::Right((5, pending)) => std::mem::drop(pending),
                _ => panic!("unexpected select result"),
            }
        }

        let ex = FdExecutor::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();

        // Example of starting the framework and running a future:
        async fn my_async(x: Rc<RefCell<u64>>) {
            x.replace(4);
        }

        let shared = Rc::new(RefCell::new(0));
        crate::run_one_poll(my_async(shared.clone())).unwrap();
        assert_eq!(*shared.borrow(), 4);
    }

    #[test]
    fn drop_before_completion() {
        const VALUE: u64 = 0x66ae_cb65_12fb_d260;

        // Writes `VALUE` to the write end of the pipe.
        async fn write_value(mut tx: File) {
            let bytes = VALUE.to_ne_bytes();
            tx.write_all(&bytes[..]).expect("Failed to write to pipe");
        }

        // Expects the operation to fail because the executor has been dropped.
        async fn check_op(op: PendingOperation) {
            match op.await.expect_err("Task completed successfully") {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from task: {}", e),
            }
        }

        let (mut rx, tx) = sys_util::pipe(true).expect("Pipe failed");

        let ex = FdExecutor::new().unwrap();

        let source = ex.register_source(tx.try_clone().unwrap()).unwrap();
        let op = source.wait_writable().unwrap();

        ex.spawn_local(write_value(tx)).detach();
        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It should still run until the write to the pipe is complete.
        mem::drop(ex);

        let mut received = 0u64.to_ne_bytes();
        rx.read_exact(&mut received[..])
            .expect("Failed to read from pipe");

        assert_eq!(u64::from_ne_bytes(received), VALUE);
    }
}
589