use super::afd::{self, Afd, AfdPollInfo};
use super::io_status_block::IoStatusBlock;
use super::Event;
use crate::sys::Events;

cfg_net! {
    use crate::sys::event::{
        ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS,
    };
    use crate::Interest;
}

use super::iocp::{CompletionPort, CompletionStatus};
use std::collections::VecDeque;
use std::ffi::c_void;
use std::io;
use std::marker::PhantomPinned;
use std::os::windows::io::RawSocket;
use std::pin::Pin;
#[cfg(debug_assertions)]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use windows_sys::Win32::Foundation::{
    ERROR_INVALID_HANDLE, ERROR_IO_PENDING, HANDLE, STATUS_CANCELLED, WAIT_TIMEOUT,
};
use windows_sys::Win32::System::IO::OVERLAPPED;

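/// Pool of `Afd` handles shared between sockets. Each handle is kept alive by
/// the `Arc` references held in the `SockState`s using it; `release_unused_afd`
/// drops a handle once only the group itself still holds a reference to it.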
#[derive(Debug)]
struct AfdGroup {
    #[cfg_attr(not(feature = "net"), allow(dead_code))]
    cp: Arc<CompletionPort>,
    afd_group: Mutex<Vec<Arc<Afd>>>,
}

impl AfdGroup {
    pub fn new(cp: Arc<CompletionPort>) -> AfdGroup {
        AfdGroup {
            afd_group: Mutex::new(Vec::new()),
            cp,
        }
    }

    pub fn release_unused_afd(&self) {
        let mut afd_group = self.afd_group.lock().unwrap();
        afd_group.retain(|g| Arc::strong_count(g) > 1);
    }
}

cfg_io_source! {
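    // Upper bound on how many sockets share one `Afd` handle: `acquire` opens a
    // fresh handle once the newest group's `Arc` strong count exceeds this limit
    // (the extra count comes from the `Vec` entry itself).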
    const POLL_GROUP__MAX_GROUP_SIZE: usize = 32;

    impl AfdGroup {
        pub fn acquire(&self) -> io::Result<Arc<Afd>> {
            let mut afd_group = self.afd_group.lock().unwrap();
            if afd_group.len() == 0 {
                self._alloc_afd_group(&mut afd_group)?;
            } else {
                // + 1 reference in Vec
                if Arc::strong_count(afd_group.last().unwrap()) > POLL_GROUP__MAX_GROUP_SIZE {
                    self._alloc_afd_group(&mut afd_group)?;
                }
            }

            match afd_group.last() {
                Some(arc) => Ok(arc.clone()),
                None => unreachable!(
                    "Cannot acquire afd, {:#?}, afd_group: {:#?}",
                    self, afd_group
                ),
            }
        }

        fn _alloc_afd_group(&self, afd_group: &mut Vec<Arc<Afd>>) -> io::Result<()> {
            let afd = Afd::new(&self.cp)?;
            let arc = Arc::new(afd);
            afd_group.push(arc);
            Ok(())
        }
    }
}

#[derive(Debug)]
enum SockPollStatus {
    Idle,
    Pending,
    Cancelled,
}

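/// Per-socket state driving a single outstanding AFD poll operation. It tracks
/// the events the user registered interest in (`user_evts`) versus the events
/// covered by the poll currently submitted to the kernel (`pending_evts`), and
/// is pinned because the kernel writes into `iosb` and `poll_info`.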
#[derive(Debug)]
pub struct SockState {
    iosb: IoStatusBlock,
    poll_info: AfdPollInfo,
    afd: Arc<Afd>,

    base_socket: RawSocket,

    user_evts: u32,
    pending_evts: u32,

    user_data: u64,

    poll_status: SockPollStatus,
    delete_pending: bool,

    // last raw os error
    error: Option<i32>,

    _pinned: PhantomPinned,
}

impl SockState {
    fn update(&mut self, self_arc: &Pin<Arc<Mutex<SockState>>>) -> io::Result<()> {
        assert!(!self.delete_pending);

        // Make sure to reset any previous error before a new update.
        self.error = None;

        if let SockPollStatus::Pending = self.poll_status {
            if (self.user_evts & afd::KNOWN_EVENTS & !self.pending_evts) == 0 {
                /* All the events the user is interested in are already being monitored by
                 * the pending poll operation. It might spuriously complete because of an
                 * event that we're no longer interested in; when that happens we'll submit
                 * a new poll operation with the updated event mask. */
            } else {
                /* A poll operation is already pending, but it's not monitoring for all the
                 * events that the user is interested in. Therefore, cancel the pending
                 * poll operation; when we receive its completion packet, a new poll
                 * operation will be submitted with the correct event mask. */
                if let Err(e) = self.cancel() {
                    self.error = e.raw_os_error();
                    return Err(e);
                }
                return Ok(());
            }
        } else if let SockPollStatus::Cancelled = self.poll_status {
            /* The poll operation has already been cancelled, we're still waiting for
             * it to return. For now, there's nothing that needs to be done. */
        } else if let SockPollStatus::Idle = self.poll_status {
            /* No poll operation is pending; start one. */
            self.poll_info.exclusive = 0;
            self.poll_info.number_of_handles = 1;
            self.poll_info.timeout = i64::MAX;
            self.poll_info.handles[0].handle = self.base_socket as HANDLE;
            self.poll_info.handles[0].status = 0;
            self.poll_info.handles[0].events = self.user_evts | afd::POLL_LOCAL_CLOSE;

            // Increase the ref count as the memory will be used by the kernel.
            let overlapped_ptr = into_overlapped(self_arc.clone());

            let result = unsafe {
                self.afd
                    .poll(&mut self.poll_info, &mut *self.iosb, overlapped_ptr)
            };
            if let Err(e) = result {
                let code = e.raw_os_error().unwrap();
                if code == ERROR_IO_PENDING as i32 {
                    /* Overlapped poll operation in progress; this is expected. */
                } else {
                    // Since the operation failed it means the kernel won't be
                    // using the memory any more.
                    drop(from_overlapped(overlapped_ptr as *mut _));
                    if code == ERROR_INVALID_HANDLE as i32 {
                        /* Socket closed; it'll be dropped. */
                        self.mark_delete();
                        return Ok(());
                    } else {
                        self.error = e.raw_os_error();
                        return Err(e);
                    }
                }
            }

            self.poll_status = SockPollStatus::Pending;
            self.pending_evts = self.user_evts;
        } else {
            unreachable!("Invalid poll status during update, {:#?}", self)
        }

        Ok(())
    }

    fn cancel(&mut self) -> io::Result<()> {
        match self.poll_status {
            SockPollStatus::Pending => {}
            _ => unreachable!("Invalid poll status during cancel, {:#?}", self),
        };
        unsafe {
            self.afd.cancel(&mut *self.iosb)?;
        }
        self.poll_status = SockPollStatus::Cancelled;
        self.pending_evts = 0;
        Ok(())
    }

    // This function is called for a completed overlapped operation, where the
    // OVERLAPPED pointer is really an `Arc<Mutex<SockState>>`. Watch out for
    // reference counting.
    fn feed_event(&mut self) -> Option<Event> {
        self.poll_status = SockPollStatus::Idle;
        self.pending_evts = 0;

        let mut afd_events = 0;
        // We use the status info in the IO_STATUS_BLOCK to determine the socket poll
        // status. Accessing it requires `unsafe` because the status lives in a union
        // written by the kernel.
        unsafe {
            if self.delete_pending {
                return None;
            } else if self.iosb.Anonymous.Status == STATUS_CANCELLED {
                /* The poll request was cancelled by CancelIoEx. */
            } else if self.iosb.Anonymous.Status < 0 {
                /* The overlapped request itself failed in an unexpected way. */
                afd_events = afd::POLL_CONNECT_FAIL;
            } else if self.poll_info.number_of_handles < 1 {
                /* This poll operation succeeded but didn't report any socket events. */
            } else if self.poll_info.handles[0].events & afd::POLL_LOCAL_CLOSE != 0 {
                /* The poll operation reported that the socket was closed. */
                self.mark_delete();
                return None;
            } else {
                afd_events = self.poll_info.handles[0].events;
            }
        }

        afd_events &= self.user_evts;

        if afd_events == 0 {
            return None;
        }

        // In mio, we have to simulate edge-triggered behavior to match the API usage.
        // The strategy here is to intercept all user read/write calls that could return
        // `WouldBlock`, then reregister the socket to reset the interests.
        self.user_evts &= !afd_events;

        Some(Event {
            data: self.user_data,
            flags: afd_events,
        })
    }

    pub fn is_pending_deletion(&self) -> bool {
        self.delete_pending
    }

    pub fn mark_delete(&mut self) {
        if !self.delete_pending {
            if let SockPollStatus::Pending = self.poll_status {
                drop(self.cancel());
            }

            self.delete_pending = true;
        }
    }

    fn has_error(&self) -> bool {
        self.error.is_some()
    }
}

cfg_io_source! {
    impl SockState {
        fn new(raw_socket: RawSocket, afd: Arc<Afd>) -> io::Result<SockState> {
            Ok(SockState {
                iosb: IoStatusBlock::zeroed(),
                poll_info: AfdPollInfo::zeroed(),
                afd,
                base_socket: get_base_socket(raw_socket)?,
                user_evts: 0,
                pending_evts: 0,
                user_data: 0,
                poll_status: SockPollStatus::Idle,
                delete_pending: false,
                error: None,
                _pinned: PhantomPinned,
            })
        }

        /// Returns true if the socket needs to be added to the update queue, false otherwise.
        fn set_event(&mut self, ev: Event) -> bool {
            /* afd::POLL_CONNECT_FAIL and afd::POLL_ABORT are always reported, even when not requested by the caller. */
            let events = ev.flags | afd::POLL_CONNECT_FAIL | afd::POLL_ABORT;

            self.user_evts = events;
            self.user_data = ev.data;

            (events & !self.pending_evts) != 0
        }
    }
}

impl Drop for SockState {
    fn drop(&mut self) {
        self.mark_delete();
    }
}

/// Converts the pointer to a `SockState` into a raw pointer.
/// To revert see `from_overlapped`.
fn into_overlapped(sock_state: Pin<Arc<Mutex<SockState>>>) -> *mut c_void {
    let overlapped_ptr: *const Mutex<SockState> =
        unsafe { Arc::into_raw(Pin::into_inner_unchecked(sock_state)) };
    overlapped_ptr as *mut _
}

/// Convert a raw overlapped pointer into a reference to `SockState`.
/// Reverts `into_overlapped`.
fn from_overlapped(ptr: *mut OVERLAPPED) -> Pin<Arc<Mutex<SockState>>> {
    let sock_ptr: *const Mutex<SockState> = ptr as *const _;
    unsafe { Pin::new_unchecked(Arc::from_raw(sock_ptr)) }
}
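
// Note on reference counting: each call to `into_overlapped` leaks one strong
// `Arc` reference to the kernel via the OVERLAPPED pointer. It is balanced by
// exactly one call to `from_overlapped`: when the completion status is dequeued
// in `feed_events`, immediately in `update` if submitting the poll fails, or
// when the selector is dropped and drains the remaining completions.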

/// Each Selector has a globally unique(ish) ID associated with it. This ID
/// gets tracked by `TcpStream`, `TcpListener`, etc... when they are first
/// registered with the `Selector`. If a type that was previously associated with
/// a `Selector` attempts to register itself with a different `Selector`, the
/// operation will return with an error. This matches Windows behavior.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

/// Windows implementation of `sys::Selector`.
///
/// Edge-triggered event notification is simulated by resetting the internal event flag of each
/// socket state (`SockState`) and setting all events back by intercepting all requests that
/// could cause `io::ErrorKind::WouldBlock` to happen.
///
/// This selector currently only supports sockets, because the `Afd` driver is winsock2 specific.
#[derive(Debug)]
pub struct Selector {
    #[cfg(debug_assertions)]
    id: usize,
    pub(super) inner: Arc<SelectorInner>,
    #[cfg(debug_assertions)]
    has_waker: AtomicBool,
}

impl Selector {
    pub fn new() -> io::Result<Selector> {
        SelectorInner::new().map(|inner| {
            #[cfg(debug_assertions)]
            let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;
            Selector {
                #[cfg(debug_assertions)]
                id,
                inner: Arc::new(inner),
                #[cfg(debug_assertions)]
                has_waker: AtomicBool::new(false),
            }
        })
    }

    pub fn try_clone(&self) -> io::Result<Selector> {
        Ok(Selector {
            #[cfg(debug_assertions)]
            id: self.id,
            inner: Arc::clone(&self.inner),
            #[cfg(debug_assertions)]
            has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
        })
    }

    /// # Safety
    ///
    /// This requires a mutable reference to self because only a single thread
    /// can poll IOCP at a time.
    pub fn select(&mut self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        self.inner.select(events, timeout)
    }

    #[cfg(debug_assertions)]
    pub fn register_waker(&self) -> bool {
        self.has_waker.swap(true, Ordering::AcqRel)
    }

    pub(super) fn clone_port(&self) -> Arc<CompletionPort> {
        self.inner.cp.clone()
    }

    #[cfg(feature = "os-ext")]
    pub(super) fn same_port(&self, other: &Arc<CompletionPort>) -> bool {
        Arc::ptr_eq(&self.inner.cp, other)
    }
}

cfg_io_source! {
    use super::InternalState;
    use crate::Token;

    impl Selector {
        pub(super) fn register(
            &self,
            socket: RawSocket,
            token: Token,
            interests: Interest,
        ) -> io::Result<InternalState> {
            SelectorInner::register(&self.inner, socket, token, interests)
        }

        pub(super) fn reregister(
            &self,
            state: Pin<Arc<Mutex<SockState>>>,
            token: Token,
            interests: Interest,
        ) -> io::Result<()> {
            self.inner.reregister(state, token, interests)
        }

        #[cfg(debug_assertions)]
        pub fn id(&self) -> usize {
            self.id
        }
    }
}

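/// Selector state shared between `Selector` clones. Registered sockets sit in
/// `update_queue` until their AFD poll operations are (re)submitted, which
/// happens either right before polling the completion port in `select2`, or
/// immediately on (re)registration when another thread is already polling
/// (see `update_sockets_events_if_polling`).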
#[derive(Debug)]
pub struct SelectorInner {
    pub(super) cp: Arc<CompletionPort>,
    update_queue: Mutex<VecDeque<Pin<Arc<Mutex<SockState>>>>>,
    afd_group: AfdGroup,
    is_polling: AtomicBool,
}

// Thread safety is ensured by manually locking the shared state.
unsafe impl Sync for SelectorInner {}

impl SelectorInner {
    pub fn new() -> io::Result<SelectorInner> {
        CompletionPort::new(0).map(|cp| {
            let cp = Arc::new(cp);
            let cp_afd = Arc::clone(&cp);

            SelectorInner {
                cp,
                update_queue: Mutex::new(VecDeque::new()),
                afd_group: AfdGroup::new(cp_afd),
                is_polling: AtomicBool::new(false),
            }
        })
    }

    /// # Safety
    ///
    /// May only be called via `Selector::select`.
    pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
        events.clear();

        if timeout.is_none() {
            loop {
                let len = self.select2(&mut events.statuses, &mut events.events, None)?;
                if len == 0 {
                    continue;
                }
                break Ok(());
            }
        } else {
            self.select2(&mut events.statuses, &mut events.events, timeout)?;
            Ok(())
        }
    }

    pub fn select2(
        &self,
        statuses: &mut [CompletionStatus],
        events: &mut Vec<Event>,
        timeout: Option<Duration>,
    ) -> io::Result<usize> {
        assert!(!self.is_polling.swap(true, Ordering::AcqRel));

        unsafe { self.update_sockets_events() }?;

        let result = self.cp.get_many(statuses, timeout);

        self.is_polling.store(false, Ordering::Relaxed);

        match result {
            Ok(iocp_events) => Ok(unsafe { self.feed_events(events, iocp_events) }),
            Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => Ok(0),
            Err(e) => Err(e),
        }
    }

    unsafe fn update_sockets_events(&self) -> io::Result<()> {
        let mut update_queue = self.update_queue.lock().unwrap();
        for sock in update_queue.iter_mut() {
            let mut sock_internal = sock.lock().unwrap();
            if !sock_internal.is_pending_deletion() {
                sock_internal.update(sock)?;
            }
        }

        // Remove all sockets that do not have an error; they have an AFD poll
        // operation pending.
        update_queue.retain(|sock| sock.lock().unwrap().has_error());

        self.afd_group.release_unused_afd();
        Ok(())
    }

    // Returns the number of processed `iocp_events`, rather than the events
    // themselves.
    unsafe fn feed_events(
        &self,
        events: &mut Vec<Event>,
        iocp_events: &[CompletionStatus],
    ) -> usize {
        let mut n = 0;
        let mut update_queue = self.update_queue.lock().unwrap();
        for iocp_event in iocp_events.iter() {
            if iocp_event.overlapped().is_null() {
                events.push(Event::from_completion_status(iocp_event));
                n += 1;
                continue;
            } else if iocp_event.token() % 2 == 1 {
                // Handle is a named pipe. This could be extended to be any non-AFD event.
                let callback = (*(iocp_event.overlapped() as *mut super::Overlapped)).callback;

                let len = events.len();
                callback(iocp_event.entry(), Some(events));
                n += events.len() - len;
                continue;
            }

            let sock_state = from_overlapped(iocp_event.overlapped());
            let mut sock_guard = sock_state.lock().unwrap();
            if let Some(e) = sock_guard.feed_event() {
                events.push(e);
                n += 1;
            }

            if !sock_guard.is_pending_deletion() {
                update_queue.push_back(sock_state.clone());
            }
        }
        self.afd_group.release_unused_afd();
        n
    }
}

cfg_io_source! {
    use std::mem::size_of;
    use std::ptr::null_mut;

    use windows_sys::Win32::Networking::WinSock::{
        WSAGetLastError, WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE,
        SIO_BSP_HANDLE_POLL, SIO_BSP_HANDLE_SELECT, SOCKET_ERROR,
    };

    impl SelectorInner {
        fn register(
            this: &Arc<Self>,
            socket: RawSocket,
            token: Token,
            interests: Interest,
        ) -> io::Result<InternalState> {
            let flags = interests_to_afd_flags(interests);

            let sock = {
                let sock = this._alloc_sock_for_rawsocket(socket)?;
                let event = Event {
                    flags,
                    data: token.0 as u64,
                };
                sock.lock().unwrap().set_event(event);
                sock
            };

            let state = InternalState {
                selector: this.clone(),
                token,
                interests,
                sock_state: sock.clone(),
            };

            this.queue_state(sock);
            unsafe { this.update_sockets_events_if_polling()? };

            Ok(state)
        }

        // Directly accessed in `IoSourceState::do_io`.
        pub(super) fn reregister(
            &self,
            state: Pin<Arc<Mutex<SockState>>>,
            token: Token,
            interests: Interest,
        ) -> io::Result<()> {
            {
                let event = Event {
                    flags: interests_to_afd_flags(interests),
                    data: token.0 as u64,
                };

                state.lock().unwrap().set_event(event);
            }

            // FIXME: a sock whose `has_error` is true should not be re-added to
            // the update queue because it's already there.
            self.queue_state(state);
            unsafe { self.update_sockets_events_if_polling() }
        }

        /// This function is called by register() and reregister() to start an
        /// IOCTL_AFD_POLL operation corresponding to the registered events, but
        /// only if necessary.
        ///
        /// Since it is not possible to modify or synchronously cancel an AFD_POLL
        /// operation, and there can be only one active AFD_POLL operation per
        /// (socket, completion port) pair at any time, it is expensive to change
        /// a socket's event registration after it has been submitted to the kernel.
        ///
        /// Therefore, if no other threads are polling when interest in a socket
        /// event is (re)registered, the socket is added to the 'update queue', but
        /// the actual syscall to start the IOCTL_AFD_POLL operation is deferred
        /// until just before the GetQueuedCompletionStatusEx() syscall is made.
        ///
        /// However, when another thread is already blocked on
        /// GetQueuedCompletionStatusEx() we tell the kernel about the registered
        /// socket event(s) immediately.
        unsafe fn update_sockets_events_if_polling(&self) -> io::Result<()> {
            if self.is_polling.load(Ordering::Acquire) {
                self.update_sockets_events()
            } else {
                Ok(())
            }
        }

        fn queue_state(&self, sock_state: Pin<Arc<Mutex<SockState>>>) {
            let mut update_queue = self.update_queue.lock().unwrap();
            update_queue.push_back(sock_state);
        }

        fn _alloc_sock_for_rawsocket(
            &self,
            raw_socket: RawSocket,
        ) -> io::Result<Pin<Arc<Mutex<SockState>>>> {
            let afd = self.afd_group.acquire()?;
            Ok(Arc::pin(Mutex::new(SockState::new(raw_socket, afd)?)))
        }
    }

    fn try_get_base_socket(raw_socket: RawSocket, ioctl: u32) -> Result<RawSocket, i32> {
        let mut base_socket: RawSocket = 0;
        let mut bytes: u32 = 0;
        unsafe {
            if WSAIoctl(
                raw_socket as usize,
                ioctl,
                null_mut(),
                0,
                &mut base_socket as *mut _ as *mut c_void,
                size_of::<RawSocket>() as u32,
                &mut bytes,
                null_mut(),
                None,
            ) != SOCKET_ERROR
            {
                Ok(base_socket)
            } else {
                Err(WSAGetLastError())
            }
        }
    }

    fn get_base_socket(raw_socket: RawSocket) -> io::Result<RawSocket> {
        let res = try_get_base_socket(raw_socket, SIO_BASE_HANDLE);
        if let Ok(base_socket) = res {
            return Ok(base_socket);
        }

        // The `SIO_BASE_HANDLE` should not be intercepted by LSPs, therefore
        // it should not fail as long as `raw_socket` is a valid socket. See
        // https://docs.microsoft.com/en-us/windows/win32/winsock/winsock-ioctls.
        // However, at least one known LSP deliberately breaks it, so we try
        // some alternative IOCTLs, starting with the most appropriate one.
        for &ioctl in &[
            SIO_BSP_HANDLE_SELECT,
            SIO_BSP_HANDLE_POLL,
            SIO_BSP_HANDLE,
        ] {
            if let Ok(base_socket) = try_get_base_socket(raw_socket, ioctl) {
                // Since we now know that we're dealing with an LSP (otherwise
                // SIO_BASE_HANDLE wouldn't have failed), only return a result
                // when it is different from the original `raw_socket`.
                if base_socket != raw_socket {
                    return Ok(base_socket);
                }
            }
        }

        // If the alternative IOCTLs also failed, return the original error.
        let os_error = res.unwrap_err();
        let err = io::Error::from_raw_os_error(os_error);
        Err(err)
    }
}

impl Drop for SelectorInner {
    fn drop(&mut self) {
        loop {
            let events_num: usize;
            let mut statuses: [CompletionStatus; 1024] = [CompletionStatus::zero(); 1024];

            let result = self
                .cp
                .get_many(&mut statuses, Some(std::time::Duration::from_millis(0)));
            match result {
                Ok(iocp_events) => {
                    events_num = iocp_events.iter().len();
                    for iocp_event in iocp_events.iter() {
                        if iocp_event.overlapped().is_null() {
                            // Custom event
                        } else if iocp_event.token() % 2 == 1 {
                            // Named pipe, dispatch the event so it can release resources
                            let callback = unsafe {
                                (*(iocp_event.overlapped() as *mut super::Overlapped)).callback
                            };

                            callback(iocp_event.entry(), None);
                        } else {
                            // Drain the sock state to release the `Arc` reference
                            // held by the kernel.
                            let _sock_state = from_overlapped(iocp_event.overlapped());
                        }
                    }
                }

                Err(_) => {
                    break;
                }
            }

            if events_num == 0 {
                // All completion statuses have been drained.
                break;
            }
        }

        self.afd_group.release_unused_afd();
    }
}

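// Maps mio `Interest` values to the AFD poll flag masks submitted to the kernel:
// readable interest also watches read-closed and error conditions, and writable
// interest also watches write-closed and error conditions.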
cfg_net! {
    fn interests_to_afd_flags(interests: Interest) -> u32 {
        let mut flags = 0;

        if interests.is_readable() {
            flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS;
        }

        if interests.is_writable() {
            flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS;
        }

        flags
    }
}