• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Selector and proactor event loops for Windows."""
2
3import _overlapped
4import _winapi
5import errno
6import math
7import msvcrt
8import socket
9import struct
10import time
11import weakref
12
13from . import events
14from . import base_subprocess
15from . import futures
16from . import exceptions
17from . import proactor_events
18from . import selector_events
19from . import tasks
20from . import windows_utils
21from .log import logger
22
23
# Names exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
    'WindowsProactorEventLoopPolicy',
)


# Null value passed to the _overlapped APIs (for example Overlapped(NULL)
# and CreateIoCompletionPort()).
NULL = 0
# Timeout in milliseconds used by IocpProactor._poll() when no timeout is
# given; finite timeouts must stay strictly below this value.
INFINITE = 0xffffffff
# Numeric error codes (presumably Windows system error codes -- TODO
# confirm).  They are not referenced by the rest of this module.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100
41
42
class _OverlappedFuture(futures.Future):
    """Future bound to a single overlapped operation.

    Cancelling the future cancels the underlying overlapped operation
    right away.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        # Drop the most recent entry of the recorded source traceback
        # (presumably the frame of this constructor -- TODO confirm).
        if self._source_traceback:
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Return the base repr info plus the state of the overlapped."""
        info = super()._repr_info()
        ov = self._ov
        if ov is None:
            return info
        if ov.pending:
            state = 'pending'
        else:
            state = 'completed'
        info.insert(1, f'overlapped=<{state}, {ov.address:#x}>')
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation, if any, and forget it.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        ov = self._ov
        if ov is None:
            return
        try:
            ov.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            source_traceback = self._source_traceback
            if source_traceback:
                context['source_traceback'] = source_traceback
            self._loop.call_exception_handler(context)
        self._ov = None

    def cancel(self):
        # Cancel the overlapped operation first, then the future itself.
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        # The future is marked as failed before the overlapped is cancelled.
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        super().set_result(result)
        # The operation is finished: only the reference is dropped, nothing
        # is cancelled.
        self._ov = None
89
90
class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    The future holds the Overlapped object, the waited handle and the
    registered wait handle.  The wait is unregistered as soon as the future
    is cancelled or gets a result or an exception.
    """

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we unregister the wait (see _unregister_wait()) if the wait
        # completes or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signalled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Return the base repr info plus the handle and wait handle state."""
        info = super()._repr_info()
        # Format the handle only when there is one: applying the '#x' format
        # to None would raise TypeError and break repr().
        if self._handle is not None:
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object, so drop the reference that kept it alive.
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle; do nothing if already unregistered.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and the Overlapped reference is then kept.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self):
        # Unregister the wait before cancelling the future itself.
        self._unregister_wait()
        return super().cancel()

    def set_exception(self, exception):
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        self._unregister_wait()
        super().set_result(result)
163
164
class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Future waiting, through an event, for the cancellation of a
    _WaitHandleFuture to complete.

    Instances must never be cancelled.  An optional callable stored in
    ``_done_callback`` is invoked synchronously, with this future as its
    only argument, once a result or an exception has been set.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Assigned by whoever created the future; None means no callback.
        self._done_callback = None

    def cancel(self):
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        super().set_result(result)
        callback = self._done_callback
        if callback is not None:
            callback(self)

    def set_exception(self, exception):
        super().set_exception(exception)
        callback = self._done_callback
        if callback is not None:
            callback(self)
187
188
class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Future for a handle wait that is registered with a proactor.

    Unregistering the wait is done with UnregisterWaitEx() and an event;
    the proactor is asked to wait for that event and to call
    _unregister_wait_cb() once it completes.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        # Event handed to UnregisterWaitEx() in _unregister_wait().
        self._event = _overlapped.CreateEvent(None, True, False, None)
        # Future returned by the proactor's _wait_cancel(), once requested.
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        event = self._event
        if event is not None:
            _winapi.CloseHandle(event)
            self._event = None
            self._event_fut = None

        # A cancelled wait may never be signalled, so it has to be
        # unregistered explicitly; otherwise IocpProactor.close() would wait
        # forever for an event which never comes.
        #
        # Calling _unregister() is also fine when the IocpProactor already
        # received the event: the Overlapped object used as the unique key
        # is still referenced here.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        if not self._registered:
            return
        self._registered = False

        wait_handle, self._wait_handle = self._wait_handle, None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as exc:
            # ERROR_IO_PENDING is not an error, the wait was unregistered
            unregister_pending = (
                exc.winerror == _overlapped.ERROR_IO_PENDING)
            if not unregister_pending:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                source_traceback = self._source_traceback
                if source_traceback:
                    context['source_traceback'] = source_traceback
                self._loop.call_exception_handler(context)
                return

        # Let the proactor wait for the event and run the callback.
        self._event_fut = self._proactor._wait_cancel(
            self._event, self._unregister_wait_cb)
