• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Selector event loop for Unix with signal handling."""
2
3import errno
4import io
5import itertools
6import os
7import selectors
8import signal
9import socket
10import stat
11import subprocess
12import sys
13import threading
14import warnings
15
16from . import base_events
17from . import base_subprocess
18from . import constants
19from . import coroutines
20from . import events
21from . import exceptions
22from . import futures
23from . import selector_events
24from . import tasks
25from . import transports
26from .log import logger
27
28
29__all__ = (
30    'SelectorEventLoop',
31    'AbstractChildWatcher', 'SafeChildWatcher',
32    'FastChildWatcher',
33    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
34    'DefaultEventLoopPolicy',
35)
36
37
38if sys.platform == 'win32':  # pragma: no cover
39    raise ImportError('Signals are not really supported on Windows')
40
41
42def _sighandler_noop(signum, frame):
43    """Dummy signal handler."""
44    pass
45
46
47class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
48    """Unix event loop.
49
50    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
51    """
52
53    def __init__(self, selector=None):
54        super().__init__(selector)
55        self._signal_handlers = {}
56
57    def close(self):
58        super().close()
59        if not sys.is_finalizing():
60            for sig in list(self._signal_handlers):
61                self.remove_signal_handler(sig)
62        else:
63            if self._signal_handlers:
64                warnings.warn(f"Closing the loop {self!r} "
65                              f"on interpreter shutdown "
66                              f"stage, skipping signal handlers removal",
67                              ResourceWarning,
68                              source=self)
69                self._signal_handlers.clear()
70
71    def _process_self_data(self, data):
72        for signum in data:
73            if not signum:
74                # ignore null bytes written by _write_to_self()
75                continue
76            self._handle_signal(signum)
77
78    def add_signal_handler(self, sig, callback, *args):
79        """Add a handler for a signal.  UNIX only.
80
81        Raise ValueError if the signal number is invalid or uncatchable.
82        Raise RuntimeError if there is a problem setting up the handler.
83        """
84        if (coroutines.iscoroutine(callback) or
85                coroutines.iscoroutinefunction(callback)):
86            raise TypeError("coroutines cannot be used "
87                            "with add_signal_handler()")
88        self._check_signal(sig)
89        self._check_closed()
90        try:
91            # set_wakeup_fd() raises ValueError if this is not the
92            # main thread.  By calling it early we ensure that an
93            # event loop running in another thread cannot add a signal
94            # handler.
95            signal.set_wakeup_fd(self._csock.fileno())
96        except (ValueError, OSError) as exc:
97            raise RuntimeError(str(exc))
98
99        handle = events.Handle(callback, args, self, None)
100        self._signal_handlers[sig] = handle
101
102        try:
103            # Register a dummy signal handler to ask Python to write the signal
104            # number in the wakup file descriptor. _process_self_data() will
105            # read signal numbers from this file descriptor to handle signals.
106            signal.signal(sig, _sighandler_noop)
107
108            # Set SA_RESTART to limit EINTR occurrences.
109            signal.siginterrupt(sig, False)
110        except OSError as exc:
111            del self._signal_handlers[sig]
112            if not self._signal_handlers:
113                try:
114                    signal.set_wakeup_fd(-1)
115                except (ValueError, OSError) as nexc:
116                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)
117
118            if exc.errno == errno.EINVAL:
119                raise RuntimeError(f'sig {sig} cannot be caught')
120            else:
121                raise
122
123    def _handle_signal(self, sig):
124        """Internal helper that is the actual signal handler."""
125        handle = self._signal_handlers.get(sig)
126        if handle is None:
127            return  # Assume it's some race condition.
128        if handle._cancelled:
129            self.remove_signal_handler(sig)  # Remove it properly.
130        else:
131            self._add_callback_signalsafe(handle)
132
133    def remove_signal_handler(self, sig):
134        """Remove a handler for a signal.  UNIX only.
135
136        Return True if a signal handler was removed, False if not.
137        """
138        self._check_signal(sig)
139        try:
140            del self._signal_handlers[sig]
141        except KeyError:
142            return False
143
144        if sig == signal.SIGINT:
145            handler = signal.default_int_handler
146        else:
147            handler = signal.SIG_DFL
148
149        try:
150            signal.signal(sig, handler)
151        except OSError as exc:
152            if exc.errno == errno.EINVAL:
153                raise RuntimeError(f'sig {sig} cannot be caught')
154            else:
155                raise
156
157        if not self._signal_handlers:
158            try:
159                signal.set_wakeup_fd(-1)
160            except (ValueError, OSError) as exc:
161                logger.info('set_wakeup_fd(-1) failed: %s', exc)
162
163        return True
164
165    def _check_signal(self, sig):
166        """Internal helper to validate a signal.
167
168        Raise ValueError if the signal number is invalid or uncatchable.
169        Raise RuntimeError if there is a problem setting up the handler.
170        """
171        if not isinstance(sig, int):
172            raise TypeError(f'sig must be an int, not {sig!r}')
173
174        if sig not in signal.valid_signals():
175            raise ValueError(f'invalid signal number {sig}')
176
177    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
178                                  extra=None):
179        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
180
181    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
182                                   extra=None):
183        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
184
185    async def _make_subprocess_transport(self, protocol, args, shell,
186                                         stdin, stdout, stderr, bufsize,
187                                         extra=None, **kwargs):
188        with events.get_child_watcher() as watcher:
189            if not watcher.is_active():
190                # Check early.
191                # Raising exception before process creation
192                # prevents subprocess execution if the watcher
193                # is not ready to handle it.
194                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
195                                   "subprocess support is not installed.")
196            waiter = self.create_future()
197            transp = _UnixSubprocessTransport(self, protocol, args, shell,
198                                              stdin, stdout, stderr, bufsize,
199                                              waiter=waiter, extra=extra,
200                                              **kwargs)
201
202            watcher.add_child_handler(transp.get_pid(),
203                                      self._child_watcher_callback, transp)
204            try:
205                await waiter
206            except (SystemExit, KeyboardInterrupt):
207                raise
208            except BaseException:
209                transp.close()
210                await transp._wait()
211                raise
212
213        return transp
214
215    def _child_watcher_callback(self, pid, returncode, transp):
216        self.call_soon_threadsafe(transp._process_exited, returncode)
217
218    async def create_unix_connection(
219            self, protocol_factory, path=None, *,
220            ssl=None, sock=None,
221            server_hostname=None,
222            ssl_handshake_timeout=None):
223        assert server_hostname is None or isinstance(server_hostname, str)
224        if ssl:
225            if server_hostname is None:
226                raise ValueError(
227                    'you have to pass server_hostname when using ssl')
228        else:
229            if server_hostname is not None:
230                raise ValueError('server_hostname is only meaningful with ssl')
231            if ssl_handshake_timeout is not None:
232                raise ValueError(
233                    'ssl_handshake_timeout is only meaningful with ssl')
234
235        if path is not None:
236            if sock is not None:
237                raise ValueError(
238                    'path and sock can not be specified at the same time')
239
240            path = os.fspath(path)
241            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
242            try:
243                sock.setblocking(False)
244                await self.sock_connect(sock, path)
245            except:
246                sock.close()
247                raise
248
249        else:
250            if sock is None:
251                raise ValueError('no path and sock were specified')
252            if (sock.family != socket.AF_UNIX or
253                    sock.type != socket.SOCK_STREAM):
254                raise ValueError(
255                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
256            sock.setblocking(False)
257
258        transport, protocol = await self._create_connection_transport(
259            sock, protocol_factory, ssl, server_hostname,
260            ssl_handshake_timeout=ssl_handshake_timeout)
261        return transport, protocol
262
263    async def create_unix_server(
264            self, protocol_factory, path=None, *,
265            sock=None, backlog=100, ssl=None,
266            ssl_handshake_timeout=None,
267            start_serving=True):
268        if isinstance(ssl, bool):
269            raise TypeError('ssl argument must be an SSLContext or None')
270
271        if ssl_handshake_timeout is not None and not ssl:
272            raise ValueError(
273                'ssl_handshake_timeout is only meaningful with ssl')
274
275        if path is not None:
276            if sock is not None:
277                raise ValueError(
278                    'path and sock can not be specified at the same time')
279
280            path = os.fspath(path)
281            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
282
283            # Check for abstract socket. `str` and `bytes` paths are supported.
284            if path[0] not in (0, '\x00'):
285                try:
286                    if stat.S_ISSOCK(os.stat(path).st_mode):
287                        os.remove(path)
288                except FileNotFoundError:
289                    pass
290                except OSError as err:
291                    # Directory may have permissions only to create socket.
292                    logger.error('Unable to check or remove stale UNIX socket '
293                                 '%r: %r', path, err)
294
295            try:
296                sock.bind(path)
297            except OSError as exc:
298                sock.close()
299                if exc.errno == errno.EADDRINUSE:
300                    # Let's improve the error message by adding
301                    # with what exact address it occurs.
302                    msg = f'Address {path!r} is already in use'
303                    raise OSError(errno.EADDRINUSE, msg) from None
304                else:
305                    raise
306            except:
307                sock.close()
308                raise
309        else:
310            if sock is None:
311                raise ValueError(
312                    'path was not specified, and no sock specified')
313
314            if (sock.family != socket.AF_UNIX or
315                    sock.type != socket.SOCK_STREAM):
316                raise ValueError(
317                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
318
319        sock.setblocking(False)
320        server = base_events.Server(self, [sock], protocol_factory,
321                                    ssl, backlog, ssl_handshake_timeout)
322        if start_serving:
323            server._start_serving()
324            # Skip one loop iteration so that all 'loop.add_reader'
325            # go through.
326            await tasks.sleep(0, loop=self)
327
328        return server
329
330    async def _sock_sendfile_native(self, sock, file, offset, count):
331        try:
332            os.sendfile
333        except AttributeError as exc:
334            raise exceptions.SendfileNotAvailableError(
335                "os.sendfile() is not available")
336        try:
337            fileno = file.fileno()
338        except (AttributeError, io.UnsupportedOperation) as err:
339            raise exceptions.SendfileNotAvailableError("not a regular file")
340        try:
341            fsize = os.fstat(fileno).st_size
342        except OSError as err:
343            raise exceptions.SendfileNotAvailableError("not a regular file")
344        blocksize = count if count else fsize
345        if not blocksize:
346            return 0  # empty file
347
348        fut = self.create_future()
349        self._sock_sendfile_native_impl(fut, None, sock, fileno,
350                                        offset, count, blocksize, 0)
351        return await fut
352
353    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
354                                   offset, count, blocksize, total_sent):
355        fd = sock.fileno()
356        if registered_fd is not None:
357            # Remove the callback early.  It should be rare that the
358            # selector says the fd is ready but the call still returns
359            # EAGAIN, and I am willing to take a hit in that case in
360            # order to simplify the common case.
361            self.remove_writer(registered_fd)
362        if fut.cancelled():
363            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
364            return
365        if count:
366            blocksize = count - total_sent
367            if blocksize <= 0:
368                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
369                fut.set_result(total_sent)
370                return
371
372        try:
373            sent = os.sendfile(fd, fileno, offset, blocksize)
374        except (BlockingIOError, InterruptedError):
375            if registered_fd is None:
376                self._sock_add_cancellation_callback(fut, sock)
377            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
378                            fd, sock, fileno,
379                            offset, count, blocksize, total_sent)
380        except OSError as exc:
381            if (registered_fd is not None and
382                    exc.errno == errno.ENOTCONN and
383                    type(exc) is not ConnectionError):
384                # If we have an ENOTCONN and this isn't a first call to
385                # sendfile(), i.e. the connection was closed in the middle
386                # of the operation, normalize the error to ConnectionError
387                # to make it consistent across all Posix systems.
388                new_exc = ConnectionError(
389                    "socket is not connected", errno.ENOTCONN)
390                new_exc.__cause__ = exc
391                exc = new_exc
392            if total_sent == 0:
393                # We can get here for different reasons, the main
394                # one being 'file' is not a regular mmap(2)-like
395                # file, in which case we'll fall back on using
396                # plain send().
397                err = exceptions.SendfileNotAvailableError(
398                    "os.sendfile call failed")
399                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
400                fut.set_exception(err)
401            else:
402                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
403                fut.set_exception(exc)
404        except (SystemExit, KeyboardInterrupt):
405            raise
406        except BaseException as exc:
407            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
408            fut.set_exception(exc)
409        else:
410            if sent == 0:
411                # EOF
412                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
413                fut.set_result(total_sent)
414            else:
415                offset += sent
416                total_sent += sent
417                if registered_fd is None:
418                    self._sock_add_cancellation_callback(fut, sock)
419                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
420                                fd, sock, fileno,
421                                offset, count, blocksize, total_sent)
422
423    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
424        if total_sent > 0:
425            os.lseek(fileno, offset, os.SEEK_SET)
426
427    def _sock_add_cancellation_callback(self, fut, sock):
428        def cb(fut):
429            if fut.cancelled():
430                fd = sock.fileno()
431                if fd != -1:
432                    self.remove_writer(fd)
433        fut.add_done_callback(cb)
434
435
436class _UnixReadPipeTransport(transports.ReadTransport):
437
438    max_size = 256 * 1024  # max bytes we read in one event loop iteration
439
440    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
441        super().__init__(extra)
442        self._extra['pipe'] = pipe
443        self._loop = loop
444        self._pipe = pipe
445        self._fileno = pipe.fileno()
446        self._protocol = protocol
447        self._closing = False
448        self._paused = False
449
450        mode = os.fstat(self._fileno).st_mode
451        if not (stat.S_ISFIFO(mode) or
452                stat.S_ISSOCK(mode) or
453                stat.S_ISCHR(mode)):
454            self._pipe = None
455            self._fileno = None
456            self._protocol = None
457            raise ValueError("Pipe transport is for pipes/sockets only.")
458
459        os.set_blocking(self._fileno, False)
460
461        self._loop.call_soon(self._protocol.connection_made, self)
462        # only start reading when connection_made() has been called
463        self._loop.call_soon(self._loop._add_reader,
464                             self._fileno, self._read_ready)
465        if waiter is not None:
466            # only wake up the waiter when connection_made() has been called
467            self._loop.call_soon(futures._set_result_unless_cancelled,
468                                 waiter, None)
469
470    def __repr__(self):
471        info = [self.__class__.__name__]
472        if self._pipe is None:
473            info.append('closed')
474        elif self._closing:
475            info.append('closing')
476        info.append(f'fd={self._fileno}')
477        selector = getattr(self._loop, '_selector', None)
478        if self._pipe is not None and selector is not None:
479            polling = selector_events._test_selector_event(
480                selector, self._fileno, selectors.EVENT_READ)
481            if polling:
482                info.append('polling')
483            else:
484                info.append('idle')
485        elif self._pipe is not None:
486            info.append('open')
487        else:
488            info.append('closed')
489        return '<{}>'.format(' '.join(info))
490
491    def _read_ready(self):
492        try:
493            data = os.read(self._fileno, self.max_size)
494        except (BlockingIOError, InterruptedError):
495            pass
496        except OSError as exc:
497            self._fatal_error(exc, 'Fatal read error on pipe transport')
498        else:
499            if data:
500                self._protocol.data_received(data)
501            else:
502                if self._loop.get_debug():
503                    logger.info("%r was closed by peer", self)
504                self._closing = True
505                self._loop._remove_reader(self._fileno)
506                self._loop.call_soon(self._protocol.eof_received)
507                self._loop.call_soon(self._call_connection_lost, None)
508
509    def pause_reading(self):
510        if self._closing or self._paused:
511            return
512        self._paused = True
513        self._loop._remove_reader(self._fileno)
514        if self._loop.get_debug():
515            logger.debug("%r pauses reading", self)
516
517    def resume_reading(self):
518        if self._closing or not self._paused:
519            return
520        self._paused = False
521        self._loop._add_reader(self._fileno, self._read_ready)
522        if self._loop.get_debug():
523            logger.debug("%r resumes reading", self)
524
525    def set_protocol(self, protocol):
526        self._protocol = protocol
527
528    def get_protocol(self):
529        return self._protocol
530
531    def is_closing(self):
532        return self._closing
533
534    def close(self):
535        if not self._closing:
536            self._close(None)
537
538    def __del__(self, _warn=warnings.warn):
539        if self._pipe is not None:
540            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
541            self._pipe.close()
542
543    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
544        # should be called by exception handler only
545        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
546            if self._loop.get_debug():
547                logger.debug("%r: %s", self, message, exc_info=True)
548        else:
549            self._loop.call_exception_handler({
550                'message': message,
551                'exception': exc,
552                'transport': self,
553                'protocol': self._protocol,
554            })
555        self._close(exc)
556
557    def _close(self, exc):
558        self._closing = True
559        self._loop._remove_reader(self._fileno)
560        self._loop.call_soon(self._call_connection_lost, exc)
561
562    def _call_connection_lost(self, exc):
563        try:
564            self._protocol.connection_lost(exc)
565        finally:
566            self._pipe.close()
567            self._pipe = None
568            self._protocol = None
569            self._loop = None
570
571
572class _UnixWritePipeTransport(transports._FlowControlMixin,
573                              transports.WriteTransport):
574
575    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
576        super().__init__(extra, loop)
577        self._extra['pipe'] = pipe
578        self._pipe = pipe
579        self._fileno = pipe.fileno()
580        self._protocol = protocol
581        self._buffer = bytearray()
582        self._conn_lost = 0
583        self._closing = False  # Set when close() or write_eof() called.
584
585        mode = os.fstat(self._fileno).st_mode
586        is_char = stat.S_ISCHR(mode)
587        is_fifo = stat.S_ISFIFO(mode)
588        is_socket = stat.S_ISSOCK(mode)
589        if not (is_char or is_fifo or is_socket):
590            self._pipe = None
591            self._fileno = None
592            self._protocol = None
593            raise ValueError("Pipe transport is only for "
594                             "pipes, sockets and character devices")
595
596        os.set_blocking(self._fileno, False)
597        self._loop.call_soon(self._protocol.connection_made, self)
598
599        # On AIX, the reader trick (to be notified when the read end of the
600        # socket is closed) only works for sockets. On other platforms it
601        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
602        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
603            # only start reading when connection_made() has been called
604            self._loop.call_soon(self._loop._add_reader,
605                                 self._fileno, self._read_ready)
606
607        if waiter is not None:
608            # only wake up the waiter when connection_made() has been called
609            self._loop.call_soon(futures._set_result_unless_cancelled,
610                                 waiter, None)
611
612    def __repr__(self):
613        info = [self.__class__.__name__]
614        if self._pipe is None:
615            info.append('closed')
616        elif self._closing:
617            info.append('closing')
618        info.append(f'fd={self._fileno}')
619        selector = getattr(self._loop, '_selector', None)
620        if self._pipe is not None and selector is not None:
621            polling = selector_events._test_selector_event(
622                selector, self._fileno, selectors.EVENT_WRITE)
623            if polling:
624                info.append('polling')
625            else:
626                info.append('idle')
627
628            bufsize = self.get_write_buffer_size()
629            info.append(f'bufsize={bufsize}')
630        elif self._pipe is not None:
631            info.append('open')
632        else:
633            info.append('closed')
634        return '<{}>'.format(' '.join(info))
635
636    def get_write_buffer_size(self):
637        return len(self._buffer)
638
639    def _read_ready(self):
640        # Pipe was closed by peer.
641        if self._loop.get_debug():
642            logger.info("%r was closed by peer", self)
643        if self._buffer:
644            self._close(BrokenPipeError())
645        else:
646            self._close()
647
648    def write(self, data):
649        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
650        if isinstance(data, bytearray):
651            data = memoryview(data)
652        if not data:
653            return
654
655        if self._conn_lost or self._closing:
656            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
657                logger.warning('pipe closed by peer or '
658                               'os.write(pipe, data) raised exception.')
659            self._conn_lost += 1
660            return
661
662        if not self._buffer:
663            # Attempt to send it right away first.
664            try:
665                n = os.write(self._fileno, data)
666            except (BlockingIOError, InterruptedError):
667                n = 0
668            except (SystemExit, KeyboardInterrupt):
669                raise
670            except BaseException as exc:
671                self._conn_lost += 1
672                self._fatal_error(exc, 'Fatal write error on pipe transport')
673                return
674            if n == len(data):
675                return
676            elif n > 0:
677                data = memoryview(data)[n:]
678            self._loop._add_writer(self._fileno, self._write_ready)
679
680        self._buffer += data
681        self._maybe_pause_protocol()
682
683    def _write_ready(self):
684        assert self._buffer, 'Data should not be empty'
685
686        try:
687            n = os.write(self._fileno, self._buffer)
688        except (BlockingIOError, InterruptedError):
689            pass
690        except (SystemExit, KeyboardInterrupt):
691            raise
692        except BaseException as exc:
693            self._buffer.clear()
694            self._conn_lost += 1
695            # Remove writer here, _fatal_error() doesn't it
696            # because _buffer is empty.
697            self._loop._remove_writer(self._fileno)
698            self._fatal_error(exc, 'Fatal write error on pipe transport')
699        else:
700            if n == len(self._buffer):
701                self._buffer.clear()
702                self._loop._remove_writer(self._fileno)
703                self._maybe_resume_protocol()  # May append to buffer.
704                if self._closing:
705                    self._loop._remove_reader(self._fileno)
706                    self._call_connection_lost(None)
707                return
708            elif n > 0:
709                del self._buffer[:n]
710
711    def can_write_eof(self):
712        return True
713
714    def write_eof(self):
715        if self._closing:
716            return
717        assert self._pipe
718        self._closing = True
719        if not self._buffer:
720            self._loop._remove_reader(self._fileno)
721            self._loop.call_soon(self._call_connection_lost, None)
722
723    def set_protocol(self, protocol):
724        self._protocol = protocol
725
726    def get_protocol(self):
727        return self._protocol
728
729    def is_closing(self):
730        return self._closing
731
732    def close(self):
733        if self._pipe is not None and not self._closing:
734            # write_eof is all what we needed to close the write pipe
735            self.write_eof()
736
737    def __del__(self, _warn=warnings.warn):
738        if self._pipe is not None:
739            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
740            self._pipe.close()
741
742    def abort(self):
743        self._close(None)
744
745    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
746        # should be called by exception handler only
747        if isinstance(exc, OSError):
748            if self._loop.get_debug():
749                logger.debug("%r: %s", self, message, exc_info=True)
750        else:
751            self._loop.call_exception_handler({
752                'message': message,
753                'exception': exc,
754                'transport': self,
755                'protocol': self._protocol,
756            })
757        self._close(exc)
758
759    def _close(self, exc=None):
760        self._closing = True
761        if self._buffer:
762            self._loop._remove_writer(self._fileno)
763        self._buffer.clear()
764        self._loop._remove_reader(self._fileno)
765        self._loop.call_soon(self._call_connection_lost, exc)
766
767    def _call_connection_lost(self, exc):
768        try:
769            self._protocol.connection_lost(exc)
770        finally:
771            self._pipe.close()
772            self._pipe = None
773            self._protocol = None
774            self._loop = None
775
776
777class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
778
779    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
780        stdin_w = None
781        if stdin == subprocess.PIPE:
782            # Use a socket pair for stdin, since not all platforms
783            # support selecting read events on the write end of a
784            # socket (which we use in order to detect closing of the
785            # other end).  Notably this is needed on AIX, and works
786            # just fine on other platforms.
787            stdin, stdin_w = socket.socketpair()
788        try:
789            self._proc = subprocess.Popen(
790                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
791                universal_newlines=False, bufsize=bufsize, **kwargs)
792            if stdin_w is not None:
793                stdin.close()
794                self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
795                stdin_w = None
796        finally:
797            if stdin_w is not None:
798                stdin.close()
799                stdin_w.close()
800
801
802class AbstractChildWatcher:
803    """Abstract base class for monitoring child processes.
804
805    Objects derived from this class monitor a collection of subprocesses and
806    report their termination or interruption by a signal.
807
808    New callbacks are registered with .add_child_handler(). Starting a new
809    process must be done within a 'with' block to allow the watcher to suspend
810    its activity until the new process if fully registered (this is needed to
811    prevent a race condition in some implementations).
812
813    Example:
814        with watcher:
815            proc = subprocess.Popen("sleep 1")
816            watcher.add_child_handler(proc.pid, callback)
817
818    Notes:
819        Implementations of this class must be thread-safe.
820
821        Since child watcher objects may catch the SIGCHLD signal and call
822        waitpid(-1), there should be only one active object per process.
823    """
824
825    def add_child_handler(self, pid, callback, *args):
826        """Register a new child handler.
827
828        Arrange for callback(pid, returncode, *args) to be called when
829        process 'pid' terminates. Specifying another callback for the same
830        process replaces the previous handler.
831
832        Note: callback() must be thread-safe.
833        """
834        raise NotImplementedError()
835
836    def remove_child_handler(self, pid):
837        """Removes the handler for process 'pid'.
838
839        The function returns True if the handler was successfully removed,
840        False if there was nothing to remove."""
841
842        raise NotImplementedError()
843
844    def attach_loop(self, loop):
845        """Attach the watcher to an event loop.
846
847        If the watcher was previously attached to an event loop, then it is
848        first detached before attaching to the new loop.
849
850        Note: loop may be None.
851        """
852        raise NotImplementedError()
853
854    def close(self):
855        """Close the watcher.
856
857        This must be called to make sure that any underlying resource is freed.
858        """
859        raise NotImplementedError()
860
861    def is_active(self):
862        """Return ``True`` if the watcher is active and is used by the event loop.
863
864        Return True if the watcher is installed and ready to handle process exit
865        notifications.
866
867        """
868        raise NotImplementedError()
869
870    def __enter__(self):
871        """Enter the watcher's context and allow starting new processes
872
873        This function must return self"""
874        raise NotImplementedError()
875
876    def __exit__(self, a, b, c):
877        """Exit the watcher's context"""
878        raise NotImplementedError()
879
880
881def _compute_returncode(status):
882    if os.WIFSIGNALED(status):
883        # The child process died because of a signal.
884        return -os.WTERMSIG(status)
885    elif os.WIFEXITED(status):
886        # The child process exited (e.g sys.exit()).
887        return os.WEXITSTATUS(status)
888    else:
889        # The child exited, but we don't understand its status.
890        # This shouldn't happen, but if it does, let's just
891        # return that status; perhaps that helps debug it.
892        return status
893
894
895class BaseChildWatcher(AbstractChildWatcher):
896
897    def __init__(self):
898        self._loop = None
899        self._callbacks = {}
900
901    def close(self):
902        self.attach_loop(None)
903
904    def is_active(self):
905        return self._loop is not None and self._loop.is_running()
906
907    def _do_waitpid(self, expected_pid):
908        raise NotImplementedError()
909
910    def _do_waitpid_all(self):
911        raise NotImplementedError()
912
913    def attach_loop(self, loop):
914        assert loop is None or isinstance(loop, events.AbstractEventLoop)
915
916        if self._loop is not None and loop is None and self._callbacks:
917            warnings.warn(
918                'A loop is being detached '
919                'from a child watcher with pending handlers',
920                RuntimeWarning)
921
922        if self._loop is not None:
923            self._loop.remove_signal_handler(signal.SIGCHLD)
924
925        self._loop = loop
926        if loop is not None:
927            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
928
929            # Prevent a race condition in case a child terminated
930            # during the switch.
931            self._do_waitpid_all()
932
933    def _sig_chld(self):
934        try:
935            self._do_waitpid_all()
936        except (SystemExit, KeyboardInterrupt):
937            raise
938        except BaseException as exc:
939            # self._loop should always be available here
940            # as '_sig_chld' is added as a signal handler
941            # in 'attach_loop'
942            self._loop.call_exception_handler({
943                'message': 'Unknown exception in SIGCHLD handler',
944                'exception': exc,
945            })
946
947
948class SafeChildWatcher(BaseChildWatcher):
949    """'Safe' child watcher implementation.
950
951    This implementation avoids disrupting other code spawning processes by
952    polling explicitly each process in the SIGCHLD handler instead of calling
953    os.waitpid(-1).
954
955    This is a safe solution but it has a significant overhead when handling a
956    big number of children (O(n) each time SIGCHLD is raised)
957    """
958
959    def close(self):
960        self._callbacks.clear()
961        super().close()
962
963    def __enter__(self):
964        return self
965
966    def __exit__(self, a, b, c):
967        pass
968
969    def add_child_handler(self, pid, callback, *args):
970        self._callbacks[pid] = (callback, args)
971
972        # Prevent a race condition in case the child is already terminated.
973        self._do_waitpid(pid)
974
975    def remove_child_handler(self, pid):
976        try:
977            del self._callbacks[pid]
978            return True
979        except KeyError:
980            return False
981
982    def _do_waitpid_all(self):
983
984        for pid in list(self._callbacks):
985            self._do_waitpid(pid)
986
987    def _do_waitpid(self, expected_pid):
988        assert expected_pid > 0
989
990        try:
991            pid, status = os.waitpid(expected_pid, os.WNOHANG)
992        except ChildProcessError:
993            # The child process is already reaped
994            # (may happen if waitpid() is called elsewhere).
995            pid = expected_pid
996            returncode = 255
997            logger.warning(
998                "Unknown child process pid %d, will report returncode 255",
999                pid)
1000        else:
1001            if pid == 0:
1002                # The child process is still alive.
1003                return
1004
1005            returncode = _compute_returncode(status)
1006            if self._loop.get_debug():
1007                logger.debug('process %s exited with returncode %s',
1008                             expected_pid, returncode)
1009
1010        try:
1011            callback, args = self._callbacks.pop(pid)
1012        except KeyError:  # pragma: no cover
1013            # May happen if .remove_child_handler() is called
1014            # after os.waitpid() returns.
1015            if self._loop.get_debug():
1016                logger.warning("Child watcher got an unexpected pid: %r",
1017                               pid, exc_info=True)
1018        else:
1019            callback(pid, returncode, *args)
1020
1021
1022class FastChildWatcher(BaseChildWatcher):
1023    """'Fast' child watcher implementation.
1024
1025    This implementation reaps every terminated processes by calling
1026    os.waitpid(-1) directly, possibly breaking other code spawning processes
1027    and waiting for their termination.
1028
1029    There is no noticeable overhead when handling a big number of children
1030    (O(1) each time a child terminates).
1031    """
1032    def __init__(self):
1033        super().__init__()
1034        self._lock = threading.Lock()
1035        self._zombies = {}
1036        self._forks = 0
1037
1038    def close(self):
1039        self._callbacks.clear()
1040        self._zombies.clear()
1041        super().close()
1042
1043    def __enter__(self):
1044        with self._lock:
1045            self._forks += 1
1046
1047            return self
1048
1049    def __exit__(self, a, b, c):
1050        with self._lock:
1051            self._forks -= 1
1052
1053            if self._forks or not self._zombies:
1054                return
1055
1056            collateral_victims = str(self._zombies)
1057            self._zombies.clear()
1058
1059        logger.warning(
1060            "Caught subprocesses termination from unknown pids: %s",
1061            collateral_victims)
1062
1063    def add_child_handler(self, pid, callback, *args):
1064        assert self._forks, "Must use the context manager"
1065
1066        with self._lock:
1067            try:
1068                returncode = self._zombies.pop(pid)
1069            except KeyError:
1070                # The child is running.
1071                self._callbacks[pid] = callback, args
1072                return
1073
1074        # The child is dead already. We can fire the callback.
1075        callback(pid, returncode, *args)
1076
1077    def remove_child_handler(self, pid):
1078        try:
1079            del self._callbacks[pid]
1080            return True
1081        except KeyError:
1082            return False
1083
1084    def _do_waitpid_all(self):
1085        # Because of signal coalescing, we must keep calling waitpid() as
1086        # long as we're able to reap a child.
1087        while True:
1088            try:
1089                pid, status = os.waitpid(-1, os.WNOHANG)
1090            except ChildProcessError:
1091                # No more child processes exist.
1092                return
1093            else:
1094                if pid == 0:
1095                    # A child process is still alive.
1096                    return
1097
1098                returncode = _compute_returncode(status)
1099
1100            with self._lock:
1101                try:
1102                    callback, args = self._callbacks.pop(pid)
1103                except KeyError:
1104                    # unknown child
1105                    if self._forks:
1106                        # It may not be registered yet.
1107                        self._zombies[pid] = returncode
1108                        if self._loop.get_debug():
1109                            logger.debug('unknown process %s exited '
1110                                         'with returncode %s',
1111                                         pid, returncode)
1112                        continue
1113                    callback = None
1114                else:
1115                    if self._loop.get_debug():
1116                        logger.debug('process %s exited with returncode %s',
1117                                     pid, returncode)
1118
1119            if callback is None:
1120                logger.warning(
1121                    "Caught subprocess termination from unknown pid: "
1122                    "%d -> %d", pid, returncode)
1123            else:
1124                callback(pid, returncode, *args)
1125
1126
1127class MultiLoopChildWatcher(AbstractChildWatcher):
1128    """A watcher that doesn't require running loop in the main thread.
1129
1130    This implementation registers a SIGCHLD signal handler on
1131    instantiation (which may conflict with other code that
1132    install own handler for this signal).
1133
1134    The solution is safe but it has a significant overhead when
1135    handling a big number of processes (*O(n)* each time a
1136    SIGCHLD is received).
1137    """
1138
1139    # Implementation note:
1140    # The class keeps compatibility with AbstractChildWatcher ABC
1141    # To achieve this it has empty attach_loop() method
1142    # and doesn't accept explicit loop argument
1143    # for add_child_handler()/remove_child_handler()
1144    # but retrieves the current loop by get_running_loop()
1145
1146    def __init__(self):
1147        self._callbacks = {}
1148        self._saved_sighandler = None
1149
1150    def is_active(self):
1151        return self._saved_sighandler is not None
1152
1153    def close(self):
1154        self._callbacks.clear()
1155        if self._saved_sighandler is not None:
1156            handler = signal.getsignal(signal.SIGCHLD)
1157            if handler != self._sig_chld:
1158                logger.warning("SIGCHLD handler was changed by outside code")
1159            else:
1160                signal.signal(signal.SIGCHLD, self._saved_sighandler)
1161            self._saved_sighandler = None
1162
1163    def __enter__(self):
1164        return self
1165
1166    def __exit__(self, exc_type, exc_val, exc_tb):
1167        pass
1168
1169    def add_child_handler(self, pid, callback, *args):
1170        loop = events.get_running_loop()
1171        self._callbacks[pid] = (loop, callback, args)
1172
1173        # Prevent a race condition in case the child is already terminated.
1174        self._do_waitpid(pid)
1175
1176    def remove_child_handler(self, pid):
1177        try:
1178            del self._callbacks[pid]
1179            return True
1180        except KeyError:
1181            return False
1182
1183    def attach_loop(self, loop):
1184        # Don't save the loop but initialize itself if called first time
1185        # The reason to do it here is that attach_loop() is called from
1186        # unix policy only for the main thread.
1187        # Main thread is required for subscription on SIGCHLD signal
1188        if self._saved_sighandler is None:
1189            self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
1190            if self._saved_sighandler is None:
1191                logger.warning("Previous SIGCHLD handler was set by non-Python code, "
1192                               "restore to default handler on watcher close.")
1193                self._saved_sighandler = signal.SIG_DFL
1194
1195            # Set SA_RESTART to limit EINTR occurrences.
1196            signal.siginterrupt(signal.SIGCHLD, False)
1197
1198    def _do_waitpid_all(self):
1199        for pid in list(self._callbacks):
1200            self._do_waitpid(pid)
1201
1202    def _do_waitpid(self, expected_pid):
1203        assert expected_pid > 0
1204
1205        try:
1206            pid, status = os.waitpid(expected_pid, os.WNOHANG)
1207        except ChildProcessError:
1208            # The child process is already reaped
1209            # (may happen if waitpid() is called elsewhere).
1210            pid = expected_pid
1211            returncode = 255
1212            logger.warning(
1213                "Unknown child process pid %d, will report returncode 255",
1214                pid)
1215            debug_log = False
1216        else:
1217            if pid == 0:
1218                # The child process is still alive.
1219                return
1220
1221            returncode = _compute_returncode(status)
1222            debug_log = True
1223        try:
1224            loop, callback, args = self._callbacks.pop(pid)
1225        except KeyError:  # pragma: no cover
1226            # May happen if .remove_child_handler() is called
1227            # after os.waitpid() returns.
1228            logger.warning("Child watcher got an unexpected pid: %r",
1229                           pid, exc_info=True)
1230        else:
1231            if loop.is_closed():
1232                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1233            else:
1234                if debug_log and loop.get_debug():
1235                    logger.debug('process %s exited with returncode %s',
1236                                 expected_pid, returncode)
1237                loop.call_soon_threadsafe(callback, pid, returncode, *args)
1238
1239    def _sig_chld(self, signum, frame):
1240        try:
1241            self._do_waitpid_all()
1242        except (SystemExit, KeyboardInterrupt):
1243            raise
1244        except BaseException:
1245            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1246
1247
1248class ThreadedChildWatcher(AbstractChildWatcher):
1249    """Threaded child watcher implementation.
1250
1251    The watcher uses a thread per process
1252    for waiting for the process finish.
1253
1254    It doesn't require subscription on POSIX signal
1255    but a thread creation is not free.
1256
1257    The watcher has O(1) complexity, its performance doesn't depend
1258    on amount of spawn processes.
1259    """
1260
1261    def __init__(self):
1262        self._pid_counter = itertools.count(0)
1263        self._threads = {}
1264
1265    def is_active(self):
1266        return True
1267
1268    def close(self):
1269        pass
1270
1271    def __enter__(self):
1272        return self
1273
1274    def __exit__(self, exc_type, exc_val, exc_tb):
1275        pass
1276
1277    def __del__(self, _warn=warnings.warn):
1278        threads = [thread for thread in list(self._threads.values())
1279                   if thread.is_alive()]
1280        if threads:
1281            _warn(f"{self.__class__} has registered but not finished child processes",
1282                  ResourceWarning,
1283                  source=self)
1284
1285    def add_child_handler(self, pid, callback, *args):
1286        loop = events.get_running_loop()
1287        thread = threading.Thread(target=self._do_waitpid,
1288                                  name=f"waitpid-{next(self._pid_counter)}",
1289                                  args=(loop, pid, callback, args),
1290                                  daemon=True)
1291        self._threads[pid] = thread
1292        thread.start()
1293
1294    def remove_child_handler(self, pid):
1295        # asyncio never calls remove_child_handler() !!!
1296        # The method is no-op but is implemented because
1297        # abstract base classe requires it
1298        return True
1299
1300    def attach_loop(self, loop):
1301        pass
1302
1303    def _do_waitpid(self, loop, expected_pid, callback, args):
1304        assert expected_pid > 0
1305
1306        try:
1307            pid, status = os.waitpid(expected_pid, 0)
1308        except ChildProcessError:
1309            # The child process is already reaped
1310            # (may happen if waitpid() is called elsewhere).
1311            pid = expected_pid
1312            returncode = 255
1313            logger.warning(
1314                "Unknown child process pid %d, will report returncode 255",
1315                pid)
1316        else:
1317            returncode = _compute_returncode(status)
1318            if loop.get_debug():
1319                logger.debug('process %s exited with returncode %s',
1320                             expected_pid, returncode)
1321
1322        if loop.is_closed():
1323            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1324        else:
1325            loop.call_soon_threadsafe(callback, pid, returncode, *args)
1326
1327        self._threads.pop(expected_pid)
1328
1329
1330class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
1331    """UNIX event loop policy with a watcher for child processes."""
1332    _loop_factory = _UnixSelectorEventLoop
1333
1334    def __init__(self):
1335        super().__init__()
1336        self._watcher = None
1337
1338    def _init_watcher(self):
1339        with events._lock:
1340            if self._watcher is None:  # pragma: no branch
1341                self._watcher = ThreadedChildWatcher()
1342                if isinstance(threading.current_thread(),
1343                              threading._MainThread):
1344                    self._watcher.attach_loop(self._local._loop)
1345
1346    def set_event_loop(self, loop):
1347        """Set the event loop.
1348
1349        As a side effect, if a child watcher was set before, then calling
1350        .set_event_loop() from the main thread will call .attach_loop(loop) on
1351        the child watcher.
1352        """
1353
1354        super().set_event_loop(loop)
1355
1356        if (self._watcher is not None and
1357                isinstance(threading.current_thread(), threading._MainThread)):
1358            self._watcher.attach_loop(loop)
1359
1360    def get_child_watcher(self):
1361        """Get the watcher for child processes.
1362
1363        If not yet set, a ThreadedChildWatcher object is automatically created.
1364        """
1365        if self._watcher is None:
1366            self._init_watcher()
1367
1368        return self._watcher
1369
1370    def set_child_watcher(self, watcher):
1371        """Set the watcher for child processes."""
1372
1373        assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1374
1375        if self._watcher is not None:
1376            self._watcher.close()
1377
1378        self._watcher = watcher
1379
1380
1381SelectorEventLoop = _UnixSelectorEventLoop
1382DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
1383