• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use rand::Rng;
6 use std::{
7     ffi::CString,
8     fs::OpenOptions,
9     io,
10     io::Result,
11     mem,
12     os::windows::fs::OpenOptionsExt,
13     process, ptr,
14     sync::atomic::{AtomicUsize, Ordering},
15 };
16 
17 use super::{Event, RawDescriptor};
18 use crate::descriptor::{AsRawDescriptor, FromRawDescriptor, IntoRawDescriptor, SafeDescriptor};
19 use serde::{Deserialize, Serialize};
20 use win_util::{SecurityAttributes, SelfRelativeSecurityDescriptor};
21 use winapi::{
22     shared::{
23         minwindef::{DWORD, LPCVOID, LPVOID, TRUE},
24         winerror::{ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED},
25     },
26     um::{
27         errhandlingapi::GetLastError,
28         fileapi::{FlushFileBuffers, ReadFile, WriteFile},
29         handleapi::INVALID_HANDLE_VALUE,
30         ioapiset::{CancelIoEx, GetOverlappedResult},
31         minwinbase::OVERLAPPED,
32         namedpipeapi::{
33             ConnectNamedPipe, GetNamedPipeInfo, PeekNamedPipe, SetNamedPipeHandleState,
34         },
35         winbase::{
36             CreateNamedPipeA, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED,
37             PIPE_ACCESS_DUPLEX, PIPE_NOWAIT, PIPE_READMODE_BYTE, PIPE_READMODE_MESSAGE,
38             PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE, PIPE_WAIT,
39             SECURITY_IDENTIFICATION,
40         },
41     },
42 };
43 
44 /// The default buffer size for all named pipes in the system. If this size is too small, writers
45 /// on named pipes that expect not to block *can* block until the reading side empties the buffer.
46 ///
47 /// The general rule is this should be *at least* as big as the largest message, otherwise
48 /// unexpected blocking behavior can result; for example, if too small, this can interact badly with
49 /// crate::platform::StreamChannel, which expects to be able to make a complete write before releasing
50 /// a lock that the opposite side needs to complete a read. This means that if the buffer is too
51 /// small:
52 ///     * The writer can't complete its write and release the lock because the buffer is too small.
53 ///     * The reader can't start reading because the lock is held by the writer, so it can't
54 ///       relieve buffer pressure. Note that for message pipes, the reader couldn't do anything
55 ///       to help anyway, because a message mode pipe should NOT have a partial read (which is
56 ///       what we would need to relieve pressure).
57 ///     * Conditions for deadlock are met, and both the reader & writer enter circular waiting.
58 pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;
59 
60 static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
61 
62 /// Represents one end of a named pipe
63 #[derive(Serialize, Deserialize, Debug)]
64 pub struct PipeConnection {
65     handle: SafeDescriptor,
66     framing_mode: FramingMode,
67     blocking_mode: BlockingMode,
68 }
69 
70 /// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a
71 /// Readfile or WriteFile operation and holds onto the event object so it doesn't get dropped.
72 pub struct OverlappedWrapper {
73     // Allocated on the heap so that the OVERLAPPED struct doesn't move when performing I/O
74     // operations.
75     overlapped: Box<OVERLAPPED>,
76     // This field prevents the event handle from being dropped too early and allows callers to
77     // be notified when a read or write overlapped operation has completed.
78     h_event: Option<Event>,
79     in_use: bool,
80 }
81 
82 impl OverlappedWrapper {
get_h_event_ref(&self) -> Option<&Event>83     pub fn get_h_event_ref(&self) -> Option<&Event> {
84         self.h_event.as_ref()
85     }
86 }
87 
88 #[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq)]
89 pub enum FramingMode {
90     Byte,
91     Message,
92 }
93 
94 impl FramingMode {
to_readmode(self) -> DWORD95     fn to_readmode(self) -> DWORD {
96         match self {
97             FramingMode::Message => PIPE_READMODE_MESSAGE,
98             FramingMode::Byte => PIPE_READMODE_BYTE,
99         }
100     }
101 
to_pipetype(self) -> DWORD102     fn to_pipetype(self) -> DWORD {
103         match self {
104             FramingMode::Message => PIPE_TYPE_MESSAGE,
105             FramingMode::Byte => PIPE_TYPE_BYTE,
106         }
107     }
108 }
109 
110 #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug)]
111 pub enum BlockingMode {
112     /// Calls to read() block until data is received
113     Wait,
114     /// Calls to read() return immediately even if there is nothing read with error code 232
115     /// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA)
116     ///
117     /// NOTE: This mode is discouraged by the Windows API documentation.
118     NoWait,
119 }
120 
121 impl From<&BlockingMode> for DWORD {
from(blocking_mode: &BlockingMode) -> DWORD122     fn from(blocking_mode: &BlockingMode) -> DWORD {
123         match blocking_mode {
124             BlockingMode::Wait => PIPE_WAIT,
125             BlockingMode::NoWait => PIPE_NOWAIT,
126         }
127     }
128 }
129 
130 /// Sets the handle state for a named pipe in a rust friendly way.
131 /// This is safe if the pipe handle is open.
set_named_pipe_handle_state( pipe_handle: RawDescriptor, client_mode: &mut DWORD, ) -> Result<()>132 unsafe fn set_named_pipe_handle_state(
133     pipe_handle: RawDescriptor,
134     client_mode: &mut DWORD,
135 ) -> Result<()> {
136     // Safe when the pipe handle is open. Safety also requires checking the return value, which we
137     // do below.
138     let success_flag = SetNamedPipeHandleState(
139         /* hNamedPipe= */ pipe_handle,
140         /* lpMode= */ client_mode,
141         /* lpMaxCollectionCount= */ ptr::null_mut(),
142         /* lpCollectDataTimeout= */ ptr::null_mut(),
143     );
144     if success_flag == 0 {
145         Err(io::Error::last_os_error())
146     } else {
147         Ok(())
148     }
149 }
150 
pair( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, ) -> Result<(PipeConnection, PipeConnection)>151 pub fn pair(
152     framing_mode: &FramingMode,
153     blocking_mode: &BlockingMode,
154     timeout: u64,
155 ) -> Result<(PipeConnection, PipeConnection)> {
156     pair_with_buffer_size(
157         framing_mode,
158         blocking_mode,
159         timeout,
160         DEFAULT_BUFFER_SIZE,
161         false,
162     )
163 }
164 
165 /// Creates a pair of handles connected to either end of a duplex named pipe.
166 ///
167 /// The pipe created will have a semi-random name and a default set of security options that
168 /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
169 /// remote clients, allow only a single server instance, and prevent impersonation by the server
170 /// end of the pipe.
171 ///
172 /// # Arguments
173 ///
174 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
175 ///                     automatically framed sequence of messages (Message). In message mode it's an
176 ///                     error to read fewer bytes than were sent in a message from the other end of
177 ///                     the pipe.
178 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
179 ///                     return immediately if there is nothing available (NoWait).
180 /// * `timeout`       - A timeout to apply for socket operations, in milliseconds.
181 ///                     Setting this to zero will create sockets with the system
182 ///                     default timeout.
183 /// * `buffer_size`   - The default buffer size for the named pipe. The system should expand the
184 ///                     buffer automatically as needed, except in the case of NOWAIT pipes, where
185 ///                     it will just fail writes that don't fit in the buffer.
186 /// # Return value
187 ///
188 /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
189 /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
pair_with_buffer_size( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<(PipeConnection, PipeConnection)>190 pub fn pair_with_buffer_size(
191     framing_mode: &FramingMode,
192     blocking_mode: &BlockingMode,
193     timeout: u64,
194     buffer_size: usize,
195     overlapped: bool,
196 ) -> Result<(PipeConnection, PipeConnection)> {
197     // Give the pipe a unique name to avoid accidental collisions
198     let pipe_name = format!(
199         r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
200         process::id(),
201         NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
202         rand::thread_rng().gen::<u32>(),
203     );
204 
205     let server_end = create_server_pipe(
206         &pipe_name,
207         framing_mode,
208         blocking_mode,
209         timeout,
210         buffer_size,
211         overlapped,
212     )?;
213 
214     // Open the named pipe we just created as the client
215     let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
216 
217     // Accept the client's connection
218     // Not sure if this is strictly needed but I'm doing it just in case.
219     // We expect at this point that the client will already be connected,
220     // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
221     // It's also OK if we get a return code of success.
222     server_end.wait_for_client_connection()?;
223 
224     Ok((server_end, client_end))
225 }
226 
227 /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
228 /// settings.
229 ///
230 /// The pipe will be set to reject remote clients and allow only a single connection at a time.
231 ///
232 /// # Arguments
233 ///
234 /// * `pipe_name`     - The path of the named pipe to create. Should be in the form
235 ///                     `\\.\pipe\<some-name>`.
236 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
237 ///                     automatically framed sequence of messages (Message). In message mode it's an
238 ///                     error to read fewer bytes than were sent in a message from the other end of
239 ///                     the pipe.
240 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
241 ///                     return immediately if there is nothing available (NoWait).
242 /// * `timeout`       - A timeout to apply for socket operations, in milliseconds.
243 ///                     Setting this to zero will create sockets with the system
244 ///                     default timeout.
245 /// * `buffer_size`   - The default buffer size for the named pipe. The system should expand the
246 ///                     buffer automatically as needed, except in the case of NOWAIT pipes, where
247 ///                     it will just fail writes that don't fit in the buffer.
248 /// * `overlapped`    - Sets whether overlapped mode is set on the pipe.
create_server_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<PipeConnection>249 pub fn create_server_pipe(
250     pipe_name: &str,
251     framing_mode: &FramingMode,
252     blocking_mode: &BlockingMode,
253     timeout: u64,
254     buffer_size: usize,
255     overlapped: bool,
256 ) -> Result<PipeConnection> {
257     let c_pipe_name = CString::new(pipe_name).unwrap();
258 
259     let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
260     if overlapped {
261         open_mode_flags |= FILE_FLAG_OVERLAPPED
262     }
263 
264     // This sets flags so there will be an error if >1 instance (server end)
265     // of this pipe name is opened because we expect exactly one.
266     let server_handle = unsafe {
267         // Safe because security attributes are valid, pipe_name is valid C string,
268         // and we're checking the return code
269         CreateNamedPipeA(
270             c_pipe_name.as_ptr(),
271             /* dwOpenMode= */
272             open_mode_flags,
273             /* dwPipeMode= */
274             framing_mode.to_pipetype()
275                 | framing_mode.to_readmode()
276                 | DWORD::from(blocking_mode)
277                 | PIPE_REJECT_REMOTE_CLIENTS,
278             /* nMaxInstances= */ 1,
279             /* nOutBufferSize= */ buffer_size as DWORD,
280             /* nInBufferSize= */ buffer_size as DWORD,
281             /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
282             /* lpSecurityAttributes= */
283             SecurityAttributes::new_with_security_descriptor(
284                 SelfRelativeSecurityDescriptor::get_singleton(),
285                 /* inherit= */ true,
286             )
287             .as_mut(),
288         )
289     };
290 
291     if server_handle == INVALID_HANDLE_VALUE {
292         Err(io::Error::last_os_error())
293     } else {
294         unsafe {
295             Ok(PipeConnection {
296                 handle: SafeDescriptor::from_raw_descriptor(server_handle),
297                 framing_mode: *framing_mode,
298                 blocking_mode: *blocking_mode,
299             })
300         }
301     }
302 }
303 
304 /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
305 /// settings.
306 ///
307 /// The pipe will be set to prevent impersonation of the client by the server process.
308 ///
309 /// # Arguments
310 ///
311 /// * `pipe_name`     - The path of the named pipe to create. Should be in the form
312 ///                     `\\.\pipe\<some-name>`.
313 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
314 ///                     automatically framed sequence of messages (Message). In message mode it's an
315 ///                     error to read fewer bytes than were sent in a message from the other end of
316 ///                     the pipe.
317 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
318 ///                     return immediately if there is nothing available (NoWait).
319 /// * `overlapped`    - Sets whether the pipe is opened in overlapped mode.
create_client_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, overlapped: bool, ) -> Result<PipeConnection>320 pub fn create_client_pipe(
321     pipe_name: &str,
322     framing_mode: &FramingMode,
323     blocking_mode: &BlockingMode,
324     overlapped: bool,
325 ) -> Result<PipeConnection> {
326     let client_handle = OpenOptions::new()
327         .read(true)
328         .write(true)
329         .create(true)
330         .security_qos_flags(SECURITY_IDENTIFICATION)
331         .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
332         .open(pipe_name)?
333         .into_raw_descriptor();
334 
335     let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
336 
337     // Safe because client_handle's open() call did not return an error.
338     unsafe {
339         set_named_pipe_handle_state(client_handle, &mut client_mode)?;
340     }
341 
342     Ok(PipeConnection {
343         // Safe because client_handle is valid
344         handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
345         framing_mode: *framing_mode,
346         blocking_mode: *blocking_mode,
347     })
348 }
349 
350 // This is used to mark types which can be appropriately sent through the
351 // generic helper functions write_to_pipe and read_from_pipe.
352 pub trait PipeSendable {
353     // Default values used to fill in new empty indexes when resizing a buffer to
354     // a larger size.
default() -> Self355     fn default() -> Self;
356 }
357 impl PipeSendable for u8 {
default() -> Self358     fn default() -> Self {
359         0
360     }
361 }
362 impl PipeSendable for RawDescriptor {
default() -> Self363     fn default() -> Self {
364         ptr::null_mut()
365     }
366 }
367 
368 impl PipeConnection {
try_clone(&self) -> Result<PipeConnection>369     pub fn try_clone(&self) -> Result<PipeConnection> {
370         let copy_handle = self.handle.try_clone()?;
371         Ok(PipeConnection {
372             handle: copy_handle,
373             framing_mode: self.framing_mode,
374             blocking_mode: self.blocking_mode,
375         })
376     }
377 
378     /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
379     /// blocking modes.
380     ///
381     /// # Safety
382     /// 1. rd is valid and ownership is transferred to this function when it is called.
383     ///
384     /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
385     /// underlying pipe.
from_raw_descriptor( rd: RawDescriptor, framing_mode: FramingMode, blocking_mode: BlockingMode, ) -> PipeConnection386     pub unsafe fn from_raw_descriptor(
387         rd: RawDescriptor,
388         framing_mode: FramingMode,
389         blocking_mode: BlockingMode,
390     ) -> PipeConnection {
391         PipeConnection {
392             handle: SafeDescriptor::from_raw_descriptor(rd),
393             framing_mode,
394             blocking_mode,
395         }
396     }
397 
398     /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
399     /// Returns the number of bytes (not values) read.
400     ///
401     /// # Safety
402     ///
403     /// This is safe only when the following conditions hold:
404     ///     1. The data on the other end of the pipe is a valid binary representation of data for
405     ///     type T, and
406     ///     2. The number of bytes read is a multiple of the size of T; this must be checked by
407     ///     the caller.
408     /// If buf's type is file descriptors, this is only safe when those file descriptors are valid
409     /// for the process where this function was called.
read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize>410     pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
411         PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None)
412     }
413 
414     /// Similar to `PipeConnection::read` except it also allows:
415     ///     1. The same end of the named pipe to read and write at the same time in different
416     ///        threads.
417     ///     2. Asynchronous read and write (read and write won't block).
418     ///
419     /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
420     /// (can be created with `PipeConnection::create_overlapped_struct`) will be passed into
421     /// `ReadFile`. That event will be triggered when the read operation is complete.
422     ///
423     /// In order to get how many bytes were read, call `get_overlapped_result`. That function will
424     /// also help with waiting until the read operation is complete.
425     ///
426     /// # Safety
427     ///
428     /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
429     /// overlapped mode otherwise there may be unexpected behavior.
read_overlapped<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>430     pub unsafe fn read_overlapped<T: PipeSendable>(
431         &mut self,
432         buf: &mut [T],
433         overlapped_wrapper: &mut OverlappedWrapper,
434     ) -> Result<()> {
435         if overlapped_wrapper.in_use {
436             return Err(std::io::Error::new(
437                 std::io::ErrorKind::InvalidInput,
438                 "Overlapped struct already in use",
439             ));
440         }
441         overlapped_wrapper.in_use = true;
442 
443         PipeConnection::read_internal(
444             &self.handle,
445             self.blocking_mode,
446             buf,
447             Some(&mut overlapped_wrapper.overlapped),
448         )?;
449         Ok(())
450     }
451 
452     /// Helper for `read_overlapped` and `read`
453     ///
454     /// # Safety
455     /// Comments `read_overlapped` or `read`, depending on which is used.
read_internal<T: PipeSendable>( handle: &SafeDescriptor, blocking_mode: BlockingMode, buf: &mut [T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>456     unsafe fn read_internal<T: PipeSendable>(
457         handle: &SafeDescriptor,
458         blocking_mode: BlockingMode,
459         buf: &mut [T],
460         overlapped: Option<&mut OVERLAPPED>,
461     ) -> Result<usize> {
462         let max_bytes_to_read: DWORD = mem::size_of_val(buf) as DWORD;
463         // Used to verify if ERROR_IO_PENDING should be an error.
464         let is_overlapped = overlapped.is_some();
465 
466         // Safe because we cap the size of the read to the size of the buffer
467         // and check the return code
468         let mut bytes_read: DWORD = 0;
469         let success_flag = ReadFile(
470             handle.as_raw_descriptor(),
471             buf.as_ptr() as LPVOID,
472             max_bytes_to_read,
473             match overlapped {
474                 Some(_) => std::ptr::null_mut(),
475                 None => &mut bytes_read,
476             },
477             match overlapped {
478                 Some(v) => v,
479                 None => std::ptr::null_mut(),
480             },
481         );
482 
483         if success_flag == 0 {
484             let e = io::Error::last_os_error();
485             match e.raw_os_error() {
486                 Some(error_code)
487                     if blocking_mode == BlockingMode::NoWait
488                         && error_code == ERROR_NO_DATA as i32 =>
489                 {
490                     // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
491                     // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
492                     // correct. For further details see:
493                     // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
494                     // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
495                     Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
496                 }
497                 // ERROR_IO_PENDING, according the to docs, isn't really an error. This just means
498                 // that the ReadFile operation hasn't completed. In this case,
499                 // `get_overlapped_result` will wait until the operation is completed.
500                 Some(error_code) if error_code == ERROR_IO_PENDING as i32 && is_overlapped => {
501                     return Ok(0);
502                 }
503                 _ => Err(e),
504             }
505         } else {
506             Ok(bytes_read as usize)
507         }
508     }
509 
510     /// Gets the size in bytes of data in the pipe.
511     ///
512     /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
513     /// not finished writing on the producer side.
get_available_byte_count(&self) -> io::Result<u32>514     pub fn get_available_byte_count(&self) -> io::Result<u32> {
515         let mut total_bytes_avail: DWORD = 0;
516 
517         // Safe because the underlying pipe handle is guaranteed to be open, and the output values
518         // live at valid memory locations.
519         let res = unsafe {
520             PeekNamedPipe(
521                 self.as_raw_descriptor(),
522                 ptr::null_mut(),
523                 0,
524                 ptr::null_mut(),
525                 &mut total_bytes_avail,
526                 ptr::null_mut(),
527             )
528         };
529 
530         if res == 0 {
531             Err(io::Error::last_os_error())
532         } else {
533             Ok(total_bytes_avail)
534         }
535     }
536 
537     /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
538     /// callers should check to ensure that it was the number expected.
write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize>539     pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
540         PipeConnection::write_internal(&self.handle, buf, None)
541     }
542 
543     /// Similar to `PipeConnection::write` except it also allows:
544     ///     1. The same end of the named pipe to read and write at the same time in different
545     ///        threads.
546     ///     2. Asynchronous read and write (read and write won't block).
547     ///
548     /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
549     /// (can be created with `PipeConnection::create_overlapped_struct`) will be passed into
550     /// `WriteFile`. That event will be triggered when the write operation is complete.
551     ///
552     /// In order to get how many bytes were written, call `get_overlapped_result`. That function will
553     /// also help with waiting until the write operation is complete. The pipe must be opened in
554     /// overlapped otherwise there may be unexpected behavior.
write_overlapped<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>555     pub fn write_overlapped<T: PipeSendable>(
556         &mut self,
557         buf: &[T],
558         overlapped_wrapper: &mut OverlappedWrapper,
559     ) -> Result<()> {
560         if overlapped_wrapper.in_use {
561             return Err(std::io::Error::new(
562                 std::io::ErrorKind::InvalidInput,
563                 "Overlapped struct already in use",
564             ));
565         }
566         overlapped_wrapper.in_use = true;
567 
568         PipeConnection::write_internal(
569             &self.handle,
570             buf,
571             Some(&mut overlapped_wrapper.overlapped),
572         )?;
573         Ok(())
574     }
575 
576     /// Helper for `write_overlapped` and `write`.
write_internal<T: PipeSendable>( handle: &SafeDescriptor, buf: &[T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>577     fn write_internal<T: PipeSendable>(
578         handle: &SafeDescriptor,
579         buf: &[T],
580         overlapped: Option<&mut OVERLAPPED>,
581     ) -> Result<usize> {
582         let bytes_to_write: DWORD = mem::size_of_val(buf) as DWORD;
583         let is_overlapped = overlapped.is_some();
584 
585         // Safe because buf points to a valid region of memory whose size we have computed,
586         // pipe has not been closed (as it's managed by this object), and we check the return
587         // value for any errors
588         unsafe {
589             let mut bytes_written: DWORD = 0;
590             let success_flag = WriteFile(
591                 handle.as_raw_descriptor(),
592                 buf.as_ptr() as LPCVOID,
593                 bytes_to_write,
594                 match overlapped {
595                     Some(_) => std::ptr::null_mut(),
596                     None => &mut bytes_written,
597                 },
598                 match overlapped {
599                     Some(v) => v,
600                     None => std::ptr::null_mut(),
601                 },
602             );
603 
604             if success_flag == 0 {
605                 let err = io::Error::last_os_error().raw_os_error().unwrap() as u32;
606                 if err == ERROR_IO_PENDING && is_overlapped {
607                     return Ok(0);
608                 }
609                 Err(io::Error::last_os_error())
610             } else {
611                 Ok(bytes_written as usize)
612             }
613         }
614     }
615 
616     /// Sets the blocking mode on the pipe.
set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()>617     pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
618         let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
619         self.blocking_mode = *blocking_mode;
620 
621         // Safe because the pipe has not been closed (it is managed by this object).
622         unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
623     }
624 
625     /// For a server named pipe, waits for a client to connect
wait_for_client_connection(&self) -> Result<()>626     pub fn wait_for_client_connection(&self) -> Result<()> {
627         // Safe because the handle is valid and we're checking the return
628         // code according to the documentation
629         unsafe {
630             let success_flag = ConnectNamedPipe(
631                 self.as_raw_descriptor(),
632                 /* lpOverlapped= */ ptr::null_mut(),
633             );
634             if success_flag == 0 && GetLastError() != ERROR_PIPE_CONNECTED {
635                 return Err(io::Error::last_os_error());
636             }
637         }
638         Ok(())
639     }
640 
641     /// Used for overlapped read and write operations.
642     ///
643     /// This will block until the ReadFile or WriteFile operation that also took in
644     /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
645     /// `create_overlapped_struct` or that OVERLAPPED.hEvent is set. This will also get
646     /// the number of bytes that were read or written.
get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>647     pub fn get_overlapped_result(
648         &mut self,
649         overlapped_wrapper: &mut OverlappedWrapper,
650     ) -> io::Result<u32> {
651         if !overlapped_wrapper.in_use {
652             return Err(std::io::Error::new(
653                 std::io::ErrorKind::InvalidInput,
654                 "Overlapped struct is not in use",
655             ));
656         }
657         let mut size_transferred = 0;
658         // Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
659         // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
660         let res = unsafe {
661             GetOverlappedResult(
662                 self.handle.as_raw_descriptor(),
663                 &mut *overlapped_wrapper.overlapped,
664                 &mut size_transferred,
665                 TRUE,
666             )
667         };
668         overlapped_wrapper.in_use = false;
669         if res == 0 {
670             Err(io::Error::last_os_error())
671         } else {
672             Ok(size_transferred)
673         }
674     }
675 
676     /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
677     /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
678     /// returned must not be dropped.
679     ///
680     /// There is an option to create the event object and set it to the `hEvent` field. If hEvent
681     /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
682     /// handle will be signaled when the operation is complete. In other words, you can use
683     /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
684     /// Microsoft though.
create_overlapped_struct(include_event: bool) -> Result<OverlappedWrapper>685     pub fn create_overlapped_struct(include_event: bool) -> Result<OverlappedWrapper> {
686         let mut overlapped = OVERLAPPED::default();
687         let h_event = if include_event {
688             Some(Event::new()?)
689         } else {
690             None
691         };
692         overlapped.hEvent = h_event.as_ref().unwrap().as_raw_descriptor();
693         Ok(OverlappedWrapper {
694             overlapped: Box::new(overlapped),
695             h_event,
696             in_use: false,
697         })
698     }
699 
700     /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
701     /// cancel all I/O requests for the file handle passed in.
cancel_io(&mut self) -> Result<()>702     pub fn cancel_io(&mut self) -> Result<()> {
703         let res = unsafe {
704             CancelIoEx(
705                 self.handle.as_raw_descriptor(),
706                 /* lpOverlapped= */ std::ptr::null_mut(),
707             )
708         };
709         if res == 0 {
710             Err(io::Error::last_os_error())
711         } else {
712             Ok(())
713         }
714     }
715 
716     /// Get the framing mode of the pipe.
get_framing_mode(&self) -> FramingMode717     pub fn get_framing_mode(&self) -> FramingMode {
718         self.framing_mode
719     }
720 
721     /// Returns metadata about the connected NamedPipe.
get_info(&self, is_server_connection: bool) -> Result<NamedPipeInfo>722     pub fn get_info(&self, is_server_connection: bool) -> Result<NamedPipeInfo> {
723         let mut flags: u32 = 0;
724         // Marked mutable because they are mutated in a system call
725         #[allow(unused_mut)]
726         let mut incoming_buffer_size: u32 = 0;
727         #[allow(unused_mut)]
728         let mut outgoing_buffer_size: u32 = 0;
729         #[allow(unused_mut)]
730         let mut max_instances: u32 = 0;
731         // Client side with BYTE type are default flags
732         if is_server_connection {
733             flags |= 0x00000001 /* PIPE_SERVER_END */
734         }
735         if self.framing_mode == FramingMode::Message {
736             flags |= 0x00000004 /* PIPE_TYPE_MESSAGE */
737         }
738         // Safe because we have allocated all pointers and own
739         // them as mutable.
740         let res = unsafe {
741             GetNamedPipeInfo(
742                 self.as_raw_descriptor(),
743                 flags as *mut u32,
744                 outgoing_buffer_size as *mut u32,
745                 incoming_buffer_size as *mut u32,
746                 max_instances as *mut u32,
747             )
748         };
749 
750         if res == 0 {
751             Err(io::Error::last_os_error())
752         } else {
753             Ok(NamedPipeInfo {
754                 outgoing_buffer_size,
755                 incoming_buffer_size,
756                 max_instances,
757             })
758         }
759     }
760 
761     /// For a server pipe, flush the pipe contents. This will
762     /// block until the pipe is cleared by the client. Only
763     /// call this if you are sure the client is reading the
764     /// data!
flush_data_blocking(&self) -> Result<()>765     pub fn flush_data_blocking(&self) -> Result<()> {
766         // Safe because the only buffers interacted with are
767         // outside of Rust memory
768         let res = unsafe { FlushFileBuffers(self.as_raw_descriptor()) };
769         if res == 0 {
770             Err(io::Error::last_os_error())
771         } else {
772             Ok(())
773         }
774     }
775 }
776 
777 impl AsRawDescriptor for PipeConnection {
as_raw_descriptor(&self) -> RawDescriptor778     fn as_raw_descriptor(&self) -> RawDescriptor {
779         self.handle.as_raw_descriptor()
780     }
781 }
782 
783 impl IntoRawDescriptor for PipeConnection {
into_raw_descriptor(self) -> RawDescriptor784     fn into_raw_descriptor(self) -> RawDescriptor {
785         self.handle.into_raw_descriptor()
786     }
787 }
788 
// These impls assert that a `PipeConnection` may be moved to, and shared between, threads.
// NOTE(review): the justification is not stated here; presumably it rests on the wrapped pipe
// handle being usable from any thread — confirm against the `PipeConnection` definition.
unsafe impl Send for PipeConnection {}
unsafe impl Sync for PipeConnection {}
791 
792 impl io::Read for PipeConnection {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>793     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
794         // This is safe because PipeConnection::read is always safe for u8
795         unsafe { PipeConnection::read(self, buf) }
796     }
797 }
798 
799 impl io::Write for PipeConnection {
write(&mut self, buf: &[u8]) -> io::Result<usize>800     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
801         PipeConnection::write(self, buf)
802     }
803 
flush(&mut self) -> io::Result<()>804     fn flush(&mut self) -> io::Result<()> {
805         Ok(())
806     }
807 }
808 
/// A simple data struct representing metadata about a NamedPipe, as reported by
/// `GetNamedPipeInfo`.
///
/// The type is plain data, so it derives the usual value-type traits to allow logging
/// (`Debug`), cheap copies and comparisons in callers and tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NamedPipeInfo {
    /// Size of the buffer for outgoing data.
    pub outgoing_buffer_size: u32,
    /// Size of the buffer for incoming data.
    pub incoming_buffer_size: u32,
    /// Maximum number of pipe instances that can be created.
    pub max_instances: u32,
}
816 
817 #[cfg(test)]
818 mod tests {
819     use super::*;
820 
821     #[test]
duplex_pipe_stream()822     fn duplex_pipe_stream() {
823         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
824 
825         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
826         unsafe {
827             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
828                 println!("{}", dir);
829 
830                 sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();
831 
832                 // Smaller than what we sent so we get multiple chunks
833                 let mut recv_buffer: [u8; 4] = [0; 4];
834 
835                 let mut size = receiver.read(&mut recv_buffer).unwrap();
836                 assert_eq!(size, 4);
837                 assert_eq!(recv_buffer, [75, 77, 54, 82]);
838 
839                 size = receiver.read(&mut recv_buffer).unwrap();
840                 assert_eq!(size, 2);
841                 assert_eq!(recv_buffer[0..2], [76, 65]);
842             }
843         }
844     }
845 
846     #[test]
available_byte_count_byte_mode()847     fn available_byte_count_byte_mode() {
848         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
849         p1.write(&[1, 23, 45]).unwrap();
850         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
851 
852         // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
853         // yield the same value.
854         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
855     }
856 
857     #[test]
available_byte_count_message_mode()858     fn available_byte_count_message_mode() {
859         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
860         p1.write(&[1, 23, 45]).unwrap();
861         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
862 
863         // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
864         // yield the same value.
865         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
866     }
867 
868     #[test]
available_byte_count_message_mode_multiple_messages()869     fn available_byte_count_message_mode_multiple_messages() {
870         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
871         p1.write(&[1, 2, 3]).unwrap();
872         p1.write(&[4, 5]).unwrap();
873         assert_eq!(p2.get_available_byte_count().unwrap(), 5);
874     }
875 
876     #[test]
duplex_pipe_message()877     fn duplex_pipe_message() {
878         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
879 
880         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
881         unsafe {
882             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
883                 println!("{}", dir);
884 
885                 // Send 2 messages so that we can check that message framing works
886                 sender.write(&[1, 23, 45]).unwrap();
887                 sender.write(&[67, 89, 10]).unwrap();
888 
889                 let mut recv_buffer: [u8; 5] = [0; 5]; // Larger than required for messages
890 
891                 let mut size = receiver.read(&mut recv_buffer).unwrap();
892                 assert_eq!(size, 3);
893                 assert_eq!(recv_buffer[0..3], [1, 23, 45]);
894 
895                 size = receiver.read(&mut recv_buffer).unwrap();
896                 assert_eq!(size, 3);
897                 assert_eq!(recv_buffer[0..3], [67, 89, 10]);
898             }
899         }
900     }
901 
902     #[cfg(test)]
duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection)903     fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
904         let mut recv_buffer: [u8; 1] = [0; 1];
905 
906         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
907         unsafe {
908             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
909                 println!("{}", dir);
910                 sender.write(&[1]).unwrap();
911                 assert_eq!(receiver.read(&mut recv_buffer).unwrap(), 1); // Should succeed!
912                 assert_eq!(
913                     receiver.read(&mut recv_buffer).unwrap_err().kind(),
914                     std::io::ErrorKind::WouldBlock
915                 );
916             }
917         }
918     }
919 
920     #[test]
duplex_nowait()921     fn duplex_nowait() {
922         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
923         duplex_nowait_helper(&p1, &p2);
924     }
925 
926     #[test]
duplex_nowait_set_after_creation()927     fn duplex_nowait_set_after_creation() {
928         // Tests non blocking setting after pipe creation
929         let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
930         p1.set_blocking(&BlockingMode::NoWait)
931             .expect("Failed to set blocking mode on pipe p1");
932         p2.set_blocking(&BlockingMode::NoWait)
933             .expect("Failed to set blocking mode on pipe p2");
934         duplex_nowait_helper(&p1, &p2);
935     }
936 
937     #[test]
duplex_overlapped()938     fn duplex_overlapped() {
939         let pipe_name = generate_pipe_name();
940 
941         let mut p1 = create_server_pipe(
942             &pipe_name,
943             &FramingMode::Message,
944             &BlockingMode::Wait,
945             /* timeout= */ 0,
946             /* buffer_size= */ 1000,
947             /* overlapped= */ true,
948         )
949         .unwrap();
950 
951         let mut p2 = create_client_pipe(
952             &pipe_name,
953             &FramingMode::Message,
954             &BlockingMode::Wait,
955             /* overlapped= */ true,
956         )
957         .unwrap();
958 
959         // Safe because `read_overlapped` can be called since overlapped struct is created.
960         unsafe {
961             let mut p1_overlapped_wrapper =
962                 PipeConnection::create_overlapped_struct(/* include_event= */ true).unwrap();
963             p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut p1_overlapped_wrapper)
964                 .unwrap();
965             let size = p1
966                 .get_overlapped_result(&mut p1_overlapped_wrapper)
967                 .unwrap();
968             assert_eq!(size, 6);
969 
970             let mut recv_buffer: [u8; 6] = [0; 6];
971 
972             let mut p2_overlapped_wrapper =
973                 PipeConnection::create_overlapped_struct(/* include_event= */ true).unwrap();
974             p2.read_overlapped(&mut recv_buffer, &mut p2_overlapped_wrapper)
975                 .unwrap();
976             let size = p2
977                 .get_overlapped_result(&mut p2_overlapped_wrapper)
978                 .unwrap();
979             assert_eq!(size, 6);
980             assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
981         }
982     }
983 
984     #[test]
duplex_overlapped_test_in_use()985     fn duplex_overlapped_test_in_use() {
986         let pipe_name = generate_pipe_name();
987 
988         let mut p1 = create_server_pipe(
989             &pipe_name,
990             &FramingMode::Message,
991             &BlockingMode::Wait,
992             /* timeout= */ 0,
993             /* buffer_size= */ 1000,
994             /* overlapped= */ true,
995         )
996         .unwrap();
997 
998         let mut p2 = create_client_pipe(
999             &pipe_name,
1000             &FramingMode::Message,
1001             &BlockingMode::Wait,
1002             /* overlapped= */ true,
1003         )
1004         .unwrap();
1005         let mut overlapped_wrapper =
1006             PipeConnection::create_overlapped_struct(/* include_event= */ true).unwrap();
1007 
1008         let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1009         assert!(res.is_err());
1010 
1011         let res = p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper);
1012         assert!(res.is_ok());
1013 
1014         let res = p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper);
1015         assert!(res.is_err());
1016 
1017         let mut recv_buffer: [u8; 6] = [0; 6];
1018         let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1019         assert!(res.is_err());
1020 
1021         let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1022         assert!(res.is_ok());
1023 
1024         let mut recv_buffer: [u8; 6] = [0; 6];
1025         let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1026         assert!(res.is_ok());
1027     }
1028 
generate_pipe_name() -> String1029     fn generate_pipe_name() -> String {
1030         format!(
1031             r"\\.\pipe\test-ipc-pipe-name.rand{}",
1032             rand::thread_rng().gen::<u64>(),
1033         )
1034     }
1035 }
1036