//! Windows asynchronous process handling. //! //! Like with Unix we don't actually have a way of registering a process with an //! IOCP object. As a result we similarly need another mechanism for getting a //! signal when a process has exited. For now this is implemented with the //! `RegisterWaitForSingleObject` function in the kernel32.dll. //! //! This strategy is the same that libuv takes and essentially just queues up a //! wait for the process in a kernel32-specific thread pool. Once the object is //! notified (e.g. the process exits) then we have a callback that basically //! just completes a `Oneshot`. //! //! The `poll_exit` implementation will attempt to wait for the process in a //! nonblocking fashion, but failing that it'll fire off a //! `RegisterWaitForSingleObject` and then wait on the other end of the oneshot //! from then on out. use crate::io::{blocking::Blocking, AsyncRead, AsyncWrite, ReadBuf}; use crate::process::kill::Kill; use crate::process::SpawnedChild; use crate::sync::oneshot; use std::fmt; use std::fs::File as StdFile; use std::future::Future; use std::io; use std::os::windows::prelude::{AsRawHandle, IntoRawHandle, RawHandle}; use std::pin::Pin; use std::process::Stdio; use std::process::{Child as StdChild, Command as StdCommand, ExitStatus}; use std::sync::Arc; use std::task::{Context, Poll}; use windows_sys::{ Win32::Foundation::{ DuplicateHandle, BOOLEAN, DUPLICATE_SAME_ACCESS, HANDLE, INVALID_HANDLE_VALUE, }, Win32::System::Threading::{ GetCurrentProcess, RegisterWaitForSingleObject, UnregisterWaitEx, WT_EXECUTEINWAITTHREAD, WT_EXECUTEONLYONCE, }, Win32::System::WindowsProgramming::INFINITE, }; #[must_use = "futures do nothing unless polled"] pub(crate) struct Child { child: StdChild, waiting: Option, } impl fmt::Debug for Child { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Child") .field("pid", &self.id()) .field("child", &self.child) .field("waiting", &"..") .finish() } } struct Waiting { rx: oneshot::Receiver<()>, wait_object: HANDLE, tx: *mut Option>, } unsafe impl Sync for Waiting {} unsafe impl Send for Waiting {} pub(crate) fn spawn_child(cmd: &mut StdCommand) -> io::Result { let mut child = cmd.spawn()?; let stdin = child.stdin.take().map(stdio).transpose()?; let stdout = child.stdout.take().map(stdio).transpose()?; let stderr = child.stderr.take().map(stdio).transpose()?; Ok(SpawnedChild { child: Child { child, waiting: None, }, stdin, stdout, stderr, }) } impl Child { pub(crate) fn id(&self) -> u32 { self.child.id() } pub(crate) fn try_wait(&mut self) -> io::Result> { self.child.try_wait() } } impl Kill for Child { fn kill(&mut self) -> io::Result<()> { self.child.kill() } } impl Future for Child { type Output = io::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let inner = Pin::get_mut(self); loop { if let Some(ref mut w) = inner.waiting { match Pin::new(&mut w.rx).poll(cx) { Poll::Ready(Ok(())) => {} Poll::Ready(Err(_)) => panic!("should not be canceled"), Poll::Pending => return Poll::Pending, } let status = inner.try_wait()?.expect("not ready yet"); return Poll::Ready(Ok(status)); } if let Some(e) = inner.try_wait()? { return Poll::Ready(Ok(e)); } let (tx, rx) = oneshot::channel(); let ptr = Box::into_raw(Box::new(Some(tx))); let mut wait_object = 0; let rc = unsafe { RegisterWaitForSingleObject( &mut wait_object, inner.child.as_raw_handle() as _, Some(callback), ptr as *mut _, INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE, ) }; if rc == 0 { let err = io::Error::last_os_error(); drop(unsafe { Box::from_raw(ptr) }); return Poll::Ready(Err(err)); } inner.waiting = Some(Waiting { rx, wait_object, tx: ptr, }); } } } impl AsRawHandle for Child { fn as_raw_handle(&self) -> RawHandle { self.child.as_raw_handle() } } impl Drop for Waiting { fn drop(&mut self) { unsafe { let rc = UnregisterWaitEx(self.wait_object, INVALID_HANDLE_VALUE); if rc == 0 { panic!("failed to unregister: {}", io::Error::last_os_error()); } drop(Box::from_raw(self.tx)); } } } unsafe extern "system" fn callback(ptr: *mut std::ffi::c_void, _timer_fired: BOOLEAN) { let complete = &mut *(ptr as *mut Option>); let _ = complete.take().unwrap().send(()); } #[derive(Debug)] struct ArcFile(Arc); impl io::Read for ArcFile { fn read(&mut self, bytes: &mut [u8]) -> io::Result { (&*self.0).read(bytes) } } impl io::Write for ArcFile { fn write(&mut self, bytes: &[u8]) -> io::Result { (&*self.0).write(bytes) } fn flush(&mut self) -> io::Result<()> { (&*self.0).flush() } } #[derive(Debug)] pub(crate) struct ChildStdio { // Used for accessing the raw handle, even if the io version is busy raw: Arc, // For doing I/O operations asynchronously io: Blocking, } impl AsRawHandle for ChildStdio { fn as_raw_handle(&self) -> RawHandle { self.raw.as_raw_handle() } } impl AsyncRead for ChildStdio { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { Pin::new(&mut self.io).poll_read(cx, buf) } } impl AsyncWrite for ChildStdio { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(&mut self.io).poll_write(cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.io).poll_flush(cx) } fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.io).poll_shutdown(cx) } } pub(super) fn stdio(io: T) -> io::Result where T: IntoRawHandle, { use std::os::windows::prelude::FromRawHandle; let raw = Arc::new(unsafe { StdFile::from_raw_handle(io.into_raw_handle()) }); let io = Blocking::new(ArcFile(raw.clone())); Ok(ChildStdio { raw, io }) } pub(crate) fn convert_to_stdio(child_stdio: ChildStdio) -> io::Result { let ChildStdio { raw, io } = child_stdio; drop(io); // Try to drop the Arc count here Arc::try_unwrap(raw) .or_else(|raw| duplicate_handle(&*raw)) .map(Stdio::from) } fn duplicate_handle(io: &T) -> io::Result { use std::os::windows::prelude::FromRawHandle; unsafe { let mut dup_handle = INVALID_HANDLE_VALUE; let cur_proc = GetCurrentProcess(); let status = DuplicateHandle( cur_proc, io.as_raw_handle() as _, cur_proc, &mut dup_handle, 0, 0, DUPLICATE_SAME_ACCESS, ); if status == 0 { return Err(io::Error::last_os_error()); } Ok(StdFile::from_raw_handle(dup_handle as _)) } }