use crate::event::Source;
use crate::sys::windows::{Event, Overlapped};
use crate::{poll, Registry};
use winapi::um::minwinbase::OVERLAPPED_ENTRY;

use std::ffi::OsStr;
use std::fmt;
use std::io::{self, Read, Write};
use std::mem;
use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
use std::slice;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};

use crate::{Interest, Token};
use miow::iocp::{CompletionPort, CompletionStatus};
use miow::pipe;
use winapi::shared::winerror::{ERROR_BROKEN_PIPE, ERROR_PIPE_LISTENING};
use winapi::um::ioapiset::CancelIoEx;

/// # Safety
///
/// Only valid if the struct is annotated with `#[repr(C)]`. This is only used
/// with `Overlapped` and `Inner`, which are correctly annotated.
macro_rules! offset_of {
    ($t:ty, $($field:ident).+) => (
        &(*(0 as *const $t)).$($field).+ as *const _ as usize
    )
}

macro_rules! overlapped2arc {
    ($e:expr, $t:ty, $($field:ident).+) => ({
        let offset = offset_of!($t, $($field).+);
        debug_assert!(offset < mem::size_of::<$t>());
        Arc::from_raw(($e as usize - offset) as *mut $t)
    })
}
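
// For example, a completion for a read arrives carrying a pointer to the
// `OVERLAPPED` stored in `Inner::read`; `overlapped2arc!(ptr, Inner, read)`
// subtracts that field's offset to recover the containing `Inner` and
// reclaims, via `Arc::from_raw`, the reference that `mem::forget` leaked
// when the operation was submitted (see `connect_done` and friends below).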

/// Non-blocking windows named pipe.
///
/// This structure internally contains a `HANDLE` which represents the named
/// pipe, and also maintains state associated with the mio event loop and active
/// I/O operations that have been scheduled to translate IOCP to a readiness
/// model.
///
/// Note that IOCP is a *completion*-based model whereas mio is a
/// *readiness*-based model. To bridge this, `NamedPipe` performs internal
/// buffering. Writes are written to an internal buffer and the buffer is
/// submitted to IOCP. IOCP reads are submitted using internal buffers and
/// `NamedPipe::read` reads from this internal buffer.
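///
/// For example, after a readable event a caller drains this internal buffer
/// in the usual readiness style. A sketch (the `pipe`, `buf`, and `process`
/// names here are illustrative, not part of this API):
///
/// ```ignore
/// use std::io::{ErrorKind, Read};
///
/// loop {
///     match pipe.read(&mut buf) {
///         // 0 bytes means the other end of the pipe was closed.
///         Ok(0) => break,
///         // Bytes were copied out of the internal buffer.
///         Ok(n) => process(&buf[..n]),
///         // The internal buffer is empty; another overlapped read is in
///         // flight and a new readiness event will fire when it completes.
///         Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
///         Err(e) => return Err(e),
///     }
/// }
/// ```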
///
/// # Trait implementations
///
/// The `Read` and `Write` traits are implemented for `NamedPipe` and for
/// `&NamedPipe`, meaning that a named pipe can be concurrently read from and
/// written to. Typically a named pipe needs to be connected to a client before
/// it can be read from or written to, however.
///
/// Note that for I/O operations on a named pipe to succeed, the named pipe
/// needs to be associated with an event loop. Until this happens all I/O
/// operations will return a "would block" error.
///
/// # Managing connections
///
/// The `NamedPipe` type supports a `connect` method to connect to a client and
/// a `disconnect` method to disconnect from that client. These two methods only
/// work once a named pipe is associated with an event loop.
///
/// The `connect` method will succeed asynchronously and a completion can be
/// detected once the object receives a writable notification.
///
/// # Named pipe clients
///
/// Currently, to create a client of a named pipe server, you can use the
/// `OpenOptions` type in the standard library to create a `File` that connects
/// to a named pipe. Afterwards you can use the `into_raw_handle` method coupled
/// with the `NamedPipe::from_raw_handle` method to convert that to a named pipe
/// that can operate asynchronously. Don't forget to pass the
/// `FILE_FLAG_OVERLAPPED` flag when opening the `File`.
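///
/// A minimal sketch of that conversion, assuming this type is re-exported
/// as `mio::windows::NamedPipe` and that a server is already listening on
/// the (hypothetical) pipe name below:
///
/// ```no_run
/// use std::fs::OpenOptions;
/// use std::os::windows::fs::OpenOptionsExt;
/// use std::os::windows::io::{FromRawHandle, IntoRawHandle};
///
/// use mio::windows::NamedPipe;
/// use winapi::um::winbase::FILE_FLAG_OVERLAPPED;
///
/// # fn main() -> std::io::Result<()> {
/// // Open the client end in overlapped mode, as IOCP requires.
/// let file = OpenOptions::new()
///     .read(true)
///     .write(true)
///     .custom_flags(FILE_FLAG_OVERLAPPED)
///     .open(r"\\.\pipe\mio-named-pipe")?;
///
/// // Wrap the raw handle so it can be registered with an event loop.
/// let client = unsafe { NamedPipe::from_raw_handle(file.into_raw_handle()) };
/// # let _ = client;
/// # Ok(())
/// # }
/// ```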
pub struct NamedPipe {
    inner: Arc<Inner>,
}

#[repr(C)]
struct Inner {
    handle: pipe::NamedPipe,

    connect: Overlapped,
    connecting: AtomicBool,

    read: Overlapped,
    write: Overlapped,

    io: Mutex<Io>,

    pool: Mutex<BufferPool>,
}

struct Io {
    // Uniquely identifies the selector associated with this named pipe
    cp: Option<Arc<CompletionPort>>,
    // Token used to identify events
    token: Option<Token>,
    read: State,
    read_interest: bool,
    write: State,
    write_interest: bool,
    connect_error: Option<io::Error>,
}

#[derive(Debug)]
enum State {
    None,
    Pending(Vec<u8>, usize),
    Ok(Vec<u8>, usize),
    Err(io::Error),
}

// Odd tokens are for named pipes: `register` below seeds them from this
// counter and steps it by two, handing out the odd values 3, 5, 7, ...
static NEXT_TOKEN: AtomicUsize = AtomicUsize::new(1);

fn would_block() -> io::Error {
    io::ErrorKind::WouldBlock.into()
}

impl NamedPipe {
    /// Creates a new named pipe at the specified `addr` given a "reasonable
    /// set" of initial configuration options.
    pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
        let pipe = pipe::NamedPipe::new(addr)?;
        // Safety: nothing actually unsafe about this. The trait fn includes
        // `unsafe`.
        Ok(unsafe { NamedPipe::from_raw_handle(pipe.into_raw_handle()) })
    }

    /// Attempts to call `ConnectNamedPipe`, if possible.
    ///
    /// This function will attempt to connect this pipe to a client in an
    /// asynchronous fashion. If the function immediately establishes a
    /// connection to a client then `Ok(())` is returned. Otherwise if a
    /// connection attempt was issued and is now in progress then a "would
    /// block" error is returned.
    ///
    /// When the connection is finished then this object will be flagged as
    /// being ready for a write, or otherwise in the writable state.
    ///
    /// # Errors
    ///
    /// This function will return a "would block" error if the pipe has not yet
    /// been registered with an event loop, if the connection operation has
    /// previously been issued but has not yet completed, or if the connect
    /// itself was issued and didn't finish immediately.
    ///
    /// Normal I/O errors from the call to `ConnectNamedPipe` are returned
    /// immediately.
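    ///
    /// # Example
    ///
    /// A sketch of driving `connect` from a `Poll` loop, again assuming this
    /// type is re-exported as `mio::windows::NamedPipe`:
    ///
    /// ```no_run
    /// use mio::windows::NamedPipe;
    /// use mio::{Events, Interest, Poll, Token};
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let mut poll = Poll::new()?;
    /// let mut events = Events::with_capacity(8);
    /// let mut server = NamedPipe::new(r"\\.\pipe\mio-named-pipe")?;
    /// poll.registry()
    ///     .register(&mut server, Token(0), Interest::WRITABLE)?;
    ///
    /// match server.connect() {
    ///     // A client was connected immediately.
    ///     Ok(()) => {}
    ///     // The accept is in flight; a writable event signals completion,
    ///     // after which `take_error` reports whether it actually succeeded.
    ///     Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
    ///         poll.poll(&mut events, None)?;
    ///         if let Some(e) = server.take_error()? {
    ///             return Err(e);
    ///         }
    ///     }
    ///     Err(e) => return Err(e),
    /// }
    /// # Ok(())
    /// # }
    /// ```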
    pub fn connect(&self) -> io::Result<()> {
        // "Acquire the connecting lock" or otherwise just make sure we're the
        // only operation that's using the `connect` overlapped instance.
        if self.inner.connecting.swap(true, SeqCst) {
            return Err(would_block());
        }

        // Now that we've flagged ourselves in the connecting state, issue the
        // connection attempt. Afterwards interpret the return value and set
        // internal state accordingly.
        let res = unsafe {
            let overlapped = self.inner.connect.as_ptr() as *mut _;
            self.inner.handle.connect_overlapped(overlapped)
        };

        match res {
            // The connection operation finished immediately, so let's schedule
            // reads/writes and such.
            Ok(true) => {
                self.inner.connecting.store(false, SeqCst);
                Inner::post_register(&self.inner, None);
                Ok(())
            }

            // If the overlapped operation was successful and didn't finish
            // immediately then we forget a copy of the arc we hold
            // internally. This ensures that when the completion status comes
            // in for the I/O operation finishing it'll have a reference
            // associated with it and our data will still be valid. The
            // `connect_done` function will "reify" this forgotten pointer to
            // drop the refcount on the other side.
            Ok(false) => {
                mem::forget(self.inner.clone());
                Err(would_block())
            }

            Err(e) => {
                self.inner.connecting.store(false, SeqCst);
                Err(e)
            }
        }
    }

    /// Takes any internal error that has happened after the last I/O operation
    /// which hasn't been retrieved yet.
    ///
    /// This is particularly useful when detecting failed attempts to `connect`.
    /// After a completed `connect` flags this pipe as writable, callers must
    /// invoke this method to determine whether the connection actually
    /// succeeded. If this function returns `None` then a client is connected,
    /// otherwise it returns the error that occurred and no client is
    /// connected.
    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        Ok(self.inner.io.lock().unwrap().connect_error.take())
    }

    /// Disconnects this named pipe from a connected client.
    ///
    /// This function will disconnect the pipe from a connected client, if any,
    /// transitively calling the `DisconnectNamedPipe` function.
    ///
    /// After a `disconnect` is issued, `connect` may be called again to
    /// connect to another client.
    pub fn disconnect(&self) -> io::Result<()> {
        self.inner.handle.disconnect()
    }
}

impl FromRawHandle for NamedPipe {
    unsafe fn from_raw_handle(handle: RawHandle) -> NamedPipe {
        NamedPipe {
            inner: Arc::new(Inner {
                // Safety: not really unsafe
                handle: pipe::NamedPipe::from_raw_handle(handle),
                // transmutes to straddle winapi versions (mio 0.6 is on an
                // older winapi)
                connect: Overlapped::new(connect_done),
                connecting: AtomicBool::new(false),
                read: Overlapped::new(read_done),
                write: Overlapped::new(write_done),
                io: Mutex::new(Io {
                    cp: None,
                    token: None,
                    read: State::None,
                    read_interest: false,
                    write: State::None,
                    write_interest: false,
                    connect_error: None,
                }),
                pool: Mutex::new(BufferPool::with_capacity(2)),
            }),
        }
    }
}

impl Read for NamedPipe {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        <&NamedPipe as Read>::read(&mut &*self, buf)
    }
}

impl Write for NamedPipe {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        <&NamedPipe as Write>::write(&mut &*self, buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        <&NamedPipe as Write>::flush(&mut &*self)
    }
}

impl<'a> Read for &'a NamedPipe {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut state = self.inner.io.lock().unwrap();

        if state.token.is_none() {
            return Err(would_block());
        }

        match mem::replace(&mut state.read, State::None) {
            // In theory not possible with `token` checked above,
            // but return would block for now.
            State::None => Err(would_block()),

            // A read is in flight, still waiting for it to finish
            State::Pending(buf, amt) => {
                state.read = State::Pending(buf, amt);
                Err(would_block())
            }

            // We previously read something into `data`, try to copy out some
            // data. If we copy out all the data, schedule a new read;
            // otherwise store the buffer to be read from later.
            State::Ok(data, cur) => {
                let n = {
                    let mut remaining = &data[cur..];
                    remaining.read(buf)?
                };
                let next = cur + n;
                if next != data.len() {
                    state.read = State::Ok(data, next);
                } else {
                    self.inner.put_buffer(data);
                    Inner::schedule_read(&self.inner, &mut state, None);
                }
                Ok(n)
            }

            // Looks like an in-flight read hit an error, return that here while
            // we schedule a new one.
            State::Err(e) => {
                Inner::schedule_read(&self.inner, &mut state, None);
                if e.raw_os_error() == Some(ERROR_BROKEN_PIPE as i32) {
                    Ok(0)
                } else {
                    Err(e)
                }
            }
        }
    }
}

impl<'a> Write for &'a NamedPipe {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Make sure there are no writes pending
        let mut io = self.inner.io.lock().unwrap();

        if io.token.is_none() {
            return Err(would_block());
        }

        match io.write {
            State::None => {}
            State::Err(_) => match mem::replace(&mut io.write, State::None) {
                State::Err(e) => return Err(e),
                // `io` is locked, so this branch is unreachable
                _ => unreachable!(),
            },
            // any other state should be handled in `write_done`
            _ => {
                return Err(would_block());
            }
        }

        // Move `buf` onto the heap and fire off the write
        let mut owned_buf = self.inner.get_buffer();
        owned_buf.extend(buf);
        match Inner::maybe_schedule_write(&self.inner, owned_buf, 0, &mut io)? {
            // Some bytes were written immediately
            Some(n) => Ok(n),
            // Write operation is enqueued for the whole buffer
            None => Ok(buf.len()),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Source for NamedPipe {
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interest: Interest,
    ) -> io::Result<()> {
        let mut io = self.inner.io.lock().unwrap();

        io.check_association(registry, false)?;

        if io.token.is_some() {
            return Err(io::Error::new(
                io::ErrorKind::AlreadyExists,
                "I/O source already registered with a `Registry`",
            ));
        }

        if io.cp.is_none() {
            io.cp = Some(poll::selector(registry).clone_port());

            let inner_token = NEXT_TOKEN.fetch_add(2, Relaxed) + 2;
            poll::selector(registry)
                .inner
                .cp
                .add_handle(inner_token, &self.inner.handle)?;
        }

        io.token = Some(token);
        io.read_interest = interest.is_readable();
        io.write_interest = interest.is_writable();
        drop(io);

        Inner::post_register(&self.inner, None);

        Ok(())
    }

    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interest: Interest,
    ) -> io::Result<()> {
        let mut io = self.inner.io.lock().unwrap();

        io.check_association(registry, true)?;

        io.token = Some(token);
        io.read_interest = interest.is_readable();
        io.write_interest = interest.is_writable();
        drop(io);

        Inner::post_register(&self.inner, None);

        Ok(())
    }

    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        let mut io = self.inner.io.lock().unwrap();

        io.check_association(registry, true)?;

        if io.token.is_none() {
            return Err(io::Error::new(
                io::ErrorKind::NotFound,
                "I/O source not registered with `Registry`",
            ));
        }

        io.token = None;
        Ok(())
    }
}

impl AsRawHandle for NamedPipe {
    fn as_raw_handle(&self) -> RawHandle {
        self.inner.handle.as_raw_handle()
    }
}

impl fmt::Debug for NamedPipe {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.inner.handle.fmt(f)
    }
}

impl Drop for NamedPipe {
    fn drop(&mut self) {
        // Cancel pending reads/connects, but don't cancel writes to ensure that
        // everything is flushed out.
        unsafe {
            if self.inner.connecting.load(SeqCst) {
                drop(cancel(&self.inner.handle, &self.inner.connect));
            }

            let io = self.inner.io.lock().unwrap();

            match io.read {
                State::Pending(..) => {
                    drop(cancel(&self.inner.handle, &self.inner.read));
                }
                _ => {}
            }
        }
    }
}

impl Inner {
    /// Schedules a read to happen in the background, executing an overlapped
    /// operation.
    ///
    /// This function returns `true` if a normal error happens or if the read
    /// is scheduled in the background. If the pipe is no longer connected
    /// (ERROR_PIPE_LISTENING) then `false` is returned and no read is
    /// scheduled.
    fn schedule_read(me: &Arc<Inner>, io: &mut Io, events: Option<&mut Vec<Event>>) -> bool {
        // Check to see if a read is already scheduled/completed
        match io.read {
            State::None => {}
            _ => return true,
        }

        // Allocate a buffer and schedule the read.
        let mut buf = me.get_buffer();
        let e = unsafe {
            let overlapped = me.read.as_ptr() as *mut _;
            let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
            me.handle.read_overlapped(slice, overlapped)
        };

        match e {
            // See `NamedPipe::connect` above for the rationale behind `forget`
            Ok(_) => {
                io.read = State::Pending(buf, 0); // 0 is ignored on read side
                mem::forget(me.clone());
                true
            }

            // If ERROR_PIPE_LISTENING happens then it's not a real read error,
            // we just need to wait for a connect.
            Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_LISTENING as i32) => false,

            // If some other error happened, though, we're now readable to give
            // out the error.
            Err(e) => {
                io.read = State::Err(e);
                io.notify_readable(events);
                true
            }
        }
    }

    /// Maybe schedules an overlapped write operation.
    ///
    /// * `None` means that the overlapped operation was enqueued
    /// * `Some(n)` means that `n` bytes were immediately written.
    ///   Note that `write_done` will fire anyway to clean up the state.
    fn maybe_schedule_write(
        me: &Arc<Inner>,
        buf: Vec<u8>,
        pos: usize,
        io: &mut Io,
    ) -> io::Result<Option<usize>> {
        // Very similar to `schedule_read` above, just done for the write half.
        let e = unsafe {
            let overlapped = me.write.as_ptr() as *mut _;
            me.handle.write_overlapped(&buf[pos..], overlapped)
        };

        // See `connect` above for the rationale behind `forget`
        match e {
            // `n` bytes were written immediately
            Ok(Some(n)) => {
                io.write = State::Ok(buf, pos);
                mem::forget(me.clone());
                Ok(Some(n))
            }
            // write operation is enqueued
            Ok(None) => {
                io.write = State::Pending(buf, pos);
                mem::forget(me.clone());
                Ok(None)
            }
            Err(e) => Err(e),
        }
    }

    fn schedule_write(
        me: &Arc<Inner>,
        buf: Vec<u8>,
        pos: usize,
        io: &mut Io,
        events: Option<&mut Vec<Event>>,
    ) {
        match Inner::maybe_schedule_write(me, buf, pos, io) {
            Ok(Some(_)) => {
                // immediate result will be handled in `write_done`,
                // so we'll reinterpret the `Ok` state
                let state = mem::replace(&mut io.write, State::None);
                io.write = match state {
                    State::Ok(buf, pos) => State::Pending(buf, pos),
                    // io is locked, so this branch is unreachable
                    _ => unreachable!(),
                };
                mem::forget(me.clone());
            }
            Ok(None) => (),
            Err(e) => {
                io.write = State::Err(e);
                io.notify_writable(events);
            }
        }
    }

    fn post_register(me: &Arc<Inner>, mut events: Option<&mut Vec<Event>>) {
        let mut io = me.io.lock().unwrap();
        if Inner::schedule_read(&me, &mut io, events.as_mut().map(|ptr| &mut **ptr)) {
            if let State::None = io.write {
                io.notify_writable(events);
            }
        }
    }

    fn get_buffer(&self) -> Vec<u8> {
        self.pool.lock().unwrap().get(4 * 1024)
    }

    fn put_buffer(&self, buf: Vec<u8>) {
        self.pool.lock().unwrap().put(buf)
    }
}

unsafe fn cancel<T: AsRawHandle>(handle: &T, overlapped: &Overlapped) -> io::Result<()> {
    let ret = CancelIoEx(handle.as_raw_handle(), overlapped.as_ptr() as *mut _);
    // `CancelIoEx` returns 0 on error:
    // https://docs.microsoft.com/en-us/windows/win32/fileio/cancelioex-func
    if ret == 0 {
        Err(io::Error::last_os_error())
    } else {
        Ok(())
    }
}

fn connect_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
    let status = CompletionStatus::from_entry(status);

    // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
    // the refcount is available to us due to the `mem::forget` in
    // `connect` above.
    let me = unsafe { overlapped2arc!(status.overlapped(), Inner, connect) };

    // Flag ourselves as no longer using the `connect` overlapped instance.
    let prev = me.connecting.swap(false, SeqCst);
    assert!(prev, "NamedPipe was not previously connecting");

    // Stash away our connect error if one happened
    debug_assert_eq!(status.bytes_transferred(), 0);
    unsafe {
        match me.handle.result(status.overlapped()) {
            Ok(n) => debug_assert_eq!(n, 0),
            Err(e) => me.io.lock().unwrap().connect_error = Some(e),
        }
    }

    // We essentially just finished a registration, so kick off a
    // read and register write readiness.
    Inner::post_register(&me, events);
}

fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
    let status = CompletionStatus::from_entry(status);

    // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
    // the refcount is available to us due to the `mem::forget` in
    // `schedule_read` above.
    let me = unsafe { overlapped2arc!(status.overlapped(), Inner, read) };

    // Move from the `Pending` to `Ok` state.
    let mut io = me.io.lock().unwrap();
    let mut buf = match mem::replace(&mut io.read, State::None) {
        State::Pending(buf, _) => buf,
        _ => unreachable!(),
    };
    unsafe {
        match me.handle.result(status.overlapped()) {
            Ok(n) => {
                debug_assert_eq!(status.bytes_transferred() as usize, n);
                buf.set_len(status.bytes_transferred() as usize);
                io.read = State::Ok(buf, 0);
            }
            Err(e) => {
                debug_assert_eq!(status.bytes_transferred(), 0);
                io.read = State::Err(e);
            }
        }
    }

    // Flag our readiness that we've got data.
    io.notify_readable(events);
}

fn write_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
    let status = CompletionStatus::from_entry(status);

    // Acquire the `Arc<Inner>`. Note that we should be guaranteed that
    // the refcount is available to us due to the `mem::forget` in
    // `schedule_write` above.
    let me = unsafe { overlapped2arc!(status.overlapped(), Inner, write) };

    // Make the state change out of `Pending`. If we wrote the entire buffer
    // then we're writable again and otherwise we schedule another write.
    let mut io = me.io.lock().unwrap();
    let (buf, pos) = match mem::replace(&mut io.write, State::None) {
        // `Ok` here means that the operation completed immediately and
        // `bytes_transferred` was already reported to the client
        State::Ok(..) => {
            io.notify_writable(events);
            return;
        }
        State::Pending(buf, pos) => (buf, pos),
        _ => unreachable!(),
    };

    unsafe {
        match me.handle.result(status.overlapped()) {
            Ok(n) => {
                debug_assert_eq!(status.bytes_transferred() as usize, n);
                let new_pos = pos + (status.bytes_transferred() as usize);
                if new_pos == buf.len() {
                    me.put_buffer(buf);
                    io.notify_writable(events);
                } else {
                    Inner::schedule_write(&me, buf, new_pos, &mut io, events);
                }
            }
            Err(e) => {
                debug_assert_eq!(status.bytes_transferred(), 0);
                io.write = State::Err(e);
                io.notify_writable(events);
            }
        }
    }
}

impl Io {
    fn check_association(&self, registry: &Registry, required: bool) -> io::Result<()> {
        match self.cp {
            Some(ref cp) if !poll::selector(registry).same_port(cp) => Err(io::Error::new(
                io::ErrorKind::AlreadyExists,
                "I/O source already registered with a different `Registry`",
            )),
            None if required => Err(io::Error::new(
                io::ErrorKind::NotFound,
                "I/O source not registered with `Registry`",
            )),
            _ => Ok(()),
        }
    }

    fn notify_readable(&self, events: Option<&mut Vec<Event>>) {
        if let Some(token) = self.token {
            let mut ev = Event::new(token);
            ev.set_readable();

            if let Some(events) = events {
                events.push(ev);
            } else {
                let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
            }
        }
    }

    fn notify_writable(&self, events: Option<&mut Vec<Event>>) {
        if let Some(token) = self.token {
            let mut ev = Event::new(token);
            ev.set_writable();

            if let Some(events) = events {
                events.push(ev);
            } else {
                let _ = self.cp.as_ref().unwrap().post(ev.to_completion_status());
            }
        }
    }
}

struct BufferPool {
    pool: Vec<Vec<u8>>,
}

impl BufferPool {
    fn with_capacity(cap: usize) -> BufferPool {
        BufferPool {
            pool: Vec::with_capacity(cap),
        }
    }

    fn get(&mut self, default_cap: usize) -> Vec<u8> {
        self.pool
            .pop()
            .unwrap_or_else(|| Vec::with_capacity(default_cap))
    }

    fn put(&mut self, mut buf: Vec<u8>) {
        if self.pool.len() < self.pool.capacity() {
            unsafe {
                buf.set_len(0);
            }
            self.pool.push(buf);
        }
    }
}