1 #![cfg_attr(not(feature = "rt"), allow(dead_code))] 2 3 //! Signal driver 4 5 use crate::runtime::{driver, io}; 6 use crate::signal::registry::globals; 7 8 use mio::net::UnixStream; 9 use std::io::{self as std_io, Read}; 10 use std::sync::{Arc, Weak}; 11 use std::time::Duration; 12 13 /// Responsible for registering wakeups when an OS signal is received, and 14 /// subsequently dispatching notifications to any signal listeners as appropriate. 15 /// 16 /// Note: this driver relies on having an enabled IO driver in order to listen to 17 /// pipe write wakeups. 18 #[derive(Debug)] 19 pub(crate) struct Driver { 20 /// Thread parker. The `Driver` park implementation delegates to this. 21 io: io::Driver, 22 23 /// A pipe for receiving wake events from the signal handler 24 receiver: UnixStream, 25 26 /// Shared state. The driver keeps a strong ref and the handle keeps a weak 27 /// ref. The weak ref is used to check if the driver is still active before 28 /// trying to register a signal handler. 29 inner: Arc<()>, 30 } 31 32 #[derive(Debug, Default)] 33 pub(crate) struct Handle { 34 /// Paired w/ the `Arc` above and is used to check if the driver is still 35 /// around before attempting to register a signal handler. 36 inner: Weak<()>, 37 } 38 39 // ===== impl Driver ===== 40 41 impl Driver { 42 /// Creates a new signal `Driver` instance that delegates wakeups to `park`. new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result<Self>43 pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result<Self> { 44 use std::mem::ManuallyDrop; 45 use std::os::unix::io::{AsRawFd, FromRawFd}; 46 47 // NB: We give each driver a "fresh" receiver file descriptor to avoid 48 // the issues described in alexcrichton/tokio-process#42. 49 // 50 // In the past we would reuse the actual receiver file descriptor and 51 // swallow any errors around double registration of the same descriptor. 52 // I'm not sure if the second (failed) registration simply doesn't end 53 // up receiving wake up notifications, or there could be some race 54 // condition when consuming readiness events, but having distinct 55 // descriptors appears to mitigate this. 56 // 57 // Unfortunately we cannot just use a single global UnixStream instance 58 // either, since we can't assume they will always be registered with the 59 // exact same reactor. 60 // 61 // Mio 0.7 removed `try_clone()` as an API due to unexpected behavior 62 // with registering dups with the same reactor. In this case, duping is 63 // safe as each dup is registered with separate reactors **and** we 64 // only expect at least one dup to receive the notification. 65 66 // Manually drop as we don't actually own this instance of UnixStream. 67 let receiver_fd = globals().receiver.as_raw_fd(); 68 69 // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe. 70 let original = 71 ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); 72 let mut receiver = UnixStream::from_std(original.try_clone()?); 73 74 io_handle.register_signal_receiver(&mut receiver)?; 75 76 Ok(Self { 77 io, 78 receiver, 79 inner: Arc::new(()), 80 }) 81 } 82 83 /// Returns a handle to this event loop which can be sent across threads 84 /// and can be used as a proxy to the event loop itself. handle(&self) -> Handle85 pub(crate) fn handle(&self) -> Handle { 86 Handle { 87 inner: Arc::downgrade(&self.inner), 88 } 89 } 90 park(&mut self, handle: &driver::Handle)91 pub(crate) fn park(&mut self, handle: &driver::Handle) { 92 self.io.park(handle); 93 self.process(); 94 } 95 park_timeout(&mut self, handle: &driver::Handle, duration: Duration)96 pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) { 97 self.io.park_timeout(handle, duration); 98 self.process(); 99 } 100 shutdown(&mut self, handle: &driver::Handle)101 pub(crate) fn shutdown(&mut self, handle: &driver::Handle) { 102 self.io.shutdown(handle) 103 } 104 process(&mut self)105 fn process(&mut self) { 106 // If the signal pipe has not received a readiness event, then there is 107 // nothing else to do. 108 if !self.io.consume_signal_ready() { 109 return; 110 } 111 112 // Drain the pipe completely so we can receive a new readiness event 113 // if another signal has come in. 114 let mut buf = [0; 128]; 115 loop { 116 match self.receiver.read(&mut buf) { 117 Ok(0) => panic!("EOF on self-pipe"), 118 Ok(_) => continue, // Keep reading 119 Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break, 120 Err(e) => panic!("Bad read on self-pipe: {}", e), 121 } 122 } 123 124 // Broadcast any signals which were received 125 globals().broadcast(); 126 } 127 } 128 129 // ===== impl Handle ===== 130 131 impl Handle { check_inner(&self) -> std_io::Result<()>132 pub(crate) fn check_inner(&self) -> std_io::Result<()> { 133 if self.inner.strong_count() > 0 { 134 Ok(()) 135 } else { 136 Err(std_io::Error::new( 137 std_io::ErrorKind::Other, 138 "signal driver gone", 139 )) 140 } 141 } 142 } 143