239
240
class PipeServer:
    """Server side of a named pipe address.

    It plays a role similar to a bound, listening socket.
    """

    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # Both attributes must exist before _server_pipe_handle() runs: it
        # can raise, and the destructor calls close(), which reads them.
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current pipe instance after creating its successor.

        Creating the new instance before handing out the previous one keeps
        (until the server is closed) at least one pipe handle open for the
        address, so a connecting client does not fail with
        FileNotFoundError.
        """
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Return a wrapper for a new pipe handle, or None once closed."""
        if self.closed():
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        handle = _winapi.CreateNamedPipe(
            self._address, open_mode, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(handle)
        # Tracked so that close() can close it if it is never handed out.
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        """Return True once close() has cleared the address."""
        return self._address is None

    def close(self):
        """Cancel the pending accept and close the tracked pipe instances."""
        pending = self._accept_pipe_future
        if pending is not None:
            pending.cancel()
            self._accept_pipe_future = None
        if self._address is None:
            return
        # Close all instances which are still tracked as free.
        for pipe in self._free_instances:
            pipe.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
298
299
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop.

    The class adds nothing to BaseSelectorEventLoop; this module exposes it
    under the name SelectorEventLoop.
    """
302
303
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, built on IOCP."""

    def __init__(self, proactor=None):
        # Without an explicit proactor, a fresh IocpProactor is used.
        super().__init__(IocpProactor() if proactor is None else proactor)

    def run_forever(self):
        try:
            assert self._self_reading_future is None
            self.call_soon(self._loop_self_reading)
            super().run_forever()
        finally:
            reading_fut = self._self_reading_future
            if reading_fut is not None:
                # Read the overlapped before cancelling the future.
                ov = reading_fut._ov
                reading_fut.cancel()
                # The future was just cancelled so it will never be
                # signalled: unregister it, otherwise IocpProactor.close()
                # would wait for it forever.
                if ov is not None:
                    self._proactor._unregister(ov)
                self._self_reading_future = None

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at address; return (transport, protocol)."""
        pipe = await self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting pipe clients on address.

        Return a list containing the single PipeServer created for address.
        """
        server = PipeServer(address)

        def loop_accept_pipe(fut=None):
            # Runs once through call_soon() without argument, then as the
            # done callback of every accept future.
            pipe = None
            try:
                if fut:
                    pipe = fut.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                next_fut = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    context = {
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    }
                    self.call_exception_handler(context)
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except exceptions.CancelledError:
                if pipe:
                    pipe.close()
            else:
                # Remember the pending accept and chain the next iteration.
                server._accept_pipe_future = next_fut
                next_fut.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done."""
        waiter = self.create_future()
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell, stdin, stdout, stderr, bufsize,
            waiter=waiter, extra=extra, **kwargs)
        try:
            await waiter
        except (SystemExit, KeyboardInterrupt):
            # Propagated as is, without closing the transport.
            raise
        except BaseException:
            # Any other failure: close the transport and wait for it before
            # re-raising.
            transport.close()
            await transport._wait()
            raise

        return transport
399
400
class IocpProactor:
    """Proactor implementation using IOCP.

    The I/O methods (recv(), send(), accept(), ...) start an overlapped
    operation and return a future.  Pending operations are stored in
    ``self._cache`` under the address of their Overlapped object and are
    completed by _poll() when GetQueuedCompletionStatus() reports them.
    """

    def __init__(self, concurrency=0xffffffff):
        self._loop = None
        # Futures completed by _poll() and not yet returned by select().
        self._results = []
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
        # Maps ov.address -> (future, ov, obj, callback).
        self._cache = {}
        # Objects whose handle was already associated with the port.
        self._registered = weakref.WeakSet()
        # Overlapped objects to remove from the cache at the end of _poll().
        self._unregistered = []
        # Objects passed to _stop_serving().
        self._stopped_serving = weakref.WeakSet()

    def _check_closed(self):
        """Raise RuntimeError if the proactor has been closed."""
        if self._iocp is None:
            raise RuntimeError('IocpProactor is closed')

    def __repr__(self):
        info = [f'overlapped#={len(self._cache)}',
                f'result#={len(self._results)}']
        if self._iocp is None:
            info.append('closed')
        return f'<{self.__class__.__name__} {" ".join(info)}>'

    def set_loop(self, loop):
        """Set the event loop used to create futures and report errors."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of completed futures and reset it.

        The completion port is polled (for at most *timeout* seconds) only
        when no completed future is already waiting.
        """
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    def _result(self, value):
        """Return a new future of the loop already completed with value."""
        fut = self._loop.create_future()
        fut.set_result(value)
        return fut

    @staticmethod
    def _finish_socket_func(trans, key, ov):
        """Completion callback shared by the data transfer operations.

        Return the result of the overlapped operation.  An OSError whose
        winerror is ERROR_NETNAME_DELETED or ERROR_OPERATION_ABORTED is
        re-raised as ConnectionResetError with the same arguments.
        """
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                _overlapped.ERROR_OPERATION_ABORTED):
                raise ConnectionResetError(*exc.args)
            else:
                raise

    def recv(self, conn, nbytes, flags=0):
        """Start receiving up to nbytes from conn; return a future.

        Sockets use WSARecv(), other objects ReadFile().  If starting the
        operation raises BrokenPipeError, the returned future is already
        completed with b''.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        return self._register(ov, conn, self._finish_socket_func)

    def recv_into(self, conn, buf, flags=0):
        """Start receiving from conn into buf; return a future.

        Sockets use WSARecvInto(), other objects ReadFileInto().  If
        starting the operation raises BrokenPipeError, the returned future
        is already completed with 0.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecvInto(conn.fileno(), buf, flags)
            else:
                ov.ReadFileInto(conn.fileno(), buf)
        except BrokenPipeError:
            # Nothing was written into buf: report a count of zero bytes,
            # the same kind of value as a normal completion, not b''.
            return self._result(0)

        return self._register(ov, conn, self._finish_socket_func)

    def recvfrom(self, conn, nbytes, flags=0):
        """Start a WSARecvFrom() of up to nbytes on conn; return a future.

        If starting the operation raises BrokenPipeError, the returned
        future is already completed with (b'', None).
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            ov.WSARecvFrom(conn.fileno(), nbytes, flags)
        except BrokenPipeError:
            return self._result((b'', None))

        return self._register(ov, conn, self._finish_socket_func)

    def sendto(self, conn, buf, flags=0, addr=None):
        """Start a WSASendTo() of buf to addr on conn; return a future."""
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)

        ov.WSASendTo(conn.fileno(), buf, flags, addr)

        return self._register(ov, conn, self._finish_socket_func)

    def send(self, conn, buf, flags=0):
        """Start sending buf on conn; return a future.

        Sockets use WSASend(), other objects WriteFile().
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        return self._register(ov, conn, self._finish_socket_func)

    def accept(self, listener):
        """Start accepting a connection on listener; return a future.

        The result of the future is ``(conn, conn.getpeername())`` where
        conn is a new socket.  If the future is cancelled, the new socket
        is closed.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        async def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                await future
            except exceptions.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start connecting conn to address; return a future.

        For datagram sockets the result is None; otherwise the result of
        the future is conn itself.
        """
        if conn.type == socket.SOCK_DGRAM:
            # WSAConnect will complete immediately for UDP sockets so we don't
            # need to register any IOCP operation
            _overlapped.WSAConnect(conn.fileno(), address)
            fut = self._loop.create_future()
            fut.set_result(None)
            return fut

        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def sendfile(self, sock, file, offset, count):
        """Start a TransmitFile() of file on sock; return a future.

        offset is passed as two 32-bit halves (low, high).
        """
        self._register_with_iocp(sock)
        ov = _overlapped.Overlapped(NULL)
        offset_low = offset & 0xffff_ffff
        offset_high = (offset >> 32) & 0xffff_ffff
        ov.TransmitFile(sock.fileno(),
                        msvcrt.get_osfhandle(file.fileno()),
                        offset_low, offset_high,
                        count, 0, 0)

        return self._register(ov, sock, self._finish_socket_func)

    def accept_pipe(self, pipe):
        """Start waiting for a client on pipe; return a future.

        The result of the future is pipe itself.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    async def connect_pipe(self, address):
        """Connect to the pipe at address; return a PipeHandle.

        While the connection fails with ERROR_PIPE_BUSY it is retried after
        a delay which starts at CONNECT_PIPE_INIT_DELAY and doubles after
        every attempt, up to CONNECT_PIPE_MAX_DELAY.  Any other OSError is
        propagated.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to
            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY.
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later.  Sleep
            # with the current delay before doubling it, so that the first
            # retry really waits CONNECT_PIPE_INIT_DELAY.
            await tasks.sleep(delay)
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for event without timeout; return a _WaitCancelFuture.

        done_callback is stored on the future, which calls it with itself
        as argument once completed.
        """
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on handle with the completion port.

        Return a _WaitCancelFuture if _is_cancel is true, a
        _WaitHandleFuture otherwise.  timeout is in seconds; None means no
        time limit.
        """
        self._check_closed()

        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        """Associate the handle of obj with the completion port, once."""
        # To get notifications of finished ops on this object sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        """Track the overlapped operation ov; return its future.

        callback(transferred, key, ov) computes the result of the future.
        If the operation is no longer pending, the callback is called right
        away with None for the first two arguments.
        """
        self._check_closed()

        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).

        The cache entry is only removed at the end of the next _poll().
        """
        self._check_closed()
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new socket of the given family with a timeout of 0."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Process the completions available on the port.

        Wait at most timeout seconds (None: no limit) for the first
        completion, then drain the remaining ones without waiting.  Futures
        completed here are appended to ``self._results``.

        Raise ValueError if timeout is negative or too big.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            # Only the first call may block.
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

    def _stop_serving(self, obj):
        """Mark obj so that its completions cancel their future."""
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel the pending operations and close the completion port.

        The method polls until the cache of pending operations is empty.
        Calling it again on a closed proactor does nothing.
        """
        if self._iocp is None:
            # already closed
            return

        # Cancel remaining registered operations.
        for address, (fut, ov, obj, callback) in list(self._cache.items()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        # Wait until all cancelled overlapped complete: don't exit with running
        # overlapped to prevent a crash. Display progress every second if the
        # loop is still running.
        msg_update = 1.0
        start_time = time.monotonic()
        next_msg = start_time + msg_update
        while self._cache:
            if next_msg <= time.monotonic():
                logger.debug('%r is running after closing for %.1f seconds',
                             self, time.monotonic() - start_time)
                next_msg = time.monotonic() + msg_update

            # handle a few events, or timeout
            self._poll(msg_update)

        self._results = []

        _winapi.CloseHandle(self._iocp)
        self._iocp = None

    def __del__(self):
        self.close()
876
877
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport which learns about the process exit by waiting
    for the process handle through the loop's proactor.
    """

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def callback(f):
            # The wait on the process handle is done: report the return
            # code given by poll().
            self._process_exited(self._proc.poll())

        proactor = self._loop._proactor
        exit_waiter = proactor.wait_for_handle(int(self._proc._handle))
        exit_waiter.add_done_callback(callback)
891
892
# Public name of the selector event loop class defined in this module.
SelectorEventLoop = _WindowsSelectorEventLoop
894
895
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is SelectorEventLoop."""
    _loop_factory = SelectorEventLoop
898
899
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is ProactorEventLoop."""
    _loop_factory = ProactorEventLoop
902
903
# The proactor-based policy is the one exported as the default policy.
DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy
905