• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Signal handling
2 cfg_signal_internal_and_unix! {
3     mod signal;
4 }
5 
6 use crate::io::interest::Interest;
7 use crate::io::ready::Ready;
8 use crate::loom::sync::Mutex;
9 use crate::runtime::driver;
10 use crate::runtime::io::registration_set;
11 use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
12 
13 use mio::event::Source;
14 use std::fmt;
15 use std::io;
16 use std::sync::Arc;
17 use std::time::Duration;
18 
19 /// I/O driver, backed by Mio.
20 pub(crate) struct Driver {
21     /// Tracks the number of times `turn` is called. It is safe for this to wrap
22     /// as it is mostly used to determine when to call `compact()`.
23     tick: u8,
24 
25     /// True when an event with the signal token is received
26     signal_ready: bool,
27 
28     /// Reuse the `mio::Events` value across calls to poll.
29     events: mio::Events,
30 
31     /// The system event queue.
32     poll: mio::Poll,
33 }
34 
35 /// A reference to an I/O driver.
36 pub(crate) struct Handle {
37     /// Registers I/O resources.
38     registry: mio::Registry,
39 
40     /// Tracks all registrations
41     registrations: RegistrationSet,
42 
43     /// State that should be synchronized
44     synced: Mutex<registration_set::Synced>,
45 
46     /// Used to wake up the reactor from a call to `turn`.
47     /// Not supported on Wasi due to lack of threading support.
48     #[cfg(not(target_os = "wasi"))]
49     waker: mio::Waker,
50 
51     pub(crate) metrics: IoDriverMetrics,
52 }
53 
54 #[derive(Debug)]
55 pub(crate) struct ReadyEvent {
56     pub(super) tick: u8,
57     pub(crate) ready: Ready,
58     pub(super) is_shutdown: bool,
59 }
60 
61 cfg_net_unix!(
62     impl ReadyEvent {
63         pub(crate) fn with_ready(&self, ready: Ready) -> Self {
64             Self {
65                 ready,
66                 tick: self.tick,
67                 is_shutdown: self.is_shutdown,
68             }
69         }
70     }
71 );
72 
73 #[derive(Debug, Eq, PartialEq, Clone, Copy)]
74 pub(super) enum Direction {
75     Read,
76     Write,
77 }
78 
79 pub(super) enum Tick {
80     Set(u8),
81     Clear(u8),
82 }
83 
84 const TOKEN_WAKEUP: mio::Token = mio::Token(0);
85 const TOKEN_SIGNAL: mio::Token = mio::Token(1);
86 
_assert_kinds()87 fn _assert_kinds() {
88     fn _assert<T: Send + Sync>() {}
89 
90     _assert::<Handle>();
91 }
92 
93 // ===== impl Driver =====
94 
95 impl Driver {
96     /// Creates a new event loop, returning any error that happened during the
97     /// creation.
new(nevents: usize) -> io::Result<(Driver, Handle)>98     pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
99         let poll = mio::Poll::new()?;
100         #[cfg(not(target_os = "wasi"))]
101         let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
102         let registry = poll.registry().try_clone()?;
103 
104         let driver = Driver {
105             tick: 0,
106             signal_ready: false,
107             events: mio::Events::with_capacity(nevents),
108             poll,
109         };
110 
111         let (registrations, synced) = RegistrationSet::new();
112 
113         let handle = Handle {
114             registry,
115             registrations,
116             synced: Mutex::new(synced),
117             #[cfg(not(target_os = "wasi"))]
118             waker,
119             metrics: IoDriverMetrics::default(),
120         };
121 
122         Ok((driver, handle))
123     }
124 
park(&mut self, rt_handle: &driver::Handle)125     pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
126         let handle = rt_handle.io();
127         self.turn(handle, None);
128     }
129 
park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration)130     pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
131         let handle = rt_handle.io();
132         self.turn(handle, Some(duration));
133     }
134 
shutdown(&mut self, rt_handle: &driver::Handle)135     pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
136         let handle = rt_handle.io();
137         let ios = handle.registrations.shutdown(&mut handle.synced.lock());
138 
139         // `shutdown()` must be called without holding the lock.
140         for io in ios {
141             io.shutdown();
142         }
143     }
144 
turn(&mut self, handle: &Handle, max_wait: Option<Duration>)145     fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
146         debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
147 
148         self.tick = self.tick.wrapping_add(1);
149 
150         handle.release_pending_registrations();
151 
152         let events = &mut self.events;
153 
154         // Block waiting for an event to happen, peeling out how many events
155         // happened.
156         match self.poll.poll(events, max_wait) {
157             Ok(_) => {}
158             Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
159             #[cfg(target_os = "wasi")]
160             Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
161                 // In case of wasm32_wasi this error happens, when trying to poll without subscriptions
162                 // just return from the park, as there would be nothing, which wakes us up.
163             }
164             Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e),
165         }
166 
167         // Process all the events that came in, dispatching appropriately
168         let mut ready_count = 0;
169         for event in events.iter() {
170             let token = event.token();
171 
172             if token == TOKEN_WAKEUP {
173                 // Nothing to do, the event is used to unblock the I/O driver
174             } else if token == TOKEN_SIGNAL {
175                 self.signal_ready = true;
176             } else {
177                 let ready = Ready::from_mio(event);
178                 // Use std::ptr::from_exposed_addr when stable
179                 let ptr: *const ScheduledIo = token.0 as *const _;
180 
181                 // Safety: we ensure that the pointers used as tokens are not freed
182                 // until they are both deregistered from mio **and** we know the I/O
183                 // driver is not concurrently polling. The I/O driver holds ownership of
184                 // an `Arc<ScheduledIo>` so we can safely cast this to a ref.
185                 let io: &ScheduledIo = unsafe { &*ptr };
186 
187                 io.set_readiness(Tick::Set(self.tick), |curr| curr | ready);
188                 io.wake(ready);
189 
190                 ready_count += 1;
191             }
192         }
193 
194         handle.metrics.incr_ready_count_by(ready_count);
195     }
196 }
197 
198 impl fmt::Debug for Driver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result199     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200         write!(f, "Driver")
201     }
202 }
203 
204 impl Handle {
205     /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
206     /// makes the next call to `turn` return immediately.
207     ///
208     /// This method is intended to be used in situations where a notification
209     /// needs to otherwise be sent to the main reactor. If the reactor is
210     /// currently blocked inside of `turn` then it will wake up and soon return
211     /// after this method has been called. If the reactor is not currently
212     /// blocked in `turn`, then the next call to `turn` will not block and
213     /// return immediately.
unpark(&self)214     pub(crate) fn unpark(&self) {
215         #[cfg(not(target_os = "wasi"))]
216         self.waker.wake().expect("failed to wake I/O driver");
217     }
218 
219     /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
220     ///
221     /// The registration token is returned.
add_source( &self, source: &mut impl mio::event::Source, interest: Interest, ) -> io::Result<Arc<ScheduledIo>>222     pub(super) fn add_source(
223         &self,
224         source: &mut impl mio::event::Source,
225         interest: Interest,
226     ) -> io::Result<Arc<ScheduledIo>> {
227         let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
228         let token = scheduled_io.token();
229 
230         // TODO: if this returns an err, the `ScheduledIo` leaks...
231         self.registry.register(source, token, interest.to_mio())?;
232 
233         // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList`
234         self.metrics.incr_fd_count();
235 
236         Ok(scheduled_io)
237     }
238 
239     /// Deregisters an I/O resource from the reactor.
deregister_source( &self, registration: &Arc<ScheduledIo>, source: &mut impl Source, ) -> io::Result<()>240     pub(super) fn deregister_source(
241         &self,
242         registration: &Arc<ScheduledIo>,
243         source: &mut impl Source,
244     ) -> io::Result<()> {
245         // Deregister the source with the OS poller **first**
246         self.registry.deregister(source)?;
247 
248         if self
249             .registrations
250             .deregister(&mut self.synced.lock(), registration)
251         {
252             self.unpark();
253         }
254 
255         self.metrics.dec_fd_count();
256 
257         Ok(())
258     }
259 
release_pending_registrations(&self)260     fn release_pending_registrations(&self) {
261         if self.registrations.needs_release() {
262             self.registrations.release(&mut self.synced.lock());
263         }
264     }
265 }
266 
267 impl fmt::Debug for Handle {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result268     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269         write!(f, "Handle")
270     }
271 }
272 
273 impl Direction {
mask(self) -> Ready274     pub(super) fn mask(self) -> Ready {
275         match self {
276             Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
277             Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
278         }
279     }
280 }
281