• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::loom::sync::{Mutex, MutexGuard};
2 use crate::signal::unix::driver::Handle as SignalHandle;
3 use crate::signal::unix::{signal_with_handle, SignalKind};
4 use crate::sync::watch;
5 use std::io;
6 use std::process::ExitStatus;
7 
/// An interface for waiting on a process to exit.
pub(crate) trait Wait {
    /// Get the identifier for this process for diagnostics.
    fn id(&self) -> u32;

    /// Try waiting for a process to exit in a non-blocking manner.
    ///
    /// The orphan-draining code in this file keeps an entry queued while
    /// this returns `Ok(None)` and drops it on `Ok(Some(_))` or `Err(_)`.
    fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
}
15 
16 impl<T: Wait> Wait for &mut T {
id(&self) -> u3217     fn id(&self) -> u32 {
18         (**self).id()
19     }
20 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>21     fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
22         (**self).try_wait()
23     }
24 }
25 
/// An interface for queueing up an orphaned process so that it can be reaped.
pub(crate) trait OrphanQueue<T> {
    /// Adds an orphan to the queue.
    ///
    /// Takes `&self`, so implementors need interior mutability to store
    /// the orphan.
    fn push_orphan(&self, orphan: T);
}
31 
32 impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
push_orphan(&self, orphan: T)33     fn push_orphan(&self, orphan: T) {
34         (**self).push_orphan(orphan);
35     }
36 }
37 
38 /// An implementation of `OrphanQueue`.
39 #[derive(Debug)]
40 pub(crate) struct OrphanQueueImpl<T> {
41     sigchild: Mutex<Option<watch::Receiver<()>>>,
42     queue: Mutex<Vec<T>>,
43 }
44 
45 impl<T> OrphanQueueImpl<T> {
new() -> Self46     pub(crate) fn new() -> Self {
47         Self {
48             sigchild: Mutex::new(None),
49             queue: Mutex::new(Vec::new()),
50         }
51     }
52 
53     #[cfg(test)]
len(&self) -> usize54     fn len(&self) -> usize {
55         self.queue.lock().len()
56     }
57 
push_orphan(&self, orphan: T) where T: Wait,58     pub(crate) fn push_orphan(&self, orphan: T)
59     where
60         T: Wait,
61     {
62         self.queue.lock().push(orphan)
63     }
64 
65     /// Attempts to reap every process in the queue, ignoring any errors and
66     /// enqueueing any orphans which have not yet exited.
reap_orphans(&self, handle: &SignalHandle) where T: Wait,67     pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
68     where
69         T: Wait,
70     {
71         // If someone else is holding the lock, they will be responsible for draining
72         // the queue as necessary, so we can safely bail if that happens
73         if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
74             match &mut *sigchild_guard {
75                 Some(sigchild) => {
76                     if sigchild.try_has_changed().and_then(Result::ok).is_some() {
77                         drain_orphan_queue(self.queue.lock());
78                     }
79                 }
80                 None => {
81                     let queue = self.queue.lock();
82 
83                     // Be lazy and only initialize the SIGCHLD listener if there
84                     // are any orphaned processes in the queue.
85                     if !queue.is_empty() {
86                         // An errors shouldn't really happen here, but if it does it
87                         // means that the signal driver isn't running, in
88                         // which case there isn't anything we can
89                         // register/initialize here, so we can try again later
90                         if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) {
91                             *sigchild_guard = Some(sigchild);
92                             drain_orphan_queue(queue);
93                         }
94                     }
95                 }
96             }
97         }
98     }
99 }
100 
drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) where T: Wait,101 fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
102 where
103     T: Wait,
104 {
105     for i in (0..queue.len()).rev() {
106         match queue[i].try_wait() {
107             Ok(None) => {}
108             Ok(Some(_)) | Err(_) => {
109                 // The stdlib handles interruption errors (EINTR) when polling a child process.
110                 // All other errors represent invalid inputs or pids that have already been
111                 // reaped, so we can drop the orphan in case an error is raised.
112                 queue.swap_remove(i);
113             }
114         }
115     }
116 
117     drop(queue);
118 }
119 
#[cfg(all(test, not(loom)))]
pub(crate) mod test {
    use super::*;
    use crate::io::driver::Driver as IoDriver;
    use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
    use crate::sync::watch;
    use std::cell::{Cell, RefCell};
    use std::io;
    use std::os::unix::process::ExitStatusExt;
    use std::process::ExitStatus;
    use std::rc::Rc;

