1 //! Tokio support for [Windows named pipes].
2 //!
3 //! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4 
5 use std::ffi::c_void;
6 use std::ffi::OsStr;
7 use std::io::{self, Read, Write};
8 use std::pin::Pin;
9 use std::ptr;
10 use std::task::{Context, Poll};
11 
12 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
13 use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
14 
15 cfg_io_util! {
16     use bytes::BufMut;
17 }
18 
19 // Hide imports which are not used when generating documentation.
20 #[cfg(not(docsrs))]
21 mod doc {
22     pub(super) use crate::os::windows::ffi::OsStrExt;
23     pub(super) mod windows_sys {
24         pub(crate) use windows_sys::{
25             Win32::Foundation::*, Win32::Storage::FileSystem::*, Win32::System::Pipes::*,
26             Win32::System::SystemServices::*,
27         };
28     }
29     pub(super) use mio::windows as mio_windows;
30 }
31 
32 // NB: none of these shows up in public API, so don't document them.
33 #[cfg(docsrs)]
34 mod doc {
35     pub(super) mod mio_windows {
36         pub type NamedPipe = crate::doc::NotDefinedHere;
37     }
38 }
39 
40 use self::doc::*;
41 
42 /// A [Windows named pipe] server.
43 ///
44 /// Accepting client connections involves creating a server with
45 /// [`ServerOptions::create`] and waiting for clients to connect using
46 /// [`NamedPipeServer::connect`].
47 ///
48 /// To avoid having clients sporadically fail with
49 /// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
50 /// ensure that at least one server instance is available at all times. This
51 /// means that the typical listen loop for a server is a bit involved, because
52 /// we have to ensure that we never drop a server accidentally while a client
53 /// might connect.
54 ///
55 /// So a correctly implemented server looks like this:
56 ///
57 /// ```no_run
58 /// use std::io;
59 /// use tokio::net::windows::named_pipe::ServerOptions;
60 ///
61 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
62 ///
63 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
64 /// // The first server needs to be constructed early so that clients can
65 /// // be correctly connected. Otherwise calling .wait will cause the client to
66 /// // error.
67 /// //
68 /// // Here we also make use of `first_pipe_instance`, which will ensure that
69 /// // there are no other servers up and running already.
70 /// let mut server = ServerOptions::new()
71 ///     .first_pipe_instance(true)
72 ///     .create(PIPE_NAME)?;
73 ///
74 /// // Spawn the server loop.
75 /// let server = tokio::spawn(async move {
76 ///     loop {
77 ///         // Wait for a client to connect.
78 ///         let connected = server.connect().await?;
79 ///
80 ///         // Construct the next server to be connected before sending the one
///         // we already have off onto a task. This ensures that the server
82 ///         // isn't closed (after it's done in the task) before a new one is
83 ///         // available. Otherwise the client might error with
84 ///         // `io::ErrorKind::NotFound`.
85 ///         server = ServerOptions::new().create(PIPE_NAME)?;
86 ///
87 ///         let client = tokio::spawn(async move {
88 ///             /* use the connected client */
89 /// #           Ok::<_, std::io::Error>(())
90 ///         });
91 /// #       if true { break } // needed for type inference to work
92 ///     }
93 ///
94 ///     Ok::<_, io::Error>(())
95 /// });
96 ///
97 /// /* do something else not server related here */
98 /// # Ok(()) }
99 /// ```
100 ///
101 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
102 #[derive(Debug)]
103 pub struct NamedPipeServer {
104     io: PollEvented<mio_windows::NamedPipe>,
105 }
106 
107 impl NamedPipeServer {
108     /// Constructs a new named pipe server from the specified raw handle.
109     ///
110     /// This function will consume ownership of the handle given, passing
111     /// responsibility for closing the handle to the returned object.
112     ///
    /// This function is also unsafe as the primitives currently returned have
    /// the contract that they are the sole owner of the handle they are
    /// wrapping. Usage of this function could accidentally allow violating
    /// this contract which can cause memory unsafety in code that relies on it
    /// being true.
118     ///
119     /// # Errors
120     ///
121     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
122     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
123     ///
124     /// [Tokio Runtime]: crate::runtime::Runtime
125     /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>126     pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
127         let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
128 
129         Ok(Self {
130             io: PollEvented::new(named_pipe)?,
131         })
132     }
133 
134     /// Retrieves information about the named pipe the server is associated
135     /// with.
136     ///
137     /// ```no_run
138     /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
139     ///
140     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
141     ///
142     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
143     /// let server = ServerOptions::new()
144     ///     .pipe_mode(PipeMode::Message)
145     ///     .max_instances(5)
146     ///     .create(PIPE_NAME)?;
147     ///
148     /// let server_info = server.info()?;
149     ///
150     /// assert_eq!(server_info.end, PipeEnd::Server);
151     /// assert_eq!(server_info.mode, PipeMode::Message);
152     /// assert_eq!(server_info.max_instances, 5);
153     /// # Ok(()) }
154     /// ```
info(&self) -> io::Result<PipeInfo>155     pub fn info(&self) -> io::Result<PipeInfo> {
156         // Safety: we're ensuring the lifetime of the named pipe.
157         unsafe { named_pipe_info(self.io.as_raw_handle()) }
158     }
159 
160     /// Enables a named pipe server process to wait for a client process to
161     /// connect to an instance of a named pipe. A client process connects by
162     /// creating a named pipe with the same name.
163     ///
164     /// This corresponds to the [`ConnectNamedPipe`] system call.
165     ///
166     /// # Cancel safety
167     ///
168     /// This method is cancellation safe in the sense that if it is used as the
169     /// event in a [`select!`](crate::select) statement and some other branch
170     /// completes first, then no connection events have been lost.
171     ///
172     /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
173     ///
174     /// # Example
175     ///
176     /// ```no_run
177     /// use tokio::net::windows::named_pipe::ServerOptions;
178     ///
179     /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
180     ///
181     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
182     /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
183     ///
184     /// // Wait for a client to connect.
185     /// pipe.connect().await?;
186     ///
187     /// // Use the connected client...
188     /// # Ok(()) }
189     /// ```
connect(&self) -> io::Result<()>190     pub async fn connect(&self) -> io::Result<()> {
191         match self.io.connect() {
192             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
193                 self.io
194                     .registration()
195                     .async_io(Interest::WRITABLE, || self.io.connect())
196                     .await
197             }
198             x => x,
199         }
200     }
201 
202     /// Disconnects the server end of a named pipe instance from a client
203     /// process.
204     ///
205     /// ```
206     /// use tokio::io::AsyncWriteExt;
207     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
208     /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
209     ///
210     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
211     ///
212     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
213     /// let server = ServerOptions::new()
214     ///     .create(PIPE_NAME)?;
215     ///
216     /// let mut client = ClientOptions::new()
217     ///     .open(PIPE_NAME)?;
218     ///
219     /// // Wait for a client to become connected.
220     /// server.connect().await?;
221     ///
222     /// // Forcibly disconnect the client.
223     /// server.disconnect()?;
224     ///
225     /// // Write fails with an OS-specific error after client has been
226     /// // disconnected.
227     /// let e = client.write(b"ping").await.unwrap_err();
228     /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_NOT_CONNECTED as i32));
229     /// # Ok(()) }
230     /// ```
disconnect(&self) -> io::Result<()>231     pub fn disconnect(&self) -> io::Result<()> {
232         self.io.disconnect()
233     }
234 
235     /// Waits for any of the requested ready states.
236     ///
237     /// This function is usually paired with `try_read()` or `try_write()`. It
238     /// can be used to concurrently read / write to the same pipe on a single
239     /// task without splitting the pipe.
240     ///
241     /// The function may complete without the pipe being ready. This is a
242     /// false-positive and attempting an operation will return with
243     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
244     /// [`Ready`] set, so you should always check the returned value and possibly
245     /// wait again if the requested states are not set.
246     ///
247     /// # Examples
248     ///
249     /// Concurrently read and write to the pipe on the same task without
250     /// splitting.
251     ///
252     /// ```no_run
253     /// use tokio::io::Interest;
254     /// use tokio::net::windows::named_pipe;
255     /// use std::error::Error;
256     /// use std::io;
257     ///
258     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
259     ///
260     /// #[tokio::main]
261     /// async fn main() -> Result<(), Box<dyn Error>> {
262     ///     let server = named_pipe::ServerOptions::new()
263     ///         .create(PIPE_NAME)?;
264     ///
265     ///     loop {
266     ///         let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
267     ///
268     ///         if ready.is_readable() {
269     ///             let mut data = vec![0; 1024];
270     ///             // Try to read data, this may still fail with `WouldBlock`
271     ///             // if the readiness event is a false positive.
272     ///             match server.try_read(&mut data) {
273     ///                 Ok(n) => {
274     ///                     println!("read {} bytes", n);
275     ///                 }
276     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
277     ///                     continue;
278     ///                 }
279     ///                 Err(e) => {
280     ///                     return Err(e.into());
281     ///                 }
282     ///             }
283     ///         }
284     ///
285     ///         if ready.is_writable() {
286     ///             // Try to write data, this may still fail with `WouldBlock`
287     ///             // if the readiness event is a false positive.
288     ///             match server.try_write(b"hello world") {
289     ///                 Ok(n) => {
290     ///                     println!("write {} bytes", n);
291     ///                 }
292     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
293     ///                     continue;
294     ///                 }
295     ///                 Err(e) => {
296     ///                     return Err(e.into());
297     ///                 }
298     ///             }
299     ///         }
300     ///     }
301     /// }
302     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>303     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
304         let event = self.io.registration().readiness(interest).await?;
305         Ok(event.ready)
306     }
307 
308     /// Waits for the pipe to become readable.
309     ///
310     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
311     /// paired with `try_read()`.
312     ///
313     /// # Examples
314     ///
315     /// ```no_run
316     /// use tokio::net::windows::named_pipe;
317     /// use std::error::Error;
318     /// use std::io;
319     ///
320     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
321     ///
322     /// #[tokio::main]
323     /// async fn main() -> Result<(), Box<dyn Error>> {
324     ///     let server = named_pipe::ServerOptions::new()
325     ///         .create(PIPE_NAME)?;
326     ///
327     ///     let mut msg = vec![0; 1024];
328     ///
329     ///     loop {
330     ///         // Wait for the pipe to be readable
331     ///         server.readable().await?;
332     ///
333     ///         // Try to read data, this may still fail with `WouldBlock`
334     ///         // if the readiness event is a false positive.
335     ///         match server.try_read(&mut msg) {
336     ///             Ok(n) => {
337     ///                 msg.truncate(n);
338     ///                 break;
339     ///             }
340     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
341     ///                 continue;
342     ///             }
343     ///             Err(e) => {
344     ///                 return Err(e.into());
345     ///             }
346     ///         }
347     ///     }
348     ///
349     ///     println!("GOT = {:?}", msg);
350     ///     Ok(())
351     /// }
352     /// ```
readable(&self) -> io::Result<()>353     pub async fn readable(&self) -> io::Result<()> {
354         self.ready(Interest::READABLE).await?;
355         Ok(())
356     }
357 
358     /// Polls for read readiness.
359     ///
360     /// If the pipe is not currently ready for reading, this method will
361     /// store a clone of the `Waker` from the provided `Context`. When the pipe
362     /// becomes ready for reading, `Waker::wake` will be called on the waker.
363     ///
364     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
365     /// the `Waker` from the `Context` passed to the most recent call is
366     /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
367     /// second, independent waker.)
368     ///
369     /// This function is intended for cases where creating and pinning a future
370     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
371     /// preferred, as this supports polling from multiple tasks at once.
372     ///
373     /// # Return value
374     ///
375     /// The function returns:
376     ///
377     /// * `Poll::Pending` if the pipe is not ready for reading.
378     /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
379     /// * `Poll::Ready(Err(e))` if an error is encountered.
380     ///
381     /// # Errors
382     ///
383     /// This function may encounter any standard I/O error except `WouldBlock`.
384     ///
385     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>386     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
387         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
388     }
389 
390     /// Tries to read data from the pipe into the provided buffer, returning how
391     /// many bytes were read.
392     ///
393     /// Receives any pending data from the pipe but does not wait for new data
394     /// to arrive. On success, returns the number of bytes read. Because
395     /// `try_read()` is non-blocking, the buffer does not have to be stored by
396     /// the async task and can exist entirely on the stack.
397     ///
398     /// Usually, [`readable()`] or [`ready()`] is used with this function.
399     ///
400     /// [`readable()`]: NamedPipeServer::readable()
401     /// [`ready()`]: NamedPipeServer::ready()
402     ///
403     /// # Return
404     ///
405     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
406     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
407     ///
408     /// 1. The pipe's read half is closed and will no longer yield data.
409     /// 2. The specified buffer was 0 bytes in length.
410     ///
411     /// If the pipe is not ready to read data,
412     /// `Err(io::ErrorKind::WouldBlock)` is returned.
413     ///
414     /// # Examples
415     ///
416     /// ```no_run
417     /// use tokio::net::windows::named_pipe;
418     /// use std::error::Error;
419     /// use std::io;
420     ///
421     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
422     ///
423     /// #[tokio::main]
424     /// async fn main() -> Result<(), Box<dyn Error>> {
425     ///     let server = named_pipe::ServerOptions::new()
426     ///         .create(PIPE_NAME)?;
427     ///
428     ///     loop {
429     ///         // Wait for the pipe to be readable
430     ///         server.readable().await?;
431     ///
432     ///         // Creating the buffer **after** the `await` prevents it from
433     ///         // being stored in the async task.
434     ///         let mut buf = [0; 4096];
435     ///
436     ///         // Try to read data, this may still fail with `WouldBlock`
437     ///         // if the readiness event is a false positive.
438     ///         match server.try_read(&mut buf) {
439     ///             Ok(0) => break,
440     ///             Ok(n) => {
441     ///                 println!("read {} bytes", n);
442     ///             }
443     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
444     ///                 continue;
445     ///             }
446     ///             Err(e) => {
447     ///                 return Err(e.into());
448     ///             }
449     ///         }
450     ///     }
451     ///
452     ///     Ok(())
453     /// }
454     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>455     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
456         self.io
457             .registration()
458             .try_io(Interest::READABLE, || (&*self.io).read(buf))
459     }
460 
461     /// Tries to read data from the pipe into the provided buffers, returning
462     /// how many bytes were read.
463     ///
464     /// Data is copied to fill each buffer in order, with the final buffer
465     /// written to possibly being only partially filled. This method behaves
466     /// equivalently to a single call to [`try_read()`] with concatenated
467     /// buffers.
468     ///
469     /// Receives any pending data from the pipe but does not wait for new data
470     /// to arrive. On success, returns the number of bytes read. Because
471     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
472     /// stored by the async task and can exist entirely on the stack.
473     ///
474     /// Usually, [`readable()`] or [`ready()`] is used with this function.
475     ///
476     /// [`try_read()`]: NamedPipeServer::try_read()
477     /// [`readable()`]: NamedPipeServer::readable()
478     /// [`ready()`]: NamedPipeServer::ready()
479     ///
480     /// # Return
481     ///
482     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
483     /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
484     /// and will no longer yield data. If the pipe is not ready to read data
485     /// `Err(io::ErrorKind::WouldBlock)` is returned.
486     ///
487     /// # Examples
488     ///
489     /// ```no_run
490     /// use tokio::net::windows::named_pipe;
491     /// use std::error::Error;
492     /// use std::io::{self, IoSliceMut};
493     ///
494     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
495     ///
496     /// #[tokio::main]
497     /// async fn main() -> Result<(), Box<dyn Error>> {
498     ///     let server = named_pipe::ServerOptions::new()
499     ///         .create(PIPE_NAME)?;
500     ///
501     ///     loop {
502     ///         // Wait for the pipe to be readable
503     ///         server.readable().await?;
504     ///
505     ///         // Creating the buffer **after** the `await` prevents it from
506     ///         // being stored in the async task.
507     ///         let mut buf_a = [0; 512];
508     ///         let mut buf_b = [0; 1024];
509     ///         let mut bufs = [
510     ///             IoSliceMut::new(&mut buf_a),
511     ///             IoSliceMut::new(&mut buf_b),
512     ///         ];
513     ///
514     ///         // Try to read data, this may still fail with `WouldBlock`
515     ///         // if the readiness event is a false positive.
516     ///         match server.try_read_vectored(&mut bufs) {
517     ///             Ok(0) => break,
518     ///             Ok(n) => {
519     ///                 println!("read {} bytes", n);
520     ///             }
521     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
522     ///                 continue;
523     ///             }
524     ///             Err(e) => {
525     ///                 return Err(e.into());
526     ///             }
527     ///         }
528     ///     }
529     ///
530     ///     Ok(())
531     /// }
532     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>533     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
534         self.io
535             .registration()
536             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
537     }
538 
539     cfg_io_util! {
        /// Tries to read data from the pipe into the provided buffer, advancing the
        /// buffer's internal cursor, returning how many bytes were read.
542         ///
543         /// Receives any pending data from the pipe but does not wait for new data
544         /// to arrive. On success, returns the number of bytes read. Because
545         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
546         /// the async task and can exist entirely on the stack.
547         ///
548         /// Usually, [`readable()`] or [`ready()`] is used with this function.
549         ///
550         /// [`readable()`]: NamedPipeServer::readable()
551         /// [`ready()`]: NamedPipeServer::ready()
552         ///
553         /// # Return
554         ///
555         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
        /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
        /// and will no longer yield data. If the pipe is not ready to read data
558         /// `Err(io::ErrorKind::WouldBlock)` is returned.
559         ///
560         /// # Examples
561         ///
562         /// ```no_run
563         /// use tokio::net::windows::named_pipe;
564         /// use std::error::Error;
565         /// use std::io;
566         ///
        /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-buf";
568         ///
569         /// #[tokio::main]
570         /// async fn main() -> Result<(), Box<dyn Error>> {
571         ///     let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
572         ///
573         ///     loop {
574         ///         // Wait for the pipe to be readable
575         ///         server.readable().await?;
576         ///
577         ///         let mut buf = Vec::with_capacity(4096);
578         ///
579         ///         // Try to read data, this may still fail with `WouldBlock`
580         ///         // if the readiness event is a false positive.
581         ///         match server.try_read_buf(&mut buf) {
582         ///             Ok(0) => break,
583         ///             Ok(n) => {
584         ///                 println!("read {} bytes", n);
585         ///             }
586         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
587         ///                 continue;
588         ///             }
589         ///             Err(e) => {
590         ///                 return Err(e.into());
591         ///             }
592         ///         }
593         ///     }
594         ///
595         ///     Ok(())
596         /// }
597         /// ```
598         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
599             self.io.registration().try_io(Interest::READABLE, || {
600                 use std::io::Read;
601 
602                 let dst = buf.chunk_mut();
603                 let dst =
604                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
605 
606                 // Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
607                 // buffer.
608                 let n = (&*self.io).read(dst)?;
609 
610                 unsafe {
611                     buf.advance_mut(n);
612                 }
613 
614                 Ok(n)
615             })
616         }
617     }
618 
619     /// Waits for the pipe to become writable.
620     ///
621     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
622     /// paired with `try_write()`.
623     ///
624     /// # Examples
625     ///
626     /// ```no_run
627     /// use tokio::net::windows::named_pipe;
628     /// use std::error::Error;
629     /// use std::io;
630     ///
631     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
632     ///
633     /// #[tokio::main]
634     /// async fn main() -> Result<(), Box<dyn Error>> {
635     ///     let server = named_pipe::ServerOptions::new()
636     ///         .create(PIPE_NAME)?;
637     ///
638     ///     loop {
639     ///         // Wait for the pipe to be writable
640     ///         server.writable().await?;
641     ///
642     ///         // Try to write data, this may still fail with `WouldBlock`
643     ///         // if the readiness event is a false positive.
644     ///         match server.try_write(b"hello world") {
645     ///             Ok(n) => {
646     ///                 break;
647     ///             }
648     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
649     ///                 continue;
650     ///             }
651     ///             Err(e) => {
652     ///                 return Err(e.into());
653     ///             }
654     ///         }
655     ///     }
656     ///
657     ///     Ok(())
658     /// }
659     /// ```
writable(&self) -> io::Result<()>660     pub async fn writable(&self) -> io::Result<()> {
661         self.ready(Interest::WRITABLE).await?;
662         Ok(())
663     }
664 
665     /// Polls for write readiness.
666     ///
667     /// If the pipe is not currently ready for writing, this method will
668     /// store a clone of the `Waker` from the provided `Context`. When the pipe
669     /// becomes ready for writing, `Waker::wake` will be called on the waker.
670     ///
671     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
672     /// the `Waker` from the `Context` passed to the most recent call is
673     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
674     /// second, independent waker.)
675     ///
676     /// This function is intended for cases where creating and pinning a future
677     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
678     /// preferred, as this supports polling from multiple tasks at once.
679     ///
680     /// # Return value
681     ///
682     /// The function returns:
683     ///
684     /// * `Poll::Pending` if the pipe is not ready for writing.
685     /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
686     /// * `Poll::Ready(Err(e))` if an error is encountered.
687     ///
688     /// # Errors
689     ///
690     /// This function may encounter any standard I/O error except `WouldBlock`.
691     ///
692     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>693     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
694         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
695     }
696 
697     /// Tries to write a buffer to the pipe, returning how many bytes were
698     /// written.
699     ///
700     /// The function will attempt to write the entire contents of `buf`, but
701     /// only part of the buffer may be written.
702     ///
703     /// This function is usually paired with `writable()`.
704     ///
705     /// # Return
706     ///
707     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
708     /// number of bytes written. If the pipe is not ready to write data,
709     /// `Err(io::ErrorKind::WouldBlock)` is returned.
710     ///
711     /// # Examples
712     ///
713     /// ```no_run
714     /// use tokio::net::windows::named_pipe;
715     /// use std::error::Error;
716     /// use std::io;
717     ///
718     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
719     ///
720     /// #[tokio::main]
721     /// async fn main() -> Result<(), Box<dyn Error>> {
722     ///     let server = named_pipe::ServerOptions::new()
723     ///         .create(PIPE_NAME)?;
724     ///
725     ///     loop {
726     ///         // Wait for the pipe to be writable
727     ///         server.writable().await?;
728     ///
729     ///         // Try to write data, this may still fail with `WouldBlock`
730     ///         // if the readiness event is a false positive.
731     ///         match server.try_write(b"hello world") {
732     ///             Ok(n) => {
733     ///                 break;
734     ///             }
735     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
736     ///                 continue;
737     ///             }
738     ///             Err(e) => {
739     ///                 return Err(e.into());
740     ///             }
741     ///         }
742     ///     }
743     ///
744     ///     Ok(())
745     /// }
746     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>747     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
748         self.io
749             .registration()
750             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
751     }
752 
753     /// Tries to write several buffers to the pipe, returning how many bytes
754     /// were written.
755     ///
    /// Data is written from each buffer in order, with the final buffer read
    /// from possibly being only partially consumed. This method behaves
