• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::io;
2 use std::process::ExitStatus;
3 use std::sync::Mutex;
4 
/// An interface for waiting on a process to exit.
pub(crate) trait Wait {
    /// Get the identifier for this process, for diagnostics.
    fn id(&self) -> u32;

    /// Try waiting for a process to exit in a non-blocking manner.
    ///
    /// `Ok(None)` signals that the process has not exited yet, while
    /// `Ok(Some(status))` carries the exit status of a finished process.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` when the process could not be polled.
    fn try_wait(&mut self) -> io::Result<Option<ExitStatus>>;
}
12 
13 impl<T: Wait> Wait for &mut T {
id(&self) -> u3214     fn id(&self) -> u32 {
15         (**self).id()
16     }
17 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>18     fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
19         (**self).try_wait()
20     }
21 }
22 
/// An interface for reaping a set of orphaned processes.
pub(crate) trait ReapOrphanQueue {
    /// Makes one reaping attempt on every process in the queue.
    ///
    /// Errors are ignored, and any orphan that has not yet exited stays
    /// enqueued.
    fn reap_orphans(&self);
}
29 
30 impl<T: ReapOrphanQueue> ReapOrphanQueue for &T {
reap_orphans(&self)31     fn reap_orphans(&self) {
32         (**self).reap_orphans()
33     }
34 }
35 
36 /// An interface for queueing up an orphaned process so that it can be reaped.
37 pub(crate) trait OrphanQueue<T>: ReapOrphanQueue {
38     /// Adds an orphan to the queue.
push_orphan(&self, orphan: T)39     fn push_orphan(&self, orphan: T);
40 }
41 
42 impl<T, O: OrphanQueue<T>> OrphanQueue<T> for &O {
push_orphan(&self, orphan: T)43     fn push_orphan(&self, orphan: T) {
44         (**self).push_orphan(orphan);
45     }
46 }
47 
/// An implementation of `OrphanQueue` that keeps its orphans in a
/// mutex-guarded vector.
#[derive(Debug)]
pub(crate) struct OrphanQueueImpl<T> {
    /// Orphans that have been pushed and not yet removed by a reap.
    queue: Mutex<Vec<T>>,
}

impl<T> OrphanQueueImpl<T> {
    /// Creates a queue that holds no orphans.
    pub(crate) fn new() -> Self {
        let queue = Mutex::new(Vec::new());
        Self { queue }
    }

    /// Number of orphans currently held (test-only helper).
    ///
    /// # Panics
    ///
    /// Panics if the queue mutex is poisoned.
    #[cfg(test)]
    fn len(&self) -> usize {
        let orphans = self.queue.lock().unwrap();
        orphans.len()
    }
}
66 
67 impl<T: Wait> OrphanQueue<T> for OrphanQueueImpl<T> {
push_orphan(&self, orphan: T)68     fn push_orphan(&self, orphan: T) {
69         self.queue.lock().unwrap().push(orphan)
70     }
71 }
72 
73 impl<T: Wait> ReapOrphanQueue for OrphanQueueImpl<T> {
reap_orphans(&self)74     fn reap_orphans(&self) {
75         let mut queue = self.queue.lock().unwrap();
76         let queue = &mut *queue;
77 
78         for i in (0..queue.len()).rev() {
79             match queue[i].try_wait() {
80                 Ok(None) => {}
81                 Ok(Some(_)) | Err(_) => {
82                     // The stdlib handles interruption errors (EINTR) when polling a child process.
83                     // All other errors represent invalid inputs or pids that have already been
84                     // reaped, so we can drop the orphan in case an error is raised.
85                     queue.swap_remove(i);
86                 }
87             }
88         }
89     }
90 }
91 
#[cfg(all(test, not(loom)))]
pub(crate) mod test {
    use super::*;
    use std::cell::{Cell, RefCell};
    use std::io;
    use std::os::unix::process::ExitStatusExt;
    use std::process::ExitStatus;
    use std::rc::Rc;

    /// A queue double that records what it is given instead of reaping.
    pub(crate) struct MockQueue<W> {
        /// Every orphan handed to `push_orphan`, in push order.
        pub(crate) all_enqueued: RefCell<Vec<W>>,
        /// Number of times `reap_orphans` has been called.
        pub(crate) total_reaps: Cell<usize>,
    }

