1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::ffi::CString;
6 use std::fs::OpenOptions;
7 use std::io;
8 use std::io::Result;
9 use std::mem;
10 use std::os::windows::fs::OpenOptionsExt;
11 use std::process;
12 use std::ptr;
13 use std::sync::atomic::AtomicUsize;
14 use std::sync::atomic::Ordering;
15
16 use rand::Rng;
17 use serde::Deserialize;
18 use serde::Serialize;
19 use win_util::fail_if_zero;
20 use win_util::SecurityAttributes;
21 use win_util::SelfRelativeSecurityDescriptor;
22 use winapi::shared::minwindef::DWORD;
23 use winapi::shared::minwindef::FALSE;
24 use winapi::shared::minwindef::TRUE;
25 use winapi::shared::winerror::ERROR_IO_INCOMPLETE;
26 use winapi::shared::winerror::ERROR_IO_PENDING;
27 use winapi::shared::winerror::ERROR_NO_DATA;
28 use winapi::shared::winerror::ERROR_PIPE_CONNECTED;
29 use winapi::um::errhandlingapi::GetLastError;
30 use winapi::um::fileapi::FlushFileBuffers;
31 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
32 use winapi::um::ioapiset::CancelIoEx;
33 use winapi::um::ioapiset::GetOverlappedResult;
34 use winapi::um::minwinbase::OVERLAPPED;
35 use winapi::um::namedpipeapi::ConnectNamedPipe;
36 use winapi::um::namedpipeapi::DisconnectNamedPipe;
37 use winapi::um::namedpipeapi::GetNamedPipeInfo;
38 use winapi::um::namedpipeapi::PeekNamedPipe;
39 use winapi::um::namedpipeapi::SetNamedPipeHandleState;
40 use winapi::um::winbase::CreateNamedPipeA;
41 use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
42 use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
43 use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
44 use winapi::um::winbase::PIPE_NOWAIT;
45 use winapi::um::winbase::PIPE_READMODE_BYTE;
46 use winapi::um::winbase::PIPE_READMODE_MESSAGE;
47 use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS;
48 use winapi::um::winbase::PIPE_TYPE_BYTE;
49 use winapi::um::winbase::PIPE_TYPE_MESSAGE;
50 use winapi::um::winbase::PIPE_WAIT;
51 use winapi::um::winbase::SECURITY_IDENTIFICATION;
52
53 use super::RawDescriptor;
54 use crate::descriptor::AsRawDescriptor;
55 use crate::descriptor::FromRawDescriptor;
56 use crate::descriptor::IntoRawDescriptor;
57 use crate::descriptor::SafeDescriptor;
58 use crate::Event;
59 use crate::EventToken;
60 use crate::WaitContext;
61
/// The default buffer size, in bytes, that `pair` requests for the named pipes it creates. If
/// this size is too small, writers on named pipes that expect not to block *can* block until the
/// reading side empties the buffer.
///
/// The general rule is this should be *at least* as big as the largest message, otherwise
/// unexpected blocking behavior can result; for example, if too small, this can interact badly with
/// crate::platform::StreamChannel, which expects to be able to make a complete write before releasing
/// a lock that the opposite side needs to complete a read. This means that if the buffer is too
/// small:
///     * The writer can't complete its write and release the lock because the buffer is too small.
///     * The reader can't start reading because the lock is held by the writer, so it can't
///       relieve buffer pressure. Note that for message pipes, the reader couldn't do anything
///       to help anyway, because a message mode pipe should NOT have a partial read (which is
///       what we would need to relieve pressure).
///     * Conditions for deadlock are met, and both the reader & writer enter circular waiting.
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

// Process-wide counter that `pair_with_buffer_size` mixes into each generated pipe name so that
// successive pipes created by this process get distinct names.
static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
79
/// Represents one end of a named pipe.
///
/// Bundles the pipe handle with the framing and blocking modes the handle was configured with,
/// so that later operations can consult those modes without querying the OS.
#[derive(Serialize, Deserialize, Debug)]
pub struct PipeConnection {
    // Handle to this end of the pipe.
    handle: SafeDescriptor,
    // Byte vs. message framing; used to derive the read mode whenever the handle state is set.
    framing_mode: FramingMode,
    // Wait vs. no-wait mode; reads consult this to decide how to report ERROR_NO_DATA.
    blocking_mode: BlockingMode,
}
87
/// Wraps the `OVERLAPPED` structure. Also keeps track of whether the `OVERLAPPED` is being used
/// by a `ReadFile` or `WriteFile` operation and holds onto the event object so it doesn't get
/// dropped.
pub struct OverlappedWrapper {
    // Allocated on the heap so that the OVERLAPPED struct doesn't move when performing I/O
    // operations.
    overlapped: Box<OVERLAPPED>,
    // This field prevents the event handle from being dropped too early and allows callers to
    // be notified when a read or write overlapped operation has completed. `None` when the
    // wrapper was created without an event.
    h_event: Option<Event>,
    // Set when an overlapped read or write is issued with this wrapper; a second operation is
    // rejected while this is still true.
    in_use: bool,
}
99
100 impl OverlappedWrapper {
get_h_event_ref(&self) -> Option<&Event>101 pub fn get_h_event_ref(&self) -> Option<&Event> {
102 self.h_event.as_ref()
103 }
104
105 /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
106 /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
107 /// returned must not be dropped.
108 ///
109 /// There is an option to create the event object and set it to the `hEvent` field. If hEvent
110 /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
111 /// handle will be signaled when the operation is complete. In other words, you can use
112 /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
113 /// Microsoft though.
new(include_event: bool) -> Result<OverlappedWrapper>114 pub fn new(include_event: bool) -> Result<OverlappedWrapper> {
115 let mut overlapped = OVERLAPPED::default();
116 let h_event = if include_event {
117 Some(Event::new()?)
118 } else {
119 None
120 };
121
122 overlapped.hEvent = if let Some(event) = h_event.as_ref() {
123 event.as_raw_descriptor()
124 } else {
125 0 as RawDescriptor
126 };
127
128 Ok(OverlappedWrapper {
129 overlapped: Box::new(overlapped),
130 h_event,
131 in_use: false,
132 })
133 }
134 }
135
// SAFETY: all of the contained fields may be safely sent to another thread.
// NOTE(review): the manual impl is presumably needed because OVERLAPPED carries a raw handle
// field (`hEvent`), which blocks the automatic `Send` derivation — confirm.
unsafe impl Send for OverlappedWrapper {}
138
/// Asynchronous (overlapped) write operations that are tracked through an `OverlappedWrapper`.
pub trait WriteOverlapped {
    /// Perform an overlapped write operation with the specified buffer and overlapped wrapper.
    /// If successful, the write operation will complete asynchronously, and
    /// `write_result()` should be called to get the result.
    ///
    /// # Safety
    /// `buf` and `overlapped_wrapper` will be in use for the duration of
    /// the overlapped operation. These must not be reused and must live until
    /// after `write_result()` has been called.
    unsafe fn write_overlapped(
        &mut self,
        buf: &mut [u8],
        overlapped_wrapper: &mut OverlappedWrapper,
    ) -> io::Result<()>;

