• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! The executor runs all given futures to completion. Futures register wakers associated with file
6 //! descriptors. The wakers will be called when the FD becomes readable or writable depending on
7 //! the situation.
8 //!
9 //! `FdExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
10 //! utility functions to combine futures.
11 
12 use std::fs::File;
13 use std::future::Future;
14 use std::mem;
15 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
16 use std::pin::Pin;
17 use std::sync::atomic::{AtomicI32, Ordering};
18 use std::sync::{Arc, Weak};
19 use std::task::{Context, Poll, Waker};
20 
21 use async_task::Task;
22 use futures::task::noop_waker;
23 use pin_utils::pin_mut;
24 use slab::Slab;
25 use sync::Mutex;
26 use sys_util::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents};
27 use thiserror::Error as ThisError;
28 
29 use crate::queue::RunnableQueue;
30 use crate::waker::{new_waker, WakerToken, WeakWake};
31 
32 #[derive(Debug, ThisError)]
33 pub enum Error {
34     /// Failed to clone the EventFd for waking the executor.
35     #[error("Failed to clone the EventFd for waking the executor: {0}")]
36     CloneEventFd(sys_util::Error),
37     /// Failed to create the EventFd for waking the executor.
38     #[error("Failed to create the EventFd for waking the executor: {0}")]
39     CreateEventFd(sys_util::Error),
40     /// Failed to copy the FD for the polling context.
41     #[error("Failed to copy the FD for the polling context: {0}")]
42     DuplicatingFd(sys_util::Error),
43     /// The Executor is gone.
44     #[error("The FDExecutor is gone")]
45     ExecutorGone,
46     /// Creating a context to wait on FDs failed.
47     #[error("An error creating the fd waiting context: {0}")]
48     CreatingContext(sys_util::Error),
49     /// PollContext failure.
50     #[error("PollContext failure: {0}")]
51     PollContextError(sys_util::Error),
52     /// An error occurred when setting the FD non-blocking.
53     #[error("An error occurred setting the FD non-blocking: {0}.")]
54     SettingNonBlocking(sys_util::Error),
55     /// Failed to submit the waker to the polling context.
56     #[error("An error adding to the Aio context: {0}")]
57     SubmittingWaker(sys_util::Error),
58     /// A Waker was canceled, but the operation isn't running.
59     #[error("Unknown waker")]
60     UnknownWaker,
61 }
62 pub type Result<T> = std::result::Result<T, Error>;
63 
// A poll operation that has been submitted and is potentially being waited on.
struct OpData {
    // Duplicate of the caller's FD. This is the handle that is registered with, and later deleted
    // from, the executor's epoll context.
    file: File,
    // Waker of the task that most recently polled this operation, if any. It is woken when an
    // event arrives for the FD or when the executor is dropped.
    waker: Option<Waker>,
}
69 
// The current status of a submitted operation.
enum OpStatus {
    // The FD is still registered with the epoll context and no event has been delivered for it.
    Pending(OpData),
    // An event was delivered and the FD was removed from the epoll context. The slot stays in the
    // slab until the owning `PendingOperation` is polled again or dropped.
    Completed,
}
75 
/// An IO source previously registered with an `FdExecutor`. Used to initiate asynchronous IO with
/// the associated executor.
pub struct RegisteredSource<F> {
    // The wrapped IO object; its raw FD is what gets waited on.
    source: F,
    // Weak handle to the executor, so a registered source does not keep the executor alive.
    ex: Weak<RawExecutor>,
}
82 
83 impl<F: AsRawFd> RegisteredSource<F> {
84     // Start an asynchronous operation to wait for this source to become readable. The returned
85     // future will not be ready until the source is readable.
wait_readable(&self) -> Result<PendingOperation>86     pub fn wait_readable(&self) -> Result<PendingOperation> {
87         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
88 
89         let token =
90             ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_read())?;
91 
92         Ok(PendingOperation {
93             token: Some(token),
94             ex: self.ex.clone(),
95         })
96     }
97 
98     // Start an asynchronous operation to wait for this source to become writable. The returned
99     // future will not be ready until the source is writable.
wait_writable(&self) -> Result<PendingOperation>100     pub fn wait_writable(&self) -> Result<PendingOperation> {
101         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
102 
103         let token =
104             ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_write())?;
105 
106         Ok(PendingOperation {
107             token: Some(token),
108             ex: self.ex.clone(),
109         })
110     }
111 }
112 
113 impl<F> RegisteredSource<F> {
114     // Consume this RegisteredSource and return the inner IO source.
into_source(self) -> F115     pub fn into_source(self) -> F {
116         self.source
117     }
118 }
119 
120 impl<F> AsRef<F> for RegisteredSource<F> {
as_ref(&self) -> &F121     fn as_ref(&self) -> &F {
122         &self.source
123     }
124 }
125 
126 impl<F> AsMut<F> for RegisteredSource<F> {
as_mut(&mut self) -> &mut F127     fn as_mut(&mut self) -> &mut F {
128         &mut self.source
129     }
130 }
131 
/// A handle to a poll operation registered through `add_operation`. Awaiting it completes with
/// `Ok(())` once the executor reports the operation as ready, or with `Error::ExecutorGone` if the
/// executor no longer exists.
/// Dropping a `PendingOperation` that has not completed cancels the operation on the executor.
pub struct PendingOperation {
    // Identifies the operation inside the executor. Set to `None` once the future has returned
    // `Poll::Ready(Ok(()))`, so that `Drop` does not try to cancel a finished operation.
    token: Option<WakerToken>,
    // Weak handle to the executor, so an outstanding operation does not keep the executor alive.
    ex: Weak<RawExecutor>,
}
139 
140 impl Future for PendingOperation {
141     type Output = Result<()>;
142 
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>143     fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
144         let token = self
145             .token
146             .as_ref()
147             .expect("PendingOperation polled after returning Poll::Ready");
148         if let Some(ex) = self.ex.upgrade() {
149             if ex.is_ready(token, cx) {
150                 self.token = None;
151                 Poll::Ready(Ok(()))
152             } else {
153                 Poll::Pending
154             }
155         } else {
156             Poll::Ready(Err(Error::ExecutorGone))
157         }
158     }
159 }
160 
161 impl Drop for PendingOperation {
drop(&mut self)162     fn drop(&mut self) {
163         if let Some(token) = self.token.take() {
164             if let Some(ex) = self.ex.upgrade() {
165                 let _ = ex.cancel_operation(token);
166             }
167         }
168     }
169 }
170 
171 // This function exists to guarantee that non-epoll futures will not starve until an epoll future is
172 // ready to be polled again. The mechanism is very similar to the self-pipe trick used by C programs
173 // to reliably mix select / poll with signal handling. This is how it works:
174 //
175 // * RawExecutor::new creates an eventfd, dupes it, and spawns this async function with the duped fd.
176 // * The first time notify_task is polled it tries to read from the eventfd and if that fails, waits
177 //   for the fd to become readable.
178 // * Meanwhile the RawExecutor keeps the original fd for the eventfd.
179 // * Whenever RawExecutor::wake is called it will write to the eventfd if it determines that the
180 //   executor thread is currently blocked inside an io_epoll_enter call. This can happen when a
181 //   non-epoll future becomes ready to poll.
182 // * The write to the eventfd causes the fd to become readable, which then allows the epoll() call
183 //   to return with at least one readable fd.
184 // * The executor then polls the non-epoll future that became ready, any epoll futures that
185 //   completed, and the notify_task function, which then queues up another read on the eventfd and
186 //   the process can repeat.
notify_task(notify: EventFd, raw: Weak<RawExecutor>)187 async fn notify_task(notify: EventFd, raw: Weak<RawExecutor>) {
188     add_fd_flags(notify.as_raw_fd(), libc::O_NONBLOCK)
189         .expect("Failed to set notify EventFd as non-blocking");
190 
191     loop {
192         match notify.read() {
193             Ok(_) => {}
194             Err(e) if e.errno() == libc::EWOULDBLOCK => {}
195             Err(e) => panic!("Unexpected error while reading notify EventFd: {}", e),
196         }
197 
198         if let Some(ex) = raw.upgrade() {
199             let token = ex
200                 .add_operation(notify.as_raw_fd(), WatchingEvents::empty().set_read())
201                 .expect("Failed to add notify EventFd to PollCtx");
202 
203             // We don't want to hold an active reference to the executor in the .await below.
204             mem::drop(ex);
205 
206             let op = PendingOperation {
207                 token: Some(token),
208                 ex: raw.clone(),
209             };
210 
211             match op.await {
212                 Ok(()) => {}
213                 Err(Error::ExecutorGone) => break,
214                 Err(e) => panic!("Unexpected error while waiting for notify EventFd: {}", e),
215             }
216         } else {
217             // The executor is gone so we should also exit.
218             break;
219         }
220     }
221 }
222 
// Values stored in `RawExecutor::state`.
// NOTE(review): the specific bit patterns look arbitrary; presumably they were chosen only to be
// distinctive - confirm.

