1"""Event loop and event loop policy.""" 2 3__all__ = ( 4 'AbstractEventLoopPolicy', 5 'AbstractEventLoop', 'AbstractServer', 6 'Handle', 'TimerHandle', 7 'get_event_loop_policy', 'set_event_loop_policy', 8 'get_event_loop', 'set_event_loop', 'new_event_loop', 9 'get_child_watcher', 'set_child_watcher', 10 '_set_running_loop', 'get_running_loop', 11 '_get_running_loop', 12) 13 14import contextvars 15import os 16import socket 17import subprocess 18import sys 19import threading 20 21from . import format_helpers 22 23 24class Handle: 25 """Object returned by callback registration methods.""" 26 27 __slots__ = ('_callback', '_args', '_cancelled', '_loop', 28 '_source_traceback', '_repr', '__weakref__', 29 '_context') 30 31 def __init__(self, callback, args, loop, context=None): 32 if context is None: 33 context = contextvars.copy_context() 34 self._context = context 35 self._loop = loop 36 self._callback = callback 37 self._args = args 38 self._cancelled = False 39 self._repr = None 40 if self._loop.get_debug(): 41 self._source_traceback = format_helpers.extract_stack( 42 sys._getframe(1)) 43 else: 44 self._source_traceback = None 45 46 def _repr_info(self): 47 info = [self.__class__.__name__] 48 if self._cancelled: 49 info.append('cancelled') 50 if self._callback is not None: 51 info.append(format_helpers._format_callback_source( 52 self._callback, self._args)) 53 if self._source_traceback: 54 frame = self._source_traceback[-1] 55 info.append(f'created at {frame[0]}:{frame[1]}') 56 return info 57 58 def __repr__(self): 59 if self._repr is not None: 60 return self._repr 61 info = self._repr_info() 62 return '<{}>'.format(' '.join(info)) 63 64 def cancel(self): 65 if not self._cancelled: 66 self._cancelled = True 67 if self._loop.get_debug(): 68 # Keep a representation in debug mode to keep callback and 69 # parameters. For example, to log the warning 70 # "Executing <Handle...> took 2.5 second" 71 self._repr = repr(self) 72 self._callback = None 73 self._args = None 74 75 def cancelled(self): 76 return self._cancelled 77 78 def _run(self): 79 try: 80 self._context.run(self._callback, *self._args) 81 except (SystemExit, KeyboardInterrupt): 82 raise 83 except BaseException as exc: 84 cb = format_helpers._format_callback_source( 85 self._callback, self._args) 86 msg = f'Exception in callback {cb}' 87 context = { 88 'message': msg, 89 'exception': exc, 90 'handle': self, 91 } 92 if self._source_traceback: 93 context['source_traceback'] = self._source_traceback 94 self._loop.call_exception_handler(context) 95 self = None # Needed to break cycles when an exception occurs. 96 97 98class TimerHandle(Handle): 99 """Object returned by timed callback registration methods.""" 100 101 __slots__ = ['_scheduled', '_when'] 102 103 def __init__(self, when, callback, args, loop, context=None): 104 assert when is not None 105 super().__init__(callback, args, loop, context) 106 if self._source_traceback: 107 del self._source_traceback[-1] 108 self._when = when 109 self._scheduled = False 110 111 def _repr_info(self): 112 info = super()._repr_info() 113 pos = 2 if self._cancelled else 1 114 info.insert(pos, f'when={self._when}') 115 return info 116 117 def __hash__(self): 118 return hash(self._when) 119 120 def __lt__(self, other): 121 if isinstance(other, TimerHandle): 122 return self._when < other._when 123 return NotImplemented 124 125 def __le__(self, other): 126 if isinstance(other, TimerHandle): 127 return self._when < other._when or self.__eq__(other) 128 return NotImplemented 129 130 def __gt__(self, other): 131 if isinstance(other, TimerHandle): 132 return self._when > other._when 133 return NotImplemented 134 135 def __ge__(self, other): 136 if isinstance(other, TimerHandle): 137 return self._when > other._when or self.__eq__(other) 138 return NotImplemented 139 140 def __eq__(self, other): 141 if isinstance(other, TimerHandle): 142 return (self._when == other._when and 143 self._callback == other._callback and 144 self._args == other._args and 145 self._cancelled == other._cancelled) 146 return NotImplemented 147 148 def cancel(self): 149 if not self._cancelled: 150 self._loop._timer_handle_cancelled(self) 151 super().cancel() 152 153 def when(self): 154 """Return a scheduled callback time. 155 156 The time is an absolute timestamp, using the same time 157 reference as loop.time(). 158 """ 159 return self._when 160 161 162class AbstractServer: 163 """Abstract server returned by create_server().""" 164 165 def close(self): 166 """Stop serving. This leaves existing connections open.""" 167 raise NotImplementedError 168 169 def get_loop(self): 170 """Get the event loop the Server object is attached to.""" 171 raise NotImplementedError 172 173 def is_serving(self): 174 """Return True if the server is accepting connections.""" 175 raise NotImplementedError 176 177 async def start_serving(self): 178 """Start accepting connections. 179 180 This method is idempotent, so it can be called when 181 the server is already being serving. 182 """ 183 raise NotImplementedError 184 185 async def serve_forever(self): 186 """Start accepting connections until the coroutine is cancelled. 187 188 The server is closed when the coroutine is cancelled. 189 """ 190 raise NotImplementedError 191 192 async def wait_closed(self): 193 """Coroutine to wait until service is closed.""" 194 raise NotImplementedError 195 196 async def __aenter__(self): 197 return self 198 199 async def __aexit__(self, *exc): 200 self.close() 201 await self.wait_closed() 202 203 204class AbstractEventLoop: 205 """Abstract event loop.""" 206 207 # Running and stopping the event loop. 208 209 def run_forever(self): 210 """Run the event loop until stop() is called.""" 211 raise NotImplementedError 212 213 def run_until_complete(self, future): 214 """Run the event loop until a Future is done. 215 216 Return the Future's result, or raise its exception. 217 """ 218 raise NotImplementedError 219 220 def stop(self): 221 """Stop the event loop as soon as reasonable. 222 223 Exactly how soon that is may depend on the implementation, but 224 no more I/O callbacks should be scheduled. 225 """ 226 raise NotImplementedError 227 228 def is_running(self): 229 """Return whether the event loop is currently running.""" 230 raise NotImplementedError 231 232 def is_closed(self): 233 """Returns True if the event loop was closed.""" 234 raise NotImplementedError 235 236 def close(self): 237 """Close the loop. 238 239 The loop should not be running. 240 241 This is idempotent and irreversible. 242 243 No other methods should be called after this one. 244 """ 245 raise NotImplementedError 246 247 async def shutdown_asyncgens(self): 248 """Shutdown all active asynchronous generators.""" 249 raise NotImplementedError 250 251 async def shutdown_default_executor(self): 252 """Schedule the shutdown of the default executor.""" 253 raise NotImplementedError 254 255 # Methods scheduling callbacks. All these return Handles. 256 257 def _timer_handle_cancelled(self, handle): 258 """Notification that a TimerHandle has been cancelled.""" 259 raise NotImplementedError 260 261 def call_soon(self, callback, *args, context=None): 262 return self.call_later(0, callback, *args, context=context) 263 264 def call_later(self, delay, callback, *args, context=None): 265 raise NotImplementedError 266 267 def call_at(self, when, callback, *args, context=None): 268 raise NotImplementedError 269 270 def time(self): 271 raise NotImplementedError 272 273 def create_future(self): 274 raise NotImplementedError 275 276 # Method scheduling a coroutine object: create a task. 277 278 def create_task(self, coro, *, name=None): 279 raise NotImplementedError 280 281 # Methods for interacting with threads. 282 283 def call_soon_threadsafe(self, callback, *args, context=None): 284 raise NotImplementedError 285 286 def run_in_executor(self, executor, func, *args): 287 raise NotImplementedError 288 289 def set_default_executor(self, executor): 290 raise NotImplementedError 291 292 # Network I/O methods returning Futures. 293 294 async def getaddrinfo(self, host, port, *, 295 family=0, type=0, proto=0, flags=0): 296 raise NotImplementedError 297 298 async def getnameinfo(self, sockaddr, flags=0): 299 raise NotImplementedError 300 301 async def create_connection( 302 self, protocol_factory, host=None, port=None, 303 *, ssl=None, family=0, proto=0, 304 flags=0, sock=None, local_addr=None, 305 server_hostname=None, 306 ssl_handshake_timeout=None, 307 happy_eyeballs_delay=None, interleave=None): 308 raise NotImplementedError 309 310 async def create_server( 311 self, protocol_factory, host=None, port=None, 312 *, family=socket.AF_UNSPEC, 313 flags=socket.AI_PASSIVE, sock=None, backlog=100, 314 ssl=None, reuse_address=None, reuse_port=None, 315 ssl_handshake_timeout=None, 316 start_serving=True): 317 """A coroutine which creates a TCP server bound to host and port. 318 319 The return value is a Server object which can be used to stop 320 the service. 321 322 If host is an empty string or None all interfaces are assumed 323 and a list of multiple sockets will be returned (most likely 324 one for IPv4 and another one for IPv6). The host parameter can also be 325 a sequence (e.g. list) of hosts to bind to. 326 327 family can be set to either AF_INET or AF_INET6 to force the 328 socket to use IPv4 or IPv6. If not set it will be determined 329 from host (defaults to AF_UNSPEC). 330 331 flags is a bitmask for getaddrinfo(). 332 333 sock can optionally be specified in order to use a preexisting 334 socket object. 335 336 backlog is the maximum number of queued connections passed to 337 listen() (defaults to 100). 338 339 ssl can be set to an SSLContext to enable SSL over the 340 accepted connections. 341 342 reuse_address tells the kernel to reuse a local socket in 343 TIME_WAIT state, without waiting for its natural timeout to 344 expire. If not specified will automatically be set to True on 345 UNIX. 346 347 reuse_port tells the kernel to allow this endpoint to be bound to 348 the same port as other existing endpoints are bound to, so long as 349 they all set this flag when being created. This option is not 350 supported on Windows. 351 352 ssl_handshake_timeout is the time in seconds that an SSL server 353 will wait for completion of the SSL handshake before aborting the 354 connection. Default is 60s. 355 356 start_serving set to True (default) causes the created server 357 to start accepting connections immediately. When set to False, 358 the user should await Server.start_serving() or Server.serve_forever() 359 to make the server to start accepting connections. 360 """ 361 raise NotImplementedError 362 363 async def sendfile(self, transport, file, offset=0, count=None, 364 *, fallback=True): 365 """Send a file through a transport. 366 367 Return an amount of sent bytes. 368 """ 369 raise NotImplementedError 370 371 async def start_tls(self, transport, protocol, sslcontext, *, 372 server_side=False, 373 server_hostname=None, 374 ssl_handshake_timeout=None): 375 """Upgrade a transport to TLS. 376 377 Return a new transport that *protocol* should start using 378 immediately. 379 """ 380 raise NotImplementedError 381 382 async def create_unix_connection( 383 self, protocol_factory, path=None, *, 384 ssl=None, sock=None, 385 server_hostname=None, 386 ssl_handshake_timeout=None): 387 raise NotImplementedError 388 389 async def create_unix_server( 390 self, protocol_factory, path=None, *, 391 sock=None, backlog=100, ssl=None, 392 ssl_handshake_timeout=None, 393 start_serving=True): 394 """A coroutine which creates a UNIX Domain Socket server. 395 396 The return value is a Server object, which can be used to stop 397 the service. 398 399 path is a str, representing a file system path to bind the 400 server socket to. 401 402 sock can optionally be specified in order to use a preexisting 403 socket object. 404 405 backlog is the maximum number of queued connections passed to 406 listen() (defaults to 100). 407 408 ssl can be set to an SSLContext to enable SSL over the 409 accepted connections. 410 411 ssl_handshake_timeout is the time in seconds that an SSL server 412 will wait for the SSL handshake to complete (defaults to 60s). 413 414 start_serving set to True (default) causes the created server 415 to start accepting connections immediately. When set to False, 416 the user should await Server.start_serving() or Server.serve_forever() 417 to make the server to start accepting connections. 418 """ 419 raise NotImplementedError 420 421 async def connect_accepted_socket( 422 self, protocol_factory, sock, 423 *, ssl=None, 424 ssl_handshake_timeout=None): 425 """Handle an accepted connection. 426 427 This is used by servers that accept connections outside of 428 asyncio, but use asyncio to handle connections. 429 430 This method is a coroutine. When completed, the coroutine 431 returns a (transport, protocol) pair. 432 """ 433 raise NotImplementedError 434 435 async def create_datagram_endpoint(self, protocol_factory, 436 local_addr=None, remote_addr=None, *, 437 family=0, proto=0, flags=0, 438 reuse_address=None, reuse_port=None, 439 allow_broadcast=None, sock=None): 440 """A coroutine which creates a datagram endpoint. 441 442 This method will try to establish the endpoint in the background. 443 When successful, the coroutine returns a (transport, protocol) pair. 444 445 protocol_factory must be a callable returning a protocol instance. 446 447 socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on 448 host (or family if specified), socket type SOCK_DGRAM. 449 450 reuse_address tells the kernel to reuse a local socket in 451 TIME_WAIT state, without waiting for its natural timeout to 452 expire. If not specified it will automatically be set to True on 453 UNIX. 454 455 reuse_port tells the kernel to allow this endpoint to be bound to 456 the same port as other existing endpoints are bound to, so long as 457 they all set this flag when being created. This option is not 458 supported on Windows and some UNIX's. If the 459 :py:data:`~socket.SO_REUSEPORT` constant is not defined then this 460 capability is unsupported. 461 462 allow_broadcast tells the kernel to allow this endpoint to send 463 messages to the broadcast address. 464 465 sock can optionally be specified in order to use a preexisting 466 socket object. 467 """ 468 raise NotImplementedError 469 470 # Pipes and subprocesses. 471 472 async def connect_read_pipe(self, protocol_factory, pipe): 473 """Register read pipe in event loop. Set the pipe to non-blocking mode. 474 475 protocol_factory should instantiate object with Protocol interface. 476 pipe is a file-like object. 477 Return pair (transport, protocol), where transport supports the 478 ReadTransport interface.""" 479 # The reason to accept file-like object instead of just file descriptor 480 # is: we need to own pipe and close it at transport finishing 481 # Can got complicated errors if pass f.fileno(), 482 # close fd in pipe transport then close f and vice versa. 483 raise NotImplementedError 484 485 async def connect_write_pipe(self, protocol_factory, pipe): 486 """Register write pipe in event loop. 487 488 protocol_factory should instantiate object with BaseProtocol interface. 489 Pipe is file-like object already switched to nonblocking. 490 Return pair (transport, protocol), where transport support 491 WriteTransport interface.""" 492 # The reason to accept file-like object instead of just file descriptor 493 # is: we need to own pipe and close it at transport finishing 494 # Can got complicated errors if pass f.fileno(), 495 # close fd in pipe transport then close f and vice versa. 496 raise NotImplementedError 497 498 async def subprocess_shell(self, protocol_factory, cmd, *, 499 stdin=subprocess.PIPE, 500 stdout=subprocess.PIPE, 501 stderr=subprocess.PIPE, 502 **kwargs): 503 raise NotImplementedError 504 505 async def subprocess_exec(self, protocol_factory, *args, 506 stdin=subprocess.PIPE, 507 stdout=subprocess.PIPE, 508 stderr=subprocess.PIPE, 509 **kwargs): 510 raise NotImplementedError 511 512 # Ready-based callback registration methods. 513 # The add_*() methods return None. 514 # The remove_*() methods return True if something was removed, 515 # False if there was nothing to delete. 516 517 def add_reader(self, fd, callback, *args): 518 raise NotImplementedError 519 520 def remove_reader(self, fd): 521 raise NotImplementedError 522 523 def add_writer(self, fd, callback, *args): 524 raise NotImplementedError 525 526 def remove_writer(self, fd): 527 raise NotImplementedError 528 529 # Completion based I/O methods returning Futures. 530 531 async def sock_recv(self, sock, nbytes): 532 raise NotImplementedError 533 534 async def sock_recv_into(self, sock, buf): 535 raise NotImplementedError 536 537 async def sock_sendall(self, sock, data): 538 raise NotImplementedError 539 540 async def sock_connect(self, sock, address): 541 raise NotImplementedError 542 543 async def sock_accept(self, sock): 544 raise NotImplementedError 545 546 async def sock_sendfile(self, sock, file, offset=0, count=None, 547 *, fallback=None): 548 raise NotImplementedError 549 550 # Signal handling. 551 552 def add_signal_handler(self, sig, callback, *args): 553 raise NotImplementedError 554 555 def remove_signal_handler(self, sig): 556 raise NotImplementedError 557 558 # Task factory. 559 560 def set_task_factory(self, factory): 561 raise NotImplementedError 562 563 def get_task_factory(self): 564 raise NotImplementedError 565 566 # Error handlers. 567 568 def get_exception_handler(self): 569 raise NotImplementedError 570 571 def set_exception_handler(self, handler): 572 raise NotImplementedError 573 574 def default_exception_handler(self, context): 575 raise NotImplementedError 576 577 def call_exception_handler(self, context): 578 raise NotImplementedError 579 580 # Debug flag management. 581 582 def get_debug(self): 583 raise NotImplementedError 584 585 def set_debug(self, enabled): 586 raise NotImplementedError 587 588 589class AbstractEventLoopPolicy: 590 """Abstract policy for accessing the event loop.""" 591 592 def get_event_loop(self): 593 """Get the event loop for the current context. 594 595 Returns an event loop object implementing the BaseEventLoop interface, 596 or raises an exception in case no event loop has been set for the 597 current context and the current policy does not specify to create one. 598 599 It should never return None.""" 600 raise NotImplementedError 601 602 def set_event_loop(self, loop): 603 """Set the event loop for the current context to loop.""" 604 raise NotImplementedError 605 606 def new_event_loop(self): 607 """Create and return a new event loop object according to this 608 policy's rules. If there's need to set this loop as the event loop for 609 the current context, set_event_loop must be called explicitly.""" 610 raise NotImplementedError 611 612 # Child processes handling (Unix only). 613 614 def get_child_watcher(self): 615 "Get the watcher for child processes." 616 raise NotImplementedError 617 618 def set_child_watcher(self, watcher): 619 """Set the watcher for child processes.""" 620 raise NotImplementedError 621 622 623class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): 624 """Default policy implementation for accessing the event loop. 625 626 In this policy, each thread has its own event loop. However, we 627 only automatically create an event loop by default for the main 628 thread; other threads by default have no event loop. 629 630 Other policies may have different rules (e.g. a single global 631 event loop, or automatically creating an event loop per thread, or 632 using some other notion of context to which an event loop is 633 associated). 634 """ 635 636 _loop_factory = None 637 638 class _Local(threading.local): 639 _loop = None 640 _set_called = False 641 642 def __init__(self): 643 self._local = self._Local() 644 645 def get_event_loop(self): 646 """Get the event loop for the current context. 647 648 Returns an instance of EventLoop or raises an exception. 649 """ 650 if (self._local._loop is None and 651 not self._local._set_called and 652 threading.current_thread() is threading.main_thread()): 653 self.set_event_loop(self.new_event_loop()) 654 655 if self._local._loop is None: 656 raise RuntimeError('There is no current event loop in thread %r.' 657 % threading.current_thread().name) 658 659 return self._local._loop 660 661 def set_event_loop(self, loop): 662 """Set the event loop.""" 663 self._local._set_called = True 664 assert loop is None or isinstance(loop, AbstractEventLoop) 665 self._local._loop = loop 666 667 def new_event_loop(self): 668 """Create a new event loop. 669 670 You must call set_event_loop() to make this the current event 671 loop. 672 """ 673 return self._loop_factory() 674 675 676# Event loop policy. The policy itself is always global, even if the 677# policy's rules say that there is an event loop per thread (or other 678# notion of context). The default policy is installed by the first 679# call to get_event_loop_policy(). 680_event_loop_policy = None 681 682# Lock for protecting the on-the-fly creation of the event loop policy. 683_lock = threading.Lock() 684 685 686# A TLS for the running event loop, used by _get_running_loop. 687class _RunningLoop(threading.local): 688 loop_pid = (None, None) 689 690 691_running_loop = _RunningLoop() 692 693 694def get_running_loop(): 695 """Return the running event loop. Raise a RuntimeError if there is none. 696 697 This function is thread-specific. 698 """ 699 # NOTE: this function is implemented in C (see _asynciomodule.c) 700 loop = _get_running_loop() 701 if loop is None: 702 raise RuntimeError('no running event loop') 703 return loop 704 705 706def _get_running_loop(): 707 """Return the running event loop or None. 708 709 This is a low-level function intended to be used by event loops. 710 This function is thread-specific. 711 """ 712 # NOTE: this function is implemented in C (see _asynciomodule.c) 713 running_loop, pid = _running_loop.loop_pid 714 if running_loop is not None and pid == os.getpid(): 715 return running_loop 716 717 718def _set_running_loop(loop): 719 """Set the running event loop. 720 721 This is a low-level function intended to be used by event loops. 722 This function is thread-specific. 723 """ 724 # NOTE: this function is implemented in C (see _asynciomodule.c) 725 _running_loop.loop_pid = (loop, os.getpid()) 726 727 728def _init_event_loop_policy(): 729 global _event_loop_policy 730 with _lock: 731 if _event_loop_policy is None: # pragma: no branch 732 from . import DefaultEventLoopPolicy 733 _event_loop_policy = DefaultEventLoopPolicy() 734 735 736def get_event_loop_policy(): 737 """Get the current event loop policy.""" 738 if _event_loop_policy is None: 739 _init_event_loop_policy() 740 return _event_loop_policy 741 742 743def set_event_loop_policy(policy): 744 """Set the current event loop policy. 745 746 If policy is None, the default policy is restored.""" 747 global _event_loop_policy 748 assert policy is None or isinstance(policy, AbstractEventLoopPolicy) 749 _event_loop_policy = policy 750 751 752def get_event_loop(): 753 """Return an asyncio event loop. 754 755 When called from a coroutine or a callback (e.g. scheduled with call_soon 756 or similar API), this function will always return the running event loop. 757 758 If there is no running event loop set, the function will return 759 the result of `get_event_loop_policy().get_event_loop()` call. 760 """ 761 # NOTE: this function is implemented in C (see _asynciomodule.c) 762 return _py__get_event_loop() 763 764 765def _get_event_loop(stacklevel=3): 766 current_loop = _get_running_loop() 767 if current_loop is not None: 768 return current_loop 769 import warnings 770 warnings.warn('There is no current event loop', 771 DeprecationWarning, stacklevel=stacklevel) 772 return get_event_loop_policy().get_event_loop() 773 774 775def set_event_loop(loop): 776 """Equivalent to calling get_event_loop_policy().set_event_loop(loop).""" 777 get_event_loop_policy().set_event_loop(loop) 778 779 780def new_event_loop(): 781 """Equivalent to calling get_event_loop_policy().new_event_loop().""" 782 return get_event_loop_policy().new_event_loop() 783 784 785def get_child_watcher(): 786 """Equivalent to calling get_event_loop_policy().get_child_watcher().""" 787 return get_event_loop_policy().get_child_watcher() 788 789 790def set_child_watcher(watcher): 791 """Equivalent to calling 792 get_event_loop_policy().set_child_watcher(watcher).""" 793 return get_event_loop_policy().set_child_watcher(watcher) 794 795 796# Alias pure-Python implementations for testing purposes. 797_py__get_running_loop = _get_running_loop 798_py__set_running_loop = _set_running_loop 799_py_get_running_loop = get_running_loop 800_py_get_event_loop = get_event_loop 801_py__get_event_loop = _get_event_loop 802 803 804try: 805 # get_event_loop() is one of the most frequently called 806 # functions in asyncio. Pure Python implementation is 807 # about 4 times slower than C-accelerated. 808 from _asyncio import (_get_running_loop, _set_running_loop, 809 get_running_loop, get_event_loop, _get_event_loop) 810except ImportError: 811 pass 812else: 813 # Alias C implementations for testing purposes. 814 _c__get_running_loop = _get_running_loop 815 _c__set_running_loop = _set_running_loop 816 _c_get_running_loop = get_running_loop 817 _c_get_event_loop = get_event_loop 818 _c__get_event_loop = _get_event_loop 819