• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Tokio support for [Windows named pipes].
2 //!
3 //! [Windows named pipes]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
4 
5 use std::ffi::c_void;
6 use std::ffi::OsStr;
7 use std::io::{self, Read, Write};
8 use std::pin::Pin;
9 use std::ptr;
10 use std::task::{Context, Poll};
11 
12 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
13 use crate::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, RawHandle};
14 
15 cfg_io_util! {
16     use bytes::BufMut;
17 }
18 
19 // Hide imports which are not used when generating documentation.
20 #[cfg(not(docsrs))]
21 mod doc {
22     pub(super) use crate::os::windows::ffi::OsStrExt;
23     pub(super) mod windows_sys {
24         pub(crate) use windows_sys::{
25             Win32::Foundation::*, Win32::Storage::FileSystem::*, Win32::System::Pipes::*,
26             Win32::System::SystemServices::*,
27         };
28     }
29     pub(super) use mio::windows as mio_windows;
30 }
31 
32 // NB: none of these shows up in public API, so don't document them.
#[cfg(docsrs)]
mod doc {
    // Replacement for the `mio_windows` alias that the non-docsrs `doc`
    // module re-exports.
    pub(super) mod mio_windows {
        // Placeholder so that `mio_windows::NamedPipe` still names a type.
        pub type NamedPipe = crate::doc::NotDefinedHere;
    }
}
39 
40 use self::doc::*;
41 
42 /// A [Windows named pipe] server.
43 ///
44 /// Accepting client connections involves creating a server with
45 /// [`ServerOptions::create`] and waiting for clients to connect using
46 /// [`NamedPipeServer::connect`].
47 ///
48 /// To avoid having clients sporadically fail with
49 /// [`std::io::ErrorKind::NotFound`] when they connect to a server, we must
50 /// ensure that at least one server instance is available at all times. This
51 /// means that the typical listen loop for a server is a bit involved, because
52 /// we have to ensure that we never drop a server accidentally while a client
53 /// might connect.
54 ///
55 /// So a correctly implemented server looks like this:
56 ///
57 /// ```no_run
58 /// use std::io;
59 /// use tokio::net::windows::named_pipe::ServerOptions;
60 ///
61 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-server";
62 ///
63 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
64 /// // The first server needs to be constructed early so that clients can
65 /// // be correctly connected. Otherwise calling .wait will cause the client to
66 /// // error.
67 /// //
68 /// // Here we also make use of `first_pipe_instance`, which will ensure that
69 /// // there are no other servers up and running already.
70 /// let mut server = ServerOptions::new()
71 ///     .first_pipe_instance(true)
72 ///     .create(PIPE_NAME)?;
73 ///
74 /// // Spawn the server loop.
75 /// let server = tokio::spawn(async move {
76 ///     loop {
77 ///         // Wait for a client to connect.
78 ///         let connected = server.connect().await?;
79 ///
80 ///         // Construct the next server to be connected before sending the one
81 ///         // we already have off onto a task. This ensures that the server
82 ///         // isn't closed (after it's done in the task) before a new one is
83 ///         // available. Otherwise the client might error with
84 ///         // `io::ErrorKind::NotFound`.
85 ///         server = ServerOptions::new().create(PIPE_NAME)?;
86 ///
87 ///         let client = tokio::spawn(async move {
88 ///             /* use the connected client */
89 /// #           Ok::<_, std::io::Error>(())
90 ///         });
91 /// #       if true { break } // needed for type inference to work
92 ///     }
93 ///
94 ///     Ok::<_, io::Error>(())
95 /// });
96 ///
97 /// /* do something else not server related here */
98 /// # Ok(()) }
99 /// ```
100 ///
101 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
102 #[derive(Debug)]
103 pub struct NamedPipeServer {
104     io: PollEvented<mio_windows::NamedPipe>,
105 }
106 
107 impl NamedPipeServer {
108     /// Constructs a new named pipe server from the specified raw handle.
109     ///
110     /// This function will consume ownership of the handle given, passing
111     /// responsibility for closing the handle to the returned object.
112     ///
113     /// This function is also unsafe as the primitives currently returned have
114     /// the contract that they are the sole owner of the file descriptor they
115     /// are wrapping. Usage of this function could accidentally allow violating
116     /// this contract which can cause memory unsafety in code that relies on it
117     /// being true.
118     ///
119     /// # Errors
120     ///
121     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
122     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
123     ///
124     /// [Tokio Runtime]: crate::runtime::Runtime
125     /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>126     pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
127         let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
128 
129         Ok(Self {
130             io: PollEvented::new(named_pipe)?,
131         })
132     }
133 
134     /// Retrieves information about the named pipe the server is associated
135     /// with.
136     ///
137     /// ```no_run
138     /// use tokio::net::windows::named_pipe::{PipeEnd, PipeMode, ServerOptions};
139     ///
140     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-info";
141     ///
142     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
143     /// let server = ServerOptions::new()
144     ///     .pipe_mode(PipeMode::Message)
145     ///     .max_instances(5)
146     ///     .create(PIPE_NAME)?;
147     ///
148     /// let server_info = server.info()?;
149     ///
150     /// assert_eq!(server_info.end, PipeEnd::Server);
151     /// assert_eq!(server_info.mode, PipeMode::Message);
152     /// assert_eq!(server_info.max_instances, 5);
153     /// # Ok(()) }
154     /// ```
info(&self) -> io::Result<PipeInfo>155     pub fn info(&self) -> io::Result<PipeInfo> {
156         // Safety: we're ensuring the lifetime of the named pipe.
157         unsafe { named_pipe_info(self.io.as_raw_handle()) }
158     }
159 
160     /// Enables a named pipe server process to wait for a client process to
161     /// connect to an instance of a named pipe. A client process connects by
162     /// creating a named pipe with the same name.
163     ///
164     /// This corresponds to the [`ConnectNamedPipe`] system call.
165     ///
166     /// # Cancel safety
167     ///
168     /// This method is cancellation safe in the sense that if it is used as the
169     /// event in a [`select!`](crate::select) statement and some other branch
170     /// completes first, then no connection events have been lost.
171     ///
172     /// [`ConnectNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-connectnamedpipe
173     ///
174     /// # Example
175     ///
176     /// ```no_run
177     /// use tokio::net::windows::named_pipe::ServerOptions;
178     ///
179     /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
180     ///
181     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
182     /// let pipe = ServerOptions::new().create(PIPE_NAME)?;
183     ///
184     /// // Wait for a client to connect.
185     /// pipe.connect().await?;
186     ///
187     /// // Use the connected client...
188     /// # Ok(()) }
189     /// ```
connect(&self) -> io::Result<()>190     pub async fn connect(&self) -> io::Result<()> {
191         match self.io.connect() {
192             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
193                 self.io
194                     .registration()
195                     .async_io(Interest::WRITABLE, || self.io.connect())
196                     .await
197             }
198             x => x,
199         }
200     }
201 
202     /// Disconnects the server end of a named pipe instance from a client
203     /// process.
204     ///
205     /// ```
206     /// use tokio::io::AsyncWriteExt;
207     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
208     /// use windows_sys::Win32::Foundation::ERROR_PIPE_NOT_CONNECTED;
209     ///
210     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-disconnect";
211     ///
212     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
213     /// let server = ServerOptions::new()
214     ///     .create(PIPE_NAME)?;
215     ///
216     /// let mut client = ClientOptions::new()
217     ///     .open(PIPE_NAME)?;
218     ///
219     /// // Wait for a client to become connected.
220     /// server.connect().await?;
221     ///
222     /// // Forcibly disconnect the client.
223     /// server.disconnect()?;
224     ///
225     /// // Write fails with an OS-specific error after client has been
226     /// // disconnected.
227     /// let e = client.write(b"ping").await.unwrap_err();
228     /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_NOT_CONNECTED as i32));
229     /// # Ok(()) }
230     /// ```
disconnect(&self) -> io::Result<()>231     pub fn disconnect(&self) -> io::Result<()> {
232         self.io.disconnect()
233     }
234 
235     /// Waits for any of the requested ready states.
236     ///
237     /// This function is usually paired with `try_read()` or `try_write()`. It
238     /// can be used to concurrently read / write to the same pipe on a single
239     /// task without splitting the pipe.
240     ///
241     /// The function may complete without the pipe being ready. This is a
242     /// false-positive and attempting an operation will return with
243     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
244     /// [`Ready`] set, so you should always check the returned value and possibly
245     /// wait again if the requested states are not set.
246     ///
247     /// # Examples
248     ///
249     /// Concurrently read and write to the pipe on the same task without
250     /// splitting.
251     ///
252     /// ```no_run
253     /// use tokio::io::Interest;
254     /// use tokio::net::windows::named_pipe;
255     /// use std::error::Error;
256     /// use std::io;
257     ///
258     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-ready";
259     ///
260     /// #[tokio::main]
261     /// async fn main() -> Result<(), Box<dyn Error>> {
262     ///     let server = named_pipe::ServerOptions::new()
263     ///         .create(PIPE_NAME)?;
264     ///
265     ///     loop {
266     ///         let ready = server.ready(Interest::READABLE | Interest::WRITABLE).await?;
267     ///
268     ///         if ready.is_readable() {
269     ///             let mut data = vec![0; 1024];
270     ///             // Try to read data, this may still fail with `WouldBlock`
271     ///             // if the readiness event is a false positive.
272     ///             match server.try_read(&mut data) {
273     ///                 Ok(n) => {
274     ///                     println!("read {} bytes", n);
275     ///                 }
276     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
277     ///                     continue;
278     ///                 }
279     ///                 Err(e) => {
280     ///                     return Err(e.into());
281     ///                 }
282     ///             }
283     ///         }
284     ///
285     ///         if ready.is_writable() {
286     ///             // Try to write data, this may still fail with `WouldBlock`
287     ///             // if the readiness event is a false positive.
288     ///             match server.try_write(b"hello world") {
289     ///                 Ok(n) => {
290     ///                     println!("write {} bytes", n);
291     ///                 }
292     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
293     ///                     continue;
294     ///                 }
295     ///                 Err(e) => {
296     ///                     return Err(e.into());
297     ///                 }
298     ///             }
299     ///         }
300     ///     }
301     /// }
302     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>303     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
304         let event = self.io.registration().readiness(interest).await?;
305         Ok(event.ready)
306     }
307 
308     /// Waits for the pipe to become readable.
309     ///
310     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
311     /// paired with `try_read()`.
312     ///
313     /// # Examples
314     ///
315     /// ```no_run
316     /// use tokio::net::windows::named_pipe;
317     /// use std::error::Error;
318     /// use std::io;
319     ///
320     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-readable";
321     ///
322     /// #[tokio::main]
323     /// async fn main() -> Result<(), Box<dyn Error>> {
324     ///     let server = named_pipe::ServerOptions::new()
325     ///         .create(PIPE_NAME)?;
326     ///
327     ///     let mut msg = vec![0; 1024];
328     ///
329     ///     loop {
330     ///         // Wait for the pipe to be readable
331     ///         server.readable().await?;
332     ///
333     ///         // Try to read data, this may still fail with `WouldBlock`
334     ///         // if the readiness event is a false positive.
335     ///         match server.try_read(&mut msg) {
336     ///             Ok(n) => {
337     ///                 msg.truncate(n);
338     ///                 break;
339     ///             }
340     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
341     ///                 continue;
342     ///             }
343     ///             Err(e) => {
344     ///                 return Err(e.into());
345     ///             }
346     ///         }
347     ///     }
348     ///
349     ///     println!("GOT = {:?}", msg);
350     ///     Ok(())
351     /// }
352     /// ```
readable(&self) -> io::Result<()>353     pub async fn readable(&self) -> io::Result<()> {
354         self.ready(Interest::READABLE).await?;
355         Ok(())
356     }
357 
358     /// Polls for read readiness.
359     ///
360     /// If the pipe is not currently ready for reading, this method will
361     /// store a clone of the `Waker` from the provided `Context`. When the pipe
362     /// becomes ready for reading, `Waker::wake` will be called on the waker.
363     ///
364     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
365     /// the `Waker` from the `Context` passed to the most recent call is
366     /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
367     /// second, independent waker.)
368     ///
369     /// This function is intended for cases where creating and pinning a future
370     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
371     /// preferred, as this supports polling from multiple tasks at once.
372     ///
373     /// # Return value
374     ///
375     /// The function returns:
376     ///
377     /// * `Poll::Pending` if the pipe is not ready for reading.
378     /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
379     /// * `Poll::Ready(Err(e))` if an error is encountered.
380     ///
381     /// # Errors
382     ///
383     /// This function may encounter any standard I/O error except `WouldBlock`.
384     ///
385     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>386     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
387         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
388     }
389 
390     /// Tries to read data from the pipe into the provided buffer, returning how
391     /// many bytes were read.
392     ///
393     /// Receives any pending data from the pipe but does not wait for new data
394     /// to arrive. On success, returns the number of bytes read. Because
395     /// `try_read()` is non-blocking, the buffer does not have to be stored by
396     /// the async task and can exist entirely on the stack.
397     ///
398     /// Usually, [`readable()`] or [`ready()`] is used with this function.
399     ///
400     /// [`readable()`]: NamedPipeServer::readable()
401     /// [`ready()`]: NamedPipeServer::ready()
402     ///
403     /// # Return
404     ///
405     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
406     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
407     ///
408     /// 1. The pipe's read half is closed and will no longer yield data.
409     /// 2. The specified buffer was 0 bytes in length.
410     ///
411     /// If the pipe is not ready to read data,
412     /// `Err(io::ErrorKind::WouldBlock)` is returned.
413     ///
414     /// # Examples
415     ///
416     /// ```no_run
417     /// use tokio::net::windows::named_pipe;
418     /// use std::error::Error;
419     /// use std::io;
420     ///
421     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read";
422     ///
423     /// #[tokio::main]
424     /// async fn main() -> Result<(), Box<dyn Error>> {
425     ///     let server = named_pipe::ServerOptions::new()
426     ///         .create(PIPE_NAME)?;
427     ///
428     ///     loop {
429     ///         // Wait for the pipe to be readable
430     ///         server.readable().await?;
431     ///
432     ///         // Creating the buffer **after** the `await` prevents it from
433     ///         // being stored in the async task.
434     ///         let mut buf = [0; 4096];
435     ///
436     ///         // Try to read data, this may still fail with `WouldBlock`
437     ///         // if the readiness event is a false positive.
438     ///         match server.try_read(&mut buf) {
439     ///             Ok(0) => break,
440     ///             Ok(n) => {
441     ///                 println!("read {} bytes", n);
442     ///             }
443     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
444     ///                 continue;
445     ///             }
446     ///             Err(e) => {
447     ///                 return Err(e.into());
448     ///             }
449     ///         }
450     ///     }
451     ///
452     ///     Ok(())
453     /// }
454     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>455     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
456         self.io
457             .registration()
458             .try_io(Interest::READABLE, || (&*self.io).read(buf))
459     }
460 
461     /// Tries to read data from the pipe into the provided buffers, returning
462     /// how many bytes were read.
463     ///
464     /// Data is copied to fill each buffer in order, with the final buffer
465     /// written to possibly being only partially filled. This method behaves
466     /// equivalently to a single call to [`try_read()`] with concatenated
467     /// buffers.
468     ///
469     /// Receives any pending data from the pipe but does not wait for new data
470     /// to arrive. On success, returns the number of bytes read. Because
471     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
472     /// stored by the async task and can exist entirely on the stack.
473     ///
474     /// Usually, [`readable()`] or [`ready()`] is used with this function.
475     ///
476     /// [`try_read()`]: NamedPipeServer::try_read()
477     /// [`readable()`]: NamedPipeServer::readable()
478     /// [`ready()`]: NamedPipeServer::ready()
479     ///
480     /// # Return
481     ///
482     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
483     /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
484     /// and will no longer yield data. If the pipe is not ready to read data
485     /// `Err(io::ErrorKind::WouldBlock)` is returned.
486     ///
487     /// # Examples
488     ///
489     /// ```no_run
490     /// use tokio::net::windows::named_pipe;
491     /// use std::error::Error;
492     /// use std::io::{self, IoSliceMut};
493     ///
494     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-read-vectored";
495     ///
496     /// #[tokio::main]
497     /// async fn main() -> Result<(), Box<dyn Error>> {
498     ///     let server = named_pipe::ServerOptions::new()
499     ///         .create(PIPE_NAME)?;
500     ///
501     ///     loop {
502     ///         // Wait for the pipe to be readable
503     ///         server.readable().await?;
504     ///
505     ///         // Creating the buffer **after** the `await` prevents it from
506     ///         // being stored in the async task.
507     ///         let mut buf_a = [0; 512];
508     ///         let mut buf_b = [0; 1024];
509     ///         let mut bufs = [
510     ///             IoSliceMut::new(&mut buf_a),
511     ///             IoSliceMut::new(&mut buf_b),
512     ///         ];
513     ///
514     ///         // Try to read data, this may still fail with `WouldBlock`
515     ///         // if the readiness event is a false positive.
516     ///         match server.try_read_vectored(&mut bufs) {
517     ///             Ok(0) => break,
518     ///             Ok(n) => {
519     ///                 println!("read {} bytes", n);
520     ///             }
521     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
522     ///                 continue;
523     ///             }
524     ///             Err(e) => {
525     ///                 return Err(e.into());
526     ///             }
527     ///         }
528     ///     }
529     ///
530     ///     Ok(())
531     /// }
532     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>533     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
534         self.io
535             .registration()
536             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
537     }
538 
539     cfg_io_util! {
540         /// Tries to read data from the pipe into the provided buffer, advancing the
541         /// buffer's internal cursor, returning how many bytes were read.
542         ///
543         /// Receives any pending data from the pipe but does not wait for new data
544         /// to arrive. On success, returns the number of bytes read. Because
545         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
546         /// the async task and can exist entirely on the stack.
547         ///
548         /// Usually, [`readable()`] or [`ready()`] is used with this function.
549         ///
550         /// [`readable()`]: NamedPipeServer::readable()
551         /// [`ready()`]: NamedPipeServer::ready()
552         ///
553         /// # Return
554         ///
555         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
556         /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
557         /// and will no longer yield data. If the pipe is not ready to read data
558         /// `Err(io::ErrorKind::WouldBlock)` is returned.
559         ///
560         /// # Examples
561         ///
562         /// ```no_run
563         /// use tokio::net::windows::named_pipe;
564         /// use std::error::Error;
565         /// use std::io;
566         ///
567         /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
568         ///
569         /// #[tokio::main]
570         /// async fn main() -> Result<(), Box<dyn Error>> {
571         ///     let server = named_pipe::ServerOptions::new().create(PIPE_NAME)?;
572         ///
573         ///     loop {
574         ///         // Wait for the pipe to be readable
575         ///         server.readable().await?;
576         ///
577         ///         let mut buf = Vec::with_capacity(4096);
578         ///
579         ///         // Try to read data, this may still fail with `WouldBlock`
580         ///         // if the readiness event is a false positive.
581         ///         match server.try_read_buf(&mut buf) {
582         ///             Ok(0) => break,
583         ///             Ok(n) => {
584         ///                 println!("read {} bytes", n);
585         ///             }
586         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
587         ///                 continue;
588         ///             }
589         ///             Err(e) => {
590         ///                 return Err(e.into());
591         ///             }
592         ///         }
593         ///     }
594         ///
595         ///     Ok(())
596         /// }
597         /// ```
598         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
599             self.io.registration().try_io(Interest::READABLE, || {
600                 use std::io::Read;
601 
602                 let dst = buf.chunk_mut();
603                 let dst =
604                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
605 
606                 // Safety: We trust `NamedPipeServer::read` to have filled up `n` bytes in the
607                 // buffer.
608                 let n = (&*self.io).read(dst)?;
609 
610                 unsafe {
611                     buf.advance_mut(n);
612                 }
613 
614                 Ok(n)
615             })
616         }
617     }
618 
619     /// Waits for the pipe to become writable.
620     ///
621     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
622     /// paired with `try_write()`.
623     ///
624     /// # Examples
625     ///
626     /// ```no_run
627     /// use tokio::net::windows::named_pipe;
628     /// use std::error::Error;
629     /// use std::io;
630     ///
631     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-writable";
632     ///
633     /// #[tokio::main]
634     /// async fn main() -> Result<(), Box<dyn Error>> {
635     ///     let server = named_pipe::ServerOptions::new()
636     ///         .create(PIPE_NAME)?;
637     ///
638     ///     loop {
639     ///         // Wait for the pipe to be writable
640     ///         server.writable().await?;
641     ///
642     ///         // Try to write data, this may still fail with `WouldBlock`
643     ///         // if the readiness event is a false positive.
644     ///         match server.try_write(b"hello world") {
645     ///             Ok(n) => {
646     ///                 break;
647     ///             }
648     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
649     ///                 continue;
650     ///             }
651     ///             Err(e) => {
652     ///                 return Err(e.into());
653     ///             }
654     ///         }
655     ///     }
656     ///
657     ///     Ok(())
658     /// }
659     /// ```
writable(&self) -> io::Result<()>660     pub async fn writable(&self) -> io::Result<()> {
661         self.ready(Interest::WRITABLE).await?;
662         Ok(())
663     }
664 
665     /// Polls for write readiness.
666     ///
667     /// If the pipe is not currently ready for writing, this method will
668     /// store a clone of the `Waker` from the provided `Context`. When the pipe
669     /// becomes ready for writing, `Waker::wake` will be called on the waker.
670     ///
671     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
672     /// the `Waker` from the `Context` passed to the most recent call is
673     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
674     /// second, independent waker.)
675     ///
676     /// This function is intended for cases where creating and pinning a future
677     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
678     /// preferred, as this supports polling from multiple tasks at once.
679     ///
680     /// # Return value
681     ///
682     /// The function returns:
683     ///
684     /// * `Poll::Pending` if the pipe is not ready for writing.
685     /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
686     /// * `Poll::Ready(Err(e))` if an error is encountered.
687     ///
688     /// # Errors
689     ///
690     /// This function may encounter any standard I/O error except `WouldBlock`.
691     ///
692     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>693     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
694         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
695     }
696 
697     /// Tries to write a buffer to the pipe, returning how many bytes were
698     /// written.
699     ///
700     /// The function will attempt to write the entire contents of `buf`, but
701     /// only part of the buffer may be written.
702     ///
703     /// This function is usually paired with `writable()`.
704     ///
705     /// # Return
706     ///
707     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
708     /// number of bytes written. If the pipe is not ready to write data,
709     /// `Err(io::ErrorKind::WouldBlock)` is returned.
710     ///
711     /// # Examples
712     ///
713     /// ```no_run
714     /// use tokio::net::windows::named_pipe;
715     /// use std::error::Error;
716     /// use std::io;
717     ///
718     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write";
719     ///
720     /// #[tokio::main]
721     /// async fn main() -> Result<(), Box<dyn Error>> {
722     ///     let server = named_pipe::ServerOptions::new()
723     ///         .create(PIPE_NAME)?;
724     ///
725     ///     loop {
726     ///         // Wait for the pipe to be writable
727     ///         server.writable().await?;
728     ///
729     ///         // Try to write data, this may still fail with `WouldBlock`
730     ///         // if the readiness event is a false positive.
731     ///         match server.try_write(b"hello world") {
732     ///             Ok(n) => {
733     ///                 break;
734     ///             }
735     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
736     ///                 continue;
737     ///             }
738     ///             Err(e) => {
739     ///                 return Err(e.into());
740     ///             }
741     ///         }
742     ///     }
743     ///
744     ///     Ok(())
745     /// }
746     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>747     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
748         self.io
749             .registration()
750             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
751     }
752 
753     /// Tries to write several buffers to the pipe, returning how many bytes
754     /// were written.
755     ///
756     /// Data is written from each buffer in order, with the final buffer read
757     /// from possibly being only partially consumed. This method behaves
758     /// equivalently to a single call to [`try_write()`] with concatenated
759     /// buffers.
760     ///
761     /// This function is usually paired with `writable()`.
762     ///
763     /// [`try_write()`]: NamedPipeServer::try_write()
764     ///
765     /// # Return
766     ///
767     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
768     /// number of bytes written. If the pipe is not ready to write data,
769     /// `Err(io::ErrorKind::WouldBlock)` is returned.
770     ///
771     /// # Examples
772     ///
773     /// ```no_run
774     /// use tokio::net::windows::named_pipe;
775     /// use std::error::Error;
776     /// use std::io;
777     ///
778     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-server-try-write-vectored";
779     ///
780     /// #[tokio::main]
781     /// async fn main() -> Result<(), Box<dyn Error>> {
782     ///     let server = named_pipe::ServerOptions::new()
783     ///         .create(PIPE_NAME)?;
784     ///
785     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
786     ///
787     ///     loop {
788     ///         // Wait for the pipe to be writable
789     ///         server.writable().await?;
790     ///
791     ///         // Try to write data, this may still fail with `WouldBlock`
792     ///         // if the readiness event is a false positive.
793     ///         match server.try_write_vectored(&bufs) {
794     ///             Ok(n) => {
795     ///                 break;
796     ///             }
797     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
798     ///                 continue;
799     ///             }
800     ///             Err(e) => {
801     ///                 return Err(e.into());
802     ///             }
803     ///         }
804     ///     }
805     ///
806     ///     Ok(())
807     /// }
808     /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>809     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
810         self.io
811             .registration()
812             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
813     }
814 
815     /// Tries to read or write from the pipe using a user-provided IO operation.
816     ///
817     /// If the pipe is ready, the provided closure is called. The closure
818     /// should attempt to perform IO operation from the pipe by manually
819     /// calling the appropriate syscall. If the operation fails because the
820     /// pipe is not actually ready, then the closure should return a
821     /// `WouldBlock` error and the readiness flag is cleared. The return value
822     /// of the closure is then returned by `try_io`.
823     ///
824     /// If the pipe is not ready, then the closure is not called
825     /// and a `WouldBlock` error is returned.
826     ///
827     /// The closure should only return a `WouldBlock` error if it has performed
828     /// an IO operation on the pipe that failed due to the pipe not being
829     /// ready. Returning a `WouldBlock` error in any other situation will
830     /// incorrectly clear the readiness flag, which can cause the pipe to
831     /// behave incorrectly.
832     ///
833     /// The closure should not perform the IO operation using any of the
834     /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with
835     /// the readiness flag and can cause the pipe to behave incorrectly.
836     ///
837     /// This method is not intended to be used with combined interests.
838     /// The closure should perform only one type of IO operation, so it should not
839     /// require more than one ready state. This method may panic or sleep forever
840     /// if it is called with a combined interest.
841     ///
842     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
843     ///
844     /// [`readable()`]: NamedPipeServer::readable()
845     /// [`writable()`]: NamedPipeServer::writable()
846     /// [`ready()`]: NamedPipeServer::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>847     pub fn try_io<R>(
848         &self,
849         interest: Interest,
850         f: impl FnOnce() -> io::Result<R>,
851     ) -> io::Result<R> {
852         self.io.registration().try_io(interest, f)
853     }
854 
855     /// Reads or writes from the pipe using a user-provided IO operation.
856     ///
857     /// The readiness of the pipe is awaited and when the pipe is ready,
858     /// the provided closure is called. The closure should attempt to perform
859     /// IO operation on the pipe by manually calling the appropriate syscall.
860     /// If the operation fails because the pipe is not actually ready,
861     /// then the closure should return a `WouldBlock` error. In such case the
862     /// readiness flag is cleared and the pipe readiness is awaited again.
863     /// This loop is repeated until the closure returns an `Ok` or an error
864     /// other than `WouldBlock`.
865     ///
866     /// The closure should only return a `WouldBlock` error if it has performed
867     /// an IO operation on the pipe that failed due to the pipe not being
868     /// ready. Returning a `WouldBlock` error in any other situation will
869     /// incorrectly clear the readiness flag, which can cause the pipe to
870     /// behave incorrectly.
871     ///
872     /// The closure should not perform the IO operation using any of the methods
873     /// defined on the Tokio `NamedPipeServer` type, as this will mess with the
874     /// readiness flag and can cause the pipe to behave incorrectly.
875     ///
876     /// This method is not intended to be used with combined interests.
877     /// The closure should perform only one type of IO operation, so it should not
878     /// require more than one ready state. This method may panic or sleep forever
879     /// if it is called with a combined interest.
async_io<R>( &self, interest: Interest, f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>880     pub async fn async_io<R>(
881         &self,
882         interest: Interest,
883         f: impl FnMut() -> io::Result<R>,
884     ) -> io::Result<R> {
885         self.io.registration().async_io(interest, f).await
886     }
887 }
888 
889 impl AsyncRead for NamedPipeServer {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>890     fn poll_read(
891         self: Pin<&mut Self>,
892         cx: &mut Context<'_>,
893         buf: &mut ReadBuf<'_>,
894     ) -> Poll<io::Result<()>> {
895         unsafe { self.io.poll_read(cx, buf) }
896     }
897 }
898 
899 impl AsyncWrite for NamedPipeServer {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>900     fn poll_write(
901         self: Pin<&mut Self>,
902         cx: &mut Context<'_>,
903         buf: &[u8],
904     ) -> Poll<io::Result<usize>> {
905         self.io.poll_write(cx, buf)
906     }
907 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>908     fn poll_write_vectored(
909         self: Pin<&mut Self>,
910         cx: &mut Context<'_>,
911         bufs: &[io::IoSlice<'_>],
912     ) -> Poll<io::Result<usize>> {
913         self.io.poll_write_vectored(cx, bufs)
914     }
915 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>916     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
917         Poll::Ready(Ok(()))
918     }
919 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>920     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
921         self.poll_flush(cx)
922     }
923 }
924 
925 impl AsRawHandle for NamedPipeServer {
as_raw_handle(&self) -> RawHandle926     fn as_raw_handle(&self) -> RawHandle {
927         self.io.as_raw_handle()
928     }
929 }
930 
931 impl AsHandle for NamedPipeServer {
as_handle(&self) -> BorrowedHandle<'_>932     fn as_handle(&self) -> BorrowedHandle<'_> {
933         unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
934     }
935 }
936 
937 /// A [Windows named pipe] client.
938 ///
939 /// Constructed using [`ClientOptions::open`].
940 ///
941 /// Connecting a client correctly involves a few steps. When connecting through
942 /// [`ClientOptions::open`], it might error indicating one of two things:
943 ///
944 /// * [`std::io::ErrorKind::NotFound`] - There is no server available.
945 /// * [`ERROR_PIPE_BUSY`] - There is a server available, but it is busy. Sleep
946 ///   for a while and try again.
947 ///
948 /// So a correctly implemented client looks like this:
949 ///
950 /// ```no_run
951 /// use std::time::Duration;
952 /// use tokio::net::windows::named_pipe::ClientOptions;
953 /// use tokio::time;
954 /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
955 ///
956 /// const PIPE_NAME: &str = r"\\.\pipe\named-pipe-idiomatic-client";
957 ///
958 /// # #[tokio::main] async fn main() -> std::io::Result<()> {
959 /// let client = loop {
960 ///     match ClientOptions::new().open(PIPE_NAME) {
961 ///         Ok(client) => break client,
962 ///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
963 ///         Err(e) => return Err(e),
964 ///     }
965 ///
966 ///     time::sleep(Duration::from_millis(50)).await;
967 /// };
968 ///
969 /// /* use the connected client */
970 /// # Ok(()) }
971 /// ```
972 ///
973 /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
974 /// [Windows named pipe]: https://docs.microsoft.com/en-us/windows/win32/ipc/named-pipes
975 #[derive(Debug)]
976 pub struct NamedPipeClient {
977     io: PollEvented<mio_windows::NamedPipe>,
978 }
979 
980 impl NamedPipeClient {
981     /// Constructs a new named pipe client from the specified raw handle.
982     ///
983     /// This function will consume ownership of the handle given, passing
984     /// responsibility for closing the handle to the returned object.
985     ///
    /// # Safety
    ///
    /// The primitives currently returned have the contract that they are the
    /// sole owner of the handle they are wrapping. Usage of this function
    /// could accidentally allow violating this contract, which can cause
    /// memory unsafety in code that relies on it being true.
991     ///
992     /// # Errors
993     ///
994     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
995     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
996     ///
997     /// [Tokio Runtime]: crate::runtime::Runtime
998     /// [enabled I/O]: crate::runtime::Builder::enable_io
from_raw_handle(handle: RawHandle) -> io::Result<Self>999     pub unsafe fn from_raw_handle(handle: RawHandle) -> io::Result<Self> {
1000         let named_pipe = mio_windows::NamedPipe::from_raw_handle(handle);
1001 
1002         Ok(Self {
1003             io: PollEvented::new(named_pipe)?,
1004         })
1005     }
1006 
1007     /// Retrieves information about the named pipe the client is associated
1008     /// with.
1009     ///
1010     /// ```no_run
1011     /// use tokio::net::windows::named_pipe::{ClientOptions, PipeEnd, PipeMode};
1012     ///
1013     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-info";
1014     ///
1015     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1016     /// let client = ClientOptions::new()
1017     ///     .open(PIPE_NAME)?;
1018     ///
1019     /// let client_info = client.info()?;
1020     ///
1021     /// assert_eq!(client_info.end, PipeEnd::Client);
1022     /// assert_eq!(client_info.mode, PipeMode::Message);
1023     /// assert_eq!(client_info.max_instances, 5);
1024     /// # Ok(()) }
1025     /// ```
info(&self) -> io::Result<PipeInfo>1026     pub fn info(&self) -> io::Result<PipeInfo> {
1027         // Safety: we're ensuring the lifetime of the named pipe.
1028         unsafe { named_pipe_info(self.io.as_raw_handle()) }
1029     }
1030 
1031     /// Waits for any of the requested ready states.
1032     ///
1033     /// This function is usually paired with `try_read()` or `try_write()`. It
1034     /// can be used to concurrently read / write to the same pipe on a single
1035     /// task without splitting the pipe.
1036     ///
1037     /// The function may complete without the pipe being ready. This is a
1038     /// false-positive and attempting an operation will return with
1039     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
1040     /// [`Ready`] set, so you should always check the returned value and possibly
1041     /// wait again if the requested states are not set.
1042     ///
1043     /// # Examples
1044     ///
1045     /// Concurrently read and write to the pipe on the same task without
1046     /// splitting.
1047     ///
1048     /// ```no_run
1049     /// use tokio::io::Interest;
1050     /// use tokio::net::windows::named_pipe;
1051     /// use std::error::Error;
1052     /// use std::io;
1053     ///
1054     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-ready";
1055     ///
1056     /// #[tokio::main]
1057     /// async fn main() -> Result<(), Box<dyn Error>> {
1058     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1059     ///
1060     ///     loop {
1061     ///         let ready = client.ready(Interest::READABLE | Interest::WRITABLE).await?;
1062     ///
1063     ///         if ready.is_readable() {
1064     ///             let mut data = vec![0; 1024];
1065     ///             // Try to read data, this may still fail with `WouldBlock`
1066     ///             // if the readiness event is a false positive.
1067     ///             match client.try_read(&mut data) {
1068     ///                 Ok(n) => {
1069     ///                     println!("read {} bytes", n);
1070     ///                 }
1071     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1072     ///                     continue;
1073     ///                 }
1074     ///                 Err(e) => {
1075     ///                     return Err(e.into());
1076     ///                 }
1077     ///             }
1078     ///         }
1079     ///
1080     ///         if ready.is_writable() {
1081     ///             // Try to write data, this may still fail with `WouldBlock`
1082     ///             // if the readiness event is a false positive.
1083     ///             match client.try_write(b"hello world") {
1084     ///                 Ok(n) => {
1085     ///                     println!("write {} bytes", n);
1086     ///                 }
1087     ///                 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1088     ///                     continue;
1089     ///                 }
1090     ///                 Err(e) => {
1091     ///                     return Err(e.into());
1092     ///                 }
1093     ///             }
1094     ///         }
1095     ///     }
1096     /// }
1097     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>1098     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
1099         let event = self.io.registration().readiness(interest).await?;
1100         Ok(event.ready)
1101     }
1102 
1103     /// Waits for the pipe to become readable.
1104     ///
1105     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
1106     /// paired with `try_read()`.
1107     ///
1108     /// # Examples
1109     ///
1110     /// ```no_run
1111     /// use tokio::net::windows::named_pipe;
1112     /// use std::error::Error;
1113     /// use std::io;
1114     ///
1115     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-readable";
1116     ///
1117     /// #[tokio::main]
1118     /// async fn main() -> Result<(), Box<dyn Error>> {
1119     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1120     ///
1121     ///     let mut msg = vec![0; 1024];
1122     ///
1123     ///     loop {
1124     ///         // Wait for the pipe to be readable
1125     ///         client.readable().await?;
1126     ///
1127     ///         // Try to read data, this may still fail with `WouldBlock`
1128     ///         // if the readiness event is a false positive.
1129     ///         match client.try_read(&mut msg) {
1130     ///             Ok(n) => {
1131     ///                 msg.truncate(n);
1132     ///                 break;
1133     ///             }
1134     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1135     ///                 continue;
1136     ///             }
1137     ///             Err(e) => {
1138     ///                 return Err(e.into());
1139     ///             }
1140     ///         }
1141     ///     }
1142     ///
1143     ///     println!("GOT = {:?}", msg);
1144     ///     Ok(())
1145     /// }
1146     /// ```
readable(&self) -> io::Result<()>1147     pub async fn readable(&self) -> io::Result<()> {
1148         self.ready(Interest::READABLE).await?;
1149         Ok(())
1150     }
1151 
1152     /// Polls for read readiness.
1153     ///
1154     /// If the pipe is not currently ready for reading, this method will
1155     /// store a clone of the `Waker` from the provided `Context`. When the pipe
1156     /// becomes ready for reading, `Waker::wake` will be called on the waker.
1157     ///
1158     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
1159     /// the `Waker` from the `Context` passed to the most recent call is
1160     /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
1161     /// second, independent waker.)
1162     ///
1163     /// This function is intended for cases where creating and pinning a future
1164     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
1165     /// preferred, as this supports polling from multiple tasks at once.
1166     ///
1167     /// # Return value
1168     ///
1169     /// The function returns:
1170     ///
1171     /// * `Poll::Pending` if the pipe is not ready for reading.
1172     /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
1173     /// * `Poll::Ready(Err(e))` if an error is encountered.
1174     ///
1175     /// # Errors
1176     ///
1177     /// This function may encounter any standard I/O error except `WouldBlock`.
1178     ///
1179     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1180     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1181         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
1182     }
1183 
1184     /// Tries to read data from the pipe into the provided buffer, returning how
1185     /// many bytes were read.
1186     ///
1187     /// Receives any pending data from the pipe but does not wait for new data
1188     /// to arrive. On success, returns the number of bytes read. Because
1189     /// `try_read()` is non-blocking, the buffer does not have to be stored by
1190     /// the async task and can exist entirely on the stack.
1191     ///
1192     /// Usually, [`readable()`] or [`ready()`] is used with this function.
1193     ///
1194     /// [`readable()`]: NamedPipeClient::readable()
1195     /// [`ready()`]: NamedPipeClient::ready()
1196     ///
1197     /// # Return
1198     ///
1199     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1200     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
1201     ///
1202     /// 1. The pipe's read half is closed and will no longer yield data.
1203     /// 2. The specified buffer was 0 bytes in length.
1204     ///
1205     /// If the pipe is not ready to read data,
1206     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1207     ///
1208     /// # Examples
1209     ///
1210     /// ```no_run
1211     /// use tokio::net::windows::named_pipe;
1212     /// use std::error::Error;
1213     /// use std::io;
1214     ///
1215     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read";
1216     ///
1217     /// #[tokio::main]
1218     /// async fn main() -> Result<(), Box<dyn Error>> {
1219     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1220     ///
1221     ///     loop {
1222     ///         // Wait for the pipe to be readable
1223     ///         client.readable().await?;
1224     ///
1225     ///         // Creating the buffer **after** the `await` prevents it from
1226     ///         // being stored in the async task.
1227     ///         let mut buf = [0; 4096];
1228     ///
1229     ///         // Try to read data, this may still fail with `WouldBlock`
1230     ///         // if the readiness event is a false positive.
1231     ///         match client.try_read(&mut buf) {
1232     ///             Ok(0) => break,
1233     ///             Ok(n) => {
1234     ///                 println!("read {} bytes", n);
1235     ///             }
1236     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1237     ///                 continue;
1238     ///             }
1239     ///             Err(e) => {
1240     ///                 return Err(e.into());
1241     ///             }
1242     ///         }
1243     ///     }
1244     ///
1245     ///     Ok(())
1246     /// }
1247     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>1248     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
1249         self.io
1250             .registration()
1251             .try_io(Interest::READABLE, || (&*self.io).read(buf))
1252     }
1253 
1254     /// Tries to read data from the pipe into the provided buffers, returning
1255     /// how many bytes were read.
1256     ///
1257     /// Data is copied to fill each buffer in order, with the final buffer
1258     /// written to possibly being only partially filled. This method behaves
1259     /// equivalently to a single call to [`try_read()`] with concatenated
1260     /// buffers.
1261     ///
1262     /// Receives any pending data from the pipe but does not wait for new data
1263     /// to arrive. On success, returns the number of bytes read. Because
1264     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1265     /// stored by the async task and can exist entirely on the stack.
1266     ///
1267     /// Usually, [`readable()`] or [`ready()`] is used with this function.
1268     ///
1269     /// [`try_read()`]: NamedPipeClient::try_read()
1270     /// [`readable()`]: NamedPipeClient::readable()
1271     /// [`ready()`]: NamedPipeClient::ready()
1272     ///
1273     /// # Return
1274     ///
1275     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1276     /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
    /// and will no longer yield data. If the pipe is not ready to read data,
    /// `Err(io::ErrorKind::WouldBlock)` is returned.