    /// An `OrphanQueue` that simply records every orphan pushed to it.
    pub(crate) struct MockQueue<W> {
        pub(crate) all_enqueued: RefCell<Vec<W>>,
    }

    impl<W> MockQueue<W> {
        /// Creates a mock queue with nothing recorded.
        pub(crate) fn new() -> Self {
            Self {
                all_enqueued: RefCell::new(Vec::new()),
            }
        }
    }

    impl<W> OrphanQueue<W> for MockQueue<W> {
        fn push_orphan(&self, orphan: W) {
            self.all_enqueued.borrow_mut().push(orphan);
        }
    }

    /// A scripted `Wait` implementation which counts how often it is polled.
    struct MockWait {
        /// Number of `try_wait` calls made so far; shared so that a test can
        /// keep observing it after the mock has been moved into a queue.
        total_waits: Rc<Cell<usize>>,
        /// Zero-based index of the `try_wait` call which yields a terminal
        /// result; every other call yields `Ok(None)`.
        num_wait_until_status: usize,
        /// When `true` the terminal result is an error instead of a status.
        return_err: bool,
    }

    impl MockWait {
        /// A mock which reports an exit status on call number
        /// `num_wait_until_status` (zero-based).
        fn new(num_wait_until_status: usize) -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status,
                return_err: false,
            }
        }

        /// A mock which reports an error on its very first call.
        fn with_err() -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status: 0,
                return_err: true,
            }
        }
    }

    impl Wait for MockWait {
        fn id(&self) -> u32 {
            42
        }

        fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
            let waits = self.total_waits.get();

            let ret = if self.num_wait_until_status == waits {
                // `return_err` selects the error outcome; the branches used
                // to be swapped, so `with_err()` produced a status instead.
                if self.return_err {
                    Err(io::Error::new(io::ErrorKind::Other, "mock err"))
                } else {
                    Ok(Some(ExitStatus::from_raw(0)))
                }
            } else {
                Ok(None)
            };

            self.total_waits.set(waits + 1);
            ret
        }
    }

    #[test]
    fn drain_attempts_a_single_reap_of_all_queued_orphans() {
        let first_orphan = MockWait::new(0);
        let second_orphan = MockWait::new(1);
        let third_orphan = MockWait::new(2);
        let fourth_orphan = MockWait::with_err();

        let first_waits = first_orphan.total_waits.clone();
        let second_waits = second_orphan.total_waits.clone();
        let third_waits = third_orphan.total_waits.clone();
        let fourth_waits = fourth_orphan.total_waits.clone();

        let orphanage = OrphanQueueImpl::new();
        orphanage.push_orphan(first_orphan);
        orphanage.push_orphan(third_orphan);
        orphanage.push_orphan(second_orphan);
        orphanage.push_orphan(fourth_orphan);

        assert_eq!(orphanage.len(), 4);

        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 2);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 1);
        assert_eq!(third_waits.get(), 1);
        assert_eq!(fourth_waits.get(), 1);

        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 1);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 2);
        assert_eq!(fourth_waits.get(), 1);

        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 0);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 3);
        assert_eq!(fourth_waits.get(), 1);

        // Safe to reap when empty
        drain_orphan_queue(orphanage.queue.lock());
    }

    #[test]
    fn no_reap_if_no_signal_received() {
        let (tx, rx) = watch::channel(());

        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        *orphanage.sigchild.lock() = Some(rx);

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        tx.send(()).unwrap();
        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 1);
    }

    #[test]
    fn no_reap_if_signal_lock_held() {
        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        let signal_guard = orphanage.sigchild.lock();

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        drop(signal_guard);
    }

    #[test]
    fn does_not_register_signal_if_queue_empty() {
        let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();
        let handle = signal_driver.handle();

        let orphanage = OrphanQueueImpl::new();
        assert!(orphanage.sigchild.lock().is_none()); // Sanity

        // No register when queue empty
        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_none());

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_some());
        assert_eq!(waits.get(), 1); // Eager reap when registering listener
    }

    #[test]
    fn does_nothing_if_signal_could_not_be_registered() {
        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        assert!(orphanage.sigchild.lock().is_none());

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        // Signal handler has "gone away", nothing to register or reap
        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_none());
        assert_eq!(waits.get(), 0);
    }
}
321