1"""Selector and proactor event loops for Windows.""" 2 3import sys 4 5if sys.platform != 'win32': # pragma: no cover 6 raise ImportError('win32 only') 7 8import _overlapped 9import _winapi 10import errno 11import math 12import msvcrt 13import socket 14import struct 15import time 16import weakref 17 18from . import events 19from . import base_subprocess 20from . import futures 21from . import exceptions 22from . import proactor_events 23from . import selector_events 24from . import tasks 25from . import windows_utils 26from .log import logger 27 28 29__all__ = ( 30 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor', 31 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy', 32 'WindowsProactorEventLoopPolicy', 33) 34 35 36NULL = 0 37INFINITE = 0xffffffff 38ERROR_CONNECTION_REFUSED = 1225 39ERROR_CONNECTION_ABORTED = 1236 40 41# Initial delay in seconds for connect_pipe() before retrying to connect 42CONNECT_PIPE_INIT_DELAY = 0.001 43 44# Maximum delay in seconds for connect_pipe() before retrying to connect 45CONNECT_PIPE_MAX_DELAY = 0.100 46 47 48class _OverlappedFuture(futures.Future): 49 """Subclass of Future which represents an overlapped operation. 50 51 Cancelling it will immediately cancel the overlapped operation. 52 """ 53 54 def __init__(self, ov, *, loop=None): 55 super().__init__(loop=loop) 56 if self._source_traceback: 57 del self._source_traceback[-1] 58 self._ov = ov 59 60 def _repr_info(self): 61 info = super()._repr_info() 62 if self._ov is not None: 63 state = 'pending' if self._ov.pending else 'completed' 64 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>') 65 return info 66 67 def _cancel_overlapped(self): 68 if self._ov is None: 69 return 70 try: 71 self._ov.cancel() 72 except OSError as exc: 73 context = { 74 'message': 'Cancelling an overlapped future failed', 75 'exception': exc, 76 'future': self, 77 } 78 if self._source_traceback: 79 context['source_traceback'] = self._source_traceback 80 self._loop.call_exception_handler(context) 81 self._ov = None 82 83 def cancel(self, msg=None): 84 self._cancel_overlapped() 85 return super().cancel(msg=msg) 86 87 def set_exception(self, exception): 88 super().set_exception(exception) 89 self._cancel_overlapped() 90 91 def set_result(self, result): 92 super().set_result(result) 93 self._ov = None 94 95 96class _BaseWaitHandleFuture(futures.Future): 97 """Subclass of Future which represents a wait handle.""" 98 99 def __init__(self, ov, handle, wait_handle, *, loop=None): 100 super().__init__(loop=loop) 101 if self._source_traceback: 102 del self._source_traceback[-1] 103 # Keep a reference to the Overlapped object to keep it alive until the 104 # wait is unregistered 105 self._ov = ov 106 self._handle = handle 107 self._wait_handle = wait_handle 108 109 # Should we call UnregisterWaitEx() if the wait completes 110 # or is cancelled? 111 self._registered = True 112 113 def _poll(self): 114 # non-blocking wait: use a timeout of 0 millisecond 115 return (_winapi.WaitForSingleObject(self._handle, 0) == 116 _winapi.WAIT_OBJECT_0) 117 118 def _repr_info(self): 119 info = super()._repr_info() 120 info.append(f'handle={self._handle:#x}') 121 if self._handle is not None: 122 state = 'signaled' if self._poll() else 'waiting' 123 info.append(state) 124 if self._wait_handle is not None: 125 info.append(f'wait_handle={self._wait_handle:#x}') 126 return info 127 128 def _unregister_wait_cb(self, fut): 129 # The wait was unregistered: it's not safe to destroy the Overlapped 130 # object 131 self._ov = None 132 133 def _unregister_wait(self): 134 if not self._registered: 135 return 136 self._registered = False 137 138 wait_handle = self._wait_handle 139 self._wait_handle = None 140 try: 141 _overlapped.UnregisterWait(wait_handle) 142 except OSError as exc: 143 if exc.winerror != _overlapped.ERROR_IO_PENDING: 144 context = { 145 'message': 'Failed to unregister the wait handle', 146 'exception': exc, 147 'future': self, 148 } 149 if self._source_traceback: 150 context['source_traceback'] = self._source_traceback 151 self._loop.call_exception_handler(context) 152 return 153 # ERROR_IO_PENDING means that the unregister is pending 154 155 self._unregister_wait_cb(None) 156 157 def cancel(self, msg=None): 158 self._unregister_wait() 159 return super().cancel(msg=msg) 160 161 def set_exception(self, exception): 162 self._unregister_wait() 163 super().set_exception(exception) 164 165 def set_result(self, result): 166 self._unregister_wait() 167 super().set_result(result) 168 169 170class _WaitCancelFuture(_BaseWaitHandleFuture): 171 """Subclass of Future which represents a wait for the cancellation of a 172 _WaitHandleFuture using an event. 173 """ 174 175 def __init__(self, ov, event, wait_handle, *, loop=None): 176 super().__init__(ov, event, wait_handle, loop=loop) 177 178 self._done_callback = None 179 180 def cancel(self): 181 raise RuntimeError("_WaitCancelFuture must not be cancelled") 182 183 def set_result(self, result): 184 super().set_result(result) 185 if self._done_callback is not None: 186 self._done_callback(self) 187 188 def set_exception(self, exception): 189 super().set_exception(exception) 190 if self._done_callback is not None: 191 self._done_callback(self) 192 193 194class _WaitHandleFuture(_BaseWaitHandleFuture): 195 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None): 196 super().__init__(ov, handle, wait_handle, loop=loop) 197 self._proactor = proactor 198 self._unregister_proactor = True 199 self._event = _overlapped.CreateEvent(None, True, False, None) 200 self._event_fut = None 201 202 def _unregister_wait_cb(self, fut): 203 if self._event is not None: 204 _winapi.CloseHandle(self._event) 205 self._event = None 206 self._event_fut = None 207 208 # If the wait was cancelled, the wait may never be signalled, so 209 # it's required to unregister it. Otherwise, IocpProactor.close() will 210 # wait forever for an event which will never come. 211 # 212 # If the IocpProactor already received the event, it's safe to call 213 # _unregister() because we kept a reference to the Overlapped object 214 # which is used as a unique key. 215 self._proactor._unregister(self._ov) 216 self._proactor = None 217 218 super()._unregister_wait_cb(fut) 219 220 def _unregister_wait(self): 221 if not self._registered: 222 return 223 self._registered = False 224 225 wait_handle = self._wait_handle 226 self._wait_handle = None 227 try: 228 _overlapped.UnregisterWaitEx(wait_handle, self._event) 229 except OSError as exc: 230 if exc.winerror != _overlapped.ERROR_IO_PENDING: 231 context = { 232 'message': 'Failed to unregister the wait handle', 233 'exception': exc, 234 'future': self, 235 } 236 if self._source_traceback: 237 context['source_traceback'] = self._source_traceback 238 self._loop.call_exception_handler(context) 239 return 240 # ERROR_IO_PENDING is not an error, the wait was unregistered 241 242 self._event_fut = self._proactor._wait_cancel(self._event, 243 self._unregister_wait_cb) 244 245 246class PipeServer(object): 247 """Class representing a pipe server. 248 249 This is much like a bound, listening socket. 250 """ 251 def __init__(self, address): 252 self._address = address 253 self._free_instances = weakref.WeakSet() 254 # initialize the pipe attribute before calling _server_pipe_handle() 255 # because this function can raise an exception and the destructor calls 256 # the close() method 257 self._pipe = None 258 self._accept_pipe_future = None 259 self._pipe = self._server_pipe_handle(True) 260 261 def _get_unconnected_pipe(self): 262 # Create new instance and return previous one. This ensures 263 # that (until the server is closed) there is always at least 264 # one pipe handle for address. Therefore if a client attempt 265 # to connect it will not fail with FileNotFoundError. 266 tmp, self._pipe = self._pipe, self._server_pipe_handle(False) 267 return tmp 268 269 def _server_pipe_handle(self, first): 270 # Return a wrapper for a new pipe handle. 271 if self.closed(): 272 return None 273 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED 274 if first: 275 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 276 h = _winapi.CreateNamedPipe( 277 self._address, flags, 278 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 279 _winapi.PIPE_WAIT, 280 _winapi.PIPE_UNLIMITED_INSTANCES, 281 windows_utils.BUFSIZE, windows_utils.BUFSIZE, 282 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) 283 pipe = windows_utils.PipeHandle(h) 284 self._free_instances.add(pipe) 285 return pipe 286 287 def closed(self): 288 return (self._address is None) 289 290 def close(self): 291 if self._accept_pipe_future is not None: 292 self._accept_pipe_future.cancel() 293 self._accept_pipe_future = None 294 # Close all instances which have not been connected to by a client. 295 if self._address is not None: 296 for pipe in self._free_instances: 297 pipe.close() 298 self._pipe = None 299 self._address = None 300 self._free_instances.clear() 301 302 __del__ = close 303 304 305class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): 306 """Windows version of selector event loop.""" 307 308 309class ProactorEventLoop(proactor_events.BaseProactorEventLoop): 310 """Windows version of proactor event loop using IOCP.""" 311 312 def __init__(self, proactor=None): 313 if proactor is None: 314 proactor = IocpProactor() 315 super().__init__(proactor) 316 317 def run_forever(self): 318 try: 319 assert self._self_reading_future is None 320 self.call_soon(self._loop_self_reading) 321 super().run_forever() 322 finally: 323 if self._self_reading_future is not None: 324 ov = self._self_reading_future._ov 325 self._self_reading_future.cancel() 326 # self_reading_future was just cancelled so if it hasn't been 327 # finished yet, it never will be (it's possible that it has 328 # already finished and its callback is waiting in the queue, 329 # where it could still happen if the event loop is restarted). 330 # Unregister it otherwise IocpProactor.close will wait for it 331 # forever 332 if ov is not None: 333 self._proactor._unregister(ov) 334 self._self_reading_future = None 335 336 async def create_pipe_connection(self, protocol_factory, address): 337 f = self._proactor.connect_pipe(address) 338 pipe = await f 339 protocol = protocol_factory() 340 trans = self._make_duplex_pipe_transport(pipe, protocol, 341 extra={'addr': address}) 342 return trans, protocol 343 344 async def start_serving_pipe(self, protocol_factory, address): 345 server = PipeServer(address) 346 347 def loop_accept_pipe(f=None): 348 pipe = None 349 try: 350 if f: 351 pipe = f.result() 352 server._free_instances.discard(pipe) 353 354 if server.closed(): 355 # A client connected before the server was closed: 356 # drop the client (close the pipe) and exit 357 pipe.close() 358 return 359 360 protocol = protocol_factory() 361 self._make_duplex_pipe_transport( 362 pipe, protocol, extra={'addr': address}) 363 364 pipe = server._get_unconnected_pipe() 365 if pipe is None: 366 return 367 368 f = self._proactor.accept_pipe(pipe) 369 except OSError as exc: 370 if pipe and pipe.fileno() != -1: 371 self.call_exception_handler({ 372 'message': 'Pipe accept failed', 373 'exception': exc, 374 'pipe': pipe, 375 }) 376 pipe.close() 377 elif self._debug: 378 logger.warning("Accept pipe failed on pipe %r", 379 pipe, exc_info=True) 380 except exceptions.CancelledError: 381 if pipe: 382 pipe.close() 383 else: 384 server._accept_pipe_future = f 385 f.add_done_callback(loop_accept_pipe) 386 387 self.call_soon(loop_accept_pipe) 388 return [server] 389 390 async def _make_subprocess_transport(self, protocol, args, shell, 391 stdin, stdout, stderr, bufsize, 392 extra=None, **kwargs): 393 waiter = self.create_future() 394 transp = _WindowsSubprocessTransport(self, protocol, args, shell, 395 stdin, stdout, stderr, bufsize, 396 waiter=waiter, extra=extra, 397 **kwargs) 398 try: 399 await waiter 400 except (SystemExit, KeyboardInterrupt): 401 raise 402 except BaseException: 403 transp.close() 404 await transp._wait() 405 raise 406 407 return transp 408 409 410class IocpProactor: 411 """Proactor implementation using IOCP.""" 412 413 def __init__(self, concurrency=0xffffffff): 414 self._loop = None 415 self._results = [] 416 self._iocp = _overlapped.CreateIoCompletionPort( 417 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) 418 self._cache = {} 419 self._registered = weakref.WeakSet() 420 self._unregistered = [] 421 self._stopped_serving = weakref.WeakSet() 422 423 def _check_closed(self): 424 if self._iocp is None: 425 raise RuntimeError('IocpProactor is closed') 426 427 def __repr__(self): 428 info = ['overlapped#=%s' % len(self._cache), 429 'result#=%s' % len(self._results)] 430 if self._iocp is None: 431 info.append('closed') 432 return '<%s %s>' % (self.__class__.__name__, " ".join(info)) 433 434 def set_loop(self, loop): 435 self._loop = loop 436 437 def select(self, timeout=None): 438 if not self._results: 439 self._poll(timeout) 440 tmp = self._results 441 self._results = [] 442 return tmp 443 444 def _result(self, value): 445 fut = self._loop.create_future() 446 fut.set_result(value) 447 return fut 448 449 def recv(self, conn, nbytes, flags=0): 450 self._register_with_iocp(conn) 451 ov = _overlapped.Overlapped(NULL) 452 try: 453 if isinstance(conn, socket.socket): 454 ov.WSARecv(conn.fileno(), nbytes, flags) 455 else: 456 ov.ReadFile(conn.fileno(), nbytes) 457 except BrokenPipeError: 458 return self._result(b'') 459 460 def finish_recv(trans, key, ov): 461 try: 462 return ov.getresult() 463 except OSError as exc: 464 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 465 _overlapped.ERROR_OPERATION_ABORTED): 466 raise ConnectionResetError(*exc.args) 467 else: 468 raise 469 470 return self._register(ov, conn, finish_recv) 471 472 def recv_into(self, conn, buf, flags=0): 473 self._register_with_iocp(conn) 474 ov = _overlapped.Overlapped(NULL) 475 try: 476 if isinstance(conn, socket.socket): 477 ov.WSARecvInto(conn.fileno(), buf, flags) 478 else: 479 ov.ReadFileInto(conn.fileno(), buf) 480 except BrokenPipeError: 481 return self._result(0) 482 483 def finish_recv(trans, key, ov): 484 try: 485 return ov.getresult() 486 except OSError as exc: 487 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 488 _overlapped.ERROR_OPERATION_ABORTED): 489 raise ConnectionResetError(*exc.args) 490 else: 491 raise 492 493 return self._register(ov, conn, finish_recv) 494 495 def recvfrom(self, conn, nbytes, flags=0): 496 self._register_with_iocp(conn) 497 ov = _overlapped.Overlapped(NULL) 498 try: 499 ov.WSARecvFrom(conn.fileno(), nbytes, flags) 500 except BrokenPipeError: 501 return self._result((b'', None)) 502 503 def finish_recv(trans, key, ov): 504 try: 505 return ov.getresult() 506 except OSError as exc: 507 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 508 _overlapped.ERROR_OPERATION_ABORTED): 509 raise ConnectionResetError(*exc.args) 510 else: 511 raise 512 513 return self._register(ov, conn, finish_recv) 514 515 def sendto(self, conn, buf, flags=0, addr=None): 516 self._register_with_iocp(conn) 517 ov = _overlapped.Overlapped(NULL) 518 519 ov.WSASendTo(conn.fileno(), buf, flags, addr) 520 521 def finish_send(trans, key, ov): 522 try: 523 return ov.getresult() 524 except OSError as exc: 525 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 526 _overlapped.ERROR_OPERATION_ABORTED): 527 raise ConnectionResetError(*exc.args) 528 else: 529 raise 530 531 return self._register(ov, conn, finish_send) 532 533 def send(self, conn, buf, flags=0): 534 self._register_with_iocp(conn) 535 ov = _overlapped.Overlapped(NULL) 536 if isinstance(conn, socket.socket): 537 ov.WSASend(conn.fileno(), buf, flags) 538 else: 539 ov.WriteFile(conn.fileno(), buf) 540 541 def finish_send(trans, key, ov): 542 try: 543 return ov.getresult() 544 except OSError as exc: 545 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 546 _overlapped.ERROR_OPERATION_ABORTED): 547 raise ConnectionResetError(*exc.args) 548 else: 549 raise 550 551 return self._register(ov, conn, finish_send) 552 553 def accept(self, listener): 554 self._register_with_iocp(listener) 555 conn = self._get_accept_socket(listener.family) 556 ov = _overlapped.Overlapped(NULL) 557 ov.AcceptEx(listener.fileno(), conn.fileno()) 558 559 def finish_accept(trans, key, ov): 560 ov.getresult() 561 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work. 562 buf = struct.pack('@P', listener.fileno()) 563 conn.setsockopt(socket.SOL_SOCKET, 564 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf) 565 conn.settimeout(listener.gettimeout()) 566 return conn, conn.getpeername() 567 568 async def accept_coro(future, conn): 569 # Coroutine closing the accept socket if the future is cancelled 570 try: 571 await future 572 except exceptions.CancelledError: 573 conn.close() 574 raise 575 576 future = self._register(ov, listener, finish_accept) 577 coro = accept_coro(future, conn) 578 tasks.ensure_future(coro, loop=self._loop) 579 return future 580 581 def connect(self, conn, address): 582 if conn.type == socket.SOCK_DGRAM: 583 # WSAConnect will complete immediately for UDP sockets so we don't 584 # need to register any IOCP operation 585 _overlapped.WSAConnect(conn.fileno(), address) 586 fut = self._loop.create_future() 587 fut.set_result(None) 588 return fut 589 590 self._register_with_iocp(conn) 591 # The socket needs to be locally bound before we call ConnectEx(). 592 try: 593 _overlapped.BindLocal(conn.fileno(), conn.family) 594 except OSError as e: 595 if e.winerror != errno.WSAEINVAL: 596 raise 597 # Probably already locally bound; check using getsockname(). 598 if conn.getsockname()[1] == 0: 599 raise 600 ov = _overlapped.Overlapped(NULL) 601 ov.ConnectEx(conn.fileno(), address) 602 603 def finish_connect(trans, key, ov): 604 ov.getresult() 605 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work. 606 conn.setsockopt(socket.SOL_SOCKET, 607 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0) 608 return conn 609 610 return self._register(ov, conn, finish_connect) 611 612 def sendfile(self, sock, file, offset, count): 613 self._register_with_iocp(sock) 614 ov = _overlapped.Overlapped(NULL) 615 offset_low = offset & 0xffff_ffff 616 offset_high = (offset >> 32) & 0xffff_ffff 617 ov.TransmitFile(sock.fileno(), 618 msvcrt.get_osfhandle(file.fileno()), 619 offset_low, offset_high, 620 count, 0, 0) 621 622 def finish_sendfile(trans, key, ov): 623 try: 624 return ov.getresult() 625 except OSError as exc: 626 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 627 _overlapped.ERROR_OPERATION_ABORTED): 628 raise ConnectionResetError(*exc.args) 629 else: 630 raise 631 return self._register(ov, sock, finish_sendfile) 632 633 def accept_pipe(self, pipe): 634 self._register_with_iocp(pipe) 635 ov = _overlapped.Overlapped(NULL) 636 connected = ov.ConnectNamedPipe(pipe.fileno()) 637 638 if connected: 639 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means 640 # that the pipe is connected. There is no need to wait for the 641 # completion of the connection. 642 return self._result(pipe) 643 644 def finish_accept_pipe(trans, key, ov): 645 ov.getresult() 646 return pipe 647 648 return self._register(ov, pipe, finish_accept_pipe) 649 650 async def connect_pipe(self, address): 651 delay = CONNECT_PIPE_INIT_DELAY 652 while True: 653 # Unfortunately there is no way to do an overlapped connect to 654 # a pipe. Call CreateFile() in a loop until it doesn't fail with 655 # ERROR_PIPE_BUSY. 656 try: 657 handle = _overlapped.ConnectPipe(address) 658 break 659 except OSError as exc: 660 if exc.winerror != _overlapped.ERROR_PIPE_BUSY: 661 raise 662 663 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later 664 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) 665 await tasks.sleep(delay) 666 667 return windows_utils.PipeHandle(handle) 668 669 def wait_for_handle(self, handle, timeout=None): 670 """Wait for a handle. 671 672 Return a Future object. The result of the future is True if the wait 673 completed, or False if the wait did not complete (on timeout). 674 """ 675 return self._wait_for_handle(handle, timeout, False) 676 677 def _wait_cancel(self, event, done_callback): 678 fut = self._wait_for_handle(event, None, True) 679 # add_done_callback() cannot be used because the wait may only complete 680 # in IocpProactor.close(), while the event loop is not running. 681 fut._done_callback = done_callback 682 return fut 683 684 def _wait_for_handle(self, handle, timeout, _is_cancel): 685 self._check_closed() 686 687 if timeout is None: 688 ms = _winapi.INFINITE 689 else: 690 # RegisterWaitForSingleObject() has a resolution of 1 millisecond, 691 # round away from zero to wait *at least* timeout seconds. 692 ms = math.ceil(timeout * 1e3) 693 694 # We only create ov so we can use ov.address as a key for the cache. 695 ov = _overlapped.Overlapped(NULL) 696 wait_handle = _overlapped.RegisterWaitWithQueue( 697 handle, self._iocp, ov.address, ms) 698 if _is_cancel: 699 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop) 700 else: 701 f = _WaitHandleFuture(ov, handle, wait_handle, self, 702 loop=self._loop) 703 if f._source_traceback: 704 del f._source_traceback[-1] 705 706 def finish_wait_for_handle(trans, key, ov): 707 # Note that this second wait means that we should only use 708 # this with handles types where a successful wait has no 709 # effect. So events or processes are all right, but locks 710 # or semaphores are not. Also note if the handle is 711 # signalled and then quickly reset, then we may return 712 # False even though we have not timed out. 713 return f._poll() 714 715 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) 716 return f 717 718 def _register_with_iocp(self, obj): 719 # To get notifications of finished ops on this objects sent to the 720 # completion port, were must register the handle. 721 if obj not in self._registered: 722 self._registered.add(obj) 723 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0) 724 # XXX We could also use SetFileCompletionNotificationModes() 725 # to avoid sending notifications to completion port of ops 726 # that succeed immediately. 727 728 def _register(self, ov, obj, callback): 729 self._check_closed() 730 731 # Return a future which will be set with the result of the 732 # operation when it completes. The future's value is actually 733 # the value returned by callback(). 734 f = _OverlappedFuture(ov, loop=self._loop) 735 if f._source_traceback: 736 del f._source_traceback[-1] 737 if not ov.pending: 738 # The operation has completed, so no need to postpone the 739 # work. We cannot take this short cut if we need the 740 # NumberOfBytes, CompletionKey values returned by 741 # PostQueuedCompletionStatus(). 742 try: 743 value = callback(None, None, ov) 744 except OSError as e: 745 f.set_exception(e) 746 else: 747 f.set_result(value) 748 # Even if GetOverlappedResult() was called, we have to wait for the 749 # notification of the completion in GetQueuedCompletionStatus(). 750 # Register the overlapped operation to keep a reference to the 751 # OVERLAPPED object, otherwise the memory is freed and Windows may 752 # read uninitialized memory. 753 754 # Register the overlapped operation for later. Note that 755 # we only store obj to prevent it from being garbage 756 # collected too early. 757 self._cache[ov.address] = (f, ov, obj, callback) 758 return f 759 760 def _unregister(self, ov): 761 """Unregister an overlapped object. 762 763 Call this method when its future has been cancelled. The event can 764 already be signalled (pending in the proactor event queue). It is also 765 safe if the event is never signalled (because it was cancelled). 766 """ 767 self._check_closed() 768 self._unregistered.append(ov) 769 770 def _get_accept_socket(self, family): 771 s = socket.socket(family) 772 s.settimeout(0) 773 return s 774 775 def _poll(self, timeout=None): 776 if timeout is None: 777 ms = INFINITE 778 elif timeout < 0: 779 raise ValueError("negative timeout") 780 else: 781 # GetQueuedCompletionStatus() has a resolution of 1 millisecond, 782 # round away from zero to wait *at least* timeout seconds. 783 ms = math.ceil(timeout * 1e3) 784 if ms >= INFINITE: 785 raise ValueError("timeout too big") 786 787 while True: 788 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) 789 if status is None: 790 break 791 ms = 0 792 793 err, transferred, key, address = status 794 try: 795 f, ov, obj, callback = self._cache.pop(address) 796 except KeyError: 797 if self._loop.get_debug(): 798 self._loop.call_exception_handler({ 799 'message': ('GetQueuedCompletionStatus() returned an ' 800 'unexpected event'), 801 'status': ('err=%s transferred=%s key=%#x address=%#x' 802 % (err, transferred, key, address)), 803 }) 804 805 # key is either zero, or it is used to return a pipe 806 # handle which should be closed to avoid a leak. 807 if key not in (0, _overlapped.INVALID_HANDLE_VALUE): 808 _winapi.CloseHandle(key) 809 continue 810 811 if obj in self._stopped_serving: 812 f.cancel() 813 # Don't call the callback if _register() already read the result or 814 # if the overlapped has been cancelled 815 elif not f.done(): 816 try: 817 value = callback(transferred, key, ov) 818 except OSError as e: 819 f.set_exception(e) 820 self._results.append(f) 821 else: 822 f.set_result(value) 823 self._results.append(f) 824 825 # Remove unregistered futures 826 for ov in self._unregistered: 827 self._cache.pop(ov.address, None) 828 self._unregistered.clear() 829 830 def _stop_serving(self, obj): 831 # obj is a socket or pipe handle. It will be closed in 832 # BaseProactorEventLoop._stop_serving() which will make any 833 # pending operations fail quickly. 834 self._stopped_serving.add(obj) 835 836 def close(self): 837 if self._iocp is None: 838 # already closed 839 return 840 841 # Cancel remaining registered operations. 842 for address, (fut, ov, obj, callback) in list(self._cache.items()): 843 if fut.cancelled(): 844 # Nothing to do with cancelled futures 845 pass 846 elif isinstance(fut, _WaitCancelFuture): 847 # _WaitCancelFuture must not be cancelled 848 pass 849 else: 850 try: 851 fut.cancel() 852 except OSError as exc: 853 if self._loop is not None: 854 context = { 855 'message': 'Cancelling a future failed', 856 'exception': exc, 857 'future': fut, 858 } 859 if fut._source_traceback: 860 context['source_traceback'] = fut._source_traceback 861 self._loop.call_exception_handler(context) 862 863 # Wait until all cancelled overlapped complete: don't exit with running 864 # overlapped to prevent a crash. Display progress every second if the 865 # loop is still running. 866 msg_update = 1.0 867 start_time = time.monotonic() 868 next_msg = start_time + msg_update 869 while self._cache: 870 if next_msg <= time.monotonic(): 871 logger.debug('%r is running after closing for %.1f seconds', 872 self, time.monotonic() - start_time) 873 next_msg = time.monotonic() + msg_update 874 875 # handle a few events, or timeout 876 self._poll(msg_update) 877 878 self._results = [] 879 880 _winapi.CloseHandle(self._iocp) 881 self._iocp = None 882 883 def __del__(self): 884 self.close() 885 886 887class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport): 888 889 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 890 self._proc = windows_utils.Popen( 891 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 892 bufsize=bufsize, **kwargs) 893 894 def callback(f): 895 returncode = self._proc.poll() 896 self._process_exited(returncode) 897 898 f = self._loop._proactor.wait_for_handle(int(self._proc._handle)) 899 f.add_done_callback(callback) 900 901 902SelectorEventLoop = _WindowsSelectorEventLoop 903 904 905class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy): 906 _loop_factory = SelectorEventLoop 907 908 909class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy): 910 _loop_factory = ProactorEventLoop 911 912 913DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy 914