1279     ///
1280     /// # Examples
1281     ///
1282     /// ```no_run
1283     /// use tokio::net::windows::named_pipe;
1284     /// use std::error::Error;
1285     /// use std::io::{self, IoSliceMut};
1286     ///
1287     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-vectored";
1288     ///
1289     /// #[tokio::main]
1290     /// async fn main() -> Result<(), Box<dyn Error>> {
1291     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1292     ///
1293     ///     loop {
1294     ///         // Wait for the pipe to be readable
1295     ///         client.readable().await?;
1296     ///
1297     ///         // Creating the buffer **after** the `await` prevents it from
1298     ///         // being stored in the async task.
1299     ///         let mut buf_a = [0; 512];
1300     ///         let mut buf_b = [0; 1024];
1301     ///         let mut bufs = [
1302     ///             IoSliceMut::new(&mut buf_a),
1303     ///             IoSliceMut::new(&mut buf_b),
1304     ///         ];
1305     ///
1306     ///         // Try to read data, this may still fail with `WouldBlock`
1307     ///         // if the readiness event is a false positive.
1308     ///         match client.try_read_vectored(&mut bufs) {
1309     ///             Ok(0) => break,
1310     ///             Ok(n) => {
1311     ///                 println!("read {} bytes", n);
1312     ///             }
1313     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1314     ///                 continue;
1315     ///             }
1316     ///             Err(e) => {
1317     ///                 return Err(e.into());
1318     ///             }
1319     ///         }
1320     ///     }
1321     ///
1322     ///     Ok(())
1323     /// }
1324     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>1325     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1326         self.io
1327             .registration()
1328             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1329     }
1330 
1331     cfg_io_util! {
        /// Tries to read data from the pipe into the provided buffer, advancing the
        /// buffer's internal cursor, returning how many bytes were read.
1334         ///
1335         /// Receives any pending data from the pipe but does not wait for new data
1336         /// to arrive. On success, returns the number of bytes read. Because
1337         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1338         /// the async task and can exist entirely on the stack.
1339         ///
1340         /// Usually, [`readable()`] or [`ready()`] is used with this function.
1341         ///
1342         /// [`readable()`]: NamedPipeClient::readable()
1343         /// [`ready()`]: NamedPipeClient::ready()
1344         ///
1345         /// # Return
1346         ///
        /// If data is successfully read, `Ok(n)` is returned, where `n` is the
        /// number of bytes read. `Ok(0)` indicates the pipe's read half is closed
        /// and will no longer yield data. If the pipe is not ready to read data,
        /// `Err(io::ErrorKind::WouldBlock)` is returned.
1351         ///
1352         /// # Examples
1353         ///
1354         /// ```no_run
1355         /// use tokio::net::windows::named_pipe;
1356         /// use std::error::Error;
1357         /// use std::io;
1358         ///
        /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-read-buf";
1360         ///
1361         /// #[tokio::main]
1362         /// async fn main() -> Result<(), Box<dyn Error>> {
1363         ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1364         ///
1365         ///     loop {
1366         ///         // Wait for the pipe to be readable
1367         ///         client.readable().await?;
1368         ///
1369         ///         let mut buf = Vec::with_capacity(4096);
1370         ///
1371         ///         // Try to read data, this may still fail with `WouldBlock`
1372         ///         // if the readiness event is a false positive.
1373         ///         match client.try_read_buf(&mut buf) {
1374         ///             Ok(0) => break,
1375         ///             Ok(n) => {
1376         ///                 println!("read {} bytes", n);
1377         ///             }
1378         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1379         ///                 continue;
1380         ///             }
1381         ///             Err(e) => {
1382         ///                 return Err(e.into());
1383         ///             }
1384         ///         }
1385         ///     }
1386         ///
1387         ///     Ok(())
1388         /// }
1389         /// ```
1390         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1391             self.io.registration().try_io(Interest::READABLE, || {
1392                 use std::io::Read;
1393 
1394                 let dst = buf.chunk_mut();
1395                 let dst =
1396                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1397 
1398                 // Safety: We trust `NamedPipeClient::read` to have filled up `n` bytes in the
1399                 // buffer.
1400                 let n = (&*self.io).read(dst)?;
1401 
1402                 unsafe {
1403                     buf.advance_mut(n);
1404                 }
1405 
1406                 Ok(n)
1407             })
1408         }
1409     }
1410 
1411     /// Waits for the pipe to become writable.
1412     ///
1413     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
1414     /// paired with `try_write()`.
1415     ///
1416     /// # Examples
1417     ///
1418     /// ```no_run
1419     /// use tokio::net::windows::named_pipe;
1420     /// use std::error::Error;
1421     /// use std::io;
1422     ///
1423     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-writable";
1424     ///
1425     /// #[tokio::main]
1426     /// async fn main() -> Result<(), Box<dyn Error>> {
1427     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1428     ///
1429     ///     loop {
1430     ///         // Wait for the pipe to be writable
1431     ///         client.writable().await?;
1432     ///
1433     ///         // Try to write data, this may still fail with `WouldBlock`
1434     ///         // if the readiness event is a false positive.
1435     ///         match client.try_write(b"hello world") {
1436     ///             Ok(n) => {
1437     ///                 break;
1438     ///             }
1439     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1440     ///                 continue;
1441     ///             }
1442     ///             Err(e) => {
1443     ///                 return Err(e.into());
1444     ///             }
1445     ///         }
1446     ///     }
1447     ///
1448     ///     Ok(())
1449     /// }
1450     /// ```
writable(&self) -> io::Result<()>1451     pub async fn writable(&self) -> io::Result<()> {
1452         self.ready(Interest::WRITABLE).await?;
1453         Ok(())
1454     }
1455 
1456     /// Polls for write readiness.
1457     ///
1458     /// If the pipe is not currently ready for writing, this method will
1459     /// store a clone of the `Waker` from the provided `Context`. When the pipe
1460     /// becomes ready for writing, `Waker::wake` will be called on the waker.
1461     ///
1462     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
1463     /// the `Waker` from the `Context` passed to the most recent call is
1464     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
1465     /// second, independent waker.)
1466     ///
1467     /// This function is intended for cases where creating and pinning a future
1468     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
1469     /// preferred, as this supports polling from multiple tasks at once.
1470     ///
1471     /// # Return value
1472     ///
1473     /// The function returns:
1474     ///
1475     /// * `Poll::Pending` if the pipe is not ready for writing.
1476     /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
1477     /// * `Poll::Ready(Err(e))` if an error is encountered.
1478     ///
1479     /// # Errors
1480     ///
1481     /// This function may encounter any standard I/O error except `WouldBlock`.
1482     ///
1483     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>1484     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1485         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
1486     }
1487 
1488     /// Tries to write a buffer to the pipe, returning how many bytes were
1489     /// written.
1490     ///
1491     /// The function will attempt to write the entire contents of `buf`, but
1492     /// only part of the buffer may be written.
1493     ///
1494     /// This function is usually paired with `writable()`.
1495     ///
1496     /// # Return
1497     ///
1498     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1499     /// number of bytes written. If the pipe is not ready to write data,
1500     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1501     ///
1502     /// # Examples
1503     ///
1504     /// ```no_run
1505     /// use tokio::net::windows::named_pipe;
1506     /// use std::error::Error;
1507     /// use std::io;
1508     ///
1509     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write";
1510     ///
1511     /// #[tokio::main]
1512     /// async fn main() -> Result<(), Box<dyn Error>> {
1513     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1514     ///
1515     ///     loop {
1516     ///         // Wait for the pipe to be writable
1517     ///         client.writable().await?;
1518     ///
1519     ///         // Try to write data, this may still fail with `WouldBlock`
1520     ///         // if the readiness event is a false positive.
1521     ///         match client.try_write(b"hello world") {
1522     ///             Ok(n) => {
1523     ///                 break;
1524     ///             }
1525     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1526     ///                 continue;
1527     ///             }
1528     ///             Err(e) => {
1529     ///                 return Err(e.into());
1530     ///             }
1531     ///         }
1532     ///     }
1533     ///
1534     ///     Ok(())
1535     /// }
1536     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>1537     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
1538         self.io
1539             .registration()
1540             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
1541     }
1542 
1543     /// Tries to write several buffers to the pipe, returning how many bytes
1544     /// were written.
1545     ///
1546     /// Data is written from each buffer in order, with the final buffer read
    /// from possibly being only partially consumed. This method behaves
