• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! Unix handling of child processes.
//!
//! Right now the only "fancy" thing about this is how we implement the
//! `Future` implementation on `Child` to get the exit status. Unix offers
//! no way to register a child with epoll, and the only real way to get a
//! notification when a process exits is the SIGCHLD signal.
//!
//! Signal handling in general is *super* hairy and complicated, and it's even
//! more complicated here with the fact that signals are coalesced, so we may
//! not get a SIGCHLD-per-child.
//!
//! Our best approximation here is to check *all spawned processes* for all
//! SIGCHLD signals received. To do that we create a `Signal`, implemented in
//! this crate's `signal::unix` module, which is a stream over signals being
//! received.
//!
//! Later when we poll the process's exit status we simply check to see if a
//! SIGCHLD has happened since we last checked, and while that returns "yes" we
//! keep trying.
//!
//! Note that this means the approach isn't really scalable, but then again
//! processes in general aren't scalable (e.g. millions) so it shouldn't be that
//! bad in theory...
23 
24 pub(crate) mod orphan;
25 use orphan::{OrphanQueue, OrphanQueueImpl, Wait};
26 
27 mod reap;
28 use reap::Reaper;
29 
30 use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf};
31 use crate::process::kill::Kill;
32 use crate::process::SpawnedChild;
33 use crate::runtime::signal::Handle as SignalHandle;
34 use crate::signal::unix::{signal, Signal, SignalKind};
35 
36 use mio::event::Source;
37 use mio::unix::SourceFd;
38 use std::fmt;
39 use std::fs::File;
40 use std::future::Future;
41 use std::io;
42 use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
43 use std::pin::Pin;
44 use std::process::{Child as StdChild, ExitStatus, Stdio};
45 use std::task::Context;
46 use std::task::Poll;
47 
48 impl Wait for StdChild {
id(&self) -> u3249     fn id(&self) -> u32 {
50         self.id()
51     }
52 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>53     fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
54         self.try_wait()
55     }
56 }
57 
58 impl Kill for StdChild {
kill(&mut self) -> io::Result<()>59     fn kill(&mut self) -> io::Result<()> {
60         self.kill()
61     }
62 }
63 
64 cfg_not_has_const_mutex_new! {
65     fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> {
66         use crate::util::once_cell::OnceCell;
67 
68         static ORPHAN_QUEUE: OnceCell<OrphanQueueImpl<StdChild>> = OnceCell::new();
69 
70         ORPHAN_QUEUE.get(OrphanQueueImpl::new)
71     }
72 }
73 
74 cfg_has_const_mutex_new! {
75     fn get_orphan_queue() -> &'static OrphanQueueImpl<StdChild> {
76         static ORPHAN_QUEUE: OrphanQueueImpl<StdChild> = OrphanQueueImpl::new();
77 
78         &ORPHAN_QUEUE
79     }
80 }
81 
82 pub(crate) struct GlobalOrphanQueue;
83 
84 impl fmt::Debug for GlobalOrphanQueue {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result85     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
86         get_orphan_queue().fmt(fmt)
87     }
88 }
89 
90 impl GlobalOrphanQueue {
reap_orphans(handle: &SignalHandle)91     pub(crate) fn reap_orphans(handle: &SignalHandle) {
92         get_orphan_queue().reap_orphans(handle)
93     }
94 }
95 
96 impl OrphanQueue<StdChild> for GlobalOrphanQueue {
push_orphan(&self, orphan: StdChild)97     fn push_orphan(&self, orphan: StdChild) {
98         get_orphan_queue().push_orphan(orphan)
99     }
100 }
101 
102 #[must_use = "futures do nothing unless polled"]
103 pub(crate) struct Child {
104     inner: Reaper<StdChild, GlobalOrphanQueue, Signal>,
105 }
106 
107 impl fmt::Debug for Child {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result108     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
109         fmt.debug_struct("Child")
110             .field("pid", &self.inner.id())
111             .finish()
112     }
113 }
114 
spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild>115 pub(crate) fn spawn_child(cmd: &mut std::process::Command) -> io::Result<SpawnedChild> {
116     let mut child = cmd.spawn()?;
117     let stdin = child.stdin.take().map(stdio).transpose()?;
118     let stdout = child.stdout.take().map(stdio).transpose()?;
119     let stderr = child.stderr.take().map(stdio).transpose()?;
120 
121     let signal = signal(SignalKind::child())?;
122 
123     Ok(SpawnedChild {
124         child: Child {
125             inner: Reaper::new(child, GlobalOrphanQueue, signal),
126         },
127         stdin,
128         stdout,
129         stderr,
130     })
131 }
132 
133 impl Child {
id(&self) -> u32134     pub(crate) fn id(&self) -> u32 {
135         self.inner.id()
136     }
137 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>138     pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
139         self.inner.inner_mut().try_wait()
140     }
141 }
142 
143 impl Kill for Child {
kill(&mut self) -> io::Result<()>144     fn kill(&mut self) -> io::Result<()> {
145         self.inner.kill()
146     }
147 }
148 
149 impl Future for Child {
150     type Output = io::Result<ExitStatus>;
151 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>152     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
153         Pin::new(&mut self.inner).poll(cx)
154     }
155 }
156 
/// Owned file descriptor wrapper used for a child's standard streams.
///
/// Reading and writing are implemented on `&Pipe`, so I/O only needs a shared
/// reference to the wrapper.
#[derive(Debug)]
pub(crate) struct Pipe {
    // Actually a pipe is not a File. However, we are reusing `File` to get
    // close on drop. This is a similar trick as `mio`.
    fd: File,
}

