1"""Base implementation of event loop. 2 3The event loop can be broken up into a multiplexer (the part 4responsible for notifying us of I/O events) and the event loop proper, 5which wraps a multiplexer with functionality for scheduling callbacks, 6immediately or at a given time in the future. 7 8Whenever a public API takes a callback, subsequent positional 9arguments will be passed to the callback if/when it is called. This 10avoids the proliferation of trivial lambdas implementing closures. 11Keyword arguments for the callback are not supported; this is a 12conscious design decision, leaving the door open for keyword arguments 13to modify the meaning of the API call itself. 14""" 15 16import collections 17import collections.abc 18import concurrent.futures 19import functools 20import heapq 21import itertools 22import os 23import socket 24import stat 25import subprocess 26import threading 27import time 28import traceback 29import sys 30import warnings 31import weakref 32 33try: 34 import ssl 35except ImportError: # pragma: no cover 36 ssl = None 37 38from . import constants 39from . import coroutines 40from . import events 41from . import exceptions 42from . import futures 43from . import protocols 44from . import sslproto 45from . import staggered 46from . import tasks 47from . import transports 48from . import trsock 49from .log import logger 50 51 52__all__ = 'BaseEventLoop', 53 54 55# Minimum number of _scheduled timer handles before cleanup of 56# cancelled handles is performed. 57_MIN_SCHEDULED_TIMER_HANDLES = 100 58 59# Minimum fraction of _scheduled timer handles that are cancelled 60# before cleanup of cancelled handles is performed. 61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 62 63 64_HAS_IPv6 = hasattr(socket, 'AF_INET6') 65 66# Maximum timeout passed to select to avoid OS limitations 67MAXIMUM_SELECT_TIMEOUT = 24 * 3600 68 69# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s 70# *reuse_address* parameter 71_unset = object() 72 73 74def _format_handle(handle): 75 cb = handle._callback 76 if isinstance(getattr(cb, '__self__', None), tasks.Task): 77 # format the task 78 return repr(cb.__self__) 79 else: 80 return str(handle) 81 82 83def _format_pipe(fd): 84 if fd == subprocess.PIPE: 85 return '<pipe>' 86 elif fd == subprocess.STDOUT: 87 return '<stdout>' 88 else: 89 return repr(fd) 90 91 92def _set_reuseport(sock): 93 if not hasattr(socket, 'SO_REUSEPORT'): 94 raise ValueError('reuse_port not supported by socket module') 95 else: 96 try: 97 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 98 except OSError: 99 raise ValueError('reuse_port not supported by socket module, ' 100 'SO_REUSEPORT defined but not implemented.') 101 102 103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 104 # Try to skip getaddrinfo if "host" is already an IP. Users might have 105 # handled name resolution in their own code and pass in resolved IPs. 106 if not hasattr(socket, 'inet_pton'): 107 return 108 109 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 110 host is None: 111 return None 112 113 if type == socket.SOCK_STREAM: 114 proto = socket.IPPROTO_TCP 115 elif type == socket.SOCK_DGRAM: 116 proto = socket.IPPROTO_UDP 117 else: 118 return None 119 120 if port is None: 121 port = 0 122 elif isinstance(port, bytes) and port == b'': 123 port = 0 124 elif isinstance(port, str) and port == '': 125 port = 0 126 else: 127 # If port's a service name like "http", don't skip getaddrinfo. 
        try:
            port = int(port)
        except (TypeError, ValueError):
            return None

    if family == socket.AF_UNSPEC:
        afs = [socket.AF_INET]
        if _HAS_IPv6:
            afs.append(socket.AF_INET6)
    else:
        afs = [family]

    if isinstance(host, bytes):
        host = host.decode('idna')
    if '%' in host:
        # Linux's inet_pton doesn't accept an IPv6 zone index after host,
        # like '::1%lo0'.
        return None

    for af in afs:
        try:
            socket.inet_pton(af, host)
            # The host has already been resolved.
            if _HAS_IPv6 and af == socket.AF_INET6:
                return af, type, proto, '', (host, port, flowinfo, scopeid)
            else:
                return af, type, proto, '', (host, port)
        except OSError:
            pass

    # "host" is not an IP address.
    return None


def _interleave_addrinfos(addrinfos, first_address_family_count=1):
    """Interleave list of addrinfo tuples by family."""
    # Group addresses by family
    addrinfos_by_family = collections.OrderedDict()
    for addr in addrinfos:
        family = addr[0]
        if family not in addrinfos_by_family:
            addrinfos_by_family[family] = []
        addrinfos_by_family[family].append(addr)
    addrinfos_lists = list(addrinfos_by_family.values())

    reordered = []
    if first_address_family_count > 1:
        reordered.extend(addrinfos_lists[0][:first_address_family_count - 1])
        del addrinfos_lists[0][:first_address_family_count - 1]
    reordered.extend(
        a for a in itertools.chain.from_iterable(
            itertools.zip_longest(*addrinfos_lists)
        ) if a is not None)
    return reordered


def _run_until_complete_cb(fut):
    if not fut.cancelled():
        exc = fut.exception()
        if isinstance(exc, (SystemExit, KeyboardInterrupt)):
            # Issue #22429: run_forever() already finished, no need to
            # stop it.
            return
    futures._get_loop(fut).stop()


if hasattr(socket, 'TCP_NODELAY'):
    def _set_nodelay(sock):
        if (sock.family in {socket.AF_INET, socket.AF_INET6} and
                sock.type == socket.SOCK_STREAM and
                sock.proto == socket.IPPROTO_TCP):
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
    def _set_nodelay(sock):
        pass


class _SendfileFallbackProtocol(protocols.Protocol):
    def __init__(self, transp):
        if not isinstance(transp, transports._FlowControlMixin):
            raise TypeError("transport should be _FlowControlMixin instance")
        self._transport = transp
        self._proto = transp.get_protocol()
        self._should_resume_reading = transp.is_reading()
        self._should_resume_writing = transp._protocol_paused
        transp.pause_reading()
        transp.set_protocol(self)
        if self._should_resume_writing:
            self._write_ready_fut = self._transport._loop.create_future()
        else:
            self._write_ready_fut = None

    async def drain(self):
        if self._transport.is_closing():
            raise ConnectionError("Connection closed by peer")
        fut = self._write_ready_fut
        if fut is None:
            return
        await fut

    def connection_made(self, transport):
        raise RuntimeError("Invalid state: "
                           "connection should have been established already.")

    def connection_lost(self, exc):
        if self._write_ready_fut is not None:
            # Never happens if peer disconnects after sending the whole content
            # Thus disconnection is always an exception from user perspective
            if exc is None:
                self._write_ready_fut.set_exception(
                    ConnectionError("Connection is closed by peer"))
            else:
                self._write_ready_fut.set_exception(exc)
        self._proto.connection_lost(exc)

    def pause_writing(self):
        if self._write_ready_fut is not None:
            return
        self._write_ready_fut = self._transport._loop.create_future()

    def resume_writing(self):
        if self._write_ready_fut is None:
            return
        self._write_ready_fut.set_result(False)
        self._write_ready_fut = None

    def data_received(self, data):
        raise RuntimeError("Invalid state: reading should be paused")

    def eof_received(self):
        raise RuntimeError("Invalid state: reading should be paused")

    async def restore(self):
        self._transport.set_protocol(self._proto)
        if self._should_resume_reading:
            self._transport.resume_reading()
        if self._write_ready_fut is not None:
            # Cancel the future.
            # Basically it has no effect because protocol is switched back,
            # no code should wait for it anymore.
            self._write_ready_fut.cancel()
        if self._should_resume_writing:
            self._proto.resume_writing()


class Server(events.AbstractServer):

    def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
                 ssl_handshake_timeout):
        self._loop = loop
        self._sockets = sockets
        self._active_count = 0
        self._waiters = []
        self._protocol_factory = protocol_factory
        self._backlog = backlog
        self._ssl_context = ssl_context
        self._ssl_handshake_timeout = ssl_handshake_timeout
        self._serving = False
        self._serving_forever_fut = None

    def __repr__(self):
        return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

    def _attach(self):
        assert self._sockets is not None
        self._active_count += 1

    def _detach(self):
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self._sockets is None:
            self._wakeup()

    def _wakeup(self):
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    def _start_serving(self):
        if self._serving:
            return
        self._serving = True
        for sock in self._sockets:
            sock.listen(self._backlog)
            self._loop._start_serving(
                self._protocol_factory, sock, self._ssl_context,
                self, self._backlog, self._ssl_handshake_timeout)

    def get_loop(self):
        return self._loop

    def is_serving(self):
        return self._serving

    @property
    def sockets(self):
        if self._sockets is None:
            return ()
        return tuple(trsock.TransportSocket(s) for s in self._sockets)

    def close(self):
        sockets = self._sockets
        if sockets is None:
            return
        self._sockets = None

        for sock in sockets:
            self._loop._stop_serving(sock)

        self._serving = False

        if (self._serving_forever_fut is not None and
                not self._serving_forever_fut.done()):
            self._serving_forever_fut.cancel()
            self._serving_forever_fut = None

        if self._active_count == 0:
            self._wakeup()

    async def start_serving(self):
        self._start_serving()
        # Skip one loop iteration so that all 'loop.add_reader'
        # go through.
        await tasks.sleep(0, loop=self._loop)

    async def serve_forever(self):
        if self._serving_forever_fut is not None:
            raise RuntimeError(
                f'server {self!r} is already being awaited on serve_forever()')
        if self._sockets is None:
            raise RuntimeError(f'server {self!r} is closed')

        self._start_serving()
        self._serving_forever_fut = self._loop.create_future()

        try:
            await self._serving_forever_fut
        except exceptions.CancelledError:
            try:
                self.close()
                await self.wait_closed()
            finally:
                raise
        finally:
            self._serving_forever_fut = None

    async def wait_closed(self):
        if self._sockets is None or self._waiters is None:
            return
        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        await waiter


class BaseEventLoop(events.AbstractEventLoop):

    def __init__(self):
        self._timer_cancelled_count = 0
        self._closed = False
        self._stopping = False
        self._ready = collections.deque()
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        # Identifier of the thread running the event loop, or None if the
        # event loop is not running
        self._thread_id = None
        self._clock_resolution = time.get_clock_info('monotonic').resolution
        self._exception_handler = None
        self.set_debug(coroutines._is_debug_mode())
        # In debug mode, if the execution of a callback or a step of a task
        # exceeds this duration in seconds, the slow callback/task is logged.
        self.slow_callback_duration = 0.1
        self._current_handle = None
        self._task_factory = None
        self._coroutine_origin_tracking_enabled = False
        self._coroutine_origin_tracking_saved_depth = None

        # A weak set of all asynchronous generators that are
        # being iterated by the loop.
        self._asyncgens = weakref.WeakSet()
        # Set to True when `loop.shutdown_asyncgens` is called.
        self._asyncgens_shutdown_called = False

    def __repr__(self):
        return (
            f'<{self.__class__.__name__} running={self.is_running()} '
            f'closed={self.is_closed()} debug={self.get_debug()}>'
        )

    def create_future(self):
        """Create a Future object attached to the loop."""
        return futures.Future(loop=self)

    def create_task(self, coro, *, name=None):
        """Schedule a coroutine object.

        Return a task object.
        """
        self._check_closed()
        if self._task_factory is None:
            task = tasks.Task(coro, loop=self, name=name)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
            tasks._set_task_name(task, name)

        return task

    def set_task_factory(self, factory):
        """Set a task factory that will be used by loop.create_task().

        If factory is None the default task factory will be set.

        If factory is a callable, it should have a signature matching
        '(loop, coro)', where 'loop' will be a reference to the active
        event loop, 'coro' will be a coroutine object.  The callable
        must return a Future.
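
        For example, a factory roughly equivalent to the default
        behavior (an illustrative sketch, not the actual default code
        path)::

            def factory(loop, coro):
                return asyncio.Task(coro, loop=loop)

            loop.set_task_factory(factory)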
449 """ 450 if factory is not None and not callable(factory): 451 raise TypeError('task factory must be a callable or None') 452 self._task_factory = factory 453 454 def get_task_factory(self): 455 """Return a task factory, or None if the default one is in use.""" 456 return self._task_factory 457 458 def _make_socket_transport(self, sock, protocol, waiter=None, *, 459 extra=None, server=None): 460 """Create socket transport.""" 461 raise NotImplementedError 462 463 def _make_ssl_transport( 464 self, rawsock, protocol, sslcontext, waiter=None, 465 *, server_side=False, server_hostname=None, 466 extra=None, server=None, 467 ssl_handshake_timeout=None, 468 call_connection_made=True): 469 """Create SSL transport.""" 470 raise NotImplementedError 471 472 def _make_datagram_transport(self, sock, protocol, 473 address=None, waiter=None, extra=None): 474 """Create datagram transport.""" 475 raise NotImplementedError 476 477 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 478 extra=None): 479 """Create read pipe transport.""" 480 raise NotImplementedError 481 482 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 483 extra=None): 484 """Create write pipe transport.""" 485 raise NotImplementedError 486 487 async def _make_subprocess_transport(self, protocol, args, shell, 488 stdin, stdout, stderr, bufsize, 489 extra=None, **kwargs): 490 """Create subprocess transport.""" 491 raise NotImplementedError 492 493 def _write_to_self(self): 494 """Write a byte to self-pipe, to wake up the event loop. 495 496 This may be called from a different thread. 497 498 The subclass is responsible for implementing the self-pipe. 499 """ 500 raise NotImplementedError 501 502 def _process_events(self, event_list): 503 """Process selector events.""" 504 raise NotImplementedError 505 506 def _check_closed(self): 507 if self._closed: 508 raise RuntimeError('Event loop is closed') 509 510 def _asyncgen_finalizer_hook(self, agen): 511 self._asyncgens.discard(agen) 512 if not self.is_closed(): 513 self.call_soon_threadsafe(self.create_task, agen.aclose()) 514 515 def _asyncgen_firstiter_hook(self, agen): 516 if self._asyncgens_shutdown_called: 517 warnings.warn( 518 f"asynchronous generator {agen!r} was scheduled after " 519 f"loop.shutdown_asyncgens() call", 520 ResourceWarning, source=self) 521 522 self._asyncgens.add(agen) 523 524 async def shutdown_asyncgens(self): 525 """Shutdown all active asynchronous generators.""" 526 self._asyncgens_shutdown_called = True 527 528 if not len(self._asyncgens): 529 # If Python version is <3.6 or we don't have any asynchronous 530 # generators alive. 
            return

        closing_agens = list(self._asyncgens)
        self._asyncgens.clear()

        results = await tasks.gather(
            *[ag.aclose() for ag in closing_agens],
            return_exceptions=True,
            loop=self)

        for result, agen in zip(results, closing_agens):
            if isinstance(result, Exception):
                self.call_exception_handler({
                    'message': f'an error occurred during closing of '
                               f'asynchronous generator {agen!r}',
                    'exception': result,
                    'asyncgen': agen
                })

    def run_forever(self):
        """Run until stop() is called."""
        self._check_closed()
        if self.is_running():
            raise RuntimeError('This event loop is already running')
        if events._get_running_loop() is not None:
            raise RuntimeError(
                'Cannot run the event loop while another loop is running')
        self._set_coroutine_origin_tracking(self._debug)
        self._thread_id = threading.get_ident()

        old_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)

    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()

        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

    def stop(self):
        """Stop running the event loop.

        Every callback already scheduled will still run.  This simply informs
        run_forever to stop looping after a complete iteration.
        """
        self._stopping = True

    def close(self):
        """Close the event loop.

        This clears the queues and shuts down the executor,
        but does not wait for the executor to finish.

        The event loop must not be running.
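
        A typical shutdown sequence (illustrative sketch; main() stands
        in for the application's entry coroutine)::

            loop.run_until_complete(main())
            loop.close()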
629 """ 630 if self.is_running(): 631 raise RuntimeError("Cannot close a running event loop") 632 if self._closed: 633 return 634 if self._debug: 635 logger.debug("Close %r", self) 636 self._closed = True 637 self._ready.clear() 638 self._scheduled.clear() 639 executor = self._default_executor 640 if executor is not None: 641 self._default_executor = None 642 executor.shutdown(wait=False) 643 644 def is_closed(self): 645 """Returns True if the event loop was closed.""" 646 return self._closed 647 648 def __del__(self, _warn=warnings.warn): 649 if not self.is_closed(): 650 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 651 if not self.is_running(): 652 self.close() 653 654 def is_running(self): 655 """Returns True if the event loop is running.""" 656 return (self._thread_id is not None) 657 658 def time(self): 659 """Return the time according to the event loop's clock. 660 661 This is a float expressed in seconds since an epoch, but the 662 epoch, precision, accuracy and drift are unspecified and may 663 differ per event loop. 664 """ 665 return time.monotonic() 666 667 def call_later(self, delay, callback, *args, context=None): 668 """Arrange for a callback to be called at a given time. 669 670 Return a Handle: an opaque object with a cancel() method that 671 can be used to cancel the call. 672 673 The delay can be an int or float, expressed in seconds. It is 674 always relative to the current time. 675 676 Each callback will be called exactly once. If two callbacks 677 are scheduled for exactly the same time, it undefined which 678 will be called first. 679 680 Any positional arguments after the callback will be passed to 681 the callback when it is called. 682 """ 683 timer = self.call_at(self.time() + delay, callback, *args, 684 context=context) 685 if timer._source_traceback: 686 del timer._source_traceback[-1] 687 return timer 688 689 def call_at(self, when, callback, *args, context=None): 690 """Like call_later(), but uses an absolute time. 691 692 Absolute time corresponds to the event loop's time() method. 693 """ 694 self._check_closed() 695 if self._debug: 696 self._check_thread() 697 self._check_callback(callback, 'call_at') 698 timer = events.TimerHandle(when, callback, args, self, context) 699 if timer._source_traceback: 700 del timer._source_traceback[-1] 701 heapq.heappush(self._scheduled, timer) 702 timer._scheduled = True 703 return timer 704 705 def call_soon(self, callback, *args, context=None): 706 """Arrange for a callback to be called as soon as possible. 707 708 This operates as a FIFO queue: callbacks are called in the 709 order in which they are registered. Each callback will be 710 called exactly once. 711 712 Any positional arguments after the callback will be passed to 713 the callback when it is called. 
714 """ 715 self._check_closed() 716 if self._debug: 717 self._check_thread() 718 self._check_callback(callback, 'call_soon') 719 handle = self._call_soon(callback, args, context) 720 if handle._source_traceback: 721 del handle._source_traceback[-1] 722 return handle 723 724 def _check_callback(self, callback, method): 725 if (coroutines.iscoroutine(callback) or 726 coroutines.iscoroutinefunction(callback)): 727 raise TypeError( 728 f"coroutines cannot be used with {method}()") 729 if not callable(callback): 730 raise TypeError( 731 f'a callable object was expected by {method}(), ' 732 f'got {callback!r}') 733 734 def _call_soon(self, callback, args, context): 735 handle = events.Handle(callback, args, self, context) 736 if handle._source_traceback: 737 del handle._source_traceback[-1] 738 self._ready.append(handle) 739 return handle 740 741 def _check_thread(self): 742 """Check that the current thread is the thread running the event loop. 743 744 Non-thread-safe methods of this class make this assumption and will 745 likely behave incorrectly when the assumption is violated. 746 747 Should only be called when (self._debug == True). The caller is 748 responsible for checking this condition for performance reasons. 749 """ 750 if self._thread_id is None: 751 return 752 thread_id = threading.get_ident() 753 if thread_id != self._thread_id: 754 raise RuntimeError( 755 "Non-thread-safe operation invoked on an event loop other " 756 "than the current one") 757 758 def call_soon_threadsafe(self, callback, *args, context=None): 759 """Like call_soon(), but thread-safe.""" 760 self._check_closed() 761 if self._debug: 762 self._check_callback(callback, 'call_soon_threadsafe') 763 handle = self._call_soon(callback, args, context) 764 if handle._source_traceback: 765 del handle._source_traceback[-1] 766 self._write_to_self() 767 return handle 768 769 def run_in_executor(self, executor, func, *args): 770 self._check_closed() 771 if self._debug: 772 self._check_callback(func, 'run_in_executor') 773 if executor is None: 774 executor = self._default_executor 775 if executor is None: 776 executor = concurrent.futures.ThreadPoolExecutor() 777 self._default_executor = executor 778 return futures.wrap_future( 779 executor.submit(func, *args), loop=self) 780 781 def set_default_executor(self, executor): 782 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 783 warnings.warn( 784 'Using the default executor that is not an instance of ' 785 'ThreadPoolExecutor is deprecated and will be prohibited ' 786 'in Python 3.9', 787 DeprecationWarning, 2) 788 self._default_executor = executor 789 790 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 791 msg = [f"{host}:{port!r}"] 792 if family: 793 msg.append(f'family={family!r}') 794 if type: 795 msg.append(f'type={type!r}') 796 if proto: 797 msg.append(f'proto={proto!r}') 798 if flags: 799 msg.append(f'flags={flags!r}') 800 msg = ', '.join(msg) 801 logger.debug('Get address info %s', msg) 802 803 t0 = self.time() 804 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 805 dt = self.time() - t0 806 807 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 808 if dt >= self.slow_callback_duration: 809 logger.info(msg) 810 else: 811 logger.debug(msg) 812 return addrinfo 813 814 async def getaddrinfo(self, host, port, *, 815 family=0, type=0, proto=0, flags=0): 816 if self._debug: 817 getaddr_func = self._getaddrinfo_debug 818 else: 819 getaddr_func = socket.getaddrinfo 820 821 return await 
            None, getaddr_func, host, port, family, type, proto, flags)

    async def getnameinfo(self, sockaddr, flags=0):
        return await self.run_in_executor(
            None, socket.getnameinfo, sockaddr, flags)

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=True):
        if self._debug and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        self._check_sendfile_params(sock, file, offset, count)
        try:
            return await self._sock_sendfile_native(sock, file,
                                                    offset, count)
        except exceptions.SendfileNotAvailableError as exc:
            if not fallback:
                raise
        return await self._sock_sendfile_fallback(sock, file,
                                                  offset, count)

    async def _sock_sendfile_native(self, sock, file, offset, count):
        # NB: sendfile syscall is not supported for SSL sockets and
        # non-mmap files even if sendfile is supported by OS
        raise exceptions.SendfileNotAvailableError(
            f"syscall sendfile is not available for socket {sock!r} "
            f"and file {file!r} combination")

    async def _sock_sendfile_fallback(self, sock, file, offset, count):
        if offset:
            file.seek(offset)
        blocksize = (
            min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE)
            if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE
        )
        buf = bytearray(blocksize)
        total_sent = 0
        try:
            while True:
                if count:
                    blocksize = min(count - total_sent, blocksize)
                    if blocksize <= 0:
                        break
                view = memoryview(buf)[:blocksize]
                read = await self.run_in_executor(None, file.readinto, view)
                if not read:
                    break  # EOF
                await self.sock_sendall(sock, view[:read])
                total_sent += read
            return total_sent
        finally:
            if total_sent > 0 and hasattr(file, 'seek'):
                file.seek(offset + total_sent)

    def _check_sendfile_params(self, sock, file, offset, count):
        if 'b' not in getattr(file, 'mode', 'b'):
            raise ValueError("file should be opened in binary mode")
        if not sock.type == socket.SOCK_STREAM:
            raise ValueError("only SOCK_STREAM type sockets are supported")
        if count is not None:
            if not isinstance(count, int):
                raise TypeError(
                    "count must be a positive integer (got {!r})".format(count))
            if count <= 0:
                raise ValueError(
                    "count must be a positive integer (got {!r})".format(count))
        if not isinstance(offset, int):
            raise TypeError(
                "offset must be a non-negative integer (got {!r})".format(
                    offset))
        if offset < 0:
            raise ValueError(
                "offset must be a non-negative integer (got {!r})".format(
                    offset))

    async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None):
        """Create, bind and connect one socket."""
        my_exceptions = []
        exceptions.append(my_exceptions)
        family, type_, proto, _, address = addr_info
        sock = None
        try:
            sock = socket.socket(family=family, type=type_, proto=proto)
            sock.setblocking(False)
            if local_addr_infos is not None:
                for _, _, _, _, laddr in local_addr_infos:
                    try:
                        sock.bind(laddr)
                        break
                    except OSError as exc:
                        msg = (
                            f'error while attempting to bind on '
                            f'address {laddr!r}: '
                            f'{exc.strerror.lower()}'
                        )
                        exc = OSError(exc.errno, msg)
                        my_exceptions.append(exc)
                else:  # all bind attempts failed
                    raise my_exceptions.pop()
            await self.sock_connect(sock, address)
            return sock
        except OSError as exc:
            my_exceptions.append(exc)
            if sock is not None:
                sock.close()
            raise
        except:
            if sock is not None:
                sock.close()
            raise

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0,
            proto=0, flags=0, sock=None,
            local_addr=None, server_hostname=None,
            ssl_handshake_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        """Connect to a TCP server.

        Create a streaming transport connection to a given Internet host and
        port: socket family AF_INET or socket.AF_INET6 depending on host (or
        family if specified), socket type SOCK_STREAM. protocol_factory must be
        a callable returning a protocol instance.

        This method is a coroutine which will try to establish the connection
        in the background.  When successful, the coroutine returns a
        (transport, protocol) pair.
        """
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname.  It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given.  To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check.  (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if happy_eyeballs_delay is not None and interleave is None:
            # If using happy eyeballs, default to interleave addresses by family
            interleave = 1

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            infos = await self._ensure_resolved(
                (host, port), family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            if local_addr is not None:
                laddr_infos = await self._ensure_resolved(
                    local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto,
                    flags=flags, loop=self)
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')
            else:
                laddr_infos = None

            if interleave:
                infos = _interleave_addrinfos(infos, interleave)

            exceptions = []
            if happy_eyeballs_delay is None:
                # not using happy eyeballs
                for addrinfo in infos:
                    try:
                        sock = await self._connect_sock(
                            exceptions, addrinfo, laddr_infos)
                        break
                    except OSError:
                        continue
            else:  # using happy eyeballs
                sock, _, _ = await staggered.staggered_race(
                    (functools.partial(self._connect_sock,
                                       exceptions, addrinfo, laddr_infos)
                     for addrinfo in infos),
                    happy_eyeballs_delay, loop=self)

            if sock is None:
                exceptions = [exc for sub in exceptions for exc in sub]
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        else:
            if sock is None:
                raise ValueError(
                    'host and port were not specified and no sock specified')
            if sock.type != socket.SOCK_STREAM:
                # We allow AF_INET, AF_INET6, AF_UNIX as long as they
                # are SOCK_STREAM.
                # We support passing AF_UNIX sockets even though we have
                # a dedicated API for that: create_unix_connection.
                # Disallowing AF_UNIX in this method breaks backwards
                # compatibility.
                raise ValueError(
                    f'A Stream Socket was expected, got {sock!r}')

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r connected to %s:%r: (%r, %r)",
                         sock, host, port, transport, protocol)
        return transport, protocol

    async def _create_connection_transport(
            self, sock, protocol_factory, ssl,
            server_hostname, server_side=False,
            ssl_handshake_timeout=None):

        sock.setblocking(False)

        protocol = protocol_factory()
        waiter = self.create_future()
        if ssl:
            sslcontext = None if isinstance(ssl, bool) else ssl
            transport = self._make_ssl_transport(
                sock, protocol, sslcontext, waiter,
                server_side=server_side, server_hostname=server_hostname,
                ssl_handshake_timeout=ssl_handshake_timeout)
        else:
            transport = self._make_socket_transport(sock, protocol, waiter)

        try:
            await waiter
        except:
            transport.close()
            raise

        return transport, protocol

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        """Send a file to transport.

        Return the total number of bytes which were sent.

        The method uses high-performance os.sendfile if available.

        file must be a regular file object opened in binary mode.

        offset tells from where to start reading the file.  If specified,
        count is the total number of bytes to transmit as opposed to
        sending the file until EOF is reached.  File position is updated on
        return, even in case of error, in which case file.tell()
        can be used to figure out the number of bytes
        which were sent.

        fallback set to True makes asyncio manually read and send
        the file when the platform does not support the sendfile syscall
        (e.g. Windows or SSL socket on Unix).

        Raise SendfileNotAvailableError if the system does not support
        sendfile syscall and fallback is False.
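
        Illustrative sketch, inside a coroutine ('data.bin' is a
        stand-in file name)::

            with open('data.bin', 'rb') as f:
                sent = await loop.sendfile(transport, f)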
1106 """ 1107 if transport.is_closing(): 1108 raise RuntimeError("Transport is closing") 1109 mode = getattr(transport, '_sendfile_compatible', 1110 constants._SendfileMode.UNSUPPORTED) 1111 if mode is constants._SendfileMode.UNSUPPORTED: 1112 raise RuntimeError( 1113 f"sendfile is not supported for transport {transport!r}") 1114 if mode is constants._SendfileMode.TRY_NATIVE: 1115 try: 1116 return await self._sendfile_native(transport, file, 1117 offset, count) 1118 except exceptions.SendfileNotAvailableError as exc: 1119 if not fallback: 1120 raise 1121 1122 if not fallback: 1123 raise RuntimeError( 1124 f"fallback is disabled and native sendfile is not " 1125 f"supported for transport {transport!r}") 1126 1127 return await self._sendfile_fallback(transport, file, 1128 offset, count) 1129 1130 async def _sendfile_native(self, transp, file, offset, count): 1131 raise exceptions.SendfileNotAvailableError( 1132 "sendfile syscall is not supported") 1133 1134 async def _sendfile_fallback(self, transp, file, offset, count): 1135 if offset: 1136 file.seek(offset) 1137 blocksize = min(count, 16384) if count else 16384 1138 buf = bytearray(blocksize) 1139 total_sent = 0 1140 proto = _SendfileFallbackProtocol(transp) 1141 try: 1142 while True: 1143 if count: 1144 blocksize = min(count - total_sent, blocksize) 1145 if blocksize <= 0: 1146 return total_sent 1147 view = memoryview(buf)[:blocksize] 1148 read = await self.run_in_executor(None, file.readinto, view) 1149 if not read: 1150 return total_sent # EOF 1151 await proto.drain() 1152 transp.write(view[:read]) 1153 total_sent += read 1154 finally: 1155 if total_sent > 0 and hasattr(file, 'seek'): 1156 file.seek(offset + total_sent) 1157 await proto.restore() 1158 1159 async def start_tls(self, transport, protocol, sslcontext, *, 1160 server_side=False, 1161 server_hostname=None, 1162 ssl_handshake_timeout=None): 1163 """Upgrade transport to TLS. 1164 1165 Return a new transport that *protocol* should start using 1166 immediately. 1167 """ 1168 if ssl is None: 1169 raise RuntimeError('Python ssl module is not available') 1170 1171 if not isinstance(sslcontext, ssl.SSLContext): 1172 raise TypeError( 1173 f'sslcontext is expected to be an instance of ssl.SSLContext, ' 1174 f'got {sslcontext!r}') 1175 1176 if not getattr(transport, '_start_tls_compatible', False): 1177 raise TypeError( 1178 f'transport {transport!r} is not supported by start_tls()') 1179 1180 waiter = self.create_future() 1181 ssl_protocol = sslproto.SSLProtocol( 1182 self, protocol, sslcontext, waiter, 1183 server_side, server_hostname, 1184 ssl_handshake_timeout=ssl_handshake_timeout, 1185 call_connection_made=False) 1186 1187 # Pause early so that "ssl_protocol.data_received()" doesn't 1188 # have a chance to get called before "ssl_protocol.connection_made()". 
        transport.pause_reading()

        transport.set_protocol(ssl_protocol)
        conmade_cb = self.call_soon(ssl_protocol.connection_made, transport)
        resume_cb = self.call_soon(transport.resume_reading)

        try:
            await waiter
        except BaseException:
            transport.close()
            conmade_cb.cancel()
            resume_cb.cancel()
            raise

        return ssl_protocol._app_transport

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=_unset, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """Create datagram connection."""
        if sock is not None:
            if sock.type != socket.SOCK_DGRAM:
                raise ValueError(
                    f'A UDP Socket was expected, got {sock!r}')
            if (local_addr or remote_addr or
                    family or proto or flags or
                    reuse_port or allow_broadcast):
                # show the problematic kwargs in exception msg
                opts = dict(local_addr=local_addr, remote_addr=remote_addr,
                            family=family, proto=proto, flags=flags,
                            reuse_address=reuse_address, reuse_port=reuse_port,
                            allow_broadcast=allow_broadcast)
                problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v)
                raise ValueError(
                    f'socket modifier keyword arguments can not be used '
                    f'when sock is specified. ({problems})')
            sock.setblocking(False)
            r_addr = None
        else:
            if not (local_addr or remote_addr):
                if family == 0:
                    raise ValueError('unexpected address family')
                addr_pairs_info = (((family, proto), (None, None)),)
            elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX:
                for addr in (local_addr, remote_addr):
                    if addr is not None and not isinstance(addr, str):
                        raise TypeError('string is expected')

                if local_addr and local_addr[0] not in (0, '\x00'):
                    try:
                        if stat.S_ISSOCK(os.stat(local_addr).st_mode):
                            os.remove(local_addr)
                    except FileNotFoundError:
                        pass
                    except OSError as err:
                        # Directory may have permissions only to create socket.
                        logger.error('Unable to check or remove stale UNIX '
                                     'socket %r: %r',
                                     local_addr, err)

                addr_pairs_info = (((family, proto),
                                    (local_addr, remote_addr)), )
            else:
                # join address by (family, protocol)
                addr_infos = {}  # Using order preserving dict
                for idx, addr in ((0, local_addr), (1, remote_addr)):
                    if addr is not None:
                        assert isinstance(addr, tuple) and len(addr) == 2, (
                            '2-tuple is expected')

                        infos = await self._ensure_resolved(
                            addr, family=family, type=socket.SOCK_DGRAM,
                            proto=proto, flags=flags, loop=self)
                        if not infos:
                            raise OSError('getaddrinfo() returned empty list')

                        for fam, _, pro, _, address in infos:
                            key = (fam, pro)
                            if key not in addr_infos:
                                addr_infos[key] = [None, None]
                            addr_infos[key][idx] = address

                # each addr has to have info for each (family, proto) pair
                addr_pairs_info = [
                    (key, addr_pair) for key, addr_pair in addr_infos.items()
                    if not ((local_addr and addr_pair[0] is None) or
                            (remote_addr and addr_pair[1] is None))]

                if not addr_pairs_info:
                    raise ValueError('can not get address information')

            exceptions = []

            # bpo-37228
            if reuse_address is not _unset:
                if reuse_address:
                    raise ValueError("Passing `reuse_address=True` is no "
                                     "longer supported, as the usage of "
                                     "SO_REUSEPORT in UDP poses a significant "
                                     "security concern.")
                else:
                    warnings.warn("The *reuse_address* parameter has been "
                                  "deprecated as of 3.5.10 and is scheduled "
                                  "for removal in 3.11.", DeprecationWarning,
                                  stacklevel=2)

            for ((family, proto),
                 (local_address, remote_address)) in addr_pairs_info:
                sock = None
                r_addr = None
                try:
                    sock = socket.socket(
                        family=family, type=socket.SOCK_DGRAM, proto=proto)
                    if reuse_port:
                        _set_reuseport(sock)
                    if allow_broadcast:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                    sock.setblocking(False)

                    if local_addr:
                        sock.bind(local_address)
                    if remote_addr:
                        if not allow_broadcast:
                            await self.sock_connect(sock, remote_address)
                        r_addr = remote_address
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                raise exceptions[0]

        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_datagram_transport(
            sock, protocol, r_addr, waiter)
        if self._debug:
            if local_addr:
                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                            "created: (%r, %r)",
                            local_addr, remote_addr, transport, protocol)
            else:
                logger.debug("Datagram endpoint remote_addr=%r created: "
                             "(%r, %r)",
                             remote_addr, transport, protocol)

        try:
            await waiter
        except:
            transport.close()
            raise

        return transport, protocol

    async def _ensure_resolved(self, address, *,
                               family=0, type=socket.SOCK_STREAM,
                               proto=0, flags=0, loop):
        host, port = address[:2]
        info = _ipaddr_info(host, port, family, type, proto, *address[2:])
        if info is not None:
            # "host" is already a resolved IP.
            return [info]
        else:
            return await loop.getaddrinfo(host, port, family=family, type=type,
                                          proto=proto, flags=flags)

    async def _create_server_getaddrinfo(self, host, port, family, flags):
        infos = await self._ensure_resolved((host, port), family=family,
                                            type=socket.SOCK_STREAM,
                                            flags=flags, loop=self)
        if not infos:
            raise OSError(f'getaddrinfo({host!r}) returned empty list')
        return infos

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *,
            family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE,
            sock=None,
            backlog=100,
            ssl=None,
            reuse_address=None,
            reuse_port=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a TCP server.

        The host parameter can be a string, in that case the TCP server is
        bound to host and port.

        The host parameter can also be a sequence of strings and in that case
        the TCP server is bound to all hosts of the sequence.  If a host
        appears multiple times (possibly indirectly e.g. when hostnames
        resolve to the same IP address), the server is only bound once to that
        host.

        Return a Server object which can be used to stop the service.

        This method is a coroutine.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            if reuse_address is None:
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
            sockets = []
            if host == '':
                hosts = [None]
            elif (isinstance(host, str) or
                  not isinstance(host, collections.abc.Iterable)):
                hosts = [host]
            else:
                hosts = host

            fs = [self._create_server_getaddrinfo(host, port, family=family,
                                                  flags=flags)
                  for host in hosts]
            infos = await tasks.gather(*fs, loop=self)
            infos = set(itertools.chain.from_iterable(infos))

            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        if self._debug:
                            logger.warning('create_server() failed to create '
                                           'socket.socket(%r, %r, %r)',
                                           af, socktype, proto, exc_info=True)
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                    if reuse_port:
                        _set_reuseport(sock)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if (_HAS_IPv6 and
                            af == socket.AF_INET6 and
                            hasattr(socket, 'IPPROTO_IPV6')):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        raise OSError(err.errno, 'error while attempting '
                                      'to bind on address %r: %s'
                                      % (sa, err.strerror.lower())) from None
                completed = True
            finally:
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError('Neither host/port nor sock were specified')
            if sock.type != socket.SOCK_STREAM:
                raise ValueError(f'A Stream Socket was expected, got {sock!r}')
            sockets = [sock]

        for sock in sockets:
            sock.setblocking(False)

        server = Server(self, sockets, protocol_factory,
                        ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0, loop=self)

        if self._debug:
            logger.info("%r is serving", server)
        return server

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None):
        """Handle an accepted connection.

        This is used by servers that accept connections outside of
        asyncio but that use asyncio to handle connections.

        This method is a coroutine.  When completed, the coroutine
        returns a (transport, protocol) pair.
        """
        if sock.type != socket.SOCK_STREAM:
            raise ValueError(f'A Stream Socket was expected, got {sock!r}')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, '', server_side=True,
            ssl_handshake_timeout=ssl_handshake_timeout)
        if self._debug:
            # Get the socket from the transport because SSL transport closes
            # the old socket and creates a new SSL socket
            sock = transport.get_extra_info('socket')
            logger.debug("%r handled: (%r, %r)", sock, transport, protocol)
        return transport, protocol

    async def connect_read_pipe(self, protocol_factory, pipe):
        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_read_pipe_transport(pipe, protocol, waiter)

        try:
            await waiter
        except:
            transport.close()
            raise

        if self._debug:
            logger.debug('Read pipe %r connected: (%r, %r)',
                         pipe.fileno(), transport, protocol)
        return transport, protocol

    async def connect_write_pipe(self, protocol_factory, pipe):
        protocol = protocol_factory()
        waiter = self.create_future()
        transport = self._make_write_pipe_transport(pipe, protocol, waiter)

        try:
            await waiter
        except:
            transport.close()
            raise

        if self._debug:
            logger.debug('Write pipe %r connected: (%r, %r)',
                         pipe.fileno(), transport, protocol)
        return transport, protocol

    def _log_subprocess(self, msg, stdin, stdout, stderr):
        info = [msg]
        if stdin is not None:
            info.append(f'stdin={_format_pipe(stdin)}')
        if stdout is not None and stderr == subprocess.STDOUT:
            info.append(f'stdout=stderr={_format_pipe(stdout)}')
        else:
            if stdout is not None:
                info.append(f'stdout={_format_pipe(stdout)}')
            if stderr is not None:
                info.append(f'stderr={_format_pipe(stderr)}')
        logger.debug(' '.join(info))

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=False,
                               shell=True, bufsize=0,
                               encoding=None, errors=None, text=None,
                               **kwargs):
        if not isinstance(cmd, (bytes, str)):
            raise ValueError("cmd must be a string")
        if universal_newlines:
            raise ValueError("universal_newlines must be False")
        if not shell:
            raise ValueError("shell must be True")
        if bufsize != 0:
            raise ValueError("bufsize must be 0")
        if text:
            raise ValueError("text must be False")
        if encoding is not None:
            raise ValueError("encoding must be None")
        if errors is not None:
            raise ValueError("errors must be None")

        protocol = protocol_factory()
        debug_log = None
        if self._debug:
            # don't log parameters: they may contain sensitive information
            # (password) and may be too long
            debug_log = 'run shell command %r' % cmd
            self._log_subprocess(debug_log, stdin, stdout, stderr)
        transport = await self._make_subprocess_transport(
            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
        if self._debug and debug_log is not None:
            logger.info('%s: %r', debug_log, transport)
        return transport, protocol

    async def subprocess_exec(self, protocol_factory, program, *args,
                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, universal_newlines=False,
                              shell=False, bufsize=0,
                              encoding=None, errors=None, text=None,
                              **kwargs):
        if universal_newlines:
            raise ValueError("universal_newlines must be False")
        if shell:
            raise ValueError("shell must be False")
        if bufsize != 0:
            raise ValueError("bufsize must be 0")
        if text:
            raise ValueError("text must be False")
        if encoding is not None:
            raise ValueError("encoding must be None")
        if errors is not None:
            raise ValueError("errors must be None")

        popen_args = (program,) + args
        protocol = protocol_factory()
        debug_log = None
        if self._debug:
            # don't log parameters: they may contain sensitive information
            # (password) and may be too long
            debug_log = f'execute program {program!r}'
            self._log_subprocess(debug_log, stdin, stdout, stderr)
        transport = await self._make_subprocess_transport(
            protocol, popen_args, False, stdin, stdout, stderr,
            bufsize, **kwargs)
        if self._debug and debug_log is not None:
            logger.info('%s: %r', debug_log, transport)
        return transport, protocol

    def get_exception_handler(self):
        """Return an exception handler, or None if the default one is in use.
        """
        return self._exception_handler

    def set_exception_handler(self, handler):
        """Set handler as the new event loop exception handler.

        If handler is None, the default exception handler will
        be set.

        If handler is a callable object, it should have a
        signature matching '(loop, context)', where 'loop'
        will be a reference to the active event loop, 'context'
        will be a dict object (see `call_exception_handler()`
        documentation for details about context).
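
        A minimal handler might look like this (illustrative sketch;
        a real handler would usually log rather than print)::

            def handler(loop, context):
                print('Unhandled:', context['message'])

            loop.set_exception_handler(handler)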
1649 """ 1650 if handler is not None and not callable(handler): 1651 raise TypeError(f'A callable object or None is expected, ' 1652 f'got {handler!r}') 1653 self._exception_handler = handler 1654 1655 def default_exception_handler(self, context): 1656 """Default exception handler. 1657 1658 This is called when an exception occurs and no exception 1659 handler is set, and can be called by a custom exception 1660 handler that wants to defer to the default behavior. 1661 1662 This default handler logs the error message and other 1663 context-dependent information. In debug mode, a truncated 1664 stack trace is also appended showing where the given object 1665 (e.g. a handle or future or task) was created, if any. 1666 1667 The context parameter has the same meaning as in 1668 `call_exception_handler()`. 1669 """ 1670 message = context.get('message') 1671 if not message: 1672 message = 'Unhandled exception in event loop' 1673 1674 exception = context.get('exception') 1675 if exception is not None: 1676 exc_info = (type(exception), exception, exception.__traceback__) 1677 else: 1678 exc_info = False 1679 1680 if ('source_traceback' not in context and 1681 self._current_handle is not None and 1682 self._current_handle._source_traceback): 1683 context['handle_traceback'] = \ 1684 self._current_handle._source_traceback 1685 1686 log_lines = [message] 1687 for key in sorted(context): 1688 if key in {'message', 'exception'}: 1689 continue 1690 value = context[key] 1691 if key == 'source_traceback': 1692 tb = ''.join(traceback.format_list(value)) 1693 value = 'Object created at (most recent call last):\n' 1694 value += tb.rstrip() 1695 elif key == 'handle_traceback': 1696 tb = ''.join(traceback.format_list(value)) 1697 value = 'Handle created at (most recent call last):\n' 1698 value += tb.rstrip() 1699 else: 1700 value = repr(value) 1701 log_lines.append(f'{key}: {value}') 1702 1703 logger.error('\n'.join(log_lines), exc_info=exc_info) 1704 1705 def call_exception_handler(self, context): 1706 """Call the current event loop's exception handler. 1707 1708 The context argument is a dict containing the following keys: 1709 1710 - 'message': Error message; 1711 - 'exception' (optional): Exception object; 1712 - 'future' (optional): Future instance; 1713 - 'task' (optional): Task instance; 1714 - 'handle' (optional): Handle instance; 1715 - 'protocol' (optional): Protocol instance; 1716 - 'transport' (optional): Transport instance; 1717 - 'socket' (optional): Socket instance; 1718 - 'asyncgen' (optional): Asynchronous generator that caused 1719 the exception. 1720 1721 New keys maybe introduced in the future. 1722 1723 Note: do not overload this method in an event loop subclass. 1724 For custom exception handling, use the 1725 `set_exception_handler()` method. 1726 """ 1727 if self._exception_handler is None: 1728 try: 1729 self.default_exception_handler(context) 1730 except (SystemExit, KeyboardInterrupt): 1731 raise 1732 except BaseException: 1733 # Second protection layer for unexpected errors 1734 # in the default implementation, as well as for subclassed 1735 # event loops with overloaded "default_exception_handler". 1736 logger.error('Exception in default exception handler', 1737 exc_info=True) 1738 else: 1739 try: 1740 self._exception_handler(self, context) 1741 except (SystemExit, KeyboardInterrupt): 1742 raise 1743 except BaseException as exc: 1744 # Exception in the user set custom exception handler. 1745 try: 1746 # Let's try default handler. 
                    self.default_exception_handler({
                        'message': 'Unhandled error in exception handler',
                        'exception': exc,
                        'context': context,
                    })
                except (SystemExit, KeyboardInterrupt):
                    raise
                except BaseException:
                    # Guard 'default_exception_handler' in case it is
                    # overloaded.
                    logger.error('Exception in default exception handler '
                                 'while handling an unexpected error '
                                 'in custom exception handler',
                                 exc_info=True)

    def _add_callback(self, handle):
        """Add a Handle to _scheduled (TimerHandle) or _ready."""
        assert isinstance(handle, events.Handle), 'A Handle is required here'
        if handle._cancelled:
            return
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)

    def _add_callback_signalsafe(self, handle):
        """Like _add_callback() but called from a signal handler."""
        self._add_callback(handle)
        self._write_to_self()

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        if handle._scheduled:
            self._timer_cancelled_count += 1

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

        event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
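        # (A sketch of why this is safe: len() is sampled once, and
        # deque.append()/popleft() are atomic in CPython, so handles
        # appended concurrently via call_soon_threadsafe() are simply
        # deferred to the next iteration.)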
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

    def _set_coroutine_origin_tracking(self, enabled):
        if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
            return

        if enabled:
            self._coroutine_origin_tracking_saved_depth = (
                sys.get_coroutine_origin_tracking_depth())
            sys.set_coroutine_origin_tracking_depth(
                constants.DEBUG_STACK_DEPTH)
        else:
            sys.set_coroutine_origin_tracking_depth(
                self._coroutine_origin_tracking_saved_depth)

        self._coroutine_origin_tracking_enabled = enabled

    def get_debug(self):
        return self._debug

    def set_debug(self, enabled):
        self._debug = enabled

        if self.is_running():
            self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)
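

# Illustrative sketch, not part of the public API: a concrete loop built
# on BaseEventLoop has to supply a selector plus the _make_*(),
# _process_events() and _write_to_self() hooks stubbed out above.
# Assuming selectors.DefaultSelector, a skeleton subclass would look
# roughly like:
#
#     import selectors
#
#     class _SketchLoop(BaseEventLoop):
#         def __init__(self):
#             super().__init__()
#             self._selector = selectors.DefaultSelector()
#
#         def _process_events(self, event_list):
#             for key, mask in event_list:
#                 ...  # turn selector events into ready callbacks
#
# The real subclasses live in selector_events.py and proactor_events.py.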