use std::ffi::OsStr;
use std::io::{self, Read, Write};
use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle};
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
use std::{fmt, mem, slice};

use windows_sys::Win32::Foundation::{
    ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
    ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
};
use windows_sys::Win32::Storage::FileSystem::{
    ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
};
use windows_sys::Win32::System::Pipes::{
    ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_TYPE_BYTE,
    PIPE_UNLIMITED_INSTANCES,
};
use windows_sys::Win32::System::IO::{
    CancelIoEx, GetOverlappedResult, OVERLAPPED, OVERLAPPED_ENTRY,
};

use crate::event::Source;
use crate::sys::windows::iocp::{CompletionPort, CompletionStatus};
use crate::sys::windows::{Event, Handle, Overlapped};
use crate::Registry;
use crate::{Interest, Token};

/// Non-blocking Windows named pipe.
///
/// This structure internally contains a `HANDLE` which represents the named
/// pipe, and also maintains state associated with the mio event loop and
/// active I/O operations that have been scheduled to translate IOCP to a
/// readiness model.
///
/// Note, IOCP is a *completion* based model whereas mio is a *readiness*
/// based model. To bridge this, `NamedPipe` performs internal buffering.
/// Writes are written to an internal buffer and the buffer is submitted to
/// IOCP. IOCP reads are submitted using internal buffers and
/// `NamedPipe::read` reads from this internal buffer.
///
/// # Trait implementations
///
/// The `Read` and `Write` traits are implemented for `NamedPipe` and for
/// `&NamedPipe`, so a pipe can be read from and written to concurrently
/// through shared references. Typically a named pipe needs to be connected
/// to a client before it can be read from or written to, however.
///
/// Note that for I/O operations on a named pipe to succeed, the named pipe
/// needs to be associated with an event loop. Until this happens all I/O
/// operations will return a "would block" error.
///
/// # Managing connections
///
/// The `NamedPipe` type supports a `connect` method to connect to a client
/// and a `disconnect` method to disconnect from that client. These two
/// methods only work once a named pipe is associated with an event loop.
///
/// The `connect` method will succeed asynchronously; completion can be
/// detected once the object receives a writable notification.
///
/// # Named pipe clients
///
/// Currently, to create a client of a named pipe server, you can use the
/// `OpenOptions` type in the standard library to create a `File` that
/// connects to a named pipe. Afterwards you can use the `into_raw_handle`
/// method coupled with the `NamedPipe::from_raw_handle` method to convert
/// that to a named pipe that can operate asynchronously. Don't forget to
/// pass the `FILE_FLAG_OVERLAPPED` flag when opening the `File`.
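///
/// # Example
///
/// A minimal sketch of pairing a server pipe with a client opened through
/// `OpenOptions`. The re-export path `mio::windows::NamedPipe` and the pipe
/// name are illustrative assumptions.
///
/// ```ignore
/// use std::fs::OpenOptions;
/// use std::os::windows::fs::OpenOptionsExt;
/// use std::os::windows::io::{FromRawHandle, IntoRawHandle};
///
/// use mio::windows::NamedPipe;
/// use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
///
/// const PIPE_NAME: &str = r"\\.\pipe\mio-named-pipe-example";
///
/// // The server end is created directly.
/// let server = NamedPipe::new(PIPE_NAME)?;
///
/// // The client end opens the pipe as a `File`, passing
/// // `FILE_FLAG_OVERLAPPED` so it can be driven asynchronously, and is then
/// // converted into a `NamedPipe` via its raw handle.
/// let file = OpenOptions::new()
///     .read(true)
///     .write(true)
///     .custom_flags(FILE_FLAG_OVERLAPPED)
///     .open(PIPE_NAME)?;
/// let client = unsafe { NamedPipe::from_raw_handle(file.into_raw_handle()) };
/// # Ok::<(), std::io::Error>(())
/// ```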
pub struct NamedPipe {
    inner: Arc<Inner>,
}

/// # Notes
///
/// The memory layout of this structure must be fixed as the
/// `ptr_from_*_overlapped` methods depend on it, see the `ptr_from` test.
#[repr(C)]
struct Inner {
    // NOTE: careful modifying the order of these three fields, the `ptr_from_*`
    // methods depend on the layout!
    connect: Overlapped,
    read: Overlapped,
    write: Overlapped,
    // END NOTE.
    handle: Handle,
    connecting: AtomicBool,
    io: Mutex<Io>,
    pool: Mutex<BufferPool>,
}

impl Inner {
    /// Converts a pointer to `Inner.connect` to a pointer to `Inner`.
    ///
    /// # Unsafety
    ///
    /// Caller must ensure `ptr` is pointing to `Inner.connect`.
    unsafe fn ptr_from_conn_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
        // `connect` is the first field, so the pointers are the same.
        ptr.cast()
    }

    /// Same as [`ptr_from_conn_overlapped`] but for `Inner.read`.
    unsafe fn ptr_from_read_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
        // `read` is one `Overlapped` after `connect: Overlapped`.
        (ptr as *mut Overlapped).wrapping_sub(1) as *const Inner
    }

    /// Same as [`ptr_from_conn_overlapped`] but for `Inner.write`.
    unsafe fn ptr_from_write_overlapped(ptr: *mut OVERLAPPED) -> *const Inner {
        // `write` is after `connect: Overlapped` and `read: Overlapped`.
        (ptr as *mut Overlapped).wrapping_sub(2) as *const Inner
    }

    /// Issue a connection request with the specified overlapped operation.
    ///
    /// This function will issue a request to connect a client to this server,
    /// returning immediately after starting the overlapped operation.
    ///
    /// If this function immediately succeeds then `Ok(true)` is returned. If
    /// the overlapped operation is enqueued and pending, then `Ok(false)` is
    /// returned. Otherwise an error is returned indicating what went wrong.
    ///
    /// # Unsafety
    ///
    /// This function is unsafe because the kernel requires that the
    /// `overlapped` pointer is valid until the end of the I/O operation. The
    /// kernel also requires that `overlapped` is unique for this I/O operation
    /// and is not in use for any other I/O.
    ///
    /// To safely use this function callers must ensure that this pointer is
    /// valid until the I/O operation is completed, typically via completion
    /// ports and waiting to receive the completion notification on the port.
    pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
        if ConnectNamedPipe(self.handle.raw(), overlapped) != 0 {
            return Ok(true);
        }

        let err = io::Error::last_os_error();

        match err.raw_os_error().map(|e| e as u32) {
            Some(ERROR_PIPE_CONNECTED) => Ok(true),
            Some(ERROR_NO_DATA) => Ok(true),
            Some(ERROR_IO_PENDING) => Ok(false),
            _ => Err(err),
        }
    }

    /// Disconnects this named pipe from any connected client.
    pub fn disconnect(&self) -> io::Result<()> {
        if unsafe { DisconnectNamedPipe(self.handle.raw()) } == 0 {
            Err(io::Error::last_os_error())
        } else {
            Ok(())
        }
    }

    /// Issues an overlapped read operation to occur on this pipe.
    ///
    /// This function will issue an asynchronous read to occur in an overlapped
    /// fashion, returning immediately. The `buf` provided will be filled in
    /// with data and the request is tracked by the `overlapped` instance
    /// provided.
    ///
    /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
    /// `n` is the number of bytes read. If an asynchronous operation is
    /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
    /// it is returned.
    ///
    /// When this operation completes (or if it completes immediately), another
    /// mechanism must be used to learn how many bytes were transferred (such
    /// as looking at the field in the IOCP status message).
    ///
    /// # Unsafety
    ///
    /// This function is unsafe because the kernel requires that the `buf` and
    /// `overlapped` pointers be valid until the end of the I/O operation. The
    /// kernel also requires that `overlapped` is unique for this I/O
    /// operation and is not in use for any other I/O.
    ///
    /// To safely use this function callers must ensure that the pointers are
    /// valid until the I/O operation is completed, typically via completion
    /// ports and waiting to receive the completion notification on the port.
    pub unsafe fn read_overlapped(
        &self,
        buf: &mut [u8],
        overlapped: *mut OVERLAPPED,
    ) -> io::Result<Option<usize>> {
        let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
        let res = ReadFile(
            self.handle.raw(),
            buf.as_mut_ptr() as *mut _,
            len,
            std::ptr::null_mut(),
            overlapped,
        );
        if res == 0 {
            let err = io::Error::last_os_error();
            if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
                return Err(err);
            }
        }

        let mut bytes = 0;
        let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
        if res == 0 {
            let err = io::Error::last_os_error();
            if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
                Ok(None)
            } else {
                Err(err)
            }
        } else {
            Ok(Some(bytes as usize))
        }
    }

    /// Issues an overlapped write operation to occur on this pipe.
    ///
    /// This function will issue an asynchronous write to occur in an
    /// overlapped fashion, returning immediately. The data in `buf` will be
    /// written to the pipe and the request is tracked by the `overlapped`
    /// instance provided.
    ///
    /// If the operation succeeds immediately, `Ok(Some(n))` is returned where
    /// `n` is the number of bytes written. If an asynchronous operation is
    /// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
    /// it is returned.
    ///
    /// When this operation completes (or if it completes immediately), another
    /// mechanism must be used to learn how many bytes were transferred (such
    /// as looking at the field in the IOCP status message).
    ///
    /// # Unsafety
    ///
    /// This function is unsafe because the kernel requires that the `buf` and
    /// `overlapped` pointers be valid until the end of the I/O operation. The
    /// kernel also requires that `overlapped` is unique for this I/O
    /// operation and is not in use for any other I/O.
    ///
    /// To safely use this function callers must ensure that the pointers are
    /// valid until the I/O operation is completed, typically via completion
    /// ports and waiting to receive the completion notification on the port.
    pub unsafe fn write_overlapped(
        &self,
        buf: &[u8],
        overlapped: *mut OVERLAPPED,
    ) -> io::Result<Option<usize>> {
        let len = std::cmp::min(buf.len(), u32::MAX as usize) as u32;
        let res = WriteFile(
            self.handle.raw(),
            buf.as_ptr() as *const _,
            len,
            std::ptr::null_mut(),
            overlapped,
        );
        if res == 0 {
            let err = io::Error::last_os_error();
            if err.raw_os_error() != Some(ERROR_IO_PENDING as i32) {
                return Err(err);
            }
        }

        let mut bytes = 0;
        let res = GetOverlappedResult(self.handle.raw(), overlapped, &mut bytes, 0);
        if res == 0 {
            let err = io::Error::last_os_error();
            if err.raw_os_error() == Some(ERROR_IO_INCOMPLETE as i32) {
                Ok(None)
            } else {
                Err(err)
            }
        } else {
            Ok(Some(bytes as usize))
        }
    }

    /// Calls the `GetOverlappedResult` function to get the result of an
    /// overlapped operation for this handle.
    ///
    /// This function takes the `OVERLAPPED` argument which must have been used
    /// to initiate an overlapped I/O operation, and returns either the
    /// successful number of bytes transferred during the operation or an error
    /// if one occurred.
    ///
    /// # Unsafety
    ///
    /// This function is unsafe as `overlapped` must have previously been used
    /// to execute an operation for this handle, and it must also be a valid
    /// pointer to an `Overlapped` instance.
    #[inline]
    unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
        let mut transferred = 0;
        let r = GetOverlappedResult(self.handle.raw(), overlapped, &mut transferred, 0);
        if r == 0 {
            Err(io::Error::last_os_error())
        } else {
            Ok(transferred as usize)
        }
    }
}

#[test]
fn ptr_from() {
    use std::mem::ManuallyDrop;
    use std::ptr;

    let pipe = unsafe { ManuallyDrop::new(NamedPipe::from_raw_handle(ptr::null_mut())) };
    let inner: &Inner = &pipe.inner;
    assert_eq!(
        inner as *const Inner,
        unsafe { Inner::ptr_from_conn_overlapped(&inner.connect as *const _ as *mut OVERLAPPED) },
        "`ptr_from_conn_overlapped` incorrect"
    );
    assert_eq!(
        inner as *const Inner,
        unsafe { Inner::ptr_from_read_overlapped(&inner.read as *const _ as *mut OVERLAPPED) },
        "`ptr_from_read_overlapped` incorrect"
    );
    assert_eq!(
        inner as *const Inner,
        unsafe { Inner::ptr_from_write_overlapped(&inner.write as *const _ as *mut OVERLAPPED) },
        "`ptr_from_write_overlapped` incorrect"
    );
}

struct Io {
    // Uniquely identifies the selector associated with this named pipe
    cp: Option<Arc<CompletionPort>>,
    // Token used to identify events
    token: Option<Token>,
    read: State,
    write: State,
    connect_error: Option<io::Error>,
}

#[derive(Debug)]
enum State {
    // No operation is in flight.
    None,
    // An overlapped operation is in flight; the buffer is the one handed to
    // the kernel and the `usize` is the offset the operation started at
    // (ignored on the read side).
    Pending(Vec<u8>, usize),
    // An operation completed; the `usize` tracks the position within the
    // buffer (how far a read has been consumed, or where an immediate write
    // began).
    Ok(Vec<u8>, usize),
    // An operation failed with this error.
    Err(io::Error),
}

// Odd tokens are for named pipes
static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(1);

fn would_block() -> io::Error {
    io::ErrorKind::WouldBlock.into()
}

impl NamedPipe {
    /// Creates a new named pipe at the specified `addr` given a "reasonable
    /// set" of initial configuration options.
    pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
        use std::os::windows::ffi::OsStrExt;
        let name: Vec<_> = addr.as_ref().encode_wide().chain(Some(0)).collect();

        // Safety: syscall
        let h = unsafe {
            CreateNamedPipeW(
                name.as_ptr(),
                PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
                PIPE_TYPE_BYTE,
                PIPE_UNLIMITED_INSTANCES,
                65536,
                65536,
                0,
                std::ptr::null_mut(),
            )
        };

        if h == INVALID_HANDLE_VALUE {
            Err(io::Error::last_os_error())
        } else {
            // Safety: nothing actually unsafe about this. The trait fn includes
            // `unsafe`.
            Ok(unsafe { Self::from_raw_handle(h as RawHandle) })
        }
    }

    /// Attempts to call `ConnectNamedPipe`, if possible.
    ///
    /// This function will attempt to connect this pipe to a client in an
    /// asynchronous fashion. If the function immediately establishes a
    /// connection to a client then `Ok(())` is returned. Otherwise if a
    /// connection attempt was issued and is now in progress then a "would
    /// block" error is returned.
    ///
    /// When the connection is finished then this object will be flagged as
    /// being ready for a write, or otherwise in the writable state.
    ///
    /// # Errors
    ///
    /// This function will return a "would block" error if the pipe has not yet
    /// been registered with an event loop, if the connection operation has
    /// previously been issued but has not yet completed, or if the connect
    /// itself was issued and didn't finish immediately.
    ///
    /// Normal I/O errors from the call to `ConnectNamedPipe` are returned
    /// immediately.
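    ///
    /// # Example
    ///
    /// A sketch of driving a connect to completion. It assumes `server` is a
    /// `NamedPipe` already registered with a `Poll`, and elides the event
    /// loop that waits for the writable notification:
    ///
    /// ```ignore
    /// use std::io;
    ///
    /// match server.connect() {
    ///     // A client was connected immediately.
    ///     Ok(()) => {}
    ///     // The connect was issued and is in flight: wait for a writable
    ///     // event, then consult `take_error` to learn whether it succeeded.
    ///     Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
    ///         // ... poll until `server` receives a writable event ...
    ///         if let Some(err) = server.take_error()? {
    ///             return Err(err);
    ///         }
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// ```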
    pub fn connect(&self) -> io::Result<()> {
        // "Acquire the connecting lock" or otherwise just make sure we're the
        // only operation that's using the `connect` overlapped instance.
        if self.inner.connecting.swap(true, SeqCst) {
            return Err(would_block());
        }

        // Now that we've flagged ourselves in the connecting state, issue the
        // connection attempt. Afterwards interpret the return value and set
        // internal state accordingly.
        let res = unsafe {
            let overlapped = self.inner.connect.as_ptr() as *mut _;
            self.inner.connect_overlapped(overlapped)
        };

        match res {
            // The connection operation finished immediately, so let's schedule
            // reads/writes and such.
            Ok(true) => {
                self.inner.connecting.store(false, SeqCst);
                Inner::post_register(&self.inner, None);
                Ok(())
            }

            // If the overlapped operation was successful and didn't finish
            // immediately then we forget a copy of the arc we hold
            // internally. This ensures that when the completion status comes
            // in for the I/O operation finishing it'll have a reference
            // associated with it and our data will still be valid. The
            // `connect_done` function will "reify" this forgotten pointer to
            // drop the refcount on the other side.
            Ok(false) => {
                mem::forget(self.inner.clone());
                Err(would_block())
            }

            Err(e) => {
                self.inner.connecting.store(false, SeqCst);
                Err(e)
            }
        }
    }

    /// Takes any internal error that has happened after the last I/O operation
    /// which hasn't been retrieved yet.
    ///
    /// This is particularly useful when detecting failed attempts to
    /// `connect`. After a completed `connect` flags this pipe as writable,
    /// callers must invoke this method to determine whether the connection
    /// actually succeeded. If this function returns `Ok(None)` then a client
    /// is connected; if it returns `Ok(Some(err))` then the connect failed
    /// with `err` and no client should be considered connected.
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        Ok(self.inner.io.lock().unwrap().connect_error.take())
    }

    /// Disconnects this named pipe from a connected client.
    ///
    /// This function will disconnect the pipe from a connected client, if
    /// any, by calling the `DisconnectNamedPipe` function.
    ///
    /// After a `disconnect` is issued, `connect` may be called again to
    /// connect to another client.
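    ///
    /// A sketch of recycling a server pipe for the next client (note that the
    /// renewed `connect` may itself return a "would block" error while the
    /// new accept is in flight):
    ///
    /// ```ignore
    /// server.disconnect()?;
    /// // The same pipe instance can now accept a new client.
    /// match server.connect() {
    ///     Ok(()) => {}
    ///     Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
    ///         // Completion is signaled by a writable event.
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// # Ok::<(), std::io::Error>(())
    /// ```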
    pub fn disconnect(&self) -> io::Result<()> {
        self.inner.disconnect()
    }
}

impl FromRawHandle for NamedPipe {
    unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
        NamedPipe {
            inner: Arc::new(Inner {
                handle: Handle::new(handle as HANDLE),
                connect: Overlapped::new(connect_done),
                connecting: AtomicBool::new(false),
                read: Overlapped::new(read_done),
                write: Overlapped::new(write_done),
                io: Mutex::new(Io {
                    cp: None,
                    token: None,
                    read: State::None,
                    write: State::None,
                    connect_error: None,
                }),
                pool: Mutex::new(BufferPool::with_capacity(2)),
            }),
        }
    }
}

impl Read for NamedPipe {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        <&NamedPipe as Read>::read(&mut &*self, buf)
    }
}

impl Write for NamedPipe {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        <&NamedPipe as Write>::write(&mut &*self, buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        <&NamedPipe as Write>::flush(&mut &*self)
    }
}

impl<'a> Read for &'a NamedPipe {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut state = self.inner.io.lock().unwrap();

        if state.token.is_none() {
            return Err(would_block());
        }

        match mem::replace(&mut state.read, State::None) {
            // In theory not possible with `token` checked above,
            // but return would block for now.
            State::None => Err(would_block()),

            // A read is in flight, still waiting for it to finish
            State::Pending(buf, amt) => {
                state.read = State::Pending(buf, amt);
                Err(would_block())
            }

            // We previously read something into `data`, so try to copy that
            // out. If we copy out all the data, schedule a new read;
            // otherwise store the buffer to be read from later.
            State::Ok(data, cur) => {
                let n = {
                    let mut remaining = &data[cur..];
                    remaining.read(buf)?
                };
                let next = cur + n;
                if next != data.len() {
                    state.read = State::Ok(data, next);
                } else {
                    self.inner.put_buffer(data);
                    Inner::schedule_read(&self.inner, &mut state, None);
                }
                Ok(n)
            }

            // Looks like an in-flight read hit an error, return that here while
            // we schedule a new one.
            State::Err(e) => {
                Inner::schedule_read(&self.inner, &mut state, None);
                if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) {
                    Ok(0)
                } else {
                    Err(e)
                }
            }
        }
    }
}

impl<'a> Write for &'a NamedPipe {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Make sure there are no writes pending
        let mut io = self.inner.io.lock().unwrap();

        if io.token.is_none() {
            return Err(would_block());
        }

        match io.write {
            State::None => {}
            State::Err(_) => match mem::replace(&mut io.write, State::None) {
                State::Err(e) => return Err(e),
                // `io` is locked, so this branch is unreachable
                _ => unreachable!(),
            },
            // any other state should be handled in `write_done`
            _ => {
                return Err(would_block());
            }
        }

        // Move `buf` onto the heap and fire off the write
        let mut owned_buf = self.inner.get_buffer();
        owned_buf.extend(buf);
        match Inner::maybe_schedule_write(&self.inner, owned_buf, 0, &mut io)? {
            // Some bytes were written immediately
            Some(n) => Ok(n),
            // The write operation was enqueued for the whole buffer
            None => Ok(buf.len()),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Source for NamedPipe {
    fn register(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
        let mut io = self.inner.io.lock().unwrap();

        io.check_association(registry, false)?;

        if io.token.is_some() {
            return Err(io::Error::new(
                io::ErrorKind::AlreadyExists,
                "I/O source already registered with a `Registry`",
            ));
        }

        if io.cp.is_none() {
            let selector = registry.selector();

            io.cp = Some(selector.clone_port());

            let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
            selector.inner.cp.add_handle(inner_token, self)?;
        }

        io.token = Some(token);
        drop(io);

        Inner::post_register(&self.inner, None);

        Ok(())
    }

    fn reregister(&mut self, registry: &Registry, token: Token, _: Interest) -> io::Result<()> {
        let mut io = self.inner.io.lock().unwrap();

        io.check_association(registry, true)?;

        io.token = Some(token);
        drop(io);

        Inner::post_register(&self.inner, None);

        Ok(())
    }

    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        let mut io = self.inner.io.lock().unwrap();

        io.check_association(registry, true)?;

        if io.token.is_none() {
            return Err(io::Error::new(
                io::ErrorKind::NotFound,
                "I/O source not registered with `Registry`",
            ));
        }

        io.token = None;
        Ok(())
    }
}

impl AsRawHandle for NamedPipe {
    fn as_raw_handle(&self) -> RawHandle {
        self.inner.handle.raw() as RawHandle
    }
}

impl fmt::Debug for NamedPipe {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.inner.handle.fmt(f)
    }
}

impl Drop for NamedPipe {
    fn drop(&mut self) {
        // Cancel pending reads/connects, but don't cancel writes to ensure that
        // everything is flushed out.
        unsafe {
            if self.inner.connecting.load(SeqCst) {
                drop(cancel(&self.inner.handle, &self.inner.connect));
            }

            let io = self.inner.io.lock().unwrap();
            if let State::Pending(..) = io.read {
                drop(cancel(&self.inner.handle, &self.inner.read));
            }
        }
    }
}

impl Inner {
    /// Schedules a read to happen in the background, executing an overlapped
    /// operation.
    ///
    /// This function returns `true` if a normal error happens or if the read
    /// is scheduled in the background. If the pipe is no longer connected
    /// (ERROR_PIPE_LISTENING) then `false` is returned and no read is
    /// scheduled.
    fn schedule_read(me: &Arc<Inner>, io: &mut Io, events: Option<&mut Vec<Event>>) -> bool {
        // Check to see if a read is already scheduled/completed
        match io.read {
            State::None => {}
            _ => return true,
        }

        // Allocate a buffer and schedule the read.
        let mut buf = me.get_buffer();
        let e = unsafe {
            let overlapped = me.read.as_ptr() as *mut _;
            let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
            me.read_overlapped(slice, overlapped)
        };

        match e {
            // See `NamedPipe::connect` above for the rationale behind `forget`
            Ok(_) => {
                io.read = State::Pending(buf, 0); // 0 is ignored on read side
                mem::forget(me.clone());
                true
            }

            // If ERROR_PIPE_LISTENING happens then it's not a real read error,
            // we just need to wait for a connect.
            Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_LISTENING as i32) => false,

            // If some other error happened, though, we're now readable to give
            // out the error.
            Err(e) => {
                io.read = State::Err(e);
                io.notify_readable(events);
                true
            }
        }
    }

    /// Maybe schedules an overlapped write operation.
    ///
    /// * `None` means that the overlapped operation was enqueued.
    /// * `Some(n)` means that `n` bytes were written immediately. Note that
    ///   `write_done` will fire anyway to clean up the state.
    fn maybe_schedule_write(
        me: &Arc<Inner>,
        buf: Vec<u8>,
        pos: usize,
        io: &mut Io,
    ) -> io::Result<Option<usize>> {
        // Very similar to `schedule_read` above, just done for the write half.
        let e = unsafe {
            let overlapped = me.write.as_ptr() as *mut _;
            me.write_overlapped(&buf[pos..], overlapped)
        };

        // See `connect` above for the rationale behind `forget`
        match e {
            // `n` bytes were written immediately
            Ok(Some(n)) => {
                io.write = State::Ok(buf, pos);
                mem::forget(me.clone());
                Ok(Some(n))
            }
            // The write operation was enqueued
            Ok(None) => {
                io.write = State::Pending(buf, pos);
                mem::forget(me.clone());
                Ok(None)
            }
            Err(e) => Err(e),
        }
    }

    fn schedule_write(
        me: &Arc<Inner>,
        buf: Vec<u8>,
        pos: usize,
        io: &mut Io,
        events: Option<&mut Vec<Event>>,
    ) {
        match Inner::maybe_schedule_write(me, buf, pos, io) {
            Ok(Some(_)) => {
                // The immediate result will be handled in `write_done`, so
                // reinterpret the `Ok` state as a pending operation. Note that
                // `maybe_schedule_write` already retained the extra reference
                // that `write_done` will consume, so no additional
                // `mem::forget` is needed here.
                let state = mem::replace(&mut io.write, State::None);
                io.write = match state {
                    State::Ok(buf, pos) => State::Pending(buf, pos),
                    // `io` is locked, so this branch is unreachable
                    _ => unreachable!(),
                };
            }
            Ok(None) => (),
            Err(e) => {
                io.write = State::Err(e);
                io.notify_writable(events);
            }
        }
    }

    fn post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>) {
        let mut io = me.io.lock().unwrap();
        #[allow(clippy::needless_option_as_deref)]
        if Inner::schedule_read(me, &mut io, events.as_deref_mut()) {
            if let State::None = io.write {
                io.notify_writable(events);
            }
        }
    }

    fn get_buffer(&self) -> Vec<u8> {
        self.pool.lock().unwrap().get(4 * 1024)
    }

    fn put_buffer(&self, buf: Vec<u8>) {
        self.pool.lock().unwrap().put(buf)
    }
}

unsafe fn cancel(handle: &Handle, overlapped: &Overlapped) -> io::Result<()> {
    let ret = CancelIoEx(handle.raw(), overlapped.as_ptr());
    // `CancelIoEx` returns 0 on error:
    // https://docs.microsoft.com/en-us/windows/win32/fileio/cancelioex-func
    if ret == 0 {
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    }
}

fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
    let status = CompletionStatus::from_entry(status);

    // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
    // the refcount is available to us due to the `mem::forget` in
    // `connect` above.
    let me = unsafe { Arc::from_raw(Inner::ptr_from_conn_overlapped(status.overlapped())) };

    // Flag ourselves as no longer using the `connect` overlapped instance.
    let prev = me.connecting.swap(false, SeqCst);
    assert!(prev, "NamedPipe was not previously connecting");

    // Stash away our connect error if one happened
    debug_assert_eq!(status.bytes_transferred(), 0);
    unsafe {
        match me.result(status.overlapped()) {
            Ok(n) => debug_assert_eq!(n, 0),
            Err(e) => me.io.lock().unwrap().connect_error = Some(e),
        }
    }

    // We essentially just finished a registration, so kick off a
    // read and register write readiness.
    Inner::post_register(&me, events);
}

fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
    let status = CompletionStatus::from_entry(status);

    // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
    // the refcount is available to us due to the `mem::forget` in
    // `schedule_read` above.
    let me = unsafe { Arc::from_raw(Inner::ptr_from_read_overlapped(status.overlapped())) };

    // Move from the `Pending` to `Ok` state.
    let mut io = me.io.lock().unwrap();
    let mut buf = match mem::replace(&mut io.read, State::None) {
        State::Pending(buf, _) => buf,
        _ => unreachable!(),
    };
    unsafe {
        match me.result(status.overlapped()) {
            Ok(n) => {
                debug_assert_eq!(status.bytes_transferred() as usize, n);
                buf.set_len(status.bytes_transferred() as usize);
                io.read = State::Ok(buf, 0);
            }
            Err(e) => {
                debug_assert_eq!(status.bytes_transferred(), 0);
                io.read = State::Err(e);
            }
        }
    }

    // Flag our readiness that we've got data.
    io.notify_readable(events);
}

fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
    let status = CompletionStatus::from_entry(status);

    // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
    // the refcount is available to us due to the `mem::forget` in
    // `schedule_write` above.
    let me = unsafe { Arc::from_raw(Inner::ptr_from_write_overlapped(status.overlapped())) };

    // Make the state change out of `Pending`. If we wrote the entire buffer
    // then we're writable again and otherwise we schedule another write.
    let mut io = me.io.lock().unwrap();
    let (buf, pos) = match mem::replace(&mut io.write, State::None) {
        // `Ok` here means that the operation completed immediately and
        // `bytes_transferred` was already reported to the caller.
        State::Ok(..) => {
            io.notify_writable(events);
            return;
        }
        State::Pending(buf, pos) => (buf, pos),
        _ => unreachable!(),
    };

    unsafe {
        match me.result(status.overlapped()) {
            Ok(n) => {
                debug_assert_eq!(status.bytes_transferred() as usize, n);
                let new_pos = pos + (status.bytes_transferred() as usize);
                if new_pos == buf.len() {
                    me.put_buffer(buf);
                    io.notify_writable(events);
                } else {
                    Inner::schedule_write(&me, buf, new_pos, &mut io, events);
                }
            }
            Err(e) => {
                debug_assert_eq!(status.bytes_transferred(), 0);
                io.write = State::Err(e);
                io.notify_writable(events);
            }
        }
    }
}

impl Io {
    fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
        match self.cp {
            Some(ref cp) if !registry.selector().same_port(cp) => Err(io::Error::new(
                io::ErrorKind::AlreadyExists,
                "I/O source already registered with a different `Registry`",
            )),
            None if required => Err(io::Error::new(
                io::ErrorKind::NotFound,
                "I/O source not registered with `Registry`",
            )),
            _ => Ok(()),
        }
    }

    fn notify_readable(&self, events: Option<&mut Vec<Event>>) {
        if let Some(token) = self.token {
            let mut ev = Event::new(token);
            ev.set_readable();

            if let Some(events) = events {
                events.push(ev);
            } else {
                let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
            }
        }
    }

    fn notify_writable(&self, events: Option<&mut Vec<Event>>) {
        if let Some(token) = self.token {
            let mut ev = Event::new(token);
            ev.set_writable();

            if let Some(events) = events {
                events.push(ev);
            } else {
                let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
            }
        }
    }
}

struct BufferPool {
    pool: Vec<Vec<u8>>,
}

impl BufferPool {
    fn with_capacity(cap: usize) -> BufferPool {
        BufferPool {
            pool: Vec::with_capacity(cap),
        }
    }

    fn get(&mut self, default_cap: usize) -> Vec<u8> {
        self.pool
            .pop()
            .unwrap_or_else(|| Vec::with_capacity(default_cap))
    }

    fn put(&mut self, mut buf: Vec<u8>) {
        if self.pool.len() < self.pool.capacity() {
            unsafe {
                // Reset the length so stale data is never observed; the
                // allocation's capacity is retained for reuse.
                buf.set_len(0);
            }
            self.pool.push(buf);
        }
    }
}