758     /// equivalently to a single call to [`try_write()`] with concatenated
759     /// buffers.
760     ///
761     /// This function is usually paired with `writable()`.
762     ///
763     /// [`try_write()`]: NamedPipeServer::try_write()
764     ///
765     /// # Return
766     ///
767     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
768     /// number of bytes written. If the pipe is not ready to write data,
769     /// `Err(io::ErrorKind::WouldBlock)` is returned.
770     ///
771     /// # Examples
772     ///
773     /// ```no_run
774     /// use tokio::net::windows::named_pipe;
775     /// use std::error::Error;
776     /// use std::io;
777     ///
778     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
779     ///
780     /// #[tokio::main]
781     /// async fn main() -> Result<(), Box<dyn Error>> {
782     ///     let server = named_pipe::ServerOptions::new()
783     ///         .create(PIPE_NAME)?;
784     ///
785     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
786     ///
787     ///     loop {
788     ///         // Wait for the pipe to be writable
789     ///         server.writable().await?;
790     ///
791     ///         // Try to write data, this may still fail with `WouldBlock`
792     ///         // if the readiness event is a false positive.
793     ///         match server.try_write_vectored(&bufs) {
794     ///             Ok(n) => {
795     ///                 break;
796     ///             }
797     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
798     ///                 continue;
799     ///             }
800     ///             Err(e) => {
801     ///                 return Err(e.into());
802     ///             }
803     ///         }
804     ///     }
805     ///
806     ///     Ok(())
807     /// }
808     /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>809     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
810         self.io
811             .registration()
812             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
813     }
814 
815     /// Tries to read or write from the pipe using a user-provided IO operation.
816     ///
817     /// If the pipe is ready, the provided closure is called. The closure
818     /// should attempt to perform IO operation from the pipe by manually
819     /// calling the appropriate syscall. If the operation fails because the
820     /// pipe is not actually ready, then the closure should return a
821     /// `WouldBlock` error and the readiness flag is cleared. The return value
822     /// of the closure is then returned by `try_io`.
823     ///
824     /// If the pipe is not ready, then the closure is not called
825     /// and a `WouldBlock` error is returned.
826     ///
827     /// The closure should only return a `WouldBlock` error if it has performed
828     /// an IO operation on the pipe that failed due to the pipe not being
829     /// ready. Returning a `WouldBlock` error in any other situation will
830     /// incorrectly clear the readiness flag, which can cause the pipe to
831     /// behave incorrectly.
832     ///
833     /// The closure should not perform the IO operation using any of the
834     /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
835     /// the readiness flag and can cause the pipe to behave incorrectly.
836     ///
837     /// This method is not intended to be used with combined interests.
838     /// The closure should perform only one type of IO operation, so it should not
839     /// require more than one ready state. This method may panic or sleep forever
840     /// if it is called with a combined interest.
841     ///
842     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
843     ///
844     /// [`readable()`]: NamedPipeServer::readable()
845     /// [`writable()`]: NamedPipeServer::writable()
846     /// [`ready()`]: NamedPipeServer::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>847     pub fn try_io<R>(
848         &self,
849         interest: Interest,
850         f: impl FnOnce() -> io::Result<R>,
851     ) -> io::Result<R> {
852         self.io.registration().try_io(interest, f)
853     }
854 }
855 
856 impl AsyncRead for NamedPipeServer {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>857     fn poll_read(
858         self: Pin<&mut Self>,
859         cx: &mut Context<'_>,
860         buf: &mut ReadBuf<'_>,
861     ) -> Poll<io::Result<()>> {
862         unsafe { self.io.poll_read(cx, buf) }
863     }
864 }
865 
866 impl AsyncWrite for NamedPipeServer {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>867     fn poll_write(
868         self: Pin<&mut Self>,
869         cx: &mut Context<'_>,
870         buf: &[u8],
871     ) -> Poll<io::Result<usize>> {
872         self.io.poll_write(cx, buf)
873     }
874 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>875     fn poll_write_vectored(
876         self: Pin<&mut Self>,
877         cx: &mut Context<'_>,
878         bufs: &[io::IoSlice<'_>],
879     ) -> Poll<io::Result<usize>> {
880         self.io.poll_write_vectored(cx, bufs)
881     }
882 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>883     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
884         Poll::Ready(Ok(()))
885     }
886 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>887     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
888         self.poll_flush(cx)
889     }
890 }
891 
892 impl AsRawHandle for NamedPipeServer {
as_raw_handle(&self) -> RawHandle893     fn as_raw_handle(&self) -> RawHandle {
894         self.io.as_raw_handle()
895     }
896 }
897 
898 /// A [Windows named pipe] client.
899 ///
900 /// Constructed using [`ClientOptions::open`].
901 ///
902 /// Connecting a client correctly involves a few steps. When connecting through
903 /// [`ClientOptions::open`], it might error indicating one of two things:
904 ///
905 /// * [`std::io::ErrorKind::NotFound`] - There is no server available.
906 /// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
907 ///   for a while and try again.
908 ///
909 /// So a correctly implemented client looks like this:
910 ///
911 /// ```no_run
912 /// use std::time::Duration;
913 /// use tokio::net::windows::named_pipe::ClientOptions;
914 /// use tokio::time;
915 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
916 ///
917 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
918 ///
919 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
920 /// let client = loop {
921 ///     match ClientOptions::new().open(PIPE_NAME) {
922 ///         Ok(client) => break client,
923 ///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
924 ///         Err(e) => return Err(e),
925 ///     }
926 ///
927 ///     time::sleep(Duration::from_millis(50)).await;
928 /// };
929 ///
930 /// /* use the connected client */
931 /// # Ok(()) }
932 /// ```
933 ///
934 /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
935 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
936 #[derive(Debug)]
937 pub struct NamedPipeClient {
938     io: PollEvented<mio_windows::NamedPipe>,
939 }
940 
941 impl NamedPipeClient {
942     /// Constructs a new named pipe client from the specified raw handle.
943     ///
944     /// This function will consume ownership of the handle given, passing
945     /// responsibility for closing the handle to the returned object.
946     ///
947     /// This function is also unsafe as the primitives currently returned have
948     /// the contract that they are the sole owner of the file descriptor they
949     /// are wrapping. Usage of this function could accidentally allow violating
950     /// this contract which can cause memory unsafety in code that relies on it
951     /// being true.
952     ///
953     /// # Errors
954     ///
955     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
956     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
957     ///
958     /// [Tokio Runtime]: crate::runtime::Runtime
959     /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>960     pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
961         let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
962 
963         Ok(Self {
964             io: PollEvented::new(named_pipe)?,
965         })
966     }
967 
968     /// Retrieves information about the named pipe the client is associated
969     /// with.
970     ///
971     /// ```no_run
972     /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
973     ///
974     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
975     ///
976     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
977     /// let client = ClientOptions::new()
978     ///     .open(PIPE_NAME)?;
979     ///
980     /// let client_info = client.info()?;
981     ///
982     /// assert_eq!(client_info.end, PipeEnd::Client);
983     /// assert_eq!(client_info.mode, PipeMode::Message);
984     /// assert_eq!(client_info.max_instances, 5);
985     /// # Ok(()) }
986     /// ```
info(&self) -> io::Result<PipeInfo>987     pub fn info(&self) -> io::Result<PipeInfo> {
988         // Safety: we're ensuring the lifetime of the named pipe.
989         unsafe { named_pipe_info(self.io.as_raw_handle()) }
990     }
991 
992     /// Waits for any of the requested ready states.
993     ///
994     /// This function is usually paired with `try_read()` or `try_write()`. It
995     /// can be used to concurrently read / write to the same pipe on a single
996     /// task without splitting the pipe.
997     ///
998     /// The function may complete without the pipe being ready. This is a
999     /// false-positive and attempting an operation will return with
1000     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1001     /// [`Ready`] set, so you should always check the returned value and possibly
1002     /// wait again if the requested states are not set.
1003     ///
1004     /// # Examples
1005     ///
1006     /// Concurrently read and write to the pipe on the same task without
1007     /// splitting.
1008     ///
1009     /// ```no_run
1010     /// use tokio::io::Interest;
1011     /// use tokio::net::windows::named_pipe;
1012     /// use std::error::Error;
1013     /// use std::io;
1014     ///
1015     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
1016     ///
1017     /// #[tokio::main]
1018     /// async fn main() -> Result<(), Box<dyn Error>> {
1019     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1020     ///
1021     ///     loop {
1022     ///         let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
1023     ///
1024     ///         if ready.is_readable() {
1025     ///             let mut data = vec![0; 1024];
1026     ///             // Try to read data, this may still fail with `WouldBlock`
1027     ///             // if the readiness event is a false positive.
1028     ///             match client.try_read(&mut data) {
1029     ///                 Ok(n) => {
1030     ///                     println!("read {} bytes", n);
1031     ///                 }
1032     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1033     ///                     continue;
1034     ///                 }
1035     ///                 Err(e) => {
1036     ///                     return Err(e.into());
1037     ///                 }
1038     ///             }
1039     ///         }
1040     ///
1041     ///         if ready.is_writable() {
1042     ///             // Try to write data, this may still fail with `WouldBlock`
1043     ///             // if the readiness event is a false positive.
1044     ///             match client.try_write(b"hello world") {
1045     ///                 Ok(n) => {
1046     ///                     println!("write {} bytes", n);
1047     ///                 }
1048     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1049     ///                     continue;
1050     ///                 }
1051     ///                 Err(e) => {
1052     ///                     return Err(e.into());
1053     ///                 }
1054     ///             }
1055     ///         }
1056     ///     }
1057     /// }
1058     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>1059     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1060         let event = self.io.registration().readiness(interest).await?;
1061         Ok(event.ready)
1062     }
1063 
1064     /// Waits for the pipe to become readable.
1065     ///
1066     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1067     /// paired with `try_read()`.
1068     ///
1069     /// # Examples
1070     ///
1071     /// ```no_run
1072     /// use tokio::net::windows::named_pipe;
1073     /// use std::error::Error;
1074     /// use std::io;
1075     ///
1076     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1077     ///
1078     /// #[tokio::main]
1079     /// async fn main() -> Result<(), Box<dyn Error>> {
1080     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1081     ///
1082     ///     let mut msg = vec![0; 1024];
1083     ///
1084     ///     loop {
1085     ///         // Wait for the pipe to be readable
1086     ///         client.readable().await?;
1087     ///
1088     ///         // Try to read data, this may still fail with `WouldBlock`
1089     ///         // if the readiness event is a false positive.
1090     ///         match client.try_read(&mut msg) {
1091     ///             Ok(n) => {
1092     ///                 msg.truncate(n);
1093     ///                 break;
1094     ///             }
1095     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1096     ///                 continue;
1097     ///             }
1098     ///             Err(e) => {
1099     ///                 return Err(e.into());
1100     ///             }
1101     ///         }
1102     ///     }
1103     ///
1104     ///     println!("GOT = {:?}", msg);
1105     ///     Ok(())
1106     /// }
1107     /// ```
readable(&self) -> io::Result<()>1108     pub async fn readable(&self) -> io::Result<()> {
1109         self.ready(Interest::READABLE).await?;
1110         Ok(())
1111     }
1112 
1113     /// Polls for read readiness.
1114     ///
1115     /// If the pipe is not currently ready for reading, this method will
1116     /// store a clone of the `Waker` from the provided `Context`. When the pipe
1117     /// becomes ready for reading, `Waker::wake` will be called on the waker.
1118     ///
1119     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1120     /// the `Waker` from the `Context` passed to the most recent call is
1121     /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1122     /// second, independent waker.)
1123     ///
1124     /// This function is intended for cases where creating and pinning a future
1125     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1126     /// preferred, as this supports polling from multiple tasks at once.
1127     ///
1128     /// # Return value
1129     ///
1130     /// The function returns:
1131     ///
1132     /// * `Poll::Pending` if the pipe is not ready for reading.
1133     /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1134     /// * `Poll::Ready(Err(e))` if an error is encountered.
1135     ///
1136     /// # Errors
1137     ///
1138     /// This function may encounter any standard I/O error except `WouldBlock`.
1139     ///
1140     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1141     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1142         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1143     }
1144 
1145     /// Tries to read data from the pipe into the provided buffer, returning how
1146     /// many bytes were read.
1147     ///
1148     /// Receives any pending data from the pipe but does not wait for new data
1149     /// to arrive. On success, returns the number of bytes read. Because
1150     /// `try_read()` is non-blocking, the buffer does not have to be stored by
1151     /// the async task and can exist entirely on the stack.
1152     ///
1153     /// Usually, [`readable()`] or [`ready()`] is used with this function.
1154     ///
1155     /// [`readable()`]: NamedPipeClient::readable()
1156     /// [`ready()`]: NamedPipeClient::ready()
1157     ///
1158     /// # Return
1159     ///
1160     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1161     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1162     ///
1163     /// 1. The pipe's read half is closed and will no longer yield data.
1164     /// 2. The specified buffer was 0 bytes in length.
1165     ///
1166     /// If the pipe is not ready to read data,
1167     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1168     ///
1169     /// # Examples
1170     ///
1171     /// ```no_run
1172     /// use tokio::net::windows::named_pipe;
1173     /// use std::error::Error;
1174     /// use std::io;
1175     ///
1176     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1177     ///
1178     /// #[tokio::main]
1179     /// async fn main() -> Result<(), Box<dyn Error>> {
1180     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1181     ///
1182     ///     loop {
1183     ///         // Wait for the pipe to be readable
1184     ///         client.readable().await?;
1185     ///
1186     ///         // Creating the buffer **after** the `await` prevents it from
1187     ///         // being stored in the async task.
1188     ///         let mut buf = [0; 4096];
1189     ///
1190     ///         // Try to read data, this may still fail with `WouldBlock`
1191     ///         // if the readiness event is a false positive.
1192     ///         match client.try_read(&mut buf) {
1193     ///             Ok(0) => break,
1194     ///             Ok(n) => {
1195     ///                 println!("read {} bytes", n);
1196     ///             }
1197     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1198     ///                 continue;
1199     ///             }
1200     ///             Err(e) => {
1201     ///                 return Err(e.into());
1202     ///             }
1203     ///         }
1204     ///     }
1205     ///
1206     ///     Ok(())
1207     /// }
1208     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>1209     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1210         self.io
1211             .registration()
1212             .try_io(Interest::READABLE, || (&*self.io).read(buf))
1213     }
1214 
1215     /// Tries to read data from the pipe into the provided buffers, returning
1216     /// how many bytes were read.
1217     ///
1218     /// Data is copied to fill each buffer in order, with the final buffer
1219     /// written to possibly being only partially filled. This method behaves
1220     /// equivalently to a single call to [`try_read()`] with concatenated
1221     /// buffers.
1222     ///
1223     /// Receives any pending data from the pipe but does not wait for new data
1224     /// to arrive. On success, returns the number of bytes read. Because
1225     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1226     /// stored by the async task and can exist entirely on the stack.
1227     ///
1228     /// Usually, [`readable()`] or [`ready()`] is used with this function.
1229     ///
1230     /// [`try_read()`]: NamedPipeClient::try_read()
1231     /// [`readable()`]: NamedPipeClient::readable()
1232     /// [`ready()`]: NamedPipeClient::ready()
1233     ///
1234     /// # Return
1235     ///
1236     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1237     /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
1238     /// and will no longer yield data. If the pipe is not ready to read data
1239     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1240     ///
1241     /// # Examples
1242     ///
1243     /// ```no_run
1244     /// use tokio::net::windows::named_pipe;
1245     /// use std::error::Error;
1246     /// use std::io::{self, IoSliceMut};
1247     ///
1248     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1249     ///
1250     /// #[tokio::main]
1251     /// async fn main() -> Result<(), Box<dyn Error>> {
1252     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1253     ///
1254     ///     loop {
1255     ///         // Wait for the pipe to be readable
1256     ///         client.readable().await?;
1257     ///
1258     ///         // Creating the buffer **after** the `await` prevents it from
1259     ///         // being stored in the async task.
1260     ///         let mut buf_a = [0; 512];
1261     ///         let mut buf_b = [0; 1024];
1262     ///         let mut bufs = [
1263     ///             IoSliceMut::new(&mut buf_a),
1264     ///             IoSliceMut::new(&mut buf_b),
1265     ///         ];
1266     ///
1267     ///         // Try to read data, this may still fail with `WouldBlock`
1268     ///         // if the readiness event is a false positive.
1269     ///         match client.try_read_vectored(&mut bufs) {
1270     ///             Ok(0) => break,
1271     ///             Ok(n) => {
1272     ///                 println!("read {} bytes", n);
1273     ///             }
1274     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1275     ///                 continue;
1276     ///             }
1277     ///             Err(e) => {
1278     ///                 return Err(e.into());
1279     ///             }
1280     ///         }
1281     ///     }
1282     ///
1283     ///     Ok(())
1284     /// }
1285     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>1286     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1287         self.io
1288             .registration()
1289             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1290     }
1291 
1292     cfg_io_util! {
        /// Tries to read data from the pipe into the provided buffer, advancing the
        /// buffer's internal cursor, returning how many bytes were read.
1295         ///
1296         /// Receives any pending data from the pipe but does not wait for new data
1297         /// to arrive. On success, returns the number of bytes read. Because
1298         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1299         /// the async task and can exist entirely on the stack.
1300         ///
1301         /// Usually, [`readable()`] or [`ready()`] is used with this function.
1302         ///
1303         /// [`readable()`]: NamedPipeClient::readable()
1304         /// [`ready()`]: NamedPipeClient::ready()
1305         ///
1306         /// # Return
1307         ///
1308         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
        /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
        /// and will no longer yield data. If the pipe is not ready to read data
1311         /// `Err(io::ErrorKind::WouldBlock)` is returned.
1312         ///
1313         /// # Examples
1314         ///
1315         /// ```no_run
1316         /// use tokio::net::windows::named_pipe;
1317         /// use std::error::Error;
1318         /// use std::io;
1319         ///
1320         /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1321         ///
1322         /// #[tokio::main]
1323         /// async fn main() -> Result<(), Box<dyn Error>> {
1324         ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1325         ///
1326         ///     loop {
1327         ///         // Wait for the pipe to be readable
1328         ///         client.readable().await?;
1329         ///
1330         ///         let mut buf = Vec::with_capacity(4096);
1331         ///
1332         ///         // Try to read data, this may still fail with `WouldBlock`
1333         ///         // if the readiness event is a false positive.
1334         ///         match client.try_read_buf(&mut buf) {
1335         ///             Ok(0) => break,
1336         ///             Ok(n) => {
1337         ///                 println!("read {} bytes", n);
1338         ///             }
1339         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1340         ///                 continue;
1341         ///             }
1342         ///             Err(e) => {
1343         ///                 return Err(e.into());
1344         ///             }
1345         ///         }
1346         ///     }
1347         ///
1348         ///     Ok(())
1349         /// }
1350         /// ```
1351         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1352             self.io.registration().try_io(Interest::READABLE, || {
1353                 use std::io::Read;
1354 
1355                 let dst = buf.chunk_mut();
1356                 let dst =
1357                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1358 
1359                 // Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
1360                 // buffer.
1361                 let n = (&*self.io).read(dst)?;
1362 
1363                 unsafe {
1364                     buf.advance_mut(n);
1365                 }
1366 
1367                 Ok(n)
1368             })
1369         }
1370     }
1371 
1372     /// Waits for the pipe to become writable.
1373     ///
1374     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1375     /// paired with `try_write()`.
1376     ///
1377     /// # Examples
1378     ///
1379     /// ```no_run
1380     /// use tokio::net::windows::named_pipe;
1381     /// use std::error::Error;
1382     /// use std::io;
1383     ///
1384     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1385     ///
1386     /// #[tokio::main]
1387     /// async fn main() -> Result<(), Box<dyn Error>> {
1388     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1389     ///
1390     ///     loop {
1391     ///         // Wait for the pipe to be writable
1392     ///         client.writable().await?;
1393     ///
1394     ///         // Try to write data, this may still fail with `WouldBlock`
1395     ///         // if the readiness event is a false positive.
1396     ///         match client.try_write(b"hello world") {
1397     ///             Ok(n) => {
1398     ///                 break;
1399     ///             }
1400     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1401     ///                 continue;
1402     ///             }
1403     ///             Err(e) => {
1404     ///                 return Err(e.into());
1405     ///             }
1406     ///         }
1407     ///     }
1408     ///
1409     ///     Ok(())
1410     /// }
1411     /// ```
writable(&self) -> io::Result<()>1412     pub async fn writable(&self) -> io::Result<()> {
1413         self.ready(Interest::WRITABLE).await?;
1414         Ok(())
1415     }
1416 
1417     /// Polls for write readiness.
1418     ///
1419     /// If the pipe is not currently ready for writing, this method will
1420     /// store a clone of the `Waker` from the provided `Context`. When the pipe
1421     /// becomes ready for writing, `Waker::wake` will be called on the waker.
1422     ///
1423     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1424     /// the `Waker` from the `Context` passed to the most recent call is
1425     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1426     /// second, independent waker.)
1427     ///
1428     /// This function is intended for cases where creating and pinning a future
1429     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1430     /// preferred, as this supports polling from multiple tasks at once.
1431     ///
1432     /// # Return value
1433     ///
1434     /// The function returns:
1435     ///
1436     /// * `Poll::Pending` if the pipe is not ready for writing.
1437     /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1438     /// * `Poll::Ready(Err(e))` if an error is encountered.
1439     ///
1440     /// # Errors
1441     ///
1442     /// This function may encounter any standard I/O error except `WouldBlock`.
1443     ///
1444     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1445     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1446         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1447     }
1448 
1449     /// Tries to write a buffer to the pipe, returning how many bytes were
1450     /// written.
1451     ///
1452     /// The function will attempt to write the entire contents of `buf`, but
1453     /// only part of the buffer may be written.
1454     ///
1455     /// This function is usually paired with `writable()`.
1456     ///
1457     /// # Return
1458     ///
1459     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1460     /// number of bytes written. If the pipe is not ready to write data,
1461     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1462     ///
1463     /// # Examples
1464     ///
1465     /// ```no_run
1466     /// use tokio::net::windows::named_pipe;
1467     /// use std::error::Error;
1468     /// use std::io;
1469     ///
1470     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1471     ///
1472     /// #[tokio::main]
1473     /// async fn main() -> Result<(), Box<dyn Error>> {
1474     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1475     ///
1476     ///     loop {
1477     ///         // Wait for the pipe to be writable
1478     ///         client.writable().await?;
1479     ///
1480     ///         // Try to write data, this may still fail with `WouldBlock`
1481     ///         // if the readiness event is a false positive.
1482     ///         match client.try_write(b"hello world") {
1483     ///             Ok(n) => {
1484     ///                 break;
1485     ///             }
1486     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1487     ///                 continue;
1488     ///             }
1489     ///             Err(e) => {
1490     ///                 return Err(e.into());
1491     ///             }
1492     ///         }
1493     ///     }
1494     ///
1495     ///     Ok(())
1496     /// }
1497     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>1498     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1499         self.io
1500             .registration()
1501             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1502     }
1503 
1504     /// Tries to write several buffers to the pipe, returning how many bytes
1505     /// were written.
1506     ///
    /// Data is written from each buffer in order, with the final buffer read
    /// from possibly being only partially consumed. This method behaves
