1"""Selector event loop for Unix with signal handling.""" 2 3import errno 4import io 5import itertools 6import os 7import selectors 8import signal 9import socket 10import stat 11import subprocess 12import sys 13import threading 14import warnings 15 16from . import base_events 17from . import base_subprocess 18from . import constants 19from . import coroutines 20from . import events 21from . import exceptions 22from . import futures 23from . import selector_events 24from . import tasks 25from . import transports 26from .log import logger 27 28 29__all__ = ( 30 'SelectorEventLoop', 31 'AbstractChildWatcher', 'SafeChildWatcher', 32 'FastChildWatcher', 'PidfdChildWatcher', 33 'MultiLoopChildWatcher', 'ThreadedChildWatcher', 34 'DefaultEventLoopPolicy', 35) 36 37 38if sys.platform == 'win32': # pragma: no cover 39 raise ImportError('Signals are not really supported on Windows') 40 41 42def _sighandler_noop(signum, frame): 43 """Dummy signal handler.""" 44 pass 45 46 47class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): 48 """Unix event loop. 49 50 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop. 51 """ 52 53 def __init__(self, selector=None): 54 super().__init__(selector) 55 self._signal_handlers = {} 56 57 def close(self): 58 super().close() 59 if not sys.is_finalizing(): 60 for sig in list(self._signal_handlers): 61 self.remove_signal_handler(sig) 62 else: 63 if self._signal_handlers: 64 warnings.warn(f"Closing the loop {self!r} " 65 f"on interpreter shutdown " 66 f"stage, skipping signal handlers removal", 67 ResourceWarning, 68 source=self) 69 self._signal_handlers.clear() 70 71 def _process_self_data(self, data): 72 for signum in data: 73 if not signum: 74 # ignore null bytes written by _write_to_self() 75 continue 76 self._handle_signal(signum) 77 78 def add_signal_handler(self, sig, callback, *args): 79 """Add a handler for a signal. UNIX only. 80 81 Raise ValueError if the signal number is invalid or uncatchable. 82 Raise RuntimeError if there is a problem setting up the handler. 83 """ 84 if (coroutines.iscoroutine(callback) or 85 coroutines.iscoroutinefunction(callback)): 86 raise TypeError("coroutines cannot be used " 87 "with add_signal_handler()") 88 self._check_signal(sig) 89 self._check_closed() 90 try: 91 # set_wakeup_fd() raises ValueError if this is not the 92 # main thread. By calling it early we ensure that an 93 # event loop running in another thread cannot add a signal 94 # handler. 95 signal.set_wakeup_fd(self._csock.fileno()) 96 except (ValueError, OSError) as exc: 97 raise RuntimeError(str(exc)) 98 99 handle = events.Handle(callback, args, self, None) 100 self._signal_handlers[sig] = handle 101 102 try: 103 # Register a dummy signal handler to ask Python to write the signal 104 # number in the wakeup file descriptor. _process_self_data() will 105 # read signal numbers from this file descriptor to handle signals. 106 signal.signal(sig, _sighandler_noop) 107 108 # Set SA_RESTART to limit EINTR occurrences. 109 signal.siginterrupt(sig, False) 110 except OSError as exc: 111 del self._signal_handlers[sig] 112 if not self._signal_handlers: 113 try: 114 signal.set_wakeup_fd(-1) 115 except (ValueError, OSError) as nexc: 116 logger.info('set_wakeup_fd(-1) failed: %s', nexc) 117 118 if exc.errno == errno.EINVAL: 119 raise RuntimeError(f'sig {sig} cannot be caught') 120 else: 121 raise 122 123 def _handle_signal(self, sig): 124 """Internal helper that is the actual signal handler.""" 125 handle = self._signal_handlers.get(sig) 126 if handle is None: 127 return # Assume it's some race condition. 128 if handle._cancelled: 129 self.remove_signal_handler(sig) # Remove it properly. 130 else: 131 self._add_callback_signalsafe(handle) 132 133 def remove_signal_handler(self, sig): 134 """Remove a handler for a signal. UNIX only. 135 136 Return True if a signal handler was removed, False if not. 137 """ 138 self._check_signal(sig) 139 try: 140 del self._signal_handlers[sig] 141 except KeyError: 142 return False 143 144 if sig == signal.SIGINT: 145 handler = signal.default_int_handler 146 else: 147 handler = signal.SIG_DFL 148 149 try: 150 signal.signal(sig, handler) 151 except OSError as exc: 152 if exc.errno == errno.EINVAL: 153 raise RuntimeError(f'sig {sig} cannot be caught') 154 else: 155 raise 156 157 if not self._signal_handlers: 158 try: 159 signal.set_wakeup_fd(-1) 160 except (ValueError, OSError) as exc: 161 logger.info('set_wakeup_fd(-1) failed: %s', exc) 162 163 return True 164 165 def _check_signal(self, sig): 166 """Internal helper to validate a signal. 167 168 Raise ValueError if the signal number is invalid or uncatchable. 169 Raise RuntimeError if there is a problem setting up the handler. 170 """ 171 if not isinstance(sig, int): 172 raise TypeError(f'sig must be an int, not {sig!r}') 173 174 if sig not in signal.valid_signals(): 175 raise ValueError(f'invalid signal number {sig}') 176 177 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 178 extra=None): 179 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) 180 181 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 182 extra=None): 183 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) 184 185 async def _make_subprocess_transport(self, protocol, args, shell, 186 stdin, stdout, stderr, bufsize, 187 extra=None, **kwargs): 188 with events.get_child_watcher() as watcher: 189 if not watcher.is_active(): 190 # Check early. 191 # Raising exception before process creation 192 # prevents subprocess execution if the watcher 193 # is not ready to handle it. 194 raise RuntimeError("asyncio.get_child_watcher() is not activated, " 195 "subprocess support is not installed.") 196 waiter = self.create_future() 197 transp = _UnixSubprocessTransport(self, protocol, args, shell, 198 stdin, stdout, stderr, bufsize, 199 waiter=waiter, extra=extra, 200 **kwargs) 201 202 watcher.add_child_handler(transp.get_pid(), 203 self._child_watcher_callback, transp) 204 try: 205 await waiter 206 except (SystemExit, KeyboardInterrupt): 207 raise 208 except BaseException: 209 transp.close() 210 await transp._wait() 211 raise 212 213 return transp 214 215 def _child_watcher_callback(self, pid, returncode, transp): 216 self.call_soon_threadsafe(transp._process_exited, returncode) 217 218 async def create_unix_connection( 219 self, protocol_factory, path=None, *, 220 ssl=None, sock=None, 221 server_hostname=None, 222 ssl_handshake_timeout=None): 223 assert server_hostname is None or isinstance(server_hostname, str) 224 if ssl: 225 if server_hostname is None: 226 raise ValueError( 227 'you have to pass server_hostname when using ssl') 228 else: 229 if server_hostname is not None: 230 raise ValueError('server_hostname is only meaningful with ssl') 231 if ssl_handshake_timeout is not None: 232 raise ValueError( 233 'ssl_handshake_timeout is only meaningful with ssl') 234 235 if path is not None: 236 if sock is not None: 237 raise ValueError( 238 'path and sock can not be specified at the same time') 239 240 path = os.fspath(path) 241 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) 242 try: 243 sock.setblocking(False) 244 await self.sock_connect(sock, path) 245 except: 246 sock.close() 247 raise 248 249 else: 250 if sock is None: 251 raise ValueError('no path and sock were specified') 252 if (sock.family != socket.AF_UNIX or 253 sock.type != socket.SOCK_STREAM): 254 raise ValueError( 255 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 256 sock.setblocking(False) 257 258 transport, protocol = await self._create_connection_transport( 259 sock, protocol_factory, ssl, server_hostname, 260 ssl_handshake_timeout=ssl_handshake_timeout) 261 return transport, protocol 262 263 async def create_unix_server( 264 self, protocol_factory, path=None, *, 265 sock=None, backlog=100, ssl=None, 266 ssl_handshake_timeout=None, 267 start_serving=True): 268 if isinstance(ssl, bool): 269 raise TypeError('ssl argument must be an SSLContext or None') 270 271 if ssl_handshake_timeout is not None and not ssl: 272 raise ValueError( 273 'ssl_handshake_timeout is only meaningful with ssl') 274 275 if path is not None: 276 if sock is not None: 277 raise ValueError( 278 'path and sock can not be specified at the same time') 279 280 path = os.fspath(path) 281 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 282 283 # Check for abstract socket. `str` and `bytes` paths are supported. 284 if path[0] not in (0, '\x00'): 285 try: 286 if stat.S_ISSOCK(os.stat(path).st_mode): 287 os.remove(path) 288 except FileNotFoundError: 289 pass 290 except OSError as err: 291 # Directory may have permissions only to create socket. 292 logger.error('Unable to check or remove stale UNIX socket ' 293 '%r: %r', path, err) 294 295 try: 296 sock.bind(path) 297 except OSError as exc: 298 sock.close() 299 if exc.errno == errno.EADDRINUSE: 300 # Let's improve the error message by adding 301 # with what exact address it occurs. 302 msg = f'Address {path!r} is already in use' 303 raise OSError(errno.EADDRINUSE, msg) from None 304 else: 305 raise 306 except: 307 sock.close() 308 raise 309 else: 310 if sock is None: 311 raise ValueError( 312 'path was not specified, and no sock specified') 313 314 if (sock.family != socket.AF_UNIX or 315 sock.type != socket.SOCK_STREAM): 316 raise ValueError( 317 f'A UNIX Domain Stream Socket was expected, got {sock!r}') 318 319 sock.setblocking(False) 320 server = base_events.Server(self, [sock], protocol_factory, 321 ssl, backlog, ssl_handshake_timeout) 322 if start_serving: 323 server._start_serving() 324 # Skip one loop iteration so that all 'loop.add_reader' 325 # go through. 326 await tasks.sleep(0, loop=self) 327 328 return server 329 330 async def _sock_sendfile_native(self, sock, file, offset, count): 331 try: 332 os.sendfile 333 except AttributeError: 334 raise exceptions.SendfileNotAvailableError( 335 "os.sendfile() is not available") 336 try: 337 fileno = file.fileno() 338 except (AttributeError, io.UnsupportedOperation) as err: 339 raise exceptions.SendfileNotAvailableError("not a regular file") 340 try: 341 fsize = os.fstat(fileno).st_size 342 except OSError: 343 raise exceptions.SendfileNotAvailableError("not a regular file") 344 blocksize = count if count else fsize 345 if not blocksize: 346 return 0 # empty file 347 348 fut = self.create_future() 349 self._sock_sendfile_native_impl(fut, None, sock, fileno, 350 offset, count, blocksize, 0) 351 return await fut 352 353 def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno, 354 offset, count, blocksize, total_sent): 355 fd = sock.fileno() 356 if registered_fd is not None: 357 # Remove the callback early. It should be rare that the 358 # selector says the fd is ready but the call still returns 359 # EAGAIN, and I am willing to take a hit in that case in 360 # order to simplify the common case. 361 self.remove_writer(registered_fd) 362 if fut.cancelled(): 363 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 364 return 365 if count: 366 blocksize = count - total_sent 367 if blocksize <= 0: 368 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 369 fut.set_result(total_sent) 370 return 371 372 try: 373 sent = os.sendfile(fd, fileno, offset, blocksize) 374 except (BlockingIOError, InterruptedError): 375 if registered_fd is None: 376 self._sock_add_cancellation_callback(fut, sock) 377 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 378 fd, sock, fileno, 379 offset, count, blocksize, total_sent) 380 except OSError as exc: 381 if (registered_fd is not None and 382 exc.errno == errno.ENOTCONN and 383 type(exc) is not ConnectionError): 384 # If we have an ENOTCONN and this isn't a first call to 385 # sendfile(), i.e. the connection was closed in the middle 386 # of the operation, normalize the error to ConnectionError 387 # to make it consistent across all Posix systems. 388 new_exc = ConnectionError( 389 "socket is not connected", errno.ENOTCONN) 390 new_exc.__cause__ = exc 391 exc = new_exc 392 if total_sent == 0: 393 # We can get here for different reasons, the main 394 # one being 'file' is not a regular mmap(2)-like 395 # file, in which case we'll fall back on using 396 # plain send(). 397 err = exceptions.SendfileNotAvailableError( 398 "os.sendfile call failed") 399 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 400 fut.set_exception(err) 401 else: 402 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 403 fut.set_exception(exc) 404 except (SystemExit, KeyboardInterrupt): 405 raise 406 except BaseException as exc: 407 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 408 fut.set_exception(exc) 409 else: 410 if sent == 0: 411 # EOF 412 self._sock_sendfile_update_filepos(fileno, offset, total_sent) 413 fut.set_result(total_sent) 414 else: 415 offset += sent 416 total_sent += sent 417 if registered_fd is None: 418 self._sock_add_cancellation_callback(fut, sock) 419 self.add_writer(fd, self._sock_sendfile_native_impl, fut, 420 fd, sock, fileno, 421 offset, count, blocksize, total_sent) 422 423 def _sock_sendfile_update_filepos(self, fileno, offset, total_sent): 424 if total_sent > 0: 425 os.lseek(fileno, offset, os.SEEK_SET) 426 427 def _sock_add_cancellation_callback(self, fut, sock): 428 def cb(fut): 429 if fut.cancelled(): 430 fd = sock.fileno() 431 if fd != -1: 432 self.remove_writer(fd) 433 fut.add_done_callback(cb) 434 435 436class _UnixReadPipeTransport(transports.ReadTransport): 437 438 max_size = 256 * 1024 # max bytes we read in one event loop iteration 439 440 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 441 super().__init__(extra) 442 self._extra['pipe'] = pipe 443 self._loop = loop 444 self._pipe = pipe 445 self._fileno = pipe.fileno() 446 self._protocol = protocol 447 self._closing = False 448 self._paused = False 449 450 mode = os.fstat(self._fileno).st_mode 451 if not (stat.S_ISFIFO(mode) or 452 stat.S_ISSOCK(mode) or 453 stat.S_ISCHR(mode)): 454 self._pipe = None 455 self._fileno = None 456 self._protocol = None 457 raise ValueError("Pipe transport is for pipes/sockets only.") 458 459 os.set_blocking(self._fileno, False) 460 461 self._loop.call_soon(self._protocol.connection_made, self) 462 # only start reading when connection_made() has been called 463 self._loop.call_soon(self._loop._add_reader, 464 self._fileno, self._read_ready) 465 if waiter is not None: 466 # only wake up the waiter when connection_made() has been called 467 self._loop.call_soon(futures._set_result_unless_cancelled, 468 waiter, None) 469 470 def __repr__(self): 471 info = [self.__class__.__name__] 472 if self._pipe is None: 473 info.append('closed') 474 elif self._closing: 475 info.append('closing') 476 info.append(f'fd={self._fileno}') 477 selector = getattr(self._loop, '_selector', None) 478 if self._pipe is not None and selector is not None: 479 polling = selector_events._test_selector_event( 480 selector, self._fileno, selectors.EVENT_READ) 481 if polling: 482 info.append('polling') 483 else: 484 info.append('idle') 485 elif self._pipe is not None: 486 info.append('open') 487 else: 488 info.append('closed') 489 return '<{}>'.format(' '.join(info)) 490 491 def _read_ready(self): 492 try: 493 data = os.read(self._fileno, self.max_size) 494 except (BlockingIOError, InterruptedError): 495 pass 496 except OSError as exc: 497 self._fatal_error(exc, 'Fatal read error on pipe transport') 498 else: 499 if data: 500 self._protocol.data_received(data) 501 else: 502 if self._loop.get_debug(): 503 logger.info("%r was closed by peer", self) 504 self._closing = True 505 self._loop._remove_reader(self._fileno) 506 self._loop.call_soon(self._protocol.eof_received) 507 self._loop.call_soon(self._call_connection_lost, None) 508 509 def pause_reading(self): 510 if self._closing or self._paused: 511 return 512 self._paused = True 513 self._loop._remove_reader(self._fileno) 514 if self._loop.get_debug(): 515 logger.debug("%r pauses reading", self) 516 517 def resume_reading(self): 518 if self._closing or not self._paused: 519 return 520 self._paused = False 521 self._loop._add_reader(self._fileno, self._read_ready) 522 if self._loop.get_debug(): 523 logger.debug("%r resumes reading", self) 524 525 def set_protocol(self, protocol): 526 self._protocol = protocol 527 528 def get_protocol(self): 529 return self._protocol 530 531 def is_closing(self): 532 return self._closing 533 534 def close(self): 535 if not self._closing: 536 self._close(None) 537 538 def __del__(self, _warn=warnings.warn): 539 if self._pipe is not None: 540 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 541 self._pipe.close() 542 543 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 544 # should be called by exception handler only 545 if (isinstance(exc, OSError) and exc.errno == errno.EIO): 546 if self._loop.get_debug(): 547 logger.debug("%r: %s", self, message, exc_info=True) 548 else: 549 self._loop.call_exception_handler({ 550 'message': message, 551 'exception': exc, 552 'transport': self, 553 'protocol': self._protocol, 554 }) 555 self._close(exc) 556 557 def _close(self, exc): 558 self._closing = True 559 self._loop._remove_reader(self._fileno) 560 self._loop.call_soon(self._call_connection_lost, exc) 561 562 def _call_connection_lost(self, exc): 563 try: 564 self._protocol.connection_lost(exc) 565 finally: 566 self._pipe.close() 567 self._pipe = None 568 self._protocol = None 569 self._loop = None 570 571 572class _UnixWritePipeTransport(transports._FlowControlMixin, 573 transports.WriteTransport): 574 575 def __init__(self, loop, pipe, protocol, waiter=None, extra=None): 576 super().__init__(extra, loop) 577 self._extra['pipe'] = pipe 578 self._pipe = pipe 579 self._fileno = pipe.fileno() 580 self._protocol = protocol 581 self._buffer = bytearray() 582 self._conn_lost = 0 583 self._closing = False # Set when close() or write_eof() called. 584 585 mode = os.fstat(self._fileno).st_mode 586 is_char = stat.S_ISCHR(mode) 587 is_fifo = stat.S_ISFIFO(mode) 588 is_socket = stat.S_ISSOCK(mode) 589 if not (is_char or is_fifo or is_socket): 590 self._pipe = None 591 self._fileno = None 592 self._protocol = None 593 raise ValueError("Pipe transport is only for " 594 "pipes, sockets and character devices") 595 596 os.set_blocking(self._fileno, False) 597 self._loop.call_soon(self._protocol.connection_made, self) 598 599 # On AIX, the reader trick (to be notified when the read end of the 600 # socket is closed) only works for sockets. On other platforms it 601 # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) 602 if is_socket or (is_fifo and not sys.platform.startswith("aix")): 603 # only start reading when connection_made() has been called 604 self._loop.call_soon(self._loop._add_reader, 605 self._fileno, self._read_ready) 606 607 if waiter is not None: 608 # only wake up the waiter when connection_made() has been called 609 self._loop.call_soon(futures._set_result_unless_cancelled, 610 waiter, None) 611 612 def __repr__(self): 613 info = [self.__class__.__name__] 614 if self._pipe is None: 615 info.append('closed') 616 elif self._closing: 617 info.append('closing') 618 info.append(f'fd={self._fileno}') 619 selector = getattr(self._loop, '_selector', None) 620 if self._pipe is not None and selector is not None: 621 polling = selector_events._test_selector_event( 622 selector, self._fileno, selectors.EVENT_WRITE) 623 if polling: 624 info.append('polling') 625 else: 626 info.append('idle') 627 628 bufsize = self.get_write_buffer_size() 629 info.append(f'bufsize={bufsize}') 630 elif self._pipe is not None: 631 info.append('open') 632 else: 633 info.append('closed') 634 return '<{}>'.format(' '.join(info)) 635 636 def get_write_buffer_size(self): 637 return len(self._buffer) 638 639 def _read_ready(self): 640 # Pipe was closed by peer. 641 if self._loop.get_debug(): 642 logger.info("%r was closed by peer", self) 643 if self._buffer: 644 self._close(BrokenPipeError()) 645 else: 646 self._close() 647 648 def write(self, data): 649 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) 650 if isinstance(data, bytearray): 651 data = memoryview(data) 652 if not data: 653 return 654 655 if self._conn_lost or self._closing: 656 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 657 logger.warning('pipe closed by peer or ' 658 'os.write(pipe, data) raised exception.') 659 self._conn_lost += 1 660 return 661 662 if not self._buffer: 663 # Attempt to send it right away first. 664 try: 665 n = os.write(self._fileno, data) 666 except (BlockingIOError, InterruptedError): 667 n = 0 668 except (SystemExit, KeyboardInterrupt): 669 raise 670 except BaseException as exc: 671 self._conn_lost += 1 672 self._fatal_error(exc, 'Fatal write error on pipe transport') 673 return 674 if n == len(data): 675 return 676 elif n > 0: 677 data = memoryview(data)[n:] 678 self._loop._add_writer(self._fileno, self._write_ready) 679 680 self._buffer += data 681 self._maybe_pause_protocol() 682 683 def _write_ready(self): 684 assert self._buffer, 'Data should not be empty' 685 686 try: 687 n = os.write(self._fileno, self._buffer) 688 except (BlockingIOError, InterruptedError): 689 pass 690 except (SystemExit, KeyboardInterrupt): 691 raise 692 except BaseException as exc: 693 self._buffer.clear() 694 self._conn_lost += 1 695 # Remove writer here, _fatal_error() doesn't it 696 # because _buffer is empty. 697 self._loop._remove_writer(self._fileno) 698 self._fatal_error(exc, 'Fatal write error on pipe transport') 699 else: 700 if n == len(self._buffer): 701 self._buffer.clear() 702 self._loop._remove_writer(self._fileno) 703 self._maybe_resume_protocol() # May append to buffer. 704 if self._closing: 705 self._loop._remove_reader(self._fileno) 706 self._call_connection_lost(None) 707 return 708 elif n > 0: 709 del self._buffer[:n] 710 711 def can_write_eof(self): 712 return True 713 714 def write_eof(self): 715 if self._closing: 716 return 717 assert self._pipe 718 self._closing = True 719 if not self._buffer: 720 self._loop._remove_reader(self._fileno) 721 self._loop.call_soon(self._call_connection_lost, None) 722 723 def set_protocol(self, protocol): 724 self._protocol = protocol 725 726 def get_protocol(self): 727 return self._protocol 728 729 def is_closing(self): 730 return self._closing 731 732 def close(self): 733 if self._pipe is not None and not self._closing: 734 # write_eof is all what we needed to close the write pipe 735 self.write_eof() 736 737 def __del__(self, _warn=warnings.warn): 738 if self._pipe is not None: 739 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 740 self._pipe.close() 741 742 def abort(self): 743 self._close(None) 744 745 def _fatal_error(self, exc, message='Fatal error on pipe transport'): 746 # should be called by exception handler only 747 if isinstance(exc, OSError): 748 if self._loop.get_debug(): 749 logger.debug("%r: %s", self, message, exc_info=True) 750 else: 751 self._loop.call_exception_handler({ 752 'message': message, 753 'exception': exc, 754 'transport': self, 755 'protocol': self._protocol, 756 }) 757 self._close(exc) 758 759 def _close(self, exc=None): 760 self._closing = True 761 if self._buffer: 762 self._loop._remove_writer(self._fileno) 763 self._buffer.clear() 764 self._loop._remove_reader(self._fileno) 765 self._loop.call_soon(self._call_connection_lost, exc) 766 767 def _call_connection_lost(self, exc): 768 try: 769 self._protocol.connection_lost(exc) 770 finally: 771 self._pipe.close() 772 self._pipe = None 773 self._protocol = None 774 self._loop = None 775 776 777class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport): 778 779 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): 780 stdin_w = None 781 if stdin == subprocess.PIPE: 782 # Use a socket pair for stdin, since not all platforms 783 # support selecting read events on the write end of a 784 # socket (which we use in order to detect closing of the 785 # other end). Notably this is needed on AIX, and works 786 # just fine on other platforms. 787 stdin, stdin_w = socket.socketpair() 788 try: 789 self._proc = subprocess.Popen( 790 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, 791 universal_newlines=False, bufsize=bufsize, **kwargs) 792 if stdin_w is not None: 793 stdin.close() 794 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize) 795 stdin_w = None 796 finally: 797 if stdin_w is not None: 798 stdin.close() 799 stdin_w.close() 800 801 802class AbstractChildWatcher: 803 """Abstract base class for monitoring child processes. 804 805 Objects derived from this class monitor a collection of subprocesses and 806 report their termination or interruption by a signal. 807 808 New callbacks are registered with .add_child_handler(). Starting a new 809 process must be done within a 'with' block to allow the watcher to suspend 810 its activity until the new process if fully registered (this is needed to 811 prevent a race condition in some implementations). 812 813 Example: 814 with watcher: 815 proc = subprocess.Popen("sleep 1") 816 watcher.add_child_handler(proc.pid, callback) 817 818 Notes: 819 Implementations of this class must be thread-safe. 820 821 Since child watcher objects may catch the SIGCHLD signal and call 822 waitpid(-1), there should be only one active object per process. 823 """ 824 825 def add_child_handler(self, pid, callback, *args): 826 """Register a new child handler. 827 828 Arrange for callback(pid, returncode, *args) to be called when 829 process 'pid' terminates. Specifying another callback for the same 830 process replaces the previous handler. 831 832 Note: callback() must be thread-safe. 833 """ 834 raise NotImplementedError() 835 836 def remove_child_handler(self, pid): 837 """Removes the handler for process 'pid'. 838 839 The function returns True if the handler was successfully removed, 840 False if there was nothing to remove.""" 841 842 raise NotImplementedError() 843 844 def attach_loop(self, loop): 845 """Attach the watcher to an event loop. 846 847 If the watcher was previously attached to an event loop, then it is 848 first detached before attaching to the new loop. 849 850 Note: loop may be None. 851 """ 852 raise NotImplementedError() 853 854 def close(self): 855 """Close the watcher. 856 857 This must be called to make sure that any underlying resource is freed. 858 """ 859 raise NotImplementedError() 860 861 def is_active(self): 862 """Return ``True`` if the watcher is active and is used by the event loop. 863 864 Return True if the watcher is installed and ready to handle process exit 865 notifications. 866 867 """ 868 raise NotImplementedError() 869 870 def __enter__(self): 871 """Enter the watcher's context and allow starting new processes 872 873 This function must return self""" 874 raise NotImplementedError() 875 876 def __exit__(self, a, b, c): 877 """Exit the watcher's context""" 878 raise NotImplementedError() 879 880 881class PidfdChildWatcher(AbstractChildWatcher): 882 """Child watcher implementation using Linux's pid file descriptors. 883 884 This child watcher polls process file descriptors (pidfds) to await child 885 process termination. In some respects, PidfdChildWatcher is a "Goldilocks" 886 child watcher implementation. It doesn't require signals or threads, doesn't 887 interfere with any processes launched outside the event loop, and scales 888 linearly with the number of subprocesses launched by the event loop. The 889 main disadvantage is that pidfds are specific to Linux, and only work on 890 recent (5.3+) kernels. 891 """ 892 893 def __init__(self): 894 self._loop = None 895 self._callbacks = {} 896 897 def __enter__(self): 898 return self 899 900 def __exit__(self, exc_type, exc_value, exc_traceback): 901 pass 902 903 def is_active(self): 904 return self._loop is not None and self._loop.is_running() 905 906 def close(self): 907 self.attach_loop(None) 908 909 def attach_loop(self, loop): 910 if self._loop is not None and loop is None and self._callbacks: 911 warnings.warn( 912 'A loop is being detached ' 913 'from a child watcher with pending handlers', 914 RuntimeWarning) 915 for pidfd, _, _ in self._callbacks.values(): 916 self._loop._remove_reader(pidfd) 917 os.close(pidfd) 918 self._callbacks.clear() 919 self._loop = loop 920 921 def add_child_handler(self, pid, callback, *args): 922 existing = self._callbacks.get(pid) 923 if existing is not None: 924 self._callbacks[pid] = existing[0], callback, args 925 else: 926 pidfd = os.pidfd_open(pid) 927 self._loop._add_reader(pidfd, self._do_wait, pid) 928 self._callbacks[pid] = pidfd, callback, args 929 930 def _do_wait(self, pid): 931 pidfd, callback, args = self._callbacks.pop(pid) 932 self._loop._remove_reader(pidfd) 933 try: 934 _, status = os.waitpid(pid, 0) 935 except ChildProcessError: 936 # The child process is already reaped 937 # (may happen if waitpid() is called elsewhere). 938 returncode = 255 939 logger.warning( 940 "child process pid %d exit status already read: " 941 " will report returncode 255", 942 pid) 943 else: 944 returncode = _compute_returncode(status) 945 946 os.close(pidfd) 947 callback(pid, returncode, *args) 948 949 def remove_child_handler(self, pid): 950 try: 951 pidfd, _, _ = self._callbacks.pop(pid) 952 except KeyError: 953 return False 954 self._loop._remove_reader(pidfd) 955 os.close(pidfd) 956 return True 957 958 959def _compute_returncode(status): 960 if os.WIFSIGNALED(status): 961 # The child process died because of a signal. 962 return -os.WTERMSIG(status) 963 elif os.WIFEXITED(status): 964 # The child process exited (e.g sys.exit()). 965 return os.WEXITSTATUS(status) 966 else: 967 # The child exited, but we don't understand its status. 968 # This shouldn't happen, but if it does, let's just 969 # return that status; perhaps that helps debug it. 970 return status 971 972 973class BaseChildWatcher(AbstractChildWatcher): 974 975 def __init__(self): 976 self._loop = None 977 self._callbacks = {} 978 979 def close(self): 980 self.attach_loop(None) 981 982 def is_active(self): 983 return self._loop is not None and self._loop.is_running() 984 985 def _do_waitpid(self, expected_pid): 986 raise NotImplementedError() 987 988 def _do_waitpid_all(self): 989 raise NotImplementedError() 990 991 def attach_loop(self, loop): 992 assert loop is None or isinstance(loop, events.AbstractEventLoop) 993 994 if self._loop is not None and loop is None and self._callbacks: 995 warnings.warn( 996 'A loop is being detached ' 997 'from a child watcher with pending handlers', 998 RuntimeWarning) 999 1000 if self._loop is not None: 1001 self._loop.remove_signal_handler(signal.SIGCHLD) 1002 1003 self._loop = loop 1004 if loop is not None: 1005 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld) 1006 1007 # Prevent a race condition in case a child terminated 1008 # during the switch. 1009 self._do_waitpid_all() 1010 1011 def _sig_chld(self): 1012 try: 1013 self._do_waitpid_all() 1014 except (SystemExit, KeyboardInterrupt): 1015 raise 1016 except BaseException as exc: 1017 # self._loop should always be available here 1018 # as '_sig_chld' is added as a signal handler 1019 # in 'attach_loop' 1020 self._loop.call_exception_handler({ 1021 'message': 'Unknown exception in SIGCHLD handler', 1022 'exception': exc, 1023 }) 1024 1025 1026class SafeChildWatcher(BaseChildWatcher): 1027 """'Safe' child watcher implementation. 1028 1029 This implementation avoids disrupting other code spawning processes by 1030 polling explicitly each process in the SIGCHLD handler instead of calling 1031 os.waitpid(-1). 1032 1033 This is a safe solution but it has a significant overhead when handling a 1034 big number of children (O(n) each time SIGCHLD is raised) 1035 """ 1036 1037 def close(self): 1038 self._callbacks.clear() 1039 super().close() 1040 1041 def __enter__(self): 1042 return self 1043 1044 def __exit__(self, a, b, c): 1045 pass 1046 1047 def add_child_handler(self, pid, callback, *args): 1048 self._callbacks[pid] = (callback, args) 1049 1050 # Prevent a race condition in case the child is already terminated. 1051 self._do_waitpid(pid) 1052 1053 def remove_child_handler(self, pid): 1054 try: 1055 del self._callbacks[pid] 1056 return True 1057 except KeyError: 1058 return False 1059 1060 def _do_waitpid_all(self): 1061 1062 for pid in list(self._callbacks): 1063 self._do_waitpid(pid) 1064 1065 def _do_waitpid(self, expected_pid): 1066 assert expected_pid > 0 1067 1068 try: 1069 pid, status = os.waitpid(expected_pid, os.WNOHANG) 1070 except ChildProcessError: 1071 # The child process is already reaped 1072 # (may happen if waitpid() is called elsewhere). 1073 pid = expected_pid 1074 returncode = 255 1075 logger.warning( 1076 "Unknown child process pid %d, will report returncode 255", 1077 pid) 1078 else: 1079 if pid == 0: 1080 # The child process is still alive. 1081 return 1082 1083 returncode = _compute_returncode(status) 1084 if self._loop.get_debug(): 1085 logger.debug('process %s exited with returncode %s', 1086 expected_pid, returncode) 1087 1088 try: 1089 callback, args = self._callbacks.pop(pid) 1090 except KeyError: # pragma: no cover 1091 # May happen if .remove_child_handler() is called 1092 # after os.waitpid() returns. 1093 if self._loop.get_debug(): 1094 logger.warning("Child watcher got an unexpected pid: %r", 1095 pid, exc_info=True) 1096 else: 1097 callback(pid, returncode, *args) 1098 1099 1100class FastChildWatcher(BaseChildWatcher): 1101 """'Fast' child watcher implementation. 1102 1103 This implementation reaps every terminated processes by calling 1104 os.waitpid(-1) directly, possibly breaking other code spawning processes 1105 and waiting for their termination. 1106 1107 There is no noticeable overhead when handling a big number of children 1108 (O(1) each time a child terminates). 1109 """ 1110 def __init__(self): 1111 super().__init__() 1112 self._lock = threading.Lock() 1113 self._zombies = {} 1114 self._forks = 0 1115 1116 def close(self): 1117 self._callbacks.clear() 1118 self._zombies.clear() 1119 super().close() 1120 1121 def __enter__(self): 1122 with self._lock: 1123 self._forks += 1 1124 1125 return self 1126 1127 def __exit__(self, a, b, c): 1128 with self._lock: 1129 self._forks -= 1 1130 1131 if self._forks or not self._zombies: 1132 return 1133 1134 collateral_victims = str(self._zombies) 1135 self._zombies.clear() 1136 1137 logger.warning( 1138 "Caught subprocesses termination from unknown pids: %s", 1139 collateral_victims) 1140 1141 def add_child_handler(self, pid, callback, *args): 1142 assert self._forks, "Must use the context manager" 1143 1144 with self._lock: 1145 try: 1146 returncode = self._zombies.pop(pid) 1147 except KeyError: 1148 # The child is running. 1149 self._callbacks[pid] = callback, args 1150 return 1151 1152 # The child is dead already. We can fire the callback. 1153 callback(pid, returncode, *args) 1154 1155 def remove_child_handler(self, pid): 1156 try: 1157 del self._callbacks[pid] 1158 return True 1159 except KeyError: 1160 return False 1161 1162 def _do_waitpid_all(self): 1163 # Because of signal coalescing, we must keep calling waitpid() as 1164 # long as we're able to reap a child. 1165 while True: 1166 try: 1167 pid, status = os.waitpid(-1, os.WNOHANG) 1168 except ChildProcessError: 1169 # No more child processes exist. 1170 return 1171 else: 1172 if pid == 0: 1173 # A child process is still alive. 1174 return 1175 1176 returncode = _compute_returncode(status) 1177 1178 with self._lock: 1179 try: 1180 callback, args = self._callbacks.pop(pid) 1181 except KeyError: 1182 # unknown child 1183 if self._forks: 1184 # It may not be registered yet. 1185 self._zombies[pid] = returncode 1186 if self._loop.get_debug(): 1187 logger.debug('unknown process %s exited ' 1188 'with returncode %s', 1189 pid, returncode) 1190 continue 1191 callback = None 1192 else: 1193 if self._loop.get_debug(): 1194 logger.debug('process %s exited with returncode %s', 1195 pid, returncode) 1196 1197 if callback is None: 1198 logger.warning( 1199 "Caught subprocess termination from unknown pid: " 1200 "%d -> %d", pid, returncode) 1201 else: 1202 callback(pid, returncode, *args) 1203 1204 1205class MultiLoopChildWatcher(AbstractChildWatcher): 1206 """A watcher that doesn't require running loop in the main thread. 1207 1208 This implementation registers a SIGCHLD signal handler on 1209 instantiation (which may conflict with other code that 1210 install own handler for this signal). 1211 1212 The solution is safe but it has a significant overhead when 1213 handling a big number of processes (*O(n)* each time a 1214 SIGCHLD is received). 1215 """ 1216 1217 # Implementation note: 1218 # The class keeps compatibility with AbstractChildWatcher ABC 1219 # To achieve this it has empty attach_loop() method 1220 # and doesn't accept explicit loop argument 1221 # for add_child_handler()/remove_child_handler() 1222 # but retrieves the current loop by get_running_loop() 1223 1224 def __init__(self): 1225 self._callbacks = {} 1226 self._saved_sighandler = None 1227 1228 def is_active(self): 1229 return self._saved_sighandler is not None 1230 1231 def close(self): 1232 self._callbacks.clear() 1233 if self._saved_sighandler is None: 1234 return 1235 1236 handler = signal.getsignal(signal.SIGCHLD) 1237 if handler != self._sig_chld: 1238 logger.warning("SIGCHLD handler was changed by outside code") 1239 else: 1240 signal.signal(signal.SIGCHLD, self._saved_sighandler) 1241 self._saved_sighandler = None 1242 1243 def __enter__(self): 1244 return self 1245 1246 def __exit__(self, exc_type, exc_val, exc_tb): 1247 pass 1248 1249 def add_child_handler(self, pid, callback, *args): 1250 loop = events.get_running_loop() 1251 self._callbacks[pid] = (loop, callback, args) 1252 1253 # Prevent a race condition in case the child is already terminated. 1254 self._do_waitpid(pid) 1255 1256 def remove_child_handler(self, pid): 1257 try: 1258 del self._callbacks[pid] 1259 return True 1260 except KeyError: 1261 return False 1262 1263 def attach_loop(self, loop): 1264 # Don't save the loop but initialize itself if called first time 1265 # The reason to do it here is that attach_loop() is called from 1266 # unix policy only for the main thread. 1267 # Main thread is required for subscription on SIGCHLD signal 1268 if self._saved_sighandler is not None: 1269 return 1270 1271 self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) 1272 if self._saved_sighandler is None: 1273 logger.warning("Previous SIGCHLD handler was set by non-Python code, " 1274 "restore to default handler on watcher close.") 1275 self._saved_sighandler = signal.SIG_DFL 1276 1277 # Set SA_RESTART to limit EINTR occurrences. 1278 signal.siginterrupt(signal.SIGCHLD, False) 1279 1280 def _do_waitpid_all(self): 1281 for pid in list(self._callbacks): 1282 self._do_waitpid(pid) 1283 1284 def _do_waitpid(self, expected_pid): 1285 assert expected_pid > 0 1286 1287 try: 1288 pid, status = os.waitpid(expected_pid, os.WNOHANG) 1289 except ChildProcessError: 1290 # The child process is already reaped 1291 # (may happen if waitpid() is called elsewhere). 1292 pid = expected_pid 1293 returncode = 255 1294 logger.warning( 1295 "Unknown child process pid %d, will report returncode 255", 1296 pid) 1297 debug_log = False 1298 else: 1299 if pid == 0: 1300 # The child process is still alive. 1301 return 1302 1303 returncode = _compute_returncode(status) 1304 debug_log = True 1305 try: 1306 loop, callback, args = self._callbacks.pop(pid) 1307 except KeyError: # pragma: no cover 1308 # May happen if .remove_child_handler() is called 1309 # after os.waitpid() returns. 1310 logger.warning("Child watcher got an unexpected pid: %r", 1311 pid, exc_info=True) 1312 else: 1313 if loop.is_closed(): 1314 logger.warning("Loop %r that handles pid %r is closed", loop, pid) 1315 else: 1316 if debug_log and loop.get_debug(): 1317 logger.debug('process %s exited with returncode %s', 1318 expected_pid, returncode) 1319 loop.call_soon_threadsafe(callback, pid, returncode, *args) 1320 1321 def _sig_chld(self, signum, frame): 1322 try: 1323 self._do_waitpid_all() 1324 except (SystemExit, KeyboardInterrupt): 1325 raise 1326 except BaseException: 1327 logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) 1328 1329 1330class ThreadedChildWatcher(AbstractChildWatcher): 1331 """Threaded child watcher implementation. 1332 1333 The watcher uses a thread per process 1334 for waiting for the process finish. 1335 1336 It doesn't require subscription on POSIX signal 1337 but a thread creation is not free. 1338 1339 The watcher has O(1) complexity, its performance doesn't depend 1340 on amount of spawn processes. 1341 """ 1342 1343 def __init__(self): 1344 self._pid_counter = itertools.count(0) 1345 self._threads = {} 1346 1347 def is_active(self): 1348 return True 1349 1350 def close(self): 1351 self._join_threads() 1352 1353 def _join_threads(self): 1354 """Internal: Join all non-daemon threads""" 1355 threads = [thread for thread in list(self._threads.values()) 1356 if thread.is_alive() and not thread.daemon] 1357 for thread in threads: 1358 thread.join() 1359 1360 def __enter__(self): 1361 return self 1362 1363 def __exit__(self, exc_type, exc_val, exc_tb): 1364 pass 1365 1366 def __del__(self, _warn=warnings.warn): 1367 threads = [thread for thread in list(self._threads.values()) 1368 if thread.is_alive()] 1369 if threads: 1370 _warn(f"{self.__class__} has registered but not finished child processes", 1371 ResourceWarning, 1372 source=self) 1373 1374 def add_child_handler(self, pid, callback, *args): 1375 loop = events.get_running_loop() 1376 thread = threading.Thread(target=self._do_waitpid, 1377 name=f"waitpid-{next(self._pid_counter)}", 1378 args=(loop, pid, callback, args), 1379 daemon=True) 1380 self._threads[pid] = thread 1381 thread.start() 1382 1383 def remove_child_handler(self, pid): 1384 # asyncio never calls remove_child_handler() !!! 1385 # The method is no-op but is implemented because 1386 # abstract base classe requires it 1387 return True 1388 1389 def attach_loop(self, loop): 1390 pass 1391 1392 def _do_waitpid(self, loop, expected_pid, callback, args): 1393 assert expected_pid > 0 1394 1395 try: 1396 pid, status = os.waitpid(expected_pid, 0) 1397 except ChildProcessError: 1398 # The child process is already reaped 1399 # (may happen if waitpid() is called elsewhere). 1400 pid = expected_pid 1401 returncode = 255 1402 logger.warning( 1403 "Unknown child process pid %d, will report returncode 255", 1404 pid) 1405 else: 1406 returncode = _compute_returncode(status) 1407 if loop.get_debug(): 1408 logger.debug('process %s exited with returncode %s', 1409 expected_pid, returncode) 1410 1411 if loop.is_closed(): 1412 logger.warning("Loop %r that handles pid %r is closed", loop, pid) 1413 else: 1414 loop.call_soon_threadsafe(callback, pid, returncode, *args) 1415 1416 self._threads.pop(expected_pid) 1417 1418 1419class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): 1420 """UNIX event loop policy with a watcher for child processes.""" 1421 _loop_factory = _UnixSelectorEventLoop 1422 1423 def __init__(self): 1424 super().__init__() 1425 self._watcher = None 1426 1427 def _init_watcher(self): 1428 with events._lock: 1429 if self._watcher is None: # pragma: no branch 1430 self._watcher = ThreadedChildWatcher() 1431 if threading.current_thread() is threading.main_thread(): 1432 self._watcher.attach_loop(self._local._loop) 1433 1434 def set_event_loop(self, loop): 1435 """Set the event loop. 1436 1437 As a side effect, if a child watcher was set before, then calling 1438 .set_event_loop() from the main thread will call .attach_loop(loop) on 1439 the child watcher. 1440 """ 1441 1442 super().set_event_loop(loop) 1443 1444 if (self._watcher is not None and 1445 threading.current_thread() is threading.main_thread()): 1446 self._watcher.attach_loop(loop) 1447 1448 def get_child_watcher(self): 1449 """Get the watcher for child processes. 1450 1451 If not yet set, a ThreadedChildWatcher object is automatically created. 1452 """ 1453 if self._watcher is None: 1454 self._init_watcher() 1455 1456 return self._watcher 1457 1458 def set_child_watcher(self, watcher): 1459 """Set the watcher for child processes.""" 1460 1461 assert watcher is None or isinstance(watcher, AbstractChildWatcher) 1462 1463 if self._watcher is not None: 1464 self._watcher.close() 1465 1466 self._watcher = watcher 1467 1468 1469SelectorEventLoop = _UnixSelectorEventLoop 1470DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy 1471