• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::ffi::OsStr;
2 use std::io::{self, Read, Write};
3 use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
4 use std::sync::atomic::Ordering::{Relaxed, SeqCst};
5 use std::sync::atomic::{AtomicBool, AtomicUsize};
6 use std::sync::{Arc, Mutex};
7 use std::{fmt, mem, slice};
8 
9 use windows_sys::Win32::Foundation::{
10     ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
11     ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
12 };
13 use windows_sys::Win32::Storage::FileSystem::{
14     ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
15 };
16 use windows_sys::Win32::System::Pipes::{
17     ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_TYPE_BYTE,
18     PIPE_UNLIMITED_INSTANCES,
19 };
20 use windows_sys::Win32::System::IO::{
21     CancelIoEx, GetOverlappedResult, OVERLAPPED, OVERLAPPED_ENTRY,
22 };
23 
24 use crate::event::Source;
25 use crate::sys::windows::iocp::{CompletionPort, CompletionStatus};
26 use crate::sys::windows::{Event, Handle, Overlapped};
27 use crate::Registry;
28 use crate::{Interest, Token};
29 
30 /// Non-blocking windows named pipe.
31 ///
32 /// This structure internally contains a `HANDLE` which represents the named
33 /// pipe, and also maintains state associated with the mio event loop and active
34 /// I/O operations that have been scheduled to translate IOCP to a readiness
35 /// model.
36 ///
37 /// Note, IOCP is a *completion* based model whereas mio is a *readiness* based
38 /// model. To bridge this, `NamedPipe` performs internal buffering. Writes are
39 /// written to an internal buffer and the buffer is submitted to IOCP. IOCP
40 /// reads are submitted using internal buffers and `NamedPipe::read` reads from
41 /// this internal buffer.
42 ///
43 /// # Trait implementations
44 ///
45 /// The `Read` and `Write` traits are implemented for `NamedPipe` and for
46 /// `&NamedPipe`. This represents that a named pipe can be concurrently read and
47 /// written to and also can be read and written to at all. Typically a named
48 /// pipe needs to be connected to a client before it can be read or written,
49 /// however.
50 ///
51 /// Note that for I/O operations on a named pipe to succeed then the named pipe
52 /// needs to be associated with an event loop. Until this happens all I/O
53 /// operations will return a "would block" error.
54 ///
55 /// # Managing connections
56 ///
57 /// The `NamedPipe` type supports a `connect` method to connect to a client and
58 /// a `disconnect` method to disconnect from that client. These two methods only
59 /// work once a named pipe is associated with an event loop.
60 ///
61 /// The `connect` method will succeed asynchronously and a completion can be
62 /// detected once the object receives a writable notification.
63 ///
64 /// # Named pipe clients
65 ///
66 /// Currently to create a client of a named pipe server then you can use the
67 /// `OpenOptions` type in the standard library to create a `File` that connects
68 /// to a named pipe. Afterwards you can use the `into_raw_handle` method coupled
69 /// with the `NamedPipe::from_raw_handle` method to convert that to a named pipe
70 /// that can operate asynchronously. Don't forget to pass the
71 /// `FILE_FLAG_OVERLAPPED` flag when opening the `File`.
72 pub struct NamedPipe {
73     inner: Arc<Inner>,
74 }
75 
76 /// # Notes
77 ///
78 /// The memory layout of this structure must be fixed as the
79 /// `ptr_from_*_overlapped` methods depend on it, see the `ptr_from` test.
80 #[repr(C)]
81 struct Inner {
82     // NOTE: careful modifying the order of these three fields, the `ptr_from_*`
83     // methods depend on the layout!
84     connect: Overlapped,
85     read: Overlapped,
86     write: Overlapped,
87     // END NOTE.
88     handle: Handle,
89     connecting: AtomicBool,
90     io: Mutex<Io>,
91     pool: Mutex<BufferPool>,
92 }
93 
94 impl Inner {
95     /// Converts a pointer to `Inner.connect` to a pointer to `Inner`.
96     ///
97     /// # Unsafety
98     ///
99     /// Caller must ensure `ptr` is pointing to `Inner.connect`.
ptr_from_conn_overlapped(ptr: *mut OVERLAPPED) -> *const Inner100     unsafe fn ptr_from_conn_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
101         // `connect` is the first field, so the pointer are the same.
102         ptr.cast()
103     }
104 
105     /// Same as [`ptr_from_conn_overlapped`] but for `Inner.read`.
ptr_from_read_overlapped(ptr: *mut OVERLAPPED) -> *const Inner106     unsafe fn ptr_from_read_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
107         // `read` is after `connect: Overlapped`.
108         (ptr as *mut Overlapped).wrapping_sub(1) as *const Inner
109     }
110 
111     /// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`.
ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner112     unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
113         // `read` is after `connect: Overlapped` and `read: Overlapped`.
114         (ptr as *mut Overlapped).wrapping_sub(2) as *const Inner
115     }
116 
117     /// Issue a connection request with the specified overlapped operation.
118     ///
119     /// This function will issue a request to connect a client to this server,
120     /// returning immediately after starting the overlapped operation.
121     ///
122     /// If this function immediately succeeds then `Ok(true)` is returned. If
123     /// the overlapped operation is enqueued and pending, then `Ok(false)` is
124     /// returned. Otherwise an error is returned indicating what went wrong.
125     ///
126     /// # Unsafety
127     ///
128     /// This function is unsafe because the kernel requires that the
129     /// `overlapped` pointer is valid until the end of the I/O operation. The
130     /// kernel also requires that `overlapped` is unique for this I/O operation
131     /// and is not in use for any other I/O.
132     ///
133     /// To safely use this function callers must ensure that this pointer is
134     /// valid until the I/O operation is completed, typically via completion
135     /// ports and waiting to receive the completion notification on the port.
connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool>136     pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
137         if ConnectNamedPipe(self.handle.raw(), overlapped) != 0 {
138             return Ok(true);
139         }
140 
141         let err = io::Error::last_os_error();
142 
143         match err.raw_os_error().map(|e| e as u32) {
144             Some(ERROR_PIPE_CONNECTED) => Ok(true),
145             Some(ERROR_NO_DATA) => Ok(true),
146             Some(ERROR_IO_PENDING) => Ok(false),
147             _ => Err(err),
148         }
149     }
150 
151     /// Disconnects this named pipe from any connected client.
disconnect(&self) -> io::Result<()>152     pub fn disconnect(&self) -> io::Result<()> {
153         if unsafe { DisconnectNamedPipe(self.handle.raw()) } == 0 {
154             Err(io::Error::last_os_error())
155         } else {
156             Ok(())
157         }
158     }
159 
160     /// Issues an overlapped read operation to occur on this pipe.
161     ///
162     /// This function will issue an asynchronous read to occur in an overlapped
163     /// fashion, returning immediately. The `buf` provided will be filled in
164     /// with data and the request is tracked by the `overlapped` function
165     /// provided.
166     ///
167     /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
168     /// `n` is the number of bytes read. If an asynchronous operation is
169     /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
170     /// it is returned.
171     ///
172     /// When this operation completes (or if it completes immediately), another
173     /// mechanism must be used to learn how many bytes were transferred (such as
174     /// looking at the filed in the IOCP status message).
175     ///
176     /// # Unsafety
177     ///
178     /// This function is unsafe because the kernel requires that the `buf` and
179     /// `overlapped` pointers to be valid until the end of the I/O operation.
180     /// The kernel also requires that `overlapped` is unique for this I/O
181     /// operation and is not in use for any other I/O.
182     ///
183     /// To safely use this function callers must ensure that the pointers are
184     /// valid until the I/O operation is completed, typically via completion
185     /// ports and waiting to receive the completion notification on the port.
read_overlapped( &self, buf: &mut [u8], overlapped: *mut OVERLAPPED, ) -> io::Result<Option<usize>>186     pub unsafe fn read_overlapped(
187         &self,
188         buf: &mut [u8],
189         overlapped: *mut OVERLAPPED,
190     ) -> io::Result<Option<usize>> {
191         let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
192         let res = ReadFile(
193             self.handle.raw(),
194             buf.as_mut_ptr() as *mut _,
195             len,
196             std::ptr::null_mut(),
197             overlapped,
198         );
199         if res == 0 {
200             let err = io::Error::last_os_error();
201             if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
202                 return Err(err);
203             }
204         }
205 
206         let mut bytes = 0;
207         let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
208         if res == 0 {
209             let err = io::Error::last_os_error();
210             if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
211                 Ok(None)
212             } else {
213                 Err(err)
214             }
215         } else {
216             Ok(Some(bytes as usize))
217         }
218     }
219 
220     /// Issues an overlapped write operation to occur on this pipe.
221     ///
222     /// This function will issue an asynchronous write to occur in an overlapped
223     /// fashion, returning immediately. The `buf` provided will be filled in
224     /// with data and the request is tracked by the `overlapped` function
225     /// provided.
226     ///
227     /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
228     /// `n` is the number of bytes written. If an asynchronous operation is
229     /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
230     /// it is returned.
231     ///
232     /// When this operation completes (or if it completes immediately), another
233     /// mechanism must be used to learn how many bytes were transferred (such as
234     /// looking at the filed in the IOCP status message).
235     ///
236     /// # Unsafety
237     ///
238     /// This function is unsafe because the kernel requires that the `buf` and
239     /// `overlapped` pointers to be valid until the end of the I/O operation.
240     /// The kernel also requires that `overlapped` is unique for this I/O
241     /// operation and is not in use for any other I/O.
242     ///
243     /// To safely use this function callers must ensure that the pointers are
244     /// valid until the I/O operation is completed, typically via completion
245     /// ports and waiting to receive the completion notification on the port.
write_overlapped( &self, buf: &[u8], overlapped: *mut OVERLAPPED, ) -> io::Result<Option<usize>>246     pub unsafe fn write_overlapped(
247         &self,
248         buf: &[u8],
249         overlapped: *mut OVERLAPPED,
250     ) -> io::Result<Option<usize>> {
251         let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
252         let res = WriteFile(
253             self.handle.raw(),
254             buf.as_ptr() as *const _,
255             len,
256             std::ptr::null_mut(),
257             overlapped,
258         );
259         if res == 0 {
260             let err = io::Error::last_os_error();
261             if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
262                 return Err(err);
263             }
264         }
265 
266         let mut bytes = 0;
267         let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
268         if res == 0 {
269             let err = io::Error::last_os_error();
270             if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
271                 Ok(None)
272             } else {
273                 Err(err)
274             }
275         } else {
276             Ok(Some(bytes as usize))
277         }
278     }
279 
280     /// Calls the `GetOverlappedResult` function to get the result of an
281     /// overlapped operation for this handle.
282     ///
283     /// This function takes the `OVERLAPPED` argument which must have been used
284     /// to initiate an overlapped I/O operation, and returns either the
285     /// successful number of bytes transferred during the operation or an error
286     /// if one occurred.
287     ///
288     /// # Unsafety
289     ///
290     /// This function is unsafe as `overlapped` must have previously been used
291     /// to execute an operation for this handle, and it must also be a valid
292     /// pointer to an `Overlapped` instance.
293     #[inline]
result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize>294     unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
295         let mut transferred = 0;
296         let r = GetOverlappedResult(self.handle.raw(), overlapped, &mut transferred, 0);
297         if r == 0 {
298             Err(io::Error::last_os_error())
299         } else {
300             Ok(transferred as usize)
301         }
302     }
303 }
304 
/// Checks that each `ptr_from_*_overlapped` helper maps the address of its
/// field back to the address of the enclosing `Inner`.
#[test]
fn ptr_from() {
    use std::mem::ManuallyDrop;
    use std::ptr;

    // The pipe wraps a null handle and is only used for address arithmetic;
    // `ManuallyDrop` keeps its destructor from running.
    let pipe = unsafe { ManuallyDrop::new(NamedPipe::from_raw_handle(ptr::null_mut())) };
    let inner: &Inner = &pipe.inner;
    let expected = inner as *const Inner;

    let via_connect =
        unsafe { Inner::ptr_from_conn_overlapped(&inner.connect as *const _ as *mut OVERLAPPED) };
    assert_eq!(expected, via_connect, "`ptr_from_conn_overlapped` incorrect");

    let via_read =
        unsafe { Inner::ptr_from_read_overlapped(&inner.read as *const _ as *mut OVERLAPPED) };
    assert_eq!(expected, via_read, "`ptr_from_read_overlapped` incorrect");

    let via_write =
        unsafe { Inner::ptr_from_write_overlapped(&inner.write as *const _ as *mut OVERLAPPED) };
    assert_eq!(expected, via_write, "`ptr_from_write_overlapped` incorrect");
}
328 
329 struct Io {
330     // Uniquely identifies the selector associated with this named pipe
331     cp: Option<Arc<CompletionPort>>,
332     // Token used to identify events
333     token: Option<Token>,
334     read: State,
335     write: State,
336     connect_error: Option<io::Error>,
337 }
338 
/// State of one half (read or write) of the pipe's buffered I/O.
#[derive(Debug)]
enum State {
    /// Nothing is scheduled and no result is waiting.
    None,
    /// An overlapped operation on this buffer has been issued. For writes the
    /// `usize` is the offset into the buffer the operation started at; reads
    /// store `0` and ignore it.
    Pending(Vec<u8>, usize),
    /// For reads: data has arrived and the `usize` is how much of the buffer
    /// has already been handed to the caller. For writes: the operation
    /// finished immediately.
    Ok(Vec<u8>, usize),
    /// The operation failed; the error is handed out by the next read/write.
    Err(io::Error),
}
346 
// Source of the internal tokens named pipes hand to the completion port.
// It starts at an odd value and `register` advances it in steps of two, so
// named pipes always end up with odd tokens.
static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(1);
349 
/// Builds the "would block" error returned whenever an operation cannot make
/// progress right now.
fn would_block() -> io::Error {
    io::Error::from(io::ErrorKind::WouldBlock)
}
353 
354 impl NamedPipe {
355     /// Creates a new named pipe at the specified `addr` given a "reasonable
356     /// set" of initial configuration options.
new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe>357     pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
358         use std::os::windows::ffi::OsStrExt;
359         let name: Vec<_> = addr.as_ref().encode_wide().chain(Some(0)).collect();
360 
361         // Safety: syscall
362         let h = unsafe {
363             CreateNamedPipeW(
364                 name.as_ptr(),
365                 PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
366                 PIPE_TYPE_BYTE,
367                 PIPE_UNLIMITED_INSTANCES,
368                 65536,
369                 65536,
370                 0,
371                 std::ptr::null_mut(),
372             )
373         };
374 
375         if h == INVALID_HANDLE_VALUE {
376             Err(io::Error::last_os_error())
377         } else {
378             // Safety: nothing actually unsafe about this. The trait fn includes
379             // `unsafe`.
380             Ok(unsafe { Self::from_raw_handle(h as RawHandle) })
381         }
382     }
383 
384     /// Attempts to call `ConnectNamedPipe`, if possible.
385     ///
386     /// This function will attempt to connect this pipe to a client in an
387     /// asynchronous fashion. If the function immediately establishes a
388     /// connection to a client then `Ok(())` is returned. Otherwise if a
389     /// connection attempt was issued and is now in progress then a "would
390     /// block" error is returned.
391     ///
392     /// When the connection is finished then this object will be flagged as
393     /// being ready for a write, or otherwise in the writable state.
394     ///
395     /// # Errors
396     ///
397     /// This function will return a "would block" error if the pipe has not yet
398     /// been registered with an event loop, if the connection operation has
399     /// previously been issued but has not yet completed, or if the connect
400     /// itself was issued and didn't finish immediately.
401     ///
402     /// Normal I/O errors from the call to `ConnectNamedPipe` are returned
403     /// immediately.
connect(&self) -> io::Result<()>404     pub fn connect(&self) -> io::Result<()> {
405         // "Acquire the connecting lock" or otherwise just make sure we're the
406         // only operation that's using the `connect` overlapped instance.
407         if self.inner.connecting.swap(true, SeqCst) {
408             return Err(would_block());
409         }
410 
411         // Now that we've flagged ourselves in the connecting state, issue the
412         // connection attempt. Afterwards interpret the return value and set
413         // internal state accordingly.
414         let res = unsafe {
415             let overlapped = self.inner.connect.as_ptr() as *mut _;
416             self.inner.connect_overlapped(overlapped)
417         };
418 
419         match res {
420             // The connection operation finished immediately, so let's schedule
421             // reads/writes and such.
422             Ok(true) => {
423                 self.inner.connecting.store(false, SeqCst);
424                 Inner::post_register(&self.inner, None);
425                 Ok(())
426             }
427 
428             // If the overlapped operation was successful and didn't finish
429             // immediately then we forget a copy of the arc we hold
430             // internally. This ensures that when the completion status comes
431             // in for the I/O operation finishing it'll have a reference
432             // associated with it and our data will still be valid. The
433             // `connect_done` function will "reify" this forgotten pointer to
434             // drop the refcount on the other side.
435             Ok(false) => {
436                 mem::forget(self.inner.clone());
437                 Err(would_block())
438             }
439 
440             Err(e) => {
441                 self.inner.connecting.store(false, SeqCst);
442                 Err(e)
443             }
444         }
445     }
446 
447     /// Takes any internal error that has happened after the last I/O operation
448     /// which hasn't been retrieved yet.
449     ///
450     /// This is particularly useful when detecting failed attempts to `connect`.
451     /// After a completed `connect` flags this pipe as writable then callers
452     /// must invoke this method to determine whether the connection actually
453     /// succeeded. If this function returns `None` then a client is connected,
454     /// otherwise it returns an error of what happened and a client shouldn't be
455     /// connected.
take_error(&self) -> io::Result<Option<io::Error>>456     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
457         Ok(self.inner.io.lock().unwrap().connect_error.take())
458     }
459 
460     /// Disconnects this named pipe from a connected client.
461     ///
462     /// This function will disconnect the pipe from a connected client, if any,
463     /// transitively calling the `DisconnectNamedPipe` function.
464     ///
465     /// After a `disconnect` is issued, then a `connect` may be called again to
466     /// connect to another client.
disconnect(&self) -> io::Result<()>467     pub fn disconnect(&self) -> io::Result<()> {
468         self.inner.disconnect()
469     }
470 }
471 
472 impl FromRawHandle for NamedPipe {
from_raw_handle(handle: RawHandle) -> NamedPipe473     unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
474         NamedPipe {
475             inner: Arc::new(Inner {
476                 handle: Handle::new(handle as HANDLE),
477                 connect: Overlapped::new(connect_done),
478                 connecting: AtomicBool::new(false),
479                 read: Overlapped::new(read_done),
480                 write: Overlapped::new(write_done),
481                 io: Mutex::new(Io {
482                     cp: None,
483                     token: None,
484                     read: State::None,
485                     write: State::None,
486                     connect_error: None,
487                 }),
488                 pool: Mutex::new(BufferPool::with_capacity(2)),
489             }),
490         }
491     }
492 }
493 
494 impl Read for NamedPipe {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>495     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
496         <&NamedPipe as Read>::read(&mut &*self, buf)
497     }
498 }
499 
500 impl Write for NamedPipe {
write(&mut self, buf: &[u8]) -> io::Result<usize>501     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
502         <&NamedPipe as Write>::write(&mut &*self, buf)
503     }
504 
flush(&mut self) -> io::Result<()>505     fn flush(&mut self) -> io::Result<()> {
506         <&NamedPipe as Write>::flush(&mut &*self)
507     }
508 }
509 
510 impl<'a> Read for &'a NamedPipe {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>511     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
512         let mut state = self.inner.io.lock().unwrap();
513 
514         if state.token.is_none() {
515             return Err(would_block());
516         }
517 
518         match mem::replace(&mut state.read, State::None) {
519             // In theory not possible with `token` checked above,
520             // but return would block for now.
521             State::None => Err(would_block()),
522 
523             // A read is in flight, still waiting for it to finish
524             State::Pending(buf, amt) => {
525                 state.read = State::Pending(buf, amt);
526                 Err(would_block())
527             }
528 
529             // We previously read something into `data`, try to copy out some
530             // data. If we copy out all the data schedule a new read and
531             // otherwise store the buffer to get read later.
532             State::Ok(data, cur) => {
533                 let n = {
534                     let mut remaining = &data[cur..];
535                     remaining.read(buf)?
536                 };
537                 let next = cur + n;
538                 if next != data.len() {
539                     state.read = State::Ok(data, next);
540                 } else {
541                     self.inner.put_buffer(data);
542                     Inner::schedule_read(&self.inner, &mut state, None);
543                 }
544                 Ok(n)
545             }
546 
547             // Looks like an in-flight read hit an error, return that here while
548             // we schedule a new one.
549             State::Err(e) => {
550                 Inner::schedule_read(&self.inner, &mut state, None);
551                 if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) {
552                     Ok(0)
553                 } else {
554                     Err(e)
555                 }
556             }
557         }
558     }
559 }
560 
561 impl<'a> Write for &'a NamedPipe {
write(&mut self, buf: &[u8]) -> io::Result<usize>562     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
563         // Make sure there's no writes pending
564         let mut io = self.inner.io.lock().unwrap();
565 
566         if io.token.is_none() {
567             return Err(would_block());
568         }
569 
570         match io.write {
571             State::None => {}
572             State::Err(_) => match mem::replace(&mut io.write, State::None) {
573                 State::Err(e) => return Err(e),
574                 // `io` is locked, so this branch is unreachable
575                 _ => unreachable!(),
576             },
577             // any other state should be handled in `write_done`
578             _ => {
579                 return Err(would_block());
580             }
581         }
582 
583         // Move `buf` onto the heap and fire off the write
584         let mut owned_buf = self.inner.get_buffer();
585         owned_buf.extend(buf);
586         match Inner::maybe_schedule_write(&self.inner, owned_buf, 0, &mut io)? {
587             // Some bytes are written immediately
588             Some(n) => Ok(n),
589             // Write operation is anqueued for whole buffer
590             None => Ok(buf.len()),
591         }
592     }
593 
flush(&mut self) -> io::Result<()>594     fn flush(&mut self) -> io::Result<()> {
595         Ok(())
596     }
597 }
598 
599 impl Source for NamedPipe {
register(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()>600     fn register(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
601         let mut io = self.inner.io.lock().unwrap();
602 
603         io.check_association(registry, false)?;
604 
605         if io.token.is_some() {
606             return Err(io::Error::new(
607                 io::ErrorKind::AlreadyExists,
608                 "I/O source already registered with a `Registry`",
609             ));
610         }
611 
612         if io.cp.is_none() {
613             let selector = registry.selector();
614 
615             io.cp = Some(selector.clone_port());
616 
617             let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
618             selector.inner.cp.add_handle(inner_token, self)?;
619         }
620 
621         io.token = Some(token);
622         drop(io);
623 
624         Inner::post_register(&self.inner, None);
625 
626         Ok(())
627     }
628 
reregister(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()>629     fn reregister(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
630         let mut io = self.inner.io.lock().unwrap();
631 
632         io.check_association(registry, true)?;
633 
634         io.token = Some(token);
635         drop(io);
636 
637         Inner::post_register(&self.inner, None);
638 
639         Ok(())
640     }
641 
deregister(&mut self, registry: &Registry) -> io::Result<()>642     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
643         let mut io = self.inner.io.lock().unwrap();
644 
645         io.check_association(registry, true)?;
646 
647         if io.token.is_none() {
648             return Err(io::Error::new(
649                 io::ErrorKind::NotFound,
650                 "I/O source not registered with `Registry`",
651             ));
652         }
653 
654         io.token = None;
655         Ok(())
656     }
657 }
658 
659 impl AsRawHandle for NamedPipe {
as_raw_handle(&self) -> RawHandle660     fn as_raw_handle(&self) -> RawHandle {
661         self.inner.handle.raw() as RawHandle
662     }
663 }
664 
665 impl fmt::Debug for NamedPipe {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result666     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667         self.inner.handle.fmt(f)
668     }
669 }
670 
671 impl Drop for NamedPipe {
drop(&mut self)672     fn drop(&mut self) {
673         // Cancel pending reads/connects, but don't cancel writes to ensure that
674         // everything is flushed out.
675         unsafe {
676             if self.inner.connecting.load(SeqCst) {
677                 drop(cancel(&self.inner.handle, &self.inner.connect));
678             }
679 
680             let io = self.inner.io.lock().unwrap();
681             if let State::Pending(..) = io.read {
682                 drop(cancel(&self.inner.handle, &self.inner.read));
683             }
684         }
685     }
686 }
687 
688 impl Inner {
689     /// Schedules a read to happen in the background, executing an overlapped
690     /// operation.
691     ///
692     /// This function returns `true` if a normal error happens or if the read
693     /// is scheduled in the background. If the pipe is no longer connected
694     /// (ERROR_PIPE_LISTENING) then `false` is returned and no read is
695     /// scheduled.
schedule_read(me: &Arc<Inner>, io: &mut Io, events: Option<&mut Vec<Event>>) -> bool696     fn schedule_read(me: &Arc<Inner>, io: &mut Io, events: Option<&mut Vec<Event>>) -> bool {
697         // Check to see if a read is already scheduled/completed
698         match io.read {
699             State::None => {}
700             _ => return true,
701         }
702 
703         // Allocate a buffer and schedule the read.
704         let mut buf = me.get_buffer();
705         let e = unsafe {
706             let overlapped = me.read.as_ptr() as *mut _;
707             let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
708             me.read_overlapped(slice, overlapped)
709         };
710 
711         match e {
712             // See `NamedPipe::connect` above for the rationale behind `forget`
713             Ok(_) => {
714                 io.read = State::Pending(buf, 0); // 0 is ignored on read side
715                 mem::forget(me.clone());
716                 true
717             }
718 
719             // If ERROR_PIPE_LISTENING happens then it's not a real read error,
720             // we just need to wait for a connect.
721             Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_LISTENING as i32) => false,
722 
723             // If some other error happened, though, we're now readable to give
724             // out the error.
725             Err(e) => {
726                 io.read = State::Err(e);
727                 io.notify_readable(events);
728                 true
729             }
730         }
731     }
732 
733     /// Maybe schedules overlapped write operation.
734     ///
735     /// * `None` means that overlapped operation was enqueued
736     /// * `Some(n)` means that `n` bytes was immediately written.
737     ///   Note, that `write_done` will fire anyway to clean up the state.
maybe_schedule_write( me: &Arc<Inner>, buf: Vec<u8>, pos: usize, io: &mut Io, ) -> io::Result<Option<usize>>738     fn maybe_schedule_write(
739         me: &Arc<Inner>,
740         buf: Vec<u8>,
741         pos: usize,
742         io: &mut Io,
743     ) -> io::Result<Option<usize>> {
744         // Very similar to `schedule_read` above, just done for the write half.
745         let e = unsafe {
746             let overlapped = me.write.as_ptr() as *mut _;
747             me.write_overlapped(&buf[pos..], overlapped)
748         };
749 
750         // See `connect` above for the rationale behind `forget`
751         match e {
752             // `n` bytes are written immediately
753             Ok(Some(n)) => {
754                 io.write = State::Ok(buf, pos);
755                 mem::forget(me.clone());
756                 Ok(Some(n))
757             }
758             // write operation is enqueued
759             Ok(None) => {
760                 io.write = State::Pending(buf, pos);
761                 mem::forget(me.clone());
762                 Ok(None)
763             }
764             Err(e) => Err(e),
765         }
766     }
767 
schedule_write( me: &Arc<Inner>, buf: Vec<u8>, pos: usize, io: &mut Io, events: Option<&mut Vec<Event>>, )768     fn schedule_write(
769         me: &Arc<Inner>,
770         buf: Vec<u8>,
771         pos: usize,
772         io: &mut Io,
773         events: Option<&mut Vec<Event>>,
774     ) {
775         match Inner::maybe_schedule_write(me, buf, pos, io) {
776             Ok(Some(_)) => {
777                 // immediate result will be handled in `write_done`,
778                 // so we'll reinterpret the `Ok` state
779                 let state = mem::replace(&mut io.write, State::None);
780                 io.write = match state {
781                     State::Ok(buf, pos) => State::Pending(buf, pos),
782                     // io is locked, so this branch is unreachable
783                     _ => unreachable!(),
784                 };
785                 mem::forget(me.clone());
786             }
787             Ok(None) => (),
788             Err(e) => {
789                 io.write = State::Err(e);
790                 io.notify_writable(events);
791             }
792         }
793     }
794 
post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>)795     fn post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>) {
796         let mut io = me.io.lock().unwrap();
797         #[allow(clippy::needless_option_as_deref)]
798         if Inner::schedule_read(me, &mut io, events.as_deref_mut()) {
799             if let State::None = io.write {
800                 io.notify_writable(events);
801             }
802         }
803     }
804 
get_buffer(&self) -> Vec<u8>805     fn get_buffer(&self) -> Vec<u8> {
806         self.pool.lock().unwrap().get(4 * 1024)
807     }
808 
put_buffer(&self, buf: Vec<u8>)809     fn put_buffer(&self, buf: Vec<u8>) {
810         self.pool.lock().unwrap().put(buf)
811     }
812 }
813 
cancel(handle: &Handle, overlapped: &Overlapped) -> io::Result<()>814 unsafe fn cancel(handle: &Handle, overlapped: &Overlapped) -> io::Result<()> {
815     let ret = CancelIoEx(handle.raw(), overlapped.as_ptr());
816     // `CancelIoEx` returns 0 on error:
817     // https://docs.microsoft.com/en-us/windows/win32/fileio/cancelioex-func
818     if ret == 0 {
819         Err(io::Error::last_os_error())
820     } else {
821         Ok(())
822     }
823 }
824 
connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>)825 fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
826     let status = CompletionStatus::from_entry(status);
827 
828     // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
829     // the refcount is available to us due to the `mem::forget` in
830     // `connect` above.
831     let me = unsafe { Arc::from_raw(Inner::ptr_from_conn_overlapped(status.overlapped())) };
832 
833     // Flag ourselves as no longer using the `connect` overlapped instances.
834     let prev = me.connecting.swap(false, SeqCst);
835     assert!(prev, "NamedPipe was not previously connecting");
836 
837     // Stash away our connect error if one happened
838     debug_assert_eq!(status.bytes_transferred(), 0);
839     unsafe {
840         match me.result(status.overlapped()) {
841             Ok(n) => debug_assert_eq!(n, 0),
842             Err(e) => me.io.lock().unwrap().connect_error = Some(e),
843         }
844     }
845 
846     // We essentially just finished a registration, so kick off a
847     // read and register write readiness.
848     Inner::post_register(&me, events);
849 }
850 
read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>)851 fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
852     let status = CompletionStatus::from_entry(status);
853 
854     // Acquire the `FromRawArc<Inner>`. Note that we should be guaranteed that
855     // the refcount is available to us due to the `mem::forget` in
856     // `schedule_read` above.
857     let me = unsafe { Arc::from_raw(Inner::ptr_from_read_overlapped(status.overlapped())) };
858 
859     // Move from the `Pending` to `Ok` state.
860     let mut io = me.io.lock().unwrap();
861     let mut buf = match mem::replace(&mut io.read, State::None) {
862         State::Pending(buf, _) => buf,
863         _ => unreachable!(),
864     };
865     unsafe {
866         match me.result(status.overlapped()) {
867             Ok(n) => {
868                 debug_assert_eq!(status.bytes_transferred() as usize, n);
869                 buf.set_len(status.bytes_transferred() as usize);
870                 io.read = State::Ok(buf, 0);
871             }
872             Err(e) => {
873                 debug_assert_eq!(status.bytes_transferred(), 0);
874                 io.read = State::Err(e);
875             }
876         }
877     }
878 
879     // Flag our readiness that we've got data.
880     io.notify_readable(events);
881 }
882 
write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>)883 fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
884     let status = CompletionStatus::from_entry(status);
885 
886     // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
887     // the refcount is available to us due to the `mem::forget` in
888     // `schedule_write` above.
889     let me = unsafe { Arc::from_raw(Inner::ptr_from_write_overlapped(status.overlapped())) };
890 
891     // Make the state change out of `Pending`. If we wrote the entire buffer
892     // then we're writable again and otherwise we schedule another write.
893     let mut io = me.io.lock().unwrap();
894     let (buf, pos) = match mem::replace(&mut io.write, State::None) {
895         // `Ok` here means, that the operation was completed immediately
896         // `bytes_transferred` is already reported to a client
897         State::Ok(..) => {
898             io.notify_writable(events);
899             return;
900         }
901         State::Pending(buf, pos) => (buf, pos),
902         _ => unreachable!(),
903     };
904 
905     unsafe {
906         match me.result(status.overlapped()) {
907             Ok(n) => {
908                 debug_assert_eq!(status.bytes_transferred() as usize, n);
909                 let new_pos = pos + (status.bytes_transferred() as usize);
910                 if new_pos == buf.len() {
911                     me.put_buffer(buf);
912                     io.notify_writable(events);
913                 } else {
914                     Inner::schedule_write(&me, buf, new_pos, &mut io, events);
915                 }
916             }
917             Err(e) => {
918                 debug_assert_eq!(status.bytes_transferred(), 0);
919                 io.write = State::Err(e);
920                 io.notify_writable(events);
921             }
922         }
923     }
924 }
925 
926 impl Io {
check_association(&self, registry: &Registry, required: bool) -> io::Result<()>927     fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
928         match self.cp {
929             Some(ref cp) if !registry.selector().same_port(cp) => Err(io::Error::new(
930                 io::ErrorKind::AlreadyExists,
931                 "I/O source already registered with a different `Registry`",
932             )),
933             None if required => Err(io::Error::new(
934                 io::ErrorKind::NotFound,
935                 "I/O source not registered with `Registry`",
936             )),
937             _ => Ok(()),
938         }
939     }
940 
notify_readable(&self, events: Option<&mut Vec<Event>>)941     fn notify_readable(&self, events: Option<&mut Vec<Event>>) {
942         if let Some(token) = self.token {
943             let mut ev = Event::new(token);
944             ev.set_readable();
945 
946             if let Some(events) = events {
947                 events.push(ev);
948             } else {
949                 let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
950             }
951         }
952     }
953 
notify_writable(&self, events: Option<&mut Vec<Event>>)954     fn notify_writable(&self, events: Option<&mut Vec<Event>>) {
955         if let Some(token) = self.token {
956             let mut ev = Event::new(token);
957             ev.set_writable();
958 
959             if let Some(events) = events {
960                 events.push(ev);
961             } else {
962                 let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
963             }
964         }
965     }
966 }
967 
/// A bounded stash of reusable byte buffers.
struct BufferPool {
    /// Idle buffers waiting to be handed out again. The vector's capacity
    /// doubles as the limit on how many buffers are retained.
    pool: Vec<Vec<u8>>,
}

impl BufferPool {
    /// Creates an empty pool with room reserved for `cap` idle buffers.
    fn with_capacity(cap: usize) -> BufferPool {
        BufferPool {
            pool: Vec::with_capacity(cap),
        }
    }

    /// Returns an idle buffer if one is available, otherwise a new empty
    /// buffer with at least `default_cap` bytes of capacity.
    ///
    /// A recycled buffer keeps whatever capacity it already had; `default_cap`
    /// only applies to newly allocated buffers.
    fn get(&mut self, default_cap: usize) -> Vec<u8> {
        self.pool
            .pop()
            .unwrap_or_else(|| Vec::with_capacity(default_cap))
    }

    /// Returns `buf` to the pool, emptied but with its allocation intact.
    ///
    /// If the pool has no spare slot, `buf` is dropped instead so the pool
    /// never grows past its reserved capacity.
    fn put(&mut self, mut buf: Vec<u8>) {
        if self.pool.len() < self.pool.capacity() {
            // `clear` resets the length to zero and keeps the capacity, so no
            // `unsafe` length manipulation is needed.
            buf.clear();
            self.pool.push(buf);
        }
    }
}
994