"""Event loop using a selector and related classes.

A selector is a "notify-when-ready" multiplexer.  For a subclass which
also includes support for signal handling, see the unix_events sub-module.
"""

__all__ = 'BaseSelectorEventLoop',

import collections
import errno
import functools
import selectors
import socket
import warnings
import weakref
try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = None

from . import base_events
from . import constants
from . import events
from . import futures
from . import protocols
from . import sslproto
from . import transports
from . import trsock
from .log import logger


def _test_selector_event(selector, fd, event):
    """Return True if 'selector' is monitoring 'event' events for 'fd'.

    Returns False when 'fd' is not registered with the selector at all
    (selector.get_key() raises KeyError) or when none of the bits in
    'event' are set in the registered event mask.
    """
    # Test if the selector is monitoring 'event' events
    # for the file descriptor 'fd'.
    try:
        key = selector.get_key(fd)
    except KeyError:
        return False
    else:
        return bool(key.events & event)


def _check_ssl_socket(sock):
    """Raise TypeError if 'sock' is an ssl.SSLSocket.

    The check is skipped when the ssl module could not be imported
    (the module-level name 'ssl' is then None).
    """
    if ssl is not None and isinstance(sock, ssl.SSLSocket):
        raise TypeError("Socket cannot be of type SSLSocket")


