• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Windows asynchronous process handling.
2 //!
3 //! Like with Unix we don't actually have a way of registering a process with an
4 //! IOCP object. As a result we similarly need another mechanism for getting a
5 //! signal when a process has exited. For now this is implemented with the
6 //! `RegisterWaitForSingleObject` function in the kernel32.dll.
7 //!
8 //! This strategy is the same that libuv takes and essentially just queues up a
9 //! wait for the process in a kernel32-specific thread pool. Once the object is
10 //! notified (e.g. the process exits) then we have a callback that basically
11 //! just completes a `Oneshot`.
12 //!
13 //! The `poll_exit` implementation will attempt to wait for the process in a
14 //! nonblocking fashion, but failing that it'll fire off a
15 //! `RegisterWaitForSingleObject` and then wait on the other end of the oneshot
16 //! from then on out.
17 
18 use crate::io::{blocking::Blocking, AsyncRead, AsyncWrite, ReadBuf};
19 use crate::process::kill::Kill;
20 use crate::process::SpawnedChild;
21 use crate::sync::oneshot;
22 
23 use std::fmt;
24 use std::fs::File as StdFile;
25 use std::future::Future;
26 use std::io;
27 use std::os::windows::prelude::{AsRawHandle, IntoRawHandle, RawHandle};
28 use std::pin::Pin;
29 use std::process::Stdio;
30 use std::process::{Child as StdChild, Command as StdCommand, ExitStatus};
31 use std::sync::Arc;
32 use std::task::{Context, Poll};
33 
34 use windows_sys::{
35     Win32::Foundation::{
36         DuplicateHandle, BOOLEAN, DUPLICATE_SAME_ACCESS, HANDLE, INVALID_HANDLE_VALUE,
37     },
38     Win32::System::Threading::{
39         GetCurrentProcess, RegisterWaitForSingleObject, UnregisterWaitEx, WT_EXECUTEINWAITTHREAD,
40         WT_EXECUTEONLYONCE,
41     },
42     Win32::System::WindowsProgramming::INFINITE,
43 };
44 
45 #[must_use = "futures do nothing unless polled"]
46 pub(crate) struct Child {
47     child: StdChild,
48     waiting: Option<Waiting>,
49 }
50 
51 impl fmt::Debug for Child {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result52     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
53         fmt.debug_struct("Child")
54             .field("pid", &self.id())
55             .field("child", &self.child)
56             .field("waiting", &"..")
57             .finish()
58     }
59 }
60 
61 struct Waiting {
62     rx: oneshot::Receiver<()>,
63     wait_object: HANDLE,
64     tx: *mut Option<oneshot::Sender<()>>,
65 }
66 
67 unsafe impl Sync for Waiting {}
68 unsafe impl Send for Waiting {}
69 
spawn_child(cmd: &mut StdCommand) -> io::Result<SpawnedChild>70 pub(crate) fn spawn_child(cmd: &mut StdCommand) -> io::Result<SpawnedChild> {
71     let mut child = cmd.spawn()?;
72     let stdin = child.stdin.take().map(stdio).transpose()?;
73     let stdout = child.stdout.take().map(stdio).transpose()?;
74     let stderr = child.stderr.take().map(stdio).transpose()?;
75 
76     Ok(SpawnedChild {
77         child: Child {
78             child,
79             waiting: None,
80         },
81         stdin,
82         stdout,
83         stderr,
84     })
85 }
86 
87 impl Child {
id(&self) -> u3288     pub(crate) fn id(&self) -> u32 {
89         self.child.id()
90     }
91 
try_wait(&mut self) -> io::Result<Option<ExitStatus>>92     pub(crate) fn try_wait(&mut self) -> io::Result<Option<ExitStatus>> {
93         self.child.try_wait()
94     }
95 }
96 
97 impl Kill for Child {
kill(&mut self) -> io::Result<()>98     fn kill(&mut self) -> io::Result<()> {
99         self.child.kill()
100     }
101 }
102 
103 impl Future for Child {
104     type Output = io::Result<ExitStatus>;
105 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>106     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
107         let inner = Pin::get_mut(self);
108         loop {
109             if let Some(ref mut w) = inner.waiting {
110                 match Pin::new(&mut w.rx).poll(cx) {
111                     Poll::Ready(Ok(())) => {}
112                     Poll::Ready(Err(_)) => panic!("should not be canceled"),
113                     Poll::Pending => return Poll::Pending,
114                 }
115                 let status = inner.try_wait()?.expect("not ready yet");
116                 return Poll::Ready(Ok(status));
117             }
118 
119             if let Some(e) = inner.try_wait()? {
120                 return Poll::Ready(Ok(e));
121             }
122             let (tx, rx) = oneshot::channel();
123             let ptr = Box::into_raw(Box::new(Some(tx)));
124             let mut wait_object = 0;
125             let rc = unsafe {
126                 RegisterWaitForSingleObject(
127                     &mut wait_object,
128                     inner.child.as_raw_handle() as _,
129                     Some(callback),
130                     ptr as *mut _,
131                     INFINITE,
132                     WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE,
133                 )
134             };
135             if rc == 0 {
136                 let err = io::Error::last_os_error();
137                 drop(unsafe { Box::from_raw(ptr) });
138                 return Poll::Ready(Err(err));
139             }
140             inner.waiting = Some(Waiting {
141                 rx,
142                 wait_object,
143                 tx: ptr,
144             });
145         }
146     }
147 }
148 
149 impl AsRawHandle for Child {
as_raw_handle(&self) -> RawHandle150     fn as_raw_handle(&self) -> RawHandle {
151         self.child.as_raw_handle()
152     }
153 }
154 
155 impl Drop for Waiting {
drop(&mut self)156     fn drop(&mut self) {
157         unsafe {
158             let rc = UnregisterWaitEx(self.wait_object, INVALID_HANDLE_VALUE);
159             if rc == 0 {
160                 panic!("failed to unregister: {}", io::Error::last_os_error());
161             }
162             drop(Box::from_raw(self.tx));
163         }
164     }
165 }
166 
callback(ptr: *mut std::ffi::c_void, _timer_fired: BOOLEAN)167 unsafe extern "system" fn callback(ptr: *mut std::ffi::c_void, _timer_fired: BOOLEAN) {
168     let complete = &mut *(ptr as *mut Option<oneshot::Sender<()>>);
169     let _ = complete.take().unwrap().send(());
170 }
171 
/// A reference-counted file that performs I/O through the shared `&File`
/// implementations of `Read` and `Write`.
#[derive(Debug)]
struct ArcFile(Arc<StdFile>);

