1"""Base implementation of event loop. 2 3The event loop can be broken up into a multiplexer (the part 4responsible for notifying us of I/O events) and the event loop proper, 5which wraps a multiplexer with functionality for scheduling callbacks, 6immediately or at a given time in the future. 7 8Whenever a public API takes a callback, subsequent positional 9arguments will be passed to the callback if/when it is called. This 10avoids the proliferation of trivial lambdas implementing closures. 11Keyword arguments for the callback are not supported; this is a 12conscious design decision, leaving the door open for keyword arguments 13to modify the meaning of the API call itself. 14""" 15 16import collections 17import collections.abc 18import concurrent.futures 19import heapq 20import itertools 21import logging 22import os 23import socket 24import subprocess 25import threading 26import time 27import traceback 28import sys 29import warnings 30import weakref 31 32try: 33 import ssl 34except ImportError: # pragma: no cover 35 ssl = None 36 37from . import constants 38from . import coroutines 39from . import events 40from . import futures 41from . import protocols 42from . import sslproto 43from . import tasks 44from . import transports 45from .log import logger 46 47 48__all__ = 'BaseEventLoop', 49 50 51# Minimum number of _scheduled timer handles before cleanup of 52# cancelled handles is performed. 53_MIN_SCHEDULED_TIMER_HANDLES = 100 54 55# Minimum fraction of _scheduled timer handles that are cancelled 56# before cleanup of cancelled handles is performed. 57_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5 58 59# Exceptions which must not call the exception handler in fatal error 60# methods (_fatal_error()) 61_FATAL_ERROR_IGNORE = (BrokenPipeError, 62 ConnectionResetError, ConnectionAbortedError) 63 64_HAS_IPv6 = hasattr(socket, 'AF_INET6') 65 66# Maximum timeout passed to select to avoid OS limitations 67MAXIMUM_SELECT_TIMEOUT = 24 * 3600 68 69 70def _format_handle(handle): 71 cb = handle._callback 72 if isinstance(getattr(cb, '__self__', None), tasks.Task): 73 # format the task 74 return repr(cb.__self__) 75 else: 76 return str(handle) 77 78 79def _format_pipe(fd): 80 if fd == subprocess.PIPE: 81 return '<pipe>' 82 elif fd == subprocess.STDOUT: 83 return '<stdout>' 84 else: 85 return repr(fd) 86 87 88def _set_reuseport(sock): 89 if not hasattr(socket, 'SO_REUSEPORT'): 90 raise ValueError('reuse_port not supported by socket module') 91 else: 92 try: 93 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 94 except OSError: 95 raise ValueError('reuse_port not supported by socket module, ' 96 'SO_REUSEPORT defined but not implemented.') 97 98 99def _ipaddr_info(host, port, family, type, proto): 100 # Try to skip getaddrinfo if "host" is already an IP. Users might have 101 # handled name resolution in their own code and pass in resolved IPs. 102 if not hasattr(socket, 'inet_pton'): 103 return 104 105 if proto not in {0, socket.IPPROTO_TCP, socket.IPPROTO_UDP} or \ 106 host is None: 107 return None 108 109 if type == socket.SOCK_STREAM: 110 proto = socket.IPPROTO_TCP 111 elif type == socket.SOCK_DGRAM: 112 proto = socket.IPPROTO_UDP 113 else: 114 return None 115 116 if port is None: 117 port = 0 118 elif isinstance(port, bytes) and port == b'': 119 port = 0 120 elif isinstance(port, str) and port == '': 121 port = 0 122 else: 123 # If port's a service name like "http", don't skip getaddrinfo. 124 try: 125 port = int(port) 126 except (TypeError, ValueError): 127 return None 128 129 if family == socket.AF_UNSPEC: 130 afs = [socket.AF_INET] 131 if _HAS_IPv6: 132 afs.append(socket.AF_INET6) 133 else: 134 afs = [family] 135 136 if isinstance(host, bytes): 137 host = host.decode('idna') 138 if '%' in host: 139 # Linux's inet_pton doesn't accept an IPv6 zone index after host, 140 # like '::1%lo0'. 141 return None 142 143 for af in afs: 144 try: 145 socket.inet_pton(af, host) 146 # The host has already been resolved. 147 if _HAS_IPv6 and af == socket.AF_INET6: 148 return af, type, proto, '', (host, port, 0, 0) 149 else: 150 return af, type, proto, '', (host, port) 151 except OSError: 152 pass 153 154 # "host" is not an IP address. 155 return None 156 157 158def _run_until_complete_cb(fut): 159 if not fut.cancelled(): 160 exc = fut.exception() 161 if isinstance(exc, BaseException) and not isinstance(exc, Exception): 162 # Issue #22429: run_forever() already finished, no need to 163 # stop it. 164 return 165 futures._get_loop(fut).stop() 166 167 168if hasattr(socket, 'TCP_NODELAY'): 169 def _set_nodelay(sock): 170 if (sock.family in {socket.AF_INET, socket.AF_INET6} and 171 sock.type == socket.SOCK_STREAM and 172 sock.proto == socket.IPPROTO_TCP): 173 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 174else: 175 def _set_nodelay(sock): 176 pass 177 178 179class _SendfileFallbackProtocol(protocols.Protocol): 180 def __init__(self, transp): 181 if not isinstance(transp, transports._FlowControlMixin): 182 raise TypeError("transport should be _FlowControlMixin instance") 183 self._transport = transp 184 self._proto = transp.get_protocol() 185 self._should_resume_reading = transp.is_reading() 186 self._should_resume_writing = transp._protocol_paused 187 transp.pause_reading() 188 transp.set_protocol(self) 189 if self._should_resume_writing: 190 self._write_ready_fut = self._transport._loop.create_future() 191 else: 192 self._write_ready_fut = None 193 194 async def drain(self): 195 if self._transport.is_closing(): 196 raise ConnectionError("Connection closed by peer") 197 fut = self._write_ready_fut 198 if fut is None: 199 return 200 await fut 201 202 def connection_made(self, transport): 203 raise RuntimeError("Invalid state: " 204 "connection should have been established already.") 205 206 def connection_lost(self, exc): 207 if self._write_ready_fut is not None: 208 # Never happens if peer disconnects after sending the whole content 209 # Thus disconnection is always an exception from user perspective 210 if exc is None: 211 self._write_ready_fut.set_exception( 212 ConnectionError("Connection is closed by peer")) 213 else: 214 self._write_ready_fut.set_exception(exc) 215 self._proto.connection_lost(exc) 216 217 def pause_writing(self): 218 if self._write_ready_fut is not None: 219 return 220 self._write_ready_fut = self._transport._loop.create_future() 221 222 def resume_writing(self): 223 if self._write_ready_fut is None: 224 return 225 self._write_ready_fut.set_result(False) 226 self._write_ready_fut = None 227 228 def data_received(self, data): 229 raise RuntimeError("Invalid state: reading should be paused") 230 231 def eof_received(self): 232 raise RuntimeError("Invalid state: reading should be paused") 233 234 async def restore(self): 235 self._transport.set_protocol(self._proto) 236 if self._should_resume_reading: 237 self._transport.resume_reading() 238 if self._write_ready_fut is not None: 239 # Cancel the future. 240 # Basically it has no effect because protocol is switched back, 241 # no code should wait for it anymore. 242 self._write_ready_fut.cancel() 243 if self._should_resume_writing: 244 self._proto.resume_writing() 245 246 247class Server(events.AbstractServer): 248 249 def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, 250 ssl_handshake_timeout): 251 self._loop = loop 252 self._sockets = sockets 253 self._active_count = 0 254 self._waiters = [] 255 self._protocol_factory = protocol_factory 256 self._backlog = backlog 257 self._ssl_context = ssl_context 258 self._ssl_handshake_timeout = ssl_handshake_timeout 259 self._serving = False 260 self._serving_forever_fut = None 261 262 def __repr__(self): 263 return f'<{self.__class__.__name__} sockets={self.sockets!r}>' 264 265 def _attach(self): 266 assert self._sockets is not None 267 self._active_count += 1 268 269 def _detach(self): 270 assert self._active_count > 0 271 self._active_count -= 1 272 if self._active_count == 0 and self._sockets is None: 273 self._wakeup() 274 275 def _wakeup(self): 276 waiters = self._waiters 277 self._waiters = None 278 for waiter in waiters: 279 if not waiter.done(): 280 waiter.set_result(waiter) 281 282 def _start_serving(self): 283 if self._serving: 284 return 285 self._serving = True 286 for sock in self._sockets: 287 sock.listen(self._backlog) 288 self._loop._start_serving( 289 self._protocol_factory, sock, self._ssl_context, 290 self, self._backlog, self._ssl_handshake_timeout) 291 292 def get_loop(self): 293 return self._loop 294 295 def is_serving(self): 296 return self._serving 297 298 @property 299 def sockets(self): 300 if self._sockets is None: 301 return [] 302 return list(self._sockets) 303 304 def close(self): 305 sockets = self._sockets 306 if sockets is None: 307 return 308 self._sockets = None 309 310 for sock in sockets: 311 self._loop._stop_serving(sock) 312 313 self._serving = False 314 315 if (self._serving_forever_fut is not None and 316 not self._serving_forever_fut.done()): 317 self._serving_forever_fut.cancel() 318 self._serving_forever_fut = None 319 320 if self._active_count == 0: 321 self._wakeup() 322 323 async def start_serving(self): 324 self._start_serving() 325 # Skip one loop iteration so that all 'loop.add_reader' 326 # go through. 327 await tasks.sleep(0, loop=self._loop) 328 329 async def serve_forever(self): 330 if self._serving_forever_fut is not None: 331 raise RuntimeError( 332 f'server {self!r} is already being awaited on serve_forever()') 333 if self._sockets is None: 334 raise RuntimeError(f'server {self!r} is closed') 335 336 self._start_serving() 337 self._serving_forever_fut = self._loop.create_future() 338 339 try: 340 await self._serving_forever_fut 341 except futures.CancelledError: 342 try: 343 self.close() 344 await self.wait_closed() 345 finally: 346 raise 347 finally: 348 self._serving_forever_fut = None 349 350 async def wait_closed(self): 351 if self._sockets is None or self._waiters is None: 352 return 353 waiter = self._loop.create_future() 354 self._waiters.append(waiter) 355 await waiter 356 357 358class BaseEventLoop(events.AbstractEventLoop): 359 360 def __init__(self): 361 self._timer_cancelled_count = 0 362 self._closed = False 363 self._stopping = False 364 self._ready = collections.deque() 365 self._scheduled = [] 366 self._default_executor = None 367 self._internal_fds = 0 368 # Identifier of the thread running the event loop, or None if the 369 # event loop is not running 370 self._thread_id = None 371 self._clock_resolution = time.get_clock_info('monotonic').resolution 372 self._exception_handler = None 373 self.set_debug(coroutines._is_debug_mode()) 374 # In debug mode, if the execution of a callback or a step of a task 375 # exceed this duration in seconds, the slow callback/task is logged. 376 self.slow_callback_duration = 0.1 377 self._current_handle = None 378 self._task_factory = None 379 self._coroutine_origin_tracking_enabled = False 380 self._coroutine_origin_tracking_saved_depth = None 381 382 # A weak set of all asynchronous generators that are 383 # being iterated by the loop. 384 self._asyncgens = weakref.WeakSet() 385 # Set to True when `loop.shutdown_asyncgens` is called. 386 self._asyncgens_shutdown_called = False 387 388 def __repr__(self): 389 return ( 390 f'<{self.__class__.__name__} running={self.is_running()} ' 391 f'closed={self.is_closed()} debug={self.get_debug()}>' 392 ) 393 394 def create_future(self): 395 """Create a Future object attached to the loop.""" 396 return futures.Future(loop=self) 397 398 def create_task(self, coro): 399 """Schedule a coroutine object. 400 401 Return a task object. 402 """ 403 self._check_closed() 404 if self._task_factory is None: 405 task = tasks.Task(coro, loop=self) 406 if task._source_traceback: 407 del task._source_traceback[-1] 408 else: 409 task = self._task_factory(self, coro) 410 return task 411 412 def set_task_factory(self, factory): 413 """Set a task factory that will be used by loop.create_task(). 414 415 If factory is None the default task factory will be set. 416 417 If factory is a callable, it should have a signature matching 418 '(loop, coro)', where 'loop' will be a reference to the active 419 event loop, 'coro' will be a coroutine object. The callable 420 must return a Future. 421 """ 422 if factory is not None and not callable(factory): 423 raise TypeError('task factory must be a callable or None') 424 self._task_factory = factory 425 426 def get_task_factory(self): 427 """Return a task factory, or None if the default one is in use.""" 428 return self._task_factory 429 430 def _make_socket_transport(self, sock, protocol, waiter=None, *, 431 extra=None, server=None): 432 """Create socket transport.""" 433 raise NotImplementedError 434 435 def _make_ssl_transport( 436 self, rawsock, protocol, sslcontext, waiter=None, 437 *, server_side=False, server_hostname=None, 438 extra=None, server=None, 439 ssl_handshake_timeout=None, 440 call_connection_made=True): 441 """Create SSL transport.""" 442 raise NotImplementedError 443 444 def _make_datagram_transport(self, sock, protocol, 445 address=None, waiter=None, extra=None): 446 """Create datagram transport.""" 447 raise NotImplementedError 448 449 def _make_read_pipe_transport(self, pipe, protocol, waiter=None, 450 extra=None): 451 """Create read pipe transport.""" 452 raise NotImplementedError 453 454 def _make_write_pipe_transport(self, pipe, protocol, waiter=None, 455 extra=None): 456 """Create write pipe transport.""" 457 raise NotImplementedError 458 459 async def _make_subprocess_transport(self, protocol, args, shell, 460 stdin, stdout, stderr, bufsize, 461 extra=None, **kwargs): 462 """Create subprocess transport.""" 463 raise NotImplementedError 464 465 def _write_to_self(self): 466 """Write a byte to self-pipe, to wake up the event loop. 467 468 This may be called from a different thread. 469 470 The subclass is responsible for implementing the self-pipe. 471 """ 472 raise NotImplementedError 473 474 def _process_events(self, event_list): 475 """Process selector events.""" 476 raise NotImplementedError 477 478 def _check_closed(self): 479 if self._closed: 480 raise RuntimeError('Event loop is closed') 481 482 def _asyncgen_finalizer_hook(self, agen): 483 self._asyncgens.discard(agen) 484 if not self.is_closed(): 485 self.call_soon_threadsafe(self.create_task, agen.aclose()) 486 487 def _asyncgen_firstiter_hook(self, agen): 488 if self._asyncgens_shutdown_called: 489 warnings.warn( 490 f"asynchronous generator {agen!r} was scheduled after " 491 f"loop.shutdown_asyncgens() call", 492 ResourceWarning, source=self) 493 494 self._asyncgens.add(agen) 495 496 async def shutdown_asyncgens(self): 497 """Shutdown all active asynchronous generators.""" 498 self._asyncgens_shutdown_called = True 499 500 if not len(self._asyncgens): 501 # If Python version is <3.6 or we don't have any asynchronous 502 # generators alive. 503 return 504 505 closing_agens = list(self._asyncgens) 506 self._asyncgens.clear() 507 508 results = await tasks.gather( 509 *[ag.aclose() for ag in closing_agens], 510 return_exceptions=True, 511 loop=self) 512 513 for result, agen in zip(results, closing_agens): 514 if isinstance(result, Exception): 515 self.call_exception_handler({ 516 'message': f'an error occurred during closing of ' 517 f'asynchronous generator {agen!r}', 518 'exception': result, 519 'asyncgen': agen 520 }) 521 522 def run_forever(self): 523 """Run until stop() is called.""" 524 self._check_closed() 525 if self.is_running(): 526 raise RuntimeError('This event loop is already running') 527 if events._get_running_loop() is not None: 528 raise RuntimeError( 529 'Cannot run the event loop while another loop is running') 530 self._set_coroutine_origin_tracking(self._debug) 531 self._thread_id = threading.get_ident() 532 533 old_agen_hooks = sys.get_asyncgen_hooks() 534 sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, 535 finalizer=self._asyncgen_finalizer_hook) 536 try: 537 events._set_running_loop(self) 538 while True: 539 self._run_once() 540 if self._stopping: 541 break 542 finally: 543 self._stopping = False 544 self._thread_id = None 545 events._set_running_loop(None) 546 self._set_coroutine_origin_tracking(False) 547 sys.set_asyncgen_hooks(*old_agen_hooks) 548 549 def run_until_complete(self, future): 550 """Run until the Future is done. 551 552 If the argument is a coroutine, it is wrapped in a Task. 553 554 WARNING: It would be disastrous to call run_until_complete() 555 with the same coroutine twice -- it would wrap it in two 556 different Tasks and that can't be good. 557 558 Return the Future's result, or raise its exception. 559 """ 560 self._check_closed() 561 562 new_task = not futures.isfuture(future) 563 future = tasks.ensure_future(future, loop=self) 564 if new_task: 565 # An exception is raised if the future didn't complete, so there 566 # is no need to log the "destroy pending task" message 567 future._log_destroy_pending = False 568 569 future.add_done_callback(_run_until_complete_cb) 570 try: 571 self.run_forever() 572 except: 573 if new_task and future.done() and not future.cancelled(): 574 # The coroutine raised a BaseException. Consume the exception 575 # to not log a warning, the caller doesn't have access to the 576 # local task. 577 future.exception() 578 raise 579 finally: 580 future.remove_done_callback(_run_until_complete_cb) 581 if not future.done(): 582 raise RuntimeError('Event loop stopped before Future completed.') 583 584 return future.result() 585 586 def stop(self): 587 """Stop running the event loop. 588 589 Every callback already scheduled will still run. This simply informs 590 run_forever to stop looping after a complete iteration. 591 """ 592 self._stopping = True 593 594 def close(self): 595 """Close the event loop. 596 597 This clears the queues and shuts down the executor, 598 but does not wait for the executor to finish. 599 600 The event loop must not be running. 601 """ 602 if self.is_running(): 603 raise RuntimeError("Cannot close a running event loop") 604 if self._closed: 605 return 606 if self._debug: 607 logger.debug("Close %r", self) 608 self._closed = True 609 self._ready.clear() 610 self._scheduled.clear() 611 executor = self._default_executor 612 if executor is not None: 613 self._default_executor = None 614 executor.shutdown(wait=False) 615 616 def is_closed(self): 617 """Returns True if the event loop was closed.""" 618 return self._closed 619 620 def __del__(self): 621 if not self.is_closed(): 622 warnings.warn(f"unclosed event loop {self!r}", ResourceWarning, 623 source=self) 624 if not self.is_running(): 625 self.close() 626 627 def is_running(self): 628 """Returns True if the event loop is running.""" 629 return (self._thread_id is not None) 630 631 def time(self): 632 """Return the time according to the event loop's clock. 633 634 This is a float expressed in seconds since an epoch, but the 635 epoch, precision, accuracy and drift are unspecified and may 636 differ per event loop. 637 """ 638 return time.monotonic() 639 640 def call_later(self, delay, callback, *args, context=None): 641 """Arrange for a callback to be called at a given time. 642 643 Return a Handle: an opaque object with a cancel() method that 644 can be used to cancel the call. 645 646 The delay can be an int or float, expressed in seconds. It is 647 always relative to the current time. 648 649 Each callback will be called exactly once. If two callbacks 650 are scheduled for exactly the same time, it undefined which 651 will be called first. 652 653 Any positional arguments after the callback will be passed to 654 the callback when it is called. 655 """ 656 timer = self.call_at(self.time() + delay, callback, *args, 657 context=context) 658 if timer._source_traceback: 659 del timer._source_traceback[-1] 660 return timer 661 662 def call_at(self, when, callback, *args, context=None): 663 """Like call_later(), but uses an absolute time. 664 665 Absolute time corresponds to the event loop's time() method. 666 """ 667 self._check_closed() 668 if self._debug: 669 self._check_thread() 670 self._check_callback(callback, 'call_at') 671 timer = events.TimerHandle(when, callback, args, self, context) 672 if timer._source_traceback: 673 del timer._source_traceback[-1] 674 heapq.heappush(self._scheduled, timer) 675 timer._scheduled = True 676 return timer 677 678 def call_soon(self, callback, *args, context=None): 679 """Arrange for a callback to be called as soon as possible. 680 681 This operates as a FIFO queue: callbacks are called in the 682 order in which they are registered. Each callback will be 683 called exactly once. 684 685 Any positional arguments after the callback will be passed to 686 the callback when it is called. 687 """ 688 self._check_closed() 689 if self._debug: 690 self._check_thread() 691 self._check_callback(callback, 'call_soon') 692 handle = self._call_soon(callback, args, context) 693 if handle._source_traceback: 694 del handle._source_traceback[-1] 695 return handle 696 697 def _check_callback(self, callback, method): 698 if (coroutines.iscoroutine(callback) or 699 coroutines.iscoroutinefunction(callback)): 700 raise TypeError( 701 f"coroutines cannot be used with {method}()") 702 if not callable(callback): 703 raise TypeError( 704 f'a callable object was expected by {method}(), ' 705 f'got {callback!r}') 706 707 def _call_soon(self, callback, args, context): 708 handle = events.Handle(callback, args, self, context) 709 if handle._source_traceback: 710 del handle._source_traceback[-1] 711 self._ready.append(handle) 712 return handle 713 714 def _check_thread(self): 715 """Check that the current thread is the thread running the event loop. 716 717 Non-thread-safe methods of this class make this assumption and will 718 likely behave incorrectly when the assumption is violated. 719 720 Should only be called when (self._debug == True). The caller is 721 responsible for checking this condition for performance reasons. 722 """ 723 if self._thread_id is None: 724 return 725 thread_id = threading.get_ident() 726 if thread_id != self._thread_id: 727 raise RuntimeError( 728 "Non-thread-safe operation invoked on an event loop other " 729 "than the current one") 730 731 def call_soon_threadsafe(self, callback, *args, context=None): 732 """Like call_soon(), but thread-safe.""" 733 self._check_closed() 734 if self._debug: 735 self._check_callback(callback, 'call_soon_threadsafe') 736 handle = self._call_soon(callback, args, context) 737 if handle._source_traceback: 738 del handle._source_traceback[-1] 739 self._write_to_self() 740 return handle 741 742 def run_in_executor(self, executor, func, *args): 743 self._check_closed() 744 if self._debug: 745 self._check_callback(func, 'run_in_executor') 746 if executor is None: 747 executor = self._default_executor 748 if executor is None: 749 executor = concurrent.futures.ThreadPoolExecutor() 750 self._default_executor = executor 751 return futures.wrap_future( 752 executor.submit(func, *args), loop=self) 753 754 def set_default_executor(self, executor): 755 self._default_executor = executor 756 757 def _getaddrinfo_debug(self, host, port, family, type, proto, flags): 758 msg = [f"{host}:{port!r}"] 759 if family: 760 msg.append(f'family={family!r}') 761 if type: 762 msg.append(f'type={type!r}') 763 if proto: 764 msg.append(f'proto={proto!r}') 765 if flags: 766 msg.append(f'flags={flags!r}') 767 msg = ', '.join(msg) 768 logger.debug('Get address info %s', msg) 769 770 t0 = self.time() 771 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags) 772 dt = self.time() - t0 773 774 msg = f'Getting address info {msg} took {dt * 1e3:.3f}ms: {addrinfo!r}' 775 if dt >= self.slow_callback_duration: 776 logger.info(msg) 777 else: 778 logger.debug(msg) 779 return addrinfo 780 781 async def getaddrinfo(self, host, port, *, 782 family=0, type=0, proto=0, flags=0): 783 if self._debug: 784 getaddr_func = self._getaddrinfo_debug 785 else: 786 getaddr_func = socket.getaddrinfo 787 788 return await self.run_in_executor( 789 None, getaddr_func, host, port, family, type, proto, flags) 790 791 async def getnameinfo(self, sockaddr, flags=0): 792 return await self.run_in_executor( 793 None, socket.getnameinfo, sockaddr, flags) 794 795 async def sock_sendfile(self, sock, file, offset=0, count=None, 796 *, fallback=True): 797 if self._debug and sock.gettimeout() != 0: 798 raise ValueError("the socket must be non-blocking") 799 self._check_sendfile_params(sock, file, offset, count) 800 try: 801 return await self._sock_sendfile_native(sock, file, 802 offset, count) 803 except events.SendfileNotAvailableError as exc: 804 if not fallback: 805 raise 806 return await self._sock_sendfile_fallback(sock, file, 807 offset, count) 808 809 async def _sock_sendfile_native(self, sock, file, offset, count): 810 # NB: sendfile syscall is not supported for SSL sockets and 811 # non-mmap files even if sendfile is supported by OS 812 raise events.SendfileNotAvailableError( 813 f"syscall sendfile is not available for socket {sock!r} " 814 "and file {file!r} combination") 815 816 async def _sock_sendfile_fallback(self, sock, file, offset, count): 817 if offset: 818 file.seek(offset) 819 blocksize = ( 820 min(count, constants.SENDFILE_FALLBACK_READBUFFER_SIZE) 821 if count else constants.SENDFILE_FALLBACK_READBUFFER_SIZE 822 ) 823 buf = bytearray(blocksize) 824 total_sent = 0 825 try: 826 while True: 827 if count: 828 blocksize = min(count - total_sent, blocksize) 829 if blocksize <= 0: 830 break 831 view = memoryview(buf)[:blocksize] 832 read = await self.run_in_executor(None, file.readinto, view) 833 if not read: 834 break # EOF 835 await self.sock_sendall(sock, view) 836 total_sent += read 837 return total_sent 838 finally: 839 if total_sent > 0 and hasattr(file, 'seek'): 840 file.seek(offset + total_sent) 841 842 def _check_sendfile_params(self, sock, file, offset, count): 843 if 'b' not in getattr(file, 'mode', 'b'): 844 raise ValueError("file should be opened in binary mode") 845 if not sock.type == socket.SOCK_STREAM: 846 raise ValueError("only SOCK_STREAM type sockets are supported") 847 if count is not None: 848 if not isinstance(count, int): 849 raise TypeError( 850 "count must be a positive integer (got {!r})".format(count)) 851 if count <= 0: 852 raise ValueError( 853 "count must be a positive integer (got {!r})".format(count)) 854 if not isinstance(offset, int): 855 raise TypeError( 856 "offset must be a non-negative integer (got {!r})".format( 857 offset)) 858 if offset < 0: 859 raise ValueError( 860 "offset must be a non-negative integer (got {!r})".format( 861 offset)) 862 863 async def create_connection( 864 self, protocol_factory, host=None, port=None, 865 *, ssl=None, family=0, 866 proto=0, flags=0, sock=None, 867 local_addr=None, server_hostname=None, 868 ssl_handshake_timeout=None): 869 """Connect to a TCP server. 870 871 Create a streaming transport connection to a given Internet host and 872 port: socket family AF_INET or socket.AF_INET6 depending on host (or 873 family if specified), socket type SOCK_STREAM. protocol_factory must be 874 a callable returning a protocol instance. 875 876 This method is a coroutine which will try to establish the connection 877 in the background. When successful, the coroutine returns a 878 (transport, protocol) pair. 879 """ 880 if server_hostname is not None and not ssl: 881 raise ValueError('server_hostname is only meaningful with ssl') 882 883 if server_hostname is None and ssl: 884 # Use host as default for server_hostname. It is an error 885 # if host is empty or not set, e.g. when an 886 # already-connected socket was passed or when only a port 887 # is given. To avoid this error, you can pass 888 # server_hostname='' -- this will bypass the hostname 889 # check. (This also means that if host is a numeric 890 # IP/IPv6 address, we will attempt to verify that exact 891 # address; this will probably fail, but it is possible to 892 # create a certificate for a specific IP address, so we 893 # don't judge it here.) 894 if not host: 895 raise ValueError('You must set server_hostname ' 896 'when using ssl without a host') 897 server_hostname = host 898 899 if ssl_handshake_timeout is not None and not ssl: 900 raise ValueError( 901 'ssl_handshake_timeout is only meaningful with ssl') 902 903 if host is not None or port is not None: 904 if sock is not None: 905 raise ValueError( 906 'host/port and sock can not be specified at the same time') 907 908 infos = await self._ensure_resolved( 909 (host, port), family=family, 910 type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self) 911 if not infos: 912 raise OSError('getaddrinfo() returned empty list') 913 914 if local_addr is not None: 915 laddr_infos = await self._ensure_resolved( 916 local_addr, family=family, 917 type=socket.SOCK_STREAM, proto=proto, 918 flags=flags, loop=self) 919 if not laddr_infos: 920 raise OSError('getaddrinfo() returned empty list') 921 922 exceptions = [] 923 for family, type, proto, cname, address in infos: 924 try: 925 sock = socket.socket(family=family, type=type, proto=proto) 926 sock.setblocking(False) 927 if local_addr is not None: 928 for _, _, _, _, laddr in laddr_infos: 929 try: 930 sock.bind(laddr) 931 break 932 except OSError as exc: 933 msg = ( 934 f'error while attempting to bind on ' 935 f'address {laddr!r}: ' 936 f'{exc.strerror.lower()}' 937 ) 938 exc = OSError(exc.errno, msg) 939 exceptions.append(exc) 940 else: 941 sock.close() 942 sock = None 943 continue 944 if self._debug: 945 logger.debug("connect %r to %r", sock, address) 946 await self.sock_connect(sock, address) 947 except OSError as exc: 948 if sock is not None: 949 sock.close() 950 exceptions.append(exc) 951 except: 952 if sock is not None: 953 sock.close() 954 raise 955 else: 956 break 957 else: 958 if len(exceptions) == 1: 959 raise exceptions[0] 960 else: 961 # If they all have the same str(), raise one. 962 model = str(exceptions[0]) 963 if all(str(exc) == model for exc in exceptions): 964 raise exceptions[0] 965 # Raise a combined exception so the user can see all 966 # the various error messages. 967 raise OSError('Multiple exceptions: {}'.format( 968 ', '.join(str(exc) for exc in exceptions))) 969 970 else: 971 if sock is None: 972 raise ValueError( 973 'host and port was not specified and no sock specified') 974 if sock.type != socket.SOCK_STREAM: 975 # We allow AF_INET, AF_INET6, AF_UNIX as long as they 976 # are SOCK_STREAM. 977 # We support passing AF_UNIX sockets even though we have 978 # a dedicated API for that: create_unix_connection. 979 # Disallowing AF_UNIX in this method, breaks backwards 980 # compatibility. 981 raise ValueError( 982 f'A Stream Socket was expected, got {sock!r}') 983 984 transport, protocol = await self._create_connection_transport( 985 sock, protocol_factory, ssl, server_hostname, 986 ssl_handshake_timeout=ssl_handshake_timeout) 987 if self._debug: 988 # Get the socket from the transport because SSL transport closes 989 # the old socket and creates a new SSL socket 990 sock = transport.get_extra_info('socket') 991 logger.debug("%r connected to %s:%r: (%r, %r)", 992 sock, host, port, transport, protocol) 993 return transport, protocol 994 995 async def _create_connection_transport( 996 self, sock, protocol_factory, ssl, 997 server_hostname, server_side=False, 998 ssl_handshake_timeout=None): 999 1000 sock.setblocking(False) 1001 1002 protocol = protocol_factory() 1003 waiter = self.create_future() 1004 if ssl: 1005 sslcontext = None if isinstance(ssl, bool) else ssl 1006 transport = self._make_ssl_transport( 1007 sock, protocol, sslcontext, waiter, 1008 server_side=server_side, server_hostname=server_hostname, 1009 ssl_handshake_timeout=ssl_handshake_timeout) 1010 else: 1011 transport = self._make_socket_transport(sock, protocol, waiter) 1012 1013 try: 1014 await waiter 1015 except: 1016 transport.close() 1017 raise 1018 1019 return transport, protocol 1020 1021 async def sendfile(self, transport, file, offset=0, count=None, 1022 *, fallback=True): 1023 """Send a file to transport. 1024 1025 Return the total number of bytes which were sent. 1026 1027 The method uses high-performance os.sendfile if available. 1028 1029 file must be a regular file object opened in binary mode. 1030 1031 offset tells from where to start reading the file. If specified, 1032 count is the total number of bytes to transmit as opposed to 1033 sending the file until EOF is reached. File position is updated on 1034 return or also in case of error in which case file.tell() 1035 can be used to figure out the number of bytes 1036 which were sent. 1037 1038 fallback set to True makes asyncio to manually read and send 1039 the file when the platform does not support the sendfile syscall 1040 (e.g. Windows or SSL socket on Unix). 1041 1042 Raise SendfileNotAvailableError if the system does not support 1043 sendfile syscall and fallback is False. 1044 """ 1045 if transport.is_closing(): 1046 raise RuntimeError("Transport is closing") 1047 mode = getattr(transport, '_sendfile_compatible', 1048 constants._SendfileMode.UNSUPPORTED) 1049 if mode is constants._SendfileMode.UNSUPPORTED: 1050 raise RuntimeError( 1051 f"sendfile is not supported for transport {transport!r}") 1052 if mode is constants._SendfileMode.TRY_NATIVE: 1053 try: 1054 return await self._sendfile_native(transport, file, 1055 offset, count) 1056 except events.SendfileNotAvailableError as exc: 1057 if not fallback: 1058 raise 1059 1060 if not fallback: 1061 raise RuntimeError( 1062 f"fallback is disabled and native sendfile is not " 1063 f"supported for transport {transport!r}") 1064 1065 return await self._sendfile_fallback(transport, file, 1066 offset, count) 1067 1068 async def _sendfile_native(self, transp, file, offset, count): 1069 raise events.SendfileNotAvailableError( 1070 "sendfile syscall is not supported") 1071 1072 async def _sendfile_fallback(self, transp, file, offset, count): 1073 if offset: 1074 file.seek(offset) 1075 blocksize = min(count, 16384) if count else 16384 1076 buf = bytearray(blocksize) 1077 total_sent = 0 1078 proto = _SendfileFallbackProtocol(transp) 1079 try: 1080 while True: 1081 if count: 1082 blocksize = min(count - total_sent, blocksize) 1083 if blocksize <= 0: 1084 return total_sent 1085 view = memoryview(buf)[:blocksize] 1086 read = file.readinto(view) 1087 if not read: 1088 return total_sent # EOF 1089 await proto.drain() 1090 transp.write(view) 1091 total_sent += read 1092 finally: 1093 if total_sent > 0 and hasattr(file, 'seek'): 1094 file.seek(offset + total_sent) 1095 await proto.restore() 1096 1097 async def start_tls(self, transport, protocol, sslcontext, *, 1098 server_side=False, 1099 server_hostname=None, 1100 ssl_handshake_timeout=None): 1101 """Upgrade transport to TLS. 1102 1103 Return a new transport that *protocol* should start using 1104 immediately. 1105 """ 1106 if ssl is None: 1107 raise RuntimeError('Python ssl module is not available') 1108 1109 if not isinstance(sslcontext, ssl.SSLContext): 1110 raise TypeError( 1111 f'sslcontext is expected to be an instance of ssl.SSLContext, ' 1112 f'got {sslcontext!r}') 1113 1114 if not getattr(transport, '_start_tls_compatible', False): 1115 raise TypeError( 1116 f'transport {transport!r} is not supported by start_tls()') 1117 1118 waiter = self.create_future() 1119 ssl_protocol = sslproto.SSLProtocol( 1120 self, protocol, sslcontext, waiter, 1121 server_side, server_hostname, 1122 ssl_handshake_timeout=ssl_handshake_timeout, 1123 call_connection_made=False) 1124 1125 # Pause early so that "ssl_protocol.data_received()" doesn't 1126 # have a chance to get called before "ssl_protocol.connection_made()". 1127 transport.pause_reading() 1128 1129 transport.set_protocol(ssl_protocol) 1130 conmade_cb = self.call_soon(ssl_protocol.connection_made, transport) 1131 resume_cb = self.call_soon(transport.resume_reading) 1132 1133 try: 1134 await waiter 1135 except Exception: 1136 transport.close() 1137 conmade_cb.cancel() 1138 resume_cb.cancel() 1139 raise 1140 1141 return ssl_protocol._app_transport 1142 1143 async def create_datagram_endpoint(self, protocol_factory, 1144 local_addr=None, remote_addr=None, *, 1145 family=0, proto=0, flags=0, 1146 reuse_address=None, reuse_port=None, 1147 allow_broadcast=None, sock=None): 1148 """Create datagram connection.""" 1149 if sock is not None: 1150 if sock.type != socket.SOCK_DGRAM: 1151 raise ValueError( 1152 f'A UDP Socket was expected, got {sock!r}') 1153 if (local_addr or remote_addr or 1154 family or proto or flags or 1155 reuse_address or reuse_port or allow_broadcast): 1156 # show the problematic kwargs in exception msg 1157 opts = dict(local_addr=local_addr, remote_addr=remote_addr, 1158 family=family, proto=proto, flags=flags, 1159 reuse_address=reuse_address, reuse_port=reuse_port, 1160 allow_broadcast=allow_broadcast) 1161 problems = ', '.join(f'{k}={v}' for k, v in opts.items() if v) 1162 raise ValueError( 1163 f'socket modifier keyword arguments can not be used ' 1164 f'when sock is specified. ({problems})') 1165 sock.setblocking(False) 1166 r_addr = None 1167 else: 1168 if not (local_addr or remote_addr): 1169 if family == 0: 1170 raise ValueError('unexpected address family') 1171 addr_pairs_info = (((family, proto), (None, None)),) 1172 elif hasattr(socket, 'AF_UNIX') and family == socket.AF_UNIX: 1173 for addr in (local_addr, remote_addr): 1174 if addr is not None and not isinstance(addr, str): 1175 raise TypeError('string is expected') 1176 addr_pairs_info = (((family, proto), 1177 (local_addr, remote_addr)), ) 1178 else: 1179 # join address by (family, protocol) 1180 addr_infos = collections.OrderedDict() 1181 for idx, addr in ((0, local_addr), (1, remote_addr)): 1182 if addr is not None: 1183 assert isinstance(addr, tuple) and len(addr) == 2, ( 1184 '2-tuple is expected') 1185 1186 infos = await self._ensure_resolved( 1187 addr, family=family, type=socket.SOCK_DGRAM, 1188 proto=proto, flags=flags, loop=self) 1189 if not infos: 1190 raise OSError('getaddrinfo() returned empty list') 1191 1192 for fam, _, pro, _, address in infos: 1193 key = (fam, pro) 1194 if key not in addr_infos: 1195 addr_infos[key] = [None, None] 1196 addr_infos[key][idx] = address 1197 1198 # each addr has to have info for each (family, proto) pair 1199 addr_pairs_info = [ 1200 (key, addr_pair) for key, addr_pair in addr_infos.items() 1201 if not ((local_addr and addr_pair[0] is None) or 1202 (remote_addr and addr_pair[1] is None))] 1203 1204 if not addr_pairs_info: 1205 raise ValueError('can not get address information') 1206 1207 exceptions = [] 1208 1209 if reuse_address is None: 1210 reuse_address = os.name == 'posix' and sys.platform != 'cygwin' 1211 1212 for ((family, proto), 1213 (local_address, remote_address)) in addr_pairs_info: 1214 sock = None 1215 r_addr = None 1216 try: 1217 sock = socket.socket( 1218 family=family, type=socket.SOCK_DGRAM, proto=proto) 1219 if reuse_address: 1220 sock.setsockopt( 1221 socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1222 if reuse_port: 1223 _set_reuseport(sock) 1224 if allow_broadcast: 1225 sock.setsockopt( 1226 socket.SOL_SOCKET, socket.SO_BROADCAST, 1) 1227 sock.setblocking(False) 1228 1229 if local_addr: 1230 sock.bind(local_address) 1231 if remote_addr: 1232 await self.sock_connect(sock, remote_address) 1233 r_addr = remote_address 1234 except OSError as exc: 1235 if sock is not None: 1236 sock.close() 1237 exceptions.append(exc) 1238 except: 1239 if sock is not None: 1240 sock.close() 1241 raise 1242 else: 1243 break 1244 else: 1245 raise exceptions[0] 1246 1247 protocol = protocol_factory() 1248 waiter = self.create_future() 1249 transport = self._make_datagram_transport( 1250 sock, protocol, r_addr, waiter) 1251 if self._debug: 1252 if local_addr: 1253 logger.info("Datagram endpoint local_addr=%r remote_addr=%r " 1254 "created: (%r, %r)", 1255 local_addr, remote_addr, transport, protocol) 1256 else: 1257 logger.debug("Datagram endpoint remote_addr=%r created: " 1258 "(%r, %r)", 1259 remote_addr, transport, protocol) 1260 1261 try: 1262 await waiter 1263 except: 1264 transport.close() 1265 raise 1266 1267 return transport, protocol 1268 1269 async def _ensure_resolved(self, address, *, 1270 family=0, type=socket.SOCK_STREAM, 1271 proto=0, flags=0, loop): 1272 host, port = address[:2] 1273 info = _ipaddr_info(host, port, family, type, proto) 1274 if info is not None: 1275 # "host" is already a resolved IP. 1276 return [info] 1277 else: 1278 return await loop.getaddrinfo(host, port, family=family, type=type, 1279 proto=proto, flags=flags) 1280 1281 async def _create_server_getaddrinfo(self, host, port, family, flags): 1282 infos = await self._ensure_resolved((host, port), family=family, 1283 type=socket.SOCK_STREAM, 1284 flags=flags, loop=self) 1285 if not infos: 1286 raise OSError(f'getaddrinfo({host!r}) returned empty list') 1287 return infos 1288 1289 async def create_server( 1290 self, protocol_factory, host=None, port=None, 1291 *, 1292 family=socket.AF_UNSPEC, 1293 flags=socket.AI_PASSIVE, 1294 sock=None, 1295 backlog=100, 1296 ssl=None, 1297 reuse_address=None, 1298 reuse_port=None, 1299 ssl_handshake_timeout=None, 1300 start_serving=True): 1301 """Create a TCP server. 1302 1303 The host parameter can be a string, in that case the TCP server is 1304 bound to host and port. 1305 1306 The host parameter can also be a sequence of strings and in that case 1307 the TCP server is bound to all hosts of the sequence. If a host 1308 appears multiple times (possibly indirectly e.g. when hostnames 1309 resolve to the same IP address), the server is only bound once to that 1310 host. 1311 1312 Return a Server object which can be used to stop the service. 1313 1314 This method is a coroutine. 1315 """ 1316 if isinstance(ssl, bool): 1317 raise TypeError('ssl argument must be an SSLContext or None') 1318 1319 if ssl_handshake_timeout is not None and ssl is None: 1320 raise ValueError( 1321 'ssl_handshake_timeout is only meaningful with ssl') 1322 1323 if host is not None or port is not None: 1324 if sock is not None: 1325 raise ValueError( 1326 'host/port and sock can not be specified at the same time') 1327 1328 if reuse_address is None: 1329 reuse_address = os.name == 'posix' and sys.platform != 'cygwin' 1330 sockets = [] 1331 if host == '': 1332 hosts = [None] 1333 elif (isinstance(host, str) or 1334 not isinstance(host, collections.abc.Iterable)): 1335 hosts = [host] 1336 else: 1337 hosts = host 1338 1339 fs = [self._create_server_getaddrinfo(host, port, family=family, 1340 flags=flags) 1341 for host in hosts] 1342 infos = await tasks.gather(*fs, loop=self) 1343 infos = set(itertools.chain.from_iterable(infos)) 1344 1345 completed = False 1346 try: 1347 for res in infos: 1348 af, socktype, proto, canonname, sa = res 1349 try: 1350 sock = socket.socket(af, socktype, proto) 1351 except socket.error: 1352 # Assume it's a bad family/type/protocol combination. 1353 if self._debug: 1354 logger.warning('create_server() failed to create ' 1355 'socket.socket(%r, %r, %r)', 1356 af, socktype, proto, exc_info=True) 1357 continue 1358 sockets.append(sock) 1359 if reuse_address: 1360 sock.setsockopt( 1361 socket.SOL_SOCKET, socket.SO_REUSEADDR, True) 1362 if reuse_port: 1363 _set_reuseport(sock) 1364 # Disable IPv4/IPv6 dual stack support (enabled by 1365 # default on Linux) which makes a single socket 1366 # listen on both address families. 1367 if (_HAS_IPv6 and 1368 af == socket.AF_INET6 and 1369 hasattr(socket, 'IPPROTO_IPV6')): 1370 sock.setsockopt(socket.IPPROTO_IPV6, 1371 socket.IPV6_V6ONLY, 1372 True) 1373 try: 1374 sock.bind(sa) 1375 except OSError as err: 1376 raise OSError(err.errno, 'error while attempting ' 1377 'to bind on address %r: %s' 1378 % (sa, err.strerror.lower())) from None 1379 completed = True 1380 finally: 1381 if not completed: 1382 for sock in sockets: 1383 sock.close() 1384 else: 1385 if sock is None: 1386 raise ValueError('Neither host/port nor sock were specified') 1387 if sock.type != socket.SOCK_STREAM: 1388 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1389 sockets = [sock] 1390 1391 for sock in sockets: 1392 sock.setblocking(False) 1393 1394 server = Server(self, sockets, protocol_factory, 1395 ssl, backlog, ssl_handshake_timeout) 1396 if start_serving: 1397 server._start_serving() 1398 # Skip one loop iteration so that all 'loop.add_reader' 1399 # go through. 1400 await tasks.sleep(0, loop=self) 1401 1402 if self._debug: 1403 logger.info("%r is serving", server) 1404 return server 1405 1406 async def connect_accepted_socket( 1407 self, protocol_factory, sock, 1408 *, ssl=None, 1409 ssl_handshake_timeout=None): 1410 """Handle an accepted connection. 1411 1412 This is used by servers that accept connections outside of 1413 asyncio but that use asyncio to handle connections. 1414 1415 This method is a coroutine. When completed, the coroutine 1416 returns a (transport, protocol) pair. 1417 """ 1418 if sock.type != socket.SOCK_STREAM: 1419 raise ValueError(f'A Stream Socket was expected, got {sock!r}') 1420 1421 if ssl_handshake_timeout is not None and not ssl: 1422 raise ValueError( 1423 'ssl_handshake_timeout is only meaningful with ssl') 1424 1425 transport, protocol = await self._create_connection_transport( 1426 sock, protocol_factory, ssl, '', server_side=True, 1427 ssl_handshake_timeout=ssl_handshake_timeout) 1428 if self._debug: 1429 # Get the socket from the transport because SSL transport closes 1430 # the old socket and creates a new SSL socket 1431 sock = transport.get_extra_info('socket') 1432 logger.debug("%r handled: (%r, %r)", sock, transport, protocol) 1433 return transport, protocol 1434 1435 async def connect_read_pipe(self, protocol_factory, pipe): 1436 protocol = protocol_factory() 1437 waiter = self.create_future() 1438 transport = self._make_read_pipe_transport(pipe, protocol, waiter) 1439 1440 try: 1441 await waiter 1442 except: 1443 transport.close() 1444 raise 1445 1446 if self._debug: 1447 logger.debug('Read pipe %r connected: (%r, %r)', 1448 pipe.fileno(), transport, protocol) 1449 return transport, protocol 1450 1451 async def connect_write_pipe(self, protocol_factory, pipe): 1452 protocol = protocol_factory() 1453 waiter = self.create_future() 1454 transport = self._make_write_pipe_transport(pipe, protocol, waiter) 1455 1456 try: 1457 await waiter 1458 except: 1459 transport.close() 1460 raise 1461 1462 if self._debug: 1463 logger.debug('Write pipe %r connected: (%r, %r)', 1464 pipe.fileno(), transport, protocol) 1465 return transport, protocol 1466 1467 def _log_subprocess(self, msg, stdin, stdout, stderr): 1468 info = [msg] 1469 if stdin is not None: 1470 info.append(f'stdin={_format_pipe(stdin)}') 1471 if stdout is not None and stderr == subprocess.STDOUT: 1472 info.append(f'stdout=stderr={_format_pipe(stdout)}') 1473 else: 1474 if stdout is not None: 1475 info.append(f'stdout={_format_pipe(stdout)}') 1476 if stderr is not None: 1477 info.append(f'stderr={_format_pipe(stderr)}') 1478 logger.debug(' '.join(info)) 1479 1480 async def subprocess_shell(self, protocol_factory, cmd, *, 1481 stdin=subprocess.PIPE, 1482 stdout=subprocess.PIPE, 1483 stderr=subprocess.PIPE, 1484 universal_newlines=False, 1485 shell=True, bufsize=0, 1486 **kwargs): 1487 if not isinstance(cmd, (bytes, str)): 1488 raise ValueError("cmd must be a string") 1489 if universal_newlines: 1490 raise ValueError("universal_newlines must be False") 1491 if not shell: 1492 raise ValueError("shell must be True") 1493 if bufsize != 0: 1494 raise ValueError("bufsize must be 0") 1495 protocol = protocol_factory() 1496 debug_log = None 1497 if self._debug: 1498 # don't log parameters: they may contain sensitive information 1499 # (password) and may be too long 1500 debug_log = 'run shell command %r' % cmd 1501 self._log_subprocess(debug_log, stdin, stdout, stderr) 1502 transport = await self._make_subprocess_transport( 1503 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs) 1504 if self._debug and debug_log is not None: 1505 logger.info('%s: %r', debug_log, transport) 1506 return transport, protocol 1507 1508 async def subprocess_exec(self, protocol_factory, program, *args, 1509 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 1510 stderr=subprocess.PIPE, universal_newlines=False, 1511 shell=False, bufsize=0, **kwargs): 1512 if universal_newlines: 1513 raise ValueError("universal_newlines must be False") 1514 if shell: 1515 raise ValueError("shell must be False") 1516 if bufsize != 0: 1517 raise ValueError("bufsize must be 0") 1518 popen_args = (program,) + args 1519 for arg in popen_args: 1520 if not isinstance(arg, (str, bytes)): 1521 raise TypeError( 1522 f"program arguments must be a bytes or text string, " 1523 f"not {type(arg).__name__}") 1524 protocol = protocol_factory() 1525 debug_log = None 1526 if self._debug: 1527 # don't log parameters: they may contain sensitive information 1528 # (password) and may be too long 1529 debug_log = f'execute program {program!r}' 1530 self._log_subprocess(debug_log, stdin, stdout, stderr) 1531 transport = await self._make_subprocess_transport( 1532 protocol, popen_args, False, stdin, stdout, stderr, 1533 bufsize, **kwargs) 1534 if self._debug and debug_log is not None: 1535 logger.info('%s: %r', debug_log, transport) 1536 return transport, protocol 1537 1538 def get_exception_handler(self): 1539 """Return an exception handler, or None if the default one is in use. 1540 """ 1541 return self._exception_handler 1542 1543 def set_exception_handler(self, handler): 1544 """Set handler as the new event loop exception handler. 1545 1546 If handler is None, the default exception handler will 1547 be set. 1548 1549 If handler is a callable object, it should have a 1550 signature matching '(loop, context)', where 'loop' 1551 will be a reference to the active event loop, 'context' 1552 will be a dict object (see `call_exception_handler()` 1553 documentation for details about context). 1554 """ 1555 if handler is not None and not callable(handler): 1556 raise TypeError(f'A callable object or None is expected, ' 1557 f'got {handler!r}') 1558 self._exception_handler = handler 1559 1560 def default_exception_handler(self, context): 1561 """Default exception handler. 1562 1563 This is called when an exception occurs and no exception 1564 handler is set, and can be called by a custom exception 1565 handler that wants to defer to the default behavior. 1566 1567 This default handler logs the error message and other 1568 context-dependent information. In debug mode, a truncated 1569 stack trace is also appended showing where the given object 1570 (e.g. a handle or future or task) was created, if any. 1571 1572 The context parameter has the same meaning as in 1573 `call_exception_handler()`. 1574 """ 1575 message = context.get('message') 1576 if not message: 1577 message = 'Unhandled exception in event loop' 1578 1579 exception = context.get('exception') 1580 if exception is not None: 1581 exc_info = (type(exception), exception, exception.__traceback__) 1582 else: 1583 exc_info = False 1584 1585 if ('source_traceback' not in context and 1586 self._current_handle is not None and 1587 self._current_handle._source_traceback): 1588 context['handle_traceback'] = \ 1589 self._current_handle._source_traceback 1590 1591 log_lines = [message] 1592 for key in sorted(context): 1593 if key in {'message', 'exception'}: 1594 continue 1595 value = context[key] 1596 if key == 'source_traceback': 1597 tb = ''.join(traceback.format_list(value)) 1598 value = 'Object created at (most recent call last):\n' 1599 value += tb.rstrip() 1600 elif key == 'handle_traceback': 1601 tb = ''.join(traceback.format_list(value)) 1602 value = 'Handle created at (most recent call last):\n' 1603 value += tb.rstrip() 1604 else: 1605 value = repr(value) 1606 log_lines.append(f'{key}: {value}') 1607 1608 logger.error('\n'.join(log_lines), exc_info=exc_info) 1609 1610 def call_exception_handler(self, context): 1611 """Call the current event loop's exception handler. 1612 1613 The context argument is a dict containing the following keys: 1614 1615 - 'message': Error message; 1616 - 'exception' (optional): Exception object; 1617 - 'future' (optional): Future instance; 1618 - 'task' (optional): Task instance; 1619 - 'handle' (optional): Handle instance; 1620 - 'protocol' (optional): Protocol instance; 1621 - 'transport' (optional): Transport instance; 1622 - 'socket' (optional): Socket instance; 1623 - 'asyncgen' (optional): Asynchronous generator that caused 1624 the exception. 1625 1626 New keys maybe introduced in the future. 1627 1628 Note: do not overload this method in an event loop subclass. 1629 For custom exception handling, use the 1630 `set_exception_handler()` method. 1631 """ 1632 if self._exception_handler is None: 1633 try: 1634 self.default_exception_handler(context) 1635 except Exception: 1636 # Second protection layer for unexpected errors 1637 # in the default implementation, as well as for subclassed 1638 # event loops with overloaded "default_exception_handler". 1639 logger.error('Exception in default exception handler', 1640 exc_info=True) 1641 else: 1642 try: 1643 self._exception_handler(self, context) 1644 except Exception as exc: 1645 # Exception in the user set custom exception handler. 1646 try: 1647 # Let's try default handler. 1648 self.default_exception_handler({ 1649 'message': 'Unhandled error in exception handler', 1650 'exception': exc, 1651 'context': context, 1652 }) 1653 except Exception: 1654 # Guard 'default_exception_handler' in case it is 1655 # overloaded. 1656 logger.error('Exception in default exception handler ' 1657 'while handling an unexpected error ' 1658 'in custom exception handler', 1659 exc_info=True) 1660 1661 def _add_callback(self, handle): 1662 """Add a Handle to _scheduled (TimerHandle) or _ready.""" 1663 assert isinstance(handle, events.Handle), 'A Handle is required here' 1664 if handle._cancelled: 1665 return 1666 assert not isinstance(handle, events.TimerHandle) 1667 self._ready.append(handle) 1668 1669 def _add_callback_signalsafe(self, handle): 1670 """Like _add_callback() but called from a signal handler.""" 1671 self._add_callback(handle) 1672 self._write_to_self() 1673 1674 def _timer_handle_cancelled(self, handle): 1675 """Notification that a TimerHandle has been cancelled.""" 1676 if handle._scheduled: 1677 self._timer_cancelled_count += 1 1678 1679 def _run_once(self): 1680 """Run one full iteration of the event loop. 1681 1682 This calls all currently ready callbacks, polls for I/O, 1683 schedules the resulting callbacks, and finally schedules 1684 'call_later' callbacks. 1685 """ 1686 1687 sched_count = len(self._scheduled) 1688 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and 1689 self._timer_cancelled_count / sched_count > 1690 _MIN_CANCELLED_TIMER_HANDLES_FRACTION): 1691 # Remove delayed calls that were cancelled if their number 1692 # is too high 1693 new_scheduled = [] 1694 for handle in self._scheduled: 1695 if handle._cancelled: 1696 handle._scheduled = False 1697 else: 1698 new_scheduled.append(handle) 1699 1700 heapq.heapify(new_scheduled) 1701 self._scheduled = new_scheduled 1702 self._timer_cancelled_count = 0 1703 else: 1704 # Remove delayed calls that were cancelled from head of queue. 1705 while self._scheduled and self._scheduled[0]._cancelled: 1706 self._timer_cancelled_count -= 1 1707 handle = heapq.heappop(self._scheduled) 1708 handle._scheduled = False 1709 1710 timeout = None 1711 if self._ready or self._stopping: 1712 timeout = 0 1713 elif self._scheduled: 1714 # Compute the desired timeout. 1715 when = self._scheduled[0]._when 1716 timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT) 1717 1718 if self._debug and timeout != 0: 1719 t0 = self.time() 1720 event_list = self._selector.select(timeout) 1721 dt = self.time() - t0 1722 if dt >= 1.0: 1723 level = logging.INFO 1724 else: 1725 level = logging.DEBUG 1726 nevent = len(event_list) 1727 if timeout is None: 1728 logger.log(level, 'poll took %.3f ms: %s events', 1729 dt * 1e3, nevent) 1730 elif nevent: 1731 logger.log(level, 1732 'poll %.3f ms took %.3f ms: %s events', 1733 timeout * 1e3, dt * 1e3, nevent) 1734 elif dt >= 1.0: 1735 logger.log(level, 1736 'poll %.3f ms took %.3f ms: timeout', 1737 timeout * 1e3, dt * 1e3) 1738 else: 1739 event_list = self._selector.select(timeout) 1740 self._process_events(event_list) 1741 1742 # Handle 'later' callbacks that are ready. 1743 end_time = self.time() + self._clock_resolution 1744 while self._scheduled: 1745 handle = self._scheduled[0] 1746 if handle._when >= end_time: 1747 break 1748 handle = heapq.heappop(self._scheduled) 1749 handle._scheduled = False 1750 self._ready.append(handle) 1751 1752 # This is the only place where callbacks are actually *called*. 1753 # All other places just add them to ready. 1754 # Note: We run all currently scheduled callbacks, but not any 1755 # callbacks scheduled by callbacks run this time around -- 1756 # they will be run the next time (after another I/O poll). 1757 # Use an idiom that is thread-safe without using locks. 1758 ntodo = len(self._ready) 1759 for i in range(ntodo): 1760 handle = self._ready.popleft() 1761 if handle._cancelled: 1762 continue 1763 if self._debug: 1764 try: 1765 self._current_handle = handle 1766 t0 = self.time() 1767 handle._run() 1768 dt = self.time() - t0 1769 if dt >= self.slow_callback_duration: 1770 logger.warning('Executing %s took %.3f seconds', 1771 _format_handle(handle), dt) 1772 finally: 1773 self._current_handle = None 1774 else: 1775 handle._run() 1776 handle = None # Needed to break cycles when an exception occurs. 1777 1778 def _set_coroutine_origin_tracking(self, enabled): 1779 if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): 1780 return 1781 1782 if enabled: 1783 self._coroutine_origin_tracking_saved_depth = ( 1784 sys.get_coroutine_origin_tracking_depth()) 1785 sys.set_coroutine_origin_tracking_depth( 1786 constants.DEBUG_STACK_DEPTH) 1787 else: 1788 sys.set_coroutine_origin_tracking_depth( 1789 self._coroutine_origin_tracking_saved_depth) 1790 1791 self._coroutine_origin_tracking_enabled = enabled 1792 1793 def get_debug(self): 1794 return self._debug 1795 1796 def set_debug(self, enabled): 1797 self._debug = enabled 1798 1799 if self.is_running(): 1800 self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled) 1801