• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::loom::sync::{Mutex, MutexGuard};
2 use crate::runtime::signal::Handle as SignalHandle;
3 use crate::signal::unix::{signal_with_handle, SignalKind};
4 use crate::sync::watch;
5 use std::io;
6 use std::process::ExitStatus;
7 
/// An interface for waiting on a process to exit.
pub(crate) trait Wait {
    /// Get the identifier for this process for diagnostics.
    fn id(&self) -> u32;

    /// Try waiting for a process to exit in a non-blocking manner.
    ///
    /// Implementations return `Ok(None)` while the process is still running,
    /// `Ok(Some(status))` once it has exited, and `Err(_)` if polling failed.
    fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
}
15 
16 impl<T: Wait> Wait for &mut T {
id(&self) -> u3217     fn id(&self) -> u32 {
18         (**self).id()
19     }
20 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>21     fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
22         (**self).try_wait()
23     }
24 }
25 
/// An interface for queueing up an orphaned process so that it can be reaped.
pub(crate) trait OrphanQueue<T> {
    /// Adds an orphan to the queue.
    ///
    /// Takes `&self`, so implementations need interior mutability to store
    /// the orphan.
    fn push_orphan(&self, orphan: T);
}
31 
32 impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
push_orphan(&self, orphan: T)33     fn push_orphan(&self, orphan: T) {
34         (**self).push_orphan(orphan);
35     }
36 }
37 
38 /// An implementation of `OrphanQueue`.
39 #[derive(Debug)]
40 pub(crate) struct OrphanQueueImpl<T> {
41     sigchild: Mutex<Option<watch::Receiver<()>>>,
42     queue: Mutex<Vec<T>>,
43 }
44 
45 impl<T> OrphanQueueImpl<T> {
46     cfg_not_has_const_mutex_new! {
47         pub(crate) fn new() -> Self {
48             Self {
49                 sigchild: Mutex::new(None),
50                 queue: Mutex::new(Vec::new()),
51             }
52         }
53     }
54 
55     cfg_has_const_mutex_new! {
56         pub(crate) const fn new() -> Self {
57             Self {
58                 sigchild: Mutex::const_new(None),
59                 queue: Mutex::const_new(Vec::new()),
60             }
61         }
62     }
63 
64     #[cfg(test)]
len(&self) -> usize65     fn len(&self) -> usize {
66         self.queue.lock().len()
67     }
68 
push_orphan(&self, orphan: T) where T: Wait,69     pub(crate) fn push_orphan(&self, orphan: T)
70     where
71         T: Wait,
72     {
73         self.queue.lock().push(orphan)
74     }
75 
76     /// Attempts to reap every process in the queue, ignoring any errors and
77     /// enqueueing any orphans which have not yet exited.
reap_orphans(&self, handle: &SignalHandle) where T: Wait,78     pub(crate) fn reap_orphans(&self, handle: &SignalHandle)
79     where
80         T: Wait,
81     {
82         // If someone else is holding the lock, they will be responsible for draining
83         // the queue as necessary, so we can safely bail if that happens
84         if let Some(mut sigchild_guard) = self.sigchild.try_lock() {
85             match &mut *sigchild_guard {
86                 Some(sigchild) => {
87                     if sigchild.try_has_changed().and_then(Result::ok).is_some() {
88                         drain_orphan_queue(self.queue.lock());
89                     }
90                 }
91                 None => {
92                     let queue = self.queue.lock();
93 
94                     // Be lazy and only initialize the SIGCHLD listener if there
95                     // are any orphaned processes in the queue.
96                     if !queue.is_empty() {
97                         // An errors shouldn't really happen here, but if it does it
98                         // means that the signal driver isn't running, in
99                         // which case there isn't anything we can
100                         // register/initialize here, so we can try again later
101                         if let Ok(sigchild) = signal_with_handle(SignalKind::child(), handle) {
102                             *sigchild_guard = Some(sigchild);
103                             drain_orphan_queue(queue);
104                         }
105                     }
106                 }
107             }
108         }
109     }
110 }
111 
drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>) where T: Wait,112 fn drain_orphan_queue<T>(mut queue: MutexGuard<'_, Vec<T>>)
113 where
114     T: Wait,
115 {
116     for i in (0..queue.len()).rev() {
117         match queue[i].try_wait() {
118             Ok(None) => {}
119             Ok(Some(_)) | Err(_) => {
120                 // The stdlib handles interruption errors (EINTR) when polling a child process.
121                 // All other errors represent invalid inputs or pids that have already been
122                 // reaped, so we can drop the orphan in case an error is raised.
123                 queue.swap_remove(i);
124             }
125         }
126     }
127 
128     drop(queue);
129 }
130 
#[cfg(all(test, not(loom)))]
pub(crate) mod test {
    use super::*;
    use crate::runtime::io::Driver as IoDriver;
    use crate::runtime::signal::{Driver as SignalDriver, Handle as SignalHandle};
    use crate::sync::watch;
    use std::cell::{Cell, RefCell};
    use std::io;
    use std::os::unix::process::ExitStatusExt;
    use std::process::ExitStatus;
    use std::rc::Rc;

    /// An `OrphanQueue` test double that simply records everything pushed
    /// into it, without ever attempting to reap.
    pub(crate) struct MockQueue<W> {
        pub(crate) all_enqueued: RefCell<Vec<W>>,
    }

    impl<W> MockQueue<W> {
        /// Creates a mock queue with nothing enqueued.
        pub(crate) fn new() -> Self {
            Self {
                all_enqueued: RefCell::new(Vec::new()),
            }
        }
    }

    impl<W> OrphanQueue<W> for MockQueue<W> {
        fn push_orphan(&self, orphan: W) {
            self.all_enqueued.borrow_mut().push(orphan);
        }
    }

    /// A `Wait` test double that reports "still running" until it has been
    /// polled `num_wait_until_status` times, and then yields either an exit
    /// status or an error depending on `return_err`.
    struct MockWait {
        /// Shared counter of how many times `try_wait` has been called.
        total_waits: Rc<Cell<usize>>,
        /// Zero-based index of the poll that produces the final result.
        num_wait_until_status: usize,
        /// Whether the final result is an error instead of an exit status.
        return_err: bool,
    }

    impl MockWait {
        /// A mock that exits successfully on poll number `num_wait_until_status`.
        fn new(num_wait_until_status: usize) -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status,
                return_err: false,
            }
        }

        /// A mock whose very first poll fails with an error.
        fn with_err() -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status: 0,
                return_err: true,
            }
        }
    }

    impl Wait for MockWait {
        fn id(&self) -> u32 {
            42
        }

        fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
            let waits = self.total_waits.get();

            let ret = if self.num_wait_until_status == waits {
                // `return_err` selects the error outcome; otherwise the mock
                // reports a successful exit.
                if self.return_err {
                    Err(io::Error::new(io::ErrorKind::Other, "mock err"))
                } else {
                    Ok(Some(ExitStatus::from_raw(0)))
                }
            } else {
                Ok(None)
            };

            self.total_waits.set(waits + 1);
            ret
        }
    }

    #[test]
    fn drain_attempts_a_single_reap_of_all_queued_orphans() {
        let first_orphan = MockWait::new(0);
        let second_orphan = MockWait::new(1);
        let third_orphan = MockWait::new(2);
        let fourth_orphan = MockWait::with_err();

        let first_waits = first_orphan.total_waits.clone();
        let second_waits = second_orphan.total_waits.clone();
        let third_waits = third_orphan.total_waits.clone();
        let fourth_waits = fourth_orphan.total_waits.clone();

        let orphanage = OrphanQueueImpl::new();
        orphanage.push_orphan(first_orphan);
        orphanage.push_orphan(third_orphan);
        orphanage.push_orphan(second_orphan);
        orphanage.push_orphan(fourth_orphan);

        assert_eq!(orphanage.len(), 4);

        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 2);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 1);
        assert_eq!(third_waits.get(), 1);
        assert_eq!(fourth_waits.get(), 1);

        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 1);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 2);
        assert_eq!(fourth_waits.get(), 1);

        drain_orphan_queue(orphanage.queue.lock());
        assert_eq!(orphanage.len(), 0);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 3);
        assert_eq!(fourth_waits.get(), 1);

        // Safe to reap when empty
        drain_orphan_queue(orphanage.queue.lock());
    }

    #[test]
    fn no_reap_if_no_signal_received() {
        let (tx, rx) = watch::channel(());

        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        *orphanage.sigchild.lock() = Some(rx);

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        tx.send(()).unwrap();
        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 1);
    }

    #[test]
    fn no_reap_if_signal_lock_held() {
        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        let signal_guard = orphanage.sigchild.lock();

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert_eq!(waits.get(), 0);

        drop(signal_guard);
    }

    #[cfg_attr(miri, ignore)] // Miri does not support epoll.
    #[test]
    fn does_not_register_signal_if_queue_empty() {
        let (io_driver, io_handle) = IoDriver::new(1024).unwrap();
        let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
        let handle = signal_driver.handle();

        let orphanage = OrphanQueueImpl::new();
        assert!(orphanage.sigchild.lock().is_none()); // Sanity

        // No register when queue empty
        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_none());

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_some());
        assert_eq!(waits.get(), 1); // Eager reap when registering listener
    }

    #[test]
    fn does_nothing_if_signal_could_not_be_registered() {
        let handle = SignalHandle::default();

        let orphanage = OrphanQueueImpl::new();
        assert!(orphanage.sigchild.lock().is_none());

        let orphan = MockWait::new(2);
        let waits = orphan.total_waits.clone();
        orphanage.push_orphan(orphan);

        // Signal handler has "gone away", nothing to register or reap
        orphanage.reap_orphans(&handle);
        assert!(orphanage.sigchild.lock().is_none());
        assert_eq!(waits.get(), 0);
    }
}
334