1 #![cfg_attr(not(feature = "rt"), allow(dead_code))]
2
3 //! Signal driver
4
5 use crate::io::driver::{Driver as IoDriver, Interest};
6 use crate::io::PollEvented;
7 use crate::park::Park;
8 use crate::signal::registry::globals;
9
10 use mio::net::UnixStream;
11 use std::io::{self, Read};
12 use std::ptr;
13 use std::sync::{Arc, Weak};
14 use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
15 use std::time::Duration;
16
17 /// Responsible for registering wakeups when an OS signal is received, and
18 /// subsequently dispatching notifications to any signal listeners as appropriate.
19 ///
20 /// Note: this driver relies on having an enabled IO driver in order to listen to
21 /// pipe write wakeups.
22 #[derive(Debug)]
23 pub(crate) struct Driver {
24 /// Thread parker. The `Driver` park implementation delegates to this.
25 park: IoDriver,
26
27 /// A pipe for receiving wake events from the signal handler
28 receiver: PollEvented<UnixStream>,
29
30 /// Shared state
31 inner: Arc<Inner>,
32 }
33
34 #[derive(Clone, Debug, Default)]
35 pub(crate) struct Handle {
36 inner: Weak<Inner>,
37 }
38
39 #[derive(Debug)]
40 pub(super) struct Inner(());
41
42 // ===== impl Driver =====
43
44 impl Driver {
45 /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
new(park: IoDriver) -> io::Result<Self>46 pub(crate) fn new(park: IoDriver) -> io::Result<Self> {
47 use std::mem::ManuallyDrop;
48 use std::os::unix::io::{AsRawFd, FromRawFd};
49
50 // NB: We give each driver a "fresh" receiver file descriptor to avoid
51 // the issues described in alexcrichton/tokio-process#42.
52 //
53 // In the past we would reuse the actual receiver file descriptor and
54 // swallow any errors around double registration of the same descriptor.
55 // I'm not sure if the second (failed) registration simply doesn't end
56 // up receiving wake up notifications, or there could be some race
57 // condition when consuming readiness events, but having distinct
58 // descriptors for distinct PollEvented instances appears to mitigate
59 // this.
60 //
61 // Unfortunately we cannot just use a single global PollEvented instance
62 // either, since we can't compare Handles or assume they will always
63 // point to the exact same reactor.
64 //
65 // Mio 0.7 removed `try_clone()` as an API due to unexpected behavior
66 // with registering dups with the same reactor. In this case, duping is
67 // safe as each dup is registered with separate reactors **and** we
68 // only expect at least one dup to receive the notification.
69
70 // Manually drop as we don't actually own this instance of UnixStream.
71 let receiver_fd = globals().receiver.as_raw_fd();
72
73 // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe.
74 let original =
75 ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
76 let receiver = UnixStream::from_std(original.try_clone()?);
77 let receiver = PollEvented::new_with_interest_and_handle(
78 receiver,
79 Interest::READABLE | Interest::WRITABLE,
80 park.handle(),
81 )?;
82
83 Ok(Self {
84 park,
85 receiver,
86 inner: Arc::new(Inner(())),
87 })
88 }
89
90 /// Returns a handle to this event loop which can be sent across threads
91 /// and can be used as a proxy to the event loop itself.
handle(&self) -> Handle92 pub(crate) fn handle(&self) -> Handle {
93 Handle {
94 inner: Arc::downgrade(&self.inner),
95 }
96 }
97
process(&self)98 fn process(&self) {
99 // Check if the pipe is ready to read and therefore has "woken" us up
100 //
101 // To do so, we will `poll_read_ready` with a noop waker, since we don't
102 // need to actually be notified when read ready...
103 let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) };
104 let mut cx = Context::from_waker(&waker);
105
106 let ev = match self.receiver.registration().poll_read_ready(&mut cx) {
107 Poll::Ready(Ok(ev)) => ev,
108 Poll::Ready(Err(e)) => panic!("reactor gone: {}", e),
109 Poll::Pending => return, // No wake has arrived, bail
110 };
111
112 // Drain the pipe completely so we can receive a new readiness event
113 // if another signal has come in.
114 let mut buf = [0; 128];
115 loop {
116 match (&*self.receiver).read(&mut buf) {
117 Ok(0) => panic!("EOF on self-pipe"),
118 Ok(_) => continue, // Keep reading
119 Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
120 Err(e) => panic!("Bad read on self-pipe: {}", e),
121 }
122 }
123
124 self.receiver.registration().clear_readiness(ev);
125
126 // Broadcast any signals which were received
127 globals().broadcast();
128 }
129 }
130
/// Vtable of a waker that does nothing: every entry ignores its data pointer.
const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    noop_clone, // clone
    noop,       // wake
    noop,       // wake_by_ref
    noop,       // drop
);

/// `clone` entry of the no-op vtable: yields another no-op raw waker with a
/// null data pointer.
unsafe fn noop_clone(_data: *const ()) -> RawWaker {
    let vtable: &'static RawWakerVTable = &NOOP_WAKER_VTABLE;
    RawWaker::new(ptr::null(), vtable)
}

/// Shared `wake` / `wake_by_ref` / `drop` entry of the no-op vtable:
/// deliberately empty.
unsafe fn noop(_data: *const ()) {}
138
139 // ===== impl Park for Driver =====
140
141 impl Park for Driver {
142 type Unpark = <IoDriver as Park>::Unpark;
143 type Error = io::Error;
144
unpark(&self) -> Self::Unpark145 fn unpark(&self) -> Self::Unpark {
146 self.park.unpark()
147 }
148
park(&mut self) -> Result<(), Self::Error>149 fn park(&mut self) -> Result<(), Self::Error> {
150 self.park.park()?;
151 self.process();
152 Ok(())
153 }
154
park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>155 fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
156 self.park.park_timeout(duration)?;
157 self.process();
158 Ok(())
159 }
160
shutdown(&mut self)161 fn shutdown(&mut self) {
162 self.park.shutdown()
163 }
164 }
165
166 // ===== impl Handle =====
167
168 impl Handle {
check_inner(&self) -> io::Result<()>169 pub(super) fn check_inner(&self) -> io::Result<()> {
170 if self.inner.strong_count() > 0 {
171 Ok(())
172 } else {
173 Err(io::Error::new(io::ErrorKind::Other, "signal driver gone"))
174 }
175 }
176 }
177
178 cfg_rt! {
179 impl Handle {
180 /// Returns a handle to the current driver
181 ///
182 /// # Panics
183 ///
184 /// This function panics if there is no current signal driver set.
185 pub(super) fn current() -> Self {
186 crate::runtime::context::signal_handle().expect(
187 "there is no signal driver running, must be called from the context of Tokio runtime",
188 )
189 }
190 }
191 }
192
193 cfg_not_rt! {
194 impl Handle {
195 /// Returns a handle to the current driver
196 ///
197 /// # Panics
198 ///
199 /// This function panics if there is no current signal driver set.
200 pub(super) fn current() -> Self {
201 panic!(
202 "there is no signal driver running, must be called from the context of Tokio runtime or with\
203 `rt` enabled.",
204 )
205 }
206 }
207 }
208