1 //! Tokio support for [Windows named pipes].
2 //!
3 //! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4
5 use std::ffi::c_void;
6 use std::ffi::OsStr;
7 use std::io::{self, Read, Write};
8 use std::pin::Pin;
9 use std::ptr;
10 use std::task::{Context, Poll};
11
12 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
13 use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
14
15 cfg_io_util! {
16 use bytes::BufMut;
17 }
18
19 // Hide imports which are not used when generating documentation.
20 #[cfg(not(docsrs))]
21 mod doc {
22 pub(super) use crate::os::windows::ffi::OsStrExt;
23 pub(super) mod windows_sys {
24 pub(crate) use windows_sys::{
25 Win32::Foundation::*, Win32::Storage::FileSystem::*, Win32::System::Pipes::*,
26 Win32::System::SystemServices::*,
27 };
28 }
29 pub(super) use mio::windows as mio_windows;
30 }
31
// Documentation-only stand-in for the real `doc` module. Nothing declared
// here appears in the public API, so it is intentionally left undocumented.
#[cfg(docsrs)]
mod doc {
    pub(super) mod mio_windows {
        // Placeholder so `mio_windows::NamedPipe` still names a type when
        // building docs.
        pub type NamedPipe = crate::doc::NotDefinedHere;
    }
}
39
40 use self::doc::*;
41
42 /// A [Windows named pipe] server.
43 ///
44 /// Accepting client connections involves creating a server with
45 /// [`ServerOptions::create`] and waiting for clients to connect using
46 /// [`NamedPipeServer::connect`].
47 ///
48 /// To avoid having clients sporadically fail with
49 /// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
50 /// ensure that at least one server instance is available at all times. This
51 /// means that the typical listen loop for a server is a bit involved, because
52 /// we have to ensure that we never drop a server accidentally while a client
53 /// might connect.
54 ///
55 /// So a correctly implemented server looks like this:
56 ///
57 /// ```no_run
58 /// use std::io;
59 /// use tokio::net::windows::named_pipe::ServerOptions;
60 ///
61 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
62 ///
63 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
64 /// // The first server needs to be constructed early so that clients can
65 /// // be correctly connected. Otherwise calling .wait will cause the client to
66 /// // error.
67 /// //
68 /// // Here we also make use of `first_pipe_instance`, which will ensure that
69 /// // there are no other servers up and running already.
70 /// let mut server = ServerOptions::new()
71 /// .first_pipe_instance(true)
72 /// .create(PIPE_NAME)?;
73 ///
74 /// // Spawn the server loop.
75 /// let server = tokio::spawn(async move {
76 /// loop {
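/// // Wait for a client to connect, then take ownership of the connected
/// // pipe so it can be handed off to a task below.
/// server.connect().await?;
/// let connected_client = server;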
79 ///
/// // Construct the next server to be connected before sending the one
/// // we already have off onto a task. This ensures that the server
82 /// // isn't closed (after it's done in the task) before a new one is
83 /// // available. Otherwise the client might error with
84 /// // `io::ErrorKind::NotFound`.
85 /// server = ServerOptions::new().create(PIPE_NAME)?;
86 ///
87 /// let client = tokio::spawn(async move {
88 /// /* use the connected client */
89 /// # Ok::<_, std::io::Error>(())
90 /// });
91 /// # if true { break } // needed for type inference to work
92 /// }
93 ///
94 /// Ok::<_, io::Error>(())
95 /// });
96 ///
97 /// /* do something else not server related here */
98 /// # Ok(()) }
99 /// ```
100 ///
101 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
#[derive(Debug)]
pub struct NamedPipeServer {
    // The mio named pipe, wrapped in Tokio's `PollEvented`. The methods and
    // trait impls of this type all operate through this field.
    io: PollEvented<mio_windows::NamedPipe>,
}
106
107 impl NamedPipeServer {
108 /// Constructs a new named pipe server from the specified raw handle.
109 ///
110 /// This function will consume ownership of the handle given, passing
111 /// responsibility for closing the handle to the returned object.
112 ///
113 /// This function is also unsafe as the primitives currently returned have
114 /// the contract that they are the sole owner of the file descriptor they
115 /// are wrapping. Usage of this function could accidentally allow violating
116 /// this contract which can cause memory unsafety in code that relies on it
117 /// being true.
118 ///
119 /// # Errors
120 ///
121 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
122 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
123 ///
124 /// [Tokio Runtime]: crate::runtime::Runtime
125 /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>126 pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
127 let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
128
129 Ok(Self {
130 io: PollEvented::new(named_pipe)?,
131 })
132 }
133
134 /// Retrieves information about the named pipe the server is associated
135 /// with.
136 ///
137 /// ```no_run
138 /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
139 ///
140 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
141 ///
142 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
143 /// let server = ServerOptions::new()
144 /// .pipe_mode(PipeMode::Message)
145 /// .max_instances(5)
146 /// .create(PIPE_NAME)?;
147 ///
148 /// let server_info = server.info()?;
149 ///
150 /// assert_eq!(server_info.end, PipeEnd::Server);
151 /// assert_eq!(server_info.mode, PipeMode::Message);
152 /// assert_eq!(server_info.max_instances, 5);
153 /// # Ok(()) }
154 /// ```
info(&self) -> io::Result<PipeInfo>155 pub fn info(&self) -> io::Result<PipeInfo> {
156 // Safety: we're ensuring the lifetime of the named pipe.
157 unsafe { named_pipe_info(self.io.as_raw_handle()) }
158 }
159
160 /// Enables a named pipe server process to wait for a client process to
161 /// connect to an instance of a named pipe. A client process connects by
162 /// creating a named pipe with the same name.
163 ///
164 /// This corresponds to the [`ConnectNamedPipe`] system call.
165 ///
166 /// # Cancel safety
167 ///
168 /// This method is cancellation safe in the sense that if it is used as the
169 /// event in a [`select!`](crate::select) statement and some other branch
170 /// completes first, then no connection events have been lost.
171 ///
172 /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
173 ///
174 /// # Example
175 ///
176 /// ```no_run
177 /// use tokio::net::windows::named_pipe::ServerOptions;
178 ///
179 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
180 ///
181 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
182 /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
183 ///
184 /// // Wait for a client to connect.
185 /// pipe.connect().await?;
186 ///
187 /// // Use the connected client...
188 /// # Ok(()) }
189 /// ```
connect(&self) -> io::Result<()>190 pub async fn connect(&self) -> io::Result<()> {
191 match self.io.connect() {
192 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
193 self.io
194 .registration()
195 .async_io(Interest::WRITABLE, || self.io.connect())
196 .await
197 }
198 x => x,
199 }
200 }
201
202 /// Disconnects the server end of a named pipe instance from a client
203 /// process.
204 ///
205 /// ```
206 /// use tokio::io::AsyncWriteExt;
207 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
208 /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
209 ///
210 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
211 ///
212 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
213 /// let server = ServerOptions::new()
214 /// .create(PIPE_NAME)?;
215 ///
216 /// let mut client = ClientOptions::new()
217 /// .open(PIPE_NAME)?;
218 ///
219 /// // Wait for a client to become connected.
220 /// server.connect().await?;
221 ///
222 /// // Forcibly disconnect the client.
223 /// server.disconnect()?;
224 ///
225 /// // Write fails with an OS-specific error after client has been
226 /// // disconnected.
227 /// let e = client.write(b"ping").await.unwrap_err();
228 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_NOT_CONNECTED as i32));
229 /// # Ok(()) }
230 /// ```
disconnect(&self) -> io::Result<()>231 pub fn disconnect(&self) -> io::Result<()> {
232 self.io.disconnect()
233 }
234
235 /// Waits for any of the requested ready states.
236 ///
237 /// This function is usually paired with `try_read()` or `try_write()`. It
238 /// can be used to concurrently read / write to the same pipe on a single
239 /// task without splitting the pipe.
240 ///
241 /// The function may complete without the pipe being ready. This is a
242 /// false-positive and attempting an operation will return with
243 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
244 /// [`Ready`] set, so you should always check the returned value and possibly
245 /// wait again if the requested states are not set.
246 ///
247 /// # Examples
248 ///
249 /// Concurrently read and write to the pipe on the same task without
250 /// splitting.
251 ///
252 /// ```no_run
253 /// use tokio::io::Interest;
254 /// use tokio::net::windows::named_pipe;
255 /// use std::error::Error;
256 /// use std::io;
257 ///
258 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
259 ///
260 /// #[tokio::main]
261 /// async fn main() -> Result<(), Box<dyn Error>> {
262 /// let server = named_pipe::ServerOptions::new()
263 /// .create(PIPE_NAME)?;
264 ///
265 /// loop {
266 /// let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
267 ///
268 /// if ready.is_readable() {
269 /// let mut data = vec![0; 1024];
270 /// // Try to read data, this may still fail with `WouldBlock`
271 /// // if the readiness event is a false positive.
272 /// match server.try_read(&mut data) {
273 /// Ok(n) => {
274 /// println!("read {} bytes", n);
275 /// }
276 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
277 /// continue;
278 /// }
279 /// Err(e) => {
280 /// return Err(e.into());
281 /// }
282 /// }
283 /// }
284 ///
285 /// if ready.is_writable() {
286 /// // Try to write data, this may still fail with `WouldBlock`
287 /// // if the readiness event is a false positive.
288 /// match server.try_write(b"hello world") {
289 /// Ok(n) => {
290 /// println!("write {} bytes", n);
291 /// }
292 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
293 /// continue;
294 /// }
295 /// Err(e) => {
296 /// return Err(e.into());
297 /// }
298 /// }
299 /// }
300 /// }
301 /// }
302 /// ```
ready(&self, interest: Interest) -> io::Result<Ready>303 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
304 let event = self.io.registration().readiness(interest).await?;
305 Ok(event.ready)
306 }
307
308 /// Waits for the pipe to become readable.
309 ///
310 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
311 /// paired with `try_read()`.
312 ///
313 /// # Examples
314 ///
315 /// ```no_run
316 /// use tokio::net::windows::named_pipe;
317 /// use std::error::Error;
318 /// use std::io;
319 ///
320 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
321 ///
322 /// #[tokio::main]
323 /// async fn main() -> Result<(), Box<dyn Error>> {
324 /// let server = named_pipe::ServerOptions::new()
325 /// .create(PIPE_NAME)?;
326 ///
327 /// let mut msg = vec![0; 1024];
328 ///
329 /// loop {
330 /// // Wait for the pipe to be readable
331 /// server.readable().await?;
332 ///
333 /// // Try to read data, this may still fail with `WouldBlock`
334 /// // if the readiness event is a false positive.
335 /// match server.try_read(&mut msg) {
336 /// Ok(n) => {
337 /// msg.truncate(n);
338 /// break;
339 /// }
340 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
341 /// continue;
342 /// }
343 /// Err(e) => {
344 /// return Err(e.into());
345 /// }
346 /// }
347 /// }
348 ///
349 /// println!("GOT = {:?}", msg);
350 /// Ok(())
351 /// }
352 /// ```
readable(&self) -> io::Result<()>353 pub async fn readable(&self) -> io::Result<()> {
354 self.ready(Interest::READABLE).await?;
355 Ok(())
356 }
357
358 /// Polls for read readiness.
359 ///
360 /// If the pipe is not currently ready for reading, this method will
361 /// store a clone of the `Waker` from the provided `Context`. When the pipe
362 /// becomes ready for reading, `Waker::wake` will be called on the waker.
363 ///
364 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
365 /// the `Waker` from the `Context` passed to the most recent call is
366 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
367 /// second, independent waker.)
368 ///
369 /// This function is intended for cases where creating and pinning a future
370 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
371 /// preferred, as this supports polling from multiple tasks at once.
372 ///
373 /// # Return value
374 ///
375 /// The function returns:
376 ///
377 /// * `Poll::Pending` if the pipe is not ready for reading.
378 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
379 /// * `Poll::Ready(Err(e))` if an error is encountered.
380 ///
381 /// # Errors
382 ///
383 /// This function may encounter any standard I/O error except `WouldBlock`.
384 ///
385 /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>386 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
387 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
388 }
389
390 /// Tries to read data from the pipe into the provided buffer, returning how
391 /// many bytes were read.
392 ///
393 /// Receives any pending data from the pipe but does not wait for new data
394 /// to arrive. On success, returns the number of bytes read. Because
395 /// `try_read()` is non-blocking, the buffer does not have to be stored by
396 /// the async task and can exist entirely on the stack.
397 ///
398 /// Usually, [`readable()`] or [`ready()`] is used with this function.
399 ///
400 /// [`readable()`]: NamedPipeServer::readable()
401 /// [`ready()`]: NamedPipeServer::ready()
402 ///
403 /// # Return
404 ///
405 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
406 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
407 ///
408 /// 1. The pipe's read half is closed and will no longer yield data.
409 /// 2. The specified buffer was 0 bytes in length.
410 ///
411 /// If the pipe is not ready to read data,
412 /// `Err(io::ErrorKind::WouldBlock)` is returned.
413 ///
414 /// # Examples
415 ///
416 /// ```no_run
417 /// use tokio::net::windows::named_pipe;
418 /// use std::error::Error;
419 /// use std::io;
420 ///
421 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
422 ///
423 /// #[tokio::main]
424 /// async fn main() -> Result<(), Box<dyn Error>> {
425 /// let server = named_pipe::ServerOptions::new()
426 /// .create(PIPE_NAME)?;
427 ///
428 /// loop {
429 /// // Wait for the pipe to be readable
430 /// server.readable().await?;
431 ///
432 /// // Creating the buffer **after** the `await` prevents it from
433 /// // being stored in the async task.
434 /// let mut buf = [0; 4096];
435 ///
436 /// // Try to read data, this may still fail with `WouldBlock`
437 /// // if the readiness event is a false positive.
438 /// match server.try_read(&mut buf) {
439 /// Ok(0) => break,
440 /// Ok(n) => {
441 /// println!("read {} bytes", n);
442 /// }
443 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
444 /// continue;
445 /// }
446 /// Err(e) => {
447 /// return Err(e.into());
448 /// }
449 /// }
450 /// }
451 ///
452 /// Ok(())
453 /// }
454 /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>455 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
456 self.io
457 .registration()
458 .try_io(Interest::READABLE, || (&*self.io).read(buf))
459 }
460
461 /// Tries to read data from the pipe into the provided buffers, returning
462 /// how many bytes were read.
463 ///
464 /// Data is copied to fill each buffer in order, with the final buffer
465 /// written to possibly being only partially filled. This method behaves
466 /// equivalently to a single call to [`try_read()`] with concatenated
467 /// buffers.
468 ///
469 /// Receives any pending data from the pipe but does not wait for new data
470 /// to arrive. On success, returns the number of bytes read. Because
471 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
472 /// stored by the async task and can exist entirely on the stack.
473 ///
474 /// Usually, [`readable()`] or [`ready()`] is used with this function.
475 ///
476 /// [`try_read()`]: NamedPipeServer::try_read()
477 /// [`readable()`]: NamedPipeServer::readable()
478 /// [`ready()`]: NamedPipeServer::ready()
479 ///
480 /// # Return
481 ///
482 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
483 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
484 /// and will no longer yield data. If the pipe is not ready to read data
485 /// `Err(io::ErrorKind::WouldBlock)` is returned.
486 ///
487 /// # Examples
488 ///
489 /// ```no_run
490 /// use tokio::net::windows::named_pipe;
491 /// use std::error::Error;
492 /// use std::io::{self, IoSliceMut};
493 ///
494 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
495 ///
496 /// #[tokio::main]
497 /// async fn main() -> Result<(), Box<dyn Error>> {
498 /// let server = named_pipe::ServerOptions::new()
499 /// .create(PIPE_NAME)?;
500 ///
501 /// loop {
502 /// // Wait for the pipe to be readable
503 /// server.readable().await?;
504 ///
505 /// // Creating the buffer **after** the `await` prevents it from
506 /// // being stored in the async task.
507 /// let mut buf_a = [0; 512];
508 /// let mut buf_b = [0; 1024];
509 /// let mut bufs = [
510 /// IoSliceMut::new(&mut buf_a),
511 /// IoSliceMut::new(&mut buf_b),
512 /// ];
513 ///
514 /// // Try to read data, this may still fail with `WouldBlock`
515 /// // if the readiness event is a false positive.
516 /// match server.try_read_vectored(&mut bufs) {
517 /// Ok(0) => break,
518 /// Ok(n) => {
519 /// println!("read {} bytes", n);
520 /// }
521 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
522 /// continue;
523 /// }
524 /// Err(e) => {
525 /// return Err(e.into());
526 /// }
527 /// }
528 /// }
529 ///
530 /// Ok(())
531 /// }
532 /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>533 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
534 self.io
535 .registration()
536 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
537 }
538
539 cfg_io_util! {
/// Tries to read data from the pipe into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
542 ///
543 /// Receives any pending data from the pipe but does not wait for new data
544 /// to arrive. On success, returns the number of bytes read. Because
545 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
546 /// the async task and can exist entirely on the stack.
547 ///
548 /// Usually, [`readable()`] or [`ready()`] is used with this function.
549 ///
550 /// [`readable()`]: NamedPipeServer::readable()
551 /// [`ready()`]: NamedPipeServer::ready()
552 ///
553 /// # Return
554 ///
555 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
/// and will no longer yield data. If the pipe is not ready to read data
558 /// `Err(io::ErrorKind::WouldBlock)` is returned.
559 ///
560 /// # Examples
561 ///
562 /// ```no_run
563 /// use tokio::net::windows::named_pipe;
564 /// use std::error::Error;
565 /// use std::io;
566 ///
567 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
568 ///
569 /// #[tokio::main]
570 /// async fn main() -> Result<(), Box<dyn Error>> {
571 /// let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
572 ///
573 /// loop {
574 /// // Wait for the pipe to be readable
575 /// server.readable().await?;
576 ///
577 /// let mut buf = Vec::with_capacity(4096);
578 ///
579 /// // Try to read data, this may still fail with `WouldBlock`
580 /// // if the readiness event is a false positive.
581 /// match server.try_read_buf(&mut buf) {
582 /// Ok(0) => break,
583 /// Ok(n) => {
584 /// println!("read {} bytes", n);
585 /// }
586 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
587 /// continue;
588 /// }
589 /// Err(e) => {
590 /// return Err(e.into());
591 /// }
592 /// }
593 /// }
594 ///
595 /// Ok(())
596 /// }
597 /// ```
598 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
599 self.io.registration().try_io(Interest::READABLE, || {
600 use std::io::Read;
601
602 let dst = buf.chunk_mut();
603 let dst =
604 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
605
606 // Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
607 // buffer.
608 let n = (&*self.io).read(dst)?;
609
610 unsafe {
611 buf.advance_mut(n);
612 }
613
614 Ok(n)
615 })
616 }
617 }
618
619 /// Waits for the pipe to become writable.
620 ///
621 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
622 /// paired with `try_write()`.
623 ///
624 /// # Examples
625 ///
626 /// ```no_run
627 /// use tokio::net::windows::named_pipe;
628 /// use std::error::Error;
629 /// use std::io;
630 ///
631 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
632 ///
633 /// #[tokio::main]
634 /// async fn main() -> Result<(), Box<dyn Error>> {
635 /// let server = named_pipe::ServerOptions::new()
636 /// .create(PIPE_NAME)?;
637 ///
638 /// loop {
639 /// // Wait for the pipe to be writable
640 /// server.writable().await?;
641 ///
642 /// // Try to write data, this may still fail with `WouldBlock`
643 /// // if the readiness event is a false positive.
644 /// match server.try_write(b"hello world") {
645 /// Ok(n) => {
646 /// break;
647 /// }
648 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
649 /// continue;
650 /// }
651 /// Err(e) => {
652 /// return Err(e.into());
653 /// }
654 /// }
655 /// }
656 ///
657 /// Ok(())
658 /// }
659 /// ```
writable(&self) -> io::Result<()>660 pub async fn writable(&self) -> io::Result<()> {
661 self.ready(Interest::WRITABLE).await?;
662 Ok(())
663 }
664
665 /// Polls for write readiness.
666 ///
667 /// If the pipe is not currently ready for writing, this method will
668 /// store a clone of the `Waker` from the provided `Context`. When the pipe
669 /// becomes ready for writing, `Waker::wake` will be called on the waker.
670 ///
671 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
672 /// the `Waker` from the `Context` passed to the most recent call is
673 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
674 /// second, independent waker.)
675 ///
676 /// This function is intended for cases where creating and pinning a future
677 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
678 /// preferred, as this supports polling from multiple tasks at once.
679 ///
680 /// # Return value
681 ///
682 /// The function returns:
683 ///
684 /// * `Poll::Pending` if the pipe is not ready for writing.
685 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
686 /// * `Poll::Ready(Err(e))` if an error is encountered.
687 ///
688 /// # Errors
689 ///
690 /// This function may encounter any standard I/O error except `WouldBlock`.
691 ///
692 /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>693 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
694 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
695 }
696
697 /// Tries to write a buffer to the pipe, returning how many bytes were
698 /// written.
699 ///
700 /// The function will attempt to write the entire contents of `buf`, but
701 /// only part of the buffer may be written.
702 ///
703 /// This function is usually paired with `writable()`.
704 ///
705 /// # Return
706 ///
707 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
708 /// number of bytes written. If the pipe is not ready to write data,
709 /// `Err(io::ErrorKind::WouldBlock)` is returned.
710 ///
711 /// # Examples
712 ///
713 /// ```no_run
714 /// use tokio::net::windows::named_pipe;
715 /// use std::error::Error;
716 /// use std::io;
717 ///
718 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
719 ///
720 /// #[tokio::main]
721 /// async fn main() -> Result<(), Box<dyn Error>> {
722 /// let server = named_pipe::ServerOptions::new()
723 /// .create(PIPE_NAME)?;
724 ///
725 /// loop {
726 /// // Wait for the pipe to be writable
727 /// server.writable().await?;
728 ///
729 /// // Try to write data, this may still fail with `WouldBlock`
730 /// // if the readiness event is a false positive.
731 /// match server.try_write(b"hello world") {
732 /// Ok(n) => {
733 /// break;
734 /// }
735 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
736 /// continue;
737 /// }
738 /// Err(e) => {
739 /// return Err(e.into());
740 /// }
741 /// }
742 /// }
743 ///
744 /// Ok(())
745 /// }
746 /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>747 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
748 self.io
749 .registration()
750 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
751 }
752
753 /// Tries to write several buffers to the pipe, returning how many bytes
754 /// were written.
755 ///
/// Data is written from each buffer in order, with the final buffer read
/// from possibly being only partially consumed. This method behaves
758 /// equivalently to a single call to [`try_write()`] with concatenated
759 /// buffers.
760 ///
761 /// This function is usually paired with `writable()`.
762 ///
763 /// [`try_write()`]: NamedPipeServer::try_write()
764 ///
765 /// # Return
766 ///
767 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
768 /// number of bytes written. If the pipe is not ready to write data,
769 /// `Err(io::ErrorKind::WouldBlock)` is returned.
770 ///
771 /// # Examples
772 ///
773 /// ```no_run
774 /// use tokio::net::windows::named_pipe;
775 /// use std::error::Error;
776 /// use std::io;
777 ///
778 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
779 ///
780 /// #[tokio::main]
781 /// async fn main() -> Result<(), Box<dyn Error>> {
782 /// let server = named_pipe::ServerOptions::new()
783 /// .create(PIPE_NAME)?;
784 ///
785 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
786 ///
787 /// loop {
788 /// // Wait for the pipe to be writable
789 /// server.writable().await?;
790 ///
791 /// // Try to write data, this may still fail with `WouldBlock`
792 /// // if the readiness event is a false positive.
793 /// match server.try_write_vectored(&bufs) {
794 /// Ok(n) => {
795 /// break;
796 /// }
797 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
798 /// continue;
799 /// }
800 /// Err(e) => {
801 /// return Err(e.into());
802 /// }
803 /// }
804 /// }
805 ///
806 /// Ok(())
807 /// }
808 /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>809 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
810 self.io
811 .registration()
812 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
813 }
814
815 /// Tries to read or write from the pipe using a user-provided IO operation.
816 ///
817 /// If the pipe is ready, the provided closure is called. The closure
818 /// should attempt to perform IO operation from the pipe by manually
819 /// calling the appropriate syscall. If the operation fails because the
820 /// pipe is not actually ready, then the closure should return a
821 /// `WouldBlock` error and the readiness flag is cleared. The return value
822 /// of the closure is then returned by `try_io`.
823 ///
824 /// If the pipe is not ready, then the closure is not called
825 /// and a `WouldBlock` error is returned.
826 ///
827 /// The closure should only return a `WouldBlock` error if it has performed
828 /// an IO operation on the pipe that failed due to the pipe not being
829 /// ready. Returning a `WouldBlock` error in any other situation will
830 /// incorrectly clear the readiness flag, which can cause the pipe to
831 /// behave incorrectly.
832 ///
833 /// The closure should not perform the IO operation using any of the
834 /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
835 /// the readiness flag and can cause the pipe to behave incorrectly.
836 ///
837 /// This method is not intended to be used with combined interests.
838 /// The closure should perform only one type of IO operation, so it should not
839 /// require more than one ready state. This method may panic or sleep forever
840 /// if it is called with a combined interest.
841 ///
842 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
843 ///
844 /// [`readable()`]: NamedPipeServer::readable()
845 /// [`writable()`]: NamedPipeServer::writable()
846 /// [`ready()`]: NamedPipeServer::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>847 pub fn try_io<R>(
848 &self,
849 interest: Interest,
850 f: impl FnOnce() -> io::Result<R>,
851 ) -> io::Result<R> {
852 self.io.registration().try_io(interest, f)
853 }
854 }
855
856 impl AsyncRead for NamedPipeServer {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>857 fn poll_read(
858 self: Pin<&mut Self>,
859 cx: &mut Context<'_>,
860 buf: &mut ReadBuf<'_>,
861 ) -> Poll<io::Result<()>> {
862 unsafe { self.io.poll_read(cx, buf) }
863 }
864 }
865
866 impl AsyncWrite for NamedPipeServer {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>867 fn poll_write(
868 self: Pin<&mut Self>,
869 cx: &mut Context<'_>,
870 buf: &[u8],
871 ) -> Poll<io::Result<usize>> {
872 self.io.poll_write(cx, buf)
873 }
874
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>875 fn poll_write_vectored(
876 self: Pin<&mut Self>,
877 cx: &mut Context<'_>,
878 bufs: &[io::IoSlice<'_>],
879 ) -> Poll<io::Result<usize>> {
880 self.io.poll_write_vectored(cx, bufs)
881 }
882
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>883 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
884 Poll::Ready(Ok(()))
885 }
886
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>887 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
888 self.poll_flush(cx)
889 }
890 }
891
892 impl AsRawHandle for NamedPipeServer {
as_raw_handle(&self) -> RawHandle893 fn as_raw_handle(&self) -> RawHandle {
894 self.io.as_raw_handle()
895 }
896 }
897
898 /// A [Windows named pipe] client.
899 ///
900 /// Constructed using [`ClientOptions::open`].
901 ///
902 /// Connecting a client correctly involves a few steps. When connecting through
903 /// [`ClientOptions::open`], it might error indicating one of two things:
904 ///
905 /// * [`std::io::ErrorKind::NotFound`] - There is no server available.
906 /// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
907 /// for a while and try again.
908 ///
909 /// So a correctly implemented client looks like this:
910 ///
911 /// ```no_run
912 /// use std::time::Duration;
913 /// use tokio::net::windows::named_pipe::ClientOptions;
914 /// use tokio::time;
915 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
916 ///
917 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
918 ///
919 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
920 /// let client = loop {
921 /// match ClientOptions::new().open(PIPE_NAME) {
922 /// Ok(client) => break client,
923 /// Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
924 /// Err(e) => return Err(e),
925 /// }
926 ///
927 /// time::sleep(Duration::from_millis(50)).await;
928 /// };
929 ///
930 /// /* use the connected client */
931 /// # Ok(()) }
932 /// ```
933 ///
934 /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
935 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
#[derive(Debug)]
pub struct NamedPipeClient {
    // The mio named pipe wrapped in `PollEvented`. Every readiness wait and
    // I/O call on this type is forwarded to this value.
    io: PollEvented<mio_windows::NamedPipe>,
}
940
941 impl NamedPipeClient {
942 /// Constructs a new named pipe client from the specified raw handle.
943 ///
944 /// This function will consume ownership of the handle given, passing
945 /// responsibility for closing the handle to the returned object.
946 ///
/// # Safety
///
/// The primitives currently returned have the contract that they are the
/// sole owner of the handle they are wrapping. Usage of this function could
/// accidentally allow violating this contract, which can cause memory
/// unsafety in code that relies on it being true.
952 ///
953 /// # Errors
954 ///
955 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
956 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
957 ///
958 /// [Tokio Runtime]: crate::runtime::Runtime
959 /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>960 pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
961 let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
962
963 Ok(Self {
964 io: PollEvented::new(named_pipe)?,
965 })
966 }
967
968 /// Retrieves information about the named pipe the client is associated
969 /// with.
970 ///
971 /// ```no_run
972 /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
973 ///
974 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
975 ///
976 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
977 /// let client = ClientOptions::new()
978 /// .open(PIPE_NAME)?;
979 ///
980 /// let client_info = client.info()?;
981 ///
982 /// assert_eq!(client_info.end, PipeEnd::Client);
983 /// assert_eq!(client_info.mode, PipeMode::Message);
984 /// assert_eq!(client_info.max_instances, 5);
985 /// # Ok(()) }
986 /// ```
info(&self) -> io::Result<PipeInfo>987 pub fn info(&self) -> io::Result<PipeInfo> {
988 // Safety: we're ensuring the lifetime of the named pipe.
989 unsafe { named_pipe_info(self.io.as_raw_handle()) }
990 }
991
992 /// Waits for any of the requested ready states.
993 ///
994 /// This function is usually paired with `try_read()` or `try_write()`. It
995 /// can be used to concurrently read / write to the same pipe on a single
996 /// task without splitting the pipe.
997 ///
998 /// The function may complete without the pipe being ready. This is a
999 /// false-positive and attempting an operation will return with
1000 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1001 /// [`Ready`] set, so you should always check the returned value and possibly
1002 /// wait again if the requested states are not set.
1003 ///
1004 /// # Examples
1005 ///
1006 /// Concurrently read and write to the pipe on the same task without
1007 /// splitting.
1008 ///
1009 /// ```no_run
1010 /// use tokio::io::Interest;
1011 /// use tokio::net::windows::named_pipe;
1012 /// use std::error::Error;
1013 /// use std::io;
1014 ///
1015 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
1016 ///
1017 /// #[tokio::main]
1018 /// async fn main() -> Result<(), Box<dyn Error>> {
1019 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1020 ///
1021 /// loop {
1022 /// let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
1023 ///
1024 /// if ready.is_readable() {
1025 /// let mut data = vec![0; 1024];
1026 /// // Try to read data, this may still fail with `WouldBlock`
1027 /// // if the readiness event is a false positive.
1028 /// match client.try_read(&mut data) {
1029 /// Ok(n) => {
1030 /// println!("read {} bytes", n);
1031 /// }
1032 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1033 /// continue;
1034 /// }
1035 /// Err(e) => {
1036 /// return Err(e.into());
1037 /// }
1038 /// }
1039 /// }
1040 ///
1041 /// if ready.is_writable() {
1042 /// // Try to write data, this may still fail with `WouldBlock`
1043 /// // if the readiness event is a false positive.
1044 /// match client.try_write(b"hello world") {
1045 /// Ok(n) => {
1046 /// println!("write {} bytes", n);
1047 /// }
1048 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1049 /// continue;
1050 /// }
1051 /// Err(e) => {
1052 /// return Err(e.into());
1053 /// }
1054 /// }
1055 /// }
1056 /// }
1057 /// }
1058 /// ```
ready(&self, interest: Interest) -> io::Result<Ready>1059 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1060 let event = self.io.registration().readiness(interest).await?;
1061 Ok(event.ready)
1062 }
1063
1064 /// Waits for the pipe to become readable.
1065 ///
1066 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1067 /// paired with `try_read()`.
1068 ///
1069 /// # Examples
1070 ///
1071 /// ```no_run
1072 /// use tokio::net::windows::named_pipe;
1073 /// use std::error::Error;
1074 /// use std::io;
1075 ///
1076 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1077 ///
1078 /// #[tokio::main]
1079 /// async fn main() -> Result<(), Box<dyn Error>> {
1080 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1081 ///
1082 /// let mut msg = vec![0; 1024];
1083 ///
1084 /// loop {
1085 /// // Wait for the pipe to be readable
1086 /// client.readable().await?;
1087 ///
1088 /// // Try to read data, this may still fail with `WouldBlock`
1089 /// // if the readiness event is a false positive.
1090 /// match client.try_read(&mut msg) {
1091 /// Ok(n) => {
1092 /// msg.truncate(n);
1093 /// break;
1094 /// }
1095 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1096 /// continue;
1097 /// }
1098 /// Err(e) => {
1099 /// return Err(e.into());
1100 /// }
1101 /// }
1102 /// }
1103 ///
1104 /// println!("GOT = {:?}", msg);
1105 /// Ok(())
1106 /// }
1107 /// ```
readable(&self) -> io::Result<()>1108 pub async fn readable(&self) -> io::Result<()> {
1109 self.ready(Interest::READABLE).await?;
1110 Ok(())
1111 }
1112
1113 /// Polls for read readiness.
1114 ///
1115 /// If the pipe is not currently ready for reading, this method will
1116 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1117 /// becomes ready for reading, `Waker::wake` will be called on the waker.
1118 ///
1119 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1120 /// the `Waker` from the `Context` passed to the most recent call is
1121 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1122 /// second, independent waker.)
1123 ///
1124 /// This function is intended for cases where creating and pinning a future
1125 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1126 /// preferred, as this supports polling from multiple tasks at once.
1127 ///
1128 /// # Return value
1129 ///
1130 /// The function returns:
1131 ///
1132 /// * `Poll::Pending` if the pipe is not ready for reading.
1133 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1134 /// * `Poll::Ready(Err(e))` if an error is encountered.
1135 ///
1136 /// # Errors
1137 ///
1138 /// This function may encounter any standard I/O error except `WouldBlock`.
1139 ///
1140 /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1141 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1142 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1143 }
1144
1145 /// Tries to read data from the pipe into the provided buffer, returning how
1146 /// many bytes were read.
1147 ///
1148 /// Receives any pending data from the pipe but does not wait for new data
1149 /// to arrive. On success, returns the number of bytes read. Because
1150 /// `try_read()` is non-blocking, the buffer does not have to be stored by
1151 /// the async task and can exist entirely on the stack.
1152 ///
1153 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1154 ///
1155 /// [`readable()`]: NamedPipeClient::readable()
1156 /// [`ready()`]: NamedPipeClient::ready()
1157 ///
1158 /// # Return
1159 ///
1160 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1161 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1162 ///
1163 /// 1. The pipe's read half is closed and will no longer yield data.
1164 /// 2. The specified buffer was 0 bytes in length.
1165 ///
1166 /// If the pipe is not ready to read data,
1167 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1168 ///
1169 /// # Examples
1170 ///
1171 /// ```no_run
1172 /// use tokio::net::windows::named_pipe;
1173 /// use std::error::Error;
1174 /// use std::io;
1175 ///
1176 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1177 ///
1178 /// #[tokio::main]
1179 /// async fn main() -> Result<(), Box<dyn Error>> {
1180 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1181 ///
1182 /// loop {
1183 /// // Wait for the pipe to be readable
1184 /// client.readable().await?;
1185 ///
1186 /// // Creating the buffer **after** the `await` prevents it from
1187 /// // being stored in the async task.
1188 /// let mut buf = [0; 4096];
1189 ///
1190 /// // Try to read data, this may still fail with `WouldBlock`
1191 /// // if the readiness event is a false positive.
1192 /// match client.try_read(&mut buf) {
1193 /// Ok(0) => break,
1194 /// Ok(n) => {
1195 /// println!("read {} bytes", n);
1196 /// }
1197 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1198 /// continue;
1199 /// }
1200 /// Err(e) => {
1201 /// return Err(e.into());
1202 /// }
1203 /// }
1204 /// }
1205 ///
1206 /// Ok(())
1207 /// }
1208 /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>1209 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1210 self.io
1211 .registration()
1212 .try_io(Interest::READABLE, || (&*self.io).read(buf))
1213 }
1214
1215 /// Tries to read data from the pipe into the provided buffers, returning
1216 /// how many bytes were read.
1217 ///
1218 /// Data is copied to fill each buffer in order, with the final buffer
1219 /// written to possibly being only partially filled. This method behaves
1220 /// equivalently to a single call to [`try_read()`] with concatenated
1221 /// buffers.
1222 ///
1223 /// Receives any pending data from the pipe but does not wait for new data
1224 /// to arrive. On success, returns the number of bytes read. Because
1225 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1226 /// stored by the async task and can exist entirely on the stack.
1227 ///
1228 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1229 ///
1230 /// [`try_read()`]: NamedPipeClient::try_read()
1231 /// [`readable()`]: NamedPipeClient::readable()
1232 /// [`ready()`]: NamedPipeClient::ready()
1233 ///
1234 /// # Return
1235 ///
1236 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1237 /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1238 /// and will no longer yield data. If the pipe is not ready to read data
1239 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1240 ///
1241 /// # Examples
1242 ///
1243 /// ```no_run
1244 /// use tokio::net::windows::named_pipe;
1245 /// use std::error::Error;
1246 /// use std::io::{self, IoSliceMut};
1247 ///
1248 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1249 ///
1250 /// #[tokio::main]
1251 /// async fn main() -> Result<(), Box<dyn Error>> {
1252 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1253 ///
1254 /// loop {
1255 /// // Wait for the pipe to be readable
1256 /// client.readable().await?;
1257 ///
1258 /// // Creating the buffer **after** the `await` prevents it from
1259 /// // being stored in the async task.
1260 /// let mut buf_a = [0; 512];
1261 /// let mut buf_b = [0; 1024];
1262 /// let mut bufs = [
1263 /// IoSliceMut::new(&mut buf_a),
1264 /// IoSliceMut::new(&mut buf_b),
1265 /// ];
1266 ///
1267 /// // Try to read data, this may still fail with `WouldBlock`
1268 /// // if the readiness event is a false positive.
1269 /// match client.try_read_vectored(&mut bufs) {
1270 /// Ok(0) => break,
1271 /// Ok(n) => {
1272 /// println!("read {} bytes", n);
1273 /// }
1274 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1275 /// continue;
1276 /// }
1277 /// Err(e) => {
1278 /// return Err(e.into());
1279 /// }
1280 /// }
1281 /// }
1282 ///
1283 /// Ok(())
1284 /// }
1285 /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>1286 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1287 self.io
1288 .registration()
1289 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1290 }
1291
1292 cfg_io_util! {
/// Tries to read data from the pipe into the provided buffer, advancing the
/// buffer's internal cursor, returning how many bytes were read.
1295 ///
1296 /// Receives any pending data from the pipe but does not wait for new data
1297 /// to arrive. On success, returns the number of bytes read. Because
1298 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1299 /// the async task and can exist entirely on the stack.
1300 ///
1301 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1302 ///
1303 /// [`readable()`]: NamedPipeClient::readable()
1304 /// [`ready()`]: NamedPipeClient::ready()
1305 ///
1306 /// # Return
1307 ///
1308 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
/// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
/// and will no longer yield data. If the pipe is not ready to read data
1311 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1312 ///
1313 /// # Examples
1314 ///
1315 /// ```no_run
1316 /// use tokio::net::windows::named_pipe;
1317 /// use std::error::Error;
1318 /// use std::io;
1319 ///
1320 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1321 ///
1322 /// #[tokio::main]
1323 /// async fn main() -> Result<(), Box<dyn Error>> {
1324 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1325 ///
1326 /// loop {
1327 /// // Wait for the pipe to be readable
1328 /// client.readable().await?;
1329 ///
1330 /// let mut buf = Vec::with_capacity(4096);
1331 ///
1332 /// // Try to read data, this may still fail with `WouldBlock`
1333 /// // if the readiness event is a false positive.
1334 /// match client.try_read_buf(&mut buf) {
1335 /// Ok(0) => break,
1336 /// Ok(n) => {
1337 /// println!("read {} bytes", n);
1338 /// }
1339 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1340 /// continue;
1341 /// }
1342 /// Err(e) => {
1343 /// return Err(e.into());
1344 /// }
1345 /// }
1346 /// }
1347 ///
1348 /// Ok(())
1349 /// }
1350 /// ```
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
    // The closure passed to `try_io` does the actual read.
    self.io.registration().try_io(Interest::READABLE, || {
        use std::io::Read;

        // Destination for the read: the chunk handed out by `BufMut::chunk_mut`.
        let dst = buf.chunk_mut();
        // Reinterpret that chunk as a plain `&mut [u8]` so it can be passed to
        // `Read::read`.
        // NOTE(review): this views possibly-uninitialized memory as initialized
        // bytes. It presumably relies on the pipe's `read` only writing into the
        // slice and never reading from it — confirm and record as SAFETY.
        let dst =
            unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

        // Read through a shared reference to the pipe; `n` is the number of
        // bytes the read reported.
        let n = (&*self.io).read(dst)?;

        // Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
        // buffer.
        unsafe {
            buf.advance_mut(n);
        }

        Ok(n)
    })
}
1370 }
1371
1372 /// Waits for the pipe to become writable.
1373 ///
1374 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1375 /// paired with `try_write()`.
1376 ///
1377 /// # Examples
1378 ///
1379 /// ```no_run
1380 /// use tokio::net::windows::named_pipe;
1381 /// use std::error::Error;
1382 /// use std::io;
1383 ///
1384 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1385 ///
1386 /// #[tokio::main]
1387 /// async fn main() -> Result<(), Box<dyn Error>> {
1388 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1389 ///
1390 /// loop {
1391 /// // Wait for the pipe to be writable
1392 /// client.writable().await?;
1393 ///
1394 /// // Try to write data, this may still fail with `WouldBlock`
1395 /// // if the readiness event is a false positive.
1396 /// match client.try_write(b"hello world") {
1397 /// Ok(n) => {
1398 /// break;
1399 /// }
1400 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1401 /// continue;
1402 /// }
1403 /// Err(e) => {
1404 /// return Err(e.into());
1405 /// }
1406 /// }
1407 /// }
1408 ///
1409 /// Ok(())
1410 /// }
1411 /// ```
writable(&self) -> io::Result<()>1412 pub async fn writable(&self) -> io::Result<()> {
1413 self.ready(Interest::WRITABLE).await?;
1414 Ok(())
1415 }
1416
1417 /// Polls for write readiness.
1418 ///
1419 /// If the pipe is not currently ready for writing, this method will
1420 /// store a clone of the `Waker` from the provided `Context`. When the pipe
1421 /// becomes ready for writing, `Waker::wake` will be called on the waker.
1422 ///
1423 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1424 /// the `Waker` from the `Context` passed to the most recent call is
1425 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1426 /// second, independent waker.)
1427 ///
1428 /// This function is intended for cases where creating and pinning a future
1429 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1430 /// preferred, as this supports polling from multiple tasks at once.
1431 ///
1432 /// # Return value
1433 ///
1434 /// The function returns:
1435 ///
1436 /// * `Poll::Pending` if the pipe is not ready for writing.
1437 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1438 /// * `Poll::Ready(Err(e))` if an error is encountered.
1439 ///
1440 /// # Errors
1441 ///
1442 /// This function may encounter any standard I/O error except `WouldBlock`.
1443 ///
1444 /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1445 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1446 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1447 }
1448
1449 /// Tries to write a buffer to the pipe, returning how many bytes were
1450 /// written.
1451 ///
1452 /// The function will attempt to write the entire contents of `buf`, but
1453 /// only part of the buffer may be written.
1454 ///
1455 /// This function is usually paired with `writable()`.
1456 ///
1457 /// # Return
1458 ///
1459 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1460 /// number of bytes written. If the pipe is not ready to write data,
1461 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1462 ///
1463 /// # Examples
1464 ///
1465 /// ```no_run
1466 /// use tokio::net::windows::named_pipe;
1467 /// use std::error::Error;
1468 /// use std::io;
1469 ///
1470 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1471 ///
1472 /// #[tokio::main]
1473 /// async fn main() -> Result<(), Box<dyn Error>> {
1474 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1475 ///
1476 /// loop {
1477 /// // Wait for the pipe to be writable
1478 /// client.writable().await?;
1479 ///
1480 /// // Try to write data, this may still fail with `WouldBlock`
1481 /// // if the readiness event is a false positive.
1482 /// match client.try_write(b"hello world") {
1483 /// Ok(n) => {
1484 /// break;
1485 /// }
1486 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1487 /// continue;
1488 /// }
1489 /// Err(e) => {
1490 /// return Err(e.into());
1491 /// }
1492 /// }
1493 /// }
1494 ///
1495 /// Ok(())
1496 /// }
1497 /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>1498 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1499 self.io
1500 .registration()
1501 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1502 }
1503
1504 /// Tries to write several buffers to the pipe, returning how many bytes
1505 /// were written.
1506 ///
/// Data is written from each buffer in order, with the final buffer read
/// from possibly being only partially consumed. This method behaves
1509 /// equivalently to a single call to [`try_write()`] with concatenated
1510 /// buffers.
1511 ///
1512 /// This function is usually paired with `writable()`.
1513 ///
1514 /// [`try_write()`]: NamedPipeClient::try_write()
1515 ///
1516 /// # Return
1517 ///
1518 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1519 /// number of bytes written. If the pipe is not ready to write data,
1520 /// `Err(io::ErrorKind::WouldBlock)` is returned.
1521 ///
1522 /// # Examples
1523 ///
1524 /// ```no_run
1525 /// use tokio::net::windows::named_pipe;
1526 /// use std::error::Error;
1527 /// use std::io;
1528 ///
1529 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1530 ///
1531 /// #[tokio::main]
1532 /// async fn main() -> Result<(), Box<dyn Error>> {
1533 /// let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1534 ///
1535 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1536 ///
1537 /// loop {
1538 /// // Wait for the pipe to be writable
1539 /// client.writable().await?;
1540 ///
1541 /// // Try to write data, this may still fail with `WouldBlock`
1542 /// // if the readiness event is a false positive.
1543 /// match client.try_write_vectored(&bufs) {
1544 /// Ok(n) => {
1545 /// break;
1546 /// }
1547 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1548 /// continue;
1549 /// }
1550 /// Err(e) => {
1551 /// return Err(e.into());
1552 /// }
1553 /// }
1554 /// }
1555 ///
1556 /// Ok(())
1557 /// }
1558 /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>1559 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1560 self.io
1561 .registration()
1562 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1563 }
1564
1565 /// Tries to read or write from the pipe using a user-provided IO operation.
1566 ///
1567 /// If the pipe is ready, the provided closure is called. The closure
1568 /// should attempt to perform IO operation from the pipe by manually
1569 /// calling the appropriate syscall. If the operation fails because the
1570 /// pipe is not actually ready, then the closure should return a
1571 /// `WouldBlock` error and the readiness flag is cleared. The return value
1572 /// of the closure is then returned by `try_io`.
1573 ///
1574 /// If the pipe is not ready, then the closure is not called
1575 /// and a `WouldBlock` error is returned.
1576 ///
1577 /// The closure should only return a `WouldBlock` error if it has performed
1578 /// an IO operation on the pipe that failed due to the pipe not being
1579 /// ready. Returning a `WouldBlock` error in any other situation will
1580 /// incorrectly clear the readiness flag, which can cause the pipe to
1581 /// behave incorrectly.
1582 ///
1583 /// The closure should not perform the IO operation using any of the methods
1584 /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1585 /// readiness flag and can cause the pipe to behave incorrectly.
1586 ///
1587 /// This method is not intended to be used with combined interests.
1588 /// The closure should perform only one type of IO operation, so it should not
1589 /// require more than one ready state. This method may panic or sleep forever
1590 /// if it is called with a combined interest.
1591 ///
1592 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1593 ///
1594 /// [`readable()`]: NamedPipeClient::readable()
1595 /// [`writable()`]: NamedPipeClient::writable()
1596 /// [`ready()`]: NamedPipeClient::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1597 pub fn try_io<R>(
1598 &self,
1599 interest: Interest,
1600 f: impl FnOnce() -> io::Result<R>,
1601 ) -> io::Result<R> {
1602 self.io.registration().try_io(interest, f)
1603 }
1604 }
1605
// Asynchronous reads for the client end of the pipe, forwarded to the
// `PollEvented` value held in `io`.
impl AsyncRead for NamedPipeClient {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // NOTE(review): the inner `poll_read` is an unsafe call, but its safety
        // contract is not visible from here. Presumably it concerns the
        // uninitialized part of `buf` — confirm and record it as a SAFETY comment.
        unsafe { self.io.poll_read(cx, buf) }
    }
}
1615
1616 impl AsyncWrite for NamedPipeClient {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1617 fn poll_write(
1618 self: Pin<&mut Self>,
1619 cx: &mut Context<'_>,
1620 buf: &[u8],
1621 ) -> Poll<io::Result<usize>> {
1622 self.io.poll_write(cx, buf)
1623 }
1624
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1625 fn poll_write_vectored(
1626 self: Pin<&mut Self>,
1627 cx: &mut Context<'_>,
1628 bufs: &[io::IoSlice<'_>],
1629 ) -> Poll<io::Result<usize>> {
1630 self.io.poll_write_vectored(cx, bufs)
1631 }
1632
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>1633 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1634 Poll::Ready(Ok(()))
1635 }
1636
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1637 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1638 self.poll_flush(cx)
1639 }
1640 }
1641
// Exposes the raw OS handle of the client pipe by asking the wrapped `io`
// value for it; `self` is only borrowed.
impl AsRawHandle for NamedPipeClient {
    fn as_raw_handle(&self) -> RawHandle {
        self.io.as_raw_handle()
    }
}
1647
// Sets (`$t` is true) or clears (`$t` is false) the bits of `$flag` in the
// bitfield `$f`, leaving every other bit of `$f` as it was.
macro_rules! bool_flag {
    ($f:expr, $t:expr, $flag:expr) => {{
        let previous = $f;

        $f = if $t {
            previous | $flag
        } else {
            previous & !$flag
        };
    }};
}
1660
/// A builder structure for constructing a named pipe with named pipe-specific
/// options. Named pipe servers that want to modify pipe-related options are
/// required to use it.
1664 ///
1665 /// See [`ServerOptions::create`].
#[derive(Debug, Clone)]
pub struct ServerOptions {
    // Bit flags. `new()` starts with `PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED`,
    // and `access_inbound` toggles `PIPE_ACCESS_INBOUND` in this field.
    open_mode: u32,
    // Bit flags. `new()` starts with `PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS`,
    // and `pipe_mode` toggles `PIPE_TYPE_MESSAGE` in this field.
    pipe_mode: u32,
    // `new()` sets this to `PIPE_UNLIMITED_INSTANCES`.
    max_instances: u32,
    // `new()` sets this to 65536. NOTE(review): presumably a size in bytes — confirm.
    out_buffer_size: u32,
    // `new()` sets this to 65536. NOTE(review): presumably a size in bytes — confirm.
    in_buffer_size: u32,
    // `new()` sets this to 0. NOTE(review): unit and the meaning of 0 are not
    // visible here — confirm against the pipe-creation call.
    default_timeout: u32,
}
1675
1676 impl ServerOptions {
1677 /// Creates a new named pipe builder with the default settings.
1678 ///
1679 /// ```
1680 /// use tokio::net::windows::named_pipe::ServerOptions;
1681 ///
1682 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1683 ///
1684 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1685 /// let server = ServerOptions::new().create(PIPE_NAME)?;
1686 /// # Ok(()) }
1687 /// ```
new() -> ServerOptions1688 pub fn new() -> ServerOptions {
1689 ServerOptions {
1690 open_mode: windows_sys::PIPE_ACCESS_DUPLEX | windows_sys::FILE_FLAG_OVERLAPPED,
1691 pipe_mode: windows_sys::PIPE_TYPE_BYTE | windows_sys::PIPE_REJECT_REMOTE_CLIENTS,
1692 max_instances: windows_sys::PIPE_UNLIMITED_INSTANCES,
1693 out_buffer_size: 65536,
1694 in_buffer_size: 65536,
1695 default_timeout: 0,
1696 }
1697 }
1698
1699 /// The pipe mode.
1700 ///
1701 /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1702 /// documentation of what each mode means.
1703 ///
/// This corresponds to specifying [`dwPipeMode`].
1705 ///
1706 /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self1707 pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1708 let is_msg = matches!(pipe_mode, PipeMode::Message);
1709 // Pipe mode is implemented as a bit flag 0x4. Set is message and unset
1710 // is byte.
1711 bool_flag!(self.pipe_mode, is_msg, windows_sys::PIPE_TYPE_MESSAGE);
1712 self
1713 }
1714
1715 /// The flow of data in the pipe goes from client to server only.
1716 ///
1717 /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1718 ///
1719 /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1720 ///
1721 /// # Errors
1722 ///
1723 /// Server side prevents connecting by denying inbound access, client errors
1724 /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1725 /// the connection.
1726 ///
1727 /// ```
1728 /// use std::io;
1729 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1730 ///
1731 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1732 ///
1733 /// # #[tokio::main] async fn main() -> io::Result<()> {
1734 /// let _server = ServerOptions::new()
1735 /// .access_inbound(false)
1736 /// .create(PIPE_NAME)?;
1737 ///
1738 /// let e = ClientOptions::new()
1739 /// .open(PIPE_NAME)
1740 /// .unwrap_err();
1741 ///
1742 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1743 /// # Ok(()) }
1744 /// ```
1745 ///
1746 /// Disabling writing allows a client to connect, but errors with
1747 /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1748 ///
1749 /// ```
1750 /// use std::io;
1751 /// use tokio::io::AsyncWriteExt;
1752 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1753 ///
1754 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1755 ///
1756 /// # #[tokio::main] async fn main() -> io::Result<()> {
1757 /// let server = ServerOptions::new()
1758 /// .access_inbound(false)
1759 /// .create(PIPE_NAME)?;
1760 ///
1761 /// let mut client = ClientOptions::new()
1762 /// .write(false)
1763 /// .open(PIPE_NAME)?;
1764 ///
1765 /// server.connect().await?;
1766 ///
1767 /// let e = client.write(b"ping").await.unwrap_err();
1768 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1769 /// # Ok(()) }
1770 /// ```
1771 ///
1772 /// # Examples
1773 ///
1774 /// A unidirectional named pipe that only supports server-to-client
1775 /// communication.
1776 ///
1777 /// ```
1778 /// use std::io;
1779 /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1780 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1781 ///
1782 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1783 ///
1784 /// # #[tokio::main] async fn main() -> io::Result<()> {
1785 /// let mut server = ServerOptions::new()
1786 /// .access_inbound(false)
1787 /// .create(PIPE_NAME)?;
1788 ///
1789 /// let mut client = ClientOptions::new()
1790 /// .write(false)
1791 /// .open(PIPE_NAME)?;
1792 ///
1793 /// server.connect().await?;
1794 ///
1795 /// let write = server.write_all(b"ping");
1796 ///
1797 /// let mut buf = [0u8; 4];
1798 /// let read = client.read_exact(&mut buf);
1799 ///
1800 /// let ((), read) = tokio::try_join!(write, read)?;
1801 ///
1802 /// assert_eq!(read, 4);
1803 /// assert_eq!(&buf[..], b"ping");
1804 /// # Ok(()) }
1805 /// ```
    pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
        // Toggle the inbound-access bit of the open mode to match `allowed`
        // (presumably set on `true`, cleared on `false`; `bool_flag!` is
        // defined elsewhere in this file -- confirm there).
        bool_flag!(self.open_mode, allowed, windows_sys::PIPE_ACCESS_INBOUND);
        // Hand the builder back so that further options can be chained.
        self
    }
1810
1811 /// The flow of data in the pipe goes from server to client only.
1812 ///
1813 /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1814 ///
1815 /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1816 ///
1817 /// # Errors
1818 ///
1819 /// Server side prevents connecting by denying outbound access, client
1820 /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1821 /// create the connection.
1822 ///
1823 /// ```
1824 /// use std::io;
1825 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1826 ///
1827 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1828 ///
1829 /// # #[tokio::main] async fn main() -> io::Result<()> {
1830 /// let server = ServerOptions::new()
1831 /// .access_outbound(false)
1832 /// .create(PIPE_NAME)?;
1833 ///
1834 /// let e = ClientOptions::new()
1835 /// .open(PIPE_NAME)
1836 /// .unwrap_err();
1837 ///
1838 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1839 /// # Ok(()) }
1840 /// ```
1841 ///
1842 /// Disabling reading allows a client to connect, but attempting to read
1843 /// will error with [`std::io::ErrorKind::PermissionDenied`].
1844 ///
1845 /// ```
1846 /// use std::io;
1847 /// use tokio::io::AsyncReadExt;
1848 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1849 ///
1850 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1851 ///
1852 /// # #[tokio::main] async fn main() -> io::Result<()> {
1853 /// let server = ServerOptions::new()
1854 /// .access_outbound(false)
1855 /// .create(PIPE_NAME)?;
1856 ///
1857 /// let mut client = ClientOptions::new()
1858 /// .read(false)
1859 /// .open(PIPE_NAME)?;
1860 ///
1861 /// server.connect().await?;
1862 ///
1863 /// let mut buf = [0u8; 4];
1864 /// let e = client.read(&mut buf).await.unwrap_err();
1865 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1866 /// # Ok(()) }
1867 /// ```
1868 ///
1869 /// # Examples
1870 ///
1871 /// A unidirectional named pipe that only supports client-to-server
1872 /// communication.
1873 ///
1874 /// ```
1875 /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1876 /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1877 ///
1878 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1879 ///
1880 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1881 /// let mut server = ServerOptions::new()
1882 /// .access_outbound(false)
1883 /// .create(PIPE_NAME)?;
1884 ///
1885 /// let mut client = ClientOptions::new()
1886 /// .read(false)
1887 /// .open(PIPE_NAME)?;
1888 ///
1889 /// server.connect().await?;
1890 ///
1891 /// let write = client.write_all(b"ping");
1892 ///
1893 /// let mut buf = [0u8; 4];
1894 /// let read = server.read_exact(&mut buf);
1895 ///
1896 /// let ((), read) = tokio::try_join!(write, read)?;
1897 ///
1898 /// println!("done reading and writing");
1899 ///
1900 /// assert_eq!(read, 4);
1901 /// assert_eq!(&buf[..], b"ping");
1902 /// # Ok(()) }
1903 /// ```
    pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
        // Mirror of `access_inbound`: the same open-mode word is updated, but
        // for the outbound-access bit.
        bool_flag!(self.open_mode, allowed, windows_sys::PIPE_ACCESS_OUTBOUND);
        // Returning `self` keeps the builder chainable.
        self
    }
1908
1909 /// If you attempt to create multiple instances of a pipe with this flag
1910 /// set, creation of the first server instance succeeds, but creation of any
1911 /// subsequent instances will fail with
1912 /// [`std::io::ErrorKind::PermissionDenied`].
1913 ///
1914 /// This option is intended to be used with servers that want to ensure that
1915 /// they are the only process listening for clients on a given named pipe.
1916 /// This is accomplished by enabling it for the first server instance
1917 /// created in a process.
1918 ///
1919 /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1920 ///
1921 /// # Errors
1922 ///
1923 /// If this option is set and more than one instance of the server for a
1924 /// given named pipe exists, calling [`create`] will fail with
1925 /// [`std::io::ErrorKind::PermissionDenied`].
1926 ///
1927 /// ```
1928 /// use std::io;
1929 /// use tokio::net::windows::named_pipe::ServerOptions;
1930 ///
1931 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
1932 ///
1933 /// # #[tokio::main] async fn main() -> io::Result<()> {
1934 /// let server1 = ServerOptions::new()
1935 /// .first_pipe_instance(true)
1936 /// .create(PIPE_NAME)?;
1937 ///
1938 /// // Second server errs, since it's not the first instance.
1939 /// let e = ServerOptions::new()
1940 /// .first_pipe_instance(true)
1941 /// .create(PIPE_NAME)
1942 /// .unwrap_err();
1943 ///
1944 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1945 /// # Ok(()) }
1946 /// ```
1947 ///
1948 /// # Examples
1949 ///
1950 /// ```
1951 /// use std::io;
1952 /// use tokio::net::windows::named_pipe::ServerOptions;
1953 ///
1954 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
1955 ///
1956 /// # #[tokio::main] async fn main() -> io::Result<()> {
1957 /// let mut builder = ServerOptions::new();
1958 /// builder.first_pipe_instance(true);
1959 ///
1960 /// let server = builder.create(PIPE_NAME)?;
1961 /// let e = builder.create(PIPE_NAME).unwrap_err();
1962 /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1963 /// drop(server);
1964 ///
    /// // OK, since we've closed the other instance.
1966 /// let _server2 = builder.create(PIPE_NAME)?;
1967 /// # Ok(()) }
1968 /// ```
1969 ///
1970 /// [`create`]: ServerOptions::create
1971 /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
    pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
        // The first-instance flag lives in the open mode, next to the
        // access-direction bits maintained by `access_inbound` and
        // `access_outbound`.
        bool_flag!(
            self.open_mode,
            first,
            windows_sys::FILE_FLAG_FIRST_PIPE_INSTANCE
        );
        // Returning `self` keeps the builder chainable.
        self
    }
1980
1981 /// Requests permission to modify the pipe's discretionary access control list.
1982 ///
1983 /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
1984 ///
1985 /// # Examples
1986 ///
1987 /// ```
1988 /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
    ///
1990 /// use tokio::net::windows::named_pipe::ServerOptions;
1991 /// use windows_sys::{
1992 /// Win32::Foundation::ERROR_SUCCESS,
1993 /// Win32::Security::DACL_SECURITY_INFORMATION,
1994 /// Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
1995 /// };
1996 ///
1997 /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
1998 ///
1999 /// # #[tokio::main] async fn main() -> io::Result<()> {
2000 /// let mut pipe_template = ServerOptions::new();
2001 /// pipe_template.write_dac(true);
2002 /// let pipe = pipe_template.create(PIPE_NAME)?;
2003 ///
2004 /// unsafe {
2005 /// assert_eq!(
2006 /// ERROR_SUCCESS,
2007 /// SetSecurityInfo(
2008 /// pipe.as_raw_handle() as _,
2009 /// SE_KERNEL_OBJECT,
2010 /// DACL_SECURITY_INFORMATION,
2011 /// ptr::null_mut(),
2012 /// ptr::null_mut(),
2013 /// ptr::null_mut(),
2014 /// ptr::null_mut(),
2015 /// )
2016 /// );
2017 /// }
2018 ///
2019 /// # Ok(()) }
2020 /// ```
2021 ///
2022 /// ```
2023 /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
    ///
2025 /// use tokio::net::windows::named_pipe::ServerOptions;
2026 /// use windows_sys::{
2027 /// Win32::Foundation::ERROR_ACCESS_DENIED,
2028 /// Win32::Security::DACL_SECURITY_INFORMATION,
2029 /// Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2030 /// };
2031 ///
2032 /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
2033 ///
2034 /// # #[tokio::main] async fn main() -> io::Result<()> {
2035 /// let mut pipe_template = ServerOptions::new();
2036 /// pipe_template.write_dac(false);
2037 /// let pipe = pipe_template.create(PIPE_NAME)?;
2038 ///
2039 /// unsafe {
2040 /// assert_eq!(
2041 /// ERROR_ACCESS_DENIED,
2042 /// SetSecurityInfo(
2043 /// pipe.as_raw_handle() as _,
2044 /// SE_KERNEL_OBJECT,
2045 /// DACL_SECURITY_INFORMATION,
2046 /// ptr::null_mut(),
2047 /// ptr::null_mut(),
2048 /// ptr::null_mut(),
2049 /// ptr::null_mut(),
2050 /// )
2051 /// );
2052 /// }
2053 ///
2054 /// # Ok(()) }
2055 /// ```
2056 ///
2057 /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
    pub fn write_dac(&mut self, requested: bool) -> &mut Self {
        // `WRITE_DAC` is folded into the same open-mode word that is later
        // handed to `CreateNamedPipeW` when the server is created.
        bool_flag!(self.open_mode, requested, windows_sys::WRITE_DAC);
        // Returning `self` keeps the builder chainable.
        self
    }
2062
    /// Requests permission to modify the pipe's owner.
    ///
    /// This corresponds to setting [`WRITE_OWNER`] in `dwOpenMode`. The
    /// `requested` argument selects whether the flag is part of the open mode
    /// used when the pipe is created.
    ///
    /// Returns the builder so that further options can be chained.
    ///
    /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
    pub fn write_owner(&mut self, requested: bool) -> &mut Self {
        bool_flag!(self.open_mode, requested, windows_sys::WRITE_OWNER);
        self
    }
2072
    /// Requests permission to modify the pipe's system access control list.
    ///
    /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in `dwOpenMode`.
    /// The `requested` argument selects whether the flag is part of the open
    /// mode used when the pipe is created.
    ///
    /// Returns the builder so that further options can be chained.
    ///
    /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
    pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
        bool_flag!(
            self.open_mode,
            requested,
            windows_sys::ACCESS_SYSTEM_SECURITY
        );
        self
    }
2086
    /// Controls whether this server rejects remote clients.
    ///
    /// Passing `true` makes the server reject remote clients; passing `false`
    /// allows them to connect. Remote clients are rejected by default.
    ///
    /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
    ///
    /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
    pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
        // Unlike the access flags, this bit is kept in the pipe mode rather
        // than the open mode.
        bool_flag!(
            self.pipe_mode,
            reject,
            windows_sys::PIPE_REJECT_REMOTE_CLIENTS
        );
        self
    }
2101
2102 /// The maximum number of instances that can be created for this pipe. The
2103 /// first instance of the pipe can specify this value; the same number must
2104 /// be specified for other instances of the pipe. Acceptable values are in
2105 /// the range 1 through 254. The default value is unlimited.
2106 ///
2107 /// This corresponds to specifying [`nMaxInstances`].
2108 ///
2109 /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2110 ///
2111 /// # Errors
2112 ///
    /// The same value of `max_instances` has to be used by all servers. Any
    /// additional server that is created with a mismatching value might
    /// error.
2116 ///
2117 /// ```
2118 /// use std::io;
2119 /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2120 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2121 ///
2122 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
2123 ///
2124 /// # #[tokio::main] async fn main() -> io::Result<()> {
2125 /// let mut server = ServerOptions::new();
2126 /// server.max_instances(2);
2127 ///
2128 /// let s1 = server.create(PIPE_NAME)?;
2129 /// let c1 = ClientOptions::new().open(PIPE_NAME);
2130 ///
2131 /// let s2 = server.create(PIPE_NAME)?;
2132 /// let c2 = ClientOptions::new().open(PIPE_NAME);
2133 ///
2134 /// // Too many servers!
2135 /// let e = server.create(PIPE_NAME).unwrap_err();
2136 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2137 ///
2138 /// // Still too many servers even if we specify a higher value!
2139 /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
2140 /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2141 /// # Ok(()) }
2142 /// ```
2143 ///
2144 /// # Panics
2145 ///
2146 /// This function will panic if more than 254 instances are specified. If
2147 /// you do not wish to set an instance limit, leave it unspecified.
2148 ///
2149 /// ```should_panic
2150 /// use tokio::net::windows::named_pipe::ServerOptions;
2151 ///
2152 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2153 /// let builder = ServerOptions::new().max_instances(255);
2154 /// # Ok(()) }
2155 /// ```
2156 #[track_caller]
max_instances(&mut self, instances: usize) -> &mut Self2157 pub fn max_instances(&mut self, instances: usize) -> &mut Self {
2158 assert!(instances < 255, "cannot specify more than 254 instances");
2159 self.max_instances = instances as u32;
2160 self
2161 }
2162
    /// The number of bytes to reserve for the output buffer.
    ///
    /// This corresponds to specifying [`nOutBufferSize`]. The value is stored
    /// as given and handed to `CreateNamedPipeW` when the pipe is created.
    ///
    /// Returns the builder so that further options can be chained.
    ///
    /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
    pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
        self.out_buffer_size = buffer;
        self
    }
2172
    /// The number of bytes to reserve for the input buffer.
    ///
    /// This corresponds to specifying [`nInBufferSize`]. The value is stored
    /// as given and handed to `CreateNamedPipeW` when the pipe is created.
    ///
    /// Returns the builder so that further options can be chained.
    ///
    /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
    pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
        self.in_buffer_size = buffer;
        self
    }
2182
2183 /// Creates the named pipe identified by `addr` for use as a server.
2184 ///
2185 /// This uses the [`CreateNamedPipe`] function.
2186 ///
2187 /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2188 ///
2189 /// # Errors
2190 ///
2191 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2192 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2193 ///
2194 /// [Tokio Runtime]: crate::runtime::Runtime
2195 /// [enabled I/O]: crate::runtime::Builder::enable_io
2196 ///
2197 /// # Examples
2198 ///
2199 /// ```
2200 /// use tokio::net::windows::named_pipe::ServerOptions;
2201 ///
2202 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
2203 ///
2204 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2205 /// let server = ServerOptions::new().create(PIPE_NAME)?;
2206 /// # Ok(()) }
2207 /// ```
create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer>2208 pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
2209 // Safety: We're calling create_with_security_attributes_raw w/ a null
2210 // pointer which disables it.
2211 unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
2212 }
2213
    /// Creates the named pipe identified by `addr` for use as a server.
    ///
    /// This is the same as [`create`] except that it supports providing the raw
    /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
    /// as the `lpSecurityAttributes` argument to [`CreateNamedPipe`].
    ///
    /// # Errors
    ///
    /// This errors if called outside of a [Tokio Runtime], or in a runtime that
    /// has not [enabled I/O], or if any OS-specific I/O errors occur.
    ///
    /// [Tokio Runtime]: crate::runtime::Runtime
    /// [enabled I/O]: crate::runtime::Builder::enable_io
    ///
    /// # Safety
    ///
    /// The `attrs` argument must either be null or point at a valid instance of
    /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
    /// behavior is identical to calling the [`create`] method.
    ///
    /// [`create`]: ServerOptions::create
    /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
    /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
    pub unsafe fn create_with_security_attributes_raw(
        &self,
        addr: impl AsRef<OsStr>,
        attrs: *mut c_void,
    ) -> io::Result<NamedPipeServer> {
        // Convert the name into the null-terminated wide string the Win32
        // call expects; `addr` must stay alive for the duration of the call.
        let addr = encode_addr(addr);

        // Every option collected by the builder is forwarded verbatim; the
        // caller-provided security attributes go in as the final argument.
        let h = windows_sys::CreateNamedPipeW(
            addr.as_ptr(),
            self.open_mode,
            self.pipe_mode,
            self.max_instances,
            self.out_buffer_size,
            self.in_buffer_size,
            self.default_timeout,
            attrs as *mut _,
        );

        // Read the OS error right away, before any other call can replace it.
        if h == windows_sys::INVALID_HANDLE_VALUE {
            return Err(io::Error::last_os_error());
        }

        // Wrap the freshly created raw handle in the server type.
        NamedPipeServer::from_raw_handle(h as _)
    }
2261 }
2262
/// A builder suitable for building and interacting with named pipes from the
/// client side.
///
/// See [`ClientOptions::open`].
#[derive(Debug, Clone)]
pub struct ClientOptions {
    // Access rights requested for the pipe handle. Starts out as read + write
    // and is passed to `CreateFileW` as the desired access when opening.
    desired_access: u32,
    // Security quality-of-service flags. Both `new` and `security_qos_flags`
    // include `SECURITY_SQOS_PRESENT`; `FILE_FLAG_OVERLAPPED` is OR-ed in
    // when the pipe is opened.
    security_qos_flags: u32,
}
2272
2273 impl ClientOptions {
2274 /// Creates a new named pipe builder with the default settings.
2275 ///
2276 /// ```
2277 /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2278 ///
2279 /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
2280 ///
2281 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2282 /// // Server must be created in order for the client creation to succeed.
2283 /// let server = ServerOptions::new().create(PIPE_NAME)?;
2284 /// let client = ClientOptions::new().open(PIPE_NAME)?;
2285 /// # Ok(()) }
2286 /// ```
new() -> Self2287 pub fn new() -> Self {
2288 Self {
2289 desired_access: windows_sys::GENERIC_READ | windows_sys::GENERIC_WRITE,
2290 security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
2291 | windows_sys::SECURITY_SQOS_PRESENT,
2292 }
2293 }
2294
    /// If the client supports reading data. This is enabled by default.
    ///
    /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
    ///
    /// Returns the builder so that further options can be chained.
    ///
    /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
    pub fn read(&mut self, allowed: bool) -> &mut Self {
        // Only the read bit of the desired access is touched; the write bit
        // is managed independently by `write`.
        bool_flag!(self.desired_access, allowed, windows_sys::GENERIC_READ);
        self
    }
2305
    /// If the client supports writing data. This is enabled by default.
    ///
    /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
    ///
    /// Returns the builder so that further options can be chained.
    ///
    /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
    /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
    pub fn write(&mut self, allowed: bool) -> &mut Self {
        // Only the write bit of the desired access is touched; the read bit
        // is managed independently by `read`.
        bool_flag!(self.desired_access, allowed, windows_sys::GENERIC_WRITE);
        self
    }
2316
2317 /// Sets qos flags which are combined with other flags and attributes in the
2318 /// call to [`CreateFile`].
2319 ///
2320 /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2321 /// calling this function would override that value completely with the
2322 /// argument specified.
2323 ///
2324 /// When `security_qos_flags` is not set, a malicious program can gain the
2325 /// elevated privileges of a privileged Rust process when it allows opening
2326 /// user-specified paths, by tricking it into opening a named pipe. So
2327 /// arguably `security_qos_flags` should also be set when opening arbitrary
2328 /// paths. However the bits can then conflict with other flags, specifically
2329 /// `FILE_FLAG_OPEN_NO_RECALL`.
2330 ///
2331 /// For information about possible values, see [Impersonation Levels] on the
2332 /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2333 /// automatically when using this method.
2334 ///
2335 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2336 /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
2337 /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
security_qos_flags(&mut self, flags: u32) -> &mut Self2338 pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2339 // See: https://github.com/rust-lang/rust/pull/58216
2340 self.security_qos_flags = flags | windows_sys::SECURITY_SQOS_PRESENT;
2341 self
2342 }
2343
2344 /// Opens the named pipe identified by `addr`.
2345 ///
2346 /// This opens the client using [`CreateFile`] with the
2347 /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2348 ///
2349 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2350 ///
2351 /// # Errors
2352 ///
2353 /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2354 /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2355 ///
2356 /// There are a few errors you need to take into account when creating a
2357 /// named pipe on the client side:
2358 ///
2359 /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2360 /// does not exist. Presumably the server is not up.
2361 /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2362 /// but the server is not currently waiting for a connection. Please see the
2363 /// examples for how to check for this error.
2364 ///
2365 /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
2366 /// [enabled I/O]: crate::runtime::Builder::enable_io
2367 /// [Tokio Runtime]: crate::runtime::Runtime
2368 ///
2369 /// A connect loop that waits until a pipe becomes available looks like
2370 /// this:
2371 ///
2372 /// ```no_run
2373 /// use std::time::Duration;
2374 /// use tokio::net::windows::named_pipe::ClientOptions;
2375 /// use tokio::time;
2376 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2377 ///
2378 /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2379 ///
2380 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2381 /// let client = loop {
2382 /// match ClientOptions::new().open(PIPE_NAME) {
2383 /// Ok(client) => break client,
2384 /// Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
2385 /// Err(e) => return Err(e),
2386 /// }
2387 ///
2388 /// time::sleep(Duration::from_millis(50)).await;
2389 /// };
2390 ///
2391 /// // use the connected client.
2392 /// # Ok(()) }
2393 /// ```
open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient>2394 pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2395 // Safety: We're calling open_with_security_attributes_raw w/ a null
2396 // pointer which disables it.
2397 unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2398 }
2399
2400 /// Opens the named pipe identified by `addr`.
2401 ///
2402 /// This is the same as [`open`] except that it supports providing the raw
2403 /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2404 /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2405 ///
2406 /// # Safety
2407 ///
2408 /// The `attrs` argument must either be null or point at a valid instance of
2409 /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2410 /// behavior is identical to calling the [`open`] method.
2411 ///
2412 /// [`open`]: ClientOptions::open
2413 /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2414 /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
open_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeClient>2415 pub unsafe fn open_with_security_attributes_raw(
2416 &self,
2417 addr: impl AsRef<OsStr>,
2418 attrs: *mut c_void,
2419 ) -> io::Result<NamedPipeClient> {
2420 let addr = encode_addr(addr);
2421
2422 // NB: We could use a platform specialized `OpenOptions` here, but since
2423 // we have access to windows_sys it ultimately doesn't hurt to use
2424 // `CreateFile` explicitly since it allows the use of our already
2425 // well-structured wide `addr` to pass into CreateFileW.
2426 let h = windows_sys::CreateFileW(
2427 addr.as_ptr(),
2428 self.desired_access,
2429 0,
2430 attrs as *mut _,
2431 windows_sys::OPEN_EXISTING,
2432 self.get_flags(),
2433 0,
2434 );
2435
2436 if h == windows_sys::INVALID_HANDLE_VALUE {
2437 return Err(io::Error::last_os_error());
2438 }
2439
2440 NamedPipeClient::from_raw_handle(h as _)
2441 }
2442
get_flags(&self) -> u322443 fn get_flags(&self) -> u32 {
2444 self.security_qos_flags | windows_sys::FILE_FLAG_OVERLAPPED
2445 }
2446 }
2447
/// The pipe mode of a named pipe.
///
/// Set through [`ServerOptions::pipe_mode`] and reported back through
/// [`PipeInfo::mode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeMode {
    /// Data is written to the pipe as a stream of bytes. The pipe does not
    /// distinguish bytes written during different write operations.
    ///
    /// This is the mode a freshly constructed [`ServerOptions`] starts with.
    ///
    /// Corresponds to [`PIPE_TYPE_BYTE`].
    ///
    /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
    Byte,
    /// Data is written to the pipe as a stream of messages. The pipe treats the
    /// bytes written during each write operation as a message unit. Any reading
    /// on a named pipe returns [`ERROR_MORE_DATA`] when a message is not read
    /// completely.
    ///
    /// Corresponds to [`PIPE_TYPE_MESSAGE`].
    ///
    /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
    /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
    Message,
}
2472
/// Indicates the end of a named pipe.
///
/// Reported through [`PipeInfo::end`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeEnd {
    /// The named pipe refers to the client end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_CLIENT_END`].
    ///
    /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
    Client,
    /// The named pipe refers to the server end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_SERVER_END`].
    ///
    /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
    Server,
}
2490
/// Information about a named pipe.
///
/// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
#[derive(Debug)]
#[non_exhaustive]
pub struct PipeInfo {
    /// Indicates the mode of a named pipe: [`PipeMode::Message`] when the
    /// reported flags contain `PIPE_TYPE_MESSAGE`, [`PipeMode::Byte`]
    /// otherwise.
    pub mode: PipeMode,
    /// Indicates the end of a named pipe: [`PipeEnd::Server`] when the
    /// reported flags contain `PIPE_SERVER_END`, [`PipeEnd::Client`]
    /// otherwise.
    pub end: PipeEnd,
    /// The maximum number of instances that can be created for this pipe.
    pub max_instances: u32,
    /// The number of bytes to reserve for the output buffer.
    pub out_buffer_size: u32,
    /// The number of bytes to reserve for the input buffer.
    pub in_buffer_size: u32,
}
2508
2509 /// Encodes an address so that it is a null-terminated wide string.
encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]>2510 fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2511 let len = addr.as_ref().encode_wide().count();
2512 let mut vec = Vec::with_capacity(len + 1);
2513 vec.extend(addr.as_ref().encode_wide());
2514 vec.push(0);
2515 vec.into_boxed_slice()
2516 }
2517
2518 /// Internal function to get the info out of a raw named pipe.
named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo>2519 unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2520 let mut flags = 0;
2521 let mut out_buffer_size = 0;
2522 let mut in_buffer_size = 0;
2523 let mut max_instances = 0;
2524
2525 let result = windows_sys::GetNamedPipeInfo(
2526 handle as _,
2527 &mut flags,
2528 &mut out_buffer_size,
2529 &mut in_buffer_size,
2530 &mut max_instances,
2531 );
2532
2533 if result == 0 {
2534 return Err(io::Error::last_os_error());
2535 }
2536
2537 let mut end = PipeEnd::Client;
2538 let mut mode = PipeMode::Byte;
2539
2540 if flags & windows_sys::PIPE_SERVER_END != 0 {
2541 end = PipeEnd::Server;
2542 }
2543
2544 if flags & windows_sys::PIPE_TYPE_MESSAGE != 0 {
2545 mode = PipeMode::Message;
2546 }
2547
2548 Ok(PipeInfo {
2549 end,
2550 mode,
2551 out_buffer_size,
2552 in_buffer_size,
2553 max_instances,
2554 })
2555 }
2556
#[cfg(test)]
mod test {
    use self::windows_sys::{PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE};
    use super::*;

    /// A fresh builder is in byte mode and rejects remote clients.
    #[test]
    fn opts_default_pipe_mode() {
        let options = ServerOptions::new();
        let expected = PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS;
        assert_eq!(options.pipe_mode, expected);
    }

    /// Turning rejection off clears the corresponding bit.
    #[test]
    fn opts_unset_reject_remote() {
        let mut options = ServerOptions::new();
        options.reject_remote_clients(false);

        let reject_bit = options.pipe_mode & PIPE_REJECT_REMOTE_CLIENTS;
        assert_eq!(reject_bit, 0);
    }

    /// Choosing a pipe mode must never disturb the reject-remote-clients bit.
    #[test]
    fn opts_set_pipe_mode_maintains_reject_remote_clients() {
        let mut options = ServerOptions::new();

        // Selecting a mode on an untouched builder keeps the default bit.
        options.pipe_mode(PipeMode::Byte);
        assert_eq!(
            options.pipe_mode,
            PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS
        );

        // (reject setting, mode to select, expected raw pipe mode), applied
        // in order to the same builder.
        let steps = [
            (false, PipeMode::Byte, PIPE_TYPE_BYTE),
            (
                true,
                PipeMode::Byte,
                PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS,
            ),
            (false, PipeMode::Message, PIPE_TYPE_MESSAGE),
            (
                true,
                PipeMode::Message,
                PIPE_TYPE_MESSAGE | PIPE_REJECT_REMOTE_CLIENTS,
            ),
        ];

        for &(reject, mode, expected) in steps.iter() {
            options.reject_remote_clients(reject);
            options.pipe_mode(mode);
            assert_eq!(options.pipe_mode, expected);
        }
    }
}
2601