1509     /// equivalently to a single call to [`try_write()`] with concatenated
1510     /// buffers.
1511     ///
1512     /// This function is usually paired with `writable()`.
1513     ///
1514     /// [`try_write()`]: NamedPipeClient::try_write()
1515     ///
1516     /// # Return
1517     ///
1518     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1519     /// number of bytes written. If the pipe is not ready to write data,
1520     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1521     ///
1522     /// # Examples
1523     ///
1524     /// ```no_run
1525     /// use tokio::net::windows::named_pipe;
1526     /// use std::error::Error;
1527     /// use std::io;
1528     ///
1529     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1530     ///
1531     /// #[tokio::main]
1532     /// async fn main() -> Result<(), Box<dyn Error>> {
1533     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1534     ///
1535     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1536     ///
1537     ///     loop {
1538     ///         // Wait for the pipe to be writable
1539     ///         client.writable().await?;
1540     ///
1541     ///         // Try to write data, this may still fail with `WouldBlock`
1542     ///         // if the readiness event is a false positive.
1543     ///         match client.try_write_vectored(&bufs) {
1544     ///             Ok(n) => {
1545     ///                 break;
1546     ///             }
1547     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1548     ///                 continue;
1549     ///             }
1550     ///             Err(e) => {
1551     ///                 return Err(e.into());
1552     ///             }
1553     ///         }
1554     ///     }
1555     ///
1556     ///     Ok(())
1557     /// }
1558     /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>1559     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1560         self.io
1561             .registration()
1562             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1563     }
1564 
1565     /// Tries to read or write from the pipe using a user-provided IO operation.
1566     ///
1567     /// If the pipe is ready, the provided closure is called. The closure
1568     /// should attempt to perform IO operation from the pipe by manually
1569     /// calling the appropriate syscall. If the operation fails because the
1570     /// pipe is not actually ready, then the closure should return a
1571     /// `WouldBlock` error and the readiness flag is cleared. The return value
1572     /// of the closure is then returned by `try_io`.
1573     ///
1574     /// If the pipe is not ready, then the closure is not called
1575     /// and a `WouldBlock` error is returned.
1576     ///
1577     /// The closure should only return a `WouldBlock` error if it has performed
1578     /// an IO operation on the pipe that failed due to the pipe not being
1579     /// ready. Returning a `WouldBlock` error in any other situation will
1580     /// incorrectly clear the readiness flag, which can cause the pipe to
1581     /// behave incorrectly.
1582     ///
1583     /// The closure should not perform the IO operation using any of the methods
1584     /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1585     /// readiness flag and can cause the pipe to behave incorrectly.
1586     ///
1587     /// This method is not intended to be used with combined interests.
1588     /// The closure should perform only one type of IO operation, so it should not
1589     /// require more than one ready state. This method may panic or sleep forever
1590     /// if it is called with a combined interest.
1591     ///
1592     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1593     ///
1594     /// [`readable()`]: NamedPipeClient::readable()
1595     /// [`writable()`]: NamedPipeClient::writable()
1596     /// [`ready()`]: NamedPipeClient::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1597     pub fn try_io<R>(
1598         &self,
1599         interest: Interest,
1600         f: impl FnOnce() -> io::Result<R>,
1601     ) -> io::Result<R> {
1602         self.io.registration().try_io(interest, f)
1603     }
1604 }
1605 
1606 impl AsyncRead for NamedPipeClient {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1607     fn poll_read(
1608         self: Pin<&mut Self>,
1609         cx: &mut Context<'_>,
1610         buf: &mut ReadBuf<'_>,
1611     ) -> Poll<io::Result<()>> {
1612         unsafe { self.io.poll_read(cx, buf) }
1613     }
1614 }
1615 
1616 impl AsyncWrite for NamedPipeClient {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1617     fn poll_write(
1618         self: Pin<&mut Self>,
1619         cx: &mut Context<'_>,
1620         buf: &[u8],
1621     ) -> Poll<io::Result<usize>> {
1622         self.io.poll_write(cx, buf)
1623     }
1624 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1625     fn poll_write_vectored(
1626         self: Pin<&mut Self>,
1627         cx: &mut Context<'_>,
1628         bufs: &[io::IoSlice<'_>],
1629     ) -> Poll<io::Result<usize>> {
1630         self.io.poll_write_vectored(cx, bufs)
1631     }
1632 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>1633     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1634         Poll::Ready(Ok(()))
1635     }
1636 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1637     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1638         self.poll_flush(cx)
1639     }
1640 }
1641 
impl AsRawHandle for NamedPipeClient {
    /// Returns the raw handle of the pipe by delegating to the inner I/O
    /// object.
    fn as_raw_handle(&self) -> RawHandle {
        self.io.as_raw_handle()
    }
}
1647 
// Helper that sets (`$t` is true) or clears (`$t` is false) the bits of
// `$flag` inside the bitfield stored in the place expression `$f`. All other
// bits of `$f` are left untouched.
macro_rules! bool_flag {
    ($f:expr, $t:expr, $flag:expr) => {{
        let bits = $f;

        $f = if $t { bits | $flag } else { bits & !$flag };
    }};
}
1660 
/// A builder structure for constructing a named pipe with named pipe-specific
/// options. It must be used by named pipe servers that want to modify
/// pipe-related options.
///
/// See [`ServerOptions::create`].
#[derive(Debug, Clone)]
pub struct ServerOptions {
    // Bit flags passed to `CreateNamedPipeW` as the open mode. Toggled by
    // `access_inbound`, `access_outbound`, `first_pipe_instance`,
    // `write_dac`, `write_owner` and `access_system_security`.
    open_mode: u32,
    // Bit flags passed to `CreateNamedPipeW` as the pipe mode. Toggled by
    // `pipe_mode` and `reject_remote_clients`.
    pipe_mode: u32,
    // Instance limit passed to `CreateNamedPipeW`; `new` initialises it to
    // `PIPE_UNLIMITED_INSTANCES` and `max_instances` overrides it.
    max_instances: u32,
    // Output buffer size in bytes; `new` initialises it to 65536.
    out_buffer_size: u32,
    // Input buffer size in bytes; `new` initialises it to 65536.
    in_buffer_size: u32,
    // Default timeout argument passed to `CreateNamedPipeW`; `new`
    // initialises it to 0.
    default_timeout: u32,
}
1675 
1676 impl ServerOptions {
1677     /// Creates a new named pipe builder with the default settings.
1678     ///
1679     /// ```
1680     /// use tokio::net::windows::named_pipe::ServerOptions;
1681     ///
1682     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1683     ///
1684     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1685     /// let server = ServerOptions::new().create(PIPE_NAME)?;
1686     /// # Ok(()) }
1687     /// ```
new() -> ServerOptions1688     pub fn new() -> ServerOptions {
1689         ServerOptions {
1690             open_mode: windows_sys::PIPE_ACCESS_DUPLEX | windows_sys::FILE_FLAG_OVERLAPPED,
1691             pipe_mode: windows_sys::PIPE_TYPE_BYTE | windows_sys::PIPE_REJECT_REMOTE_CLIENTS,
1692             max_instances: windows_sys::PIPE_UNLIMITED_INSTANCES,
1693             out_buffer_size: 65536,
1694             in_buffer_size: 65536,
1695             default_timeout: 0,
1696         }
1697     }
1698 
1699     /// The pipe mode.
1700     ///
1701     /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1702     /// documentation of what each mode means.
1703     ///
    /// This corresponds to specifying [`dwPipeMode`].
1705     ///
1706     /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self1707     pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1708         let is_msg = matches!(pipe_mode, PipeMode::Message);
1709         // Pipe mode is implemented as a bit flag 0x4. Set is message and unset
1710         // is byte.
1711         bool_flag!(self.pipe_mode, is_msg, windows_sys::PIPE_TYPE_MESSAGE);
1712         self
1713     }
1714 
1715     /// The flow of data in the pipe goes from client to server only.
1716     ///
1717     /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1718     ///
1719     /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1720     ///
1721     /// # Errors
1722     ///
1723     /// Server side prevents connecting by denying inbound access, client errors
1724     /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1725     /// the connection.
1726     ///
1727     /// ```
1728     /// use std::io;
1729     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1730     ///
1731     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1732     ///
1733     /// # #[tokio::main] async fn main() -> io::Result<()> {
1734     /// let _server = ServerOptions::new()
1735     ///     .access_inbound(false)
1736     ///     .create(PIPE_NAME)?;
1737     ///
1738     /// let e = ClientOptions::new()
1739     ///     .open(PIPE_NAME)
1740     ///     .unwrap_err();
1741     ///
1742     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1743     /// # Ok(()) }
1744     /// ```
1745     ///
1746     /// Disabling writing allows a client to connect, but errors with
1747     /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1748     ///
1749     /// ```
1750     /// use std::io;
1751     /// use tokio::io::AsyncWriteExt;
1752     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1753     ///
1754     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1755     ///
1756     /// # #[tokio::main] async fn main() -> io::Result<()> {
1757     /// let server = ServerOptions::new()
1758     ///     .access_inbound(false)
1759     ///     .create(PIPE_NAME)?;
1760     ///
1761     /// let mut client = ClientOptions::new()
1762     ///     .write(false)
1763     ///     .open(PIPE_NAME)?;
1764     ///
1765     /// server.connect().await?;
1766     ///
1767     /// let e = client.write(b"ping").await.unwrap_err();
1768     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1769     /// # Ok(()) }
1770     /// ```
1771     ///
1772     /// # Examples
1773     ///
1774     /// A unidirectional named pipe that only supports server-to-client
1775     /// communication.
1776     ///
1777     /// ```
1778     /// use std::io;
1779     /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1780     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1781     ///
1782     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1783     ///
1784     /// # #[tokio::main] async fn main() -> io::Result<()> {
1785     /// let mut server = ServerOptions::new()
1786     ///     .access_inbound(false)
1787     ///     .create(PIPE_NAME)?;
1788     ///
1789     /// let mut client = ClientOptions::new()
1790     ///     .write(false)
1791     ///     .open(PIPE_NAME)?;
1792     ///
1793     /// server.connect().await?;
1794     ///
1795     /// let write = server.write_all(b"ping");
1796     ///
1797     /// let mut buf = [0u8; 4];
1798     /// let read = client.read_exact(&mut buf);
1799     ///
1800     /// let ((), read) = tokio::try_join!(write, read)?;
1801     ///
1802     /// assert_eq!(read, 4);
1803     /// assert_eq!(&buf[..], b"ping");
1804     /// # Ok(()) }
1805     /// ```
access_inbound(&mut self, allowed: bool) -> &mut Self1806     pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
1807         bool_flag!(self.open_mode, allowed, windows_sys::PIPE_ACCESS_INBOUND);
1808         self
1809     }
1810 
1811     /// The flow of data in the pipe goes from server to client only.
1812     ///
1813     /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1814     ///
1815     /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1816     ///
1817     /// # Errors
1818     ///
1819     /// Server side prevents connecting by denying outbound access, client
1820     /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1821     /// create the connection.
1822     ///
1823     /// ```
1824     /// use std::io;
1825     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1826     ///
1827     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1828     ///
1829     /// # #[tokio::main] async fn main() -> io::Result<()> {
1830     /// let server = ServerOptions::new()
1831     ///     .access_outbound(false)
1832     ///     .create(PIPE_NAME)?;
1833     ///
1834     /// let e = ClientOptions::new()
1835     ///     .open(PIPE_NAME)
1836     ///     .unwrap_err();
1837     ///
1838     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1839     /// # Ok(()) }
1840     /// ```
1841     ///
1842     /// Disabling reading allows a client to connect, but attempting to read
1843     /// will error with [`std::io::ErrorKind::PermissionDenied`].
1844     ///
1845     /// ```
1846     /// use std::io;
1847     /// use tokio::io::AsyncReadExt;
1848     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1849     ///
1850     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1851     ///
1852     /// # #[tokio::main] async fn main() -> io::Result<()> {
1853     /// let server = ServerOptions::new()
1854     ///     .access_outbound(false)
1855     ///     .create(PIPE_NAME)?;
1856     ///
1857     /// let mut client = ClientOptions::new()
1858     ///     .read(false)
1859     ///     .open(PIPE_NAME)?;
1860     ///
1861     /// server.connect().await?;
1862     ///
1863     /// let mut buf = [0u8; 4];
1864     /// let e = client.read(&mut buf).await.unwrap_err();
1865     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1866     /// # Ok(()) }
1867     /// ```
1868     ///
1869     /// # Examples
1870     ///
1871     /// A unidirectional named pipe that only supports client-to-server
1872     /// communication.
1873     ///
1874     /// ```
1875     /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1876     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1877     ///
1878     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1879     ///
1880     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1881     /// let mut server = ServerOptions::new()
1882     ///     .access_outbound(false)
1883     ///     .create(PIPE_NAME)?;
1884     ///
1885     /// let mut client = ClientOptions::new()
1886     ///     .read(false)
1887     ///     .open(PIPE_NAME)?;
1888     ///
1889     /// server.connect().await?;
1890     ///
1891     /// let write = client.write_all(b"ping");
1892     ///
1893     /// let mut buf = [0u8; 4];
1894     /// let read = server.read_exact(&mut buf);
1895     ///
1896     /// let ((), read) = tokio::try_join!(write, read)?;
1897     ///
1898     /// println!("done reading and writing");
1899     ///
1900     /// assert_eq!(read, 4);
1901     /// assert_eq!(&buf[..], b"ping");
1902     /// # Ok(()) }
1903     /// ```
access_outbound(&mut self, allowed: bool) -> &mut Self1904     pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
1905         bool_flag!(self.open_mode, allowed, windows_sys::PIPE_ACCESS_OUTBOUND);
1906         self
1907     }
1908 
1909     /// If you attempt to create multiple instances of a pipe with this flag
1910     /// set, creation of the first server instance succeeds, but creation of any
1911     /// subsequent instances will fail with
1912     /// [`std::io::ErrorKind::PermissionDenied`].
1913     ///
1914     /// This option is intended to be used with servers that want to ensure that
1915     /// they are the only process listening for clients on a given named pipe.
1916     /// This is accomplished by enabling it for the first server instance
1917     /// created in a process.
1918     ///
1919     /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1920     ///
1921     /// # Errors
1922     ///
1923     /// If this option is set and more than one instance of the server for a
1924     /// given named pipe exists, calling [`create`] will fail with
1925     /// [`std::io::ErrorKind::PermissionDenied`].
1926     ///
1927     /// ```
1928     /// use std::io;
1929     /// use tokio::net::windows::named_pipe::ServerOptions;
1930     ///
1931     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
1932     ///
1933     /// # #[tokio::main] async fn main() -> io::Result<()> {
1934     /// let server1 = ServerOptions::new()
1935     ///     .first_pipe_instance(true)
1936     ///     .create(PIPE_NAME)?;
1937     ///
1938     /// // Second server errs, since it's not the first instance.
1939     /// let e = ServerOptions::new()
1940     ///     .first_pipe_instance(true)
1941     ///     .create(PIPE_NAME)
1942     ///     .unwrap_err();
1943     ///
1944     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1945     /// # Ok(()) }
1946     /// ```
1947     ///
1948     /// # Examples
1949     ///
1950     /// ```
1951     /// use std::io;
1952     /// use tokio::net::windows::named_pipe::ServerOptions;
1953     ///
1954     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
1955     ///
1956     /// # #[tokio::main] async fn main() -> io::Result<()> {
1957     /// let mut builder = ServerOptions::new();
1958     /// builder.first_pipe_instance(true);
1959     ///
1960     /// let server = builder.create(PIPE_NAME)?;
1961     /// let e = builder.create(PIPE_NAME).unwrap_err();
1962     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1963     /// drop(server);
1964     ///
1965     /// // OK: since, we've closed the other instance.
1966     /// let _server2 = builder.create(PIPE_NAME)?;
1967     /// # Ok(()) }
1968     /// ```
1969     ///
1970     /// [`create`]: ServerOptions::create
1971     /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
first_pipe_instance(&mut self, first: bool) -> &mut Self1972     pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
1973         bool_flag!(
1974             self.open_mode,
1975             first,
1976             windows_sys::FILE_FLAG_FIRST_PIPE_INSTANCE
1977         );
1978         self
1979     }
1980 
1981     /// Requests permission to modify the pipe's discretionary access control list.
1982     ///
1983     /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
1984     ///
1985     /// # Examples
1986     ///
1987     /// ```
1988     /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
    ///
