• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::ffi::CString;
6 use std::fs::OpenOptions;
7 use std::io;
8 use std::io::Result;
9 use std::mem;
10 use std::os::windows::fs::OpenOptionsExt;
11 use std::process;
12 use std::ptr;
13 use std::sync::atomic::AtomicBool;
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering;
16 use std::sync::Arc;
17 
18 use rand::Rng;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use sync::Mutex;
22 use win_util::fail_if_zero;
23 use win_util::SecurityAttributes;
24 use win_util::SelfRelativeSecurityDescriptor;
25 use winapi::shared::minwindef::DWORD;
26 use winapi::shared::minwindef::FALSE;
27 use winapi::shared::minwindef::TRUE;
28 use winapi::shared::winerror::ERROR_IO_INCOMPLETE;
29 use winapi::shared::winerror::ERROR_IO_PENDING;
30 use winapi::shared::winerror::ERROR_MORE_DATA;
31 use winapi::shared::winerror::ERROR_NO_DATA;
32 use winapi::shared::winerror::ERROR_PIPE_CONNECTED;
33 use winapi::um::errhandlingapi::GetLastError;
34 use winapi::um::fileapi::FlushFileBuffers;
35 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
36 use winapi::um::ioapiset::CancelIoEx;
37 use winapi::um::ioapiset::GetOverlappedResult;
38 use winapi::um::minwinbase::OVERLAPPED;
39 use winapi::um::namedpipeapi::ConnectNamedPipe;
40 use winapi::um::namedpipeapi::DisconnectNamedPipe;
41 use winapi::um::namedpipeapi::GetNamedPipeInfo;
42 use winapi::um::namedpipeapi::PeekNamedPipe;
43 use winapi::um::namedpipeapi::SetNamedPipeHandleState;
44 use winapi::um::winbase::CreateNamedPipeA;
45 use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
46 use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
47 use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
48 use winapi::um::winbase::PIPE_NOWAIT;
49 use winapi::um::winbase::PIPE_READMODE_BYTE;
50 use winapi::um::winbase::PIPE_READMODE_MESSAGE;
51 use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS;
52 use winapi::um::winbase::PIPE_TYPE_BYTE;
53 use winapi::um::winbase::PIPE_TYPE_MESSAGE;
54 use winapi::um::winbase::PIPE_WAIT;
55 use winapi::um::winbase::SECURITY_IDENTIFICATION;
56 
57 use super::RawDescriptor;
58 use crate::descriptor::AsRawDescriptor;
59 use crate::descriptor::FromRawDescriptor;
60 use crate::descriptor::IntoRawDescriptor;
61 use crate::descriptor::SafeDescriptor;
62 use crate::Event;
63 use crate::EventToken;
64 use crate::WaitContext;
65 
/// The default buffer size for all named pipes in the system. If this size is too small, writers
/// on named pipes that expect not to block *can* block until the reading side empties the buffer.
///
/// The general rule is this should be *at least* as big as the largest message, otherwise
/// unexpected blocking behavior can result; for example, if too small, this can interact badly with
/// crate::windows::StreamChannel, which expects to be able to make a complete write before
/// releasing a lock that the opposite side needs to complete a read. This means that if the buffer
/// is too small:
///     * The writer can't complete its write and release the lock because the buffer is too small.
///     * The reader can't start reading because the lock is held by the writer, so it can't relieve
///       buffer pressure. Note that for message pipes, the reader couldn't do anything to help
///       anyway, because a message mode pipe should NOT have a partial read (which is what we would
///       need to relieve pressure).
///     * Conditions for deadlock are met, and both the reader & writer enter circular waiting.
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

/// Process-wide counter that `pair_with_buffer_size` increments and embeds in each generated pipe
/// name, so successive pipes created by this process get distinct names.
static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
83 
/// Represents one end of a named pipe.
#[derive(Serialize, Deserialize, Debug)]
pub struct PipeConnection {
    /// Descriptor for this end of the pipe.
    handle: SafeDescriptor,
    /// Framing mode (byte stream or messages) recorded for this end.
    framing_mode: FramingMode,
    /// Blocking mode (wait or no-wait) recorded for this end.
    blocking_mode: BlockingMode,
}
91 
/// Heap-allocated `OVERLAPPED`.
///
/// `OVERLAPPED` is allocated on the heap because it must not move while performing I/O operations.
///
/// Defined as a separate type so that we can mark it as `Send` and `Sync`.
pub struct BoxedOverlapped(pub Box<OVERLAPPED>);

// SAFETY: `OVERLAPPED` is not automatically `Send` because it contains a `HANDLE`, which is a raw
// pointer, but `HANDLE`s are safe to move between threads and thus so is `OVERLAPPED`.
unsafe impl Send for BoxedOverlapped {}

// SAFETY: See the argument for `Send` above. `HANDLE`s are also safe to share between threads.
unsafe impl Sync for BoxedOverlapped {}
103 
/// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a
/// `ReadFile` or `WriteFile` operation and holds onto the event object so it doesn't get dropped.
pub struct OverlappedWrapper {
    /// The boxed `OVERLAPPED` handed to the overlapped read/write calls.
    overlapped: BoxedOverlapped,
    // This field prevents the event handle from being dropped too early and allows callers to
    // be notified when a read or write overlapped operation has completed.
    h_event: Option<Event>,
    /// Set when an overlapped read or write has been started with this wrapper; a second
    /// operation is rejected while it is set.
    in_use: bool,
}
113 
114 impl OverlappedWrapper {
get_h_event_ref(&self) -> Option<&Event>115     pub fn get_h_event_ref(&self) -> Option<&Event> {
116         self.h_event.as_ref()
117     }
118 
119     /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
120     /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
121     /// returned must not be dropped.
122     ///
123     /// There is an option to create the event object and set it to the `hEvent` field. If hEvent
124     /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
125     /// handle will be signaled when the operation is complete. In other words, you can use
126     /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
127     /// Microsoft though.
new(include_event: bool) -> Result<OverlappedWrapper>128     pub fn new(include_event: bool) -> Result<OverlappedWrapper> {
129         let mut overlapped = OVERLAPPED::default();
130         let h_event = if include_event {
131             Some(Event::new()?)
132         } else {
133             None
134         };
135 
136         overlapped.hEvent = if let Some(event) = h_event.as_ref() {
137             event.as_raw_descriptor()
138         } else {
139             0 as RawDescriptor
140         };
141 
142         Ok(OverlappedWrapper {
143             overlapped: BoxedOverlapped(Box::new(overlapped)),
144             h_event,
145             in_use: false,
146         })
147     }
148 }
149 
/// Interface for issuing an overlapped (asynchronous) write and later collecting its result.
pub trait WriteOverlapped {
    /// Perform an overlapped write operation with the specified buffer and overlapped wrapper.
    /// If successful, the write operation will complete asynchronously, and
    /// `write_result()` should be called to get the result.
    ///
    /// # Safety
    /// `buf` and `overlapped_wrapper` will be in use for the duration of
    /// the overlapped operation. These must not be reused and must live until
    /// after `write_result()` has been called.
    unsafe fn write_overlapped(
        &mut self,
        buf: &mut [u8],
        overlapped_wrapper: &mut OverlappedWrapper,
    ) -> io::Result<()>;

    /// Gets the result of the overlapped write operation. Must only be called
    /// after issuing an overlapped write operation using `write_overlapped`. The
    /// same `overlapped_wrapper` must be provided.
    ///
    /// On success, returns the `usize` result of the operation.
    fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;