// Indicates that the executor is either within or about to make a PollContext::wait() call. When a
// waker sees this value, it will write to the notify EventFd, which will cause the
// PollContext::wait() call to return.
const WAITING: i32 = 0x1d5b_c019u32 as i32;

// Indicates that the executor is processing any futures that are ready to run.
const PROCESSING: i32 = 0xd474_77bcu32 as i32;

// Indicates that one or more futures may be ready to make progress.
const WOKEN: i32 = 0x3e4d_3276u32 as i32;
233 
// The shared core of an `FdExecutor`: the run queue, the epoll context, and the bookkeeping for
// outstanding poll operations.
struct RawExecutor {
    // Runnables that have been scheduled and are waiting to be run by `run`.
    queue: RunnableQueue,
    // Epoll context the duplicated FDs are registered with; events carry the slab key of the
    // operation they belong to.
    poll_ctx: EpollContext<usize>,
    // Outstanding poll operations, indexed by the key handed out inside a `WakerToken`.
    ops: Mutex<Slab<OpStatus>>,
    // One of WAITING, PROCESSING or WOKEN.
    state: AtomicI32,
    // EventFd written by `wake` when the previous state was WAITING.
    notify: EventFd,
}
241 
242 impl RawExecutor {
new(notify: EventFd) -> Result<Self>243     fn new(notify: EventFd) -> Result<Self> {
244         Ok(RawExecutor {
245             queue: RunnableQueue::new(),
246             poll_ctx: EpollContext::new().map_err(Error::CreatingContext)?,
247             ops: Mutex::new(Slab::with_capacity(64)),
248             state: AtomicI32::new(PROCESSING),
249             notify,
250         })
251     }
252 
add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result<WakerToken>253     fn add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result<WakerToken> {
254         let duped_fd = unsafe {
255             // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
256             // will only be added to the poll loop.
257             File::from_raw_fd(dup_fd(fd)?)
258         };
259         let mut ops = self.ops.lock();
260         let entry = ops.vacant_entry();
261         let next_token = entry.key();
262         self.poll_ctx
263             .add_fd_with_events(&duped_fd, events, next_token)
264             .map_err(Error::SubmittingWaker)?;
265         entry.insert(OpStatus::Pending(OpData {
266             file: duped_fd,
267             waker: None,
268         }));
269         Ok(WakerToken(next_token))
270     }
271 
wake(&self)272     fn wake(&self) {
273         let oldstate = self.state.swap(WOKEN, Ordering::Release);
274         if oldstate == WAITING {
275             if let Err(e) = self.notify.write(1) {
276                 warn!("Failed to notify executor that a future is ready: {}", e);
277             }
278         }
279     }
280 
spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,281     fn spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
282     where
283         F: Future + Send + 'static,
284         F::Output: Send + 'static,
285     {
286         let raw = Arc::downgrade(self);
287         let schedule = move |runnable| {
288             if let Some(r) = raw.upgrade() {
289                 r.queue.push_back(runnable);
290                 r.wake();
291             }
292         };
293         let (runnable, task) = async_task::spawn(f, schedule);
294         runnable.schedule();
295         task
296     }
297 
spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output> where F: Future + 'static, F::Output: 'static,298     fn spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
299     where
300         F: Future + 'static,
301         F::Output: 'static,
302     {
303         let raw = Arc::downgrade(self);
304         let schedule = move |runnable| {
305             if let Some(r) = raw.upgrade() {
306                 r.queue.push_back(runnable);
307                 r.wake();
308             }
309         };
310         let (runnable, task) = async_task::spawn_local(f, schedule);
311         runnable.schedule();
312         task
313     }
314 
run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output>315     fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
316         let events = EpollEvents::new();
317         pin_mut!(done);
318 
319         loop {
320             self.state.store(PROCESSING, Ordering::Release);
321             for runnable in self.queue.iter() {
322                 runnable.run();
323             }
324 
325             if let Poll::Ready(val) = done.as_mut().poll(cx) {
326                 return Ok(val);
327             }
328 
329             let oldstate = self.state.compare_exchange(
330                 PROCESSING,
331                 WAITING,
332                 Ordering::Acquire,
333                 Ordering::Acquire,
334             );
335             if let Err(oldstate) = oldstate {
336                 debug_assert_eq!(oldstate, WOKEN);
337                 // One or more futures have become runnable.
338                 continue;
339             }
340 
341             let events = self
342                 .poll_ctx
343                 .wait(&events)
344                 .map_err(Error::PollContextError)?;
345 
346             // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
347             // writing to the eventfd.
348             self.state.store(PROCESSING, Ordering::Release);
349             for e in events.iter() {
350                 let token = e.token();
351                 let mut ops = self.ops.lock();
352 
353                 // The op could have been canceled and removed by another thread so ignore it if it
354                 // doesn't exist.
355                 if let Some(op) = ops.get_mut(token) {
356                     let (file, waker) = match mem::replace(op, OpStatus::Completed) {
357                         OpStatus::Pending(OpData { file, waker }) => (file, waker),
358                         OpStatus::Completed => panic!("poll operation completed more than once"),
359                     };
360 
361                     mem::drop(ops);
362 
363                     self.poll_ctx
364                         .delete(&file)
365                         .map_err(Error::PollContextError)?;
366 
367                     if let Some(waker) = waker {
368                         waker.wake();
369                     }
370                 }
371             }
372         }
373     }
374 
is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool375     fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool {
376         let mut ops = self.ops.lock();
377 
378         let op = ops
379             .get_mut(token.0)
380             .expect("`is_ready` called on unknown operation");
381         match op {
382             OpStatus::Pending(data) => {
383                 data.waker = Some(cx.waker().clone());
384                 false
385             }
386             OpStatus::Completed => {
387                 ops.remove(token.0);
388                 true
389             }
390         }
391     }
392 
393     // Remove the waker for the given token if it hasn't fired yet.
cancel_operation(&self, token: WakerToken) -> Result<()>394     fn cancel_operation(&self, token: WakerToken) -> Result<()> {
395         match self.ops.lock().remove(token.0) {
396             OpStatus::Pending(data) => self
397                 .poll_ctx
398                 .delete(&data.file)
399                 .map_err(Error::PollContextError),
400             OpStatus::Completed => Ok(()),
401         }
402     }
403 }
404 
405 impl WeakWake for RawExecutor {
wake_by_ref(weak_self: &Weak<Self>)406     fn wake_by_ref(weak_self: &Weak<Self>) {
407         if let Some(arc_self) = weak_self.upgrade() {
408             RawExecutor::wake(&arc_self);
409         }
410     }
411 }
412 
413 impl Drop for RawExecutor {
drop(&mut self)414     fn drop(&mut self) {
415         // Wake up the notify_task. We set the state to WAITING here so that wake() will write to
416         // the eventfd.
417         self.state.store(WAITING, Ordering::Release);
418         self.wake();
419 
420         // Wake up any futures still waiting on poll operations as they are just going to get an
421         // ExecutorGone error now.
422         for op in self.ops.get_mut().drain() {
423             match op {
424                 OpStatus::Pending(mut data) => {
425                     if let Some(waker) = data.waker.take() {
426                         waker.wake();
427                     }
428 
429                     if let Err(e) = self.poll_ctx.delete(&data.file) {
430                         warn!("Failed to remove file from EpollCtx: {}", e);
431                     }
432                 }
433                 OpStatus::Completed => {}
434             }
435         }
436 
437         // Now run the executor one more time to drive any remaining futures to completion.
438         let waker = noop_waker();
439         let mut cx = Context::from_waker(&waker);
440         if let Err(e) = self.run(&mut cx, async {}) {
441             warn!("Failed to drive FdExecutor to completion: {}", e);
442         }
443     }
444 }
445 
/// A cloneable handle to an executor that drives futures and wakes them when FDs registered with
/// it produce events. All clones share the same underlying `RawExecutor`.
#[derive(Clone)]
pub struct FdExecutor {
    // Shared executor core; dropped (and torn down) when the last strong reference goes away.
    raw: Arc<RawExecutor>,
}
450 
451 impl FdExecutor {
new() -> Result<FdExecutor>452     pub fn new() -> Result<FdExecutor> {
453         let notify = EventFd::new().map_err(Error::CreateEventFd)?;
454         let raw = notify
455             .try_clone()
456             .map_err(Error::CloneEventFd)
457             .and_then(RawExecutor::new)
458             .map(Arc::new)?;
459 
460         raw.spawn(notify_task(notify, Arc::downgrade(&raw)))
461             .detach();
462 
463         Ok(FdExecutor { raw })
464     }
465 
spawn<F>(&self, f: F) -> Task<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,466     pub fn spawn<F>(&self, f: F) -> Task<F::Output>
467     where
468         F: Future + Send + 'static,
469         F::Output: Send + 'static,
470     {
471         self.raw.spawn(f)
472     }
473 
spawn_local<F>(&self, f: F) -> Task<F::Output> where F: Future + 'static, F::Output: 'static,474     pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
475     where
476         F: Future + 'static,
477         F::Output: 'static,
478     {
479         self.raw.spawn_local(f)
480     }
481 
run(&self) -> Result<()>482     pub fn run(&self) -> Result<()> {
483         let waker = new_waker(Arc::downgrade(&self.raw));
484         let mut cx = Context::from_waker(&waker);
485 
486         self.raw.run(&mut cx, crate::empty::<()>())
487     }
488 
run_until<F: Future>(&self, f: F) -> Result<F::Output>489     pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
490         let waker = new_waker(Arc::downgrade(&self.raw));
491         let mut ctx = Context::from_waker(&waker);
492 
493         self.raw.run(&mut ctx, f)
494     }
495 
register_source<F: AsRawFd>(&self, f: F) -> Result<RegisteredSource<F>>496     pub(crate) fn register_source<F: AsRawFd>(&self, f: F) -> Result<RegisteredSource<F>> {
497         add_fd_flags(f.as_raw_fd(), libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
498         Ok(RegisteredSource {
499             source: f,
500             ex: Arc::downgrade(&self.raw),
501         })
502     }
503 }
504 
505 // Used to `dup` the FDs passed to the executor so there is a guarantee they aren't closed while
506 // waiting in TLS to be added to the main polling context.
dup_fd(fd: RawFd) -> Result<RawFd>507 unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
508     let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
509     if ret < 0 {
510         Err(Error::DuplicatingFd(sys_util::Error::last()))
511     } else {
512         Ok(ret)
513     }
514 }
515 
#[cfg(test)]
mod test {
    use std::cell::RefCell;
    use std::io::{Read, Write};
    use std::rc::Rc;