1990     /// use tokio::net::windows::named_pipe::ServerOptions;
1991     /// use windows_sys::{
1992     ///     Win32::Foundation::ERROR_SUCCESS,
1993     ///     Win32::Security::DACL_SECURITY_INFORMATION,
1994     ///     Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
1995     /// };
1996     ///
1997     /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
1998     ///
1999     /// # #[tokio::main] async fn main() -> io::Result<()> {
2000     /// let mut pipe_template = ServerOptions::new();
2001     /// pipe_template.write_dac(true);
2002     /// let pipe = pipe_template.create(PIPE_NAME)?;
2003     ///
2004     /// unsafe {
2005     ///     assert_eq!(
2006     ///         ERROR_SUCCESS,
2007     ///         SetSecurityInfo(
2008     ///             pipe.as_raw_handle() as _,
2009     ///             SE_KERNEL_OBJECT,
2010     ///             DACL_SECURITY_INFORMATION,
2011     ///             ptr::null_mut(),
2012     ///             ptr::null_mut(),
2013     ///             ptr::null_mut(),
2014     ///             ptr::null_mut(),
2015     ///         )
2016     ///     );
2017     /// }
2018     ///
2019     /// # Ok(()) }
2020     /// ```
2021     ///
2022     /// ```
2023     /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
    ///