    /// Tries to get the result of the overlapped write operation. Must only be
    /// called after issuing an overlapped write operation using
    /// `write_overlapped`. The same `overlapped_wrapper` must be provided.
    ///
    /// An error indicates that the operation hasn't completed yet and
    /// `write_result` or `try_write_result` should be called again.
    ///
    /// NOTE(review): this doc previously also said "must only be called once", which
    /// contradicts the retry guidance above; presumably the single-call restriction belongs
    /// on `write_result` — confirm against the implementers.
    fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper)
        -> io::Result<usize>;
}
179 
/// Interface for issuing an overlapped (asynchronous) read and later collecting its result.
pub trait ReadOverlapped {
    /// Perform an overlapped read operation with the specified buffer and overlapped wrapper.
    /// If successful, the read operation will complete asynchronously, and
    /// `read_result()` should be called to get the result.
    ///
    /// # Safety
    /// `buf` and `overlapped_wrapper` will be in use for the duration of
    /// the overlapped operation. These must not be reused and must live until
    /// after `read_result()` has been called.
    unsafe fn read_overlapped(
        &mut self,
        buf: &mut [u8],
        overlapped_wrapper: &mut OverlappedWrapper,
    ) -> io::Result<()>;

    /// Gets the result of the overlapped read operation. Must only be called
    /// once, and only after issuing an overlapped read operation using
    /// `read_overlapped`. The same `overlapped_wrapper` must be provided.
    ///
    /// On success, returns the `usize` result of the operation.
    fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;

