"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called. This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""

import collections
import collections.abc
import concurrent.futures
import functools
import heapq
import itertools
import os
import socket
import stat
import subprocess
import threading
import time
import traceback
import sys
import warnings
import weakref

try:
    import ssl
except ImportError:  # pragma: no cover
    ssl = None

from . import constants
from . import coroutines
from . import events
from . import exceptions
from . import futures
from . import protocols
from . import sslproto
from . import staggered
from . import tasks
from . import transports
from . import trsock
from .log import logger


__all__ = ('BaseEventLoop', 'Server')


# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
61_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 62 63 64_HAS_IPv6 = hasattr(socket, 'AF_INET6') 65 66# Maximum timeout passed to select to avoid OS limitations 67MAXIMUM_SELECT_TIMEOUT = 24 * 3600 68 69# Used for deprecation and removal of `loop.create_datagram_endpoint()`'s 70# *reuse_address* parameter 71_unset = object() 72 73 74def _format_handle(handle): 75 cb = handle._callback 76 if isinstance(getattr(cb, '__self__', None), tasks.Task): 77 # format the task 78 return repr(cb.__self__) 79 else: 80 return str(handle) 81 82 83def _format_pipe(fd): 84 if fd == subprocess.PIPE: 85 return '<pipe>' 86 elif fd == subprocess.STDOUT: 87 return '<stdout>' 88 else: 89 return repr(fd) 90 91 92def _set_reuseport(sock): 93 if not hasattr(socket, 'SO_REUSEPORT'): 94 raise ValueError('reuse_port not supported by socket module') 95 else: 96 try: 97 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 98 except OSError: 99 raise ValueError('reuse_port not supported by socket module, ' 100 'SO_REUSEPORT defined but not implemented.') 101 102 103def _ipaddr_info(host, port, family, type, proto, flowinfo=0, scopeid=0): 104 # Try to skip getaddrinfo if "host" is already an IP. Users might have 105 # handled name resolution in their own code and pass in resolved IPs. 106 if not hasattr(socket, 'inet_pton'): 107 return 108 109 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 110 host is None: 111 return None 112 113 if type == socket.SOCK_STREAM: 114 proto = socket.IPPROTO_TCP 115 elif type == socket.SOCK_DGRAM: 116 proto = socket.IPPROTO_UDP 117 else: 118 return None 119 120 if port is None: 121 port = 0 122 elif isinstance(port, bytes) and port == b'': 123 port = 0 124 elif isinstance(port, str) and port == '': 125 port = 0 126 else: 127 # If port's a service name like "http", don't skip getaddrinfo. 
128 try: 129 port = int(port) 130 except (TypeError, ValueError): 131 return None 132 133 if family == socket.AF_UNSPEC: 134 afs = [socket.AF_INET] 135 if _HAS_IPv6: 136 afs.append(socket.AF_INET6) 137 else: 138 afs = [family] 139 140 if isinstance(host, bytes): 141 host = host.decode('idna') 142 if '%' in host: 143 # Linux's inet_pton doesn't accept an IPv6 zone index after host, 144 # like '::1%lo0'. 145 return None 146 147 for af in afs: 148 try: 149 socket.inet_pton(af, host) 150 # The host has already been resolved. 151 if _HAS_IPv6 and af == socket.AF_INET6: 152 return af, type, proto, '', (host, port, flowinfo, scopeid) 153 else: 154 return af, type, proto, '', (host, port) 155 except OSError: 156 pass 157 158 # "host" is not an IP address. 159 return None 160 161 162def _interleave_addrinfos(addrinfos, first_address_family_count=1): 163 """Interleave list of addrinfo tuples by family.""" 164 # Group addresses by family 165 addrinfos_by_family = collections.OrderedDict() 166 for addr in addrinfos: 167 family = addr[0] 168 if family not in addrinfos_by_family: 169 addrinfos_by_family[family] = [] 170 addrinfos_by_family[family].append(addr) 171 addrinfos_lists = list(addrinfos_by_family.values()) 172 173 reordered = [] 174 if first_address_family_count > 1: 175 reordered.extend(addrinfos_lists[0][:first_address_family_count - 1]) 176 del addrinfos_lists[0][:first_address_family_count - 1] 177 reordered.extend( 178 a for a in itertools.chain.from_iterable( 179 itertools.zip_longest(*addrinfos_lists) 180 ) if a is not None) 181 return reordered 182 183 184def _run_until_complete_cb(fut): 185 if not fut.cancelled(): 186 exc = fut.exception() 187 if isinstance(exc, (SystemExit, KeyboardInterrupt)): 188 # Issue #22429: run_forever() already finished, no need to 189 # stop it. 
190 return 191 futures._get_loop(fut).stop() 192 193 194if hasattr(socket, 'TCP_NODELAY'): 195 def _set_nodelay(sock): 196 if (sock.family in {socket.AF_INET, socket.AF_INET6} and 197 sock.type == socket.SOCK_STREAM and 198 sock.proto == socket.IPPROTO_TCP): 199 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 200else: 201 def _set_nodelay(sock): 202 pass 203 204 205def _check_ssl_socket(sock): 206 if ssl is not None and isinstance(sock, ssl.SSLSocket): 207 raise TypeError("Socket cannot be of type SSLSocket") 208 209 210class _SendfileFallbackProtocol(protocols.Protocol): 211 def __init__(self, transp): 212 if not isinstance(transp, transports._FlowControlMixin): 213 raise TypeError("transport should be _FlowControlMixin instance") 214 self._transport = transp 215 self._proto = transp.get_protocol() 216 self._should_resume_reading = transp.is_reading() 217 self._should_resume_writing = transp._protocol_paused 218 transp.pause_reading() 219 transp.set_protocol(self) 220 if self._should_resume_writing: 221 self._write_ready_fut = self._transport._loop.create_future() 222 else: 223 self._write_ready_fut = None 224 225 async def drain(self): 226 if self._transport.is_closing(): 227 raise ConnectionError("Connection closed by peer") 228 fut = self._write_ready_fut 229 if fut is None: 230 return 231 await fut 232 233 def connection_made(self, transport): 234 raise RuntimeError("Invalid state: " 235 "connection should have been established already.") 236 237 def connection_lost(self, exc): 238 if self._write_ready_fut is not None: 239 # Never happens if peer disconnects after sending the whole content 240 # Thus disconnection is always an exception from user perspective 241 if exc is None: 242 self._write_ready_fut.set_exception( 243 ConnectionError("Connection is closed by peer")) 244 else: 245 self._write_ready_fut.set_exception(exc) 246 self._proto.connection_lost(exc) 247 248 def pause_writing(self): 249 if self._write_ready_fut is not None: 250 return 251 
self._write_ready_fut = self._transport._loop.create_future() 252 253 def resume_writing(self): 254 if self._write_ready_fut is None: 255 return 256 self._write_ready_fut.set_result(False) 257 self._write_ready_fut = None 258 259 def data_received(self, data): 260 raise RuntimeError("Invalid state: reading should be paused") 261 262 def eof_received(self): 263 raise RuntimeError("Invalid state: reading should be paused") 264 265 async def restore(self): 266 self._transport.set_protocol(self._proto) 267 if self._should_resume_reading: 268 self._transport.resume_reading() 269 if self._write_ready_fut is not None: 270 # Cancel the future. 271 # Basically it has no effect because protocol is switched back, 272 # no code should wait for it anymore. 273 self._write_ready_fut.cancel() 274 if self._should_resume_writing: 275 self._proto.resume_writing() 276 277 278class Server(events.AbstractServer): 279 280 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 281 ssl_handshake_timeout): 282 self._loop = loop 283 self._sockets = sockets 284 self._active_count = 0 285 self._waiters = [] 286 self._protocol_factory = protocol_factory 287 self._backlog = backlog 288 self._ssl_context = ssl_context 289 self._ssl_handshake_timeout = ssl_handshake_timeout 290 self._serving = False 291 self._serving_forever_fut = None 292 293 def __repr__(self): 294 return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 295 296 def _attach(self): 297 assert self._sockets is not None 298 self._active_count += 1 299 300 def _detach(self): 301 assert self._active_count > 0 302 self._active_count -= 1 303 if self._active_count == 0 and self._sockets is None: 304 self._wakeup() 305 306 def _wakeup(self): 307 waiters = self._waiters 308 self._waiters = None 309 for waiter in waiters: 310 if not waiter.done(): 311 waiter.set_result(waiter) 312 313 def _start_serving(self): 314 if self._serving: 315 return 316 self._serving = True 317 for sock in self._sockets: 318 
sock.listen(self._backlog) 319 self._loop._start_serving( 320 self._protocol_factory, sock, self._ssl_context, 321 self, self._backlog, self._ssl_handshake_timeout) 322 323 def get_loop(self): 324 return self._loop 325 326 def is_serving(self): 327 return self._serving 328 329 @property 330 def sockets(self): 331 if self._sockets is None: 332 return () 333 return tuple(trsock.TransportSocket(s) for s in self._sockets) 334 335 def close(self): 336 sockets = self._sockets 337 if sockets is None: 338 return 339 self._sockets = None 340 341 for sock in sockets: 342 self._loop._stop_serving(sock) 343 344 self._serving = False 345 346 if (self._serving_forever_fut is not None and 347 not self._serving_forever_fut.done()): 348 self._serving_forever_fut.cancel() 349 self._serving_forever_fut = None 350 351 if self._active_count == 0: 352 self._wakeup() 353 354 async def start_serving(self): 355 self._start_serving() 356 # Skip one loop iteration so that all 'loop.add_reader' 357 # go through. 358 await tasks.sleep(0) 359 360 async def serve_forever(self): 361 if self._serving_forever_fut is not None: 362 raise RuntimeError( 363 f'server {self!r} is already being awaited on serve_forever()') 364 if self._sockets is None: 365 raise RuntimeError(f'server {self!r} is closed') 366 367 self._start_serving() 368 self._serving_forever_fut = self._loop.create_future() 369 370 try: 371 await self._serving_forever_fut 372 except exceptions.CancelledError: 373 try: 374 self.close() 375 await self.wait_closed() 376 finally: 377 raise 378 finally: 379 self._serving_forever_fut = None 380 381 async def wait_closed(self): 382 if self._sockets is None or self._waiters is None: 383 return 384 waiter = self._loop.create_future() 385 self._waiters.append(waiter) 386 await waiter 387 388 389class BaseEventLoop(events.AbstractEventLoop): 390 391 def __init__(self): 392 self._timer_cancelled_count = 0 393 self._closed = False 394 self._stopping = False 395 self._ready = collections.deque() 396 
self._scheduled = [] 397 self._default_executor = None 398 self._internal_fds = 0 399 # Identifier of the thread running the event loop, or None if the 400 # event loop is not running 401 self._thread_id = None 402 self._clock_resolution = time.get_clock_info('monotonic').resolution 403 self._exception_handler = None 404 self.set_debug(coroutines._is_debug_mode()) 405 # In debug mode, if the execution of a callback or a step of a task 406 # exceed this duration in seconds, the slow callback/task is logged. 407 self.slow_callback_duration = 0.1 408 self._current_handle = None 409 self._task_factory = None 410 self._coroutine_origin_tracking_enabled = False 411 self._coroutine_origin_tracking_saved_depth = None 412 413 # A weak set of all asynchronous generators that are 414 # being iterated by the loop. 415 self._asyncgens = weakref.WeakSet() 416 # Set to True when `loop.shutdown_asyncgens` is called. 417 self._asyncgens_shutdown_called = False 418 # Set to True when `loop.shutdown_default_executor` is called. 419 self._executor_shutdown_called = False 420 421 def __repr__(self): 422 return ( 423 f'<{self.__class__.__name__} running={self.is_running()} ' 424 f'closed={self.is_closed()} debug={self.get_debug()}>' 425 ) 426 427 def create_future(self): 428 """Create a Future object attached to the loop.""" 429 return futures.Future(loop=self) 430 431 def create_task(self, coro, *, name=None): 432 """Schedule a coroutine object. 433 434 Return a task object. 435 """ 436 self._check_closed() 437 if self._task_factory is None: 438 task = tasks.Task(coro, loop=self, name=name) 439 if task._source_traceback: 440 del task._source_traceback[-1] 441 else: 442 task = self._task_factory(self, coro) 443 tasks._set_task_name(task, name) 444 445 return task 446 447 def set_task_factory(self, factory): 448 """Set a task factory that will be used by loop.create_task(). 449 450 If factory is None the default task factory will be set. 
451 452 If factory is a callable, it should have a signature matching 453 '(loop, coro)', where 'loop' will be a reference to the active 454 event loop, 'coro' will be a coroutine object. The callable 455 must return a Future. 456 """ 457 if factory is not None and not callable(factory): 458 raise TypeError('task factory must be a callable or None') 459 self._task_factory = factory 460 461 def get_task_factory(self): 462 """Return a task factory, or None if the default one is in use.""" 463 return self._task_factory 464 465 def _make_socket_transport(self, sock, protocol, waiter=None, *, 466 extra=None, server=None): 467 """Create socket transport.""" 468 raise NotImplementedError 469 470 def _make_ssl_transport( 471 self, rawsock, protocol, sslcontext, waiter=None, 472 *, server_side=False, server_hostname=None, 473 extra=None, server=None, 474 ssl_handshake_timeout=None, 475 call_connection_made=True): 476 """Create SSL transport.""" 477 raise NotImplementedError 478 479 def _make_datagram_transport(self, sock, protocol, 480 address=None, waiter=None, extra=None): 481 """Create datagram transport.""" 482 raise NotImplementedError 483 484 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 485 extra=None): 486 """Create read pipe transport.""" 487 raise NotImplementedError 488 489 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 490 extra=None): 491 """Create write pipe transport.""" 492 raise NotImplementedError 493 494 async def _make_subprocess_transport(self, protocol, args, shell, 495 stdin, stdout, stderr, bufsize, 496 extra=None, **kwargs): 497 """Create subprocess transport.""" 498 raise NotImplementedError 499 500 def _write_to_self(self): 501 """Write a byte to self-pipe, to wake up the event loop. 502 503 This may be called from a different thread. 504 505 The subclass is responsible for implementing the self-pipe. 
506 """ 507 raise NotImplementedError 508 509 def _process_events(self, event_list): 510 """Process selector events.""" 511 raise NotImplementedError 512 513 def _check_closed(self): 514 if self._closed: 515 raise RuntimeError('Event loop is closed') 516 517 def _check_default_executor(self): 518 if self._executor_shutdown_called: 519 raise RuntimeError('Executor shutdown has been called') 520 521 def _asyncgen_finalizer_hook(self, agen): 522 self._asyncgens.discard(agen) 523 if not self.is_closed(): 524 self.call_soon_threadsafe(self.create_task, agen.aclose()) 525 526 def _asyncgen_firstiter_hook(self, agen): 527 if self._asyncgens_shutdown_called: 528 warnings.warn( 529 f"asynchronous generator {agen!r} was scheduled after " 530 f"loop.shutdown_asyncgens() call", 531 ResourceWarning, source=self) 532 533 self._asyncgens.add(agen) 534 535 async def shutdown_asyncgens(self): 536 """Shutdown all active asynchronous generators.""" 537 self._asyncgens_shutdown_called = True 538 539 if not len(self._asyncgens): 540 # If Python version is <3.6 or we don't have any asynchronous 541 # generators alive. 
542 return 543 544 closing_agens = list(self._asyncgens) 545 self._asyncgens.clear() 546 547 results = await tasks.gather( 548 *[ag.aclose() for ag in closing_agens], 549 return_exceptions=True) 550 551 for result, agen in zip(results, closing_agens): 552 if isinstance(result, Exception): 553 self.call_exception_handler({ 554 'message': f'an error occurred during closing of ' 555 f'asynchronous generator {agen!r}', 556 'exception': result, 557 'asyncgen': agen 558 }) 559 560 async def shutdown_default_executor(self): 561 """Schedule the shutdown of the default executor.""" 562 self._executor_shutdown_called = True 563 if self._default_executor is None: 564 return 565 future = self.create_future() 566 thread = threading.Thread(target=self._do_shutdown, args=(future,)) 567 thread.start() 568 try: 569 await future 570 finally: 571 thread.join() 572 573 def _do_shutdown(self, future): 574 try: 575 self._default_executor.shutdown(wait=True) 576 self.call_soon_threadsafe(future.set_result, None) 577 except Exception as ex: 578 self.call_soon_threadsafe(future.set_exception, ex) 579 580 def _check_running(self): 581 if self.is_running(): 582 raise RuntimeError('This event loop is already running') 583 if events._get_running_loop() is not None: 584 raise RuntimeError( 585 'Cannot run the event loop while another loop is running') 586 587 def run_forever(self): 588 """Run until stop() is called.""" 589 self._check_closed() 590 self._check_running() 591 self._set_coroutine_origin_tracking(self._debug) 592 self._thread_id = threading.get_ident() 593 594 old_agen_hooks = sys.get_asyncgen_hooks() 595 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, 596 finalizer=self._asyncgen_finalizer_hook) 597 try: 598 events._set_running_loop(self) 599 while True: 600 self._run_once() 601 if self._stopping: 602 break 603 finally: 604 self._stopping = False 605 self._thread_id = None 606 events._set_running_loop(None) 607 self._set_coroutine_origin_tracking(False) 608 
sys.set_asyncgen_hooks(*old_agen_hooks) 609 610 def run_until_complete(self, future): 611 """Run until the Future is done. 612 613 If the argument is a coroutine, it is wrapped in a Task. 614 615 WARNING: It would be disastrous to call run_until_complete() 616 with the same coroutine twice -- it would wrap it in two 617 different Tasks and that can't be good. 618 619 Return the Future's result, or raise its exception. 620 """ 621 self._check_closed() 622 self._check_running() 623 624 new_task = not futures.isfuture(future) 625 future = tasks.ensure_future(future, loop=self) 626 if new_task: 627 # An exception is raised if the future didn't complete, so there 628 # is no need to log the "destroy pending task" message 629 future._log_destroy_pending = False 630 631 future.add_done_callback(_run_until_complete_cb) 632 try: 633 self.run_forever() 634 except: 635 if new_task and future.done() and not future.cancelled(): 636 # The coroutine raised a BaseException. Consume the exception 637 # to not log a warning, the caller doesn't have access to the 638 # local task. 639 future.exception() 640 raise 641 finally: 642 future.remove_done_callback(_run_until_complete_cb) 643 if not future.done(): 644 raise RuntimeError('Event loop stopped before Future completed.') 645 646 return future.result() 647 648 def stop(self): 649 """Stop running the event loop. 650 651 Every callback already scheduled will still run. This simply informs 652 run_forever to stop looping after a complete iteration. 653 """ 654 self._stopping = True 655 656 def close(self): 657 """Close the event loop. 658 659 This clears the queues and shuts down the executor, 660 but does not wait for the executor to finish. 661 662 The event loop must not be running. 
663 """ 664 if self.is_running(): 665 raise RuntimeError("Cannot close a running event loop") 666 if self._closed: 667 return 668 if self._debug: 669 logger.debug("Close %r", self) 670 self._closed = True 671 self._ready.clear() 672 self._scheduled.clear() 673 self._executor_shutdown_called = True 674 executor = self._default_executor 675 if executor is not None: 676 self._default_executor = None 677 executor.shutdown(wait=False) 678 679 def is_closed(self): 680 """Returns True if the event loop was closed.""" 681 return self._closed 682 683 def __del__(self, _warn=warnings.warn): 684 if not self.is_closed(): 685 _warn(f"unclosed event loop {self!r}", ResourceWarning, source=self) 686 if not self.is_running(): 687 self.close() 688 689 def is_running(self): 690 """Returns True if the event loop is running.""" 691 return (self._thread_id is not None) 692 693 def time(self): 694 """Return the time according to the event loop's clock. 695 696 This is a float expressed in seconds since an epoch, but the 697 epoch, precision, accuracy and drift are unspecified and may 698 differ per event loop. 699 """ 700 return time.monotonic() 701 702 def call_later(self, delay, callback, *args, context=None): 703 """Arrange for a callback to be called at a given time. 704 705 Return a Handle: an opaque object with a cancel() method that 706 can be used to cancel the call. 707 708 The delay can be an int or float, expressed in seconds. It is 709 always relative to the current time. 710 711 Each callback will be called exactly once. If two callbacks 712 are scheduled for exactly the same time, it undefined which 713 will be called first. 714 715 Any positional arguments after the callback will be passed to 716 the callback when it is called. 
717 """ 718 timer = self.call_at(self.time() + delay, callback, *args, 719 context=context) 720 if timer._source_traceback: 721 del timer._source_traceback[-1] 722 return timer 723 724 def call_at(self, when, callback, *args, context=None): 725 """Like call_later(), but uses an absolute time. 726 727 Absolute time corresponds to the event loop's time() method. 728 """ 729 self._check_closed() 730 if self._debug: 731 self._check_thread() 732 self._check_callback(callback, 'call_at') 733 timer = events.TimerHandle(when, callback, args, self, context) 734 if timer._source_traceback: 735 del timer._source_traceback[-1] 736 heapq.heappush(self._scheduled, timer) 737 timer._scheduled = True 738 return timer 739 740 def call_soon(self, callback, *args, context=None): 741 """Arrange for a callback to be called as soon as possible. 742 743 This operates as a FIFO queue: callbacks are called in the 744 order in which they are registered. Each callback will be 745 called exactly once. 746 747 Any positional arguments after the callback will be passed to 748 the callback when it is called. 
749 """ 750 self._check_closed() 751 if self._debug: 752 self._check_thread() 753 self._check_callback(callback, 'call_soon') 754 handle = self._call_soon(callback, args, context) 755 if handle._source_traceback: 756 del handle._source_traceback[-1] 757 return handle 758 759 def _check_callback(self, callback, method): 760 if (coroutines.iscoroutine(callback) or 761 coroutines.iscoroutinefunction(callback)): 762 raise TypeError( 763 f"coroutines cannot be used with {method}()") 764 if not callable(callback): 765 raise TypeError( 766 f'a callable object was expected by {method}(), ' 767 f'got {callback!r}') 768 769 def _call_soon(self, callback, args, context): 770 handle = events.Handle(callback, args, self, context) 771 if handle._source_traceback: 772 del handle._source_traceback[-1] 773 self._ready.append(handle) 774 return handle 775 776 def _check_thread(self): 777 """Check that the current thread is the thread running the event loop. 778 779 Non-thread-safe methods of this class make this assumption and will 780 likely behave incorrectly when the assumption is violated. 781 782 Should only be called when (self._debug == True). The caller is 783 responsible for checking this condition for performance reasons. 
784 """ 785 if self._thread_id is None: 786 return 787 thread_id = threading.get_ident() 788 if thread_id != self._thread_id: 789 raise RuntimeError( 790 "Non-thread-safe operation invoked on an event loop other " 791 "than the current one") 792 793 def call_soon_threadsafe(self, callback, *args, context=None): 794 """Like call_soon(), but thread-safe.""" 795 self._check_closed() 796 if self._debug: 797 self._check_callback(callback, 'call_soon_threadsafe') 798 handle = self._call_soon(callback, args, context) 799 if handle._source_traceback: 800 del handle._source_traceback[-1] 801 self._write_to_self() 802 return handle 803 804 def run_in_executor(self, executor, func, *args): 805 self._check_closed() 806 if self._debug: 807 self._check_callback(func, 'run_in_executor') 808 if executor is None: 809 executor = self._default_executor 810 # Only check when the default executor is being used 811 self._check_default_executor() 812 if executor is None: 813 executor = concurrent.futures.ThreadPoolExecutor( 814 thread_name_prefix='asyncio' 815 ) 816 self._default_executor = executor 817 return futures.wrap_future( 818 executor.submit(func, *args), loop=self) 819 820 def set_default_executor(self, executor): 821 if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): 822 warnings.warn( 823 'Using the default executor that is not an instance of ' 824 'ThreadPoolExecutor is deprecated and will be prohibited ' 825 'in Python 3.9', 826 DeprecationWarning, 2) 827 self._default_executor = executor 828 829 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 830 msg = [f"{host}:{port!r}"] 831 if family: 832 msg.append(f'family={family!r}') 833 if type: 834 msg.append(f'type={type!r}') 835 if proto: 836 msg.append(f'proto={proto!r}') 837 if flags: 838 msg.append(f'flags={flags!r}') 839 msg = ', '.join(msg) 840 logger.debug('Get address info %s', msg) 841 842 t0 = self.time() 843 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 844 
dt = self.time() - t0 845 846 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 847 if dt >= self.slow_callback_duration: 848 logger.info(msg) 849 else: 850 logger.debug(msg) 851 return addrinfo 852 853 async def getaddrinfo(self, host, port, *, 854 family=0, type=0, proto=0, flags=0): 855 if self._debug: 856 getaddr_func = self._getaddrinfo_debug 857 else: 858 getaddr_func = socket.getaddrinfo 859 860 return await self.run_in_executor( 861 None, getaddr_func, host, port, family, type, proto, flags) 862 863 async def getnameinfo(self, sockaddr, flags=0): 864 return await self.run_in_executor( 865 None, socket.getnameinfo, sockaddr, flags) 866 867 async def sock_sendfile(self, sock, file, offset=0, count=None, 868 *, fallback=True): 869 if self._debug and sock.gettimeout() != 0: 870 raise ValueError("the socket must be non-blocking") 871 _check_ssl_socket(sock) 872 self._check_sendfile_params(sock, file, offset, count) 873 try: 874 return await self._sock_sendfile_native(sock, file, 875 offset, count) 876 except exceptions.SendfileNotAvailableError as exc: 877 if not fallback: 878 raise 879 return await self._sock_sendfile_fallback(sock, file, 880 offset, count) 881 882 async def _sock_sendfile_native(self, sock, file, offset, count): 883 # NB: sendfile syscall is not supported for SSL sockets and 884 # non-mmap files even if sendfile is supported by OS 885 raise exceptions.SendfileNotAvailableError( 886 f"syscall sendfile is not available for socket {sock!r} " 887 "and file {file!r} combination") 888 889 async def _sock_sendfile_fallback(self, sock, file, offset, count): 890 if offset: 891 file.seek(offset) 892 blocksize = ( 893 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 894 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 895 ) 896 buf = bytearray(blocksize) 897 total_sent = 0 898 try: 899 while True: 900 if count: 901 blocksize = min(count - total_sent, blocksize) 902 if blocksize <= 0: 903 break 904 view = 
memoryview(buf)[:blocksize] 905 read = await self.run_in_executor(None, file.readinto, view) 906 if not read: 907 break # EOF 908 await self.sock_sendall(sock, view[:read]) 909 total_sent += read 910 return total_sent 911 finally: 912 if total_sent > 0 and hasattr(file, 'seek'): 913 file.seek(offset + total_sent) 914 915 def _check_sendfile_params(self, sock, file, offset, count): 916 if 'b' not in getattr(file, 'mode', 'b'): 917 raise ValueError("file should be opened in binary mode") 918 if not sock.type == socket.SOCK_STREAM: 919 raise ValueError("only SOCK_STREAM type sockets are supported") 920 if count is not None: 921 if not isinstance(count, int): 922 raise TypeError( 923 "count must be a positive integer (got {!r})".format(count)) 924 if count <= 0: 925 raise ValueError( 926 "count must be a positive integer (got {!r})".format(count)) 927 if not isinstance(offset, int): 928 raise TypeError( 929 "offset must be a non-negative integer (got {!r})".format( 930 offset)) 931 if offset < 0: 932 raise ValueError( 933 "offset must be a non-negative integer (got {!r})".format( 934 offset)) 935 936 async def _connect_sock(self, exceptions, addr_info, local_addr_infos=None): 937 """Create, bind and connect one socket.""" 938 my_exceptions = [] 939 exceptions.append(my_exceptions) 940 family, type_, proto, _, address = addr_info 941 sock = None 942 try: 943 sock = socket.socket(family=family, type=type_, proto=proto) 944 sock.setblocking(False) 945 if local_addr_infos is not None: 946 for _, _, _, _, laddr in local_addr_infos: 947 try: 948 sock.bind(laddr) 949 break 950 except OSError as exc: 951 msg = ( 952 f'error while attempting to bind on ' 953 f'address {laddr!r}: ' 954 f'{exc.strerror.lower()}' 955 ) 956 exc = OSError(exc.errno, msg) 957 my_exceptions.append(exc) 958 else: # all bind attempts failed 959 raise my_exceptions.pop() 960 await self.sock_connect(sock, address) 961 return sock 962 except OSError as exc: 963 my_exceptions.append(exc) 964 if sock is not 
None: 965 sock.close() 966 raise 967 except: 968 if sock is not None: 969 sock.close() 970 raise 971 972 async def create_connection( 973 self, protocol_factory, host=None, port=None, 974 *, ssl=None, family=0, 975 proto=0, flags=0, sock=None, 976 local_addr=None, server_hostname=None, 977 ssl_handshake_timeout=None, 978 happy_eyeballs_delay=None, interleave=None): 979 """Connect to a TCP server. 980 981 Create a streaming transport connection to a given internet host and 982 port: socket family AF_INET or socket.AF_INET6 depending on host (or 983 family if specified), socket type SOCK_STREAM. protocol_factory must be 984 a callable returning a protocol instance. 985 986 This method is a coroutine which will try to establish the connection 987 in the background. When successful, the coroutine returns a 988 (transport, protocol) pair. 989 """ 990 if server_hostname is not None and not ssl: 991 raise ValueError('server_hostname is only meaningful with ssl') 992 993 if server_hostname is None and ssl: 994 # Use host as default for server_hostname. It is an error 995 # if host is empty or not set, e.g. when an 996 # already-connected socket was passed or when only a port 997 # is given. To avoid this error, you can pass 998 # server_hostname='' -- this will bypass the hostname 999 # check. (This also means that if host is a numeric 1000 # IP/IPv6 address, we will attempt to verify that exact 1001 # address; this will probably fail, but it is possible to 1002 # create a certificate for a specific IP address, so we 1003 # don't judge it here.) 
1004 if not host: 1005 raise ValueError('You must set server_hostname ' 1006 'when using ssl without a host') 1007 server_hostname = host 1008 1009 if ssl_handshake_timeout is not None and not ssl: 1010 raise ValueError( 1011 'ssl_handshake_timeout is only meaningful with ssl') 1012 1013 if sock is not None: 1014 _check_ssl_socket(sock) 1015 1016 if happy_eyeballs_delay is not None and interleave is None: 1017 # If using happy eyeballs, default to interleave addresses by family 1018 interleave = 1 1019 1020 if host is not None or port is not None: 1021 if sock is not None: 1022 raise ValueError( 1023 'host/port and sock can not be specified at the same time') 1024 1025 infos = await self._ensure_resolved( 1026 (host, port), family=family, 1027 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) 1028 if not infos: 1029 raise OSError('getaddrinfo() returned empty list') 1030 1031 if local_addr is not None: 1032 laddr_infos = await self._ensure_resolved( 1033 local_addr, family=family, 1034 type=socket.SOCK_STREAM, proto=proto, 1035 flags=flags, loop=self) 1036 if not laddr_infos: 1037 raise OSError('getaddrinfo() returned empty list') 1038 else: 1039 laddr_infos = None 1040 1041 if interleave: 1042 infos = _interleave_addrinfos(infos, interleave) 1043 1044 exceptions = [] 1045 if happy_eyeballs_delay is None: 1046 # not using happy eyeballs 1047 for addrinfo in infos: 1048 try: 1049 sock = await self._connect_sock( 1050 exceptions, addrinfo, laddr_infos) 1051 break 1052 except OSError: 1053 continue 1054 else: # using happy eyeballs 1055 sock, _, _ = await staggered.staggered_race( 1056 (functools.partial(self._connect_sock, 1057 exceptions, addrinfo, laddr_infos) 1058 for addrinfo in infos), 1059 happy_eyeballs_delay, loop=self) 1060 1061 if sock is None: 1062 exceptions = [exc for sub in exceptions for exc in sub] 1063 if len(exceptions) == 1: 1064 raise exceptions[0] 1065 else: 1066 # If they all have the same str(), raise one. 
1067 model = str(exceptions[0]) 1068 if all(str(exc) == model for exc in exceptions): 1069 raise exceptions[0] 1070 # Raise a combined exception so the user can see all 1071 # the various error messages. 1072 raise OSError('Multiple exceptions: {}'.format( 1073 ', '.join(str(exc) for exc in exceptions))) 1074 1075 else: 1076 if sock is None: 1077 raise ValueError( 1078 'host and port was not specified and no sock specified') 1079 if sock.type != socket.SOCK_STREAM: 1080 # We allow AF_INET, AF_INET6, AF_UNIX as long as they 1081 # are SOCK_STREAM. 1082 # We support passing AF_UNIX sockets even though we have 1083 # a dedicated API for that: create_unix_connection. 1084 # Disallowing AF_UNIX in this method, breaks backwards 1085 # compatibility. 1086 raise ValueError( 1087 f'A Stream Socket was expected, got {sock!r}') 1088 1089 transport, protocol = await self._create_connection_transport( 1090 sock, protocol_factory, ssl, server_hostname, 1091 ssl_handshake_timeout=ssl_handshake_timeout) 1092 if self._debug: 1093 # Get the socket from the transport because SSL transport closes 1094 # the old socket and creates a new SSL socket 1095 sock = transport.get_extra_info('socket') 1096 logger.debug("%r connected to %s:%r: (%r, %r)", 1097 sock, host, port, transport, protocol) 1098 return transport, protocol 1099 1100 async def _create_connection_transport( 1101 self, sock, protocol_factory, ssl, 1102 server_hostname, server_side=False, 1103 ssl_handshake_timeout=None): 1104 1105 sock.setblocking(False) 1106 1107 protocol = protocol_factory() 1108 waiter = self.create_future() 1109 if ssl: 1110 sslcontext = None if isinstance(ssl, bool) else ssl 1111 transport = self._make_ssl_transport( 1112 sock, protocol, sslcontext, waiter, 1113 server_side=server_side, server_hostname=server_hostname, 1114 ssl_handshake_timeout=ssl_handshake_timeout) 1115 else: 1116 transport = self._make_socket_transport(sock, protocol, waiter) 1117 1118 try: 1119 await waiter 1120 except: 1121 
transport.close() 1122 raise 1123 1124 return transport, protocol 1125 1126 async def sendfile(self, transport, file, offset=0, count=None, 1127 *, fallback=True): 1128 """Send a file to transport. 1129 1130 Return the total number of bytes which were sent. 1131 1132 The method uses high-performance os.sendfile if available. 1133 1134 file must be a regular file object opened in binary mode. 1135 1136 offset tells from where to start reading the file. If specified, 1137 count is the total number of bytes to transmit as opposed to 1138 sending the file until EOF is reached. File position is updated on 1139 return or also in case of error in which case file.tell() 1140 can be used to figure out the number of bytes 1141 which were sent. 1142 1143 fallback set to True makes asyncio to manually read and send 1144 the file when the platform does not support the sendfile syscall 1145 (e.g. Windows or SSL socket on Unix). 1146 1147 Raise SendfileNotAvailableError if the system does not support 1148 sendfile syscall and fallback is False. 
1149 """ 1150 if transport.is_closing(): 1151 raise RuntimeError("Transport is closing") 1152 mode = getattr(transport, '_sendfile_compatible', 1153 constants._SendfileMode.UNSUPPORTED) 1154 if mode is constants._SendfileMode.UNSUPPORTED: 1155 raise RuntimeError( 1156 f"sendfile is not supported for transport {transport!r}") 1157 if mode is constants._SendfileMode.TRY_NATIVE: 1158 try: 1159 return await self._sendfile_native(transport, file, 1160 offset, count) 1161 except exceptions.SendfileNotAvailableError as exc: 1162 if not fallback: 1163 raise 1164 1165 if not fallback: 1166 raise RuntimeError( 1167 f"fallback is disabled and native sendfile is not " 1168 f"supported for transport {transport!r}") 1169 1170 return await self._sendfile_fallback(transport, file, 1171 offset, count) 1172 1173 async def _sendfile_native(self, transp, file, offset, count): 1174 raise exceptions.SendfileNotAvailableError( 1175 "sendfile syscall is not supported") 1176 1177 async def _sendfile_fallback(self, transp, file, offset, count): 1178 if offset: 1179 file.seek(offset) 1180 blocksize = min(count, 16384) if count else 16384 1181 buf = bytearray(blocksize) 1182 total_sent = 0 1183 proto = _SendfileFallbackProtocol(transp) 1184 try: 1185 while True: 1186 if count: 1187 blocksize = min(count - total_sent, blocksize) 1188 if blocksize <= 0: 1189 return total_sent 1190 view = memoryview(buf)[:blocksize] 1191 read = await self.run_in_executor(None, file.readinto, view) 1192 if not read: 1193 return total_sent # EOF 1194 await proto.drain() 1195 transp.write(view[:read]) 1196 total_sent += read 1197 finally: 1198 if total_sent > 0 and hasattr(file, 'seek'): 1199 file.seek(offset + total_sent) 1200 await proto.restore() 1201 1202 async def start_tls(self, transport, protocol, sslcontext, *, 1203 server_side=False, 1204 server_hostname=None, 1205 ssl_handshake_timeout=None): 1206 """Upgrade transport to TLS. 
1207 1208 Return a new transport that *protocol* should start using 1209 immediately. 1210 """ 1211 if ssl is None: 1212 raise RuntimeError('Python ssl module is not available') 1213 1214 if not isinstance(sslcontext, ssl.SSLContext): 1215 raise TypeError( 1216 f'sslcontext is expected to be an instance of ssl.SSLContext, ' 1217 f'got {sslcontext!r}') 1218 1219 if not getattr(transport, '_start_tls_compatible', False): 1220 raise TypeError( 1221 f'transport {transport!r} is not supported by start_tls()') 1222 1223 waiter = self.create_future() 1224 ssl_protocol = sslproto.SSLProtocol( 1225 self, protocol, sslcontext, waiter, 1226 server_side, server_hostname, 1227 ssl_handshake_timeout=ssl_handshake_timeout, 1228 call_connection_made=False) 1229 1230 # Pause early so that "ssl_protocol.data_received()" doesn't 1231 # have a chance to get called before "ssl_protocol.connection_made()". 1232 transport.pause_reading() 1233 1234 transport.set_protocol(ssl_protocol) 1235 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) 1236 resume_cb = self.call_soon(transport.resume_reading) 1237 1238 try: 1239 await waiter 1240 except BaseException: 1241 transport.close() 1242 conmade_cb.cancel() 1243 resume_cb.cancel() 1244 raise 1245 1246 return ssl_protocol._app_transport 1247 1248 async def create_datagram_endpoint(self, protocol_factory, 1249 local_addr=None, remote_addr=None, *, 1250 family=0, proto=0, flags=0, 1251 reuse_address=_unset, reuse_port=None, 1252 allow_broadcast=None, sock=None): 1253 """Create datagram connection.""" 1254 if sock is not None: 1255 if sock.type != socket.SOCK_DGRAM: 1256 raise ValueError( 1257 f'A UDP Socket was expected, got {sock!r}') 1258 if (local_addr or remote_addr or 1259 family or proto or flags or 1260 reuse_port or allow_broadcast): 1261 # show the problematic kwargs in exception msg 1262 opts = dict(local_addr=local_addr, remote_addr=remote_addr, 1263 family=family, proto=proto, flags=flags, 1264 
reuse_address=reuse_address, reuse_port=reuse_port, 1265 allow_broadcast=allow_broadcast) 1266 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) 1267 raise ValueError( 1268 f'socket modifier keyword arguments can not be used ' 1269 f'when sock is specified. ({problems})') 1270 sock.setblocking(False) 1271 r_addr = None 1272 else: 1273 if not (local_addr or remote_addr): 1274 if family == 0: 1275 raise ValueError('unexpected address family') 1276 addr_pairs_info = (((family, proto), (None, None)),) 1277 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 1278 for addr in (local_addr, remote_addr): 1279 if addr is not None and not isinstance(addr, str): 1280 raise TypeError('string is expected') 1281 1282 if local_addr and local_addr[0] not in (0, '\x00'): 1283 try: 1284 if stat.S_ISSOCK(os.stat(local_addr).st_mode): 1285 os.remove(local_addr) 1286 except FileNotFoundError: 1287 pass 1288 except OSError as err: 1289 # Directory may have permissions only to create socket. 
1290 logger.error('Unable to check or remove stale UNIX ' 1291 'socket %r: %r', 1292 local_addr, err) 1293 1294 addr_pairs_info = (((family, proto), 1295 (local_addr, remote_addr)), ) 1296 else: 1297 # join address by (family, protocol) 1298 addr_infos = {} # Using order preserving dict 1299 for idx, addr in ((0, local_addr), (1, remote_addr)): 1300 if addr is not None: 1301 assert isinstance(addr, tuple) and len(addr) == 2, ( 1302 '2-tuple is expected') 1303 1304 infos = await self._ensure_resolved( 1305 addr, family=family, type=socket.SOCK_DGRAM, 1306 proto=proto, flags=flags, loop=self) 1307 if not infos: 1308 raise OSError('getaddrinfo() returned empty list') 1309 1310 for fam, _, pro, _, address in infos: 1311 key = (fam, pro) 1312 if key not in addr_infos: 1313 addr_infos[key] = [None, None] 1314 addr_infos[key][idx] = address 1315 1316 # each addr has to have info for each (family, proto) pair 1317 addr_pairs_info = [ 1318 (key, addr_pair) for key, addr_pair in addr_infos.items() 1319 if not ((local_addr and addr_pair[0] is None) or 1320 (remote_addr and addr_pair[1] is None))] 1321 1322 if not addr_pairs_info: 1323 raise ValueError('can not get address information') 1324 1325 exceptions = [] 1326 1327 # bpo-37228 1328 if reuse_address is not _unset: 1329 if reuse_address: 1330 raise ValueError("Passing `reuse_address=True` is no " 1331 "longer supported, as the usage of " 1332 "SO_REUSEPORT in UDP poses a significant " 1333 "security concern.") 1334 else: 1335 warnings.warn("The *reuse_address* parameter has been " 1336 "deprecated as of 3.5.10 and is scheduled " 1337 "for removal in 3.11.", DeprecationWarning, 1338 stacklevel=2) 1339 1340 for ((family, proto), 1341 (local_address, remote_address)) in addr_pairs_info: 1342 sock = None 1343 r_addr = None 1344 try: 1345 sock = socket.socket( 1346 family=family, type=socket.SOCK_DGRAM, proto=proto) 1347 if reuse_port: 1348 _set_reuseport(sock) 1349 if allow_broadcast: 1350 sock.setsockopt( 1351 
socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 1352 sock.setblocking(False) 1353 1354 if local_addr: 1355 sock.bind(local_address) 1356 if remote_addr: 1357 if not allow_broadcast: 1358 await self.sock_connect(sock, remote_address) 1359 r_addr = remote_address 1360 except OSError as exc: 1361 if sock is not None: 1362 sock.close() 1363 exceptions.append(exc) 1364 except: 1365 if sock is not None: 1366 sock.close() 1367 raise 1368 else: 1369 break 1370 else: 1371 raise exceptions[0] 1372 1373 protocol = protocol_factory() 1374 waiter = self.create_future() 1375 transport = self._make_datagram_transport( 1376 sock, protocol, r_addr, waiter) 1377 if self._debug: 1378 if local_addr: 1379 logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 1380 "created: (%r, %r)", 1381 local_addr, remote_addr, transport, protocol) 1382 else: 1383 logger.debug("Datagram endpoint remote_addr=%r created: " 1384 "(%r, %r)", 1385 remote_addr, transport, protocol) 1386 1387 try: 1388 await waiter 1389 except: 1390 transport.close() 1391 raise 1392 1393 return transport, protocol 1394 1395 async def _ensure_resolved(self, address, *, 1396 family=0, type=socket.SOCK_STREAM, 1397 proto=0, flags=0, loop): 1398 host, port = address[:2] 1399 info = _ipaddr_info(host, port, family, type, proto, *address[2:]) 1400 if info is not None: 1401 # "host" is already a resolved IP. 
1402 return [info] 1403 else: 1404 return await loop.getaddrinfo(host, port, family=family, type=type, 1405 proto=proto, flags=flags) 1406 1407 async def _create_server_getaddrinfo(self, host, port, family, flags): 1408 infos = await self._ensure_resolved((host, port), family=family, 1409 type=socket.SOCK_STREAM, 1410 flags=flags, loop=self) 1411 if not infos: 1412 raise OSError(f'getaddrinfo({host!r}) returned empty list') 1413 return infos 1414 1415 async def create_server( 1416 self, protocol_factory, host=None, port=None, 1417 *, 1418 family=socket.AF_UNSPEC, 1419 flags=socket.AI_PASSIVE, 1420 sock=None, 1421 backlog=100, 1422 ssl=None, 1423 reuse_address=None, 1424 reuse_port=None, 1425 ssl_handshake_timeout=None, 1426 start_serving=True): 1427 """Create a TCP server. 1428 1429 The host parameter can be a string, in that case the TCP server is 1430 bound to host and port. 1431 1432 The host parameter can also be a sequence of strings and in that case 1433 the TCP server is bound to all hosts of the sequence. If a host 1434 appears multiple times (possibly indirectly e.g. when hostnames 1435 resolve to the same IP address), the server is only bound once to that 1436 host. 1437 1438 Return a Server object which can be used to stop the service. 1439 1440 This method is a coroutine. 
1441 """ 1442 if isinstance(ssl, bool): 1443 raise TypeError('ssl argument must be an SSLContext or None') 1444 1445 if ssl_handshake_timeout is not None and ssl is None: 1446 raise ValueError( 1447 'ssl_handshake_timeout is only meaningful with ssl') 1448 1449 if sock is not None: 1450 _check_ssl_socket(sock) 1451 1452 if host is not None or port is not None: 1453 if sock is not None: 1454 raise ValueError( 1455 'host/port and sock can not be specified at the same time') 1456 1457 if reuse_address is None: 1458 reuse_address = os.name == 'posix' and sys.platform != 'cygwin' 1459 sockets = [] 1460 if host == '': 1461 hosts = [None] 1462 elif (isinstance(host, str) or 1463 not isinstance(host, collections.abc.Iterable)): 1464 hosts = [host] 1465 else: 1466 hosts = host 1467 1468 fs = [self._create_server_getaddrinfo(host, port, family=family, 1469 flags=flags) 1470 for host in hosts] 1471 infos = await tasks.gather(*fs) 1472 infos = set(itertools.chain.from_iterable(infos)) 1473 1474 completed = False 1475 try: 1476 for res in infos: 1477 af, socktype, proto, canonname, sa = res 1478 try: 1479 sock = socket.socket(af, socktype, proto) 1480 except socket.error: 1481 # Assume it's a bad family/type/protocol combination. 1482 if self._debug: 1483 logger.warning('create_server() failed to create ' 1484 'socket.socket(%r, %r, %r)', 1485 af, socktype, proto, exc_info=True) 1486 continue 1487 sockets.append(sock) 1488 if reuse_address: 1489 sock.setsockopt( 1490 socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 1491 if reuse_port: 1492 _set_reuseport(sock) 1493 # Disable IPv4/IPv6 dual stack support (enabled by 1494 # default on Linux) which makes a single socket 1495 # listen on both address families. 
1496 if (_HAS_IPv6 and 1497 af == socket.AF_INET6 and 1498 hasattr(socket, 'IPPROTO_IPV6')): 1499 sock.setsockopt(socket.IPPROTO_IPV6, 1500 socket.IPV6_V6ONLY, 1501 True) 1502 try: 1503 sock.bind(sa) 1504 except OSError as err: 1505 raise OSError(err.errno, 'error while attempting ' 1506 'to bind on address %r: %s' 1507 % (sa, err.strerror.lower())) from None 1508 completed = True 1509 finally: 1510 if not completed: 1511 for sock in sockets: 1512 sock.close() 1513 else: 1514 if sock is None: 1515 raise ValueError('Neither host/port nor sock were specified') 1516 if sock.type != socket.SOCK_STREAM: 1517 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1518 sockets = [sock] 1519 1520 for sock in sockets: 1521 sock.setblocking(False) 1522 1523 server = Server(self, sockets, protocol_factory, 1524 ssl, backlog, ssl_handshake_timeout) 1525 if start_serving: 1526 server._start_serving() 1527 # Skip one loop iteration so that all 'loop.add_reader' 1528 # go through. 1529 await tasks.sleep(0) 1530 1531 if self._debug: 1532 logger.info("%r is serving", server) 1533 return server 1534 1535 async def connect_accepted_socket( 1536 self, protocol_factory, sock, 1537 *, ssl=None, 1538 ssl_handshake_timeout=None): 1539 if sock.type != socket.SOCK_STREAM: 1540 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1541 1542 if ssl_handshake_timeout is not None and not ssl: 1543 raise ValueError( 1544 'ssl_handshake_timeout is only meaningful with ssl') 1545 1546 if sock is not None: 1547 _check_ssl_socket(sock) 1548 1549 transport, protocol = await self._create_connection_transport( 1550 sock, protocol_factory, ssl, '', server_side=True, 1551 ssl_handshake_timeout=ssl_handshake_timeout) 1552 if self._debug: 1553 # Get the socket from the transport because SSL transport closes 1554 # the old socket and creates a new SSL socket 1555 sock = transport.get_extra_info('socket') 1556 logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 1557 return 
transport, protocol 1558 1559 async def connect_read_pipe(self, protocol_factory, pipe): 1560 protocol = protocol_factory() 1561 waiter = self.create_future() 1562 transport = self._make_read_pipe_transport(pipe, protocol, waiter) 1563 1564 try: 1565 await waiter 1566 except: 1567 transport.close() 1568 raise 1569 1570 if self._debug: 1571 logger.debug('Read pipe %r connected: (%r, %r)', 1572 pipe.fileno(), transport, protocol) 1573 return transport, protocol 1574 1575 async def connect_write_pipe(self, protocol_factory, pipe): 1576 protocol = protocol_factory() 1577 waiter = self.create_future() 1578 transport = self._make_write_pipe_transport(pipe, protocol, waiter) 1579 1580 try: 1581 await waiter 1582 except: 1583 transport.close() 1584 raise 1585 1586 if self._debug: 1587 logger.debug('Write pipe %r connected: (%r, %r)', 1588 pipe.fileno(), transport, protocol) 1589 return transport, protocol 1590 1591 def _log_subprocess(self, msg, stdin, stdout, stderr): 1592 info = [msg] 1593 if stdin is not None: 1594 info.append(f'stdin={_format_pipe(stdin)}') 1595 if stdout is not None and stderr == subprocess.STDOUT: 1596 info.append(f'stdout=stderr={_format_pipe(stdout)}') 1597 else: 1598 if stdout is not None: 1599 info.append(f'stdout={_format_pipe(stdout)}') 1600 if stderr is not None: 1601 info.append(f'stderr={_format_pipe(stderr)}') 1602 logger.debug(' '.join(info)) 1603 1604 async def subprocess_shell(self, protocol_factory, cmd, *, 1605 stdin=subprocess.PIPE, 1606 stdout=subprocess.PIPE, 1607 stderr=subprocess.PIPE, 1608 universal_newlines=False, 1609 shell=True, bufsize=0, 1610 encoding=None, errors=None, text=None, 1611 **kwargs): 1612 if not isinstance(cmd, (bytes, str)): 1613 raise ValueError("cmd must be a string") 1614 if universal_newlines: 1615 raise ValueError("universal_newlines must be False") 1616 if not shell: 1617 raise ValueError("shell must be True") 1618 if bufsize != 0: 1619 raise ValueError("bufsize must be 0") 1620 if text: 1621 raise 
ValueError("text must be False") 1622 if encoding is not None: 1623 raise ValueError("encoding must be None") 1624 if errors is not None: 1625 raise ValueError("errors must be None") 1626 1627 protocol = protocol_factory() 1628 debug_log = None 1629 if self._debug: 1630 # don't log parameters: they may contain sensitive information 1631 # (password) and may be too long 1632 debug_log = 'run shell command %r' % cmd 1633 self._log_subprocess(debug_log, stdin, stdout, stderr) 1634 transport = await self._make_subprocess_transport( 1635 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 1636 if self._debug and debug_log is not None: 1637 logger.info('%s: %r', debug_log, transport) 1638 return transport, protocol 1639 1640 async def subprocess_exec(self, protocol_factory, program, *args, 1641 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1642 stderr=subprocess.PIPE, universal_newlines=False, 1643 shell=False, bufsize=0, 1644 encoding=None, errors=None, text=None, 1645 **kwargs): 1646 if universal_newlines: 1647 raise ValueError("universal_newlines must be False") 1648 if shell: 1649 raise ValueError("shell must be False") 1650 if bufsize != 0: 1651 raise ValueError("bufsize must be 0") 1652 if text: 1653 raise ValueError("text must be False") 1654 if encoding is not None: 1655 raise ValueError("encoding must be None") 1656 if errors is not None: 1657 raise ValueError("errors must be None") 1658 1659 popen_args = (program,) + args 1660 protocol = protocol_factory() 1661 debug_log = None 1662 if self._debug: 1663 # don't log parameters: they may contain sensitive information 1664 # (password) and may be too long 1665 debug_log = f'execute program {program!r}' 1666 self._log_subprocess(debug_log, stdin, stdout, stderr) 1667 transport = await self._make_subprocess_transport( 1668 protocol, popen_args, False, stdin, stdout, stderr, 1669 bufsize, **kwargs) 1670 if self._debug and debug_log is not None: 1671 logger.info('%s: %r', debug_log, transport) 1672 return 
transport, protocol 1673 1674 def get_exception_handler(self): 1675 """Return an exception handler, or None if the default one is in use. 1676 """ 1677 return self._exception_handler 1678 1679 def set_exception_handler(self, handler): 1680 """Set handler as the new event loop exception handler. 1681 1682 If handler is None, the default exception handler will 1683 be set. 1684 1685 If handler is a callable object, it should have a 1686 signature matching '(loop, context)', where 'loop' 1687 will be a reference to the active event loop, 'context' 1688 will be a dict object (see `call_exception_handler()` 1689 documentation for details about context). 1690 """ 1691 if handler is not None and not callable(handler): 1692 raise TypeError(f'A callable object or None is expected, ' 1693 f'got {handler!r}') 1694 self._exception_handler = handler 1695 1696 def default_exception_handler(self, context): 1697 """Default exception handler. 1698 1699 This is called when an exception occurs and no exception 1700 handler is set, and can be called by a custom exception 1701 handler that wants to defer to the default behavior. 1702 1703 This default handler logs the error message and other 1704 context-dependent information. In debug mode, a truncated 1705 stack trace is also appended showing where the given object 1706 (e.g. a handle or future or task) was created, if any. 1707 1708 The context parameter has the same meaning as in 1709 `call_exception_handler()`. 
1710 """ 1711 message = context.get('message') 1712 if not message: 1713 message = 'Unhandled exception in event loop' 1714 1715 exception = context.get('exception') 1716 if exception is not None: 1717 exc_info = (type(exception), exception, exception.__traceback__) 1718 else: 1719 exc_info = False 1720 1721 if ('source_traceback' not in context and 1722 self._current_handle is not None and 1723 self._current_handle._source_traceback): 1724 context['handle_traceback'] = \ 1725 self._current_handle._source_traceback 1726 1727 log_lines = [message] 1728 for key in sorted(context): 1729 if key in {'message', 'exception'}: 1730 continue 1731 value = context[key] 1732 if key == 'source_traceback': 1733 tb = ''.join(traceback.format_list(value)) 1734 value = 'Object created at (most recent call last):\n' 1735 value += tb.rstrip() 1736 elif key == 'handle_traceback': 1737 tb = ''.join(traceback.format_list(value)) 1738 value = 'Handle created at (most recent call last):\n' 1739 value += tb.rstrip() 1740 else: 1741 value = repr(value) 1742 log_lines.append(f'{key}: {value}') 1743 1744 logger.error('\n'.join(log_lines), exc_info=exc_info) 1745 1746 def call_exception_handler(self, context): 1747 """Call the current event loop's exception handler. 1748 1749 The context argument is a dict containing the following keys: 1750 1751 - 'message': Error message; 1752 - 'exception' (optional): Exception object; 1753 - 'future' (optional): Future instance; 1754 - 'task' (optional): Task instance; 1755 - 'handle' (optional): Handle instance; 1756 - 'protocol' (optional): Protocol instance; 1757 - 'transport' (optional): Transport instance; 1758 - 'socket' (optional): Socket instance; 1759 - 'asyncgen' (optional): Asynchronous generator that caused 1760 the exception. 1761 1762 New keys maybe introduced in the future. 1763 1764 Note: do not overload this method in an event loop subclass. 1765 For custom exception handling, use the 1766 `set_exception_handler()` method. 
1767 """ 1768 if self._exception_handler is None: 1769 try: 1770 self.default_exception_handler(context) 1771 except (SystemExit, KeyboardInterrupt): 1772 raise 1773 except BaseException: 1774 # Second protection layer for unexpected errors 1775 # in the default implementation, as well as for subclassed 1776 # event loops with overloaded "default_exception_handler". 1777 logger.error('Exception in default exception handler', 1778 exc_info=True) 1779 else: 1780 try: 1781 self._exception_handler(self, context) 1782 except (SystemExit, KeyboardInterrupt): 1783 raise 1784 except BaseException as exc: 1785 # Exception in the user set custom exception handler. 1786 try: 1787 # Let's try default handler. 1788 self.default_exception_handler({ 1789 'message': 'Unhandled error in exception handler', 1790 'exception': exc, 1791 'context': context, 1792 }) 1793 except (SystemExit, KeyboardInterrupt): 1794 raise 1795 except BaseException: 1796 # Guard 'default_exception_handler' in case it is 1797 # overloaded. 1798 logger.error('Exception in default exception handler ' 1799 'while handling an unexpected error ' 1800 'in custom exception handler', 1801 exc_info=True) 1802 1803 def _add_callback(self, handle): 1804 """Add a Handle to _scheduled (TimerHandle) or _ready.""" 1805 assert isinstance(handle, events.Handle), 'A Handle is required here' 1806 if handle._cancelled: 1807 return 1808 assert not isinstance(handle, events.TimerHandle) 1809 self._ready.append(handle) 1810 1811 def _add_callback_signalsafe(self, handle): 1812 """Like _add_callback() but called from a signal handler.""" 1813 self._add_callback(handle) 1814 self._write_to_self() 1815 1816 def _timer_handle_cancelled(self, handle): 1817 """Notification that a TimerHandle has been cancelled.""" 1818 if handle._scheduled: 1819 self._timer_cancelled_count += 1 1820 1821 def _run_once(self): 1822 """Run one full iteration of the event loop. 
1823 1824 This calls all currently ready callbacks, polls for I/O, 1825 schedules the resulting callbacks, and finally schedules 1826 'call_later' callbacks. 1827 """ 1828 1829 sched_count = len(self._scheduled) 1830 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 1831 self._timer_cancelled_count / sched_count > 1832 _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 1833 # Remove delayed calls that were cancelled if their number 1834 # is too high 1835 new_scheduled = [] 1836 for handle in self._scheduled: 1837 if handle._cancelled: 1838 handle._scheduled = False 1839 else: 1840 new_scheduled.append(handle) 1841 1842 heapq.heapify(new_scheduled) 1843 self._scheduled = new_scheduled 1844 self._timer_cancelled_count = 0 1845 else: 1846 # Remove delayed calls that were cancelled from head of queue. 1847 while self._scheduled and self._scheduled[0]._cancelled: 1848 self._timer_cancelled_count -= 1 1849 handle = heapq.heappop(self._scheduled) 1850 handle._scheduled = False 1851 1852 timeout = None 1853 if self._ready or self._stopping: 1854 timeout = 0 1855 elif self._scheduled: 1856 # Compute the desired timeout. 1857 when = self._scheduled[0]._when 1858 timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) 1859 1860 event_list = self._selector.select(timeout) 1861 self._process_events(event_list) 1862 1863 # Handle 'later' callbacks that are ready. 1864 end_time = self.time() + self._clock_resolution 1865 while self._scheduled: 1866 handle = self._scheduled[0] 1867 if handle._when >= end_time: 1868 break 1869 handle = heapq.heappop(self._scheduled) 1870 handle._scheduled = False 1871 self._ready.append(handle) 1872 1873 # This is the only place where callbacks are actually *called*. 1874 # All other places just add them to ready. 1875 # Note: We run all currently scheduled callbacks, but not any 1876 # callbacks scheduled by callbacks run this time around -- 1877 # they will be run the next time (after another I/O poll). 
1878 # Use an idiom that is thread-safe without using locks. 1879 ntodo = len(self._ready) 1880 for i in range(ntodo): 1881 handle = self._ready.popleft() 1882 if handle._cancelled: 1883 continue 1884 if self._debug: 1885 try: 1886 self._current_handle = handle 1887 t0 = self.time() 1888 handle._run() 1889 dt = self.time() - t0 1890 if dt >= self.slow_callback_duration: 1891 logger.warning('Executing %s took %.3f seconds', 1892 _format_handle(handle), dt) 1893 finally: 1894 self._current_handle = None 1895 else: 1896 handle._run() 1897 handle = None # Needed to break cycles when an exception occurs. 1898 1899 def _set_coroutine_origin_tracking(self, enabled): 1900 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 1901 return 1902 1903 if enabled: 1904 self._coroutine_origin_tracking_saved_depth = ( 1905 sys.get_coroutine_origin_tracking_depth()) 1906 sys.set_coroutine_origin_tracking_depth( 1907 constants.DEBUG_STACK_DEPTH) 1908 else: 1909 sys.set_coroutine_origin_tracking_depth( 1910 self._coroutine_origin_tracking_saved_depth) 1911 1912 self._coroutine_origin_tracking_enabled = enabled 1913 1914 def get_debug(self): 1915 return self._debug 1916 1917 def set_debug(self, enabled): 1918 self._debug = enabled 1919 1920 if self.is_running(): 1921 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled) 1922