1 use crate::loom::sync::{Mutex, MutexGuard};
2 use crate::signal::unix::driver::Handle as SignalHandle;
3 use crate::signal::unix::{signal_with_handle, SignalKind};
4 use crate::sync::watch;
5 use std::io;
6 use std::process::ExitStatus;
7
8 /// An interface for waiting on a process to exit.
9 pub(crate) trait Wait {
10 /// Get the identifier for this process or diagnostics.
id(&self) -> u3211 fn id(&self) -> u32;
12 /// Try waiting for a process to exit in a non-blocking manner.
try_wait(&mut self) -> io::Result<Option<ExitStatus>>13 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
14 }
15
16 impl<T: Wait> Wait for &mut T {
id(&self) -> u3217 fn id(&self) -> u32 {
18 (**self).id()
19 }
20
try_wait(&mut self) -> io::Result<Option<ExitStatus>>21 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
22 (**self).try_wait()
23 }
24 }
25
26 /// An interface for queueing up an orphaned process so that it can be reaped.
27 pub(crate) trait OrphanQueue<T> {
28 /// Adds an orphan to the queue.
push_orphan(&self, orphan: T)29 fn push_orphan(&self, orphan: T);
30 }
31
32 impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
push_orphan(&self, orphan: T)33 fn push_orphan(&self, orphan: T) {
34 (**self).push_orphan(orphan);
35 }
36 }
37
38 /// An implementation of `OrphanQueue`.
39 #[derive(Debug)]
40 pub(crate) struct OrphanQueueImpl<T> {
41 sigchild: Mutex<Option<watch::Receiver<()>>>,
42 queue: Mutex<Vec<T>>,
43 }
44
45 impl<T> OrphanQueueImpl<T> {
new() -> Self46 pub(crate) fn new() -> Self {
47 Self {
48 sigchild: Mutex::new(None),
49 queue: Mutex::new(Vec::new()),
50 }
51 }
52
53 #[cfg(test)]
len(&self) -> usize54 fn len(&self) -> usize {
55 self.queue.lock().len()
56 }
57
push_orphan(&self, orphan: T) where T: Wait,58 pub(crate) fn push_orphan(&self, orphan: T)
59 where
60 T: Wait,
61 {
62 self.queue.lock().push(orphan)
63 }
64
65 /// Attempts to reap every process in the queue, ignoring any errors and
66 /// enqueueing any orphans which have not yet exited.
reap_orphans(&self, handle: &SignalHandle) where T: Wait,67 pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
68 where
69 T: Wait,
70 {
71 // If someone else is holding the lock, they will be responsible for draining
72 // the queue as necessary, so we can safely bail if that happens
73 if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
74 match &mut *sigchild_guard {
75 Some(sigchild) => {
76 if sigchild.try_has_changed().and_then(Result::ok).is_some() {
77 drain_orphan_queue(self.queue.lock());
78 }
79 }
80 None => {
81 let queue = self.queue.lock();
82
83 // Be lazy and only initialize the SIGCHLD listener if there
84 // are any orphaned processes in the queue.
85 if !queue.is_empty() {
86 // An errors shouldn't really happen here, but if it does it
87 // means that the signal driver isn't running, in
88 // which case there isn't anything we can
89 // register/initialize here, so we can try again later
90 if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) {
91 *sigchild_guard = Some(sigchild);
92 drain_orphan_queue(queue);
93 }
94 }
95 }
96 }
97 }
98 }
99 }
100
drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) where T: Wait,101 fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
102 where
103 T: Wait,
104 {
105 for i in (0..queue.len()).rev() {
106 match queue[i].try_wait() {
107 Ok(None) => {}
108 Ok(Some(_)) | Err(_) => {
109 // The stdlib handles interruption errors (EINTR) when polling a child process.
110 // All other errors represent invalid inputs or pids that have already been
111 // reaped, so we can drop the orphan in case an error is raised.
112 queue.swap_remove(i);
113 }
114 }
115 }
116
117 drop(queue);
118 }
119
120 #[cfg(all(test, not(loom)))]
121 pub(crate) mod test {
122 use super::*;
123 use crate::io::driver::Driver as IoDriver;
124 use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle};
125 use crate::sync::watch;
126 use std::cell::{Cell, RefCell};
127 use std::io;
128 use std::os::unix::process::ExitStatusExt;
129 use std::process::ExitStatus;
130 use std::rc::Rc;
131
132 pub(crate) struct MockQueue<W> {
133 pub(crate) all_enqueued: RefCell<Vec<W>>,
134 }
135
136 impl<W> MockQueue<W> {
new() -> Self137 pub(crate) fn new() -> Self {
138 Self {
139 all_enqueued: RefCell::new(Vec::new()),
140 }
141 }
142 }
143
144 impl<W> OrphanQueue<W> for MockQueue<W> {
push_orphan(&self, orphan: W)145 fn push_orphan(&self, orphan: W) {
146 self.all_enqueued.borrow_mut().push(orphan);
147 }
148 }
149
150 struct MockWait {
151 total_waits: Rc<Cell<usize>>,
152 num_wait_until_status: usize,
153 return_err: bool,
154 }
155
156 impl MockWait {
new(num_wait_until_status: usize) -> Self157 fn new(num_wait_until_status: usize) -> Self {
158 Self {
159 total_waits: Rc::new(Cell::new(0)),
160 num_wait_until_status,
161 return_err: false,
162 }
163 }
164
with_err() -> Self165 fn with_err() -> Self {
166 Self {
167 total_waits: Rc::new(Cell::new(0)),
168 num_wait_until_status: 0,
169 return_err: true,
170 }
171 }
172 }
173
174 impl Wait for MockWait {
id(&self) -> u32175 fn id(&self) -> u32 {
176 42
177 }
178
try_wait(&mut self) -> io::Result<Option<ExitStatus>>179 fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
180 let waits = self.total_waits.get();
181
182 let ret = if self.num_wait_until_status == waits {
183 if self.return_err {
184 Ok(Some(ExitStatus::from_raw(0)))
185 } else {
186 Err(io::Error::new(io::ErrorKind::Other, "mock err"))
187 }
188 } else {
189 Ok(None)
190 };
191
192 self.total_waits.set(waits + 1);
193 ret
194 }
195 }
196
197 #[test]
drain_attempts_a_single_reap_of_all_queued_orphans()198 fn drain_attempts_a_single_reap_of_all_queued_orphans() {
199 let first_orphan = MockWait::new(0);
200 let second_orphan = MockWait::new(1);
201 let third_orphan = MockWait::new(2);
202 let fourth_orphan = MockWait::with_err();
203
204 let first_waits = first_orphan.total_waits.clone();
205 let second_waits = second_orphan.total_waits.clone();
206 let third_waits = third_orphan.total_waits.clone();
207 let fourth_waits = fourth_orphan.total_waits.clone();
208
209 let orphanage = OrphanQueueImpl::new();
210 orphanage.push_orphan(first_orphan);
211 orphanage.push_orphan(third_orphan);
212 orphanage.push_orphan(second_orphan);
213 orphanage.push_orphan(fourth_orphan);
214
215 assert_eq!(orphanage.len(), 4);
216
217 drain_orphan_queue(orphanage.queue.lock());
218 assert_eq!(orphanage.len(), 2);
219 assert_eq!(first_waits.get(), 1);
220 assert_eq!(second_waits.get(), 1);
221 assert_eq!(third_waits.get(), 1);
222 assert_eq!(fourth_waits.get(), 1);
223
224 drain_orphan_queue(orphanage.queue.lock());
225 assert_eq!(orphanage.len(), 1);
226 assert_eq!(first_waits.get(), 1);
227 assert_eq!(second_waits.get(), 2);
228 assert_eq!(third_waits.get(), 2);
229 assert_eq!(fourth_waits.get(), 1);
230
231 drain_orphan_queue(orphanage.queue.lock());
232 assert_eq!(orphanage.len(), 0);
233 assert_eq!(first_waits.get(), 1);
234 assert_eq!(second_waits.get(), 2);
235 assert_eq!(third_waits.get(), 3);
236 assert_eq!(fourth_waits.get(), 1);
237
238 // Safe to reap when empty
239 drain_orphan_queue(orphanage.queue.lock());
240 }
241
242 #[test]
no_reap_if_no_signal_received()243 fn no_reap_if_no_signal_received() {
244 let (tx, rx) = watch::channel(());
245
246 let handle = SignalHandle::default();
247
248 let orphanage = OrphanQueueImpl::new();
249 *orphanage.sigchild.lock() = Some(rx);
250
251 let orphan = MockWait::new(2);
252 let waits = orphan.total_waits.clone();
253 orphanage.push_orphan(orphan);
254
255 orphanage.reap_orphans(&handle);
256 assert_eq!(waits.get(), 0);
257
258 orphanage.reap_orphans(&handle);
259 assert_eq!(waits.get(), 0);
260
261 tx.send(()).unwrap();
262 orphanage.reap_orphans(&handle);
263 assert_eq!(waits.get(), 1);
264 }
265
266 #[test]
no_reap_if_signal_lock_held()267 fn no_reap_if_signal_lock_held() {
268 let handle = SignalHandle::default();
269
270 let orphanage = OrphanQueueImpl::new();
271 let signal_guard = orphanage.sigchild.lock();
272
273 let orphan = MockWait::new(2);
274 let waits = orphan.total_waits.clone();
275 orphanage.push_orphan(orphan);
276
277 orphanage.reap_orphans(&handle);
278 assert_eq!(waits.get(), 0);
279
280 drop(signal_guard);
281 }
282
283 #[test]
does_not_register_signal_if_queue_empty()284 fn does_not_register_signal_if_queue_empty() {
285 let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();
286 let handle = signal_driver.handle();
287
288 let orphanage = OrphanQueueImpl::new();
289 assert!(orphanage.sigchild.lock().is_none()); // Sanity
290
291 // No register when queue empty
292 orphanage.reap_orphans(&handle);
293 assert!(orphanage.sigchild.lock().is_none());
294
295 let orphan = MockWait::new(2);
296 let waits = orphan.total_waits.clone();
297 orphanage.push_orphan(orphan);
298
299 orphanage.reap_orphans(&handle);
300 assert!(orphanage.sigchild.lock().is_some());
301 assert_eq!(waits.get(), 1); // Eager reap when registering listener
302 }
303
304 #[test]
does_nothing_if_signal_could_not_be_registered()305 fn does_nothing_if_signal_could_not_be_registered() {
306 let handle = SignalHandle::default();
307
308 let orphanage = OrphanQueueImpl::new();
309 assert!(orphanage.sigchild.lock().is_none());
310
311 let orphan = MockWait::new(2);
312 let waits = orphan.total_waits.clone();
313 orphanage.push_orphan(orphan);
314
315 // Signal handler has "gone away", nothing to register or reap
316 orphanage.reap_orphans(&handle);
317 assert!(orphanage.sigchild.lock().is_none());
318 assert_eq!(waits.get(), 0);
319 }
320 }
321