• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer.  For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
7__all__ = 'BaseSelectorEventLoop',
8
9import collections
10import errno
11import functools
12import selectors
13import socket
14import warnings
15import weakref
16try:
17    import ssl
18except ImportError:  # pragma: no cover
19    ssl = None
20
21from . import base_events
22from . import constants
23from . import events
24from . import futures
25from . import protocols
26from . import sslproto
27from . import transports
28from . import trsock
29from .log import logger
30
31
32def _test_selector_event(selector, fd, event):
33    # Test if the selector is monitoring 'event' events
34    # for the file descriptor 'fd'.
35    try:
36        key = selector.get_key(fd)
37    except KeyError:
38        return False
39    else:
40        return bool(key.events & event)
41
42
43def _check_ssl_socket(sock):
44    if ssl is not None and isinstance(sock, ssl.SSLSocket):
45        raise TypeError("Socket cannot be of type SSLSocket")
46
47
class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Set up the loop around *selector*.

        When *selector* is None a selectors.DefaultSelector() is created.
        The self-pipe is created immediately, and a weak-value mapping from
        file descriptor to transport is initialised.
        """
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # Weak values: an entry here does not keep its transport alive.
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create and return a _SelectorSocketTransport for *sock*."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Wrap *rawsock* with an SSLProtocol and return its app transport.

        *protocol* is handed to sslproto.SSLProtocol; the value returned is
        that SSLProtocol's ``_app_transport``.
        """
        ssl_protocol = sslproto.SSLProtocol(
                self, protocol, sslcontext, waiter,
                server_side, server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout)
        # The raw transport is created for its side effects only: its
        # constructor registers it in self._transports and schedules
        # ssl_protocol.connection_made().
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create and return a _SelectorDatagramTransport for *sock*."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running.  Does nothing if the
        loop is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        # The self-pipe reader must be removed before super().close():
        # _remove_reader() is a no-op once the loop reports closed.
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and start reading from it."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Hook called with each chunk read from the self-pipe.

        The implementation here ignores *data*.
        """
        pass

    def _read_from_self(self):
        """Drain the read end of the self-pipe.

        Reads until recv() returns no data or would block; every non-empty
        chunk is passed to _process_self_data().
        """
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        """Write one null byte to the self-pipe; OSError is not propagated."""
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is None:
            return

        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Register _accept_connection() as the reader for listening *sock*."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog,
                         ssl_handshake_timeout)

    def _accept_connection(
            self, protocol_factory, sock,
            sslcontext=None, server=None, backlog=100,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Accept up to *backlog* pending connections on *sock*.

        Each accepted connection is made non-blocking and handed to
        _accept_connection2() in a new task.  When accept() fails with
        EMFILE, ENFILE, ENOBUFS or ENOMEM, the exception handler is called,
        the reader is removed and _start_serving() is scheduled again after
        constants.ACCEPT_RETRY_DELAY.  Any other OSError is re-raised.
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except ConnectionAbortedError:
                # This connection was aborted before accept() returned it;
                # others may still be queued, so keep draining.
                continue
            except (BlockingIOError, InterruptedError):
                # Early exit because of a signal or because the socket
                # accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': trsock.TransportSocket(sock),
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog, ssl_handshake_timeout)
                    # Stop for this tick: continuing the loop would only
                    # repeat the failure, the report and the retry
                    # scheduling for every remaining iteration.
                    return None
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(
                    protocol_factory, conn, extra, sslcontext, server,
                    ssl_handshake_timeout)
                self.create_task(accept)

    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Build protocol and transport for accepted socket *conn*.

        An SSL transport is created when *sslcontext* is truthy, a plain
        socket transport otherwise, and the transport's waiter is awaited.
        SystemExit and KeyboardInterrupt propagate.  Every other exception
        is swallowed; it is passed to the loop's exception handler only
        when the loop is in debug mode.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except BaseException:
                transport.close()
                raise
            # It's now up to the protocol to handle the connection.

        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Raise RuntimeError if *fd* belongs to a transport that is not closing.

        *fd* may be an int or an object with a fileno() method.  ValueError
        is raised when no integer descriptor can be obtained from it.
        """
        fileno = fd
        if not isinstance(fileno, int):
            try:
                fileno = int(fileno.fileno())
            except (AttributeError, TypeError, ValueError):
                # This code matches selectors._fileobj_to_fd function.
                raise ValueError(f"Invalid file object: {fd!r}") from None
        try:
            transport = self._transports[fileno]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    f'File descriptor {fd!r} is used by transport '
                    f'{transport!r}')

    def _add_reader(self, fd, callback, *args):
        """Register *callback* for read events on *fd*; return its Handle.

        A previously registered reader for *fd* is replaced and its handle
        cancelled; a registered writer is preserved.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            # First registration for this fd; key.data is (reader, writer).
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()
        return handle

    def _remove_reader(self, fd):
        """Stop watching *fd* for read events.

        Returns True if a reader handle was registered (it is cancelled),
        False otherwise, including when the loop is already closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                # No event left to watch: drop the fd from the selector.
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Register *callback* for write events on *fd*; return its Handle.

        A previously registered writer for *fd* is replaced and its handle
        cancelled; a registered reader is preserved.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            # First registration for this fd; key.data is (reader, writer).
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()
        return handle

    def _remove_writer(self, fd):
        """Remove a writer callback.

        Returns True if a writer handle was registered (it is cancelled),
        False otherwise, including when the loop is already closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Drop write interest while keeping any registered reader.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback.

        Raises RuntimeError if *fd* is used by a transport that is not
        closing.
        """
        self._ensure_fd_no_transport(fd)
        self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback.

        Returns True if a reader was registered, False otherwise.
        """
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback.

        Raises RuntimeError if *fd* is used by a transport that is not
        closing.
        """
        self._ensure_fd_no_transport(fd)
        self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback.

        Returns True if a writer was registered, False otherwise.
        """
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    async def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.

        Raises TypeError for an SSLSocket and, in debug mode, ValueError
        if the socket is not non-blocking.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try once before involving the selector.
        try:
            return sock.recv(n)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
        # Unregister the reader once the future completes or is cancelled.
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_read_done(self, fd, fut, handle=None):
        """Done-callback: remove the reader for *fd* unless *handle* was cancelled.

        A cancelled handle means the registration it stood for is already
        gone (replaced or removed), so the current reader is left alone.
        """
        if handle is None or not handle.cancelled():
            self.remove_reader(fd)

    def _sock_recv(self, fut, sock, n):
        """Reader callback: try recv() and complete *fut* with the outcome."""
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        if fut.done():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    async def sock_recv_into(self, sock, buf):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is the number of bytes written.

        Raises TypeError for an SSLSocket and, in debug mode, ValueError
        if the socket is not non-blocking.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try once before involving the selector.
        try:
            return sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_recv_into(self, fut, sock, buf):
        """Reader callback: try recv_into() and complete *fut* with the outcome."""
        # _sock_recv_into() can add itself as an I/O callback if the operation
        # can't be done immediately. Don't use it directly, call
        # sock_recv_into().
        if fut.done():
            return
        try:
            nbytes = sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(nbytes)

    async def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try one send() before involving the selector.
        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0

        if n == len(data):
            # all data sent
            return

        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        # use a trick with a list in closure to store a mutable state
        handle = self._add_writer(fd, self._sock_sendall, fut, sock,
                                  memoryview(data), [n])
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd, handle=handle))
        return await fut

    def _sock_sendall(self, fut, sock, view, pos):
        """Writer callback: send the unsent tail of *view*.

        *pos* is a one-element list holding the offset of the first unsent
        byte; it is updated in place.  *fut* is completed with None once
        everything has been sent, or with the exception raised by send().
        """
        if fut.done():
            # Future cancellation can be scheduled on previous loop iteration
            return
        start = pos[0]
        try:
            n = sock.send(view[start:])
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
            return

        start += n

        if start == len(view):
            fut.set_result(None)
        else:
            pos[0] = start

    async def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        # Address resolution is skipped only for AF_UNIX sockets.
        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
            resolved = await self._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            _, _, _, _, address = resolved[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return await fut

    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock* to *address*, completing *fut* when known.

        If connect() cannot finish immediately, _sock_connect_cb() is
        registered as a writer callback to report the final result.
        """
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            self._ensure_fd_no_transport(fd)
            handle = self._add_writer(
                fd, self._sock_connect_cb, fut, sock, address)
            fut.add_done_callback(
                functools.partial(self._sock_write_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_write_done(self, fd, fut, handle=None):
        """Done-callback: remove the writer for *fd* unless *handle* was cancelled."""
        if handle is None or not handle.cancelled():
            self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback: read SO_ERROR and complete *fut* accordingly."""
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, f'Connect call failed {address}')
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    async def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, sock)
        return await fut

    def _sock_accept(self, fut, sock):
        """Try accept() on *sock*; re-register as a reader if it would block.

        On success *fut* receives ``(conn, address)`` with *conn* already set
        to non-blocking mode.
        """
        fd = sock.fileno()
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self._ensure_fd_no_transport(fd)
            handle = self._add_reader(fd, self._sock_accept, fut, sock)
            fut.add_done_callback(
                functools.partial(self._sock_read_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* over the socket of transport *transp* via sock_sendfile().

        The transport is taken out of self._transports for the duration of
        the call (presumably so that the sock_* helpers accept its fd --
        confirm against sock_sendfile) and reading is paused.  Both are
        restored afterwards, even on error.
        """
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp

    def _process_events(self, event_list):
        """Queue the reader/writer handles for each ready selector key.

        *event_list* holds ``(key, mask)`` pairs whose ``key.data`` is a
        ``(reader, writer)`` pair of handles.  A cancelled handle is
        unregistered instead of being queued.
        """
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on listening socket *sock* and close it."""
        self._remove_reader(sock.fileno())
        sock.close()
611
612
class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Base class for transports driven by a selector event loop.

    Holds the socket, the protocol, the write buffer and the closing
    state, and implements closing and error handling.
    """

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called or fails before assigning the socket.
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Bind the transport to *sock* and register it with *loop*.

        'socket', 'sockname' and 'peername' (unless already present) are
        stored in the extra-info mapping.  If *server* is given, its
        _attach() method is called.
        """
        super().__init__(extra, loop)
        self._extra['socket'] = trsock.TransportSocket(sock)
        try:
            self._extra['sockname'] = sock.getsockname()
        except OSError:
            self._extra['sockname'] = None
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except OSError:
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()

        self._protocol_connected = False
        self.set_protocol(protocol)

        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        """Describe the transport: state, fd and selector read/write status."""
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._sock_fd}')
        # test if the transport was closed
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append(f'write=<{state}, bufsize={bufsize}>')
        return f"<{' '.join(info)}>"

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Set *protocol* as the transport's protocol and mark it connected."""
        self._protocol = protocol
        self._protocol_connected = True

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or a forced close has been initiated."""
        return self._closing

    def close(self):
        """Close the transport.

        Reading stops at once.  If the write buffer is empty,
        connection_lost(None) is scheduled immediately; otherwise nothing
        further is scheduled here.
        """
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._loop.call_soon(self._call_connection_lost, None)

    def __del__(self, _warn=warnings.warn):
        """Warn about and close a socket left open by an unclosed transport."""
        if self._sock is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report *exc* and force-close the transport.

        OSError instances are only logged, and only in debug mode; any other
        exception is passed to the loop's exception handler.
        """
        # Should be called from exception handler only.
        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data and schedule connection_lost(*exc*).

        Does nothing if a connection_lost call has already been scheduled.
        """
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the socket and drop references."""
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            # Runs even if connection_lost() raises.
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the number of bytes currently held in the write buffer."""
        return len(self._buffer)

    def _add_reader(self, fd, callback, *args):
        """Register a reader on the loop unless the transport is closing."""
        if self._closing:
            return

        self._loop._add_reader(fd, callback, *args)
755
756
757class _SelectorSocketTransport(_SelectorTransport):
758
759    _start_tls_compatible = True
760    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
761
    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Create the transport and schedule its start-up callbacks.

        Three callbacks are scheduled on the loop, in this order:
        protocol.connection_made(self), registration of the read callback,
        and (if *waiter* is not None) completion of *waiter* with None.
        """

        # Must exist before super().__init__(), which calls set_protocol()
        # and thereby assigns the real read callback.
        self._read_ready_cb = None
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False
        self._empty_waiter = None

        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        base_events._set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
784
785    def set_protocol(self, protocol):
786        if isinstance(protocol, protocols.BufferedProtocol):
787            self._read_ready_cb = self._read_ready__get_buffer
788        else:
789            self._read_ready_cb = self._read_ready__data_received
790
791        super().set_protocol(protocol)
792
793    def is_reading(self):
794        return not self._paused and not self._closing
795
796    def pause_reading(self):
797        if self._closing or self._paused:
798            return
799        self._paused = True
800        self._loop._remove_reader(self._sock_fd)
801        if self._loop.get_debug():
802            logger.debug("%r pauses reading", self)
803
804    def resume_reading(self):
805        if self._closing or not self._paused:
806            return
807        self._paused = False
808        self._add_reader(self._sock_fd, self._read_ready)
809        if self._loop.get_debug():
810            logger.debug("%r resumes reading", self)
811
812    def _read_ready(self):
813        self._read_ready_cb()
814
815    def _read_ready__get_buffer(self):
816        if self._conn_lost:
817            return
818
819        try:
820            buf = self._protocol.get_buffer(-1)
821            if not len(buf):
822                raise RuntimeError('get_buffer() returned an empty buffer')
823        except (SystemExit, KeyboardInterrupt):
824            raise
825        except BaseException as exc:
826            self._fatal_error(
827                exc, 'Fatal error: protocol.get_buffer() call failed.')
828            return
829
830        try:
831            nbytes = self._sock.recv_into(buf)
832        except (BlockingIOError, InterruptedError):
833            return
834        except (SystemExit, KeyboardInterrupt):
835            raise
836        except BaseException as exc:
837            self._fatal_error(exc, 'Fatal read error on socket transport')
838            return
839
840        if not nbytes:
841            self._read_ready__on_eof()
842            return
843
844        try:
845            self._protocol.buffer_updated(nbytes)
846        except (SystemExit, KeyboardInterrupt):
847            raise
848        except BaseException as exc:
849            self._fatal_error(
850                exc, 'Fatal error: protocol.buffer_updated() call failed.')
851
852    def _read_ready__data_received(self):
853        if self._conn_lost:
854            return
855        try:
856            data = self._sock.recv(self.max_size)
857        except (BlockingIOError, InterruptedError):
858            return
859        except (SystemExit, KeyboardInterrupt):
860            raise
861        except BaseException as exc:
862            self._fatal_error(exc, 'Fatal read error on socket transport')
863            return
864
865        if not data:
866            self._read_ready__on_eof()
867            return
868
869        try:
870            self._protocol.data_received(data)
871        except (SystemExit, KeyboardInterrupt):
872            raise
873        except BaseException as exc:
874            self._fatal_error(
875                exc, 'Fatal error: protocol.data_received() call failed.')
876
877    def _read_ready__on_eof(self):
878        if self._loop.get_debug():
879            logger.debug("%r received EOF", self)
880
881        try:
882            keep_open = self._protocol.eof_received()
883        except (SystemExit, KeyboardInterrupt):
884            raise
885        except BaseException as exc:
886            self._fatal_error(
887                exc, 'Fatal error: protocol.eof_received() call failed.')
888            return
889
890        if keep_open:
891            # We're keeping the connection open so the
892            # protocol can write more, but we still can't
893            # receive more, so remove the reader callback.
894            self._loop._remove_reader(self._sock_fd)
895        else:
896            self.close()
897
898    def write(self, data):
899        if not isinstance(data, (bytes, bytearray, memoryview)):
900            raise TypeError(f'data argument must be a bytes-like object, '
901                            f'not {type(data).__name__!r}')
902        if self._eof:
903            raise RuntimeError('Cannot call write() after write_eof()')
904        if self._empty_waiter is not None:
905            raise RuntimeError('unable to write; sendfile is in progress')
906        if not data:
907            return
908
909        if self._conn_lost:
910            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
911                logger.warning('socket.send() raised exception.')
912            self._conn_lost += 1
913            return
914
915        if not self._buffer:
916            # Optimization: try to send now.
917            try:
918                n = self._sock.send(data)
919            except (BlockingIOError, InterruptedError):
920                pass
921            except (SystemExit, KeyboardInterrupt):
922                raise
923            except BaseException as exc:
924                self._fatal_error(exc, 'Fatal write error on socket transport')
925                return
926            else:
927                data = data[n:]
928                if not data:
929                    return
930            # Not all was written; register write handler.
931            self._loop._add_writer(self._sock_fd, self._write_ready)
932
933        # Add it to the buffer.
934        self._buffer.extend(data)
935        self._maybe_pause_protocol()
936
937    def _write_ready(self):
938        assert self._buffer, 'Data should not be empty'
939
940        if self._conn_lost:
941            return
942        try:
943            n = self._sock.send(self._buffer)
944        except (BlockingIOError, InterruptedError):
945            pass
946        except (SystemExit, KeyboardInterrupt):
947            raise
948        except BaseException as exc:
949            self._loop._remove_writer(self._sock_fd)
950            self._buffer.clear()
951            self._fatal_error(exc, 'Fatal write error on socket transport')
952            if self._empty_waiter is not None:
953                self._empty_waiter.set_exception(exc)
954        else:
955            if n:
956                del self._buffer[:n]
957            self._maybe_resume_protocol()  # May append to buffer.
958            if not self._buffer:
959                self._loop._remove_writer(self._sock_fd)
960                if self._empty_waiter is not None:
961                    self._empty_waiter.set_result(None)
962                if self._closing:
963                    self._call_connection_lost(None)
964                elif self._eof:
965                    self._sock.shutdown(socket.SHUT_WR)
966
967    def write_eof(self):
968        if self._closing or self._eof:
969            return
970        self._eof = True
971        if not self._buffer:
972            self._sock.shutdown(socket.SHUT_WR)
973
    def can_write_eof(self):
        """Return True: this transport implements write_eof()."""
        return True
976
977    def _call_connection_lost(self, exc):
978        super()._call_connection_lost(exc)
979        if self._empty_waiter is not None:
980            self._empty_waiter.set_exception(
981                ConnectionError("Connection is closed by peer"))
982
983    def _make_empty_waiter(self):
984        if self._empty_waiter is not None:
985            raise RuntimeError("Empty waiter is already set")
986        self._empty_waiter = self._loop.create_future()
987        if not self._buffer:
988            self._empty_waiter.set_result(None)
989        return self._empty_waiter
990
    def _reset_empty_waiter(self):
        """Drop the empty-buffer waiter so _make_empty_waiter() can be used again."""
        self._empty_waiter = None
993
994
class _SelectorDatagramTransport(_SelectorTransport):
    """Datagram transport driven by the selector event loop.

    Outgoing datagrams that cannot be sent immediately are queued as
    (data, addr) pairs and flushed by _sendto_ready() when the socket
    becomes writable.
    """

    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Set up the transport and schedule the protocol start-up calls.

        *address* is the optional fixed destination that sendto()
        enforces; *waiter*, when given, is resolved after
        connection_made() has been scheduled.
        """
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        schedule = self._loop.call_soon
        schedule(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        schedule(self._add_reader, self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            schedule(futures._set_result_unless_cancelled, waiter, None)

    def get_write_buffer_size(self):
        """Return the total payload size of all queued datagrams."""
        total = 0
        for payload, _addr in self._buffer:
            total += len(payload)
        return total

    def _read_ready(self):
        """Receive one datagram and deliver it to the protocol.

        OSError is reported through protocol.error_received(); any
        other failure except SystemExit/KeyboardInterrupt is fatal.
        """
        if self._conn_lost:
            return
        try:
            data, addr = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._protocol.error_received(exc)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
            return
        self._protocol.datagram_received(data, addr)

    def sendto(self, data, addr=None):
        """Send *data* to *addr*, queueing it if the socket would block.

        Raises TypeError when *data* is not bytes, bytearray or
        memoryview, and ValueError when the transport has a fixed
        address and *addr* is neither None nor that address.  Empty
        data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        if self._address:
            if addr not in (None, self._address):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            addr = self._address

        if self._conn_lost and self._address:
            # Only start warning once enough sends have been dropped.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Nothing is queued, so try to send right away.
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                # Would block: queue below and wait for writability.
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return
            else:
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Flush queued datagrams when the socket becomes writable.

        Stops at the first datagram that would block and puts it back
        at the front of the queue.  Once the queue is empty the writer
        callback is removed and a pending close is completed.
        """
        while self._buffer:
            item = self._buffer.popleft()
            data, addr = item
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft(item)  # Try again later.
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if self._buffer:
            return
        self._loop._remove_writer(self._sock_fd)
        if self._closing:
            self._call_connection_lost(None)
1100