1"""Event loop using a selector and related classes. 2 3A selector is a "notify-when-ready" multiplexer. For a subclass which 4also includes support for signal handling, see the unix_events sub-module. 5""" 6 7__all__ = 'BaseSelectorEventLoop', 8 9import collections 10import errno 11import functools 12import selectors 13import socket 14import warnings 15import weakref 16try: 17 import ssl 18except ImportError: # pragma: no cover 19 ssl = None 20 21from . import base_events 22from . import constants 23from . import events 24from . import futures 25from . import protocols 26from . import sslproto 27from . import transports 28from . import trsock 29from .log import logger 30 31 32def _test_selector_event(selector, fd, event): 33 # Test if the selector is monitoring 'event' events 34 # for the file descriptor 'fd'. 35 try: 36 key = selector.get_key(fd) 37 except KeyError: 38 return False 39 else: 40 return bool(key.events & event) 41 42 43class BaseSelectorEventLoop(base_events.BaseEventLoop): 44 """Selector event loop. 45 46 See events.EventLoop for API specification. 47 """ 48 49 def __init__(self, selector=None): 50 super().__init__() 51 52 if selector is None: 53 selector = selectors.DefaultSelector() 54 logger.debug('Using selector: %s', selector.__class__.__name__) 55 self._selector = selector 56 self._make_self_pipe() 57 self._transports = weakref.WeakValueDictionary() 58 59 def _make_socket_transport(self, sock, protocol, waiter=None, *, 60 extra=None, server=None): 61 return _SelectorSocketTransport(self, sock, protocol, waiter, 62 extra, server) 63 64 def _make_ssl_transport( 65 self, rawsock, protocol, sslcontext, waiter=None, 66 *, server_side=False, server_hostname=None, 67 extra=None, server=None, 68 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 69 ssl_protocol = sslproto.SSLProtocol( 70 self, protocol, sslcontext, waiter, 71 server_side, server_hostname, 72 ssl_handshake_timeout=ssl_handshake_timeout) 73 _SelectorSocketTransport(self, rawsock, ssl_protocol, 74 extra=extra, server=server) 75 return ssl_protocol._app_transport 76 77 def _make_datagram_transport(self, sock, protocol, 78 address=None, waiter=None, extra=None): 79 return _SelectorDatagramTransport(self, sock, protocol, 80 address, waiter, extra) 81 82 def close(self): 83 if self.is_running(): 84 raise RuntimeError("Cannot close a running event loop") 85 if self.is_closed(): 86 return 87 self._close_self_pipe() 88 super().close() 89 if self._selector is not None: 90 self._selector.close() 91 self._selector = None 92 93 def _close_self_pipe(self): 94 self._remove_reader(self._ssock.fileno()) 95 self._ssock.close() 96 self._ssock = None 97 self._csock.close() 98 self._csock = None 99 self._internal_fds -= 1 100 101 def _make_self_pipe(self): 102 # A self-socket, really. :-) 103 self._ssock, self._csock = socket.socketpair() 104 self._ssock.setblocking(False) 105 self._csock.setblocking(False) 106 self._internal_fds += 1 107 self._add_reader(self._ssock.fileno(), self._read_from_self) 108 109 def _process_self_data(self, data): 110 pass 111 112 def _read_from_self(self): 113 while True: 114 try: 115 data = self._ssock.recv(4096) 116 if not data: 117 break 118 self._process_self_data(data) 119 except InterruptedError: 120 continue 121 except BlockingIOError: 122 break 123 124 def _write_to_self(self): 125 # This may be called from a different thread, possibly after 126 # _close_self_pipe() has been called or even while it is 127 # running. Guard for self._csock being None or closed. When 128 # a socket is closed, send() raises OSError (with errno set to 129 # EBADF, but let's not rely on the exact error code). 130 csock = self._csock 131 if csock is None: 132 return 133 134 try: 135 csock.send(b'\0') 136 except OSError: 137 if self._debug: 138 logger.debug("Fail to write a null byte into the " 139 "self-pipe socket", 140 exc_info=True) 141 142 def _start_serving(self, protocol_factory, sock, 143 sslcontext=None, server=None, backlog=100, 144 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 145 self._add_reader(sock.fileno(), self._accept_connection, 146 protocol_factory, sock, sslcontext, server, backlog, 147 ssl_handshake_timeout) 148 149 def _accept_connection( 150 self, protocol_factory, sock, 151 sslcontext=None, server=None, backlog=100, 152 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 153 # This method is only called once for each event loop tick where the 154 # listening socket has triggered an EVENT_READ. There may be multiple 155 # connections waiting for an .accept() so it is called in a loop. 156 # See https://bugs.python.org/issue27906 for more details. 157 for _ in range(backlog): 158 try: 159 conn, addr = sock.accept() 160 if self._debug: 161 logger.debug("%r got a new connection from %r: %r", 162 server, addr, conn) 163 conn.setblocking(False) 164 except (BlockingIOError, InterruptedError, ConnectionAbortedError): 165 # Early exit because the socket accept buffer is empty. 166 return None 167 except OSError as exc: 168 # There's nowhere to send the error, so just log it. 169 if exc.errno in (errno.EMFILE, errno.ENFILE, 170 errno.ENOBUFS, errno.ENOMEM): 171 # Some platforms (e.g. Linux keep reporting the FD as 172 # ready, so we remove the read handler temporarily. 173 # We'll try again in a while. 174 self.call_exception_handler({ 175 'message': 'socket.accept() out of system resource', 176 'exception': exc, 177 'socket': trsock.TransportSocket(sock), 178 }) 179 self._remove_reader(sock.fileno()) 180 self.call_later(constants.ACCEPT_RETRY_DELAY, 181 self._start_serving, 182 protocol_factory, sock, sslcontext, server, 183 backlog, ssl_handshake_timeout) 184 else: 185 raise # The event loop will catch, log and ignore it. 186 else: 187 extra = {'peername': addr} 188 accept = self._accept_connection2( 189 protocol_factory, conn, extra, sslcontext, server, 190 ssl_handshake_timeout) 191 self.create_task(accept) 192 193 async def _accept_connection2( 194 self, protocol_factory, conn, extra, 195 sslcontext=None, server=None, 196 ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT): 197 protocol = None 198 transport = None 199 try: 200 protocol = protocol_factory() 201 waiter = self.create_future() 202 if sslcontext: 203 transport = self._make_ssl_transport( 204 conn, protocol, sslcontext, waiter=waiter, 205 server_side=True, extra=extra, server=server, 206 ssl_handshake_timeout=ssl_handshake_timeout) 207 else: 208 transport = self._make_socket_transport( 209 conn, protocol, waiter=waiter, extra=extra, 210 server=server) 211 212 try: 213 await waiter 214 except BaseException: 215 transport.close() 216 raise 217 # It's now up to the protocol to handle the connection. 218 219 except (SystemExit, KeyboardInterrupt): 220 raise 221 except BaseException as exc: 222 if self._debug: 223 context = { 224 'message': 225 'Error on transport creation for incoming connection', 226 'exception': exc, 227 } 228 if protocol is not None: 229 context['protocol'] = protocol 230 if transport is not None: 231 context['transport'] = transport 232 self.call_exception_handler(context) 233 234 def _ensure_fd_no_transport(self, fd): 235 fileno = fd 236 if not isinstance(fileno, int): 237 try: 238 fileno = int(fileno.fileno()) 239 except (AttributeError, TypeError, ValueError): 240 # This code matches selectors._fileobj_to_fd function. 241 raise ValueError(f"Invalid file object: {fd!r}") from None 242 try: 243 transport = self._transports[fileno] 244 except KeyError: 245 pass 246 else: 247 if not transport.is_closing(): 248 raise RuntimeError( 249 f'File descriptor {fd!r} is used by transport ' 250 f'{transport!r}') 251 252 def _add_reader(self, fd, callback, *args): 253 self._check_closed() 254 handle = events.Handle(callback, args, self, None) 255 try: 256 key = self._selector.get_key(fd) 257 except KeyError: 258 self._selector.register(fd, selectors.EVENT_READ, 259 (handle, None)) 260 else: 261 mask, (reader, writer) = key.events, key.data 262 self._selector.modify(fd, mask | selectors.EVENT_READ, 263 (handle, writer)) 264 if reader is not None: 265 reader.cancel() 266 return handle 267 268 def _remove_reader(self, fd): 269 if self.is_closed(): 270 return False 271 try: 272 key = self._selector.get_key(fd) 273 except KeyError: 274 return False 275 else: 276 mask, (reader, writer) = key.events, key.data 277 mask &= ~selectors.EVENT_READ 278 if not mask: 279 self._selector.unregister(fd) 280 else: 281 self._selector.modify(fd, mask, (None, writer)) 282 283 if reader is not None: 284 reader.cancel() 285 return True 286 else: 287 return False 288 289 def _add_writer(self, fd, callback, *args): 290 self._check_closed() 291 handle = events.Handle(callback, args, self, None) 292 try: 293 key = self._selector.get_key(fd) 294 except KeyError: 295 self._selector.register(fd, selectors.EVENT_WRITE, 296 (None, handle)) 297 else: 298 mask, (reader, writer) = key.events, key.data 299 self._selector.modify(fd, mask | selectors.EVENT_WRITE, 300 (reader, handle)) 301 if writer is not None: 302 writer.cancel() 303 return handle 304 305 def _remove_writer(self, fd): 306 """Remove a writer callback.""" 307 if self.is_closed(): 308 return False 309 try: 310 key = self._selector.get_key(fd) 311 except KeyError: 312 return False 313 else: 314 mask, (reader, writer) = key.events, key.data 315 # Remove both writer and connector. 316 mask &= ~selectors.EVENT_WRITE 317 if not mask: 318 self._selector.unregister(fd) 319 else: 320 self._selector.modify(fd, mask, (reader, None)) 321 322 if writer is not None: 323 writer.cancel() 324 return True 325 else: 326 return False 327 328 def add_reader(self, fd, callback, *args): 329 """Add a reader callback.""" 330 self._ensure_fd_no_transport(fd) 331 self._add_reader(fd, callback, *args) 332 333 def remove_reader(self, fd): 334 """Remove a reader callback.""" 335 self._ensure_fd_no_transport(fd) 336 return self._remove_reader(fd) 337 338 def add_writer(self, fd, callback, *args): 339 """Add a writer callback..""" 340 self._ensure_fd_no_transport(fd) 341 self._add_writer(fd, callback, *args) 342 343 def remove_writer(self, fd): 344 """Remove a writer callback.""" 345 self._ensure_fd_no_transport(fd) 346 return self._remove_writer(fd) 347 348 async def sock_recv(self, sock, n): 349 """Receive data from the socket. 350 351 The return value is a bytes object representing the data received. 352 The maximum amount of data to be received at once is specified by 353 nbytes. 354 """ 355 base_events._check_ssl_socket(sock) 356 if self._debug and sock.gettimeout() != 0: 357 raise ValueError("the socket must be non-blocking") 358 try: 359 return sock.recv(n) 360 except (BlockingIOError, InterruptedError): 361 pass 362 fut = self.create_future() 363 fd = sock.fileno() 364 self._ensure_fd_no_transport(fd) 365 handle = self._add_reader(fd, self._sock_recv, fut, sock, n) 366 fut.add_done_callback( 367 functools.partial(self._sock_read_done, fd, handle=handle)) 368 return await fut 369 370 def _sock_read_done(self, fd, fut, handle=None): 371 if handle is None or not handle.cancelled(): 372 self.remove_reader(fd) 373 374 def _sock_recv(self, fut, sock, n): 375 # _sock_recv() can add itself as an I/O callback if the operation can't 376 # be done immediately. Don't use it directly, call sock_recv(). 377 if fut.done(): 378 return 379 try: 380 data = sock.recv(n) 381 except (BlockingIOError, InterruptedError): 382 return # try again next time 383 except (SystemExit, KeyboardInterrupt): 384 raise 385 except BaseException as exc: 386 fut.set_exception(exc) 387 else: 388 fut.set_result(data) 389 390 async def sock_recv_into(self, sock, buf): 391 """Receive data from the socket. 392 393 The received data is written into *buf* (a writable buffer). 394 The return value is the number of bytes written. 395 """ 396 base_events._check_ssl_socket(sock) 397 if self._debug and sock.gettimeout() != 0: 398 raise ValueError("the socket must be non-blocking") 399 try: 400 return sock.recv_into(buf) 401 except (BlockingIOError, InterruptedError): 402 pass 403 fut = self.create_future() 404 fd = sock.fileno() 405 self._ensure_fd_no_transport(fd) 406 handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf) 407 fut.add_done_callback( 408 functools.partial(self._sock_read_done, fd, handle=handle)) 409 return await fut 410 411 def _sock_recv_into(self, fut, sock, buf): 412 # _sock_recv_into() can add itself as an I/O callback if the operation 413 # can't be done immediately. Don't use it directly, call 414 # sock_recv_into(). 415 if fut.done(): 416 return 417 try: 418 nbytes = sock.recv_into(buf) 419 except (BlockingIOError, InterruptedError): 420 return # try again next time 421 except (SystemExit, KeyboardInterrupt): 422 raise 423 except BaseException as exc: 424 fut.set_exception(exc) 425 else: 426 fut.set_result(nbytes) 427 428 async def sock_sendall(self, sock, data): 429 """Send data to the socket. 430 431 The socket must be connected to a remote socket. This method continues 432 to send data from data until either all data has been sent or an 433 error occurs. None is returned on success. On error, an exception is 434 raised, and there is no way to determine how much data, if any, was 435 successfully processed by the receiving end of the connection. 436 """ 437 base_events._check_ssl_socket(sock) 438 if self._debug and sock.gettimeout() != 0: 439 raise ValueError("the socket must be non-blocking") 440 try: 441 n = sock.send(data) 442 except (BlockingIOError, InterruptedError): 443 n = 0 444 445 if n == len(data): 446 # all data sent 447 return 448 449 fut = self.create_future() 450 fd = sock.fileno() 451 self._ensure_fd_no_transport(fd) 452 # use a trick with a list in closure to store a mutable state 453 handle = self._add_writer(fd, self._sock_sendall, fut, sock, 454 memoryview(data), [n]) 455 fut.add_done_callback( 456 functools.partial(self._sock_write_done, fd, handle=handle)) 457 return await fut 458 459 def _sock_sendall(self, fut, sock, view, pos): 460 if fut.done(): 461 # Future cancellation can be scheduled on previous loop iteration 462 return 463 start = pos[0] 464 try: 465 n = sock.send(view[start:]) 466 except (BlockingIOError, InterruptedError): 467 return 468 except (SystemExit, KeyboardInterrupt): 469 raise 470 except BaseException as exc: 471 fut.set_exception(exc) 472 return 473 474 start += n 475 476 if start == len(view): 477 fut.set_result(None) 478 else: 479 pos[0] = start 480 481 async def sock_connect(self, sock, address): 482 """Connect to a remote socket at address. 483 484 This method is a coroutine. 485 """ 486 base_events._check_ssl_socket(sock) 487 if self._debug and sock.gettimeout() != 0: 488 raise ValueError("the socket must be non-blocking") 489 490 if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX: 491 resolved = await self._ensure_resolved( 492 address, family=sock.family, type=sock.type, proto=sock.proto, 493 loop=self, 494 ) 495 _, _, _, _, address = resolved[0] 496 497 fut = self.create_future() 498 self._sock_connect(fut, sock, address) 499 return await fut 500 501 def _sock_connect(self, fut, sock, address): 502 fd = sock.fileno() 503 try: 504 sock.connect(address) 505 except (BlockingIOError, InterruptedError): 506 # Issue #23618: When the C function connect() fails with EINTR, the 507 # connection runs in background. We have to wait until the socket 508 # becomes writable to be notified when the connection succeed or 509 # fails. 510 self._ensure_fd_no_transport(fd) 511 handle = self._add_writer( 512 fd, self._sock_connect_cb, fut, sock, address) 513 fut.add_done_callback( 514 functools.partial(self._sock_write_done, fd, handle=handle)) 515 except (SystemExit, KeyboardInterrupt): 516 raise 517 except BaseException as exc: 518 fut.set_exception(exc) 519 else: 520 fut.set_result(None) 521 522 def _sock_write_done(self, fd, fut, handle=None): 523 if handle is None or not handle.cancelled(): 524 self.remove_writer(fd) 525 526 def _sock_connect_cb(self, fut, sock, address): 527 if fut.done(): 528 return 529 530 try: 531 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) 532 if err != 0: 533 # Jump to any except clause below. 534 raise OSError(err, f'Connect call failed {address}') 535 except (BlockingIOError, InterruptedError): 536 # socket is still registered, the callback will be retried later 537 pass 538 except (SystemExit, KeyboardInterrupt): 539 raise 540 except BaseException as exc: 541 fut.set_exception(exc) 542 else: 543 fut.set_result(None) 544 545 async def sock_accept(self, sock): 546 """Accept a connection. 547 548 The socket must be bound to an address and listening for connections. 549 The return value is a pair (conn, address) where conn is a new socket 550 object usable to send and receive data on the connection, and address 551 is the address bound to the socket on the other end of the connection. 552 """ 553 base_events._check_ssl_socket(sock) 554 if self._debug and sock.gettimeout() != 0: 555 raise ValueError("the socket must be non-blocking") 556 fut = self.create_future() 557 self._sock_accept(fut, sock) 558 return await fut 559 560 def _sock_accept(self, fut, sock): 561 fd = sock.fileno() 562 try: 563 conn, address = sock.accept() 564 conn.setblocking(False) 565 except (BlockingIOError, InterruptedError): 566 self._ensure_fd_no_transport(fd) 567 handle = self._add_reader(fd, self._sock_accept, fut, sock) 568 fut.add_done_callback( 569 functools.partial(self._sock_read_done, fd, handle=handle)) 570 except (SystemExit, KeyboardInterrupt): 571 raise 572 except BaseException as exc: 573 fut.set_exception(exc) 574 else: 575 fut.set_result((conn, address)) 576 577 async def _sendfile_native(self, transp, file, offset, count): 578 del self._transports[transp._sock_fd] 579 resume_reading = transp.is_reading() 580 transp.pause_reading() 581 await transp._make_empty_waiter() 582 try: 583 return await self.sock_sendfile(transp._sock, file, offset, count, 584 fallback=False) 585 finally: 586 transp._reset_empty_waiter() 587 if resume_reading: 588 transp.resume_reading() 589 self._transports[transp._sock_fd] = transp 590 591 def _process_events(self, event_list): 592 for key, mask in event_list: 593 fileobj, (reader, writer) = key.fileobj, key.data 594 if mask & selectors.EVENT_READ and reader is not None: 595 if reader._cancelled: 596 self._remove_reader(fileobj) 597 else: 598 self._add_callback(reader) 599 if mask & selectors.EVENT_WRITE and writer is not None: 600 if writer._cancelled: 601 self._remove_writer(fileobj) 602 else: 603 self._add_callback(writer) 604 605 def _stop_serving(self, sock): 606 self._remove_reader(sock.fileno()) 607 sock.close() 608 609 610class _SelectorTransport(transports._FlowControlMixin, 611 transports.Transport): 612 613 max_size = 256 * 1024 # Buffer size passed to recv(). 614 615 _buffer_factory = bytearray # Constructs initial value for self._buffer. 616 617 # Attribute used in the destructor: it must be set even if the constructor 618 # is not called (see _SelectorSslTransport which may start by raising an 619 # exception) 620 _sock = None 621 622 def __init__(self, loop, sock, protocol, extra=None, server=None): 623 super().__init__(extra, loop) 624 self._extra['socket'] = trsock.TransportSocket(sock) 625 try: 626 self._extra['sockname'] = sock.getsockname() 627 except OSError: 628 self._extra['sockname'] = None 629 if 'peername' not in self._extra: 630 try: 631 self._extra['peername'] = sock.getpeername() 632 except socket.error: 633 self._extra['peername'] = None 634 self._sock = sock 635 self._sock_fd = sock.fileno() 636 637 self._protocol_connected = False 638 self.set_protocol(protocol) 639 640 self._server = server 641 self._buffer = self._buffer_factory() 642 self._conn_lost = 0 # Set when call to connection_lost scheduled. 643 self._closing = False # Set when close() called. 644 if self._server is not None: 645 self._server._attach() 646 loop._transports[self._sock_fd] = self 647 648 def __repr__(self): 649 info = [self.__class__.__name__] 650 if self._sock is None: 651 info.append('closed') 652 elif self._closing: 653 info.append('closing') 654 info.append(f'fd={self._sock_fd}') 655 # test if the transport was closed 656 if self._loop is not None and not self._loop.is_closed(): 657 polling = _test_selector_event(self._loop._selector, 658 self._sock_fd, selectors.EVENT_READ) 659 if polling: 660 info.append('read=polling') 661 else: 662 info.append('read=idle') 663 664 polling = _test_selector_event(self._loop._selector, 665 self._sock_fd, 666 selectors.EVENT_WRITE) 667 if polling: 668 state = 'polling' 669 else: 670 state = 'idle' 671 672 bufsize = self.get_write_buffer_size() 673 info.append(f'write=<{state}, bufsize={bufsize}>') 674 return '<{}>'.format(' '.join(info)) 675 676 def abort(self): 677 self._force_close(None) 678 679 def set_protocol(self, protocol): 680 self._protocol = protocol 681 self._protocol_connected = True 682 683 def get_protocol(self): 684 return self._protocol 685 686 def is_closing(self): 687 return self._closing 688 689 def close(self): 690 if self._closing: 691 return 692 self._closing = True 693 self._loop._remove_reader(self._sock_fd) 694 if not self._buffer: 695 self._conn_lost += 1 696 self._loop._remove_writer(self._sock_fd) 697 self._loop.call_soon(self._call_connection_lost, None) 698 699 def __del__(self, _warn=warnings.warn): 700 if self._sock is not None: 701 _warn(f"unclosed transport {self!r}", ResourceWarning, source=self) 702 self._sock.close() 703 704 def _fatal_error(self, exc, message='Fatal error on transport'): 705 # Should be called from exception handler only. 706 if isinstance(exc, OSError): 707 if self._loop.get_debug(): 708 logger.debug("%r: %s", self, message, exc_info=True) 709 else: 710 self._loop.call_exception_handler({ 711 'message': message, 712 'exception': exc, 713 'transport': self, 714 'protocol': self._protocol, 715 }) 716 self._force_close(exc) 717 718 def _force_close(self, exc): 719 if self._conn_lost: 720 return 721 if self._buffer: 722 self._buffer.clear() 723 self._loop._remove_writer(self._sock_fd) 724 if not self._closing: 725 self._closing = True 726 self._loop._remove_reader(self._sock_fd) 727 self._conn_lost += 1 728 self._loop.call_soon(self._call_connection_lost, exc) 729 730 def _call_connection_lost(self, exc): 731 try: 732 if self._protocol_connected: 733 self._protocol.connection_lost(exc) 734 finally: 735 self._sock.close() 736 self._sock = None 737 self._protocol = None 738 self._loop = None 739 server = self._server 740 if server is not None: 741 server._detach() 742 self._server = None 743 744 def get_write_buffer_size(self): 745 return len(self._buffer) 746 747 def _add_reader(self, fd, callback, *args): 748 if self._closing: 749 return 750 751 self._loop._add_reader(fd, callback, *args) 752 753 754class _SelectorSocketTransport(_SelectorTransport): 755 756 _start_tls_compatible = True 757 _sendfile_compatible = constants._SendfileMode.TRY_NATIVE 758 759 def __init__(self, loop, sock, protocol, waiter=None, 760 extra=None, server=None): 761 762 self._read_ready_cb = None 763 super().__init__(loop, sock, protocol, extra, server) 764 self._eof = False 765 self._paused = False 766 self._empty_waiter = None 767 768 # Disable the Nagle algorithm -- small writes will be 769 # sent without waiting for the TCP ACK. This generally 770 # decreases the latency (in some cases significantly.) 771 base_events._set_nodelay(self._sock) 772 773 self._loop.call_soon(self._protocol.connection_made, self) 774 # only start reading when connection_made() has been called 775 self._loop.call_soon(self._add_reader, 776 self._sock_fd, self._read_ready) 777 if waiter is not None: 778 # only wake up the waiter when connection_made() has been called 779 self._loop.call_soon(futures._set_result_unless_cancelled, 780 waiter, None) 781 782 def set_protocol(self, protocol): 783 if isinstance(protocol, protocols.BufferedProtocol): 784 self._read_ready_cb = self._read_ready__get_buffer 785 else: 786 self._read_ready_cb = self._read_ready__data_received 787 788 super().set_protocol(protocol) 789 790 def is_reading(self): 791 return not self._paused and not self._closing 792 793 def pause_reading(self): 794 if self._closing or self._paused: 795 return 796 self._paused = True 797 self._loop._remove_reader(self._sock_fd) 798 if self._loop.get_debug(): 799 logger.debug("%r pauses reading", self) 800 801 def resume_reading(self): 802 if self._closing or not self._paused: 803 return 804 self._paused = False 805 self._add_reader(self._sock_fd, self._read_ready) 806 if self._loop.get_debug(): 807 logger.debug("%r resumes reading", self) 808 809 def _read_ready(self): 810 self._read_ready_cb() 811 812 def _read_ready__get_buffer(self): 813 if self._conn_lost: 814 return 815 816 try: 817 buf = self._protocol.get_buffer(-1) 818 if not len(buf): 819 raise RuntimeError('get_buffer() returned an empty buffer') 820 except (SystemExit, KeyboardInterrupt): 821 raise 822 except BaseException as exc: 823 self._fatal_error( 824 exc, 'Fatal error: protocol.get_buffer() call failed.') 825 return 826 827 try: 828 nbytes = self._sock.recv_into(buf) 829 except (BlockingIOError, InterruptedError): 830 return 831 except (SystemExit, KeyboardInterrupt): 832 raise 833 except BaseException as exc: 834 self._fatal_error(exc, 'Fatal read error on socket transport') 835 return 836 837 if not nbytes: 838 self._read_ready__on_eof() 839 return 840 841 try: 842 self._protocol.buffer_updated(nbytes) 843 except (SystemExit, KeyboardInterrupt): 844 raise 845 except BaseException as exc: 846 self._fatal_error( 847 exc, 'Fatal error: protocol.buffer_updated() call failed.') 848 849 def _read_ready__data_received(self): 850 if self._conn_lost: 851 return 852 try: 853 data = self._sock.recv(self.max_size) 854 except (BlockingIOError, InterruptedError): 855 return 856 except (SystemExit, KeyboardInterrupt): 857 raise 858 except BaseException as exc: 859 self._fatal_error(exc, 'Fatal read error on socket transport') 860 return 861 862 if not data: 863 self._read_ready__on_eof() 864 return 865 866 try: 867 self._protocol.data_received(data) 868 except (SystemExit, KeyboardInterrupt): 869 raise 870 except BaseException as exc: 871 self._fatal_error( 872 exc, 'Fatal error: protocol.data_received() call failed.') 873 874 def _read_ready__on_eof(self): 875 if self._loop.get_debug(): 876 logger.debug("%r received EOF", self) 877 878 try: 879 keep_open = self._protocol.eof_received() 880 except (SystemExit, KeyboardInterrupt): 881 raise 882 except BaseException as exc: 883 self._fatal_error( 884 exc, 'Fatal error: protocol.eof_received() call failed.') 885 return 886 887 if keep_open: 888 # We're keeping the connection open so the 889 # protocol can write more, but we still can't 890 # receive more, so remove the reader callback. 891 self._loop._remove_reader(self._sock_fd) 892 else: 893 self.close() 894 895 def write(self, data): 896 if not isinstance(data, (bytes, bytearray, memoryview)): 897 raise TypeError(f'data argument must be a bytes-like object, ' 898 f'not {type(data).__name__!r}') 899 if self._eof: 900 raise RuntimeError('Cannot call write() after write_eof()') 901 if self._empty_waiter is not None: 902 raise RuntimeError('unable to write; sendfile is in progress') 903 if not data: 904 return 905 906 if self._conn_lost: 907 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 908 logger.warning('socket.send() raised exception.') 909 self._conn_lost += 1 910 return 911 912 if not self._buffer: 913 # Optimization: try to send now. 914 try: 915 n = self._sock.send(data) 916 except (BlockingIOError, InterruptedError): 917 pass 918 except (SystemExit, KeyboardInterrupt): 919 raise 920 except BaseException as exc: 921 self._fatal_error(exc, 'Fatal write error on socket transport') 922 return 923 else: 924 data = data[n:] 925 if not data: 926 return 927 # Not all was written; register write handler. 928 self._loop._add_writer(self._sock_fd, self._write_ready) 929 930 # Add it to the buffer. 931 self._buffer.extend(data) 932 self._maybe_pause_protocol() 933 934 def _write_ready(self): 935 assert self._buffer, 'Data should not be empty' 936 937 if self._conn_lost: 938 return 939 try: 940 n = self._sock.send(self._buffer) 941 except (BlockingIOError, InterruptedError): 942 pass 943 except (SystemExit, KeyboardInterrupt): 944 raise 945 except BaseException as exc: 946 self._loop._remove_writer(self._sock_fd) 947 self._buffer.clear() 948 self._fatal_error(exc, 'Fatal write error on socket transport') 949 if self._empty_waiter is not None: 950 self._empty_waiter.set_exception(exc) 951 else: 952 if n: 953 del self._buffer[:n] 954 self._maybe_resume_protocol() # May append to buffer. 955 if not self._buffer: 956 self._loop._remove_writer(self._sock_fd) 957 if self._empty_waiter is not None: 958 self._empty_waiter.set_result(None) 959 if self._closing: 960 self._call_connection_lost(None) 961 elif self._eof: 962 self._sock.shutdown(socket.SHUT_WR) 963 964 def write_eof(self): 965 if self._closing or self._eof: 966 return 967 self._eof = True 968 if not self._buffer: 969 self._sock.shutdown(socket.SHUT_WR) 970 971 def can_write_eof(self): 972 return True 973 974 def _call_connection_lost(self, exc): 975 super()._call_connection_lost(exc) 976 if self._empty_waiter is not None: 977 self._empty_waiter.set_exception( 978 ConnectionError("Connection is closed by peer")) 979 980 def _make_empty_waiter(self): 981 if self._empty_waiter is not None: 982 raise RuntimeError("Empty waiter is already set") 983 self._empty_waiter = self._loop.create_future() 984 if not self._buffer: 985 self._empty_waiter.set_result(None) 986 return self._empty_waiter 987 988 def _reset_empty_waiter(self): 989 self._empty_waiter = None 990 991 992class _SelectorDatagramTransport(_SelectorTransport): 993 994 _buffer_factory = collections.deque 995 996 def __init__(self, loop, sock, protocol, address=None, 997 waiter=None, extra=None): 998 super().__init__(loop, sock, protocol, extra) 999 self._address = address 1000 self._loop.call_soon(self._protocol.connection_made, self) 1001 # only start reading when connection_made() has been called 1002 self._loop.call_soon(self._add_reader, 1003 self._sock_fd, self._read_ready) 1004 if waiter is not None: 1005 # only wake up the waiter when connection_made() has been called 1006 self._loop.call_soon(futures._set_result_unless_cancelled, 1007 waiter, None) 1008 1009 def get_write_buffer_size(self): 1010 return sum(len(data) for data, _ in self._buffer) 1011 1012 def _read_ready(self): 1013 if self._conn_lost: 1014 return 1015 try: 1016 data, addr = self._sock.recvfrom(self.max_size) 1017 except (BlockingIOError, InterruptedError): 1018 pass 1019 except OSError as exc: 1020 self._protocol.error_received(exc) 1021 except (SystemExit, KeyboardInterrupt): 1022 raise 1023 except BaseException as exc: 1024 self._fatal_error(exc, 'Fatal read error on datagram transport') 1025 else: 1026 self._protocol.datagram_received(data, addr) 1027 1028 def sendto(self, data, addr=None): 1029 if not isinstance(data, (bytes, bytearray, memoryview)): 1030 raise TypeError(f'data argument must be a bytes-like object, ' 1031 f'not {type(data).__name__!r}') 1032 if not data: 1033 return 1034 1035 if self._address: 1036 if addr not in (None, self._address): 1037 raise ValueError( 1038 f'Invalid address: must be None or {self._address}') 1039 addr = self._address 1040 1041 if self._conn_lost and self._address: 1042 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: 1043 logger.warning('socket.send() raised exception.') 1044 self._conn_lost += 1 1045 return 1046 1047 if not self._buffer: 1048 # Attempt to send it right away first. 1049 try: 1050 if self._extra['peername']: 1051 self._sock.send(data) 1052 else: 1053 self._sock.sendto(data, addr) 1054 return 1055 except (BlockingIOError, InterruptedError): 1056 self._loop._add_writer(self._sock_fd, self._sendto_ready) 1057 except OSError as exc: 1058 self._protocol.error_received(exc) 1059 return 1060 except (SystemExit, KeyboardInterrupt): 1061 raise 1062 except BaseException as exc: 1063 self._fatal_error( 1064 exc, 'Fatal write error on datagram transport') 1065 return 1066 1067 # Ensure that what we buffer is immutable. 1068 self._buffer.append((bytes(data), addr)) 1069 self._maybe_pause_protocol() 1070 1071 def _sendto_ready(self): 1072 while self._buffer: 1073 data, addr = self._buffer.popleft() 1074 try: 1075 if self._extra['peername']: 1076 self._sock.send(data) 1077 else: 1078 self._sock.sendto(data, addr) 1079 except (BlockingIOError, InterruptedError): 1080 self._buffer.appendleft((data, addr)) # Try again later. 1081 break 1082 except OSError as exc: 1083 self._protocol.error_received(exc) 1084 return 1085 except (SystemExit, KeyboardInterrupt): 1086 raise 1087 except BaseException as exc: 1088 self._fatal_error( 1089 exc, 'Fatal write error on datagram transport') 1090 return 1091 1092 self._maybe_resume_protocol() # May append to buffer. 1093 if not self._buffer: 1094 self._loop._remove_writer(self._sock_fd) 1095 if self._closing: 1096 self._call_connection_lost(None) 1097