"""Event loop using a selector and related classes.

A selector is a "notify-when-ready" multiplexer. For a subclass which
also includes support for signal handling, see the unix_events sub-module.
"""

__all__ = 'BaseSelectorEventLoop',

import collections
import errno
import functools
import selectors
import socket
import warnings
import weakref
try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = None

from . import base_events
from . import constants
from . import events
from . import futures
from . import protocols
from . import sslproto
from . import transports
from . import trsock
from .log import logger


def _test_selector_event(selector, fd, event):
    """Return True if *selector* monitors *event* for file descriptor *fd*.

    Returns False when *fd* is not registered with the selector at all.
    """
    # Test if the selector is monitoring 'event' events
    # for the file descriptor 'fd'.
    try:
        key = selector.get_key(fd)
    except KeyError:
        return False
    else:
        return bool(key.events & event)


def _check_ssl_socket(sock):
    """Raise TypeError if *sock* is an ssl.SSLSocket.

    The check is skipped when the ssl module could not be imported.
    """
    if ssl is not None and isinstance(sock, ssl.SSLSocket):
        raise TypeError("Socket cannot be of type SSLSocket")


class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Initialize the loop.

        *selector* defaults to a fresh selectors.DefaultSelector().
        """
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # Maps a socket file descriptor to the transport using it; weak
        # values so the mapping does not keep transports alive.
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create and return a _SelectorSocketTransport for *sock*."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Create an SSLProtocol over *rawsock*.

        A plain socket transport is created with the SSLProtocol as its
        protocol; the SSLProtocol's application transport is returned.
        """
        ssl_protocol = sslproto.SSLProtocol(
                self, protocol, sslcontext, waiter,
                server_side, server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout)
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create and return a _SelectorDatagramTransport for *sock*."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running.  Does nothing if the
        loop is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and start reading from it."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Hook called with each chunk read from the self-pipe.

        Does nothing here.
        """
        pass

    def _read_from_self(self):
        """Drain the read end of the self-pipe.

        Each chunk is passed to _process_self_data().  Reading stops on
        an empty read or when the socket would block.
        """
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        """Write a null byte to the self-pipe; OSError is not propagated."""
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is not None:
            try:
                csock.send(b'\0')
            except OSError:
                if self._debug:
                    logger.debug("Fail to write a null byte into the "
                                 "self-pipe socket",
                                 exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Register _accept_connection() as the reader of *sock*."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog,
                         ssl_handshake_timeout)

    def _accept_connection(
            self, protocol_factory, sock,
            sslcontext=None, server=None, backlog=100,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Accept up to *backlog* pending connections on *sock*.

        A task running _accept_connection2() is created for every
        accepted connection.  If accept() fails with EMFILE, ENFILE,
        ENOBUFS or ENOMEM, the error is reported to the exception
        handler, the reader is removed, _start_serving() is scheduled
        after constants.ACCEPT_RETRY_DELAY and the method returns.  Any
        other OSError is re-raised.
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
                # Early exit because the socket accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': trsock.TransportSocket(sock),
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog, ssl_handshake_timeout)
                    # The reader is gone and a retry is scheduled.  Stop
                    # here: staying in the loop would call accept() again,
                    # and every further failure would report the error and
                    # schedule one more retry timer.
                    return None
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(
                    protocol_factory, conn, extra, sslcontext, server,
                    ssl_handshake_timeout)
                self.create_task(accept)

    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Create the protocol and transport for an accepted connection.

        Waits until the transport's waiter completes.  If waiting fails
        the transport is closed.  SystemExit and KeyboardInterrupt
        propagate; any other exception is passed to the exception handler
        when the loop is in debug mode and is otherwise dropped.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except BaseException:
                transport.close()
                raise
            # It's now up to the protocol to handle the connection.

        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Raise RuntimeError if *fd* belongs to a transport not closing.

        *fd* is an int or an object with a fileno() method; ValueError is
        raised for anything else.
        """
        fileno = fd
        if not isinstance(fileno, int):
            try:
                fileno = int(fileno.fileno())
            except (AttributeError, TypeError, ValueError):
                # This code matches selectors._fileobj_to_fd function.
                raise ValueError(f"Invalid file object: {fd!r}") from None
        try:
            transport = self._transports[fileno]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    f'File descriptor {fd!r} is used by transport '
                    f'{transport!r}')

    def _add_reader(self, fd, callback, *args):
        """Register *callback* as the reader of *fd*.

        A reader handle already registered for *fd* is replaced and
        cancelled; a registered writer is kept.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()

    def _remove_reader(self, fd):
        """Stop monitoring *fd* for reading.

        Returns True if a reader handle was registered (it is cancelled),
        False otherwise, including when the loop is closed.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Register *callback* as the writer of *fd*.

        A writer handle already registered for *fd* is replaced and
        cancelled; a registered reader is kept.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()

    def _remove_writer(self, fd):
        """Remove a writer callback."""
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Remove both writer and connector.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    async def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recv(n)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self.add_reader(fd, self._sock_recv, fut, sock, n)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd))
        return await fut

    def _sock_read_done(self, fd, fut):
        """Future done-callback: remove the reader registered for *fd*."""
        self.remove_reader(fd)

    def _sock_recv(self, fut, sock, n):
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        if fut.done():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    async def sock_recv_into(self, sock, buf):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is the number of bytes written.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            return sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            pass
        fut = self.create_future()
        fd = sock.fileno()
        self.add_reader(fd, self._sock_recv_into, fut, sock, buf)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd))
        return await fut

    def _sock_recv_into(self, fut, sock, buf):
        # _sock_recv_into() can add itself as an I/O callback if the operation
        # can't be done immediately. Don't use it directly, call
        # sock_recv_into().
        if fut.done():
            return
        try:
            nbytes = sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(nbytes)

    async def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0

        if n == len(data):
            # all data sent
            return

        fut = self.create_future()
        fd = sock.fileno()
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd))
        # use a trick with a list in closure to store a mutable state
        self.add_writer(fd, self._sock_sendall, fut, sock,
                        memoryview(data), [n])
        return await fut

    def _sock_sendall(self, fut, sock, view, pos):
        """Writer callback for sock_sendall().

        Sends the part of *view* starting at offset pos[0].  The offset
        is advanced in place; *fut* is resolved once everything is sent.
        """
        if fut.done():
            # Future cancellation can be scheduled on previous loop iteration
            return
        start = pos[0]
        try:
            n = sock.send(view[start:])
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
            return

        start += n

        if start == len(view):
            fut.set_result(None)
        else:
            pos[0] = start

    async def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        # The address is resolved first unless the socket is an AF_UNIX one.
        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
            resolved = await self._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            _, _, _, _, address = resolved[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return await fut

    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock* to *address* and report through *fut*.

        If connect() cannot complete immediately, _sock_connect_cb() is
        registered as the writer of the socket.
        """
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            fut.add_done_callback(
                functools.partial(self._sock_write_done, fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_write_done(self, fd, fut):
        """Future done-callback: remove the writer registered for *fd*."""
        self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback for a pending connect().

        Reads SO_ERROR from the socket and resolves *fut* with None on
        success or with an OSError on failure.
        """
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, f'Connect call failed {address}')
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    async def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, False, sock)
        return await fut

    def _sock_accept(self, fut, registered, sock):
        """Try to accept on *sock* and report through *fut*.

        *registered* is True when this call comes from the reader
        callback installed by a previous attempt; that reader is removed
        first.  If accept() would block, the method registers itself as
        the reader of the socket.
        """
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.done():
            return
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    async def _sendfile_native(self, transp, file, offset, count):
        """Send *file* over the socket of *transp* with sock_sendfile().

        The transport is removed from self._transports and its reading is
        paused for the duration of the call; both are restored afterwards.
        """
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        # Wait until the transport's write buffer is empty.
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp

    def _process_events(self, event_list):
        """Schedule the reader/writer handles of the ready events.

        Handles found cancelled are unregistered instead of scheduled.
        """
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on *sock* and close it."""
        self._remove_reader(sock.fileno())
        sock.close()


class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Base class of the socket and datagram selector transports."""

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Set up the transport state and register it with *loop*.

        Fills the 'socket', 'sockname' and 'peername' extra info; the two
        names are None when the socket cannot report them.
        """
        super().__init__(extra, loop)
        self._extra['socket'] = trsock.TransportSocket(sock)
        try:
            self._extra['sockname'] = sock.getsockname()
        except OSError:
            self._extra['sockname'] = None
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except OSError:
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()

        self._protocol_connected = False
        self.set_protocol(protocol)

        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._sock_fd}')
        # test if the transport was closed
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append(f'write=<{state}, bufsize={bufsize}>')
        return '<{}>'.format(' '.join(info))

    def abort(self):
        """Close the transport immediately, dropping buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Set the protocol and mark it as connected."""
        self._protocol = protocol
        self._protocol_connected = True

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or a forced close has started."""
        return self._closing

    def close(self):
        """Close the transport.

        Reading stops immediately.  If the write buffer is empty the
        connection_lost() call is scheduled right away; otherwise it is
        left to the write callback, which runs once the buffer drains.
        """
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._loop.call_soon(self._call_connection_lost, None)

    def __del__(self, _warn=warnings.warn):
        # _warn is bound as a default argument, so the function object
        # stays reachable from here.
        if self._sock is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report *exc* and force-close the transport.

        An OSError is only logged, and only in debug mode; any other
        exception goes to the loop's exception handler.
        """
        # Should be called from exception handler only.
        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data and schedule connection_lost(*exc*).

        Does nothing if a connection_lost() call is already scheduled.
        """
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, close the socket and drop references."""
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the length of the write buffer."""
        return len(self._buffer)

    def _add_reader(self, fd, callback, *args):
        """Register a reader on the loop unless the transport is closing."""
        if self._closing:
            return

        self._loop._add_reader(fd, callback, *args)