    /// Tries to get the result of the overlapped read operation. Must only be called
    /// after issuing an overlapped read operation using `read_overlapped`. The
    /// same `overlapped_wrapper` must be provided.
    ///
    /// An error indicates that the operation hasn't completed yet and
    /// `read_result` or `try_read_result` should be called again.
    fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
}
208 
209 #[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
210 pub enum FramingMode {
211     Byte,
212     Message,
213 }
214 
215 impl FramingMode {
to_readmode(self) -> DWORD216     fn to_readmode(self) -> DWORD {
217         match self {
218             FramingMode::Message => PIPE_READMODE_MESSAGE,
219             FramingMode::Byte => PIPE_READMODE_BYTE,
220         }
221     }
222 
to_pipetype(self) -> DWORD223     fn to_pipetype(self) -> DWORD {
224         match self {
225             FramingMode::Message => PIPE_TYPE_MESSAGE,
226             FramingMode::Byte => PIPE_TYPE_BYTE,
227         }
228     }
229 }
230 
231 #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)]
232 pub enum BlockingMode {
233     /// Calls to read() block until data is received
234     Wait,
235     /// Calls to read() return immediately even if there is nothing read with error code 232
236     /// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA)
237     ///
238     /// NOTE: This mode is discouraged by the Windows API documentation.
239     NoWait,
240 }
241 
242 impl From<&BlockingMode> for DWORD {
from(blocking_mode: &BlockingMode) -> DWORD243     fn from(blocking_mode: &BlockingMode) -> DWORD {
244         match blocking_mode {
245             BlockingMode::Wait => PIPE_WAIT,
246             BlockingMode::NoWait => PIPE_NOWAIT,
247         }
248     }
249 }
250 
251 /// Sets the handle state for a named pipe in a rust friendly way.
252 /// SAFETY:
253 /// This is safe if the pipe handle is open.
set_named_pipe_handle_state( pipe_handle: RawDescriptor, client_mode: &mut DWORD, ) -> Result<()>254 unsafe fn set_named_pipe_handle_state(
255     pipe_handle: RawDescriptor,
256     client_mode: &mut DWORD,
257 ) -> Result<()> {
258     // Safe when the pipe handle is open. Safety also requires checking the return value, which we
259     // do below.
260     let success_flag = SetNamedPipeHandleState(
261         /* hNamedPipe= */ pipe_handle,
262         /* lpMode= */ client_mode,
263         /* lpMaxCollectionCount= */ ptr::null_mut(),
264         /* lpCollectDataTimeout= */ ptr::null_mut(),
265     );
266     if success_flag == 0 {
267         Err(io::Error::last_os_error())
268     } else {
269         Ok(())
270     }
271 }
272 
pair( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, ) -> Result<(PipeConnection, PipeConnection)>273 pub fn pair(
274     framing_mode: &FramingMode,
275     blocking_mode: &BlockingMode,
276     timeout: u64,
277 ) -> Result<(PipeConnection, PipeConnection)> {
278     pair_with_buffer_size(
279         framing_mode,
280         blocking_mode,
281         timeout,
282         DEFAULT_BUFFER_SIZE,
283         false,
284     )
285 }
286 
287 /// Creates a pair of handles connected to either end of a duplex named pipe.
288 ///
289 /// The pipe created will have a semi-random name and a default set of security options that
290 /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
291 /// remote clients, allow only a single server instance, and prevent impersonation by the server
292 /// end of the pipe.
293 ///
294 /// # Arguments
295 ///
296 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
297 ///   automatically framed sequence of messages (Message). In message mode it's an error to read
298 ///   fewer bytes than were sent in a message from the other end of the pipe.
299 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
300 ///   return immediately if there is nothing available (NoWait).
301 /// * `timeout`       - A timeout to apply for socket operations, in milliseconds. Setting this to
302 ///   zero will create sockets with the system default timeout.
303 /// * `buffer_size`   - The default buffer size for the named pipe. The system should expand the
304 ///   buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
305 ///   writes that don't fit in the buffer.
306 /// # Return value
307 ///
308 /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
309 /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
pair_with_buffer_size( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<(PipeConnection, PipeConnection)>310 pub fn pair_with_buffer_size(
311     framing_mode: &FramingMode,
312     blocking_mode: &BlockingMode,
313     timeout: u64,
314     buffer_size: usize,
315     overlapped: bool,
316 ) -> Result<(PipeConnection, PipeConnection)> {
317     // Give the pipe a unique name to avoid accidental collisions
318     let pipe_name = format!(
319         r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
320         process::id(),
321         NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
322         rand::thread_rng().gen::<u32>(),
323     );
324 
325     let server_end = create_server_pipe(
326         &pipe_name,
327         framing_mode,
328         blocking_mode,
329         timeout,
330         buffer_size,
331         overlapped,
332     )?;
333 
334     // Open the named pipe we just created as the client
335     let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
336 
337     // Accept the client's connection
338     // Not sure if this is strictly needed but I'm doing it just in case.
339     // We expect at this point that the client will already be connected,
340     // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
341     // It's also OK if we get a return code of success.
342     server_end.wait_for_client_connection()?;
343 
344     Ok((server_end, client_end))
345 }
346 
347 /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
348 /// settings.
349 ///
350 /// The pipe will be set to reject remote clients and allow only a single connection at a time.
351 ///
352 /// # Arguments
353 ///
354 /// * `pipe_name`     - The path of the named pipe to create. Should be in the form
355 ///   `\\.\pipe\<some-name>`.
356 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
357 ///   automatically framed sequence of messages (Message). In message mode it's an error to read
358 ///   fewer bytes than were sent in a message from the other end of the pipe.
359 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
360 ///   return immediately if there is nothing available (NoWait).
361 /// * `timeout`       - A timeout to apply for socket operations, in milliseconds. Setting this to
362 ///   zero will create sockets with the system default timeout.
363 /// * `buffer_size`   - The default buffer size for the named pipe. The system should expand the
364 ///   buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
365 ///   writes that don't fit in the buffer.
366 /// * `overlapped`    - Sets whether overlapped mode is set on the pipe.
create_server_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<PipeConnection>367 pub fn create_server_pipe(
368     pipe_name: &str,
369     framing_mode: &FramingMode,
370     blocking_mode: &BlockingMode,
371     timeout: u64,
372     buffer_size: usize,
373     overlapped: bool,
374 ) -> Result<PipeConnection> {
375     let c_pipe_name = CString::new(pipe_name).unwrap();
376 
377     let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
378     if overlapped {
379         open_mode_flags |= FILE_FLAG_OVERLAPPED
380     }
381 
382     // This sets flags so there will be an error if >1 instance (server end)
383     // of this pipe name is opened because we expect exactly one.
384     // SAFETY:
385     // Safe because security attributes are valid, pipe_name is valid C string,
386     // and we're checking the return code
387     let server_handle = unsafe {
388         CreateNamedPipeA(
389             c_pipe_name.as_ptr(),
390             /* dwOpenMode= */
391             open_mode_flags,
392             /* dwPipeMode= */
393             framing_mode.to_pipetype()
394                 | framing_mode.to_readmode()
395                 | DWORD::from(blocking_mode)
396                 | PIPE_REJECT_REMOTE_CLIENTS,
397             /* nMaxInstances= */ 1,
398             /* nOutBufferSize= */ buffer_size as DWORD,
399             /* nInBufferSize= */ buffer_size as DWORD,
400             /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
401             /* lpSecurityAttributes= */
402             SecurityAttributes::new_with_security_descriptor(
403                 SelfRelativeSecurityDescriptor::get_singleton(),
404                 /* inherit= */ true,
405             )
406             .as_mut(),
407         )
408     };
409 
410     if server_handle == INVALID_HANDLE_VALUE {
411         Err(io::Error::last_os_error())
412     } else {
413         // SAFETY: Safe because server_handle is valid.
414         unsafe {
415             Ok(PipeConnection {
416                 handle: SafeDescriptor::from_raw_descriptor(server_handle),
417                 framing_mode: *framing_mode,
418                 blocking_mode: *blocking_mode,
419             })
420         }
421     }
422 }
423 
424 /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
425 /// settings.
426 ///
427 /// The pipe will be set to prevent impersonation of the client by the server process.
428 ///
429 /// # Arguments
430 ///
431 /// * `pipe_name`     - The path of the named pipe to create. Should be in the form
432 ///   `\\.\pipe\<some-name>`.
433 /// * `framing_mode`  - Whether the system should provide a simple byte stream (Byte) or an
434 ///   automatically framed sequence of messages (Message). In message mode it's an error to read
435 ///   fewer bytes than were sent in a message from the other end of the pipe.
436 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
437 ///   return immediately if there is nothing available (NoWait).
438 /// * `overlapped`    - Sets whether the pipe is opened in overlapped mode.
create_client_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, overlapped: bool, ) -> Result<PipeConnection>439 pub fn create_client_pipe(
440     pipe_name: &str,
441     framing_mode: &FramingMode,
442     blocking_mode: &BlockingMode,
443     overlapped: bool,
444 ) -> Result<PipeConnection> {
445     let client_handle = OpenOptions::new()
446         .read(true)
447         .write(true)
448         .create(true)
449         .security_qos_flags(SECURITY_IDENTIFICATION)
450         .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
451         .open(pipe_name)?
452         .into_raw_descriptor();
453 
454     let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
455 
456     // SAFETY:
457     // Safe because client_handle's open() call did not return an error.
458     unsafe {
459         set_named_pipe_handle_state(client_handle, &mut client_mode)?;
460     }
461 
462     Ok(PipeConnection {
463         // SAFETY:
464         // Safe because client_handle is valid
465         handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
466         framing_mode: *framing_mode,
467         blocking_mode: *blocking_mode,
468     })
469 }
470 
/// This is used to mark types which can be appropriately sent through the
/// generic helper functions write_to_pipe and read_from_pipe.
///
/// It is also the bound on the element type of the buffers taken by `PipeConnection`'s generic
/// read and write methods.
pub trait PipeSendable {
    /// Default value used to fill in new empty indexes when resizing a buffer to
    /// a larger size.
    fn default() -> Self;
}
impl PipeSendable for u8 {
    /// The filler byte is zero.
    fn default() -> Self {
        0
    }
}
impl PipeSendable for RawDescriptor {
    /// The filler descriptor is a null pointer.
    fn default() -> Self {
        ptr::null_mut()
    }
}
488 
489 impl PipeConnection {
try_clone(&self) -> Result<PipeConnection>490     pub fn try_clone(&self) -> Result<PipeConnection> {
491         let copy_handle = self.handle.try_clone()?;
492         Ok(PipeConnection {
493             handle: copy_handle,
494             framing_mode: self.framing_mode,
495             blocking_mode: self.blocking_mode,
496         })
497     }
498 
499     /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
500     /// blocking modes.
501     ///
502     /// # Safety
503     /// 1. rd is valid and ownership is transferred to this function when it is called.
504     ///
505     /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
506     /// underlying pipe.
from_raw_descriptor( rd: RawDescriptor, framing_mode: FramingMode, blocking_mode: BlockingMode, ) -> PipeConnection507     pub unsafe fn from_raw_descriptor(
508         rd: RawDescriptor,
509         framing_mode: FramingMode,
510         blocking_mode: BlockingMode,
511     ) -> PipeConnection {
512         PipeConnection {
513             handle: SafeDescriptor::from_raw_descriptor(rd),
514             framing_mode,
515             blocking_mode,
516         }
517     }
518 
519     /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
520     /// Returns the number of bytes (not values) read.
521     ///
522     /// # Safety
523     ///
524     /// This is safe only when the following conditions hold:
525     ///     1. The data on the other end of the pipe is a valid binary representation of data for
526     ///     type T, and
527     ///     2. The number of bytes read is a multiple of the size of T; this must be checked by
528     ///     the caller.
529     /// If buf's type is file descriptors, this is only safe when those file descriptors are valid
530     /// for the process where this function was called.
read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize>531     pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
532         PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None)
533     }
534 
535     /// Similar to `PipeConnection::read` except it also allows:
536     ///     1. The same end of the named pipe to read and write at the same time in different
537     ///        threads.
538     ///     2. Asynchronous read and write (read and write won't block).
539     ///
540     /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
541     /// (can be created with `OverlappedWrapper::new`) will be passed into
542     /// `ReadFile`. That event will be triggered when the read operation is complete.
543     ///
544     /// In order to get how many bytes were read, call `get_overlapped_result`. That function will
545     /// also help with waiting until the read operation is complete.
546     ///
547     /// # Safety
548     ///
549     /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
550     /// overlapped mode otherwise there may be unexpected behavior.
read_overlapped<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>551     pub unsafe fn read_overlapped<T: PipeSendable>(
552         &mut self,
553         buf: &mut [T],
554         overlapped_wrapper: &mut OverlappedWrapper,
555     ) -> Result<()> {
556         if overlapped_wrapper.in_use {
557             return Err(std::io::Error::new(
558                 std::io::ErrorKind::InvalidInput,
559                 "Overlapped struct already in use",
560             ));
561         }
562         overlapped_wrapper.in_use = true;
563 
564         PipeConnection::read_internal(
565             &self.handle,
566             self.blocking_mode,
567             buf,
568             Some(&mut overlapped_wrapper.overlapped.0),
569         )?;
570         Ok(())
571     }
572 
573     /// Helper for `read_overlapped` and `read`
574     ///
575     /// # Safety
576     /// Comments `read_overlapped` or `read`, depending on which is used.
read_internal<T: PipeSendable>( handle: &SafeDescriptor, blocking_mode: BlockingMode, buf: &mut [T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>577     unsafe fn read_internal<T: PipeSendable>(
578         handle: &SafeDescriptor,
579         blocking_mode: BlockingMode,
580         buf: &mut [T],
581         overlapped: Option<&mut OVERLAPPED>,
582     ) -> Result<usize> {
583         let res = crate::windows::read_file(
584             handle,
585             buf.as_mut_ptr() as *mut u8,
586             mem::size_of_val(buf),
587             overlapped,
588         );
589         match res {
590             Ok(bytes_read) => Ok(bytes_read),
591             Err(e)
592                 if blocking_mode == BlockingMode::NoWait
593                     && e.raw_os_error() == Some(ERROR_NO_DATA as i32) =>
594             {
595                 // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
596                 // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
597                 // correct. For further details see:
598                 // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
599                 // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
600                 Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
601             }
602             Err(e) => Err(e),
603         }
604     }
605 
606     /// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted
607     /// by an event on `exit_event`.
read_overlapped_blocking<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<()>608     pub fn read_overlapped_blocking<T: PipeSendable>(
609         &mut self,
610         buf: &mut [T],
611         overlapped_wrapper: &mut OverlappedWrapper,
612         exit_event: &Event,
613     ) -> Result<()> {
614         // SAFETY:
615         // Safe because we are providing a valid buffer slice and also providing a valid
616         // overlapped struct.
617         match unsafe { self.read_overlapped(buf, overlapped_wrapper) } {
618             // More data isn't necessarily an error as long as we've filled the provided buffer,
619             // as is checked later in this function.
620             Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => Ok(()),
621             Err(e) => Err(e),
622             Ok(()) => Ok(()),
623         }?;
624 
625         #[derive(EventToken)]
626         enum Token {
627             ReadOverlapped,
628             Exit,
629         }
630 
631         let wait_ctx = WaitContext::build_with(&[
632             (
633                 overlapped_wrapper.get_h_event_ref().unwrap(),
634                 Token::ReadOverlapped,
635             ),
636             (exit_event, Token::Exit),
637         ])?;
638 
639         let events = wait_ctx.wait()?;
640         for event in events {
641             match event.token {
642                 Token::ReadOverlapped => {
643                     let size_read_in_bytes =
644                         self.get_overlapped_result(overlapped_wrapper)? as usize;
645 
646                     // If this error shows, most likely the overlapped named pipe was set up
647                     // incorrectly.
648                     if size_read_in_bytes != buf.len() {
649                         return Err(std::io::Error::new(
650                             std::io::ErrorKind::UnexpectedEof,
651                             "Short read",
652                         ));
653                     }
654                 }
655                 Token::Exit => {
656                     return Err(std::io::Error::new(
657                         std::io::ErrorKind::Interrupted,
658                         "IO canceled on exit request",
659                     ));
660                 }
661             }
662         }
663 
664         Ok(())
665     }
666 
    /// Gets the size in bytes of data in the pipe.
    ///
    /// Note that PeekNamedPipe (the underlying win32 API) will return zero if the packets have
    /// not finished writing on the producer side.
    ///
    /// No data is read from the pipe: the call is made with a null buffer of size zero and only
    /// the "total bytes available" output is requested.
    pub fn get_available_byte_count(&self) -> io::Result<u32> {
        // Output slot for the total number of bytes available.
        let mut total_bytes_avail: DWORD = 0;

        // `fail_if_zero!` comes from `win_util`; presumably it returns early with the last OS
        // error when the wrapped call yields zero — TODO confirm against the macro definition.
        //
        // SAFETY:
        // Safe because the underlying pipe handle is guaranteed to be open, and the output values
        // live at valid memory locations.
        fail_if_zero!(unsafe {
            PeekNamedPipe(
                self.as_raw_descriptor(),
                ptr::null_mut(),
                0,
                ptr::null_mut(),
                &mut total_bytes_avail,
                ptr::null_mut(),
            )
        });

        Ok(total_bytes_avail)
    }
690 
691     /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
692     /// callers should check to ensure that it was the number expected.
write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize>693     pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
694         // SAFETY: overlapped is None so this is safe.
695         unsafe { PipeConnection::write_internal(&self.handle, buf, None) }
696     }
697 
698     /// Similar to `PipeConnection::write` except it also allows:
699     ///     1. The same end of the named pipe to read and write at the same time in different
700     ///        threads.
701     ///     2. Asynchronous read and write (read and write won't block).
702     ///
703     /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
704     /// (can be created with `OverlappedWrapper::new`) will be passed into
705     /// `WriteFile`. That event will be triggered when the write operation is complete.
706     ///
707     /// In order to get how many bytes were written, call `get_overlapped_result`. That function
708     /// will also help with waiting until the write operation is complete. The pipe must be
709     /// opened in overlapped otherwise there may be unexpected behavior.
710     ///
711     /// # Safety
712     /// * buf & overlapped_wrapper MUST live until the overlapped operation is complete.
write_overlapped<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>713     pub unsafe fn write_overlapped<T: PipeSendable>(
714         &mut self,
715         buf: &[T],
716         overlapped_wrapper: &mut OverlappedWrapper,
717     ) -> Result<()> {
718         if overlapped_wrapper.in_use {
719             return Err(std::io::Error::new(
720                 std::io::ErrorKind::InvalidInput,
721                 "Overlapped struct already in use",
722             ));
723         }
724         overlapped_wrapper.in_use = true;
725 
726         PipeConnection::write_internal(
727             &self.handle,
728             buf,
729             Some(&mut overlapped_wrapper.overlapped.0),
730         )?;
731         Ok(())
732     }
733 
734     /// Helper for `write_overlapped` and `write`.
735     ///
736     /// # Safety
737     /// * Safe if overlapped is None.
738     /// * Safe if overlapped is Some and:
739     ///   + buf lives until the overlapped operation is complete.
740     ///   + overlapped lives until the overlapped operation is complete.
write_internal<T: PipeSendable>( handle: &SafeDescriptor, buf: &[T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>741     unsafe fn write_internal<T: PipeSendable>(
742         handle: &SafeDescriptor,
743         buf: &[T],
744         overlapped: Option<&mut OVERLAPPED>,
745     ) -> Result<usize> {
746         // SAFETY:
747         // Safe because buf points to memory valid until the write completes and we pass a valid
748         // length for that memory.
749         unsafe {
750             crate::windows::write_file(
751                 handle,
752                 buf.as_ptr() as *const u8,
753                 mem::size_of_val(buf),
754                 overlapped,
755             )
756         }
757     }
758 
759     /// Sets the blocking mode on the pipe.
set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()>760     pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
761         let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
762         self.blocking_mode = *blocking_mode;
763 
764         // SAFETY:
765         // Safe because the pipe has not been closed (it is managed by this object).
766         unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
767     }
768 
769     /// For a server named pipe, waits for a client to connect (blocking).
wait_for_client_connection(&self) -> Result<()>770     pub fn wait_for_client_connection(&self) -> Result<()> {
771         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
772         self.wait_for_client_connection_internal(
773             &mut overlapped_wrapper,
774             /* should_block = */ true,
775         )
776     }
777 
778     /// Interruptable blocking wait for a client to connect.
wait_for_client_connection_overlapped_blocking( &mut self, exit_event: &Event, ) -> Result<()>779     pub fn wait_for_client_connection_overlapped_blocking(
780         &mut self,
781         exit_event: &Event,
782     ) -> Result<()> {
783         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
784         self.wait_for_client_connection_internal(
785             &mut overlapped_wrapper,
786             /* should_block = */ false,
787         )?;
788 
789         #[derive(EventToken)]
790         enum Token {
791             Connected,
792             Exit,
793         }
794 
795         let wait_ctx = WaitContext::build_with(&[
796             (
797                 overlapped_wrapper.get_h_event_ref().unwrap(),
798                 Token::Connected,
799             ),
800             (exit_event, Token::Exit),
801         ])?;
802 
803         let events = wait_ctx.wait()?;
804         if let Some(event) = events.into_iter().next() {
805             return match event.token {
806                 Token::Connected => Ok(()),
807                 Token::Exit => {
808                     // We must cancel IO here because it is unsafe to free the overlapped wrapper
809                     // while the IO operation is active.
810                     self.cancel_io()?;
811 
812                     Err(std::io::Error::new(
813                         std::io::ErrorKind::Interrupted,
814                         "IO canceled on exit request",
815                     ))
816                 }
817             };
818         }
819         unreachable!("wait cannot return Ok with zero events");
820     }
821 
822     /// For a server named pipe, waits for a client to connect using the given overlapped wrapper
823     /// to signal connection.
wait_for_client_connection_overlapped( &self, overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>824     pub fn wait_for_client_connection_overlapped(
825         &self,
826         overlapped_wrapper: &mut OverlappedWrapper,
827     ) -> Result<()> {
828         self.wait_for_client_connection_internal(
829             overlapped_wrapper,
830             /* should_block = */ false,
831         )
832     }
833 
    /// Shared implementation of the `wait_for_client_connection*` methods.
    ///
    /// Calls `ConnectNamedPipe` with the OVERLAPPED structure held by `overlapped_wrapper` and
    /// interprets the result:
    /// * non-zero return: returns `Ok(())` immediately.
    /// * `ERROR_PIPE_CONNECTED`: returns `Ok(())`; when `should_block` is false the wrapper's
    ///   event is signalled first.
    /// * `ERROR_IO_PENDING`: when `should_block` is true, waits on the wrapper's event before
    ///   returning `Ok(())`; otherwise returns `Ok(())` right away with the connect still
    ///   outstanding.
    /// * any other error code: returned as an `io::Error` built from that code.
    ///
    /// # Panics
    /// Panics if the wrapper has no event (`get_h_event_ref()` returns `None`) on a path that
    /// needs to signal or wait on it.
    fn wait_for_client_connection_internal(
        &self,
        overlapped_wrapper: &mut OverlappedWrapper,
        should_block: bool,
    ) -> Result<()> {
        // SAFETY:
        // Safe because the handle is valid and we're checking the return
        // code according to the documentation
        //
        // TODO(b/279669296) this safety statement is incomplete, and as such incorrect in one case:
        //      overlapped_wrapper must live until the overlapped operation is complete; however,
        //      if should_block is false, nothing guarantees that lifetime and so overlapped_wrapper
        //      could be freed while the operation is still running.
        unsafe {
            let success_flag = ConnectNamedPipe(
                self.as_raw_descriptor(),
                // Note: The overlapped structure is only used if the pipe was opened in
                // OVERLAPPED mode, but is necessary in that case.
                &mut *overlapped_wrapper.overlapped.0,
            );
            // A zero return is not necessarily a failure; the last-error code distinguishes
            // "already connected" and "still pending" from real errors.
            if success_flag == 0 {
                return match GetLastError() {
                    ERROR_PIPE_CONNECTED => {
                        if !should_block {
                            // If async, make sure the event is signalled to indicate the client
                            // is ready.
                            overlapped_wrapper.get_h_event_ref().unwrap().signal()?;
                        }

                        Ok(())
                    }
                    ERROR_IO_PENDING => {
                        // Blocking callers wait for the connect to finish here; non-blocking
                        // callers are expected to watch the event themselves.
                        if should_block {
                            overlapped_wrapper.get_h_event_ref().unwrap().wait()?;
                        }
                        Ok(())
                    }
                    err => Err(io::Error::from_raw_os_error(err as i32)),
                };
            }
        }
        Ok(())
    }
877 
878     /// Used for overlapped read and write operations.
879     ///
880     /// This will block until the ReadFile or WriteFile operation that also took in
881     /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
882     /// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get
883     /// the number of bytes that were read or written.
get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>884     pub fn get_overlapped_result(
885         &mut self,
886         overlapped_wrapper: &mut OverlappedWrapper,
887     ) -> io::Result<u32> {
888         let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true);
889         overlapped_wrapper.in_use = false;
890         res
891     }
892 
893     /// Used for overlapped read and write operations.
894     ///
895     /// This will return immediately, regardless of the completion status of the
896     /// ReadFile or WriteFile operation that took in `overlapped_wrapper`,
897     /// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new`
898     /// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes
899     /// that were read or written, if completed.  If the operation hasn't
900     /// completed, an error of kind `io::ErrorKind::WouldBlock` will be
901     /// returned.
try_get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>902     pub fn try_get_overlapped_result(
903         &mut self,
904         overlapped_wrapper: &mut OverlappedWrapper,
905     ) -> io::Result<u32> {
906         let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false);
907         match res {
908             Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => {
909                 Err(io::Error::new(io::ErrorKind::WouldBlock, err))
910             }
911             _ => {
912                 overlapped_wrapper.in_use = false;
913                 res
914             }
915         }
916     }
917 
get_overlapped_result_internal( &mut self, overlapped_wrapper: &mut OverlappedWrapper, wait: bool, ) -> io::Result<u32>918     fn get_overlapped_result_internal(
919         &mut self,
920         overlapped_wrapper: &mut OverlappedWrapper,
921         wait: bool,
922     ) -> io::Result<u32> {
923         if !overlapped_wrapper.in_use {
924             return Err(std::io::Error::new(
925                 std::io::ErrorKind::InvalidInput,
926                 "Overlapped struct is not in use",
927             ));
928         }
929         let mut size_transferred = 0;
930         // SAFETY:
931         // Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
932         // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
933         fail_if_zero!(unsafe {
934             GetOverlappedResult(
935                 self.handle.as_raw_descriptor(),
936                 &mut *overlapped_wrapper.overlapped.0,
937                 &mut size_transferred,
938                 if wait { TRUE } else { FALSE },
939             )
940         });
941 
942         Ok(size_transferred)
943     }
944 
945     /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
946     /// cancel all I/O requests for the file handle passed in.
cancel_io(&mut self) -> Result<()>947     pub fn cancel_io(&mut self) -> Result<()> {
948         fail_if_zero!(
949             // SAFETY: descriptor is valid and the return value is checked.
950             unsafe {
951                 CancelIoEx(
952                     self.handle.as_raw_descriptor(),
953                     /* lpOverlapped= */ std::ptr::null_mut(),
954                 )
955             }
956         );
957 
958         Ok(())
959     }
960 
    /// Get the framing mode of the pipe.
    ///
    /// Returns a copy of the `framing_mode` value stored on this connection; the OS is not
    /// queried.
    pub fn get_framing_mode(&self) -> FramingMode {
        self.framing_mode
    }
965 
966     /// Returns metadata about the connected NamedPipe.
get_info(&self) -> Result<NamedPipeInfo>967     pub fn get_info(&self) -> Result<NamedPipeInfo> {
968         let mut flags: u32 = 0;
969         let mut incoming_buffer_size: u32 = 0;
970         let mut outgoing_buffer_size: u32 = 0;
971         let mut max_instances: u32 = 0;
972         // SAFETY: all pointers are valid
973         fail_if_zero!(unsafe {
974             GetNamedPipeInfo(
975                 self.as_raw_descriptor(),
976                 &mut flags,
977                 &mut outgoing_buffer_size,
978                 &mut incoming_buffer_size,
979                 &mut max_instances,
980             )
981         });
982 
983         Ok(NamedPipeInfo {
984             outgoing_buffer_size,
985             incoming_buffer_size,
986             max_instances,
987             flags,
988         })
989     }
990 
991     /// For a server pipe, flush the pipe contents. This will
992     /// block until the pipe is cleared by the client. Only
993     /// call this if you are sure the client is reading the
994     /// data!
flush_data_blocking(&self) -> Result<()>995     pub fn flush_data_blocking(&self) -> Result<()> {
996         // SAFETY:
997         // Safe because the only buffers interacted with are
998         // outside of Rust memory
999         fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) });
1000         Ok(())
1001     }
1002 
1003     /// For a server pipe, disconnect all clients, discarding any buffered data.
disconnect_clients(&self) -> Result<()>1004     pub fn disconnect_clients(&self) -> Result<()> {
1005         // SAFETY:
1006         // Safe because we own the handle passed in and know it will remain valid for the duration
1007         // of the call. Discarded buffers are not managed by rust.
1008         fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) });
1009         Ok(())
1010     }
1011 }
1012 
1013 impl AsRawDescriptor for PipeConnection {
as_raw_descriptor(&self) -> RawDescriptor1014     fn as_raw_descriptor(&self) -> RawDescriptor {
1015         self.handle.as_raw_descriptor()
1016     }
1017 }
1018 
1019 impl IntoRawDescriptor for PipeConnection {
into_raw_descriptor(self) -> RawDescriptor1020     fn into_raw_descriptor(self) -> RawDescriptor {
1021         self.handle.into_raw_descriptor()
1022     }
1023 }
1024 
// SAFETY: Send safety is ensured by inner fields.
// NOTE(review): the field types are declared outside this excerpt; confirm why the automatic
// `Send` impl does not already apply before relying on this statement.
unsafe impl Send for PipeConnection {}
// SAFETY: Sync safety is ensured by inner fields.
// NOTE(review): same caveat as for `Send` above; verify against the struct definition.
unsafe impl Sync for PipeConnection {}
1029 
/// Lets a [`PipeConnection`] be used wherever a standard byte reader is expected.
impl io::Read for PipeConnection {
    /// Reads bytes into `buf` by delegating to `PipeConnection::read`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // The call is spelled with an explicit `PipeConnection::` path rather than
        // `self.read(..)` — presumably so that it cannot resolve back to this trait method
        // (TODO confirm against the inherent `read` definition).
        // SAFETY:
        // This is safe because PipeConnection::read is always safe for u8
        unsafe { PipeConnection::read(self, buf) }
    }
}
1037 
/// Lets a [`PipeConnection`] be used wherever a standard byte writer is expected.
impl io::Write for PipeConnection {
    /// Writes `buf` by delegating to `PipeConnection::write` and returns the byte count it
    /// reports.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        PipeConnection::write(self, buf)
    }

    /// Always succeeds without doing any work. Use `flush_data_blocking` to wait for the
    /// client to drain the pipe.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
