• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::ffi::CString;
6 use std::fs::OpenOptions;
7 use std::io;
8 use std::io::Result;
9 use std::mem;
10 use std::os::windows::fs::OpenOptionsExt;
11 use std::process;
12 use std::ptr;
13 use std::sync::atomic::AtomicUsize;
14 use std::sync::atomic::Ordering;
15 
16 use rand::Rng;
17 use serde::Deserialize;
18 use serde::Serialize;
19 use win_util::fail_if_zero;
20 use win_util::SecurityAttributes;
21 use win_util::SelfRelativeSecurityDescriptor;
22 use winapi::shared::minwindef::DWORD;
23 use winapi::shared::minwindef::FALSE;
24 use winapi::shared::minwindef::TRUE;
25 use winapi::shared::winerror::ERROR_IO_INCOMPLETE;
26 use winapi::shared::winerror::ERROR_IO_PENDING;
27 use winapi::shared::winerror::ERROR_NO_DATA;
28 use winapi::shared::winerror::ERROR_PIPE_CONNECTED;
29 use winapi::um::errhandlingapi::GetLastError;
30 use winapi::um::fileapi::FlushFileBuffers;
31 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
32 use winapi::um::ioapiset::CancelIoEx;
33 use winapi::um::ioapiset::GetOverlappedResult;
34 use winapi::um::minwinbase::OVERLAPPED;
35 use winapi::um::namedpipeapi::ConnectNamedPipe;
36 use winapi::um::namedpipeapi::DisconnectNamedPipe;
37 use winapi::um::namedpipeapi::GetNamedPipeInfo;
38 use winapi::um::namedpipeapi::PeekNamedPipe;
39 use winapi::um::namedpipeapi::SetNamedPipeHandleState;
40 use winapi::um::winbase::CreateNamedPipeA;
41 use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
42 use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
43 use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
44 use winapi::um::winbase::PIPE_NOWAIT;
45 use winapi::um::winbase::PIPE_READMODE_BYTE;
46 use winapi::um::winbase::PIPE_READMODE_MESSAGE;
47 use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS;
48 use winapi::um::winbase::PIPE_TYPE_BYTE;
49 use winapi::um::winbase::PIPE_TYPE_MESSAGE;
50 use winapi::um::winbase::PIPE_WAIT;
51 use winapi::um::winbase::SECURITY_IDENTIFICATION;
52 
53 use super::RawDescriptor;
54 use crate::descriptor::AsRawDescriptor;
55 use crate::descriptor::FromRawDescriptor;
56 use crate::descriptor::IntoRawDescriptor;
57 use crate::descriptor::SafeDescriptor;
58 use crate::Event;
59 use crate::EventToken;
60 use crate::WaitContext;
61 
/// Buffer size, in bytes, used by default for every named pipe created through this module.
///
/// When the buffer is too small, a writer that expects not to block *can* block until the reader
/// drains the pipe. As a rule of thumb the buffer should be *at least* as large as the largest
/// message that will be sent.
///
/// An undersized buffer interacts badly with `crate::platform::StreamChannel`, which expects to
/// finish a whole write before releasing a lock that the opposite side needs in order to read:
///     * The writer cannot finish its write (and so cannot release the lock) because the data
///       does not fit in the buffer.
///     * The reader cannot begin reading because the writer still holds the lock, so it cannot
///       make room in the buffer. For message pipes the reader could not help in any case,
///       because a message-mode pipe should NOT perform the partial read that would be needed
///       to relieve the pressure.
///     * Both sides now wait on each other, i.e. the conditions for a deadlock are met.
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