1548     /// equivalently to a single call to [`try_write()`] with concatenated
1549     /// buffers.
1550     ///
1551     /// This function is usually paired with `writable()`.
1552     ///
1553     /// [`try_write()`]: NamedPipeClient::try_write()
1554     ///
1555     /// # Return
1556     ///
1557     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
1558     /// number of bytes written. If the pipe is not ready to write data,
1559     /// `Err(io::ErrorKind::WouldBlock)` is returned.
1560     ///
1561     /// # Examples
1562     ///
1563     /// ```no_run
1564     /// use tokio::net::windows::named_pipe;
1565     /// use std::error::Error;
1566     /// use std::io;
1567     ///
1568     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-try-write-vectored";
1569     ///
1570     /// #[tokio::main]
1571     /// async fn main() -> Result<(), Box<dyn Error>> {
1572     ///     let client = named_pipe::ClientOptions::new().open(PIPE_NAME)?;
1573     ///
1574     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
1575     ///
1576     ///     loop {
1577     ///         // Wait for the pipe to be writable
1578     ///         client.writable().await?;
1579     ///
1580     ///         // Try to write data, this may still fail with `WouldBlock`
1581     ///         // if the readiness event is a false positive.
1582     ///         match client.try_write_vectored(&bufs) {
1583     ///             Ok(n) => {
1584     ///                 break;
1585     ///             }
1586     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1587     ///                 continue;
1588     ///             }
1589     ///             Err(e) => {
1590     ///                 return Err(e.into());
1591     ///             }
1592     ///         }
1593     ///     }
1594     ///
1595     ///     Ok(())
1596     /// }
1597     /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>1598     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
1599         self.io
1600             .registration()
1601             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
1602     }
1603 
1604     /// Tries to read or write from the pipe using a user-provided IO operation.
1605     ///
1606     /// If the pipe is ready, the provided closure is called. The closure
1607     /// should attempt to perform IO operation from the pipe by manually
1608     /// calling the appropriate syscall. If the operation fails because the
1609     /// pipe is not actually ready, then the closure should return a
1610     /// `WouldBlock` error and the readiness flag is cleared. The return value
1611     /// of the closure is then returned by `try_io`.
1612     ///
1613     /// If the pipe is not ready, then the closure is not called
1614     /// and a `WouldBlock` error is returned.
1615     ///
1616     /// The closure should only return a `WouldBlock` error if it has performed
1617     /// an IO operation on the pipe that failed due to the pipe not being
1618     /// ready. Returning a `WouldBlock` error in any other situation will
1619     /// incorrectly clear the readiness flag, which can cause the pipe to
1620     /// behave incorrectly.
1621     ///
1622     /// The closure should not perform the IO operation using any of the methods
1623     /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1624     /// readiness flag and can cause the pipe to behave incorrectly.
1625     ///
1626     /// This method is not intended to be used with combined interests.
1627     /// The closure should perform only one type of IO operation, so it should not
1628     /// require more than one ready state. This method may panic or sleep forever
1629     /// if it is called with a combined interest.
1630     ///
1631     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1632     ///
1633     /// [`readable()`]: NamedPipeClient::readable()
1634     /// [`writable()`]: NamedPipeClient::writable()
1635     /// [`ready()`]: NamedPipeClient::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1636     pub fn try_io<R>(
1637         &self,
1638         interest: Interest,
1639         f: impl FnOnce() -> io::Result<R>,
1640     ) -> io::Result<R> {
1641         self.io.registration().try_io(interest, f)
1642     }
1643 
1644     /// Reads or writes from the pipe using a user-provided IO operation.
1645     ///
1646     /// The readiness of the pipe is awaited and when the pipe is ready,
1647     /// the provided closure is called. The closure should attempt to perform
1648     /// IO operation on the pipe by manually calling the appropriate syscall.
1649     /// If the operation fails because the pipe is not actually ready,
1650     /// then the closure should return a `WouldBlock` error. In such case the
1651     /// readiness flag is cleared and the pipe readiness is awaited again.
1652     /// This loop is repeated until the closure returns an `Ok` or an error
1653     /// other than `WouldBlock`.
1654     ///
1655     /// The closure should only return a `WouldBlock` error if it has performed
1656     /// an IO operation on the pipe that failed due to the pipe not being
1657     /// ready. Returning a `WouldBlock` error in any other situation will
1658     /// incorrectly clear the readiness flag, which can cause the pipe to
1659     /// behave incorrectly.
1660     ///
1661     /// The closure should not perform the IO operation using any of the methods
1662     /// defined on the Tokio `NamedPipeClient` type, as this will mess with the
1663     /// readiness flag and can cause the pipe to behave incorrectly.
1664     ///
1665     /// This method is not intended to be used with combined interests.
1666     /// The closure should perform only one type of IO operation, so it should not
1667     /// require more than one ready state. This method may panic or sleep forever
1668     /// if it is called with a combined interest.
async_io<R>( &self, interest: Interest, f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>1669     pub async fn async_io<R>(
1670         &self,
1671         interest: Interest,
1672         f: impl FnMut() -> io::Result<R>,
1673     ) -> io::Result<R> {
1674         self.io.registration().async_io(interest, f).await
1675     }
1676 }
1677 
impl AsyncRead for NamedPipeClient {
    /// Polls the underlying pipe for data and reads it into `buf`.
    ///
    /// The whole operation is forwarded to the inner I/O wrapper.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // NOTE(review): `poll_read` on the inner wrapper is an `unsafe fn`
        // whose contract is defined outside this chunk. Presumably it requires
        // that the underlying pipe never reads from the (possibly
        // uninitialized) unfilled part of `buf` — confirm and turn this into
        // a proper `SAFETY:` comment.
        unsafe { self.io.poll_read(cx, buf) }
    }
}
1687 
1688 impl AsyncWrite for NamedPipeClient {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1689     fn poll_write(
1690         self: Pin<&mut Self>,
1691         cx: &mut Context<'_>,
1692         buf: &[u8],
1693     ) -> Poll<io::Result<usize>> {
1694         self.io.poll_write(cx, buf)
1695     }
1696 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1697     fn poll_write_vectored(
1698         self: Pin<&mut Self>,
1699         cx: &mut Context<'_>,
1700         bufs: &[io::IoSlice<'_>],
1701     ) -> Poll<io::Result<usize>> {
1702         self.io.poll_write_vectored(cx, bufs)
1703     }
1704 
poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>>1705     fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1706         Poll::Ready(Ok(()))
1707     }
1708 
poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>1709     fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
1710         self.poll_flush(cx)
1711     }
1712 }
1713 
1714 impl AsRawHandle for NamedPipeClient {
as_raw_handle(&self) -> RawHandle1715     fn as_raw_handle(&self) -> RawHandle {
1716         self.io.as_raw_handle()
1717     }
1718 }
1719 
1720 impl AsHandle for NamedPipeClient {
as_handle(&self) -> BorrowedHandle<'_>1721     fn as_handle(&self) -> BorrowedHandle<'_> {
1722         unsafe { BorrowedHandle::borrow_raw(self.as_raw_handle()) }
1723     }
1724 }
1725 
/// A builder structure for constructing a named pipe with named
/// pipe-specific options. Named pipe servers that want to modify pipe-related
/// options must use this builder.
///
/// See [`ServerOptions::create`].
#[derive(Debug, Clone)]
pub struct ServerOptions {
    // dwOpenMode
    // Whether data may flow from client to server.
    access_inbound: bool,
    // Whether data may flow from server to client.
    access_outbound: bool,
    // Request `FILE_FLAG_FIRST_PIPE_INSTANCE`.
    first_pipe_instance: bool,
    // Request `WRITE_DAC`.
    write_dac: bool,
    // Request `WRITE_OWNER`.
    write_owner: bool,
    // Request `ACCESS_SYSTEM_SECURITY`.
    access_system_security: bool,
    // dwPipeMode
    pipe_mode: PipeMode,
    // Request `PIPE_REJECT_REMOTE_CLIENTS`.
    reject_remote_clients: bool,
    // other options
    // Corresponds to `nMaxInstances`.
    max_instances: u32,
    // Corresponds to `nOutBufferSize`, in bytes.
    out_buffer_size: u32,
    // Corresponds to `nInBufferSize`, in bytes.
    in_buffer_size: u32,
    // NOTE(review): no setter for this field is visible in this part of the
    // file; presumably it feeds `nDefaultTimeOut` — confirm at the creation
    // call.
    default_timeout: u32,
}
1749 
1750 impl ServerOptions {
1751     /// Creates a new named pipe builder with the default settings.
1752     ///
1753     /// ```
1754     /// use tokio::net::windows::named_pipe::ServerOptions;
1755     ///
1756     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-new";
1757     ///
1758     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1759     /// let server = ServerOptions::new().create(PIPE_NAME)?;
1760     /// # Ok(()) }
1761     /// ```
new() -> ServerOptions1762     pub fn new() -> ServerOptions {
1763         ServerOptions {
1764             access_inbound: true,
1765             access_outbound: true,
1766             first_pipe_instance: false,
1767             write_dac: false,
1768             write_owner: false,
1769             access_system_security: false,
1770             pipe_mode: PipeMode::Byte,
1771             reject_remote_clients: true,
1772             max_instances: windows_sys::PIPE_UNLIMITED_INSTANCES,
1773             out_buffer_size: 65536,
1774             in_buffer_size: 65536,
1775             default_timeout: 0,
1776         }
1777     }
1778 
1779     /// The pipe mode.
1780     ///
1781     /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
1782     /// documentation of what each mode means.
1783     ///
1784     /// This corresponds to specifying `PIPE_TYPE_` and `PIPE_READMODE_` in  [`dwPipeMode`].
1785     ///
1786     /// [`dwPipeMode`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self1787     pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
1788         self.pipe_mode = pipe_mode;
1789         self
1790     }
1791 
1792     /// The flow of data in the pipe goes from client to server only.
1793     ///
1794     /// This corresponds to setting [`PIPE_ACCESS_INBOUND`].
1795     ///
1796     /// [`PIPE_ACCESS_INBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_inbound
1797     ///
1798     /// # Errors
1799     ///
1800     /// Server side prevents connecting by denying inbound access, client errors
1801     /// with [`std::io::ErrorKind::PermissionDenied`] when attempting to create
1802     /// the connection.
1803     ///
1804     /// ```
1805     /// use std::io;
1806     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1807     ///
1808     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err1";
1809     ///
1810     /// # #[tokio::main] async fn main() -> io::Result<()> {
1811     /// let _server = ServerOptions::new()
1812     ///     .access_inbound(false)
1813     ///     .create(PIPE_NAME)?;
1814     ///
1815     /// let e = ClientOptions::new()
1816     ///     .open(PIPE_NAME)
1817     ///     .unwrap_err();
1818     ///
1819     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1820     /// # Ok(()) }
1821     /// ```
1822     ///
1823     /// Disabling writing allows a client to connect, but errors with
1824     /// [`std::io::ErrorKind::PermissionDenied`] if a write is attempted.
1825     ///
1826     /// ```
1827     /// use std::io;
1828     /// use tokio::io::AsyncWriteExt;
1829     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1830     ///
1831     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound-err2";
1832     ///
1833     /// # #[tokio::main] async fn main() -> io::Result<()> {
1834     /// let server = ServerOptions::new()
1835     ///     .access_inbound(false)
1836     ///     .create(PIPE_NAME)?;
1837     ///
1838     /// let mut client = ClientOptions::new()
1839     ///     .write(false)
1840     ///     .open(PIPE_NAME)?;
1841     ///
1842     /// server.connect().await?;
1843     ///
1844     /// let e = client.write(b"ping").await.unwrap_err();
1845     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1846     /// # Ok(()) }
1847     /// ```
1848     ///
1849     /// # Examples
1850     ///
1851     /// A unidirectional named pipe that only supports server-to-client
1852     /// communication.
1853     ///
1854     /// ```
1855     /// use std::io;
1856     /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1857     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1858     ///
1859     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-inbound";
1860     ///
1861     /// # #[tokio::main] async fn main() -> io::Result<()> {
1862     /// let mut server = ServerOptions::new()
1863     ///     .access_inbound(false)
1864     ///     .create(PIPE_NAME)?;
1865     ///
1866     /// let mut client = ClientOptions::new()
1867     ///     .write(false)
1868     ///     .open(PIPE_NAME)?;
1869     ///
1870     /// server.connect().await?;
1871     ///
1872     /// let write = server.write_all(b"ping");
1873     ///
1874     /// let mut buf = [0u8; 4];
1875     /// let read = client.read_exact(&mut buf);
1876     ///
1877     /// let ((), read) = tokio::try_join!(write, read)?;
1878     ///
1879     /// assert_eq!(read, 4);
1880     /// assert_eq!(&buf[..], b"ping");
1881     /// # Ok(()) }
1882     /// ```
access_inbound(&mut self, allowed: bool) -> &mut Self1883     pub fn access_inbound(&mut self, allowed: bool) -> &mut Self {
1884         self.access_inbound = allowed;
1885         self
1886     }
1887 
1888     /// The flow of data in the pipe goes from server to client only.
1889     ///
1890     /// This corresponds to setting [`PIPE_ACCESS_OUTBOUND`].
1891     ///
1892     /// [`PIPE_ACCESS_OUTBOUND`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_access_outbound
1893     ///
1894     /// # Errors
1895     ///
1896     /// Server side prevents connecting by denying outbound access, client
1897     /// errors with [`std::io::ErrorKind::PermissionDenied`] when attempting to
1898     /// create the connection.
1899     ///
1900     /// ```
1901     /// use std::io;
1902     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1903     ///
1904     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err1";
1905     ///
1906     /// # #[tokio::main] async fn main() -> io::Result<()> {
1907     /// let server = ServerOptions::new()
1908     ///     .access_outbound(false)
1909     ///     .create(PIPE_NAME)?;
1910     ///
1911     /// let e = ClientOptions::new()
1912     ///     .open(PIPE_NAME)
1913     ///     .unwrap_err();
1914     ///
1915     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1916     /// # Ok(()) }
1917     /// ```
1918     ///
1919     /// Disabling reading allows a client to connect, but attempting to read
1920     /// will error with [`std::io::ErrorKind::PermissionDenied`].
1921     ///
1922     /// ```
1923     /// use std::io;
1924     /// use tokio::io::AsyncReadExt;
1925     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1926     ///
1927     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound-err2";
1928     ///
1929     /// # #[tokio::main] async fn main() -> io::Result<()> {
1930     /// let server = ServerOptions::new()
1931     ///     .access_outbound(false)
1932     ///     .create(PIPE_NAME)?;
1933     ///
1934     /// let mut client = ClientOptions::new()
1935     ///     .read(false)
1936     ///     .open(PIPE_NAME)?;
1937     ///
1938     /// server.connect().await?;
1939     ///
1940     /// let mut buf = [0u8; 4];
1941     /// let e = client.read(&mut buf).await.unwrap_err();
1942     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
1943     /// # Ok(()) }
1944     /// ```
1945     ///
1946     /// # Examples
1947     ///
1948     /// A unidirectional named pipe that only supports client-to-server
1949     /// communication.
1950     ///
1951     /// ```
1952     /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
1953     /// use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
1954     ///
1955     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-access-outbound";
1956     ///
1957     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
1958     /// let mut server = ServerOptions::new()
1959     ///     .access_outbound(false)
1960     ///     .create(PIPE_NAME)?;
1961     ///
1962     /// let mut client = ClientOptions::new()
1963     ///     .read(false)
1964     ///     .open(PIPE_NAME)?;
1965     ///
1966     /// server.connect().await?;
1967     ///
1968     /// let write = client.write_all(b"ping");
1969     ///
1970     /// let mut buf = [0u8; 4];
1971     /// let read = server.read_exact(&mut buf);
1972     ///
1973     /// let ((), read) = tokio::try_join!(write, read)?;
1974     ///
1975     /// println!("done reading and writing");
1976     ///
1977     /// assert_eq!(read, 4);
1978     /// assert_eq!(&buf[..], b"ping");
1979     /// # Ok(()) }
1980     /// ```
access_outbound(&mut self, allowed: bool) -> &mut Self1981     pub fn access_outbound(&mut self, allowed: bool) -> &mut Self {
1982         self.access_outbound = allowed;
1983         self
1984     }
1985 
1986     /// If you attempt to create multiple instances of a pipe with this flag
1987     /// set, creation of the first server instance succeeds, but creation of any
1988     /// subsequent instances will fail with
1989     /// [`std::io::ErrorKind::PermissionDenied`].
1990     ///
1991     /// This option is intended to be used with servers that want to ensure that
1992     /// they are the only process listening for clients on a given named pipe.
1993     /// This is accomplished by enabling it for the first server instance
1994     /// created in a process.
1995     ///
1996     /// This corresponds to setting [`FILE_FLAG_FIRST_PIPE_INSTANCE`].
1997     ///
1998     /// # Errors
1999     ///
2000     /// If this option is set and more than one instance of the server for a
2001     /// given named pipe exists, calling [`create`] will fail with
2002     /// [`std::io::ErrorKind::PermissionDenied`].
2003     ///
2004     /// ```
2005     /// use std::io;
2006     /// use tokio::net::windows::named_pipe::ServerOptions;
2007     ///
2008     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance-error";
2009     ///
2010     /// # #[tokio::main] async fn main() -> io::Result<()> {
2011     /// let server1 = ServerOptions::new()
2012     ///     .first_pipe_instance(true)
2013     ///     .create(PIPE_NAME)?;
2014     ///
2015     /// // Second server errs, since it's not the first instance.
2016     /// let e = ServerOptions::new()
2017     ///     .first_pipe_instance(true)
2018     ///     .create(PIPE_NAME)
2019     ///     .unwrap_err();
2020     ///
2021     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2022     /// # Ok(()) }
2023     /// ```
2024     ///
2025     /// # Examples
2026     ///
2027     /// ```
2028     /// use std::io;
2029     /// use tokio::net::windows::named_pipe::ServerOptions;
2030     ///
2031     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-first-instance";
2032     ///
2033     /// # #[tokio::main] async fn main() -> io::Result<()> {
2034     /// let mut builder = ServerOptions::new();
2035     /// builder.first_pipe_instance(true);
2036     ///
2037     /// let server = builder.create(PIPE_NAME)?;
2038     /// let e = builder.create(PIPE_NAME).unwrap_err();
2039     /// assert_eq!(e.kind(), io::ErrorKind::PermissionDenied);
2040     /// drop(server);
2041     ///
    /// // OK, since we've closed the other instance.