    /// Gets the result of the overlapped write operation. Must only be called
    /// after issuing an overlapped write operation using `write_overlapped`. The
    /// same `overlapped_wrapper` must be provided.
    ///
    /// Returns a `usize` on success (presumably the number of bytes written — confirm against
    /// the implementations).
    fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;

    /// Tries to get the result of the overlapped write operation. Must only be
    /// called once, and only after issuing an overlapped write operation using
    /// `write_overlapped`. The same `overlapped_wrapper` must be provided.
    ///
    /// An error indicates that the operation hasn't completed yet and
    /// `write_result` or `try_write_result` should be called again.
    // NOTE(review): "called once" conflicts with "should be called again" above; the read-side
    // trait puts the "once" restriction on `read_result` instead — confirm which is intended.
    fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper)
        -> io::Result<usize>;
}
168
/// Asynchronous (overlapped) read operations that are tracked through an `OverlappedWrapper`.
pub trait ReadOverlapped {
    /// Perform an overlapped read operation with the specified buffer and overlapped wrapper.
    /// If successful, the read operation will complete asynchronously, and
    /// `read_result()` should be called to get the result.
    ///
    /// # Safety
    /// `buf` and `overlapped_wrapper` will be in use for the duration of
    /// the overlapped operation. These must not be reused and must live until
    /// after `read_result()` has been called.
    unsafe fn read_overlapped(
        &mut self,
        buf: &mut [u8],
        overlapped_wrapper: &mut OverlappedWrapper,
    ) -> io::Result<()>;

    /// Gets the result of the overlapped read operation. Must only be called
    /// once, and only after issuing an overlapped read operation using
    /// `read_overlapped`. The same `overlapped_wrapper` must be provided.
    ///
    /// Returns a `usize` on success (presumably the number of bytes read — confirm against the
    /// implementations).
    fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;

