• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::ffi::CString;
6 use std::fs::OpenOptions;
7 use std::io;
8 use std::io::Result;
9 use std::mem;
10 use std::os::windows::fs::OpenOptionsExt;
11 use std::process;
12 use std::ptr;
13 use std::sync::atomic::AtomicBool;
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering;
16 use std::sync::Arc;
17 
18 use rand::Rng;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use sync::Mutex;
22 use win_util::fail_if_zero;
23 use win_util::SecurityAttributes;
24 use win_util::SelfRelativeSecurityDescriptor;
25 use winapi::shared::minwindef::DWORD;
26 use winapi::shared::minwindef::FALSE;
27 use winapi::shared::minwindef::TRUE;
28 use winapi::shared::winerror::ERROR_BROKEN_PIPE;
29 use winapi::shared::winerror::ERROR_IO_INCOMPLETE;
30 use winapi::shared::winerror::ERROR_IO_PENDING;
31 use winapi::shared::winerror::ERROR_MORE_DATA;
32 use winapi::shared::winerror::ERROR_NO_DATA;
33 use winapi::shared::winerror::ERROR_PIPE_CONNECTED;
34 use winapi::um::errhandlingapi::GetLastError;
35 use winapi::um::fileapi::FlushFileBuffers;
36 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
37 use winapi::um::ioapiset::CancelIoEx;
38 use winapi::um::ioapiset::GetOverlappedResult;
39 use winapi::um::minwinbase::OVERLAPPED;
40 use winapi::um::namedpipeapi::ConnectNamedPipe;
41 use winapi::um::namedpipeapi::DisconnectNamedPipe;
42 use winapi::um::namedpipeapi::GetNamedPipeInfo;
43 use winapi::um::namedpipeapi::PeekNamedPipe;
44 use winapi::um::namedpipeapi::SetNamedPipeHandleState;
45 use winapi::um::winbase::CreateNamedPipeA;
46 use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
47 use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
48 use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
49 use winapi::um::winbase::PIPE_NOWAIT;
50 use winapi::um::winbase::PIPE_READMODE_BYTE;
51 use winapi::um::winbase::PIPE_READMODE_MESSAGE;
52 use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS;
53 use winapi::um::winbase::PIPE_TYPE_BYTE;
54 use winapi::um::winbase::PIPE_TYPE_MESSAGE;
55 use winapi::um::winbase::PIPE_WAIT;
56 use winapi::um::winbase::SECURITY_IDENTIFICATION;
57 
58 use super::RawDescriptor;
59 use crate::descriptor::AsRawDescriptor;
60 use crate::descriptor::FromRawDescriptor;
61 use crate::descriptor::IntoRawDescriptor;
62 use crate::descriptor::SafeDescriptor;
63 use crate::Event;
64 use crate::EventToken;
65 use crate::WaitContext;
66 
67 /// The default buffer size for all named pipes in the system. If this size is too small, writers
68 /// on named pipes that expect not to block *can* block until the reading side empties the buffer.
69 ///
70 /// The general rule is this should be *at least* as big as the largest message, otherwise
71 /// unexpected blocking behavior can result; for example, if too small, this can interact badly with
72 /// crate::windows::StreamChannel, which expects to be able to make a complete write before
73 /// releasing a lock that the opposite side needs to complete a read. This means that if the buffer
74 /// is too small:
75 ///     * The writer can't complete its write and release the lock because the buffer is too small.
76 ///     * The reader can't start reading because the lock is held by the writer, so it can't relieve
77 ///       buffer pressure. Note that for message pipes, the reader couldn't do anything to help
78 ///       anyway, because a message mode pipe should NOT have a partial read (which is what we would
79 ///       need to relieve pressure).
80 ///     * Conditions for deadlock are met, and both the reader & writer enter circular waiting.
81 pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;
82 
83 static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
84 
85 #[remain::sorted]
86 #[derive(Debug, thiserror::Error)]
87 pub enum PipeError {
88     #[error("read zero bytes, but this is not an EOF")]
89     ZeroByteReadNoEof,
90 }
91 
92 /// Represents one end of a named pipe
93 ///
94 /// NOTE: implementations of Read & Write are trait complaint for EOF/broken pipe handling
95 /// (returning a successful zero byte read), but overlapped read/write versions are NOT (they will
96 /// return broken pipe directly due to API limitations; see PipeConnection::read for
97 /// details).
98 #[derive(Serialize, Deserialize, Debug)]
99 pub struct PipeConnection {
100     handle: SafeDescriptor,
101     framing_mode: FramingMode,
102     blocking_mode: BlockingMode,
103 }
104 
105 /// `OVERLAPPED` is allocated on the heap because it must not move while performing I/O operations.
106 ///
107 /// Defined as a separate type so that we can mark it as `Send` and `Sync`.
108 pub struct BoxedOverlapped(pub Box<OVERLAPPED>);
109 
110 // SAFETY: `OVERLAPPED` is not automatically `Send` because it contains a `HANDLE`, which is a raw
111 // pointer, but `HANDLE`s are safe to move between threads and thus so is `OVERLAPPED`.
112 unsafe impl Send for BoxedOverlapped {}
113 
114 // SAFETY: See the argument for `Send` above. `HANDLE`s are also safe to share between threads.
115 unsafe impl Sync for BoxedOverlapped {}
116 
117 /// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a
118 /// Readfile or WriteFile operation and holds onto the event object so it doesn't get dropped.
119 pub struct OverlappedWrapper {
120     overlapped: BoxedOverlapped,
121     // This field prevents the event handle from being dropped too early and allows callers to
122     // be notified when a read or write overlapped operation has completed.
123     h_event: Option<Event>,
124     in_use: bool,
125 }
126 
127 impl OverlappedWrapper {
get_h_event_ref(&self) -> Option<&Event>128     pub fn get_h_event_ref(&self) -> Option<&Event> {
129         self.h_event.as_ref()
130     }
131 
132     /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
133     /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
134     /// returned must not be dropped.
135     ///
136     /// There is an option to create the event object and set it to the `hEvent` field. If hEvent
137     /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
138     /// handle will be signaled when the operation is complete. In other words, you can use
139     /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
140     /// Microsoft though.
new(include_event: bool) -> Result<OverlappedWrapper>141     pub fn new(include_event: bool) -> Result<OverlappedWrapper> {
142         let mut overlapped = OVERLAPPED::default();
143         let h_event = if include_event {
144             Some(Event::new()?)
145         } else {
146             None
147         };
148 
149         overlapped.hEvent = if let Some(event) = h_event.as_ref() {
150             event.as_raw_descriptor()
151         } else {
152             0 as RawDescriptor
153         };
154 
155         Ok(OverlappedWrapper {
156             overlapped: BoxedOverlapped(Box::new(overlapped)),
157             h_event,
158             in_use: false,
159         })
160     }
161 }
162 
163 pub trait WriteOverlapped {
164     /// Perform an overlapped write operation with the specified buffer and overlapped wrapper.
165     /// If successful, the write operation will complete asynchronously, and
166     /// `write_result()` should be called to get the result.
167     ///
168     /// # Safety
169     /// `buf` and `overlapped_wrapper` will be in use for the duration of
170     /// the overlapped operation. These must not be reused and must live until
171     /// after `write_result()` has been called.
write_overlapped( &mut self, buf: &mut [u8], overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<()>172     unsafe fn write_overlapped(
173         &mut self,
174         buf: &mut [u8],
175         overlapped_wrapper: &mut OverlappedWrapper,
176     ) -> io::Result<()>;
177 
178     /// Gets the result of the overlapped write operation. Must only be called
179     /// after issuing an overlapped write operation using `write_overlapped`. The
180     /// same `overlapped_wrapper` must be provided.
write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>181     fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
182 
183     /// Tries to get the result of the overlapped write operation. Must only be
184     /// called once, and only after issuing an overlapped write operation using
185     /// `write_overlapped`. The same `overlapped_wrapper` must be provided.
186     ///
187     /// An error indicates that the operation hasn't completed yet and
188     /// `write_result` or `try_write_result` should be called again.
try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>189     fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper)
190         -> io::Result<usize>;
191 }
192 
193 pub trait ReadOverlapped {
194     /// Perform an overlapped read operation with the specified buffer and overlapped wrapper.
195     /// If successful, the read operation will complete asynchronously, and
196     /// `read_result()` should be called to get the result.
197     ///
198     /// # Safety
199     /// `buf` and `overlapped_wrapper` will be in use for the duration of
200     /// the overlapped operation. These must not be reused and must live until
201     /// after `read_result()` has been called.
read_overlapped( &mut self, buf: &mut [u8], overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<()>202     unsafe fn read_overlapped(
203         &mut self,
204         buf: &mut [u8],
205         overlapped_wrapper: &mut OverlappedWrapper,
206     ) -> io::Result<()>;
207 
208     /// Gets the result of the overlapped read operation. Must only be called
209     /// once, and only after issuing an overlapped read operation using
210     /// `read_overlapped`. The same `overlapped_wrapper` must be provided.
read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>211     fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
212 
213     /// Tries to get the result of the overlapped read operation. Must only be called
214     /// after issuing an overlapped read operation using `read_overlapped`. The
215     /// same `overlapped_wrapper` must be provided.
216     ///
217     /// An error indicates that the operation hasn't completed yet and
218     /// `read_result` or `try_read_result` should be called again.
try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>219     fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
220 }
221 
222 #[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
223 pub enum FramingMode {
224     Byte,
225     Message,
226 }
227 
228 impl FramingMode {
to_readmode(self) -> DWORD229     fn to_readmode(self) -> DWORD {
230         match self {
231             FramingMode::Message => PIPE_READMODE_MESSAGE,
232             FramingMode::Byte => PIPE_READMODE_BYTE,
233         }
234     }
235 
to_pipetype(self) -> DWORD236     fn to_pipetype(self) -> DWORD {
237         match self {
238             FramingMode::Message => PIPE_TYPE_MESSAGE,
239             FramingMode::Byte => PIPE_TYPE_BYTE,
240         }
241     }
242 }
243 
244 #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)]
245 pub enum BlockingMode {
246     /// Calls to read() block until data is received
247     Wait,
248     /// Calls to read() return immediately even if there is nothing read with error code 232
249     /// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA)
250     ///
251     /// NOTE: This mode is discouraged by the Windows API documentation.
252     NoWait,
253 }
254 
255 impl From<&BlockingMode> for DWORD {
from(blocking_mode: &BlockingMode) -> DWORD256     fn from(blocking_mode: &BlockingMode) -> DWORD {
257         match blocking_mode {
258             BlockingMode::Wait => PIPE_WAIT,
259             BlockingMode::NoWait => PIPE_NOWAIT,
260         }
261     }
262 }
263 
264 /// Sets the handle state for a named pipe in a rust friendly way.
265 /// SAFETY:
266 /// This is safe if the pipe handle is open.
set_named_pipe_handle_state( pipe_handle: RawDescriptor, client_mode: &mut DWORD, ) -> Result<()>267 unsafe fn set_named_pipe_handle_state(
268     pipe_handle: RawDescriptor,
269     client_mode: &mut DWORD,
270 ) -> Result<()> {
271     // Safe when the pipe handle is open. Safety also requires checking the return value, which we
272     // do below.
273     let success_flag = SetNamedPipeHandleState(
274         /* hNamedPipe= */ pipe_handle,
275         /* lpMode= */ client_mode,
276         /* lpMaxCollectionCount= */ ptr::null_mut(),
277         /* lpCollectDataTimeout= */ ptr::null_mut(),
278     );
279     if success_flag == 0 {
280         Err(io::Error::last_os_error())
281     } else {
282         Ok(())
283     }
284 }
285 
pair( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, ) -> Result<(PipeConnection, PipeConnection)>286 pub fn pair(
287     framing_mode: &FramingMode,
288     blocking_mode: &BlockingMode,
289     timeout: u64,
290 ) -> Result<(PipeConnection, PipeConnection)> {
291     pair_with_buffer_size(
292         framing_mode,
293         blocking_mode,
294         timeout,
295         DEFAULT_BUFFER_SIZE,
296         false,
297     )
298 }
299 
300 /// Creates a pair of handles connected to either end of a duplex named pipe.
301 ///
302 /// The pipe created will have a semi-random name and a default set of security options that
303 /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
304 /// remote clients, allow only a single server instance, and prevent impersonation by the server
305 /// end of the pipe.
306 ///
307 /// # Arguments
308 ///
309 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
310 ///   automatically framed sequence of messages (Message). In message mode it's an error to read
311 ///   fewer bytes than were sent in a message from the other end of the pipe.
312 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
313 ///   return immediately if there is nothing available (NoWait).
314 /// * `timeout`       - A timeout to apply for socket operations, in milliseconds. Setting this to
315 ///   zero will create sockets with the system default timeout.
316 /// * `buffer_size`   - The default buffer size for the named pipe. The system should expand the
317 ///   buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
318 ///   writes that don't fit in the buffer.
319 /// # Return value
320 ///
321 /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
322 /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
pair_with_buffer_size( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<(PipeConnection, PipeConnection)>323 pub fn pair_with_buffer_size(
324     framing_mode: &FramingMode,
325     blocking_mode: &BlockingMode,
326     timeout: u64,
327     buffer_size: usize,
328     overlapped: bool,
329 ) -> Result<(PipeConnection, PipeConnection)> {
330     // Give the pipe a unique name to avoid accidental collisions
331     let pipe_name = format!(
332         r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
333         process::id(),
334         NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
335         rand::thread_rng().gen::<u32>(),
336     );
337 
338     let server_end = create_server_pipe(
339         &pipe_name,
340         framing_mode,
341         blocking_mode,
342         timeout,
343         buffer_size,
344         overlapped,
345     )?;
346 
347     // Open the named pipe we just created as the client
348     let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
349 
350     // Accept the client's connection
351     // Not sure if this is strictly needed but I'm doing it just in case.
352     // We expect at this point that the client will already be connected,
353     // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
354     // It's also OK if we get a return code of success.
355     server_end.wait_for_client_connection()?;
356 
357     Ok((server_end, client_end))
358 }
359 
360 /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
361 /// settings.
362 ///
363 /// The pipe will be set to reject remote clients and allow only a single connection at a time.
364 ///
365 /// # Arguments
366 ///
367 /// * `pipe_name`     - The path of the named pipe to create. Should be in the form
368 ///   `\\.\pipe\<some-name>`.
369 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
370 ///   automatically framed sequence of messages (Message). In message mode it's an error to read
371 ///   fewer bytes than were sent in a message from the other end of the pipe.
372 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
373 ///   return immediately if there is nothing available (NoWait).
374 /// * `timeout`       - A timeout to apply for socket operations, in milliseconds. Setting this to
375 ///   zero will create sockets with the system default timeout.
376 /// * `buffer_size`   - The default buffer size for the named pipe. The system should expand the
377 ///   buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
378 ///   writes that don't fit in the buffer.
379 /// * `overlapped`    - Sets whether overlapped mode is set on the pipe.
create_server_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<PipeConnection>380 pub fn create_server_pipe(
381     pipe_name: &str,
382     framing_mode: &FramingMode,
383     blocking_mode: &BlockingMode,
384     timeout: u64,
385     buffer_size: usize,
386     overlapped: bool,
387 ) -> Result<PipeConnection> {
388     let c_pipe_name = CString::new(pipe_name).unwrap();
389 
390     let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
391     if overlapped {
392         open_mode_flags |= FILE_FLAG_OVERLAPPED
393     }
394 
395     // This sets flags so there will be an error if >1 instance (server end)
396     // of this pipe name is opened because we expect exactly one.
397     // SAFETY:
398     // Safe because security attributes are valid, pipe_name is valid C string,
399     // and we're checking the return code
400     let server_handle = unsafe {
401         CreateNamedPipeA(
402             c_pipe_name.as_ptr(),
403             /* dwOpenMode= */
404             open_mode_flags,
405             /* dwPipeMode= */
406             framing_mode.to_pipetype()
407                 | framing_mode.to_readmode()
408                 | DWORD::from(blocking_mode)
409                 | PIPE_REJECT_REMOTE_CLIENTS,
410             /* nMaxInstances= */ 1,
411             /* nOutBufferSize= */ buffer_size as DWORD,
412             /* nInBufferSize= */ buffer_size as DWORD,
413             /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
414             /* lpSecurityAttributes= */
415             SecurityAttributes::new_with_security_descriptor(
416                 SelfRelativeSecurityDescriptor::get_singleton(),
417                 /* inherit= */ true,
418             )
419             .as_mut(),
420         )
421     };
422 
423     if server_handle == INVALID_HANDLE_VALUE {
424         Err(io::Error::last_os_error())
425     } else {
426         // SAFETY: Safe because server_handle is valid.
427         unsafe {
428             Ok(PipeConnection {
429                 handle: SafeDescriptor::from_raw_descriptor(server_handle),
430                 framing_mode: *framing_mode,
431                 blocking_mode: *blocking_mode,
432             })
433         }
434     }
435 }
436 
437 /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
438 /// settings.
439 ///
440 /// The pipe will be set to prevent impersonation of the client by the server process.
441 ///
442 /// # Arguments
443 ///
444 /// * `pipe_name`     - The path of the named pipe to create. Should be in the form
445 ///   `\\.\pipe\<some-name>`.
446 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
447 ///   automatically framed sequence of messages (Message). In message mode it's an error to read
448 ///   fewer bytes than were sent in a message from the other end of the pipe.
449 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
450 ///   return immediately if there is nothing available (NoWait).
451 /// * `overlapped`    - Sets whether the pipe is opened in overlapped mode.
create_client_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, overlapped: bool, ) -> Result<PipeConnection>452 pub fn create_client_pipe(
453     pipe_name: &str,
454     framing_mode: &FramingMode,
455     blocking_mode: &BlockingMode,
456     overlapped: bool,
457 ) -> Result<PipeConnection> {
458     let client_handle = OpenOptions::new()
459         .read(true)
460         .write(true)
461         .create(true)
462         .security_qos_flags(SECURITY_IDENTIFICATION)
463         .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
464         .open(pipe_name)?
465         .into_raw_descriptor();
466 
467     let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
468 
469     // SAFETY:
470     // Safe because client_handle's open() call did not return an error.
471     unsafe {
472         set_named_pipe_handle_state(client_handle, &mut client_mode)?;
473     }
474 
475     Ok(PipeConnection {
476         // SAFETY:
477         // Safe because client_handle is valid
478         handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
479         framing_mode: *framing_mode,
480         blocking_mode: *blocking_mode,
481     })
482 }
483 
484 // This is used to mark types which can be appropriately sent through the
485 // generic helper functions write_to_pipe and read_from_pipe.
486 pub trait PipeSendable {
487     // Default values used to fill in new empty indexes when resizing a buffer to
488     // a larger size.
default() -> Self489     fn default() -> Self;
490 }
491 impl PipeSendable for u8 {
default() -> Self492     fn default() -> Self {
493         0
494     }
495 }
496 impl PipeSendable for RawDescriptor {
default() -> Self497     fn default() -> Self {
498         ptr::null_mut()
499     }
500 }
501 
502 impl PipeConnection {
try_clone(&self) -> Result<PipeConnection>503     pub fn try_clone(&self) -> Result<PipeConnection> {
504         let copy_handle = self.handle.try_clone()?;
505         Ok(PipeConnection {
506             handle: copy_handle,
507             framing_mode: self.framing_mode,
508             blocking_mode: self.blocking_mode,
509         })
510     }
511 
512     /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
513     /// blocking modes.
514     ///
515     /// # Safety
516     /// 1. rd is valid and ownership is transferred to this function when it is called.
517     ///
518     /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
519     /// underlying pipe.
from_raw_descriptor( rd: RawDescriptor, framing_mode: FramingMode, blocking_mode: BlockingMode, ) -> PipeConnection520     pub unsafe fn from_raw_descriptor(
521         rd: RawDescriptor,
522         framing_mode: FramingMode,
523         blocking_mode: BlockingMode,
524     ) -> PipeConnection {
525         PipeConnection {
526             handle: SafeDescriptor::from_raw_descriptor(rd),
527             framing_mode,
528             blocking_mode,
529         }
530     }
531 
532     /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
533     /// Returns the number of bytes (not values) read.
534     ///
535     /// # Safety
536     ///
537     /// This is safe only when the following conditions hold:
538     ///     1. The data on the other end of the pipe is a valid binary representation of data for
539     ///     type T, and
540     ///     2. The number of bytes read is a multiple of the size of T; this must be checked by
541     ///     the caller.
542     /// If buf's type is file descriptors, this is only safe when those file descriptors are valid
543     /// for the process where this function was called.
read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize>544     pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
545         match PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None) {
546             // Windows allows for zero byte writes on one end of a pipe to be read by the other as
547             // zero byte reads. These zero byte reads DO NOT signify EOF, so from the perspective
548             // of std::io::Read, they cannot be reported as Ok(0). We translate them to errors.
549             //
550             // Within CrosVM, this behavior is not used, but it has been implemented to avoid UB
551             // either in the future, or when talking to non CrosVM named pipes. If we need to
552             // actually use/understand this error from other parts of KiwiVM (e.g. PipeConnection
553             // consumers), we could use ErrorKind::Interrupted (which as of 24/11/26 is not used by
554             // Rust for other purposes).
555             Ok(len) if len == 0 && !buf.is_empty() => Err(io::Error::new(
556                 io::ErrorKind::Other,
557                 PipeError::ZeroByteReadNoEof,
558             )),
559 
560             // Read at least 1 byte, or 0 bytes if a zero byte buffer was provided.
561             Ok(len) => Ok(len),
562 
563             // Treat a closed pipe like an EOF, because that is consistent with the Read trait.
564             //
565             // NOTE: this is explicitly NOT done for overlapped operations for a few reasons:
566             // 1. Overlapped operations do not follow the Read trait, so there is no strong reason
567             //    *to* do it.
568             // 2. Ok(0) also means "overlapped operation started successfully." This is a real
569             //    problem because the general pattern is to start an overlapped operation and then
570             //    wait for it. So if we did that and the Ok(0) meant the pipe is closed, we would
571             //    enter an infinite wait. (The kernel already told us when we started the operation
572             //    that the pipe was closed. It won't tell us again.)
573             Err(e) if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) => Ok(0),
574 
575             Err(e) => Err(e),
576         }
577     }
578 
579     /// Similar to `PipeConnection::read` except it also allows:
580     ///     1. The same end of the named pipe to read and write at the same time in different
581     ///        threads.
582     ///     2. Asynchronous read and write (read and write won't block).
583     ///
584     /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
585     /// (can be created with `OverlappedWrapper::new`) will be passed into
586     /// `ReadFile`. That event will be triggered when the read operation is complete.
587     ///
588     /// In order to get how many bytes were read, call `get_overlapped_result`. That function will
589     /// also help with waiting until the read operation is complete.
590     ///
591     /// # Safety
592     ///
593     /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
594     /// overlapped mode otherwise there may be unexpected behavior.
read_overlapped<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>595     pub unsafe fn read_overlapped<T: PipeSendable>(
596         &mut self,
597         buf: &mut [T],
598         overlapped_wrapper: &mut OverlappedWrapper,
599     ) -> Result<()> {
600         if overlapped_wrapper.in_use {
601             return Err(std::io::Error::new(
602                 std::io::ErrorKind::InvalidInput,
603                 "Overlapped struct already in use",
604             ));
605         }
606         overlapped_wrapper.in_use = true;
607 
608         PipeConnection::read_internal(
609             &self.handle,
610             self.blocking_mode,
611             buf,
612             Some(&mut overlapped_wrapper.overlapped.0),
613         )?;
614         Ok(())
615     }
616 
617     /// Helper for `read_overlapped` and `read`
618     ///
619     /// # Safety
620     /// Comments `read_overlapped` or `read`, depending on which is used.
read_internal<T: PipeSendable>( handle: &SafeDescriptor, blocking_mode: BlockingMode, buf: &mut [T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>621     unsafe fn read_internal<T: PipeSendable>(
622         handle: &SafeDescriptor,
623         blocking_mode: BlockingMode,
624         buf: &mut [T],
625         overlapped: Option<&mut OVERLAPPED>,
626     ) -> Result<usize> {
627         let res = crate::windows::read_file(
628             handle,
629             buf.as_mut_ptr() as *mut u8,
630             mem::size_of_val(buf),
631             overlapped,
632         );
633         match res {
634             Ok(bytes_read) => Ok(bytes_read),
635             // For message mode pipes, if the buffer is too small for the entire message, the kernel
636             // will return ERROR_MORE_DATA. This isn't strictly an "error" because the operation
637             // succeeds. Making it an error also means it's hard to handle this cleanly from the
638             // perspective of an io::Read consumer. So we discard the non-error, and return the
639             // successful result of filling the entire buffer.
640             Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => Ok(buf.len()),
641             Err(e)
642                 if blocking_mode == BlockingMode::NoWait
643                     && e.raw_os_error() == Some(ERROR_NO_DATA as i32) =>
644             {
645                 // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
646                 // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
647                 // correct. For further details see:
648                 // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
649                 // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
650                 Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
651             }
652             Err(e) => Err(e),
653         }
654     }
655 
656     /// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted
657     /// by an event on `exit_event`.
read_overlapped_blocking<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<()>658     pub fn read_overlapped_blocking<T: PipeSendable>(
659         &mut self,
660         buf: &mut [T],
661         overlapped_wrapper: &mut OverlappedWrapper,
662         exit_event: &Event,
663     ) -> Result<()> {
664         // SAFETY:
665         // Safe because we are providing a valid buffer slice and also providing a valid
666         // overlapped struct.
667         match unsafe { self.read_overlapped(buf, overlapped_wrapper) } {
668             Err(e) => Err(e),
669             Ok(()) => Ok(()),
670         }?;
671 
672         #[derive(EventToken)]
673         enum Token {
674             ReadOverlapped,
675             Exit,
676         }
677 
678         let wait_ctx = WaitContext::build_with(&[
679             (
680                 overlapped_wrapper.get_h_event_ref().unwrap(),
681                 Token::ReadOverlapped,
682             ),
683             (exit_event, Token::Exit),
684         ])?;
685 
686         let events = wait_ctx.wait()?;
687         for event in events {
688             match event.token {
689                 Token::ReadOverlapped => {
690                     let size_read_in_bytes =
691                         self.get_overlapped_result(overlapped_wrapper)? as usize;
692 
693                     // If this error shows, most likely the overlapped named pipe was set up
694                     // incorrectly.
695                     if size_read_in_bytes != buf.len() {
696                         return Err(std::io::Error::new(
697                             std::io::ErrorKind::UnexpectedEof,
698                             "Short read",
699                         ));
700                     }
701                 }
702                 Token::Exit => {
703                     return Err(std::io::Error::new(
704                         std::io::ErrorKind::Interrupted,
705                         "IO canceled on exit request",
706                     ));
707                 }
708             }
709         }
710 
711         Ok(())
712     }
713 
714     /// Gets the size in bytes of data in the pipe.
715     ///
716     /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
717     /// not finished writing on the producer side.
get_available_byte_count(&self) -> io::Result<u32>718     pub fn get_available_byte_count(&self) -> io::Result<u32> {
719         let mut total_bytes_avail: DWORD = 0;
720 
721         // SAFETY:
722         // Safe because the underlying pipe handle is guaranteed to be open, and the output values
723         // live at valid memory locations.
724         fail_if_zero!(unsafe {
725             PeekNamedPipe(
726                 self.as_raw_descriptor(),
727                 ptr::null_mut(),
728                 0,
729                 ptr::null_mut(),
730                 &mut total_bytes_avail,
731                 ptr::null_mut(),
732             )
733         });
734 
735         Ok(total_bytes_avail)
736     }
737 
738     /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
739     /// callers should check to ensure that it was the number expected.
write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize>740     pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
741         // SAFETY: overlapped is None so this is safe.
742         unsafe { PipeConnection::write_internal(&self.handle, buf, None) }
743     }
744 
745     /// Similar to `PipeConnection::write` except it also allows:
746     ///     1. The same end of the named pipe to read and write at the same time in different
747     ///        threads.
748     ///     2. Asynchronous read and write (read and write won't block).
749     ///
750     /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
751     /// (can be created with `OverlappedWrapper::new`) will be passed into
752     /// `WriteFile`. That event will be triggered when the write operation is complete.
753     ///
754     /// In order to get how many bytes were written, call `get_overlapped_result`. That function
755     /// will also help with waiting until the write operation is complete. The pipe must be
756     /// opened in overlapped otherwise there may be unexpected behavior.
757     ///
758     /// # Safety
759     /// * buf & overlapped_wrapper MUST live until the overlapped operation is complete.
write_overlapped<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>760     pub unsafe fn write_overlapped<T: PipeSendable>(
761         &mut self,
762         buf: &[T],
763         overlapped_wrapper: &mut OverlappedWrapper,
764     ) -> Result<()> {
765         if overlapped_wrapper.in_use {
766             return Err(std::io::Error::new(
767                 std::io::ErrorKind::InvalidInput,
768                 "Overlapped struct already in use",
769             ));
770         }
771         overlapped_wrapper.in_use = true;
772 
773         PipeConnection::write_internal(
774             &self.handle,
775             buf,
776             Some(&mut overlapped_wrapper.overlapped.0),
777         )?;
778         Ok(())
779     }
780 
781     /// Helper for `write_overlapped` and `write`.
782     ///
783     /// # Safety
784     /// * Safe if overlapped is None.
785     /// * Safe if overlapped is Some and:
786     ///   + buf lives until the overlapped operation is complete.
787     ///   + overlapped lives until the overlapped operation is complete.
write_internal<T: PipeSendable>( handle: &SafeDescriptor, buf: &[T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>788     unsafe fn write_internal<T: PipeSendable>(
789         handle: &SafeDescriptor,
790         buf: &[T],
791         overlapped: Option<&mut OVERLAPPED>,
792     ) -> Result<usize> {
793         // SAFETY:
794         // Safe because buf points to memory valid until the write completes and we pass a valid
795         // length for that memory.
796         unsafe {
797             crate::windows::write_file(
798                 handle,
799                 buf.as_ptr() as *const u8,
800                 mem::size_of_val(buf),
801                 overlapped,
802             )
803         }
804     }
805 
806     /// Sets the blocking mode on the pipe.
set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()>807     pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
808         let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
809         self.blocking_mode = *blocking_mode;
810 
811         // SAFETY:
812         // Safe because the pipe has not been closed (it is managed by this object).
813         unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
814     }
815 
816     /// For a server named pipe, waits for a client to connect (blocking).
wait_for_client_connection(&self) -> Result<()>817     pub fn wait_for_client_connection(&self) -> Result<()> {
818         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
819         self.wait_for_client_connection_internal(
820             &mut overlapped_wrapper,
821             /* should_block = */ true,
822         )
823     }
824 
825     /// Interruptable blocking wait for a client to connect.
wait_for_client_connection_overlapped_blocking( &mut self, exit_event: &Event, ) -> Result<()>826     pub fn wait_for_client_connection_overlapped_blocking(
827         &mut self,
828         exit_event: &Event,
829     ) -> Result<()> {
830         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
831         self.wait_for_client_connection_internal(
832             &mut overlapped_wrapper,
833             /* should_block = */ false,
834         )?;
835 
836         #[derive(EventToken)]
837         enum Token {
838             Connected,
839             Exit,
840         }
841 
842         let wait_ctx = WaitContext::build_with(&[
843             (
844                 overlapped_wrapper.get_h_event_ref().unwrap(),
845                 Token::Connected,
846             ),
847             (exit_event, Token::Exit),
848         ])?;
849 
850         let events = wait_ctx.wait()?;
851         if let Some(event) = events.into_iter().next() {
852             return match event.token {
853                 Token::Connected => Ok(()),
854                 Token::Exit => {
855                     // We must cancel IO here because it is unsafe to free the overlapped wrapper
856                     // while the IO operation is active.
857                     self.cancel_io()?;
858 
859                     Err(std::io::Error::new(
860                         std::io::ErrorKind::Interrupted,
861                         "IO canceled on exit request",
862                     ))
863                 }
864             };
865         }
866         unreachable!("wait cannot return Ok with zero events");
867     }
868 
869     /// For a server named pipe, waits for a client to connect using the given overlapped wrapper
870     /// to signal connection.
wait_for_client_connection_overlapped( &self, overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>871     pub fn wait_for_client_connection_overlapped(
872         &self,
873         overlapped_wrapper: &mut OverlappedWrapper,
874     ) -> Result<()> {
875         self.wait_for_client_connection_internal(
876             overlapped_wrapper,
877             /* should_block = */ false,
878         )
879     }
880 
wait_for_client_connection_internal( &self, overlapped_wrapper: &mut OverlappedWrapper, should_block: bool, ) -> Result<()>881     fn wait_for_client_connection_internal(
882         &self,
883         overlapped_wrapper: &mut OverlappedWrapper,
884         should_block: bool,
885     ) -> Result<()> {
886         // SAFETY:
887         // Safe because the handle is valid and we're checking the return
888         // code according to the documentation
889         //
890         // TODO(b/279669296) this safety statement is incomplete, and as such incorrect in one case:
891         //      overlapped_wrapper must live until the overlapped operation is complete; however,
892         //      if should_block is false, nothing guarantees that lifetime and so overlapped_wrapper
893         //      could be freed while the operation is still running.
894         unsafe {
895             let success_flag = ConnectNamedPipe(
896                 self.as_raw_descriptor(),
897                 // Note: The overlapped structure is only used if the pipe was opened in
898                 // OVERLAPPED mode, but is necessary in that case.
899                 &mut *overlapped_wrapper.overlapped.0,
900             );
901             if success_flag == 0 {
902                 return match GetLastError() {
903                     ERROR_PIPE_CONNECTED => {
904                         if !should_block {
905                             // If async, make sure the event is signalled to indicate the client
906                             // is ready.
907                             overlapped_wrapper.get_h_event_ref().unwrap().signal()?;
908                         }
909 
910                         Ok(())
911                     }
912                     ERROR_IO_PENDING => {
913                         if should_block {
914                             overlapped_wrapper.get_h_event_ref().unwrap().wait()?;
915                         }
916                         Ok(())
917                     }
918                     err => Err(io::Error::from_raw_os_error(err as i32)),
919                 };
920             }
921         }
922         Ok(())
923     }
924 
925     /// Used for overlapped read and write operations.
926     ///
927     /// This will block until the ReadFile or WriteFile operation that also took in
928     /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
929     /// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get
930     /// the number of bytes that were read or written.
get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>931     pub fn get_overlapped_result(
932         &mut self,
933         overlapped_wrapper: &mut OverlappedWrapper,
934     ) -> io::Result<u32> {
935         let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true);
936         overlapped_wrapper.in_use = false;
937         res
938     }
939 
940     /// Used for overlapped read and write operations.
941     ///
942     /// This will return immediately, regardless of the completion status of the
943     /// ReadFile or WriteFile operation that took in `overlapped_wrapper`,
944     /// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new`
945     /// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes
946     /// that were read or written, if completed.  If the operation hasn't
947     /// completed, an error of kind `io::ErrorKind::WouldBlock` will be
948     /// returned.
try_get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>949     pub fn try_get_overlapped_result(
950         &mut self,
951         overlapped_wrapper: &mut OverlappedWrapper,
952     ) -> io::Result<u32> {
953         let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false);
954         match res {
955             Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => {
956                 Err(io::Error::new(io::ErrorKind::WouldBlock, err))
957             }
958             _ => {
959                 overlapped_wrapper.in_use = false;
960                 res
961             }
962         }
963     }
964 
get_overlapped_result_internal( &mut self, overlapped_wrapper: &mut OverlappedWrapper, wait: bool, ) -> io::Result<u32>965     fn get_overlapped_result_internal(
966         &mut self,
967         overlapped_wrapper: &mut OverlappedWrapper,
968         wait: bool,
969     ) -> io::Result<u32> {
970         if !overlapped_wrapper.in_use {
971             return Err(std::io::Error::new(
972                 std::io::ErrorKind::InvalidInput,
973                 "Overlapped struct is not in use",
974             ));
975         }
976 
977         let mut size_transferred = 0;
978         // SAFETY:
979         // Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
980         // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
981         if (unsafe {
982             GetOverlappedResult(
983                 self.handle.as_raw_descriptor(),
984                 &mut *overlapped_wrapper.overlapped.0,
985                 &mut size_transferred,
986                 if wait { TRUE } else { FALSE },
987             )
988         }) != 0
989         {
990             Ok(size_transferred)
991         } else {
992             let e = io::Error::last_os_error();
993             match e.raw_os_error() {
994                 // More data => partial read of a message on a message pipe. This isn't really an
995                 // error (see PipeConnection::read_internal) since we filled the provided buffer.
996                 Some(error_code) if error_code as u32 == ERROR_MORE_DATA => Ok(size_transferred),
997                 _ => Err(e),
998             }
999         }
1000     }
1001 
1002     /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
1003     /// cancel all I/O requests for the file handle passed in.
cancel_io(&mut self) -> Result<()>1004     pub fn cancel_io(&mut self) -> Result<()> {
1005         fail_if_zero!(
1006             // SAFETY: descriptor is valid and the return value is checked.
1007             unsafe {
1008                 CancelIoEx(
1009                     self.handle.as_raw_descriptor(),
1010                     /* lpOverlapped= */ std::ptr::null_mut(),
1011                 )
1012             }
1013         );
1014 
1015         Ok(())
1016     }
1017 
1018     /// Get the framing mode of the pipe.
get_framing_mode(&self) -> FramingMode1019     pub fn get_framing_mode(&self) -> FramingMode {
1020         self.framing_mode
1021     }
1022 
1023     /// Returns metadata about the connected NamedPipe.
get_info(&self) -> Result<NamedPipeInfo>1024     pub fn get_info(&self) -> Result<NamedPipeInfo> {
1025         let mut flags: u32 = 0;
1026         let mut incoming_buffer_size: u32 = 0;
1027         let mut outgoing_buffer_size: u32 = 0;
1028         let mut max_instances: u32 = 0;
1029         // SAFETY: all pointers are valid
1030         fail_if_zero!(unsafe {
1031             GetNamedPipeInfo(
1032                 self.as_raw_descriptor(),
1033                 &mut flags,
1034                 &mut outgoing_buffer_size,
1035                 &mut incoming_buffer_size,
1036                 &mut max_instances,
1037             )
1038         });
1039 
1040         Ok(NamedPipeInfo {
1041             outgoing_buffer_size,
1042             incoming_buffer_size,
1043             max_instances,
1044             flags,
1045         })
1046     }
1047 
1048     /// For a server pipe, flush the pipe contents. This will
1049     /// block until the pipe is cleared by the client. Only
1050     /// call this if you are sure the client is reading the
1051     /// data!
flush_data_blocking(&self) -> Result<()>1052     pub fn flush_data_blocking(&self) -> Result<()> {
1053         // SAFETY:
1054         // Safe because the only buffers interacted with are
1055         // outside of Rust memory
1056         fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) });
1057         Ok(())
1058     }
1059 
1060     /// For a server pipe, disconnect all clients, discarding any buffered data.
disconnect_clients(&self) -> Result<()>1061     pub fn disconnect_clients(&self) -> Result<()> {
1062         // SAFETY:
1063         // Safe because we own the handle passed in and know it will remain valid for the duration
1064         // of the call. Discarded buffers are not managed by rust.
1065         fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) });
1066         Ok(())
1067     }
1068 }
1069 
1070 impl AsRawDescriptor for PipeConnection {
as_raw_descriptor(&self) -> RawDescriptor1071     fn as_raw_descriptor(&self) -> RawDescriptor {
1072         self.handle.as_raw_descriptor()
1073     }
1074 }
1075 
1076 impl IntoRawDescriptor for PipeConnection {
into_raw_descriptor(self) -> RawDescriptor1077     fn into_raw_descriptor(self) -> RawDescriptor {
1078         self.handle.into_raw_descriptor()
1079     }
1080 }
1081 
1082 // SAFETY: Send safety is ensured by inner fields.
1083 unsafe impl Send for PipeConnection {}
1084 // SAFETY: Sync safety is ensured by inner fields.
1085 unsafe impl Sync for PipeConnection {}
1086 
1087 impl io::Read for PipeConnection {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>1088     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1089         // SAFETY:
1090         // This is safe because PipeConnection::read is always safe for u8
1091         unsafe { PipeConnection::read(self, buf) }
1092     }
1093 }
1094 
1095 impl io::Write for PipeConnection {
write(&mut self, buf: &[u8]) -> io::Result<usize>1096     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1097         PipeConnection::write(self, buf)
1098     }
1099 
flush(&mut self) -> io::Result<()>1100     fn flush(&mut self) -> io::Result<()> {
1101         Ok(())
1102     }
1103 }
1104 
1105 /// A simple data struct representing
1106 /// metadata about a NamedPipe.
1107 #[derive(Debug, PartialEq, Eq)]
1108 pub struct NamedPipeInfo {
1109     pub outgoing_buffer_size: u32,
1110     pub incoming_buffer_size: u32,
1111     pub max_instances: u32,
1112     pub flags: u32,
1113 }
1114 
1115 /// This is a wrapper around PipeConnection. This allows a read and a write operations
1116 /// to run in parallel but not multiple reads or writes in parallel.
1117 ///
1118 /// Reason: The message from/to service are two-parts - a fixed size header that
1119 /// contains the size of the actual message. By allowing only one write at a time
1120 /// we ensure that the variable size message is written/read right after writing/reading
1121 /// fixed size header. For example it avoid sending or receiving in messages in order like
1122 /// H1, H2, M1, M2
1123 ///   - where header H1 and its message M1 are sent by one event loop and H2 and its message M2 are
1124 ///     sent by another event loop.
1125 ///
1126 /// Do not expose direct access to reader or writer pipes.
1127 ///
1128 /// The struct is clone-able so that different event loops can talk to the other end.
1129 #[derive(Clone)]
1130 pub struct MultiPartMessagePipe {
1131     // Lock protected pipe to receive messages.
1132     reader: Arc<Mutex<PipeConnection>>,
1133     // Lock protected pipe to send messages.
1134     writer: Arc<Mutex<PipeConnection>>,
1135     // Whether this end is created as server or client. The variable helps to
1136     // decide if something meanigful should be done when `wait_for_connection` is called.
1137     is_server: bool,
1138     // Always true if pipe is created as client.
1139     // Defaults to false on server. Updated to true on calling `wait_for_connection`
1140     // after a client connects.
1141     is_connected: Arc<AtomicBool>,
1142 }
1143 
1144 impl MultiPartMessagePipe {
create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self>1145     fn create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self> {
1146         Ok(Self {
1147             reader: Arc::new(Mutex::new(pipe.try_clone()?)),
1148             writer: Arc::new(Mutex::new(pipe)),
1149             is_server,
1150             is_connected: Arc::new(AtomicBool::new(false)),
1151         })
1152     }
1153 
1154     /// Create server side of MutiPartMessagePipe.
1155     /// # Safety
1156     /// `pipe` must be a server named pipe.
1157     #[deny(unsafe_op_in_unsafe_fn)]
create_from_server_pipe(pipe: PipeConnection) -> Result<Self>1158     pub unsafe fn create_from_server_pipe(pipe: PipeConnection) -> Result<Self> {
1159         Self::create_from_pipe(pipe, true)
1160     }
1161 
1162     /// Create client side of MutiPartMessagePipe.
create_as_client(pipe_name: &str) -> Result<Self>1163     pub fn create_as_client(pipe_name: &str) -> Result<Self> {
1164         let pipe = create_client_pipe(
1165             &format!(r"\\.\pipe\{}", pipe_name),
1166             &FramingMode::Message,
1167             &BlockingMode::Wait,
1168             /* overlapped= */ true,
1169         )?;
1170         Self::create_from_pipe(pipe, false)
1171     }
1172 
1173     /// Create server side of MutiPartMessagePipe.
create_as_server(pipe_name: &str) -> Result<Self>1174     pub fn create_as_server(pipe_name: &str) -> Result<Self> {
1175         let pipe = create_server_pipe(
1176             &format!(r"\\.\pipe\{}", pipe_name,),
1177             &FramingMode::Message,
1178             &BlockingMode::Wait,
1179             0,
1180             1024 * 1024,
1181             true,
1182         )?;
1183         // SAFETY: `pipe` is a server named pipe.
1184         unsafe { Self::create_from_server_pipe(pipe) }
1185     }
1186 
1187     /// If the struct is created as a server then waits for client connection to arrive.
1188     /// It only waits on reader as reader and writer are clones.
wait_for_connection(&self) -> Result<()>1189     pub fn wait_for_connection(&self) -> Result<()> {
1190         if self.is_server && !self.is_connected.load(Ordering::Relaxed) {
1191             self.reader.lock().wait_for_client_connection()?;
1192             self.is_connected.store(true, Ordering::Relaxed);
1193         }
1194         Ok(())
1195     }
1196 
write_overlapped_blocking_message_internal<T: PipeSendable>( pipe: &mut PipeConnection, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1197     fn write_overlapped_blocking_message_internal<T: PipeSendable>(
1198         pipe: &mut PipeConnection,
1199         buf: &[T],
1200         overlapped_wrapper: &mut OverlappedWrapper,
1201     ) -> Result<()> {
1202         // Safety:
1203         // `buf` and `overlapped_wrapper` will be in use for the duration of
1204         // the overlapped operation. These must not be reused and must live until
1205         // after `get_overlapped_result()` has been called which is done right
1206         // after this call.
1207         unsafe {
1208             pipe.write_overlapped(buf, overlapped_wrapper)?;
1209         }
1210 
1211         let size_written_in_bytes = pipe.get_overlapped_result(overlapped_wrapper)?;
1212 
1213         if size_written_in_bytes as usize != buf.len() {
1214             return Err(std::io::Error::new(
1215                 std::io::ErrorKind::UnexpectedEof,
1216                 format!(
1217                     "Short write expected:{} found:{}",
1218                     size_written_in_bytes,
1219                     buf.len(),
1220                 ),
1221             ));
1222         }
1223         Ok(())
1224     }
1225     /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
write_overlapped_blocking_message<T: PipeSendable>( &self, header: &[T], message: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1226     pub fn write_overlapped_blocking_message<T: PipeSendable>(
1227         &self,
1228         header: &[T],
1229         message: &[T],
1230         overlapped_wrapper: &mut OverlappedWrapper,
1231     ) -> Result<()> {
1232         let mut writer = self.writer.lock();
1233         Self::write_overlapped_blocking_message_internal(&mut writer, header, overlapped_wrapper)?;
1234         Self::write_overlapped_blocking_message_internal(&mut writer, message, overlapped_wrapper)
1235     }
1236 
1237     /// Reads a variable size message and returns the message on success.
1238     /// The size of the message is expected to proceed the message in
1239     /// the form of `header_size` message.
1240     ///
1241     /// `parse_message_size` lets caller parse the header to extract
1242     /// message size.
1243     ///
1244     /// Event on `exit_event` is used to interrupt the blocked read.
read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>( &self, header_size: usize, parse_message_size: F, overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<Vec<u8>>1245     pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
1246         &self,
1247         header_size: usize,
1248         parse_message_size: F,
1249         overlapped_wrapper: &mut OverlappedWrapper,
1250         exit_event: &Event,
1251     ) -> Result<Vec<u8>> {
1252         let mut pipe = self.reader.lock();
1253         let mut header = vec![0; header_size];
1254         header.resize_with(header_size, Default::default);
1255         pipe.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
1256         let message_size = parse_message_size(&header);
1257         if message_size == 0 {
1258             return Ok(vec![]);
1259         }
1260         let mut buf = vec![];
1261         buf.resize_with(message_size, Default::default);
1262         pipe.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
1263         Ok(buf)
1264     }
1265 
1266     /// Returns the inner named pipe if the current struct is the sole owner of the underlying
1267     /// named pipe.
1268     ///
1269     /// Otherwise, [`None`] is returned and the struct is dropped.
1270     ///
1271     /// Note that this has a similar race condition like [`Arc::try_unwrap`]: if multiple threads
1272     /// call this function simultaneously on the same clone of [`MultiPartMessagePipe`], it is
1273     /// possible that all of them will result in [`None`]. This is Due to Rust version
1274     /// restriction(1.68.2) when this function is introduced). This race condition can be resolved
1275     /// once we upgrade to 1.70.0 or higher by using [`Arc::into_inner`].
1276     ///
1277     /// If the underlying named pipe is a server named pipe, this method allows the caller to
1278     /// terminate the connection by first flushing the named pipe then disconnecting the clients
1279     /// idiomatically per
1280     /// https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipe-operations#:~:text=When%20a%20client,of%20the%20pipe.
into_inner_pipe(self) -> Option<PipeConnection>1281     pub fn into_inner_pipe(self) -> Option<PipeConnection> {
1282         let piper = Arc::clone(&self.reader);
1283         drop(self);
1284         Arc::try_unwrap(piper).ok().map(Mutex::into_inner)
1285     }
1286 }
1287 
1288 impl TryFrom<PipeConnection> for MultiPartMessagePipe {
1289     type Error = std::io::Error;
try_from(pipe: PipeConnection) -> Result<Self>1290     fn try_from(pipe: PipeConnection) -> Result<Self> {
1291         Self::create_from_pipe(pipe, false)
1292     }
1293 }
1294 
1295 #[cfg(test)]
1296 mod tests {
1297     use std::mem::size_of;
1298     use std::thread::JoinHandle;
1299     use std::time::Duration;
1300 
1301     use super::*;
1302 
1303     #[test]
duplex_pipe_stream()1304     fn duplex_pipe_stream() {
1305         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1306 
1307         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1308         // SAFETY: trivially safe with pipe created and return value checked.
1309         unsafe {
1310             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1311                 println!("{}", dir);
1312 
1313                 sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();
1314 
1315                 // Smaller than what we sent so we get multiple chunks
1316                 let mut recv_buffer: [u8; 4] = [0; 4];
1317 
1318                 let mut size = receiver.read(&mut recv_buffer).unwrap();
1319                 assert_eq!(size, 4);
1320                 assert_eq!(recv_buffer, [75, 77, 54, 82]);
1321 
1322                 size = receiver.read(&mut recv_buffer).unwrap();
1323                 assert_eq!(size, 2);
1324                 assert_eq!(recv_buffer[0..2], [76, 65]);
1325             }
1326         }
1327     }
1328 
1329     #[test]
available_byte_count_byte_mode()1330     fn available_byte_count_byte_mode() {
1331         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1332         p1.write(&[1, 23, 45]).unwrap();
1333         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1334 
1335         // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
1336         // yield the same value.
1337         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1338     }
1339 
1340     #[test]
available_byte_count_message_mode()1341     fn available_byte_count_message_mode() {
1342         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1343         p1.write(&[1, 23, 45]).unwrap();
1344         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1345 
1346         // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
1347         // yield the same value.
1348         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1349     }
1350 
1351     #[test]
available_byte_count_message_mode_multiple_messages()1352     fn available_byte_count_message_mode_multiple_messages() {
1353         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1354         p1.write(&[1, 2, 3]).unwrap();
1355         p1.write(&[4, 5]).unwrap();
1356         assert_eq!(p2.get_available_byte_count().unwrap(), 5);
1357     }
1358 
1359     #[test]
duplex_pipe_message()1360     fn duplex_pipe_message() {
1361         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1362 
1363         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1364         // SAFETY: trivially safe with pipe created and return value checked.
1365         unsafe {
1366             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1367                 println!("{}", dir);
1368 
1369                 // Send 2 messages so that we can check that message framing works
1370                 sender.write(&[1, 23, 45]).unwrap();
1371                 sender.write(&[67, 89, 10]).unwrap();
1372 
1373                 let mut recv_buffer: [u8; 5] = [0; 5]; // Larger than required for messages
1374 
1375                 let mut size = receiver.read(&mut recv_buffer).unwrap();
1376                 assert_eq!(size, 3);
1377                 assert_eq!(recv_buffer[0..3], [1, 23, 45]);
1378 
1379                 size = receiver.read(&mut recv_buffer).unwrap();
1380                 assert_eq!(size, 3);
1381                 assert_eq!(recv_buffer[0..3], [67, 89, 10]);
1382             }
1383         }
1384     }
1385 
1386     #[cfg(test)]
duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection)1387     fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
1388         let mut recv_buffer: [u8; 1] = [0; 1];
1389 
1390         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1391         // SAFETY: trivially safe with PipeConnection created and return value checked.
1392         unsafe {
1393             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1394                 println!("{}", dir);
1395                 sender.write(&[1]).unwrap();
1396                 assert_eq!(receiver.read(&mut recv_buffer).unwrap(), 1); // Should succeed!
1397                 assert_eq!(
1398                     receiver.read(&mut recv_buffer).unwrap_err().kind(),
1399                     std::io::ErrorKind::WouldBlock
1400                 );
1401             }
1402         }
1403     }
1404 
1405     #[test]
duplex_nowait()1406     fn duplex_nowait() {
1407         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
1408         duplex_nowait_helper(&p1, &p2);
1409     }
1410 
1411     #[test]
duplex_nowait_set_after_creation()1412     fn duplex_nowait_set_after_creation() {
1413         // Tests non blocking setting after pipe creation
1414         let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1415         p1.set_blocking(&BlockingMode::NoWait)
1416             .expect("Failed to set blocking mode on pipe p1");
1417         p2.set_blocking(&BlockingMode::NoWait)
1418             .expect("Failed to set blocking mode on pipe p2");
1419         duplex_nowait_helper(&p1, &p2);
1420     }
1421 
1422     #[test]
duplex_overlapped()1423     fn duplex_overlapped() {
1424         let pipe_name = generate_pipe_name();
1425 
1426         let mut p1 = create_server_pipe(
1427             &pipe_name,
1428             &FramingMode::Message,
1429             &BlockingMode::Wait,
1430             /* timeout= */ 0,
1431             /* buffer_size= */ 1000,
1432             /* overlapped= */ true,
1433         )
1434         .unwrap();
1435 
1436         let mut p2 = create_client_pipe(
1437             &pipe_name,
1438             &FramingMode::Message,
1439             &BlockingMode::Wait,
1440             /* overlapped= */ true,
1441         )
1442         .unwrap();
1443 
1444         // SAFETY:
1445         // Safe because `read_overlapped` can be called since overlapped struct is created.
1446         unsafe {
1447             let mut p1_overlapped_wrapper =
1448                 OverlappedWrapper::new(/* include_event= */ true).unwrap();
1449             p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut p1_overlapped_wrapper)
1450                 .unwrap();
1451             let size = p1
1452                 .get_overlapped_result(&mut p1_overlapped_wrapper)
1453                 .unwrap();
1454             assert_eq!(size, 6);
1455 
1456             let mut recv_buffer: [u8; 6] = [0; 6];
1457 
1458             let mut p2_overlapped_wrapper =
1459                 OverlappedWrapper::new(/* include_event= */ true).unwrap();
1460             p2.read_overlapped(&mut recv_buffer, &mut p2_overlapped_wrapper)
1461                 .unwrap();
1462             let size = p2
1463                 .get_overlapped_result(&mut p2_overlapped_wrapper)
1464                 .unwrap();
1465             assert_eq!(size, 6);
1466             assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
1467         }
1468     }
1469 
1470     #[test]
duplex_overlapped_test_in_use()1471     fn duplex_overlapped_test_in_use() {
1472         let pipe_name = generate_pipe_name();
1473 
1474         let mut p1 = create_server_pipe(
1475             &pipe_name,
1476             &FramingMode::Message,
1477             &BlockingMode::Wait,
1478             /* timeout= */ 0,
1479             /* buffer_size= */ 1000,
1480             /* overlapped= */ true,
1481         )
1482         .unwrap();
1483 
1484         let mut p2 = create_client_pipe(
1485             &pipe_name,
1486             &FramingMode::Message,
1487             &BlockingMode::Wait,
1488             /* overlapped= */ true,
1489         )
1490         .unwrap();
1491         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1492 
1493         let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1494         assert!(res.is_err());
1495 
1496         let data = vec![75, 77, 54, 82, 76, 65];
1497         // SAFETY: safe because: data & overlapped wrapper live until the
1498         // operation is verified completed below.
1499         let res = unsafe { p1.write_overlapped(&data, &mut overlapped_wrapper) };
1500         assert!(res.is_ok());
1501 
1502         let res =
1503             // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
1504             // will error out.
1505             unsafe { p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper) };
1506         assert!(res.is_err());
1507 
1508         let mut recv_buffer: [u8; 6] = [0; 6];
1509         // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
1510         // will error out.
1511         let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1512         assert!(res.is_err());
1513 
1514         let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1515         assert!(res.is_ok());
1516 
1517         let mut recv_buffer: [u8; 6] = [0; 6];
1518         // SAFETY: safe because recv_buffer & overlapped_wrapper live until the
1519         // operation is verified completed below.
1520         let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1521         assert!(res.is_ok());
1522         let res = p2.get_overlapped_result(&mut overlapped_wrapper);
1523         assert!(res.is_ok());
1524     }
1525 
generate_pipe_name() -> String1526     fn generate_pipe_name() -> String {
1527         format!(
1528             r"\\.\pipe\test-ipc-pipe-name.rand{}",
1529             rand::thread_rng().gen::<u64>(),
1530         )
1531     }
1532 
send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()>1533     fn send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()> {
1534         let messages = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"];
1535         std::thread::spawn(move || {
1536             let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1537             let exit_event = Event::new().unwrap();
1538             for _i in 0..msg_count {
1539                 let message = *messages
1540                     .get(rand::thread_rng().gen::<usize>() % messages.len())
1541                     .unwrap();
1542                 pipe.write_overlapped_blocking_message(
1543                     &message.len().to_be_bytes(),
1544                     message.as_bytes(),
1545                     &mut overlapped_wrapper,
1546                 )
1547                 .unwrap();
1548             }
1549             for _i in 0..msg_count {
1550                 let message = pipe
1551                     .read_overlapped_blocking_message(
1552                         size_of::<usize>(),
1553                         |bytes: &[u8]| {
1554                             assert_eq!(bytes.len(), size_of::<usize>());
1555                             usize::from_be_bytes(
1556                                 bytes.try_into().expect("failed to get array from slice"),
1557                             )
1558                         },
1559                         &mut overlapped_wrapper,
1560                         &exit_event,
1561                     )
1562                     .unwrap();
1563                 assert_eq!(
1564                     *messages.get(message.len() - 1).unwrap(),
1565                     std::str::from_utf8(&message).unwrap(),
1566                 );
1567             }
1568         })
1569     }
1570 
1571     #[test]
multipart_message_smoke_test()1572     fn multipart_message_smoke_test() {
1573         let pipe_name = generate_pipe_name();
1574         let server = MultiPartMessagePipe::create_as_server(&pipe_name).unwrap();
1575         let client = MultiPartMessagePipe::create_as_client(&pipe_name).unwrap();
1576         let handles = [
1577             send_receive_msgs(server.clone(), 100),
1578             send_receive_msgs(client.clone(), 100),
1579             send_receive_msgs(server, 100),
1580             send_receive_msgs(client, 100),
1581         ];
1582         for h in handles {
1583             h.join().unwrap();
1584         }
1585     }
1586 
1587     #[test]
multipart_message_into_inner_pipe()1588     fn multipart_message_into_inner_pipe() {
1589         let pipe_name = generate_pipe_name();
1590         let mut pipe = create_server_pipe(
1591             &format!(r"\\.\pipe\{}", pipe_name),
1592             &FramingMode::Message,
1593             &BlockingMode::Wait,
1594             0,
1595             1024 * 1024,
1596             true,
1597         )
1598         .expect("should create the server pipe with success");
1599         let server1 = {
1600             let pipe = pipe
1601                 .try_clone()
1602                 .expect("should duplicate the named pipe with success");
1603             // SAFETY: `pipe` is a server named pipe.
1604             unsafe { MultiPartMessagePipe::create_from_server_pipe(pipe) }
1605                 .expect("should create the multipart message pipe with success")
1606         };
1607         let server2 = server1.clone();
1608         assert!(
1609             server2.into_inner_pipe().is_none(),
1610             "not the last reference, should be None"
1611         );
1612         let inner_pipe = server1
1613             .into_inner_pipe()
1614             .expect("the last reference, should return the underlying pipe");
1615         // CompareObjectHandles is a Windows 10 API and is not available in mingw, so we can't use
1616         // that API to compare if 2 handles are the same.
1617         pipe.set_blocking(&BlockingMode::NoWait)
1618             .expect("should set the blocking mode on the original pipe with success");
1619         assert_eq!(
1620             pipe.get_info()
1621                 .expect("should get the pipe information on the original pipe successfully"),
1622             inner_pipe
1623                 .get_info()
1624                 .expect("should get the pipe information on the inner pipe successfully")
1625         );
1626         pipe.set_blocking(&BlockingMode::Wait)
1627             .expect("should set the blocking mode on the original pipe with success");
1628         assert_eq!(
1629             pipe.get_info()
1630                 .expect("should get the pipe information on the original pipe successfully"),
1631             inner_pipe
1632                 .get_info()
1633                 .expect("should get the pipe information on the inner pipe successfully")
1634         );
1635     }
1636 
1637     #[test]
test_wait_for_connection_blocking()1638     fn test_wait_for_connection_blocking() {
1639         let pipe_name = generate_pipe_name();
1640 
1641         let mut server_pipe = create_server_pipe(
1642             &pipe_name,
1643             &FramingMode::Message,
1644             &BlockingMode::Wait,
1645             /* timeout= */ 0,
1646             /* buffer_size= */ 1000,
1647             /* overlapped= */ true,
1648         )
1649         .unwrap();
1650 
1651         let server = crate::thread::spawn_with_timeout(move || {
1652             let exit_event = Event::new().unwrap();
1653             server_pipe
1654                 .wait_for_client_connection_overlapped_blocking(&exit_event)
1655                 .unwrap();
1656         });
1657 
1658         let _client = create_client_pipe(
1659             &pipe_name,
1660             &FramingMode::Message,
1661             &BlockingMode::Wait,
1662             /* overlapped= */ true,
1663         )
1664         .unwrap();
1665         server.try_join(Duration::from_secs(10)).unwrap();
1666     }
1667 
1668     #[test]
test_wait_for_connection_blocking_exit_triggered()1669     fn test_wait_for_connection_blocking_exit_triggered() {
1670         let pipe_name = generate_pipe_name();
1671 
1672         let mut server_pipe = create_server_pipe(
1673             &pipe_name,
1674             &FramingMode::Message,
1675             &BlockingMode::Wait,
1676             /* timeout= */ 0,
1677             /* buffer_size= */ 1000,
1678             /* overlapped= */ true,
1679         )
1680         .unwrap();
1681 
1682         let exit_event = Event::new().unwrap();
1683         let exit_event_for_server = exit_event.try_clone().unwrap();
1684         let server = crate::thread::spawn_with_timeout(move || {
1685             assert!(server_pipe
1686                 .wait_for_client_connection_overlapped_blocking(&exit_event_for_server)
1687                 .is_err());
1688         });
1689         exit_event.signal().unwrap();
1690         server.try_join(Duration::from_secs(10)).unwrap();
1691     }
1692 
1693     #[test]
std_io_read_eof()1694     fn std_io_read_eof() {
1695         let (mut w, mut r) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1696         std::io::Write::write(&mut w, &[1, 2, 3]).unwrap();
1697         std::mem::drop(w);
1698 
1699         let mut buffer: [u8; 4] = [0; 4];
1700         assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 3);
1701         assert_eq!(buffer, [1, 2, 3, 0]);
1702         assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 0);
1703         assert_eq!(std::io::Read::read(&mut r, &mut buffer).unwrap(), 0);
1704     }
1705 
1706     #[test]
std_io_write_eof()1707     fn std_io_write_eof() {
1708         let (mut w, r) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1709         std::mem::drop(r);
1710         let result = std::io::Write::write(&mut w, &[1, 2, 3]);
1711         // Not required to return BrokenPipe here, something like Ok(0) is also acceptable.
1712         assert!(
1713             result.is_err()
1714                 && result.as_ref().unwrap_err().kind() == std::io::ErrorKind::BrokenPipe,
1715             "expected Err(BrokenPipe), got {result:?}"
1716         );
1717     }
1718 }
1719