1"""Selector and proactor event loops for Windows.""" 2 3import _overlapped 4import _winapi 5import errno 6import math 7import msvcrt 8import socket 9import struct 10import time 11import weakref 12 13from . import events 14from . import base_subprocess 15from . import futures 16from . import exceptions 17from . import proactor_events 18from . import selector_events 19from . import tasks 20from . import windows_utils 21from .log import logger 22 23 24__all__ = ( 25 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor', 26 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy', 27 'WindowsProactorEventLoopPolicy', 28) 29 30 31NULL = 0 32INFINITE = 0xffffffff 33ERROR_CONNECTION_REFUSED = 1225 34ERROR_CONNECTION_ABORTED = 1236 35 36# Initial delay in seconds for connect_pipe() before retrying to connect 37CONNECT_PIPE_INIT_DELAY = 0.001 38 39# Maximum delay in seconds for connect_pipe() before retrying to connect 40CONNECT_PIPE_MAX_DELAY = 0.100 41 42 43class _OverlappedFuture(futures.Future): 44 """Subclass of Future which represents an overlapped operation. 45 46 Cancelling it will immediately cancel the overlapped operation. 47 """ 48 49 def __init__(self, ov, *, loop=None): 50 super().__init__(loop=loop) 51 if self._source_traceback: 52 del self._source_traceback[-1] 53 self._ov = ov 54 55 def _repr_info(self): 56 info = super()._repr_info() 57 if self._ov is not None: 58 state = 'pending' if self._ov.pending else 'completed' 59 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>') 60 return info 61 62 def _cancel_overlapped(self): 63 if self._ov is None: 64 return 65 try: 66 self._ov.cancel() 67 except OSError as exc: 68 context = { 69 'message': 'Cancelling an overlapped future failed', 70 'exception': exc, 71 'future': self, 72 } 73 if self._source_traceback: 74 context['source_traceback'] = self._source_traceback 75 self._loop.call_exception_handler(context) 76 self._ov = None 77 78 def cancel(self): 79 self._cancel_overlapped() 80 return super().cancel() 81 82 def set_exception(self, exception): 83 super().set_exception(exception) 84 self._cancel_overlapped() 85 86 def set_result(self, result): 87 super().set_result(result) 88 self._ov = None 89 90 91class _BaseWaitHandleFuture(futures.Future): 92 """Subclass of Future which represents a wait handle.""" 93 94 def __init__(self, ov, handle, wait_handle, *, loop=None): 95 super().__init__(loop=loop) 96 if self._source_traceback: 97 del self._source_traceback[-1] 98 # Keep a reference to the Overlapped object to keep it alive until the 99 # wait is unregistered 100 self._ov = ov 101 self._handle = handle 102 self._wait_handle = wait_handle 103 104 # Should we call UnregisterWaitEx() if the wait completes 105 # or is cancelled? 106 self._registered = True 107 108 def _poll(self): 109 # non-blocking wait: use a timeout of 0 millisecond 110 return (_winapi.WaitForSingleObject(self._handle, 0) == 111 _winapi.WAIT_OBJECT_0) 112 113 def _repr_info(self): 114 info = super()._repr_info() 115 info.append(f'handle={self._handle:#x}') 116 if self._handle is not None: 117 state = 'signaled' if self._poll() else 'waiting' 118 info.append(state) 119 if self._wait_handle is not None: 120 info.append(f'wait_handle={self._wait_handle:#x}') 121 return info 122 123 def _unregister_wait_cb(self, fut): 124 # The wait was unregistered: it's not safe to destroy the Overlapped 125 # object 126 self._ov = None 127 128 def _unregister_wait(self): 129 if not self._registered: 130 return 131 self._registered = False 132 133 wait_handle = self._wait_handle 134 self._wait_handle = None 135 try: 136 _overlapped.UnregisterWait(wait_handle) 137 except OSError as exc: 138 if exc.winerror != _overlapped.ERROR_IO_PENDING: 139 context = { 140 'message': 'Failed to unregister the wait handle', 141 'exception': exc, 142 'future': self, 143 } 144 if self._source_traceback: 145 context['source_traceback'] = self._source_traceback 146 self._loop.call_exception_handler(context) 147 return 148 # ERROR_IO_PENDING means that the unregister is pending 149 150 self._unregister_wait_cb(None) 151 152 def cancel(self): 153 self._unregister_wait() 154 return super().cancel() 155 156 def set_exception(self, exception): 157 self._unregister_wait() 158 super().set_exception(exception) 159 160 def set_result(self, result): 161 self._unregister_wait() 162 super().set_result(result) 163 164 165class _WaitCancelFuture(_BaseWaitHandleFuture): 166 """Subclass of Future which represents a wait for the cancellation of a 167 _WaitHandleFuture using an event. 168 """ 169 170 def __init__(self, ov, event, wait_handle, *, loop=None): 171 super().__init__(ov, event, wait_handle, loop=loop) 172 173 self._done_callback = None 174 175 def cancel(self): 176 raise RuntimeError("_WaitCancelFuture must not be cancelled") 177 178 def set_result(self, result): 179 super().set_result(result) 180 if self._done_callback is not None: 181 self._done_callback(self) 182 183 def set_exception(self, exception): 184 super().set_exception(exception) 185 if self._done_callback is not None: 186 self._done_callback(self) 187 188 189class _WaitHandleFuture(_BaseWaitHandleFuture): 190 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None): 191 super().__init__(ov, handle, wait_handle, loop=loop) 192 self._proactor = proactor 193 self._unregister_proactor = True 194 self._event = _overlapped.CreateEvent(None, True, False, None) 195 self._event_fut = None 196 197 def _unregister_wait_cb(self, fut): 198 if self._event is not None: 199 _winapi.CloseHandle(self._event) 200 self._event = None 201 self._event_fut = None 202 203 # If the wait was cancelled, the wait may never be signalled, so 204 # it's required to unregister it. Otherwise, IocpProactor.close() will 205 # wait forever for an event which will never come. 206 # 207 # If the IocpProactor already received the event, it's safe to call 208 # _unregister() because we kept a reference to the Overlapped object 209 # which is used as a unique key. 210 self._proactor._unregister(self._ov) 211 self._proactor = None 212 213 super()._unregister_wait_cb(fut) 214 215 def _unregister_wait(self): 216 if not self._registered: 217 return 218 self._registered = False 219 220 wait_handle = self._wait_handle 221 self._wait_handle = None 222 try: 223 _overlapped.UnregisterWaitEx(wait_handle, self._event) 224 except OSError as exc: 225 if exc.winerror != _overlapped.ERROR_IO_PENDING: 226 context = { 227 'message': 'Failed to unregister the wait handle', 228 'exception': exc, 229 'future': self, 230 } 231 if self._source_traceback: 232 context['source_traceback'] = self._source_traceback 233 self._loop.call_exception_handler(context) 234 return 235 # ERROR_IO_PENDING is not an error, the wait was unregistered 236 237 self._event_fut = self._proactor._wait_cancel(self._event, 238 self._unregister_wait_cb) 239 240 241class PipeServer(object): 242 """Class representing a pipe server. 243 244 This is much like a bound, listening socket. 245 """ 246 def __init__(self, address): 247 self._address = address 248 self._free_instances = weakref.WeakSet() 249 # initialize the pipe attribute before calling _server_pipe_handle() 250 # because this function can raise an exception and the destructor calls 251 # the close() method 252 self._pipe = None 253 self._accept_pipe_future = None 254 self._pipe = self._server_pipe_handle(True) 255 256 def _get_unconnected_pipe(self): 257 # Create new instance and return previous one. This ensures 258 # that (until the server is closed) there is always at least 259 # one pipe handle for address. Therefore if a client attempt 260 # to connect it will not fail with FileNotFoundError. 261 tmp, self._pipe = self._pipe, self._server_pipe_handle(False) 262 return tmp 263 264 def _server_pipe_handle(self, first): 265 # Return a wrapper for a new pipe handle. 266 if self.closed(): 267 return None 268 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED 269 if first: 270 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE 271 h = _winapi.CreateNamedPipe( 272 self._address, flags, 273 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | 274 _winapi.PIPE_WAIT, 275 _winapi.PIPE_UNLIMITED_INSTANCES, 276 windows_utils.BUFSIZE, windows_utils.BUFSIZE, 277 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) 278 pipe = windows_utils.PipeHandle(h) 279 self._free_instances.add(pipe) 280 return pipe 281 282 def closed(self): 283 return (self._address is None) 284 285 def close(self): 286 if self._accept_pipe_future is not None: 287 self._accept_pipe_future.cancel() 288 self._accept_pipe_future = None 289 # Close all instances which have not been connected to by a client. 290 if self._address is not None: 291 for pipe in self._free_instances: 292 pipe.close() 293 self._pipe = None 294 self._address = None 295 self._free_instances.clear() 296 297 __del__ = close 298 299 300class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): 301 """Windows version of selector event loop.""" 302 303 304class ProactorEventLoop(proactor_events.BaseProactorEventLoop): 305 """Windows version of proactor event loop using IOCP.""" 306 307 def __init__(self, proactor=None): 308 if proactor is None: 309 proactor = IocpProactor() 310 super().__init__(proactor) 311 312 def run_forever(self): 313 try: 314 assert self._self_reading_future is None 315 self.call_soon(self._loop_self_reading) 316 super().run_forever() 317 finally: 318 if self._self_reading_future is not None: 319 ov = self._self_reading_future._ov 320 self._self_reading_future.cancel() 321 # self_reading_future was just cancelled so it will never be signalled 322 # Unregister it otherwise IocpProactor.close will wait for it forever 323 if ov is not None: 324 self._proactor._unregister(ov) 325 self._self_reading_future = None 326 327 async def create_pipe_connection(self, protocol_factory, address): 328 f = self._proactor.connect_pipe(address) 329 pipe = await f 330 protocol = protocol_factory() 331 trans = self._make_duplex_pipe_transport(pipe, protocol, 332 extra={'addr': address}) 333 return trans, protocol 334 335 async def start_serving_pipe(self, protocol_factory, address): 336 server = PipeServer(address) 337 338 def loop_accept_pipe(f=None): 339 pipe = None 340 try: 341 if f: 342 pipe = f.result() 343 server._free_instances.discard(pipe) 344 345 if server.closed(): 346 # A client connected before the server was closed: 347 # drop the client (close the pipe) and exit 348 pipe.close() 349 return 350 351 protocol = protocol_factory() 352 self._make_duplex_pipe_transport( 353 pipe, protocol, extra={'addr': address}) 354 355 pipe = server._get_unconnected_pipe() 356 if pipe is None: 357 return 358 359 f = self._proactor.accept_pipe(pipe) 360 except OSError as exc: 361 if pipe and pipe.fileno() != -1: 362 self.call_exception_handler({ 363 'message': 'Pipe accept failed', 364 'exception': exc, 365 'pipe': pipe, 366 }) 367 pipe.close() 368 elif self._debug: 369 logger.warning("Accept pipe failed on pipe %r", 370 pipe, exc_info=True) 371 except exceptions.CancelledError: 372 if pipe: 373 pipe.close() 374 else: 375 server._accept_pipe_future = f 376 f.add_done_callback(loop_accept_pipe) 377 378 self.call_soon(loop_accept_pipe) 379 return [server] 380 381 async def _make_subprocess_transport(self, protocol, args, shell, 382 stdin, stdout, stderr, bufsize, 383 extra=None, **kwargs): 384 waiter = self.create_future() 385 transp = _WindowsSubprocessTransport(self, protocol, args, shell, 386 stdin, stdout, stderr, bufsize, 387 waiter=waiter, extra=extra, 388 **kwargs) 389 try: 390 await waiter 391 except (SystemExit, KeyboardInterrupt): 392 raise 393 except BaseException: 394 transp.close() 395 await transp._wait() 396 raise 397 398 return transp 399 400 401class IocpProactor: 402 """Proactor implementation using IOCP.""" 403 404 def __init__(self, concurrency=0xffffffff): 405 self._loop = None 406 self._results = [] 407 self._iocp = _overlapped.CreateIoCompletionPort( 408 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency) 409 self._cache = {} 410 self._registered = weakref.WeakSet() 411 self._unregistered = [] 412 self._stopped_serving = weakref.WeakSet() 413 414 def _check_closed(self): 415 if self._iocp is None: 416 raise RuntimeError('IocpProactor is closed') 417 418 def __repr__(self): 419 info = ['overlapped#=%s' % len(self._cache), 420 'result#=%s' % len(self._results)] 421 if self._iocp is None: 422 info.append('closed') 423 return '<%s %s>' % (self.__class__.__name__, " ".join(info)) 424 425 def set_loop(self, loop): 426 self._loop = loop 427 428 def select(self, timeout=None): 429 if not self._results: 430 self._poll(timeout) 431 tmp = self._results 432 self._results = [] 433 return tmp 434 435 def _result(self, value): 436 fut = self._loop.create_future() 437 fut.set_result(value) 438 return fut 439 440 def recv(self, conn, nbytes, flags=0): 441 self._register_with_iocp(conn) 442 ov = _overlapped.Overlapped(NULL) 443 try: 444 if isinstance(conn, socket.socket): 445 ov.WSARecv(conn.fileno(), nbytes, flags) 446 else: 447 ov.ReadFile(conn.fileno(), nbytes) 448 except BrokenPipeError: 449 return self._result(b'') 450 451 def finish_recv(trans, key, ov): 452 try: 453 return ov.getresult() 454 except OSError as exc: 455 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 456 _overlapped.ERROR_OPERATION_ABORTED): 457 raise ConnectionResetError(*exc.args) 458 else: 459 raise 460 461 return self._register(ov, conn, finish_recv) 462 463 def recv_into(self, conn, buf, flags=0): 464 self._register_with_iocp(conn) 465 ov = _overlapped.Overlapped(NULL) 466 try: 467 if isinstance(conn, socket.socket): 468 ov.WSARecvInto(conn.fileno(), buf, flags) 469 else: 470 ov.ReadFileInto(conn.fileno(), buf) 471 except BrokenPipeError: 472 return self._result(b'') 473 474 def finish_recv(trans, key, ov): 475 try: 476 return ov.getresult() 477 except OSError as exc: 478 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 479 _overlapped.ERROR_OPERATION_ABORTED): 480 raise ConnectionResetError(*exc.args) 481 else: 482 raise 483 484 return self._register(ov, conn, finish_recv) 485 486 def recvfrom(self, conn, nbytes, flags=0): 487 self._register_with_iocp(conn) 488 ov = _overlapped.Overlapped(NULL) 489 try: 490 ov.WSARecvFrom(conn.fileno(), nbytes, flags) 491 except BrokenPipeError: 492 return self._result((b'', None)) 493 494 def finish_recv(trans, key, ov): 495 try: 496 return ov.getresult() 497 except OSError as exc: 498 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 499 _overlapped.ERROR_OPERATION_ABORTED): 500 raise ConnectionResetError(*exc.args) 501 else: 502 raise 503 504 return self._register(ov, conn, finish_recv) 505 506 def sendto(self, conn, buf, flags=0, addr=None): 507 self._register_with_iocp(conn) 508 ov = _overlapped.Overlapped(NULL) 509 510 ov.WSASendTo(conn.fileno(), buf, flags, addr) 511 512 def finish_send(trans, key, ov): 513 try: 514 return ov.getresult() 515 except OSError as exc: 516 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 517 _overlapped.ERROR_OPERATION_ABORTED): 518 raise ConnectionResetError(*exc.args) 519 else: 520 raise 521 522 return self._register(ov, conn, finish_send) 523 524 def send(self, conn, buf, flags=0): 525 self._register_with_iocp(conn) 526 ov = _overlapped.Overlapped(NULL) 527 if isinstance(conn, socket.socket): 528 ov.WSASend(conn.fileno(), buf, flags) 529 else: 530 ov.WriteFile(conn.fileno(), buf) 531 532 def finish_send(trans, key, ov): 533 try: 534 return ov.getresult() 535 except OSError as exc: 536 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 537 _overlapped.ERROR_OPERATION_ABORTED): 538 raise ConnectionResetError(*exc.args) 539 else: 540 raise 541 542 return self._register(ov, conn, finish_send) 543 544 def accept(self, listener): 545 self._register_with_iocp(listener) 546 conn = self._get_accept_socket(listener.family) 547 ov = _overlapped.Overlapped(NULL) 548 ov.AcceptEx(listener.fileno(), conn.fileno()) 549 550 def finish_accept(trans, key, ov): 551 ov.getresult() 552 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work. 553 buf = struct.pack('@P', listener.fileno()) 554 conn.setsockopt(socket.SOL_SOCKET, 555 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf) 556 conn.settimeout(listener.gettimeout()) 557 return conn, conn.getpeername() 558 559 async def accept_coro(future, conn): 560 # Coroutine closing the accept socket if the future is cancelled 561 try: 562 await future 563 except exceptions.CancelledError: 564 conn.close() 565 raise 566 567 future = self._register(ov, listener, finish_accept) 568 coro = accept_coro(future, conn) 569 tasks.ensure_future(coro, loop=self._loop) 570 return future 571 572 def connect(self, conn, address): 573 if conn.type == socket.SOCK_DGRAM: 574 # WSAConnect will complete immediately for UDP sockets so we don't 575 # need to register any IOCP operation 576 _overlapped.WSAConnect(conn.fileno(), address) 577 fut = self._loop.create_future() 578 fut.set_result(None) 579 return fut 580 581 self._register_with_iocp(conn) 582 # The socket needs to be locally bound before we call ConnectEx(). 583 try: 584 _overlapped.BindLocal(conn.fileno(), conn.family) 585 except OSError as e: 586 if e.winerror != errno.WSAEINVAL: 587 raise 588 # Probably already locally bound; check using getsockname(). 589 if conn.getsockname()[1] == 0: 590 raise 591 ov = _overlapped.Overlapped(NULL) 592 ov.ConnectEx(conn.fileno(), address) 593 594 def finish_connect(trans, key, ov): 595 ov.getresult() 596 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work. 597 conn.setsockopt(socket.SOL_SOCKET, 598 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0) 599 return conn 600 601 return self._register(ov, conn, finish_connect) 602 603 def sendfile(self, sock, file, offset, count): 604 self._register_with_iocp(sock) 605 ov = _overlapped.Overlapped(NULL) 606 offset_low = offset & 0xffff_ffff 607 offset_high = (offset >> 32) & 0xffff_ffff 608 ov.TransmitFile(sock.fileno(), 609 msvcrt.get_osfhandle(file.fileno()), 610 offset_low, offset_high, 611 count, 0, 0) 612 613 def finish_sendfile(trans, key, ov): 614 try: 615 return ov.getresult() 616 except OSError as exc: 617 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, 618 _overlapped.ERROR_OPERATION_ABORTED): 619 raise ConnectionResetError(*exc.args) 620 else: 621 raise 622 return self._register(ov, sock, finish_sendfile) 623 624 def accept_pipe(self, pipe): 625 self._register_with_iocp(pipe) 626 ov = _overlapped.Overlapped(NULL) 627 connected = ov.ConnectNamedPipe(pipe.fileno()) 628 629 if connected: 630 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means 631 # that the pipe is connected. There is no need to wait for the 632 # completion of the connection. 633 return self._result(pipe) 634 635 def finish_accept_pipe(trans, key, ov): 636 ov.getresult() 637 return pipe 638 639 return self._register(ov, pipe, finish_accept_pipe) 640 641 async def connect_pipe(self, address): 642 delay = CONNECT_PIPE_INIT_DELAY 643 while True: 644 # Unfortunately there is no way to do an overlapped connect to 645 # a pipe. Call CreateFile() in a loop until it doesn't fail with 646 # ERROR_PIPE_BUSY. 647 try: 648 handle = _overlapped.ConnectPipe(address) 649 break 650 except OSError as exc: 651 if exc.winerror != _overlapped.ERROR_PIPE_BUSY: 652 raise 653 654 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later 655 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY) 656 await tasks.sleep(delay) 657 658 return windows_utils.PipeHandle(handle) 659 660 def wait_for_handle(self, handle, timeout=None): 661 """Wait for a handle. 662 663 Return a Future object. The result of the future is True if the wait 664 completed, or False if the wait did not complete (on timeout). 665 """ 666 return self._wait_for_handle(handle, timeout, False) 667 668 def _wait_cancel(self, event, done_callback): 669 fut = self._wait_for_handle(event, None, True) 670 # add_done_callback() cannot be used because the wait may only complete 671 # in IocpProactor.close(), while the event loop is not running. 672 fut._done_callback = done_callback 673 return fut 674 675 def _wait_for_handle(self, handle, timeout, _is_cancel): 676 self._check_closed() 677 678 if timeout is None: 679 ms = _winapi.INFINITE 680 else: 681 # RegisterWaitForSingleObject() has a resolution of 1 millisecond, 682 # round away from zero to wait *at least* timeout seconds. 683 ms = math.ceil(timeout * 1e3) 684 685 # We only create ov so we can use ov.address as a key for the cache. 686 ov = _overlapped.Overlapped(NULL) 687 wait_handle = _overlapped.RegisterWaitWithQueue( 688 handle, self._iocp, ov.address, ms) 689 if _is_cancel: 690 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop) 691 else: 692 f = _WaitHandleFuture(ov, handle, wait_handle, self, 693 loop=self._loop) 694 if f._source_traceback: 695 del f._source_traceback[-1] 696 697 def finish_wait_for_handle(trans, key, ov): 698 # Note that this second wait means that we should only use 699 # this with handles types where a successful wait has no 700 # effect. So events or processes are all right, but locks 701 # or semaphores are not. Also note if the handle is 702 # signalled and then quickly reset, then we may return 703 # False even though we have not timed out. 704 return f._poll() 705 706 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle) 707 return f 708 709 def _register_with_iocp(self, obj): 710 # To get notifications of finished ops on this objects sent to the 711 # completion port, were must register the handle. 712 if obj not in self._registered: 713 self._registered.add(obj) 714 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0) 715 # XXX We could also use SetFileCompletionNotificationModes() 716 # to avoid sending notifications to completion port of ops 717 # that succeed immediately. 718 719 def _register(self, ov, obj, callback): 720 self._check_closed() 721 722 # Return a future which will be set with the result of the 723 # operation when it completes. The future's value is actually 724 # the value returned by callback(). 725 f = _OverlappedFuture(ov, loop=self._loop) 726 if f._source_traceback: 727 del f._source_traceback[-1] 728 if not ov.pending: 729 # The operation has completed, so no need to postpone the 730 # work. We cannot take this short cut if we need the 731 # NumberOfBytes, CompletionKey values returned by 732 # PostQueuedCompletionStatus(). 733 try: 734 value = callback(None, None, ov) 735 except OSError as e: 736 f.set_exception(e) 737 else: 738 f.set_result(value) 739 # Even if GetOverlappedResult() was called, we have to wait for the 740 # notification of the completion in GetQueuedCompletionStatus(). 741 # Register the overlapped operation to keep a reference to the 742 # OVERLAPPED object, otherwise the memory is freed and Windows may 743 # read uninitialized memory. 744 745 # Register the overlapped operation for later. Note that 746 # we only store obj to prevent it from being garbage 747 # collected too early. 748 self._cache[ov.address] = (f, ov, obj, callback) 749 return f 750 751 def _unregister(self, ov): 752 """Unregister an overlapped object. 753 754 Call this method when its future has been cancelled. The event can 755 already be signalled (pending in the proactor event queue). It is also 756 safe if the event is never signalled (because it was cancelled). 757 """ 758 self._check_closed() 759 self._unregistered.append(ov) 760 761 def _get_accept_socket(self, family): 762 s = socket.socket(family) 763 s.settimeout(0) 764 return s 765 766 def _poll(self, timeout=None): 767 if timeout is None: 768 ms = INFINITE 769 elif timeout < 0: 770 raise ValueError("negative timeout") 771 else: 772 # GetQueuedCompletionStatus() has a resolution of 1 millisecond, 773 # round away from zero to wait *at least* timeout seconds. 774 ms = math.ceil(timeout * 1e3) 775 if ms >= INFINITE: 776 raise ValueError("timeout too big") 777 778 while True: 779 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms) 780 if status is None: 781 break 782 ms = 0 783 784 err, transferred, key, address = status 785 try: 786 f, ov, obj, callback = self._cache.pop(address) 787 except KeyError: 788 if self._loop.get_debug(): 789 self._loop.call_exception_handler({ 790 'message': ('GetQueuedCompletionStatus() returned an ' 791 'unexpected event'), 792 'status': ('err=%s transferred=%s key=%#x address=%#x' 793 % (err, transferred, key, address)), 794 }) 795 796 # key is either zero, or it is used to return a pipe 797 # handle which should be closed to avoid a leak. 798 if key not in (0, _overlapped.INVALID_HANDLE_VALUE): 799 _winapi.CloseHandle(key) 800 continue 801 802 if obj in self._stopped_serving: 803 f.cancel() 804 # Don't call the callback if _register() already read the result or 805 # if the overlapped has been cancelled 806 elif not f.done(): 807 try: 808 value = callback(transferred, key, ov) 809 except OSError as e: 810 f.set_exception(e) 811 self._results.append(f) 812 else: 813 f.set_result(value) 814 self._results.append(f) 815 816 # Remove unregistered futures 817 for ov in self._unregistered: 818 self._cache.pop(ov.address, None) 819 self._unregistered.clear() 820 821 def _stop_serving(self, obj): 822 # obj is a socket or pipe handle. It will be closed in 823 # BaseProactorEventLoop._stop_serving() which will make any 824 # pending operations fail quickly. 825 self._stopped_serving.add(obj) 826 827 def close(self): 828 if self._iocp is None: 829 # already closed 830 return 831 832 # Cancel remaining registered operations. 833 for address, (fut, ov, obj, callback) in list(self._cache.items()): 834 if fut.cancelled(): 835 # Nothing to do with cancelled futures 836 pass 837 elif isinstance(fut, _WaitCancelFuture): 838 # _WaitCancelFuture must not be cancelled 839 pass 840 else: 841 try: 842 fut.cancel() 843 except OSError as exc: 844 if self._loop is not None: 845 context = { 846 'message': 'Cancelling a future failed', 847 'exception': exc, 848 'future': fut, 849 } 850 if fut._source_traceback: 851 context['source_traceback'] = fut._source_traceback 852 self._loop.call_exception_handler(context) 853 854 # Wait until all cancelled overlapped complete: don't exit with running 855 # overlapped to prevent a crash. Display progress every second if the 856 # loop is still running. 857 msg_update = 1.0 858 start_time = time.monotonic() 859 next_msg = start_time + msg_update 860 while self._cache: 861 if next_msg <= time.monotonic(): 862 logger.debug('%r is running after closing for %.1f seconds', 863 self, time.monotonic() - start_time) 864 next_msg = time.monotonic() + msg_update 865 866 # handle a few events, or timeout 867 self._poll(msg_update) 868 869 self._results = [] 870 871 _winapi.CloseHandle(self._iocp) 872 self._iocp = None 873 874 def __del__(self): 875 self.close() 876 877 878class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport): 879 880 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 881 self._proc = windows_utils.Popen( 882 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 883 bufsize=bufsize, **kwargs) 884 885 def callback(f): 886 returncode = self._proc.poll() 887 self._process_exited(returncode) 888 889 f = self._loop._proactor.wait_for_handle(int(self._proc._handle)) 890 f.add_done_callback(callback) 891 892 893SelectorEventLoop = _WindowsSelectorEventLoop 894 895 896class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy): 897 _loop_factory = SelectorEventLoop 898 899 900class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy): 901 _loop_factory = ProactorEventLoop 902 903 904DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy 905