2025     /// use tokio::net::windows::named_pipe::ServerOptions;
2026     /// use windows_sys::{
2027     ///     Win32::Foundation::ERROR_ACCESS_DENIED,
2028     ///     Win32::Security::DACL_SECURITY_INFORMATION,
2029     ///     Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2030     /// };
2031     ///
2032     /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
2033     ///
2034     /// # #[tokio::main] async fn main() -> io::Result<()> {
2035     /// let mut pipe_template = ServerOptions::new();
2036     /// pipe_template.write_dac(false);
2037     /// let pipe = pipe_template.create(PIPE_NAME)?;
2038     ///
2039     /// unsafe {
2040     ///     assert_eq!(
2041     ///         ERROR_ACCESS_DENIED,
2042     ///         SetSecurityInfo(
2043     ///             pipe.as_raw_handle() as _,
2044     ///             SE_KERNEL_OBJECT,
2045     ///             DACL_SECURITY_INFORMATION,
2046     ///             ptr::null_mut(),
2047     ///             ptr::null_mut(),
2048     ///             ptr::null_mut(),
2049     ///             ptr::null_mut(),
2050     ///         )
2051     ///     );
2052     /// }
2053     ///
2054     /// # Ok(()) }
2055     /// ```
2056     ///
2057     /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
write_dac(&mut self, requested: bool) -> &mut Self2058     pub fn write_dac(&mut self, requested: bool) -> &mut Self {
2059         bool_flag!(self.open_mode, requested, windows_sys::WRITE_DAC);
2060         self
2061     }
2062 
2063     /// Requests permission to modify the pipe's owner.
2064     ///
2065     /// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
2066     ///
2067     /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
write_owner(&mut self, requested: bool) -> &mut Self2068     pub fn write_owner(&mut self, requested: bool) -> &mut Self {
2069         bool_flag!(self.open_mode, requested, windows_sys::WRITE_OWNER);
2070         self
2071     }
2072 
2073     /// Requests permission to modify the pipe's system access control list.
2074     ///
2075     /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
2076     ///
2077     /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
access_system_security(&mut self, requested: bool) -> &mut Self2078     pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
2079         bool_flag!(
2080             self.open_mode,
2081             requested,
2082             windows_sys::ACCESS_SYSTEM_SECURITY
2083         );
2084         self
2085     }
2086 
2087     /// Indicates whether this server can accept remote clients or not. Remote
2088     /// clients are disabled by default.
2089     ///
2090     /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
2091     ///
2092     /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
reject_remote_clients(&mut self, reject: bool) -> &mut Self2093     pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
2094         bool_flag!(
2095             self.pipe_mode,
2096             reject,
2097             windows_sys::PIPE_REJECT_REMOTE_CLIENTS
2098         );
2099         self
2100     }
2101 
2102     /// The maximum number of instances that can be created for this pipe. The
2103     /// first instance of the pipe can specify this value; the same number must
2104     /// be specified for other instances of the pipe. Acceptable values are in
2105     /// the range 1 through 254. The default value is unlimited.
2106     ///
2107     /// This corresponds to specifying [`nMaxInstances`].
2108     ///
2109     /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2110     ///
2111     /// # Errors
2112     ///
    /// The same value of `max_instances` has to be used by all servers. Any
    /// additional server that is created with a mismatching value might
    /// error.