// Process-wide counter; `pair_with_buffer_size` folds its value into each generated pipe name.
static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
79 
80 /// Represents one end of a named pipe
81 #[derive(Serialize, Deserialize, Debug)]
82 pub struct PipeConnection {
83     handle: SafeDescriptor,
84     framing_mode: FramingMode,
85     blocking_mode: BlockingMode,
86 }
87 
88 /// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a
89 /// Readfile or WriteFile operation and holds onto the event object so it doesn't get dropped.
90 pub struct OverlappedWrapper {
91     // Allocated on the heap so that the OVERLAPPED struct doesn't move when performing I/O
92     // operations.
93     overlapped: Box<OVERLAPPED>,
94     // This field prevents the event handle from being dropped too early and allows callers to
95     // be notified when a read or write overlapped operation has completed.
96     h_event: Option<Event>,
97     in_use: bool,
98 }
99 
100 impl OverlappedWrapper {
get_h_event_ref(&self) -> Option<&Event>101     pub fn get_h_event_ref(&self) -> Option<&Event> {
102         self.h_event.as_ref()
103     }
104 
105     /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
106     /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
107     /// returned must not be dropped.
108     ///
109     /// There is an option to create the event object and set it to the `hEvent` field. If hEvent
110     /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
111     /// handle will be signaled when the operation is complete. In other words, you can use
112     /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
113     /// Microsoft though.
new(include_event: bool) -> Result<OverlappedWrapper>114     pub fn new(include_event: bool) -> Result<OverlappedWrapper> {
115         let mut overlapped = OVERLAPPED::default();
116         let h_event = if include_event {
117             Some(Event::new()?)
118         } else {
119             None
120         };
121 
122         overlapped.hEvent = if let Some(event) = h_event.as_ref() {
123             event.as_raw_descriptor()
124         } else {
125             0 as RawDescriptor
126         };
127 
128         Ok(OverlappedWrapper {
129             overlapped: Box::new(overlapped),
130             h_event,
131             in_use: false,
132         })
133     }
134 }
135 
136 // Safe because all of the contained fields may be safely sent to another thread.
137 unsafe impl Send for OverlappedWrapper {}
138 
139 pub trait WriteOverlapped {
140     /// Perform an overlapped write operation with the specified buffer and overlapped wrapper.
141     /// If successful, the write operation will complete asynchronously, and
142     /// `write_result()` should be called to get the result.
143     ///
144     /// # Safety
145     /// `buf` and `overlapped_wrapper` will be in use for the duration of
146     /// the overlapped operation. These must not be reused and must live until
147     /// after `write_result()` has been called.
write_overlapped( &mut self, buf: &mut [u8], overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<()>148     unsafe fn write_overlapped(
149         &mut self,
150         buf: &mut [u8],
151         overlapped_wrapper: &mut OverlappedWrapper,
152     ) -> io::Result<()>;
153 
154     /// Gets the result of the overlapped write operation. Must only be called
155     /// after issuing an overlapped write operation using `write_overlapped`. The
156     /// same `overlapped_wrapper` must be provided.
write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>157     fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
158 
159     /// Tries to get the result of the overlapped write operation. Must only be
160     /// called once, and only after issuing an overlapped write operation using
161     /// `write_overlapped`. The same `overlapped_wrapper` must be provided.
162     ///
163     /// An error indicates that the operation hasn't completed yet and
164     /// `write_result` or `try_write_result` should be called again.
try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>165     fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper)
166         -> io::Result<usize>;
167 }
168 
169 pub trait ReadOverlapped {
170     /// Perform an overlapped read operation with the specified buffer and overlapped wrapper.
171     /// If successful, the read operation will complete asynchronously, and
172     /// `read_result()` should be called to get the result.
173     ///
174     /// # Safety
175     /// `buf` and `overlapped_wrapper` will be in use for the duration of
176     /// the overlapped operation. These must not be reused and must live until
177     /// after `read_result()` has been called.
read_overlapped( &mut self, buf: &mut [u8], overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<()>178     unsafe fn read_overlapped(
179         &mut self,
180         buf: &mut [u8],
181         overlapped_wrapper: &mut OverlappedWrapper,
182     ) -> io::Result<()>;
183 
184     /// Gets the result of the overlapped read operation. Must only be called
185     /// once, and only after issuing an overlapped read operation using
186     /// `read_overlapped`. The same `overlapped_wrapper` must be provided.
read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>187     fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
188 
189     /// Tries to get the result of the overlapped read operation. Must only be called
190     /// after issuing an overlapped read operation using `read_overlapped`. The
191     /// same `overlapped_wrapper` must be provided.
192     ///
193     /// An error indicates that the operation hasn't completed yet and
194     /// `read_result` or `try_read_result` should be called again.
try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>195     fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
196 }
197 
198 #[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
199 pub enum FramingMode {
200     Byte,
201     Message,
202 }
203 
204 impl FramingMode {
to_readmode(self) -> DWORD205     fn to_readmode(self) -> DWORD {
206         match self {
207             FramingMode::Message => PIPE_READMODE_MESSAGE,
208             FramingMode::Byte => PIPE_READMODE_BYTE,
209         }
210     }
211 
to_pipetype(self) -> DWORD212     fn to_pipetype(self) -> DWORD {
213         match self {
214             FramingMode::Message => PIPE_TYPE_MESSAGE,
215             FramingMode::Byte => PIPE_TYPE_BYTE,
216         }
217     }
218 }
219 
220 #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)]
221 pub enum BlockingMode {
222     /// Calls to read() block until data is received
223     Wait,
224     /// Calls to read() return immediately even if there is nothing read with error code 232
225     /// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA)
226     ///
227     /// NOTE: This mode is discouraged by the Windows API documentation.
228     NoWait,
229 }
230 
231 impl From<&BlockingMode> for DWORD {
from(blocking_mode: &BlockingMode) -> DWORD232     fn from(blocking_mode: &BlockingMode) -> DWORD {
233         match blocking_mode {
234             BlockingMode::Wait => PIPE_WAIT,
235             BlockingMode::NoWait => PIPE_NOWAIT,
236         }
237     }
238 }
239 
240 /// Sets the handle state for a named pipe in a rust friendly way.
241 /// This is safe if the pipe handle is open.
set_named_pipe_handle_state( pipe_handle: RawDescriptor, client_mode: &mut DWORD, ) -> Result<()>242 unsafe fn set_named_pipe_handle_state(
243     pipe_handle: RawDescriptor,
244     client_mode: &mut DWORD,
245 ) -> Result<()> {
246     // Safe when the pipe handle is open. Safety also requires checking the return value, which we
247     // do below.
248     let success_flag = SetNamedPipeHandleState(
249         /* hNamedPipe= */ pipe_handle,
250         /* lpMode= */ client_mode,
251         /* lpMaxCollectionCount= */ ptr::null_mut(),
252         /* lpCollectDataTimeout= */ ptr::null_mut(),
253     );
254     if success_flag == 0 {
255         Err(io::Error::last_os_error())
256     } else {
257         Ok(())
258     }
259 }
260 
pair( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, ) -> Result<(PipeConnection, PipeConnection)>261 pub fn pair(
262     framing_mode: &FramingMode,
263     blocking_mode: &BlockingMode,
264     timeout: u64,
265 ) -> Result<(PipeConnection, PipeConnection)> {
266     pair_with_buffer_size(
267         framing_mode,
268         blocking_mode,
269         timeout,
270         DEFAULT_BUFFER_SIZE,
271         false,
272     )
273 }
274 
275 /// Creates a pair of handles connected to either end of a duplex named pipe.
276 ///
277 /// The pipe created will have a semi-random name and a default set of security options that
278 /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
279 /// remote clients, allow only a single server instance, and prevent impersonation by the server
280 /// end of the pipe.
281 ///
282 /// # Arguments
283 ///
284 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
285 ///                     automatically framed sequence of messages (Message). In message mode it's an
286 ///                     error to read fewer bytes than were sent in a message from the other end of
287 ///                     the pipe.
288 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
289 ///                     return immediately if there is nothing available (NoWait).
290 /// * `timeout`       - A timeout to apply for socket operations, in milliseconds.
291 ///                     Setting this to zero will create sockets with the system
292 ///                     default timeout.
293 /// * `buffer_size`   - The default buffer size for the named pipe. The system should expand the
294 ///                     buffer automatically as needed, except in the case of NOWAIT pipes, where
295 ///                     it will just fail writes that don't fit in the buffer.
296 /// # Return value
297 ///
298 /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
299 /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
pair_with_buffer_size( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<(PipeConnection, PipeConnection)>300 pub fn pair_with_buffer_size(
301     framing_mode: &FramingMode,
302     blocking_mode: &BlockingMode,
303     timeout: u64,
304     buffer_size: usize,
305     overlapped: bool,
306 ) -> Result<(PipeConnection, PipeConnection)> {
307     // Give the pipe a unique name to avoid accidental collisions
308     let pipe_name = format!(
309         r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
310         process::id(),
311         NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
312         rand::thread_rng().gen::<u32>(),
313     );
314 
315     let server_end = create_server_pipe(
316         &pipe_name,
317         framing_mode,
318         blocking_mode,
319         timeout,
320         buffer_size,
321         overlapped,
322     )?;
323 
324     // Open the named pipe we just created as the client
325     let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
326 
327     // Accept the client's connection
328     // Not sure if this is strictly needed but I'm doing it just in case.
329     // We expect at this point that the client will already be connected,
330     // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
331     // It's also OK if we get a return code of success.
332     server_end.wait_for_client_connection()?;
333 
334     Ok((server_end, client_end))
335 }
336 
337 /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
338 /// settings.
339 ///
340 /// The pipe will be set to reject remote clients and allow only a single connection at a time.
341 ///
342 /// # Arguments
343 ///
344 /// * `pipe_name`     - The path of the named pipe to create. Should be in the form
345 ///                     `\\.\pipe\<some-name>`.
346 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
347 ///                     automatically framed sequence of messages (Message). In message mode it's an
348 ///                     error to read fewer bytes than were sent in a message from the other end of
349 ///                     the pipe.
350 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
351 ///                     return immediately if there is nothing available (NoWait).
352 /// * `timeout`       - A timeout to apply for socket operations, in milliseconds.
353 ///                     Setting this to zero will create sockets with the system
354 ///                     default timeout.
355 /// * `buffer_size`   - The default buffer size for the named pipe. The system should expand the
356 ///                     buffer automatically as needed, except in the case of NOWAIT pipes, where
357 ///                     it will just fail writes that don't fit in the buffer.
358 /// * `overlapped`    - Sets whether overlapped mode is set on the pipe.
create_server_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<PipeConnection>359 pub fn create_server_pipe(
360     pipe_name: &str,
361     framing_mode: &FramingMode,
362     blocking_mode: &BlockingMode,
363     timeout: u64,
364     buffer_size: usize,
365     overlapped: bool,
366 ) -> Result<PipeConnection> {
367     let c_pipe_name = CString::new(pipe_name).unwrap();
368 
369     let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
370     if overlapped {
371         open_mode_flags |= FILE_FLAG_OVERLAPPED
372     }
373 
374     // This sets flags so there will be an error if >1 instance (server end)
375     // of this pipe name is opened because we expect exactly one.
376     let server_handle = unsafe {
377         // Safe because security attributes are valid, pipe_name is valid C string,
378         // and we're checking the return code
379         CreateNamedPipeA(
380             c_pipe_name.as_ptr(),
381             /* dwOpenMode= */
382             open_mode_flags,
383             /* dwPipeMode= */
384             framing_mode.to_pipetype()
385                 | framing_mode.to_readmode()
386                 | DWORD::from(blocking_mode)
387                 | PIPE_REJECT_REMOTE_CLIENTS,
388             /* nMaxInstances= */ 1,
389             /* nOutBufferSize= */ buffer_size as DWORD,
390             /* nInBufferSize= */ buffer_size as DWORD,
391             /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
392             /* lpSecurityAttributes= */
393             SecurityAttributes::new_with_security_descriptor(
394                 SelfRelativeSecurityDescriptor::get_singleton(),
395                 /* inherit= */ true,
396             )
397             .as_mut(),
398         )
399     };
400 
401     if server_handle == INVALID_HANDLE_VALUE {
402         Err(io::Error::last_os_error())
403     } else {
404         unsafe {
405             Ok(PipeConnection {
406                 handle: SafeDescriptor::from_raw_descriptor(server_handle),
407                 framing_mode: *framing_mode,
408                 blocking_mode: *blocking_mode,
409             })
410         }
411     }
412 }
413 
414 /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
415 /// settings.
416 ///
417 /// The pipe will be set to prevent impersonation of the client by the server process.
418 ///
419 /// # Arguments
420 ///
421 /// * `pipe_name`     - The path of the named pipe to create. Should be in the form
422 ///                     `\\.\pipe\<some-name>`.
423 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
424 ///                     automatically framed sequence of messages (Message). In message mode it's an
425 ///                     error to read fewer bytes than were sent in a message from the other end of
426 ///                     the pipe.
427 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
428 ///                     return immediately if there is nothing available (NoWait).
429 /// * `overlapped`    - Sets whether the pipe is opened in overlapped mode.
create_client_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, overlapped: bool, ) -> Result<PipeConnection>430 pub fn create_client_pipe(
431     pipe_name: &str,
432     framing_mode: &FramingMode,
433     blocking_mode: &BlockingMode,
434     overlapped: bool,
435 ) -> Result<PipeConnection> {
436     let client_handle = OpenOptions::new()
437         .read(true)
438         .write(true)
439         .create(true)
440         .security_qos_flags(SECURITY_IDENTIFICATION)
441         .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
442         .open(pipe_name)?
443         .into_raw_descriptor();
444 
445     let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
446 
447     // Safe because client_handle's open() call did not return an error.
448     unsafe {
449         set_named_pipe_handle_state(client_handle, &mut client_mode)?;
450     }
451 
452     Ok(PipeConnection {
453         // Safe because client_handle is valid
454         handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
455         framing_mode: *framing_mode,
456         blocking_mode: *blocking_mode,
457     })
458 }
459 
/// Marker for element types that may be passed through the generic helper functions
/// write_to_pipe and read_from_pipe.
pub trait PipeSendable {
    /// Value used to fill the newly added slots when a buffer is resized to a larger size.
    fn default() -> Self;
}

/// Bytes are pipe-sendable; new slots are zero-filled.
impl PipeSendable for u8 {
    fn default() -> u8 {
        0u8
    }
}
472 impl PipeSendable for RawDescriptor {
default() -> Self473     fn default() -> Self {
474         ptr::null_mut()
475     }
476 }
477 
478 impl PipeConnection {
try_clone(&self) -> Result<PipeConnection>479     pub fn try_clone(&self) -> Result<PipeConnection> {
480         let copy_handle = self.handle.try_clone()?;
481         Ok(PipeConnection {
482             handle: copy_handle,
483             framing_mode: self.framing_mode,
484             blocking_mode: self.blocking_mode,
485         })
486     }
487 
488     /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
489     /// blocking modes.
490     ///
491     /// # Safety
492     /// 1. rd is valid and ownership is transferred to this function when it is called.
493     ///
494     /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
495     /// underlying pipe.
from_raw_descriptor( rd: RawDescriptor, framing_mode: FramingMode, blocking_mode: BlockingMode, ) -> PipeConnection496     pub unsafe fn from_raw_descriptor(
497         rd: RawDescriptor,
498         framing_mode: FramingMode,
499         blocking_mode: BlockingMode,
500     ) -> PipeConnection {
501         PipeConnection {
502             handle: SafeDescriptor::from_raw_descriptor(rd),
503             framing_mode,
504             blocking_mode,
505         }
506     }
507 
508     /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
509     /// Returns the number of bytes (not values) read.
510     ///
511     /// # Safety
512     ///
513     /// This is safe only when the following conditions hold:
514     ///     1. The data on the other end of the pipe is a valid binary representation of data for
515     ///     type T, and
516     ///     2. The number of bytes read is a multiple of the size of T; this must be checked by
517     ///     the caller.
518     /// If buf's type is file descriptors, this is only safe when those file descriptors are valid
519     /// for the process where this function was called.
read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize>520     pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
521         PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None)
522     }
523 
524     /// Similar to `PipeConnection::read` except it also allows:
525     ///     1. The same end of the named pipe to read and write at the same time in different
526     ///        threads.
527     ///     2. Asynchronous read and write (read and write won't block).
528     ///
529     /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
530     /// (can be created with `OverlappedWrapper::new`) will be passed into
531     /// `ReadFile`. That event will be triggered when the read operation is complete.
532     ///
533     /// In order to get how many bytes were read, call `get_overlapped_result`. That function will
534     /// also help with waiting until the read operation is complete.
535     ///
536     /// # Safety
537     ///
538     /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
539     /// overlapped mode otherwise there may be unexpected behavior.
read_overlapped<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>540     pub unsafe fn read_overlapped<T: PipeSendable>(
541         &mut self,
542         buf: &mut [T],
543         overlapped_wrapper: &mut OverlappedWrapper,
544     ) -> Result<()> {
545         if overlapped_wrapper.in_use {
546             return Err(std::io::Error::new(
547                 std::io::ErrorKind::InvalidInput,
548                 "Overlapped struct already in use",
549             ));
550         }
551         overlapped_wrapper.in_use = true;
552 
553         PipeConnection::read_internal(
554             &self.handle,
555             self.blocking_mode,
556             buf,
557             Some(&mut overlapped_wrapper.overlapped),
558         )?;
559         Ok(())
560     }
561 
562     /// Helper for `read_overlapped` and `read`
563     ///
564     /// # Safety
565     /// Comments `read_overlapped` or `read`, depending on which is used.
read_internal<T: PipeSendable>( handle: &SafeDescriptor, blocking_mode: BlockingMode, buf: &mut [T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>566     unsafe fn read_internal<T: PipeSendable>(
567         handle: &SafeDescriptor,
568         blocking_mode: BlockingMode,
569         buf: &mut [T],
570         overlapped: Option<&mut OVERLAPPED>,
571     ) -> Result<usize> {
572         let res = crate::platform::read_file(
573             handle,
574             buf.as_mut_ptr() as *mut u8,
575             mem::size_of_val(buf),
576             overlapped,
577         );
578         match res {
579             Ok(bytes_read) => Ok(bytes_read),
580             Err(e)
581                 if blocking_mode == BlockingMode::NoWait
582                     && e.raw_os_error() == Some(ERROR_NO_DATA as i32) =>
583             {
584                 // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
585                 // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
586                 // correct. For further details see:
587                 // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
588                 // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
589                 Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
590             }
591             Err(e) => Err(e),
592         }
593     }
594 
595     /// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted
596     /// by an event on `exit_event`.
read_overlapped_blocking<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<()>597     pub fn read_overlapped_blocking<T: PipeSendable>(
598         &mut self,
599         buf: &mut [T],
600         overlapped_wrapper: &mut OverlappedWrapper,
601         exit_event: &Event,
602     ) -> Result<()> {
603         // Safe because we are providing a valid buffer slice and also providing a valid
604         // overlapped struct.
605         unsafe {
606             self.read_overlapped(buf, overlapped_wrapper)?;
607         };
608 
609         #[derive(EventToken)]
610         enum Token {
611             ReadOverlapped,
612             Exit,
613         }
614 
615         let wait_ctx = WaitContext::build_with(&[
616             (
617                 overlapped_wrapper.get_h_event_ref().unwrap(),
618                 Token::ReadOverlapped,
619             ),
620             (exit_event, Token::Exit),
621         ])?;
622 
623         let events = wait_ctx.wait()?;
624         for event in events {
625             match event.token {
626                 Token::ReadOverlapped => {
627                     let size_read_in_bytes =
628                         self.get_overlapped_result(overlapped_wrapper)? as usize;
629 
630                     // If this error shows, most likely the overlapped named pipe was set up
631                     // incorrectly.
632                     if size_read_in_bytes != buf.len() {
633                         return Err(std::io::Error::new(
634                             std::io::ErrorKind::UnexpectedEof,
635                             "Short read",
636                         ));
637                     }
638                 }
639                 Token::Exit => {
640                     return Err(std::io::Error::new(
641                         std::io::ErrorKind::Interrupted,
642                         "IO canceled on exit request",
643                     ));
644                 }
645             }
646         }
647 
648         Ok(())
649     }
650 
651     /// Reads a variable size message and returns the message on success.
652     /// The size of the message is expected to proceed the message in
653     /// the form of `header_size` message.
654     ///
655     /// `parse_message_size` lets caller parse the header to extract
656     /// message size.
657     ///
658     /// Event on `exit_event` is used to interrupt the blocked read.
read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>( &mut self, header_size: usize, parse_message_size: F, overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<Vec<u8>>659     pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
660         &mut self,
661         header_size: usize,
662         parse_message_size: F,
663         overlapped_wrapper: &mut OverlappedWrapper,
664         exit_event: &Event,
665     ) -> Result<Vec<u8>> {
666         let mut header = vec![0; header_size];
667         header.resize_with(header_size, Default::default);
668         self.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
669         let message_size = parse_message_size(&header);
670         if message_size == 0 {
671             return Ok(vec![]);
672         }
673         let mut buf = vec![];
674         buf.resize_with(message_size, Default::default);
675         self.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
676         Ok(buf)
677     }
678 
679     /// Gets the size in bytes of data in the pipe.
680     ///
681     /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
682     /// not finished writing on the producer side.
get_available_byte_count(&self) -> io::Result<u32>683     pub fn get_available_byte_count(&self) -> io::Result<u32> {
684         let mut total_bytes_avail: DWORD = 0;
685 
686         // Safe because the underlying pipe handle is guaranteed to be open, and the output values
687         // live at valid memory locations.
688         fail_if_zero!(unsafe {
689             PeekNamedPipe(
690                 self.as_raw_descriptor(),
691                 ptr::null_mut(),
692                 0,
693                 ptr::null_mut(),
694                 &mut total_bytes_avail,
695                 ptr::null_mut(),
696             )
697         });
698 
699         Ok(total_bytes_avail)
700     }
701 
702     /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
703     /// callers should check to ensure that it was the number expected.
write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize>704     pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
705         // SAFETY: overlapped is None so this is safe.
706         unsafe { PipeConnection::write_internal(&self.handle, buf, None) }
707     }
708 
709     /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
710     /// as a failure.
write_overlapped_blocking_message<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>711     pub fn write_overlapped_blocking_message<T: PipeSendable>(
712         &mut self,
713         buf: &[T],
714         overlapped_wrapper: &mut OverlappedWrapper,
715     ) -> Result<()> {
716         // SAFETY: buf & overlapped_wrapper live until the overlapped operation is
717         // complete, so this is safe.
718         unsafe { self.write_overlapped(buf, overlapped_wrapper)? };
719 
720         let size_written_in_bytes = self.get_overlapped_result(overlapped_wrapper)?;
721 
722         if size_written_in_bytes as usize != buf.len() {
723             return Err(std::io::Error::new(
724                 std::io::ErrorKind::UnexpectedEof,
725                 format!(
726                     "Short write expected:{} found:{}",
727                     size_written_in_bytes,
728                     buf.len(),
729                 ),
730             ));
731         }
732         Ok(())
733     }
734 
735     /// Similar to `PipeConnection::write` except it also allows:
736     ///     1. The same end of the named pipe to read and write at the same time in different
737     ///        threads.
738     ///     2. Asynchronous read and write (read and write won't block).
739     ///
740     /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
741     /// (can be created with `OverlappedWrapper::new`) will be passed into
742     /// `WriteFile`. That event will be triggered when the write operation is complete.
743     ///
744     /// In order to get how many bytes were written, call `get_overlapped_result`. That function will
745     /// also help with waiting until the write operation is complete. The pipe must be opened in
746     /// overlapped otherwise there may be unexpected behavior.
747     ///
748     /// # Safety
749     /// * buf & overlapped_wrapper MUST live until the overlapped operation is complete.
write_overlapped<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>750     pub unsafe fn write_overlapped<T: PipeSendable>(
751         &mut self,
752         buf: &[T],
753         overlapped_wrapper: &mut OverlappedWrapper,
754     ) -> Result<()> {
755         if overlapped_wrapper.in_use {
756             return Err(std::io::Error::new(
757                 std::io::ErrorKind::InvalidInput,
758                 "Overlapped struct already in use",
759             ));
760         }
761         overlapped_wrapper.in_use = true;
762 
763         PipeConnection::write_internal(
764             &self.handle,
765             buf,
766             Some(&mut overlapped_wrapper.overlapped),
767         )?;
768         Ok(())
769     }
770 
771     /// Helper for `write_overlapped` and `write`.
772     ///
773     /// # Safety
774     /// * Safe if overlapped is None.
775     /// * Safe if overlapped is Some and:
776     ///   + buf lives until the overlapped operation is complete.
777     ///   + overlapped lives until the overlapped operation is complete.
write_internal<T: PipeSendable>( handle: &SafeDescriptor, buf: &[T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>778     unsafe fn write_internal<T: PipeSendable>(
779         handle: &SafeDescriptor,
780         buf: &[T],
781         overlapped: Option<&mut OVERLAPPED>,
782     ) -> Result<usize> {
783         // Safe because buf points to memory valid until the write completes and we pass a valid
784         // length for that memory.
785         unsafe {
786             crate::platform::write_file(
787                 handle,
788                 buf.as_ptr() as *const u8,
789                 mem::size_of_val(buf),
790                 overlapped,
791             )
792         }
793     }
794 
795     /// Sets the blocking mode on the pipe.
set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()>796     pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
797         let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
798         self.blocking_mode = *blocking_mode;
799 
800         // Safe because the pipe has not been closed (it is managed by this object).
801         unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
802     }
803 
804     /// For a server named pipe, waits for a client to connect
wait_for_client_connection(&self) -> Result<()>805     pub fn wait_for_client_connection(&self) -> Result<()> {
806         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
807         self.wait_for_client_connection_internal(
808             &mut overlapped_wrapper,
809             /* should_block = */ true,
810         )
811     }
812 
813     /// For a server named pipe, waits for a client to connect using the given overlapped wrapper
814     /// to signal connection.
wait_for_client_connection_overlapped( &self, overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>815     pub fn wait_for_client_connection_overlapped(
816         &self,
817         overlapped_wrapper: &mut OverlappedWrapper,
818     ) -> Result<()> {
819         self.wait_for_client_connection_internal(
820             overlapped_wrapper,
821             /* should_block = */ false,
822         )
823     }
824 
wait_for_client_connection_internal( &self, overlapped_wrapper: &mut OverlappedWrapper, should_block: bool, ) -> Result<()>825     fn wait_for_client_connection_internal(
826         &self,
827         overlapped_wrapper: &mut OverlappedWrapper,
828         should_block: bool,
829     ) -> Result<()> {
830         // Safe because the handle is valid and we're checking the return
831         // code according to the documentation
832         unsafe {
833             let success_flag = ConnectNamedPipe(
834                 self.as_raw_descriptor(),
835                 // Note: The overlapped structure is only used if the pipe was opened in
836                 // OVERLAPPED mode, but is necessary in that case.
837                 &mut *overlapped_wrapper.overlapped,
838             );
839             if success_flag == 0 {
840                 return match GetLastError() {
841                     ERROR_PIPE_CONNECTED => {
842                         if !should_block {
843                             // If async, make sure the event is signalled to indicate the client
844                             // is ready.
845                             overlapped_wrapper.get_h_event_ref().unwrap().signal()?;
846                         }
847 
848                         Ok(())
849                     }
850                     ERROR_IO_PENDING => {
851                         if should_block {
852                             overlapped_wrapper.get_h_event_ref().unwrap().wait()?;
853                         }
854                         Ok(())
855                     }
856                     err => Err(io::Error::from_raw_os_error(err as i32)),
857                 };
858             }
859         }
860         Ok(())
861     }
862 
863     /// Used for overlapped read and write operations.
864     ///
865     /// This will block until the ReadFile or WriteFile operation that also took in
866     /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
867     /// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get
868     /// the number of bytes that were read or written.
get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>869     pub fn get_overlapped_result(
870         &mut self,
871         overlapped_wrapper: &mut OverlappedWrapper,
872     ) -> io::Result<u32> {
873         let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true);
874         overlapped_wrapper.in_use = false;
875         res
876     }
877 
878     /// Used for overlapped read and write operations.
879     ///
880     /// This will return immediately, regardless of the completion status of the
881     /// ReadFile or WriteFile operation that took in `overlapped_wrapper`,
882     /// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new`
883     /// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes
884     /// that were read or written, if completed.  If the operation hasn't
885     /// completed, an error of kind `io::ErrorKind::WouldBlock` will be
886     /// returned.
try_get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>887     pub fn try_get_overlapped_result(
888         &mut self,
889         overlapped_wrapper: &mut OverlappedWrapper,
890     ) -> io::Result<u32> {
891         let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false);
892         match res {
893             Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => {
894                 Err(io::Error::new(io::ErrorKind::WouldBlock, err))
895             }
896             _ => {
897                 overlapped_wrapper.in_use = false;
898                 res
899             }
900         }
901     }
902 
get_overlapped_result_internal( &mut self, overlapped_wrapper: &mut OverlappedWrapper, wait: bool, ) -> io::Result<u32>903     fn get_overlapped_result_internal(
904         &mut self,
905         overlapped_wrapper: &mut OverlappedWrapper,
906         wait: bool,
907     ) -> io::Result<u32> {
908         if !overlapped_wrapper.in_use {
909             return Err(std::io::Error::new(
910                 std::io::ErrorKind::InvalidInput,
911                 "Overlapped struct is not in use",
912             ));
913         }
914         let mut size_transferred = 0;
915         // Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
916         // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
917         fail_if_zero!(unsafe {
918             GetOverlappedResult(
919                 self.handle.as_raw_descriptor(),
920                 &mut *overlapped_wrapper.overlapped,
921                 &mut size_transferred,
922                 if wait { TRUE } else { FALSE },
923             )
924         });
925 
926         Ok(size_transferred)
927     }
928 
929     /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
930     /// cancel all I/O requests for the file handle passed in.
cancel_io(&mut self) -> Result<()>931     pub fn cancel_io(&mut self) -> Result<()> {
932         fail_if_zero!(unsafe {
933             CancelIoEx(
934                 self.handle.as_raw_descriptor(),
935                 /* lpOverlapped= */ std::ptr::null_mut(),
936             )
937         });
938 
939         Ok(())
940     }
941 
942     /// Get the framing mode of the pipe.
get_framing_mode(&self) -> FramingMode943     pub fn get_framing_mode(&self) -> FramingMode {
944         self.framing_mode
945     }
946 
947     /// Returns metadata about the connected NamedPipe.
get_info(&self, is_server_connection: bool) -> Result<NamedPipeInfo>948     pub fn get_info(&self, is_server_connection: bool) -> Result<NamedPipeInfo> {
949         let mut flags: u32 = 0;
950         // Marked mutable because they are mutated in a system call
951         #[allow(unused_mut)]
952         let mut incoming_buffer_size: u32 = 0;
953         #[allow(unused_mut)]
954         let mut outgoing_buffer_size: u32 = 0;
955         #[allow(unused_mut)]
956         let mut max_instances: u32 = 0;
957         // Client side with BYTE type are default flags
958         if is_server_connection {
959             flags |= 0x00000001 /* PIPE_SERVER_END */
960         }
961         if self.framing_mode == FramingMode::Message {
962             flags |= 0x00000004 /* PIPE_TYPE_MESSAGE */
963         }
964         // Safe because we have allocated all pointers and own
965         // them as mutable.
966         fail_if_zero!(unsafe {
967             GetNamedPipeInfo(
968                 self.as_raw_descriptor(),
969                 flags as *mut u32,
970                 outgoing_buffer_size as *mut u32,
971                 incoming_buffer_size as *mut u32,
972                 max_instances as *mut u32,
973             )
974         });
975 
976         Ok(NamedPipeInfo {
977             outgoing_buffer_size,
978             incoming_buffer_size,
979             max_instances,
980         })
981     }
982 
983     /// For a server pipe, flush the pipe contents. This will
984     /// block until the pipe is cleared by the client. Only
985     /// call this if you are sure the client is reading the
986     /// data!
flush_data_blocking(&self) -> Result<()>987     pub fn flush_data_blocking(&self) -> Result<()> {
988         // Safe because the only buffers interacted with are
989         // outside of Rust memory
990         fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) });
991         Ok(())
992     }
993 
994     /// For a server pipe, disconnect all clients, discarding any buffered data.
disconnect_clients(&self) -> Result<()>995     pub fn disconnect_clients(&self) -> Result<()> {
996         // Safe because we own the handle passed in and know it will remain valid for the duration
997         // of the call. Discarded buffers are not managed by rust.
998         fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) });
999         Ok(())
1000     }
1001 }
1002 
1003 impl AsRawDescriptor for PipeConnection {
as_raw_descriptor(&self) -> RawDescriptor1004     fn as_raw_descriptor(&self) -> RawDescriptor {
1005         self.handle.as_raw_descriptor()
1006     }
1007 }
1008 
1009 impl IntoRawDescriptor for PipeConnection {
into_raw_descriptor(self) -> RawDescriptor1010     fn into_raw_descriptor(self) -> RawDescriptor {
1011         self.handle.into_raw_descriptor()
1012     }
1013 }
1014 
1015 unsafe impl Send for PipeConnection {}
1016 unsafe impl Sync for PipeConnection {}
1017 
1018 impl io::Read for PipeConnection {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>1019     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1020         // This is safe because PipeConnection::read is always safe for u8
1021         unsafe { PipeConnection::read(self, buf) }
1022     }
1023 }
1024 
1025 impl io::Write for PipeConnection {
write(&mut self, buf: &[u8]) -> io::Result<usize>1026     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1027         PipeConnection::write(self, buf)
1028     }
1029 
flush(&mut self) -> io::Result<()>1030     fn flush(&mut self) -> io::Result<()> {
1031         Ok(())
1032     }
1033 }
1034 
/// A simple data struct representing metadata about a NamedPipe.
///
/// All fields are plain `u32` values, so the struct is cheap to copy, compare and print.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NamedPipeInfo {
    /// Size of the pipe's outgoing buffer.
    pub outgoing_buffer_size: u32,
    /// Size of the pipe's incoming buffer.
    pub incoming_buffer_size: u32,
    /// Maximum number of pipe instances.
    pub max_instances: u32,
}
1042 
1043 #[cfg(test)]
1044 mod tests {
1045     use std::mem::size_of;
1046 
1047     use super::*;
1048 
1049     #[test]
duplex_pipe_stream()1050     fn duplex_pipe_stream() {
1051         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1052 
1053         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1054         unsafe {
1055             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1056                 println!("{}", dir);
1057 
1058                 sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();
1059 
1060                 // Smaller than what we sent so we get multiple chunks
1061                 let mut recv_buffer: [u8; 4] = [0; 4];
1062 
1063                 let mut size = receiver.read(&mut recv_buffer).unwrap();
1064                 assert_eq!(size, 4);
1065                 assert_eq!(recv_buffer, [75, 77, 54, 82]);
1066 
1067                 size = receiver.read(&mut recv_buffer).unwrap();
1068                 assert_eq!(size, 2);
1069                 assert_eq!(recv_buffer[0..2], [76, 65]);
1070             }
1071         }
1072     }
1073 
1074     #[test]
available_byte_count_byte_mode()1075     fn available_byte_count_byte_mode() {
1076         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1077         p1.write(&[1, 23, 45]).unwrap();
1078         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1079 
1080         // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
1081         // yield the same value.
1082         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1083     }
1084 
1085     #[test]
available_byte_count_message_mode()1086     fn available_byte_count_message_mode() {
1087         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1088         p1.write(&[1, 23, 45]).unwrap();
1089         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1090 
1091         // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
1092         // yield the same value.
1093         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1094     }
1095 
1096     #[test]
available_byte_count_message_mode_multiple_messages()1097     fn available_byte_count_message_mode_multiple_messages() {
1098         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1099         p1.write(&[1, 2, 3]).unwrap();
1100         p1.write(&[4, 5]).unwrap();
1101         assert_eq!(p2.get_available_byte_count().unwrap(), 5);
1102     }
1103 
1104     #[test]
duplex_pipe_message()1105     fn duplex_pipe_message() {
1106         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1107 
1108         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1109         unsafe {
1110             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1111                 println!("{}", dir);
1112 
1113                 // Send 2 messages so that we can check that message framing works
1114                 sender.write(&[1, 23, 45]).unwrap();
1115                 sender.write(&[67, 89, 10]).unwrap();
1116 
1117                 let mut recv_buffer: [u8; 5] = [0; 5]; // Larger than required for messages
1118 
1119                 let mut size = receiver.read(&mut recv_buffer).unwrap();
1120                 assert_eq!(size, 3);
1121                 assert_eq!(recv_buffer[0..3], [1, 23, 45]);
1122 
1123                 size = receiver.read(&mut recv_buffer).unwrap();
1124                 assert_eq!(size, 3);
1125                 assert_eq!(recv_buffer[0..3], [67, 89, 10]);
1126             }
1127         }
1128     }
1129 
1130     #[cfg(test)]
duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection)1131     fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
1132         let mut recv_buffer: [u8; 1] = [0; 1];
1133 
1134         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1135         unsafe {
1136             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1137                 println!("{}", dir);
1138                 sender.write(&[1]).unwrap();
1139                 assert_eq!(receiver.read(&mut recv_buffer).unwrap(), 1); // Should succeed!
1140                 assert_eq!(
1141                     receiver.read(&mut recv_buffer).unwrap_err().kind(),
1142                     std::io::ErrorKind::WouldBlock
1143                 );
1144             }
1145         }
1146     }
1147 
1148     #[test]
duplex_nowait()1149     fn duplex_nowait() {
1150         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
1151         duplex_nowait_helper(&p1, &p2);
1152     }
1153 
1154     #[test]
duplex_nowait_set_after_creation()1155     fn duplex_nowait_set_after_creation() {
1156         // Tests non blocking setting after pipe creation
1157         let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1158         p1.set_blocking(&BlockingMode::NoWait)
1159             .expect("Failed to set blocking mode on pipe p1");
1160         p2.set_blocking(&BlockingMode::NoWait)
1161             .expect("Failed to set blocking mode on pipe p2");
1162         duplex_nowait_helper(&p1, &p2);
1163     }
1164 
1165     #[test]
duplex_overlapped()1166     fn duplex_overlapped() {
1167         let pipe_name = generate_pipe_name();
1168 
1169         let mut p1 = create_server_pipe(
1170             &pipe_name,
1171             &FramingMode::Message,
1172             &BlockingMode::Wait,
1173             /* timeout= */ 0,
1174             /* buffer_size= */ 1000,
1175             /* overlapped= */ true,
1176         )
1177         .unwrap();
1178 
1179         let mut p2 = create_client_pipe(
1180             &pipe_name,
1181             &FramingMode::Message,
1182             &BlockingMode::Wait,
1183             /* overlapped= */ true,
1184         )
1185         .unwrap();
1186 
1187         // Safe because `read_overlapped` can be called since overlapped struct is created.
1188         unsafe {
1189             let mut p1_overlapped_wrapper =
1190                 OverlappedWrapper::new(/* include_event= */ true).unwrap();
1191             p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut p1_overlapped_wrapper)
1192                 .unwrap();
1193             let size = p1
1194                 .get_overlapped_result(&mut p1_overlapped_wrapper)
1195                 .unwrap();
1196             assert_eq!(size, 6);
1197 
1198             let mut recv_buffer: [u8; 6] = [0; 6];
1199 
1200             let mut p2_overlapped_wrapper =
1201                 OverlappedWrapper::new(/* include_event= */ true).unwrap();
1202             p2.read_overlapped(&mut recv_buffer, &mut p2_overlapped_wrapper)
1203                 .unwrap();
1204             let size = p2
1205                 .get_overlapped_result(&mut p2_overlapped_wrapper)
1206                 .unwrap();
1207             assert_eq!(size, 6);
1208             assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
1209         }
1210     }
1211 
1212     #[test]
duplex_overlapped_test_in_use()1213     fn duplex_overlapped_test_in_use() {
1214         let pipe_name = generate_pipe_name();
1215 
1216         let mut p1 = create_server_pipe(
1217             &pipe_name,
1218             &FramingMode::Message,
1219             &BlockingMode::Wait,
1220             /* timeout= */ 0,
1221             /* buffer_size= */ 1000,
1222             /* overlapped= */ true,
1223         )
1224         .unwrap();
1225 
1226         let mut p2 = create_client_pipe(
1227             &pipe_name,
1228             &FramingMode::Message,
1229             &BlockingMode::Wait,
1230             /* overlapped= */ true,
1231         )
1232         .unwrap();
1233         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1234 
1235         let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1236         assert!(res.is_err());
1237 
1238         let data = vec![75, 77, 54, 82, 76, 65];
1239         // SAFETY: safe because: data & overlapped wrapper live until the
1240         // operation is verified completed below.
1241         let res = unsafe { p1.write_overlapped(&data, &mut overlapped_wrapper) };
1242         assert!(res.is_ok());
1243 
1244         // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
1245         // will error out.
1246         let res =
1247             unsafe { p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper) };
1248         assert!(res.is_err());
1249 
1250         let mut recv_buffer: [u8; 6] = [0; 6];
1251         // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
1252         // will error out.
1253         let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1254         assert!(res.is_err());
1255 
1256         let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1257         assert!(res.is_ok());
1258 
1259         let mut recv_buffer: [u8; 6] = [0; 6];
1260         // SAFETY: safe because recv_buffer & overlapped_wrapper live until the
1261         // operation is verified completed below.
1262         let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1263         assert!(res.is_ok());
1264         let res = p2.get_overlapped_result(&mut overlapped_wrapper);
1265         assert!(res.is_ok());
1266     }
1267 
generate_pipe_name() -> String1268     fn generate_pipe_name() -> String {
1269         format!(
1270             r"\\.\pipe\test-ipc-pipe-name.rand{}",
1271             rand::thread_rng().gen::<u64>(),
1272         )
1273     }
1274 
1275     #[test]
read_write_overlapped_message()1276     fn read_write_overlapped_message() {
1277         let pipe_name = generate_pipe_name();
1278 
1279         let mut p1 = create_server_pipe(
1280             &pipe_name,
1281             &FramingMode::Message,
1282             &BlockingMode::Wait,
1283             /* timeout= */ 0,
1284             /* buffer_size= */ 1000,
1285             /* overlapped= */ true,
1286         )
1287         .unwrap();
1288 
1289         let mut p2 = create_client_pipe(
1290             &pipe_name,
1291             &FramingMode::Message,
1292             &BlockingMode::Wait,
1293             /* overlapped= */ true,
1294         )
1295         .unwrap();
1296 
1297         // Safe because `read_overlapped` can be called since overlapped struct is created.
1298         let mut p1_overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1299         const MSG: [u8; 6] = [75, 77, 54, 82, 76, 65];
1300         p1.write_overlapped_blocking_message(&MSG.len().to_be_bytes(), &mut p1_overlapped_wrapper)
1301             .unwrap();
1302         p1.write_overlapped_blocking_message(&MSG, &mut p1_overlapped_wrapper)
1303             .unwrap();
1304 
1305         let mut p2_overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1306         let exit_event = Event::new().unwrap();
1307         let recv_buffer = p2
1308             .read_overlapped_blocking_message(
1309                 size_of::<usize>(),
1310                 |buf| usize::from_be_bytes(buf.try_into().expect("failed to get array from slice")),
1311                 &mut p2_overlapped_wrapper,
1312                 &exit_event,
1313             )
1314             .unwrap();
1315         assert_eq!(recv_buffer, MSG);
1316     }
1317 }
1318