    /// Tries to get the result of the overlapped read operation. Must only be called
    /// after issuing an overlapped read operation using `read_overlapped`. The
    /// same `overlapped_wrapper` must be provided.
    ///
    /// An error indicates that the operation hasn't completed yet and
    /// `read_result` or `try_read_result` should be called again.
    fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
}
197
/// How data written to a named pipe is delimited for the reader.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
pub enum FramingMode {
    /// The pipe is a simple byte stream (maps to `PIPE_TYPE_BYTE` / `PIPE_READMODE_BYTE`).
    Byte,
    /// The pipe carries an automatically framed sequence of messages (maps to
    /// `PIPE_TYPE_MESSAGE` / `PIPE_READMODE_MESSAGE`).
    Message,
}
203
204 impl FramingMode {
to_readmode(self) -> DWORD205 fn to_readmode(self) -> DWORD {
206 match self {
207 FramingMode::Message => PIPE_READMODE_MESSAGE,
208 FramingMode::Byte => PIPE_READMODE_BYTE,
209 }
210 }
211
to_pipetype(self) -> DWORD212 fn to_pipetype(self) -> DWORD {
213 match self {
214 FramingMode::Message => PIPE_TYPE_MESSAGE,
215 FramingMode::Byte => PIPE_TYPE_BYTE,
216 }
217 }
218 }
219
/// Whether operations on a named pipe wait for data or return immediately.
#[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)]
pub enum BlockingMode {
    /// Calls to read() block until data is received (maps to `PIPE_WAIT`).
    Wait,
    /// Calls to read() return immediately even if there is nothing to read, failing with error
    /// code 232 (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA). Maps to
    /// `PIPE_NOWAIT`.
    ///
    /// NOTE: This mode is discouraged by the Windows API documentation.
    NoWait,
}
230
231 impl From<&BlockingMode> for DWORD {
from(blocking_mode: &BlockingMode) -> DWORD232 fn from(blocking_mode: &BlockingMode) -> DWORD {
233 match blocking_mode {
234 BlockingMode::Wait => PIPE_WAIT,
235 BlockingMode::NoWait => PIPE_NOWAIT,
236 }
237 }
238 }
239
240 /// Sets the handle state for a named pipe in a rust friendly way.
241 /// This is safe if the pipe handle is open.
set_named_pipe_handle_state( pipe_handle: RawDescriptor, client_mode: &mut DWORD, ) -> Result<()>242 unsafe fn set_named_pipe_handle_state(
243 pipe_handle: RawDescriptor,
244 client_mode: &mut DWORD,
245 ) -> Result<()> {
246 // Safe when the pipe handle is open. Safety also requires checking the return value, which we
247 // do below.
248 let success_flag = SetNamedPipeHandleState(
249 /* hNamedPipe= */ pipe_handle,
250 /* lpMode= */ client_mode,
251 /* lpMaxCollectionCount= */ ptr::null_mut(),
252 /* lpCollectDataTimeout= */ ptr::null_mut(),
253 );
254 if success_flag == 0 {
255 Err(io::Error::last_os_error())
256 } else {
257 Ok(())
258 }
259 }
260
pair( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, ) -> Result<(PipeConnection, PipeConnection)>261 pub fn pair(
262 framing_mode: &FramingMode,
263 blocking_mode: &BlockingMode,
264 timeout: u64,
265 ) -> Result<(PipeConnection, PipeConnection)> {
266 pair_with_buffer_size(
267 framing_mode,
268 blocking_mode,
269 timeout,
270 DEFAULT_BUFFER_SIZE,
271 false,
272 )
273 }
274
275 /// Creates a pair of handles connected to either end of a duplex named pipe.
276 ///
277 /// The pipe created will have a semi-random name and a default set of security options that
278 /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
279 /// remote clients, allow only a single server instance, and prevent impersonation by the server
280 /// end of the pipe.
281 ///
282 /// # Arguments
283 ///
284 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
285 /// automatically framed sequence of messages (Message). In message mode it's an
286 /// error to read fewer bytes than were sent in a message from the other end of
287 /// the pipe.
288 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
289 /// return immediately if there is nothing available (NoWait).
290 /// * `timeout` - A timeout to apply for socket operations, in milliseconds.
291 /// Setting this to zero will create sockets with the system
292 /// default timeout.
293 /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
294 /// buffer automatically as needed, except in the case of NOWAIT pipes, where
295 /// it will just fail writes that don't fit in the buffer.
296 /// # Return value
297 ///
298 /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
299 /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
pair_with_buffer_size( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<(PipeConnection, PipeConnection)>300 pub fn pair_with_buffer_size(
301 framing_mode: &FramingMode,
302 blocking_mode: &BlockingMode,
303 timeout: u64,
304 buffer_size: usize,
305 overlapped: bool,
306 ) -> Result<(PipeConnection, PipeConnection)> {
307 // Give the pipe a unique name to avoid accidental collisions
308 let pipe_name = format!(
309 r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
310 process::id(),
311 NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
312 rand::thread_rng().gen::<u32>(),
313 );
314
315 let server_end = create_server_pipe(
316 &pipe_name,
317 framing_mode,
318 blocking_mode,
319 timeout,
320 buffer_size,
321 overlapped,
322 )?;
323
324 // Open the named pipe we just created as the client
325 let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
326
327 // Accept the client's connection
328 // Not sure if this is strictly needed but I'm doing it just in case.
329 // We expect at this point that the client will already be connected,
330 // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
331 // It's also OK if we get a return code of success.
332 server_end.wait_for_client_connection()?;
333
334 Ok((server_end, client_end))
335 }
336
337 /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
338 /// settings.
339 ///
340 /// The pipe will be set to reject remote clients and allow only a single connection at a time.
341 ///
342 /// # Arguments
343 ///
344 /// * `pipe_name` - The path of the named pipe to create. Should be in the form
345 /// `\\.\pipe\<some-name>`.
346 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
347 /// automatically framed sequence of messages (Message). In message mode it's an
348 /// error to read fewer bytes than were sent in a message from the other end of
349 /// the pipe.
350 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
351 /// return immediately if there is nothing available (NoWait).
352 /// * `timeout` - A timeout to apply for socket operations, in milliseconds.
353 /// Setting this to zero will create sockets with the system
354 /// default timeout.
355 /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
356 /// buffer automatically as needed, except in the case of NOWAIT pipes, where
357 /// it will just fail writes that don't fit in the buffer.
358 /// * `overlapped` - Sets whether overlapped mode is set on the pipe.
create_server_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<PipeConnection>359 pub fn create_server_pipe(
360 pipe_name: &str,
361 framing_mode: &FramingMode,
362 blocking_mode: &BlockingMode,
363 timeout: u64,
364 buffer_size: usize,
365 overlapped: bool,
366 ) -> Result<PipeConnection> {
367 let c_pipe_name = CString::new(pipe_name).unwrap();
368
369 let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
370 if overlapped {
371 open_mode_flags |= FILE_FLAG_OVERLAPPED
372 }
373
374 // This sets flags so there will be an error if >1 instance (server end)
375 // of this pipe name is opened because we expect exactly one.
376 let server_handle = unsafe {
377 // Safe because security attributes are valid, pipe_name is valid C string,
378 // and we're checking the return code
379 CreateNamedPipeA(
380 c_pipe_name.as_ptr(),
381 /* dwOpenMode= */
382 open_mode_flags,
383 /* dwPipeMode= */
384 framing_mode.to_pipetype()
385 | framing_mode.to_readmode()
386 | DWORD::from(blocking_mode)
387 | PIPE_REJECT_REMOTE_CLIENTS,
388 /* nMaxInstances= */ 1,
389 /* nOutBufferSize= */ buffer_size as DWORD,
390 /* nInBufferSize= */ buffer_size as DWORD,
391 /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
392 /* lpSecurityAttributes= */
393 SecurityAttributes::new_with_security_descriptor(
394 SelfRelativeSecurityDescriptor::get_singleton(),
395 /* inherit= */ true,
396 )
397 .as_mut(),
398 )
399 };
400
401 if server_handle == INVALID_HANDLE_VALUE {
402 Err(io::Error::last_os_error())
403 } else {
404 unsafe {
405 Ok(PipeConnection {
406 handle: SafeDescriptor::from_raw_descriptor(server_handle),
407 framing_mode: *framing_mode,
408 blocking_mode: *blocking_mode,
409 })
410 }
411 }
412 }
413
414 /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
415 /// settings.
416 ///
417 /// The pipe will be set to prevent impersonation of the client by the server process.
418 ///
419 /// # Arguments
420 ///
421 /// * `pipe_name` - The path of the named pipe to create. Should be in the form
422 /// `\\.\pipe\<some-name>`.
423 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
424 /// automatically framed sequence of messages (Message). In message mode it's an
425 /// error to read fewer bytes than were sent in a message from the other end of
426 /// the pipe.
427 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
428 /// return immediately if there is nothing available (NoWait).
429 /// * `overlapped` - Sets whether the pipe is opened in overlapped mode.
create_client_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, overlapped: bool, ) -> Result<PipeConnection>430 pub fn create_client_pipe(
431 pipe_name: &str,
432 framing_mode: &FramingMode,
433 blocking_mode: &BlockingMode,
434 overlapped: bool,
435 ) -> Result<PipeConnection> {
436 let client_handle = OpenOptions::new()
437 .read(true)
438 .write(true)
439 .create(true)
440 .security_qos_flags(SECURITY_IDENTIFICATION)
441 .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
442 .open(pipe_name)?
443 .into_raw_descriptor();
444
445 let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
446
447 // Safe because client_handle's open() call did not return an error.
448 unsafe {
449 set_named_pipe_handle_state(client_handle, &mut client_mode)?;
450 }
451
452 Ok(PipeConnection {
453 // Safe because client_handle is valid
454 handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
455 framing_mode: *framing_mode,
456 blocking_mode: *blocking_mode,
457 })
458 }
459
/// This is used to mark types which can be appropriately sent through the
/// generic helper functions write_to_pipe and read_from_pipe.
pub trait PipeSendable {
    /// Default values used to fill in new empty indexes when resizing a buffer to
    /// a larger size.
    fn default() -> Self;
}
/// Bytes default to zero.
impl PipeSendable for u8 {
    fn default() -> Self {
        0
    }
}
/// Raw descriptors default to a null handle.
impl PipeSendable for RawDescriptor {
    fn default() -> Self {
        ptr::null_mut()
    }
}
477
478 impl PipeConnection {
try_clone(&self) -> Result<PipeConnection>479 pub fn try_clone(&self) -> Result<PipeConnection> {
480 let copy_handle = self.handle.try_clone()?;
481 Ok(PipeConnection {
482 handle: copy_handle,
483 framing_mode: self.framing_mode,
484 blocking_mode: self.blocking_mode,
485 })
486 }
487
488 /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
489 /// blocking modes.
490 ///
491 /// # Safety
492 /// 1. rd is valid and ownership is transferred to this function when it is called.
493 ///
494 /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
495 /// underlying pipe.
from_raw_descriptor( rd: RawDescriptor, framing_mode: FramingMode, blocking_mode: BlockingMode, ) -> PipeConnection496 pub unsafe fn from_raw_descriptor(
497 rd: RawDescriptor,
498 framing_mode: FramingMode,
499 blocking_mode: BlockingMode,
500 ) -> PipeConnection {
501 PipeConnection {
502 handle: SafeDescriptor::from_raw_descriptor(rd),
503 framing_mode,
504 blocking_mode,
505 }
506 }
507
508 /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
509 /// Returns the number of bytes (not values) read.
510 ///
511 /// # Safety
512 ///
513 /// This is safe only when the following conditions hold:
514 /// 1. The data on the other end of the pipe is a valid binary representation of data for
515 /// type T, and
516 /// 2. The number of bytes read is a multiple of the size of T; this must be checked by
517 /// the caller.
518 /// If buf's type is file descriptors, this is only safe when those file descriptors are valid
519 /// for the process where this function was called.
read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize>520 pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
521 PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None)
522 }
523
524 /// Similar to `PipeConnection::read` except it also allows:
525 /// 1. The same end of the named pipe to read and write at the same time in different
526 /// threads.
527 /// 2. Asynchronous read and write (read and write won't block).
528 ///
529 /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
530 /// (can be created with `OverlappedWrapper::new`) will be passed into
531 /// `ReadFile`. That event will be triggered when the read operation is complete.
532 ///
533 /// In order to get how many bytes were read, call `get_overlapped_result`. That function will
534 /// also help with waiting until the read operation is complete.
535 ///
536 /// # Safety
537 ///
538 /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
539 /// overlapped mode otherwise there may be unexpected behavior.
read_overlapped<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>540 pub unsafe fn read_overlapped<T: PipeSendable>(
541 &mut self,
542 buf: &mut [T],
543 overlapped_wrapper: &mut OverlappedWrapper,
544 ) -> Result<()> {
545 if overlapped_wrapper.in_use {
546 return Err(std::io::Error::new(
547 std::io::ErrorKind::InvalidInput,
548 "Overlapped struct already in use",
549 ));
550 }
551 overlapped_wrapper.in_use = true;
552
553 PipeConnection::read_internal(
554 &self.handle,
555 self.blocking_mode,
556 buf,
557 Some(&mut overlapped_wrapper.overlapped),
558 )?;
559 Ok(())
560 }
561
562 /// Helper for `read_overlapped` and `read`
563 ///
564 /// # Safety
565 /// Comments `read_overlapped` or `read`, depending on which is used.
read_internal<T: PipeSendable>( handle: &SafeDescriptor, blocking_mode: BlockingMode, buf: &mut [T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>566 unsafe fn read_internal<T: PipeSendable>(
567 handle: &SafeDescriptor,
568 blocking_mode: BlockingMode,
569 buf: &mut [T],
570 overlapped: Option<&mut OVERLAPPED>,
571 ) -> Result<usize> {
572 let res = crate::platform::read_file(
573 handle,
574 buf.as_mut_ptr() as *mut u8,
575 mem::size_of_val(buf),
576 overlapped,
577 );
578 match res {
579 Ok(bytes_read) => Ok(bytes_read),
580 Err(e)
581 if blocking_mode == BlockingMode::NoWait
582 && e.raw_os_error() == Some(ERROR_NO_DATA as i32) =>
583 {
584 // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
585 // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
586 // correct. For further details see:
587 // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
588 // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
589 Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
590 }
591 Err(e) => Err(e),
592 }
593 }
594
595 /// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted
596 /// by an event on `exit_event`.
read_overlapped_blocking<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<()>597 pub fn read_overlapped_blocking<T: PipeSendable>(
598 &mut self,
599 buf: &mut [T],
600 overlapped_wrapper: &mut OverlappedWrapper,
601 exit_event: &Event,
602 ) -> Result<()> {
603 // Safe because we are providing a valid buffer slice and also providing a valid
604 // overlapped struct.
605 unsafe {
606 self.read_overlapped(buf, overlapped_wrapper)?;
607 };
608
609 #[derive(EventToken)]
610 enum Token {
611 ReadOverlapped,
612 Exit,
613 }
614
615 let wait_ctx = WaitContext::build_with(&[
616 (
617 overlapped_wrapper.get_h_event_ref().unwrap(),
618 Token::ReadOverlapped,
619 ),
620 (exit_event, Token::Exit),
621 ])?;
622
623 let events = wait_ctx.wait()?;
624 for event in events {
625 match event.token {
626 Token::ReadOverlapped => {
627 let size_read_in_bytes =
628 self.get_overlapped_result(overlapped_wrapper)? as usize;
629
630 // If this error shows, most likely the overlapped named pipe was set up
631 // incorrectly.
632 if size_read_in_bytes != buf.len() {
633 return Err(std::io::Error::new(
634 std::io::ErrorKind::UnexpectedEof,
635 "Short read",
636 ));
637 }
638 }
639 Token::Exit => {
640 return Err(std::io::Error::new(
641 std::io::ErrorKind::Interrupted,
642 "IO canceled on exit request",
643 ));
644 }
645 }
646 }
647
648 Ok(())
649 }
650
651 /// Reads a variable size message and returns the message on success.
652 /// The size of the message is expected to proceed the message in
653 /// the form of `header_size` message.
654 ///
655 /// `parse_message_size` lets caller parse the header to extract
656 /// message size.
657 ///
658 /// Event on `exit_event` is used to interrupt the blocked read.
read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>( &mut self, header_size: usize, parse_message_size: F, overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<Vec<u8>>659 pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
660 &mut self,
661 header_size: usize,
662 parse_message_size: F,
663 overlapped_wrapper: &mut OverlappedWrapper,
664 exit_event: &Event,
665 ) -> Result<Vec<u8>> {
666 let mut header = vec![0; header_size];
667 header.resize_with(header_size, Default::default);
668 self.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
669 let message_size = parse_message_size(&header);
670 if message_size == 0 {
671 return Ok(vec![]);
672 }
673 let mut buf = vec![];
674 buf.resize_with(message_size, Default::default);
675 self.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
676 Ok(buf)
677 }
678
679 /// Gets the size in bytes of data in the pipe.
680 ///
681 /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
682 /// not finished writing on the producer side.
get_available_byte_count(&self) -> io::Result<u32>683 pub fn get_available_byte_count(&self) -> io::Result<u32> {
684 let mut total_bytes_avail: DWORD = 0;
685
686 // Safe because the underlying pipe handle is guaranteed to be open, and the output values
687 // live at valid memory locations.
688 fail_if_zero!(unsafe {
689 PeekNamedPipe(
690 self.as_raw_descriptor(),
691 ptr::null_mut(),
692 0,
693 ptr::null_mut(),
694 &mut total_bytes_avail,
695 ptr::null_mut(),
696 )
697 });
698
699 Ok(total_bytes_avail)
700 }
701
702 /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
703 /// callers should check to ensure that it was the number expected.
write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize>704 pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
705 // SAFETY: overlapped is None so this is safe.
706 unsafe { PipeConnection::write_internal(&self.handle, buf, None) }
707 }
708
709 /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
710 /// as a failure.
write_overlapped_blocking_message<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>711 pub fn write_overlapped_blocking_message<T: PipeSendable>(
712 &mut self,
713 buf: &[T],
714 overlapped_wrapper: &mut OverlappedWrapper,
715 ) -> Result<()> {
716 // SAFETY: buf & overlapped_wrapper live until the overlapped operation is
717 // complete, so this is safe.
718 unsafe { self.write_overlapped(buf, overlapped_wrapper)? };
719
720 let size_written_in_bytes = self.get_overlapped_result(overlapped_wrapper)?;
721
722 if size_written_in_bytes as usize != buf.len() {
723 return Err(std::io::Error::new(
724 std::io::ErrorKind::UnexpectedEof,
725 format!(
726 "Short write expected:{} found:{}",
727 size_written_in_bytes,
728 buf.len(),
729 ),
730 ));
731 }
732 Ok(())
733 }
734
735 /// Similar to `PipeConnection::write` except it also allows:
736 /// 1. The same end of the named pipe to read and write at the same time in different
737 /// threads.
738 /// 2. Asynchronous read and write (read and write won't block).
739 ///
740 /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
741 /// (can be created with `OverlappedWrapper::new`) will be passed into
742 /// `WriteFile`. That event will be triggered when the write operation is complete.
743 ///
744 /// In order to get how many bytes were written, call `get_overlapped_result`. That function will
745 /// also help with waiting until the write operation is complete. The pipe must be opened in
746 /// overlapped otherwise there may be unexpected behavior.
747 ///
748 /// # Safety
749 /// * buf & overlapped_wrapper MUST live until the overlapped operation is complete.
write_overlapped<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>750 pub unsafe fn write_overlapped<T: PipeSendable>(
751 &mut self,
752 buf: &[T],
753 overlapped_wrapper: &mut OverlappedWrapper,
754 ) -> Result<()> {
755 if overlapped_wrapper.in_use {
756 return Err(std::io::Error::new(
757 std::io::ErrorKind::InvalidInput,
758 "Overlapped struct already in use",
759 ));
760 }
761 overlapped_wrapper.in_use = true;
762
763 PipeConnection::write_internal(
764 &self.handle,
765 buf,
766 Some(&mut overlapped_wrapper.overlapped),
767 )?;
768 Ok(())
769 }
770
771 /// Helper for `write_overlapped` and `write`.
772 ///
773 /// # Safety
774 /// * Safe if overlapped is None.
775 /// * Safe if overlapped is Some and:
776 /// + buf lives until the overlapped operation is complete.
777 /// + overlapped lives until the overlapped operation is complete.
write_internal<T: PipeSendable>( handle: &SafeDescriptor, buf: &[T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>778 unsafe fn write_internal<T: PipeSendable>(
779 handle: &SafeDescriptor,
780 buf: &[T],
781 overlapped: Option<&mut OVERLAPPED>,
782 ) -> Result<usize> {
783 // Safe because buf points to memory valid until the write completes and we pass a valid
784 // length for that memory.
785 unsafe {
786 crate::platform::write_file(
787 handle,
788 buf.as_ptr() as *const u8,
789 mem::size_of_val(buf),
790 overlapped,
791 )
792 }
793 }
794
795 /// Sets the blocking mode on the pipe.
set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()>796 pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
797 let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
798 self.blocking_mode = *blocking_mode;
799
800 // Safe because the pipe has not been closed (it is managed by this object).
801 unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
802 }
803
804 /// For a server named pipe, waits for a client to connect
wait_for_client_connection(&self) -> Result<()>805 pub fn wait_for_client_connection(&self) -> Result<()> {
806 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
807 self.wait_for_client_connection_internal(
808 &mut overlapped_wrapper,
809 /* should_block = */ true,
810 )
811 }
812
813 /// For a server named pipe, waits for a client to connect using the given overlapped wrapper
814 /// to signal connection.
wait_for_client_connection_overlapped( &self, overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>815 pub fn wait_for_client_connection_overlapped(
816 &self,
817 overlapped_wrapper: &mut OverlappedWrapper,
818 ) -> Result<()> {
819 self.wait_for_client_connection_internal(
820 overlapped_wrapper,
821 /* should_block = */ false,
822 )
823 }
824
wait_for_client_connection_internal( &self, overlapped_wrapper: &mut OverlappedWrapper, should_block: bool, ) -> Result<()>825 fn wait_for_client_connection_internal(
826 &self,
827 overlapped_wrapper: &mut OverlappedWrapper,
828 should_block: bool,
829 ) -> Result<()> {
830 // Safe because the handle is valid and we're checking the return
831 // code according to the documentation
832 unsafe {
833 let success_flag = ConnectNamedPipe(
834 self.as_raw_descriptor(),
835 // Note: The overlapped structure is only used if the pipe was opened in
836 // OVERLAPPED mode, but is necessary in that case.
837 &mut *overlapped_wrapper.overlapped,
838 );
839 if success_flag == 0 {
840 return match GetLastError() {
841 ERROR_PIPE_CONNECTED => {
842 if !should_block {
843 // If async, make sure the event is signalled to indicate the client
844 // is ready.
845 overlapped_wrapper.get_h_event_ref().unwrap().signal()?;
846 }
847
848 Ok(())
849 }
850 ERROR_IO_PENDING => {
851 if should_block {
852 overlapped_wrapper.get_h_event_ref().unwrap().wait()?;
853 }
854 Ok(())
855 }
856 err => Err(io::Error::from_raw_os_error(err as i32)),
857 };
858 }
859 }
860 Ok(())
861 }
862
863 /// Used for overlapped read and write operations.
864 ///
865 /// This will block until the ReadFile or WriteFile operation that also took in
866 /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
867 /// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get
868 /// the number of bytes that were read or written.
get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>869 pub fn get_overlapped_result(
870 &mut self,
871 overlapped_wrapper: &mut OverlappedWrapper,
872 ) -> io::Result<u32> {
873 let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true);
874 overlapped_wrapper.in_use = false;
875 res
876 }
877
878 /// Used for overlapped read and write operations.
879 ///
880 /// This will return immediately, regardless of the completion status of the
881 /// ReadFile or WriteFile operation that took in `overlapped_wrapper`,
882 /// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new`
883 /// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes
884 /// that were read or written, if completed. If the operation hasn't
885 /// completed, an error of kind `io::ErrorKind::WouldBlock` will be
886 /// returned.
try_get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>887 pub fn try_get_overlapped_result(
888 &mut self,
889 overlapped_wrapper: &mut OverlappedWrapper,
890 ) -> io::Result<u32> {
891 let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false);
892 match res {
893 Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => {
894 Err(io::Error::new(io::ErrorKind::WouldBlock, err))
895 }
896 _ => {
897 overlapped_wrapper.in_use = false;
898 res
899 }
900 }
901 }
902
get_overlapped_result_internal( &mut self, overlapped_wrapper: &mut OverlappedWrapper, wait: bool, ) -> io::Result<u32>903 fn get_overlapped_result_internal(
904 &mut self,
905 overlapped_wrapper: &mut OverlappedWrapper,
906 wait: bool,
907 ) -> io::Result<u32> {
908 if !overlapped_wrapper.in_use {
909 return Err(std::io::Error::new(
910 std::io::ErrorKind::InvalidInput,
911 "Overlapped struct is not in use",
912 ));
913 }
914 let mut size_transferred = 0;
915 // Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
916 // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
917 fail_if_zero!(unsafe {
918 GetOverlappedResult(
919 self.handle.as_raw_descriptor(),
920 &mut *overlapped_wrapper.overlapped,
921 &mut size_transferred,
922 if wait { TRUE } else { FALSE },
923 )
924 });
925
926 Ok(size_transferred)
927 }
928
929 /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
930 /// cancel all I/O requests for the file handle passed in.
cancel_io(&mut self) -> Result<()>931 pub fn cancel_io(&mut self) -> Result<()> {
932 fail_if_zero!(unsafe {
933 CancelIoEx(
934 self.handle.as_raw_descriptor(),
935 /* lpOverlapped= */ std::ptr::null_mut(),
936 )
937 });
938
939 Ok(())
940 }
941
/// Get the framing mode of the pipe.
///
/// Returns the `framing_mode` value stored on this connection; no system call is made.
pub fn get_framing_mode(&self) -> FramingMode {
    self.framing_mode
}
946
947 /// Returns metadata about the connected NamedPipe.
get_info(&self, is_server_connection: bool) -> Result<NamedPipeInfo>948 pub fn get_info(&self, is_server_connection: bool) -> Result<NamedPipeInfo> {
949 let mut flags: u32 = 0;
950 // Marked mutable because they are mutated in a system call
951 #[allow(unused_mut)]
952 let mut incoming_buffer_size: u32 = 0;
953 #[allow(unused_mut)]
954 let mut outgoing_buffer_size: u32 = 0;
955 #[allow(unused_mut)]
956 let mut max_instances: u32 = 0;
957 // Client side with BYTE type are default flags
958 if is_server_connection {
959 flags |= 0x00000001 /* PIPE_SERVER_END */
960 }
961 if self.framing_mode == FramingMode::Message {
962 flags |= 0x00000004 /* PIPE_TYPE_MESSAGE */
963 }
964 // Safe because we have allocated all pointers and own
965 // them as mutable.
966 fail_if_zero!(unsafe {
967 GetNamedPipeInfo(
968 self.as_raw_descriptor(),
969 flags as *mut u32,
970 outgoing_buffer_size as *mut u32,
971 incoming_buffer_size as *mut u32,
972 max_instances as *mut u32,
973 )
974 });
975
976 Ok(NamedPipeInfo {
977 outgoing_buffer_size,
978 incoming_buffer_size,
979 max_instances,
980 })
981 }
982
983 /// For a server pipe, flush the pipe contents. This will
984 /// block until the pipe is cleared by the client. Only
985 /// call this if you are sure the client is reading the
986 /// data!
flush_data_blocking(&self) -> Result<()>987 pub fn flush_data_blocking(&self) -> Result<()> {
988 // Safe because the only buffers interacted with are
989 // outside of Rust memory
990 fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) });
991 Ok(())
992 }
993
994 /// For a server pipe, disconnect all clients, discarding any buffered data.
disconnect_clients(&self) -> Result<()>995 pub fn disconnect_clients(&self) -> Result<()> {
996 // Safe because we own the handle passed in and know it will remain valid for the duration
997 // of the call. Discarded buffers are not managed by rust.
998 fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) });
999 Ok(())
1000 }
1001 }
1002
/// Exposes the raw descriptor of the pipe handle while the connection keeps ownership of it.
impl AsRawDescriptor for PipeConnection {
    /// Returns the raw descriptor held by `self.handle`.
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.handle.as_raw_descriptor()
    }
}
1008
/// Converts the connection into the raw descriptor of its pipe handle.
impl IntoRawDescriptor for PipeConnection {
    /// Consumes the connection and returns the result of `into_raw_descriptor` on its handle.
    // NOTE(review): presumably the caller becomes responsible for closing the returned
    // descriptor -- confirm against `SafeDescriptor::into_raw_descriptor`.
    fn into_raw_descriptor(self) -> RawDescriptor {
        self.handle.into_raw_descriptor()
    }
}
1014
// NOTE(review): these impls assert that a `PipeConnection` may be moved to and shared between
// threads. Presumably this is sound because the wrapped pipe handle is not tied to the thread
// that created it -- confirm and record the justification here.
unsafe impl Send for PipeConnection {}
unsafe impl Sync for PipeConnection {}
1017
/// Lets a `PipeConnection` be used wherever a byte-oriented `io::Read` is expected.
impl io::Read for PipeConnection {
    /// Reads into `buf` by forwarding to `PipeConnection::read` and returns its result.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // This is safe because PipeConnection::read is always safe for u8
        unsafe { PipeConnection::read(self, buf) }
    }
}
1024
/// Lets a `PipeConnection` be used wherever a byte-oriented `io::Write` is expected.
impl io::Write for PipeConnection {
    /// Writes `buf` by forwarding to `PipeConnection::write` and returns its result.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        PipeConnection::write(self, buf)
    }

    /// Does nothing and always succeeds. Use `flush_data_blocking` to actually wait for the
    /// peer to drain the pipe.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
