1 // Copyright 2022 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::ffi::CString;
6 use std::fs::OpenOptions;
7 use std::io;
8 use std::io::Result;
9 use std::mem;
10 use std::os::windows::fs::OpenOptionsExt;
11 use std::process;
12 use std::ptr;
13 use std::sync::atomic::AtomicBool;
14 use std::sync::atomic::AtomicUsize;
15 use std::sync::atomic::Ordering;
16 use std::sync::Arc;
17
18 use rand::Rng;
19 use serde::Deserialize;
20 use serde::Serialize;
21 use sync::Mutex;
22 use win_util::fail_if_zero;
23 use win_util::SecurityAttributes;
24 use win_util::SelfRelativeSecurityDescriptor;
25 use winapi::shared::minwindef::DWORD;
26 use winapi::shared::minwindef::FALSE;
27 use winapi::shared::minwindef::TRUE;
28 use winapi::shared::winerror::ERROR_BROKEN_PIPE;
29 use winapi::shared::winerror::ERROR_IO_INCOMPLETE;
30 use winapi::shared::winerror::ERROR_IO_PENDING;
31 use winapi::shared::winerror::ERROR_MORE_DATA;
32 use winapi::shared::winerror::ERROR_NO_DATA;
33 use winapi::shared::winerror::ERROR_PIPE_CONNECTED;
34 use winapi::um::errhandlingapi::GetLastError;
35 use winapi::um::fileapi::FlushFileBuffers;
36 use winapi::um::handleapi::INVALID_HANDLE_VALUE;
37 use winapi::um::ioapiset::CancelIoEx;
38 use winapi::um::ioapiset::GetOverlappedResult;
39 use winapi::um::minwinbase::OVERLAPPED;
40 use winapi::um::namedpipeapi::ConnectNamedPipe;
41 use winapi::um::namedpipeapi::DisconnectNamedPipe;
42 use winapi::um::namedpipeapi::GetNamedPipeInfo;
43 use winapi::um::namedpipeapi::PeekNamedPipe;
44 use winapi::um::namedpipeapi::SetNamedPipeHandleState;
45 use winapi::um::winbase::CreateNamedPipeA;
46 use winapi::um::winbase::FILE_FLAG_FIRST_PIPE_INSTANCE;
47 use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
48 use winapi::um::winbase::PIPE_ACCESS_DUPLEX;
49 use winapi::um::winbase::PIPE_NOWAIT;
50 use winapi::um::winbase::PIPE_READMODE_BYTE;
51 use winapi::um::winbase::PIPE_READMODE_MESSAGE;
52 use winapi::um::winbase::PIPE_REJECT_REMOTE_CLIENTS;
53 use winapi::um::winbase::PIPE_TYPE_BYTE;
54 use winapi::um::winbase::PIPE_TYPE_MESSAGE;
55 use winapi::um::winbase::PIPE_WAIT;
56 use winapi::um::winbase::SECURITY_IDENTIFICATION;
57
58 use super::RawDescriptor;
59 use crate::descriptor::AsRawDescriptor;
60 use crate::descriptor::FromRawDescriptor;
61 use crate::descriptor::IntoRawDescriptor;
62 use crate::descriptor::SafeDescriptor;
63 use crate::Event;
64 use crate::EventToken;
65 use crate::WaitContext;
66
/// The default buffer size for all named pipes in the system. If this size is too small, writers
/// on named pipes that expect not to block *can* block until the reading side empties the buffer.
///
/// The general rule is this should be *at least* as big as the largest message, otherwise
/// unexpected blocking behavior can result; for example, if too small, this can interact badly with
/// crate::windows::StreamChannel, which expects to be able to make a complete write before
/// releasing a lock that the opposite side needs to complete a read. This means that if the buffer
/// is too small:
///     * The writer can't complete its write and release the lock because the buffer is too small.
///     * The reader can't start reading because the lock is held by the writer, so it can't relieve
///       buffer pressure. Note that for message pipes, the reader couldn't do anything to help
///       anyway, because a message mode pipe should NOT have a partial read (which is what we would
///       need to relieve pressure).
///     * Conditions for deadlock are met, and both the reader & writer enter circular waiting.
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

// Per-process counter that `pair_with_buffer_size` increments and embeds in each generated pipe
// name, alongside the process id and a random number, to keep the names distinct.
static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
84
/// Errors produced by this module that do not correspond to an OS error code.
#[remain::sorted]
#[derive(Debug, thiserror::Error)]
pub enum PipeError {
    /// A read into a non-empty buffer returned zero bytes without the pipe being closed.
    /// `PipeConnection::read` reports this as an error rather than `Ok(0)`, which would be
    /// mistaken for EOF.
    #[error("read zero bytes, but this is not an EOF")]
    ZeroByteReadNoEof,
}
91
/// Represents one end of a named pipe
///
/// NOTE: implementations of Read & Write are trait compliant for EOF/broken pipe handling
/// (returning a successful zero byte read), but overlapped read/write versions are NOT (they will
/// return broken pipe directly due to API limitations; see PipeConnection::read for
/// details).
#[derive(Serialize, Deserialize, Debug)]
pub struct PipeConnection {
    // Owned handle for this end of the pipe.
    handle: SafeDescriptor,
    // Framing mode this connection was created with; used when recomputing the pipe mode flags.
    framing_mode: FramingMode,
    // Blocking mode this connection was created with; consulted when interpreting read errors.
    blocking_mode: BlockingMode,
}
104
/// `OVERLAPPED` is allocated on the heap because it must not move while performing I/O operations.
///
/// Defined as a separate type so that we can mark it as `Send` and `Sync`.
pub struct BoxedOverlapped(pub Box<OVERLAPPED>);

// SAFETY: `OVERLAPPED` is not automatically `Send` because it contains a `HANDLE`, which is a raw
// pointer, but `HANDLE`s are safe to move between threads and thus so is `OVERLAPPED`.
unsafe impl Send for BoxedOverlapped {}

// SAFETY: See the argument for `Send` above. `HANDLE`s are also safe to share between threads.
unsafe impl Sync for BoxedOverlapped {}
116
/// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a
/// ReadFile or WriteFile operation and holds onto the event object so it doesn't get dropped.
pub struct OverlappedWrapper {
    // Heap-allocated `OVERLAPPED` handed to the overlapped read/write calls.
    overlapped: BoxedOverlapped,
    // This field prevents the event handle from being dropped too early and allows callers to
    // be notified when a read or write overlapped operation has completed.
    h_event: Option<Event>,
    // Set when an overlapped read or write has been issued with this wrapper; a second
    // operation is rejected while it is set.
    in_use: bool,
}
126
127 impl OverlappedWrapper {
get_h_event_ref(&self) -> Option<&Event>128 pub fn get_h_event_ref(&self) -> Option<&Event> {
129 self.h_event.as_ref()
130 }
131
132 /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
133 /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
134 /// returned must not be dropped.
135 ///
136 /// There is an option to create the event object and set it to the `hEvent` field. If hEvent
137 /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
138 /// handle will be signaled when the operation is complete. In other words, you can use
139 /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
140 /// Microsoft though.
new(include_event: bool) -> Result<OverlappedWrapper>141 pub fn new(include_event: bool) -> Result<OverlappedWrapper> {
142 let mut overlapped = OVERLAPPED::default();
143 let h_event = if include_event {
144 Some(Event::new()?)
145 } else {
146 None
147 };
148
149 overlapped.hEvent = if let Some(event) = h_event.as_ref() {
150 event.as_raw_descriptor()
151 } else {
152 0 as RawDescriptor
153 };
154
155 Ok(OverlappedWrapper {
156 overlapped: BoxedOverlapped(Box::new(overlapped)),
157 h_event,
158 in_use: false,
159 })
160 }
161 }
162
/// Interface for issuing an overlapped write and retrieving its result.
pub trait WriteOverlapped {
    /// Perform an overlapped write operation with the specified buffer and overlapped wrapper.
    /// If successful, the write operation will complete asynchronously, and
    /// `write_result()` should be called to get the result.
    ///
    /// # Safety
    /// `buf` and `overlapped_wrapper` will be in use for the duration of
    /// the overlapped operation. These must not be reused and must live until
    /// after `write_result()` has been called.
    unsafe fn write_overlapped(
        &mut self,
        buf: &mut [u8],
        overlapped_wrapper: &mut OverlappedWrapper,
    ) -> io::Result<()>;

