// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! The executor runs all given futures to completion. Futures register wakers associated with file
//! descriptors. The wakers will be called when the FD becomes readable or writable, depending on
//! which event the future asked to wait for.
//!
//! `FdExecutor` is meant to be used with the `futures-rs` crate, which provides combinators and
//! utility functions to combine futures.

12 use std::{
13     fs::File,
14     future::Future,
15     io, mem,
16     os::unix::io::{AsRawFd, FromRawFd, RawFd},
17     pin::Pin,
18     sync::{
19         atomic::{AtomicI32, Ordering},
20         Arc, Weak,
21     },
22     task::{Context, Poll, Waker},
23 };
24 
25 use async_task::Task;
26 use base::{add_fd_flags, warn, EpollContext, EpollEvents, EventFd, WatchingEvents};
27 use futures::task::noop_waker;
28 use pin_utils::pin_mut;
29 use remain::sorted;
30 use slab::Slab;
31 use sync::Mutex;
32 use thiserror::Error as ThisError;
33 
34 use super::{
35     queue::RunnableQueue,
36     waker::{new_waker, WakerToken, WeakWake},
37     BlockingPool,
38 };
39 
40 #[sorted]
41 #[derive(Debug, ThisError)]
42 pub enum Error {
43     /// Failed to clone the EventFd for waking the executor.
44     #[error("Failed to clone the EventFd for waking the executor: {0}")]
45     CloneEventFd(base::Error),
46     /// Failed to create the EventFd for waking the executor.
47     #[error("Failed to create the EventFd for waking the executor: {0}")]
48     CreateEventFd(base::Error),
49     /// Creating a context to wait on FDs failed.
50     #[error("An error creating the fd waiting context: {0}")]
51     CreatingContext(base::Error),
52     /// Failed to copy the FD for the polling context.
53     #[error("Failed to copy the FD for the polling context: {0}")]
54     DuplicatingFd(base::Error),
55     /// The Executor is gone.
56     #[error("The FDExecutor is gone")]
57     ExecutorGone,
58     /// PollContext failure.
59     #[error("PollContext failure: {0}")]
60     PollContextError(base::Error),
61     /// An error occurred when setting the FD non-blocking.
62     #[error("An error occurred setting the FD non-blocking: {0}.")]
63     SettingNonBlocking(base::Error),
64     /// Failed to submit the waker to the polling context.
65     #[error("An error adding to the Aio context: {0}")]
66     SubmittingWaker(base::Error),
67     /// A Waker was canceled, but the operation isn't running.
68     #[error("Unknown waker")]
69     UnknownWaker,
70 }
71 pub type Result<T> = std::result::Result<T, Error>;
72 
73 impl From<Error> for io::Error {
from(e: Error) -> Self74     fn from(e: Error) -> Self {
75         use Error::*;
76         match e {
77             CloneEventFd(e) => e.into(),
78             CreateEventFd(e) => e.into(),
79             DuplicatingFd(e) => e.into(),
80             ExecutorGone => io::Error::new(io::ErrorKind::Other, e),
81             CreatingContext(e) => e.into(),
82             PollContextError(e) => e.into(),
83             SettingNonBlocking(e) => e.into(),
84             SubmittingWaker(e) => e.into(),
85             UnknownWaker => io::Error::new(io::ErrorKind::Other, e),
86         }
87     }
88 }
89 
/// Bookkeeping for a poll operation that has been submitted and may be waited on.
struct OpData {
    /// Duplicate of the caller's FD; this is the handle registered with the epoll context.
    file: File,
    /// Waker of the task polling the operation, recorded by `RawExecutor::is_ready`.
    waker: Option<Waker>,
}
95 
96 // The current status of a submitted operation.
97 enum OpStatus {
98     Pending(OpData),
99     Completed,
100 }
101 
102 // An IO source previously registered with an FdExecutor. Used to initiate asynchronous IO with the
103 // associated executor.
104 pub struct RegisteredSource<F> {
105     source: F,
106     ex: Weak<RawExecutor>,
107 }
108 
109 impl<F: AsRawFd> RegisteredSource<F> {
110     // Start an asynchronous operation to wait for this source to become readable. The returned
111     // future will not be ready until the source is readable.
wait_readable(&self) -> Result<PendingOperation>112     pub fn wait_readable(&self) -> Result<PendingOperation> {
113         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
114 
115         let token =
116             ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_read())?;
117 
118         Ok(PendingOperation {
119             token: Some(token),
120             ex: self.ex.clone(),
121         })
122     }
123 
124     // Start an asynchronous operation to wait for this source to become writable. The returned
125     // future will not be ready until the source is writable.
wait_writable(&self) -> Result<PendingOperation>126     pub fn wait_writable(&self) -> Result<PendingOperation> {
127         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
128 
129         let token =
130             ex.add_operation(self.source.as_raw_fd(), WatchingEvents::empty().set_write())?;
131 
132         Ok(PendingOperation {
133             token: Some(token),
134             ex: self.ex.clone(),
135         })
136     }
137 }
138 
139 impl<F> RegisteredSource<F> {
140     // Consume this RegisteredSource and return the inner IO source.
into_source(self) -> F141     pub fn into_source(self) -> F {
142         self.source
143     }
144 }
145 
146 impl<F> AsRef<F> for RegisteredSource<F> {
as_ref(&self) -> &F147     fn as_ref(&self) -> &F {
148         &self.source
149     }
150 }
151 
152 impl<F> AsMut<F> for RegisteredSource<F> {
as_mut(&mut self) -> &mut F153     fn as_mut(&mut self) -> &mut F {
154         &mut self.source
155     }
156 }
157 
158 /// A token returned from `add_operation` that can be used to cancel the waker before it completes.
159 /// Used to manage getting the result from the underlying executor for a completed operation.
160 /// Dropping a `PendingOperation` will get the result from the executor.
161 pub struct PendingOperation {
162     token: Option<WakerToken>,
163     ex: Weak<RawExecutor>,
164 }
165 
166 impl Future for PendingOperation {
167     type Output = Result<()>;
168 
poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>169     fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
170         let token = self
171             .token
172             .as_ref()
173             .expect("PendingOperation polled after returning Poll::Ready");
174         if let Some(ex) = self.ex.upgrade() {
175             if ex.is_ready(token, cx) {
176                 self.token = None;
177                 Poll::Ready(Ok(()))
178             } else {
179                 Poll::Pending
180             }
181         } else {
182             Poll::Ready(Err(Error::ExecutorGone))
183         }
184     }
185 }
186 
187 impl Drop for PendingOperation {
drop(&mut self)188     fn drop(&mut self) {
189         if let Some(token) = self.token.take() {
190             if let Some(ex) = self.ex.upgrade() {
191                 let _ = ex.cancel_operation(token);
192             }
193         }
194     }
195 }
196 
197 // This function exists to guarantee that non-epoll futures will not starve until an epoll future is
198 // ready to be polled again. The mechanism is very similar to the self-pipe trick used by C programs
199 // to reliably mix select / poll with signal handling. This is how it works:
200 //
201 // * RawExecutor::new creates an eventfd, dupes it, and spawns this async function with the duped fd.
202 // * The first time notify_task is polled it tries to read from the eventfd and if that fails, waits
203 //   for the fd to become readable.
204 // * Meanwhile the RawExecutor keeps the original fd for the eventfd.
205 // * Whenever RawExecutor::wake is called it will write to the eventfd if it determines that the
206 //   executor thread is currently blocked inside an io_epoll_enter call. This can happen when a
207 //   non-epoll future becomes ready to poll.
208 // * The write to the eventfd causes the fd to become readable, which then allows the epoll() call
209 //   to return with at least one readable fd.
210 // * The executor then polls the non-epoll future that became ready, any epoll futures that
211 //   completed, and the notify_task function, which then queues up another read on the eventfd and
212 //   the process can repeat.
notify_task(notify: EventFd, raw: Weak<RawExecutor>)213 async fn notify_task(notify: EventFd, raw: Weak<RawExecutor>) {
214     add_fd_flags(notify.as_raw_fd(), libc::O_NONBLOCK)
215         .expect("Failed to set notify EventFd as non-blocking");
216 
217     loop {
218         match notify.read() {
219             Ok(_) => {}
220             Err(e) if e.errno() == libc::EWOULDBLOCK => {}
221             Err(e) => panic!("Unexpected error while reading notify EventFd: {}", e),
222         }
223 
224         if let Some(ex) = raw.upgrade() {
225             let token = ex
226                 .add_operation(notify.as_raw_fd(), WatchingEvents::empty().set_read())
227                 .expect("Failed to add notify EventFd to PollCtx");
228 
229             // We don't want to hold an active reference to the executor in the .await below.
230             mem::drop(ex);
231 
232             let op = PendingOperation {
233                 token: Some(token),
234                 ex: raw.clone(),
235             };
236 
237             match op.await {
238                 Ok(()) => {}
239                 Err(Error::ExecutorGone) => break,
240                 Err(e) => panic!("Unexpected error while waiting for notify EventFd: {}", e),
241             }
242         } else {
243             // The executor is gone so we should also exit.
244             break;
245         }
246     }
247 }
248 
/// The executor is either inside, or about to make, a `PollContext::wait()` call. A waker that
/// observes this value writes to the notify `EventFd`, which makes the `wait()` call return.
const WAITING: i32 = 0x1d5b_c019u32 as i32;

