1 // Copyright 2022 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use rand::Rng;
6 use std::{
7 ffi::CString,
8 fs::OpenOptions,
9 io,
10 io::Result,
11 mem,
12 os::windows::fs::OpenOptionsExt,
13 process, ptr,
14 sync::atomic::{AtomicUsize, Ordering},
15 };
16
17 use super::{Event, RawDescriptor};
18 use crate::descriptor::{AsRawDescriptor, FromRawDescriptor, IntoRawDescriptor, SafeDescriptor};
19 use serde::{Deserialize, Serialize};
20 use win_util::{SecurityAttributes, SelfRelativeSecurityDescriptor};
21 use winapi::{
22 shared::{
23 minwindef::{DWORD, LPCVOID, LPVOID, TRUE},
24 winerror::{ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED},
25 },
26 um::{
27 errhandlingapi::GetLastError,
28 fileapi::{FlushFileBuffers, ReadFile, WriteFile},
29 handleapi::INVALID_HANDLE_VALUE,
30 ioapiset::{CancelIoEx, GetOverlappedResult},
31 minwinbase::OVERLAPPED,
32 namedpipeapi::{
33 ConnectNamedPipe, GetNamedPipeInfo, PeekNamedPipe, SetNamedPipeHandleState,
34 },
35 winbase::{
36 CreateNamedPipeA, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED,
37 PIPE_ACCESS_DUPLEX, PIPE_NOWAIT, PIPE_READMODE_BYTE, PIPE_READMODE_MESSAGE,
38 PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE, PIPE_WAIT,
39 SECURITY_IDENTIFICATION,
40 },
41 },
42 };
43
/// The default buffer size (50 KiB) for all named pipes in the system. If this size is too small,
/// writers on named pipes that expect not to block *can* block until the reading side empties the
/// buffer.
///
/// The general rule is this should be *at least* as big as the largest message, otherwise
/// unexpected blocking behavior can result; for example, if too small, this can interact badly with
/// crate::platform::StreamChannel, which expects to be able to make a complete write before releasing
/// a lock that the opposite side needs to complete a read. This means that if the buffer is too
/// small:
/// * The writer can't complete its write and release the lock because the buffer is too small.
/// * The reader can't start reading because the lock is held by the writer, so it can't
///   relieve buffer pressure. Note that for message pipes, the reader couldn't do anything
///   to help anyway, because a message mode pipe should NOT have a partial read (which is
///   what we would need to relieve pressure).
/// * Conditions for deadlock are met, and both the reader & writer enter circular waiting.
pub const DEFAULT_BUFFER_SIZE: usize = 50 * 1024;