2043     /// let _server2 = builder.create(PIPE_NAME)?;
2044     /// # Ok(()) }
2045     /// ```
2046     ///
2047     /// [`create`]: ServerOptions::create
2048     /// [`FILE_FLAG_FIRST_PIPE_INSTANCE`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_first_pipe_instance
first_pipe_instance(&mut self, first: bool) -> &mut Self2049     pub fn first_pipe_instance(&mut self, first: bool) -> &mut Self {
2050         self.first_pipe_instance = first;
2051         self
2052     }
2053 
2054     /// Requests permission to modify the pipe's discretionary access control list.
2055     ///
2056     /// This corresponds to setting [`WRITE_DAC`] in dwOpenMode.
2057     ///
2058     /// # Examples
2059     ///
2060     /// ```
2061     /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
    ///
2063     /// use tokio::net::windows::named_pipe::ServerOptions;
2064     /// use windows_sys::{
2065     ///     Win32::Foundation::ERROR_SUCCESS,
2066     ///     Win32::Security::DACL_SECURITY_INFORMATION,
2067     ///     Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2068     /// };
2069     ///
2070     /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe";
2071     ///
2072     /// # #[tokio::main] async fn main() -> io::Result<()> {
2073     /// let mut pipe_template = ServerOptions::new();
2074     /// pipe_template.write_dac(true);
2075     /// let pipe = pipe_template.create(PIPE_NAME)?;
2076     ///
2077     /// unsafe {
2078     ///     assert_eq!(
2079     ///         ERROR_SUCCESS,
2080     ///         SetSecurityInfo(
2081     ///             pipe.as_raw_handle() as _,
2082     ///             SE_KERNEL_OBJECT,
2083     ///             DACL_SECURITY_INFORMATION,
2084     ///             ptr::null_mut(),
2085     ///             ptr::null_mut(),
2086     ///             ptr::null_mut(),
2087     ///             ptr::null_mut(),
2088     ///         )
2089     ///     );
2090     /// }
2091     ///
2092     /// # Ok(()) }
2093     /// ```
2094     ///
2095     /// ```
2096     /// use std::{io, os::windows::prelude::AsRawHandle, ptr};
    ///