/// The executor is busy running futures that are ready to make progress.
const PROCESSING: i32 = 0xd474_77bcu32 as i32;

/// One or more futures may have become ready to make progress.
const WOKEN: i32 = 0x3e4d_3276u32 as i32;
259 
260 struct RawExecutor {
261     queue: RunnableQueue,
262     poll_ctx: EpollContext<usize>,
263     ops: Mutex<Slab<OpStatus>>,
264     blocking_pool: BlockingPool,
265     state: AtomicI32,
266     notify: EventFd,
267 }
268 
269 impl RawExecutor {
new(notify: EventFd) -> Result<Self>270     fn new(notify: EventFd) -> Result<Self> {
271         Ok(RawExecutor {
272             queue: RunnableQueue::new(),
273             poll_ctx: EpollContext::new().map_err(Error::CreatingContext)?,
274             ops: Mutex::new(Slab::with_capacity(64)),
275             blocking_pool: Default::default(),
276             state: AtomicI32::new(PROCESSING),
277             notify,
278         })
279     }
280 
add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result<WakerToken>281     fn add_operation(&self, fd: RawFd, events: WatchingEvents) -> Result<WakerToken> {
282         let duped_fd = unsafe {
283             // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
284             // will only be added to the poll loop.
285             File::from_raw_fd(dup_fd(fd)?)
286         };
287         let mut ops = self.ops.lock();
288         let entry = ops.vacant_entry();
289         let next_token = entry.key();
290         self.poll_ctx
291             .add_fd_with_events(&duped_fd, events, next_token)
292             .map_err(Error::SubmittingWaker)?;
293         entry.insert(OpStatus::Pending(OpData {
294             file: duped_fd,
295             waker: None,
296         }));
297         Ok(WakerToken(next_token))
298     }
299 
wake(&self)300     fn wake(&self) {
301         let oldstate = self.state.swap(WOKEN, Ordering::Release);
302         if oldstate == WAITING {
303             if let Err(e) = self.notify.write(1) {
304                 warn!("Failed to notify executor that a future is ready: {}", e);
305             }
306         }
307     }
308 
spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,309     fn spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
310     where
311         F: Future + Send + 'static,
312         F::Output: Send + 'static,
313     {
314         let raw = Arc::downgrade(self);
315         let schedule = move |runnable| {
316             if let Some(r) = raw.upgrade() {
317                 r.queue.push_back(runnable);
318                 r.wake();
319             }
320         };
321         let (runnable, task) = async_task::spawn(f, schedule);
322         runnable.schedule();
323         task
324     }
325 
spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output> where F: Future + 'static, F::Output: 'static,326     fn spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
327     where
328         F: Future + 'static,
329         F::Output: 'static,
330     {
331         let raw = Arc::downgrade(self);
332         let schedule = move |runnable| {
333             if let Some(r) = raw.upgrade() {
334                 r.queue.push_back(runnable);
335                 r.wake();
336             }
337         };
338         let (runnable, task) = async_task::spawn_local(f, schedule);
339         runnable.schedule();
340         task
341     }
342 
spawn_blocking<F, R>(self: &Arc<Self>, f: F) -> Task<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,343     fn spawn_blocking<F, R>(self: &Arc<Self>, f: F) -> Task<R>
344     where
345         F: FnOnce() -> R + Send + 'static,
346         R: Send + 'static,
347     {
348         self.blocking_pool.spawn(f)
349     }
350 
run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output>351     fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
352         let events = EpollEvents::new();
353         pin_mut!(done);
354 
355         loop {
356             self.state.store(PROCESSING, Ordering::Release);
357             for runnable in self.queue.iter() {
358                 runnable.run();
359             }
360 
361             if let Poll::Ready(val) = done.as_mut().poll(cx) {
362                 return Ok(val);
363             }
364 
365             let oldstate = self.state.compare_exchange(
366                 PROCESSING,
367                 WAITING,
368                 Ordering::Acquire,
369                 Ordering::Acquire,
370             );
371             if let Err(oldstate) = oldstate {
372                 debug_assert_eq!(oldstate, WOKEN);
373                 // One or more futures have become runnable.
374                 continue;
375             }
376 
377             let events = self
378                 .poll_ctx
379                 .wait(&events)
380                 .map_err(Error::PollContextError)?;
381 
382             // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
383             // writing to the eventfd.
384             self.state.store(PROCESSING, Ordering::Release);
385             for e in events.iter() {
386                 let token = e.token();
387                 let mut ops = self.ops.lock();
388 
389                 // The op could have been canceled and removed by another thread so ignore it if it
390                 // doesn't exist.
391                 if let Some(op) = ops.get_mut(token) {
392                     let (file, waker) = match mem::replace(op, OpStatus::Completed) {
393                         OpStatus::Pending(OpData { file, waker }) => (file, waker),
394                         OpStatus::Completed => panic!("poll operation completed more than once"),
395                     };
396 
397                     mem::drop(ops);
398 
399                     self.poll_ctx
400                         .delete(&file)
401                         .map_err(Error::PollContextError)?;
402 
403                     if let Some(waker) = waker {
404                         waker.wake();
405                     }
406                 }
407             }
408         }
409     }
410 
is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool411     fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool {
412         let mut ops = self.ops.lock();
413 
414         let op = ops
415             .get_mut(token.0)
416             .expect("`is_ready` called on unknown operation");
417         match op {
418             OpStatus::Pending(data) => {
419                 data.waker = Some(cx.waker().clone());
420                 false
421             }
422             OpStatus::Completed => {
423                 ops.remove(token.0);
424                 true
425             }
426         }
427     }
428 
429     // Remove the waker for the given token if it hasn't fired yet.
cancel_operation(&self, token: WakerToken) -> Result<()>430     fn cancel_operation(&self, token: WakerToken) -> Result<()> {
431         match self.ops.lock().remove(token.0) {
432             OpStatus::Pending(data) => self
433                 .poll_ctx
434                 .delete(&data.file)
435                 .map_err(Error::PollContextError),
436             OpStatus::Completed => Ok(()),
437         }
438     }
439 }
440 
441 impl WeakWake for RawExecutor {
wake_by_ref(weak_self: &Weak<Self>)442     fn wake_by_ref(weak_self: &Weak<Self>) {
443         if let Some(arc_self) = weak_self.upgrade() {
444             RawExecutor::wake(&arc_self);
445         }
446     }
447 }
448 
449 impl Drop for RawExecutor {
drop(&mut self)450     fn drop(&mut self) {
451         // Wake up the notify_task. We set the state to WAITING here so that wake() will write to
452         // the eventfd.
453         self.state.store(WAITING, Ordering::Release);
454         self.wake();
455 
456         // Wake up any futures still waiting on poll operations as they are just going to get an
457         // ExecutorGone error now.
458         for op in self.ops.get_mut().drain() {
459             match op {
460                 OpStatus::Pending(mut data) => {
461                     if let Some(waker) = data.waker.take() {
462                         waker.wake();
463                     }
464 
465                     if let Err(e) = self.poll_ctx.delete(&data.file) {
466                         warn!("Failed to remove file from EpollCtx: {}", e);
467                     }
468                 }
469                 OpStatus::Completed => {}
470             }
471         }
472 
473         // Now run the executor one more time to drive any remaining futures to completion.
474         let waker = noop_waker();
475         let mut cx = Context::from_waker(&waker);
476         if let Err(e) = self.run(&mut cx, async {}) {
477             warn!("Failed to drive FdExecutor to completion: {}", e);
478         }
479     }
480 }
481 
482 #[derive(Clone)]
483 pub struct FdExecutor {
484     raw: Arc<RawExecutor>,
485 }
486 
487 impl FdExecutor {
new() -> Result<FdExecutor>488     pub fn new() -> Result<FdExecutor> {
489         let notify = EventFd::new().map_err(Error::CreateEventFd)?;
490         let raw = notify
491             .try_clone()
492             .map_err(Error::CloneEventFd)
493             .and_then(RawExecutor::new)
494             .map(Arc::new)?;
495 
496         raw.spawn(notify_task(notify, Arc::downgrade(&raw)))
497             .detach();
498 
499         Ok(FdExecutor { raw })
500     }
501 
spawn<F>(&self, f: F) -> Task<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,502     pub fn spawn<F>(&self, f: F) -> Task<F::Output>
503     where
504         F: Future + Send + 'static,
505         F::Output: Send + 'static,
506     {
507         self.raw.spawn(f)
508     }
509 
spawn_local<F>(&self, f: F) -> Task<F::Output> where F: Future + 'static, F::Output: 'static,510     pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
511     where
512         F: Future + 'static,
513         F::Output: 'static,
514     {
515         self.raw.spawn_local(f)
516     }
517 
spawn_blocking<F, R>(&self, f: F) -> Task<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,518     pub fn spawn_blocking<F, R>(&self, f: F) -> Task<R>
519     where
520         F: FnOnce() -> R + Send + 'static,
521         R: Send + 'static,
522     {
523         self.raw.spawn_blocking(f)
524     }
525 
run(&self) -> Result<()>526     pub fn run(&self) -> Result<()> {
527         let waker = new_waker(Arc::downgrade(&self.raw));
528         let mut cx = Context::from_waker(&waker);
529 
530         self.raw.run(&mut cx, super::empty::<()>())
531     }
532 
run_until<F: Future>(&self, f: F) -> Result<F::Output>533     pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
534         let waker = new_waker(Arc::downgrade(&self.raw));
535         let mut ctx = Context::from_waker(&waker);
536 
537         self.raw.run(&mut ctx, f)
538     }
539 
register_source<F: AsRawFd>(&self, f: F) -> Result<RegisteredSource<F>>540     pub(crate) fn register_source<F: AsRawFd>(&self, f: F) -> Result<RegisteredSource<F>> {
541         add_fd_flags(f.as_raw_fd(), libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
542         Ok(RegisteredSource {
543             source: f,
544             ex: Arc::downgrade(&self.raw),
545         })
546     }
547 }
548 
549 // Used to `dup` the FDs passed to the executor so there is a guarantee they aren't closed while
550 // waiting in TLS to be added to the main polling context.
dup_fd(fd: RawFd) -> Result<RawFd>551 unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
552     let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
553     if ret < 0 {
554         Err(Error::DuplicatingFd(base::Error::last()))
555     } else {
556         Ok(ret)
557     }
558 }
559 
#[cfg(test)]
mod test {
    use std::{
        cell::RefCell,
        io::{Read, Write},
        rc::Rc,
    };

