1 use crate::process::imp::orphan::{OrphanQueue, Wait}; 2 use crate::process::kill::Kill; 3 use crate::signal::unix::InternalStream; 4 5 use std::future::Future; 6 use std::io; 7 use std::ops::Deref; 8 use std::pin::Pin; 9 use std::process::ExitStatus; 10 use std::task::Context; 11 use std::task::Poll; 12 13 /// Orchestrates between registering interest for receiving signals when a 14 /// child process has exited, and attempting to poll for process completion. 15 #[derive(Debug)] 16 pub(crate) struct Reaper<W, Q, S> 17 where 18 W: Wait, 19 Q: OrphanQueue<W>, 20 { 21 inner: Option<W>, 22 orphan_queue: Q, 23 signal: S, 24 } 25 26 impl<W, Q, S> Deref for Reaper<W, Q, S> 27 where 28 W: Wait, 29 Q: OrphanQueue<W>, 30 { 31 type Target = W; 32 deref(&self) -> &Self::Target33 fn deref(&self) -> &Self::Target { 34 self.inner() 35 } 36 } 37 38 impl<W, Q, S> Reaper<W, Q, S> 39 where 40 W: Wait, 41 Q: OrphanQueue<W>, 42 { new(inner: W, orphan_queue: Q, signal: S) -> Self43 pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self { 44 Self { 45 inner: Some(inner), 46 orphan_queue, 47 signal, 48 } 49 } 50 inner(&self) -> &W51 fn inner(&self) -> &W { 52 self.inner.as_ref().expect("inner has gone away") 53 } 54 inner_mut(&mut self) -> &mut W55 pub(crate) fn inner_mut(&mut self) -> &mut W { 56 self.inner.as_mut().expect("inner has gone away") 57 } 58 } 59 60 impl<W, Q, S> Future for Reaper<W, Q, S> 61 where 62 W: Wait + Unpin, 63 Q: OrphanQueue<W> + Unpin, 64 S: InternalStream + Unpin, 65 { 66 type Output = io::Result<ExitStatus>; 67 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>68 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 69 loop { 70 // If the child hasn't exited yet, then it's our responsibility to 71 // ensure the current task gets notified when it might be able to 72 // make progress. We can use the delivery of a SIGCHLD signal as a 73 // sign that we can potentially make progress. 74 // 75 // However, we will register for a notification on the next signal 76 // BEFORE we poll the child. Otherwise it is possible that the child 77 // can exit and the signal can arrive after we last polled the child, 78 // but before we've registered for a notification on the next signal 79 // (this can cause a deadlock if there are no more spawned children 80 // which can generate a different signal for us). A side effect of 81 // pre-registering for signal notifications is that when the child 82 // exits, we will have already registered for an additional 83 // notification we don't need to consume. If another signal arrives, 84 // this future's task will be notified/woken up again. Since the 85 // futures model allows for spurious wake ups this extra wakeup 86 // should not cause significant issues with parent futures. 87 let registered_interest = self.signal.poll_recv(cx).is_pending(); 88 89 if let Some(status) = self.inner_mut().try_wait()? { 90 return Poll::Ready(Ok(status)); 91 } 92 93 // If our attempt to poll for the next signal was not ready, then 94 // we've arranged for our task to get notified and we can bail out. 95 if registered_interest { 96 return Poll::Pending; 97 } else { 98 // Otherwise, if the signal stream delivered a signal to us, we 99 // won't get notified at the next signal, so we'll loop and try 100 // again. 101 continue; 102 } 103 } 104 } 105 } 106 107 impl<W, Q, S> Kill for Reaper<W, Q, S> 108 where 109 W: Kill + Wait, 110 Q: OrphanQueue<W>, 111 { kill(&mut self) -> io::Result<()>112 fn kill(&mut self) -> io::Result<()> { 113 self.inner_mut().kill() 114 } 115 } 116 117 impl<W, Q, S> Drop for Reaper<W, Q, S> 118 where 119 W: Wait, 120 Q: OrphanQueue<W>, 121 { drop(&mut self)122 fn drop(&mut self) { 123 if let Ok(Some(_)) = self.inner_mut().try_wait() { 124 return; 125 } 126 127 let orphan = self.inner.take().unwrap(); 128 self.orphan_queue.push_orphan(orphan); 129 } 130 } 131 132 #[cfg(all(test, not(loom)))] 133 mod test { 134 use super::*; 135 136 use crate::process::unix::orphan::test::MockQueue; 137 use futures::future::FutureExt; 138 use std::os::unix::process::ExitStatusExt; 139 use std::process::ExitStatus; 140 use std::task::Context; 141 use std::task::Poll; 142 143 #[derive(Debug)] 144 struct MockWait { 145 total_kills: usize, 146 total_waits: usize, 147 num_wait_until_status: usize, 148 status: ExitStatus, 149 } 150 151 impl MockWait { new(status: ExitStatus, num_wait_until_status: usize) -> Self152 fn new(status: ExitStatus, num_wait_until_status: usize) -> Self { 153 Self { 154 total_kills: 0, 155 total_waits: 0, 156 num_wait_until_status, 157 status, 158 } 159 } 160 } 161 162 impl Wait for MockWait { id(&self) -> u32163 fn id(&self) -> u32 { 164 0 165 } 166 try_wait(&mut self) -> io::Result<Option<ExitStatus>>167 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> { 168 let ret = if self.num_wait_until_status == self.total_waits { 169 Some(self.status) 170 } else { 171 None 172 }; 173 174 self.total_waits += 1; 175 Ok(ret) 176 } 177 } 178 179 impl Kill for MockWait { kill(&mut self) -> io::Result<()>180 fn kill(&mut self) -> io::Result<()> { 181 self.total_kills += 1; 182 Ok(()) 183 } 184 } 185 186 struct MockStream { 187 total_polls: usize, 188 values: Vec<Option<()>>, 189 } 190 191 impl MockStream { new(values: Vec<Option<()>>) -> Self192 fn new(values: Vec<Option<()>>) -> Self { 193 Self { 194 total_polls: 0, 195 values, 196 } 197 } 198 } 199 200 impl InternalStream for MockStream { poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>>201 fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> { 202 self.total_polls += 1; 203 match self.values.remove(0) { 204 Some(()) => Poll::Ready(Some(())), 205 None => Poll::Pending, 206 } 207 } 208 } 209 210 #[test] reaper()211 fn reaper() { 212 let exit = ExitStatus::from_raw(0); 213 let mock = MockWait::new(exit, 3); 214 let mut grim = Reaper::new( 215 mock, 216 MockQueue::new(), 217 MockStream::new(vec![None, Some(()), None, None, None]), 218 ); 219 220 let waker = futures::task::noop_waker(); 221 let mut context = Context::from_waker(&waker); 222 223 // Not yet exited, interest registered 224 assert!(grim.poll_unpin(&mut context).is_pending()); 225 assert_eq!(1, grim.signal.total_polls); 226 assert_eq!(1, grim.total_waits); 227 assert_eq!(0, grim.orphan_queue.total_reaps.get()); 228 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); 229 230 // Not yet exited, couldn't register interest the first time 231 // but managed to register interest the second time around 232 assert!(grim.poll_unpin(&mut context).is_pending()); 233 assert_eq!(3, grim.signal.total_polls); 234 assert_eq!(3, grim.total_waits); 235 assert_eq!(0, grim.orphan_queue.total_reaps.get()); 236 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); 237 238 // Exited 239 if let Poll::Ready(r) = grim.poll_unpin(&mut context) { 240 assert!(r.is_ok()); 241 let exit_code = r.unwrap(); 242 assert_eq!(exit_code, exit); 243 } else { 244 unreachable!(); 245 } 246 assert_eq!(4, grim.signal.total_polls); 247 assert_eq!(4, grim.total_waits); 248 assert_eq!(0, grim.orphan_queue.total_reaps.get()); 249 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); 250 } 251 252 #[test] kill()253 fn kill() { 254 let exit = ExitStatus::from_raw(0); 255 let mut grim = Reaper::new( 256 MockWait::new(exit, 0), 257 MockQueue::new(), 258 MockStream::new(vec![None]), 259 ); 260 261 grim.kill().unwrap(); 262 assert_eq!(1, grim.total_kills); 263 assert_eq!(0, grim.orphan_queue.total_reaps.get()); 264 assert!(grim.orphan_queue.all_enqueued.borrow().is_empty()); 265 } 266 267 #[test] drop_reaps_if_possible()268 fn drop_reaps_if_possible() { 269 let exit = ExitStatus::from_raw(0); 270 let mut mock = MockWait::new(exit, 0); 271 272 { 273 let queue = MockQueue::new(); 274 275 let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![])); 276 277 drop(grim); 278 279 assert_eq!(0, queue.total_reaps.get()); 280 assert!(queue.all_enqueued.borrow().is_empty()); 281 } 282 283 assert_eq!(1, mock.total_waits); 284 assert_eq!(0, mock.total_kills); 285 } 286 287 #[test] drop_enqueues_orphan_if_wait_fails()288 fn drop_enqueues_orphan_if_wait_fails() { 289 let exit = ExitStatus::from_raw(0); 290 let mut mock = MockWait::new(exit, 2); 291 292 { 293 let queue = MockQueue::<&mut MockWait>::new(); 294 let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![])); 295 drop(grim); 296 297 assert_eq!(0, queue.total_reaps.get()); 298 assert_eq!(1, queue.all_enqueued.borrow().len()); 299 } 300 301 assert_eq!(1, mock.total_waits); 302 assert_eq!(0, mock.total_kills); 303 } 304 } 305