    use futures::future::Either;

    use super::*;

    // Checks that an already-complete future wins a `select` against a wait on a pipe that is
    // never written to, and that the still-pending operation can then be dropped.
    #[test]
    fn test_it() {
        async fn do_test(ex: &FdExecutor) {
            // `_w` keeps the write end open; nothing is ever written to it.
            let (r, _w) = sys_util::pipe(true).unwrap();
            let done = Box::pin(async { 5usize });
            let source = ex.register_source(r).unwrap();
            let pending = source.wait_readable().unwrap();
            match futures::future::select(pending, done).await {
                Either::Right((5, pending)) => std::mem::drop(pending),
                _ => panic!("unexpected select result"),
            }
        }

        let ex = FdExecutor::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();

        // Example of starting the framework and running a future:
        async fn my_async(x: Rc<RefCell<u64>>) {
            x.replace(4);
        }

        let x = Rc::new(RefCell::new(0));
        crate::run_one_poll(my_async(x.clone())).unwrap();
        assert_eq!(*x.borrow(), 4);
    }

    // Checks that dropping the executor still runs already-spawned tasks, and that an operation
    // outstanding at that point resolves to `Error::ExecutorGone`.
    #[test]
    fn drop_before_completion() {
        const VALUE: u64 = 0x66ae_cb65_12fb_d260;

        // Writes VALUE to the pipe; proves the task was run.
        async fn write_value(mut tx: File) {
            let buf = VALUE.to_ne_bytes();
            tx.write_all(&buf[..]).expect("Failed to write to pipe");
        }

        // Expects the operation to fail with ExecutorGone and nothing else.
        async fn check_op(op: PendingOperation) {
            let err = op.await.expect_err("Task completed successfully");
            match err {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from task: {}", e),
            }
        }

        let (mut rx, tx) = sys_util::pipe(true).expect("Pipe failed");

        let ex = FdExecutor::new().unwrap();

        let source = ex.register_source(tx.try_clone().unwrap()).unwrap();
        let op = source.wait_writable().unwrap();

        ex.spawn_local(write_value(tx)).detach();
        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It should still run until the write to the pipe is complete.
        mem::drop(ex);

        let mut buf = 0u64.to_ne_bytes();
        rx.read_exact(&mut buf[..])
            .expect("Failed to read from pipe");

        assert_eq!(u64::from_ne_bytes(buf), VALUE);
    }
}
589