2116     ///
2117     /// ```
2118     /// use std::io;
2119     /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2120     /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2121     ///
2122     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
2123     ///
2124     /// # #[tokio::main] async fn main() -> io::Result<()> {
2125     /// let mut server = ServerOptions::new();
2126     /// server.max_instances(2);
2127     ///
2128     /// let s1 = server.create(PIPE_NAME)?;
2129     /// let c1 = ClientOptions::new().open(PIPE_NAME);
2130     ///
2131     /// let s2 = server.create(PIPE_NAME)?;
2132     /// let c2 = ClientOptions::new().open(PIPE_NAME);
2133     ///
2134     /// // Too many servers!
2135     /// let e = server.create(PIPE_NAME).unwrap_err();
2136     /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2137     ///
2138     /// // Still too many servers even if we specify a higher value!
2139     /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
2140     /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2141     /// # Ok(()) }
2142     /// ```
2143     ///
2144     /// # Panics
2145     ///
2146     /// This function will panic if more than 254 instances are specified. If
2147     /// you do not wish to set an instance limit, leave it unspecified.
2148     ///
2149     /// ```should_panic
2150     /// use tokio::net::windows::named_pipe::ServerOptions;
2151     ///
2152     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2153     /// let builder = ServerOptions::new().max_instances(255);
2154     /// # Ok(()) }
2155     /// ```
2156     #[track_caller]
max_instances(&mut self, instances: usize) -> &mut Self2157     pub fn max_instances(&mut self, instances: usize) -> &mut Self {
2158         assert!(instances < 255, "cannot specify more than 254 instances");
2159         self.max_instances = instances as u32;
2160         self
2161     }
2162 
2163     /// The number of bytes to reserve for the output buffer.
2164     ///
2165     /// This corresponds to specifying [`nOutBufferSize`].
2166     ///
2167     /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
out_buffer_size(&mut self, buffer: u32) -> &mut Self2168     pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
2169         self.out_buffer_size = buffer;
2170         self
2171     }
2172 
2173     /// The number of bytes to reserve for the input buffer.
2174     ///
2175     /// This corresponds to specifying [`nInBufferSize`].
2176     ///
2177     /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
in_buffer_size(&mut self, buffer: u32) -> &mut Self2178     pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
2179         self.in_buffer_size = buffer;
2180         self
2181     }
2182 
2183     /// Creates the named pipe identified by `addr` for use as a server.
2184     ///
2185     /// This uses the [`CreateNamedPipe`] function.
2186     ///
2187     /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2188     ///
2189     /// # Errors
2190     ///
2191     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2192     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2193     ///
2194     /// [Tokio Runtime]: crate::runtime::Runtime
2195     /// [enabled I/O]: crate::runtime::Builder::enable_io
2196     ///
2197     /// # Examples
2198     ///
2199     /// ```
2200     /// use tokio::net::windows::named_pipe::ServerOptions;
2201     ///
2202     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
2203     ///
2204     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2205     /// let server = ServerOptions::new().create(PIPE_NAME)?;
2206     /// # Ok(()) }
2207     /// ```
create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer>2208     pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
2209         // Safety: We're calling create_with_security_attributes_raw w/ a null
2210         // pointer which disables it.
2211         unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
2212     }
2213 
2214     /// Creates the named pipe identified by `addr` for use as a server.
2215     ///
    /// This is the same as [`create`] except that it supports providing the raw
    /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
    /// as the `lpSecurityAttributes` argument to
    /// [`CreateNamedPipe`](https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea).
