• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg_attr(not(feature = "rt"), allow(dead_code))]
2 
3 //! Signal driver
4 
5 use crate::runtime::{driver, io};
6 use crate::signal::registry::globals;
7 
8 use mio::net::UnixStream;
9 use std::io::{self as std_io, Read};
10 use std::sync::{Arc, Weak};
11 use std::time::Duration;
12 
13 /// Responsible for registering wakeups when an OS signal is received, and
14 /// subsequently dispatching notifications to any signal listeners as appropriate.
15 ///
16 /// Note: this driver relies on having an enabled IO driver in order to listen to
17 /// pipe write wakeups.
18 #[derive(Debug)]
19 pub(crate) struct Driver {
20     /// Thread parker. The `Driver` park implementation delegates to this.
21     io: io::Driver,
22 
23     /// A pipe for receiving wake events from the signal handler
24     receiver: UnixStream,
25 
26     /// Shared state. The driver keeps a strong ref and the handle keeps a weak
27     /// ref. The weak ref is used to check if the driver is still active before
28     /// trying to register a signal handler.
29     inner: Arc<()>,
30 }
31 
32 #[derive(Debug, Default)]
33 pub(crate) struct Handle {
34     /// Paired w/ the `Arc` above and is used to check if the driver is still
35     /// around before attempting to register a signal handler.
36     inner: Weak<()>,
37 }
38 
39 // ===== impl Driver =====
40 
41 impl Driver {
42     /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result<Self>43     pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result<Self> {
44         use std::mem::ManuallyDrop;
45         use std::os::unix::io::{AsRawFd, FromRawFd};
46 
47         // NB: We give each driver a "fresh" receiver file descriptor to avoid
48         // the issues described in alexcrichton/tokio-process#42.
49         //
50         // In the past we would reuse the actual receiver file descriptor and
51         // swallow any errors around double registration of the same descriptor.
52         // I'm not sure if the second (failed) registration simply doesn't end
53         // up receiving wake up notifications, or there could be some race
54         // condition when consuming readiness events, but having distinct
55         // descriptors appears to mitigate this.
56         //
57         // Unfortunately we cannot just use a single global UnixStream instance
58         // either, since we can't assume they will always be registered with the
59         // exact same reactor.
60         //
61         // Mio 0.7 removed `try_clone()` as an API due to unexpected behavior
62         // with registering dups with the same reactor. In this case, duping is
63         // safe as each dup is registered with separate reactors **and** we
64         // only expect at least one dup to receive the notification.
65 
66         // Manually drop as we don't actually own this instance of UnixStream.
67         let receiver_fd = globals().receiver.as_raw_fd();
68 
69         // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe.
70         let original =
71             ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) });
72         let mut receiver = UnixStream::from_std(original.try_clone()?);
73 
74         io_handle.register_signal_receiver(&mut receiver)?;
75 
76         Ok(Self {
77             io,
78             receiver,
79             inner: Arc::new(()),
80         })
81     }
82 
83     /// Returns a handle to this event loop which can be sent across threads
84     /// and can be used as a proxy to the event loop itself.
handle(&self) -> Handle85     pub(crate) fn handle(&self) -> Handle {
86         Handle {
87             inner: Arc::downgrade(&self.inner),
88         }
89     }
90 
park(&mut self, handle: &driver::Handle)91     pub(crate) fn park(&mut self, handle: &driver::Handle) {
92         self.io.park(handle);
93         self.process();
94     }
95 
park_timeout(&mut self, handle: &driver::Handle, duration: Duration)96     pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
97         self.io.park_timeout(handle, duration);
98         self.process();
99     }
100 
shutdown(&mut self, handle: &driver::Handle)101     pub(crate) fn shutdown(&mut self, handle: &driver::Handle) {
102         self.io.shutdown(handle)
103     }
104 
process(&mut self)105     fn process(&mut self) {
106         // If the signal pipe has not received a readiness event, then there is
107         // nothing else to do.
108         if !self.io.consume_signal_ready() {
109             return;
110         }
111 
112         // Drain the pipe completely so we can receive a new readiness event
113         // if another signal has come in.
114         let mut buf = [0; 128];
115         loop {
116             match self.receiver.read(&mut buf) {
117                 Ok(0) => panic!("EOF on self-pipe"),
118                 Ok(_) => continue, // Keep reading
119                 Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break,
120                 Err(e) => panic!("Bad read on self-pipe: {}", e),
121             }
122         }
123 
124         // Broadcast any signals which were received
125         globals().broadcast();
126     }
127 }
128 
129 // ===== impl Handle =====
130 
131 impl Handle {
check_inner(&self) -> std_io::Result<()>132     pub(crate) fn check_inner(&self) -> std_io::Result<()> {
133         if self.inner.strong_count() > 0 {
134             Ok(())
135         } else {
136             Err(std_io::Error::new(
137                 std_io::ErrorKind::Other,
138                 "signal driver gone",
139             ))
140         }
141     }
142 }
143