• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer.  For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
7__all__ = 'BaseSelectorEventLoop',
8
9import collections
10import errno
11import functools
12import selectors
13import socket
14import warnings
15import weakref
16try:
17    import ssl
18except ImportError:  # pragma: no cover
19    ssl = None
20
21from . import base_events
22from . import constants
23from . import events
24from . import futures
25from . import protocols
26from . import sslproto
27from . import transports
28from . import trsock
29from .log import logger
30
31
32def _test_selector_event(selector, fd, event):
33    # Test if the selector is monitoring 'event' events
34    # for the file descriptor 'fd'.
35    try:
36        key = selector.get_key(fd)
37    except KeyError:
38        return False
39    else:
40        return bool(key.events & event)
41
42
43def _check_ssl_socket(sock):
44    if ssl is not None and isinstance(sock, ssl.SSLSocket):
45        raise TypeError("Socket cannot be of type SSLSocket")
46
47
48class BaseSelectorEventLoop(base_events.BaseEventLoop):
49    """Selector event loop.
50
51    See events.EventLoop for API specification.
52    """
53
54    def __init__(self, selector=None):
55        super().__init__()
56
57        if selector is None:
58            selector = selectors.DefaultSelector()
59        logger.debug('Using selector: %s', selector.__class__.__name__)
60        self._selector = selector
61        self._make_self_pipe()
62        self._transports = weakref.WeakValueDictionary()
63
64    def _make_socket_transport(self, sock, protocol, waiter=None, *,
65                               extra=None, server=None):
66        return _SelectorSocketTransport(self, sock, protocol, waiter,
67                                        extra, server)
68
69    def _make_ssl_transport(
70            self, rawsock, protocol, sslcontext, waiter=None,
71            *, server_side=False, server_hostname=None,
72            extra=None, server=None,
73            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
74        ssl_protocol = sslproto.SSLProtocol(
75                self, protocol, sslcontext, waiter,
76                server_side, server_hostname,
77                ssl_handshake_timeout=ssl_handshake_timeout)
78        _SelectorSocketTransport(self, rawsock, ssl_protocol,
79                                 extra=extra, server=server)
80        return ssl_protocol._app_transport
81
82    def _make_datagram_transport(self, sock, protocol,
83                                 address=None, waiter=None, extra=None):
84        return _SelectorDatagramTransport(self, sock, protocol,
85                                          address, waiter, extra)
86
87    def close(self):
88        if self.is_running():
89            raise RuntimeError("Cannot close a running event loop")
90        if self.is_closed():
91            return
92        self._close_self_pipe()
93        super().close()
94        if self._selector is not None:
95            self._selector.close()
96            self._selector = None
97
98    def _close_self_pipe(self):
99        self._remove_reader(self._ssock.fileno())
100        self._ssock.close()
101        self._ssock = None
102        self._csock.close()
103        self._csock = None
104        self._internal_fds -= 1
105
106    def _make_self_pipe(self):
107        # A self-socket, really. :-)
108        self._ssock, self._csock = socket.socketpair()
109        self._ssock.setblocking(False)
110        self._csock.setblocking(False)
111        self._internal_fds += 1
112        self._add_reader(self._ssock.fileno(), self._read_from_self)
113
114    def _process_self_data(self, data):
115        pass
116
    def _read_from_self(self):
        """Drain the read end of the self-pipe.

        Reads up to 4096 bytes at a time and hands every non-empty chunk
        to _process_self_data().  Stops on an empty read or when the
        non-blocking socket has nothing more to deliver.
        """
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    # Empty read: stop draining.
                    break
                # Note: this call sits inside the try block, so an
                # InterruptedError/BlockingIOError raised by it is
                # handled by the clauses below as well.
                self._process_self_data(data)
            except InterruptedError:
                # Retry the interrupted read.
                continue
            except BlockingIOError:
                # Nothing left to read for now.
                break
128
129    def _write_to_self(self):
130        # This may be called from a different thread, possibly after
131        # _close_self_pipe() has been called or even while it is
132        # running.  Guard for self._csock being None or closed.  When
133        # a socket is closed, send() raises OSError (with errno set to
134        # EBADF, but let's not rely on the exact error code).
135        csock = self._csock
136        if csock is not None:
137            try:
138                csock.send(b'\0')
139            except OSError:
140                if self._debug:
141                    logger.debug("Fail to write a null byte into the "
142                                 "self-pipe socket",
143                                 exc_info=True)
144
145    def _start_serving(self, protocol_factory, sock,
146                       sslcontext=None, server=None, backlog=100,
147                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
148        self._add_reader(sock.fileno(), self._accept_connection,
149                         protocol_factory, sock, sslcontext, server, backlog,
150                         ssl_handshake_timeout)
151
152    def _accept_connection(
153            self, protocol_factory, sock,
154            sslcontext=None, server=None, backlog=100,
155            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
156        # This method is only called once for each event loop tick where the
157        # listening socket has triggered an EVENT_READ. There may be multiple
158        # connections waiting for an .accept() so it is called in a loop.
159        # See https://bugs.python.org/issue27906 for more details.
160        for _ in range(backlog):
161            try:
162                conn, addr = sock.accept()
163                if self._debug:
164                    logger.debug("%r got a new connection from %r: %r",
165                                 server, addr, conn)
166                conn.setblocking(False)
167            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
168                # Early exit because the socket accept buffer is empty.
169                return None
170            except OSError as exc:
171                # There's nowhere to send the error, so just log it.
172                if exc.errno in (errno.EMFILE, errno.ENFILE,
173                                 errno.ENOBUFS, errno.ENOMEM):
174                    # Some platforms (e.g. Linux keep reporting the FD as
175                    # ready, so we remove the read handler temporarily.
176                    # We'll try again in a while.
177                    self.call_exception_handler({
178                        'message': 'socket.accept() out of system resource',
179                        'exception': exc,
180                        'socket': trsock.TransportSocket(sock),
181                    })
182                    self._remove_reader(sock.fileno())
183                    self.call_later(constants.ACCEPT_RETRY_DELAY,
184                                    self._start_serving,
185                                    protocol_factory, sock, sslcontext, server,
186                                    backlog, ssl_handshake_timeout)
187                else:
188                    raise  # The event loop will catch, log and ignore it.
189            else:
190                extra = {'peername': addr}
191                accept = self._accept_connection2(
192                    protocol_factory, conn, extra, sslcontext, server,
193                    ssl_handshake_timeout)
194                self.create_task(accept)
195
    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Create the protocol and transport for an accepted connection.

        An SSL transport is built when *sslcontext* is truthy, a plain
        socket transport otherwise.  The coroutine then waits on the
        transport's waiter future; if that wait fails the transport is
        closed.

        SystemExit and KeyboardInterrupt propagate.  Every other
        exception is swallowed here and is reported to the loop exception
        handler only when the loop is in debug mode.
        """
        # Pre-set so the error report below can tell how far setup got.
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except BaseException:
                # Setup failed or was cancelled: release the transport,
                # then let the outer handlers decide what to report.
                transport.close()
                raise
                # It's now up to the protocol to handle the connection.

        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Only reported in debug mode; otherwise silently dropped.
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)
236
237    def _ensure_fd_no_transport(self, fd):
238        fileno = fd
239        if not isinstance(fileno, int):
240            try:
241                fileno = int(fileno.fileno())
242            except (AttributeError, TypeError, ValueError):
243                # This code matches selectors._fileobj_to_fd function.
244                raise ValueError(f"Invalid file object: {fd!r}") from None
245        try:
246            transport = self._transports[fileno]
247        except KeyError:
248            pass
249        else:
250            if not transport.is_closing():
251                raise RuntimeError(
252                    f'File descriptor {fd!r} is used by transport '
253                    f'{transport!r}')
254
255    def _add_reader(self, fd, callback, *args):
256        self._check_closed()
257        handle = events.Handle(callback, args, self, None)
258        try:
259            key = self._selector.get_key(fd)
260        except KeyError:
261            self._selector.register(fd, selectors.EVENT_READ,
262                                    (handle, None))
263        else:
264            mask, (reader, writer) = key.events, key.data
265            self._selector.modify(fd, mask | selectors.EVENT_READ,
266                                  (handle, writer))
267            if reader is not None:
268                reader.cancel()
269
270    def _remove_reader(self, fd):
271        if self.is_closed():
272            return False
273        try:
274            key = self._selector.get_key(fd)
275        except KeyError:
276            return False
277        else:
278            mask, (reader, writer) = key.events, key.data
279            mask &= ~selectors.EVENT_READ
280            if not mask:
281                self._selector.unregister(fd)
282            else:
283                self._selector.modify(fd, mask, (None, writer))
284
285            if reader is not None:
286                reader.cancel()
287                return True
288            else:
289                return False
290
291    def _add_writer(self, fd, callback, *args):
292        self._check_closed()
293        handle = events.Handle(callback, args, self, None)
294        try:
295            key = self._selector.get_key(fd)
296        except KeyError:
297            self._selector.register(fd, selectors.EVENT_WRITE,
298                                    (None, handle))
299        else:
300            mask, (reader, writer) = key.events, key.data
301            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
302                                  (reader, handle))
303            if writer is not None:
304                writer.cancel()
305
306    def _remove_writer(self, fd):
307        """Remove a writer callback."""
308        if self.is_closed():
309            return False
310        try:
311            key = self._selector.get_key(fd)
312        except KeyError:
313            return False
314        else:
315            mask, (reader, writer) = key.events, key.data
316            # Remove both writer and connector.
317            mask &= ~selectors.EVENT_WRITE
318            if not mask:
319                self._selector.unregister(fd)
320            else:
321                self._selector.modify(fd, mask, (reader, None))
322
323            if writer is not None:
324                writer.cancel()
325                return True
326            else:
327                return False
328
329    def add_reader(self, fd, callback, *args):
330        """Add a reader callback."""
331        self._ensure_fd_no_transport(fd)
332        return self._add_reader(fd, callback, *args)
333
334    def remove_reader(self, fd):
335        """Remove a reader callback."""
336        self._ensure_fd_no_transport(fd)
337        return self._remove_reader(fd)
338
339    def add_writer(self, fd, callback, *args):
340        """Add a writer callback.."""
341        self._ensure_fd_no_transport(fd)
342        return self._add_writer(fd, callback, *args)
343
344    def remove_writer(self, fd):
345        """Remove a writer callback."""
346        self._ensure_fd_no_transport(fd)
347        return self._remove_writer(fd)
348
349    async def sock_recv(self, sock, n):
350        """Receive data from the socket.
351
352        The return value is a bytes object representing the data received.
353        The maximum amount of data to be received at once is specified by
354        nbytes.
355        """
356        _check_ssl_socket(sock)
357        if self._debug and sock.gettimeout() != 0:
358            raise ValueError("the socket must be non-blocking")
359        try:
360            return sock.recv(n)
361        except (BlockingIOError, InterruptedError):
362            pass
363        fut = self.create_future()
364        fd = sock.fileno()
365        self.add_reader(fd, self._sock_recv, fut, sock, n)
366        fut.add_done_callback(
367            functools.partial(self._sock_read_done, fd))
368        return await fut
369
370    def _sock_read_done(self, fd, fut):
371        self.remove_reader(fd)
372
373    def _sock_recv(self, fut, sock, n):
374        # _sock_recv() can add itself as an I/O callback if the operation can't
375        # be done immediately. Don't use it directly, call sock_recv().
376        if fut.done():
377            return
378        try:
379            data = sock.recv(n)
380        except (BlockingIOError, InterruptedError):
381            return  # try again next time
382        except (SystemExit, KeyboardInterrupt):
383            raise
384        except BaseException as exc:
385            fut.set_exception(exc)
386        else:
387            fut.set_result(data)
388
389    async def sock_recv_into(self, sock, buf):
390        """Receive data from the socket.
391
392        The received data is written into *buf* (a writable buffer).
393        The return value is the number of bytes written.
394        """
395        _check_ssl_socket(sock)
396        if self._debug and sock.gettimeout() != 0:
397            raise ValueError("the socket must be non-blocking")
398        try:
399            return sock.recv_into(buf)
400        except (BlockingIOError, InterruptedError):
401            pass
402        fut = self.create_future()
403        fd = sock.fileno()
404        self.add_reader(fd, self._sock_recv_into, fut, sock, buf)
405        fut.add_done_callback(
406            functools.partial(self._sock_read_done, fd))
407        return await fut
408
409    def _sock_recv_into(self, fut, sock, buf):
410        # _sock_recv_into() can add itself as an I/O callback if the operation
411        # can't be done immediately. Don't use it directly, call
412        # sock_recv_into().
413        if fut.done():
414            return
415        try:
416            nbytes = sock.recv_into(buf)
417        except (BlockingIOError, InterruptedError):
418            return  # try again next time
419        except (SystemExit, KeyboardInterrupt):
420            raise
421        except BaseException as exc:
422            fut.set_exception(exc)
423        else:
424            fut.set_result(nbytes)
425
426    async def sock_sendall(self, sock, data):
427        """Send data to the socket.
428
429        The socket must be connected to a remote socket. This method continues
430        to send data from data until either all data has been sent or an
431        error occurs. None is returned on success. On error, an exception is
432        raised, and there is no way to determine how much data, if any, was
433        successfully processed by the receiving end of the connection.
434        """
435        _check_ssl_socket(sock)
436        if self._debug and sock.gettimeout() != 0:
437            raise ValueError("the socket must be non-blocking")
438        try:
439            n = sock.send(data)
440        except (BlockingIOError, InterruptedError):
441            n = 0
442
443        if n == len(data):
444            # all data sent
445            return
446
447        fut = self.create_future()
448        fd = sock.fileno()
449        fut.add_done_callback(
450            functools.partial(self._sock_write_done, fd))
451        # use a trick with a list in closure to store a mutable state
452        self.add_writer(fd, self._sock_sendall, fut, sock,
453                        memoryview(data), [n])
454        return await fut
455
456    def _sock_sendall(self, fut, sock, view, pos):
457        if fut.done():
458            # Future cancellation can be scheduled on previous loop iteration
459            return
460        start = pos[0]
461        try:
462            n = sock.send(view[start:])
463        except (BlockingIOError, InterruptedError):
464            return
465        except (SystemExit, KeyboardInterrupt):
466            raise
467        except BaseException as exc:
468            fut.set_exception(exc)
469            return
470
471        start += n
472
473        if start == len(view):
474            fut.set_result(None)
475        else:
476            pos[0] = start
477
478    async def sock_connect(self, sock, address):
479        """Connect to a remote socket at address.
480
481        This method is a coroutine.
482        """
483        _check_ssl_socket(sock)
484        if self._debug and sock.gettimeout() != 0:
485            raise ValueError("the socket must be non-blocking")
486
487        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
488            resolved = await self._ensure_resolved(
489                address, family=sock.family, proto=sock.proto, loop=self)
490            _, _, _, _, address = resolved[0]
491
492        fut = self.create_future()
493        self._sock_connect(fut, sock, address)
494        return await fut
495
496    def _sock_connect(self, fut, sock, address):
497        fd = sock.fileno()
498        try:
499            sock.connect(address)
500        except (BlockingIOError, InterruptedError):
501            # Issue #23618: When the C function connect() fails with EINTR, the
502            # connection runs in background. We have to wait until the socket
503            # becomes writable to be notified when the connection succeed or
504            # fails.
505            fut.add_done_callback(
506                functools.partial(self._sock_write_done, fd))
507            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
508        except (SystemExit, KeyboardInterrupt):
509            raise
510        except BaseException as exc:
511            fut.set_exception(exc)
512        else:
513            fut.set_result(None)
514
515    def _sock_write_done(self, fd, fut):
516        self.remove_writer(fd)
517
518    def _sock_connect_cb(self, fut, sock, address):
519        if fut.done():
520            return
521
522        try:
523            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
524            if err != 0:
525                # Jump to any except clause below.
526                raise OSError(err, f'Connect call failed {address}')
527        except (BlockingIOError, InterruptedError):
528            # socket is still registered, the callback will be retried later
529            pass
530        except (SystemExit, KeyboardInterrupt):
531            raise
532        except BaseException as exc:
533            fut.set_exception(exc)
534        else:
535            fut.set_result(None)
536
537    async def sock_accept(self, sock):
538        """Accept a connection.
539
540        The socket must be bound to an address and listening for connections.
541        The return value is a pair (conn, address) where conn is a new socket
542        object usable to send and receive data on the connection, and address
543        is the address bound to the socket on the other end of the connection.
544        """
545        _check_ssl_socket(sock)
546        if self._debug and sock.gettimeout() != 0:
547            raise ValueError("the socket must be non-blocking")
548        fut = self.create_future()
549        self._sock_accept(fut, False, sock)
550        return await fut
551
552    def _sock_accept(self, fut, registered, sock):
553        fd = sock.fileno()
554        if registered:
555            self.remove_reader(fd)
556        if fut.done():
557            return
558        try:
559            conn, address = sock.accept()
560            conn.setblocking(False)
561        except (BlockingIOError, InterruptedError):
562            self.add_reader(fd, self._sock_accept, fut, True, sock)
563        except (SystemExit, KeyboardInterrupt):
564            raise
565        except BaseException as exc:
566            fut.set_exception(exc)
567        else:
568            fut.set_result((conn, address))
569
    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* over the socket of transport *transp* via sock_sendfile().

        Reading on the transport is paused for the duration of the call
        and resumed afterwards if it was active.  Returns whatever
        sock_sendfile() returns; it is called with ``fallback=False``.
        """
        # Take the transport out of the fd registry while its raw socket is
        # used directly.  NOTE(review): presumably so that the
        # _ensure_fd_no_transport() check does not reject the descriptor
        # inside sock_sendfile() -- confirm against sock_sendfile().
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        # NOTE(review): looks like this waits for the transport's write
        # buffer to drain -- confirm against _make_empty_waiter().
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            # Undo the temporary state changes in every case.
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp
583
584    def _process_events(self, event_list):
585        for key, mask in event_list:
586            fileobj, (reader, writer) = key.fileobj, key.data
587            if mask & selectors.EVENT_READ and reader is not None:
588                if reader._cancelled:
589                    self._remove_reader(fileobj)
590                else:
591                    self._add_callback(reader)
592            if mask & selectors.EVENT_WRITE and writer is not None:
593                if writer._cancelled:
594                    self._remove_writer(fileobj)
595                else:
596                    self._add_callback(writer)
597
598    def _stop_serving(self, sock):
599        self._remove_reader(sock.fileno())
600        sock.close()
601
602
603class _SelectorTransport(transports._FlowControlMixin,
604                         transports.Transport):
605
606    max_size = 256 * 1024  # Buffer size passed to recv().
607
608    _buffer_factory = bytearray  # Constructs initial value for self._buffer.
609
610    # Attribute used in the destructor: it must be set even if the constructor
611    # is not called (see _SelectorSslTransport which may start by raising an
612    # exception)
613    _sock = None
614
615    def __init__(self, loop, sock, protocol, extra=None, server=None):
616        super().__init__(extra, loop)
617        self._extra['socket'] = trsock.TransportSocket(sock)
618        try:
619            self._extra['sockname'] = sock.getsockname()
620        except OSError:
621            self._extra['sockname'] = None
622        if 'peername' not in self._extra:
623            try:
624                self._extra['peername'] = sock.getpeername()
625            except socket.error:
626                self._extra['peername'] = None
627        self._sock = sock
628        self._sock_fd = sock.fileno()
629
630        self._protocol_connected = False
631        self.set_protocol(protocol)
632
633        self._server = server
634        self._buffer = self._buffer_factory()
635        self._conn_lost = 0  # Set when call to connection_lost scheduled.
636        self._closing = False  # Set when close() called.
637        if self._server is not None:
638            self._server._attach()
639        loop._transports[self._sock_fd] = self
640
641    def __repr__(self):
642        info = [self.__class__.__name__]
643        if self._sock is None:
644            info.append('closed')
645        elif self._closing:
646            info.append('closing')
647        info.append(f'fd={self._sock_fd}')
648        # test if the transport was closed
649        if self._loop is not None and not self._loop.is_closed():
650            polling = _test_selector_event(self._loop._selector,
651                                           self._sock_fd, selectors.EVENT_READ)
652            if polling:
653                info.append('read=polling')
654            else:
655                info.append('read=idle')
656
657            polling = _test_selector_event(self._loop._selector,
658                                           self._sock_fd,
659                                           selectors.EVENT_WRITE)
660            if polling:
661                state = 'polling'
662            else:
663                state = 'idle'
664
665            bufsize = self.get_write_buffer_size()
666            info.append(f'write=<{state}, bufsize={bufsize}>')
667        return '<{}>'.format(' '.join(info))
668
669    def abort(self):
670        self._force_close(None)
671
672    def set_protocol(self, protocol):
673        self._protocol = protocol
674        self._protocol_connected = True
675
676    def get_protocol(self):
677        return self._protocol
678
679    def is_closing(self):
680        return self._closing
681
682    def close(self):
683        if self._closing:
684            return
685        self._closing = True
686        self._loop._remove_reader(self._sock_fd)
687        if not self._buffer:
688            self._conn_lost += 1
689            self._loop._remove_writer(self._sock_fd)
690            self._loop.call_soon(self._call_connection_lost, None)
691
692    def __del__(self, _warn=warnings.warn):
693        if self._sock is not None:
694            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
695            self._sock.close()
696
697    def _fatal_error(self, exc, message='Fatal error on transport'):
698        # Should be called from exception handler only.
699        if isinstance(exc, OSError):
700            if self._loop.get_debug():
701                logger.debug("%r: %s", self, message, exc_info=True)
702        else:
703            self._loop.call_exception_handler({
704                'message': message,
705                'exception': exc,
706                'transport': self,
707                'protocol': self._protocol,
708            })
709        self._force_close(exc)
710
711    def _force_close(self, exc):
712        if self._conn_lost:
713            return
714        if self._buffer:
715            self._buffer.clear()
716            self._loop._remove_writer(self._sock_fd)
717        if not self._closing:
718            self._closing = True
719            self._loop._remove_reader(self._sock_fd)
720        self._conn_lost += 1
721        self._loop.call_soon(self._call_connection_lost, exc)
722
    def _call_connection_lost(self, exc):
        """Notify the protocol of the lost connection and release resources.

        protocol.connection_lost(exc) is called only if a protocol was
        connected.  The socket is closed and the references to socket,
        protocol, loop and server are cleared even if that call raises.
        """
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            # Cleanup runs regardless of what connection_lost() did.
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                # Balances the _attach() done in __init__.
                server._detach()
                self._server = None
736
737    def get_write_buffer_size(self):
738        return len(self._buffer)
739
740    def _add_reader(self, fd, callback, *args):
741        if self._closing:
742            return
743
744        self._loop._add_reader(fd, callback, *args)
745
746
747class _SelectorSocketTransport(_SelectorTransport):
748
749    _start_tls_compatible = True
750    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
751
    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Create the transport and schedule its start-up callbacks.

        Three callbacks are queued with call_soon(), in this order:
        protocol.connection_made(self), registration of the read
        callback, and (when *waiter* is given) resolving *waiter* with
        None unless it was cancelled.
        """

        # Must exist before the base __init__ runs, because that calls
        # set_protocol(), which assigns this attribute.
        self._read_ready_cb = None
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False
        self._empty_waiter = None

        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        base_events._set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
774
775    def set_protocol(self, protocol):
776        if isinstance(protocol, protocols.BufferedProtocol):
777            self._read_ready_cb = self._read_ready__get_buffer
778        else:
779            self._read_ready_cb = self._read_ready__data_received
780
781        super().set_protocol(protocol)
782
783    def is_reading(self):
784        return not self._paused and not self._closing
785
786    def pause_reading(self):
787        if self._closing or self._paused:
788            return
789        self._paused = True
790        self._loop._remove_reader(self._sock_fd)
791        if self._loop.get_debug():
792            logger.debug("%r pauses reading", self)
793
794    def resume_reading(self):
795        if self._closing or not self._paused:
796            return
797        self._paused = False
798        self._add_reader(self._sock_fd, self._read_ready)
799        if self._loop.get_debug():
800            logger.debug("%r resumes reading", self)
801
802    def _read_ready(self):
803        self._read_ready_cb()
804
805    def _read_ready__get_buffer(self):
806        if self._conn_lost:
807            return
808
809        try:
810            buf = self._protocol.get_buffer(-1)
811            if not len(buf):
812                raise RuntimeError('get_buffer() returned an empty buffer')
813        except (SystemExit, KeyboardInterrupt):
814            raise
815        except BaseException as exc:
816            self._fatal_error(
817                exc, 'Fatal error: protocol.get_buffer() call failed.')
818            return
819
820        try:
821            nbytes = self._sock.recv_into(buf)
822        except (BlockingIOError, InterruptedError):
823            return
824        except (SystemExit, KeyboardInterrupt):
825            raise
826        except BaseException as exc:
827            self._fatal_error(exc, 'Fatal read error on socket transport')
828            return
829
830        if not nbytes:
831            self._read_ready__on_eof()
832            return
833
834        try:
835            self._protocol.buffer_updated(nbytes)
836        except (SystemExit, KeyboardInterrupt):
837            raise
838        except BaseException as exc:
839            self._fatal_error(
840                exc, 'Fatal error: protocol.buffer_updated() call failed.')
841
842    def _read_ready__data_received(self):
843        if self._conn_lost:
844            return
845        try:
846            data = self._sock.recv(self.max_size)
847        except (BlockingIOError, InterruptedError):
848            return
849        except (SystemExit, KeyboardInterrupt):
850            raise
851        except BaseException as exc:
852            self._fatal_error(exc, 'Fatal read error on socket transport')
853            return
854
855        if not data:
856            self._read_ready__on_eof()
857            return
858
859        try:
860            self._protocol.data_received(data)
861        except (SystemExit, KeyboardInterrupt):
862            raise
863        except BaseException as exc:
864            self._fatal_error(
865                exc, 'Fatal error: protocol.data_received() call failed.')
866
867    def _read_ready__on_eof(self):
868        if self._loop.get_debug():
869            logger.debug("%r received EOF", self)
870
871        try:
872            keep_open = self._protocol.eof_received()
873        except (SystemExit, KeyboardInterrupt):
874            raise
875        except BaseException as exc:
876            self._fatal_error(
877                exc, 'Fatal error: protocol.eof_received() call failed.')
878            return
879
880        if keep_open:
881            # We're keeping the connection open so the
882            # protocol can write more, but we still can't
883            # receive more, so remove the reader callback.
884            self._loop._remove_reader(self._sock_fd)
885        else:
886            self.close()
887
888    def write(self, data):
889        if not isinstance(data, (bytes, bytearray, memoryview)):
890            raise TypeError(f'data argument must be a bytes-like object, '
891                            f'not {type(data).__name__!r}')
892        if self._eof:
893            raise RuntimeError('Cannot call write() after write_eof()')
894        if self._empty_waiter is not None:
895            raise RuntimeError('unable to write; sendfile is in progress')
896        if not data:
897            return
898
899        if self._conn_lost:
900            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
901                logger.warning('socket.send() raised exception.')
902            self._conn_lost += 1
903            return
904
905        if not self._buffer:
906            # Optimization: try to send now.
907            try:
908                n = self._sock.send(data)
909            except (BlockingIOError, InterruptedError):
910                pass
911            except (SystemExit, KeyboardInterrupt):
912                raise
913            except BaseException as exc:
914                self._fatal_error(exc, 'Fatal write error on socket transport')
915                return
916            else:
917                data = data[n:]
918                if not data:
919                    return
920            # Not all was written; register write handler.
921            self._loop._add_writer(self._sock_fd, self._write_ready)
922
923        # Add it to the buffer.
924        self._buffer.extend(data)
925        self._maybe_pause_protocol()
926
927    def _write_ready(self):
928        assert self._buffer, 'Data should not be empty'
929
930        if self._conn_lost:
931            return
932        try:
933            n = self._sock.send(self._buffer)
934        except (BlockingIOError, InterruptedError):
935            pass
936        except (SystemExit, KeyboardInterrupt):
937            raise
938        except BaseException as exc:
939            self._loop._remove_writer(self._sock_fd)
940            self._buffer.clear()
941            self._fatal_error(exc, 'Fatal write error on socket transport')
942            if self._empty_waiter is not None:
943                self._empty_waiter.set_exception(exc)
944        else:
945            if n:
946                del self._buffer[:n]
947            self._maybe_resume_protocol()  # May append to buffer.
948            if not self._buffer:
949                self._loop._remove_writer(self._sock_fd)
950                if self._empty_waiter is not None:
951                    self._empty_waiter.set_result(None)
952                if self._closing:
953                    self._call_connection_lost(None)
954                elif self._eof:
955                    self._sock.shutdown(socket.SHUT_WR)
956
957    def write_eof(self):
958        if self._closing or self._eof:
959            return
960        self._eof = True
961        if not self._buffer:
962            self._sock.shutdown(socket.SHUT_WR)
963
    def can_write_eof(self):
        """Return True; this transport implements write_eof()."""
        return True
966
967    def _call_connection_lost(self, exc):
968        super()._call_connection_lost(exc)
969        if self._empty_waiter is not None:
970            self._empty_waiter.set_exception(
971                ConnectionError("Connection is closed by peer"))
972
973    def _make_empty_waiter(self):
974        if self._empty_waiter is not None:
975            raise RuntimeError("Empty waiter is already set")
976        self._empty_waiter = self._loop.create_future()
977        if not self._buffer:
978            self._empty_waiter.set_result(None)
979        return self._empty_waiter
980
    def _reset_empty_waiter(self):
        """Forget the current empty waiter so that a new one can be made."""
        self._empty_waiter = None
983
984
class _SelectorDatagramTransport(_SelectorTransport):
    """Selector-based transport for datagram sockets.

    Datagrams that cannot be sent immediately are queued in
    ``self._buffer`` as ``(data, addr)`` pairs and flushed later by
    ``_sendto_ready()``.
    """

    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Set up the transport and schedule its start-up callbacks.

        *address* is the optional fixed remote address.  Start-up work
        is queued with ``call_soon`` in this order: the protocol's
        ``connection_made()``, registration of the reader, and (when
        *waiter* is given) completion of the waiter.
        """
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        schedule = self._loop.call_soon
        schedule(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        schedule(self._add_reader, self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            schedule(futures._set_result_unless_cancelled, waiter, None)

    def get_write_buffer_size(self):
        """Return the combined payload length of all queued datagrams."""
        total = 0
        for payload, _ in self._buffer:
            total += len(payload)
        return total

    def _read_ready(self):
        """Receive one datagram and deliver it to the protocol.

        BlockingIOError and InterruptedError are ignored.  Any other
        OSError is reported through ``protocol.error_received()``;
        remaining failures go to ``_fatal_error()``.  SystemExit and
        KeyboardInterrupt always propagate.
        """
        if self._conn_lost:
            return

        try:
            payload, sender = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._protocol.error_received(exc)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
            return

        self._protocol.datagram_received(payload, sender)

    def sendto(self, data, addr=None):
        """Send *data* to *addr*, queueing it when the socket would block.

        Raises TypeError when *data* is not bytes, bytearray or
        memoryview, and ValueError when the transport has a fixed
        address and *addr* is neither None nor that address.  Empty data
        is ignored.  An OSError from the immediate send attempt is
        reported through ``protocol.error_received()``.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        fixed = self._address
        if fixed:
            if addr not in (None, fixed):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            addr = fixed

        if self._conn_lost and fixed:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                # Would block: wait for writability and queue it below.
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return
            else:
                # Sent immediately; nothing to queue.
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Flush queued datagrams when the socket is reported writable.

        Stops at the first datagram that would block and puts it back
        at the front of the queue.  An OSError is reported through
        ``protocol.error_received()`` and ends this pass; other failures
        go to ``_fatal_error()``.  When the queue ends up empty the
        writer is removed and, if the transport is closing, the
        connection-lost callback is run.
        """
        while self._buffer:
            payload, dest = self._buffer.popleft()
            try:
                if self._extra['peername']:
                    self._sock.send(payload)
                else:
                    self._sock.sendto(payload, dest)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((payload, dest))  # Try again later.
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if self._buffer:
            return

        self._loop._remove_writer(self._sock_fd)
        if self._closing:
            self._call_connection_lost(None)
1090