2219     ///
2220     /// # Errors
2221     ///
2222     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2223     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2224     ///
2225     /// [Tokio Runtime]: crate::runtime::Runtime
2226     /// [enabled I/O]: crate::runtime::Builder::enable_io
2227     ///
2228     /// # Safety
2229     ///
2230     /// The `attrs` argument must either be null or point at a valid instance of
2231     /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2232     /// behavior is identical to calling the [`create`] method.
2233     ///
2234     /// [`create`]: ServerOptions::create
2235     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2236     /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
create_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeServer>2237     pub unsafe fn create_with_security_attributes_raw(
2238         &self,
2239         addr: impl AsRef<OsStr>,
2240         attrs: *mut c_void,
2241     ) -> io::Result<NamedPipeServer> {
2242         let addr = encode_addr(addr);
2243 
2244         let h = windows_sys::CreateNamedPipeW(
2245             addr.as_ptr(),
2246             self.open_mode,
2247             self.pipe_mode,
2248             self.max_instances,
2249             self.out_buffer_size,
2250             self.in_buffer_size,
2251             self.default_timeout,
2252             attrs as *mut _,
2253         );
2254 
2255         if h == windows_sys::INVALID_HANDLE_VALUE {
2256             return Err(io::Error::last_os_error());
2257         }
2258 
2259         NamedPipeServer::from_raw_handle(h as _)
2260     }
2261 }
2262 
/// A builder suitable for building and interacting with named pipes from the
/// client side.
///
/// See [`ClientOptions::open`].
#[derive(Debug, Clone)]
pub struct ClientOptions {
    // Access-right bits. `new` initialises this to
    // `GENERIC_READ | GENERIC_WRITE`; `read` and `write` toggle the
    // respective bit.
    desired_access: u32,
    // Security quality-of-service flags. `new` initialises this to
    // `SECURITY_IDENTIFICATION | SECURITY_SQOS_PRESENT`.
    security_qos_flags: u32,
}
2272 
2273 impl ClientOptions {
2274     /// Creates a new named pipe builder with the default settings.
2275     ///
2276     /// ```
2277     /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2278     ///
2279     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
2280     ///
2281     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2282     /// // Server must be created in order for the client creation to succeed.
2283     /// let server = ServerOptions::new().create(PIPE_NAME)?;
2284     /// let client = ClientOptions::new().open(PIPE_NAME)?;
2285     /// # Ok(()) }
2286     /// ```
new() -> Self2287     pub fn new() -> Self {
2288         Self {
2289             desired_access: windows_sys::GENERIC_READ | windows_sys::GENERIC_WRITE,
2290             security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
2291                 | windows_sys::SECURITY_SQOS_PRESENT,
2292         }
2293     }
2294 
2295     /// If the client supports reading data. This is enabled by default.
2296     ///
2297     /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
2298     ///
2299     /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2300     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
read(&mut self, allowed: bool) -> &mut Self2301     pub fn read(&mut self, allowed: bool) -> &mut Self {
2302         bool_flag!(self.desired_access, allowed, windows_sys::GENERIC_READ);
2303         self
2304     }
2305 
2306     /// If the created pipe supports writing data. This is enabled by default.
2307     ///
2308     /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
2309     ///
2310     /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2311     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
write(&mut self, allowed: bool) -> &mut Self2312     pub fn write(&mut self, allowed: bool) -> &mut Self {
2313         bool_flag!(self.desired_access, allowed, windows_sys::GENERIC_WRITE);
2314         self
2315     }
2316 
2317     /// Sets qos flags which are combined with other flags and attributes in the
2318     /// call to [`CreateFile`].
2319     ///
2320     /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2321     /// calling this function would override that value completely with the
2322     /// argument specified.
2323     ///
2324     /// When `security_qos_flags` is not set, a malicious program can gain the
2325     /// elevated privileges of a privileged Rust process when it allows opening
2326     /// user-specified paths, by tricking it into opening a named pipe. So
2327     /// arguably `security_qos_flags` should also be set when opening arbitrary
2328     /// paths. However the bits can then conflict with other flags, specifically
2329     /// `FILE_FLAG_OPEN_NO_RECALL`.
2330     ///
2331     /// For information about possible values, see [Impersonation Levels] on the
2332     /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2333     /// automatically when using this method.
2334     ///
2335     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2336     /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
2337     /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
security_qos_flags(&mut self, flags: u32) -> &mut Self2338     pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2339         // See: https://github.com/rust-lang/rust/pull/58216
2340         self.security_qos_flags = flags | windows_sys::SECURITY_SQOS_PRESENT;
2341         self
2342     }
2343 
2344     /// Opens the named pipe identified by `addr`.
2345     ///
2346     /// This opens the client using [`CreateFile`] with the
2347     /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2348     ///
2349     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2350     ///
2351     /// # Errors
2352     ///
2353     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2354     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2355     ///
2356     /// There are a few errors you need to take into account when creating a
2357     /// named pipe on the client side:
2358     ///
2359     /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2360     ///   does not exist. Presumably the server is not up.
2361     /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2362     ///   but the server is not currently waiting for a connection. Please see the
2363     ///   examples for how to check for this error.
2364     ///
2365     /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
2366     /// [enabled I/O]: crate::runtime::Builder::enable_io
2367     /// [Tokio Runtime]: crate::runtime::Runtime
2368     ///
2369     /// A connect loop that waits until a pipe becomes available looks like
2370     /// this:
2371     ///
2372     /// ```no_run
2373     /// use std::time::Duration;
2374     /// use tokio::net::windows::named_pipe::ClientOptions;
2375     /// use tokio::time;
2376     /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2377     ///
2378     /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2379     ///
2380     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2381     /// let client = loop {
2382     ///     match ClientOptions::new().open(PIPE_NAME) {
2383     ///         Ok(client) => break client,
2384     ///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
2385     ///         Err(e) => return Err(e),
2386     ///     }
2387     ///
2388     ///     time::sleep(Duration::from_millis(50)).await;
2389     /// };
2390     ///
2391     /// // use the connected client.
2392     /// # Ok(()) }
2393     /// ```
open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient>2394     pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2395         // Safety: We're calling open_with_security_attributes_raw w/ a null
2396         // pointer which disables it.
2397         unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2398     }
2399 
2400     /// Opens the named pipe identified by `addr`.
2401     ///
2402     /// This is the same as [`open`] except that it supports providing the raw
2403     /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2404     /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2405     ///
2406     /// # Safety
2407     ///
2408     /// The `attrs` argument must either be null or point at a valid instance of
2409     /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2410     /// behavior is identical to calling the [`open`] method.
2411     ///
2412     /// [`open`]: ClientOptions::open
2413     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2414     /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
open_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeClient>2415     pub unsafe fn open_with_security_attributes_raw(
2416         &self,
2417         addr: impl AsRef<OsStr>,
2418         attrs: *mut c_void,
2419     ) -> io::Result<NamedPipeClient> {
2420         let addr = encode_addr(addr);
2421 
2422         // NB: We could use a platform specialized `OpenOptions` here, but since
2423         // we have access to windows_sys it ultimately doesn't hurt to use
2424         // `CreateFile` explicitly since it allows the use of our already
2425         // well-structured wide `addr` to pass into CreateFileW.
2426         let h = windows_sys::CreateFileW(
2427             addr.as_ptr(),
2428             self.desired_access,
2429             0,
2430             attrs as *mut _,
2431             windows_sys::OPEN_EXISTING,
2432             self.get_flags(),
2433             0,
2434         );
2435 
2436         if h == windows_sys::INVALID_HANDLE_VALUE {
2437             return Err(io::Error::last_os_error());
2438         }
2439 
2440         NamedPipeClient::from_raw_handle(h as _)
2441     }
2442 
get_flags(&self) -> u322443     fn get_flags(&self) -> u32 {
2444         self.security_qos_flags | windows_sys::FILE_FLAG_OVERLAPPED
2445     }
2446 }
2447 
/// The pipe mode of a named pipe.
///
/// Set through [`ServerOptions::pipe_mode`], and reported back as part of
/// [`PipeInfo`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeMode {
    /// Data is written to the pipe as a stream of bytes. The pipe does not
    /// distinguish bytes written during different write operations.
    ///
    /// Corresponds to [`PIPE_TYPE_BYTE`].
    ///
    /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
    Byte,
    /// Data is written to the pipe as a stream of messages. The pipe treats the
    /// bytes written during each write operation as a message unit. A read on
    /// such a pipe returns [`ERROR_MORE_DATA`] when a message has not been
    /// read completely.
    ///
    /// Corresponds to [`PIPE_TYPE_MESSAGE`].
    ///
    /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
    /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
    Message,
}
2472 
/// Indicates which end of a named pipe instance a handle refers to.
///
/// Reported as part of [`PipeInfo`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeEnd {
    /// The named pipe refers to the client end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_CLIENT_END`].
    ///
    /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
    Client,
    /// The named pipe refers to the server end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_SERVER_END`].
    ///
    /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
    Server,
}
2490 
/// Information about a named pipe.
///
/// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
/// The values are the ones reported by the operating system for the
/// underlying pipe handle.
#[derive(Debug)]
#[non_exhaustive]
pub struct PipeInfo {
    /// Indicates the mode of a named pipe (byte stream or message stream).
    pub mode: PipeMode,
    /// Indicates the end of a named pipe (client or server).
    pub end: PipeEnd,
    /// The maximum number of instances that can be created for this pipe.
    pub max_instances: u32,
    /// The number of bytes to reserve for the output buffer.
    pub out_buffer_size: u32,
    /// The number of bytes to reserve for the input buffer.
    pub in_buffer_size: u32,
}
2508 
2509 /// Encodes an address so that it is a null-terminated wide string.
encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]>2510 fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2511     let len = addr.as_ref().encode_wide().count();
2512     let mut vec = Vec::with_capacity(len + 1);
2513     vec.extend(addr.as_ref().encode_wide());
2514     vec.push(0);
2515     vec.into_boxed_slice()
2516 }
2517 
2518 /// Internal function to get the info out of a raw named pipe.
named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo>2519 unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2520     let mut flags = 0;
2521     let mut out_buffer_size = 0;
2522     let mut in_buffer_size = 0;
2523     let mut max_instances = 0;
2524 
2525     let result = windows_sys::GetNamedPipeInfo(
2526         handle as _,
2527         &mut flags,
2528         &mut out_buffer_size,
2529         &mut in_buffer_size,
2530         &mut max_instances,
2531     );
2532 
2533     if result == 0 {
2534         return Err(io::Error::last_os_error());
2535     }
2536 
2537     let mut end = PipeEnd::Client;
2538     let mut mode = PipeMode::Byte;
2539 
2540     if flags & windows_sys::PIPE_SERVER_END != 0 {
2541         end = PipeEnd::Server;
2542     }
2543 
2544     if flags & windows_sys::PIPE_TYPE_MESSAGE != 0 {
2545         mode = PipeMode::Message;
2546     }
2547 
2548     Ok(PipeInfo {
2549         end,
2550         mode,
2551         out_buffer_size,
2552         in_buffer_size,
2553         max_instances,
2554     })
2555 }
2556 
#[cfg(test)]
mod test {
    use self::windows_sys::{PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE, PIPE_TYPE_MESSAGE};
    use super::*;

