• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#![cfg_attr(not(feature = "rt"), allow(dead_code))]

//! Signal driver
//!
//! Bridges OS signal delivery (via the self-pipe registered in the global
//! signal registry) into the runtime's park/unpark cycle.

use crate::io::driver::{Driver as IoDriver, Interest};
use crate::io::PollEvented;
use crate::park::Park;
use crate::signal::registry::globals;

use mio::net::UnixStream;
use std::io::{self, Read};
use std::ptr;
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration;
16 
/// Responsible for registering wakeups when an OS signal is received, and
/// subsequently dispatching notifications to any signal listeners as appropriate.
///
/// Note: this driver relies on having an enabled IO driver in order to listen to
/// pipe write wakeups.
#[derive(Debug)]
pub(crate) struct Driver {
    /// Thread parker. The `Driver` park implementation delegates to this.
    park: IoDriver,

    /// A pipe for receiving wake events from the signal handler. Drained in
    /// `process` after every park to detect newly delivered signals.
    receiver: PollEvented<UnixStream>,

    /// Shared state. Kept alive only as long as the driver; `Handle`s hold a
    /// `Weak` reference to it so they can detect driver shutdown.
    inner: Arc<Inner>,
}
33 
/// A cheap, cloneable handle to the signal driver.
///
/// Holds only a `Weak` reference to the driver's shared state, so a `Handle`
/// never keeps the driver alive; `check_inner` reports whether the driver is
/// still running.
#[derive(Clone, Debug, Default)]
pub(crate) struct Handle {
    // Upgradable only while the owning `Driver` (and its `Arc<Inner>`) exists.
    inner: Weak<Inner>,
}
38 
/// Zero-sized marker shared between the driver and its handles. It carries no
/// data; its only purpose is the `Arc`/`Weak` reference count used by
/// `Handle::check_inner` to detect whether the driver has been dropped.
#[derive(Debug)]
pub(super) struct Inner(());
41 
42 // ===== impl Driver =====
43 
impl Driver {
    /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
    ///
    /// Dups the global signal self-pipe's receiver fd and registers the dup
    /// with `park`'s IO driver. Returns an error if duping the fd or
    /// registering it fails.
    pub(crate) fn new(park: IoDriver) -> io::Result<Self> {
        use std::mem::ManuallyDrop;
        use std::os::unix::io::{AsRawFd, FromRawFd};

        // NB: We give each driver a "fresh" receiver file descriptor to avoid
        // the issues described in alexcrichton/tokio-process#42.
        //
        // In the past we would reuse the actual receiver file descriptor and
        // swallow any errors around double registration of the same descriptor.
        // I'm not sure if the second (failed) registration simply doesn't end
        // up receiving wake up notifications, or there could be some race
        // condition when consuming readiness events, but having distinct
        // descriptors for distinct PollEvented instances appears to mitigate
        // this.
        //
        // Unfortunately we cannot just use a single global PollEvented instance
        // either, since we can't compare Handles or assume they will always
        // point to the exact same reactor.
        //
        // Mio 0.7 removed `try_clone()` as an API due to unexpected behavior
        // with registering dups with the same reactor. In this case, duping is
        // safe as each dup is registered with separate reactors **and** we
        // only expect at least one dup to receive the notification.

        // Manually drop as we don't actually own this instance of UnixStream.
        let receiver_fd = globals().receiver.as_raw_fd();

        // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe.
        // `ManuallyDrop` prevents closing the registry's fd when `original` goes out of scope.
        let original =
            ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
        let receiver = UnixStream::from_std(original.try_clone()?);
        let receiver = PollEvented::new_with_interest_and_handle(
            receiver,
            Interest::READABLE | Interest::WRITABLE,
            park.handle(),
        )?;

        Ok(Self {
            park,
            receiver,
            inner: Arc::new(Inner(())),
        })
    }

    /// Returns a handle to this event loop which can be sent across threads
    /// and can be used as a proxy to the event loop itself.
    pub(crate) fn handle(&self) -> Handle {
        Handle {
            inner: Arc::downgrade(&self.inner),
        }
    }