2098     /// use tokio::net::windows::named_pipe::ServerOptions;
2099     /// use windows_sys::{
2100     ///     Win32::Foundation::ERROR_ACCESS_DENIED,
2101     ///     Win32::Security::DACL_SECURITY_INFORMATION,
2102     ///     Win32::Security::Authorization::{SetSecurityInfo, SE_KERNEL_OBJECT},
2103     /// };
2104     ///
2105     /// const PIPE_NAME: &str = r"\\.\pipe\write_dac_pipe_fail";
2106     ///
2107     /// # #[tokio::main] async fn main() -> io::Result<()> {
2108     /// let mut pipe_template = ServerOptions::new();
2109     /// pipe_template.write_dac(false);
2110     /// let pipe = pipe_template.create(PIPE_NAME)?;
2111     ///
2112     /// unsafe {
2113     ///     assert_eq!(
2114     ///         ERROR_ACCESS_DENIED,
2115     ///         SetSecurityInfo(
2116     ///             pipe.as_raw_handle() as _,
2117     ///             SE_KERNEL_OBJECT,
2118     ///             DACL_SECURITY_INFORMATION,
2119     ///             ptr::null_mut(),
2120     ///             ptr::null_mut(),
2121     ///             ptr::null_mut(),
2122     ///             ptr::null_mut(),
2123     ///         )
2124     ///     );
2125     /// }
2126     ///
2127     /// # Ok(()) }
2128     /// ```
2129     ///
2130     /// [`WRITE_DAC`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
write_dac(&mut self, requested: bool) -> &mut Self2131     pub fn write_dac(&mut self, requested: bool) -> &mut Self {
2132         self.write_dac = requested;
2133         self
2134     }
2135 
2136     /// Requests permission to modify the pipe's owner.
2137     ///
2138     /// This corresponds to setting [`WRITE_OWNER`] in dwOpenMode.
2139     ///
2140     /// [`WRITE_OWNER`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
write_owner(&mut self, requested: bool) -> &mut Self2141     pub fn write_owner(&mut self, requested: bool) -> &mut Self {
2142         self.write_owner = requested;
2143         self
2144     }
2145 
2146     /// Requests permission to modify the pipe's system access control list.
2147     ///
2148     /// This corresponds to setting [`ACCESS_SYSTEM_SECURITY`] in dwOpenMode.
2149     ///
2150     /// [`ACCESS_SYSTEM_SECURITY`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
access_system_security(&mut self, requested: bool) -> &mut Self2151     pub fn access_system_security(&mut self, requested: bool) -> &mut Self {
2152         self.access_system_security = requested;
2153         self
2154     }
2155 
2156     /// Indicates whether this server can accept remote clients or not. Remote
2157     /// clients are disabled by default.
2158     ///
2159     /// This corresponds to setting [`PIPE_REJECT_REMOTE_CLIENTS`].
2160     ///
2161     /// [`PIPE_REJECT_REMOTE_CLIENTS`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea#pipe_reject_remote_clients
reject_remote_clients(&mut self, reject: bool) -> &mut Self2162     pub fn reject_remote_clients(&mut self, reject: bool) -> &mut Self {
2163         self.reject_remote_clients = reject;
2164         self
2165     }
2166 
2167     /// The maximum number of instances that can be created for this pipe. The
2168     /// first instance of the pipe can specify this value; the same number must
2169     /// be specified for other instances of the pipe. Acceptable values are in
2170     /// the range 1 through 254. The default value is unlimited.
2171     ///
2172     /// This corresponds to specifying [`nMaxInstances`].
2173     ///
2174     /// [`nMaxInstances`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2175     ///
2176     /// # Errors
2177     ///
    /// All server instances of a pipe have to use the same `max_instances`
    /// value. Creating an additional server with a mismatching value might
    /// fail.
