"""Selector event loop for Unix with signal handling."""

import errno
import os
import signal
import socket
import stat
import subprocess
import sys
import threading
import warnings


from . import base_events
from . import base_subprocess
from . import compat
from . import constants
from . import coroutines
from . import events
from . import futures
from . import selector_events
from . import selectors
from . import transports
from .coroutines import coroutine
from .log import logger


__all__ = ['SelectorEventLoop',
           'AbstractChildWatcher', 'SafeChildWatcher',
           'FastChildWatcher', 'DefaultEventLoopPolicy',
           ]

if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')


def _sighandler_noop(signum, frame):
    """Dummy signal handler."""
    pass


try:
    _fspath = os.fspath
except AttributeError:
    # Python 3.5 or earlier
    _fspath = lambda path: path


class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps signal number -> events.Handle wrapping the user callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets (socket.socketpair())."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then remove every registered signal handler."""
        super().close()
        # Iterate over a copy: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Dispatch each non-zero byte of *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine or coroutine function.
        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback)
                or coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the default disposition for the signal.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is outside range(1, NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport once it is ready.

        The process is started inside the child watcher's context so the
        exit handler is registered before the watcher resumes.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                yield from waiter
            except Exception as exc:
                # Workaround CPython bug #23353: using yield/yield-from in an
                # except block of a generator doesn't clear properly
                # sys.exc_info()
                err = exc
            else:
                err = None

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # The callback may be invoked outside the loop's normal flow, so hand
        # the notification to the loop through call_soon_threadsafe().
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given (pass path=None to
        use an existing socket).  *path* may be a str, bytes or, where
        os.fspath() is available, a path-like object.

        Return a (transport, protocol) pair.
        Raise ValueError on inconsistent arguments.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            # Accept path-like objects, consistently with
            # create_unix_server().
            path = _fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # Deliberately bare: the socket must be closed whatever is
                # raised (including cancellation); the exception is re-raised.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    not base_events._is_stream_socket(sock)):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* is
        given and is not an abstract socket address, a stale socket file at
        that path is removed before binding.

        Return a base_events.Server.
        Raise TypeError if ssl is a bool, ValueError on inconsistent
        arguments, OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = _fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error(
                        'Unable to check or remove stale UNIX socket %r: %r',
                        path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                # Deliberately bare: close the socket, then re-raise.
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    not base_events._is_stream_socket(sock)):
                raise ValueError(
                    'A UNIX Domain Stream Socket was expected, got {!r}'
                    .format(sock))

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server


if hasattr(os, 'set_blocking'):
    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        os.set_blocking(fd, False)
else:
    import fcntl

    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        flags = flags | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)


class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            # Reset the attributes so __del__ and __repr__ see a closed
            # transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        _set_nonblocking(self._fileno)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                          selector,
                          self._fileno, selectors.EVENT_READ)
            if polling:
                info.append('polling')
            else:
                info.append('idle')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Read available data and forward it, or handle EOF."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # Empty read: the other end was closed.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        if not self._closing:
            self._close(None)

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. This is no longer the case on Python 3.4
    # thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning,
                              source=self)
                self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Release the pipe and drop references even if the protocol
            # callback raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a pipe, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                          selector,
                          self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append('bufsize=%s' % bufsize)
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        return len(self._buffer)

    def _read_ready(self):
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Pending data can no longer be delivered.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be sent immediately."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                data = memoryview(data)[n:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Flush as much of the buffer as the descriptor accepts."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove writer here, _fatal_error() doesn't do it
            # because _buffer is empty.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(self._buffer):
                self._buffer.clear()
                self._loop._remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if self._closing:
                    self._loop._remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                del self._buffer[:n]

    def can_write_eof(self):
        return True

    def write_eof(self):
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            # Nothing pending: finish closing now.  Otherwise _write_ready()
            # completes the close once the buffer is flushed.
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        if self._pipe is not None and not self._closing:
            # write_eof is all we need to close the write pipe
            self.write_eof()

    # On Python 3.3 and older, objects with a destructor part of a reference
    # cycle are never destroyed. This is no longer the case on Python 3.4
    # thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            if self._pipe is not None:
                warnings.warn("unclosed transport %r" % self, ResourceWarning,
                              source=self)
                self._pipe.close()

    def abort(self):
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Discard pending data and schedule connection_lost(exc)."""
        self._closing = True
        if self._buffer:
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Release the pipe and drop references even if the protocol
            # callback raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


if hasattr(os, 'set_inheritable'):
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
else:
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of file descriptor *fd*."""
        cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)

        old = fcntl.fcntl(fd, fcntl.F_GETFD)
        if not inheritable:
            fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
        else:
            fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)


class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE a socket pair is used instead of a
        pipe; the parent's end becomes self._proc.stdin.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            if stdin_w is not None:
                # Mark the write end of the stdin pipe as non-inheritable,
                # needed by close_fds=False on Python 3.3 and older
                # (Python 3.4 implements the PEP 446, socketpair returns
                # non-inheritable sockets)
                _set_inheritable(stdin_w.fileno(), False)
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership of the descriptor moved to self._proc.stdin.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Something failed before the hand-over: do not leak the
                # socket pair.
                stdin.close()
                stdin_w.close()


class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Removes the handler for process 'pid'.

        The function returns True if the handler was successfully removed,
        False if there was nothing to remove."""

        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes

        This function must return self"""
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context"""
        raise NotImplementedError()


class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses implement _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        # Maps pid -> (callback, args).
        self._callbacks = {}

    def close(self):
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        if self._loop is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)

        if self._loop is not None:
            self._loop.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is not None:
            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

            # Prevent a race condition in case a child terminated
            # during the switch.
            self._do_waitpid_all()

    def _sig_chld(self):
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            self._loop.call_exception_handler({
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            })

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a return code.

        Return the negated signal number if the child was killed by a
        signal, its exit status if it exited, otherwise *status* unchanged.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        elif os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        else:
            # The child exited, but we don't understand its status.
            # This shouldn't happen, but if it does, let's just
            # return that status; perhaps that helps debug it.
            return status


class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def close(self):
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        self._callbacks[pid] = (callback, args)

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):

        # Iterate over a copy: _do_waitpid() pops entries from the dict.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)


class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated processes by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """
    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # Maps pid -> returncode for children reaped before their handler
        # was registered.
        self._zombies = {}
        # Number of currently active 'with watcher:' contexts.
        self._forks = 0

    def close(self):
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        assert self._forks, "Must use the context manager"

        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, "
                "the child watcher does not have a loop attached")

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)


class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                if isinstance(threading.current_thread(),
                              threading._MainThread):
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if self._watcher is not None and \
            isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher

SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy