• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Selector event loop for Unix with signal handling."""
2
3import errno
4import io
5import itertools
6import os
7import selectors
8import signal
9import socket
10import stat
11import subprocess
12import sys
13import threading
14import warnings
15
16from . import base_events
17from . import base_subprocess
18from . import constants
19from . import coroutines
20from . import events
21from . import exceptions
22from . import futures
23from . import selector_events
24from . import tasks
25from . import transports
26from .log import logger
27
28
29__all__ = (
30    'SelectorEventLoop',
31    'AbstractChildWatcher', 'SafeChildWatcher',
32    'FastChildWatcher', 'PidfdChildWatcher',
33    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
34    'DefaultEventLoopPolicy',
35)
36
37
38if sys.platform == 'win32':  # pragma: no cover
39    raise ImportError('Signals are not really supported on Windows')
40
41
42def _sighandler_noop(signum, frame):
43    """Dummy signal handler."""
44    pass
45
46
47def waitstatus_to_exitcode(status):
48    try:
49        return os.waitstatus_to_exitcode(status)
50    except ValueError:
51        # The child exited, but we don't understand its status.
52        # This shouldn't happen, but if it does, let's just
53        # return that status; perhaps that helps debug it.
54        return status
55
56
57class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
58    """Unix event loop.
59
60    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
61    """
62
63    def __init__(self, selector=None):
64        super().__init__(selector)
65        self._signal_handlers = {}
66
67    def close(self):
68        super().close()
69        if not sys.is_finalizing():
70            for sig in list(self._signal_handlers):
71                self.remove_signal_handler(sig)
72        else:
73            if self._signal_handlers:
74                warnings.warn(f"Closing the loop {self!r} "
75                              f"on interpreter shutdown "
76                              f"stage, skipping signal handlers removal",
77                              ResourceWarning,
78                              source=self)
79                self._signal_handlers.clear()
80
81    def _process_self_data(self, data):
82        for signum in data:
83            if not signum:
84                # ignore null bytes written by _write_to_self()
85                continue
86            self._handle_signal(signum)
87
88    def add_signal_handler(self, sig, callback, *args):
89        """Add a handler for a signal.  UNIX only.
90
91        Raise ValueError if the signal number is invalid or uncatchable.
92        Raise RuntimeError if there is a problem setting up the handler.
93        """
94        if (coroutines.iscoroutine(callback) or
95                coroutines.iscoroutinefunction(callback)):
96            raise TypeError("coroutines cannot be used "
97                            "with add_signal_handler()")
98        self._check_signal(sig)
99        self._check_closed()
100        try:
101            # set_wakeup_fd() raises ValueError if this is not the
102            # main thread.  By calling it early we ensure that an
103            # event loop running in another thread cannot add a signal
104            # handler.
105            signal.set_wakeup_fd(self._csock.fileno())
106        except (ValueError, OSError) as exc:
107            raise RuntimeError(str(exc))
108
109        handle = events.Handle(callback, args, self, None)
110        self._signal_handlers[sig] = handle
111
112        try:
113            # Register a dummy signal handler to ask Python to write the signal
114            # number in the wakeup file descriptor. _process_self_data() will
115            # read signal numbers from this file descriptor to handle signals.
116            signal.signal(sig, _sighandler_noop)
117
118            # Set SA_RESTART to limit EINTR occurrences.
119            signal.siginterrupt(sig, False)
120        except OSError as exc:
121            del self._signal_handlers[sig]
122            if not self._signal_handlers:
123                try:
124                    signal.set_wakeup_fd(-1)
125                except (ValueError, OSError) as nexc:
126                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)
127
128            if exc.errno == errno.EINVAL:
129                raise RuntimeError(f'sig {sig} cannot be caught')
130            else:
131                raise
132
133    def _handle_signal(self, sig):
134        """Internal helper that is the actual signal handler."""
135        handle = self._signal_handlers.get(sig)
136        if handle is None:
137            return  # Assume it's some race condition.
138        if handle._cancelled:
139            self.remove_signal_handler(sig)  # Remove it properly.
140        else:
141            self._add_callback_signalsafe(handle)
142
143    def remove_signal_handler(self, sig):
144        """Remove a handler for a signal.  UNIX only.
145
146        Return True if a signal handler was removed, False if not.
147        """
148        self._check_signal(sig)
149        try:
150            del self._signal_handlers[sig]
151        except KeyError:
152            return False
153
154        if sig == signal.SIGINT:
155            handler = signal.default_int_handler
156        else:
157            handler = signal.SIG_DFL
158
159        try:
160            signal.signal(sig, handler)
161        except OSError as exc:
162            if exc.errno == errno.EINVAL:
163                raise RuntimeError(f'sig {sig} cannot be caught')
164            else:
165                raise
166
167        if not self._signal_handlers:
168            try:
169                signal.set_wakeup_fd(-1)
170            except (ValueError, OSError) as exc:
171                logger.info('set_wakeup_fd(-1) failed: %s', exc)
172
173        return True
174
175    def _check_signal(self, sig):
176        """Internal helper to validate a signal.
177
178        Raise ValueError if the signal number is invalid or uncatchable.
179        Raise RuntimeError if there is a problem setting up the handler.
180        """
181        if not isinstance(sig, int):
182            raise TypeError(f'sig must be an int, not {sig!r}')
183
184        if sig not in signal.valid_signals():
185            raise ValueError(f'invalid signal number {sig}')
186
187    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
188                                  extra=None):
189        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
190
191    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
192                                   extra=None):
193        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
194
195    async def _make_subprocess_transport(self, protocol, args, shell,
196                                         stdin, stdout, stderr, bufsize,
197                                         extra=None, **kwargs):
198        with events.get_child_watcher() as watcher:
199            if not watcher.is_active():
200                # Check early.
201                # Raising exception before process creation
202                # prevents subprocess execution if the watcher
203                # is not ready to handle it.
204                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
205                                   "subprocess support is not installed.")
206            waiter = self.create_future()
207            transp = _UnixSubprocessTransport(self, protocol, args, shell,
208                                              stdin, stdout, stderr, bufsize,
209                                              waiter=waiter, extra=extra,
210                                              **kwargs)
211
212            watcher.add_child_handler(transp.get_pid(),
213                                      self._child_watcher_callback, transp)
214            try:
215                await waiter
216            except (SystemExit, KeyboardInterrupt):
217                raise
218            except BaseException:
219                transp.close()
220                await transp._wait()
221                raise
222
223        return transp
224
225    def _child_watcher_callback(self, pid, returncode, transp):
226        self.call_soon_threadsafe(transp._process_exited, returncode)
227
228    async def create_unix_connection(
229            self, protocol_factory, path=None, *,
230            ssl=None, sock=None,
231            server_hostname=None,
232            ssl_handshake_timeout=None):
233        assert server_hostname is None or isinstance(server_hostname, str)
234        if ssl:
235            if server_hostname is None:
236                raise ValueError(
237                    'you have to pass server_hostname when using ssl')
238        else:
239            if server_hostname is not None:
240                raise ValueError('server_hostname is only meaningful with ssl')
241            if ssl_handshake_timeout is not None:
242                raise ValueError(
243                    'ssl_handshake_timeout is only meaningful with ssl')
244
245        if path is not None:
246            if sock is not None:
247                raise ValueError(
248                    'path and sock can not be specified at the same time')
249
250            path = os.fspath(path)
251            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
252            try:
253                sock.setblocking(False)
254                await self.sock_connect(sock, path)
255            except:
256                sock.close()
257                raise
258
259        else:
260            if sock is None:
261                raise ValueError('no path and sock were specified')
262            if (sock.family != socket.AF_UNIX or
263                    sock.type != socket.SOCK_STREAM):
264                raise ValueError(
265                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
266            sock.setblocking(False)
267
268        transport, protocol = await self._create_connection_transport(
269            sock, protocol_factory, ssl, server_hostname,
270            ssl_handshake_timeout=ssl_handshake_timeout)
271        return transport, protocol
272
273    async def create_unix_server(
274            self, protocol_factory, path=None, *,
275            sock=None, backlog=100, ssl=None,
276            ssl_handshake_timeout=None,
277            start_serving=True):
278        if isinstance(ssl, bool):
279            raise TypeError('ssl argument must be an SSLContext or None')
280
281        if ssl_handshake_timeout is not None and not ssl:
282            raise ValueError(
283                'ssl_handshake_timeout is only meaningful with ssl')
284
285        if path is not None:
286            if sock is not None:
287                raise ValueError(
288                    'path and sock can not be specified at the same time')
289
290            path = os.fspath(path)
291            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
292
293            # Check for abstract socket. `str` and `bytes` paths are supported.
294            if path[0] not in (0, '\x00'):
295                try:
296                    if stat.S_ISSOCK(os.stat(path).st_mode):
297                        os.remove(path)
298                except FileNotFoundError:
299                    pass
300                except OSError as err:
301                    # Directory may have permissions only to create socket.
302                    logger.error('Unable to check or remove stale UNIX socket '
303                                 '%r: %r', path, err)
304
305            try:
306                sock.bind(path)
307            except OSError as exc:
308                sock.close()
309                if exc.errno == errno.EADDRINUSE:
310                    # Let's improve the error message by adding
311                    # with what exact address it occurs.
312                    msg = f'Address {path!r} is already in use'
313                    raise OSError(errno.EADDRINUSE, msg) from None
314                else:
315                    raise
316            except:
317                sock.close()
318                raise
319        else:
320            if sock is None:
321                raise ValueError(
322                    'path was not specified, and no sock specified')
323
324            if (sock.family != socket.AF_UNIX or
325                    sock.type != socket.SOCK_STREAM):
326                raise ValueError(
327                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
328
329        sock.setblocking(False)
330        server = base_events.Server(self, [sock], protocol_factory,
331                                    ssl, backlog, ssl_handshake_timeout)
332        if start_serving:
333            server._start_serving()
334            # Skip one loop iteration so that all 'loop.add_reader'
335            # go through.
336            await tasks.sleep(0)
337
338        return server
339
340    async def _sock_sendfile_native(self, sock, file, offset, count):
341        try:
342            os.sendfile
343        except AttributeError:
344            raise exceptions.SendfileNotAvailableError(
345                "os.sendfile() is not available")
346        try:
347            fileno = file.fileno()
348        except (AttributeError, io.UnsupportedOperation) as err:
349            raise exceptions.SendfileNotAvailableError("not a regular file")
350        try:
351            fsize = os.fstat(fileno).st_size
352        except OSError:
353            raise exceptions.SendfileNotAvailableError("not a regular file")
354        blocksize = count if count else fsize
355        if not blocksize:
356            return 0  # empty file
357
358        fut = self.create_future()
359        self._sock_sendfile_native_impl(fut, None, sock, fileno,
360                                        offset, count, blocksize, 0)
361        return await fut
362
363    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
364                                   offset, count, blocksize, total_sent):
365        fd = sock.fileno()
366        if registered_fd is not None:
367            # Remove the callback early.  It should be rare that the
368            # selector says the fd is ready but the call still returns
369            # EAGAIN, and I am willing to take a hit in that case in
370            # order to simplify the common case.
371            self.remove_writer(registered_fd)
372        if fut.cancelled():
373            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
374            return
375        if count:
376            blocksize = count - total_sent
377            if blocksize <= 0:
378                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
379                fut.set_result(total_sent)
380                return
381
382        try:
383            sent = os.sendfile(fd, fileno, offset, blocksize)
384        except (BlockingIOError, InterruptedError):
385            if registered_fd is None:
386                self._sock_add_cancellation_callback(fut, sock)
387            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
388                            fd, sock, fileno,
389                            offset, count, blocksize, total_sent)
390        except OSError as exc:
391            if (registered_fd is not None and
392                    exc.errno == errno.ENOTCONN and
393                    type(exc) is not ConnectionError):
394                # If we have an ENOTCONN and this isn't a first call to
395                # sendfile(), i.e. the connection was closed in the middle
396                # of the operation, normalize the error to ConnectionError
397                # to make it consistent across all Posix systems.
398                new_exc = ConnectionError(
399                    "socket is not connected", errno.ENOTCONN)
400                new_exc.__cause__ = exc
401                exc = new_exc
402            if total_sent == 0:
403                # We can get here for different reasons, the main
404                # one being 'file' is not a regular mmap(2)-like
405                # file, in which case we'll fall back on using
406                # plain send().
407                err = exceptions.SendfileNotAvailableError(
408                    "os.sendfile call failed")
409                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
410                fut.set_exception(err)
411            else:
412                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
413                fut.set_exception(exc)
414        except (SystemExit, KeyboardInterrupt):
415            raise
416        except BaseException as exc:
417            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
418            fut.set_exception(exc)
419        else:
420            if sent == 0:
421                # EOF
422                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
423                fut.set_result(total_sent)
424            else:
425                offset += sent
426                total_sent += sent
427                if registered_fd is None:
428                    self._sock_add_cancellation_callback(fut, sock)
429                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
430                                fd, sock, fileno,
431                                offset, count, blocksize, total_sent)
432
433    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
434        if total_sent > 0:
435            os.lseek(fileno, offset, os.SEEK_SET)
436
437    def _sock_add_cancellation_callback(self, fut, sock):
438        def cb(fut):
439            if fut.cancelled():
440                fd = sock.fileno()
441                if fd != -1:
442                    self.remove_writer(fd)
443        fut.add_done_callback(cb)
444
445
446class _UnixReadPipeTransport(transports.ReadTransport):
447
448    max_size = 256 * 1024  # max bytes we read in one event loop iteration
449
450    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
451        super().__init__(extra)
452        self._extra['pipe'] = pipe
453        self._loop = loop
454        self._pipe = pipe
455        self._fileno = pipe.fileno()
456        self._protocol = protocol
457        self._closing = False
458        self._paused = False
459
460        mode = os.fstat(self._fileno).st_mode
461        if not (stat.S_ISFIFO(mode) or
462                stat.S_ISSOCK(mode) or
463                stat.S_ISCHR(mode)):
464            self._pipe = None
465            self._fileno = None
466            self._protocol = None
467            raise ValueError("Pipe transport is for pipes/sockets only.")
468
469        os.set_blocking(self._fileno, False)
470
471        self._loop.call_soon(self._protocol.connection_made, self)
472        # only start reading when connection_made() has been called
473        self._loop.call_soon(self._loop._add_reader,
474                             self._fileno, self._read_ready)
475        if waiter is not None:
476            # only wake up the waiter when connection_made() has been called
477            self._loop.call_soon(futures._set_result_unless_cancelled,
478                                 waiter, None)
479
480    def __repr__(self):
481        info = [self.__class__.__name__]
482        if self._pipe is None:
483            info.append('closed')
484        elif self._closing:
485            info.append('closing')
486        info.append(f'fd={self._fileno}')
487        selector = getattr(self._loop, '_selector', None)
488        if self._pipe is not None and selector is not None:
489            polling = selector_events._test_selector_event(
490                selector, self._fileno, selectors.EVENT_READ)
491            if polling:
492                info.append('polling')
493            else:
494                info.append('idle')
495        elif self._pipe is not None:
496            info.append('open')
497        else:
498            info.append('closed')
499        return '<{}>'.format(' '.join(info))
500
501    def _read_ready(self):
502        try:
503            data = os.read(self._fileno, self.max_size)
504        except (BlockingIOError, InterruptedError):
505            pass
506        except OSError as exc:
507            self._fatal_error(exc, 'Fatal read error on pipe transport')
508        else:
509            if data:
510                self._protocol.data_received(data)
511            else:
512                if self._loop.get_debug():
513                    logger.info("%r was closed by peer", self)
514                self._closing = True
515                self._loop._remove_reader(self._fileno)
516                self._loop.call_soon(self._protocol.eof_received)
517                self._loop.call_soon(self._call_connection_lost, None)
518
519    def pause_reading(self):
520        if self._closing or self._paused:
521            return
522        self._paused = True
523        self._loop._remove_reader(self._fileno)
524        if self._loop.get_debug():
525            logger.debug("%r pauses reading", self)
526
527    def resume_reading(self):
528        if self._closing or not self._paused:
529            return
530        self._paused = False
531        self._loop._add_reader(self._fileno, self._read_ready)
532        if self._loop.get_debug():
533            logger.debug("%r resumes reading", self)
534
535    def set_protocol(self, protocol):
536        self._protocol = protocol
537
538    def get_protocol(self):
539        return self._protocol
540
541    def is_closing(self):
542        return self._closing
543
544    def close(self):
545        if not self._closing:
546            self._close(None)
547
548    def __del__(self, _warn=warnings.warn):
549        if self._pipe is not None:
550            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
551            self._pipe.close()
552
553    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
554        # should be called by exception handler only
555        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
556            if self._loop.get_debug():
557                logger.debug("%r: %s", self, message, exc_info=True)
558        else:
559            self._loop.call_exception_handler({
560                'message': message,
561                'exception': exc,
562                'transport': self,
563                'protocol': self._protocol,
564            })
565        self._close(exc)
566
567    def _close(self, exc):
568        self._closing = True
569        self._loop._remove_reader(self._fileno)
570        self._loop.call_soon(self._call_connection_lost, exc)
571
572    def _call_connection_lost(self, exc):
573        try:
574            self._protocol.connection_lost(exc)
575        finally:
576            self._pipe.close()
577            self._pipe = None
578            self._protocol = None
579            self._loop = None
580
581
582class _UnixWritePipeTransport(transports._FlowControlMixin,
583                              transports.WriteTransport):
584
585    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
586        super().__init__(extra, loop)
587        self._extra['pipe'] = pipe
588        self._pipe = pipe
589        self._fileno = pipe.fileno()
590        self._protocol = protocol
591        self._buffer = bytearray()
592        self._conn_lost = 0
593        self._closing = False  # Set when close() or write_eof() called.
594
595        mode = os.fstat(self._fileno).st_mode
596        is_char = stat.S_ISCHR(mode)
597        is_fifo = stat.S_ISFIFO(mode)
598        is_socket = stat.S_ISSOCK(mode)
599        if not (is_char or is_fifo or is_socket):
600            self._pipe = None
601            self._fileno = None
602            self._protocol = None
603            raise ValueError("Pipe transport is only for "
604                             "pipes, sockets and character devices")
605
606        os.set_blocking(self._fileno, False)
607        self._loop.call_soon(self._protocol.connection_made, self)
608
609        # On AIX, the reader trick (to be notified when the read end of the
610        # socket is closed) only works for sockets. On other platforms it
611        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
612        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
613            # only start reading when connection_made() has been called
614            self._loop.call_soon(self._loop._add_reader,
615                                 self._fileno, self._read_ready)
616
617        if waiter is not None:
618            # only wake up the waiter when connection_made() has been called
619            self._loop.call_soon(futures._set_result_unless_cancelled,
620                                 waiter, None)
621
622    def __repr__(self):
623        info = [self.__class__.__name__]
624        if self._pipe is None:
625            info.append('closed')
626        elif self._closing:
627            info.append('closing')
628        info.append(f'fd={self._fileno}')
629        selector = getattr(self._loop, '_selector', None)
630        if self._pipe is not None and selector is not None:
631            polling = selector_events._test_selector_event(
632                selector, self._fileno, selectors.EVENT_WRITE)
633            if polling:
634                info.append('polling')
635            else:
636                info.append('idle')
637
638            bufsize = self.get_write_buffer_size()
639            info.append(f'bufsize={bufsize}')
640        elif self._pipe is not None:
641            info.append('open')
642        else:
643            info.append('closed')
644        return '<{}>'.format(' '.join(info))
645
646    def get_write_buffer_size(self):
647        return len(self._buffer)
648
649    def _read_ready(self):
650        # Pipe was closed by peer.
651        if self._loop.get_debug():
652            logger.info("%r was closed by peer", self)
653        if self._buffer:
654            self._close(BrokenPipeError())
655        else:
656            self._close()
657
658    def write(self, data):
659        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
660        if isinstance(data, bytearray):
661            data = memoryview(data)
662        if not data:
663            return
664
665        if self._conn_lost or self._closing:
666            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
667                logger.warning('pipe closed by peer or '
668                               'os.write(pipe, data) raised exception.')
669            self._conn_lost += 1
670            return
671
672        if not self._buffer:
673            # Attempt to send it right away first.
674            try:
675                n = os.write(self._fileno, data)
676            except (BlockingIOError, InterruptedError):
677                n = 0
678            except (SystemExit, KeyboardInterrupt):
679                raise
680            except BaseException as exc:
681                self._conn_lost += 1
682                self._fatal_error(exc, 'Fatal write error on pipe transport')
683                return
684            if n == len(data):
685                return
686            elif n > 0:
687                data = memoryview(data)[n:]
688            self._loop._add_writer(self._fileno, self._write_ready)
689
690        self._buffer += data
691        self._maybe_pause_protocol()
692
693    def _write_ready(self):
694        assert self._buffer, 'Data should not be empty'
695
696        try:
697            n = os.write(self._fileno, self._buffer)
698        except (BlockingIOError, InterruptedError):
699            pass
700        except (SystemExit, KeyboardInterrupt):
701            raise
702        except BaseException as exc:
703            self._buffer.clear()
704            self._conn_lost += 1
705            # Remove writer here, _fatal_error() doesn't it
706            # because _buffer is empty.
707            self._loop._remove_writer(self._fileno)
708            self._fatal_error(exc, 'Fatal write error on pipe transport')
709        else:
710            if n == len(self._buffer):
711                self._buffer.clear()
712                self._loop._remove_writer(self._fileno)
713                self._maybe_resume_protocol()  # May append to buffer.
714                if self._closing:
715                    self._loop._remove_reader(self._fileno)
716                    self._call_connection_lost(None)
717                return
718            elif n > 0:
719                del self._buffer[:n]
720
721    def can_write_eof(self):
722        return True
723
724    def write_eof(self):
725        if self._closing:
726            return
727        assert self._pipe
728        self._closing = True
729        if not self._buffer:
730            self._loop._remove_reader(self._fileno)
731            self._loop.call_soon(self._call_connection_lost, None)
732
733    def set_protocol(self, protocol):
734        self._protocol = protocol
735
736    def get_protocol(self):
737        return self._protocol
738
739    def is_closing(self):
740        return self._closing
741
742    def close(self):
743        if self._pipe is not None and not self._closing:
744            # write_eof is all what we needed to close the write pipe
745            self.write_eof()
746
747    def __del__(self, _warn=warnings.warn):
748        if self._pipe is not None:
749            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
750            self._pipe.close()
751
752    def abort(self):
753        self._close(None)
754
755    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
756        # should be called by exception handler only
757        if isinstance(exc, OSError):
758            if self._loop.get_debug():
759                logger.debug("%r: %s", self, message, exc_info=True)
760        else:
761            self._loop.call_exception_handler({
762                'message': message,
763                'exception': exc,
764                'transport': self,
765                'protocol': self._protocol,
766            })
767        self._close(exc)
768
769    def _close(self, exc=None):
770        self._closing = True
771        if self._buffer:
772            self._loop._remove_writer(self._fileno)
773        self._buffer.clear()
774        self._loop._remove_reader(self._fileno)
775        self._loop.call_soon(self._call_connection_lost, exc)
776
777    def _call_connection_lost(self, exc):
778        try:
779            self._protocol.connection_lost(exc)
780        finally:
781            self._pipe.close()
782            self._pipe = None
783            self._protocol = None
784            self._loop = None
785
786
787class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
788
789    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
790        stdin_w = None
791        if stdin == subprocess.PIPE:
792            # Use a socket pair for stdin, since not all platforms
793            # support selecting read events on the write end of a
794            # socket (which we use in order to detect closing of the
795            # other end).  Notably this is needed on AIX, and works
796            # just fine on other platforms.
797            stdin, stdin_w = socket.socketpair()
798        try:
799            self._proc = subprocess.Popen(
800                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
801                universal_newlines=False, bufsize=bufsize, **kwargs)
802            if stdin_w is not None:
803                stdin.close()
804                self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
805                stdin_w = None
806        finally:
807            if stdin_w is not None:
808                stdin.close()
809                stdin_w.close()
810
811
812class AbstractChildWatcher:
813    """Abstract base class for monitoring child processes.
814
815    Objects derived from this class monitor a collection of subprocesses and
816    report their termination or interruption by a signal.
817
818    New callbacks are registered with .add_child_handler(). Starting a new
819    process must be done within a 'with' block to allow the watcher to suspend
820    its activity until the new process if fully registered (this is needed to
821    prevent a race condition in some implementations).
822
823    Example:
824        with watcher:
825            proc = subprocess.Popen("sleep 1")
826            watcher.add_child_handler(proc.pid, callback)
827
828    Notes:
829        Implementations of this class must be thread-safe.
830
831        Since child watcher objects may catch the SIGCHLD signal and call
832        waitpid(-1), there should be only one active object per process.
833    """
834
835    def add_child_handler(self, pid, callback, *args):
836        """Register a new child handler.
837
838        Arrange for callback(pid, returncode, *args) to be called when
839        process 'pid' terminates. Specifying another callback for the same
840        process replaces the previous handler.
841
842        Note: callback() must be thread-safe.
843        """
844        raise NotImplementedError()
845
846    def remove_child_handler(self, pid):
847        """Removes the handler for process 'pid'.
848
849        The function returns True if the handler was successfully removed,
850        False if there was nothing to remove."""
851
852        raise NotImplementedError()
853
854    def attach_loop(self, loop):
855        """Attach the watcher to an event loop.
856
857        If the watcher was previously attached to an event loop, then it is
858        first detached before attaching to the new loop.
859
860        Note: loop may be None.
861        """
862        raise NotImplementedError()
863
864    def close(self):
865        """Close the watcher.
866
867        This must be called to make sure that any underlying resource is freed.
868        """
869        raise NotImplementedError()
870
871    def is_active(self):
872        """Return ``True`` if the watcher is active and is used by the event loop.
873
874        Return True if the watcher is installed and ready to handle process exit
875        notifications.
876
877        """
878        raise NotImplementedError()
879
880    def __enter__(self):
881        """Enter the watcher's context and allow starting new processes
882
883        This function must return self"""
884        raise NotImplementedError()
885
886    def __exit__(self, a, b, c):
887        """Exit the watcher's context"""
888        raise NotImplementedError()
889
890
891class PidfdChildWatcher(AbstractChildWatcher):
892    """Child watcher implementation using Linux's pid file descriptors.
893
894    This child watcher polls process file descriptors (pidfds) to await child
895    process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
896    child watcher implementation. It doesn't require signals or threads, doesn't
897    interfere with any processes launched outside the event loop, and scales
898    linearly with the number of subprocesses launched by the event loop. The
899    main disadvantage is that pidfds are specific to Linux, and only work on
900    recent (5.3+) kernels.
901    """
902
903    def __init__(self):
904        self._loop = None
905        self._callbacks = {}
906
907    def __enter__(self):
908        return self
909
910    def __exit__(self, exc_type, exc_value, exc_traceback):
911        pass
912
913    def is_active(self):
914        return self._loop is not None and self._loop.is_running()
915
916    def close(self):
917        self.attach_loop(None)
918
919    def attach_loop(self, loop):
920        if self._loop is not None and loop is None and self._callbacks:
921            warnings.warn(
922                'A loop is being detached '
923                'from a child watcher with pending handlers',
924                RuntimeWarning)
925        for pidfd, _, _ in self._callbacks.values():
926            self._loop._remove_reader(pidfd)
927            os.close(pidfd)
928        self._callbacks.clear()
929        self._loop = loop
930
931    def add_child_handler(self, pid, callback, *args):
932        existing = self._callbacks.get(pid)
933        if existing is not None:
934            self._callbacks[pid] = existing[0], callback, args
935        else:
936            pidfd = os.pidfd_open(pid)
937            self._loop._add_reader(pidfd, self._do_wait, pid)
938            self._callbacks[pid] = pidfd, callback, args
939
940    def _do_wait(self, pid):
941        pidfd, callback, args = self._callbacks.pop(pid)
942        self._loop._remove_reader(pidfd)
943        try:
944            _, status = os.waitpid(pid, 0)
945        except ChildProcessError:
946            # The child process is already reaped
947            # (may happen if waitpid() is called elsewhere).
948            returncode = 255
949            logger.warning(
950                "child process pid %d exit status already read: "
951                " will report returncode 255",
952                pid)
953        else:
954            returncode = waitstatus_to_exitcode(status)
955
956        os.close(pidfd)
957        callback(pid, returncode, *args)
958
959    def remove_child_handler(self, pid):
960        try:
961            pidfd, _, _ = self._callbacks.pop(pid)
962        except KeyError:
963            return False
964        self._loop._remove_reader(pidfd)
965        os.close(pidfd)
966        return True
967
968
969class BaseChildWatcher(AbstractChildWatcher):
970
971    def __init__(self):
972        self._loop = None
973        self._callbacks = {}
974
975    def close(self):
976        self.attach_loop(None)
977
978    def is_active(self):
979        return self._loop is not None and self._loop.is_running()
980
981    def _do_waitpid(self, expected_pid):
982        raise NotImplementedError()
983
984    def _do_waitpid_all(self):
985        raise NotImplementedError()
986
987    def attach_loop(self, loop):
988        assert loop is None or isinstance(loop, events.AbstractEventLoop)
989
990        if self._loop is not None and loop is None and self._callbacks:
991            warnings.warn(
992                'A loop is being detached '
993                'from a child watcher with pending handlers',
994                RuntimeWarning)
995
996        if self._loop is not None:
997            self._loop.remove_signal_handler(signal.SIGCHLD)
998
999        self._loop = loop
1000        if loop is not None:
1001            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
1002
1003            # Prevent a race condition in case a child terminated
1004            # during the switch.
1005            self._do_waitpid_all()
1006
1007    def _sig_chld(self):
1008        try:
1009            self._do_waitpid_all()
1010        except (SystemExit, KeyboardInterrupt):
1011            raise
1012        except BaseException as exc:
1013            # self._loop should always be available here
1014            # as '_sig_chld' is added as a signal handler
1015            # in 'attach_loop'
1016            self._loop.call_exception_handler({
1017                'message': 'Unknown exception in SIGCHLD handler',
1018                'exception': exc,
1019            })
1020
1021
1022class SafeChildWatcher(BaseChildWatcher):
1023    """'Safe' child watcher implementation.
1024
1025    This implementation avoids disrupting other code spawning processes by
1026    polling explicitly each process in the SIGCHLD handler instead of calling
1027    os.waitpid(-1).
1028
1029    This is a safe solution but it has a significant overhead when handling a
1030    big number of children (O(n) each time SIGCHLD is raised)
1031    """
1032
1033    def close(self):
1034        self._callbacks.clear()
1035        super().close()
1036
1037    def __enter__(self):
1038        return self
1039
1040    def __exit__(self, a, b, c):
1041        pass
1042
1043    def add_child_handler(self, pid, callback, *args):
1044        self._callbacks[pid] = (callback, args)
1045
1046        # Prevent a race condition in case the child is already terminated.
1047        self._do_waitpid(pid)
1048
1049    def remove_child_handler(self, pid):
1050        try:
1051            del self._callbacks[pid]
1052            return True
1053        except KeyError:
1054            return False
1055
1056    def _do_waitpid_all(self):
1057
1058        for pid in list(self._callbacks):
1059            self._do_waitpid(pid)
1060
1061    def _do_waitpid(self, expected_pid):
1062        assert expected_pid > 0
1063
1064        try:
1065            pid, status = os.waitpid(expected_pid, os.WNOHANG)
1066        except ChildProcessError:
1067            # The child process is already reaped
1068            # (may happen if waitpid() is called elsewhere).
1069            pid = expected_pid
1070            returncode = 255
1071            logger.warning(
1072                "Unknown child process pid %d, will report returncode 255",
1073                pid)
1074        else:
1075            if pid == 0:
1076                # The child process is still alive.
1077                return
1078
1079            returncode = waitstatus_to_exitcode(status)
1080            if self._loop.get_debug():
1081                logger.debug('process %s exited with returncode %s',
1082                             expected_pid, returncode)
1083
1084        try:
1085            callback, args = self._callbacks.pop(pid)
1086        except KeyError:  # pragma: no cover
1087            # May happen if .remove_child_handler() is called
1088            # after os.waitpid() returns.
1089            if self._loop.get_debug():
1090                logger.warning("Child watcher got an unexpected pid: %r",
1091                               pid, exc_info=True)
1092        else:
1093            callback(pid, returncode, *args)
1094
1095
1096class FastChildWatcher(BaseChildWatcher):
1097    """'Fast' child watcher implementation.
1098
1099    This implementation reaps every terminated processes by calling
1100    os.waitpid(-1) directly, possibly breaking other code spawning processes
1101    and waiting for their termination.
1102
1103    There is no noticeable overhead when handling a big number of children
1104    (O(1) each time a child terminates).
1105    """
1106    def __init__(self):
1107        super().__init__()
1108        self._lock = threading.Lock()
1109        self._zombies = {}
1110        self._forks = 0
1111
1112    def close(self):
1113        self._callbacks.clear()
1114        self._zombies.clear()
1115        super().close()
1116
1117    def __enter__(self):
1118        with self._lock:
1119            self._forks += 1
1120
1121            return self
1122
1123    def __exit__(self, a, b, c):
1124        with self._lock:
1125            self._forks -= 1
1126
1127            if self._forks or not self._zombies:
1128                return
1129
1130            collateral_victims = str(self._zombies)
1131            self._zombies.clear()
1132
1133        logger.warning(
1134            "Caught subprocesses termination from unknown pids: %s",
1135            collateral_victims)
1136
1137    def add_child_handler(self, pid, callback, *args):
1138        assert self._forks, "Must use the context manager"
1139
1140        with self._lock:
1141            try:
1142                returncode = self._zombies.pop(pid)
1143            except KeyError:
1144                # The child is running.
1145                self._callbacks[pid] = callback, args
1146                return
1147
1148        # The child is dead already. We can fire the callback.
1149        callback(pid, returncode, *args)
1150
1151    def remove_child_handler(self, pid):
1152        try:
1153            del self._callbacks[pid]
1154            return True
1155        except KeyError:
1156            return False
1157
1158    def _do_waitpid_all(self):
1159        # Because of signal coalescing, we must keep calling waitpid() as
1160        # long as we're able to reap a child.
1161        while True:
1162            try:
1163                pid, status = os.waitpid(-1, os.WNOHANG)
1164            except ChildProcessError:
1165                # No more child processes exist.
1166                return
1167            else:
1168                if pid == 0:
1169                    # A child process is still alive.
1170                    return
1171
1172                returncode = waitstatus_to_exitcode(status)
1173
1174            with self._lock:
1175                try:
1176                    callback, args = self._callbacks.pop(pid)
1177                except KeyError:
1178                    # unknown child
1179                    if self._forks:
1180                        # It may not be registered yet.
1181                        self._zombies[pid] = returncode
1182                        if self._loop.get_debug():
1183                            logger.debug('unknown process %s exited '
1184                                         'with returncode %s',
1185                                         pid, returncode)
1186                        continue
1187                    callback = None
1188                else:
1189                    if self._loop.get_debug():
1190                        logger.debug('process %s exited with returncode %s',
1191                                     pid, returncode)
1192
1193            if callback is None:
1194                logger.warning(
1195                    "Caught subprocess termination from unknown pid: "
1196                    "%d -> %d", pid, returncode)
1197            else:
1198                callback(pid, returncode, *args)
1199
1200
1201class MultiLoopChildWatcher(AbstractChildWatcher):
1202    """A watcher that doesn't require running loop in the main thread.
1203
1204    This implementation registers a SIGCHLD signal handler on
1205    instantiation (which may conflict with other code that
1206    install own handler for this signal).
1207
1208    The solution is safe but it has a significant overhead when
1209    handling a big number of processes (*O(n)* each time a
1210    SIGCHLD is received).
1211    """
1212
1213    # Implementation note:
1214    # The class keeps compatibility with AbstractChildWatcher ABC
1215    # To achieve this it has empty attach_loop() method
1216    # and doesn't accept explicit loop argument
1217    # for add_child_handler()/remove_child_handler()
1218    # but retrieves the current loop by get_running_loop()
1219
1220    def __init__(self):
1221        self._callbacks = {}
1222        self._saved_sighandler = None
1223
1224    def is_active(self):
1225        return self._saved_sighandler is not None
1226
1227    def close(self):
1228        self._callbacks.clear()
1229        if self._saved_sighandler is None:
1230            return
1231
1232        handler = signal.getsignal(signal.SIGCHLD)
1233        if handler != self._sig_chld:
1234            logger.warning("SIGCHLD handler was changed by outside code")
1235        else:
1236            signal.signal(signal.SIGCHLD, self._saved_sighandler)
1237        self._saved_sighandler = None
1238
1239    def __enter__(self):
1240        return self
1241
1242    def __exit__(self, exc_type, exc_val, exc_tb):
1243        pass
1244
1245    def add_child_handler(self, pid, callback, *args):
1246        loop = events.get_running_loop()
1247        self._callbacks[pid] = (loop, callback, args)
1248
1249        # Prevent a race condition in case the child is already terminated.
1250        self._do_waitpid(pid)
1251
1252    def remove_child_handler(self, pid):
1253        try:
1254            del self._callbacks[pid]
1255            return True
1256        except KeyError:
1257            return False
1258
1259    def attach_loop(self, loop):
1260        # Don't save the loop but initialize itself if called first time
1261        # The reason to do it here is that attach_loop() is called from
1262        # unix policy only for the main thread.
1263        # Main thread is required for subscription on SIGCHLD signal
1264        if self._saved_sighandler is not None:
1265            return
1266
1267        self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
1268        if self._saved_sighandler is None:
1269            logger.warning("Previous SIGCHLD handler was set by non-Python code, "
1270                           "restore to default handler on watcher close.")
1271            self._saved_sighandler = signal.SIG_DFL
1272
1273        # Set SA_RESTART to limit EINTR occurrences.
1274        signal.siginterrupt(signal.SIGCHLD, False)
1275
1276    def _do_waitpid_all(self):
1277        for pid in list(self._callbacks):
1278            self._do_waitpid(pid)
1279
1280    def _do_waitpid(self, expected_pid):
1281        assert expected_pid > 0
1282
1283        try:
1284            pid, status = os.waitpid(expected_pid, os.WNOHANG)
1285        except ChildProcessError:
1286            # The child process is already reaped
1287            # (may happen if waitpid() is called elsewhere).
1288            pid = expected_pid
1289            returncode = 255
1290            logger.warning(
1291                "Unknown child process pid %d, will report returncode 255",
1292                pid)
1293            debug_log = False
1294        else:
1295            if pid == 0:
1296                # The child process is still alive.
1297                return
1298
1299            returncode = waitstatus_to_exitcode(status)
1300            debug_log = True
1301        try:
1302            loop, callback, args = self._callbacks.pop(pid)
1303        except KeyError:  # pragma: no cover
1304            # May happen if .remove_child_handler() is called
1305            # after os.waitpid() returns.
1306            logger.warning("Child watcher got an unexpected pid: %r",
1307                           pid, exc_info=True)
1308        else:
1309            if loop.is_closed():
1310                logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1311            else:
1312                if debug_log and loop.get_debug():
1313                    logger.debug('process %s exited with returncode %s',
1314                                 expected_pid, returncode)
1315                loop.call_soon_threadsafe(callback, pid, returncode, *args)
1316
1317    def _sig_chld(self, signum, frame):
1318        try:
1319            self._do_waitpid_all()
1320        except (SystemExit, KeyboardInterrupt):
1321            raise
1322        except BaseException:
1323            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1324
1325
1326class ThreadedChildWatcher(AbstractChildWatcher):
1327    """Threaded child watcher implementation.
1328
1329    The watcher uses a thread per process
1330    for waiting for the process finish.
1331
1332    It doesn't require subscription on POSIX signal
1333    but a thread creation is not free.
1334
1335    The watcher has O(1) complexity, its performance doesn't depend
1336    on amount of spawn processes.
1337    """
1338
1339    def __init__(self):
1340        self._pid_counter = itertools.count(0)
1341        self._threads = {}
1342
1343    def is_active(self):
1344        return True
1345
1346    def close(self):
1347        self._join_threads()
1348
1349    def _join_threads(self):
1350        """Internal: Join all non-daemon threads"""
1351        threads = [thread for thread in list(self._threads.values())
1352                   if thread.is_alive() and not thread.daemon]
1353        for thread in threads:
1354            thread.join()
1355
1356    def __enter__(self):
1357        return self
1358
1359    def __exit__(self, exc_type, exc_val, exc_tb):
1360        pass
1361
1362    def __del__(self, _warn=warnings.warn):
1363        threads = [thread for thread in list(self._threads.values())
1364                   if thread.is_alive()]
1365        if threads:
1366            _warn(f"{self.__class__} has registered but not finished child processes",
1367                  ResourceWarning,
1368                  source=self)
1369
1370    def add_child_handler(self, pid, callback, *args):
1371        loop = events.get_running_loop()
1372        thread = threading.Thread(target=self._do_waitpid,
1373                                  name=f"waitpid-{next(self._pid_counter)}",
1374                                  args=(loop, pid, callback, args),
1375                                  daemon=True)
1376        self._threads[pid] = thread
1377        thread.start()
1378
1379    def remove_child_handler(self, pid):
1380        # asyncio never calls remove_child_handler() !!!
1381        # The method is no-op but is implemented because
1382        # abstract base classes require it.
1383        return True
1384
1385    def attach_loop(self, loop):
1386        pass
1387
1388    def _do_waitpid(self, loop, expected_pid, callback, args):
1389        assert expected_pid > 0
1390
1391        try:
1392            pid, status = os.waitpid(expected_pid, 0)
1393        except ChildProcessError:
1394            # The child process is already reaped
1395            # (may happen if waitpid() is called elsewhere).
1396            pid = expected_pid
1397            returncode = 255
1398            logger.warning(
1399                "Unknown child process pid %d, will report returncode 255",
1400                pid)
1401        else:
1402            returncode = waitstatus_to_exitcode(status)
1403            if loop.get_debug():
1404                logger.debug('process %s exited with returncode %s',
1405                             expected_pid, returncode)
1406
1407        if loop.is_closed():
1408            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1409        else:
1410            loop.call_soon_threadsafe(callback, pid, returncode, *args)
1411
1412        self._threads.pop(expected_pid)
1413
1414
1415class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
1416    """UNIX event loop policy with a watcher for child processes."""
1417    _loop_factory = _UnixSelectorEventLoop
1418
1419    def __init__(self):
1420        super().__init__()
1421        self._watcher = None
1422
1423    def _init_watcher(self):
1424        with events._lock:
1425            if self._watcher is None:  # pragma: no branch
1426                self._watcher = ThreadedChildWatcher()
1427                if threading.current_thread() is threading.main_thread():
1428                    self._watcher.attach_loop(self._local._loop)
1429
1430    def set_event_loop(self, loop):
1431        """Set the event loop.
1432
1433        As a side effect, if a child watcher was set before, then calling
1434        .set_event_loop() from the main thread will call .attach_loop(loop) on
1435        the child watcher.
1436        """
1437
1438        super().set_event_loop(loop)
1439
1440        if (self._watcher is not None and
1441                threading.current_thread() is threading.main_thread()):
1442            self._watcher.attach_loop(loop)
1443
1444    def get_child_watcher(self):
1445        """Get the watcher for child processes.
1446
1447        If not yet set, a ThreadedChildWatcher object is automatically created.
1448        """
1449        if self._watcher is None:
1450            self._init_watcher()
1451
1452        return self._watcher
1453
1454    def set_child_watcher(self, watcher):
1455        """Set the watcher for child processes."""
1456
1457        assert watcher is None or isinstance(watcher, AbstractChildWatcher)
1458
1459        if self._watcher is not None:
1460            self._watcher.close()
1461
1462        self._watcher = watcher
1463
1464
1465SelectorEventLoop = _UnixSelectorEventLoop
1466DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
1467