    /// Checks the self-pipe for a pending wakeup; if one arrived, drains the
    /// pipe and broadcasts the received signals to registered listeners.
    fn process(&self) {
        // Check if the pipe is ready to read and therefore has "woken" us up
        //
        // To do so, we will `poll_read_ready` with a noop waker, since we don't
        // need to actually be notified when read ready...
        let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) };
        let mut cx = Context::from_waker(&waker);

        let ev = match self.receiver.registration().poll_read_ready(&mut cx) {
            Poll::Ready(Ok(ev)) => ev,
            Poll::Ready(Err(e)) => panic!("reactor gone: {}", e),
            Poll::Pending => return, // No wake has arrived, bail
        };

        // Drain the pipe completely so we can receive a new readiness event
        // if another signal has come in.
        let mut buf = [0; 128];
        loop {
            match (&*self.receiver).read(&mut buf) {
                Ok(0) => panic!("EOF on self-pipe"),
                Ok(_) => continue, // Keep reading
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
                Err(e) => panic!("Bad read on self-pipe: {}", e),
            }
        }

        // Drain first, then clear readiness: clearing before draining could
        // lose a wakeup for bytes written in between.
        self.receiver.registration().clear_readiness(ev);

        // Broadcast any signals which were received
        globals().broadcast();
    }
}
130 
/// A waker vtable whose every operation is a no-op. `Driver::process` polls
/// readiness opportunistically after each park and never needs an actual
/// wakeup, so the waker it supplies can safely do nothing.
const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);

// "Clone" for the noop waker: returns another noop raw waker with a null data
// pointer (the data pointer is never dereferenced by any vtable fn).
unsafe fn noop_clone(_data: *const ()) -> RawWaker {
    RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)
}

// Shared no-op used for wake, wake_by_ref, and drop in the vtable above.
unsafe fn noop(_data: *const ()) {}
138 
139 // ===== impl Park for Driver =====
140 
impl Park for Driver {
    // Unpark tokens come straight from the inner IO driver.
    type Unpark = <IoDriver as Park>::Unpark;
    type Error = io::Error;

    fn unpark(&self) -> Self::Unpark {
        // Delegate entirely to the IO driver parker.
        self.park.unpark()
    }

    fn park(&mut self) -> Result<(), Self::Error> {
        // Park on the IO driver, then dispatch any signals that were
        // delivered while (or before) we were parked.
        self.park.park()?;
        self.process();
        Ok(())
    }

    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
        // Same as `park`, but bounded by `duration`.
        self.park.park_timeout(duration)?;
        self.process();
        Ok(())
    }

    fn shutdown(&mut self) {
        self.park.shutdown()
    }
}
165 
166 // ===== impl Handle =====
167 
168 impl Handle {
check_inner(&self) -> io::Result<()>169     pub(super) fn check_inner(&self) -> io::Result<()> {
170         if self.inner.strong_count() > 0 {
171             Ok(())
172         } else {
173             Err(io::Error::new(io::ErrorKind::Other, "signal driver gone"))
174         }
175     }
176 }
177 
cfg_rt! {
    impl Handle {
        /// Returns a handle to the current driver
        ///
        /// Looks up the signal driver handle stored in the current runtime
        /// context.
        ///
        /// # Panics
        ///
        /// This function panics if there is no current signal driver set.
        pub(super) fn current() -> Self {
            crate::runtime::context::signal_handle().expect(
                "there is no signal driver running, must be called from the context of Tokio runtime",
            )
        }
    }
}
192 
193 cfg_not_rt! {
194     impl Handle {
195         /// Returns a handle to the current driver
196         ///
197         /// # Panics
198         ///
199         /// This function panics if there is no current signal driver set.
200         pub(super) fn current() -> Self {
201             panic!(
202                 "there is no signal driver running, must be called from the context of Tokio runtime or with\
203                 `rt` enabled.",
204             )
205         }
206     }
207 }
208