1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::ffi::CString;
6 use std::fs::OpenOptions;
7 use std::io;
8 use std::io::Result;
9 use std::mem;
10 use std::os::windows::fs::OpenOptionsExt;
11 use std::process;
12 use std::ptr;
13 use std::sync::atomic::AtomicBool;
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering;
16 use std::sync::Arc;
17
18 use rand::Rng;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use sync::Mutex;
22 use win_util::fail_if_zero;
23 use win_util::SecurityAttributes;
24 use win_util::SelfRelativeSecurityDescriptor;
25 use winapi::shared::minwindef::DWORD;
26 use winapi::shared::minwindef::FALSE;
27 use winapi::shared::minwindef::TRUE;
28 use winapi::shared::winerror::ERROR_IO_INCOMPLETE;
29 use winapi::shared::winerror::ERROR_IO_PENDING;
30 use winapi::shared::winerror::ERROR_MORE_DATA;
31 use winapi::shared::winerror::ERROR_NO_DATA;
32 use winapi::shared::winerror::ERROR_PIPE_CONNECTED;
33 use winapi::um::errhandlingapi::GetLastError;
34 use winapi::um::fileapi::FlushFileBuffers;
35 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
36 use winapi::um::ioapiset::CancelIoEx;
37 use winapi::um::ioapiset::GetOverlappedResult;
38 use winapi::um::minwinbase::OVERLAPPED;
39 use winapi::um::namedpipeapi::ConnectNamedPipe;
40 use winapi::um::namedpipeapi::DisconnectNamedPipe;
41 use winapi::um::namedpipeapi::GetNamedPipeInfo;
42 use winapi::um::namedpipeapi::PeekNamedPipe;
43 use winapi::um::namedpipeapi::SetNamedPipeHandleState;
44 use winapi::um::winbase::CreateNamedPipeA;
45 use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
46 use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
47 use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
48 use winapi::um::winbase::PIPE_NOWAIT;
49 use winapi::um::winbase::PIPE_READMODE_BYTE;
50 use winapi::um::winbase::PIPE_READMODE_MESSAGE;
51 use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS;
52 use winapi::um::winbase::PIPE_TYPE_BYTE;
53 use winapi::um::winbase::PIPE_TYPE_MESSAGE;
54 use winapi::um::winbase::PIPE_WAIT;
55 use winapi::um::winbase::SECURITY_IDENTIFICATION;
56
57 use super::RawDescriptor;
58 use crate::descriptor::AsRawDescriptor;
59 use crate::descriptor::FromRawDescriptor;
60 use crate::descriptor::IntoRawDescriptor;
61 use crate::descriptor::SafeDescriptor;
62 use crate::Event;
63 use crate::EventToken;
64 use crate::WaitContext;
65
66 /// The default buffer size for all named pipes in the system. If this size is too small, writers
67 /// on named pipes that expect not to block *can* block until the reading side empties the buffer.
68 ///
69 /// The general rule is this should be *at least* as big as the largest message, otherwise
70 /// unexpected blocking behavior can result; for example, if too small, this can interact badly with
71 /// crate::windows::StreamChannel, which expects to be able to make a complete write before
72 /// releasing a lock that the opposite side needs to complete a read. This means that if the buffer
73 /// is too small:
74 /// * The writer can't complete its write and release the lock because the buffer is too small.
75 /// * The reader can't start reading because the lock is held by the writer, so it can't relieve
76 /// buffer pressure. Note that for message pipes, the reader couldn't do anything to help
77 /// anyway, because a message mode pipe should NOT have a partial read (which is what we would
78 /// need to relieve pressure).
79 /// * Conditions for deadlock are met, and both the reader & writer enter circular waiting.
80 pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;
81
82 static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
83
84 /// Represents one end of a named pipe
85 #[derive(Serialize, Deserialize, Debug)]
86 pub struct PipeConnection {
87 handle: SafeDescriptor,
88 framing_mode: FramingMode,
89 blocking_mode: BlockingMode,
90 }
91
92 /// `OVERLAPPED` is allocated on the heap because it must not move while performing I/O operations.
93 ///
94 /// Defined as a separate type so that we can mark it as `Send` and `Sync`.
95 pub struct BoxedOverlapped(pub Box<OVERLAPPED>);
96
97 // SAFETY: `OVERLAPPED` is not automatically `Send` because it contains a `HANDLE`, which is a raw
98 // pointer, but `HANDLE`s are safe to move between threads and thus so is `OVERLAPPED`.
99 unsafe impl Send for BoxedOverlapped {}
100
101 // SAFETY: See the argument for `Send` above. `HANDLE`s are also safe to share between threads.
102 unsafe impl Sync for BoxedOverlapped {}
103
104 /// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a
105 /// Readfile or WriteFile operation and holds onto the event object so it doesn't get dropped.
106 pub struct OverlappedWrapper {
107 overlapped: BoxedOverlapped,
108 // This field prevents the event handle from being dropped too early and allows callers to
109 // be notified when a read or write overlapped operation has completed.
110 h_event: Option<Event>,
111 in_use: bool,
112 }
113
114 impl OverlappedWrapper {
get_h_event_ref(&self) -> Option<&Event>115 pub fn get_h_event_ref(&self) -> Option<&Event> {
116 self.h_event.as_ref()
117 }
118
119 /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
120 /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
121 /// returned must not be dropped.
122 ///
123 /// There is an option to create the event object and set it to the `hEvent` field. If hEvent
124 /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
125 /// handle will be signaled when the operation is complete. In other words, you can use
126 /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
127 /// Microsoft though.
new(include_event: bool) -> Result<OverlappedWrapper>128 pub fn new(include_event: bool) -> Result<OverlappedWrapper> {
129 let mut overlapped = OVERLAPPED::default();
130 let h_event = if include_event {
131 Some(Event::new()?)
132 } else {
133 None
134 };
135
136 overlapped.hEvent = if let Some(event) = h_event.as_ref() {
137 event.as_raw_descriptor()
138 } else {
139 0 as RawDescriptor
140 };
141
142 Ok(OverlappedWrapper {
143 overlapped: BoxedOverlapped(Box::new(overlapped)),
144 h_event,
145 in_use: false,
146 })
147 }
148 }
149
150 pub trait WriteOverlapped {
151 /// Perform an overlapped write operation with the specified buffer and overlapped wrapper.
152 /// If successful, the write operation will complete asynchronously, and
153 /// `write_result()` should be called to get the result.
154 ///
155 /// # Safety
156 /// `buf` and `overlapped_wrapper` will be in use for the duration of
157 /// the overlapped operation. These must not be reused and must live until
158 /// after `write_result()` has been called.
write_overlapped( &mut self, buf: &mut [u8], overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<()>159 unsafe fn write_overlapped(
160 &mut self,
161 buf: &mut [u8],
162 overlapped_wrapper: &mut OverlappedWrapper,
163 ) -> io::Result<()>;
164
165 /// Gets the result of the overlapped write operation. Must only be called
166 /// after issuing an overlapped write operation using `write_overlapped`. The
167 /// same `overlapped_wrapper` must be provided.
write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>168 fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
169
170 /// Tries to get the result of the overlapped write operation. Must only be
171 /// called once, and only after issuing an overlapped write operation using
172 /// `write_overlapped`. The same `overlapped_wrapper` must be provided.
173 ///
174 /// An error indicates that the operation hasn't completed yet and
175 /// `write_result` or `try_write_result` should be called again.
try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>176 fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper)
177 -> io::Result<usize>;
178 }
179
180 pub trait ReadOverlapped {
181 /// Perform an overlapped read operation with the specified buffer and overlapped wrapper.
182 /// If successful, the read operation will complete asynchronously, and
183 /// `read_result()` should be called to get the result.
184 ///
185 /// # Safety
186 /// `buf` and `overlapped_wrapper` will be in use for the duration of
187 /// the overlapped operation. These must not be reused and must live until
188 /// after `read_result()` has been called.
read_overlapped( &mut self, buf: &mut [u8], overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<()>189 unsafe fn read_overlapped(
190 &mut self,
191 buf: &mut [u8],
192 overlapped_wrapper: &mut OverlappedWrapper,
193 ) -> io::Result<()>;
194
195 /// Gets the result of the overlapped read operation. Must only be called
196 /// once, and only after issuing an overlapped read operation using
197 /// `read_overlapped`. The same `overlapped_wrapper` must be provided.
read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>198 fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
199
200 /// Tries to get the result of the overlapped read operation. Must only be called
201 /// after issuing an overlapped read operation using `read_overlapped`. The
202 /// same `overlapped_wrapper` must be provided.
203 ///
204 /// An error indicates that the operation hasn't completed yet and
205 /// `read_result` or `try_read_result` should be called again.
try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>206 fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
207 }
208
209 #[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
210 pub enum FramingMode {
211 Byte,
212 Message,
213 }
214
215 impl FramingMode {
to_readmode(self) -> DWORD216 fn to_readmode(self) -> DWORD {
217 match self {
218 FramingMode::Message => PIPE_READMODE_MESSAGE,
219 FramingMode::Byte => PIPE_READMODE_BYTE,
220 }
221 }
222
to_pipetype(self) -> DWORD223 fn to_pipetype(self) -> DWORD {
224 match self {
225 FramingMode::Message => PIPE_TYPE_MESSAGE,
226 FramingMode::Byte => PIPE_TYPE_BYTE,
227 }
228 }
229 }
230
231 #[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)]
232 pub enum BlockingMode {
233 /// Calls to read() block until data is received
234 Wait,
235 /// Calls to read() return immediately even if there is nothing read with error code 232
236 /// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA)
237 ///
238 /// NOTE: This mode is discouraged by the Windows API documentation.
239 NoWait,
240 }
241
242 impl From<&BlockingMode> for DWORD {
from(blocking_mode: &BlockingMode) -> DWORD243 fn from(blocking_mode: &BlockingMode) -> DWORD {
244 match blocking_mode {
245 BlockingMode::Wait => PIPE_WAIT,
246 BlockingMode::NoWait => PIPE_NOWAIT,
247 }
248 }
249 }
250
251 /// Sets the handle state for a named pipe in a rust friendly way.
252 /// SAFETY:
253 /// This is safe if the pipe handle is open.
set_named_pipe_handle_state( pipe_handle: RawDescriptor, client_mode: &mut DWORD, ) -> Result<()>254 unsafe fn set_named_pipe_handle_state(
255 pipe_handle: RawDescriptor,
256 client_mode: &mut DWORD,
257 ) -> Result<()> {
258 // Safe when the pipe handle is open. Safety also requires checking the return value, which we
259 // do below.
260 let success_flag = SetNamedPipeHandleState(
261 /* hNamedPipe= */ pipe_handle,
262 /* lpMode= */ client_mode,
263 /* lpMaxCollectionCount= */ ptr::null_mut(),
264 /* lpCollectDataTimeout= */ ptr::null_mut(),
265 );
266 if success_flag == 0 {
267 Err(io::Error::last_os_error())
268 } else {
269 Ok(())
270 }
271 }
272
pair( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, ) -> Result<(PipeConnection, PipeConnection)>273 pub fn pair(
274 framing_mode: &FramingMode,
275 blocking_mode: &BlockingMode,
276 timeout: u64,
277 ) -> Result<(PipeConnection, PipeConnection)> {
278 pair_with_buffer_size(
279 framing_mode,
280 blocking_mode,
281 timeout,
282 DEFAULT_BUFFER_SIZE,
283 false,
284 )
285 }
286
287 /// Creates a pair of handles connected to either end of a duplex named pipe.
288 ///
289 /// The pipe created will have a semi-random name and a default set of security options that
290 /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
291 /// remote clients, allow only a single server instance, and prevent impersonation by the server
292 /// end of the pipe.
293 ///
294 /// # Arguments
295 ///
296 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
297 /// automatically framed sequence of messages (Message). In message mode it's an error to read
298 /// fewer bytes than were sent in a message from the other end of the pipe.
299 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
300 /// return immediately if there is nothing available (NoWait).
301 /// * `timeout` - A timeout to apply for socket operations, in milliseconds. Setting this to
302 /// zero will create sockets with the system default timeout.
303 /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
304 /// buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
305 /// writes that don't fit in the buffer.
306 /// # Return value
307 ///
308 /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
309 /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
pair_with_buffer_size( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<(PipeConnection, PipeConnection)>310 pub fn pair_with_buffer_size(
311 framing_mode: &FramingMode,
312 blocking_mode: &BlockingMode,
313 timeout: u64,
314 buffer_size: usize,
315 overlapped: bool,
316 ) -> Result<(PipeConnection, PipeConnection)> {
317 // Give the pipe a unique name to avoid accidental collisions
318 let pipe_name = format!(
319 r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
320 process::id(),
321 NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
322 rand::thread_rng().gen::<u32>(),
323 );
324
325 let server_end = create_server_pipe(
326 &pipe_name,
327 framing_mode,
328 blocking_mode,
329 timeout,
330 buffer_size,
331 overlapped,
332 )?;
333
334 // Open the named pipe we just created as the client
335 let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
336
337 // Accept the client's connection
338 // Not sure if this is strictly needed but I'm doing it just in case.
339 // We expect at this point that the client will already be connected,
340 // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
341 // It's also OK if we get a return code of success.
342 server_end.wait_for_client_connection()?;
343
344 Ok((server_end, client_end))
345 }
346
347 /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
348 /// settings.
349 ///
350 /// The pipe will be set to reject remote clients and allow only a single connection at a time.
351 ///
352 /// # Arguments
353 ///
354 /// * `pipe_name` - The path of the named pipe to create. Should be in the form
355 /// `\\.\pipe\<some-name>`.
356 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
357 /// automatically framed sequence of messages (Message). In message mode it's an error to read
358 /// fewer bytes than were sent in a message from the other end of the pipe.
359 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
360 /// return immediately if there is nothing available (NoWait).
361 /// * `timeout` - A timeout to apply for socket operations, in milliseconds. Setting this to
362 /// zero will create sockets with the system default timeout.
363 /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
364 /// buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
365 /// writes that don't fit in the buffer.
366 /// * `overlapped` - Sets whether overlapped mode is set on the pipe.
create_server_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<PipeConnection>367 pub fn create_server_pipe(
368 pipe_name: &str,
369 framing_mode: &FramingMode,
370 blocking_mode: &BlockingMode,
371 timeout: u64,
372 buffer_size: usize,
373 overlapped: bool,
374 ) -> Result<PipeConnection> {
375 let c_pipe_name = CString::new(pipe_name).unwrap();
376
377 let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
378 if overlapped {
379 open_mode_flags |= FILE_FLAG_OVERLAPPED
380 }
381
382 // This sets flags so there will be an error if >1 instance (server end)
383 // of this pipe name is opened because we expect exactly one.
384 // SAFETY:
385 // Safe because security attributes are valid, pipe_name is valid C string,
386 // and we're checking the return code
387 let server_handle = unsafe {
388 CreateNamedPipeA(
389 c_pipe_name.as_ptr(),
390 /* dwOpenMode= */
391 open_mode_flags,
392 /* dwPipeMode= */
393 framing_mode.to_pipetype()
394 | framing_mode.to_readmode()
395 | DWORD::from(blocking_mode)
396 | PIPE_REJECT_REMOTE_CLIENTS,
397 /* nMaxInstances= */ 1,
398 /* nOutBufferSize= */ buffer_size as DWORD,
399 /* nInBufferSize= */ buffer_size as DWORD,
400 /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
401 /* lpSecurityAttributes= */
402 SecurityAttributes::new_with_security_descriptor(
403 SelfRelativeSecurityDescriptor::get_singleton(),
404 /* inherit= */ true,
405 )
406 .as_mut(),
407 )
408 };
409
410 if server_handle == INVALID_HANDLE_VALUE {
411 Err(io::Error::last_os_error())
412 } else {
413 // SAFETY: Safe because server_handle is valid.
414 unsafe {
415 Ok(PipeConnection {
416 handle: SafeDescriptor::from_raw_descriptor(server_handle),
417 framing_mode: *framing_mode,
418 blocking_mode: *blocking_mode,
419 })
420 }
421 }
422 }
423
424 /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
425 /// settings.
426 ///
427 /// The pipe will be set to prevent impersonation of the client by the server process.
428 ///
429 /// # Arguments
430 ///
431 /// * `pipe_name` - The path of the named pipe to create. Should be in the form
432 /// `\\.\pipe\<some-name>`.
433 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
434 /// automatically framed sequence of messages (Message). In message mode it's an error to read
435 /// fewer bytes than were sent in a message from the other end of the pipe.
436 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
437 /// return immediately if there is nothing available (NoWait).
438 /// * `overlapped` - Sets whether the pipe is opened in overlapped mode.
create_client_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, overlapped: bool, ) -> Result<PipeConnection>439 pub fn create_client_pipe(
440 pipe_name: &str,
441 framing_mode: &FramingMode,
442 blocking_mode: &BlockingMode,
443 overlapped: bool,
444 ) -> Result<PipeConnection> {
445 let client_handle = OpenOptions::new()
446 .read(true)
447 .write(true)
448 .create(true)
449 .security_qos_flags(SECURITY_IDENTIFICATION)
450 .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
451 .open(pipe_name)?
452 .into_raw_descriptor();
453
454 let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
455
456 // SAFETY:
457 // Safe because client_handle's open() call did not return an error.
458 unsafe {
459 set_named_pipe_handle_state(client_handle, &mut client_mode)?;
460 }
461
462 Ok(PipeConnection {
463 // SAFETY:
464 // Safe because client_handle is valid
465 handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
466 framing_mode: *framing_mode,
467 blocking_mode: *blocking_mode,
468 })
469 }
470
471 // This is used to mark types which can be appropriately sent through the
472 // generic helper functions write_to_pipe and read_from_pipe.
473 pub trait PipeSendable {
474 // Default values used to fill in new empty indexes when resizing a buffer to
475 // a larger size.
default() -> Self476 fn default() -> Self;
477 }
478 impl PipeSendable for u8 {
default() -> Self479 fn default() -> Self {
480 0
481 }
482 }
483 impl PipeSendable for RawDescriptor {
default() -> Self484 fn default() -> Self {
485 ptr::null_mut()
486 }
487 }
488
489 impl PipeConnection {
try_clone(&self) -> Result<PipeConnection>490 pub fn try_clone(&self) -> Result<PipeConnection> {
491 let copy_handle = self.handle.try_clone()?;
492 Ok(PipeConnection {
493 handle: copy_handle,
494 framing_mode: self.framing_mode,
495 blocking_mode: self.blocking_mode,
496 })
497 }
498
499 /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
500 /// blocking modes.
501 ///
502 /// # Safety
503 /// 1. rd is valid and ownership is transferred to this function when it is called.
504 ///
505 /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
506 /// underlying pipe.
from_raw_descriptor( rd: RawDescriptor, framing_mode: FramingMode, blocking_mode: BlockingMode, ) -> PipeConnection507 pub unsafe fn from_raw_descriptor(
508 rd: RawDescriptor,
509 framing_mode: FramingMode,
510 blocking_mode: BlockingMode,
511 ) -> PipeConnection {
512 PipeConnection {
513 handle: SafeDescriptor::from_raw_descriptor(rd),
514 framing_mode,
515 blocking_mode,
516 }
517 }
518
519 /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
520 /// Returns the number of bytes (not values) read.
521 ///
522 /// # Safety
523 ///
524 /// This is safe only when the following conditions hold:
525 /// 1. The data on the other end of the pipe is a valid binary representation of data for
526 /// type T, and
527 /// 2. The number of bytes read is a multiple of the size of T; this must be checked by
528 /// the caller.
529 /// If buf's type is file descriptors, this is only safe when those file descriptors are valid
530 /// for the process where this function was called.
read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize>531 pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
532 PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None)
533 }
534
535 /// Similar to `PipeConnection::read` except it also allows:
536 /// 1. The same end of the named pipe to read and write at the same time in different
537 /// threads.
538 /// 2. Asynchronous read and write (read and write won't block).
539 ///
540 /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
541 /// (can be created with `OverlappedWrapper::new`) will be passed into
542 /// `ReadFile`. That event will be triggered when the read operation is complete.
543 ///
544 /// In order to get how many bytes were read, call `get_overlapped_result`. That function will
545 /// also help with waiting until the read operation is complete.
546 ///
547 /// # Safety
548 ///
549 /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
550 /// overlapped mode otherwise there may be unexpected behavior.
read_overlapped<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>551 pub unsafe fn read_overlapped<T: PipeSendable>(
552 &mut self,
553 buf: &mut [T],
554 overlapped_wrapper: &mut OverlappedWrapper,
555 ) -> Result<()> {
556 if overlapped_wrapper.in_use {
557 return Err(std::io::Error::new(
558 std::io::ErrorKind::InvalidInput,
559 "Overlapped struct already in use",
560 ));
561 }
562 overlapped_wrapper.in_use = true;
563
564 PipeConnection::read_internal(
565 &self.handle,
566 self.blocking_mode,
567 buf,
568 Some(&mut overlapped_wrapper.overlapped.0),
569 )?;
570 Ok(())
571 }
572
573 /// Helper for `read_overlapped` and `read`
574 ///
575 /// # Safety
576 /// Comments `read_overlapped` or `read`, depending on which is used.
read_internal<T: PipeSendable>( handle: &SafeDescriptor, blocking_mode: BlockingMode, buf: &mut [T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>577 unsafe fn read_internal<T: PipeSendable>(
578 handle: &SafeDescriptor,
579 blocking_mode: BlockingMode,
580 buf: &mut [T],
581 overlapped: Option<&mut OVERLAPPED>,
582 ) -> Result<usize> {
583 let res = crate::windows::read_file(
584 handle,
585 buf.as_mut_ptr() as *mut u8,
586 mem::size_of_val(buf),
587 overlapped,
588 );
589 match res {
590 Ok(bytes_read) => Ok(bytes_read),
591 Err(e)
592 if blocking_mode == BlockingMode::NoWait
593 && e.raw_os_error() == Some(ERROR_NO_DATA as i32) =>
594 {
595 // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
596 // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
597 // correct. For further details see:
598 // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
599 // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
600 Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
601 }
602 Err(e) => Err(e),
603 }
604 }
605
606 /// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted
607 /// by an event on `exit_event`.
read_overlapped_blocking<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<()>608 pub fn read_overlapped_blocking<T: PipeSendable>(
609 &mut self,
610 buf: &mut [T],
611 overlapped_wrapper: &mut OverlappedWrapper,
612 exit_event: &Event,
613 ) -> Result<()> {
614 // SAFETY:
615 // Safe because we are providing a valid buffer slice and also providing a valid
616 // overlapped struct.
617 match unsafe { self.read_overlapped(buf, overlapped_wrapper) } {
618 // More data isn't necessarily an error as long as we've filled the provided buffer,
619 // as is checked later in this function.
620 Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => Ok(()),
621 Err(e) => Err(e),
622 Ok(()) => Ok(()),
623 }?;
624
625 #[derive(EventToken)]
626 enum Token {
627 ReadOverlapped,
628 Exit,
629 }
630
631 let wait_ctx = WaitContext::build_with(&[
632 (
633 overlapped_wrapper.get_h_event_ref().unwrap(),
634 Token::ReadOverlapped,
635 ),
636 (exit_event, Token::Exit),
637 ])?;
638
639 let events = wait_ctx.wait()?;
640 for event in events {
641 match event.token {
642 Token::ReadOverlapped => {
643 let size_read_in_bytes =
644 self.get_overlapped_result(overlapped_wrapper)? as usize;
645
646 // If this error shows, most likely the overlapped named pipe was set up
647 // incorrectly.
648 if size_read_in_bytes != buf.len() {
649 return Err(std::io::Error::new(
650 std::io::ErrorKind::UnexpectedEof,
651 "Short read",
652 ));
653 }
654 }
655 Token::Exit => {
656 return Err(std::io::Error::new(
657 std::io::ErrorKind::Interrupted,
658 "IO canceled on exit request",
659 ));
660 }
661 }
662 }
663
664 Ok(())
665 }
666
667 /// Gets the size in bytes of data in the pipe.
668 ///
669 /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
670 /// not finished writing on the producer side.
get_available_byte_count(&self) -> io::Result<u32>671 pub fn get_available_byte_count(&self) -> io::Result<u32> {
672 let mut total_bytes_avail: DWORD = 0;
673
674 // SAFETY:
675 // Safe because the underlying pipe handle is guaranteed to be open, and the output values
676 // live at valid memory locations.
677 fail_if_zero!(unsafe {
678 PeekNamedPipe(
679 self.as_raw_descriptor(),
680 ptr::null_mut(),
681 0,
682 ptr::null_mut(),
683 &mut total_bytes_avail,
684 ptr::null_mut(),
685 )
686 });
687
688 Ok(total_bytes_avail)
689 }
690
691 /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
692 /// callers should check to ensure that it was the number expected.
write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize>693 pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
694 // SAFETY: overlapped is None so this is safe.
695 unsafe { PipeConnection::write_internal(&self.handle, buf, None) }
696 }
697
698 /// Similar to `PipeConnection::write` except it also allows:
699 /// 1. The same end of the named pipe to read and write at the same time in different
700 /// threads.
701 /// 2. Asynchronous read and write (read and write won't block).
702 ///
703 /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
704 /// (can be created with `OverlappedWrapper::new`) will be passed into
705 /// `WriteFile`. That event will be triggered when the write operation is complete.
706 ///
707 /// In order to get how many bytes were written, call `get_overlapped_result`. That function
708 /// will also help with waiting until the write operation is complete. The pipe must be
709 /// opened in overlapped otherwise there may be unexpected behavior.
710 ///
711 /// # Safety
712 /// * buf & overlapped_wrapper MUST live until the overlapped operation is complete.
write_overlapped<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>713 pub unsafe fn write_overlapped<T: PipeSendable>(
714 &mut self,
715 buf: &[T],
716 overlapped_wrapper: &mut OverlappedWrapper,
717 ) -> Result<()> {
718 if overlapped_wrapper.in_use {
719 return Err(std::io::Error::new(
720 std::io::ErrorKind::InvalidInput,
721 "Overlapped struct already in use",
722 ));
723 }
724 overlapped_wrapper.in_use = true;
725
726 PipeConnection::write_internal(
727 &self.handle,
728 buf,
729 Some(&mut overlapped_wrapper.overlapped.0),
730 )?;
731 Ok(())
732 }
733
734 /// Helper for `write_overlapped` and `write`.
735 ///
736 /// # Safety
737 /// * Safe if overlapped is None.
738 /// * Safe if overlapped is Some and:
739 /// + buf lives until the overlapped operation is complete.
740 /// + overlapped lives until the overlapped operation is complete.
write_internal<T: PipeSendable>( handle: &SafeDescriptor, buf: &[T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>741 unsafe fn write_internal<T: PipeSendable>(
742 handle: &SafeDescriptor,
743 buf: &[T],
744 overlapped: Option<&mut OVERLAPPED>,
745 ) -> Result<usize> {
746 // SAFETY:
747 // Safe because buf points to memory valid until the write completes and we pass a valid
748 // length for that memory.
749 unsafe {
750 crate::windows::write_file(
751 handle,
752 buf.as_ptr() as *const u8,
753 mem::size_of_val(buf),
754 overlapped,
755 )
756 }
757 }
758
759 /// Sets the blocking mode on the pipe.
set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()>760 pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
761 let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
762 self.blocking_mode = *blocking_mode;
763
764 // SAFETY:
765 // Safe because the pipe has not been closed (it is managed by this object).
766 unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
767 }
768
769 /// For a server named pipe, waits for a client to connect (blocking).
wait_for_client_connection(&self) -> Result<()>770 pub fn wait_for_client_connection(&self) -> Result<()> {
771 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
772 self.wait_for_client_connection_internal(
773 &mut overlapped_wrapper,
774 /* should_block = */ true,
775 )
776 }
777
778 /// Interruptable blocking wait for a client to connect.
wait_for_client_connection_overlapped_blocking( &mut self, exit_event: &Event, ) -> Result<()>779 pub fn wait_for_client_connection_overlapped_blocking(
780 &mut self,
781 exit_event: &Event,
782 ) -> Result<()> {
783 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
784 self.wait_for_client_connection_internal(
785 &mut overlapped_wrapper,
786 /* should_block = */ false,
787 )?;
788
789 #[derive(EventToken)]
790 enum Token {
791 Connected,
792 Exit,
793 }
794
795 let wait_ctx = WaitContext::build_with(&[
796 (
797 overlapped_wrapper.get_h_event_ref().unwrap(),
798 Token::Connected,
799 ),
800 (exit_event, Token::Exit),
801 ])?;
802
803 let events = wait_ctx.wait()?;
804 if let Some(event) = events.into_iter().next() {
805 return match event.token {
806 Token::Connected => Ok(()),
807 Token::Exit => {
808 // We must cancel IO here because it is unsafe to free the overlapped wrapper
809 // while the IO operation is active.
810 self.cancel_io()?;
811
812 Err(std::io::Error::new(
813 std::io::ErrorKind::Interrupted,
814 "IO canceled on exit request",
815 ))
816 }
817 };
818 }
819 unreachable!("wait cannot return Ok with zero events");
820 }
821
822 /// For a server named pipe, waits for a client to connect using the given overlapped wrapper
823 /// to signal connection.
wait_for_client_connection_overlapped( &self, overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>824 pub fn wait_for_client_connection_overlapped(
825 &self,
826 overlapped_wrapper: &mut OverlappedWrapper,
827 ) -> Result<()> {
828 self.wait_for_client_connection_internal(
829 overlapped_wrapper,
830 /* should_block = */ false,
831 )
832 }
833
wait_for_client_connection_internal( &self, overlapped_wrapper: &mut OverlappedWrapper, should_block: bool, ) -> Result<()>834 fn wait_for_client_connection_internal(
835 &self,
836 overlapped_wrapper: &mut OverlappedWrapper,
837 should_block: bool,
838 ) -> Result<()> {
839 // SAFETY:
840 // Safe because the handle is valid and we're checking the return
841 // code according to the documentation
842 //
843 // TODO(b/279669296) this safety statement is incomplete, and as such incorrect in one case:
844 // overlapped_wrapper must live until the overlapped operation is complete; however,
845 // if should_block is false, nothing guarantees that lifetime and so overlapped_wrapper
846 // could be freed while the operation is still running.
847 unsafe {
848 let success_flag = ConnectNamedPipe(
849 self.as_raw_descriptor(),
850 // Note: The overlapped structure is only used if the pipe was opened in
851 // OVERLAPPED mode, but is necessary in that case.
852 &mut *overlapped_wrapper.overlapped.0,
853 );
854 if success_flag == 0 {
855 return match GetLastError() {
856 ERROR_PIPE_CONNECTED => {
857 if !should_block {
858 // If async, make sure the event is signalled to indicate the client
859 // is ready.
860 overlapped_wrapper.get_h_event_ref().unwrap().signal()?;
861 }
862
863 Ok(())
864 }
865 ERROR_IO_PENDING => {
866 if should_block {
867 overlapped_wrapper.get_h_event_ref().unwrap().wait()?;
868 }
869 Ok(())
870 }
871 err => Err(io::Error::from_raw_os_error(err as i32)),
872 };
873 }
874 }
875 Ok(())
876 }
877
878 /// Used for overlapped read and write operations.
879 ///
880 /// This will block until the ReadFile or WriteFile operation that also took in
881 /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
882 /// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get
883 /// the number of bytes that were read or written.
get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>884 pub fn get_overlapped_result(
885 &mut self,
886 overlapped_wrapper: &mut OverlappedWrapper,
887 ) -> io::Result<u32> {
888 let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true);
889 overlapped_wrapper.in_use = false;
890 res
891 }
892
893 /// Used for overlapped read and write operations.
894 ///
895 /// This will return immediately, regardless of the completion status of the
896 /// ReadFile or WriteFile operation that took in `overlapped_wrapper`,
897 /// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new`
898 /// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes
899 /// that were read or written, if completed. If the operation hasn't
900 /// completed, an error of kind `io::ErrorKind::WouldBlock` will be
901 /// returned.
try_get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>902 pub fn try_get_overlapped_result(
903 &mut self,
904 overlapped_wrapper: &mut OverlappedWrapper,
905 ) -> io::Result<u32> {
906 let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false);
907 match res {
908 Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => {
909 Err(io::Error::new(io::ErrorKind::WouldBlock, err))
910 }
911 _ => {
912 overlapped_wrapper.in_use = false;
913 res
914 }
915 }
916 }
917
get_overlapped_result_internal( &mut self, overlapped_wrapper: &mut OverlappedWrapper, wait: bool, ) -> io::Result<u32>918 fn get_overlapped_result_internal(
919 &mut self,
920 overlapped_wrapper: &mut OverlappedWrapper,
921 wait: bool,
922 ) -> io::Result<u32> {
923 if !overlapped_wrapper.in_use {
924 return Err(std::io::Error::new(
925 std::io::ErrorKind::InvalidInput,
926 "Overlapped struct is not in use",
927 ));
928 }
929 let mut size_transferred = 0;
930 // SAFETY:
931 // Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
932 // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
933 fail_if_zero!(unsafe {
934 GetOverlappedResult(
935 self.handle.as_raw_descriptor(),
936 &mut *overlapped_wrapper.overlapped.0,
937 &mut size_transferred,
938 if wait { TRUE } else { FALSE },
939 )
940 });
941
942 Ok(size_transferred)
943 }
944
945 /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
946 /// cancel all I/O requests for the file handle passed in.
cancel_io(&mut self) -> Result<()>947 pub fn cancel_io(&mut self) -> Result<()> {
948 fail_if_zero!(
949 // SAFETY: descriptor is valid and the return value is checked.
950 unsafe {
951 CancelIoEx(
952 self.handle.as_raw_descriptor(),
953 /* lpOverlapped= */ std::ptr::null_mut(),
954 )
955 }
956 );
957
958 Ok(())
959 }
960
961 /// Get the framing mode of the pipe.
get_framing_mode(&self) -> FramingMode962 pub fn get_framing_mode(&self) -> FramingMode {
963 self.framing_mode
964 }
965
966 /// Returns metadata about the connected NamedPipe.
get_info(&self) -> Result<NamedPipeInfo>967 pub fn get_info(&self) -> Result<NamedPipeInfo> {
968 let mut flags: u32 = 0;
969 let mut incoming_buffer_size: u32 = 0;
970 let mut outgoing_buffer_size: u32 = 0;
971 let mut max_instances: u32 = 0;
972 // SAFETY: all pointers are valid
973 fail_if_zero!(unsafe {
974 GetNamedPipeInfo(
975 self.as_raw_descriptor(),
976 &mut flags,
977 &mut outgoing_buffer_size,
978 &mut incoming_buffer_size,
979 &mut max_instances,
980 )
981 });
982
983 Ok(NamedPipeInfo {
984 outgoing_buffer_size,
985 incoming_buffer_size,
986 max_instances,
987 flags,
988 })
989 }
990
991 /// For a server pipe, flush the pipe contents. This will
992 /// block until the pipe is cleared by the client. Only
993 /// call this if you are sure the client is reading the
994 /// data!
flush_data_blocking(&self) -> Result<()>995 pub fn flush_data_blocking(&self) -> Result<()> {
996 // SAFETY:
997 // Safe because the only buffers interacted with are
998 // outside of Rust memory
999 fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) });
1000 Ok(())
1001 }
1002
1003 /// For a server pipe, disconnect all clients, discarding any buffered data.
disconnect_clients(&self) -> Result<()>1004 pub fn disconnect_clients(&self) -> Result<()> {
1005 // SAFETY:
1006 // Safe because we own the handle passed in and know it will remain valid for the duration
1007 // of the call. Discarded buffers are not managed by rust.
1008 fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) });
1009 Ok(())
1010 }
1011 }
1012
1013 impl AsRawDescriptor for PipeConnection {
as_raw_descriptor(&self) -> RawDescriptor1014 fn as_raw_descriptor(&self) -> RawDescriptor {
1015 self.handle.as_raw_descriptor()
1016 }
1017 }
1018
1019 impl IntoRawDescriptor for PipeConnection {
into_raw_descriptor(self) -> RawDescriptor1020 fn into_raw_descriptor(self) -> RawDescriptor {
1021 self.handle.into_raw_descriptor()
1022 }
1023 }
1024
1025 // SAFETY: Send safety is ensured by inner fields.
1026 unsafe impl Send for PipeConnection {}
1027 // SAFETY: Sync safety is ensured by inner fields.
1028 unsafe impl Sync for PipeConnection {}
1029
1030 impl io::Read for PipeConnection {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>1031 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1032 // SAFETY:
1033 // This is safe because PipeConnection::read is always safe for u8
1034 unsafe { PipeConnection::read(self, buf) }
1035 }
1036 }
1037
1038 impl io::Write for PipeConnection {
write(&mut self, buf: &[u8]) -> io::Result<usize>1039 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1040 PipeConnection::write(self, buf)
1041 }
1042
flush(&mut self) -> io::Result<()>1043 fn flush(&mut self) -> io::Result<()> {
1044 Ok(())
1045 }
1046 }
1047
1048 /// A simple data struct representing
1049 /// metadata about a NamedPipe.
1050 #[derive(Debug, PartialEq, Eq)]
1051 pub struct NamedPipeInfo {
1052 pub outgoing_buffer_size: u32,
1053 pub incoming_buffer_size: u32,
1054 pub max_instances: u32,
1055 pub flags: u32,
1056 }
1057
1058 /// This is a wrapper around PipeConnection. This allows a read and a write operations
1059 /// to run in parallel but not multiple reads or writes in parallel.
1060 ///
1061 /// Reason: The message from/to service are two-parts - a fixed size header that
1062 /// contains the size of the actual message. By allowing only one write at a time
1063 /// we ensure that the variable size message is written/read right after writing/reading
1064 /// fixed size header. For example it avoid sending or receiving in messages in order like
1065 /// H1, H2, M1, M2
1066 /// - where header H1 and its message M1 are sent by one event loop and H2 and its message M2 are
1067 /// sent by another event loop.
1068 ///
1069 /// Do not expose direct access to reader or writer pipes.
1070 ///
1071 /// The struct is clone-able so that different event loops can talk to the other end.
1072 #[derive(Clone)]
1073 pub struct MultiPartMessagePipe {
1074 // Lock protected pipe to receive messages.
1075 reader: Arc<Mutex<PipeConnection>>,
1076 // Lock protected pipe to send messages.
1077 writer: Arc<Mutex<PipeConnection>>,
1078 // Whether this end is created as server or client. The variable helps to
1079 // decide if something meanigful should be done when `wait_for_connection` is called.
1080 is_server: bool,
1081 // Always true if pipe is created as client.
1082 // Defaults to false on server. Updated to true on calling `wait_for_connection`
1083 // after a client connects.
1084 is_connected: Arc<AtomicBool>,
1085 }
1086
1087 impl MultiPartMessagePipe {
create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self>1088 fn create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self> {
1089 Ok(Self {
1090 reader: Arc::new(Mutex::new(pipe.try_clone()?)),
1091 writer: Arc::new(Mutex::new(pipe)),
1092 is_server,
1093 is_connected: Arc::new(AtomicBool::new(false)),
1094 })
1095 }
1096
1097 /// Create server side of MutiPartMessagePipe.
1098 /// # Safety
1099 /// `pipe` must be a server named pipe.
1100 #[deny(unsafe_op_in_unsafe_fn)]
create_from_server_pipe(pipe: PipeConnection) -> Result<Self>1101 pub unsafe fn create_from_server_pipe(pipe: PipeConnection) -> Result<Self> {
1102 Self::create_from_pipe(pipe, true)
1103 }
1104
1105 /// Create client side of MutiPartMessagePipe.
create_as_client(pipe_name: &str) -> Result<Self>1106 pub fn create_as_client(pipe_name: &str) -> Result<Self> {
1107 let pipe = create_client_pipe(
1108 &format!(r"\\.\pipe\{}", pipe_name),
1109 &FramingMode::Message,
1110 &BlockingMode::Wait,
1111 /* overlapped= */ true,
1112 )?;
1113 Self::create_from_pipe(pipe, false)
1114 }
1115
1116 /// Create server side of MutiPartMessagePipe.
create_as_server(pipe_name: &str) -> Result<Self>1117 pub fn create_as_server(pipe_name: &str) -> Result<Self> {
1118 let pipe = create_server_pipe(
1119 &format!(r"\\.\pipe\{}", pipe_name,),
1120 &FramingMode::Message,
1121 &BlockingMode::Wait,
1122 0,
1123 1024 * 1024,
1124 true,
1125 )?;
1126 // SAFETY: `pipe` is a server named pipe.
1127 unsafe { Self::create_from_server_pipe(pipe) }
1128 }
1129
1130 /// If the struct is created as a server then waits for client connection to arrive.
1131 /// It only waits on reader as reader and writer are clones.
wait_for_connection(&self) -> Result<()>1132 pub fn wait_for_connection(&self) -> Result<()> {
1133 if self.is_server && !self.is_connected.load(Ordering::Relaxed) {
1134 self.reader.lock().wait_for_client_connection()?;
1135 self.is_connected.store(true, Ordering::Relaxed);
1136 }
1137 Ok(())
1138 }
1139
write_overlapped_blocking_message_internal<T: PipeSendable>( pipe: &mut PipeConnection, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1140 fn write_overlapped_blocking_message_internal<T: PipeSendable>(
1141 pipe: &mut PipeConnection,
1142 buf: &[T],
1143 overlapped_wrapper: &mut OverlappedWrapper,
1144 ) -> Result<()> {
1145 // Safety:
1146 // `buf` and `overlapped_wrapper` will be in use for the duration of
1147 // the overlapped operation. These must not be reused and must live until
1148 // after `get_overlapped_result()` has been called which is done right
1149 // after this call.
1150 unsafe {
1151 pipe.write_overlapped(buf, overlapped_wrapper)?;
1152 }
1153
1154 let size_written_in_bytes = pipe.get_overlapped_result(overlapped_wrapper)?;
1155
1156 if size_written_in_bytes as usize != buf.len() {
1157 return Err(std::io::Error::new(
1158 std::io::ErrorKind::UnexpectedEof,
1159 format!(
1160 "Short write expected:{} found:{}",
1161 size_written_in_bytes,
1162 buf.len(),
1163 ),
1164 ));
1165 }
1166 Ok(())
1167 }
1168 /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
write_overlapped_blocking_message<T: PipeSendable>( &self, header: &[T], message: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1169 pub fn write_overlapped_blocking_message<T: PipeSendable>(
1170 &self,
1171 header: &[T],
1172 message: &[T],
1173 overlapped_wrapper: &mut OverlappedWrapper,
1174 ) -> Result<()> {
1175 let mut writer = self.writer.lock();
1176 Self::write_overlapped_blocking_message_internal(&mut writer, header, overlapped_wrapper)?;
1177 Self::write_overlapped_blocking_message_internal(&mut writer, message, overlapped_wrapper)
1178 }
1179
1180 /// Reads a variable size message and returns the message on success.
1181 /// The size of the message is expected to proceed the message in
1182 /// the form of `header_size` message.
1183 ///
1184 /// `parse_message_size` lets caller parse the header to extract
1185 /// message size.
1186 ///
1187 /// Event on `exit_event` is used to interrupt the blocked read.
read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>( &self, header_size: usize, parse_message_size: F, overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<Vec<u8>>1188 pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
1189 &self,
1190 header_size: usize,
1191 parse_message_size: F,
1192 overlapped_wrapper: &mut OverlappedWrapper,
1193 exit_event: &Event,
1194 ) -> Result<Vec<u8>> {
1195 let mut pipe = self.reader.lock();
1196 let mut header = vec![0; header_size];
1197 header.resize_with(header_size, Default::default);
1198 pipe.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
1199 let message_size = parse_message_size(&header);
1200 if message_size == 0 {
1201 return Ok(vec![]);
1202 }
1203 let mut buf = vec![];
1204 buf.resize_with(message_size, Default::default);
1205 pipe.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
1206 Ok(buf)
1207 }
1208
1209 /// Returns the inner named pipe if the current struct is the sole owner of the underlying
1210 /// named pipe.
1211 ///
1212 /// Otherwise, [`None`] is returned and the struct is dropped.
1213 ///
1214 /// Note that this has a similar race condition like [`Arc::try_unwrap`]: if multiple threads
1215 /// call this function simultaneously on the same clone of [`MultiPartMessagePipe`], it is
1216 /// possible that all of them will result in [`None`]. This is Due to Rust version
1217 /// restriction(1.68.2) when this function is introduced). This race condition can be resolved
1218 /// once we upgrade to 1.70.0 or higher by using [`Arc::into_inner`].
1219 ///
1220 /// If the underlying named pipe is a server named pipe, this method allows the caller to
1221 /// terminate the connection by first flushing the named pipe then disconnecting the clients
1222 /// idiomatically per
1223 /// https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipe-operations#:~:text=When%20a%20client,of%20the%20pipe.
into_inner_pipe(self) -> Option<PipeConnection>1224 pub fn into_inner_pipe(self) -> Option<PipeConnection> {
1225 let piper = Arc::clone(&self.reader);
1226 drop(self);
1227 Arc::try_unwrap(piper).ok().map(Mutex::into_inner)
1228 }
1229 }
1230
1231 impl TryFrom<PipeConnection> for MultiPartMessagePipe {
1232 type Error = std::io::Error;
try_from(pipe: PipeConnection) -> Result<Self>1233 fn try_from(pipe: PipeConnection) -> Result<Self> {
1234 Self::create_from_pipe(pipe, false)
1235 }
1236 }
1237
1238 #[cfg(test)]
1239 mod tests {
1240 use std::mem::size_of;
1241 use std::thread::JoinHandle;
1242 use std::time::Duration;
1243
1244 use super::*;
1245
1246 #[test]
duplex_pipe_stream()1247 fn duplex_pipe_stream() {
1248 let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1249
1250 // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1251 // SAFETY: trivially safe with pipe created and return value checked.
1252 unsafe {
1253 for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1254 println!("{}", dir);
1255
1256 sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();
1257
1258 // Smaller than what we sent so we get multiple chunks
1259 let mut recv_buffer: [u8; 4] = [0; 4];
1260
1261 let mut size = receiver.read(&mut recv_buffer).unwrap();
1262 assert_eq!(size, 4);
1263 assert_eq!(recv_buffer, [75, 77, 54, 82]);
1264
1265 size = receiver.read(&mut recv_buffer).unwrap();
1266 assert_eq!(size, 2);
1267 assert_eq!(recv_buffer[0..2], [76, 65]);
1268 }
1269 }
1270 }
1271
1272 #[test]
available_byte_count_byte_mode()1273 fn available_byte_count_byte_mode() {
1274 let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1275 p1.write(&[1, 23, 45]).unwrap();
1276 assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1277
1278 // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
1279 // yield the same value.
1280 assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1281 }
1282
1283 #[test]
available_byte_count_message_mode()1284 fn available_byte_count_message_mode() {
1285 let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1286 p1.write(&[1, 23, 45]).unwrap();
1287 assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1288
1289 // PeekNamedPipe should NOT touch the data in the pipe. So if we call it again, it should
1290 // yield the same value.
1291 assert_eq!(p2.get_available_byte_count().unwrap(), 3);
1292 }
1293
1294 #[test]
available_byte_count_message_mode_multiple_messages()1295 fn available_byte_count_message_mode_multiple_messages() {
1296 let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1297 p1.write(&[1, 2, 3]).unwrap();
1298 p1.write(&[4, 5]).unwrap();
1299 assert_eq!(p2.get_available_byte_count().unwrap(), 5);
1300 }
1301
1302 #[test]
duplex_pipe_message()1303 fn duplex_pipe_message() {
1304 let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
1305
1306 // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1307 // SAFETY: trivially safe with pipe created and return value checked.
1308 unsafe {
1309 for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1310 println!("{}", dir);
1311
1312 // Send 2 messages so that we can check that message framing works
1313 sender.write(&[1, 23, 45]).unwrap();
1314 sender.write(&[67, 89, 10]).unwrap();
1315
1316 let mut recv_buffer: [u8; 5] = [0; 5]; // Larger than required for messages
1317
1318 let mut size = receiver.read(&mut recv_buffer).unwrap();
1319 assert_eq!(size, 3);
1320 assert_eq!(recv_buffer[0..3], [1, 23, 45]);
1321
1322 size = receiver.read(&mut recv_buffer).unwrap();
1323 assert_eq!(size, 3);
1324 assert_eq!(recv_buffer[0..3], [67, 89, 10]);
1325 }
1326 }
1327 }
1328
1329 #[cfg(test)]
duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection)1330 fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
1331 let mut recv_buffer: [u8; 1] = [0; 1];
1332
1333 // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
1334 // SAFETY: trivially safe with PipeConnection created and return value checked.
1335 unsafe {
1336 for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
1337 println!("{}", dir);
1338 sender.write(&[1]).unwrap();
1339 assert_eq!(receiver.read(&mut recv_buffer).unwrap(), 1); // Should succeed!
1340 assert_eq!(
1341 receiver.read(&mut recv_buffer).unwrap_err().kind(),
1342 std::io::ErrorKind::WouldBlock
1343 );
1344 }
1345 }
1346 }
1347
1348 #[test]
duplex_nowait()1349 fn duplex_nowait() {
1350 let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
1351 duplex_nowait_helper(&p1, &p2);
1352 }
1353
1354 #[test]
duplex_nowait_set_after_creation()1355 fn duplex_nowait_set_after_creation() {
1356 // Tests non blocking setting after pipe creation
1357 let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
1358 p1.set_blocking(&BlockingMode::NoWait)
1359 .expect("Failed to set blocking mode on pipe p1");
1360 p2.set_blocking(&BlockingMode::NoWait)
1361 .expect("Failed to set blocking mode on pipe p2");
1362 duplex_nowait_helper(&p1, &p2);
1363 }
1364
1365 #[test]
duplex_overlapped()1366 fn duplex_overlapped() {
1367 let pipe_name = generate_pipe_name();
1368
1369 let mut p1 = create_server_pipe(
1370 &pipe_name,
1371 &FramingMode::Message,
1372 &BlockingMode::Wait,
1373 /* timeout= */ 0,
1374 /* buffer_size= */ 1000,
1375 /* overlapped= */ true,
1376 )
1377 .unwrap();
1378
1379 let mut p2 = create_client_pipe(
1380 &pipe_name,
1381 &FramingMode::Message,
1382 &BlockingMode::Wait,
1383 /* overlapped= */ true,
1384 )
1385 .unwrap();
1386
1387 // SAFETY:
1388 // Safe because `read_overlapped` can be called since overlapped struct is created.
1389 unsafe {
1390 let mut p1_overlapped_wrapper =
1391 OverlappedWrapper::new(/* include_event= */ true).unwrap();
1392 p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut p1_overlapped_wrapper)
1393 .unwrap();
1394 let size = p1
1395 .get_overlapped_result(&mut p1_overlapped_wrapper)
1396 .unwrap();
1397 assert_eq!(size, 6);
1398
1399 let mut recv_buffer: [u8; 6] = [0; 6];
1400
1401 let mut p2_overlapped_wrapper =
1402 OverlappedWrapper::new(/* include_event= */ true).unwrap();
1403 p2.read_overlapped(&mut recv_buffer, &mut p2_overlapped_wrapper)
1404 .unwrap();
1405 let size = p2
1406 .get_overlapped_result(&mut p2_overlapped_wrapper)
1407 .unwrap();
1408 assert_eq!(size, 6);
1409 assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
1410 }
1411 }
1412
1413 #[test]
duplex_overlapped_test_in_use()1414 fn duplex_overlapped_test_in_use() {
1415 let pipe_name = generate_pipe_name();
1416
1417 let mut p1 = create_server_pipe(
1418 &pipe_name,
1419 &FramingMode::Message,
1420 &BlockingMode::Wait,
1421 /* timeout= */ 0,
1422 /* buffer_size= */ 1000,
1423 /* overlapped= */ true,
1424 )
1425 .unwrap();
1426
1427 let mut p2 = create_client_pipe(
1428 &pipe_name,
1429 &FramingMode::Message,
1430 &BlockingMode::Wait,
1431 /* overlapped= */ true,
1432 )
1433 .unwrap();
1434 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1435
1436 let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1437 assert!(res.is_err());
1438
1439 let data = vec![75, 77, 54, 82, 76, 65];
1440 // SAFETY: safe because: data & overlapped wrapper live until the
1441 // operation is verified completed below.
1442 let res = unsafe { p1.write_overlapped(&data, &mut overlapped_wrapper) };
1443 assert!(res.is_ok());
1444
1445 let res =
1446 // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
1447 // will error out.
1448 unsafe { p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper) };
1449 assert!(res.is_err());
1450
1451 let mut recv_buffer: [u8; 6] = [0; 6];
1452 // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
1453 // will error out.
1454 let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1455 assert!(res.is_err());
1456
1457 let res = p1.get_overlapped_result(&mut overlapped_wrapper);
1458 assert!(res.is_ok());
1459
1460 let mut recv_buffer: [u8; 6] = [0; 6];
1461 // SAFETY: safe because recv_buffer & overlapped_wrapper live until the
1462 // operation is verified completed below.
1463 let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
1464 assert!(res.is_ok());
1465 let res = p2.get_overlapped_result(&mut overlapped_wrapper);
1466 assert!(res.is_ok());
1467 }
1468
generate_pipe_name() -> String1469 fn generate_pipe_name() -> String {
1470 format!(
1471 r"\\.\pipe\test-ipc-pipe-name.rand{}",
1472 rand::thread_rng().gen::<u64>(),
1473 )
1474 }
1475
send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()>1476 fn send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()> {
1477 let messages = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"];
1478 std::thread::spawn(move || {
1479 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1480 let exit_event = Event::new().unwrap();
1481 for _i in 0..msg_count {
1482 let message = *messages
1483 .get(rand::thread_rng().gen::<usize>() % messages.len())
1484 .unwrap();
1485 pipe.write_overlapped_blocking_message(
1486 &message.len().to_be_bytes(),
1487 message.as_bytes(),
1488 &mut overlapped_wrapper,
1489 )
1490 .unwrap();
1491 }
1492 for _i in 0..msg_count {
1493 let message = pipe
1494 .read_overlapped_blocking_message(
1495 size_of::<usize>(),
1496 |bytes: &[u8]| {
1497 assert_eq!(bytes.len(), size_of::<usize>());
1498 usize::from_be_bytes(
1499 bytes.try_into().expect("failed to get array from slice"),
1500 )
1501 },
1502 &mut overlapped_wrapper,
1503 &exit_event,
1504 )
1505 .unwrap();
1506 assert_eq!(
1507 *messages.get(message.len() - 1).unwrap(),
1508 std::str::from_utf8(&message).unwrap(),
1509 );
1510 }
1511 })
1512 }
1513
1514 #[test]
multipart_message_smoke_test()1515 fn multipart_message_smoke_test() {
1516 let pipe_name = generate_pipe_name();
1517 let server = MultiPartMessagePipe::create_as_server(&pipe_name).unwrap();
1518 let client = MultiPartMessagePipe::create_as_client(&pipe_name).unwrap();
1519 let handles = [
1520 send_receive_msgs(server.clone(), 100),
1521 send_receive_msgs(client.clone(), 100),
1522 send_receive_msgs(server, 100),
1523 send_receive_msgs(client, 100),
1524 ];
1525 for h in handles {
1526 h.join().unwrap();
1527 }
1528 }
1529
1530 #[test]
multipart_message_into_inner_pipe()1531 fn multipart_message_into_inner_pipe() {
1532 let pipe_name = generate_pipe_name();
1533 let mut pipe = create_server_pipe(
1534 &format!(r"\\.\pipe\{}", pipe_name),
1535 &FramingMode::Message,
1536 &BlockingMode::Wait,
1537 0,
1538 1024 * 1024,
1539 true,
1540 )
1541 .expect("should create the server pipe with success");
1542 let server1 = {
1543 let pipe = pipe
1544 .try_clone()
1545 .expect("should duplicate the named pipe with success");
1546 // SAFETY: `pipe` is a server named pipe.
1547 unsafe { MultiPartMessagePipe::create_from_server_pipe(pipe) }
1548 .expect("should create the multipart message pipe with success")
1549 };
1550 let server2 = server1.clone();
1551 assert!(
1552 server2.into_inner_pipe().is_none(),
1553 "not the last reference, should be None"
1554 );
1555 let inner_pipe = server1
1556 .into_inner_pipe()
1557 .expect("the last reference, should return the underlying pipe");
1558 // CompareObjectHandles is a Windows 10 API and is not available in mingw, so we can't use
1559 // that API to compare if 2 handles are the same.
1560 pipe.set_blocking(&BlockingMode::NoWait)
1561 .expect("should set the blocking mode on the original pipe with success");
1562 assert_eq!(
1563 pipe.get_info()
1564 .expect("should get the pipe information on the original pipe successfully"),
1565 inner_pipe
1566 .get_info()
1567 .expect("should get the pipe information on the inner pipe successfully")
1568 );
1569 pipe.set_blocking(&BlockingMode::Wait)
1570 .expect("should set the blocking mode on the original pipe with success");
1571 assert_eq!(
1572 pipe.get_info()
1573 .expect("should get the pipe information on the original pipe successfully"),
1574 inner_pipe
1575 .get_info()
1576 .expect("should get the pipe information on the inner pipe successfully")
1577 );
1578 }
1579
1580 #[test]
test_wait_for_connection_blocking()1581 fn test_wait_for_connection_blocking() {
1582 let pipe_name = generate_pipe_name();
1583
1584 let mut server_pipe = create_server_pipe(
1585 &pipe_name,
1586 &FramingMode::Message,
1587 &BlockingMode::Wait,
1588 /* timeout= */ 0,
1589 /* buffer_size= */ 1000,
1590 /* overlapped= */ true,
1591 )
1592 .unwrap();
1593
1594 let server = crate::thread::spawn_with_timeout(move || {
1595 let exit_event = Event::new().unwrap();
1596 server_pipe
1597 .wait_for_client_connection_overlapped_blocking(&exit_event)
1598 .unwrap();
1599 });
1600
1601 let _client = create_client_pipe(
1602 &pipe_name,
1603 &FramingMode::Message,
1604 &BlockingMode::Wait,
1605 /* overlapped= */ true,
1606 )
1607 .unwrap();
1608 server.try_join(Duration::from_secs(10)).unwrap();
1609 }
1610
1611 #[test]
test_wait_for_connection_blocking_exit_triggered()1612 fn test_wait_for_connection_blocking_exit_triggered() {
1613 let pipe_name = generate_pipe_name();
1614
1615 let mut server_pipe = create_server_pipe(
1616 &pipe_name,
1617 &FramingMode::Message,
1618 &BlockingMode::Wait,
1619 /* timeout= */ 0,
1620 /* buffer_size= */ 1000,
1621 /* overlapped= */ true,
1622 )
1623 .unwrap();
1624
1625 let exit_event = Event::new().unwrap();
1626 let exit_event_for_server = exit_event.try_clone().unwrap();
1627 let server = crate::thread::spawn_with_timeout(move || {
1628 assert!(server_pipe
1629 .wait_for_client_connection_overlapped_blocking(&exit_event_for_server)
1630 .is_err());
1631 });
1632 exit_event.signal().unwrap();
1633 server.try_join(Duration::from_secs(10)).unwrap();
1634 }
1635 }
1636