impl<T: IntoRawFd> From<T> for Pipe {
    /// Takes over the descriptor released by `fd`.
    fn from(fd: T) -> Self {
        // SAFETY: `into_raw_fd` transfers ownership of the descriptor to the
        // caller, so the `File` built here becomes its only owner.
        let fd = unsafe { File::from_raw_fd(fd.into_raw_fd()) };
        Self { fd }
    }
}

impl io::Read for &Pipe {
    /// Reads through the `Read` implementation of `&File`.
    fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
        (&self.fd).read(bytes)
    }
}

impl io::Write for &Pipe {
    /// Writes through the `Write` implementation of `&File`.
    fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
        (&self.fd).write(bytes)
    }

    fn flush(&mut self) -> io::Result<()> {
        (&self.fd).flush()
    }

    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
        (&self.fd).write_vectored(bufs)
    }
}

impl AsRawFd for Pipe {
    fn as_raw_fd(&self) -> RawFd {
        self.fd.as_raw_fd()
    }
}

impl AsFd for Pipe {
    /// Borrows the descriptor for as long as `self` is borrowed.
    fn as_fd(&self) -> BorrowedFd<'_> {
        // `File` already offers a safe borrowed view of its descriptor, so no
        // `unsafe` reconstruction from the raw value is needed.
        self.fd.as_fd()
    }
}
202 
convert_to_blocking_file(io: ChildStdio) -> io::Result<File>203 fn convert_to_blocking_file(io: ChildStdio) -> io::Result<File> {
204     let mut fd = io.inner.into_inner()?.fd;
205 
206     // Ensure that the fd to be inherited is set to *blocking* mode, as this
207     // is the default that virtually all programs expect to have. Those
208     // programs that know how to work with nonblocking stdio will know how to
209     // change it to nonblocking mode.
210     set_nonblocking(&mut fd, false)?;
211 
212     Ok(fd)
213 }
214 
convert_to_stdio(io: ChildStdio) -> io::Result<Stdio>215 pub(crate) fn convert_to_stdio(io: ChildStdio) -> io::Result<Stdio> {
216     convert_to_blocking_file(io).map(Stdio::from)
217 }
218 
219 impl Source for Pipe {
register( &mut self, registry: &mio::Registry, token: mio::Token, interest: mio::Interest, ) -> io::Result<()>220     fn register(
221         &mut self,
222         registry: &mio::Registry,
223         token: mio::Token,
224         interest: mio::Interest,
225     ) -> io::Result<()> {
226         SourceFd(&self.as_raw_fd()).register(registry, token, interest)
227     }
228 
reregister( &mut self, registry: &mio::Registry, token: mio::Token, interest: mio::Interest, ) -> io::Result<()>229     fn reregister(
230         &mut self,
231         registry: &mio::Registry,
232         token: mio::Token,
233         interest: mio::Interest,
234     ) -> io::Result<()> {
235         SourceFd(&self.as_raw_fd()).reregister(registry, token, interest)
236     }
237 
deregister(&mut self, registry: &mio::Registry) -> io::Result<()>238     fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
239         SourceFd(&self.as_raw_fd()).deregister(registry)
240     }
241 }
242 
243 pub(crate) struct ChildStdio {
244     inner: PollEvented<Pipe>,
245 }
246 
247 impl ChildStdio {
into_owned_fd(self) -> io::Result<OwnedFd>248     pub(super) fn into_owned_fd(self) -> io::Result<OwnedFd> {
249         convert_to_blocking_file(self).map(OwnedFd::from)
250     }
251 }
252 
253 impl fmt::Debug for ChildStdio {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result254     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
255         self.inner.fmt(fmt)
256     }
257 }
258 
259 impl AsRawFd for ChildStdio {
as_raw_fd(&self) -> RawFd260     fn as_raw_fd(&self) -> RawFd {
261         self.inner.as_raw_fd()
262     }
263 }
264 
265 impl AsFd for ChildStdio {
as_fd(&self) -> BorrowedFd<'_>266     fn as_fd(&self) -> BorrowedFd<'_> {
267         unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
268     }
269 }
270 
271 impl AsyncWrite for ChildStdio {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>272     fn poll_write(
273         self: Pin<&mut Self>,
274         cx: &mut Context<'_>,
275         buf: &[u8],
276     ) -> Poll<io::Result<usize>> {
277         self.inner.poll_write(cx, buf)
278     }
279 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>280     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
281         Poll::Ready(Ok(()))
282     }
283 
poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>284     fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
285         Poll::Ready(Ok(()))
286     }
287 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<Result<usize, io::Error>>288     fn poll_write_vectored(
289         self: Pin<&mut Self>,
290         cx: &mut Context<'_>,
291         bufs: &[io::IoSlice<'_>],
292     ) -> Poll<Result<usize, io::Error>> {
293         self.inner.poll_write_vectored(cx, bufs)
294     }
295 
is_write_vectored(&self) -> bool296     fn is_write_vectored(&self) -> bool {
297         true
298     }
299 }
300 
301 impl AsyncRead for ChildStdio {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>302     fn poll_read(
303         self: Pin<&mut Self>,
304         cx: &mut Context<'_>,
305         buf: &mut ReadBuf<'_>,
306     ) -> Poll<io::Result<()>> {
307         // Safety: pipes support reading into uninitialized memory
308         unsafe { self.inner.poll_read(cx, buf) }
309     }
310 }
311 
set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()>312 fn set_nonblocking<T: AsRawFd>(fd: &mut T, nonblocking: bool) -> io::Result<()> {
313     unsafe {
314         let fd = fd.as_raw_fd();
315         let previous = libc::fcntl(fd, libc::F_GETFL);
316         if previous == -1 {
317             return Err(io::Error::last_os_error());
318         }
319 
320         let new = if nonblocking {
321             previous | libc::O_NONBLOCK
322         } else {
323             previous & !libc::O_NONBLOCK
324         };
325 
326         let r = libc::fcntl(fd, libc::F_SETFL, new);
327         if r == -1 {
328             return Err(io::Error::last_os_error());
329         }
330     }
331 
332     Ok(())
333 }
334 
stdio<T>(io: T) -> io::Result<ChildStdio> where T: IntoRawFd,335 pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
336 where
337     T: IntoRawFd,
338 {
339     // Set the fd to nonblocking before we pass it to the event loop
340     let mut pipe = Pipe::from(io);
341     set_nonblocking(&mut pipe, true)?;
342 
343     PollEvented::new(pipe).map(|inner| ChildStdio { inner })
344 }
345