1 use crate::os::windows::prelude::*;
2
3 use crate::ffi::OsStr;
4 use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut, Read};
5 use crate::mem;
6 use crate::path::Path;
7 use crate::ptr;
8 use crate::slice;
9 use crate::sync::atomic::AtomicUsize;
10 use crate::sync::atomic::Ordering::SeqCst;
11 use crate::sys::c;
12 use crate::sys::fs::{File, OpenOptions};
13 use crate::sys::handle::Handle;
14 use crate::sys::hashmap_random_keys;
15 use crate::sys_common::IntoInner;
16
17 ////////////////////////////////////////////////////////////////////////////////
18 // Anonymous pipes
19 ////////////////////////////////////////////////////////////////////////////////
20
21 pub struct AnonPipe {
22 inner: Handle,
23 }
24
25 impl IntoInner<Handle> for AnonPipe {
into_inner(self) -> Handle26 fn into_inner(self) -> Handle {
27 self.inner
28 }
29 }
30
31 pub struct Pipes {
32 pub ours: AnonPipe,
33 pub theirs: AnonPipe,
34 }
35
36 /// Although this looks similar to `anon_pipe` in the Unix module it's actually
37 /// subtly different. Here we'll return two pipes in the `Pipes` return value,
38 /// but one is intended for "us" where as the other is intended for "someone
39 /// else".
40 ///
41 /// Currently the only use case for this function is pipes for stdio on
42 /// processes in the standard library, so "ours" is the one that'll stay in our
43 /// process whereas "theirs" will be inherited to a child.
44 ///
45 /// The ours/theirs pipes are *not* specifically readable or writable. Each
46 /// one only supports a read or a write, but which is which depends on the
47 /// boolean flag given. If `ours_readable` is `true`, then `ours` is readable and
48 /// `theirs` is writable. Conversely, if `ours_readable` is `false`, then `ours`
49 /// is writable and `theirs` is readable.
50 ///
51 /// Also note that the `ours` pipe is always a handle opened up in overlapped
52 /// mode. This means that technically speaking it should only ever be used
53 /// with `OVERLAPPED` instances, but also works out ok if it's only ever used
54 /// once at a time (which we do indeed guarantee).
anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes>55 pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
56 // A 64kb pipe capacity is the same as a typical Linux default.
57 const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;
58
59 // Note that we specifically do *not* use `CreatePipe` here because
60 // unfortunately the anonymous pipes returned do not support overlapped
61 // operations. Instead, we create a "hopefully unique" name and create a
62 // named pipe which has overlapped operations enabled.
63 //
64 // Once we do this, we connect do it as usual via `CreateFileW`, and then
65 // we return those reader/writer halves. Note that the `ours` pipe return
66 // value is always the named pipe, whereas `theirs` is just the normal file.
67 // This should hopefully shield us from child processes which assume their
68 // stdout is a named pipe, which would indeed be odd!
69 unsafe {
70 let ours;
71 let mut name;
72 let mut tries = 0;
73 let mut reject_remote_clients_flag = c::PIPE_REJECT_REMOTE_CLIENTS;
74 loop {
75 tries += 1;
76 name = format!(
77 r"\\.\pipe\__rust_anonymous_pipe1__.{}.{}",
78 c::GetCurrentProcessId(),
79 random_number()
80 );
81 let wide_name = OsStr::new(&name).encode_wide().chain(Some(0)).collect::<Vec<_>>();
82 let mut flags = c::FILE_FLAG_FIRST_PIPE_INSTANCE | c::FILE_FLAG_OVERLAPPED;
83 if ours_readable {
84 flags |= c::PIPE_ACCESS_INBOUND;
85 } else {
86 flags |= c::PIPE_ACCESS_OUTBOUND;
87 }
88
89 let handle = c::CreateNamedPipeW(
90 wide_name.as_ptr(),
91 flags,
92 c::PIPE_TYPE_BYTE
93 | c::PIPE_READMODE_BYTE
94 | c::PIPE_WAIT
95 | reject_remote_clients_flag,
96 1,
97 PIPE_BUFFER_CAPACITY,
98 PIPE_BUFFER_CAPACITY,
99 0,
100 ptr::null_mut(),
101 );
102
103 // We pass the `FILE_FLAG_FIRST_PIPE_INSTANCE` flag above, and we're
104 // also just doing a best effort at selecting a unique name. If
105 // `ERROR_ACCESS_DENIED` is returned then it could mean that we
106 // accidentally conflicted with an already existing pipe, so we try
107 // again.
108 //
109 // Don't try again too much though as this could also perhaps be a
110 // legit error.
111 // If `ERROR_INVALID_PARAMETER` is returned, this probably means we're
112 // running on pre-Vista version where `PIPE_REJECT_REMOTE_CLIENTS` is
113 // not supported, so we continue retrying without it. This implies
114 // reduced security on Windows versions older than Vista by allowing
115 // connections to this pipe from remote machines.
116 // Proper fix would increase the number of FFI imports and introduce
117 // significant amount of Windows XP specific code with no clean
118 // testing strategy
119 // For more info, see https://github.com/rust-lang/rust/pull/37677.
120 if handle == c::INVALID_HANDLE_VALUE {
121 let err = io::Error::last_os_error();
122 let raw_os_err = err.raw_os_error();
123 if tries < 10 {
124 if raw_os_err == Some(c::ERROR_ACCESS_DENIED as i32) {
125 continue;
126 } else if reject_remote_clients_flag != 0
127 && raw_os_err == Some(c::ERROR_INVALID_PARAMETER as i32)
128 {
129 reject_remote_clients_flag = 0;
130 tries -= 1;
131 continue;
132 }
133 }
134 return Err(err);
135 }
136 ours = Handle::from_raw_handle(handle);
137 break;
138 }
139
140 // Connect to the named pipe we just created. This handle is going to be
141 // returned in `theirs`, so if `ours` is readable we want this to be
142 // writable, otherwise if `ours` is writable we want this to be
143 // readable.
144 //
145 // Additionally we don't enable overlapped mode on this because most
146 // client processes aren't enabled to work with that.
147 let mut opts = OpenOptions::new();
148 opts.write(ours_readable);
149 opts.read(!ours_readable);
150 opts.share_mode(0);
151 let size = mem::size_of::<c::SECURITY_ATTRIBUTES>();
152 let mut sa = c::SECURITY_ATTRIBUTES {
153 nLength: size as c::DWORD,
154 lpSecurityDescriptor: ptr::null_mut(),
155 bInheritHandle: their_handle_inheritable as i32,
156 };
157 opts.security_attributes(&mut sa);
158 let theirs = File::open(Path::new(&name), &opts)?;
159 let theirs = AnonPipe { inner: theirs.into_inner() };
160
161 Ok(Pipes {
162 ours: AnonPipe { inner: ours },
163 theirs: AnonPipe { inner: theirs.into_inner() },
164 })
165 }
166 }
167
168 /// Takes an asynchronous source pipe and returns a synchronous pipe suitable
169 /// for sending to a child process.
170 ///
171 /// This is achieved by creating a new set of pipes and spawning a thread that
172 /// relays messages between the source and the synchronous pipe.
spawn_pipe_relay( source: &AnonPipe, ours_readable: bool, their_handle_inheritable: bool, ) -> io::Result<AnonPipe>173 pub fn spawn_pipe_relay(
174 source: &AnonPipe,
175 ours_readable: bool,
176 their_handle_inheritable: bool,
177 ) -> io::Result<AnonPipe> {
178 // We need this handle to live for the lifetime of the thread spawned below.
179 let source = source.duplicate()?;
180
181 // create a new pair of anon pipes.
182 let Pipes { theirs, ours } = anon_pipe(ours_readable, their_handle_inheritable)?;
183
184 // Spawn a thread that passes messages from one pipe to the other.
185 // Any errors will simply cause the thread to exit.
186 let (reader, writer) = if ours_readable { (ours, source) } else { (source, ours) };
187 crate::thread::spawn(move || {
188 let mut buf = [0_u8; 4096];
189 'reader: while let Ok(len) = reader.read(&mut buf) {
190 if len == 0 {
191 break;
192 }
193 let mut start = 0;
194 while let Ok(written) = writer.write(&buf[start..len]) {
195 start += written;
196 if start == len {
197 continue 'reader;
198 }
199 }
200 break;
201 }
202 });
203
204 // Return the pipe that should be sent to the child process.
205 Ok(theirs)
206 }
207
random_number() -> usize208 fn random_number() -> usize {
209 static N: AtomicUsize = AtomicUsize::new(0);
210 loop {
211 if N.load(SeqCst) != 0 {
212 return N.fetch_add(1, SeqCst);
213 }
214
215 N.store(hashmap_random_keys().0 as usize, SeqCst);
216 }
217 }
218
219 // Abstracts over `ReadFileEx` and `WriteFileEx`
220 type AlertableIoFn = unsafe extern "system" fn(
221 BorrowedHandle<'_>,
222 c::LPVOID,
223 c::DWORD,
224 c::LPOVERLAPPED,
225 c::LPOVERLAPPED_COMPLETION_ROUTINE,
226 ) -> c::BOOL;
227
228 impl AnonPipe {
handle(&self) -> &Handle229 pub fn handle(&self) -> &Handle {
230 &self.inner
231 }
into_handle(self) -> Handle232 pub fn into_handle(self) -> Handle {
233 self.inner
234 }
duplicate(&self) -> io::Result<Self>235 fn duplicate(&self) -> io::Result<Self> {
236 self.inner.duplicate(0, false, c::DUPLICATE_SAME_ACCESS).map(|inner| AnonPipe { inner })
237 }
238
read(&self, buf: &mut [u8]) -> io::Result<usize>239 pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
240 let result = unsafe {
241 let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
242 self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
243 };
244
245 match result {
246 // The special treatment of BrokenPipe is to deal with Windows
247 // pipe semantics, which yields this error when *reading* from
248 // a pipe after the other end has closed; we interpret that as
249 // EOF on the pipe.
250 Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
251 _ => result,
252 }
253 }
254
read_buf(&self, mut buf: BorrowedCursor<'_>) -> io::Result<()>255 pub fn read_buf(&self, mut buf: BorrowedCursor<'_>) -> io::Result<()> {
256 let result = unsafe {
257 let len = crate::cmp::min(buf.capacity(), c::DWORD::MAX as usize) as c::DWORD;
258 self.alertable_io_internal(c::ReadFileEx, buf.as_mut().as_mut_ptr() as _, len)
259 };
260
261 match result {
262 // The special treatment of BrokenPipe is to deal with Windows
263 // pipe semantics, which yields this error when *reading* from
264 // a pipe after the other end has closed; we interpret that as
265 // EOF on the pipe.
266 Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(()),
267 Err(e) => Err(e),
268 Ok(n) => {
269 unsafe {
270 buf.advance(n);
271 }
272 Ok(())
273 }
274 }
275 }
276
read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>277 pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
278 self.inner.read_vectored(bufs)
279 }
280
281 #[inline]
is_read_vectored(&self) -> bool282 pub fn is_read_vectored(&self) -> bool {
283 self.inner.is_read_vectored()
284 }
285
read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize>286 pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
287 self.handle().read_to_end(buf)
288 }
289
write(&self, buf: &[u8]) -> io::Result<usize>290 pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
291 unsafe {
292 let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
293 self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
294 }
295 }
296
write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize>297 pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
298 self.inner.write_vectored(bufs)
299 }
300
301 #[inline]
is_write_vectored(&self) -> bool302 pub fn is_write_vectored(&self) -> bool {
303 self.inner.is_write_vectored()
304 }
305
306 /// Synchronizes asynchronous reads or writes using our anonymous pipe.
307 ///
308 /// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses
309 /// [Asynchronous Procedure Call] (APC) to synchronize reads or writes.
310 ///
311 /// Note: This should not be used for handles we don't create.
312 ///
313 /// # Safety
314 ///
315 /// `buf` must be a pointer to a buffer that's valid for reads or writes
316 /// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx`
317 ///
318 /// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex
319 /// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex
320 /// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls
alertable_io_internal( &self, io: AlertableIoFn, buf: c::LPVOID, len: c::DWORD, ) -> io::Result<usize>321 unsafe fn alertable_io_internal(
322 &self,
323 io: AlertableIoFn,
324 buf: c::LPVOID,
325 len: c::DWORD,
326 ) -> io::Result<usize> {
327 // Use "alertable I/O" to synchronize the pipe I/O.
328 // This has four steps.
329 //
330 // STEP 1: Start the asynchronous I/O operation.
331 // This simply calls either `ReadFileEx` or `WriteFileEx`,
332 // giving it a pointer to the buffer and callback function.
333 //
334 // STEP 2: Enter an alertable state.
335 // The callback set in step 1 will not be called until the thread
336 // enters an "alertable" state. This can be done using `SleepEx`.
337 //
338 // STEP 3: The callback
339 // Once the I/O is complete and the thread is in an alertable state,
340 // the callback will be run on the same thread as the call to
341 // `ReadFileEx` or `WriteFileEx` done in step 1.
342 // In the callback we simply set the result of the async operation.
343 //
344 // STEP 4: Return the result.
345 // At this point we'll have a result from the callback function
346 // and can simply return it. Note that we must not return earlier,
347 // while the I/O is still in progress.
348
349 // The result that will be set from the asynchronous callback.
350 let mut async_result: Option<AsyncResult> = None;
351 struct AsyncResult {
352 error: u32,
353 transferred: u32,
354 }
355
356 // STEP 3: The callback.
357 unsafe extern "system" fn callback(
358 dwErrorCode: u32,
359 dwNumberOfBytesTransferred: u32,
360 lpOverlapped: *mut c::OVERLAPPED,
361 ) {
362 // Set `async_result` using a pointer smuggled through `hEvent`.
363 let result =
364 AsyncResult { error: dwErrorCode, transferred: dwNumberOfBytesTransferred };
365 *(*lpOverlapped).hEvent.cast::<Option<AsyncResult>>() = Some(result);
366 }
367
368 // STEP 1: Start the I/O operation.
369 let mut overlapped: c::OVERLAPPED = crate::mem::zeroed();
370 // `hEvent` is unused by `ReadFileEx` and `WriteFileEx`.
371 // Therefore the documentation suggests using it to smuggle a pointer to the callback.
372 overlapped.hEvent = &mut async_result as *mut _ as *mut _;
373
374 // Asynchronous read of the pipe.
375 // If successful, `callback` will be called once it completes.
376 let result = io(self.inner.as_handle(), buf, len, &mut overlapped, Some(callback));
377 if result == c::FALSE {
378 // We can return here because the call failed.
379 // After this we must not return until the I/O completes.
380 return Err(io::Error::last_os_error());
381 }
382
383 // Wait indefinitely for the result.
384 let result = loop {
385 // STEP 2: Enter an alertable state.
386 // The second parameter of `SleepEx` is used to make this sleep alertable.
387 c::SleepEx(c::INFINITE, c::TRUE);
388 if let Some(result) = async_result {
389 break result;
390 }
391 };
392 // STEP 4: Return the result.
393 // `async_result` is always `Some` at this point
394 match result.error {
395 c::ERROR_SUCCESS => Ok(result.transferred as usize),
396 error => Err(io::Error::from_raw_os_error(error as _)),
397 }
398 }
399 }
400
read2(p1: AnonPipe, v1: &mut Vec<u8>, p2: AnonPipe, v2: &mut Vec<u8>) -> io::Result<()>401 pub fn read2(p1: AnonPipe, v1: &mut Vec<u8>, p2: AnonPipe, v2: &mut Vec<u8>) -> io::Result<()> {
402 let p1 = p1.into_handle();
403 let p2 = p2.into_handle();
404
405 let mut p1 = AsyncPipe::new(p1, v1)?;
406 let mut p2 = AsyncPipe::new(p2, v2)?;
407 let objs = [p1.event.as_raw_handle(), p2.event.as_raw_handle()];
408
409 // In a loop we wait for either pipe's scheduled read operation to complete.
410 // If the operation completes with 0 bytes, that means EOF was reached, in
411 // which case we just finish out the other pipe entirely.
412 //
413 // Note that overlapped I/O is in general super unsafe because we have to
414 // be careful to ensure that all pointers in play are valid for the entire
415 // duration of the I/O operation (where tons of operations can also fail).
416 // The destructor for `AsyncPipe` ends up taking care of most of this.
417 loop {
418 let res = unsafe { c::WaitForMultipleObjects(2, objs.as_ptr(), c::FALSE, c::INFINITE) };
419 if res == c::WAIT_OBJECT_0 {
420 if !p1.result()? || !p1.schedule_read()? {
421 return p2.finish();
422 }
423 } else if res == c::WAIT_OBJECT_0 + 1 {
424 if !p2.result()? || !p2.schedule_read()? {
425 return p1.finish();
426 }
427 } else {
428 return Err(io::Error::last_os_error());
429 }
430 }
431 }
432
433 struct AsyncPipe<'a> {
434 pipe: Handle,
435 event: Handle,
436 overlapped: Box<c::OVERLAPPED>, // needs a stable address
437 dst: &'a mut Vec<u8>,
438 state: State,
439 }
440
/// Progress of an `AsyncPipe` through its read cycle.
#[derive(Debug, PartialEq)]
enum State {
    /// No read is outstanding and no result is waiting to be collected.
    NotReading,
    /// A read has been scheduled and has not been collected yet.
    Reading,
    /// A read completed immediately with this many bytes, which still have to
    /// be accounted for in the destination vector.
    Read(usize),
}
447
448 impl<'a> AsyncPipe<'a> {
new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>>449 fn new(pipe: Handle, dst: &'a mut Vec<u8>) -> io::Result<AsyncPipe<'a>> {
450 // Create an event which we'll use to coordinate our overlapped
451 // operations, this event will be used in WaitForMultipleObjects
452 // and passed as part of the OVERLAPPED handle.
453 //
454 // Note that we do a somewhat clever thing here by flagging the
455 // event as being manually reset and setting it initially to the
456 // signaled state. This means that we'll naturally fall through the
457 // WaitForMultipleObjects call above for pipes created initially,
458 // and the only time an even will go back to "unset" will be once an
459 // I/O operation is successfully scheduled (what we want).
460 let event = Handle::new_event(true, true)?;
461 let mut overlapped: Box<c::OVERLAPPED> = unsafe { Box::new(mem::zeroed()) };
462 overlapped.hEvent = event.as_raw_handle();
463 Ok(AsyncPipe { pipe, overlapped, event, dst, state: State::NotReading })
464 }
465
466 /// Executes an overlapped read operation.
467 ///
468 /// Must not currently be reading, and returns whether the pipe is currently
469 /// at EOF or not. If the pipe is not at EOF then `result()` must be called
470 /// to complete the read later on (may block), but if the pipe is at EOF
471 /// then `result()` should not be called as it will just block forever.
schedule_read(&mut self) -> io::Result<bool>472 fn schedule_read(&mut self) -> io::Result<bool> {
473 assert_eq!(self.state, State::NotReading);
474 let amt = unsafe {
475 let slice = slice_to_end(self.dst);
476 self.pipe.read_overlapped(slice, &mut *self.overlapped)?
477 };
478
479 // If this read finished immediately then our overlapped event will
480 // remain signaled (it was signaled coming in here) and we'll progress
481 // down to the method below.
482 //
483 // Otherwise the I/O operation is scheduled and the system set our event
484 // to not signaled, so we flag ourselves into the reading state and move
485 // on.
486 self.state = match amt {
487 Some(0) => return Ok(false),
488 Some(amt) => State::Read(amt),
489 None => State::Reading,
490 };
491 Ok(true)
492 }
493
494 /// Wait for the result of the overlapped operation previously executed.
495 ///
496 /// Takes a parameter `wait` which indicates if this pipe is currently being
497 /// read whether the function should block waiting for the read to complete.
498 ///
499 /// Returns values:
500 ///
501 /// * `true` - finished any pending read and the pipe is not at EOF (keep
502 /// going)
503 /// * `false` - finished any pending read and pipe is at EOF (stop issuing
504 /// reads)
result(&mut self) -> io::Result<bool>505 fn result(&mut self) -> io::Result<bool> {
506 let amt = match self.state {
507 State::NotReading => return Ok(true),
508 State::Reading => self.pipe.overlapped_result(&mut *self.overlapped, true)?,
509 State::Read(amt) => amt,
510 };
511 self.state = State::NotReading;
512 unsafe {
513 let len = self.dst.len();
514 self.dst.set_len(len + amt);
515 }
516 Ok(amt != 0)
517 }
518
519 /// Finishes out reading this pipe entirely.
520 ///
521 /// Waits for any pending and schedule read, and then calls `read_to_end`
522 /// if necessary to read all the remaining information.
finish(&mut self) -> io::Result<()>523 fn finish(&mut self) -> io::Result<()> {
524 while self.result()? && self.schedule_read()? {
525 // ...
526 }
527 Ok(())
528 }
529 }
530
531 impl<'a> Drop for AsyncPipe<'a> {
drop(&mut self)532 fn drop(&mut self) {
533 match self.state {
534 State::Reading => {}
535 _ => return,
536 }
537
538 // If we have a pending read operation, then we have to make sure that
539 // it's *done* before we actually drop this type. The kernel requires
540 // that the `OVERLAPPED` and buffer pointers are valid for the entire
541 // I/O operation.
542 //
543 // To do that, we call `CancelIo` to cancel any pending operation, and
544 // if that succeeds we wait for the overlapped result.
545 //
546 // If anything here fails, there's not really much we can do, so we leak
547 // the buffer/OVERLAPPED pointers to ensure we're at least memory safe.
548 if self.pipe.cancel_io().is_err() || self.result().is_err() {
549 let buf = mem::take(self.dst);
550 let overlapped = Box::new(unsafe { mem::zeroed() });
551 let overlapped = mem::replace(&mut self.overlapped, overlapped);
552 mem::forget((buf, overlapped));
553 }
554 }
555 }
556
/// Returns the spare capacity of `v` (everything between `len` and
/// `capacity`) as a mutable byte slice, growing the vector first if it has no
/// spare room.
///
/// An unallocated vector gets at least 16 bytes reserved; a full vector gets
/// at least one more byte reserved. The vector's length is left unchanged.
///
/// # Safety
///
/// The returned slice covers memory the vector has allocated but not
/// initialized. The caller must not read from it before writing to it.
unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
    // Decide how much extra room, if any, has to be requested.
    let additional = match (v.capacity(), v.len()) {
        (0, _) => 16,
        (cap, len) if cap == len => 1,
        _ => 0,
    };
    if additional > 0 {
        v.reserve(additional);
    }

    let used = v.len();
    let spare = v.capacity() - used;
    // SAFETY: `used..used + spare` lies within the vector's allocation, and
    // the exclusive borrow of `v` keeps it from being touched for as long as
    // the returned slice lives.
    unsafe { slice::from_raw_parts_mut(v.as_mut_ptr().add(used), spare) }
}
566