1 // Signal handling
2 cfg_signal_internal_and_unix! {
3 mod signal;
4 }
5
6 use crate::io::interest::Interest;
7 use crate::io::ready::Ready;
8 use crate::loom::sync::Mutex;
9 use crate::runtime::driver;
10 use crate::runtime::io::registration_set;
11 use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
12
13 use mio::event::Source;
14 use std::fmt;
15 use std::io;
16 use std::sync::Arc;
17 use std::time::Duration;
18
19 /// I/O driver, backed by Mio.
20 pub(crate) struct Driver {
21 /// Tracks the number of times `turn` is called. It is safe for this to wrap
22 /// as it is mostly used to determine when to call `compact()`.
23 tick: u8,
24
25 /// True when an event with the signal token is received
26 signal_ready: bool,
27
28 /// Reuse the `mio::Events` value across calls to poll.
29 events: mio::Events,
30
31 /// The system event queue.
32 poll: mio::Poll,
33 }
34
35 /// A reference to an I/O driver.
36 pub(crate) struct Handle {
37 /// Registers I/O resources.
38 registry: mio::Registry,
39
40 /// Tracks all registrations
41 registrations: RegistrationSet,
42
43 /// State that should be synchronized
44 synced: Mutex<registration_set::Synced>,
45
46 /// Used to wake up the reactor from a call to `turn`.
47 /// Not supported on Wasi due to lack of threading support.
48 #[cfg(not(target_os = "wasi"))]
49 waker: mio::Waker,
50
51 pub(crate) metrics: IoDriverMetrics,
52 }
53
54 #[derive(Debug)]
55 pub(crate) struct ReadyEvent {
56 pub(super) tick: u8,
57 pub(crate) ready: Ready,
58 pub(super) is_shutdown: bool,
59 }
60
61 cfg_net_unix!(
62 impl ReadyEvent {
63 pub(crate) fn with_ready(&self, ready: Ready) -> Self {
64 Self {
65 ready,
66 tick: self.tick,
67 is_shutdown: self.is_shutdown,
68 }
69 }
70 }
71 );
72
73 #[derive(Debug, Eq, PartialEq, Clone, Copy)]
74 pub(super) enum Direction {
75 Read,
76 Write,
77 }
78
79 pub(super) enum Tick {
80 Set(u8),
81 Clear(u8),
82 }
83
84 const TOKEN_WAKEUP: mio::Token = mio::Token(0);
85 const TOKEN_SIGNAL: mio::Token = mio::Token(1);
86
_assert_kinds()87 fn _assert_kinds() {
88 fn _assert<T: Send + Sync>() {}
89
90 _assert::<Handle>();
91 }
92
93 // ===== impl Driver =====
94
95 impl Driver {
96 /// Creates a new event loop, returning any error that happened during the
97 /// creation.
new(nevents: usize) -> io::Result<(Driver, Handle)>98 pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
99 let poll = mio::Poll::new()?;
100 #[cfg(not(target_os = "wasi"))]
101 let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
102 let registry = poll.registry().try_clone()?;
103
104 let driver = Driver {
105 tick: 0,
106 signal_ready: false,
107 events: mio::Events::with_capacity(nevents),
108 poll,
109 };
110
111 let (registrations, synced) = RegistrationSet::new();
112
113 let handle = Handle {
114 registry,
115 registrations,
116 synced: Mutex::new(synced),
117 #[cfg(not(target_os = "wasi"))]
118 waker,
119 metrics: IoDriverMetrics::default(),
120 };
121
122 Ok((driver, handle))
123 }
124
park(&mut self, rt_handle: &driver::Handle)125 pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
126 let handle = rt_handle.io();
127 self.turn(handle, None);
128 }
129
park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration)130 pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
131 let handle = rt_handle.io();
132 self.turn(handle, Some(duration));
133 }
134
shutdown(&mut self, rt_handle: &driver::Handle)135 pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
136 let handle = rt_handle.io();
137 let ios = handle.registrations.shutdown(&mut handle.synced.lock());
138
139 // `shutdown()` must be called without holding the lock.
140 for io in ios {
141 io.shutdown();
142 }
143 }
144
turn(&mut self, handle: &Handle, max_wait: Option<Duration>)145 fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
146 debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
147
148 self.tick = self.tick.wrapping_add(1);
149
150 handle.release_pending_registrations();
151
152 let events = &mut self.events;
153
154 // Block waiting for an event to happen, peeling out how many events
155 // happened.
156 match self.poll.poll(events, max_wait) {
157 Ok(_) => {}
158 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
159 #[cfg(target_os = "wasi")]
160 Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
161 // In case of wasm32_wasi this error happens, when trying to poll without subscriptions
162 // just return from the park, as there would be nothing, which wakes us up.
163 }
164 Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e),
165 }
166
167 // Process all the events that came in, dispatching appropriately
168 let mut ready_count = 0;
169 for event in events.iter() {
170 let token = event.token();
171
172 if token == TOKEN_WAKEUP {
173 // Nothing to do, the event is used to unblock the I/O driver
174 } else if token == TOKEN_SIGNAL {
175 self.signal_ready = true;
176 } else {
177 let ready = Ready::from_mio(event);
178 // Use std::ptr::from_exposed_addr when stable
179 let ptr: *const ScheduledIo = token.0 as *const _;
180
181 // Safety: we ensure that the pointers used as tokens are not freed
182 // until they are both deregistered from mio **and** we know the I/O
183 // driver is not concurrently polling. The I/O driver holds ownership of
184 // an `Arc<ScheduledIo>` so we can safely cast this to a ref.
185 let io: &ScheduledIo = unsafe { &*ptr };
186
187 io.set_readiness(Tick::Set(self.tick), |curr| curr | ready);
188 io.wake(ready);
189
190 ready_count += 1;
191 }
192 }
193
194 handle.metrics.incr_ready_count_by(ready_count);
195 }
196 }
197
198 impl fmt::Debug for Driver {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 write!(f, "Driver")
201 }
202 }
203
204 impl Handle {
205 /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
206 /// makes the next call to `turn` return immediately.
207 ///
208 /// This method is intended to be used in situations where a notification
209 /// needs to otherwise be sent to the main reactor. If the reactor is
210 /// currently blocked inside of `turn` then it will wake up and soon return
211 /// after this method has been called. If the reactor is not currently
212 /// blocked in `turn`, then the next call to `turn` will not block and
213 /// return immediately.
unpark(&self)214 pub(crate) fn unpark(&self) {
215 #[cfg(not(target_os = "wasi"))]
216 self.waker.wake().expect("failed to wake I/O driver");
217 }
218
219 /// Registers an I/O resource with the reactor for a given `mio::Ready` state.
220 ///
221 /// The registration token is returned.
add_source( &self, source: &mut impl mio::event::Source, interest: Interest, ) -> io::Result<Arc<ScheduledIo>>222 pub(super) fn add_source(
223 &self,
224 source: &mut impl mio::event::Source,
225 interest: Interest,
226 ) -> io::Result<Arc<ScheduledIo>> {
227 let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?;
228 let token = scheduled_io.token();
229
230 // TODO: if this returns an err, the `ScheduledIo` leaks...
231 self.registry.register(source, token, interest.to_mio())?;
232
233 // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList`
234 self.metrics.incr_fd_count();
235
236 Ok(scheduled_io)
237 }
238
239 /// Deregisters an I/O resource from the reactor.
deregister_source( &self, registration: &Arc<ScheduledIo>, source: &mut impl Source, ) -> io::Result<()>240 pub(super) fn deregister_source(
241 &self,
242 registration: &Arc<ScheduledIo>,
243 source: &mut impl Source,
244 ) -> io::Result<()> {
245 // Deregister the source with the OS poller **first**
246 self.registry.deregister(source)?;
247
248 if self
249 .registrations
250 .deregister(&mut self.synced.lock(), registration)
251 {
252 self.unpark();
253 }
254
255 self.metrics.dec_fd_count();
256
257 Ok(())
258 }
259
release_pending_registrations(&self)260 fn release_pending_registrations(&self) {
261 if self.registrations.needs_release() {
262 self.registrations.release(&mut self.synced.lock());
263 }
264 }
265 }
266
267 impl fmt::Debug for Handle {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 write!(f, "Handle")
270 }
271 }
272
273 impl Direction {
mask(self) -> Ready274 pub(super) fn mask(self) -> Ready {
275 match self {
276 Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
277 Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
278 }
279 }
280 }
281