impl io::Read for ArcFile {
    /// Reads into `bytes` via the shared file reference.
    fn read(&mut self, bytes: &mut [u8]) -> io::Result<usize> {
        let mut file: &StdFile = &self.0;
        io::Read::read(&mut file, bytes)
    }
}

impl io::Write for ArcFile {
    /// Writes `bytes` via the shared file reference.
    fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
        let mut file: &StdFile = &self.0;
        io::Write::write(&mut file, bytes)
    }

    /// Flushes via the shared file reference.
    fn flush(&mut self) -> io::Result<()> {
        let mut file: &StdFile = &self.0;
        io::Write::flush(&mut file)
    }
}
190 
191 #[derive(Debug)]
192 pub(crate) struct ChildStdio {
193     // Used for accessing the raw handle, even if the io version is busy
194     raw: Arc<StdFile>,
195     // For doing I/O operations asynchronously
196     io: Blocking<ArcFile>,
197 }
198 
199 impl AsRawHandle for ChildStdio {
as_raw_handle(&self) -> RawHandle200     fn as_raw_handle(&self) -> RawHandle {
201         self.raw.as_raw_handle()
202     }
203 }
204 
205 impl AsyncRead for ChildStdio {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>206     fn poll_read(
207         mut self: Pin<&mut Self>,
208         cx: &mut Context<'_>,
209         buf: &mut ReadBuf<'_>,
210     ) -> Poll<io::Result<()>> {
211         Pin::new(&mut self.io).poll_read(cx, buf)
212     }
213 }
214 
215 impl AsyncWrite for ChildStdio {
poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>216     fn poll_write(
217         mut self: Pin<&mut Self>,
218         cx: &mut Context<'_>,
219         buf: &[u8],
220     ) -> Poll<io::Result<usize>> {
221         Pin::new(&mut self.io).poll_write(cx, buf)
222     }
223 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>224     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
225         Pin::new(&mut self.io).poll_flush(cx)
226     }
227 
poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>228     fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
229         Pin::new(&mut self.io).poll_shutdown(cx)
230     }
231 }
232 
stdio<T>(io: T) -> io::Result<ChildStdio> where T: IntoRawHandle,233 pub(super) fn stdio<T>(io: T) -> io::Result<ChildStdio>
234 where
235     T: IntoRawHandle,
236 {
237     use std::os::windows::prelude::FromRawHandle;
238 
239     let raw = Arc::new(unsafe { StdFile::from_raw_handle(io.into_raw_handle()) });
240     let io = Blocking::new(ArcFile(raw.clone()));
241     Ok(ChildStdio { raw, io })
242 }
243 
convert_to_stdio(child_stdio: ChildStdio) -> io::Result<Stdio>244 pub(crate) fn convert_to_stdio(child_stdio: ChildStdio) -> io::Result<Stdio> {
245     let ChildStdio { raw, io } = child_stdio;
246     drop(io); // Try to drop the Arc count here
247 
248     Arc::try_unwrap(raw)
249         .or_else(|raw| duplicate_handle(&*raw))
250         .map(Stdio::from)
251 }
252 
duplicate_handle<T: AsRawHandle>(io: &T) -> io::Result<StdFile>253 fn duplicate_handle<T: AsRawHandle>(io: &T) -> io::Result<StdFile> {
254     use std::os::windows::prelude::FromRawHandle;
255 
256     unsafe {
257         let mut dup_handle = INVALID_HANDLE_VALUE;
258         let cur_proc = GetCurrentProcess();
259 
260         let status = DuplicateHandle(
261             cur_proc,
262             io.as_raw_handle() as _,
263             cur_proc,
264             &mut dup_handle,
265             0,
266             0,
267             DUPLICATE_SAME_ACCESS,
268         );
269 
270         if status == 0 {
271             return Err(io::Error::last_os_error());
272         }
273 
274         Ok(StdFile::from_raw_handle(dup_handle as _))
275     }
276 }
277