    impl<W> MockQueue<W> {
        /// Creates a mock with nothing enqueued and a reap count of zero.
        pub(crate) fn new() -> Self {
            Self {
                all_enqueued: RefCell::new(Vec::new()),
                total_reaps: Cell::new(0),
            }
        }
    }

    impl<W> OrphanQueue<W> for MockQueue<W> {
        fn push_orphan(&self, orphan: W) {
            self.all_enqueued.borrow_mut().push(orphan);
        }
    }

    impl<W> ReapOrphanQueue for MockQueue<W> {
        /// Only counts the call; enqueued orphans are never polled or removed.
        fn reap_orphans(&self) {
            self.total_reaps.set(self.total_reaps.get() + 1);
        }
    }

    /// A `Wait` double that yields a terminal result on one chosen poll.
    struct MockWait {
        /// Number of `try_wait` calls so far; shared so tests can observe it
        /// after the mock has been moved into a queue.
        total_waits: Rc<Cell<usize>>,
        /// Number of prior `try_wait` calls after which the terminal result
        /// (exit status or error) is produced.
        num_wait_until_status: usize,
        /// Whether the terminal result is an error rather than an exit status.
        return_err: bool,
    }

    impl MockWait {
        /// A mock that reports a successful exit on poll number
        /// `num_wait_until_status` (counting from zero).
        fn new(num_wait_until_status: usize) -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status,
                return_err: false,
            }
        }

        /// A mock whose very first poll fails with an error.
        fn with_err() -> Self {
            Self {
                total_waits: Rc::new(Cell::new(0)),
                num_wait_until_status: 0,
                return_err: true,
            }
        }
    }

    impl Wait for MockWait {
        fn id(&self) -> u32 {
            42
        }

        /// Returns the terminal result only on the poll whose index equals
        /// `num_wait_until_status`; every other poll returns `Ok(None)`.
        fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
            let waits = self.total_waits.get();

            let ret = if self.num_wait_until_status == waits {
                // `return_err` selects the error outcome; the branches were
                // previously swapped, so `with_err()` never produced an error.
                if self.return_err {
                    Err(io::Error::new(io::ErrorKind::Other, "mock err"))
                } else {
                    Ok(Some(ExitStatus::from_raw(0)))
                }
            } else {
                Ok(None)
            };

            self.total_waits.set(waits + 1);
            ret
        }
    }

    #[test]
    fn mock_wait_reports_configured_outcome() {
        let mut failing = MockWait::with_err();
        assert!(failing.try_wait().is_err());

        let mut exiting = MockWait::new(1);
        assert!(matches!(exiting.try_wait(), Ok(None)));
        assert!(matches!(exiting.try_wait(), Ok(Some(_))));
    }

    #[test]
    fn drain_attempts_a_single_reap_of_all_queued_orphans() {
        let first_orphan = MockWait::new(0);
        let second_orphan = MockWait::new(1);
        let third_orphan = MockWait::new(2);
        let fourth_orphan = MockWait::with_err();

        let first_waits = first_orphan.total_waits.clone();
        let second_waits = second_orphan.total_waits.clone();
        let third_waits = third_orphan.total_waits.clone();
        let fourth_waits = fourth_orphan.total_waits.clone();

        let orphanage = OrphanQueueImpl::new();
        orphanage.push_orphan(first_orphan);
        orphanage.push_orphan(third_orphan);
        orphanage.push_orphan(second_orphan);
        orphanage.push_orphan(fourth_orphan);

        assert_eq!(orphanage.len(), 4);

        // First pass: the first orphan exits and the fourth errors; both are dropped.
        orphanage.reap_orphans();
        assert_eq!(orphanage.len(), 2);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 1);
        assert_eq!(third_waits.get(), 1);
        assert_eq!(fourth_waits.get(), 1);

        // Second pass: only the remaining two are polled; the second exits.
        orphanage.reap_orphans();
        assert_eq!(orphanage.len(), 1);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 2);
        assert_eq!(fourth_waits.get(), 1);

        // Third pass: the last orphan exits and the queue is empty.
        orphanage.reap_orphans();
        assert_eq!(orphanage.len(), 0);
        assert_eq!(first_waits.get(), 1);
        assert_eq!(second_waits.get(), 2);
        assert_eq!(third_waits.get(), 3);
        assert_eq!(fourth_waits.get(), 1);

        orphanage.reap_orphans(); // Safe to reap when empty
    }
}
218