1034
/// A simple data struct representing metadata about a NamedPipe.
///
/// All fields are plain integers, so the struct is cheap to copy and can be compared and
/// printed directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct NamedPipeInfo {
    /// Size of the buffer for outgoing data.
    pub outgoing_buffer_size: u32,
    /// Size of the buffer for incoming data.
    pub incoming_buffer_size: u32,
    /// Maximum number of pipe instances.
    pub max_instances: u32,
}
1042
1043 #[cfg(test)]
1044 mod tests {
1045 use std::mem::size_of;
1046
1047 use super::*;
1048
/// Byte-mode pipes deliver one write as several reads when the read buffer is smaller than
/// the data that was written.
#[test]
fn duplex_pipe_stream() {
    let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

    // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
    let directions = [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)];
    for (dir, sender, receiver) in directions.iter() {
        println!("{}", dir);

        sender.write(&[75, 77, 54, 82, 76, 65]).unwrap();

        // Smaller than what we sent so we get multiple chunks
        let mut chunk: [u8; 4] = [0; 4];

        // SAFETY: `chunk` is a plain byte buffer that outlives the call.
        let first = unsafe { receiver.read(&mut chunk) };
        assert_eq!(first.unwrap(), 4);
        assert_eq!(chunk, [75, 77, 54, 82]);

        // SAFETY: as above.
        let second = unsafe { receiver.read(&mut chunk) };
        assert_eq!(second.unwrap(), 2);
        assert_eq!(chunk[0..2], [76, 65]);
    }
}
1073
/// In byte mode the available byte count equals the number of bytes written, and querying it
/// does not consume them.
#[test]
fn available_byte_count_byte_mode() {
    let (writer, reader) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
    writer.write(&[1, 23, 45]).unwrap();

    // PeekNamedPipe should NOT touch the data in the pipe. So asking a second time should
    // yield the same value as the first.
    for _ in 0..2 {
        assert_eq!(reader.get_available_byte_count().unwrap(), 3);
    }
}
1084
/// In message mode the available byte count equals the size of the single queued message,
/// and querying it does not consume the message.
#[test]
fn available_byte_count_message_mode() {
    let (writer, reader) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
    writer.write(&[1, 23, 45]).unwrap();

    // PeekNamedPipe should NOT touch the data in the pipe. So asking a second time should
    // yield the same value as the first.
    for _ in 0..2 {
        assert_eq!(reader.get_available_byte_count().unwrap(), 3);
    }
}
1095
/// With several messages queued, the available byte count covers all of them, not just the
/// first one.
#[test]
fn available_byte_count_message_mode_multiple_messages() {
    let (writer, reader) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();

    // A 3-byte message followed by a 2-byte message.
    writer.write(&[1, 2, 3]).unwrap();
    writer.write(&[4, 5]).unwrap();

    let available = reader.get_available_byte_count().unwrap();
    assert_eq!(available, 5);
}
1103
/// Message-mode pipes return one message per read, even when the read buffer has room for
/// more.
#[test]
fn duplex_pipe_message() {
    let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();

    // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
    let directions = [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)];
    for (dir, sender, receiver) in directions.iter() {
        println!("{}", dir);

        // Send 2 messages so that we can check that message framing works
        sender.write(&[1, 23, 45]).unwrap();
        sender.write(&[67, 89, 10]).unwrap();

        // Larger than required for messages
        let mut inbox: [u8; 5] = [0; 5];

        // SAFETY: `inbox` is a plain byte buffer that outlives the call.
        let first = unsafe { receiver.read(&mut inbox) };
        assert_eq!(first.unwrap(), 3);
        assert_eq!(inbox[0..3], [1, 23, 45]);

        // SAFETY: as above.
        let second = unsafe { receiver.read(&mut inbox) };
        assert_eq!(second.unwrap(), 3);
        assert_eq!(inbox[0..3], [67, 89, 10]);
    }
}
1129
/// Shared body of the non-blocking tests: after the single written byte has been read, a
/// further read must fail with `WouldBlock` instead of blocking.
#[cfg(test)]
fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
    let mut scratch: [u8; 1] = [0; 1];

    // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
    let directions = [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)];
    for (dir, sender, receiver) in directions.iter() {
        println!("{}", dir);
        sender.write(&[1]).unwrap();

        // SAFETY: `scratch` is a plain byte buffer that outlives the call.
        let with_data = unsafe { receiver.read(&mut scratch) };
        assert_eq!(with_data.unwrap(), 1); // Should succeed!

        // SAFETY: as above.
        let drained = unsafe { receiver.read(&mut scratch) };
        assert_eq!(
            drained.unwrap_err().kind(),
            std::io::ErrorKind::WouldBlock
        );
    }
}
1147
/// Non-blocking behaviour when the pair is created in `NoWait` mode from the start.
#[test]
fn duplex_nowait() {
    let (first, second) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
    duplex_nowait_helper(&first, &second);
}
1153
/// Non-blocking behaviour when the pair is created blocking and switched to `NoWait`
/// afterwards through `set_blocking`.
#[test]
fn duplex_nowait_set_after_creation() {
    // Tests non blocking setting after pipe creation
    let (mut first, mut second) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

    let no_wait = BlockingMode::NoWait;
    first
        .set_blocking(&no_wait)
        .expect("Failed to set blocking mode on pipe p1");
    second
        .set_blocking(&no_wait)
        .expect("Failed to set blocking mode on pipe p2");

    duplex_nowait_helper(&first, &second);
}
1164
/// Round-trips one message through an overlapped server/client pair using
/// `write_overlapped`, `read_overlapped` and `get_overlapped_result`.
#[test]
fn duplex_overlapped() {
    let pipe_name = generate_pipe_name();

    let mut server = create_server_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    let mut client = create_client_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();

    // Server side: queue the write, then wait for it to complete.
    let mut send_overlapped = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    // SAFETY: the data and `send_overlapped` stay alive until the operation is confirmed
    // complete by `get_overlapped_result` right below.
    let queued =
        unsafe { server.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut send_overlapped) };
    queued.unwrap();
    let written = server.get_overlapped_result(&mut send_overlapped).unwrap();
    assert_eq!(written, 6);

    // Client side: queue the read, then wait for it to complete.
    let mut recv_buffer: [u8; 6] = [0; 6];
    let mut recv_overlapped = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    // SAFETY: `recv_buffer` and `recv_overlapped` stay alive until the operation is confirmed
    // complete by `get_overlapped_result` right below.
    let started = unsafe { client.read_overlapped(&mut recv_buffer, &mut recv_overlapped) };
    started.unwrap();
    let received = client.get_overlapped_result(&mut recv_overlapped).unwrap();
    assert_eq!(received, 6);
    assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
}
1211
/// Exercises the `in_use` bookkeeping of `OverlappedWrapper`: results cannot be fetched for
/// an idle wrapper, a busy wrapper cannot start another operation, and fetching the result
/// makes the wrapper usable again.
#[test]
fn duplex_overlapped_test_in_use() {
    let pipe_name = generate_pipe_name();

    let mut p1 = create_server_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    let mut p2 = create_client_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();
    // A single wrapper is deliberately shared by every operation below.
    let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();

    // No operation has been started yet, so there is no result to fetch.
    let res = p1.get_overlapped_result(&mut overlapped_wrapper);
    assert!(res.is_err());

    let data = vec![75, 77, 54, 82, 76, 65];
    // SAFETY: safe because: data & overlapped wrapper live until the
    // operation is verified completed below.
    let res = unsafe { p1.write_overlapped(&data, &mut overlapped_wrapper) };
    assert!(res.is_ok());

    // The wrapper is now busy: a second write on it must be rejected.
    // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
    // will error out.
    let res =
        unsafe { p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper) };
    assert!(res.is_err());

    // ...and so must a read on it.
    let mut recv_buffer: [u8; 6] = [0; 6];
    // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
    // will error out.
    let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
    assert!(res.is_err());

    // Fetching the result of the first write releases the wrapper.
    let res = p1.get_overlapped_result(&mut overlapped_wrapper);
    assert!(res.is_ok());

    // The released wrapper can now drive a read on the other end.
    let mut recv_buffer: [u8; 6] = [0; 6];
    // SAFETY: safe because recv_buffer & overlapped_wrapper live until the
    // operation is verified completed below.
    let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
    assert!(res.is_ok());
    let res = p2.get_overlapped_result(&mut overlapped_wrapper);
    assert!(res.is_ok());
}
1267
generate_pipe_name() -> String1268 fn generate_pipe_name() -> String {
1269 format!(
1270 r"\\.\pipe\test-ipc-pipe-name.rand{}",
1271 rand::thread_rng().gen::<u64>(),
1272 )
1273 }
1274
/// Sends a length header followed by a payload with `write_overlapped_blocking_message` and
/// checks that `read_overlapped_blocking_message` hands back exactly the payload.
#[test]
fn read_write_overlapped_message() {
    const MSG: [u8; 6] = [75, 77, 54, 82, 76, 65];

    let pipe_name = generate_pipe_name();

    let mut server = create_server_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    let mut client = create_client_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();

    // Writer side: the big-endian payload length goes first, then the payload itself. The
    // same wrapper is reused for both blocking writes.
    let mut send_overlapped = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    let header = MSG.len().to_be_bytes();
    server
        .write_overlapped_blocking_message(&header, &mut send_overlapped)
        .unwrap();
    server
        .write_overlapped_blocking_message(&MSG, &mut send_overlapped)
        .unwrap();

    // Reader side: the closure decodes the header back into the payload length.
    let mut recv_overlapped = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    let exit_event = Event::new().unwrap();
    let header_len = size_of::<usize>();
    let received = client
        .read_overlapped_blocking_message(
            header_len,
            |buf| usize::from_be_bytes(buf.try_into().expect("failed to get array from slice")),
            &mut recv_overlapped,
            &exit_event,
        )
        .unwrap();
    assert_eq!(received, MSG);
}
1317 }
1318