1047 
/// A simple data struct representing
/// metadata about a NamedPipe.
///
/// Instances are produced by `PipeConnection::get_info`, which fills every field from the
/// outputs of `GetNamedPipeInfo`.
#[derive(Debug, PartialEq, Eq)]
pub struct NamedPipeInfo {
    /// Size reported for the outgoing buffer.
    pub outgoing_buffer_size: u32,
    /// Size reported for the incoming buffer.
    pub incoming_buffer_size: u32,
    /// Maximum number of pipe instances reported.
    pub max_instances: u32,
    /// Raw flags value reported for the pipe.
    pub flags: u32,
}
1057 
/// This is a wrapper around PipeConnection. It allows one read and one write operation
/// to run in parallel, but not multiple reads or multiple writes in parallel.
///
/// Reason: The messages from/to the service have two parts - a fixed size header that
/// contains the size of the actual message, followed by the message itself. By allowing only
/// one write at a time we ensure that the variable size message is written/read right after
/// writing/reading its fixed size header. For example, it avoids sending or receiving
/// messages in an order like H1, H2, M1, M2
///   - where header H1 and its message M1 are sent by one event loop and H2 and its message M2 are
///     sent by another event loop.
///
/// Do not expose direct access to reader or writer pipes.
///
/// The struct is clone-able so that different event loops can talk to the other end.
#[derive(Clone)]
pub struct MultiPartMessagePipe {
    // Lock protected pipe to receive messages.
    reader: Arc<Mutex<PipeConnection>>,
    // Lock protected pipe to send messages.
    writer: Arc<Mutex<PipeConnection>>,
    // Whether this end is created as server or client. The variable helps to
    // decide if something meaningful should be done when `wait_for_connection` is called.
    is_server: bool,
    // Only consulted when `is_server` is true. Starts out false and is updated to true by
    // `wait_for_connection` after a client connects. Shared between clones.
    is_connected: Arc<AtomicBool>,
}
1086 
1087 impl MultiPartMessagePipe {
create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self>1088     fn create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self> {
1089         Ok(Self {
1090             reader: Arc::new(Mutex::new(pipe.try_clone()?)),
1091             writer: Arc::new(Mutex::new(pipe)),
1092             is_server,
1093             is_connected: Arc::new(AtomicBool::new(false)),
1094         })
1095     }
1096 
1097     /// Create server side of MutiPartMessagePipe.
1098     /// # Safety
1099     /// `pipe` must be a server named pipe.
1100     #[deny(unsafe_op_in_unsafe_fn)]
create_from_server_pipe(pipe: PipeConnection) -> Result<Self>1101     pub unsafe fn create_from_server_pipe(pipe: PipeConnection) -> Result<Self> {
1102         Self::create_from_pipe(pipe, true)
1103     }
1104 
1105     /// Create client side of MutiPartMessagePipe.
create_as_client(pipe_name: &str) -> Result<Self>1106     pub fn create_as_client(pipe_name: &str) -> Result<Self> {
1107         let pipe = create_client_pipe(
1108             &format!(r"\\.\pipe\{}", pipe_name),
1109             &FramingMode::Message,
1110             &BlockingMode::Wait,
1111             /* overlapped= */ true,
1112         )?;
1113         Self::create_from_pipe(pipe, false)
1114     }
1115 
1116     /// Create server side of MutiPartMessagePipe.
create_as_server(pipe_name: &str) -> Result<Self>1117     pub fn create_as_server(pipe_name: &str) -> Result<Self> {
1118         let pipe = create_server_pipe(
1119             &format!(r"\\.\pipe\{}", pipe_name,),
1120             &FramingMode::Message,
1121             &BlockingMode::Wait,
1122             0,
1123             1024 * 1024,
1124             true,
1125         )?;
1126         // SAFETY: `pipe` is a server named pipe.
1127         unsafe { Self::create_from_server_pipe(pipe) }
1128     }
1129 
1130     /// If the struct is created as a server then waits for client connection to arrive.
1131     /// It only waits on reader as reader and writer are clones.
wait_for_connection(&self) -> Result<()>1132     pub fn wait_for_connection(&self) -> Result<()> {
1133         if self.is_server && !self.is_connected.load(Ordering::Relaxed) {
1134             self.reader.lock().wait_for_client_connection()?;
1135             self.is_connected.store(true, Ordering::Relaxed);
1136         }
1137         Ok(())
1138     }
1139 
write_overlapped_blocking_message_internal<T: PipeSendable>( pipe: &mut PipeConnection, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1140     fn write_overlapped_blocking_message_internal<T: PipeSendable>(
1141         pipe: &mut PipeConnection,
1142         buf: &[T],
1143         overlapped_wrapper: &mut OverlappedWrapper,
1144     ) -> Result<()> {
1145         // Safety:
1146         // `buf` and `overlapped_wrapper` will be in use for the duration of
1147         // the overlapped operation. These must not be reused and must live until
1148         // after `get_overlapped_result()` has been called which is done right
1149         // after this call.
1150         unsafe {
1151             pipe.write_overlapped(buf, overlapped_wrapper)?;
1152         }
1153 
1154         let size_written_in_bytes = pipe.get_overlapped_result(overlapped_wrapper)?;
1155 
1156         if size_written_in_bytes as usize != buf.len() {
1157             return Err(std::io::Error::new(
1158                 std::io::ErrorKind::UnexpectedEof,
1159                 format!(
1160                     "Short write expected:{} found:{}",
1161                     size_written_in_bytes,
1162                     buf.len(),
1163                 ),
1164             ));
1165         }
1166         Ok(())
1167     }
1168     /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
write_overlapped_blocking_message<T: PipeSendable>( &self, header: &[T], message: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1169     pub fn write_overlapped_blocking_message<T: PipeSendable>(
1170         &self,
1171         header: &[T],
1172         message: &[T],
1173         overlapped_wrapper: &mut OverlappedWrapper,
1174     ) -> Result<()> {
1175         let mut writer = self.writer.lock();
1176         Self::write_overlapped_blocking_message_internal(&mut writer, header, overlapped_wrapper)?;
1177         Self::write_overlapped_blocking_message_internal(&mut writer, message, overlapped_wrapper)
1178     }
1179 
1180     /// Reads a variable size message and returns the message on success.
1181     /// The size of the message is expected to proceed the message in
1182     /// the form of `header_size` message.
1183     ///
1184     /// `parse_message_size` lets caller parse the header to extract
1185     /// message size.
1186     ///
1187     /// Event on `exit_event` is used to interrupt the blocked read.
read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>( &self, header_size: usize, parse_message_size: F, overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<Vec<u8>>1188     pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
1189         &self,
1190         header_size: usize,
1191         parse_message_size: F,
1192         overlapped_wrapper: &mut OverlappedWrapper,
1193         exit_event: &Event,
1194     ) -> Result<Vec<u8>> {
1195         let mut pipe = self.reader.lock();
1196         let mut header = vec![0; header_size];
1197         header.resize_with(header_size, Default::default);
1198         pipe.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
1199         let message_size = parse_message_size(&header);
1200         if message_size == 0 {
1201             return Ok(vec![]);
1202         }
1203         let mut buf = vec![];
1204         buf.resize_with(message_size, Default::default);
1205         pipe.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
1206         Ok(buf)
1207     }
1208 
1209     /// Returns the inner named pipe if the current struct is the sole owner of the underlying
1210     /// named pipe.
1211     ///
1212     /// Otherwise, [`None`] is returned and the struct is dropped.
1213     ///
1214     /// Note that this has a similar race condition like [`Arc::try_unwrap`]: if multiple threads
1215     /// call this function simultaneously on the same clone of [`MultiPartMessagePipe`], it is
1216     /// possible that all of them will result in [`None`]. This is Due to Rust version
1217     /// restriction(1.68.2) when this function is introduced). This race condition can be resolved
1218     /// once we upgrade to 1.70.0 or higher by using [`Arc::into_inner`].
1219     ///
1220     /// If the underlying named pipe is a server named pipe, this method allows the caller to
1221     /// terminate the connection by first flushing the named pipe then disconnecting the clients
1222     /// idiomatically per
1223     /// https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipe-operations#:~:text=When%20a%20client,of%20the%20pipe.
into_inner_pipe(self) -> Option<PipeConnection>1224     pub fn into_inner_pipe(self) -> Option<PipeConnection> {
1225         let piper = Arc::clone(&self.reader);
1226         drop(self);
1227         Arc::try_unwrap(piper).ok().map(Mutex::into_inner)
1228     }
1229 }
1230 
1231 impl TryFrom<PipeConnection> for MultiPartMessagePipe {
1232     type Error = std::io::Error;
try_from(pipe: PipeConnection) -> Result<Self>1233     fn try_from(pipe: PipeConnection) -> Result<Self> {
1234         Self::create_from_pipe(pipe, false)
1235     }
1236 }
1237 
1238 #[cfg(test)]
1239 mod tests {
1240     use std::mem::size_of;
1241     use std::thread::JoinHandle;
1242     use std::time::Duration;
1243 
1244     use super::*;
1245 
1246     #[test]
duplex_pipe_stream()1247     fn duplex_pipe_stream() {
1248         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1249 
1250         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1251         // SAFETY: trivially safe with pipe created and return value checked.
1252         unsafe {
1253             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1254                 println!("{}", dir);
1255 
1256                 sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();
1257 
1258                 // Smaller than what we sent so we get multiple chunks
1259                 let mut recv_buffer: [u8; 4] = [0; 4];
1260 
1261                 let mut size = receiver.read(&mut recv_buffer).unwrap();
1262                 assert_eq!(size, 4);
1263                 assert_eq!(recv_buffer, [75, 77, 54, 82]);
1264 
1265                 size = receiver.read(&mut recv_buffer).unwrap();
1266                 assert_eq!(size, 2);
1267                 assert_eq!(recv_buffer[0..2], [76, 65]);
1268             }
1269         }
1270     }
1271 
1272     #[test]
available_byte_count_byte_mode()1273     fn available_byte_count_byte_mode() {
1274         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1275         p1.write(&[1, 23, 45]).unwrap();
1276         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1277 
1278         // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
1279         // yield the same value.
1280         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1281     }
1282 
1283     #[test]
available_byte_count_message_mode()1284     fn available_byte_count_message_mode() {
1285         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1286         p1.write(&[1, 23, 45]).unwrap();
1287         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1288 
1289         // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
1290         // yield the same value.
1291         assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1292     }
1293 
1294     #[test]
available_byte_count_message_mode_multiple_messages()1295     fn available_byte_count_message_mode_multiple_messages() {
1296         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1297         p1.write(&[1, 2, 3]).unwrap();
1298         p1.write(&[4, 5]).unwrap();
1299         assert_eq!(p2.get_available_byte_count().unwrap(), 5);
1300     }
1301 
1302     #[test]
duplex_pipe_message()1303     fn duplex_pipe_message() {
1304         let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1305 
1306         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1307         // SAFETY: trivially safe with pipe created and return value checked.
1308         unsafe {
1309             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1310                 println!("{}", dir);
1311 
1312                 // Send 2 messages so that we can check that message framing works
1313                 sender.write(&[1, 23, 45]).unwrap();
1314                 sender.write(&[67, 89, 10]).unwrap();
1315 
1316                 let mut recv_buffer: [u8; 5] = [0; 5]; // Larger than required for messages
1317 
1318                 let mut size = receiver.read(&mut recv_buffer).unwrap();
1319                 assert_eq!(size, 3);
1320                 assert_eq!(recv_buffer[0..3], [1, 23, 45]);
1321 
1322                 size = receiver.read(&mut recv_buffer).unwrap();
1323                 assert_eq!(size, 3);
1324                 assert_eq!(recv_buffer[0..3], [67, 89, 10]);
1325             }
1326         }
1327     }
1328 
1329     #[cfg(test)]
duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection)1330     fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
1331         let mut recv_buffer: [u8; 1] = [0; 1];
1332 
1333         // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1334         // SAFETY: trivially safe with PipeConnection created and return value checked.
1335         unsafe {
1336             for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1337                 println!("{}", dir);
1338                 sender.write(&[1]).unwrap();
1339                 assert_eq!(receiver.read(&mut recv_buffer).unwrap(), 1); // Should succeed!
1340                 assert_eq!(
1341                     receiver.read(&mut recv_buffer).unwrap_err().kind(),
1342                     std::io::ErrorKind::WouldBlock
1343                 );
1344             }
1345         }
1346     }
1347 
1348     #[test]
duplex_nowait()1349     fn duplex_nowait() {
1350         let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
1351         duplex_nowait_helper(&p1, &p2);
1352     }
1353 
1354     #[test]
duplex_nowait_set_after_creation()1355     fn duplex_nowait_set_after_creation() {
1356         // Tests non blocking setting after pipe creation
1357         let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1358         p1.set_blocking(&BlockingMode::NoWait)
1359             .expect("Failed to set blocking mode on pipe p1");
1360         p2.set_blocking(&BlockingMode::NoWait)
1361             .expect("Failed to set blocking mode on pipe p2");
1362         duplex_nowait_helper(&p1, &p2);
1363     }
1364 
1365     #[test]
duplex_overlapped()1366     fn duplex_overlapped() {
1367         let pipe_name = generate_pipe_name();
1368 
1369         let mut p1 = create_server_pipe(
1370             &pipe_name,
1371             &FramingMode::Message,
1372             &BlockingMode::Wait,
1373             /* timeout= */ 0,
1374             /* buffer_size= */ 1000,
1375             /* overlapped= */ true,
1376         )
1377         .unwrap();
1378 
1379         let mut p2 = create_client_pipe(
1380             &pipe_name,
1381             &FramingMode::Message,
1382             &BlockingMode::Wait,
1383             /* overlapped= */ true,
1384         )
1385         .unwrap();
1386 
1387         // SAFETY:
1388         // Safe because `read_overlapped` can be called since overlapped struct is created.
1389         unsafe {
1390             let mut p1_overlapped_wrapper =
1391                 OverlappedWrapper::new(/* include_event= */ true).unwrap();
1392             p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut p1_overlapped_wrapper)
1393                 .unwrap();
1394             let size = p1
1395                 .get_overlapped_result(&mut p1_overlapped_wrapper)
1396                 .unwrap();
1397             assert_eq!(size, 6);
1398 
1399             let mut recv_buffer: [u8; 6] = [0; 6];
1400 
1401             let mut p2_overlapped_wrapper =
1402                 OverlappedWrapper::new(/* include_event= */ true).unwrap();
1403             p2.read_overlapped(&mut recv_buffer, &mut p2_overlapped_wrapper)
1404                 .unwrap();
1405             let size = p2
1406                 .get_overlapped_result(&mut p2_overlapped_wrapper)
1407                 .unwrap();
1408             assert_eq!(size, 6);
1409             assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
1410         }
1411     }
1412 
1413     #[test]
duplex_overlapped_test_in_use()1414     fn duplex_overlapped_test_in_use() {
1415         let pipe_name = generate_pipe_name();
1416 
1417         let mut p1 = create_server_pipe(
1418             &pipe_name,
1419             &FramingMode::Message,
1420             &BlockingMode::Wait,
1421             /* timeout= */ 0,
1422             /* buffer_size= */ 1000,
1423             /* overlapped= */ true,
1424         )
1425         .unwrap();
1426 
1427         let mut p2 = create_client_pipe(
1428             &pipe_name,
1429             &FramingMode::Message,
1430             &BlockingMode::Wait,
1431             /* overlapped= */ true,
1432         )
1433         .unwrap();
1434         let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1435 
1436         let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1437         assert!(res.is_err());
1438 
1439         let data = vec![75, 77, 54, 82, 76, 65];
1440         // SAFETY: safe because: data & overlapped wrapper live until the
1441         // operation is verified completed below.
1442         let res = unsafe { p1.write_overlapped(&data, &mut overlapped_wrapper) };
1443         assert!(res.is_ok());
1444 
1445         let res =
1446             // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
1447             // will error out.
1448             unsafe { p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper) };
1449         assert!(res.is_err());
1450 
1451         let mut recv_buffer: [u8; 6] = [0; 6];
1452         // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
1453         // will error out.
1454         let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1455         assert!(res.is_err());
1456 
1457         let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1458         assert!(res.is_ok());
1459 
1460         let mut recv_buffer: [u8; 6] = [0; 6];
1461         // SAFETY: safe because recv_buffer & overlapped_wrapper live until the
1462         // operation is verified completed below.
1463         let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1464         assert!(res.is_ok());
1465         let res = p2.get_overlapped_result(&mut overlapped_wrapper);
1466         assert!(res.is_ok());
1467     }
1468 
generate_pipe_name() -> String1469     fn generate_pipe_name() -> String {
1470         format!(
1471             r"\\.\pipe\test-ipc-pipe-name.rand{}",
1472             rand::thread_rng().gen::<u64>(),
1473         )
1474     }
1475 
send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()>1476     fn send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()> {
1477         let messages = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"];
1478         std::thread::spawn(move || {
1479             let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1480             let exit_event = Event::new().unwrap();
1481             for _i in 0..msg_count {
1482                 let message = *messages
1483                     .get(rand::thread_rng().gen::<usize>() % messages.len())
1484                     .unwrap();
1485                 pipe.write_overlapped_blocking_message(
1486                     &message.len().to_be_bytes(),
1487                     message.as_bytes(),
1488                     &mut overlapped_wrapper,
1489                 )
1490                 .unwrap();
1491             }
1492             for _i in 0..msg_count {
1493                 let message = pipe
1494                     .read_overlapped_blocking_message(
1495                         size_of::<usize>(),
1496                         |bytes: &[u8]| {
1497                             assert_eq!(bytes.len(), size_of::<usize>());
1498                             usize::from_be_bytes(
1499                                 bytes.try_into().expect("failed to get array from slice"),
1500                             )
1501                         },
1502                         &mut overlapped_wrapper,
1503                         &exit_event,
1504                     )
1505                     .unwrap();
1506                 assert_eq!(
1507                     *messages.get(message.len() - 1).unwrap(),
1508                     std::str::from_utf8(&message).unwrap(),
1509                 );
1510             }
1511         })
1512     }
1513 
1514     #[test]
multipart_message_smoke_test()1515     fn multipart_message_smoke_test() {
1516         let pipe_name = generate_pipe_name();
1517         let server = MultiPartMessagePipe::create_as_server(&pipe_name).unwrap();
1518         let client = MultiPartMessagePipe::create_as_client(&pipe_name).unwrap();
1519         let handles = [
1520             send_receive_msgs(server.clone(), 100),
1521             send_receive_msgs(client.clone(), 100),
1522             send_receive_msgs(server, 100),
1523             send_receive_msgs(client, 100),
1524         ];
1525         for h in handles {
1526             h.join().unwrap();
1527         }
1528     }
1529 
1530     #[test]
multipart_message_into_inner_pipe()1531     fn multipart_message_into_inner_pipe() {
1532         let pipe_name = generate_pipe_name();
1533         let mut pipe = create_server_pipe(
1534             &format!(r"\\.\pipe\{}", pipe_name),
1535             &FramingMode::Message,
1536             &BlockingMode::Wait,
1537             0,
1538             1024 * 1024,
1539             true,
1540         )
1541         .expect("should create the server pipe with success");
1542         let server1 = {
1543             let pipe = pipe
1544                 .try_clone()
1545                 .expect("should duplicate the named pipe with success");
1546             // SAFETY: `pipe` is a server named pipe.
1547             unsafe { MultiPartMessagePipe::create_from_server_pipe(pipe) }
1548                 .expect("should create the multipart message pipe with success")
1549         };
1550         let server2 = server1.clone();
1551         assert!(
1552             server2.into_inner_pipe().is_none(),
1553             "not the last reference, should be None"
1554         );
1555         let inner_pipe = server1
1556             .into_inner_pipe()
1557             .expect("the last reference, should return the underlying pipe");
1558         // CompareObjectHandles is a Windows 10 API and is not available in mingw, so we can't use
1559         // that API to compare if 2 handles are the same.
1560         pipe.set_blocking(&BlockingMode::NoWait)
1561             .expect("should set the blocking mode on the original pipe with success");
1562         assert_eq!(
1563             pipe.get_info()
1564                 .expect("should get the pipe information on the original pipe successfully"),
1565             inner_pipe
1566                 .get_info()
1567                 .expect("should get the pipe information on the inner pipe successfully")
1568         );
1569         pipe.set_blocking(&BlockingMode::Wait)
1570             .expect("should set the blocking mode on the original pipe with success");
1571         assert_eq!(
1572             pipe.get_info()
1573                 .expect("should get the pipe information on the original pipe successfully"),
1574             inner_pipe
1575                 .get_info()
1576                 .expect("should get the pipe information on the inner pipe successfully")
1577         );
1578     }
1579 
1580     #[test]
test_wait_for_connection_blocking()1581     fn test_wait_for_connection_blocking() {
1582         let pipe_name = generate_pipe_name();
1583 
1584         let mut server_pipe = create_server_pipe(
1585             &pipe_name,
1586             &FramingMode::Message,
1587             &BlockingMode::Wait,
1588             /* timeout= */ 0,
1589             /* buffer_size= */ 1000,
1590             /* overlapped= */ true,
1591         )
1592         .unwrap();
1593 
1594         let server = crate::thread::spawn_with_timeout(move || {
1595             let exit_event = Event::new().unwrap();
1596             server_pipe
1597                 .wait_for_client_connection_overlapped_blocking(&exit_event)
1598                 .unwrap();
1599         });
1600 
1601         let _client = create_client_pipe(
1602             &pipe_name,
1603             &FramingMode::Message,
1604             &BlockingMode::Wait,
1605             /* overlapped= */ true,
1606         )
1607         .unwrap();
1608         server.try_join(Duration::from_secs(10)).unwrap();
1609     }
1610 
1611     #[test]
test_wait_for_connection_blocking_exit_triggered()1612     fn test_wait_for_connection_blocking_exit_triggered() {
1613         let pipe_name = generate_pipe_name();
1614 
1615         let mut server_pipe = create_server_pipe(
1616             &pipe_name,
1617             &FramingMode::Message,
1618             &BlockingMode::Wait,
1619             /* timeout= */ 0,
1620             /* buffer_size= */ 1000,
1621             /* overlapped= */ true,
1622         )
1623         .unwrap();
1624 
1625         let exit_event = Event::new().unwrap();
1626         let exit_event_for_server = exit_event.try_clone().unwrap();
1627         let server = crate::thread::spawn_with_timeout(move || {
1628             assert!(server_pipe
1629                 .wait_for_client_connection_overlapped_blocking(&exit_event_for_server)
1630                 .is_err());
1631         });
1632         exit_event.signal().unwrap();
1633         server.try_join(Duration::from_secs(10)).unwrap();
1634     }
1635 }
1636