class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Selector event loop.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        """Initialize the loop.

        'selector' is the selector object used to poll file descriptors;
        when None, a selectors.DefaultSelector() is created.
        """
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        logger.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()
        # Maps socket file descriptor -> transport (transports register
        # themselves here in _SelectorTransport.__init__).  Weak values:
        # the mapping does not keep transports alive.
        self._transports = weakref.WeakValueDictionary()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create and return a _SelectorSocketTransport for 'sock'."""
        return _SelectorSocketTransport(self, sock, protocol, waiter,
                                        extra, server)

    def _make_ssl_transport(
            self, rawsock, protocol, sslcontext, waiter=None,
            *, server_side=False, server_hostname=None,
            extra=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Create an SSL transport on top of 'rawsock'.

        An sslproto.SSLProtocol is created around the application
        'protocol' and installed as the protocol of a new
        _SelectorSocketTransport for 'rawsock'.  That socket transport is
        not returned; the SSL protocol's '_app_transport' is returned
        instead.
        """
        ssl_protocol = sslproto.SSLProtocol(
            self, protocol, sslcontext, waiter,
            server_side, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                 extra=extra, server=server)
        return ssl_protocol._app_transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create and return a _SelectorDatagramTransport for 'sock'."""
        return _SelectorDatagramTransport(self, sock, protocol,
                                          address, waiter, extra)

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running.  Does nothing if the
        loop is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        if self._selector is not None:
            self._selector.close()
            self._selector = None

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe socket pair."""
        self._remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the non-blocking self-pipe and start reading from it."""
        # A self-socket, really. :-)
        self._ssock, self._csock = socket.socketpair()
        self._ssock.setblocking(False)
        self._csock.setblocking(False)
        self._internal_fds += 1
        self._add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Hook called with each chunk read from the self-pipe.

        Does nothing in this class.
        """
        pass

    def _read_from_self(self):
        """Drain the read end of the self-pipe.

        Reads chunks of up to 4096 bytes and hands each one to
        _process_self_data() until recv() returns no data or would block.
        """
        while True:
            try:
                data = self._ssock.recv(4096)
                if not data:
                    break
                self._process_self_data(data)
            except InterruptedError:
                # Interrupted: retry the recv().
                continue
            except BlockingIOError:
                # Nothing more to read right now.
                break

    def _write_to_self(self):
        """Write a single null byte to the write end of the self-pipe.

        OSError from send() is swallowed (and logged in debug mode only).
        """
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is None:
            return

        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None, backlog=100,
                       ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Register _accept_connection() as the reader for listening 'sock'."""
        self._add_reader(sock.fileno(), self._accept_connection,
                         protocol_factory, sock, sslcontext, server, backlog,
                         ssl_handshake_timeout)

    def _accept_connection(
            self, protocol_factory, sock,
            sslcontext=None, server=None, backlog=100,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Accept up to 'backlog' pending connections on 'sock'.

        For each accepted connection a task running _accept_connection2()
        is created.  Returns early when accept() would block.
        """
        # This method is only called once for each event loop tick where the
        # listening socket has triggered an EVENT_READ. There may be multiple
        # connections waiting for an .accept() so it is called in a loop.
        # See https://bugs.python.org/issue27906 for more details.
        for _ in range(backlog):
            try:
                conn, addr = sock.accept()
                if self._debug:
                    logger.debug("%r got a new connection from %r: %r",
                                 server, addr, conn)
                conn.setblocking(False)
            except (BlockingIOError, InterruptedError, ConnectionAbortedError):
                # Early exit because the socket accept buffer is empty.
                return None
            except OSError as exc:
                # There's nowhere to send the error, so just log it.
                if exc.errno in (errno.EMFILE, errno.ENFILE,
                                 errno.ENOBUFS, errno.ENOMEM):
                    # Some platforms (e.g. Linux) keep reporting the FD as
                    # ready, so we remove the read handler temporarily.
                    # We'll try again in a while.
                    self.call_exception_handler({
                        'message': 'socket.accept() out of system resource',
                        'exception': exc,
                        'socket': trsock.TransportSocket(sock),
                    })
                    self._remove_reader(sock.fileno())
                    self.call_later(constants.ACCEPT_RETRY_DELAY,
                                    self._start_serving,
                                    protocol_factory, sock, sslcontext, server,
                                    backlog, ssl_handshake_timeout)
                else:
                    raise  # The event loop will catch, log and ignore it.
            else:
                extra = {'peername': addr}
                accept = self._accept_connection2(
                    protocol_factory, conn, extra, sslcontext, server,
                    ssl_handshake_timeout)
                self.create_task(accept)

    async def _accept_connection2(
            self, protocol_factory, conn, extra,
            sslcontext=None, server=None,
            ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT):
        """Set up the protocol and transport for accepted socket 'conn'.

        Creates the protocol, then an SSL or plain socket transport
        depending on 'sslcontext', and waits for the transport's waiter.
        SystemExit and KeyboardInterrupt propagate; any other exception is
        swallowed here and passed to the loop's exception handler only
        when the loop is in debug mode.
        """
        protocol = None
        transport = None
        try:
            protocol = protocol_factory()
            waiter = self.create_future()
            if sslcontext:
                transport = self._make_ssl_transport(
                    conn, protocol, sslcontext, waiter=waiter,
                    server_side=True, extra=extra, server=server,
                    ssl_handshake_timeout=ssl_handshake_timeout)
            else:
                transport = self._make_socket_transport(
                    conn, protocol, waiter=waiter, extra=extra,
                    server=server)

            try:
                await waiter
            except BaseException:
                # Setup failed or was cancelled: don't leak the transport.
                transport.close()
                raise
            # It's now up to the protocol to handle the connection.

        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if self._debug:
                context = {
                    'message':
                        'Error on transport creation for incoming connection',
                    'exception': exc,
                }
                if protocol is not None:
                    context['protocol'] = protocol
                if transport is not None:
                    context['transport'] = transport
                self.call_exception_handler(context)

    def _ensure_fd_no_transport(self, fd):
        """Check that 'fd' is not owned by a live transport of this loop.

        'fd' may be an int or an object with a fileno() method.  Raises
        ValueError if no integer descriptor can be obtained from it, and
        RuntimeError if a transport that is not closing is registered for
        that descriptor in self._transports.
        """
        fileno = fd
        if not isinstance(fileno, int):
            try:
                fileno = int(fileno.fileno())
            except (AttributeError, TypeError, ValueError):
                # This code matches selectors._fileobj_to_fd function.
                raise ValueError(f"Invalid file object: {fd!r}") from None
        try:
            transport = self._transports[fileno]
        except KeyError:
            pass
        else:
            if not transport.is_closing():
                raise RuntimeError(
                    f'File descriptor {fd!r} is used by transport '
                    f'{transport!r}')

    def _add_reader(self, fd, callback, *args):
        """Register 'callback(*args)' as the reader for 'fd'.

        The selector data for a descriptor is a (reader, writer) pair of
        handles.  Any previously registered reader handle is cancelled.
        Returns the new events.Handle.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            # Not registered yet: register for reading only.
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
        else:
            # Already registered: add EVENT_READ, keep the existing writer.
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_READ,
                                  (handle, writer))
            if reader is not None:
                reader.cancel()
        return handle

    def _remove_reader(self, fd):
        """Stop watching 'fd' for reading.

        Returns True if a reader handle was registered (it is cancelled),
        False otherwise, including when the loop is closed or 'fd' is not
        registered with the selector.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            mask &= ~selectors.EVENT_READ
            if not mask:
                # No events left to watch: drop the registration entirely.
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (None, writer))

            if reader is not None:
                reader.cancel()
                return True
            else:
                return False

    def _add_writer(self, fd, callback, *args):
        """Register 'callback(*args)' as the writer for 'fd'.

        Any previously registered writer handle is cancelled.  Returns
        the new events.Handle.
        """
        self._check_closed()
        handle = events.Handle(callback, args, self, None)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            # Not registered yet: register for writing only.
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
        else:
            # Already registered: add EVENT_WRITE, keep the existing reader.
            mask, (reader, writer) = key.events, key.data
            self._selector.modify(fd, mask | selectors.EVENT_WRITE,
                                  (reader, handle))
            if writer is not None:
                writer.cancel()
        return handle

    def _remove_writer(self, fd):
        """Remove a writer callback.

        Returns True if a writer handle was registered (it is cancelled),
        False otherwise, including when the loop is closed or 'fd' is not
        registered with the selector.
        """
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False
        else:
            mask, (reader, writer) = key.events, key.data
            # Clear the write event; any reader registration is kept.
            mask &= ~selectors.EVENT_WRITE
            if not mask:
                self._selector.unregister(fd)
            else:
                self._selector.modify(fd, mask, (reader, None))

            if writer is not None:
                writer.cancel()
                return True
            else:
                return False

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._ensure_fd_no_transport(fd)
        self._add_reader(fd, callback, *args)

    def remove_reader(self, fd):
        """Remove a reader callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_reader(fd)

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._ensure_fd_no_transport(fd)
        self._add_writer(fd, callback, *args)

    def remove_writer(self, fd):
        """Remove a writer callback."""
        self._ensure_fd_no_transport(fd)
        return self._remove_writer(fd)

    async def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.

        Raises TypeError for an SSLSocket and, in debug mode, ValueError
        if the socket is not non-blocking.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try to receive immediately.
        try:
            return sock.recv(n)
        except (BlockingIOError, InterruptedError):
            pass
        # Slow path: wait until the socket is readable.
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv, fut, sock, n)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_read_done(self, fd, fut, handle=None):
        """Future done-callback: remove the reader registered for 'fd'.

        Skipped when 'handle' has already been cancelled.
        """
        if handle is None or not handle.cancelled():
            self.remove_reader(fd)

    def _sock_recv(self, fut, sock, n):
        """Reader callback for sock_recv(): complete 'fut' when possible."""
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        if fut.done():
            return
        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(data)

    async def sock_recv_into(self, sock, buf):
        """Receive data from the socket.

        The received data is written into *buf* (a writable buffer).
        The return value is the number of bytes written.

        Raises TypeError for an SSLSocket and, in debug mode, ValueError
        if the socket is not non-blocking.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try to receive immediately.
        try:
            return sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            pass
        # Slow path: wait until the socket is readable.
        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        handle = self._add_reader(fd, self._sock_recv_into, fut, sock, buf)
        fut.add_done_callback(
            functools.partial(self._sock_read_done, fd, handle=handle))
        return await fut

    def _sock_recv_into(self, fut, sock, buf):
        """Reader callback for sock_recv_into(): complete 'fut' when possible."""
        # _sock_recv_into() can add itself as an I/O callback if the operation
        # can't be done immediately. Don't use it directly, call
        # sock_recv_into().
        if fut.done():
            return
        try:
            nbytes = sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return  # try again next time
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(nbytes)

    async def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        # Fast path: try to send immediately.
        try:
            n = sock.send(data)
        except (BlockingIOError, InterruptedError):
            n = 0

        if n == len(data):
            # all data sent
            return

        fut = self.create_future()
        fd = sock.fileno()
        self._ensure_fd_no_transport(fd)
        # use a trick with a list in closure to store a mutable state
        handle = self._add_writer(fd, self._sock_sendall, fut, sock,
                                  memoryview(data), [n])
        fut.add_done_callback(
            functools.partial(self._sock_write_done, fd, handle=handle))
        return await fut

    def _sock_sendall(self, fut, sock, view, pos):
        """Writer callback for sock_sendall().

        'view' is a memoryview of the data and 'pos' a one-element list
        holding the offset of the first byte not yet sent; it is updated
        in place after a partial send.
        """
        if fut.done():
            # Future cancellation can be scheduled on previous loop iteration
            return
        start = pos[0]
        try:
            n = sock.send(view[start:])
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
            return

        start += n

        if start == len(view):
            fut.set_result(None)
        else:
            pos[0] = start

    async def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        This method is a coroutine.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")

        # Resolve the address first, except for AF_UNIX sockets.
        if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
            resolved = await self._ensure_resolved(
                address, family=sock.family, proto=sock.proto, loop=self)
            _, _, _, _, address = resolved[0]

        fut = self.create_future()
        self._sock_connect(fut, sock, address)
        return await fut

    def _sock_connect(self, fut, sock, address):
        """Start connecting 'sock' to 'address' and complete 'fut'.

        If connect() cannot finish immediately, _sock_connect_cb() is
        registered as a writer callback to finish the operation.
        """
        fd = sock.fileno()
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # Issue #23618: When the C function connect() fails with EINTR, the
            # connection runs in background. We have to wait until the socket
            # becomes writable to be notified when the connection succeed or
            # fails.
            self._ensure_fd_no_transport(fd)
            handle = self._add_writer(
                fd, self._sock_connect_cb, fut, sock, address)
            fut.add_done_callback(
                functools.partial(self._sock_write_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_write_done(self, fd, fut, handle=None):
        """Future done-callback: remove the writer registered for 'fd'.

        Skipped when 'handle' has already been cancelled.
        """
        if handle is None or not handle.cancelled():
            self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback for _sock_connect(): report the connect result.

        Reads SO_ERROR from the socket; a non-zero value is turned into
        an OSError set on 'fut'.
        """
        if fut.done():
            return

        try:
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                # Jump to any except clause below.
                raise OSError(err, f'Connect call failed {address}')
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    async def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.
        """
        _check_ssl_socket(sock)
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        fut = self.create_future()
        self._sock_accept(fut, sock)
        return await fut

    def _sock_accept(self, fut, sock):
        """Try to accept on 'sock' and complete 'fut'.

        If accept() would block, this method registers itself as the
        reader callback for the socket so it is retried later.  The
        accepted connection is made non-blocking.
        """
        fd = sock.fileno()
        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self._ensure_fd_no_transport(fd)
            handle = self._add_reader(fd, self._sock_accept, fut, sock)
            fut.add_done_callback(
                functools.partial(self._sock_read_done, fd, handle=handle))
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, address))

    async def _sendfile_native(self, transp, file, offset, count):
        """Send 'file' over the socket of transport 'transp'.

        The transport is temporarily removed from self._transports and
        its reading is paused while self.sock_sendfile() operates on the
        underlying socket directly; both are restored afterwards.
        """
        # Unregister the transport so that the sock_*() helpers (which call
        # _ensure_fd_no_transport()) accept its file descriptor.
        del self._transports[transp._sock_fd]
        resume_reading = transp.is_reading()
        transp.pause_reading()
        # Wait until the transport's write buffer has been flushed.
        await transp._make_empty_waiter()
        try:
            return await self.sock_sendfile(transp._sock, file, offset, count,
                                            fallback=False)
        finally:
            transp._reset_empty_waiter()
            if resume_reading:
                transp.resume_reading()
            self._transports[transp._sock_fd] = transp

    def _process_events(self, event_list):
        """Schedule the callbacks for the (key, mask) pairs in 'event_list'.

        Handles that were cancelled are unregistered from the selector
        instead of being scheduled.
        """
        for key, mask in event_list:
            fileobj, (reader, writer) = key.fileobj, key.data
            if mask & selectors.EVENT_READ and reader is not None:
                if reader._cancelled:
                    self._remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on listening socket 'sock' and close it."""
        self._remove_reader(sock.fileno())
        sock.close()


class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Base class for the socket-backed transports of the selector loop.

    Holds the socket, the protocol, the write buffer and the closing
    state, and implements the shared close/abort/connection-lost logic.
    """

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    # Attribute used in the destructor: it must be set even if the constructor
    # is not called (see _SelectorSslTransport which may start by raising an
    # exception)
    _sock = None

    def __init__(self, loop, sock, protocol, extra=None, server=None):
        """Initialize the transport and register it with 'loop'.

        Fills the 'socket', 'sockname' and 'peername' entries of the
        extra info ('peername' only when not already supplied), attaches
        to 'server' if given, and records the transport in
        loop._transports under the socket's file descriptor.
        """
        super().__init__(extra, loop)
        self._extra['socket'] = trsock.TransportSocket(sock)
        try:
            self._extra['sockname'] = sock.getsockname()
        except OSError:
            self._extra['sockname'] = None
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except socket.error:
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()

        self._protocol_connected = False
        self.set_protocol(protocol)

        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()
        loop._transports[self._sock_fd] = self

    def __repr__(self):
        """Return a debug representation with fd, state and polling info."""
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._sock_fd}')
        # test if the transport was closed
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append(f'write=<{state}, bufsize={bufsize}>')
        return '<{}>'.format(' '.join(info))

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def set_protocol(self, protocol):
        """Set the protocol and mark it as connected."""
        self._protocol = protocol
        self._protocol_connected = True

    def get_protocol(self):
        """Return the current protocol."""
        return self._protocol

    def is_closing(self):
        """Return True once close() or _force_close() has been called."""
        return self._closing

    def close(self):
        """Close the transport.

        Reading stops immediately.  If the write buffer is empty, the
        connection-lost notification is scheduled right away; otherwise
        it is left to the write-ready callbacks of the subclasses, which
        call _call_connection_lost() once the buffer has been flushed.
        """
        if self._closing:
            return
        self._closing = True
        self._loop._remove_reader(self._sock_fd)
        if not self._buffer:
            self._conn_lost += 1
            self._loop._remove_writer(self._sock_fd)
            self._loop.call_soon(self._call_connection_lost, None)

    def __del__(self, _warn=warnings.warn):
        """Warn about and close a socket that was never closed.

        'warnings.warn' is bound as a default argument so it is captured
        when the method is defined.
        """
        if self._sock is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
        """Report 'exc' and force-close the transport.

        OSError is only logged (in debug mode); any other exception is
        passed to the loop's exception handler.
        """
        # Should be called from exception handler only.
        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data, stop all I/O and schedule connection_lost(exc).

        Does nothing if the connection-lost notification has already
        been scheduled.
        """
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop._remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop._remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the socket and drop references.

        The cleanup runs even if protocol.connection_lost() raises.
        """
        try:
            if self._protocol_connected:
                self._protocol.connection_lost(exc)
        finally:
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the number of bytes currently held in the write buffer."""
        return len(self._buffer)

    def _add_reader(self, fd, callback, *args):
        """Register a reader on the loop unless the transport is closing."""
        if self._closing:
            return

        self._loop._add_reader(fd, callback, *args)


class _SelectorSocketTransport(_SelectorTransport):
    """Stream transport over a non-blocking socket."""

    _start_tls_compatible = True
    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Initialize the transport and schedule connection_made().

        'waiter', if given, is a future whose result is set (unless it
        was cancelled) after connection_made() has been scheduled.
        """

        # Must exist before super().__init__(), which calls set_protocol().
        self._read_ready_cb = None
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False
        # Future created by _make_empty_waiter(); its result is set when
        # the write buffer is empty.
        self._empty_waiter = None

        # Disable the Nagle algorithm -- small writes will be
        # sent without waiting for the TCP ACK.  This generally
        # decreases the latency (in some cases significantly.)
        base_events._set_nodelay(self._sock)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def set_protocol(self, protocol):
        """Set the protocol and select the matching read-ready callback.

        A protocols.BufferedProtocol is fed through get_buffer() /
        buffer_updated(); any other protocol through data_received().
        """
        if isinstance(protocol, protocols.BufferedProtocol):
            self._read_ready_cb = self._read_ready__get_buffer
        else:
            self._read_ready_cb = self._read_ready__data_received

        super().set_protocol(protocol)

    def is_reading(self):
        """Return True if the transport is neither paused nor closing."""
        return not self._paused and not self._closing

    def pause_reading(self):
        """Stop watching the socket for reading until resume_reading()."""
        if self._closing or self._paused:
            return
        self._paused = True
        self._loop._remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Start watching the socket for reading again after a pause."""
        if self._closing or not self._paused:
            return
        self._paused = False
        self._add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _read_ready(self):
        """Reader callback: dispatch to the callback chosen by set_protocol()."""
        self._read_ready_cb()

    def _read_ready__get_buffer(self):
        """Read into a buffer supplied by the (buffered) protocol.

        An exception raised by the protocol or by recv_into() (other than
        would-block/interrupted) is treated as fatal for the transport.
        """
        if self._conn_lost:
            return

        try:
            buf = self._protocol.get_buffer(-1)
            if not len(buf):
                raise RuntimeError('get_buffer() returned an empty buffer')
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.get_buffer() call failed.')
            return

        try:
            nbytes = self._sock.recv_into(buf)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not nbytes:
            # Zero bytes received: the peer closed its end.
            self._read_ready__on_eof()
            return

        try:
            self._protocol.buffer_updated(nbytes)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.buffer_updated() call failed.')

    def _read_ready__data_received(self):
        """Read up to max_size bytes and pass them to data_received().

        An exception raised by recv() (other than would-block/interrupted)
        or by the protocol is treated as fatal for the transport.
        """
        if self._conn_lost:
            return
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
            return

        if not data:
            # Empty read: the peer closed its end.
            self._read_ready__on_eof()
            return

        try:
            self._protocol.data_received(data)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.data_received() call failed.')

    def _read_ready__on_eof(self):
        """Handle end-of-file: call eof_received() and close unless it
        returns a true value."""
        if self._loop.get_debug():
            logger.debug("%r received EOF", self)

        try:
            keep_open = self._protocol.eof_received()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(
                exc, 'Fatal error: protocol.eof_received() call failed.')
            return

        if keep_open:
            # We're keeping the connection open so the
            # protocol can write more, but we still can't
            # receive more, so remove the reader callback.
            self._loop._remove_reader(self._sock_fd)
        else:
            self.close()

    def write(self, data):
        """Write 'data' to the socket, buffering whatever cannot be sent now.

        Raises TypeError if 'data' is not bytes, bytearray or memoryview,
        and RuntimeError after write_eof() or while an empty-waiter is
        set (sendfile in progress).  Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if self._empty_waiter is not None:
            raise RuntimeError('unable to write; sendfile is in progress')
        if not data:
            return

        if self._conn_lost:
            # The connection is gone: drop the data, count the attempt and
            # warn once the threshold has been reached.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                pass
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop._add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as the socket takes.

        When the buffer becomes empty the writer is removed, a pending
        empty-waiter is completed, and a deferred close() or write_eof()
        is carried out.
        """
        assert self._buffer, 'Data should not be empty'

        if self._conn_lost:
            return
        try:
            n = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._loop._remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
            if self._empty_waiter is not None:
                self._empty_waiter.set_exception(exc)
        else:
            if n:
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                self._loop._remove_writer(self._sock_fd)
                if self._empty_waiter is not None:
                    self._empty_waiter.set_result(None)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)

    def write_eof(self):
        """Shut down the write side once buffered data has been sent.

        With an empty buffer the socket is shut down immediately;
        otherwise _write_ready() does it after flushing.
        """
        if self._closing or self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    def _call_connection_lost(self, exc):
        """Run the base cleanup, then fail a pending empty-waiter."""
        super()._call_connection_lost(exc)
        if self._empty_waiter is not None:
            self._empty_waiter.set_exception(
                ConnectionError("Connection is closed by peer"))

    def _make_empty_waiter(self):
        """Create and return a future completed when the buffer is empty.

        The future is completed immediately if the buffer is already
        empty.  Raises RuntimeError if a waiter is already set.
        """
        if self._empty_waiter is not None:
            raise RuntimeError("Empty waiter is already set")
        self._empty_waiter = self._loop.create_future()
        if not self._buffer:
            self._empty_waiter.set_result(None)
        return self._empty_waiter

    def _reset_empty_waiter(self):
        """Forget the empty-waiter created by _make_empty_waiter()."""
        self._empty_waiter = None


class _SelectorDatagramTransport(_SelectorTransport):
    """Datagram transport over a non-blocking socket.

    The write buffer is a deque of (data, addr) pairs.
    """

    _buffer_factory = collections.deque

    def __init__(self, loop, sock, protocol, address=None,
                 waiter=None, extra=None):
        """Initialize the transport and schedule connection_made().

        'address' is stored as self._address, which sendto() uses as the
        only accepted destination.  'waiter', if given, is a future whose
        result is set (unless it was cancelled) after connection_made()
        has been scheduled.
        """
        super().__init__(loop, sock, protocol, extra)
        self._address = address
        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._add_reader,
                             self._sock_fd, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def get_write_buffer_size(self):
        """Return the total size in bytes of all buffered datagrams."""
        return sum(len(data) for data, _ in self._buffer)

    def _read_ready(self):
        """Reader callback: receive one datagram and pass it to the protocol.

        OSError is reported through protocol.error_received(); any other
        exception is treated as fatal for the transport.
        """
        if self._conn_lost:
            return
        try:
            data, addr = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._fatal_error(exc, 'Fatal read error on datagram transport')
        else:
            self._protocol.datagram_received(data, addr)

    def sendto(self, data, addr=None):
        """Send 'data' to 'addr', buffering it if the socket would block.

        Raises TypeError if 'data' is not bytes, bytearray or memoryview,
        and ValueError if the transport has a fixed address and 'addr' is
        neither None nor that address.  Empty data is ignored.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(f'data argument must be a bytes-like object, '
                            f'not {type(data).__name__!r}')
        if not data:
            return

        if self._address:
            if addr not in (None, self._address):
                raise ValueError(
                    f'Invalid address: must be None or {self._address}')
            addr = self._address

        if self._conn_lost and self._address:
            # The connection is gone: drop the data, count the attempt and
            # warn once the threshold has been reached.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                # A truthy 'peername' selects send() over sendto().
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
                return
            except (BlockingIOError, InterruptedError):
                self._loop._add_writer(self._sock_fd, self._sendto_ready)
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        # Ensure that what we buffer is immutable.
        self._buffer.append((bytes(data), addr))
        self._maybe_pause_protocol()

    def _sendto_ready(self):
        """Writer callback: send buffered datagrams until the socket blocks.

        When the buffer becomes empty the writer is removed and a
        deferred close() is carried out.
        """
        while self._buffer:
            data, addr = self._buffer.popleft()
            try:
                if self._extra['peername']:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((data, addr))  # Try again later.
                break
            except OSError as exc:
                self._protocol.error_received(exc)
                return
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._fatal_error(
                    exc, 'Fatal write error on datagram transport')
                return

        self._maybe_resume_protocol()  # May append to buffer.
        if not self._buffer:
            self._loop._remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)