• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Selector and proactor event loops for Windows."""
2
3import sys
4
5if sys.platform != 'win32':  # pragma: no cover
6    raise ImportError('win32 only')
7
8import _overlapped
9import _winapi
10import errno
11import math
12import msvcrt
13import socket
14import struct
15import time
16import weakref
17
18from . import events
19from . import base_subprocess
20from . import futures
21from . import exceptions
22from . import proactor_events
23from . import selector_events
24from . import tasks
25from . import windows_utils
26from .log import logger
27
28
29__all__ = (
30    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
31    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
32    'WindowsProactorEventLoopPolicy',
33)
34
35
36NULL = 0
37INFINITE = 0xffffffff
38ERROR_CONNECTION_REFUSED = 1225
39ERROR_CONNECTION_ABORTED = 1236
40
41# Initial delay in seconds for connect_pipe() before retrying to connect
42CONNECT_PIPE_INIT_DELAY = 0.001
43
44# Maximum delay in seconds for connect_pipe() before retrying to connect
45CONNECT_PIPE_MAX_DELAY = 0.100
46
47
48class _OverlappedFuture(futures.Future):
49    """Subclass of Future which represents an overlapped operation.
50
51    Cancelling it will immediately cancel the overlapped operation.
52    """
53
54    def __init__(self, ov, *, loop=None):
55        super().__init__(loop=loop)
56        if self._source_traceback:
57            del self._source_traceback[-1]
58        self._ov = ov
59
60    def _repr_info(self):
61        info = super()._repr_info()
62        if self._ov is not None:
63            state = 'pending' if self._ov.pending else 'completed'
64            info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
65        return info
66
67    def _cancel_overlapped(self):
68        if self._ov is None:
69            return
70        try:
71            self._ov.cancel()
72        except OSError as exc:
73            context = {
74                'message': 'Cancelling an overlapped future failed',
75                'exception': exc,
76                'future': self,
77            }
78            if self._source_traceback:
79                context['source_traceback'] = self._source_traceback
80            self._loop.call_exception_handler(context)
81        self._ov = None
82
83    def cancel(self, msg=None):
84        self._cancel_overlapped()
85        return super().cancel(msg=msg)
86
87    def set_exception(self, exception):
88        super().set_exception(exception)
89        self._cancel_overlapped()
90
91    def set_result(self, result):
92        super().set_result(result)
93        self._ov = None
94
95
96class _BaseWaitHandleFuture(futures.Future):
97    """Subclass of Future which represents a wait handle."""
98
99    def __init__(self, ov, handle, wait_handle, *, loop=None):
100        super().__init__(loop=loop)
101        if self._source_traceback:
102            del self._source_traceback[-1]
103        # Keep a reference to the Overlapped object to keep it alive until the
104        # wait is unregistered
105        self._ov = ov
106        self._handle = handle
107        self._wait_handle = wait_handle
108
109        # Should we call UnregisterWaitEx() if the wait completes
110        # or is cancelled?
111        self._registered = True
112
113    def _poll(self):
114        # non-blocking wait: use a timeout of 0 millisecond
115        return (_winapi.WaitForSingleObject(self._handle, 0) ==
116                _winapi.WAIT_OBJECT_0)
117
118    def _repr_info(self):
119        info = super()._repr_info()
120        info.append(f'handle={self._handle:#x}')
121        if self._handle is not None:
122            state = 'signaled' if self._poll() else 'waiting'
123            info.append(state)
124        if self._wait_handle is not None:
125            info.append(f'wait_handle={self._wait_handle:#x}')
126        return info
127
128    def _unregister_wait_cb(self, fut):
129        # The wait was unregistered: it's not safe to destroy the Overlapped
130        # object
131        self._ov = None
132
133    def _unregister_wait(self):
134        if not self._registered:
135            return
136        self._registered = False
137
138        wait_handle = self._wait_handle
139        self._wait_handle = None
140        try:
141            _overlapped.UnregisterWait(wait_handle)
142        except OSError as exc:
143            if exc.winerror != _overlapped.ERROR_IO_PENDING:
144                context = {
145                    'message': 'Failed to unregister the wait handle',
146                    'exception': exc,
147                    'future': self,
148                }
149                if self._source_traceback:
150                    context['source_traceback'] = self._source_traceback
151                self._loop.call_exception_handler(context)
152                return
153            # ERROR_IO_PENDING means that the unregister is pending
154
155        self._unregister_wait_cb(None)
156
157    def cancel(self, msg=None):
158        self._unregister_wait()
159        return super().cancel(msg=msg)
160
161    def set_exception(self, exception):
162        self._unregister_wait()
163        super().set_exception(exception)
164
165    def set_result(self, result):
166        self._unregister_wait()
167        super().set_result(result)
168
169
170class _WaitCancelFuture(_BaseWaitHandleFuture):
171    """Subclass of Future which represents a wait for the cancellation of a
172    _WaitHandleFuture using an event.
173    """
174
175    def __init__(self, ov, event, wait_handle, *, loop=None):
176        super().__init__(ov, event, wait_handle, loop=loop)
177
178        self._done_callback = None
179
180    def cancel(self):
181        raise RuntimeError("_WaitCancelFuture must not be cancelled")
182
183    def set_result(self, result):
184        super().set_result(result)
185        if self._done_callback is not None:
186            self._done_callback(self)
187
188    def set_exception(self, exception):
189        super().set_exception(exception)
190        if self._done_callback is not None:
191            self._done_callback(self)
192
193
194class _WaitHandleFuture(_BaseWaitHandleFuture):
195    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
196        super().__init__(ov, handle, wait_handle, loop=loop)
197        self._proactor = proactor
198        self._unregister_proactor = True
199        self._event = _overlapped.CreateEvent(None, True, False, None)
200        self._event_fut = None
201
202    def _unregister_wait_cb(self, fut):
203        if self._event is not None:
204            _winapi.CloseHandle(self._event)
205            self._event = None
206            self._event_fut = None
207
208        # If the wait was cancelled, the wait may never be signalled, so
209        # it's required to unregister it. Otherwise, IocpProactor.close() will
210        # wait forever for an event which will never come.
211        #
212        # If the IocpProactor already received the event, it's safe to call
213        # _unregister() because we kept a reference to the Overlapped object
214        # which is used as a unique key.
215        self._proactor._unregister(self._ov)
216        self._proactor = None
217
218        super()._unregister_wait_cb(fut)
219
220    def _unregister_wait(self):
221        if not self._registered:
222            return
223        self._registered = False
224
225        wait_handle = self._wait_handle
226        self._wait_handle = None
227        try:
228            _overlapped.UnregisterWaitEx(wait_handle, self._event)
229        except OSError as exc:
230            if exc.winerror != _overlapped.ERROR_IO_PENDING:
231                context = {
232                    'message': 'Failed to unregister the wait handle',
233                    'exception': exc,
234                    'future': self,
235                }
236                if self._source_traceback:
237                    context['source_traceback'] = self._source_traceback
238                self._loop.call_exception_handler(context)
239                return
240            # ERROR_IO_PENDING is not an error, the wait was unregistered
241
242        self._event_fut = self._proactor._wait_cancel(self._event,
243                                                      self._unregister_wait_cb)
244
245
246class PipeServer(object):
247    """Class representing a pipe server.
248
249    This is much like a bound, listening socket.
250    """
251    def __init__(self, address):
252        self._address = address
253        self._free_instances = weakref.WeakSet()
254        # initialize the pipe attribute before calling _server_pipe_handle()
255        # because this function can raise an exception and the destructor calls
256        # the close() method
257        self._pipe = None
258        self._accept_pipe_future = None
259        self._pipe = self._server_pipe_handle(True)
260
261    def _get_unconnected_pipe(self):
262        # Create new instance and return previous one.  This ensures
263        # that (until the server is closed) there is always at least
264        # one pipe handle for address.  Therefore if a client attempt
265        # to connect it will not fail with FileNotFoundError.
266        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
267        return tmp
268
269    def _server_pipe_handle(self, first):
270        # Return a wrapper for a new pipe handle.
271        if self.closed():
272            return None
273        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
274        if first:
275            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
276        h = _winapi.CreateNamedPipe(
277            self._address, flags,
278            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
279            _winapi.PIPE_WAIT,
280            _winapi.PIPE_UNLIMITED_INSTANCES,
281            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
282            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
283        pipe = windows_utils.PipeHandle(h)
284        self._free_instances.add(pipe)
285        return pipe
286
287    def closed(self):
288        return (self._address is None)
289
290    def close(self):
291        if self._accept_pipe_future is not None:
292            self._accept_pipe_future.cancel()
293            self._accept_pipe_future = None
294        # Close all instances which have not been connected to by a client.
295        if self._address is not None:
296            for pipe in self._free_instances:
297                pipe.close()
298            self._pipe = None
299            self._address = None
300            self._free_instances.clear()
301
302    __del__ = close
303
304
305class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
306    """Windows version of selector event loop."""
307
308
309class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
310    """Windows version of proactor event loop using IOCP."""
311
312    def __init__(self, proactor=None):
313        if proactor is None:
314            proactor = IocpProactor()
315        super().__init__(proactor)
316
317    def run_forever(self):
318        try:
319            assert self._self_reading_future is None
320            self.call_soon(self._loop_self_reading)
321            super().run_forever()
322        finally:
323            if self._self_reading_future is not None:
324                ov = self._self_reading_future._ov
325                self._self_reading_future.cancel()
326                # self_reading_future was just cancelled so if it hasn't been
327                # finished yet, it never will be (it's possible that it has
328                # already finished and its callback is waiting in the queue,
329                # where it could still happen if the event loop is restarted).
330                # Unregister it otherwise IocpProactor.close will wait for it
331                # forever
332                if ov is not None:
333                    self._proactor._unregister(ov)
334                self._self_reading_future = None
335
336    async def create_pipe_connection(self, protocol_factory, address):
337        f = self._proactor.connect_pipe(address)
338        pipe = await f
339        protocol = protocol_factory()
340        trans = self._make_duplex_pipe_transport(pipe, protocol,
341                                                 extra={'addr': address})
342        return trans, protocol
343
344    async def start_serving_pipe(self, protocol_factory, address):
345        server = PipeServer(address)
346
347        def loop_accept_pipe(f=None):
348            pipe = None
349            try:
350                if f:
351                    pipe = f.result()
352                    server._free_instances.discard(pipe)
353
354                    if server.closed():
355                        # A client connected before the server was closed:
356                        # drop the client (close the pipe) and exit
357                        pipe.close()
358                        return
359
360                    protocol = protocol_factory()
361                    self._make_duplex_pipe_transport(
362                        pipe, protocol, extra={'addr': address})
363
364                pipe = server._get_unconnected_pipe()
365                if pipe is None:
366                    return
367
368                f = self._proactor.accept_pipe(pipe)
369            except OSError as exc:
370                if pipe and pipe.fileno() != -1:
371                    self.call_exception_handler({
372                        'message': 'Pipe accept failed',
373                        'exception': exc,
374                        'pipe': pipe,
375                    })
376                    pipe.close()
377                elif self._debug:
378                    logger.warning("Accept pipe failed on pipe %r",
379                                   pipe, exc_info=True)
380            except exceptions.CancelledError:
381                if pipe:
382                    pipe.close()
383            else:
384                server._accept_pipe_future = f
385                f.add_done_callback(loop_accept_pipe)
386
387        self.call_soon(loop_accept_pipe)
388        return [server]
389
390    async def _make_subprocess_transport(self, protocol, args, shell,
391                                         stdin, stdout, stderr, bufsize,
392                                         extra=None, **kwargs):
393        waiter = self.create_future()
394        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
395                                             stdin, stdout, stderr, bufsize,
396                                             waiter=waiter, extra=extra,
397                                             **kwargs)
398        try:
399            await waiter
400        except (SystemExit, KeyboardInterrupt):
401            raise
402        except BaseException:
403            transp.close()
404            await transp._wait()
405            raise
406
407        return transp
408
409
410class IocpProactor:
411    """Proactor implementation using IOCP."""
412
413    def __init__(self, concurrency=0xffffffff):
414        self._loop = None
415        self._results = []
416        self._iocp = _overlapped.CreateIoCompletionPort(
417            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
418        self._cache = {}
419        self._registered = weakref.WeakSet()
420        self._unregistered = []
421        self._stopped_serving = weakref.WeakSet()
422
423    def _check_closed(self):
424        if self._iocp is None:
425            raise RuntimeError('IocpProactor is closed')
426
427    def __repr__(self):
428        info = ['overlapped#=%s' % len(self._cache),
429                'result#=%s' % len(self._results)]
430        if self._iocp is None:
431            info.append('closed')
432        return '<%s %s>' % (self.__class__.__name__, " ".join(info))
433
434    def set_loop(self, loop):
435        self._loop = loop
436
437    def select(self, timeout=None):
438        if not self._results:
439            self._poll(timeout)
440        tmp = self._results
441        self._results = []
442        return tmp
443
444    def _result(self, value):
445        fut = self._loop.create_future()
446        fut.set_result(value)
447        return fut
448
449    def recv(self, conn, nbytes, flags=0):
450        self._register_with_iocp(conn)
451        ov = _overlapped.Overlapped(NULL)
452        try:
453            if isinstance(conn, socket.socket):
454                ov.WSARecv(conn.fileno(), nbytes, flags)
455            else:
456                ov.ReadFile(conn.fileno(), nbytes)
457        except BrokenPipeError:
458            return self._result(b'')
459
460        def finish_recv(trans, key, ov):
461            try:
462                return ov.getresult()
463            except OSError as exc:
464                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
465                                    _overlapped.ERROR_OPERATION_ABORTED):
466                    raise ConnectionResetError(*exc.args)
467                else:
468                    raise
469
470        return self._register(ov, conn, finish_recv)
471
472    def recv_into(self, conn, buf, flags=0):
473        self._register_with_iocp(conn)
474        ov = _overlapped.Overlapped(NULL)
475        try:
476            if isinstance(conn, socket.socket):
477                ov.WSARecvInto(conn.fileno(), buf, flags)
478            else:
479                ov.ReadFileInto(conn.fileno(), buf)
480        except BrokenPipeError:
481            return self._result(0)
482
483        def finish_recv(trans, key, ov):
484            try:
485                return ov.getresult()
486            except OSError as exc:
487                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
488                                    _overlapped.ERROR_OPERATION_ABORTED):
489                    raise ConnectionResetError(*exc.args)
490                else:
491                    raise
492
493        return self._register(ov, conn, finish_recv)
494
495    def recvfrom(self, conn, nbytes, flags=0):
496        self._register_with_iocp(conn)
497        ov = _overlapped.Overlapped(NULL)
498        try:
499            ov.WSARecvFrom(conn.fileno(), nbytes, flags)
500        except BrokenPipeError:
501            return self._result((b'', None))
502
503        def finish_recv(trans, key, ov):
504            try:
505                return ov.getresult()
506            except OSError as exc:
507                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
508                                    _overlapped.ERROR_OPERATION_ABORTED):
509                    raise ConnectionResetError(*exc.args)
510                else:
511                    raise
512
513        return self._register(ov, conn, finish_recv)
514
515    def sendto(self, conn, buf, flags=0, addr=None):
516        self._register_with_iocp(conn)
517        ov = _overlapped.Overlapped(NULL)
518
519        ov.WSASendTo(conn.fileno(), buf, flags, addr)
520
521        def finish_send(trans, key, ov):
522            try:
523                return ov.getresult()
524            except OSError as exc:
525                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
526                                    _overlapped.ERROR_OPERATION_ABORTED):
527                    raise ConnectionResetError(*exc.args)
528                else:
529                    raise
530
531        return self._register(ov, conn, finish_send)
532
533    def send(self, conn, buf, flags=0):
534        self._register_with_iocp(conn)
535        ov = _overlapped.Overlapped(NULL)
536        if isinstance(conn, socket.socket):
537            ov.WSASend(conn.fileno(), buf, flags)
538        else:
539            ov.WriteFile(conn.fileno(), buf)
540
541        def finish_send(trans, key, ov):
542            try:
543                return ov.getresult()
544            except OSError as exc:
545                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
546                                    _overlapped.ERROR_OPERATION_ABORTED):
547                    raise ConnectionResetError(*exc.args)
548                else:
549                    raise
550
551        return self._register(ov, conn, finish_send)
552
553    def accept(self, listener):
554        self._register_with_iocp(listener)
555        conn = self._get_accept_socket(listener.family)
556        ov = _overlapped.Overlapped(NULL)
557        ov.AcceptEx(listener.fileno(), conn.fileno())
558
559        def finish_accept(trans, key, ov):
560            ov.getresult()
561            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
562            buf = struct.pack('@P', listener.fileno())
563            conn.setsockopt(socket.SOL_SOCKET,
564                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
565            conn.settimeout(listener.gettimeout())
566            return conn, conn.getpeername()
567
568        async def accept_coro(future, conn):
569            # Coroutine closing the accept socket if the future is cancelled
570            try:
571                await future
572            except exceptions.CancelledError:
573                conn.close()
574                raise
575
576        future = self._register(ov, listener, finish_accept)
577        coro = accept_coro(future, conn)
578        tasks.ensure_future(coro, loop=self._loop)
579        return future
580
581    def connect(self, conn, address):
582        if conn.type == socket.SOCK_DGRAM:
583            # WSAConnect will complete immediately for UDP sockets so we don't
584            # need to register any IOCP operation
585            _overlapped.WSAConnect(conn.fileno(), address)
586            fut = self._loop.create_future()
587            fut.set_result(None)
588            return fut
589
590        self._register_with_iocp(conn)
591        # The socket needs to be locally bound before we call ConnectEx().
592        try:
593            _overlapped.BindLocal(conn.fileno(), conn.family)
594        except OSError as e:
595            if e.winerror != errno.WSAEINVAL:
596                raise
597            # Probably already locally bound; check using getsockname().
598            if conn.getsockname()[1] == 0:
599                raise
600        ov = _overlapped.Overlapped(NULL)
601        ov.ConnectEx(conn.fileno(), address)
602
603        def finish_connect(trans, key, ov):
604            ov.getresult()
605            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
606            conn.setsockopt(socket.SOL_SOCKET,
607                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
608            return conn
609
610        return self._register(ov, conn, finish_connect)
611
612    def sendfile(self, sock, file, offset, count):
613        self._register_with_iocp(sock)
614        ov = _overlapped.Overlapped(NULL)
615        offset_low = offset & 0xffff_ffff
616        offset_high = (offset >> 32) & 0xffff_ffff
617        ov.TransmitFile(sock.fileno(),
618                        msvcrt.get_osfhandle(file.fileno()),
619                        offset_low, offset_high,
620                        count, 0, 0)
621
622        def finish_sendfile(trans, key, ov):
623            try:
624                return ov.getresult()
625            except OSError as exc:
626                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
627                                    _overlapped.ERROR_OPERATION_ABORTED):
628                    raise ConnectionResetError(*exc.args)
629                else:
630                    raise
631        return self._register(ov, sock, finish_sendfile)
632
633    def accept_pipe(self, pipe):
634        self._register_with_iocp(pipe)
635        ov = _overlapped.Overlapped(NULL)
636        connected = ov.ConnectNamedPipe(pipe.fileno())
637
638        if connected:
639            # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
640            # that the pipe is connected. There is no need to wait for the
641            # completion of the connection.
642            return self._result(pipe)
643
644        def finish_accept_pipe(trans, key, ov):
645            ov.getresult()
646            return pipe
647
648        return self._register(ov, pipe, finish_accept_pipe)
649
650    async def connect_pipe(self, address):
651        delay = CONNECT_PIPE_INIT_DELAY
652        while True:
653            # Unfortunately there is no way to do an overlapped connect to
654            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
655            # ERROR_PIPE_BUSY.
656            try:
657                handle = _overlapped.ConnectPipe(address)
658                break
659            except OSError as exc:
660                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
661                    raise
662
663            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
664            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
665            await tasks.sleep(delay)
666
667        return windows_utils.PipeHandle(handle)
668
669    def wait_for_handle(self, handle, timeout=None):
670        """Wait for a handle.
671
672        Return a Future object. The result of the future is True if the wait
673        completed, or False if the wait did not complete (on timeout).
674        """
675        return self._wait_for_handle(handle, timeout, False)
676
677    def _wait_cancel(self, event, done_callback):
678        fut = self._wait_for_handle(event, None, True)
679        # add_done_callback() cannot be used because the wait may only complete
680        # in IocpProactor.close(), while the event loop is not running.
681        fut._done_callback = done_callback
682        return fut
683
684    def _wait_for_handle(self, handle, timeout, _is_cancel):
685        self._check_closed()
686
687        if timeout is None:
688            ms = _winapi.INFINITE
689        else:
690            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
691            # round away from zero to wait *at least* timeout seconds.
692            ms = math.ceil(timeout * 1e3)
693
694        # We only create ov so we can use ov.address as a key for the cache.
695        ov = _overlapped.Overlapped(NULL)
696        wait_handle = _overlapped.RegisterWaitWithQueue(
697            handle, self._iocp, ov.address, ms)
698        if _is_cancel:
699            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
700        else:
701            f = _WaitHandleFuture(ov, handle, wait_handle, self,
702                                  loop=self._loop)
703        if f._source_traceback:
704            del f._source_traceback[-1]
705
706        def finish_wait_for_handle(trans, key, ov):
707            # Note that this second wait means that we should only use
708            # this with handles types where a successful wait has no
709            # effect.  So events or processes are all right, but locks
710            # or semaphores are not.  Also note if the handle is
711            # signalled and then quickly reset, then we may return
712            # False even though we have not timed out.
713            return f._poll()
714
715        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
716        return f
717
718    def _register_with_iocp(self, obj):
719        # To get notifications of finished ops on this objects sent to the
720        # completion port, were must register the handle.
721        if obj not in self._registered:
722            self._registered.add(obj)
723            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
724            # XXX We could also use SetFileCompletionNotificationModes()
725            # to avoid sending notifications to completion port of ops
726            # that succeed immediately.
727
728    def _register(self, ov, obj, callback):
729        self._check_closed()
730
731        # Return a future which will be set with the result of the
732        # operation when it completes.  The future's value is actually
733        # the value returned by callback().
734        f = _OverlappedFuture(ov, loop=self._loop)
735        if f._source_traceback:
736            del f._source_traceback[-1]
737        if not ov.pending:
738            # The operation has completed, so no need to postpone the
739            # work.  We cannot take this short cut if we need the
740            # NumberOfBytes, CompletionKey values returned by
741            # PostQueuedCompletionStatus().
742            try:
743                value = callback(None, None, ov)
744            except OSError as e:
745                f.set_exception(e)
746            else:
747                f.set_result(value)
748            # Even if GetOverlappedResult() was called, we have to wait for the
749            # notification of the completion in GetQueuedCompletionStatus().
750            # Register the overlapped operation to keep a reference to the
751            # OVERLAPPED object, otherwise the memory is freed and Windows may
752            # read uninitialized memory.
753
754        # Register the overlapped operation for later.  Note that
755        # we only store obj to prevent it from being garbage
756        # collected too early.
757        self._cache[ov.address] = (f, ov, obj, callback)
758        return f
759
760    def _unregister(self, ov):
761        """Unregister an overlapped object.
762
763        Call this method when its future has been cancelled. The event can
764        already be signalled (pending in the proactor event queue). It is also
765        safe if the event is never signalled (because it was cancelled).
766        """
767        self._check_closed()
768        self._unregistered.append(ov)
769
770    def _get_accept_socket(self, family):
771        s = socket.socket(family)
772        s.settimeout(0)
773        return s
774
775    def _poll(self, timeout=None):
776        if timeout is None:
777            ms = INFINITE
778        elif timeout < 0:
779            raise ValueError("negative timeout")
780        else:
781            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
782            # round away from zero to wait *at least* timeout seconds.
783            ms = math.ceil(timeout * 1e3)
784            if ms >= INFINITE:
785                raise ValueError("timeout too big")
786
787        while True:
788            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
789            if status is None:
790                break
791            ms = 0
792
793            err, transferred, key, address = status
794            try:
795                f, ov, obj, callback = self._cache.pop(address)
796            except KeyError:
797                if self._loop.get_debug():
798                    self._loop.call_exception_handler({
799                        'message': ('GetQueuedCompletionStatus() returned an '
800                                    'unexpected event'),
801                        'status': ('err=%s transferred=%s key=%#x address=%#x'
802                                   % (err, transferred, key, address)),
803                    })
804
805                # key is either zero, or it is used to return a pipe
806                # handle which should be closed to avoid a leak.
807                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
808                    _winapi.CloseHandle(key)
809                continue
810
811            if obj in self._stopped_serving:
812                f.cancel()
813            # Don't call the callback if _register() already read the result or
814            # if the overlapped has been cancelled
815            elif not f.done():
816                try:
817                    value = callback(transferred, key, ov)
818                except OSError as e:
819                    f.set_exception(e)
820                    self._results.append(f)
821                else:
822                    f.set_result(value)
823                    self._results.append(f)
824
825        # Remove unregistered futures
826        for ov in self._unregistered:
827            self._cache.pop(ov.address, None)
828        self._unregistered.clear()
829
830    def _stop_serving(self, obj):
831        # obj is a socket or pipe handle.  It will be closed in
832        # BaseProactorEventLoop._stop_serving() which will make any
833        # pending operations fail quickly.
834        self._stopped_serving.add(obj)
835
836    def close(self):
837        if self._iocp is None:
838            # already closed
839            return
840
841        # Cancel remaining registered operations.
842        for address, (fut, ov, obj, callback) in list(self._cache.items()):
843            if fut.cancelled():
844                # Nothing to do with cancelled futures
845                pass
846            elif isinstance(fut, _WaitCancelFuture):
847                # _WaitCancelFuture must not be cancelled
848                pass
849            else:
850                try:
851                    fut.cancel()
852                except OSError as exc:
853                    if self._loop is not None:
854                        context = {
855                            'message': 'Cancelling a future failed',
856                            'exception': exc,
857                            'future': fut,
858                        }
859                        if fut._source_traceback:
860                            context['source_traceback'] = fut._source_traceback
861                        self._loop.call_exception_handler(context)
862
863        # Wait until all cancelled overlapped complete: don't exit with running
864        # overlapped to prevent a crash. Display progress every second if the
865        # loop is still running.
866        msg_update = 1.0
867        start_time = time.monotonic()
868        next_msg = start_time + msg_update
869        while self._cache:
870            if next_msg <= time.monotonic():
871                logger.debug('%r is running after closing for %.1f seconds',
872                             self, time.monotonic() - start_time)
873                next_msg = time.monotonic() + msg_update
874
875            # handle a few events, or timeout
876            self._poll(msg_update)
877
878        self._results = []
879
880        _winapi.CloseHandle(self._iocp)
881        self._iocp = None
882
883    def __del__(self):
884        self.close()
885
886
887class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
888
889    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
890        self._proc = windows_utils.Popen(
891            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
892            bufsize=bufsize, **kwargs)
893
894        def callback(f):
895            returncode = self._proc.poll()
896            self._process_exited(returncode)
897
898        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
899        f.add_done_callback(callback)
900
901
902SelectorEventLoop = _WindowsSelectorEventLoop
903
904
905class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
906    _loop_factory = SelectorEventLoop
907
908
909class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
910    _loop_factory = ProactorEventLoop
911
912
913DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy
914