    use futures::future::Either;

    use super::*;

    // A wait on a pipe that never becomes readable must lose a `select` against a ready future,
    // and dropping the unfinished wait must be harmless.
    #[test]
    fn test_it() {
        async fn do_test(ex: &FdExecutor) {
            // `_w` keeps the write end open so the read end never signals.
            let (r, _w) = base::pipe(true).unwrap();
            let source = ex.register_source(r).unwrap();
            let pending = source.wait_readable().unwrap();
            let done = Box::pin(async { 5usize });

            match futures::future::select(pending, done).await {
                Either::Right((5, unfinished)) => std::mem::drop(unfinished),
                _ => panic!("unexpected select result"),
            }
        }

        let ex = FdExecutor::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();

        // Example of starting the framework and running a future:
        async fn my_async(x: Rc<RefCell<u64>>) {
            x.replace(4);
        }

        let shared = Rc::new(RefCell::new(0));
        super::super::run_one_poll(my_async(shared.clone())).unwrap();
        assert_eq!(*shared.borrow(), 4);
    }

    // Dropping the executor must still run already-spawned tasks, and operations that are still
    // pending must resolve to `ExecutorGone`.
    #[test]
    fn drop_before_completion() {
        const VALUE: u64 = 0x66ae_cb65_12fb_d260;

        async fn write_value(mut tx: File) {
            let bytes = VALUE.to_ne_bytes();
            tx.write_all(&bytes[..]).expect("Failed to write to pipe");
        }

        async fn check_op(op: PendingOperation) {
            match op.await.expect_err("Task completed successfully") {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from task: {}", e),
            }
        }

        let (mut rx, tx) = base::pipe(true).expect("Pipe failed");

        let ex = FdExecutor::new().unwrap();

        let tx_clone = tx.try_clone().unwrap();
        let source = ex.register_source(tx_clone).unwrap();
        let op = source.wait_writable().unwrap();

        ex.spawn_local(write_value(tx)).detach();
        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It should still run until the write to the pipe is complete.
        mem::drop(ex);

        let mut received = 0u64.to_ne_bytes();
        rx.read_exact(&mut received[..])
            .expect("Failed to read from pipe");

        assert_eq!(u64::from_ne_bytes(received), VALUE);
    }
}
635