// Process-wide counter. `pair_with_buffer_size` embeds the next value in every pipe name it
// generates so that pipes created by the same process get distinct names.
static NEXT_PIPE_INDEX: AtomicUsize = AtomicUsize::new(1);
61
/// Represents one end of a named pipe.
///
/// Alongside the pipe handle, the connection remembers the framing and blocking modes it was
/// configured with; other methods in this file consult them (e.g. `set_blocking` and the read
/// path) instead of querying the OS.
#[derive(Serialize, Deserialize, Debug)]
pub struct PipeConnection {
    // Descriptor for this end of the pipe.
    handle: SafeDescriptor,
    // Byte vs. message framing; used to derive the read-mode flag in `set_blocking`.
    framing_mode: FramingMode,
    // Wait vs. no-wait; the read path uses it to decide whether ERROR_NO_DATA means "would block".
    blocking_mode: BlockingMode,
}
69
/// Wraps the OVERLAPPED structure. Also keeps track of whether OVERLAPPED is being used by a
/// ReadFile or WriteFile operation and holds onto the event object so it doesn't get dropped.
///
/// NOTE(review): nothing visible here cancels or waits for an in-flight operation when the
/// wrapper is dropped; presumably callers must finish (or cancel) the I/O first — confirm.
pub struct OverlappedWrapper {
    // Allocated on the heap so that the OVERLAPPED struct doesn't move when performing I/O
    // operations.
    overlapped: Box<OVERLAPPED>,
    // This field prevents the event handle from being dropped too early and allows callers to
    // be notified when a read or write overlapped operation has completed.
    h_event: Option<Event>,
    // Set by `read_overlapped` / `write_overlapped` and cleared by `get_overlapped_result`, so a
    // wrapper is not handed to a second operation while the first is outstanding.
    in_use: bool,
}
81
82 impl OverlappedWrapper {
get_h_event_ref(&self) -> Option<&Event>83 pub fn get_h_event_ref(&self) -> Option<&Event> {
84 self.h_event.as_ref()
85 }
86 }
87
/// How data written to the pipe is delimited for the reader.
#[derive(Serialize, Deserialize, Copy, Clone, Debug, PartialEq)]
pub enum FramingMode {
    /// The pipe is a simple byte stream.
    Byte,
    /// The pipe carries an automatically framed sequence of messages.
    Message,
}
93
94 impl FramingMode {
to_readmode(self) -> DWORD95 fn to_readmode(self) -> DWORD {
96 match self {
97 FramingMode::Message => PIPE_READMODE_MESSAGE,
98 FramingMode::Byte => PIPE_READMODE_BYTE,
99 }
100 }
101
to_pipetype(self) -> DWORD102 fn to_pipetype(self) -> DWORD {
103 match self {
104 FramingMode::Message => PIPE_TYPE_MESSAGE,
105 FramingMode::Byte => PIPE_TYPE_BYTE,
106 }
107 }
108 }
109
/// Whether reads on the pipe wait for data or return immediately.
#[derive(Serialize, Deserialize, Copy, Clone, PartialEq, Debug)]
pub enum BlockingMode {
    /// Calls to read() block until data is received.
    Wait,
    /// Calls to read() return immediately even if there is nothing to read, failing with error
    /// code 232 (Rust maps this to BrokenPipe but it's actually ERROR_NO_DATA).
    ///
    /// NOTE: This mode is discouraged by the Windows API documentation.
    NoWait,
}
120
121 impl From<&BlockingMode> for DWORD {
from(blocking_mode: &BlockingMode) -> DWORD122 fn from(blocking_mode: &BlockingMode) -> DWORD {
123 match blocking_mode {
124 BlockingMode::Wait => PIPE_WAIT,
125 BlockingMode::NoWait => PIPE_NOWAIT,
126 }
127 }
128 }
129
130 /// Sets the handle state for a named pipe in a rust friendly way.
131 /// This is safe if the pipe handle is open.
set_named_pipe_handle_state( pipe_handle: RawDescriptor, client_mode: &mut DWORD, ) -> Result<()>132 unsafe fn set_named_pipe_handle_state(
133 pipe_handle: RawDescriptor,
134 client_mode: &mut DWORD,
135 ) -> Result<()> {
136 // Safe when the pipe handle is open. Safety also requires checking the return value, which we
137 // do below.
138 let success_flag = SetNamedPipeHandleState(
139 /* hNamedPipe= */ pipe_handle,
140 /* lpMode= */ client_mode,
141 /* lpMaxCollectionCount= */ ptr::null_mut(),
142 /* lpCollectDataTimeout= */ ptr::null_mut(),
143 );
144 if success_flag == 0 {
145 Err(io::Error::last_os_error())
146 } else {
147 Ok(())
148 }
149 }
150
pair( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, ) -> Result<(PipeConnection, PipeConnection)>151 pub fn pair(
152 framing_mode: &FramingMode,
153 blocking_mode: &BlockingMode,
154 timeout: u64,
155 ) -> Result<(PipeConnection, PipeConnection)> {
156 pair_with_buffer_size(
157 framing_mode,
158 blocking_mode,
159 timeout,
160 DEFAULT_BUFFER_SIZE,
161 false,
162 )
163 }
164
165 /// Creates a pair of handles connected to either end of a duplex named pipe.
166 ///
167 /// The pipe created will have a semi-random name and a default set of security options that
168 /// help prevent common named-pipe based vulnerabilities. Specifically the pipe is set to reject
169 /// remote clients, allow only a single server instance, and prevent impersonation by the server
170 /// end of the pipe.
171 ///
172 /// # Arguments
173 ///
174 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
175 /// automatically framed sequence of messages (Message). In message mode it's an
176 /// error to read fewer bytes than were sent in a message from the other end of
177 /// the pipe.
178 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
179 /// return immediately if there is nothing available (NoWait).
180 /// * `timeout` - A timeout to apply for socket operations, in milliseconds.
181 /// Setting this to zero will create sockets with the system
182 /// default timeout.
183 /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
184 /// buffer automatically as needed, except in the case of NOWAIT pipes, where
185 /// it will just fail writes that don't fit in the buffer.
186 /// # Return value
187 ///
188 /// Returns a pair of pipes, of the form (server, client). Note that for some winapis, such as
189 /// FlushFileBuffers, the server & client ends WILL BEHAVE DIFFERENTLY.
pair_with_buffer_size( framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<(PipeConnection, PipeConnection)>190 pub fn pair_with_buffer_size(
191 framing_mode: &FramingMode,
192 blocking_mode: &BlockingMode,
193 timeout: u64,
194 buffer_size: usize,
195 overlapped: bool,
196 ) -> Result<(PipeConnection, PipeConnection)> {
197 // Give the pipe a unique name to avoid accidental collisions
198 let pipe_name = format!(
199 r"\\.\pipe\crosvm_ipc.pid{}.{}.rand{}",
200 process::id(),
201 NEXT_PIPE_INDEX.fetch_add(1, Ordering::SeqCst),
202 rand::thread_rng().gen::<u32>(),
203 );
204
205 let server_end = create_server_pipe(
206 &pipe_name,
207 framing_mode,
208 blocking_mode,
209 timeout,
210 buffer_size,
211 overlapped,
212 )?;
213
214 // Open the named pipe we just created as the client
215 let client_end = create_client_pipe(&pipe_name, framing_mode, blocking_mode, overlapped)?;
216
217 // Accept the client's connection
218 // Not sure if this is strictly needed but I'm doing it just in case.
219 // We expect at this point that the client will already be connected,
220 // so we'll get a return code of 0 and an ERROR_PIPE_CONNECTED.
221 // It's also OK if we get a return code of success.
222 server_end.wait_for_client_connection()?;
223
224 Ok((server_end, client_end))
225 }
226
227 /// Creates a PipeConnection for the server end of a named pipe with the given path and pipe
228 /// settings.
229 ///
230 /// The pipe will be set to reject remote clients and allow only a single connection at a time.
231 ///
232 /// # Arguments
233 ///
234 /// * `pipe_name` - The path of the named pipe to create. Should be in the form
235 /// `\\.\pipe\<some-name>`.
236 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
237 /// automatically framed sequence of messages (Message). In message mode it's an
238 /// error to read fewer bytes than were sent in a message from the other end of
239 /// the pipe.
240 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
241 /// return immediately if there is nothing available (NoWait).
242 /// * `timeout` - A timeout to apply for socket operations, in milliseconds.
243 /// Setting this to zero will create sockets with the system
244 /// default timeout.
245 /// * `buffer_size` - The default buffer size for the named pipe. The system should expand the
246 /// buffer automatically as needed, except in the case of NOWAIT pipes, where
247 /// it will just fail writes that don't fit in the buffer.
248 /// * `overlapped` - Sets whether overlapped mode is set on the pipe.
create_server_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, timeout: u64, buffer_size: usize, overlapped: bool, ) -> Result<PipeConnection>249 pub fn create_server_pipe(
250 pipe_name: &str,
251 framing_mode: &FramingMode,
252 blocking_mode: &BlockingMode,
253 timeout: u64,
254 buffer_size: usize,
255 overlapped: bool,
256 ) -> Result<PipeConnection> {
257 let c_pipe_name = CString::new(pipe_name).unwrap();
258
259 let mut open_mode_flags = PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE;
260 if overlapped {
261 open_mode_flags |= FILE_FLAG_OVERLAPPED
262 }
263
264 // This sets flags so there will be an error if >1 instance (server end)
265 // of this pipe name is opened because we expect exactly one.
266 let server_handle = unsafe {
267 // Safe because security attributes are valid, pipe_name is valid C string,
268 // and we're checking the return code
269 CreateNamedPipeA(
270 c_pipe_name.as_ptr(),
271 /* dwOpenMode= */
272 open_mode_flags,
273 /* dwPipeMode= */
274 framing_mode.to_pipetype()
275 | framing_mode.to_readmode()
276 | DWORD::from(blocking_mode)
277 | PIPE_REJECT_REMOTE_CLIENTS,
278 /* nMaxInstances= */ 1,
279 /* nOutBufferSize= */ buffer_size as DWORD,
280 /* nInBufferSize= */ buffer_size as DWORD,
281 /* nDefaultTimeOut= */ timeout as DWORD, // Default is 50ms
282 /* lpSecurityAttributes= */
283 SecurityAttributes::new_with_security_descriptor(
284 SelfRelativeSecurityDescriptor::get_singleton(),
285 /* inherit= */ true,
286 )
287 .as_mut(),
288 )
289 };
290
291 if server_handle == INVALID_HANDLE_VALUE {
292 Err(io::Error::last_os_error())
293 } else {
294 unsafe {
295 Ok(PipeConnection {
296 handle: SafeDescriptor::from_raw_descriptor(server_handle),
297 framing_mode: *framing_mode,
298 blocking_mode: *blocking_mode,
299 })
300 }
301 }
302 }
303
304 /// Creates a PipeConnection for the client end of a named pipe with the given path and pipe
305 /// settings.
306 ///
307 /// The pipe will be set to prevent impersonation of the client by the server process.
308 ///
309 /// # Arguments
310 ///
311 /// * `pipe_name` - The path of the named pipe to create. Should be in the form
312 /// `\\.\pipe\<some-name>`.
313 /// * `framing_mode` - Whether the system should provide a simple byte stream (Byte) or an
314 /// automatically framed sequence of messages (Message). In message mode it's an
315 /// error to read fewer bytes than were sent in a message from the other end of
316 /// the pipe.
317 /// * `blocking_mode` - Whether the system should wait on read() until data is available (Wait) or
318 /// return immediately if there is nothing available (NoWait).
319 /// * `overlapped` - Sets whether the pipe is opened in overlapped mode.
create_client_pipe( pipe_name: &str, framing_mode: &FramingMode, blocking_mode: &BlockingMode, overlapped: bool, ) -> Result<PipeConnection>320 pub fn create_client_pipe(
321 pipe_name: &str,
322 framing_mode: &FramingMode,
323 blocking_mode: &BlockingMode,
324 overlapped: bool,
325 ) -> Result<PipeConnection> {
326 let client_handle = OpenOptions::new()
327 .read(true)
328 .write(true)
329 .create(true)
330 .security_qos_flags(SECURITY_IDENTIFICATION)
331 .custom_flags(if overlapped { FILE_FLAG_OVERLAPPED } else { 0 })
332 .open(pipe_name)?
333 .into_raw_descriptor();
334
335 let mut client_mode = framing_mode.to_readmode() | DWORD::from(blocking_mode);
336
337 // Safe because client_handle's open() call did not return an error.
338 unsafe {
339 set_named_pipe_handle_state(client_handle, &mut client_mode)?;
340 }
341
342 Ok(PipeConnection {
343 // Safe because client_handle is valid
344 handle: unsafe { SafeDescriptor::from_raw_descriptor(client_handle) },
345 framing_mode: *framing_mode,
346 blocking_mode: *blocking_mode,
347 })
348 }
349
/// Marks types which can be appropriately sent through the generic helper functions
/// write_to_pipe and read_from_pipe. In this file the bound appears on the generic
/// `PipeConnection` read and write methods.
pub trait PipeSendable {
    /// Default value used to fill in new empty indexes when resizing a buffer to a larger size.
    fn default() -> Self;
}

