• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Selector event loop for Unix with signal handling."""
2
3import errno
4import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
10import threading
11import warnings
12
13
14from . import base_events
15from . import base_subprocess
16from . import compat
17from . import constants
18from . import coroutines
19from . import events
20from . import futures
21from . import selector_events
22from . import selectors
23from . import transports
24from .coroutines import coroutine
25from .log import logger
26
27
28__all__ = ['SelectorEventLoop',
29           'AbstractChildWatcher', 'SafeChildWatcher',
30           'FastChildWatcher', 'DefaultEventLoopPolicy',
31           ]
32
33if sys.platform == 'win32':  # pragma: no cover
34    raise ImportError('Signals are not really supported on Windows')
35
36
37def _sighandler_noop(signum, frame):
38    """Dummy signal handler."""
39    pass
40
41
42try:
43    _fspath = os.fspath
44except AttributeError:
45    # Python 3.5 or earlier
46    _fspath = lambda path: path
47
48
49class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
50    """Unix event loop.
51
52    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
53    """
54
55    def __init__(self, selector=None):
56        super().__init__(selector)
57        self._signal_handlers = {}
58
59    def _socketpair(self):
60        return socket.socketpair()
61
62    def close(self):
63        super().close()
64        for sig in list(self._signal_handlers):
65            self.remove_signal_handler(sig)
66
67    def _process_self_data(self, data):
68        for signum in data:
69            if not signum:
70                # ignore null bytes written by _write_to_self()
71                continue
72            self._handle_signal(signum)
73
74    def add_signal_handler(self, sig, callback, *args):
75        """Add a handler for a signal.  UNIX only.
76
77        Raise ValueError if the signal number is invalid or uncatchable.
78        Raise RuntimeError if there is a problem setting up the handler.
79        """
80        if (coroutines.iscoroutine(callback)
81        or coroutines.iscoroutinefunction(callback)):
82            raise TypeError("coroutines cannot be used "
83                            "with add_signal_handler()")
84        self._check_signal(sig)
85        self._check_closed()
86        try:
87            # set_wakeup_fd() raises ValueError if this is not the
88            # main thread.  By calling it early we ensure that an
89            # event loop running in another thread cannot add a signal
90            # handler.
91            signal.set_wakeup_fd(self._csock.fileno())
92        except (ValueError, OSError) as exc:
93            raise RuntimeError(str(exc))
94
95        handle = events.Handle(callback, args, self)
96        self._signal_handlers[sig] = handle
97
98        try:
99            # Register a dummy signal handler to ask Python to write the signal
100            # number in the wakup file descriptor. _process_self_data() will
101            # read signal numbers from this file descriptor to handle signals.
102            signal.signal(sig, _sighandler_noop)
103
104            # Set SA_RESTART to limit EINTR occurrences.
105            signal.siginterrupt(sig, False)
106        except OSError as exc:
107            del self._signal_handlers[sig]
108            if not self._signal_handlers:
109                try:
110                    signal.set_wakeup_fd(-1)
111                except (ValueError, OSError) as nexc:
112                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)
113
114            if exc.errno == errno.EINVAL:
115                raise RuntimeError('sig {} cannot be caught'.format(sig))
116            else:
117                raise
118
119    def _handle_signal(self, sig):
120        """Internal helper that is the actual signal handler."""
121        handle = self._signal_handlers.get(sig)
122        if handle is None:
123            return  # Assume it's some race condition.
124        if handle._cancelled:
125            self.remove_signal_handler(sig)  # Remove it properly.
126        else:
127            self._add_callback_signalsafe(handle)
128
129    def remove_signal_handler(self, sig):
130        """Remove a handler for a signal.  UNIX only.
131
132        Return True if a signal handler was removed, False if not.
133        """
134        self._check_signal(sig)
135        try:
136            del self._signal_handlers[sig]
137        except KeyError:
138            return False
139
140        if sig == signal.SIGINT:
141            handler = signal.default_int_handler
142        else:
143            handler = signal.SIG_DFL
144
145        try:
146            signal.signal(sig, handler)
147        except OSError as exc:
148            if exc.errno == errno.EINVAL:
149                raise RuntimeError('sig {} cannot be caught'.format(sig))
150            else:
151                raise
152
153        if not self._signal_handlers:
154            try:
155                signal.set_wakeup_fd(-1)
156            except (ValueError, OSError) as exc:
157                logger.info('set_wakeup_fd(-1) failed: %s', exc)
158
159        return True
160
161    def _check_signal(self, sig):
162        """Internal helper to validate a signal.
163
164        Raise ValueError if the signal number is invalid or uncatchable.
165        Raise RuntimeError if there is a problem setting up the handler.
166        """
167        if not isinstance(sig, int):
168            raise TypeError('sig must be an int, not {!r}'.format(sig))
169
170        if not (1 <= sig < signal.NSIG):
171            raise ValueError(
172                'sig {} out of range(1, {})'.format(sig, signal.NSIG))
173
174    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
175                                  extra=None):
176        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
177
178    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
179                                   extra=None):
180        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
181
182    @coroutine
183    def _make_subprocess_transport(self, protocol, args, shell,
184                                   stdin, stdout, stderr, bufsize,
185                                   extra=None, **kwargs):
186        with events.get_child_watcher() as watcher:
187            waiter = self.create_future()
188            transp = _UnixSubprocessTransport(self, protocol, args, shell,
189                                              stdin, stdout, stderr, bufsize,
190                                              waiter=waiter, extra=extra,
191                                              **kwargs)
192
193            watcher.add_child_handler(transp.get_pid(),
194                                      self._child_watcher_callback, transp)
195            try:
196                yield from waiter
197            except Exception as exc:
198                # Workaround CPython bug #23353: using yield/yield-from in an
199                # except block of a generator doesn't clear properly
200                # sys.exc_info()
201                err = exc
202            else:
203                err = None
204
205            if err is not None:
206                transp.close()
207                yield from transp._wait()
208                raise err
209
210        return transp
211
212    def _child_watcher_callback(self, pid, returncode, transp):
213        self.call_soon_threadsafe(transp._process_exited, returncode)
214
215    @coroutine
216    def create_unix_connection(self, protocol_factory, path, *,
217                               ssl=None, sock=None,
218                               server_hostname=None):
219        assert server_hostname is None or isinstance(server_hostname, str)
220        if ssl:
221            if server_hostname is None:
222                raise ValueError(
223                    'you have to pass server_hostname when using ssl')
224        else:
225            if server_hostname is not None:
226                raise ValueError('server_hostname is only meaningful with ssl')
227
228        if path is not None:
229            if sock is not None:
230                raise ValueError(
231                    'path and sock can not be specified at the same time')
232
233            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
234            try:
235                sock.setblocking(False)
236                yield from self.sock_connect(sock, path)
237            except:
238                sock.close()
239                raise
240
241        else:
242            if sock is None:
243                raise ValueError('no path and sock were specified')
244            if (sock.family != socket.AF_UNIX or
245                    not base_events._is_stream_socket(sock)):
246                raise ValueError(
247                    'A UNIX Domain Stream Socket was expected, got {!r}'
248                    .format(sock))
249            sock.setblocking(False)
250
251        transport, protocol = yield from self._create_connection_transport(
252            sock, protocol_factory, ssl, server_hostname)
253        return transport, protocol
254
255    @coroutine
256    def create_unix_server(self, protocol_factory, path=None, *,
257                           sock=None, backlog=100, ssl=None):
258        if isinstance(ssl, bool):
259            raise TypeError('ssl argument must be an SSLContext or None')
260
261        if path is not None:
262            if sock is not None:
263                raise ValueError(
264                    'path and sock can not be specified at the same time')
265
266            path = _fspath(path)
267            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
268
269            # Check for abstract socket. `str` and `bytes` paths are supported.
270            if path[0] not in (0, '\x00'):
271                try:
272                    if stat.S_ISSOCK(os.stat(path).st_mode):
273                        os.remove(path)
274                except FileNotFoundError:
275                    pass
276                except OSError as err:
277                    # Directory may have permissions only to create socket.
278                    logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err)
279
280            try:
281                sock.bind(path)
282            except OSError as exc:
283                sock.close()
284                if exc.errno == errno.EADDRINUSE:
285                    # Let's improve the error message by adding
286                    # with what exact address it occurs.
287                    msg = 'Address {!r} is already in use'.format(path)
288                    raise OSError(errno.EADDRINUSE, msg) from None
289                else:
290                    raise
291            except:
292                sock.close()
293                raise
294        else:
295            if sock is None:
296                raise ValueError(
297                    'path was not specified, and no sock specified')
298
299            if (sock.family != socket.AF_UNIX or
300                    not base_events._is_stream_socket(sock)):
301                raise ValueError(
302                    'A UNIX Domain Stream Socket was expected, got {!r}'
303                    .format(sock))
304
305        server = base_events.Server(self, [sock])
306        sock.listen(backlog)
307        sock.setblocking(False)
308        self._start_serving(protocol_factory, sock, ssl, server)
309        return server
310
311
312if hasattr(os, 'set_blocking'):
313    def _set_nonblocking(fd):
314        os.set_blocking(fd, False)
315else:
316    import fcntl
317
318    def _set_nonblocking(fd):
319        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
320        flags = flags | os.O_NONBLOCK
321        fcntl.fcntl(fd, fcntl.F_SETFL, flags)
322
323
324class _UnixReadPipeTransport(transports.ReadTransport):
325
326    max_size = 256 * 1024  # max bytes we read in one event loop iteration
327
328    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
329        super().__init__(extra)
330        self._extra['pipe'] = pipe
331        self._loop = loop
332        self._pipe = pipe
333        self._fileno = pipe.fileno()
334        self._protocol = protocol
335        self._closing = False
336
337        mode = os.fstat(self._fileno).st_mode
338        if not (stat.S_ISFIFO(mode) or
339                stat.S_ISSOCK(mode) or
340                stat.S_ISCHR(mode)):
341            self._pipe = None
342            self._fileno = None
343            self._protocol = None
344            raise ValueError("Pipe transport is for pipes/sockets only.")
345
346        _set_nonblocking(self._fileno)
347
348        self._loop.call_soon(self._protocol.connection_made, self)
349        # only start reading when connection_made() has been called
350        self._loop.call_soon(self._loop._add_reader,
351                             self._fileno, self._read_ready)
352        if waiter is not None:
353            # only wake up the waiter when connection_made() has been called
354            self._loop.call_soon(futures._set_result_unless_cancelled,
355                                 waiter, None)
356
357    def __repr__(self):
358        info = [self.__class__.__name__]
359        if self._pipe is None:
360            info.append('closed')
361        elif self._closing:
362            info.append('closing')
363        info.append('fd=%s' % self._fileno)
364        selector = getattr(self._loop, '_selector', None)
365        if self._pipe is not None and selector is not None:
366            polling = selector_events._test_selector_event(
367                          selector,
368                          self._fileno, selectors.EVENT_READ)
369            if polling:
370                info.append('polling')
371            else:
372                info.append('idle')
373        elif self._pipe is not None:
374            info.append('open')
375        else:
376            info.append('closed')
377        return '<%s>' % ' '.join(info)
378
379    def _read_ready(self):
380        try:
381            data = os.read(self._fileno, self.max_size)
382        except (BlockingIOError, InterruptedError):
383            pass
384        except OSError as exc:
385            self._fatal_error(exc, 'Fatal read error on pipe transport')
386        else:
387            if data:
388                self._protocol.data_received(data)
389            else:
390                if self._loop.get_debug():
391                    logger.info("%r was closed by peer", self)
392                self._closing = True
393                self._loop._remove_reader(self._fileno)
394                self._loop.call_soon(self._protocol.eof_received)
395                self._loop.call_soon(self._call_connection_lost, None)
396
397    def pause_reading(self):
398        self._loop._remove_reader(self._fileno)
399
400    def resume_reading(self):
401        self._loop._add_reader(self._fileno, self._read_ready)
402
403    def set_protocol(self, protocol):
404        self._protocol = protocol
405
406    def get_protocol(self):
407        return self._protocol
408
409    def is_closing(self):
410        return self._closing
411
412    def close(self):
413        if not self._closing:
414            self._close(None)
415
416    # On Python 3.3 and older, objects with a destructor part of a reference
417    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
418    # to the PEP 442.
419    if compat.PY34:
420        def __del__(self):
421            if self._pipe is not None:
422                warnings.warn("unclosed transport %r" % self, ResourceWarning,
423                              source=self)
424                self._pipe.close()
425
426    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
427        # should be called by exception handler only
428        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
429            if self._loop.get_debug():
430                logger.debug("%r: %s", self, message, exc_info=True)
431        else:
432            self._loop.call_exception_handler({
433                'message': message,
434                'exception': exc,
435                'transport': self,
436                'protocol': self._protocol,
437            })
438        self._close(exc)
439
440    def _close(self, exc):
441        self._closing = True
442        self._loop._remove_reader(self._fileno)
443        self._loop.call_soon(self._call_connection_lost, exc)
444
445    def _call_connection_lost(self, exc):
446        try:
447            self._protocol.connection_lost(exc)
448        finally:
449            self._pipe.close()
450            self._pipe = None
451            self._protocol = None
452            self._loop = None
453
454
455class _UnixWritePipeTransport(transports._FlowControlMixin,
456                              transports.WriteTransport):
457
458    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
459        super().__init__(extra, loop)
460        self._extra['pipe'] = pipe
461        self._pipe = pipe
462        self._fileno = pipe.fileno()
463        self._protocol = protocol
464        self._buffer = bytearray()
465        self._conn_lost = 0
466        self._closing = False  # Set when close() or write_eof() called.
467
468        mode = os.fstat(self._fileno).st_mode
469        is_char = stat.S_ISCHR(mode)
470        is_fifo = stat.S_ISFIFO(mode)
471        is_socket = stat.S_ISSOCK(mode)
472        if not (is_char or is_fifo or is_socket):
473            self._pipe = None
474            self._fileno = None
475            self._protocol = None
476            raise ValueError("Pipe transport is only for "
477                             "pipes, sockets and character devices")
478
479        _set_nonblocking(self._fileno)
480        self._loop.call_soon(self._protocol.connection_made, self)
481
482        # On AIX, the reader trick (to be notified when the read end of the
483        # socket is closed) only works for sockets. On other platforms it
484        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
485        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
486            # only start reading when connection_made() has been called
487            self._loop.call_soon(self._loop._add_reader,
488                                 self._fileno, self._read_ready)
489
490        if waiter is not None:
491            # only wake up the waiter when connection_made() has been called
492            self._loop.call_soon(futures._set_result_unless_cancelled,
493                                 waiter, None)
494
495    def __repr__(self):
496        info = [self.__class__.__name__]
497        if self._pipe is None:
498            info.append('closed')
499        elif self._closing:
500            info.append('closing')
501        info.append('fd=%s' % self._fileno)
502        selector = getattr(self._loop, '_selector', None)
503        if self._pipe is not None and selector is not None:
504            polling = selector_events._test_selector_event(
505                          selector,
506                          self._fileno, selectors.EVENT_WRITE)
507            if polling:
508                info.append('polling')
509            else:
510                info.append('idle')
511
512            bufsize = self.get_write_buffer_size()
513            info.append('bufsize=%s' % bufsize)
514        elif self._pipe is not None:
515            info.append('open')
516        else:
517            info.append('closed')
518        return '<%s>' % ' '.join(info)
519
520    def get_write_buffer_size(self):
521        return len(self._buffer)
522
523    def _read_ready(self):
524        # Pipe was closed by peer.
525        if self._loop.get_debug():
526            logger.info("%r was closed by peer", self)
527        if self._buffer:
528            self._close(BrokenPipeError())
529        else:
530            self._close()
531
532    def write(self, data):
533        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
534        if isinstance(data, bytearray):
535            data = memoryview(data)
536        if not data:
537            return
538
539        if self._conn_lost or self._closing:
540            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
541                logger.warning('pipe closed by peer or '
542                               'os.write(pipe, data) raised exception.')
543            self._conn_lost += 1
544            return
545
546        if not self._buffer:
547            # Attempt to send it right away first.
548            try:
549                n = os.write(self._fileno, data)
550            except (BlockingIOError, InterruptedError):
551                n = 0
552            except Exception as exc:
553                self._conn_lost += 1
554                self._fatal_error(exc, 'Fatal write error on pipe transport')
555                return
556            if n == len(data):
557                return
558            elif n > 0:
559                data = memoryview(data)[n:]
560            self._loop._add_writer(self._fileno, self._write_ready)
561
562        self._buffer += data
563        self._maybe_pause_protocol()
564
565    def _write_ready(self):
566        assert self._buffer, 'Data should not be empty'
567
568        try:
569            n = os.write(self._fileno, self._buffer)
570        except (BlockingIOError, InterruptedError):
571            pass
572        except Exception as exc:
573            self._buffer.clear()
574            self._conn_lost += 1
575            # Remove writer here, _fatal_error() doesn't it
576            # because _buffer is empty.
577            self._loop._remove_writer(self._fileno)
578            self._fatal_error(exc, 'Fatal write error on pipe transport')
579        else:
580            if n == len(self._buffer):
581                self._buffer.clear()
582                self._loop._remove_writer(self._fileno)
583                self._maybe_resume_protocol()  # May append to buffer.
584                if self._closing:
585                    self._loop._remove_reader(self._fileno)
586                    self._call_connection_lost(None)
587                return
588            elif n > 0:
589                del self._buffer[:n]
590
591    def can_write_eof(self):
592        return True
593
594    def write_eof(self):
595        if self._closing:
596            return
597        assert self._pipe
598        self._closing = True
599        if not self._buffer:
600            self._loop._remove_reader(self._fileno)
601            self._loop.call_soon(self._call_connection_lost, None)
602
603    def set_protocol(self, protocol):
604        self._protocol = protocol
605
606    def get_protocol(self):
607        return self._protocol
608
609    def is_closing(self):
610        return self._closing
611
612    def close(self):
613        if self._pipe is not None and not self._closing:
614            # write_eof is all what we needed to close the write pipe
615            self.write_eof()
616
617    # On Python 3.3 and older, objects with a destructor part of a reference
618    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
619    # to the PEP 442.
620    if compat.PY34:
621        def __del__(self):
622            if self._pipe is not None:
623                warnings.warn("unclosed transport %r" % self, ResourceWarning,
624                              source=self)
625                self._pipe.close()
626
627    def abort(self):
628        self._close(None)
629
630    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
631        # should be called by exception handler only
632        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
633            if self._loop.get_debug():
634                logger.debug("%r: %s", self, message, exc_info=True)
635        else:
636            self._loop.call_exception_handler({
637                'message': message,
638                'exception': exc,
639                'transport': self,
640                'protocol': self._protocol,
641            })
642        self._close(exc)
643
644    def _close(self, exc=None):
645        self._closing = True
646        if self._buffer:
647            self._loop._remove_writer(self._fileno)
648        self._buffer.clear()
649        self._loop._remove_reader(self._fileno)
650        self._loop.call_soon(self._call_connection_lost, exc)
651
652    def _call_connection_lost(self, exc):
653        try:
654            self._protocol.connection_lost(exc)
655        finally:
656            self._pipe.close()
657            self._pipe = None
658            self._protocol = None
659            self._loop = None
660
661
662if hasattr(os, 'set_inheritable'):
663    # Python 3.4 and newer
664    _set_inheritable = os.set_inheritable
665else:
666    import fcntl
667
668    def _set_inheritable(fd, inheritable):
669        cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
670
671        old = fcntl.fcntl(fd, fcntl.F_GETFD)
672        if not inheritable:
673            fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
674        else:
675            fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
676
677
678class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
679
680    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
681        stdin_w = None
682        if stdin == subprocess.PIPE:
683            # Use a socket pair for stdin, since not all platforms
684            # support selecting read events on the write end of a
685            # socket (which we use in order to detect closing of the
686            # other end).  Notably this is needed on AIX, and works
687            # just fine on other platforms.
688            stdin, stdin_w = self._loop._socketpair()
689
690            # Mark the write end of the stdin pipe as non-inheritable,
691            # needed by close_fds=False on Python 3.3 and older
692            # (Python 3.4 implements the PEP 446, socketpair returns
693            # non-inheritable sockets)
694            _set_inheritable(stdin_w.fileno(), False)
695        self._proc = subprocess.Popen(
696            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
697            universal_newlines=False, bufsize=bufsize, **kwargs)
698        if stdin_w is not None:
699            stdin.close()
700            self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
701
702
703class AbstractChildWatcher:
704    """Abstract base class for monitoring child processes.
705
706    Objects derived from this class monitor a collection of subprocesses and
707    report their termination or interruption by a signal.
708
709    New callbacks are registered with .add_child_handler(). Starting a new
710    process must be done within a 'with' block to allow the watcher to suspend
711    its activity until the new process if fully registered (this is needed to
712    prevent a race condition in some implementations).
713
714    Example:
715        with watcher:
716            proc = subprocess.Popen("sleep 1")
717            watcher.add_child_handler(proc.pid, callback)
718
719    Notes:
720        Implementations of this class must be thread-safe.
721
722        Since child watcher objects may catch the SIGCHLD signal and call
723        waitpid(-1), there should be only one active object per process.
724    """
725
726    def add_child_handler(self, pid, callback, *args):
727        """Register a new child handler.
728
729        Arrange for callback(pid, returncode, *args) to be called when
730        process 'pid' terminates. Specifying another callback for the same
731        process replaces the previous handler.
732
733        Note: callback() must be thread-safe.
734        """
735        raise NotImplementedError()
736
737    def remove_child_handler(self, pid):
738        """Removes the handler for process 'pid'.
739
740        The function returns True if the handler was successfully removed,
741        False if there was nothing to remove."""
742
743        raise NotImplementedError()
744
745    def attach_loop(self, loop):
746        """Attach the watcher to an event loop.
747
748        If the watcher was previously attached to an event loop, then it is
749        first detached before attaching to the new loop.
750
751        Note: loop may be None.
752        """
753        raise NotImplementedError()
754
755    def close(self):
756        """Close the watcher.
757
758        This must be called to make sure that any underlying resource is freed.
759        """
760        raise NotImplementedError()
761
762    def __enter__(self):
763        """Enter the watcher's context and allow starting new processes
764
765        This function must return self"""
766        raise NotImplementedError()
767
768    def __exit__(self, a, b, c):
769        """Exit the watcher's context"""
770        raise NotImplementedError()
771
772
773class BaseChildWatcher(AbstractChildWatcher):
774
775    def __init__(self):
776        self._loop = None
777        self._callbacks = {}
778
779    def close(self):
780        self.attach_loop(None)
781
782    def _do_waitpid(self, expected_pid):
783        raise NotImplementedError()
784
785    def _do_waitpid_all(self):
786        raise NotImplementedError()
787
788    def attach_loop(self, loop):
789        assert loop is None or isinstance(loop, events.AbstractEventLoop)
790
791        if self._loop is not None and loop is None and self._callbacks:
792            warnings.warn(
793                'A loop is being detached '
794                'from a child watcher with pending handlers',
795                RuntimeWarning)
796
797        if self._loop is not None:
798            self._loop.remove_signal_handler(signal.SIGCHLD)
799
800        self._loop = loop
801        if loop is not None:
802            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
803
804            # Prevent a race condition in case a child terminated
805            # during the switch.
806            self._do_waitpid_all()
807
808    def _sig_chld(self):
809        try:
810            self._do_waitpid_all()
811        except Exception as exc:
812            # self._loop should always be available here
813            # as '_sig_chld' is added as a signal handler
814            # in 'attach_loop'
815            self._loop.call_exception_handler({
816                'message': 'Unknown exception in SIGCHLD handler',
817                'exception': exc,
818            })
819
820    def _compute_returncode(self, status):
821        if os.WIFSIGNALED(status):
822            # The child process died because of a signal.
823            return -os.WTERMSIG(status)
824        elif os.WIFEXITED(status):
825            # The child process exited (e.g sys.exit()).
826            return os.WEXITSTATUS(status)
827        else:
828            # The child exited, but we don't understand its status.
829            # This shouldn't happen, but if it does, let's just
830            # return that status; perhaps that helps debug it.
831            return status
832
833
834class SafeChildWatcher(BaseChildWatcher):
835    """'Safe' child watcher implementation.
836
837    This implementation avoids disrupting other code spawning processes by
838    polling explicitly each process in the SIGCHLD handler instead of calling
839    os.waitpid(-1).
840
841    This is a safe solution but it has a significant overhead when handling a
842    big number of children (O(n) each time SIGCHLD is raised)
843    """
844
845    def close(self):
846        self._callbacks.clear()
847        super().close()
848
849    def __enter__(self):
850        return self
851
852    def __exit__(self, a, b, c):
853        pass
854
855    def add_child_handler(self, pid, callback, *args):
856        if self._loop is None:
857            raise RuntimeError(
858                "Cannot add child handler, "
859                "the child watcher does not have a loop attached")
860
861        self._callbacks[pid] = (callback, args)
862
863        # Prevent a race condition in case the child is already terminated.
864        self._do_waitpid(pid)
865
866    def remove_child_handler(self, pid):
867        try:
868            del self._callbacks[pid]
869            return True
870        except KeyError:
871            return False
872
873    def _do_waitpid_all(self):
874
875        for pid in list(self._callbacks):
876            self._do_waitpid(pid)
877
878    def _do_waitpid(self, expected_pid):
879        assert expected_pid > 0
880
881        try:
882            pid, status = os.waitpid(expected_pid, os.WNOHANG)
883        except ChildProcessError:
884            # The child process is already reaped
885            # (may happen if waitpid() is called elsewhere).
886            pid = expected_pid
887            returncode = 255
888            logger.warning(
889                "Unknown child process pid %d, will report returncode 255",
890                pid)
891        else:
892            if pid == 0:
893                # The child process is still alive.
894                return
895
896            returncode = self._compute_returncode(status)
897            if self._loop.get_debug():
898                logger.debug('process %s exited with returncode %s',
899                             expected_pid, returncode)
900
901        try:
902            callback, args = self._callbacks.pop(pid)
903        except KeyError:  # pragma: no cover
904            # May happen if .remove_child_handler() is called
905            # after os.waitpid() returns.
906            if self._loop.get_debug():
907                logger.warning("Child watcher got an unexpected pid: %r",
908                               pid, exc_info=True)
909        else:
910            callback(pid, returncode, *args)
911
912
913class FastChildWatcher(BaseChildWatcher):
914    """'Fast' child watcher implementation.
915
916    This implementation reaps every terminated processes by calling
917    os.waitpid(-1) directly, possibly breaking other code spawning processes
918    and waiting for their termination.
919
920    There is no noticeable overhead when handling a big number of children
921    (O(1) each time a child terminates).
922    """
923    def __init__(self):
924        super().__init__()
925        self._lock = threading.Lock()
926        self._zombies = {}
927        self._forks = 0
928
929    def close(self):
930        self._callbacks.clear()
931        self._zombies.clear()
932        super().close()
933
934    def __enter__(self):
935        with self._lock:
936            self._forks += 1
937
938            return self
939
940    def __exit__(self, a, b, c):
941        with self._lock:
942            self._forks -= 1
943
944            if self._forks or not self._zombies:
945                return
946
947            collateral_victims = str(self._zombies)
948            self._zombies.clear()
949
950        logger.warning(
951            "Caught subprocesses termination from unknown pids: %s",
952            collateral_victims)
953
954    def add_child_handler(self, pid, callback, *args):
955        assert self._forks, "Must use the context manager"
956
957        if self._loop is None:
958            raise RuntimeError(
959                "Cannot add child handler, "
960                "the child watcher does not have a loop attached")
961
962        with self._lock:
963            try:
964                returncode = self._zombies.pop(pid)
965            except KeyError:
966                # The child is running.
967                self._callbacks[pid] = callback, args
968                return
969
970        # The child is dead already. We can fire the callback.
971        callback(pid, returncode, *args)
972
973    def remove_child_handler(self, pid):
974        try:
975            del self._callbacks[pid]
976            return True
977        except KeyError:
978            return False
979
980    def _do_waitpid_all(self):
981        # Because of signal coalescing, we must keep calling waitpid() as
982        # long as we're able to reap a child.
983        while True:
984            try:
985                pid, status = os.waitpid(-1, os.WNOHANG)
986            except ChildProcessError:
987                # No more child processes exist.
988                return
989            else:
990                if pid == 0:
991                    # A child process is still alive.
992                    return
993
994                returncode = self._compute_returncode(status)
995
996            with self._lock:
997                try:
998                    callback, args = self._callbacks.pop(pid)
999                except KeyError:
1000                    # unknown child
1001                    if self._forks:
1002                        # It may not be registered yet.
1003                        self._zombies[pid] = returncode
1004                        if self._loop.get_debug():
1005                            logger.debug('unknown process %s exited '
1006                                         'with returncode %s',
1007                                         pid, returncode)
1008                        continue
1009                    callback = None
1010                else:
1011                    if self._loop.get_debug():
1012                        logger.debug('process %s exited with returncode %s',
1013                                     pid, returncode)
1014
1015            if callback is None:
1016                logger.warning(
1017                    "Caught subprocess termination from unknown pid: "
1018                    "%d -> %d", pid, returncode)
1019            else:
1020                callback(pid, returncode, *args)
1021
1022
1023class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
1024    """UNIX event loop policy with a watcher for child processes."""
1025    _loop_factory = _UnixSelectorEventLoop
1026
1027    def __init__(self):
1028        super().__init__()
1029        self._watcher = None
1030
1031    def _init_watcher(self):
1032        with events._lock:
1033            if self._watcher is None:  # pragma: no branch
1034                self._watcher = SafeChildWatcher()
1035                if isinstance(threading.current_thread(),
1036                              threading._MainThread):
1037                    self._watcher.attach_loop(self._local._loop)
1038
1039    def set_event_loop(self, loop):
1040        """Set the event loop.
1041
1042        As a side effect, if a child watcher was set before, then calling
1043        .set_event_loop() from the main thread will call .attach_loop(loop) on
1044        the child watcher.
1045        """
1046
1047        super().set_event_loop(loop)
1048
1049        if self._watcher is not None and \
1050            isinstance(threading.current_thread(), threading._MainThread):
1051            self._watcher.attach_loop(loop)
1052
1053    def get_child_watcher(self):
1054        """Get the watcher for child processes.
1055
1056        If not yet set, a SafeChildWatcher object is automatically created.
1057        """
1058        if self._watcher is None:
1059            self._init_watcher()
1060
1061        return self._watcher
1062
1063    def set_child_watcher(self, watcher):
1064        """Set the watcher for child processes."""
1065
1066        assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1067
1068        if self._watcher is not None:
1069            self._watcher.close()
1070
1071        self._watcher = watcher
1072
1073SelectorEventLoop = _UnixSelectorEventLoop
1074DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
1075