"""Selector and proactor event loops for Windows."""

import _overlapped
import _winapi
import errno
import math
import msvcrt
import socket
import struct
import time
import weakref

from . import events
from . import base_subprocess
from . import futures
from . import exceptions
from . import proactor_events
from . import selector_events
from . import tasks
from . import windows_utils
from .log import logger


__all__ = (
    'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
    'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
    'WindowsProactorEventLoopPolicy',
)


NULL = 0
INFINITE = 0xffffffff
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100


class _OverlappedFuture(futures.Future):
    """Subclass of Future which represents an overlapped operation.

    Cancelling it will immediately cancel the overlapped operation.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the frame of this constructor from the recorded traceback.
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Extend the base repr info with the overlapped state and address."""
        info = super()._repr_info()
        if self._ov is not None:
            state = 'pending' if self._ov.pending else 'completed'
            info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation (if any) and drop the reference.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        if self._ov is None:
            return
        try:
            self._ov.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self._ov = None

    def cancel(self, msg=None):
        """Cancel the overlapped operation, then cancel the future."""
        self._cancel_overlapped()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        """Set the exception, then cancel the overlapped operation."""
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        """Set the result and release the reference to the overlapped."""
        super().set_result(result)
        self._ov = None


class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle."""

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signaled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Extend the base repr info with handle, state and wait handle."""
        info = super()._repr_info()
        # Format the handle only when it is set: the ':#x' format spec
        # raises TypeError on None, which would defeat the guard.
        if self._handle is not None:
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle; do nothing if already unregistered.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and the unregister callback is not invoked.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self, msg=None):
        """Unregister the wait, then cancel the future."""
        self._unregister_wait()
        return super().cancel(msg=msg)

    def set_exception(self, exception):
        """Unregister the wait, then set the exception."""
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        """Unregister the wait, then set the result."""
        self._unregister_wait()
        super().set_result(result)


class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Subclass of Future which represents a wait for the cancellation of a
    _WaitHandleFuture using an event.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Plain callable invoked synchronously from set_result() and
        # set_exception(); assigned by IocpProactor._wait_cancel().
        self._done_callback = None

    def cancel(self, msg=None):
        """Always raise RuntimeError: this future must not be cancelled.

        *msg* is accepted only so the signature matches Future.cancel().
        """
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        """Set the result, then invoke the done callback if one is set."""
        super().set_result(result)
        if self._done_callback is not None:
            self._done_callback(self)

    def set_exception(self, exception):
        """Set the exception, then invoke the done callback if one is set."""
        super().set_exception(exception)
        if self._done_callback is not None:
            self._done_callback(self)


class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Wait-handle future whose unregistration is tracked through an event.

    The event created here is passed to UnregisterWaitEx(); the proactor
    then waits on it (see _unregister_wait()) before the final cleanup in
    _unregister_wait_cb() runs.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        self._event = _overlapped.CreateEvent(None, True, False, None)
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        """Close the event, unregister from the proactor, release the ov."""
        if self._event is not None:
            _winapi.CloseHandle(self._event)
            self._event = None
            self._event_fut = None

        # If the wait was cancelled, the wait may never be signalled, so
        # it's required to unregister it. Otherwise, IocpProactor.close() will
        # wait forever for an event which will never come.
        #
        # If the IocpProactor already received the event, it's safe to call
        # _unregister() because we kept a reference to the Overlapped object
        # which is used as a unique key.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Unregister the wait and schedule the cleanup on the event.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and no cleanup is scheduled.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        self._event_fut = self._proactor._wait_cancel(self._event,
                                                      self._unregister_wait_cb)


class PipeServer:
    """Class representing a pipe server.

    This is much like a bound, listening socket.
    """
    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # initialize the pipe attribute before calling _server_pipe_handle()
        # because this function can raise an exception and the destructor calls
        # the close() method
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        # Create new instance and return previous one.  This ensures
        # that (until the server is closed) there is always at least
        # one pipe handle for address.  Therefore if a client attempt
        # to connect it will not fail with FileNotFoundError.
        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
        return tmp

    def _server_pipe_handle(self, first):
        # Return a wrapper for a new pipe handle.
        if self.closed():
            return None
        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        h = _winapi.CreateNamedPipe(
            self._address, flags,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(h)
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        """Return True once close() has cleared the address."""
        return (self._address is None)

    def close(self):
        """Cancel the pending accept and close all unconnected instances."""
        if self._accept_pipe_future is not None:
            self._accept_pipe_future.cancel()
            self._accept_pipe_future = None
        # Close all instances which have not been connected to by a client.
        if self._address is not None:
            for pipe in self._free_instances:
                pipe.close()
            self._pipe = None
            self._address = None
            self._free_instances.clear()

    __del__ = close


class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop."""


class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Windows version of proactor event loop using IOCP."""

    def __init__(self, proactor=None):
        if proactor is None:
            proactor = IocpProactor()
        super().__init__(proactor)

    def run_forever(self):
        """Run the loop, cleaning up the self-reading future afterwards."""
        try:
            assert self._self_reading_future is None
            self.call_soon(self._loop_self_reading)
            super().run_forever()
        finally:
            if self._self_reading_future is not None:
                ov = self._self_reading_future._ov
                self._self_reading_future.cancel()
                # self_reading_future was just cancelled so if it hasn't been
                # finished yet, it never will be (it's possible that it has
                # already finished and its callback is waiting in the queue,
                # where it could still happen if the event loop is restarted).
                # Unregister it otherwise IocpProactor.close will wait for it
                # forever
                if ov is not None:
                    self._proactor._unregister(ov)
                self._self_reading_future = None

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*; return (transport, protocol)."""
        f = self._proactor.connect_pipe(address)
        pipe = await f
        protocol = protocol_factory()
        trans = self._make_duplex_pipe_transport(pipe, protocol,
                                                 extra={'addr': address})
        return trans, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the pipe at *address*.

        Return a one-element list containing the PipeServer.
        """
        server = PipeServer(address)

        def loop_accept_pipe(f=None):
            # Called once via call_soon() with f=None, then as the done
            # callback of each accept future.
            pipe = None
            try:
                if f:
                    pipe = f.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                f = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except exceptions.CancelledError:
                if pipe:
                    pipe.close()
            else:
                server._accept_pipe_future = f
                f.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done.

        If waiting fails with anything other than SystemExit or
        KeyboardInterrupt, the transport is closed and awaited before the
        exception is re-raised.
        """
        waiter = self.create_future()
        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
                                             stdin, stdout, stderr, bufsize,
                                             waiter=waiter, extra=extra,
                                             **kwargs)
        try:
            await waiter
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            transp.close()
            await transp._wait()
            raise

        return transp


class IocpProactor:
    """Proactor implementation using IOCP."""

    def __init__(self, concurrency=0xffffffff):
        self._loop = None
        # Futures completed by _poll() and not yet returned by select().
        self._results = []
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
        # Maps ov.address -> (future, ov, obj, callback).
        self._cache = {}
        self._registered = weakref.WeakSet()
        self._unregistered = []
        self._stopped_serving = weakref.WeakSet()

    def _check_closed(self):
        """Raise RuntimeError if the proactor has been closed."""
        if self._iocp is None:
            raise RuntimeError('IocpProactor is closed')

    def __repr__(self):
        info = ['overlapped#=%s' % len(self._cache),
                'result#=%s' % len(self._results)]
        if self._iocp is None:
            info.append('closed')
        return '<%s %s>' % (self.__class__.__name__, " ".join(info))

    def set_loop(self, loop):
        """Set the event loop used to create futures and report errors."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of completed futures, polling if none are queued."""
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    def _result(self, value):
        """Return a new future already completed with *value*."""
        fut = self._loop.create_future()
        fut.set_result(value)
        return fut

    @staticmethod
    def _finish_socket_func(trans, key, ov):
        """Return the result of *ov*, translating connection-reset errors.

        An OSError whose winerror is ERROR_NETNAME_DELETED or
        ERROR_OPERATION_ABORTED is re-raised as ConnectionResetError; any
        other OSError propagates unchanged.  Shared completion callback of
        recv(), recv_into(), recvfrom(), sendto(), send() and sendfile().
        """
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                _overlapped.ERROR_OPERATION_ABORTED):
                raise ConnectionResetError(*exc.args)
            else:
                raise

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped receive of up to *nbytes*; return a future.

        Uses WSARecv() for sockets and ReadFile() otherwise.  If starting
        the operation raises BrokenPipeError, the future is already
        completed with b''.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        return self._register(ov, conn, self._finish_socket_func)

    def recv_into(self, conn, buf, flags=0):
        """Start an overlapped receive into *buf*; return a future.

        If starting the operation raises BrokenPipeError, the future is
        already completed with 0.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecvInto(conn.fileno(), buf, flags)
            else:
                ov.ReadFileInto(conn.fileno(), buf)
        except BrokenPipeError:
            return self._result(0)

        return self._register(ov, conn, self._finish_socket_func)

    def recvfrom(self, conn, nbytes, flags=0):
        """Start an overlapped WSARecvFrom(); return a future.

        If starting the operation raises BrokenPipeError, the future is
        already completed with (b'', None).
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            ov.WSARecvFrom(conn.fileno(), nbytes, flags)
        except BrokenPipeError:
            return self._result((b'', None))

        return self._register(ov, conn, self._finish_socket_func)

    def sendto(self, conn, buf, flags=0, addr=None):
        """Start an overlapped WSASendTo() of *buf*; return a future."""
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)

        ov.WSASendTo(conn.fileno(), buf, flags, addr)

        return self._register(ov, conn, self._finish_socket_func)

    def send(self, conn, buf, flags=0):
        """Start an overlapped send of *buf*; return a future.

        Uses WSASend() for sockets and WriteFile() otherwise.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        return self._register(ov, conn, self._finish_socket_func)

    def accept(self, listener):
        """Start an overlapped AcceptEx() on *listener*; return a future.

        The future's result is (conn, conn.getpeername()).  The accept
        socket is closed if the future is cancelled.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        async def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                await future
            except exceptions.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Connect *conn* to *address*; return a future.

        Datagram sockets are connected synchronously with WSAConnect() and
        the future is already completed with None.  Other sockets use an
        overlapped ConnectEx() and the future's result is *conn*.
        """
        if conn.type == socket.SOCK_DGRAM:
            # WSAConnect will complete immediately for UDP sockets so we don't
            # need to register any IOCP operation
            _overlapped.WSAConnect(conn.fileno(), address)
            return self._result(None)

        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def sendfile(self, sock, file, offset, count):
        """Start an overlapped TransmitFile() on *sock*; return a future.

        *offset* is split into its low and high 32-bit halves for the call.
        """
        self._register_with_iocp(sock)
        ov = _overlapped.Overlapped(NULL)
        offset_low = offset & 0xffff_ffff
        offset_high = (offset >> 32) & 0xffff_ffff
        ov.TransmitFile(sock.fileno(),
                        msvcrt.get_osfhandle(file.fileno()),
                        offset_low, offset_high,
                        count, 0, 0)

        return self._register(ov, sock, self._finish_socket_func)

    def accept_pipe(self, pipe):
        """Start an overlapped ConnectNamedPipe(); return a future.

        The future's result is *pipe*.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    async def connect_pipe(self, address):
        """Connect to the pipe at *address*; return a PipeHandle.

        Retries while the connection fails with ERROR_PIPE_BUSY, sleeping
        between attempts with a delay that doubles up to
        CONNECT_PIPE_MAX_DELAY.  Any other OSError is propagated.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to
            # a pipe.  Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY.
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
            await tasks.sleep(delay)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for *event* with no timeout; call *done_callback* when done."""
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on *handle* with the completion port.

        Return a _WaitCancelFuture if *_is_cancel* is true, otherwise a
        _WaitHandleFuture.  *timeout* is in seconds; None waits forever.
        """
        self._check_closed()

        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        # To get notifications of finished ops on this object sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        """Track the overlapped operation *ov* and return its future.

        *callback* is called as callback(transferred, key, ov) to compute
        the future's result; *obj* is stored only to keep it alive.
        """
        self._check_closed()

        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._check_closed()
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new non-blocking socket of the given address family."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Dequeue completion events and complete the matching futures.

        *timeout* is in seconds and applies only to the first dequeue;
        subsequent dequeues do not block.  Raise ValueError if *timeout*
        is negative or too large.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            # Only the first call may block; drain the rest without waiting.
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

    def _stop_serving(self, obj):
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel pending operations, wait for them, and close the port.

        Calling close() again after the port is closed does nothing.
        """
        if self._iocp is None:
            # already closed
            return

        # Cancel remaining registered operations.
        for fut, ov, obj, callback in list(self._cache.values()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        # Wait until all cancelled overlapped complete: don't exit with running
        # overlapped to prevent a crash. Display progress every second if the
        # loop is still running.
        msg_update = 1.0
        start_time = time.monotonic()
        next_msg = start_time + msg_update
        while self._cache:
            if next_msg <= time.monotonic():
                logger.debug('%r is running after closing for %.1f seconds',
                             self, time.monotonic() - start_time)
                next_msg = time.monotonic() + msg_update

            # handle a few events, or timeout
            self._poll(msg_update)

        self._results = []

        _winapi.CloseHandle(self._iocp)
        self._iocp = None

    def __del__(self):
        self.close()


class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport which watches the process handle for exit."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the process and report its exit via _process_exited()."""
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def callback(f):
            returncode = self._proc.poll()
            self._process_exited(returncode)

        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
        f.add_done_callback(callback)


SelectorEventLoop = _WindowsSelectorEventLoop


class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is SelectorEventLoop."""
    _loop_factory = SelectorEventLoop


class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is ProactorEventLoop."""
    _loop_factory = ProactorEventLoop


DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy