1"""Selector event loop for Unix with signal handling.""" 2 3import errno 4import io 5import itertools 6import os 7import selectors 8import signal 9import socket 10import stat 11import subprocess 12import sys 13import threading 14import warnings 15 16from . import base_events 17from . import base_subprocess 18from . import constants 19from . import coroutines 20from . import events 21from . import exceptions 22from . import futures 23from . import selector_events 24from . import tasks 25from . import transports 26from .log import logger 27 28 29__all__ = ( 30 'SelectorEventLoop', 31 'AbstractChildWatcher', 'SafeChildWatcher', 32 'FastChildWatcher', 'PidfdChildWatcher', 33 'MultiLoopChildWatcher', 'ThreadedChildWatcher', 34 'DefaultEventLoopPolicy', 35) 36 37 38if sys.platform == 'win32': # pragma: no cover 39 raise ImportError('Signals are not really supported on Windows') 40 41 42def _sighandler_noop(signum, frame): 43 """Dummy signal handler.""" 44 pass 45 46 47def waitstatus_to_exitcode(status): 48 try: 49 return os.waitstatus_to_exitcode(status) 50 except ValueError: 51 # The child exited, but we don't understand its status. 52 # This shouldn't happen, but if it does, let's just 53 # return that status; perhaps that helps debug it. 54 return status 55 56 57class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): 58 """Unix event loop. 59 60 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop. 61 """ 62 63 def __init__(self, selector=None): 64 super().__init__(selector) 65 self._signal_handlers = {} 66 67 def close(self): 68 super().close() 69 if not sys.is_finalizing(): 70 for sig in list(self._signal_handlers): 71 self.remove_signal_handler(sig) 72 else: 73 if self._signal_handlers: 74 warnings.warn(f"Closing the loop {self!r} " 75 f"on interpreter shutdown " 76 f"stage, skipping signal handlers removal", 77 ResourceWarning, 78 source=self) 79 self._signal_handlers.clear() 80 81 def _process_self_data(self, data): 82 for signum in data: 83 if not signum: 84 # ignore null bytes written by _write_to_self() 85 continue 86 self._handle_signal(signum) 87 88 def add_signal_handler(self, sig, callback, *args): 89 """Add a handler for a signal. UNIX only. 90 91 Raise ValueError if the signal number is invalid or uncatchable. 92 Raise RuntimeError if there is a problem setting up the handler. 93 """ 94 if (coroutines.iscoroutine(callback) or 95 coroutines.iscoroutinefunction(callback)): 96 raise TypeError("coroutines cannot be used " 97 "with add_signal_handler()") 98 self._check_signal(sig) 99 self._check_closed() 100 try: 101 # set_wakeup_fd() raises ValueError if this is not the 102 # main thread. By calling it early we ensure that an 103 # event loop running in another thread cannot add a signal 104 # handler. 105 signal.set_wakeup_fd(self._csock.fileno()) 106 except (ValueError, OSError) as exc: 107 raise RuntimeError(str(exc)) 108 109 handle = events.Handle(callback, args, self, None) 110 self._signal_handlers[sig] = handle 111 112 try: 113 # Register a dummy signal handler to ask Python to write the signal 114 # number in the wakeup file descriptor. _process_self_data() will 115 # read signal numbers from this file descriptor to handle signals. 116 signal.signal(sig, _sighandler_noop) 117 118 # Set SA_RESTART to limit EINTR occurrences. 119 signal.siginterrupt(sig, False) 120 except OSError as exc: 121 del self._signal_handlers[sig] 122 if not self._signal_handlers: 123 try: 124 signal.set_wakeup_fd(-1) 125 except (ValueError, OSError) as nexc: 126 logger.info('set_wakeup_fd(-1) failed: %s', nexc) 127 128 if exc.errno == errno.EINVAL: 129 raise RuntimeError(f'sig {sig} cannot be caught') 130 else: 131 raise 132 133 def _handle_signal(self, sig): 134 """Internal helper that is the actual signal handler.""" 135 handle = self._signal_handlers.get(sig) 136 if handle is None: 137 return # Assume it's some race condition. 138 if handle._cancelled: 139 self.remove_signal_handler(sig) # Remove it properly. 140 else: 141 self._add_callback_signalsafe(handle) 142 143 def remove_signal_handler(self, sig): 144 """Remove a handler for a signal. UNIX only. 145 146 Return True if a signal handler was removed, False if not. 147 """ 148 self._check_signal(sig) 149 try: 150 del self._signal_handlers[sig] 151 except KeyError: 152 return False 153 154 if sig == signal.SIGINT: 155 handler = signal.default_int_handler 156 else: 157 handler = signal.SIG_DFL 158 159 try: 160 signal.signal(sig, handler) 161 except OSError as exc: 162 if exc.errno == errno.EINVAL: 163 raise RuntimeError(f'sig {sig} cannot be caught') 164 else: 165 raise 166 167 if not self._signal_handlers: 168 try: 169 signal.set_wakeup_fd(-1) 170 except (ValueError, OSError) as exc: 171 logger.info('set_wakeup_fd(-1) failed: %s', exc) 172 173 return True 174 175 def _check_signal(self, sig): 176 """Internal helper to validate a signal. 177 178 Raise ValueError if the signal number is invalid or uncatchable. 179 Raise RuntimeError if there is a problem setting up the handler. 180 """ 181 if not isinstance(sig, int): 182 raise TypeError(f'sig must be an int, not {sig!r}') 183 184 if sig not in signal.valid_signals(): 185 raise ValueError(f'invalid signal number {sig}') 186 187 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 188 extra=None): 189 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) 190 191 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 192 extra=None): 193 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) 194 195 async def _make_subprocess_transport(self, protocol, args, shell, 196 stdin, stdout, stderr, bufsize, 197 extra=None, **kwargs): 198 with events.get_child_watcher() as watcher: 199 if not watcher.is_active(): 200 # Check early. 201 # Raising exception before process creation 202 # prevents subprocess execution if the watcher 203 # is not ready to handle it. 204 raise RuntimeError("asyncio.get_child_watcher() is not activated, " 205 "subprocess support is not installed.") 206 waiter = self.create_future() 207 transp = _UnixSubprocessTransport(self, protocol, args, shell, 208 stdin, stdout, stderr, bufsize, 209 waiter=waiter, extra=extra, 210 **kwargs) 211 212 watcher.add_child_handler(transp.get_pid(), 213 self._child_watcher_callback, transp) 214 try: 215 await waiter 216 except (SystemExit, KeyboardInterrupt): 217 raise 218 except BaseException: 219 transp.close() 220 await transp._wait() 221 raise 222 223 return transp 224 225 def _child_watcher_callback(self, pid, returncode, transp): 226 self.call_soon_threadsafe(transp._process_exited, returncode) 227 228 async def create_unix_connection( 229 self, protocol_factory, path=None, *, 230 ssl=None, sock=None, 231 server_hostname=None, 232 ssl_handshake_timeout=None): 233 assert server_hostname is None or isinstance(server_hostname, str) 234 if ssl: 235 if server_hostname is None: 236 raise ValueError( 237 'you have to pass server_hostname when using ssl') 238 else: 239 if server_hostname is not None: 240 raise ValueError('server_hostname is only meaningful with ssl') 241 if ssl_handshake_timeout is not None: 242 raise ValueError( 243 'ssl_handshake_timeout is only meaningful with ssl') 244 245 if path is not None: 246 if sock is not None: 247 raise ValueError( 248 'path and sock can not be specified at the same time') 249 250 path = os.fspath(path) 251 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 252 try: 253 sock.setblocking(False) 254 await self.sock_connect(sock, path) 255 except: 256 sock.close() 257 raise 258 259 else: 260 if sock is None: 261 raise ValueError('no path and sock were specified') 262 if (sock.family != socket.AF_UNIX or 263 sock.type != socket.SOCK_STREAM): 264 raise ValueError( 265 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 266 sock.setblocking(False) 267 268 transport, protocol = await self._create_connection_transport( 269 sock, protocol_factory, ssl, server_hostname, 270 ssl_handshake_timeout=ssl_handshake_timeout) 271 return transport, protocol 272 273 async def create_unix_server( 274 self, protocol_factory, path=None, *, 275 sock=None, backlog=100, ssl=None, 276 ssl_handshake_timeout=None, 277 start_serving=True): 278 if isinstance(ssl, bool): 279 raise TypeError('ssl argument must be an SSLContext or None') 280 281 if ssl_handshake_timeout is not None and not ssl: 282 raise ValueError( 283 'ssl_handshake_timeout is only meaningful with ssl') 284 285 if path is not None: 286 if sock is not None: 287 raise ValueError( 288 'path and sock can not be specified at the same time') 289 290 path = os.fspath(path) 291 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 292 293 # Check for abstract socket. `str` and `bytes` paths are supported. 294 if path[0] not in (0, '\x00'): 295 try: 296 if stat.S_ISSOCK(os.stat(path).st_mode): 297 os.remove(path) 298 except FileNotFoundError: 299 pass 300 except OSError as err: 301 # Directory may have permissions only to create socket. 302 logger.error('Unable to check or remove stale UNIX socket ' 303 '%r: %r', path, err) 304 305 try: 306 sock.bind(path) 307 except OSError as exc: 308 sock.close() 309 if exc.errno == errno.EADDRINUSE: 310 # Let's improve the error message by adding 311 # with what exact address it occurs. 312 msg = f'Address {path!r} is already in use' 313 raise OSError(errno.EADDRINUSE, msg) from None 314 else: 315 raise 316 except: 317 sock.close() 318 raise 319 else: 320 if sock is None: 321 raise ValueError( 322 'path was not specified, and no sock specified') 323 324 if (sock.family != socket.AF_UNIX or 325 sock.type != socket.SOCK_STREAM): 326 raise ValueError( 327 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 328 329 sock.setblocking(False) 330 server = base_events.Server(self, [sock], protocol_factory, 331 ssl, backlog, ssl_handshake_timeout) 332 if start_serving: 333 server._start_serving() 334 # Skip one loop iteration so that all 'loop.add_reader' 335 # go through. 336 await tasks.sleep(0) 337 338 return server 339 340 async def _sock_sendfile_native(self, sock, file, offset, count): 341 try: 342 os.sendfile 343 except AttributeError: 344 raise exceptions.SendfileNotAvailableError( 345 "os.sendfile() is not available") 346 try: 347 fileno = file.fileno() 348 except (AttributeError, io.UnsupportedOperation) as err: 349 raise exceptions.SendfileNotAvailableError("not a regular file") 350 try: 351 fsize = os.fstat(fileno).st_size 352 except OSError: 353 raise exceptions.SendfileNotAvailableError("not a regular file") 354 blocksize = count if count else fsize 355 if not blocksize: 356 return 0 # empty file 357 358 fut = self.create_future() 359 self._sock_sendfile_native_impl(fut, None, sock, fileno, 360 offset, count, blocksize, 0) 361 return await fut 362 363 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno, 364 offset, count, blocksize, total_sent): 365 fd = sock.fileno() 366 if registered_fd is not None: 367 # Remove the callback early. It should be rare that the 368 # selector says the fd is ready but the call still returns 369 # EAGAIN, and I am willing to take a hit in that case in 370 # order to simplify the common case. 371 self.remove_writer(registered_fd) 372 if fut.cancelled(): 373 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 374 return 375 if count: 376 blocksize = count - total_sent 377 if blocksize <= 0: 378 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 379 fut.set_result(total_sent) 380 return 381 382 try: 383 sent = os.sendfile(fd, fileno, offset, blocksize) 384 except (BlockingIOError, InterruptedError): 385 if registered_fd is None: 386 self._sock_add_cancellation_callback(fut, sock) 387 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 388 fd, sock, fileno, 389 offset, count, blocksize, total_sent) 390 except OSError as exc: 391 if (registered_fd is not None and 392 exc.errno == errno.ENOTCONN and 393 type(exc) is not ConnectionError): 394 # If we have an ENOTCONN and this isn't a first call to 395 # sendfile(), i.e. the connection was closed in the middle 396 # of the operation, normalize the error to ConnectionError 397 # to make it consistent across all Posix systems. 398 new_exc = ConnectionError( 399 "socket is not connected", errno.ENOTCONN) 400 new_exc.__cause__ = exc 401 exc = new_exc 402 if total_sent == 0: 403 # We can get here for different reasons, the main 404 # one being 'file' is not a regular mmap(2)-like 405 # file, in which case we'll fall back on using 406 # plain send(). 407 err = exceptions.SendfileNotAvailableError( 408 "os.sendfile call failed") 409 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 410 fut.set_exception(err) 411 else: 412 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 413 fut.set_exception(exc) 414 except (SystemExit, KeyboardInterrupt): 415 raise 416 except BaseException as exc: 417 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 418 fut.set_exception(exc) 419 else: 420 if sent == 0: 421 # EOF 422 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 423 fut.set_result(total_sent) 424 else: 425 offset += sent 426 total_sent += sent 427 if registered_fd is None: 428 self._sock_add_cancellation_callback(fut, sock) 429 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 430 fd, sock, fileno, 431 offset, count, blocksize, total_sent) 432 433 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent): 434 if total_sent > 0: 435 os.lseek(fileno, offset, os.SEEK_SET) 436 437 def _sock_add_cancellation_callback(self, fut, sock): 438 def cb(fut): 439 if fut.cancelled(): 440 fd = sock.fileno() 441 if fd != -1: 442 self.remove_writer(fd) 443 fut.add_done_callback(cb) 444 445 446class _UnixReadPipeTransport(transports.ReadTransport): 447 448 max_size = 256 * 1024 # max bytes we read in one event loop iteration 449 450 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 451 super().__init__(extra) 452 self._extra['pipe'] = pipe 453 self._loop = loop 454 self._pipe = pipe 455 self._fileno = pipe.fileno() 456 self._protocol = protocol 457 self._closing = False 458 self._paused = False 459 460 mode = os.fstat(self._fileno).st_mode 461 if not (stat.S_ISFIFO(mode) or 462 stat.S_ISSOCK(mode) or 463 stat.S_ISCHR(mode)): 464 self._pipe = None 465 self._fileno = None 466 self._protocol = None 467 raise ValueError("Pipe transport is for pipes/sockets only.") 468 469 os.set_blocking(self._fileno, False) 470 471 self._loop.call_soon(self._protocol.connection_made, self) 472 # only start reading when connection_made() has been called 473 self._loop.call_soon(self._loop._add_reader, 474 self._fileno, self._read_ready) 475 if waiter is not None: 476 # only wake up the waiter when connection_made() has been called 477 self._loop.call_soon(futures._set_result_unless_cancelled, 478 waiter, None) 479 480 def __repr__(self): 481 info = [self.__class__.__name__] 482 if self._pipe is None: 483 info.append('closed') 484 elif self._closing: 485 info.append('closing') 486 info.append(f'fd={self._fileno}') 487 selector = getattr(self._loop, '_selector', None) 488 if self._pipe is not None and selector is not None: 489 polling = selector_events._test_selector_event( 490 selector, self._fileno, selectors.EVENT_READ) 491 if polling: 492 info.append('polling') 493 else: 494 info.append('idle') 495 elif self._pipe is not None: 496 info.append('open') 497 else: 498 info.append('closed') 499 return '<{}>'.format(' '.join(info)) 500 501 def _read_ready(self): 502 try: 503 data = os.read(self._fileno, self.max_size) 504 except (BlockingIOError, InterruptedError): 505 pass 506 except OSError as exc: 507 self._fatal_error(exc, 'Fatal read error on pipe transport') 508 else: 509 if data: 510 self._protocol.data_received(data) 511 else: 512 if self._loop.get_debug(): 513 logger.info("%r was closed by peer", self) 514 self._closing = True 515 self._loop._remove_reader(self._fileno) 516 self._loop.call_soon(self._protocol.eof_received) 517 self._loop.call_soon(self._call_connection_lost, None) 518 519 def pause_reading(self): 520 if self._closing or self._paused: 521 return 522 self._paused = True 523 self._loop._remove_reader(self._fileno) 524 if self._loop.get_debug(): 525 logger.debug("%r pauses reading", self) 526 527 def resume_reading(self): 528 if self._closing or not self._paused: 529 return 530 self._paused = False 531 self._loop._add_reader(self._fileno, self._read_ready) 532 if self._loop.get_debug(): 533 logger.debug("%r resumes reading", self) 534 535 def set_protocol(self, protocol): 536 self._protocol = protocol 537 538 def get_protocol(self): 539 return self._protocol 540 541 def is_closing(self): 542 return self._closing 543 544 def close(self): 545 if not self._closing: 546 self._close(None) 547 548 def __del__(self, _warn=warnings.warn): 549 if self._pipe is not None: 550 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 551 self._pipe.close() 552 553 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 554 # should be called by exception handler only 555 if (isinstance(exc, OSError) and exc.errno == errno.EIO): 556 if self._loop.get_debug(): 557 logger.debug("%r: %s", self, message, exc_info=True) 558 else: 559 self._loop.call_exception_handler({ 560 'message': message, 561 'exception': exc, 562 'transport': self, 563 'protocol': self._protocol, 564 }) 565 self._close(exc) 566 567 def _close(self, exc): 568 self._closing = True 569 self._loop._remove_reader(self._fileno) 570 self._loop.call_soon(self._call_connection_lost, exc) 571 572 def _call_connection_lost(self, exc): 573 try: 574 self._protocol.connection_lost(exc) 575 finally: 576 self._pipe.close() 577 self._pipe = None 578 self._protocol = None 579 self._loop = None 580 581 582class _UnixWritePipeTransport(transports._FlowControlMixin, 583 transports.WriteTransport): 584 585 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 586 super().__init__(extra, loop) 587 self._extra['pipe'] = pipe 588 self._pipe = pipe 589 self._fileno = pipe.fileno() 590 self._protocol = protocol 591 self._buffer = bytearray() 592 self._conn_lost = 0 593 self._closing = False # Set when close() or write_eof() called. 594 595 mode = os.fstat(self._fileno).st_mode 596 is_char = stat.S_ISCHR(mode) 597 is_fifo = stat.S_ISFIFO(mode) 598 is_socket = stat.S_ISSOCK(mode) 599 if not (is_char or is_fifo or is_socket): 600 self._pipe = None 601 self._fileno = None 602 self._protocol = None 603 raise ValueError("Pipe transport is only for " 604 "pipes, sockets and character devices") 605 606 os.set_blocking(self._fileno, False) 607 self._loop.call_soon(self._protocol.connection_made, self) 608 609 # On AIX, the reader trick (to be notified when the read end of the 610 # socket is closed) only works for sockets. On other platforms it 611 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) 612 if is_socket or (is_fifo and not sys.platform.startswith("aix")): 613 # only start reading when connection_made() has been called 614 self._loop.call_soon(self._loop._add_reader, 615 self._fileno, self._read_ready) 616 617 if waiter is not None: 618 # only wake up the waiter when connection_made() has been called 619 self._loop.call_soon(futures._set_result_unless_cancelled, 620 waiter, None) 621 622 def __repr__(self): 623 info = [self.__class__.__name__] 624 if self._pipe is None: 625 info.append('closed') 626 elif self._closing: 627 info.append('closing') 628 info.append(f'fd={self._fileno}') 629 selector = getattr(self._loop, '_selector', None) 630 if self._pipe is not None and selector is not None: 631 polling = selector_events._test_selector_event( 632 selector, self._fileno, selectors.EVENT_WRITE) 633 if polling: 634 info.append('polling') 635 else: 636 info.append('idle') 637 638 bufsize = self.get_write_buffer_size() 639 info.append(f'bufsize={bufsize}') 640 elif self._pipe is not None: 641 info.append('open') 642 else: 643 info.append('closed') 644 return '<{}>'.format(' '.join(info)) 645 646 def get_write_buffer_size(self): 647 return len(self._buffer) 648 649 def _read_ready(self): 650 # Pipe was closed by peer. 651 if self._loop.get_debug(): 652 logger.info("%r was closed by peer", self) 653 if self._buffer: 654 self._close(BrokenPipeError()) 655 else: 656 self._close() 657 658 def write(self, data): 659 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) 660 if isinstance(data, bytearray): 661 data = memoryview(data) 662 if not data: 663 return 664 665 if self._conn_lost or self._closing: 666 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 667 logger.warning('pipe closed by peer or ' 668 'os.write(pipe, data) raised exception.') 669 self._conn_lost += 1 670 return 671 672 if not self._buffer: 673 # Attempt to send it right away first. 674 try: 675 n = os.write(self._fileno, data) 676 except (BlockingIOError, InterruptedError): 677 n = 0 678 except (SystemExit, KeyboardInterrupt): 679 raise 680 except BaseException as exc: 681 self._conn_lost += 1 682 self._fatal_error(exc, 'Fatal write error on pipe transport') 683 return 684 if n == len(data): 685 return 686 elif n > 0: 687 data = memoryview(data)[n:] 688 self._loop._add_writer(self._fileno, self._write_ready) 689 690 self._buffer += data 691 self._maybe_pause_protocol() 692 693 def _write_ready(self): 694 assert self._buffer, 'Data should not be empty' 695 696 try: 697 n = os.write(self._fileno, self._buffer) 698 except (BlockingIOError, InterruptedError): 699 pass 700 except (SystemExit, KeyboardInterrupt): 701 raise 702 except BaseException as exc: 703 self._buffer.clear() 704 self._conn_lost += 1 705 # Remove writer here, _fatal_error() doesn't it 706 # because _buffer is empty. 707 self._loop._remove_writer(self._fileno) 708 self._fatal_error(exc, 'Fatal write error on pipe transport') 709 else: 710 if n == len(self._buffer): 711 self._buffer.clear() 712 self._loop._remove_writer(self._fileno) 713 self._maybe_resume_protocol() # May append to buffer. 714 if self._closing: 715 self._loop._remove_reader(self._fileno) 716 self._call_connection_lost(None) 717 return 718 elif n > 0: 719 del self._buffer[:n] 720 721 def can_write_eof(self): 722 return True 723 724 def write_eof(self): 725 if self._closing: 726 return 727 assert self._pipe 728 self._closing = True 729 if not self._buffer: 730 self._loop._remove_reader(self._fileno) 731 self._loop.call_soon(self._call_connection_lost, None) 732 733 def set_protocol(self, protocol): 734 self._protocol = protocol 735 736 def get_protocol(self): 737 return self._protocol 738 739 def is_closing(self): 740 return self._closing 741 742 def close(self): 743 if self._pipe is not None and not self._closing: 744 # write_eof is all what we needed to close the write pipe 745 self.write_eof() 746 747 def __del__(self, _warn=warnings.warn): 748 if self._pipe is not None: 749 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 750 self._pipe.close() 751 752 def abort(self): 753 self._close(None) 754 755 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 756 # should be called by exception handler only 757 if isinstance(exc, OSError): 758 if self._loop.get_debug(): 759 logger.debug("%r: %s", self, message, exc_info=True) 760 else: 761 self._loop.call_exception_handler({ 762 'message': message, 763 'exception': exc, 764 'transport': self, 765 'protocol': self._protocol, 766 }) 767 self._close(exc) 768 769 def _close(self, exc=None): 770 self._closing = True 771 if self._buffer: 772 self._loop._remove_writer(self._fileno) 773 self._buffer.clear() 774 self._loop._remove_reader(self._fileno) 775 self._loop.call_soon(self._call_connection_lost, exc) 776 777 def _call_connection_lost(self, exc): 778 try: 779 self._protocol.connection_lost(exc) 780 finally: 781 self._pipe.close() 782 self._pipe = None 783 self._protocol = None 784 self._loop = None 785 786 787class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): 788 789 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 790 stdin_w = None 791 if stdin == subprocess.PIPE: 792 # Use a socket pair for stdin, since not all platforms 793 # support selecting read events on the write end of a 794 # socket (which we use in order to detect closing of the 795 # other end). Notably this is needed on AIX, and works 796 # just fine on other platforms. 797 stdin, stdin_w = socket.socketpair() 798 try: 799 self._proc = subprocess.Popen( 800 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 801 universal_newlines=False, bufsize=bufsize, **kwargs) 802 if stdin_w is not None: 803 stdin.close() 804 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) 805 stdin_w = None 806 finally: 807 if stdin_w is not None: 808 stdin.close() 809 stdin_w.close() 810 811 812class AbstractChildWatcher: 813 """Abstract base class for monitoring child processes. 814 815 Objects derived from this class monitor a collection of subprocesses and 816 report their termination or interruption by a signal. 817 818 New callbacks are registered with .add_child_handler(). Starting a new 819 process must be done within a 'with' block to allow the watcher to suspend 820 its activity until the new process if fully registered (this is needed to 821 prevent a race condition in some implementations). 822 823 Example: 824 with watcher: 825 proc = subprocess.Popen("sleep 1") 826 watcher.add_child_handler(proc.pid, callback) 827 828 Notes: 829 Implementations of this class must be thread-safe. 830 831 Since child watcher objects may catch the SIGCHLD signal and call 832 waitpid(-1), there should be only one active object per process. 833 """ 834 835 def add_child_handler(self, pid, callback, *args): 836 """Register a new child handler. 837 838 Arrange for callback(pid, returncode, *args) to be called when 839 process 'pid' terminates. Specifying another callback for the same 840 process replaces the previous handler. 841 842 Note: callback() must be thread-safe. 843 """ 844 raise NotImplementedError() 845 846 def remove_child_handler(self, pid): 847 """Removes the handler for process 'pid'. 848 849 The function returns True if the handler was successfully removed, 850 False if there was nothing to remove.""" 851 852 raise NotImplementedError() 853 854 def attach_loop(self, loop): 855 """Attach the watcher to an event loop. 856 857 If the watcher was previously attached to an event loop, then it is 858 first detached before attaching to the new loop. 859 860 Note: loop may be None. 861 """ 862 raise NotImplementedError() 863 864 def close(self): 865 """Close the watcher. 866 867 This must be called to make sure that any underlying resource is freed. 868 """ 869 raise NotImplementedError() 870 871 def is_active(self): 872 """Return ``True`` if the watcher is active and is used by the event loop. 873 874 Return True if the watcher is installed and ready to handle process exit 875 notifications. 876 877 """ 878 raise NotImplementedError() 879 880 def __enter__(self): 881 """Enter the watcher's context and allow starting new processes 882 883 This function must return self""" 884 raise NotImplementedError() 885 886 def __exit__(self, a, b, c): 887 """Exit the watcher's context""" 888 raise NotImplementedError() 889 890 891class PidfdChildWatcher(AbstractChildWatcher): 892 """Child watcher implementation using Linux's pid file descriptors. 893 894 This child watcher polls process file descriptors (pidfds) to await child 895 process termination. In some respects, PidfdChildWatcher is a "Goldilocks" 896 child watcher implementation. It doesn't require signals or threads, doesn't 897 interfere with any processes launched outside the event loop, and scales 898 linearly with the number of subprocesses launched by the event loop. The 899 main disadvantage is that pidfds are specific to Linux, and only work on 900 recent (5.3+) kernels. 901 """ 902 903 def __init__(self): 904 self._loop = None 905 self._callbacks = {} 906 907 def __enter__(self): 908 return self 909 910 def __exit__(self, exc_type, exc_value, exc_traceback): 911 pass 912 913 def is_active(self): 914 return self._loop is not None and self._loop.is_running() 915 916 def close(self): 917 self.attach_loop(None) 918 919 def attach_loop(self, loop): 920 if self._loop is not None and loop is None and self._callbacks: 921 warnings.warn( 922 'A loop is being detached ' 923 'from a child watcher with pending handlers', 924 RuntimeWarning) 925 for pidfd, _, _ in self._callbacks.values(): 926 self._loop._remove_reader(pidfd) 927 os.close(pidfd) 928 self._callbacks.clear() 929 self._loop = loop 930 931 def add_child_handler(self, pid, callback, *args): 932 existing = self._callbacks.get(pid) 933 if existing is not None: 934 self._callbacks[pid] = existing[0], callback, args 935 else: 936 pidfd = os.pidfd_open(pid) 937 self._loop._add_reader(pidfd, self._do_wait, pid) 938 self._callbacks[pid] = pidfd, callback, args 939 940 def _do_wait(self, pid): 941 pidfd, callback, args = self._callbacks.pop(pid) 942 self._loop._remove_reader(pidfd) 943 try: 944 _, status = os.waitpid(pid, 0) 945 except ChildProcessError: 946 # The child process is already reaped 947 # (may happen if waitpid() is called elsewhere). 948 returncode = 255 949 logger.warning( 950 "child process pid %d exit status already read: " 951 " will report returncode 255", 952 pid) 953 else: 954 returncode = waitstatus_to_exitcode(status) 955 956 os.close(pidfd) 957 callback(pid, returncode, *args) 958 959 def remove_child_handler(self, pid): 960 try: 961 pidfd, _, _ = self._callbacks.pop(pid) 962 except KeyError: 963 return False 964 self._loop._remove_reader(pidfd) 965 os.close(pidfd) 966 return True 967 968 969class BaseChildWatcher(AbstractChildWatcher): 970 971 def __init__(self): 972 self._loop = None 973 self._callbacks = {} 974 975 def close(self): 976 self.attach_loop(None) 977 978 def is_active(self): 979 return self._loop is not None and self._loop.is_running() 980 981 def _do_waitpid(self, expected_pid): 982 raise NotImplementedError() 983 984 def _do_waitpid_all(self): 985 raise NotImplementedError() 986 987 def attach_loop(self, loop): 988 assert loop is None or isinstance(loop, events.AbstractEventLoop) 989 990 if self._loop is not None and loop is None and self._callbacks: 991 warnings.warn( 992 'A loop is being detached ' 993 'from a child watcher with pending handlers', 994 RuntimeWarning) 995 996 if self._loop is not None: 997 self._loop.remove_signal_handler(signal.SIGCHLD) 998 999 self._loop = loop 1000 if loop is not None: 1001 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld) 1002 1003 # Prevent a race condition in case a child terminated 1004 # during the switch. 1005 self._do_waitpid_all() 1006 1007 def _sig_chld(self): 1008 try: 1009 self._do_waitpid_all() 1010 except (SystemExit, KeyboardInterrupt): 1011 raise 1012 except BaseException as exc: 1013 # self._loop should always be available here 1014 # as '_sig_chld' is added as a signal handler 1015 # in 'attach_loop' 1016 self._loop.call_exception_handler({ 1017 'message': 'Unknown exception in SIGCHLD handler', 1018 'exception': exc, 1019 }) 1020 1021 1022class SafeChildWatcher(BaseChildWatcher): 1023 """'Safe' child watcher implementation. 1024 1025 This implementation avoids disrupting other code spawning processes by 1026 polling explicitly each process in the SIGCHLD handler instead of calling 1027 os.waitpid(-1). 1028 1029 This is a safe solution but it has a significant overhead when handling a 1030 big number of children (O(n) each time SIGCHLD is raised) 1031 """ 1032 1033 def close(self): 1034 self._callbacks.clear() 1035 super().close() 1036 1037 def __enter__(self): 1038 return self 1039 1040 def __exit__(self, a, b, c): 1041 pass 1042 1043 def add_child_handler(self, pid, callback, *args): 1044 self._callbacks[pid] = (callback, args) 1045 1046 # Prevent a race condition in case the child is already terminated. 1047 self._do_waitpid(pid) 1048 1049 def remove_child_handler(self, pid): 1050 try: 1051 del self._callbacks[pid] 1052 return True 1053 except KeyError: 1054 return False 1055 1056 def _do_waitpid_all(self): 1057 1058 for pid in list(self._callbacks): 1059 self._do_waitpid(pid) 1060 1061 def _do_waitpid(self, expected_pid): 1062 assert expected_pid > 0 1063 1064 try: 1065 pid, status = os.waitpid(expected_pid, os.WNOHANG) 1066 except ChildProcessError: 1067 # The child process is already reaped 1068 # (may happen if waitpid() is called elsewhere). 1069 pid = expected_pid 1070 returncode = 255 1071 logger.warning( 1072 "Unknown child process pid %d, will report returncode 255", 1073 pid) 1074 else: 1075 if pid == 0: 1076 # The child process is still alive. 1077 return 1078 1079 returncode = waitstatus_to_exitcode(status) 1080 if self._loop.get_debug(): 1081 logger.debug('process %s exited with returncode %s', 1082 expected_pid, returncode) 1083 1084 try: 1085 callback, args = self._callbacks.pop(pid) 1086 except KeyError: # pragma: no cover 1087 # May happen if .remove_child_handler() is called 1088 # after os.waitpid() returns. 1089 if self._loop.get_debug(): 1090 logger.warning("Child watcher got an unexpected pid: %r", 1091 pid, exc_info=True) 1092 else: 1093 callback(pid, returncode, *args) 1094 1095 1096class FastChildWatcher(BaseChildWatcher): 1097 """'Fast' child watcher implementation. 1098 1099 This implementation reaps every terminated processes by calling 1100 os.waitpid(-1) directly, possibly breaking other code spawning processes 1101 and waiting for their termination. 1102 1103 There is no noticeable overhead when handling a big number of children 1104 (O(1) each time a child terminates). 1105 """ 1106 def __init__(self): 1107 super().__init__() 1108 self._lock = threading.Lock() 1109 self._zombies = {} 1110 self._forks = 0 1111 1112 def close(self): 1113 self._callbacks.clear() 1114 self._zombies.clear() 1115 super().close() 1116 1117 def __enter__(self): 1118 with self._lock: 1119 self._forks += 1 1120 1121 return self 1122 1123 def __exit__(self, a, b, c): 1124 with self._lock: 1125 self._forks -= 1 1126 1127 if self._forks or not self._zombies: 1128 return 1129 1130 collateral_victims = str(self._zombies) 1131 self._zombies.clear() 1132 1133 logger.warning( 1134 "Caught subprocesses termination from unknown pids: %s", 1135 collateral_victims) 1136 1137 def add_child_handler(self, pid, callback, *args): 1138 assert self._forks, "Must use the context manager" 1139 1140 with self._lock: 1141 try: 1142 returncode = self._zombies.pop(pid) 1143 except KeyError: 1144 # The child is running. 1145 self._callbacks[pid] = callback, args 1146 return 1147 1148 # The child is dead already. We can fire the callback. 1149 callback(pid, returncode, *args) 1150 1151 def remove_child_handler(self, pid): 1152 try: 1153 del self._callbacks[pid] 1154 return True 1155 except KeyError: 1156 return False 1157 1158 def _do_waitpid_all(self): 1159 # Because of signal coalescing, we must keep calling waitpid() as 1160 # long as we're able to reap a child. 1161 while True: 1162 try: 1163 pid, status = os.waitpid(-1, os.WNOHANG) 1164 except ChildProcessError: 1165 # No more child processes exist. 1166 return 1167 else: 1168 if pid == 0: 1169 # A child process is still alive. 1170 return 1171 1172 returncode = waitstatus_to_exitcode(status) 1173 1174 with self._lock: 1175 try: 1176 callback, args = self._callbacks.pop(pid) 1177 except KeyError: 1178 # unknown child 1179 if self._forks: 1180 # It may not be registered yet. 1181 self._zombies[pid] = returncode 1182 if self._loop.get_debug(): 1183 logger.debug('unknown process %s exited ' 1184 'with returncode %s', 1185 pid, returncode) 1186 continue 1187 callback = None 1188 else: 1189 if self._loop.get_debug(): 1190 logger.debug('process %s exited with returncode %s', 1191 pid, returncode) 1192 1193 if callback is None: 1194 logger.warning( 1195 "Caught subprocess termination from unknown pid: " 1196 "%d -> %d", pid, returncode) 1197 else: 1198 callback(pid, returncode, *args) 1199 1200 1201class MultiLoopChildWatcher(AbstractChildWatcher): 1202 """A watcher that doesn't require running loop in the main thread. 1203 1204 This implementation registers a SIGCHLD signal handler on 1205 instantiation (which may conflict with other code that 1206 install own handler for this signal). 1207 1208 The solution is safe but it has a significant overhead when 1209 handling a big number of processes (*O(n)* each time a 1210 SIGCHLD is received). 1211 """ 1212 1213 # Implementation note: 1214 # The class keeps compatibility with AbstractChildWatcher ABC 1215 # To achieve this it has empty attach_loop() method 1216 # and doesn't accept explicit loop argument 1217 # for add_child_handler()/remove_child_handler() 1218 # but retrieves the current loop by get_running_loop() 1219 1220 def __init__(self): 1221 self._callbacks = {} 1222 self._saved_sighandler = None 1223 1224 def is_active(self): 1225 return self._saved_sighandler is not None 1226 1227 def close(self): 1228 self._callbacks.clear() 1229 if self._saved_sighandler is None: 1230 return 1231 1232 handler = signal.getsignal(signal.SIGCHLD) 1233 if handler != self._sig_chld: 1234 logger.warning("SIGCHLD handler was changed by outside code") 1235 else: 1236 signal.signal(signal.SIGCHLD, self._saved_sighandler) 1237 self._saved_sighandler = None 1238 1239 def __enter__(self): 1240 return self 1241 1242 def __exit__(self, exc_type, exc_val, exc_tb): 1243 pass 1244 1245 def add_child_handler(self, pid, callback, *args): 1246 loop = events.get_running_loop() 1247 self._callbacks[pid] = (loop, callback, args) 1248 1249 # Prevent a race condition in case the child is already terminated. 1250 self._do_waitpid(pid) 1251 1252 def remove_child_handler(self, pid): 1253 try: 1254 del self._callbacks[pid] 1255 return True 1256 except KeyError: 1257 return False 1258 1259 def attach_loop(self, loop): 1260 # Don't save the loop but initialize itself if called first time 1261 # The reason to do it here is that attach_loop() is called from 1262 # unix policy only for the main thread. 1263 # Main thread is required for subscription on SIGCHLD signal 1264 if self._saved_sighandler is not None: 1265 return 1266 1267 self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) 1268 if self._saved_sighandler is None: 1269 logger.warning("Previous SIGCHLD handler was set by non-Python code, " 1270 "restore to default handler on watcher close.") 1271 self._saved_sighandler = signal.SIG_DFL 1272 1273 # Set SA_RESTART to limit EINTR occurrences. 1274 signal.siginterrupt(signal.SIGCHLD, False) 1275 1276 def _do_waitpid_all(self): 1277 for pid in list(self._callbacks): 1278 self._do_waitpid(pid) 1279 1280 def _do_waitpid(self, expected_pid): 1281 assert expected_pid > 0 1282 1283 try: 1284 pid, status = os.waitpid(expected_pid, os.WNOHANG) 1285 except ChildProcessError: 1286 # The child process is already reaped 1287 # (may happen if waitpid() is called elsewhere). 1288 pid = expected_pid 1289 returncode = 255 1290 logger.warning( 1291 "Unknown child process pid %d, will report returncode 255", 1292 pid) 1293 debug_log = False 1294 else: 1295 if pid == 0: 1296 # The child process is still alive. 1297 return 1298 1299 returncode = waitstatus_to_exitcode(status) 1300 debug_log = True 1301 try: 1302 loop, callback, args = self._callbacks.pop(pid) 1303 except KeyError: # pragma: no cover 1304 # May happen if .remove_child_handler() is called 1305 # after os.waitpid() returns. 1306 logger.warning("Child watcher got an unexpected pid: %r", 1307 pid, exc_info=True) 1308 else: 1309 if loop.is_closed(): 1310 logger.warning("Loop %r that handles pid %r is closed", loop, pid) 1311 else: 1312 if debug_log and loop.get_debug(): 1313 logger.debug('process %s exited with returncode %s', 1314 expected_pid, returncode) 1315 loop.call_soon_threadsafe(callback, pid, returncode, *args) 1316 1317 def _sig_chld(self, signum, frame): 1318 try: 1319 self._do_waitpid_all() 1320 except (SystemExit, KeyboardInterrupt): 1321 raise 1322 except BaseException: 1323 logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) 1324 1325 1326class ThreadedChildWatcher(AbstractChildWatcher): 1327 """Threaded child watcher implementation. 1328 1329 The watcher uses a thread per process 1330 for waiting for the process finish. 1331 1332 It doesn't require subscription on POSIX signal 1333 but a thread creation is not free. 1334 1335 The watcher has O(1) complexity, its performance doesn't depend 1336 on amount of spawn processes. 1337 """ 1338 1339 def __init__(self): 1340 self._pid_counter = itertools.count(0) 1341 self._threads = {} 1342 1343 def is_active(self): 1344 return True 1345 1346 def close(self): 1347 self._join_threads() 1348 1349 def _join_threads(self): 1350 """Internal: Join all non-daemon threads""" 1351 threads = [thread for thread in list(self._threads.values()) 1352 if thread.is_alive() and not thread.daemon] 1353 for thread in threads: 1354 thread.join() 1355 1356 def __enter__(self): 1357 return self 1358 1359 def __exit__(self, exc_type, exc_val, exc_tb): 1360 pass 1361 1362 def __del__(self, _warn=warnings.warn): 1363 threads = [thread for thread in list(self._threads.values()) 1364 if thread.is_alive()] 1365 if threads: 1366 _warn(f"{self.__class__} has registered but not finished child processes", 1367 ResourceWarning, 1368 source=self) 1369 1370 def add_child_handler(self, pid, callback, *args): 1371 loop = events.get_running_loop() 1372 thread = threading.Thread(target=self._do_waitpid, 1373 name=f"waitpid-{next(self._pid_counter)}", 1374 args=(loop, pid, callback, args), 1375 daemon=True) 1376 self._threads[pid] = thread 1377 thread.start() 1378 1379 def remove_child_handler(self, pid): 1380 # asyncio never calls remove_child_handler() !!! 1381 # The method is no-op but is implemented because 1382 # abstract base classes require it. 1383 return True 1384 1385 def attach_loop(self, loop): 1386 pass 1387 1388 def _do_waitpid(self, loop, expected_pid, callback, args): 1389 assert expected_pid > 0 1390 1391 try: 1392 pid, status = os.waitpid(expected_pid, 0) 1393 except ChildProcessError: 1394 # The child process is already reaped 1395 # (may happen if waitpid() is called elsewhere). 1396 pid = expected_pid 1397 returncode = 255 1398 logger.warning( 1399 "Unknown child process pid %d, will report returncode 255", 1400 pid) 1401 else: 1402 returncode = waitstatus_to_exitcode(status) 1403 if loop.get_debug(): 1404 logger.debug('process %s exited with returncode %s', 1405 expected_pid, returncode) 1406 1407 if loop.is_closed(): 1408 logger.warning("Loop %r that handles pid %r is closed", loop, pid) 1409 else: 1410 loop.call_soon_threadsafe(callback, pid, returncode, *args) 1411 1412 self._threads.pop(expected_pid) 1413 1414 1415class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): 1416 """UNIX event loop policy with a watcher for child processes.""" 1417 _loop_factory = _UnixSelectorEventLoop 1418 1419 def __init__(self): 1420 super().__init__() 1421 self._watcher = None 1422 1423 def _init_watcher(self): 1424 with events._lock: 1425 if self._watcher is None: # pragma: no branch 1426 self._watcher = ThreadedChildWatcher() 1427 if threading.current_thread() is threading.main_thread(): 1428 self._watcher.attach_loop(self._local._loop) 1429 1430 def set_event_loop(self, loop): 1431 """Set the event loop. 1432 1433 As a side effect, if a child watcher was set before, then calling 1434 .set_event_loop() from the main thread will call .attach_loop(loop) on 1435 the child watcher. 1436 """ 1437 1438 super().set_event_loop(loop) 1439 1440 if (self._watcher is not None and 1441 threading.current_thread() is threading.main_thread()): 1442 self._watcher.attach_loop(loop) 1443 1444 def get_child_watcher(self): 1445 """Get the watcher for child processes. 1446 1447 If not yet set, a ThreadedChildWatcher object is automatically created. 1448 """ 1449 if self._watcher is None: 1450 self._init_watcher() 1451 1452 return self._watcher 1453 1454 def set_child_watcher(self, watcher): 1455 """Set the watcher for child processes.""" 1456 1457 assert watcher is None or isinstance(watcher, AbstractChildWatcher) 1458 1459 if self._watcher is not None: 1460 self._watcher.close() 1461 1462 self._watcher = watcher 1463 1464 1465SelectorEventLoop = _UnixSelectorEventLoop 1466DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy 1467