impl PipeSendable for u8 {
    /// Bytes default to zero.
    fn default() -> Self {
        0u8
    }
}
362 impl PipeSendable for RawDescriptor {
default() -> Self363 fn default() -> Self {
364 ptr::null_mut()
365 }
366 }
367
368 impl PipeConnection {
try_clone(&self) -> Result<PipeConnection>369 pub fn try_clone(&self) -> Result<PipeConnection> {
370 let copy_handle = self.handle.try_clone()?;
371 Ok(PipeConnection {
372 handle: copy_handle,
373 framing_mode: self.framing_mode,
374 blocking_mode: self.blocking_mode,
375 })
376 }
377
378 /// Creates a PipeConnection from an existing RawDescriptor, and the underlying the framing &
379 /// blocking modes.
380 ///
381 /// # Safety
382 /// 1. rd is valid and ownership is transferred to this function when it is called.
383 ///
384 /// To avoid undefined behavior, framing_mode & blocking_modes must match those of the
385 /// underlying pipe.
from_raw_descriptor( rd: RawDescriptor, framing_mode: FramingMode, blocking_mode: BlockingMode, ) -> PipeConnection386 pub unsafe fn from_raw_descriptor(
387 rd: RawDescriptor,
388 framing_mode: FramingMode,
389 blocking_mode: BlockingMode,
390 ) -> PipeConnection {
391 PipeConnection {
392 handle: SafeDescriptor::from_raw_descriptor(rd),
393 framing_mode,
394 blocking_mode,
395 }
396 }
397
398 /// Reads bytes from the pipe into the provided buffer, up to the capacity of the buffer.
399 /// Returns the number of bytes (not values) read.
400 ///
401 /// # Safety
402 ///
403 /// This is safe only when the following conditions hold:
404 /// 1. The data on the other end of the pipe is a valid binary representation of data for
405 /// type T, and
406 /// 2. The number of bytes read is a multiple of the size of T; this must be checked by
407 /// the caller.
408 /// If buf's type is file descriptors, this is only safe when those file descriptors are valid
409 /// for the process where this function was called.
read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize>410 pub unsafe fn read<T: PipeSendable>(&self, buf: &mut [T]) -> Result<usize> {
411 PipeConnection::read_internal(&self.handle, self.blocking_mode, buf, None)
412 }
413
414 /// Similar to `PipeConnection::read` except it also allows:
415 /// 1. The same end of the named pipe to read and write at the same time in different
416 /// threads.
417 /// 2. Asynchronous read and write (read and write won't block).
418 ///
419 /// When reading, it will not block, but instead an `OVERLAPPED` struct that contains an event
420 /// (can be created with `PipeConnection::create_overlapped_struct`) will be passed into
421 /// `ReadFile`. That event will be triggered when the read operation is complete.
422 ///
423 /// In order to get how many bytes were read, call `get_overlapped_result`. That function will
424 /// also help with waiting until the read operation is complete.
425 ///
426 /// # Safety
427 ///
428 /// Same as `PipeConnection::read` safety comments. In addition, the pipe MUST be opened in
429 /// overlapped mode otherwise there may be unexpected behavior.
read_overlapped<T: PipeSendable>( &mut self, buf: &mut [T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>430 pub unsafe fn read_overlapped<T: PipeSendable>(
431 &mut self,
432 buf: &mut [T],
433 overlapped_wrapper: &mut OverlappedWrapper,
434 ) -> Result<()> {
435 if overlapped_wrapper.in_use {
436 return Err(std::io::Error::new(
437 std::io::ErrorKind::InvalidInput,
438 "Overlapped struct already in use",
439 ));
440 }
441 overlapped_wrapper.in_use = true;
442
443 PipeConnection::read_internal(
444 &self.handle,
445 self.blocking_mode,
446 buf,
447 Some(&mut overlapped_wrapper.overlapped),
448 )?;
449 Ok(())
450 }
451
452 /// Helper for `read_overlapped` and `read`
453 ///
454 /// # Safety
455 /// Comments `read_overlapped` or `read`, depending on which is used.
read_internal<T: PipeSendable>( handle: &SafeDescriptor, blocking_mode: BlockingMode, buf: &mut [T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>456 unsafe fn read_internal<T: PipeSendable>(
457 handle: &SafeDescriptor,
458 blocking_mode: BlockingMode,
459 buf: &mut [T],
460 overlapped: Option<&mut OVERLAPPED>,
461 ) -> Result<usize> {
462 let max_bytes_to_read: DWORD = mem::size_of_val(buf) as DWORD;
463 // Used to verify if ERROR_IO_PENDING should be an error.
464 let is_overlapped = overlapped.is_some();
465
466 // Safe because we cap the size of the read to the size of the buffer
467 // and check the return code
468 let mut bytes_read: DWORD = 0;
469 let success_flag = ReadFile(
470 handle.as_raw_descriptor(),
471 buf.as_ptr() as LPVOID,
472 max_bytes_to_read,
473 match overlapped {
474 Some(_) => std::ptr::null_mut(),
475 None => &mut bytes_read,
476 },
477 match overlapped {
478 Some(v) => v,
479 None => std::ptr::null_mut(),
480 },
481 );
482
483 if success_flag == 0 {
484 let e = io::Error::last_os_error();
485 match e.raw_os_error() {
486 Some(error_code)
487 if blocking_mode == BlockingMode::NoWait
488 && error_code == ERROR_NO_DATA as i32 =>
489 {
490 // A NOWAIT pipe will return ERROR_NO_DATA when no data is available; however,
491 // this code is interpreted as a std::io::ErrorKind::BrokenPipe, which is not
492 // correct. For further details see:
493 // https://docs.microsoft.com/en-us/windows/win32/debug/system-error-codes--0-499-
494 // https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipe-type-read-and-wait-modes
495 Err(std::io::Error::new(std::io::ErrorKind::WouldBlock, e))
496 }
497 // ERROR_IO_PENDING, according the to docs, isn't really an error. This just means
498 // that the ReadFile operation hasn't completed. In this case,
499 // `get_overlapped_result` will wait until the operation is completed.
500 Some(error_code) if error_code == ERROR_IO_PENDING as i32 && is_overlapped => {
501 return Ok(0);
502 }
503 _ => Err(e),
504 }
505 } else {
506 Ok(bytes_read as usize)
507 }
508 }
509
510 /// Gets the size in bytes of data in the pipe.
511 ///
512 /// Note that PeekNamedPipes (the underlying win32 API) will return zero if the packets have
513 /// not finished writing on the producer side.
get_available_byte_count(&self) -> io::Result<u32>514 pub fn get_available_byte_count(&self) -> io::Result<u32> {
515 let mut total_bytes_avail: DWORD = 0;
516
517 // Safe because the underlying pipe handle is guaranteed to be open, and the output values
518 // live at valid memory locations.
519 let res = unsafe {
520 PeekNamedPipe(
521 self.as_raw_descriptor(),
522 ptr::null_mut(),
523 0,
524 ptr::null_mut(),
525 &mut total_bytes_avail,
526 ptr::null_mut(),
527 )
528 };
529
530 if res == 0 {
531 Err(io::Error::last_os_error())
532 } else {
533 Ok(total_bytes_avail)
534 }
535 }
536
537 /// Writes the bytes from a slice into the pipe. Returns the number of bytes written, which
538 /// callers should check to ensure that it was the number expected.
write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize>539 pub fn write<T: PipeSendable>(&self, buf: &[T]) -> Result<usize> {
540 PipeConnection::write_internal(&self.handle, buf, None)
541 }
542
543 /// Similar to `PipeConnection::write` except it also allows:
544 /// 1. The same end of the named pipe to read and write at the same time in different
545 /// threads.
546 /// 2. Asynchronous read and write (read and write won't block).
547 ///
548 /// When writing, it will not block, but instead an `OVERLAPPED` struct that contains an event
549 /// (can be created with `PipeConnection::create_overlapped_struct`) will be passed into
550 /// `WriteFile`. That event will be triggered when the write operation is complete.
551 ///
552 /// In order to get how many bytes were written, call `get_overlapped_result`. That function will
553 /// also help with waiting until the write operation is complete. The pipe must be opened in
554 /// overlapped otherwise there may be unexpected behavior.
write_overlapped<T: PipeSendable>( &mut self, buf: &[T], overlapped_wrapper: &mut OverlappedWrapper, ) -> Result<()>555 pub fn write_overlapped<T: PipeSendable>(
556 &mut self,
557 buf: &[T],
558 overlapped_wrapper: &mut OverlappedWrapper,
559 ) -> Result<()> {
560 if overlapped_wrapper.in_use {
561 return Err(std::io::Error::new(
562 std::io::ErrorKind::InvalidInput,
563 "Overlapped struct already in use",
564 ));
565 }
566 overlapped_wrapper.in_use = true;
567
568 PipeConnection::write_internal(
569 &self.handle,
570 buf,
571 Some(&mut overlapped_wrapper.overlapped),
572 )?;
573 Ok(())
574 }
575
576 /// Helper for `write_overlapped` and `write`.
write_internal<T: PipeSendable>( handle: &SafeDescriptor, buf: &[T], overlapped: Option<&mut OVERLAPPED>, ) -> Result<usize>577 fn write_internal<T: PipeSendable>(
578 handle: &SafeDescriptor,
579 buf: &[T],
580 overlapped: Option<&mut OVERLAPPED>,
581 ) -> Result<usize> {
582 let bytes_to_write: DWORD = mem::size_of_val(buf) as DWORD;
583 let is_overlapped = overlapped.is_some();
584
585 // Safe because buf points to a valid region of memory whose size we have computed,
586 // pipe has not been closed (as it's managed by this object), and we check the return
587 // value for any errors
588 unsafe {
589 let mut bytes_written: DWORD = 0;
590 let success_flag = WriteFile(
591 handle.as_raw_descriptor(),
592 buf.as_ptr() as LPCVOID,
593 bytes_to_write,
594 match overlapped {
595 Some(_) => std::ptr::null_mut(),
596 None => &mut bytes_written,
597 },
598 match overlapped {
599 Some(v) => v,
600 None => std::ptr::null_mut(),
601 },
602 );
603
604 if success_flag == 0 {
605 let err = io::Error::last_os_error().raw_os_error().unwrap() as u32;
606 if err == ERROR_IO_PENDING && is_overlapped {
607 return Ok(0);
608 }
609 Err(io::Error::last_os_error())
610 } else {
611 Ok(bytes_written as usize)
612 }
613 }
614 }
615
616 /// Sets the blocking mode on the pipe.
set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()>617 pub fn set_blocking(&mut self, blocking_mode: &BlockingMode) -> io::Result<()> {
618 let mut client_mode = DWORD::from(blocking_mode) | self.framing_mode.to_readmode();
619 self.blocking_mode = *blocking_mode;
620
621 // Safe because the pipe has not been closed (it is managed by this object).
622 unsafe { set_named_pipe_handle_state(self.handle.as_raw_descriptor(), &mut client_mode) }
623 }
624
625 /// For a server named pipe, waits for a client to connect
wait_for_client_connection(&self) -> Result<()>626 pub fn wait_for_client_connection(&self) -> Result<()> {
627 // Safe because the handle is valid and we're checking the return
628 // code according to the documentation
629 unsafe {
630 let success_flag = ConnectNamedPipe(
631 self.as_raw_descriptor(),
632 /* lpOverlapped= */ ptr::null_mut(),
633 );
634 if success_flag == 0 && GetLastError() != ERROR_PIPE_CONNECTED {
635 return Err(io::Error::last_os_error());
636 }
637 }
638 Ok(())
639 }
640
641 /// Used for overlapped read and write operations.
642 ///
643 /// This will block until the ReadFile or WriteFile operation that also took in
644 /// `overlapped_wrapper` is complete, assuming `overlapped_wrapper` was created from
645 /// `create_overlapped_struct` or that OVERLAPPED.hEvent is set. This will also get
646 /// the number of bytes that were read or written.
get_overlapped_result( &mut self, overlapped_wrapper: &mut OverlappedWrapper, ) -> io::Result<u32>647 pub fn get_overlapped_result(
648 &mut self,
649 overlapped_wrapper: &mut OverlappedWrapper,
650 ) -> io::Result<u32> {
651 if !overlapped_wrapper.in_use {
652 return Err(std::io::Error::new(
653 std::io::ErrorKind::InvalidInput,
654 "Overlapped struct is not in use",
655 ));
656 }
657 let mut size_transferred = 0;
658 // Safe as long as `overlapped_struct` isn't copied and also contains a valid event.
659 // Also the named pipe handle must created with `FILE_FLAG_OVERLAPPED`.
660 let res = unsafe {
661 GetOverlappedResult(
662 self.handle.as_raw_descriptor(),
663 &mut *overlapped_wrapper.overlapped,
664 &mut size_transferred,
665 TRUE,
666 )
667 };
668 overlapped_wrapper.in_use = false;
669 if res == 0 {
670 Err(io::Error::last_os_error())
671 } else {
672 Ok(size_transferred)
673 }
674 }
675
676 /// Creates a valid `OVERLAPPED` struct used to pass into `ReadFile` and `WriteFile` in order
677 /// to perform asynchronous I/O. When passing in the OVERLAPPED struct, the Event object
678 /// returned must not be dropped.
679 ///
680 /// There is an option to create the event object and set it to the `hEvent` field. If hEvent
681 /// is not set and the named pipe handle was created with `FILE_FLAG_OVERLAPPED`, then the file
682 /// handle will be signaled when the operation is complete. In other words, you can use
683 /// `WaitForSingleObject` on the file handle. Not setting an event is highly discouraged by
684 /// Microsoft though.
create_overlapped_struct(include_event: bool) -> Result<OverlappedWrapper>685 pub fn create_overlapped_struct(include_event: bool) -> Result<OverlappedWrapper> {
686 let mut overlapped = OVERLAPPED::default();
687 let h_event = if include_event {
688 Some(Event::new()?)
689 } else {
690 None
691 };
692 overlapped.hEvent = h_event.as_ref().unwrap().as_raw_descriptor();
693 Ok(OverlappedWrapper {
694 overlapped: Box::new(overlapped),
695 h_event,
696 in_use: false,
697 })
698 }
699
700 /// Cancels I/O Operations in the current process. Since `lpOverlapped` is null, this will
701 /// cancel all I/O requests for the file handle passed in.
cancel_io(&mut self) -> Result<()>702 pub fn cancel_io(&mut self) -> Result<()> {
703 let res = unsafe {
704 CancelIoEx(
705 self.handle.as_raw_descriptor(),
706 /* lpOverlapped= */ std::ptr::null_mut(),
707 )
708 };
709 if res == 0 {
710 Err(io::Error::last_os_error())
711 } else {
712 Ok(())
713 }
714 }
715
716 /// Get the framing mode of the pipe.
get_framing_mode(&self) -> FramingMode717 pub fn get_framing_mode(&self) -> FramingMode {
718 self.framing_mode
719 }
720
721 /// Returns metadata about the connected NamedPipe.
get_info(&self, is_server_connection: bool) -> Result<NamedPipeInfo>722 pub fn get_info(&self, is_server_connection: bool) -> Result<NamedPipeInfo> {
723 let mut flags: u32 = 0;
724 // Marked mutable because they are mutated in a system call
725 #[allow(unused_mut)]
726 let mut incoming_buffer_size: u32 = 0;
727 #[allow(unused_mut)]
728 let mut outgoing_buffer_size: u32 = 0;
729 #[allow(unused_mut)]
730 let mut max_instances: u32 = 0;
731 // Client side with BYTE type are default flags
732 if is_server_connection {
733 flags |= 0x00000001 /* PIPE_SERVER_END */
734 }
735 if self.framing_mode == FramingMode::Message {
736 flags |= 0x00000004 /* PIPE_TYPE_MESSAGE */
737 }
738 // Safe because we have allocated all pointers and own
739 // them as mutable.
740 let res = unsafe {
741 GetNamedPipeInfo(
742 self.as_raw_descriptor(),
743 flags as *mut u32,
744 outgoing_buffer_size as *mut u32,
745 incoming_buffer_size as *mut u32,
746 max_instances as *mut u32,
747 )
748 };
749
750 if res == 0 {
751 Err(io::Error::last_os_error())
752 } else {
753 Ok(NamedPipeInfo {
754 outgoing_buffer_size,
755 incoming_buffer_size,
756 max_instances,
757 })
758 }
759 }
760
761 /// For a server pipe, flush the pipe contents. This will
762 /// block until the pipe is cleared by the client. Only
763 /// call this if you are sure the client is reading the
764 /// data!
flush_data_blocking(&self) -> Result<()>765 pub fn flush_data_blocking(&self) -> Result<()> {
766 // Safe because the only buffers interacted with are
767 // outside of Rust memory
768 let res = unsafe { FlushFileBuffers(self.as_raw_descriptor()) };
769 if res == 0 {
770 Err(io::Error::last_os_error())
771 } else {
772 Ok(())
773 }
774 }
775 }
776
777 impl AsRawDescriptor for PipeConnection {
as_raw_descriptor(&self) -> RawDescriptor778 fn as_raw_descriptor(&self) -> RawDescriptor {
779 self.handle.as_raw_descriptor()
780 }
781 }
782
783 impl IntoRawDescriptor for PipeConnection {
into_raw_descriptor(self) -> RawDescriptor784 fn into_raw_descriptor(self) -> RawDescriptor {
785 self.handle.into_raw_descriptor()
786 }
787 }
788
// NOTE(review): these impls assert that a PipeConnection may be moved to and shared between
// threads. Presumably this relies on the pipe handle being usable from any thread — confirm
// before depending on concurrent use of the same end.
unsafe impl Send for PipeConnection {}
unsafe impl Sync for PipeConnection {}
791
792 impl io::Read for PipeConnection {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>793 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
794 // This is safe because PipeConnection::read is always safe for u8
795 unsafe { PipeConnection::read(self, buf) }
796 }
797 }
798
799 impl io::Write for PipeConnection {
write(&mut self, buf: &[u8]) -> io::Result<usize>800 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
801 PipeConnection::write(self, buf)
802 }
803
flush(&mut self) -> io::Result<()>804 fn flush(&mut self) -> io::Result<()> {
805 Ok(())
806 }
807 }
808
/// A simple data struct representing
/// metadata about a NamedPipe, as returned by `PipeConnection::get_info`.
pub struct NamedPipeInfo {
    /// Size of the pipe's outgoing buffer.
    pub outgoing_buffer_size: u32,
    /// Size of the pipe's incoming buffer.
    pub incoming_buffer_size: u32,
    /// Maximum number of instances of the pipe.
    pub max_instances: u32,
}
816
817 #[cfg(test)]
818 mod tests {
819 use super::*;
820
#[test]
fn duplex_pipe_stream() {
    let (p1, p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

    // Sends six bytes one way and drains them through a four-byte buffer, so the data has to
    // arrive in two chunks.
    let exercise = |dir: &str, sender: &PipeConnection, receiver: &PipeConnection| {
        println!("{}", dir);

        let payload: [u8; 6] = [75, 77, 54, 82, 76, 65];
        sender.write(&payload).unwrap();

        // Smaller than what we sent so we get multiple chunks
        let mut recv_buffer: [u8; 4] = [0; 4];

        // Reading into a u8 buffer is always valid.
        let first = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
        assert_eq!(first, 4);
        assert_eq!(recv_buffer, [75, 77, 54, 82]);

        let second = unsafe { receiver.read(&mut recv_buffer) }.unwrap();
        assert_eq!(second, 2);
        assert_eq!(recv_buffer[0..2], [76, 65]);
    };

    // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
    exercise("1 -> 2", &p1, &p2);
    exercise("2 -> 1", &p2, &p1);
}
845
#[test]
fn available_byte_count_byte_mode() {
    let (writer, reader) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();
    writer.write(&[1, 23, 45]).unwrap();

    // Ask twice: PeekNamedPipe should NOT touch the data in the pipe, so the second query must
    // yield the same value as the first.
    for _attempt in 0..2 {
        assert_eq!(reader.get_available_byte_count().unwrap(), 3);
    }
}
856
#[test]
fn available_byte_count_message_mode() {
    let (writer, reader) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();
    writer.write(&[1, 23, 45]).unwrap();

    // Ask twice: PeekNamedPipe should NOT touch the data in the pipe, so the second query must
    // yield the same value as the first.
    for _attempt in 0..2 {
        assert_eq!(reader.get_available_byte_count().unwrap(), 3);
    }
}
867
#[test]
fn available_byte_count_message_mode_multiple_messages() {
    let (writer, reader) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();

    // Queue a 3-byte message followed by a 2-byte message.
    writer.write(&[1, 2, 3]).unwrap();
    writer.write(&[4, 5]).unwrap();

    // The reported count is expected to span both queued messages (3 + 2 bytes).
    let pending_bytes = reader.get_available_byte_count().unwrap();
    assert_eq!(pending_bytes, 5);
}
875
#[test]
fn duplex_pipe_message() {
    let (p1, p2) = pair(&FramingMode::Message, &BlockingMode::Wait, 0).unwrap();

    // The underlying APIs are a bit asymmetrical, so let each end act as the sender in turn.
    let directions = [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)];
    for &(label, tx, rx) in directions.iter() {
        println!("{}", label);

        // Two separate messages, so that message framing is actually exercised.
        tx.write(&[1, 23, 45]).unwrap();
        tx.write(&[67, 89, 10]).unwrap();

        // Larger than a single message: each read should still stop at the message boundary.
        let mut recv_buffer: [u8; 5] = [0; 5];

        // SAFETY: the destination is a plain `u8` buffer that lives for the whole call.
        let first_len = unsafe { rx.read(&mut recv_buffer) }.unwrap();
        assert_eq!(first_len, 3);
        assert_eq!(recv_buffer[0..3], [1, 23, 45]);

        // SAFETY: same `u8` buffer as above, still alive for the whole call.
        let second_len = unsafe { rx.read(&mut recv_buffer) }.unwrap();
        assert_eq!(second_len, 3);
        assert_eq!(recv_buffer[0..3], [67, 89, 10]);
    }
}
901
902 #[cfg(test)]
duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection)903 fn duplex_nowait_helper(p1: &PipeConnection, p2: &PipeConnection) {
904 let mut recv_buffer: [u8; 1] = [0; 1];
905
906 // Test both forward and reverse direction since the underlying APIs are a bit asymmetrical
907 unsafe {
908 for (dir, sender, receiver) in [("1 -> 2", &p1, &p2), ("2 -> 1", &p2, &p1)].iter() {
909 println!("{}", dir);
910 sender.write(&[1]).unwrap();
911 assert_eq!(receiver.read(&mut recv_buffer).unwrap(), 1); // Should succeed!
912 assert_eq!(
913 receiver.read(&mut recv_buffer).unwrap_err().kind(),
914 std::io::ErrorKind::WouldBlock
915 );
916 }
917 }
918 }
919
#[test]
fn duplex_nowait() {
    // Both ends are requested in `NoWait` mode right at creation time.
    let (first_end, second_end) =
        pair(&FramingMode::Byte, &BlockingMode::NoWait, 0).unwrap();
    duplex_nowait_helper(&first_end, &second_end);
}
925
#[test]
fn duplex_nowait_set_after_creation() {
    // Create the pair in blocking mode, then switch each end to non-blocking afterwards.
    let (mut p1, mut p2) = pair(&FramingMode::Byte, &BlockingMode::Wait, 0).unwrap();

    let non_blocking = BlockingMode::NoWait;
    p1.set_blocking(&non_blocking)
        .expect("Failed to set blocking mode on pipe p1");
    p2.set_blocking(&non_blocking)
        .expect("Failed to set blocking mode on pipe p2");

    duplex_nowait_helper(&p1, &p2);
}
936
#[test]
fn duplex_overlapped() {
    let pipe_name = generate_pipe_name();

    let mut server = create_server_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    let mut client = create_client_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();

    // Server side: start an overlapped write, then collect its result.
    let mut server_overlapped =
        PipeConnection::create_overlapped_struct(/* include_event= */ true).unwrap();
    server
        .write_overlapped(&[75, 77, 54, 82, 76, 65], &mut server_overlapped)
        .unwrap();
    let bytes_written = server
        .get_overlapped_result(&mut server_overlapped)
        .unwrap();
    assert_eq!(bytes_written, 6);

    // Client side: start an overlapped read into a buffer sized for the whole message.
    let mut recv_buffer: [u8; 6] = [0; 6];
    let mut client_overlapped =
        PipeConnection::create_overlapped_struct(/* include_event= */ true).unwrap();

    // SAFETY: `read_overlapped` can be called since the overlapped struct has been created, and
    // both it and `recv_buffer` are locals still in scope when the result is collected below.
    unsafe { client.read_overlapped(&mut recv_buffer, &mut client_overlapped) }.unwrap();
    let bytes_read = client
        .get_overlapped_result(&mut client_overlapped)
        .unwrap();
    assert_eq!(bytes_read, 6);
    assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
}
983
/// Checks that one overlapped wrapper cannot be reused while an operation that was started with
/// it has not yet been collected, and that it becomes usable again afterwards.
#[test]
fn duplex_overlapped_test_in_use() {
    let pipe_name = generate_pipe_name();

    let mut p1 = create_server_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* timeout= */ 0,
        /* buffer_size= */ 1000,
        /* overlapped= */ true,
    )
    .unwrap();

    let mut p2 = create_client_pipe(
        &pipe_name,
        &FramingMode::Message,
        &BlockingMode::Wait,
        /* overlapped= */ true,
    )
    .unwrap();
    let mut overlapped_wrapper =
        PipeConnection::create_overlapped_struct(/* include_event= */ true).unwrap();

    // No operation has been started with the wrapper yet, so there is no result to collect.
    let res = p1.get_overlapped_result(&mut overlapped_wrapper);
    assert!(res.is_err());

    // Start a write on p1; the wrapper now belongs to that operation.
    let res = p1.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper);
    assert!(res.is_ok());

    // Reusing the same wrapper for another write must be rejected.
    let res = p2.write_overlapped(&[75, 77, 54, 82, 76, 65], &mut overlapped_wrapper);
    assert!(res.is_err());

    // Reusing it for a read must be rejected too.
    let mut recv_buffer: [u8; 6] = [0; 6];
    // SAFETY: `recv_buffer` and `overlapped_wrapper` are live locals; this call is expected to
    // fail without starting a read.
    let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
    assert!(res.is_err());

    // Collecting the result of p1's write frees the wrapper for reuse.
    let res = p1.get_overlapped_result(&mut overlapped_wrapper);
    assert!(res.is_ok());

    let mut recv_buffer: [u8; 6] = [0; 6];
    // SAFETY: `recv_buffer` and `overlapped_wrapper` stay in scope until the read is collected
    // by `get_overlapped_result` below.
    let res = unsafe { p2.read_overlapped(&mut recv_buffer, &mut overlapped_wrapper) };
    assert!(res.is_ok());

    // Complete the read before the buffer and the wrapper go out of scope: an overlapped
    // operation must not be left pending on memory that is about to be released. This also
    // verifies that the message written by p1 really arrived.
    let size = p2
        .get_overlapped_result(&mut overlapped_wrapper)
        .unwrap();
    assert_eq!(size, 6);
    assert_eq!(recv_buffer, [75, 77, 54, 82, 76, 65]);
}
1028
generate_pipe_name() -> String1029 fn generate_pipe_name() -> String {
1030 format!(
1031 r"\\.\pipe\test-ipc-pipe-name.rand{}",
1032 rand::thread_rng().gen::<u64>(),
1033 )
1034 }
1035 }
1036