class _SelectorSocketTransport(_SelectorTransport):
    """Selector transport for stream sockets."""

    _start_tls_compatible = True
    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):

        # Must exist before the base constructor runs: it calls
        # set_protocol(), which assigns this attribute.
        self._read_ready_cb = None
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False
        self._empty_waiter = None

        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        base_events._set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def set_protocol(self, protocol):
        """Set the protocol and select the matching read callback."""
        if isinstance(protocol, protocols.BufferedProtocol):
            self._read_ready_cb = self._read_ready__get_buffer
        else:
            self._read_ready_cb = self._read_ready__data_received

        super().set_protocol(protocol)

    def is_reading(self):
        """Return True if the transport is neither paused nor closing."""
        return not self._paused and not self._closing

    def pause_reading(self):
        """Stop reading from the socket until resume_reading() is called."""
        if self._closing or self._paused:
            return
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume reading after pause_reading()."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _read_ready(self):
        """Reader callback: dispatch to the protocol-specific reader."""
        self._read_ready_cb()

    def _read_ready__get_buffer(self):
        """Read into a buffer obtained from the protocol's get_buffer()."""
        if self._conn_lost:
            return

        try:
            buf = self._protocol.get_buffer(-1)
            if not len(buf):
                raise RuntimeError('get_buffer() returned an empty buffer')
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.get_buffer() call failed.')
            return

        try:
            nbytes = self._sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not nbytes:
            self._read_ready__on_eof()
            return

        try:
            self._protocol.buffer_updated(nbytes)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.buffer_updated() call failed.')

    def _read_ready__data_received(self):
        """Read from the socket and pass the data to data_received()."""
        if self._conn_lost:
            return
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not data:
            self._read_ready__on_eof()
            return

        try:
            self._protocol.data_received(data)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.data_received() call failed.')

    def _read_ready__on_eof(self):
        """Handle end-of-file: call eof_received() and act on its result."""
        if self._loop.get_debug():
            logger.debug("%r received EOF", self)

        try:
            keep_open = self._protocol.eof_received()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.eof_received() call failed.')
            return

        if keep_open:
            # We're keeping the connection open so the
            # protocol can write more, but we still can't
            # receive more, so remove the reader callback.
            self._loop._remove_reader(self._sock_fd)
        else:
            self.close()

    def write(self, data):
        """Send *data*, buffering whatever cannot be sent immediately.

        Raises TypeError if *data* is not bytes, bytearray or memoryview,
        and RuntimeError after write_eof() or while a sendfile is in
        progress.  Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if self._empty_waiter is not None:
            raise RuntimeError('unable to write; sendfile is in progress')
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                pass
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop._add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: send buffered data.

        When the buffer is drained, completes a pending close or a
        pending write_eof().
        """
        assert self._buffer, 'Data should not be empty'

        if self._conn_lost:
            return
        try:
            n = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._loop._remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
            if self._empty_waiter is not None:
                self._empty_waiter.set_exception(exc)
        else:
            if n:
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop._remove_writer(self._sock_fd)
                if self._empty_waiter is not None:
                    self._empty_waiter.set_result(None)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)

    def write_eof(self):
        """Shut down the write side once buffered data has been sent."""
        if self._closing or self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    def _call_connection_lost(self, exc):
        """Run the base cleanup, then fail a pending empty-buffer waiter."""
        super()._call_connection_lost(exc)
        if self._empty_waiter is not None:
            self._empty_waiter.set_exception(
                ConnectionError("Connection is closed by peer"))

    def _make_empty_waiter(self):
        """Return a future resolved once the write buffer is empty.

        Raises RuntimeError if such a waiter is already set.
        """
        if self._empty_waiter is not None:
            raise RuntimeError("Empty waiter is already set")
        self._empty_waiter = self._loop.create_future()
        if not self._buffer:
            self._empty_waiter.set_result(None)
        return self._empty_waiter

    def _reset_empty_waiter(self):
        """Forget the empty-buffer waiter."""
        self._empty_waiter = None


class _SelectorDatagramTransport(_SelectorTransport):
    """Selector transport for datagram sockets."""

    # The write buffer is a deque of (data, addr) pairs.
    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def get_write_buffer_size(self):
        """Return the total size of the buffered datagram payloads."""
        return sum(len(data) for data, _ in self._buffer)

    def _read_ready(self):
        """Reader callback: receive one datagram and pass it on.

        An OSError is reported through the protocol's error_received().
        """
        if self._conn_lost:
            return
        try:
            data, addr = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
        else:
            self._protocol.datagram_received(data, addr)

    def sendto(self, data, addr=None):
        """Send *data* to *addr*, buffering it if the socket would block.

        Raises TypeError if *data* is not bytes, bytearray or memoryview,
        and ValueError if the transport has a fixed address and *addr* is
        neither None nor that address.  Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        if self._address:
            if addr not in (None, self._address):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            addr = self._address

        if self._conn_lost and self._address:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
                return
            except (BlockingIOError, InterruptedError):
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Writer callback: send buffered datagrams until blocked."""
        while self._buffer:
            data, addr = self._buffer.popleft()
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((data, addr))  # Try again later.
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if not self._buffer:
            self._loop._remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)