    /// Gets the result of the overlapped write operation. Must only be called
    /// after issuing an overlapped write operation using `write_overlapped`. The
    /// same `overlapped_wrapper` must be provided.
    fn write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;

    /// Tries to get the result of the overlapped write operation. Must only be
    /// called once, and only after issuing an overlapped write operation using
    /// `write_overlapped`. The same `overlapped_wrapper` must be provided.
    ///
    /// An error indicates that the operation hasn't completed yet and
    /// `write_result` or `try_write_result` should be called again.
    fn try_write_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper)
        -> io::Result<usize>;
}
192
/// Interface for issuing an overlapped read and retrieving its result.
pub trait ReadOverlapped {
    /// Perform an overlapped read operation with the specified buffer and overlapped wrapper.
    /// If successful, the read operation will complete asynchronously, and
    /// `read_result()` should be called to get the result.
    ///
    /// # Safety
    /// `buf` and `overlapped_wrapper` will be in use for the duration of
    /// the overlapped operation. These must not be reused and must live until
    /// after `read_result()` has been called.
    unsafe fn read_overlapped(
        &mut self,
        buf: &mut [u8],
        overlapped_wrapper: &mut OverlappedWrapper,
    ) -> io::Result<()>;

    /// Gets the result of the overlapped read operation. Must only be called
    /// once, and only after issuing an overlapped read operation using
    /// `read_overlapped`. The same `overlapped_wrapper` must be provided.
    fn read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;

    /// Tries to get the result of the overlapped read operation. Must only be called
    /// after issuing an overlapped read operation using `read_overlapped`. The
    /// same `overlapped_wrapper` must be provided.
    ///
    /// An error indicates that the operation hasn't completed yet and
    /// `read_result` or `try_read_result` should be called again.
    fn try_read_result(&mut self, overlapped_wrapper: &mut OverlappedWrapper) -> io::Result<usize>;
}
221
/// How data written to a pipe is delimited for the reader.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq, Eq)]
pub enum FramingMode {
    /// The pipe provides a simple byte stream.
    Byte,
    /// The pipe provides an automatically framed sequence of messages.
    Message,
}
227
228 impl FramingMode {
to_readmode(self) -> DWORD229 fn to_readmode(self) -> DWORD {
230 match self {
231 FramingMode::Message => PIPE_READMODE_MESSAGE,
232 FramingMode::Byte => PIPE_READMODE_BYTE,
233 }
234 }
235
to_pipetype(self) -> DWORD236 fn to_pipetype(self) -> DWORD {
237 match self {
238 FramingMode::Message => PIPE_TYPE_MESSAGE,
239 FramingMode::Byte => PIPE_TYPE_BYTE,
240 }
241 }
242 }
243
/// Whether reads on a pipe wait for data or return immediately.
#[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug, Eq)]
pub enum BlockingMode {
    /// Calls to read() block until data is received
    Wait,
    /// Calls to read() return immediately even if there is nothing read with error code 232
    /// (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA)
    ///
    /// NOTE: This mode is discouraged by the Windows API documentation.
    NoWait,
}
254
255 impl From<&BlockingMode> for DWORD {
from(blocking_mode: &BlockingMode) -> DWORD256 fn from(blocking_mode: &BlockingMode) -> DWORD {
257 match blocking_mode {
258 BlockingMode::Wait => PIPE_WAIT,
259 BlockingMode::NoWait => PIPE_NOWAIT,
260 }
261 }
262 }
263
264 /// Sets the handle state for a named pipe in a rust friendly way.
265 /// SAFETY:
266 /// This is safe if the pipe handle is open.
set_named_pipe_handle_state( pipe_handle: RawDescriptor, client_mode: &mut DWORD, ) -> Result<()>267 unsafe fn set_named_pipe_handle_state(
268 pipe_handle: RawDescriptor,
269 client_mode: &mut DWORD,
270 ) -> Result<()> {
271 // Safe when the pipe handle is open. Safety also requires checking the return value, which we
272 // do below.
273 let success_flag = SetNamedPipeHandleState(
274 /* hNamedPipe= */ pipe_handle,
275 /* lpMode= */ client_mode,
276 /* lpMaxCollectionCount= */ ptr::null_mut(),
277 /* lpCollectDataTimeout= */ ptr::null_mut(),
278 );
279 if success_flag == 0 {
280 Err(io::Error::last_os_error())
281 } else {
282 Ok(())
283 }
284 }
285
pair( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, ) -> Result<(PipeConnection, PipeConnection)>286 pub fn pair(
287 framing_mode: &FramingMode,
288 blocking_mode: &BlockingMode,
289 timeout: u64,
290 ) -> Result<(PipeConnection, PipeConnection)> {
291 pair_with_buffer_size(
292 framing_mode,
293 blocking_mode,
294 timeout,
295 DEFAULT_BUFFER_SIZE,
296 false,
297 )
298 }
299
300 /// Creates a pair of handles connected to either end of a duplex named pipe.
301 ///
302 /// The pipe created will have a semi-random name and a default set of security options that
303 /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
304 /// remote clients, allow only a single server instance, and prevent impersonation by the server
305 /// end of the pipe.
306 ///
307 /// # Arguments
308 ///
309 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
310 /// automatically framed sequence of messages (Message). In message mode it's an error to read
311 /// fewer bytes than were sent in a message from the other end of the pipe.
312 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
313 /// return immediately if there is nothing available (NoWait).
314 /// * `timeout` - A timeout to apply for socket operations, in milliseconds. Setting this to
315 /// zero will create sockets with the system default timeout.
316 /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
317 /// buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
318 /// writes that don't fit in the buffer.
319 /// # Return value
320 ///
321 /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
322 /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
pair_with_buffer_size( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<(PipeConnection, PipeConnection)>323 pub fn pair_with_buffer_size(
324 framing_mode: &FramingMode,
325 blocking_mode: &BlockingMode,
326 timeout: u64,
327 buffer_size: usize,
328 overlapped: bool,
329 ) -> Result<(PipeConnection, PipeConnection)> {
330 // Give the pipe a unique name to avoid accidental collisions
331 let pipe_name = format!(
332 r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
333 process::id(),
334 NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
335 rand::thread_rng().gen::<u32>(),
336 );
337
338 let server_end = create_server_pipe(
339 &pipe_name,
340 framing_mode,
341 blocking_mode,
342 timeout,
343 buffer_size,
344 overlapped,
345 )?;
346
347 // Open the named pipe we just created as the client
348 let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
349
350 // Accept the client's connection
351 // Not sure if this is strictly needed but I'm doing it just in case.
352 // We expect at this point that the client will already be connected,
353 // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
354 // It's also OK if we get a return code of success.
355 server_end.wait_for_client_connection()?;
356
357 Ok((server_end, client_end))
358 }
359
360 /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
361 /// settings.
362 ///
363 /// The pipe will be set to reject remote clients and allow only a single connection at a time.
364 ///
365 /// # Arguments
366 ///
367 /// * `pipe_name` - The path of the named pipe to create. Should be in the form
368 /// `\\.\pipe\<some-name>`.
369 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
370 /// automatically framed sequence of messages (Message). In message mode it's an error to read
371 /// fewer bytes than were sent in a message from the other end of the pipe.
372 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
373 /// return immediately if there is nothing available (NoWait).
374 /// * `timeout` - A timeout to apply for socket operations, in milliseconds. Setting this to
375 /// zero will create sockets with the system default timeout.
376 /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
377 /// buffer automatically as needed, except in the case of NOWAIT pipes, where it will just fail
378 /// writes that don't fit in the buffer.
379 /// * `overlapped` - Sets whether overlapped mode is set on the pipe.
create_server_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<PipeConnection>380 pub fn create_server_pipe(
381 pipe_name: &str,
382 framing_mode: &FramingMode,
383 blocking_mode: &BlockingMode,
384 timeout: u64,
385 buffer_size: usize,
386 overlapped: bool,
387 ) -> Result<PipeConnection> {
388 let c_pipe_name = CString::new(pipe_name).unwrap();
389
390 let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
391 if overlapped {
392 open_mode_flags |= FILE_FLAG_OVERLAPPED
393 }
394
395 // This sets flags so there will be an error if >1 instance (server end)
396 // of this pipe name is opened because we expect exactly one.
397 // SAFETY:
398 // Safe because security attributes are valid, pipe_name is valid C string,
399 // and we're checking the return code
400 let server_handle = unsafe {
401 CreateNamedPipeA(
402 c_pipe_name.as_ptr(),
403 /* dwOpenMode= */
404 open_mode_flags,
405 /* dwPipeMode= */
406 framing_mode.to_pipetype()
407 | framing_mode.to_readmode()
408 | DWORD::from(blocking_mode)
409 | PIPE_REJECT_REMOTE_CLIENTS,
410 /* nMaxInstances= */ 1,
411 /* nOutBufferSize= */ buffer_size as DWORD,
412 /* nInBufferSize= */ buffer_size as DWORD,
413 /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
414 /* lpSecurityAttributes= */
415 SecurityAttributes::new_with_security_descriptor(
416 SelfRelativeSecurityDescriptor::get_singleton(),
417 /* inherit= */ true,
418 )
419 .as_mut(),
420 )
421 };
422
423 if server_handle == INVALID_HANDLE_VALUE {
424 Err(io::Error::last_os_error())
425 } else {
426 // SAFETY: Safe because server_handle is valid.
427 unsafe {
428 Ok(PipeConnection {
429 handle: SafeDescriptor::from_raw_descriptor(server_handle),
430 framing_mode: *framing_mode,
431 blocking_mode: *blocking_mode,
432 })
433 }
434 }
435 }
436
437 /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
438 /// settings.
439 ///
440 /// The pipe will be set to prevent impersonation of the client by the server process.
441 ///
442 /// # Arguments
443 ///
444 /// * `pipe_name` - The path of the named pipe to create. Should be in the form
445 /// `\\.\pipe\<some-name>`.
446 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
447 /// automatically framed sequence of messages (Message). In message mode it's an error to read
448 /// fewer bytes than were sent in a message from the other end of the pipe.
449 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
450 /// return immediately if there is nothing available (NoWait).
451 /// * `overlapped` - Sets whether the pipe is opened in overlapped mode.
create_client_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, overlapped: bool, ) -> Result<PipeConnection>452 pub fn create_client_pipe(
453 pipe_name: &str,
454 framing_mode: &FramingMode,
455 blocking_mode: &BlockingMode,
456 overlapped: bool,
457 ) -> Result<PipeConnection> {
458 let client_handle = OpenOptions::new()
459 .read(true)
460 .write(true)
461 .create(true)
462 .security_qos_flags(SECURITY_IDENTIFICATION)
463 .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
464 .open(pipe_name)?
465 .into_raw_descriptor();
466
467 let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
468
469 // SAFETY:
470 // Safe because client_handle's open() call did not return an error.
471 unsafe {
472 set_named_pipe_handle_state(client_handle, &mut client_mode)?;
473 }
474
475 Ok(PipeConnection {
476 // SAFETY:
477 // Safe because client_handle is valid
478 handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
479 framing_mode: *framing_mode,
480 blocking_mode: *blocking_mode,
481 })
482 }
483
// This is used to mark types which can be appropriately sent through the
// generic helper functions write_to_pipe and read_from_pipe.
pub trait PipeSendable {
    // Default values used to fill in new empty indexes when resizing a buffer to
    // a larger size.
    fn default() -> Self;
}
// Bytes default to zero.
impl PipeSendable for u8 {
    fn default() -> Self {
        0
    }
}
// Raw descriptors default to the null pointer.
impl PipeSendable for RawDescriptor {
    fn default() -> Self {
        ptr::null_mut()
    }
}
501
502 impl PipeConnection {
try_clone(&self) -> Result<PipeConnection>503 pub fn try_clone(&self) -> Result<PipeConnection> {
504 let copy_handle = self.handle.try_clone()?;
505 Ok(PipeConnection {
506 handle: copy_handle,
507 framing_mode: self.framing_mode,
508 blocking_mode: self.blocking_mode,
509 })
510 }
511
512 /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
513 /// blocking modes.
514 ///
515 /// # Safety
516 /// 1. rd is valid and ownership is transferred to this function when it is called.
517 ///
518 /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
519 /// underlying pipe.
from_raw_descriptor( rd: RawDescriptor, framing_mode: FramingMode, blocking_mode: BlockingMode, ) -> PipeConnection520 pub unsafe fn from_raw_descriptor(
521 rd: RawDescriptor,
522 framing_mode: FramingMode,
523 blocking_mode: BlockingMode,
524 ) -> PipeConnection {
525 PipeConnection {
526 handle: SafeDescriptor::from_raw_descriptor(rd),
527 framing_mode,
528 blocking_mode,
529 }
530 }
531
532 /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
533 /// Returns the number of bytes (not values) read.
534 ///
535 /// # Safety
536 ///
537 /// This is safe only when the following conditions hold:
538 /// 1. The data on the other end of the pipe is a valid binary representation of data for
539 /// type T, and
540 /// 2. The number of bytes read is a multiple of the size of T; this must be checked by
541 /// the caller.
542 /// If buf's type is file descriptors, this is only safe when those file descriptors are valid
543 /// for the process where this function was called.
read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize>544 pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
545 match PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None) {
546 // Windows allows for zero byte writes on one end of a pipe to be read by the other as
547 // zero byte reads. These zero byte reads DO NOT signify EOF, so from the perspective
548 // of std::io::Read, they cannot be reported as Ok(0). We translate them to errors.
549 //
550 // Within CrosVM, this behavior is not used, but it has been implemented to avoid UB
551 // either in the future, or when talking to non CrosVM named pipes. If we need to
552 // actually use/understand this error from other parts of KiwiVM (e.g. PipeConnection
553 // consumers), we could use ErrorKind::Interrupted (which as of 24/11/26 is not used by
554 // Rust for other purposes).
555 Ok(len) if len == 0 && !buf.is_empty() => Err(io::Error::new(
556 io::ErrorKind::Other,
557 PipeError::ZeroByteReadNoEof,
558 )),
559
560 // Read at least 1 byte, or 0 bytes if a zero byte buffer was provided.
561 Ok(len) => Ok(len),
562
563 // Treat a closed pipe like an EOF, because that is consistent with the Read trait.
564 //
565 // NOTE: this is explicitly NOT done for overlapped operations for a few reasons:
566 // 1. Overlapped operations do not follow the Read trait, so there is no strong reason
567 // *to* do it.
568 // 2. Ok(0) also means "overlapped operation started successfully." This is a real
569 // problem because the general pattern is to start an overlapped operation and then
570 // wait for it. So if we did that and the Ok(0) meant the pipe is closed, we would
571 // enter an infinite wait. (The kernel already told us when we started the operation
572 // that the pipe was closed. It won't tell us again.)
573 Err(e) if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) => Ok(0),
574
575 Err(e) => Err(e),
576 }
577 }
578
579 /// Similar to `PipeConnection::read` except it also allows:
580 /// 1. The same end of the named pipe to read and write at the same time in different
581 /// threads.
582 /// 2. Asynchronous read and write (read and write won't block).
583 ///
584 /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
585 /// (can be created with `OverlappedWrapper::new`) will be passed into
586 /// `ReadFile`. That event will be triggered when the read operation is complete.
587 ///
588 /// In order to get how many bytes were read, call `get_overlapped_result`. That function will
589 /// also help with waiting until the read operation is complete.
590 ///
591 /// # Safety
592 ///
593 /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
594 /// overlapped mode otherwise there may be unexpected behavior.
read_overlapped<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>595 pub unsafe fn read_overlapped<T: PipeSendable>(
596 &mut self,
597 buf: &mut [T],
598 overlapped_wrapper: &mut OverlappedWrapper,
599 ) -> Result<()> {
600 if overlapped_wrapper.in_use {
601 return Err(std::io::Error::new(
602 std::io::ErrorKind::InvalidInput,
603 "Overlapped struct already in use",
604 ));
605 }
606 overlapped_wrapper.in_use = true;
607
608 PipeConnection::read_internal(
609 &self.handle,
610 self.blocking_mode,
611 buf,
612 Some(&mut overlapped_wrapper.overlapped.0),
613 )?;
614 Ok(())
615 }
616
617 /// Helper for `read_overlapped` and `read`
618 ///
619 /// # Safety
620 /// Comments `read_overlapped` or `read`, depending on which is used.
read_internal<T: PipeSendable>( handle: &SafeDescriptor, blocking_mode: BlockingMode, buf: &mut [T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>621 unsafe fn read_internal<T: PipeSendable>(
622 handle: &SafeDescriptor,
623 blocking_mode: BlockingMode,
624 buf: &mut [T],
625 overlapped: Option<&mut OVERLAPPED>,
626 ) -> Result<usize> {
627 let res = crate::windows::read_file(
628 handle,
629 buf.as_mut_ptr() as *mut u8,
630 mem::size_of_val(buf),
631 overlapped,
632 );
633 match res {
634 Ok(bytes_read) => Ok(bytes_read),
635 // For message mode pipes, if the buffer is too small for the entire message, the kernel
636 // will return ERROR_MORE_DATA. This isn't strictly an "error" because the operation
637 // succeeds. Making it an error also means it's hard to handle this cleanly from the
638 // perspective of an io::Read consumer. So we discard the non-error, and return the
639 // successful result of filling the entire buffer.
640 Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => Ok(buf.len()),
641 Err(e)
642 if blocking_mode == BlockingMode::NoWait
643 && e.raw_os_error() == Some(ERROR_NO_DATA as i32) =>
644 {
645 // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
646 // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
647 // correct. For further details see:
648 // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
649 // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
650 Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
651 }
652 Err(e) => Err(e),
653 }
654 }
655
656 /// Blockingly reads a `buf` bytes from the pipe. The blocking read can be interrupted
657 /// by an event on `exit_event`.
read_overlapped_blocking<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<()>658 pub fn read_overlapped_blocking<T: PipeSendable>(
659 &mut self,
660 buf: &mut [T],
661 overlapped_wrapper: &mut OverlappedWrapper,
662 exit_event: &Event,
663 ) -> Result<()> {
664 // SAFETY:
665 // Safe because we are providing a valid buffer slice and also providing a valid
666 // overlapped struct.
667 match unsafe { self.read_overlapped(buf, overlapped_wrapper) } {
668 Err(e) => Err(e),
669 Ok(()) => Ok(()),
670 }?;
671
672 #[derive(EventToken)]
673 enum Token {
674 ReadOverlapped,
675 Exit,
676 }
677
678 let wait_ctx = WaitContext::build_with(&[
679 (
680 overlapped_wrapper.get_h_event_ref().unwrap(),
681 Token::ReadOverlapped,
682 ),
683 (exit_event, Token::Exit),
684 ])?;
685
686 let events = wait_ctx.wait()?;
687 for event in events {
688 match event.token {
689 Token::ReadOverlapped => {
690 let size_read_in_bytes =
691 self.get_overlapped_result(overlapped_wrapper)? as usize;
692
693 // If this error shows, most likely the overlapped named pipe was set up
694 // incorrectly.
695 if size_read_in_bytes != buf.len() {
696 return Err(std::io::Error::new(
697 std::io::ErrorKind::UnexpectedEof,
698 "Short read",
699 ));
700 }
701 }
702 Token::Exit => {
703 return Err(std::io::Error::new(
704 std::io::ErrorKind::Interrupted,
705 "IO canceled on exit request",
706 ));
707 }
708 }
709 }
710
711 Ok(())
712 }
713
714 /// Gets the size in bytes of data in the pipe.
715 ///
716 /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
717 /// not finished writing on the producer side.
get_available_byte_count(&self) -> io::Result<u32>718 pub fn get_available_byte_count(&self) -> io::Result<u32> {
719 let mut total_bytes_avail: DWORD = 0;
720
721 // SAFETY:
722 // Safe because the underlying pipe handle is guaranteed to be open, and the output values
723 // live at valid memory locations.
724 fail_if_zero!(unsafe {
725 PeekNamedPipe(
726 self.as_raw_descriptor(),
727 ptr::null_mut(),
728 0,
729 ptr::null_mut(),
730 &mut total_bytes_avail,
731 ptr::null_mut(),
732 )
733 });
734
735 Ok(total_bytes_avail)
736 }
737
738 /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
739 /// callers should check to ensure that it was the number expected.
write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize>740 pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
741 // SAFETY: overlapped is None so this is safe.
742 unsafe { PipeConnection::write_internal(&self.handle, buf, None) }
743 }
744
745 /// Similar to `PipeConnection::write` except it also allows:
746 /// 1. The same end of the named pipe to read and write at the same time in different
747 /// threads.
748 /// 2. Asynchronous read and write (read and write won't block).
749 ///
750 /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
751 /// (can be created with `OverlappedWrapper::new`) will be passed into
752 /// `WriteFile`. That event will be triggered when the write operation is complete.
753 ///
754 /// In order to get how many bytes were written, call `get_overlapped_result`. That function
755 /// will also help with waiting until the write operation is complete. The pipe must be
756 /// opened in overlapped otherwise there may be unexpected behavior.
757 ///
758 /// # Safety
759 /// * buf & overlapped_wrapper MUST live until the overlapped operation is complete.
write_overlapped<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>760 pub unsafe fn write_overlapped<T: PipeSendable>(
761 &mut self,
762 buf: &[T],
763 overlapped_wrapper: &mut OverlappedWrapper,
764 ) -> Result<()> {
765 if overlapped_wrapper.in_use {
766 return Err(std::io::Error::new(
767 std::io::ErrorKind::InvalidInput,
768 "Overlapped struct already in use",
769 ));
770 }
771 overlapped_wrapper.in_use = true;
772
773 PipeConnection::write_internal(
774 &self.handle,
775 buf,
776 Some(&mut overlapped_wrapper.overlapped.0),
777 )?;
778 Ok(())
779 }
780
781 /// Helper for `write_overlapped` and `write`.
782 ///
783 /// # Safety
784 /// * Safe if overlapped is None.
785 /// * Safe if overlapped is Some and:
786 /// + buf lives until the overlapped operation is complete.
787 /// + overlapped lives until the overlapped operation is complete.
write_internal<T: PipeSendable>( handle: &SafeDescriptor, buf: &[T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>788 unsafe fn write_internal<T: PipeSendable>(
789 handle: &SafeDescriptor,
790 buf: &[T],
791 overlapped: Option<&mut OVERLAPPED>,
792 ) -> Result<usize> {
793 // SAFETY:
794 // Safe because buf points to memory valid until the write completes and we pass a valid
795 // length for that memory.
796 unsafe {
797 crate::windows::write_file(
798 handle,
799 buf.as_ptr() as *const u8,
800 mem::size_of_val(buf),
801 overlapped,
802 )
803 }
804 }
805
806 /// Sets the blocking mode on the pipe.
set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()>807 pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
808 let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
809 self.blocking_mode = *blocking_mode;
810
811 // SAFETY:
812 // Safe because the pipe has not been closed (it is managed by this object).
813 unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
814 }
815
816 /// For a server named pipe, waits for a client to connect (blocking).
wait_for_client_connection(&self) -> Result<()>817 pub fn wait_for_client_connection(&self) -> Result<()> {
818 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
819 self.wait_for_client_connection_internal(
820 &mut overlapped_wrapper,
821 /* should_block = */ true,
822 )
823 }
824
825 /// Interruptable blocking wait for a client to connect.
wait_for_client_connection_overlapped_blocking( &mut self, exit_event: &Event, ) -> Result<()>826 pub fn wait_for_client_connection_overlapped_blocking(
827 &mut self,
828 exit_event: &Event,
829 ) -> Result<()> {
830 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event = */ true)?;
831 self.wait_for_client_connection_internal(
832 &mut overlapped_wrapper,
833 /* should_block = */ false,
834 )?;
835
836 #[derive(EventToken)]
837 enum Token {
838 Connected,
839 Exit,
840 }
841
842 let wait_ctx = WaitContext::build_with(&[
843 (
844 overlapped_wrapper.get_h_event_ref().unwrap(),
845 Token::Connected,
846 ),
847 (exit_event, Token::Exit),
848 ])?;
849
850 let events = wait_ctx.wait()?;
851 if let Some(event) = events.into_iter().next() {
852 return match event.token {
853 Token::Connected => Ok(()),
854 Token::Exit => {
855 // We must cancel IO here because it is unsafe to free the overlapped wrapper
856 // while the IO operation is active.
857 self.cancel_io()?;
858
859 Err(std::io::Error::new(
860 std::io::ErrorKind::Interrupted,
861 "IO canceled on exit request",
862 ))
863 }
864 };
865 }
866 unreachable!("wait cannot return Ok with zero events");
867 }
868
869 /// For a server named pipe, waits for a client to connect using the given overlapped wrapper
870 /// to signal connection.
wait_for_client_connection_overlapped( &self, overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>871 pub fn wait_for_client_connection_overlapped(
872 &self,
873 overlapped_wrapper: &mut OverlappedWrapper,
874 ) -> Result<()> {
875 self.wait_for_client_connection_internal(
876 overlapped_wrapper,
877 /* should_block = */ false,
878 )
879 }
880
wait_for_client_connection_internal( &self, overlapped_wrapper: &mut OverlappedWrapper, should_block: bool, ) -> Result<()>881 fn wait_for_client_connection_internal(
882 &self,
883 overlapped_wrapper: &mut OverlappedWrapper,
884 should_block: bool,
885 ) -> Result<()> {
886 // SAFETY:
887 // Safe because the handle is valid and we're checking the return
888 // code according to the documentation
889 //
890 // TODO(b/279669296) this safety statement is incomplete, and as such incorrect in one case:
891 // overlapped_wrapper must live until the overlapped operation is complete; however,
892 // if should_block is false, nothing guarantees that lifetime and so overlapped_wrapper
893 // could be freed while the operation is still running.
894 unsafe {
895 let success_flag = ConnectNamedPipe(
896 self.as_raw_descriptor(),
897 // Note: The overlapped structure is only used if the pipe was opened in
898 // OVERLAPPED mode, but is necessary in that case.
899 &mut *overlapped_wrapper.overlapped.0,
900 );
901 if success_flag == 0 {
902 return match GetLastError() {
903 ERROR_PIPE_CONNECTED => {
904 if !should_block {
905 // If async, make sure the event is signalled to indicate the client
906 // is ready.
907 overlapped_wrapper.get_h_event_ref().unwrap().signal()?;
908 }
909
910 Ok(())
911 }
912 ERROR_IO_PENDING => {
913 if should_block {
914 overlapped_wrapper.get_h_event_ref().unwrap().wait()?;
915 }
916 Ok(())
917 }
918 err => Err(io::Error::from_raw_os_error(err as i32)),
919 };
920 }
921 }
922 Ok(())
923 }
924
925 /// Used for overlapped read and write operations.
926 ///
927 /// This will block until the ReadFile or WriteFile operation that also took in
928 /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
929 /// `OverlappedWrapper::new` or that `OVERLAPPED.hEvent` is set. This will also get
930 /// the number of bytes that were read or written.
get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>931 pub fn get_overlapped_result(
932 &mut self,
933 overlapped_wrapper: &mut OverlappedWrapper,
934 ) -> io::Result<u32> {
935 let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ true);
936 overlapped_wrapper.in_use = false;
937 res
938 }
939
940 /// Used for overlapped read and write operations.
941 ///
942 /// This will return immediately, regardless of the completion status of the
943 /// ReadFile or WriteFile operation that took in `overlapped_wrapper`,
944 /// assuming `overlapped_wrapper` was created from `OverlappedWrapper::new`
945 /// or that `OVERLAPPED.hEvent` is set. This will also get the number of bytes
946 /// that were read or written, if completed. If the operation hasn't
947 /// completed, an error of kind `io::ErrorKind::WouldBlock` will be
948 /// returned.
try_get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>949 pub fn try_get_overlapped_result(
950 &mut self,
951 overlapped_wrapper: &mut OverlappedWrapper,
952 ) -> io::Result<u32> {
953 let res = self.get_overlapped_result_internal(overlapped_wrapper, /* wait= */ false);
954 match res {
955 Err(err) if err.raw_os_error().unwrap() as u32 == ERROR_IO_INCOMPLETE => {
956 Err(io::Error::new(io::ErrorKind::WouldBlock, err))
957 }
958 _ => {
959 overlapped_wrapper.in_use = false;
960 res
961 }
962 }
963 }
964
get_overlapped_result_internal( &mut self, overlapped_wrapper: &mut OverlappedWrapper, wait: bool, ) -> io::Result<u32>965 fn get_overlapped_result_internal(
966 &mut self,
967 overlapped_wrapper: &mut OverlappedWrapper,
968 wait: bool,
969 ) -> io::Result<u32> {
970 if !overlapped_wrapper.in_use {
971 return Err(std::io::Error::new(
972 std::io::ErrorKind::InvalidInput,
973 "Overlapped struct is not in use",
974 ));
975 }
976
977 let mut size_transferred = 0;
978 // SAFETY:
979 // Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
980 // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
981 if (unsafe {
982 GetOverlappedResult(
983 self.handle.as_raw_descriptor(),
984 &mut *overlapped_wrapper.overlapped.0,
985 &mut size_transferred,
986 if wait { TRUE } else { FALSE },
987 )
988 }) != 0
989 {
990 Ok(size_transferred)
991 } else {
992 let e = io::Error::last_os_error();
993 match e.raw_os_error() {
994 // More data => partial read of a message on a message pipe. This isn't really an
995 // error (see PipeConnection::read_internal) since we filled the provided buffer.
996 Some(error_code) if error_code as u32 == ERROR_MORE_DATA => Ok(size_transferred),
997 _ => Err(e),
998 }
999 }
1000 }
1001
1002 /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
1003 /// cancel all I/O requests for the file handle passed in.
cancel_io(&mut self) -> Result<()>1004 pub fn cancel_io(&mut self) -> Result<()> {
1005 fail_if_zero!(
1006 // SAFETY: descriptor is valid and the return value is checked.
1007 unsafe {
1008 CancelIoEx(
1009 self.handle.as_raw_descriptor(),
1010 /* lpOverlapped= */ std::ptr::null_mut(),
1011 )
1012 }
1013 );
1014
1015 Ok(())
1016 }
1017
/// Get the framing mode of the pipe.
///
/// Returns the value of the `framing_mode` field stored on this connection.
pub fn get_framing_mode(&self) -> FramingMode {
    self.framing_mode
}
1022
1023 /// Returns metadata about the connected NamedPipe.
get_info(&self) -> Result<NamedPipeInfo>1024 pub fn get_info(&self) -> Result<NamedPipeInfo> {
1025 let mut flags: u32 = 0;
1026 let mut incoming_buffer_size: u32 = 0;
1027 let mut outgoing_buffer_size: u32 = 0;
1028 let mut max_instances: u32 = 0;
1029 // SAFETY: all pointers are valid
1030 fail_if_zero!(unsafe {
1031 GetNamedPipeInfo(
1032 self.as_raw_descriptor(),
1033 &mut flags,
1034 &mut outgoing_buffer_size,
1035 &mut incoming_buffer_size,
1036 &mut max_instances,
1037 )
1038 });
1039
1040 Ok(NamedPipeInfo {
1041 outgoing_buffer_size,
1042 incoming_buffer_size,
1043 max_instances,
1044 flags,
1045 })
1046 }
1047
1048 /// For a server pipe, flush the pipe contents. This will
1049 /// block until the pipe is cleared by the client. Only
1050 /// call this if you are sure the client is reading the
1051 /// data!
flush_data_blocking(&self) -> Result<()>1052 pub fn flush_data_blocking(&self) -> Result<()> {
1053 // SAFETY:
1054 // Safe because the only buffers interacted with are
1055 // outside of Rust memory
1056 fail_if_zero!(unsafe { FlushFileBuffers(self.as_raw_descriptor()) });
1057 Ok(())
1058 }
1059
1060 /// For a server pipe, disconnect all clients, discarding any buffered data.
disconnect_clients(&self) -> Result<()>1061 pub fn disconnect_clients(&self) -> Result<()> {
1062 // SAFETY:
1063 // Safe because we own the handle passed in and know it will remain valid for the duration
1064 // of the call. Discarded buffers are not managed by rust.
1065 fail_if_zero!(unsafe { DisconnectNamedPipe(self.as_raw_descriptor()) });
1066 Ok(())
1067 }
1068 }
1069
impl AsRawDescriptor for PipeConnection {
    /// Returns the raw descriptor of the wrapped `handle`. The connection is only borrowed,
    /// so it keeps its handle.
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.handle.as_raw_descriptor()
    }
}
1075
impl IntoRawDescriptor for PipeConnection {
    /// Consumes the connection and returns the raw descriptor of the wrapped `handle`.
    fn into_raw_descriptor(self) -> RawDescriptor {
        self.handle.into_raw_descriptor()
    }
}
1081
// SAFETY: Send safety is ensured by inner fields.
// NOTE(review): the field types are declared outside this excerpt -- confirm that each of them
// may be moved to another thread.
unsafe impl Send for PipeConnection {}
// SAFETY: Sync safety is ensured by inner fields.
// NOTE(review): likewise, confirm that each field may be shared between threads.
unsafe impl Sync for PipeConnection {}
1086
impl io::Read for PipeConnection {
    /// Forwards to `PipeConnection::read` with a byte buffer.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // SAFETY:
        // This is safe because PipeConnection::read is always safe for u8
        unsafe { PipeConnection::read(self, buf) }
    }
}
1094
impl io::Write for PipeConnection {
    /// Forwards to `PipeConnection::write` with a byte buffer.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        PipeConnection::write(self, buf)
    }

    /// No-op that always succeeds; it does not call `flush_data_blocking`.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
1104
/// A simple data struct representing
/// metadata about a NamedPipe.
///
/// `PipeConnection::get_info` fills every field from the matching `GetNamedPipeInfo`
/// out-parameter.
#[derive(Debug, PartialEq, Eq)]
pub struct NamedPipeInfo {
    /// Value reported as the outgoing buffer size (`lpOutBufferSize`).
    pub outgoing_buffer_size: u32,
    /// Value reported as the incoming buffer size (`lpInBufferSize`).
    pub incoming_buffer_size: u32,
    /// Value reported as the maximum number of pipe instances (`lpMaxInstances`).
    pub max_instances: u32,
    /// Raw flags value reported for the pipe (`lpFlags`).
    pub flags: u32,
}
1114
/// This is a wrapper around PipeConnection. It allows one read and one write operation
/// to run in parallel, but not multiple reads or multiple writes in parallel.
///
/// Reason: messages from/to the service have two parts - a fixed size header that
/// contains the size of the actual message, followed by the message itself. By allowing only
/// one write at a time we ensure that the variable size message is written/read right after
/// writing/reading the fixed size header. For example, it avoids sending or receiving
/// messages in an order like
/// H1, H2, M1, M2
///   - where header H1 and its message M1 are sent by one event loop and H2 and its message M2 are
///     sent by another event loop.
///
/// Do not expose direct access to reader or writer pipes.
///
/// The struct is clone-able so that different event loops can talk to the other end.
#[derive(Clone)]
pub struct MultiPartMessagePipe {
    // Lock protected pipe to receive messages.
    reader: Arc<Mutex<PipeConnection>>,
    // Lock protected pipe to send messages.
    writer: Arc<Mutex<PipeConnection>>,
    // Whether this end is created as server or client. The variable helps to
    // decide if something meaningful should be done when `wait_for_connection` is called.
    is_server: bool,
    // Always true if pipe is created as client.
    // Defaults to false on server. Updated to true on calling `wait_for_connection`
    // after a client connects.
    is_connected: Arc<AtomicBool>,
}
1143
1144 impl MultiPartMessagePipe {
create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self>1145 fn create_from_pipe(pipe: PipeConnection, is_server: bool) -> Result<Self> {
1146 Ok(Self {
1147 reader: Arc::new(Mutex::new(pipe.try_clone()?)),
1148 writer: Arc::new(Mutex::new(pipe)),
1149 is_server,
1150 is_connected: Arc::new(AtomicBool::new(false)),
1151 })
1152 }
1153
1154 /// Create server side of MutiPartMessagePipe.
1155 /// # Safety
1156 /// `pipe` must be a server named pipe.
1157 #[deny(unsafe_op_in_unsafe_fn)]
create_from_server_pipe(pipe: PipeConnection) -> Result<Self>1158 pub unsafe fn create_from_server_pipe(pipe: PipeConnection) -> Result<Self> {
1159 Self::create_from_pipe(pipe, true)
1160 }
1161
1162 /// Create client side of MutiPartMessagePipe.
create_as_client(pipe_name: &str) -> Result<Self>1163 pub fn create_as_client(pipe_name: &str) -> Result<Self> {
1164 let pipe = create_client_pipe(
1165 &format!(r"\\.\pipe\{}", pipe_name),
1166 &FramingMode::Message,
1167 &BlockingMode::Wait,
1168 /* overlapped= */ true,
1169 )?;
1170 Self::create_from_pipe(pipe, false)
1171 }
1172
1173 /// Create server side of MutiPartMessagePipe.
create_as_server(pipe_name: &str) -> Result<Self>1174 pub fn create_as_server(pipe_name: &str) -> Result<Self> {
1175 let pipe = create_server_pipe(
1176 &format!(r"\\.\pipe\{}", pipe_name,),
1177 &FramingMode::Message,
1178 &BlockingMode::Wait,
1179 0,
1180 1024 * 1024,
1181 true,
1182 )?;
1183 // SAFETY: `pipe` is a server named pipe.
1184 unsafe { Self::create_from_server_pipe(pipe) }
1185 }
1186
1187 /// If the struct is created as a server then waits for client connection to arrive.
1188 /// It only waits on reader as reader and writer are clones.
wait_for_connection(&self) -> Result<()>1189 pub fn wait_for_connection(&self) -> Result<()> {
1190 if self.is_server && !self.is_connected.load(Ordering::Relaxed) {
1191 self.reader.lock().wait_for_client_connection()?;
1192 self.is_connected.store(true, Ordering::Relaxed);
1193 }
1194 Ok(())
1195 }
1196
write_overlapped_blocking_message_internal<T: PipeSendable>( pipe: &mut PipeConnection, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1197 fn write_overlapped_blocking_message_internal<T: PipeSendable>(
1198 pipe: &mut PipeConnection,
1199 buf: &[T],
1200 overlapped_wrapper: &mut OverlappedWrapper,
1201 ) -> Result<()> {
1202 // Safety:
1203 // `buf` and `overlapped_wrapper` will be in use for the duration of
1204 // the overlapped operation. These must not be reused and must live until
1205 // after `get_overlapped_result()` has been called which is done right
1206 // after this call.
1207 unsafe {
1208 pipe.write_overlapped(buf, overlapped_wrapper)?;
1209 }
1210
1211 let size_written_in_bytes = pipe.get_overlapped_result(overlapped_wrapper)?;
1212
1213 if size_written_in_bytes as usize != buf.len() {
1214 return Err(std::io::Error::new(
1215 std::io::ErrorKind::UnexpectedEof,
1216 format!(
1217 "Short write expected:{} found:{}",
1218 size_written_in_bytes,
1219 buf.len(),
1220 ),
1221 ));
1222 }
1223 Ok(())
1224 }
1225 /// Sends, blockingly,`buf` over the pipe in its entirety. Partial write is considered
write_overlapped_blocking_message<T: PipeSendable>( &self, header: &[T], message: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>1226 pub fn write_overlapped_blocking_message<T: PipeSendable>(
1227 &self,
1228 header: &[T],
1229 message: &[T],
1230 overlapped_wrapper: &mut OverlappedWrapper,
1231 ) -> Result<()> {
1232 let mut writer = self.writer.lock();
1233 Self::write_overlapped_blocking_message_internal(&mut writer, header, overlapped_wrapper)?;
1234 Self::write_overlapped_blocking_message_internal(&mut writer, message, overlapped_wrapper)
1235 }
1236
1237 /// Reads a variable size message and returns the message on success.
1238 /// The size of the message is expected to proceed the message in
1239 /// the form of `header_size` message.
1240 ///
1241 /// `parse_message_size` lets caller parse the header to extract
1242 /// message size.
1243 ///
1244 /// Event on `exit_event` is used to interrupt the blocked read.
read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>( &self, header_size: usize, parse_message_size: F, overlapped_wrapper: &mut OverlappedWrapper, exit_event: &Event, ) -> Result<Vec<u8>>1245 pub fn read_overlapped_blocking_message<F: FnOnce(&[u8]) -> usize>(
1246 &self,
1247 header_size: usize,
1248 parse_message_size: F,
1249 overlapped_wrapper: &mut OverlappedWrapper,
1250 exit_event: &Event,
1251 ) -> Result<Vec<u8>> {
1252 let mut pipe = self.reader.lock();
1253 let mut header = vec![0; header_size];
1254 header.resize_with(header_size, Default::default);
1255 pipe.read_overlapped_blocking(&mut header, overlapped_wrapper, exit_event)?;
1256 let message_size = parse_message_size(&header);
1257 if message_size == 0 {
1258 return Ok(vec![]);
1259 }
1260 let mut buf = vec![];
1261 buf.resize_with(message_size, Default::default);
1262 pipe.read_overlapped_blocking(&mut buf, overlapped_wrapper, exit_event)?;
1263 Ok(buf)
1264 }
1265
1266 /// Returns the inner named pipe if the current struct is the sole owner of the underlying
1267 /// named pipe.
1268 ///
1269 /// Otherwise, [`None`] is returned and the struct is dropped.
1270 ///
1271 /// Note that this has a similar race condition like [`Arc::try_unwrap`]: if multiple threads
1272 /// call this function simultaneously on the same clone of [`MultiPartMessagePipe`], it is
1273 /// possible that all of them will result in [`None`]. This is Due to Rust version
1274 /// restriction(1.68.2) when this function is introduced). This race condition can be resolved
1275 /// once we upgrade to 1.70.0 or higher by using [`Arc::into_inner`].
1276 ///
1277 /// If the underlying named pipe is a server named pipe, this method allows the caller to
1278 /// terminate the connection by first flushing the named pipe then disconnecting the clients
1279 /// idiomatically per
1280 /// https://learn.microsoft.com/en-us/windows/win32/ipc/named-pipe-operations#:~:text=When%20a%20client,of%20the%20pipe.
into_inner_pipe(self) -> Option<PipeConnection>1281 pub fn into_inner_pipe(self) -> Option<PipeConnection> {
1282 let piper = Arc::clone(&self.reader);
1283 drop(self);
1284 Arc::try_unwrap(piper).ok().map(Mutex::into_inner)
1285 }
1286 }
1287
impl TryFrom<PipeConnection> for MultiPartMessagePipe {
    type Error = std::io::Error;
    /// Wraps an existing pipe. The result is always created with `is_server` set to `false`.
    fn try_from(pipe: PipeConnection) -> Result<Self> {
        Self::create_from_pipe(pipe, false)
    }
}
1294
1295 #[cfg(test)]
1296 mod tests {
1297 use std::mem::size_of;
1298 use std::thread::JoinHandle;
1299 use std::time::Duration;
1300
1301 use super::*;
1302
#[test]
fn duplex_pipe_stream() {
    let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
    let payload: [u8; 6] = [75, 77, 54, 82, 76, 65];

    // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
    for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
        println!("{}", dir);

        sender.write(&payload).unwrap();

        // Smaller than what we sent so we get multiple chunks
        let mut chunk = [0u8; 4];

        // SAFETY: trivially safe with pipe created and return value checked.
        let first_len = unsafe { receiver.read(&mut chunk) }.unwrap();
        assert_eq!(first_len, 4);
        assert_eq!(chunk, [75, 77, 54, 82]);

        // SAFETY: trivially safe with pipe created and return value checked.
        let second_len = unsafe { receiver.read(&mut chunk) }.unwrap();
        assert_eq!(second_len, 2);
        assert_eq!(chunk[0..2], [76, 65]);
    }
}
1328
#[test]
fn available_byte_count_byte_mode() {
    let (producer, consumer) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
    producer.write(&[1, 23, 45]).unwrap();

    // PeekNamedPipe should NOT touch the data in the pipe, so asking a second time has to
    // yield the same value as the first.
    for _attempt in 0..2 {
        assert_eq!(consumer.get_available_byte_count().unwrap(), 3);
    }
}
1339
#[test]
fn available_byte_count_message_mode() {
    let (producer, consumer) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
    producer.write(&[1, 23, 45]).unwrap();

    // PeekNamedPipe should NOT touch the data in the pipe, so asking a second time has to
    // yield the same value as the first.
    for _attempt in 0..2 {
        assert_eq!(consumer.get_available_byte_count().unwrap(), 3);
    }
}
1350
#[test]
fn available_byte_count_message_mode_multiple_messages() {
    let (producer, consumer) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();

    // Two separate messages of 3 and 2 bytes; the reported count covers both of them.
    let first: [u8; 3] = [1, 2, 3];
    let second: [u8; 2] = [4, 5];
    producer.write(&first).unwrap();
    producer.write(&second).unwrap();

    assert_eq!(consumer.get_available_byte_count().unwrap(), 5);
}
1358
#[test]
fn duplex_pipe_message() {
    let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
    let first_msg: [u8; 3] = [1, 23, 45];
    let second_msg: [u8; 3] = [67, 89, 10];

    // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
    for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
        println!("{}", dir);

        // Send 2 messages so that we can check that message framing works
        sender.write(&first_msg).unwrap();
        sender.write(&second_msg).unwrap();

        let mut recv_buffer = [0u8; 5]; // Larger than required for messages

        for expected in [&first_msg, &second_msg] {
            // SAFETY: trivially safe with pipe created and return value checked.
            let size = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
            assert_eq!(size, 3);
            assert_eq!(recv_buffer[0..3], expected[..]);
        }
    }
}
1385
/// Sends one byte in each direction and checks that the first read returns it while a
/// second read reports `WouldBlock` instead of blocking.
#[cfg(test)]
fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
    let mut recv_buffer = [0u8; 1];

    // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
    for (dir, sender, receiver) in [("1 -> 2", p1, p2), ("2 -> 1", p2, p1)].iter() {
        println!("{}", dir);
        sender.write(&[1]).unwrap();

        // SAFETY: trivially safe with PipeConnection created and return value checked.
        let first_read = unsafe { receiver.read(&mut recv_buffer) };
        assert_eq!(first_read.unwrap(), 1); // Should succeed!

        // SAFETY: trivially safe with PipeConnection created and return value checked.
        let second_read = unsafe { receiver.read(&mut recv_buffer) };
        assert_eq!(
            second_read.unwrap_err().kind(),
            std::io::ErrorKind::WouldBlock
        );
    }
}
1404
#[test]
fn duplex_nowait() {
    // Both ends are non-blocking from the moment the pair is created.
    let (first_end, second_end) = pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
    duplex_nowait_helper(&first_end, &second_end);
}
1410
#[test]
fn duplex_nowait_set_after_creation() {
    // Tests non blocking setting after pipe creation: the pair starts out blocking and is
    // switched afterwards.
    let (mut first_end, mut second_end) =
        pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

    let non_blocking = BlockingMode::NoWait;
    first_end
        .set_blocking(&non_blocking)
        .expect("Failed to set blocking mode on pipe p1");
    second_end
        .set_blocking(&non_blocking)
        .expect("Failed to set blocking mode on pipe p2");

    duplex_nowait_helper(&first_end, &second_end);
}
1421
#[test]
fn duplex_overlapped() {
    let pipe_name = generate_pipe_name();

    let mut server = create_server_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    let mut client = create_client_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();

    let payload: [u8; 6] = [75, 77, 54, 82, 76, 65];

    // Server side: overlapped write, then wait for its completion.
    let mut server_overlapped = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    // SAFETY:
    // `payload` and `server_overlapped` stay alive until the result is collected below.
    unsafe { server.write_overlapped(&payload, &mut server_overlapped) }.unwrap();
    let written = server
        .get_overlapped_result(&mut server_overlapped)
        .unwrap();
    assert_eq!(written, 6);

    // Client side: overlapped read of the same message.
    let mut recv_buffer = [0u8; 6];
    let mut client_overlapped = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    // SAFETY:
    // Safe because `read_overlapped` can be called since overlapped struct is created, and
    // both it and `recv_buffer` stay alive until the result is collected below.
    unsafe { client.read_overlapped(&mut recv_buffer, &mut client_overlapped) }.unwrap();
    let received = client
        .get_overlapped_result(&mut client_overlapped)
        .unwrap();
    assert_eq!(received, 6);
    assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
}
1469
#[test]
fn duplex_overlapped_test_in_use() {
    let pipe_name = generate_pipe_name();

    let mut server = create_server_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    let mut client = create_client_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();
    let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();

    // Nothing has been issued with the wrapper yet, so there is no result to collect.
    let result_before_use = server.get_overlapped_result(&mut overlapped_wrapper);
    assert!(result_before_use.is_err());

    let data = vec![75, 77, 54, 82, 76, 65];
    // SAFETY: safe because: data & overlapped wrapper live until the
    // operation is verified completed below.
    let first_write = unsafe { server.write_overlapped(&data, &mut overlapped_wrapper) };
    assert!(first_write.is_ok());

    // While the wrapper is in use, further writes and reads with it must be rejected.
    let write_while_in_use =
        // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
        // will error out.
        unsafe { client.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper) };
    assert!(write_while_in_use.is_err());

    let mut rejected_buffer: [u8; 6] = [0; 6];
    // SAFETY: safe because we know the unsafe re-use of overlapped wrapper
    // will error out.
    let read_while_in_use =
        unsafe { client.read_overlapped(&mut rejected_buffer, &mut overlapped_wrapper) };
    assert!(read_while_in_use.is_err());

    // Collecting the result releases the wrapper for reuse.
    let write_completion = server.get_overlapped_result(&mut overlapped_wrapper);
    assert!(write_completion.is_ok());

    let mut recv_buffer: [u8; 6] = [0; 6];
    // SAFETY: safe because recv_buffer & overlapped_wrapper live until the
    // operation is verified completed below.
    let read_after_release =
        unsafe { client.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
    assert!(read_after_release.is_ok());
    let read_completion = client.get_overlapped_result(&mut overlapped_wrapper);
    assert!(read_completion.is_ok());
}
1525
generate_pipe_name() -> String1526 fn generate_pipe_name() -> String {
1527 format!(
1528 r"\\.\pipe\test-ipc-pipe-name.rand{}",
1529 rand::thread_rng().gen::<u64>(),
1530 )
1531 }
1532
send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()>1533 fn send_receive_msgs(pipe: MultiPartMessagePipe, msg_count: u32) -> JoinHandle<()> {
1534 let messages = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"];
1535 std::thread::spawn(move || {
1536 let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
1537 let exit_event = Event::new().unwrap();
1538 for _i in 0..msg_count {
1539 let message = *messages
1540 .get(rand::thread_rng().gen::<usize>() % messages.len())
1541 .unwrap();
1542 pipe.write_overlapped_blocking_message(
1543 &message.len().to_be_bytes(),
1544 message.as_bytes(),
1545 &mut overlapped_wrapper,
1546 )
1547 .unwrap();
1548 }
1549 for _i in 0..msg_count {
1550 let message = pipe
1551 .read_overlapped_blocking_message(
1552 size_of::<usize>(),
1553 |bytes: &[u8]| {
1554 assert_eq!(bytes.len(), size_of::<usize>());
1555 usize::from_be_bytes(
1556 bytes.try_into().expect("failed to get array from slice"),
1557 )
1558 },
1559 &mut overlapped_wrapper,
1560 &exit_event,
1561 )
1562 .unwrap();
1563 assert_eq!(
1564 *messages.get(message.len() - 1).unwrap(),
1565 std::str::from_utf8(&message).unwrap(),
1566 );
1567 }
1568 })
1569 }
1570
#[test]
fn multipart_message_smoke_test() {
    let pipe_name = generate_pipe_name();
    let server = MultiPartMessagePipe::create_as_server(&pipe_name).unwrap();
    let client = MultiPartMessagePipe::create_as_client(&pipe_name).unwrap();

    // Two workers per end, each exchanging 100 messages; the clones share the same pipe.
    let workers = [
        send_receive_msgs(server.clone(), 100),
        send_receive_msgs(client.clone(), 100),
        send_receive_msgs(server, 100),
        send_receive_msgs(client, 100),
    ];

    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());
}
1586
#[test]
fn multipart_message_into_inner_pipe() {
    let pipe_name = generate_pipe_name();
    let mut pipe = create_server_pipe(
        &format!(r"\\.\pipe\{}", pipe_name),
        &FramingMode::Message,
        &BlockingMode::Wait,
        0,
        1024 * 1024,
        true,
    )
    .expect("should create the server pipe with success");

    let duplicate = pipe
        .try_clone()
        .expect("should duplicate the named pipe with success");
    // SAFETY: `pipe` is a server named pipe.
    let server1 = unsafe { MultiPartMessagePipe::create_from_server_pipe(duplicate) }
        .expect("should create the multipart message pipe with success");

    let server2 = server1.clone();
    assert!(
        server2.into_inner_pipe().is_none(),
        "not the last reference, should be None"
    );
    let inner_pipe = server1
        .into_inner_pipe()
        .expect("the last reference, should return the underlying pipe");

    // CompareObjectHandles is a Windows 10 API and is not available in mingw, so we can't use
    // that API to compare if 2 handles are the same. Instead, change the blocking mode through
    // the original pipe and check that both pipes keep reporting identical information.
    for mode in [BlockingMode::NoWait, BlockingMode::Wait] {
        pipe.set_blocking(&mode)
            .expect("should set the blocking mode on the original pipe with success");
        assert_eq!(
            pipe.get_info()
                .expect("should get the pipe information on the original pipe successfully"),
            inner_pipe
                .get_info()
                .expect("should get the pipe information on the inner pipe successfully")
        );
    }
}
1636
#[test]
fn test_wait_for_connection_blocking() {
    let name = generate_pipe_name();

    let mut listener = create_server_pipe(
        &name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    // The server waits for its client on a helper thread. The exit event is created there and
    // never signaled, so the wait can only finish through a client connection.
    let waiter = crate::thread::spawn_with_timeout(move || {
        let never_signaled = Event::new().unwrap();
        let connected = listener.wait_for_client_connection_overlapped_blocking(&never_signaled);
        connected.unwrap();
    });

    // Keep the client end alive until the server thread has been joined.
    let _connected_client = create_client_pipe(
        &name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();

    let join_deadline = Duration::from_secs(10);
    waiter.try_join(join_deadline).unwrap();
}
1667
#[test]
fn test_wait_for_connection_blocking_exit_triggered() {
    let name = generate_pipe_name();

    let mut listener = create_server_pipe(
        &name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    // One copy of the event stays on this thread to be signaled; the clone moves into the
    // server thread, which passes it to the blocking wait.
    let exit_event = Event::new().unwrap();
    let thread_exit_event = exit_event.try_clone().unwrap();
    let waiter = crate::thread::spawn_with_timeout(move || {
        // No client ever connects in this test, so the wait is expected to report an error
        // once the exit event fires.
        let outcome = listener.wait_for_client_connection_overlapped_blocking(&thread_exit_event);
        assert!(outcome.is_err());
    });

    exit_event.signal().unwrap();

    let join_deadline = Duration::from_secs(10);
    waiter.try_join(join_deadline).unwrap();
}
1692
#[test]
fn std_io_read_eof() {
    let (mut writer, mut reader) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

    // The trait methods are called with fully-qualified syntax so that the `std::io`
    // implementations are exercised explicitly.
    std::io::Write::write(&mut writer, &[1, 2, 3]).unwrap();
    // Close the writing end before anything is read.
    drop(writer);

    // The buffer is one byte larger than the payload; the last byte must stay untouched.
    let mut buf = [0u8; 4];
    let first_read = std::io::Read::read(&mut reader, &mut buf).unwrap();
    assert_eq!(first_read, 3);
    assert_eq!(buf, [1, 2, 3, 0]);

    // Once the data is drained, every further read reports end-of-file as `Ok(0)`.
    for _ in 0..2 {
        assert_eq!(std::io::Read::read(&mut reader, &mut buf).unwrap(), 0);
    }
}
1705
#[test]
fn std_io_write_eof() {
    let (mut writer, reader) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
    // Close the reading end first so the write below hits a pipe without a peer.
    drop(reader);

    let result = std::io::Write::write(&mut writer, &[1, 2, 3]);

    // Other outcomes, such as Ok(0), would not strictly be wrong for a closed peer, but this
    // test only accepts Err(BrokenPipe).
    let is_broken_pipe =
        matches!(&result, Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe);
    assert!(is_broken_pipe, "expected Err(BrokenPipe), got {result:?}");
}
1718 }
1719