• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Selector event loop for Unix with signal handling."""
2
3import errno
4import io
5import itertools
6import os
7import selectors
8import signal
9import socket
10import stat
11import subprocess
12import sys
13import threading
14import warnings
15
16from . import base_events
17from . import base_subprocess
18from . import constants
19from . import coroutines
20from . import events
21from . import exceptions
22from . import futures
23from . import selector_events
24from . import tasks
25from . import transports
26from .log import logger
27
28
# Public names exported by a star-import of this module.
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'PidfdChildWatcher',
    'MultiLoopChildWatcher',
    'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to be imported on Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
40
41
42def _sighandler_noop(signum, frame):
43    """Dummy signal handler."""
44    pass
45
46
47class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
48    """Unix event loop.
49
50    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
51    """
52
53    def __init__(self, selector=None):
54        super().__init__(selector)
55        self._signal_handlers = {}
56
57    def close(self):
58        super().close()
59        if not sys.is_finalizing():
60            for sig in list(self._signal_handlers):
61                self.remove_signal_handler(sig)
62        else:
63            if self._signal_handlers:
64                warnings.warn(f"Closing the loop {self!r} "
65                              f"on interpreter shutdown "
66                              f"stage, skipping signal handlers removal",
67                              ResourceWarning,
68                              source=self)
69                self._signal_handlers.clear()
70
71    def _process_self_data(self, data):
72        for signum in data:
73            if not signum:
74                # ignore null bytes written by _write_to_self()
75                continue
76            self._handle_signal(signum)
77
78    def add_signal_handler(self, sig, callback, *args):
79        """Add a handler for a signal.  UNIX only.
80
81        Raise ValueError if the signal number is invalid or uncatchable.
82        Raise RuntimeError if there is a problem setting up the handler.
83        """
84        if (coroutines.iscoroutine(callback) or
85                coroutines.iscoroutinefunction(callback)):
86            raise TypeError("coroutines cannot be used "
87                            "with add_signal_handler()")
88        self._check_signal(sig)
89        self._check_closed()
90        try:
91            # set_wakeup_fd() raises ValueError if this is not the
92            # main thread.  By calling it early we ensure that an
93            # event loop running in another thread cannot add a signal
94            # handler.
95            signal.set_wakeup_fd(self._csock.fileno())
96        except (ValueError, OSError) as exc:
97            raise RuntimeError(str(exc))
98
99        handle = events.Handle(callback, args, self, None)
100        self._signal_handlers[sig] = handle
101
102        try:
103            # Register a dummy signal handler to ask Python to write the signal
104            # number in the wakeup file descriptor. _process_self_data() will
105            # read signal numbers from this file descriptor to handle signals.
106            signal.signal(sig, _sighandler_noop)
107
108            # Set SA_RESTART to limit EINTR occurrences.
109            signal.siginterrupt(sig, False)
110        except OSError as exc:
111            del self._signal_handlers[sig]
112            if not self._signal_handlers:
113                try:
114                    signal.set_wakeup_fd(-1)
115                except (ValueError, OSError) as nexc:
116                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)
117
118            if exc.errno == errno.EINVAL:
119                raise RuntimeError(f'sig {sig} cannot be caught')
120            else:
121                raise
122
123    def _handle_signal(self, sig):
124        """Internal helper that is the actual signal handler."""
125        handle = self._signal_handlers.get(sig)
126        if handle is None:
127            return  # Assume it's some race condition.
128        if handle._cancelled:
129            self.remove_signal_handler(sig)  # Remove it properly.
130        else:
131            self._add_callback_signalsafe(handle)
132
133    def remove_signal_handler(self, sig):
134        """Remove a handler for a signal.  UNIX only.
135
136        Return True if a signal handler was removed, False if not.
137        """
138        self._check_signal(sig)
139        try:
140            del self._signal_handlers[sig]
141        except KeyError:
142            return False
143
144        if sig == signal.SIGINT:
145            handler = signal.default_int_handler
146        else:
147            handler = signal.SIG_DFL
148
149        try:
150            signal.signal(sig, handler)
151        except OSError as exc:
152            if exc.errno == errno.EINVAL:
153                raise RuntimeError(f'sig {sig} cannot be caught')
154            else:
155                raise
156
157        if not self._signal_handlers:
158            try:
159                signal.set_wakeup_fd(-1)
160            except (ValueError, OSError) as exc:
161                logger.info('set_wakeup_fd(-1) failed: %s', exc)
162
163        return True
164
165    def _check_signal(self, sig):
166        """Internal helper to validate a signal.
167
168        Raise ValueError if the signal number is invalid or uncatchable.
169        Raise RuntimeError if there is a problem setting up the handler.
170        """
171        if not isinstance(sig, int):
172            raise TypeError(f'sig must be an int, not {sig!r}')
173
174        if sig not in signal.valid_signals():
175            raise ValueError(f'invalid signal number {sig}')
176
177    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
178                                  extra=None):
179        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
180
181    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
182                                   extra=None):
183        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
184
185    async def _make_subprocess_transport(self, protocol, args, shell,
186                                         stdin, stdout, stderr, bufsize,
187                                         extra=None, **kwargs):
188        with events.get_child_watcher() as watcher:
189            if not watcher.is_active():
190                # Check early.
191                # Raising exception before process creation
192                # prevents subprocess execution if the watcher
193                # is not ready to handle it.
194                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
195                                   "subprocess support is not installed.")
196            waiter = self.create_future()
197            transp = _UnixSubprocessTransport(self, protocol, args, shell,
198                                              stdin, stdout, stderr, bufsize,
199                                              waiter=waiter, extra=extra,
200                                              **kwargs)
201
202            watcher.add_child_handler(transp.get_pid(),
203                                      self._child_watcher_callback, transp)
204            try:
205                await waiter
206            except (SystemExit, KeyboardInterrupt):
207                raise
208            except BaseException:
209                transp.close()
210                await transp._wait()
211                raise
212
213        return transp
214
215    def _child_watcher_callback(self, pid, returncode, transp):
216        self.call_soon_threadsafe(transp._process_exited, returncode)
217
218    async def create_unix_connection(
219            self, protocol_factory, path=None, *,
220            ssl=None, sock=None,
221            server_hostname=None,
222            ssl_handshake_timeout=None):
223        assert server_hostname is None or isinstance(server_hostname, str)
224        if ssl:
225            if server_hostname is None:
226                raise ValueError(
227                    'you have to pass server_hostname when using ssl')
228        else:
229            if server_hostname is not None:
230                raise ValueError('server_hostname is only meaningful with ssl')
231            if ssl_handshake_timeout is not None:
232                raise ValueError(
233                    'ssl_handshake_timeout is only meaningful with ssl')
234
235        if path is not None:
236            if sock is not None:
237                raise ValueError(
238                    'path and sock can not be specified at the same time')
239
240            path = os.fspath(path)
241            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
242            try:
243                sock.setblocking(False)
244                await self.sock_connect(sock, path)
245            except:
246                sock.close()
247                raise
248
249        else:
250            if sock is None:
251                raise ValueError('no path and sock were specified')
252            if (sock.family != socket.AF_UNIX or
253                    sock.type != socket.SOCK_STREAM):
254                raise ValueError(
255                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
256            sock.setblocking(False)
257
258        transport, protocol = await self._create_connection_transport(
259            sock, protocol_factory, ssl, server_hostname,
260            ssl_handshake_timeout=ssl_handshake_timeout)
261        return transport, protocol
262
263    async def create_unix_server(
264            self, protocol_factory, path=None, *,
265            sock=None, backlog=100, ssl=None,
266            ssl_handshake_timeout=None,
267            start_serving=True):
268        if isinstance(ssl, bool):
269            raise TypeError('ssl argument must be an SSLContext or None')
270
271        if ssl_handshake_timeout is not None and not ssl:
272            raise ValueError(
273                'ssl_handshake_timeout is only meaningful with ssl')
274
275        if path is not None:
276            if sock is not None:
277                raise ValueError(
278                    'path and sock can not be specified at the same time')
279
280            path = os.fspath(path)
281            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
282
283            # Check for abstract socket. `str` and `bytes` paths are supported.
284            if path[0] not in (0, '\x00'):
285                try:
286                    if stat.S_ISSOCK(os.stat(path).st_mode):
287                        os.remove(path)
288                except FileNotFoundError:
289                    pass
290                except OSError as err:
291                    # Directory may have permissions only to create socket.
292                    logger.error('Unable to check or remove stale UNIX socket '
293                                 '%r: %r', path, err)
294
295            try:
296                sock.bind(path)
297            except OSError as exc:
298                sock.close()
299                if exc.errno == errno.EADDRINUSE:
300                    # Let's improve the error message by adding
301                    # with what exact address it occurs.
302                    msg = f'Address {path!r} is already in use'
303                    raise OSError(errno.EADDRINUSE, msg) from None
304                else:
305                    raise
306            except:
307                sock.close()
308                raise
309        else:
310            if sock is None:
311                raise ValueError(
312                    'path was not specified, and no sock specified')
313
314            if (sock.family != socket.AF_UNIX or
315                    sock.type != socket.SOCK_STREAM):
316                raise ValueError(
317                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
318
319        sock.setblocking(False)
320        server = base_events.Server(self, [sock], protocol_factory,
321                                    ssl, backlog, ssl_handshake_timeout)
322        if start_serving:
323            server._start_serving()
324            # Skip one loop iteration so that all 'loop.add_reader'
325            # go through.
326            await tasks.sleep(0, loop=self)
327
328        return server
329
330    async def _sock_sendfile_native(self, sock, file, offset, count):
331        try:
332            os.sendfile
333        except AttributeError:
334            raise exceptions.SendfileNotAvailableError(
335                "os.sendfile() is not available")
336        try:
337            fileno = file.fileno()
338        except (AttributeError, io.UnsupportedOperation) as err:
339            raise exceptions.SendfileNotAvailableError("not a regular file")
340        try:
341            fsize = os.fstat(fileno).st_size
342        except OSError:
343            raise exceptions.SendfileNotAvailableError("not a regular file")
344        blocksize = count if count else fsize
345        if not blocksize:
346            return 0  # empty file
347
348        fut = self.create_future()
349        self._sock_sendfile_native_impl(fut, None, sock, fileno,
350                                        offset, count, blocksize, 0)
351        return await fut
352
    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and resolve *fut* when finished.

        The method re-registers itself as a writer callback on the socket
        fd until the transfer completes, fails or *fut* is cancelled.

        fut: future that receives the total byte count or the exception.
        registered_fd: None on the first call; otherwise the fd under which
            this method was registered with add_writer().
        sock: destination socket.
        fileno: file descriptor of the source file.
        offset: position in the source file to send from.
        count: total number of bytes to send; falsy means send until
            os.sendfile() reports 0 bytes.
        blocksize: number of bytes requested from os.sendfile(); it is
            recomputed from count and total_sent when count is truthy.
        total_sent: number of bytes sent by previous steps.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            # Nobody is waiting any more; only fix up the file position.
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            # Only request what is still missing; stop once count is reached.
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            # Not writable right now: retry with the same arguments once
            # the selector reports the socket as writable.
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                new_exc = ConnectionError(
                    "socket is not connected", errno.ENOTCONN)
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                # Some data already went out, so report the (possibly
                # normalized) error itself.
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                # Progress was made: advance and schedule the next step.
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)
422
423    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
424        if total_sent > 0:
425            os.lseek(fileno, offset, os.SEEK_SET)
426
427    def _sock_add_cancellation_callback(self, fut, sock):
428        def cb(fut):
429            if fut.cancelled():
430                fd = sock.fileno()
431                if fd != -1:
432                    self.remove_writer(fd)
433        fut.add_done_callback(cb)
434
435
436class _UnixReadPipeTransport(transports.ReadTransport):
437
438    max_size = 256 * 1024  # max bytes we read in one event loop iteration
439
440    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
441        super().__init__(extra)
442        self._extra['pipe'] = pipe
443        self._loop = loop
444        self._pipe = pipe
445        self._fileno = pipe.fileno()
446        self._protocol = protocol
447        self._closing = False
448        self._paused = False
449
450        mode = os.fstat(self._fileno).st_mode
451        if not (stat.S_ISFIFO(mode) or
452                stat.S_ISSOCK(mode) or
453                stat.S_ISCHR(mode)):
454            self._pipe = None
455            self._fileno = None
456            self._protocol = None
457            raise ValueError("Pipe transport is for pipes/sockets only.")
458
459        os.set_blocking(self._fileno, False)
460
461        self._loop.call_soon(self._protocol.connection_made, self)
462        # only start reading when connection_made() has been called
463        self._loop.call_soon(self._loop._add_reader,
464                             self._fileno, self._read_ready)
465        if waiter is not None:
466            # only wake up the waiter when connection_made() has been called
467            self._loop.call_soon(futures._set_result_unless_cancelled,
468                                 waiter, None)
469
470    def __repr__(self):
471        info = [self.__class__.__name__]
472        if self._pipe is None:
473            info.append('closed')
474        elif self._closing:
475            info.append('closing')
476        info.append(f'fd={self._fileno}')
477        selector = getattr(self._loop, '_selector', None)
478        if self._pipe is not None and selector is not None:
479            polling = selector_events._test_selector_event(
480                selector, self._fileno, selectors.EVENT_READ)
481            if polling:
482                info.append('polling')
483            else:
484                info.append('idle')
485        elif self._pipe is not None:
486            info.append('open')
487        else:
488            info.append('closed')
489        return '<{}>'.format(' '.join(info))
490
491    def _read_ready(self):
492        try:
493            data = os.read(self._fileno, self.max_size)
494        except (BlockingIOError, InterruptedError):
495            pass
496        except OSError as exc:
497            self._fatal_error(exc, 'Fatal read error on pipe transport')
498        else:
499            if data:
500                self._protocol.data_received(data)
501            else:
502                if self._loop.get_debug():
503                    logger.info("%r was closed by peer", self)
504                self._closing = True
505                self._loop._remove_reader(self._fileno)
506                self._loop.call_soon(self._protocol.eof_received)
507                self._loop.call_soon(self._call_connection_lost, None)
508
509    def pause_reading(self):
510        if self._closing or self._paused:
511            return
512        self._paused = True
513        self._loop._remove_reader(self._fileno)
514        if self._loop.get_debug():
515            logger.debug("%r pauses reading", self)
516
517    def resume_reading(self):
518        if self._closing or not self._paused:
519            return
520        self._paused = False
521        self._loop._add_reader(self._fileno, self._read_ready)
522        if self._loop.get_debug():
523            logger.debug("%r resumes reading", self)
524
525    def set_protocol(self, protocol):
526        self._protocol = protocol
527
528    def get_protocol(self):
529        return self._protocol
530
531    def is_closing(self):
532        return self._closing
533
534    def close(self):
535        if not self._closing:
536            self._close(None)
537
538    def __del__(self, _warn=warnings.warn):
539        if self._pipe is not None:
540            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
541            self._pipe.close()
542
543    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
544        # should be called by exception handler only
545        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
546            if self._loop.get_debug():
547                logger.debug("%r: %s", self, message, exc_info=True)
548        else:
549            self._loop.call_exception_handler({
550                'message': message,
551                'exception': exc,
552                'transport': self,
553                'protocol': self._protocol,
554            })
555        self._close(exc)
556
557    def _close(self, exc):
558        self._closing = True
559        self._loop._remove_reader(self._fileno)
560        self._loop.call_soon(self._call_connection_lost, exc)
561
562    def _call_connection_lost(self, exc):
563        try:
564            self._protocol.connection_lost(exc)
565        finally:
566            self._pipe.close()
567            self._pipe = None
568            self._protocol = None
569            self._loop = None
570
571
572class _UnixWritePipeTransport(transports._FlowControlMixin,
573                              transports.WriteTransport):
574
575    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
576        super().__init__(extra, loop)
577        self._extra['pipe'] = pipe
578        self._pipe = pipe
579        self._fileno = pipe.fileno()
580        self._protocol = protocol
581        self._buffer = bytearray()
582        self._conn_lost = 0
583        self._closing = False  # Set when close() or write_eof() called.
584
585        mode = os.fstat(self._fileno).st_mode
586        is_char = stat.S_ISCHR(mode)
587        is_fifo = stat.S_ISFIFO(mode)
588        is_socket = stat.S_ISSOCK(mode)
589        if not (is_char or is_fifo or is_socket):
590            self._pipe = None
591            self._fileno = None
592            self._protocol = None
593            raise ValueError("Pipe transport is only for "
594                             "pipes, sockets and character devices")
595
596        os.set_blocking(self._fileno, False)
597        self._loop.call_soon(self._protocol.connection_made, self)
598
599        # On AIX, the reader trick (to be notified when the read end of the
600        # socket is closed) only works for sockets. On other platforms it
601        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
602        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
603            # only start reading when connection_made() has been called
604            self._loop.call_soon(self._loop._add_reader,
605                                 self._fileno, self._read_ready)
606
607        if waiter is not None:
608            # only wake up the waiter when connection_made() has been called
609            self._loop.call_soon(futures._set_result_unless_cancelled,
610                                 waiter, None)
611
612    def __repr__(self):
613        info = [self.__class__.__name__]
614        if self._pipe is None:
615            info.append('closed')
616        elif self._closing:
617            info.append('closing')
618        info.append(f'fd={self._fileno}')
619        selector = getattr(self._loop, '_selector', None)
620        if self._pipe is not None and selector is not None:
621            polling = selector_events._test_selector_event(
622                selector, self._fileno, selectors.EVENT_WRITE)
623            if polling:
624                info.append('polling')
625            else:
626                info.append('idle')
627
628            bufsize = self.get_write_buffer_size()
629            info.append(f'bufsize={bufsize}')
630        elif self._pipe is not None:
631            info.append('open')
632        else:
633            info.append('closed')
634        return '<{}>'.format(' '.join(info))
635
636    def get_write_buffer_size(self):
637        return len(self._buffer)
638
639    def _read_ready(self):
640        # Pipe was closed by peer.
641        if self._loop.get_debug():
642            logger.info("%r was closed by peer", self)
643        if self._buffer:
644            self._close(BrokenPipeError())
645        else:
646            self._close()
647
648    def write(self, data):
649        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
650        if isinstance(data, bytearray):
651            data = memoryview(data)
652        if not data:
653            return
654
655        if self._conn_lost or self._closing:
656            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
657                logger.warning('pipe closed by peer or '
658                               'os.write(pipe, data) raised exception.')
659            self._conn_lost += 1
660            return
661
662        if not self._buffer:
663            # Attempt to send it right away first.
664            try:
665                n = os.write(self._fileno, data)
666            except (BlockingIOError, InterruptedError):
667                n = 0
668            except (SystemExit, KeyboardInterrupt):
669                raise
670            except BaseException as exc:
671                self._conn_lost += 1
672                self._fatal_error(exc, 'Fatal write error on pipe transport')
673                return
674            if n == len(data):
675                return
676            elif n > 0:
677                data = memoryview(data)[n:]
678            self._loop._add_writer(self._fileno, self._write_ready)
679
680        self._buffer += data
681        self._maybe_pause_protocol()
682
683    def _write_ready(self):
684        assert self._buffer, 'Data should not be empty'
685
686        try:
687            n = os.write(self._fileno, self._buffer)
688        except (BlockingIOError, InterruptedError):
689            pass
690        except (SystemExit, KeyboardInterrupt):
691            raise
692        except BaseException as exc:
693            self._buffer.clear()
694            self._conn_lost += 1
695            # Remove writer here, _fatal_error() doesn't it
696            # because _buffer is empty.
697            self._loop._remove_writer(self._fileno)
698            self._fatal_error(exc, 'Fatal write error on pipe transport')
699        else:
700            if n == len(self._buffer):
701                self._buffer.clear()
702                self._loop._remove_writer(self._fileno)
703                self._maybe_resume_protocol()  # May append to buffer.
704                if self._closing:
705                    self._loop._remove_reader(self._fileno)
706                    self._call_connection_lost(None)
707                return
708            elif n > 0:
709                del self._buffer[:n]
710
711    def can_write_eof(self):
712        return True
713
714    def write_eof(self):
715        if self._closing:
716            return
717        assert self._pipe
718        self._closing = True
719        if not self._buffer:
720            self._loop._remove_reader(self._fileno)
721            self._loop.call_soon(self._call_connection_lost, None)
722
723    def set_protocol(self, protocol):
724        self._protocol = protocol
725
726    def get_protocol(self):
727        return self._protocol
728
729    def is_closing(self):
730        return self._closing
731
732    def close(self):
733        if self._pipe is not None and not self._closing:
734            # write_eof is all what we needed to close the write pipe
735            self.write_eof()
736
737    def __del__(self, _warn=warnings.warn):
738        if self._pipe is not None:
739            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
740            self._pipe.close()
741
742    def abort(self):
743        self._close(None)
744
745    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
746        # should be called by exception handler only
747        if isinstance(exc, OSError):
748            if self._loop.get_debug():
749                logger.debug("%r: %s", self, message, exc_info=True)
750        else:
751            self._loop.call_exception_handler({
752                'message': message,
753                'exception': exc,
754                'transport': self,
755                'protocol': self._protocol,
756            })
757        self._close(exc)
758
759    def _close(self, exc=None):
760        self._closing = True
761        if self._buffer:
762            self._loop._remove_writer(self._fileno)
763        self._buffer.clear()
764        self._loop._remove_reader(self._fileno)
765        self._loop.call_soon(self._call_connection_lost, exc)
766
767    def _call_connection_lost(self, exc):
768        try:
769            self._protocol.connection_lost(exc)
770        finally:
771            self._pipe.close()
772            self._pipe = None
773            self._protocol = None
774            self._loop = None
775
776
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and store the Popen object in self._proc.

        When *stdin* is subprocess.PIPE a socket pair is used instead of a
        pipe: one end is handed to the child and the other end becomes the
        binary, write-only file object stored as self._proc.stdin.
        """
        parent_end = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, parent_end = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if parent_end is not None:
                # The child owns its end now; keep only ours.
                stdin.close()
                self._proc.stdin = open(
                    parent_end.detach(), 'wb', buffering=bufsize)
                parent_end = None
        finally:
            # Still set only if something above failed: release both ends.
            if parent_end is not None:
                stdin.close()
                parent_end.close()
