1"""Base implementation of event loop. 2 3The event loop can be broken up into a multiplexer (the part 4responsible for notifying us of I/O events) and the event loop proper, 5which wraps a multiplexer with functionality for scheduling callbacks, 6immediately or at a given time in the future. 7 8Whenever a public API takes a callback, subsequent positional 9arguments will be passed to the callback if/when it is called. This 10avoids the proliferation of trivial lambdas implementing closures. 11Keyword arguments for the callback are not supported; this is a 12conscious design decision, leaving the door open for keyword arguments 13to modify the meaning of the API call itself. 14""" 15 16import collections 17import collections.abc 18import concurrent.futures 19import errno 20import heapq 21import itertools 22import os 23import socket 24import stat 25import subprocess 26import threading 27import time 28import traceback 29import sys 30import warnings 31import weakref 32 33try: 34 import ssl 35except ImportError: # pragma: no cover 36 ssl = None 37 38from . import constants 39from . import coroutines 40from . import events 41from . import exceptions 42from . import futures 43from . import protocols 44from . import sslproto 45from . import staggered 46from . import tasks 47from . import timeouts 48from . import transports 49from . import trsock 50from .log import logger 51 52 53__all__ = 'BaseEventLoop','Server', 54 55 56# Minimum number of _scheduled timer handles before cleanup of 57# cancelled handles is performed. 58_MIN_SCHEDULED_TIMER_HANDLES = 100 59 60# Minimum fraction of _scheduled timer handles that are cancelled 61# before cleanup of cancelled handles is performed. 62_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 63 64 65_HAS_IPv6 = hasattr(socket, 'AF_INET6') 66 67# Maximum timeout passed to select to avoid OS limitations 68MAXIMUM_SELECT_TIMEOUT = 24 * 3600 69 70 71def _format_handle(handle): 72 cb = handle._callback 73 if isinstance(getattr(cb, '__self__', None), tasks.Task): 74 # format the task 75 return repr(cb.__self__) 76 else: 77 return str(handle) 78 79 80def _format_pipe(fd): 81 if fd == subprocess.PIPE: 82 return '<pipe>' 83 elif fd == subprocess.STDOUT: 84 return '<stdout>' 85 else: 86 return repr(fd) 87 88 89def _set_reuseport(sock): 90 if not hasattr(socket, 'SO_REUSEPORT'): 91 raise ValueError('reuse_port not supported by socket module') 92 else: 93 try: 94 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 95 except OSError: 96 raise ValueError('reuse_port not supported by socket module, ' 97 'SO_REUSEPORT defined but not implemented.') 98 99 100def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 101 # Try to skip getaddrinfo if "host" is already an IP. Users might have 102 # handled name resolution in their own code and pass in resolved IPs. 103 if not hasattr(socket, 'inet_pton'): 104 return 105 106 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 107 host is None: 108 return None 109 110 if type == socket.SOCK_STREAM: 111 proto = socket.IPPROTO_TCP 112 elif type == socket.SOCK_DGRAM: 113 proto = socket.IPPROTO_UDP 114 else: 115 return None 116 117 if port is None: 118 port = 0 119 elif isinstance(port, bytes) and port == b'': 120 port = 0 121 elif isinstance(port, str) and port == '': 122 port = 0 123 else: 124 # If port's a service name like "http", don't skip getaddrinfo. 125 try: 126 port = int(port) 127 except (TypeError, ValueError): 128 return None 129 130 if family == socket.AF_UNSPEC: 131 afs = [socket.AF_INET] 132 if _HAS_IPv6: 133 afs.append(socket.AF_INET6) 134 else: 135 afs = [family] 136 137 if isinstance(host, bytes): 138 host = host.decode('idna') 139 if '%' in host: 140 # Linux's inet_pton doesn't accept an IPv6 zone index after host, 141 # like '::1%lo0'. 142 return None 143 144 for af in afs: 145 try: 146 socket.inet_pton(af, host) 147 # The host has already been resolved. 148 if _HAS_IPv6 and af == socket.AF_INET6: 149 return af, type, proto, '', (host, port, flowinfo, scopeid) 150 else: 151 return af, type, proto, '', (host, port) 152 except OSError: 153 pass 154 155 # "host" is not an IP address. 156 return None 157 158 159def _interleave_addrinfos(addrinfos, first_address_family_count=1): 160 """Interleave list of addrinfo tuples by family.""" 161 # Group addresses by family 162 addrinfos_by_family = collections.OrderedDict() 163 for addr in addrinfos: 164 family = addr[0] 165 if family not in addrinfos_by_family: 166 addrinfos_by_family[family] = [] 167 addrinfos_by_family[family].append(addr) 168 addrinfos_lists = list(addrinfos_by_family.values()) 169 170 reordered = [] 171 if first_address_family_count > 1: 172 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) 173 del addrinfos_lists[0][:first_address_family_count - 1] 174 reordered.extend( 175 a for a in itertools.chain.from_iterable( 176 itertools.zip_longest(*addrinfos_lists) 177 ) if a is not None) 178 return reordered 179 180 181def _run_until_complete_cb(fut): 182 if not fut.cancelled(): 183 exc = fut.exception() 184 if isinstance(exc, (SystemExit, KeyboardInterrupt)): 185 # Issue #22429: run_forever() already finished, no need to 186 # stop it. 187 return 188 futures._get_loop(fut).stop() 189 190 191if hasattr(socket, 'TCP_NODELAY'): 192 def _set_nodelay(sock): 193 if (sock.family in {socket.AF_INET, socket.AF_INET6} and 194 sock.type == socket.SOCK_STREAM and 195 sock.proto == socket.IPPROTO_TCP): 196 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 197else: 198 def _set_nodelay(sock): 199 pass 200 201 202def _check_ssl_socket(sock): 203 if ssl is not None and isinstance(sock, ssl.SSLSocket): 204 raise TypeError("Socket cannot be of type SSLSocket") 205 206 207class _SendfileFallbackProtocol(protocols.Protocol): 208 def __init__(self, transp): 209 if not isinstance(transp, transports._FlowControlMixin): 210 raise TypeError("transport should be _FlowControlMixin instance") 211 self._transport = transp 212 self._proto = transp.get_protocol() 213 self._should_resume_reading = transp.is_reading() 214 self._should_resume_writing = transp._protocol_paused 215 transp.pause_reading() 216 transp.set_protocol(self) 217 if self._should_resume_writing: 218 self._write_ready_fut = self._transport._loop.create_future() 219 else: 220 self._write_ready_fut = None 221 222 async def drain(self): 223 if self._transport.is_closing(): 224 raise ConnectionError("Connection closed by peer") 225 fut = self._write_ready_fut 226 if fut is None: 227 return 228 await fut 229 230 def connection_made(self, transport): 231 raise RuntimeError("Invalid state: " 232 "connection should have been established already.") 233 234 def connection_lost(self, exc): 235 if self._write_ready_fut is not None: 236 # Never happens if peer disconnects after sending the whole content 237 # Thus disconnection is always an exception from user perspective 238 if exc is None: 239 self._write_ready_fut.set_exception( 240 ConnectionError("Connection is closed by peer")) 241 else: 242 self._write_ready_fut.set_exception(exc) 243 self._proto.connection_lost(exc) 244 245 def pause_writing(self): 246 if self._write_ready_fut is not None: 247 return 248 self._write_ready_fut = self._transport._loop.create_future() 249 250 def resume_writing(self): 251 if self._write_ready_fut is None: 252 return 253 self._write_ready_fut.set_result(False) 254 self._write_ready_fut = None 255 256 def data_received(self, data): 257 raise RuntimeError("Invalid state: reading should be paused") 258 259 def eof_received(self): 260 raise RuntimeError("Invalid state: reading should be paused") 261 262 async def restore(self): 263 self._transport.set_protocol(self._proto) 264 if self._should_resume_reading: 265 self._transport.resume_reading() 266 if self._write_ready_fut is not None: 267 # Cancel the future. 268 # Basically it has no effect because protocol is switched back, 269 # no code should wait for it anymore. 270 self._write_ready_fut.cancel() 271 if self._should_resume_writing: 272 self._proto.resume_writing() 273 274 275class Server(events.AbstractServer): 276 277 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 278 ssl_handshake_timeout, ssl_shutdown_timeout=None): 279 self._loop = loop 280 self._sockets = sockets 281 # Weak references so we don't break Transport's ability to 282 # detect abandoned transports 283 self._clients = weakref.WeakSet() 284 self._waiters = [] 285 self._protocol_factory = protocol_factory 286 self._backlog = backlog 287 self._ssl_context = ssl_context 288 self._ssl_handshake_timeout = ssl_handshake_timeout 289 self._ssl_shutdown_timeout = ssl_shutdown_timeout 290 self._serving = False 291 self._serving_forever_fut = None 292 293 def __repr__(self): 294 return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 295 296 def _attach(self, transport): 297 assert self._sockets is not None 298 self._clients.add(transport) 299 300 def _detach(self, transport): 301 self._clients.discard(transport) 302 if len(self._clients) == 0 and self._sockets is None: 303 self._wakeup() 304 305 def _wakeup(self): 306 waiters = self._waiters 307 self._waiters = None 308 for waiter in waiters: 309 if not waiter.done(): 310 waiter.set_result(None) 311 312 def _start_serving(self): 313 if self._serving: 314 return 315 self._serving = True 316 for sock in self._sockets: 317 sock.listen(self._backlog) 318 self._loop._start_serving( 319 self._protocol_factory, sock, self._ssl_context, 320 self, self._backlog, self._ssl_handshake_timeout, 321 self._ssl_shutdown_timeout) 322 323 def get_loop(self): 324 return self._loop 325 326 def is_serving(self): 327 return self._serving 328 329 @property 330 def sockets(self): 331 if self._sockets is None: 332 return () 333 return tuple(trsock.TransportSocket(s) for s in self._sockets) 334 335 def close(self): 336 sockets = self._sockets 337 if sockets is None: 338 return 339 self._sockets = None 340 341 for sock in sockets: 342 self._loop._stop_serving(sock) 343 344 self._serving = False 345 346 if (self._serving_forever_fut is not None and 347 not self._serving_forever_fut.done()): 348 self._serving_forever_fut.cancel() 349 self._serving_forever_fut = None 350 351 if len(self._clients) == 0: 352 self._wakeup() 353 354 def close_clients(self): 355 for transport in self._clients.copy(): 356 transport.close() 357 358 def abort_clients(self): 359 for transport in self._clients.copy(): 360 transport.abort() 361 362 async def start_serving(self): 363 self._start_serving() 364 # Skip one loop iteration so that all 'loop.add_reader' 365 # go through. 366 await tasks.sleep(0) 367 368 async def serve_forever(self): 369 if self._serving_forever_fut is not None: 370 raise RuntimeError( 371 f'server {self!r} is already being awaited on serve_forever()') 372 if self._sockets is None: 373 raise RuntimeError(f'server {self!r} is closed') 374 375 self._start_serving() 376 self._serving_forever_fut = self._loop.create_future() 377 378 try: 379 await self._serving_forever_fut 380 except exceptions.CancelledError: 381 try: 382 self.close() 383 await self.wait_closed() 384 finally: 385 raise 386 finally: 387 self._serving_forever_fut = None 388 389 async def wait_closed(self): 390 """Wait until server is closed and all connections are dropped. 391 392 - If the server is not closed, wait. 393 - If it is closed, but there are still active connections, wait. 394 395 Anyone waiting here will be unblocked once both conditions 396 (server is closed and all connections have been dropped) 397 have become true, in either order. 398 399 Historical note: In 3.11 and before, this was broken, returning 400 immediately if the server was already closed, even if there 401 were still active connections. An attempted fix in 3.12.0 was 402 still broken, returning immediately if the server was still 403 open and there were no active connections. Hopefully in 3.12.1 404 we have it right. 405 """ 406 # Waiters are unblocked by self._wakeup(), which is called 407 # from two places: self.close() and self._detach(), but only 408 # when both conditions have become true. To signal that this 409 # has happened, self._wakeup() sets self._waiters to None. 410 if self._waiters is None: 411 return 412 waiter = self._loop.create_future() 413 self._waiters.append(waiter) 414 await waiter 415 416 417class BaseEventLoop(events.AbstractEventLoop): 418 419 def __init__(self): 420 self._timer_cancelled_count = 0 421 self._closed = False 422 self._stopping = False 423 self._ready = collections.deque() 424 self._scheduled = [] 425 self._default_executor = None 426 self._internal_fds = 0 427 # Identifier of the thread running the event loop, or None if the 428 # event loop is not running 429 self._thread_id = None 430 self._clock_resolution = time.get_clock_info('monotonic').resolution 431 self._exception_handler = None 432 self.set_debug(coroutines._is_debug_mode()) 433 # The preserved state of async generator hooks. 434 self._old_agen_hooks = None 435 # In debug mode, if the execution of a callback or a step of a task 436 # exceed this duration in seconds, the slow callback/task is logged. 437 self.slow_callback_duration = 0.1 438 self._current_handle = None 439 self._task_factory = None 440 self._coroutine_origin_tracking_enabled = False 441 self._coroutine_origin_tracking_saved_depth = None 442 443 # A weak set of all asynchronous generators that are 444 # being iterated by the loop. 445 self._asyncgens = weakref.WeakSet() 446 # Set to True when `loop.shutdown_asyncgens` is called. 447 self._asyncgens_shutdown_called = False 448 # Set to True when `loop.shutdown_default_executor` is called. 449 self._executor_shutdown_called = False 450 451 def __repr__(self): 452 return ( 453 f'<{self.__class__.__name__} running={self.is_running()} ' 454 f'closed={self.is_closed()} debug={self.get_debug()}>' 455 ) 456 457 def create_future(self): 458 """Create a Future object attached to the loop.""" 459 return futures.Future(loop=self) 460 461 def create_task(self, coro, *, name=None, context=None): 462 """Schedule a coroutine object. 463 464 Return a task object. 465 """ 466 self._check_closed() 467 if self._task_factory is None: 468 task = tasks.Task(coro, loop=self, name=name, context=context) 469 if task._source_traceback: 470 del task._source_traceback[-1] 471 else: 472 if context is None: 473 # Use legacy API if context is not needed 474 task = self._task_factory(self, coro) 475 else: 476 task = self._task_factory(self, coro, context=context) 477 478 task.set_name(name) 479 480 return task 481 482 def set_task_factory(self, factory): 483 """Set a task factory that will be used by loop.create_task(). 484 485 If factory is None the default task factory will be set. 486 487 If factory is a callable, it should have a signature matching 488 '(loop, coro)', where 'loop' will be a reference to the active 489 event loop, 'coro' will be a coroutine object. The callable 490 must return a Future. 491 """ 492 if factory is not None and not callable(factory): 493 raise TypeError('task factory must be a callable or None') 494 self._task_factory = factory 495 496 def get_task_factory(self): 497 """Return a task factory, or None if the default one is in use.""" 498 return self._task_factory 499 500 def _make_socket_transport(self, sock, protocol, waiter=None, *, 501 extra=None, server=None): 502 """Create socket transport.""" 503 raise NotImplementedError 504 505 def _make_ssl_transport( 506 self, rawsock, protocol, sslcontext, waiter=None, 507 *, server_side=False, server_hostname=None, 508 extra=None, server=None, 509 ssl_handshake_timeout=None, 510 ssl_shutdown_timeout=None, 511 call_connection_made=True): 512 """Create SSL transport.""" 513 raise NotImplementedError 514 515 def _make_datagram_transport(self, sock, protocol, 516 address=None, waiter=None, extra=None): 517 """Create datagram transport.""" 518 raise NotImplementedError 519 520 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 521 extra=None): 522 """Create read pipe transport.""" 523 raise NotImplementedError 524 525 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 526 extra=None): 527 """Create write pipe transport.""" 528 raise NotImplementedError 529 530 async def _make_subprocess_transport(self, protocol, args, shell, 531 stdin, stdout, stderr, bufsize, 532 extra=None, **kwargs): 533 """Create subprocess transport.""" 534 raise NotImplementedError 535 536 def _write_to_self(self): 537 """Write a byte to self-pipe, to wake up the event loop. 538 539 This may be called from a different thread. 540 541 The subclass is responsible for implementing the self-pipe. 542 """ 543 raise NotImplementedError 544 545 def _process_events(self, event_list): 546 """Process selector events.""" 547 raise NotImplementedError 548 549 def _check_closed(self): 550 if self._closed: 551 raise RuntimeError('Event loop is closed') 552 553 def _check_default_executor(self): 554 if self._executor_shutdown_called: 555 raise RuntimeError('Executor shutdown has been called') 556 557 def _asyncgen_finalizer_hook(self, agen): 558 self._asyncgens.discard(agen) 559 if not self.is_closed(): 560 self.call_soon_threadsafe(self.create_task, agen.aclose()) 561 562 def _asyncgen_firstiter_hook(self, agen): 563 if self._asyncgens_shutdown_called: 564 warnings.warn( 565 f"asynchronous generator {agen!r} was scheduled after " 566 f"loop.shutdown_asyncgens() call", 567 ResourceWarning, source=self) 568 569 self._asyncgens.add(agen) 570 571 async def shutdown_asyncgens(self): 572 """Shutdown all active asynchronous generators.""" 573 self._asyncgens_shutdown_called = True 574 575 if not len(self._asyncgens): 576 # If Python version is <3.6 or we don't have any asynchronous 577 # generators alive. 578 return 579 580 closing_agens = list(self._asyncgens) 581 self._asyncgens.clear() 582 583 results = await tasks.gather( 584 *[ag.aclose() for ag in closing_agens], 585 return_exceptions=True) 586 587 for result, agen in zip(results, closing_agens): 588 if isinstance(result, Exception): 589 self.call_exception_handler({ 590 'message': f'an error occurred during closing of ' 591 f'asynchronous generator {agen!r}', 592 'exception': result, 593 'asyncgen': agen 594 }) 595 596 async def shutdown_default_executor(self, timeout=None): 597 """Schedule the shutdown of the default executor. 598 599 The timeout parameter specifies the amount of time the executor will 600 be given to finish joining. The default value is None, which means 601 that the executor will be given an unlimited amount of time. 602 """ 603 self._executor_shutdown_called = True 604 if self._default_executor is None: 605 return 606 future = self.create_future() 607 thread = threading.Thread(target=self._do_shutdown, args=(future,)) 608 thread.start() 609 try: 610 async with timeouts.timeout(timeout): 611 await future 612 except TimeoutError: 613 warnings.warn("The executor did not finishing joining " 614 f"its threads within {timeout} seconds.", 615 RuntimeWarning, stacklevel=2) 616 self._default_executor.shutdown(wait=False) 617 else: 618 thread.join() 619 620 def _do_shutdown(self, future): 621 try: 622 self._default_executor.shutdown(wait=True) 623 if not self.is_closed(): 624 self.call_soon_threadsafe(futures._set_result_unless_cancelled, 625 future, None) 626 except Exception as ex: 627 if not self.is_closed() and not future.cancelled(): 628 self.call_soon_threadsafe(future.set_exception, ex) 629 630 def _check_running(self): 631 if self.is_running(): 632 raise RuntimeError('This event loop is already running') 633 if events._get_running_loop() is not None: 634 raise RuntimeError( 635 'Cannot run the event loop while another loop is running') 636 637 def _run_forever_setup(self): 638 """Prepare the run loop to process events. 639 640 This method exists so that custom custom event loop subclasses (e.g., event loops 641 that integrate a GUI event loop with Python's event loop) have access to all the 642 loop setup logic. 643 """ 644 self._check_closed() 645 self._check_running() 646 self._set_coroutine_origin_tracking(self._debug) 647 648 self._old_agen_hooks = sys.get_asyncgen_hooks() 649 self._thread_id = threading.get_ident() 650 sys.set_asyncgen_hooks( 651 firstiter=self._asyncgen_firstiter_hook, 652 finalizer=self._asyncgen_finalizer_hook 653 ) 654 655 events._set_running_loop(self) 656 657 def _run_forever_cleanup(self): 658 """Clean up after an event loop finishes the looping over events. 659 660 This method exists so that custom custom event loop subclasses (e.g., event loops 661 that integrate a GUI event loop with Python's event loop) have access to all the 662 loop cleanup logic. 663 """ 664 self._stopping = False 665 self._thread_id = None 666 events._set_running_loop(None) 667 self._set_coroutine_origin_tracking(False) 668 # Restore any pre-existing async generator hooks. 669 if self._old_agen_hooks is not None: 670 sys.set_asyncgen_hooks(*self._old_agen_hooks) 671 self._old_agen_hooks = None 672 673 def run_forever(self): 674 """Run until stop() is called.""" 675 try: 676 self._run_forever_setup() 677 while True: 678 self._run_once() 679 if self._stopping: 680 break 681 finally: 682 self._run_forever_cleanup() 683 684 def run_until_complete(self, future): 685 """Run until the Future is done. 686 687 If the argument is a coroutine, it is wrapped in a Task. 688 689 WARNING: It would be disastrous to call run_until_complete() 690 with the same coroutine twice -- it would wrap it in two 691 different Tasks and that can't be good. 692 693 Return the Future's result, or raise its exception. 694 """ 695 self._check_closed() 696 self._check_running() 697 698 new_task = not futures.isfuture(future) 699 future = tasks.ensure_future(future, loop=self) 700 if new_task: 701 # An exception is raised if the future didn't complete, so there 702 # is no need to log the "destroy pending task" message 703 future._log_destroy_pending = False 704 705 future.add_done_callback(_run_until_complete_cb) 706 try: 707 self.run_forever() 708 except: 709 if new_task and future.done() and not future.cancelled(): 710 # The coroutine raised a BaseException. Consume the exception 711 # to not log a warning, the caller doesn't have access to the 712 # local task. 713 future.exception() 714 raise 715 finally: 716 future.remove_done_callback(_run_until_complete_cb) 717 if not future.done(): 718 raise RuntimeError('Event loop stopped before Future completed.') 719 720 return future.result() 721 722 def stop(self): 723 """Stop running the event loop. 724 725 Every callback already scheduled will still run. This simply informs 726 run_forever to stop looping after a complete iteration. 727 """ 728 self._stopping = True 729 730 def close(self): 731 """Close the event loop. 732 733 This clears the queues and shuts down the executor, 734 but does not wait for the executor to finish. 735 736 The event loop must not be running. 737 """ 738 if self.is_running(): 739 raise RuntimeError("Cannot close a running event loop") 740 if self._closed: 741 return 742 if self._debug: 743 logger.debug("Close %r", self) 744 self._closed = True 745 self._ready.clear() 746 self._scheduled.clear() 747 self._executor_shutdown_called = True 748 executor = self._default_executor 749 if executor is not None: 750 self._default_executor = None 751 executor.shutdown(wait=False) 752 753 def is_closed(self): 754 """Returns True if the event loop was closed.""" 755 return self._closed 756 757 def __del__(self, _warn=warnings.warn): 758 if not self.is_closed(): 759 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 760 if not self.is_running(): 761 self.close() 762 763 def is_running(self): 764 """Returns True if the event loop is running.""" 765 return (self._thread_id is not None) 766 767 def time(self): 768 """Return the time according to the event loop's clock. 769 770 This is a float expressed in seconds since an epoch, but the 771 epoch, precision, accuracy and drift are unspecified and may 772 differ per event loop. 773 """ 774 return time.monotonic() 775 776 def call_later(self, delay, callback, *args, context=None): 777 """Arrange for a callback to be called at a given time. 778 779 Return a Handle: an opaque object with a cancel() method that 780 can be used to cancel the call. 781 782 The delay can be an int or float, expressed in seconds. It is 783 always relative to the current time. 784 785 Each callback will be called exactly once. If two callbacks 786 are scheduled for exactly the same time, it is undefined which 787 will be called first. 788 789 Any positional arguments after the callback will be passed to 790 the callback when it is called. 791 """ 792 if delay is None: 793 raise TypeError('delay must not be None') 794 timer = self.call_at(self.time() + delay, callback, *args, 795 context=context) 796 if timer._source_traceback: 797 del timer._source_traceback[-1] 798 return timer 799 800 def call_at(self, when, callback, *args, context=None): 801 """Like call_later(), but uses an absolute time. 802 803 Absolute time corresponds to the event loop's time() method. 804 """ 805 if when is None: 806 raise TypeError("when cannot be None") 807 self._check_closed() 808 if self._debug: 809 self._check_thread() 810 self._check_callback(callback, 'call_at') 811 timer = events.TimerHandle(when, callback, args, self, context) 812 if timer._source_traceback: 813 del timer._source_traceback[-1] 814 heapq.heappush(self._scheduled, timer) 815 timer._scheduled = True 816 return timer 817 818 def call_soon(self, callback, *args, context=None): 819 """Arrange for a callback to be called as soon as possible. 820 821 This operates as a FIFO queue: callbacks are called in the 822 order in which they are registered. Each callback will be 823 called exactly once. 824 825 Any positional arguments after the callback will be passed to 826 the callback when it is called. 827 """ 828 self._check_closed() 829 if self._debug: 830 self._check_thread() 831 self._check_callback(callback, 'call_soon') 832 handle = self._call_soon(callback, args, context) 833 if handle._source_traceback: 834 del handle._source_traceback[-1] 835 return handle 836 837 def _check_callback(self, callback, method): 838 if (coroutines.iscoroutine(callback) or 839 coroutines.iscoroutinefunction(callback)): 840 raise TypeError( 841 f"coroutines cannot be used with {method}()") 842 if not callable(callback): 843 raise TypeError( 844 f'a callable object was expected by {method}(), ' 845 f'got {callback!r}') 846 847 def _call_soon(self, callback, args, context): 848 handle = events.Handle(callback, args, self, context) 849 if handle._source_traceback: 850 del handle._source_traceback[-1] 851 self._ready.append(handle) 852 return handle 853 854 def _check_thread(self): 855 """Check that the current thread is the thread running the event loop. 856 857 Non-thread-safe methods of this class make this assumption and will 858 likely behave incorrectly when the assumption is violated. 859 860 Should only be called when (self._debug == True). The caller is 861 responsible for checking this condition for performance reasons. 862 """ 863 if self._thread_id is None: 864 return 865 thread_id = threading.get_ident() 866 if thread_id != self._thread_id: 867 raise RuntimeError( 868 "Non-thread-safe operation invoked on an event loop other " 869 "than the current one") 870 871 def call_soon_threadsafe(self, callback, *args, context=None): 872 """Like call_soon(), but thread-safe.""" 873 self._check_closed() 874 if self._debug: 875 self._check_callback(callback, 'call_soon_threadsafe') 876 handle = self._call_soon(callback, args, context) 877 if handle._source_traceback: 878 del handle._source_traceback[-1] 879 self._write_to_self() 880 return handle 881 882 def run_in_executor(self, executor, func, *args): 883 self._check_closed() 884 if self._debug: 885 self._check_callback(func, 'run_in_executor') 886 if executor is None: 887 executor = self._default_executor 888 # Only check when the default executor is being used 889 self._check_default_executor() 890 if executor is None: 891 executor = concurrent.futures.ThreadPoolExecutor( 892 thread_name_prefix='asyncio' 893 ) 894 self._default_executor = executor 895 return futures.wrap_future( 896 executor.submit(func, *args), loop=self) 897 898 def set_default_executor(self, executor): 899 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 900 raise TypeError('executor must be ThreadPoolExecutor instance') 901 self._default_executor = executor 902 903 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 904 msg = [f"{host}:{port!r}"] 905 if family: 906 msg.append(f'family={family!r}') 907 if type: 908 msg.append(f'type={type!r}') 909 if proto: 910 msg.append(f'proto={proto!r}') 911 if flags: 912 msg.append(f'flags={flags!r}') 913 msg = ', '.join(msg) 914 logger.debug('Get address info %s', msg) 915 916 t0 = self.time() 917 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 918 dt = self.time() - t0 919 920 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 921 if dt >= self.slow_callback_duration: 922 logger.info(msg) 923 else: 924 logger.debug(msg) 925 return addrinfo 926 927 async def getaddrinfo(self, host, port, *, 928 family=0, type=0, proto=0, flags=0): 929 if self._debug: 930 getaddr_func = self._getaddrinfo_debug 931 else: 932 getaddr_func = socket.getaddrinfo 933 934 return await self.run_in_executor( 935 None, getaddr_func, host, port, family, type, proto, flags) 936 937 async def getnameinfo(self, sockaddr, flags=0): 938 return await self.run_in_executor( 939 None, socket.getnameinfo, sockaddr, flags) 940 941 async def sock_sendfile(self, sock, file, offset=0, count=None, 942 *, fallback=True): 943 if self._debug and sock.gettimeout() != 0: 944 raise ValueError("the socket must be non-blocking") 945 _check_ssl_socket(sock) 946 self._check_sendfile_params(sock, file, offset, count) 947 try: 948 return await self._sock_sendfile_native(sock, file, 949 offset, count) 950 except exceptions.SendfileNotAvailableError as exc: 951 if not fallback: 952 raise 953 return await self._sock_sendfile_fallback(sock, file, 954 offset, count) 955 956 async def _sock_sendfile_native(self, sock, file, offset, count): 957 # NB: sendfile syscall is not supported for SSL sockets and 958 # non-mmap files even if sendfile is supported by OS 959 raise exceptions.SendfileNotAvailableError( 960 f"syscall sendfile is not available for socket {sock!r} " 961 f"and file {file!r} combination") 962 963 async def _sock_sendfile_fallback(self, sock, file, offset, count): 964 if offset: 965 file.seek(offset) 966 blocksize = ( 967 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 968 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 969 ) 970 buf = bytearray(blocksize) 971 total_sent = 0 972 try: 973 while True: 974 if count: 975 blocksize = min(count - total_sent, blocksize) 976 if blocksize <= 0: 977 break 978 view = memoryview(buf)[:blocksize] 979 read = await self.run_in_executor(None, file.readinto, view) 980 if not read: 981 break # EOF 982 await self.sock_sendall(sock, view[:read]) 983 total_sent += read 984 return total_sent 985 finally: 986 if total_sent > 0 and hasattr(file, 'seek'): 987 file.seek(offset + total_sent) 988 989 def _check_sendfile_params(self, sock, file, offset, count): 990 if 'b' not in getattr(file, 'mode', 'b'): 991 raise ValueError("file should be opened in binary mode") 992 if not sock.type == socket.SOCK_STREAM: 993 raise ValueError("only SOCK_STREAM type sockets are supported") 994 if count is not None: 995 if not isinstance(count, int): 996 raise TypeError( 997 "count must be a positive integer (got {!r})".format(count)) 998 if count <= 0: 999 raise ValueError( 1000 "count must be a positive integer (got {!r})".format(count)) 1001 if not isinstance(offset, int): 1002 raise TypeError( 1003 "offset must be a non-negative integer (got {!r})".format( 1004 offset)) 1005 if offset < 0: 1006 raise ValueError( 1007 "offset must be a non-negative integer (got {!r})".format( 1008 offset)) 1009 1010 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): 1011 """Create, bind and connect one socket.""" 1012 my_exceptions = [] 1013 exceptions.append(my_exceptions) 1014 family, type_, proto, _, address = addr_info 1015 sock = None 1016 try: 1017 sock = socket.socket(family=family, type=type_, proto=proto) 1018 sock.setblocking(False) 1019 if local_addr_infos is not None: 1020 for lfamily, _, _, _, laddr in local_addr_infos: 1021 # skip local addresses of different family 1022 if lfamily != family: 1023 continue 1024 try: 1025 sock.bind(laddr) 1026 break 1027 except OSError as exc: 1028 msg = ( 1029 f'error while attempting to bind on ' 1030 f'address {laddr!r}: {str(exc).lower()}' 1031 ) 1032 exc = OSError(exc.errno, msg) 1033 my_exceptions.append(exc) 1034 else: # all bind attempts failed 1035 if my_exceptions: 1036 raise my_exceptions.pop() 1037 else: 1038 raise OSError(f"no matching local address with {family=} found") 1039 await self.sock_connect(sock, address) 1040 return sock 1041 except OSError as exc: 1042 my_exceptions.append(exc) 1043 if sock is not None: 1044 sock.close() 1045 raise 1046 except: 1047 if sock is not None: 1048 sock.close() 1049 raise 1050 finally: 1051 exceptions = my_exceptions = None 1052 1053 async def create_connection( 1054 self, protocol_factory, host=None, port=None, 1055 *, ssl=None, family=0, 1056 proto=0, flags=0, sock=None, 1057 local_addr=None, server_hostname=None, 1058 ssl_handshake_timeout=None, 1059 ssl_shutdown_timeout=None, 1060 happy_eyeballs_delay=None, interleave=None, 1061 all_errors=False): 1062 """Connect to a TCP server. 1063 1064 Create a streaming transport connection to a given internet host and 1065 port: socket family AF_INET or socket.AF_INET6 depending on host (or 1066 family if specified), socket type SOCK_STREAM. protocol_factory must be 1067 a callable returning a protocol instance. 1068 1069 This method is a coroutine which will try to establish the connection 1070 in the background. When successful, the coroutine returns a 1071 (transport, protocol) pair. 1072 """ 1073 if server_hostname is not None and not ssl: 1074 raise ValueError('server_hostname is only meaningful with ssl') 1075 1076 if server_hostname is None and ssl: 1077 # Use host as default for server_hostname. It is an error 1078 # if host is empty or not set, e.g. when an 1079 # already-connected socket was passed or when only a port 1080 # is given. To avoid this error, you can pass 1081 # server_hostname='' -- this will bypass the hostname 1082 # check. (This also means that if host is a numeric 1083 # IP/IPv6 address, we will attempt to verify that exact 1084 # address; this will probably fail, but it is possible to 1085 # create a certificate for a specific IP address, so we 1086 # don't judge it here.) 1087 if not host: 1088 raise ValueError('You must set server_hostname ' 1089 'when using ssl without a host') 1090 server_hostname = host 1091 1092 if ssl_handshake_timeout is not None and not ssl: 1093 raise ValueError( 1094 'ssl_handshake_timeout is only meaningful with ssl') 1095 1096 if ssl_shutdown_timeout is not None and not ssl: 1097 raise ValueError( 1098 'ssl_shutdown_timeout is only meaningful with ssl') 1099 1100 if sock is not None: 1101 _check_ssl_socket(sock) 1102 1103 if happy_eyeballs_delay is not None and interleave is None: 1104 # If using happy eyeballs, default to interleave addresses by family 1105 interleave = 1 1106 1107 if host is not None or port is not None: 1108 if sock is not None: 1109 raise ValueError( 1110 'host/port and sock can not be specified at the same time') 1111 1112 infos = await self._ensure_resolved( 1113 (host, port), family=family, 1114 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) 1115 if not infos: 1116 raise OSError('getaddrinfo() returned empty list') 1117 1118 if local_addr is not None: 1119 laddr_infos = await self._ensure_resolved( 1120 local_addr, family=family, 1121 type=socket.SOCK_STREAM, proto=proto, 1122 flags=flags, loop=self) 1123 if not laddr_infos: 1124 raise OSError('getaddrinfo() returned empty list') 1125 else: 1126 laddr_infos = None 1127 1128 if interleave: 1129 infos = _interleave_addrinfos(infos, interleave) 1130 1131 exceptions = [] 1132 if happy_eyeballs_delay is None: 1133 # not using happy eyeballs 1134 for addrinfo in infos: 1135 try: 1136 sock = await self._connect_sock( 1137 exceptions, addrinfo, laddr_infos) 1138 break 1139 except OSError: 1140 continue 1141 else: # using happy eyeballs 1142 sock = (await staggered.staggered_race( 1143 ( 1144 # can't use functools.partial as it keeps a reference 1145 # to exceptions 1146 lambda addrinfo=addrinfo: self._connect_sock( 1147 exceptions, addrinfo, laddr_infos 1148 ) 1149 for addrinfo in infos 1150 ), 1151 happy_eyeballs_delay, 1152 loop=self, 1153 ))[0] # can't use sock, _, _ as it keeks a reference to exceptions 1154 1155 if sock is None: 1156 exceptions = [exc for sub in exceptions for exc in sub] 1157 try: 1158 if all_errors: 1159 raise ExceptionGroup("create_connection failed", exceptions) 1160 if len(exceptions) == 1: 1161 raise exceptions[0] 1162 else: 1163 # If they all have the same str(), raise one. 1164 model = str(exceptions[0]) 1165 if all(str(exc) == model for exc in exceptions): 1166 raise exceptions[0] 1167 # Raise a combined exception so the user can see all 1168 # the various error messages. 1169 raise OSError('Multiple exceptions: {}'.format( 1170 ', '.join(str(exc) for exc in exceptions))) 1171 finally: 1172 exceptions = None 1173 1174 else: 1175 if sock is None: 1176 raise ValueError( 1177 'host and port was not specified and no sock specified') 1178 if sock.type != socket.SOCK_STREAM: 1179 # We allow AF_INET, AF_INET6, AF_UNIX as long as they 1180 # are SOCK_STREAM. 1181 # We support passing AF_UNIX sockets even though we have 1182 # a dedicated API for that: create_unix_connection. 1183 # Disallowing AF_UNIX in this method, breaks backwards 1184 # compatibility. 1185 raise ValueError( 1186 f'A Stream Socket was expected, got {sock!r}') 1187 1188 transport, protocol = await self._create_connection_transport( 1189 sock, protocol_factory, ssl, server_hostname, 1190 ssl_handshake_timeout=ssl_handshake_timeout, 1191 ssl_shutdown_timeout=ssl_shutdown_timeout) 1192 if self._debug: 1193 # Get the socket from the transport because SSL transport closes 1194 # the old socket and creates a new SSL socket 1195 sock = transport.get_extra_info('socket') 1196 logger.debug("%r connected to %s:%r: (%r, %r)", 1197 sock, host, port, transport, protocol) 1198 return transport, protocol 1199 1200 async def _create_connection_transport( 1201 self, sock, protocol_factory, ssl, 1202 server_hostname, server_side=False, 1203 ssl_handshake_timeout=None, 1204 ssl_shutdown_timeout=None): 1205 1206 sock.setblocking(False) 1207 1208 protocol = protocol_factory() 1209 waiter = self.create_future() 1210 if ssl: 1211 sslcontext = None if isinstance(ssl, bool) else ssl 1212 transport = self._make_ssl_transport( 1213 sock, protocol, sslcontext, waiter, 1214 server_side=server_side, server_hostname=server_hostname, 1215 ssl_handshake_timeout=ssl_handshake_timeout, 1216 ssl_shutdown_timeout=ssl_shutdown_timeout) 1217 else: 1218 transport = self._make_socket_transport(sock, protocol, waiter) 1219 1220 try: 1221 await waiter 1222 except: 1223 transport.close() 1224 raise 1225 1226 return transport, protocol 1227 1228 async def sendfile(self, transport, file, offset=0, count=None, 1229 *, fallback=True): 1230 """Send a file to transport. 1231 1232 Return the total number of bytes which were sent. 1233 1234 The method uses high-performance os.sendfile if available. 1235 1236 file must be a regular file object opened in binary mode. 1237 1238 offset tells from where to start reading the file. If specified, 1239 count is the total number of bytes to transmit as opposed to 1240 sending the file until EOF is reached. File position is updated on 1241 return or also in case of error in which case file.tell() 1242 can be used to figure out the number of bytes 1243 which were sent. 1244 1245 fallback set to True makes asyncio to manually read and send 1246 the file when the platform does not support the sendfile syscall 1247 (e.g. Windows or SSL socket on Unix). 1248 1249 Raise SendfileNotAvailableError if the system does not support 1250 sendfile syscall and fallback is False. 1251 """ 1252 if transport.is_closing(): 1253 raise RuntimeError("Transport is closing") 1254 mode = getattr(transport, '_sendfile_compatible', 1255 constants._SendfileMode.UNSUPPORTED) 1256 if mode is constants._SendfileMode.UNSUPPORTED: 1257 raise RuntimeError( 1258 f"sendfile is not supported for transport {transport!r}") 1259 if mode is constants._SendfileMode.TRY_NATIVE: 1260 try: 1261 return await self._sendfile_native(transport, file, 1262 offset, count) 1263 except exceptions.SendfileNotAvailableError as exc: 1264 if not fallback: 1265 raise 1266 1267 if not fallback: 1268 raise RuntimeError( 1269 f"fallback is disabled and native sendfile is not " 1270 f"supported for transport {transport!r}") 1271 1272 return await self._sendfile_fallback(transport, file, 1273 offset, count) 1274 1275 async def _sendfile_native(self, transp, file, offset, count): 1276 raise exceptions.SendfileNotAvailableError( 1277 "sendfile syscall is not supported") 1278 1279 async def _sendfile_fallback(self, transp, file, offset, count): 1280 if offset: 1281 file.seek(offset) 1282 blocksize = min(count, 16384) if count else 16384 1283 buf = bytearray(blocksize) 1284 total_sent = 0 1285 proto = _SendfileFallbackProtocol(transp) 1286 try: 1287 while True: 1288 if count: 1289 blocksize = min(count - total_sent, blocksize) 1290 if blocksize <= 0: 1291 return total_sent 1292 view = memoryview(buf)[:blocksize] 1293 read = await self.run_in_executor(None, file.readinto, view) 1294 if not read: 1295 return total_sent # EOF 1296 await proto.drain() 1297 transp.write(view[:read]) 1298 total_sent += read 1299 finally: 1300 if total_sent > 0 and hasattr(file, 'seek'): 1301 file.seek(offset + total_sent) 1302 await proto.restore() 1303 1304 async def start_tls(self, transport, protocol, sslcontext, *, 1305 server_side=False, 1306 server_hostname=None, 1307 ssl_handshake_timeout=None, 1308 ssl_shutdown_timeout=None): 1309 """Upgrade transport to TLS. 1310 1311 Return a new transport that *protocol* should start using 1312 immediately. 1313 """ 1314 if ssl is None: 1315 raise RuntimeError('Python ssl module is not available') 1316 1317 if not isinstance(sslcontext, ssl.SSLContext): 1318 raise TypeError( 1319 f'sslcontext is expected to be an instance of ssl.SSLContext, ' 1320 f'got {sslcontext!r}') 1321 1322 if not getattr(transport, '_start_tls_compatible', False): 1323 raise TypeError( 1324 f'transport {transport!r} is not supported by start_tls()') 1325 1326 waiter = self.create_future() 1327 ssl_protocol = sslproto.SSLProtocol( 1328 self, protocol, sslcontext, waiter, 1329 server_side, server_hostname, 1330 ssl_handshake_timeout=ssl_handshake_timeout, 1331 ssl_shutdown_timeout=ssl_shutdown_timeout, 1332 call_connection_made=False) 1333 1334 # Pause early so that "ssl_protocol.data_received()" doesn't 1335 # have a chance to get called before "ssl_protocol.connection_made()". 1336 transport.pause_reading() 1337 1338 transport.set_protocol(ssl_protocol) 1339 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) 1340 resume_cb = self.call_soon(transport.resume_reading) 1341 1342 try: 1343 await waiter 1344 except BaseException: 1345 transport.close() 1346 conmade_cb.cancel() 1347 resume_cb.cancel() 1348 raise 1349 1350 return ssl_protocol._app_transport 1351 1352 async def create_datagram_endpoint(self, protocol_factory, 1353 local_addr=None, remote_addr=None, *, 1354 family=0, proto=0, flags=0, 1355 reuse_port=None, 1356 allow_broadcast=None, sock=None): 1357 """Create datagram connection.""" 1358 if sock is not None: 1359 if sock.type == socket.SOCK_STREAM: 1360 raise ValueError( 1361 f'A datagram socket was expected, got {sock!r}') 1362 if (local_addr or remote_addr or 1363 family or proto or flags or 1364 reuse_port or allow_broadcast): 1365 # show the problematic kwargs in exception msg 1366 opts = dict(local_addr=local_addr, remote_addr=remote_addr, 1367 family=family, proto=proto, flags=flags, 1368 reuse_port=reuse_port, 1369 allow_broadcast=allow_broadcast) 1370 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) 1371 raise ValueError( 1372 f'socket modifier keyword arguments can not be used ' 1373 f'when sock is specified. ({problems})') 1374 sock.setblocking(False) 1375 r_addr = None 1376 else: 1377 if not (local_addr or remote_addr): 1378 if family == 0: 1379 raise ValueError('unexpected address family') 1380 addr_pairs_info = (((family, proto), (None, None)),) 1381 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 1382 for addr in (local_addr, remote_addr): 1383 if addr is not None and not isinstance(addr, str): 1384 raise TypeError('string is expected') 1385 1386 if local_addr and local_addr[0] not in (0, '\x00'): 1387 try: 1388 if stat.S_ISSOCK(os.stat(local_addr).st_mode): 1389 os.remove(local_addr) 1390 except FileNotFoundError: 1391 pass 1392 except OSError as err: 1393 # Directory may have permissions only to create socket. 1394 logger.error('Unable to check or remove stale UNIX ' 1395 'socket %r: %r', 1396 local_addr, err) 1397 1398 addr_pairs_info = (((family, proto), 1399 (local_addr, remote_addr)), ) 1400 else: 1401 # join address by (family, protocol) 1402 addr_infos = {} # Using order preserving dict 1403 for idx, addr in ((0, local_addr), (1, remote_addr)): 1404 if addr is not None: 1405 if not (isinstance(addr, tuple) and len(addr) == 2): 1406 raise TypeError('2-tuple is expected') 1407 1408 infos = await self._ensure_resolved( 1409 addr, family=family, type=socket.SOCK_DGRAM, 1410 proto=proto, flags=flags, loop=self) 1411 if not infos: 1412 raise OSError('getaddrinfo() returned empty list') 1413 1414 for fam, _, pro, _, address in infos: 1415 key = (fam, pro) 1416 if key not in addr_infos: 1417 addr_infos[key] = [None, None] 1418 addr_infos[key][idx] = address 1419 1420 # each addr has to have info for each (family, proto) pair 1421 addr_pairs_info = [ 1422 (key, addr_pair) for key, addr_pair in addr_infos.items() 1423 if not ((local_addr and addr_pair[0] is None) or 1424 (remote_addr and addr_pair[1] is None))] 1425 1426 if not addr_pairs_info: 1427 raise ValueError('can not get address information') 1428 1429 exceptions = [] 1430 1431 for ((family, proto), 1432 (local_address, remote_address)) in addr_pairs_info: 1433 sock = None 1434 r_addr = None 1435 try: 1436 sock = socket.socket( 1437 family=family, type=socket.SOCK_DGRAM, proto=proto) 1438 if reuse_port: 1439 _set_reuseport(sock) 1440 if allow_broadcast: 1441 sock.setsockopt( 1442 socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 1443 sock.setblocking(False) 1444 1445 if local_addr: 1446 sock.bind(local_address) 1447 if remote_addr: 1448 if not allow_broadcast: 1449 await self.sock_connect(sock, remote_address) 1450 r_addr = remote_address 1451 except OSError as exc: 1452 if sock is not None: 1453 sock.close() 1454 exceptions.append(exc) 1455 except: 1456 if sock is not None: 1457 sock.close() 1458 raise 1459 else: 1460 break 1461 else: 1462 raise exceptions[0] 1463 1464 protocol = protocol_factory() 1465 waiter = self.create_future() 1466 transport = self._make_datagram_transport( 1467 sock, protocol, r_addr, waiter) 1468 if self._debug: 1469 if local_addr: 1470 logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 1471 "created: (%r, %r)", 1472 local_addr, remote_addr, transport, protocol) 1473 else: 1474 logger.debug("Datagram endpoint remote_addr=%r created: " 1475 "(%r, %r)", 1476 remote_addr, transport, protocol) 1477 1478 try: 1479 await waiter 1480 except: 1481 transport.close() 1482 raise 1483 1484 return transport, protocol 1485 1486 async def _ensure_resolved(self, address, *, 1487 family=0, type=socket.SOCK_STREAM, 1488 proto=0, flags=0, loop): 1489 host, port = address[:2] 1490 info = _ipaddr_info(host, port, family, type, proto, *address[2:]) 1491 if info is not None: 1492 # "host" is already a resolved IP. 1493 return [info] 1494 else: 1495 return await loop.getaddrinfo(host, port, family=family, type=type, 1496 proto=proto, flags=flags) 1497 1498 async def _create_server_getaddrinfo(self, host, port, family, flags): 1499 infos = await self._ensure_resolved((host, port), family=family, 1500 type=socket.SOCK_STREAM, 1501 flags=flags, loop=self) 1502 if not infos: 1503 raise OSError(f'getaddrinfo({host!r}) returned empty list') 1504 return infos 1505 1506 async def create_server( 1507 self, protocol_factory, host=None, port=None, 1508 *, 1509 family=socket.AF_UNSPEC, 1510 flags=socket.AI_PASSIVE, 1511 sock=None, 1512 backlog=100, 1513 ssl=None, 1514 reuse_address=None, 1515 reuse_port=None, 1516 keep_alive=None, 1517 ssl_handshake_timeout=None, 1518 ssl_shutdown_timeout=None, 1519 start_serving=True): 1520 """Create a TCP server. 1521 1522 The host parameter can be a string, in that case the TCP server is 1523 bound to host and port. 1524 1525 The host parameter can also be a sequence of strings and in that case 1526 the TCP server is bound to all hosts of the sequence. If a host 1527 appears multiple times (possibly indirectly e.g. when hostnames 1528 resolve to the same IP address), the server is only bound once to that 1529 host. 1530 1531 Return a Server object which can be used to stop the service. 1532 1533 This method is a coroutine. 1534 """ 1535 if isinstance(ssl, bool): 1536 raise TypeError('ssl argument must be an SSLContext or None') 1537 1538 if ssl_handshake_timeout is not None and ssl is None: 1539 raise ValueError( 1540 'ssl_handshake_timeout is only meaningful with ssl') 1541 1542 if ssl_shutdown_timeout is not None and ssl is None: 1543 raise ValueError( 1544 'ssl_shutdown_timeout is only meaningful with ssl') 1545 1546 if sock is not None: 1547 _check_ssl_socket(sock) 1548 1549 if host is not None or port is not None: 1550 if sock is not None: 1551 raise ValueError( 1552 'host/port and sock can not be specified at the same time') 1553 1554 if reuse_address is None: 1555 reuse_address = os.name == "posix" and sys.platform != "cygwin" 1556 sockets = [] 1557 if host == '': 1558 hosts = [None] 1559 elif (isinstance(host, str) or 1560 not isinstance(host, collections.abc.Iterable)): 1561 hosts = [host] 1562 else: 1563 hosts = host 1564 1565 fs = [self._create_server_getaddrinfo(host, port, family=family, 1566 flags=flags) 1567 for host in hosts] 1568 infos = await tasks.gather(*fs) 1569 infos = set(itertools.chain.from_iterable(infos)) 1570 1571 completed = False 1572 try: 1573 for res in infos: 1574 af, socktype, proto, canonname, sa = res 1575 try: 1576 sock = socket.socket(af, socktype, proto) 1577 except socket.error: 1578 # Assume it's a bad family/type/protocol combination. 1579 if self._debug: 1580 logger.warning('create_server() failed to create ' 1581 'socket.socket(%r, %r, %r)', 1582 af, socktype, proto, exc_info=True) 1583 continue 1584 sockets.append(sock) 1585 if reuse_address: 1586 sock.setsockopt( 1587 socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 1588 if reuse_port: 1589 _set_reuseport(sock) 1590 if keep_alive: 1591 sock.setsockopt( 1592 socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) 1593 # Disable IPv4/IPv6 dual stack support (enabled by 1594 # default on Linux) which makes a single socket 1595 # listen on both address families. 1596 if (_HAS_IPv6 and 1597 af == socket.AF_INET6 and 1598 hasattr(socket, 'IPPROTO_IPV6')): 1599 sock.setsockopt(socket.IPPROTO_IPV6, 1600 socket.IPV6_V6ONLY, 1601 True) 1602 try: 1603 sock.bind(sa) 1604 except OSError as err: 1605 msg = ('error while attempting ' 1606 'to bind on address %r: %s' 1607 % (sa, str(err).lower())) 1608 if err.errno == errno.EADDRNOTAVAIL: 1609 # Assume the family is not enabled (bpo-30945) 1610 sockets.pop() 1611 sock.close() 1612 if self._debug: 1613 logger.warning(msg) 1614 continue 1615 raise OSError(err.errno, msg) from None 1616 1617 if not sockets: 1618 raise OSError('could not bind on any address out of %r' 1619 % ([info[4] for info in infos],)) 1620 1621 completed = True 1622 finally: 1623 if not completed: 1624 for sock in sockets: 1625 sock.close() 1626 else: 1627 if sock is None: 1628 raise ValueError('Neither host/port nor sock were specified') 1629 if sock.type != socket.SOCK_STREAM: 1630 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1631 sockets = [sock] 1632 1633 for sock in sockets: 1634 sock.setblocking(False) 1635 1636 server = Server(self, sockets, protocol_factory, 1637 ssl, backlog, ssl_handshake_timeout, 1638 ssl_shutdown_timeout) 1639 if start_serving: 1640 server._start_serving() 1641 # Skip one loop iteration so that all 'loop.add_reader' 1642 # go through. 1643 await tasks.sleep(0) 1644 1645 if self._debug: 1646 logger.info("%r is serving", server) 1647 return server 1648 1649 async def connect_accepted_socket( 1650 self, protocol_factory, sock, 1651 *, ssl=None, 1652 ssl_handshake_timeout=None, 1653 ssl_shutdown_timeout=None): 1654 if sock.type != socket.SOCK_STREAM: 1655 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1656 1657 if ssl_handshake_timeout is not None and not ssl: 1658 raise ValueError( 1659 'ssl_handshake_timeout is only meaningful with ssl') 1660 1661 if ssl_shutdown_timeout is not None and not ssl: 1662 raise ValueError( 1663 'ssl_shutdown_timeout is only meaningful with ssl') 1664 1665 if sock is not None: 1666 _check_ssl_socket(sock) 1667 1668 transport, protocol = await self._create_connection_transport( 1669 sock, protocol_factory, ssl, '', server_side=True, 1670 ssl_handshake_timeout=ssl_handshake_timeout, 1671 ssl_shutdown_timeout=ssl_shutdown_timeout) 1672 if self._debug: 1673 # Get the socket from the transport because SSL transport closes 1674 # the old socket and creates a new SSL socket 1675 sock = transport.get_extra_info('socket') 1676 logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 1677 return transport, protocol 1678 1679 async def connect_read_pipe(self, protocol_factory, pipe): 1680 protocol = protocol_factory() 1681 waiter = self.create_future() 1682 transport = self._make_read_pipe_transport(pipe, protocol, waiter) 1683 1684 try: 1685 await waiter 1686 except: 1687 transport.close() 1688 raise 1689 1690 if self._debug: 1691 logger.debug('Read pipe %r connected: (%r, %r)', 1692 pipe.fileno(), transport, protocol) 1693 return transport, protocol 1694 1695 async def connect_write_pipe(self, protocol_factory, pipe): 1696 protocol = protocol_factory() 1697 waiter = self.create_future() 1698 transport = self._make_write_pipe_transport(pipe, protocol, waiter) 1699 1700 try: 1701 await waiter 1702 except: 1703 transport.close() 1704 raise 1705 1706 if self._debug: 1707 logger.debug('Write pipe %r connected: (%r, %r)', 1708 pipe.fileno(), transport, protocol) 1709 return transport, protocol 1710 1711 def _log_subprocess(self, msg, stdin, stdout, stderr): 1712 info = [msg] 1713 if stdin is not None: 1714 info.append(f'stdin={_format_pipe(stdin)}') 1715 if stdout is not None and stderr == subprocess.STDOUT: 1716 info.append(f'stdout=stderr={_format_pipe(stdout)}') 1717 else: 1718 if stdout is not None: 1719 info.append(f'stdout={_format_pipe(stdout)}') 1720 if stderr is not None: 1721 info.append(f'stderr={_format_pipe(stderr)}') 1722 logger.debug(' '.join(info)) 1723 1724 async def subprocess_shell(self, protocol_factory, cmd, *, 1725 stdin=subprocess.PIPE, 1726 stdout=subprocess.PIPE, 1727 stderr=subprocess.PIPE, 1728 universal_newlines=False, 1729 shell=True, bufsize=0, 1730 encoding=None, errors=None, text=None, 1731 **kwargs): 1732 if not isinstance(cmd, (bytes, str)): 1733 raise ValueError("cmd must be a string") 1734 if universal_newlines: 1735 raise ValueError("universal_newlines must be False") 1736 if not shell: 1737 raise ValueError("shell must be True") 1738 if bufsize != 0: 1739 raise ValueError("bufsize must be 0") 1740 if text: 1741 raise ValueError("text must be False") 1742 if encoding is not None: 1743 raise ValueError("encoding must be None") 1744 if errors is not None: 1745 raise ValueError("errors must be None") 1746 1747 protocol = protocol_factory() 1748 debug_log = None 1749 if self._debug: 1750 # don't log parameters: they may contain sensitive information 1751 # (password) and may be too long 1752 debug_log = 'run shell command %r' % cmd 1753 self._log_subprocess(debug_log, stdin, stdout, stderr) 1754 transport = await self._make_subprocess_transport( 1755 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 1756 if self._debug and debug_log is not None: 1757 logger.info('%s: %r', debug_log, transport) 1758 return transport, protocol 1759 1760 async def subprocess_exec(self, protocol_factory, program, *args, 1761 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1762 stderr=subprocess.PIPE, universal_newlines=False, 1763 shell=False, bufsize=0, 1764 encoding=None, errors=None, text=None, 1765 **kwargs): 1766 if universal_newlines: 1767 raise ValueError("universal_newlines must be False") 1768 if shell: 1769 raise ValueError("shell must be False") 1770 if bufsize != 0: 1771 raise ValueError("bufsize must be 0") 1772 if text: 1773 raise ValueError("text must be False") 1774 if encoding is not None: 1775 raise ValueError("encoding must be None") 1776 if errors is not None: 1777 raise ValueError("errors must be None") 1778 1779 popen_args = (program,) + args 1780 protocol = protocol_factory() 1781 debug_log = None 1782 if self._debug: 1783 # don't log parameters: they may contain sensitive information 1784 # (password) and may be too long 1785 debug_log = f'execute program {program!r}' 1786 self._log_subprocess(debug_log, stdin, stdout, stderr) 1787 transport = await self._make_subprocess_transport( 1788 protocol, popen_args, False, stdin, stdout, stderr, 1789 bufsize, **kwargs) 1790 if self._debug and debug_log is not None: 1791 logger.info('%s: %r', debug_log, transport) 1792 return transport, protocol 1793 1794 def get_exception_handler(self): 1795 """Return an exception handler, or None if the default one is in use. 1796 """ 1797 return self._exception_handler 1798 1799 def set_exception_handler(self, handler): 1800 """Set handler as the new event loop exception handler. 1801 1802 If handler is None, the default exception handler will 1803 be set. 1804 1805 If handler is a callable object, it should have a 1806 signature matching '(loop, context)', where 'loop' 1807 will be a reference to the active event loop, 'context' 1808 will be a dict object (see `call_exception_handler()` 1809 documentation for details about context). 1810 """ 1811 if handler is not None and not callable(handler): 1812 raise TypeError(f'A callable object or None is expected, ' 1813 f'got {handler!r}') 1814 self._exception_handler = handler 1815 1816 def default_exception_handler(self, context): 1817 """Default exception handler. 1818 1819 This is called when an exception occurs and no exception 1820 handler is set, and can be called by a custom exception 1821 handler that wants to defer to the default behavior. 1822 1823 This default handler logs the error message and other 1824 context-dependent information. In debug mode, a truncated 1825 stack trace is also appended showing where the given object 1826 (e.g. a handle or future or task) was created, if any. 1827 1828 The context parameter has the same meaning as in 1829 `call_exception_handler()`. 1830 """ 1831 message = context.get('message') 1832 if not message: 1833 message = 'Unhandled exception in event loop' 1834 1835 exception = context.get('exception') 1836 if exception is not None: 1837 exc_info = (type(exception), exception, exception.__traceback__) 1838 else: 1839 exc_info = False 1840 1841 if ('source_traceback' not in context and 1842 self._current_handle is not None and 1843 self._current_handle._source_traceback): 1844 context['handle_traceback'] = \ 1845 self._current_handle._source_traceback 1846 1847 log_lines = [message] 1848 for key in sorted(context): 1849 if key in {'message', 'exception'}: 1850 continue 1851 value = context[key] 1852 if key == 'source_traceback': 1853 tb = ''.join(traceback.format_list(value)) 1854 value = 'Object created at (most recent call last):\n' 1855 value += tb.rstrip() 1856 elif key == 'handle_traceback': 1857 tb = ''.join(traceback.format_list(value)) 1858 value = 'Handle created at (most recent call last):\n' 1859 value += tb.rstrip() 1860 else: 1861 value = repr(value) 1862 log_lines.append(f'{key}: {value}') 1863 1864 logger.error('\n'.join(log_lines), exc_info=exc_info) 1865 1866 def call_exception_handler(self, context): 1867 """Call the current event loop's exception handler. 1868 1869 The context argument is a dict containing the following keys: 1870 1871 - 'message': Error message; 1872 - 'exception' (optional): Exception object; 1873 - 'future' (optional): Future instance; 1874 - 'task' (optional): Task instance; 1875 - 'handle' (optional): Handle instance; 1876 - 'protocol' (optional): Protocol instance; 1877 - 'transport' (optional): Transport instance; 1878 - 'socket' (optional): Socket instance; 1879 - 'asyncgen' (optional): Asynchronous generator that caused 1880 the exception. 1881 1882 New keys maybe introduced in the future. 1883 1884 Note: do not overload this method in an event loop subclass. 1885 For custom exception handling, use the 1886 `set_exception_handler()` method. 1887 """ 1888 if self._exception_handler is None: 1889 try: 1890 self.default_exception_handler(context) 1891 except (SystemExit, KeyboardInterrupt): 1892 raise 1893 except BaseException: 1894 # Second protection layer for unexpected errors 1895 # in the default implementation, as well as for subclassed 1896 # event loops with overloaded "default_exception_handler". 1897 logger.error('Exception in default exception handler', 1898 exc_info=True) 1899 else: 1900 try: 1901 ctx = None 1902 thing = context.get("task") 1903 if thing is None: 1904 # Even though Futures don't have a context, 1905 # Task is a subclass of Future, 1906 # and sometimes the 'future' key holds a Task. 1907 thing = context.get("future") 1908 if thing is None: 1909 # Handles also have a context. 1910 thing = context.get("handle") 1911 if thing is not None and hasattr(thing, "get_context"): 1912 ctx = thing.get_context() 1913 if ctx is not None and hasattr(ctx, "run"): 1914 ctx.run(self._exception_handler, self, context) 1915 else: 1916 self._exception_handler(self, context) 1917 except (SystemExit, KeyboardInterrupt): 1918 raise 1919 except BaseException as exc: 1920 # Exception in the user set custom exception handler. 1921 try: 1922 # Let's try default handler. 1923 self.default_exception_handler({ 1924 'message': 'Unhandled error in exception handler', 1925 'exception': exc, 1926 'context': context, 1927 }) 1928 except (SystemExit, KeyboardInterrupt): 1929 raise 1930 except BaseException: 1931 # Guard 'default_exception_handler' in case it is 1932 # overloaded. 1933 logger.error('Exception in default exception handler ' 1934 'while handling an unexpected error ' 1935 'in custom exception handler', 1936 exc_info=True) 1937 1938 def _add_callback(self, handle): 1939 """Add a Handle to _ready.""" 1940 if not handle._cancelled: 1941 self._ready.append(handle) 1942 1943 def _add_callback_signalsafe(self, handle): 1944 """Like _add_callback() but called from a signal handler.""" 1945 self._add_callback(handle) 1946 self._write_to_self() 1947 1948 def _timer_handle_cancelled(self, handle): 1949 """Notification that a TimerHandle has been cancelled.""" 1950 if handle._scheduled: 1951 self._timer_cancelled_count += 1 1952 1953 def _run_once(self): 1954 """Run one full iteration of the event loop. 1955 1956 This calls all currently ready callbacks, polls for I/O, 1957 schedules the resulting callbacks, and finally schedules 1958 'call_later' callbacks. 1959 """ 1960 1961 sched_count = len(self._scheduled) 1962 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 1963 self._timer_cancelled_count / sched_count > 1964 _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 1965 # Remove delayed calls that were cancelled if their number 1966 # is too high 1967 new_scheduled = [] 1968 for handle in self._scheduled: 1969 if handle._cancelled: 1970 handle._scheduled = False 1971 else: 1972 new_scheduled.append(handle) 1973 1974 heapq.heapify(new_scheduled) 1975 self._scheduled = new_scheduled 1976 self._timer_cancelled_count = 0 1977 else: 1978 # Remove delayed calls that were cancelled from head of queue. 1979 while self._scheduled and self._scheduled[0]._cancelled: 1980 self._timer_cancelled_count -= 1 1981 handle = heapq.heappop(self._scheduled) 1982 handle._scheduled = False 1983 1984 timeout = None 1985 if self._ready or self._stopping: 1986 timeout = 0 1987 elif self._scheduled: 1988 # Compute the desired timeout. 1989 timeout = self._scheduled[0]._when - self.time() 1990 if timeout > MAXIMUM_SELECT_TIMEOUT: 1991 timeout = MAXIMUM_SELECT_TIMEOUT 1992 elif timeout < 0: 1993 timeout = 0 1994 1995 event_list = self._selector.select(timeout) 1996 self._process_events(event_list) 1997 # Needed to break cycles when an exception occurs. 1998 event_list = None 1999 2000 # Handle 'later' callbacks that are ready. 2001 end_time = self.time() + self._clock_resolution 2002 while self._scheduled: 2003 handle = self._scheduled[0] 2004 if handle._when >= end_time: 2005 break 2006 handle = heapq.heappop(self._scheduled) 2007 handle._scheduled = False 2008 self._ready.append(handle) 2009 2010 # This is the only place where callbacks are actually *called*. 2011 # All other places just add them to ready. 2012 # Note: We run all currently scheduled callbacks, but not any 2013 # callbacks scheduled by callbacks run this time around -- 2014 # they will be run the next time (after another I/O poll). 2015 # Use an idiom that is thread-safe without using locks. 2016 ntodo = len(self._ready) 2017 for i in range(ntodo): 2018 handle = self._ready.popleft() 2019 if handle._cancelled: 2020 continue 2021 if self._debug: 2022 try: 2023 self._current_handle = handle 2024 t0 = self.time() 2025 handle._run() 2026 dt = self.time() - t0 2027 if dt >= self.slow_callback_duration: 2028 logger.warning('Executing %s took %.3f seconds', 2029 _format_handle(handle), dt) 2030 finally: 2031 self._current_handle = None 2032 else: 2033 handle._run() 2034 handle = None # Needed to break cycles when an exception occurs. 2035 2036 def _set_coroutine_origin_tracking(self, enabled): 2037 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 2038 return 2039 2040 if enabled: 2041 self._coroutine_origin_tracking_saved_depth = ( 2042 sys.get_coroutine_origin_tracking_depth()) 2043 sys.set_coroutine_origin_tracking_depth( 2044 constants.DEBUG_STACK_DEPTH) 2045 else: 2046 sys.set_coroutine_origin_tracking_depth( 2047 self._coroutine_origin_tracking_saved_depth) 2048 2049 self._coroutine_origin_tracking_enabled = enabled 2050 2051 def get_debug(self): 2052 return self._debug 2053 2054 def set_debug(self, enabled): 2055 self._debug = enabled 2056 2057 if self.is_running(): 2058 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled) 2059