    /// A fresh builder is byte-oriented and rejects remote clients.
    #[test]
    fn opts_default_pipe_mode() {
        let defaults = ServerOptions::new();
        assert_eq!(
            defaults.pipe_mode,
            PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS
        );
    }

    /// Disabling remote-client rejection clears the corresponding bit.
    #[test]
    fn opts_unset_reject_remote() {
        let mut builder = ServerOptions::new();
        builder.reject_remote_clients(false);
        assert_eq!(builder.pipe_mode & PIPE_REJECT_REMOTE_CLIENTS, 0);
    }

    /// Changing the pipe mode must leave the remote-client setting untouched,
    /// whichever order and combination the two options are applied in.
    #[test]
    fn opts_set_pipe_mode_maintains_reject_remote_clients() {
        let mut builder = ServerOptions::new();

        // Setting the mode on an otherwise untouched builder keeps the default
        // rejection of remote clients.
        builder.pipe_mode(PipeMode::Byte);
        assert_eq!(
            builder.pipe_mode,
            PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS
        );

        // Each step toggles remote-client rejection, then sets the mode, and
        // finally checks the resulting raw flags. Steps build on one another.
        let steps = [
            (false, PipeMode::Byte, PIPE_TYPE_BYTE),
            (
                true,
                PipeMode::Byte,
                PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS,
            ),
            (false, PipeMode::Message, PIPE_TYPE_MESSAGE),
            (
                true,
                PipeMode::Message,
                PIPE_TYPE_MESSAGE | PIPE_REJECT_REMOTE_CLIENTS,
            ),
        ];

        for &(reject, mode, expected) in steps.iter() {
            builder.reject_remote_clients(reject);
            builder.pipe_mode(mode);
            assert_eq!(builder.pipe_mode, expected);
        }
    }
}
2601