• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::process::imp::orphan::{OrphanQueue, Wait};
2 use crate::process::kill::Kill;
3 use crate::signal::unix::InternalStream;
4 
5 use std::future::Future;
6 use std::io;
7 use std::ops::Deref;
8 use std::pin::Pin;
9 use std::process::ExitStatus;
10 use std::task::Context;
11 use std::task::Poll;
12 
13 /// Orchestrates between registering interest for receiving signals when a
14 /// child process has exited, and attempting to poll for process completion.
15 #[derive(Debug)]
16 pub(crate) struct Reaper<W, Q, S>
17 where
18     W: Wait,
19     Q: OrphanQueue<W>,
20 {
21     inner: Option<W>,
22     orphan_queue: Q,
23     signal: S,
24 }
25 
26 impl<W, Q, S> Deref for Reaper<W, Q, S>
27 where
28     W: Wait,
29     Q: OrphanQueue<W>,
30 {
31     type Target = W;
32 
deref(&self) -> &Self::Target33     fn deref(&self) -> &Self::Target {
34         self.inner()
35     }
36 }
37 
38 impl<W, Q, S> Reaper<W, Q, S>
39 where
40     W: Wait,
41     Q: OrphanQueue<W>,
42 {
new(inner: W, orphan_queue: Q, signal: S) -> Self43     pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self {
44         Self {
45             inner: Some(inner),
46             orphan_queue,
47             signal,
48         }
49     }
50 
inner(&self) -> &W51     fn inner(&self) -> &W {
52         self.inner.as_ref().expect("inner has gone away")
53     }
54 
inner_mut(&mut self) -> &mut W55     pub(crate) fn inner_mut(&mut self) -> &mut W {
56         self.inner.as_mut().expect("inner has gone away")
57     }
58 }
59 
60 impl<W, Q, S> Future for Reaper<W, Q, S>
61 where
62     W: Wait + Unpin,
63     Q: OrphanQueue<W> + Unpin,
64     S: InternalStream + Unpin,
65 {
66     type Output = io::Result<ExitStatus>;
67 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>68     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
69         loop {
70             // If the child hasn't exited yet, then it's our responsibility to
71             // ensure the current task gets notified when it might be able to
72             // make progress. We can use the delivery of a SIGCHLD signal as a
73             // sign that we can potentially make progress.
74             //
75             // However, we will register for a notification on the next signal
76             // BEFORE we poll the child. Otherwise it is possible that the child
77             // can exit and the signal can arrive after we last polled the child,
78             // but before we've registered for a notification on the next signal
79             // (this can cause a deadlock if there are no more spawned children
80             // which can generate a different signal for us). A side effect of
81             // pre-registering for signal notifications is that when the child
82             // exits, we will have already registered for an additional
83             // notification we don't need to consume. If another signal arrives,
84             // this future's task will be notified/woken up again. Since the
85             // futures model allows for spurious wake ups this extra wakeup
86             // should not cause significant issues with parent futures.
87             let registered_interest = self.signal.poll_recv(cx).is_pending();
88 
89             if let Some(status) = self.inner_mut().try_wait()? {
90                 return Poll::Ready(Ok(status));
91             }
92 
93             // If our attempt to poll for the next signal was not ready, then
94             // we've arranged for our task to get notified and we can bail out.
95             if registered_interest {
96                 return Poll::Pending;
97             } else {
98                 // Otherwise, if the signal stream delivered a signal to us, we
99                 // won't get notified at the next signal, so we'll loop and try
100                 // again.
101                 continue;
102             }
103         }
104     }
105 }
106 
107 impl<W, Q, S> Kill for Reaper<W, Q, S>
108 where
109     W: Kill + Wait,
110     Q: OrphanQueue<W>,
111 {
kill(&mut self) -> io::Result<()>112     fn kill(&mut self) -> io::Result<()> {
113         self.inner_mut().kill()
114     }
115 }
116 
117 impl<W, Q, S> Drop for Reaper<W, Q, S>
118 where
119     W: Wait,
120     Q: OrphanQueue<W>,
121 {
drop(&mut self)122     fn drop(&mut self) {
123         if let Ok(Some(_)) = self.inner_mut().try_wait() {
124             return;
125         }
126 
127         let orphan = self.inner.take().unwrap();
128         self.orphan_queue.push_orphan(orphan);
129     }
130 }
131 
132 #[cfg(all(test, not(loom)))]
133 mod test {
134     use super::*;
135 
136     use crate::process::unix::orphan::test::MockQueue;
137     use futures::future::FutureExt;
138     use std::os::unix::process::ExitStatusExt;
139     use std::process::ExitStatus;
140     use std::task::Context;
141     use std::task::Poll;
142 
143     #[derive(Debug)]
144     struct MockWait {
145         total_kills: usize,
146         total_waits: usize,
147         num_wait_until_status: usize,
148         status: ExitStatus,
149     }
150 
151     impl MockWait {
new(status: ExitStatus, num_wait_until_status: usize) -> Self152         fn new(status: ExitStatus, num_wait_until_status: usize) -> Self {
153             Self {
154                 total_kills: 0,
155                 total_waits: 0,
156                 num_wait_until_status,
157                 status,
158             }
159         }
160     }
161 
162     impl Wait for MockWait {
id(&self) -> u32163         fn id(&self) -> u32 {
164             0
165         }
166 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>167         fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
168             let ret = if self.num_wait_until_status == self.total_waits {
169                 Some(self.status)
170             } else {
171                 None
172             };
173 
174             self.total_waits += 1;
175             Ok(ret)
176         }
177     }
178 
179     impl Kill for MockWait {
kill(&mut self) -> io::Result<()>180         fn kill(&mut self) -> io::Result<()> {
181             self.total_kills += 1;
182             Ok(())
183         }
184     }
185 
186     struct MockStream {
187         total_polls: usize,
188         values: Vec<Option<()>>,
189     }
190 
191     impl MockStream {
new(values: Vec<Option<()>>) -> Self192         fn new(values: Vec<Option<()>>) -> Self {
193             Self {
194                 total_polls: 0,
195                 values,
196             }
197         }
198     }
199 
200     impl InternalStream for MockStream {
poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>>201         fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
202             self.total_polls += 1;
203             match self.values.remove(0) {
204                 Some(()) => Poll::Ready(Some(())),
205                 None => Poll::Pending,
206             }
207         }
208     }
209 
210     #[test]
reaper()211     fn reaper() {
212         let exit = ExitStatus::from_raw(0);
213         let mock = MockWait::new(exit, 3);
214         let mut grim = Reaper::new(
215             mock,
216             MockQueue::new(),
217             MockStream::new(vec![None, Some(()), None, None, None]),
218         );
219 
220         let waker = futures::task::noop_waker();
221         let mut context = Context::from_waker(&waker);
222 
223         // Not yet exited, interest registered
224         assert!(grim.poll_unpin(&mut context).is_pending());
225         assert_eq!(1, grim.signal.total_polls);
226         assert_eq!(1, grim.total_waits);
227         assert_eq!(0, grim.orphan_queue.total_reaps.get());
228         assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
229 
230         // Not yet exited, couldn't register interest the first time
231         // but managed to register interest the second time around
232         assert!(grim.poll_unpin(&mut context).is_pending());
233         assert_eq!(3, grim.signal.total_polls);
234         assert_eq!(3, grim.total_waits);
235         assert_eq!(0, grim.orphan_queue.total_reaps.get());
236         assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
237 
238         // Exited
239         if let Poll::Ready(r) = grim.poll_unpin(&mut context) {
240             assert!(r.is_ok());
241             let exit_code = r.unwrap();
242             assert_eq!(exit_code, exit);
243         } else {
244             unreachable!();
245         }
246         assert_eq!(4, grim.signal.total_polls);
247         assert_eq!(4, grim.total_waits);
248         assert_eq!(0, grim.orphan_queue.total_reaps.get());
249         assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
250     }
251 
252     #[test]
kill()253     fn kill() {
254         let exit = ExitStatus::from_raw(0);
255         let mut grim = Reaper::new(
256             MockWait::new(exit, 0),
257             MockQueue::new(),
258             MockStream::new(vec![None]),
259         );
260 
261         grim.kill().unwrap();
262         assert_eq!(1, grim.total_kills);
263         assert_eq!(0, grim.orphan_queue.total_reaps.get());
264         assert!(grim.orphan_queue.all_enqueued.borrow().is_empty());
265     }
266 
267     #[test]
drop_reaps_if_possible()268     fn drop_reaps_if_possible() {
269         let exit = ExitStatus::from_raw(0);
270         let mut mock = MockWait::new(exit, 0);
271 
272         {
273             let queue = MockQueue::new();
274 
275             let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
276 
277             drop(grim);
278 
279             assert_eq!(0, queue.total_reaps.get());
280             assert!(queue.all_enqueued.borrow().is_empty());
281         }
282 
283         assert_eq!(1, mock.total_waits);
284         assert_eq!(0, mock.total_kills);
285     }
286 
287     #[test]
drop_enqueues_orphan_if_wait_fails()288     fn drop_enqueues_orphan_if_wait_fails() {
289         let exit = ExitStatus::from_raw(0);
290         let mut mock = MockWait::new(exit, 2);
291 
292         {
293             let queue = MockQueue::<&mut MockWait>::new();
294             let grim = Reaper::new(&mut mock, &queue, MockStream::new(vec![]));
295             drop(grim);
296 
297             assert_eq!(0, queue.total_reaps.get());
298             assert_eq!(1, queue.all_enqueued.borrow().len());
299         }
300 
301         assert_eq!(1, mock.total_waits);
302         assert_eq!(0, mock.total_kills);
303     }
304 }
305