800
801
class AbstractChildWatcher:
    """Interface for objects that watch child processes.

    A child watcher keeps track of a set of subprocesses and reports when
    one of them terminates or is interrupted by a signal.

    Handlers are installed with .add_child_handler().  A new process has to
    be started inside a 'with' block on the watcher, so that the watcher can
    suspend its activity until the process is fully registered (some
    implementations need this to prevent a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        A child watcher may catch the SIGCHLD signal and call waitpid(-1),
        so there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Install a handler for a child process.

        callback(pid, returncode, *args) is to be invoked once process
        'pid' terminates.  Registering a second callback for the same
        process replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Drop the handler registered for process 'pid'.

        Returns True when a handler was removed and False when there was
        nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Bind the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource is released.
        """
        raise NotImplementedError

    def is_active(self):
        """Return ``True`` if the watcher is active and used by the event loop.

        That is, the watcher is installed and ready to handle process exit
        notifications.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context; new processes may be started inside.

        Implementations must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Leave the watcher's context."""
        raise NotImplementedError
879
880
class PidfdChildWatcher(AbstractChildWatcher):
    """Child watcher implementation using Linux's pid file descriptors.

    This child watcher polls process file descriptors (pidfds) to await child
    process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
    child watcher implementation. It doesn't require signals or threads, doesn't
    interfere with any processes launched outside the event loop, and scales
    linearly with the number of subprocesses launched by the event loop. The
    main disadvantage is that pidfds are specific to Linux, and only work on
    recent (5.3+) kernels.
    """

    def __init__(self):
        self._loop = None
        # Maps pid -> (pidfd, callback, args).
        self._callbacks = {}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def is_active(self):
        """Return True when a loop is attached and that loop is running."""
        return self._loop is not None and self._loop.is_running()

    def close(self):
        """Detach from the loop, releasing every pidfd still being watched."""
        self.attach_loop(None)

    def attach_loop(self, loop):
        """Switch the watcher to *loop* (which may be None).

        All handlers registered so far are dropped: their pidfds are
        unregistered from the previous loop and closed.  A RuntimeWarning
        is issued when detaching (loop is None) with handlers pending.
        """
        if self._loop is not None and loop is None and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)
        for pidfd, _, _ in self._callbacks.values():
            self._loop._remove_reader(pidfd)
            os.close(pidfd)
        self._callbacks.clear()
        self._loop = loop

    def add_child_handler(self, pid, callback, *args):
        """Arrange for callback(pid, returncode, *args) when *pid* exits.

        Registering again for a pid that is already watched keeps the
        existing pidfd and only replaces the callback and its arguments.
        """
        existing = self._callbacks.get(pid)
        if existing is not None:
            self._callbacks[pid] = existing[0], callback, args
            return

        pidfd = os.pidfd_open(pid)
        try:
            self._loop._add_reader(pidfd, self._do_wait, pid)
        except BaseException:
            # Registration failed (e.g. no loop attached or the loop is
            # closed): nothing else references the descriptor, so close
            # it here instead of leaking it.
            os.close(pidfd)
            raise
        self._callbacks[pid] = pidfd, callback, args

    def _do_wait(self, pid):
        """Reader callback for the pidfd of *pid*: reap it and notify."""
        pidfd, callback, args = self._callbacks.pop(pid)
        self._loop._remove_reader(pidfd)
        try:
            try:
                _, status = os.waitpid(pid, 0)
            except ChildProcessError:
                # The child process is already reaped
                # (may happen if waitpid() is called elsewhere).
                returncode = 255
                logger.warning(
                    "child process pid %d exit status already read: "
                    " will report returncode 255",
                    pid)
            else:
                returncode = _compute_returncode(status)
        finally:
            # The entry was already popped, so this is the last chance to
            # release the descriptor, whatever waitpid() did.
            os.close(pidfd)

        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Stop watching *pid*; return False if it was not being watched."""
        try:
            pidfd, _, _ = self._callbacks.pop(pid)
        except KeyError:
            return False
        self._loop._remove_reader(pidfd)
        os.close(pidfd)
        return True
957
958
def _compute_returncode(status):
    """Convert a raw waitpid() status into a return code.

    Returns the negated signal number when the child was killed by a
    signal, the exit status when it exited normally, and the raw status
    unchanged when it is neither.
    """
    if os.WIFSIGNALED(status):
        # Terminated by a signal.
        return -os.WTERMSIG(status)
    if os.WIFEXITED(status):
        # Regular exit (e.g. sys.exit()).
        return os.WEXITSTATUS(status)
    # Neither case matched.  This shouldn't happen; hand back the raw
    # status so that it is at least available for debugging.
    return status
971
972
class BaseChildWatcher(AbstractChildWatcher):
    """Common base for watchers driven by a SIGCHLD handler on the loop.

    Holds the attached loop and the callback table; subclasses supply
    _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        self._loop = None
        self._callbacks = {}

    def close(self):
        self.attach_loop(None)

    def is_active(self):
        return self._loop is not None and self._loop.is_running()

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
        # A child may have terminated while the handler was being
        # switched; poll once so that it is not missed.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, reporting unexpected errors."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # self._loop is expected to be set here, because this method
            # is only installed as a signal handler by attach_loop().
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)
1024
1025
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    Rather than calling os.waitpid(-1), the SIGCHLD handler polls every
    registered process explicitly, so other code that spawns processes is
    not disturbed.

    The price of that safety is a significant overhead with a large number
    of children (O(n) each time SIGCHLD is raised).
    """

    def close(self):
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        self._callbacks[pid] = (callback, args)

        # The child may already have terminated; poll it right away to
        # avoid a race condition.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        # Stored values are always (callback, args) tuples, so None can
        # only mean "not registered".
        return self._callbacks.pop(pid, None) is not None

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for pid in tuple(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll one child without blocking and fire its callback if done."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # Someone else already reaped the child
            # (waitpid() may have been called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # Still running; nothing to report yet.
                return

            returncode = _compute_returncode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # Possible when .remove_child_handler() is called
            # after os.waitpid() has returned.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
1098
1099
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for their
    termination.

    Handling a large number of children adds no noticeable overhead
    (O(1) each time a child terminates).
    """
    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode of children reaped before being registered.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

    def close(self):
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        leftovers = None
        with self._lock:
            self._forks -= 1
            # Only the outermost exit reports zombies nobody claimed.
            if not self._forks and self._zombies:
                leftovers = str(self._zombies)
                self._zombies.clear()

        if leftovers is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                leftovers)

    def add_child_handler(self, pid, callback, *args):
        assert self._forks, "Must use the context manager"

        with self._lock:
            # Recorded return codes are never None, so None can only
            # mean the pid has not been reaped yet.
            returncode = self._zombies.pop(pid, None)
            if returncode is None:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is already dead: fire the callback immediately.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        # Stored values are always (callback, args) tuples.
        return self._callbacks.pop(pid, None) is not None

    def _do_waitpid_all(self):
        # SIGCHLD deliveries can be coalesced, so keep calling waitpid()
        # for as long as it reaps a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # There are no child processes left.
                return
            if pid == 0:
                # A child process is still alive.
                return
            returncode = _compute_returncode(status)

            with self._lock:
                registered = self._callbacks.pop(pid, None)
                if registered is not None:
                    callback, args = registered
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                elif self._forks:
                    # Unknown child, but a spawn is in progress: it may
                    # simply not be registered yet.
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     pid, returncode)
                    continue
                else:
                    # Unknown child and nobody is going to claim it.
                    callback = None

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1203
1204
class MultiLoopChildWatcher(AbstractChildWatcher):
    """A watcher that doesn't require running loop in the main thread.

    A SIGCHLD signal handler is installed with signal.signal() the first
    time attach_loop() is called, which may conflict with other code that
    installs its own handler for this signal.

    The solution is safe but it has a significant overhead when
    handling a big number of processes (*O(n)* each time a
    SIGCHLD is received).
    """

    # Implementation note:
    # To stay compatible with the AbstractChildWatcher interface, the
    # class does not store the loop passed to attach_loop() and takes no
    # explicit loop argument in add_child_handler(); add_child_handler()
    # obtains the current loop through get_running_loop() instead.

    def __init__(self):
        # pid -> (loop, callback, args)
        self._callbacks = {}
        self._saved_sighandler = None

    def is_active(self):
        return self._saved_sighandler is not None

    def close(self):
        """Forget all handlers and restore the saved SIGCHLD handler."""
        self._callbacks.clear()
        if self._saved_sighandler is None:
            return

        current = signal.getsignal(signal.SIGCHLD)
        if current != self._sig_chld:
            logger.warning("SIGCHLD handler was changed by outside code")
        else:
            signal.signal(signal.SIGCHLD, self._saved_sighandler)
        self._saved_sighandler = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def add_child_handler(self, pid, callback, *args):
        running_loop = events.get_running_loop()
        self._callbacks[pid] = (running_loop, callback, args)

        # The child may already have terminated; poll it right away to
        # avoid a race condition.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        # Stored values are always (loop, callback, args) tuples.
        return self._callbacks.pop(pid, None) is not None

    def attach_loop(self, loop):
        # The loop itself is not stored; the first call is used to install
        # the SIGCHLD handler.  It is done here because the unix policy
        # calls attach_loop() only from the main thread, and the main
        # thread is required for subscribing to SIGCHLD.
        if self._saved_sighandler is not None:
            return

        self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
        if self._saved_sighandler is None:
            logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                           "restore to default handler on watcher close.")
            self._saved_sighandler = signal.SIG_DFL

        # Set SA_RESTART to limit EINTR occurrences.
        signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for pid in tuple(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll one child without blocking and schedule its callback."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # Someone else already reaped the child
            # (waitpid() may have been called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
            reaped_here = False
        else:
            if pid == 0:
                # Still running; nothing to report yet.
                return

            returncode = _compute_returncode(status)
            reaped_here = True

        try:
            loop, callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # Possible when .remove_child_handler() is called
            # after os.waitpid() has returned.
            logger.warning("Child watcher got an unexpected pid: %r",
                           pid, exc_info=True)
            return

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            return

        if reaped_here and loop.get_debug():
            logger.debug('process %s exited with returncode %s',
                         expected_pid, returncode)
        loop.call_soon_threadsafe(callback, pid, returncode, *args)

    def _sig_chld(self, signum, frame):
        """Raw SIGCHLD handler: poll all children, logging odd failures."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1324
1325
class ThreadedChildWatcher(AbstractChildWatcher):
    """Threaded child watcher implementation.

    One thread is started per watched process and blocks until that
    process finishes.

    No POSIX signal subscription is needed, but creating a thread is
    not free.

    The watcher has O(1) complexity: its performance does not depend on
    the number of spawned processes.
    """

    def __init__(self):
        self._pid_counter = itertools.count(0)
        # pid -> waiter thread
        self._threads = {}

    def is_active(self):
        return True

    def close(self):
        self._join_threads()

    def _join_threads(self):
        """Internal: Join all non-daemon threads"""
        snapshot = list(self._threads.values())
        joinable = [worker for worker in snapshot
                    if worker.is_alive() and not worker.daemon]
        for worker in joinable:
            worker.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def __del__(self, _warn=warnings.warn):
        # warnings.warn is bound as a default argument at definition time.
        still_running = [worker for worker in list(self._threads.values())
                         if worker.is_alive()]
        if still_running:
            _warn(f"{self.__class__} has registered but not finished child processes",
                  ResourceWarning,
                  source=self)

    def add_child_handler(self, pid, callback, *args):
        running_loop = events.get_running_loop()
        waiter = threading.Thread(
            target=self._do_waitpid,
            name=f"waitpid-{next(self._pid_counter)}",
            args=(running_loop, pid, callback, args),
            daemon=True)
        self._threads[pid] = waiter
        waiter.start()

    def remove_child_handler(self, pid):
        # asyncio itself never calls remove_child_handler().
        # This no-op exists only because the abstract base class
        # requires the method.
        return True

    def attach_loop(self, loop):
        pass

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread body: block until the child exits, then notify *loop*."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # Someone else already reaped the child
            # (waitpid() may have been called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = _compute_returncode(status)
            if loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
        else:
            # Hand the result over to the loop's own thread.
            loop.call_soon_threadsafe(callback, pid, returncode, *args)

        self._threads.pop(expected_pid)
1413
1414
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        self._watcher = None

    def _init_watcher(self):
        """Create the default ThreadedChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                watcher = ThreadedChildWatcher()
                self._watcher = watcher
                # Only the main thread attaches the watcher to its loop.
                if threading.current_thread() is threading.main_thread():
                    watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        Side effect: when a child watcher has already been set and this
        is called from the main thread, .attach_loop(loop) is also called
        on the child watcher.
        """

        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if threading.current_thread() is threading.main_thread():
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        A ThreadedChildWatcher object is created automatically when no
        watcher has been set yet.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        # Close the watcher being replaced, if there is one.
        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
1463
1464
# Public aliases: the Unix implementations are exposed under the names
# 'SelectorEventLoop' and 'DefaultEventLoopPolicy' listed in __all__.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
1467