2181     ///
2182     /// ```
2183     /// use std::io;
2184     /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2185     /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2186     ///
2187     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-max-instances";
2188     ///
2189     /// # #[tokio::main] async fn main() -> io::Result<()> {
2190     /// let mut server = ServerOptions::new();
2191     /// server.max_instances(2);
2192     ///
2193     /// let s1 = server.create(PIPE_NAME)?;
2194     /// let c1 = ClientOptions::new().open(PIPE_NAME);
2195     ///
2196     /// let s2 = server.create(PIPE_NAME)?;
2197     /// let c2 = ClientOptions::new().open(PIPE_NAME);
2198     ///
2199     /// // Too many servers!
2200     /// let e = server.create(PIPE_NAME).unwrap_err();
2201     /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2202     ///
2203     /// // Still too many servers even if we specify a higher value!
2204     /// let e = server.max_instances(100).create(PIPE_NAME).unwrap_err();
2205     /// assert_eq!(e.raw_os_error(), Some(ERROR_PIPE_BUSY as i32));
2206     /// # Ok(()) }
2207     /// ```
2208     ///
2209     /// # Panics
2210     ///
2211     /// This function will panic if more than 254 instances are specified. If
2212     /// you do not wish to set an instance limit, leave it unspecified.
2213     ///
2214     /// ```should_panic
2215     /// use tokio::net::windows::named_pipe::ServerOptions;
2216     ///
2217     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2218     /// let builder = ServerOptions::new().max_instances(255);
2219     /// # Ok(()) }
2220     /// ```
2221     #[track_caller]
max_instances(&mut self, instances: usize) -> &mut Self2222     pub fn max_instances(&mut self, instances: usize) -> &mut Self {
2223         assert!(instances < 255, "cannot specify more than 254 instances");
2224         self.max_instances = instances as u32;
2225         self
2226     }
2227 
2228     /// The number of bytes to reserve for the output buffer.
2229     ///
2230     /// This corresponds to specifying [`nOutBufferSize`].
2231     ///
2232     /// [`nOutBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
out_buffer_size(&mut self, buffer: u32) -> &mut Self2233     pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
2234         self.out_buffer_size = buffer;
2235         self
2236     }
2237 
2238     /// The number of bytes to reserve for the input buffer.
2239     ///
2240     /// This corresponds to specifying [`nInBufferSize`].
2241     ///
2242     /// [`nInBufferSize`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
in_buffer_size(&mut self, buffer: u32) -> &mut Self2243     pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
2244         self.in_buffer_size = buffer;
2245         self
2246     }
2247 
2248     /// Creates the named pipe identified by `addr` for use as a server.
2249     ///
2250     /// This uses the [`CreateNamedPipe`] function.
2251     ///
2252     /// [`CreateNamedPipe`]: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-createnamedpipea
2253     ///
2254     /// # Errors
2255     ///
2256     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2257     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2258     ///
2259     /// [Tokio Runtime]: crate::runtime::Runtime
2260     /// [enabled I/O]: crate::runtime::Builder::enable_io
2261     ///
2262     /// # Examples
2263     ///
2264     /// ```
2265     /// use tokio::net::windows::named_pipe::ServerOptions;
2266     ///
2267     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-create";
2268     ///
2269     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2270     /// let server = ServerOptions::new().create(PIPE_NAME)?;
2271     /// # Ok(()) }
2272     /// ```
create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer>2273     pub fn create(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeServer> {
2274         // Safety: We're calling create_with_security_attributes_raw w/ a null
2275         // pointer which disables it.
2276         unsafe { self.create_with_security_attributes_raw(addr, ptr::null_mut()) }
2277     }
2278 
2279     /// Creates the named pipe identified by `addr` for use as a server.
2280     ///
2281     /// This is the same as [`create`] except that it supports providing the raw
2282     /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2283     /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2284     ///
2285     /// # Errors
2286     ///
2287     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2288     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2289     ///
2290     /// [Tokio Runtime]: crate::runtime::Runtime
2291     /// [enabled I/O]: crate::runtime::Builder::enable_io
2292     ///
2293     /// # Safety
2294     ///
2295     /// The `attrs` argument must either be null or point at a valid instance of
2296     /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2297     /// behavior is identical to calling the [`create`] method.
2298     ///
2299     /// [`create`]: ServerOptions::create
2300     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2301     /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
create_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeServer>2302     pub unsafe fn create_with_security_attributes_raw(
2303         &self,
2304         addr: impl AsRef<OsStr>,
2305         attrs: *mut c_void,
2306     ) -> io::Result<NamedPipeServer> {
2307         let addr = encode_addr(addr);
2308 
2309         let pipe_mode = {
2310             let mut mode = if matches!(self.pipe_mode, PipeMode::Message) {
2311                 windows_sys::PIPE_TYPE_MESSAGE | windows_sys::PIPE_READMODE_MESSAGE
2312             } else {
2313                 windows_sys::PIPE_TYPE_BYTE | windows_sys::PIPE_READMODE_BYTE
2314             };
2315             if self.reject_remote_clients {
2316                 mode |= windows_sys::PIPE_REJECT_REMOTE_CLIENTS;
2317             } else {
2318                 mode |= windows_sys::PIPE_ACCEPT_REMOTE_CLIENTS;
2319             }
2320             mode
2321         };
2322         let open_mode = {
2323             let mut mode = windows_sys::FILE_FLAG_OVERLAPPED;
2324             if self.access_inbound {
2325                 mode |= windows_sys::PIPE_ACCESS_INBOUND;
2326             }
2327             if self.access_outbound {
2328                 mode |= windows_sys::PIPE_ACCESS_OUTBOUND;
2329             }
2330             if self.first_pipe_instance {
2331                 mode |= windows_sys::FILE_FLAG_FIRST_PIPE_INSTANCE;
2332             }
2333             if self.write_dac {
2334                 mode |= windows_sys::WRITE_DAC;
2335             }
2336             if self.write_owner {
2337                 mode |= windows_sys::WRITE_OWNER;
2338             }
2339             if self.access_system_security {
2340                 mode |= windows_sys::ACCESS_SYSTEM_SECURITY;
2341             }
2342             mode
2343         };
2344 
2345         let h = windows_sys::CreateNamedPipeW(
2346             addr.as_ptr(),
2347             open_mode,
2348             pipe_mode,
2349             self.max_instances,
2350             self.out_buffer_size,
2351             self.in_buffer_size,
2352             self.default_timeout,
2353             attrs as *mut _,
2354         );
2355 
2356         if h == windows_sys::INVALID_HANDLE_VALUE {
2357             return Err(io::Error::last_os_error());
2358         }
2359 
2360         NamedPipeServer::from_raw_handle(h as _)
2361     }
2362 }
2363 
2364 /// A builder suitable for building and interacting with named pipes from the
2365 /// client side.
2366 ///
2367 /// See [`ClientOptions::open`].
2368 #[derive(Debug, Clone)]
2369 pub struct ClientOptions {
2370     generic_read: bool,
2371     generic_write: bool,
2372     security_qos_flags: u32,
2373     pipe_mode: PipeMode,
2374 }
2375 
2376 impl ClientOptions {
2377     /// Creates a new named pipe builder with the default settings.
2378     ///
2379     /// ```
2380     /// use tokio::net::windows::named_pipe::{ServerOptions, ClientOptions};
2381     ///
2382     /// const PIPE_NAME: &str = r"\\.\pipe\tokio-named-pipe-client-new";
2383     ///
2384     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2385     /// // Server must be created in order for the client creation to succeed.
2386     /// let server = ServerOptions::new().create(PIPE_NAME)?;
2387     /// let client = ClientOptions::new().open(PIPE_NAME)?;
2388     /// # Ok(()) }
2389     /// ```
new() -> Self2390     pub fn new() -> Self {
2391         Self {
2392             generic_read: true,
2393             generic_write: true,
2394             security_qos_flags: windows_sys::SECURITY_IDENTIFICATION
2395                 | windows_sys::SECURITY_SQOS_PRESENT,
2396             pipe_mode: PipeMode::Byte,
2397         }
2398     }
2399 
2400     /// If the client supports reading data. This is enabled by default.
2401     ///
2402     /// This corresponds to setting [`GENERIC_READ`] in the call to [`CreateFile`].
2403     ///
2404     /// [`GENERIC_READ`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2405     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
read(&mut self, allowed: bool) -> &mut Self2406     pub fn read(&mut self, allowed: bool) -> &mut Self {
2407         self.generic_read = allowed;
2408         self
2409     }
2410 
2411     /// If the created pipe supports writing data. This is enabled by default.
2412     ///
2413     /// This corresponds to setting [`GENERIC_WRITE`] in the call to [`CreateFile`].
2414     ///
2415     /// [`GENERIC_WRITE`]: https://docs.microsoft.com/en-us/windows/win32/secauthz/generic-access-rights
2416     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
write(&mut self, allowed: bool) -> &mut Self2417     pub fn write(&mut self, allowed: bool) -> &mut Self {
2418         self.generic_write = allowed;
2419         self
2420     }
2421 
2422     /// Sets qos flags which are combined with other flags and attributes in the
2423     /// call to [`CreateFile`].
2424     ///
2425     /// By default `security_qos_flags` is set to [`SECURITY_IDENTIFICATION`],
2426     /// calling this function would override that value completely with the
2427     /// argument specified.
2428     ///
2429     /// When `security_qos_flags` is not set, a malicious program can gain the
2430     /// elevated privileges of a privileged Rust process when it allows opening
2431     /// user-specified paths, by tricking it into opening a named pipe. So
2432     /// arguably `security_qos_flags` should also be set when opening arbitrary
2433     /// paths. However the bits can then conflict with other flags, specifically
2434     /// `FILE_FLAG_OPEN_NO_RECALL`.
2435     ///
2436     /// For information about possible values, see [Impersonation Levels] on the
2437     /// Windows Dev Center site. The `SECURITY_SQOS_PRESENT` flag is set
2438     /// automatically when using this method.
2439     ///
2440     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2441     /// [`SECURITY_IDENTIFICATION`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Storage/FileSystem/constant.SECURITY_IDENTIFICATION.html
2442     /// [Impersonation Levels]: https://docs.microsoft.com/en-us/windows/win32/api/winnt/ne-winnt-security_impersonation_level
security_qos_flags(&mut self, flags: u32) -> &mut Self2443     pub fn security_qos_flags(&mut self, flags: u32) -> &mut Self {
2444         // See: https://github.com/rust-lang/rust/pull/58216
2445         self.security_qos_flags = flags | windows_sys::SECURITY_SQOS_PRESENT;
2446         self
2447     }
2448 
2449     /// The pipe mode.
2450     ///
2451     /// The default pipe mode is [`PipeMode::Byte`]. See [`PipeMode`] for
2452     /// documentation of what each mode means.
pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self2453     pub fn pipe_mode(&mut self, pipe_mode: PipeMode) -> &mut Self {
2454         self.pipe_mode = pipe_mode;
2455         self
2456     }
2457 
2458     /// Opens the named pipe identified by `addr`.
2459     ///
2460     /// This opens the client using [`CreateFile`] with the
2461     /// `dwCreationDisposition` option set to `OPEN_EXISTING`.
2462     ///
2463     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilea
2464     ///
2465     /// # Errors
2466     ///
2467     /// This errors if called outside of a [Tokio Runtime], or in a runtime that
2468     /// has not [enabled I/O], or if any OS-specific I/O errors occur.
2469     ///
2470     /// There are a few errors you need to take into account when creating a
2471     /// named pipe on the client side:
2472     ///
2473     /// * [`std::io::ErrorKind::NotFound`] - This indicates that the named pipe
2474     ///   does not exist. Presumably the server is not up.
2475     /// * [`ERROR_PIPE_BUSY`] - This error is raised when the named pipe exists,
2476     ///   but the server is not currently waiting for a connection. Please see the
2477     ///   examples for how to check for this error.
2478     ///
2479     /// [`ERROR_PIPE_BUSY`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_PIPE_BUSY.html
2480     /// [enabled I/O]: crate::runtime::Builder::enable_io
2481     /// [Tokio Runtime]: crate::runtime::Runtime
2482     ///
2483     /// A connect loop that waits until a pipe becomes available looks like
2484     /// this:
2485     ///
2486     /// ```no_run
2487     /// use std::time::Duration;
2488     /// use tokio::net::windows::named_pipe::ClientOptions;
2489     /// use tokio::time;
2490     /// use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
2491     ///
2492     /// const PIPE_NAME: &str = r"\\.\pipe\mynamedpipe";
2493     ///
2494     /// # #[tokio::main] async fn main() -> std::io::Result<()> {
2495     /// let client = loop {
2496     ///     match ClientOptions::new().open(PIPE_NAME) {
2497     ///         Ok(client) => break client,
2498     ///         Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
2499     ///         Err(e) => return Err(e),
2500     ///     }
2501     ///
2502     ///     time::sleep(Duration::from_millis(50)).await;
2503     /// };
2504     ///
2505     /// // use the connected client.
2506     /// # Ok(()) }
2507     /// ```
open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient>2508     pub fn open(&self, addr: impl AsRef<OsStr>) -> io::Result<NamedPipeClient> {
2509         // Safety: We're calling open_with_security_attributes_raw w/ a null
2510         // pointer which disables it.
2511         unsafe { self.open_with_security_attributes_raw(addr, ptr::null_mut()) }
2512     }
2513 
2514     /// Opens the named pipe identified by `addr`.
2515     ///
2516     /// This is the same as [`open`] except that it supports providing the raw
2517     /// pointer to a structure of [`SECURITY_ATTRIBUTES`] which will be passed
2518     /// as the `lpSecurityAttributes` argument to [`CreateFile`].
2519     ///
2520     /// # Safety
2521     ///
2522     /// The `attrs` argument must either be null or point at a valid instance of
2523     /// the [`SECURITY_ATTRIBUTES`] structure. If the argument is null, the
2524     /// behavior is identical to calling the [`open`] method.
2525     ///
2526     /// [`open`]: ClientOptions::open
2527     /// [`CreateFile`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-createfilew
2528     /// [`SECURITY_ATTRIBUTES`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Security/struct.SECURITY_ATTRIBUTES.html
open_with_security_attributes_raw( &self, addr: impl AsRef<OsStr>, attrs: *mut c_void, ) -> io::Result<NamedPipeClient>2529     pub unsafe fn open_with_security_attributes_raw(
2530         &self,
2531         addr: impl AsRef<OsStr>,
2532         attrs: *mut c_void,
2533     ) -> io::Result<NamedPipeClient> {
2534         let addr = encode_addr(addr);
2535 
2536         let desired_access = {
2537             let mut access = 0;
2538             if self.generic_read {
2539                 access |= windows_sys::GENERIC_READ;
2540             }
2541             if self.generic_write {
2542                 access |= windows_sys::GENERIC_WRITE;
2543             }
2544             access
2545         };
2546 
2547         // NB: We could use a platform specialized `OpenOptions` here, but since
2548         // we have access to windows_sys it ultimately doesn't hurt to use
2549         // `CreateFile` explicitly since it allows the use of our already
2550         // well-structured wide `addr` to pass into CreateFileW.
2551         let h = windows_sys::CreateFileW(
2552             addr.as_ptr(),
2553             desired_access,
2554             0,
2555             attrs as *mut _,
2556             windows_sys::OPEN_EXISTING,
2557             self.get_flags(),
2558             0,
2559         );
2560 
2561         if h == windows_sys::INVALID_HANDLE_VALUE {
2562             return Err(io::Error::last_os_error());
2563         }
2564 
2565         if matches!(self.pipe_mode, PipeMode::Message) {
2566             let mode = windows_sys::PIPE_READMODE_MESSAGE;
2567             let result =
2568                 windows_sys::SetNamedPipeHandleState(h, &mode, ptr::null_mut(), ptr::null_mut());
2569 
2570             if result == 0 {
2571                 return Err(io::Error::last_os_error());
2572             }
2573         }
2574 
2575         NamedPipeClient::from_raw_handle(h as _)
2576     }
2577 
get_flags(&self) -> u322578     fn get_flags(&self) -> u32 {
2579         self.security_qos_flags | windows_sys::FILE_FLAG_OVERLAPPED
2580     }
2581 }
2582 
/// How data travelling through a named pipe is framed.
///
/// Set through [`ServerOptions::pipe_mode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeMode {
    /// The pipe carries a plain stream of bytes. Bytes written by separate
    /// write operations are not distinguished from each other.
    ///
    /// Corresponds to [`PIPE_TYPE_BYTE`].
    ///
    /// [`PIPE_TYPE_BYTE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_BYTE.html
    Byte,
    /// The pipe carries a stream of messages. The bytes written by each write
    /// operation are treated as one message unit. Reading from the pipe
    /// returns [`ERROR_MORE_DATA`] when a message has not been read
    /// completely.
    ///
    /// Corresponds to [`PIPE_TYPE_MESSAGE`].
    ///
    /// [`ERROR_MORE_DATA`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/Foundation/constant.ERROR_MORE_DATA.html
    /// [`PIPE_TYPE_MESSAGE`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_TYPE_MESSAGE.html
    Message,
}
2607 
/// Identifies which end of a named pipe instance a handle refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PipeEnd {
    /// The handle is the client end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_CLIENT_END`].
    ///
    /// [`PIPE_CLIENT_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_CLIENT_END.html
    Client,
    /// The handle is the server end of a named pipe instance.
    ///
    /// Corresponds to [`PIPE_SERVER_END`].
    ///
    /// [`PIPE_SERVER_END`]: https://docs.rs/windows-sys/latest/windows_sys/Win32/System/Pipes/constant.PIPE_SERVER_END.html
    Server,
}
2625 
2626 /// Information about a named pipe.
2627 ///
2628 /// Constructed through [`NamedPipeServer::info`] or [`NamedPipeClient::info`].
2629 #[derive(Debug)]
2630 #[non_exhaustive]
2631 pub struct PipeInfo {
2632     /// Indicates the mode of a named pipe.
2633     pub mode: PipeMode,
2634     /// Indicates the end of a named pipe.
2635     pub end: PipeEnd,
2636     /// The maximum number of instances that can be created for this pipe.
2637     pub max_instances: u32,
2638     /// The number of bytes to reserve for the output buffer.
2639     pub out_buffer_size: u32,
2640     /// The number of bytes to reserve for the input buffer.
2641     pub in_buffer_size: u32,
2642 }
2643 
2644 /// Encodes an address so that it is a null-terminated wide string.
encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]>2645 fn encode_addr(addr: impl AsRef<OsStr>) -> Box<[u16]> {
2646     let len = addr.as_ref().encode_wide().count();
2647     let mut vec = Vec::with_capacity(len + 1);
2648     vec.extend(addr.as_ref().encode_wide());
2649     vec.push(0);
2650     vec.into_boxed_slice()
2651 }
2652 
2653 /// Internal function to get the info out of a raw named pipe.
named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo>2654 unsafe fn named_pipe_info(handle: RawHandle) -> io::Result<PipeInfo> {
2655     let mut flags = 0;
2656     let mut out_buffer_size = 0;
2657     let mut in_buffer_size = 0;
2658     let mut max_instances = 0;
2659 
2660     let result = windows_sys::GetNamedPipeInfo(
2661         handle as _,
2662         &mut flags,
2663         &mut out_buffer_size,
2664         &mut in_buffer_size,
2665         &mut max_instances,
2666     );
2667 
2668     if result == 0 {
2669         return Err(io::Error::last_os_error());
2670     }
2671 
2672     let mut end = PipeEnd::Client;
2673     let mut mode = PipeMode::Byte;
2674 
2675     if flags & windows_sys::PIPE_SERVER_END != 0 {
2676         end = PipeEnd::Server;
2677     }
2678 
2679     if flags & windows_sys::PIPE_TYPE_MESSAGE != 0 {
2680         mode = PipeMode::Message;
2681     }
2682 
2683     Ok(PipeInfo {
2684         end,
2685         mode,
2686         out_buffer_size,
2687         in_buffer_size,
2688         max_instances,
2689     })
2690 }
2691