• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer.  For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
7__all__ = 'BaseSelectorEventLoop',
8
9import collections
10import errno
11import functools
12import selectors
13import socket
14import warnings
15import weakref
16try:
17    import ssl
18except ImportError:  # pragma: no cover
19    ssl = None
20
21from . import base_events
22from . import constants
23from . import events
24from . import futures
25from . import protocols
26from . import sslproto
27from . import transports
28from . import trsock
29from .log import logger
30
31
32def _test_selector_event(selector, fd, event):
33    # Test if the selector is monitoring 'event' events
34    # for the file descriptor 'fd'.
35    try:
36        key = selector.get_key(fd)
37    except KeyError:
38        return False
39    else:
40        return bool(key.events & event)
41
42
43class BaseSelectorEventLoop(base_events.BaseEventLoop):
44    """Selector event loop.
45
46    See events.EventLoop for API specification.
47    """
48
49    def __init__(self, selector=None):
50        super().__init__()
51
52        if selector is None:
53            selector = selectors.DefaultSelector()
54        logger.debug('Using selector: %s', selector.__class__.__name__)
55        self._selector = selector
56        self._make_self_pipe()
57        self._transports = weakref.WeakValueDictionary()
58
59    def _make_socket_transport(self, sock, protocol, waiter=None, *,
60                               extra=None, server=None):
61        return _SelectorSocketTransport(self, sock, protocol, waiter,
62                                        extra, server)
63
64    def _make_ssl_transport(
65            self, rawsock, protocol, sslcontext, waiter=None,
66            *, server_side=False, server_hostname=None,
67            extra=None, server=None,
68            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
69        ssl_protocol = sslproto.SSLProtocol(
70                self, protocol, sslcontext, waiter,
71                server_side, server_hostname,
72                ssl_handshake_timeout=ssl_handshake_timeout)
73        _SelectorSocketTransport(self, rawsock, ssl_protocol,
74                                 extra=extra, server=server)
75        return ssl_protocol._app_transport
76
77    def _make_datagram_transport(self, sock, protocol,
78                                 address=None, waiter=None, extra=None):
79        return _SelectorDatagramTransport(self, sock, protocol,
80                                          address, waiter, extra)
81
82    def close(self):
83        if self.is_running():
84            raise RuntimeError("Cannot close a running event loop")
85        if self.is_closed():
86            return
87        self._close_self_pipe()
88        super().close()
89        if self._selector is not None:
90            self._selector.close()
91            self._selector = None
92
93    def _close_self_pipe(self):
94        self._remove_reader(self._ssock.fileno())
95        self._ssock.close()
96        self._ssock = None
97        self._csock.close()
98        self._csock = None
99        self._internal_fds -= 1
100
101    def _make_self_pipe(self):
102        # A self-socket, really. :-)
103        self._ssock, self._csock = socket.socketpair()
104        self._ssock.setblocking(False)
105        self._csock.setblocking(False)
106        self._internal_fds += 1
107        self._add_reader(self._ssock.fileno(), self._read_from_self)
108
109    def _process_self_data(self, data):
110        pass
111
112    def _read_from_self(self):
113        while True:
114            try:
115                data = self._ssock.recv(4096)
116                if not data:
117                    break
118                self._process_self_data(data)
119            except InterruptedError:
120                continue
121            except BlockingIOError:
122                break
123
124    def _write_to_self(self):
125        # This may be called from a different thread, possibly after
126        # _close_self_pipe() has been called or even while it is
127        # running.  Guard for self._csock being None or closed.  When
128        # a socket is closed, send() raises OSError (with errno set to
129        # EBADF, but let's not rely on the exact error code).
130        csock = self._csock
131        if csock is None:
132            return
133
134        try:
135            csock.send(b'\0')
136        except OSError:
137            if self._debug:
138                logger.debug("Fail to write a null byte into the "
139                             "self-pipe socket",
140                             exc_info=True)
141
142    def _start_serving(self, protocol_factory, sock,
143                       sslcontext=None, server=None, backlog=100,
144                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
145        self._add_reader(sock.fileno(), self._accept_connection,
146                         protocol_factory, sock, sslcontext, server, backlog,
147                         ssl_handshake_timeout)
148
149    def _accept_connection(
150            self, protocol_factory, sock,
151            sslcontext=None, server=None, backlog=100,
152            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
153        # This method is only called once for each event loop tick where the
154        # listening socket has triggered an EVENT_READ. There may be multiple
155        # connections waiting for an .accept() so it is called in a loop.
156        # See https://bugs.python.org/issue27906 for more details.
157        for _ in range(backlog):
158            try:
159                conn, addr = sock.accept()
160                if self._debug:
161                    logger.debug("%r got a new connection from %r: %r",
162                                 server, addr, conn)
163                conn.setblocking(False)
164            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
165                # Early exit because the socket accept buffer is empty.
166                return None
167            except OSError as exc:
168                # There's nowhere to send the error, so just log it.
169                if exc.errno in (errno.EMFILE, errno.ENFILE,
170                                 errno.ENOBUFS, errno.ENOMEM):
171                    # Some platforms (e.g. Linux keep reporting the FD as
172                    # ready, so we remove the read handler temporarily.
173                    # We'll try again in a while.
174                    self.call_exception_handler({
175                        'message': 'socket.accept() out of system resource',
176                        'exception': exc,
177                        'socket': trsock.TransportSocket(sock),
178                    })
179                    self._remove_reader(sock.fileno())
180                    self.call_later(constants.ACCEPT_RETRY_DELAY,
181                                    self._start_serving,
182                                    protocol_factory, sock, sslcontext, server,
183                                    backlog, ssl_handshake_timeout)
184                else:
185                    raise  # The event loop will catch, log and ignore it.
186            else:
187                extra = {'peername': addr}
188                accept = self._accept_connection2(
189                    protocol_factory, conn, extra, sslcontext, server,
190                    ssl_handshake_timeout)
191                self.create_task(accept)
192
    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Build the protocol and transport for accepted socket *conn*.

        The protocol comes from protocol_factory().  *conn* is wrapped in
        an SSL transport when *sslcontext* is truthy and in a plain socket
        transport otherwise; the coroutine then waits on the future that
        was passed to the transport as its waiter.

        SystemExit and KeyboardInterrupt propagate.  Any other exception
        is swallowed here and, in debug mode only, reported through the
        loop's exception handler.
        """
        # Pre-set so the error report below can tell how far setup got.
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except BaseException:
                # Waiting failed (this includes cancellation): close the
                # transport, then let the outer handlers deal with it.
                transport.close()
                raise
            # It's now up to the protocol to handle the connection.

        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Outside debug mode the failure is dropped silently.
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)
233
234    def _ensure_fd_no_transport(self, fd):
235        fileno = fd
236        if not isinstance(fileno, int):
237            try:
238                fileno = int(fileno.fileno())
239            except (AttributeError, TypeError, ValueError):
240                # This code matches selectors._fileobj_to_fd function.
241                raise ValueError(f"Invalid file object: {fd!r}") from None
242        try:
243            transport = self._transports[fileno]
244        except KeyError:
245            pass
246        else:
247            if not transport.is_closing():
248                raise RuntimeError(
249                    f'File descriptor {fd!r} is used by transport '
250                    f'{transport!r}')
251
252    def _add_reader(self, fd, callback, *args):
253        self._check_closed()
254        handle = events.Handle(callback, args, self, None)
255        try:
256            key = self._selector.get_key(fd)
257        except KeyError:
258            self._selector.register(fd, selectors.EVENT_READ,
259                                    (handle, None))
260        else:
261            mask, (reader, writer) = key.events, key.data
262            self._selector.modify(fd, mask | selectors.EVENT_READ,
263                                  (handle, writer))
264            if reader is not None:
265                reader.cancel()
266        return handle
267
268    def _remove_reader(self, fd):
269        if self.is_closed():
270            return False
271        try:
272            key = self._selector.get_key(fd)
273        except KeyError:
274            return False
275        else:
276            mask, (reader, writer) = key.events, key.data
277            mask &= ~selectors.EVENT_READ
278            if not mask:
279                self._selector.unregister(fd)
280            else:
281                self._selector.modify(fd, mask, (None, writer))
282
283            if reader is not None:
284                reader.cancel()
285                return True
286            else:
287                return False
288
289    def _add_writer(self, fd, callback, *args):
290        self._check_closed()
291        handle = events.Handle(callback, args, self, None)
292        try:
293            key = self._selector.get_key(fd)
294        except KeyError:
295            self._selector.register(fd, selectors.EVENT_WRITE,
296                                    (None, handle))
297        else:
298            mask, (reader, writer) = key.events, key.data
299            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
300                                  (reader, handle))
301            if writer is not None:
302                writer.cancel()
303        return handle
304
305    def _remove_writer(self, fd):
306        """Remove a writer callback."""
307        if self.is_closed():
308            return False
309        try:
310            key = self._selector.get_key(fd)
311        except KeyError:
312            return False
313        else:
314            mask, (reader, writer) = key.events, key.data
315            # Remove both writer and connector.
316            mask &= ~selectors.EVENT_WRITE
317            if not mask:
318                self._selector.unregister(fd)
319            else:
320                self._selector.modify(fd, mask, (reader, None))
321
322            if writer is not None:
323                writer.cancel()
324                return True
325            else:
326                return False
327
328    def add_reader(self, fd, callback, *args):
329        """Add a reader callback."""
330        self._ensure_fd_no_transport(fd)
331        self._add_reader(fd, callback, *args)
332
333    def remove_reader(self, fd):
334        """Remove a reader callback."""
335        self._ensure_fd_no_transport(fd)
336        return self._remove_reader(fd)
337
338    def add_writer(self, fd, callback, *args):
339        """Add a writer callback.."""
340        self._ensure_fd_no_transport(fd)
341        self._add_writer(fd, callback, *args)
342
343    def remove_writer(self, fd):
344        """Remove a writer callback."""
345        self._ensure_fd_no_transport(fd)
346        return self._remove_writer(fd)
347
348    async def sock_recv(self, sock, n):
349        """Receive data from the socket.
350
351        The return value is a bytes object representing the data received.
352        The maximum amount of data to be received at once is specified by
353        nbytes.
354        """
355        base_events._check_ssl_socket(sock)
356        if self._debug and sock.gettimeout() != 0:
357            raise ValueError("the socket must be non-blocking")
358        try:
359            return sock.recv(n)
360        except (BlockingIOError, InterruptedError):
361            pass
362        fut = self.create_future()
363        fd = sock.fileno()
364        self._ensure_fd_no_transport(fd)
365        handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
366        fut.add_done_callback(
367            functools.partial(self._sock_read_done, fd, handle=handle))
368        return await fut
369
370    def _sock_read_done(self, fd, fut, handle=None):
371        if handle is None or not handle.cancelled():
372            self.remove_reader(fd)
373
374    def _sock_recv(self, fut, sock, n):
375        # _sock_recv() can add itself as an I/O callback if the operation can't
376        # be done immediately. Don't use it directly, call sock_recv().
377        if fut.done():
378            return
379        try:
380            data = sock.recv(n)
381        except (BlockingIOError, InterruptedError):
382            return  # try again next time
383        except (SystemExit, KeyboardInterrupt):
384            raise
385        except BaseException as exc:
386            fut.set_exception(exc)
387        else:
388            fut.set_result(data)
389
390    async def sock_recv_into(self, sock, buf):
391        """Receive data from the socket.
392
393        The received data is written into *buf* (a writable buffer).
394        The return value is the number of bytes written.
395        """
396        base_events._check_ssl_socket(sock)
397        if self._debug and sock.gettimeout() != 0:
398            raise ValueError("the socket must be non-blocking")
399        try:
400            return sock.recv_into(buf)
401        except (BlockingIOError, InterruptedError):
402            pass
403        fut = self.create_future()
404        fd = sock.fileno()
405        self._ensure_fd_no_transport(fd)
406        handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
407        fut.add_done_callback(
408            functools.partial(self._sock_read_done, fd, handle=handle))
409        return await fut
410
411    def _sock_recv_into(self, fut, sock, buf):
412        # _sock_recv_into() can add itself as an I/O callback if the operation
413        # can't be done immediately. Don't use it directly, call
414        # sock_recv_into().
415        if fut.done():
416            return
417        try:
418            nbytes = sock.recv_into(buf)
419        except (BlockingIOError, InterruptedError):
420            return  # try again next time
421        except (SystemExit, KeyboardInterrupt):
422            raise
423        except BaseException as exc:
424            fut.set_exception(exc)
425        else:
426            fut.set_result(nbytes)
427
428    async def sock_sendall(self, sock, data):
429        """Send data to the socket.
430
431        The socket must be connected to a remote socket. This method continues
432        to send data from data until either all data has been sent or an
433        error occurs. None is returned on success. On error, an exception is
434        raised, and there is no way to determine how much data, if any, was
435        successfully processed by the receiving end of the connection.
436        """
437        base_events._check_ssl_socket(sock)
438        if self._debug and sock.gettimeout() != 0:
439            raise ValueError("the socket must be non-blocking")
440        try:
441            n = sock.send(data)
442        except (BlockingIOError, InterruptedError):
443            n = 0
444
445        if n == len(data):
446            # all data sent
447            return
448
449        fut = self.create_future()
450        fd = sock.fileno()
451        self._ensure_fd_no_transport(fd)
452        # use a trick with a list in closure to store a mutable state
453        handle = self._add_writer(fd, self._sock_sendall, fut, sock,
454                                  memoryview(data), [n])
455        fut.add_done_callback(
456            functools.partial(self._sock_write_done, fd, handle=handle))
457        return await fut
458
459    def _sock_sendall(self, fut, sock, view, pos):
460        if fut.done():
461            # Future cancellation can be scheduled on previous loop iteration
462            return
463        start = pos[0]
464        try:
465            n = sock.send(view[start:])
466        except (BlockingIOError, InterruptedError):
467            return
468        except (SystemExit, KeyboardInterrupt):
469            raise
470        except BaseException as exc:
471            fut.set_exception(exc)
472            return
473
474        start += n
475
476        if start == len(view):
477            fut.set_result(None)
478        else:
479            pos[0] = start
480
481    async def sock_connect(self, sock, address):
482        """Connect to a remote socket at address.
483
484        This method is a coroutine.
485        """
486        base_events._check_ssl_socket(sock)
487        if self._debug and sock.gettimeout() != 0:
488            raise ValueError("the socket must be non-blocking")
489
490        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
491            resolved = await self._ensure_resolved(
492                address, family=sock.family, type=sock.type, proto=sock.proto,
493                loop=self,
494            )
495            _, _, _, _, address = resolved[0]
496
497        fut = self.create_future()
498        self._sock_connect(fut, sock, address)
499        return await fut
500
501    def _sock_connect(self, fut, sock, address):
502        fd = sock.fileno()
503        try:
504            sock.connect(address)
505        except (BlockingIOError, InterruptedError):
506            # Issue #23618: When the C function connect() fails with EINTR, the
507            # connection runs in background. We have to wait until the socket
508            # becomes writable to be notified when the connection succeed or
509            # fails.
510            self._ensure_fd_no_transport(fd)
511            handle = self._add_writer(
512                fd, self._sock_connect_cb, fut, sock, address)
513            fut.add_done_callback(
514                functools.partial(self._sock_write_done, fd, handle=handle))
515        except (SystemExit, KeyboardInterrupt):
516            raise
517        except BaseException as exc:
518            fut.set_exception(exc)
519        else:
520            fut.set_result(None)
521
522    def _sock_write_done(self, fd, fut, handle=None):
523        if handle is None or not handle.cancelled():
524            self.remove_writer(fd)
525
526    def _sock_connect_cb(self, fut, sock, address):
527        if fut.done():
528            return
529
530        try:
531            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
532            if err != 0:
533                # Jump to any except clause below.
534                raise OSError(err, f'Connect call failed {address}')
535        except (BlockingIOError, InterruptedError):
536            # socket is still registered, the callback will be retried later
537            pass
538        except (SystemExit, KeyboardInterrupt):
539            raise
540        except BaseException as exc:
541            fut.set_exception(exc)
542        else:
543            fut.set_result(None)
544
545    async def sock_accept(self, sock):
546        """Accept a connection.
547
548        The socket must be bound to an address and listening for connections.
549        The return value is a pair (conn, address) where conn is a new socket
550        object usable to send and receive data on the connection, and address
551        is the address bound to the socket on the other end of the connection.
552        """
553        base_events._check_ssl_socket(sock)
554        if self._debug and sock.gettimeout() != 0:
555            raise ValueError("the socket must be non-blocking")
556        fut = self.create_future()
557        self._sock_accept(fut, sock)
558        return await fut
559
560    def _sock_accept(self, fut, sock):
561        fd = sock.fileno()
562        try:
563            conn, address = sock.accept()
564            conn.setblocking(False)
565        except (BlockingIOError, InterruptedError):
566            self._ensure_fd_no_transport(fd)
567            handle = self._add_reader(fd, self._sock_accept, fut, sock)
568            fut.add_done_callback(
569                functools.partial(self._sock_read_done, fd, handle=handle))
570        except (SystemExit, KeyboardInterrupt):
571            raise
572        except BaseException as exc:
573            fut.set_exception(exc)
574        else:
575            fut.set_result((conn, address))
576
    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* over the socket of *transp* using sock_sendfile().

        sock_sendfile() is called with fallback=False and its result is
        returned.  While the call is in progress the transport is taken
        out of self._transports and its reading is paused; both are
        restored afterwards.
        """
        # NOTE(review): presumably removed so the fd checks done by the
        # sock_* methods accept this socket -- confirm against
        # sock_sendfile(), which is not visible here.
        del self._transports[transp._sock_fd]
        # Remember whether reading was active so it can be restored below.
        resume_reading = transp.is_reading()
        transp.pause_reading()
        # NOTE(review): presumably completes once the transport's write
        # buffer has drained -- the helper is not visible here.
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            # Runs on success, error and cancellation of sock_sendfile().
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp
590
591    def _process_events(self, event_list):
592        for key, mask in event_list:
593            fileobj, (reader, writer) = key.fileobj, key.data
594            if mask & selectors.EVENT_READ and reader is not None:
595                if reader._cancelled:
596                    self._remove_reader(fileobj)
597                else:
598                    self._add_callback(reader)
599            if mask & selectors.EVENT_WRITE and writer is not None:
600                if writer._cancelled:
601                    self._remove_writer(fileobj)
602                else:
603                    self._add_callback(writer)
604
605    def _stop_serving(self, sock):
606        self._remove_reader(sock.fileno())
607        sock.close()
608
609
610class _SelectorTransport(transports._FlowControlMixin,
611                         transports.Transport):
612
613    max_size = 256 * 1024  # Buffer size passed to recv().
614
615    _buffer_factory = bytearray  # Constructs initial value for self._buffer.
616
617    # Attribute used in the destructor: it must be set even if the constructor
618    # is not called (see _SelectorSslTransport which may start by raising an
619    # exception)
620    _sock = None
621
622    def __init__(self, loop, sock, protocol, extra=None, server=None):
623        super().__init__(extra, loop)
624        self._extra['socket'] = trsock.TransportSocket(sock)
625        try:
626            self._extra['sockname'] = sock.getsockname()
627        except OSError:
628            self._extra['sockname'] = None
629        if 'peername' not in self._extra:
630            try:
631                self._extra['peername'] = sock.getpeername()
632            except socket.error:
633                self._extra['peername'] = None
634        self._sock = sock
635        self._sock_fd = sock.fileno()
636
637        self._protocol_connected = False
638        self.set_protocol(protocol)
639
640        self._server = server
641        self._buffer = self._buffer_factory()
642        self._conn_lost = 0  # Set when call to connection_lost scheduled.
643        self._closing = False  # Set when close() called.
644        if self._server is not None:
645            self._server._attach()
646        loop._transports[self._sock_fd] = self
647
648    def __repr__(self):
649        info = [self.__class__.__name__]
650        if self._sock is None:
651            info.append('closed')
652        elif self._closing:
653            info.append('closing')
654        info.append(f'fd={self._sock_fd}')
655        # test if the transport was closed
656        if self._loop is not None and not self._loop.is_closed():
657            polling = _test_selector_event(self._loop._selector,
658                                           self._sock_fd, selectors.EVENT_READ)
659            if polling:
660                info.append('read=polling')
661            else:
662                info.append('read=idle')
663
664            polling = _test_selector_event(self._loop._selector,
665                                           self._sock_fd,
666                                           selectors.EVENT_WRITE)
667            if polling:
668                state = 'polling'
669            else:
670                state = 'idle'
671
672            bufsize = self.get_write_buffer_size()
673            info.append(f'write=<{state}, bufsize={bufsize}>')
674        return '<{}>'.format(' '.join(info))
675
676    def abort(self):
677        self._force_close(None)
678
679    def set_protocol(self, protocol):
680        self._protocol = protocol
681        self._protocol_connected = True
682
683    def get_protocol(self):
684        return self._protocol
685
686    def is_closing(self):
687        return self._closing
688
689    def close(self):
690        if self._closing:
691            return
692        self._closing = True
693        self._loop._remove_reader(self._sock_fd)
694        if not self._buffer:
695            self._conn_lost += 1
696            self._loop._remove_writer(self._sock_fd)
697            self._loop.call_soon(self._call_connection_lost, None)
698
699    def __del__(self, _warn=warnings.warn):
700        if self._sock is not None:
701            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
702            self._sock.close()
703
704    def _fatal_error(self, exc, message='Fatal error on transport'):
705        # Should be called from exception handler only.
706        if isinstance(exc, OSError):
707            if self._loop.get_debug():
708                logger.debug("%r: %s", self, message, exc_info=True)
709        else:
710            self._loop.call_exception_handler({
711                'message': message,
712                'exception': exc,
713                'transport': self,
714                'protocol': self._protocol,
715            })
716        self._force_close(exc)
717
718    def _force_close(self, exc):
719        if self._conn_lost:
720            return
721        if self._buffer:
722            self._buffer.clear()
723            self._loop._remove_writer(self._sock_fd)
724        if not self._closing:
725            self._closing = True
726            self._loop._remove_reader(self._sock_fd)
727        self._conn_lost += 1
728        self._loop.call_soon(self._call_connection_lost, exc)
729
730    def _call_connection_lost(self, exc):
731        try:
732            if self._protocol_connected:
733                self._protocol.connection_lost(exc)
734        finally:
735            self._sock.close()
736            self._sock = None
737            self._protocol = None
738            self._loop = None
739            server = self._server
740            if server is not None:
741                server._detach()
742                self._server = None
743
744    def get_write_buffer_size(self):
745        return len(self._buffer)
746
747    def _add_reader(self, fd, callback, *args):
748        if self._closing:
749            return
750
751        self._loop._add_reader(fd, callback, *args)
752
753
754class _SelectorSocketTransport(_SelectorTransport):
755
756    _start_tls_compatible = True
757    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
758
759    def __init__(self, loop, sock, protocol, waiter=None,
760                 extra=None, server=None):
761
762        self._read_ready_cb = None
763        super().__init__(loop, sock, protocol, extra, server)
764        self._eof = False
765        self._paused = False
766        self._empty_waiter = None
767
768        # Disable the Nagle algorithm -- small writes will be
769        # sent without waiting for the TCP ACK.  This generally
770        # decreases the latency (in some cases significantly.)
771        base_events._set_nodelay(self._sock)
772
773        self._loop.call_soon(self._protocol.connection_made, self)
774        # only start reading when connection_made() has been called
775        self._loop.call_soon(self._add_reader,
776                             self._sock_fd, self._read_ready)
777        if waiter is not None:
778            # only wake up the waiter when connection_made() has been called
779            self._loop.call_soon(futures._set_result_unless_cancelled,
780                                 waiter, None)
781
782    def set_protocol(self, protocol):
783        if isinstance(protocol, protocols.BufferedProtocol):
784            self._read_ready_cb = self._read_ready__get_buffer
785        else:
786            self._read_ready_cb = self._read_ready__data_received
787
788        super().set_protocol(protocol)
789
790    def is_reading(self):
791        return not self._paused and not self._closing
792
793    def pause_reading(self):
794        if self._closing or self._paused:
795            return
796        self._paused = True
797        self._loop._remove_reader(self._sock_fd)
798        if self._loop.get_debug():
799            logger.debug("%r pauses reading", self)
800
801    def resume_reading(self):
802        if self._closing or not self._paused:
803            return
804        self._paused = False
805        self._add_reader(self._sock_fd, self._read_ready)
806        if self._loop.get_debug():
807            logger.debug("%r resumes reading", self)
808
809    def _read_ready(self):
810        self._read_ready_cb()
811
812    def _read_ready__get_buffer(self):
813        if self._conn_lost:
814            return
815
816        try:
817            buf = self._protocol.get_buffer(-1)
818            if not len(buf):
819                raise RuntimeError('get_buffer() returned an empty buffer')
820        except (SystemExit, KeyboardInterrupt):
821            raise
822        except BaseException as exc:
823            self._fatal_error(
824                exc, 'Fatal error: protocol.get_buffer() call failed.')
825            return
826
827        try:
828            nbytes = self._sock.recv_into(buf)
829        except (BlockingIOError, InterruptedError):
830            return
831        except (SystemExit, KeyboardInterrupt):
832            raise
833        except BaseException as exc:
834            self._fatal_error(exc, 'Fatal read error on socket transport')
835            return
836
837        if not nbytes:
838            self._read_ready__on_eof()
839            return
840
841        try:
842            self._protocol.buffer_updated(nbytes)
843        except (SystemExit, KeyboardInterrupt):
844            raise
845        except BaseException as exc:
846            self._fatal_error(
847                exc, 'Fatal error: protocol.buffer_updated() call failed.')
848
849    def _read_ready__data_received(self):
850        if self._conn_lost:
851            return
852        try:
853            data = self._sock.recv(self.max_size)
854        except (BlockingIOError, InterruptedError):
855            return
856        except (SystemExit, KeyboardInterrupt):
857            raise
858        except BaseException as exc:
859            self._fatal_error(exc, 'Fatal read error on socket transport')
860            return
861
862        if not data:
863            self._read_ready__on_eof()
864            return
865
866        try:
867            self._protocol.data_received(data)
868        except (SystemExit, KeyboardInterrupt):
869            raise
870        except BaseException as exc:
871            self._fatal_error(
872                exc, 'Fatal error: protocol.data_received() call failed.')
873
874    def _read_ready__on_eof(self):
875        if self._loop.get_debug():
876            logger.debug("%r received EOF", self)
877
878        try:
879            keep_open = self._protocol.eof_received()
880        except (SystemExit, KeyboardInterrupt):
881            raise
882        except BaseException as exc:
883            self._fatal_error(
884                exc, 'Fatal error: protocol.eof_received() call failed.')
885            return
886
887        if keep_open:
888            # We're keeping the connection open so the
889            # protocol can write more, but we still can't
890            # receive more, so remove the reader callback.
891            self._loop._remove_reader(self._sock_fd)
892        else:
893            self.close()
894
895    def write(self, data):
896        if not isinstance(data, (bytes, bytearray, memoryview)):
897            raise TypeError(f'data argument must be a bytes-like object, '
898                            f'not {type(data).__name__!r}')
899        if self._eof:
900            raise RuntimeError('Cannot call write() after write_eof()')
901        if self._empty_waiter is not None:
902            raise RuntimeError('unable to write; sendfile is in progress')
903        if not data:
904            return
905
906        if self._conn_lost:
907            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
908                logger.warning('socket.send() raised exception.')
909            self._conn_lost += 1
910            return
911
912        if not self._buffer:
913            # Optimization: try to send now.
914            try:
915                n = self._sock.send(data)
916            except (BlockingIOError, InterruptedError):
917                pass
918            except (SystemExit, KeyboardInterrupt):
919                raise
920            except BaseException as exc:
921                self._fatal_error(exc, 'Fatal write error on socket transport')
922                return
923            else:
924                data = data[n:]
925                if not data:
926                    return
927            # Not all was written; register write handler.
928            self._loop._add_writer(self._sock_fd, self._write_ready)
929
930        # Add it to the buffer.
931        self._buffer.extend(data)
932        self._maybe_pause_protocol()
933
934    def _write_ready(self):
935        assert self._buffer, 'Data should not be empty'
936
937        if self._conn_lost:
938            return
939        try:
940            n = self._sock.send(self._buffer)
941        except (BlockingIOError, InterruptedError):
942            pass
943        except (SystemExit, KeyboardInterrupt):
944            raise
945        except BaseException as exc:
946            self._loop._remove_writer(self._sock_fd)
947            self._buffer.clear()
948            self._fatal_error(exc, 'Fatal write error on socket transport')
949            if self._empty_waiter is not None:
950                self._empty_waiter.set_exception(exc)
951        else:
952            if n:
953                del self._buffer[:n]
954            self._maybe_resume_protocol()  # May append to buffer.
955            if not self._buffer:
956                self._loop._remove_writer(self._sock_fd)
957                if self._empty_waiter is not None:
958                    self._empty_waiter.set_result(None)
959                if self._closing:
960                    self._call_connection_lost(None)
961                elif self._eof:
962                    self._sock.shutdown(socket.SHUT_WR)
963
964    def write_eof(self):
965        if self._closing or self._eof:
966            return
967        self._eof = True
968        if not self._buffer:
969            self._sock.shutdown(socket.SHUT_WR)
970
971    def can_write_eof(self):
972        return True
973
974    def _call_connection_lost(self, exc):
975        super()._call_connection_lost(exc)
976        if self._empty_waiter is not None:
977            self._empty_waiter.set_exception(
978                ConnectionError("Connection is closed by peer"))
979
980    def _make_empty_waiter(self):
981        if self._empty_waiter is not None:
982            raise RuntimeError("Empty waiter is already set")
983        self._empty_waiter = self._loop.create_future()
984        if not self._buffer:
985            self._empty_waiter.set_result(None)
986        return self._empty_waiter
987
988    def _reset_empty_waiter(self):
989        self._empty_waiter = None
990
991
class _SelectorDatagramTransport(_SelectorTransport):
    """Datagram transport on top of a selector-monitored socket.

    Datagrams that cannot be sent immediately are queued in a deque as
    (data, addr) pairs and flushed by _sendto_ready().
    """

    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        schedule = self._loop.call_soon
        schedule(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        schedule(self._add_reader, self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            schedule(futures._set_result_unless_cancelled, waiter, None)

    def get_write_buffer_size(self):
        """Return the total payload size of all queued datagrams."""
        total = 0
        for payload, _addr in self._buffer:
            total += len(payload)
        return total

    def _read_ready(self):
        """Reader callback: receive one datagram and hand it to the protocol.

        OSError is reported through protocol.error_received(); any other
        exception except SystemExit/KeyboardInterrupt is fatal.
        """
        if self._conn_lost:
            return

        try:
            payload, sender = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._protocol.error_received(exc)
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
            return

        self._protocol.datagram_received(payload, sender)

    def sendto(self, data, addr=None):
        """Send *data* to *addr*, queueing it if the socket would block.

        Raises TypeError for non bytes-like data and ValueError when the
        transport has a fixed address and *addr* is neither None nor
        that address.  Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        if self._address:
            if addr not in (None, self._address):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            addr = self._address

        if self._conn_lost and self._address:
            # Warn only once the number of dropped sends reaches the
            # threshold.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Nothing is queued, so try sending straight away.
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return
            else:
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Writer callback: flush queued datagrams until the socket blocks."""
        while self._buffer:
            entry = self._buffer.popleft()
            payload, target = entry
            try:
                if self._extra['peername']:
                    self._sock.send(payload)
                else:
                    self._sock.sendto(payload, target)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft(entry)  # Try again later.
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if self._buffer:
            return
        self._loop._remove_writer(self._sock_fd)
        if self._closing:
            self._call_connection_lost(None)
1097