1"""Event loop and event loop policy.""" 2 3__all__ = ( 4 'AbstractEventLoopPolicy', 5 'AbstractEventLoop', 'AbstractServer', 6 'Handle', 'TimerHandle', 'SendfileNotAvailableError', 7 'get_event_loop_policy', 'set_event_loop_policy', 8 'get_event_loop', 'set_event_loop', 'new_event_loop', 9 'get_child_watcher', 'set_child_watcher', 10 '_set_running_loop', 'get_running_loop', 11 '_get_running_loop', 12) 13 14import contextvars 15import os 16import socket 17import subprocess 18import sys 19import threading 20 21from . import format_helpers 22 23 24class SendfileNotAvailableError(RuntimeError): 25 """Sendfile syscall is not available. 26 27 Raised if OS does not support sendfile syscall for given socket or 28 file type. 29 """ 30 31 32class Handle: 33 """Object returned by callback registration methods.""" 34 35 __slots__ = ('_callback', '_args', '_cancelled', '_loop', 36 '_source_traceback', '_repr', '__weakref__', 37 '_context') 38 39 def __init__(self, callback, args, loop, context=None): 40 if context is None: 41 context = contextvars.copy_context() 42 self._context = context 43 self._loop = loop 44 self._callback = callback 45 self._args = args 46 self._cancelled = False 47 self._repr = None 48 if self._loop.get_debug(): 49 self._source_traceback = format_helpers.extract_stack( 50 sys._getframe(1)) 51 else: 52 self._source_traceback = None 53 54 def _repr_info(self): 55 info = [self.__class__.__name__] 56 if self._cancelled: 57 info.append('cancelled') 58 if self._callback is not None: 59 info.append(format_helpers._format_callback_source( 60 self._callback, self._args)) 61 if self._source_traceback: 62 frame = self._source_traceback[-1] 63 info.append(f'created at {frame[0]}:{frame[1]}') 64 return info 65 66 def __repr__(self): 67 if self._repr is not None: 68 return self._repr 69 info = self._repr_info() 70 return '<{}>'.format(' '.join(info)) 71 72 def cancel(self): 73 if not self._cancelled: 74 self._cancelled = True 75 if self._loop.get_debug(): 76 # Keep a representation in debug mode to keep callback and 77 # parameters. For example, to log the warning 78 # "Executing <Handle...> took 2.5 second" 79 self._repr = repr(self) 80 self._callback = None 81 self._args = None 82 83 def cancelled(self): 84 return self._cancelled 85 86 def _run(self): 87 try: 88 self._context.run(self._callback, *self._args) 89 except Exception as exc: 90 cb = format_helpers._format_callback_source( 91 self._callback, self._args) 92 msg = f'Exception in callback {cb}' 93 context = { 94 'message': msg, 95 'exception': exc, 96 'handle': self, 97 } 98 if self._source_traceback: 99 context['source_traceback'] = self._source_traceback 100 self._loop.call_exception_handler(context) 101 self = None # Needed to break cycles when an exception occurs. 102 103 104class TimerHandle(Handle): 105 """Object returned by timed callback registration methods.""" 106 107 __slots__ = ['_scheduled', '_when'] 108 109 def __init__(self, when, callback, args, loop, context=None): 110 assert when is not None 111 super().__init__(callback, args, loop, context) 112 if self._source_traceback: 113 del self._source_traceback[-1] 114 self._when = when 115 self._scheduled = False 116 117 def _repr_info(self): 118 info = super()._repr_info() 119 pos = 2 if self._cancelled else 1 120 info.insert(pos, f'when={self._when}') 121 return info 122 123 def __hash__(self): 124 return hash(self._when) 125 126 def __lt__(self, other): 127 return self._when < other._when 128 129 def __le__(self, other): 130 if self._when < other._when: 131 return True 132 return self.__eq__(other) 133 134 def __gt__(self, other): 135 return self._when > other._when 136 137 def __ge__(self, other): 138 if self._when > other._when: 139 return True 140 return self.__eq__(other) 141 142 def __eq__(self, other): 143 if isinstance(other, TimerHandle): 144 return (self._when == other._when and 145 self._callback == other._callback and 146 self._args == other._args and 147 self._cancelled == other._cancelled) 148 return NotImplemented 149 150 def __ne__(self, other): 151 equal = self.__eq__(other) 152 return NotImplemented if equal is NotImplemented else not equal 153 154 def cancel(self): 155 if not self._cancelled: 156 self._loop._timer_handle_cancelled(self) 157 super().cancel() 158 159 def when(self): 160 """Return a scheduled callback time. 161 162 The time is an absolute timestamp, using the same time 163 reference as loop.time(). 164 """ 165 return self._when 166 167 168class AbstractServer: 169 """Abstract server returned by create_server().""" 170 171 def close(self): 172 """Stop serving. This leaves existing connections open.""" 173 raise NotImplementedError 174 175 def get_loop(self): 176 """Get the event loop the Server object is attached to.""" 177 raise NotImplementedError 178 179 def is_serving(self): 180 """Return True if the server is accepting connections.""" 181 raise NotImplementedError 182 183 async def start_serving(self): 184 """Start accepting connections. 185 186 This method is idempotent, so it can be called when 187 the server is already being serving. 188 """ 189 raise NotImplementedError 190 191 async def serve_forever(self): 192 """Start accepting connections until the coroutine is cancelled. 193 194 The server is closed when the coroutine is cancelled. 195 """ 196 raise NotImplementedError 197 198 async def wait_closed(self): 199 """Coroutine to wait until service is closed.""" 200 raise NotImplementedError 201 202 async def __aenter__(self): 203 return self 204 205 async def __aexit__(self, *exc): 206 self.close() 207 await self.wait_closed() 208 209 210class AbstractEventLoop: 211 """Abstract event loop.""" 212 213 # Running and stopping the event loop. 214 215 def run_forever(self): 216 """Run the event loop until stop() is called.""" 217 raise NotImplementedError 218 219 def run_until_complete(self, future): 220 """Run the event loop until a Future is done. 221 222 Return the Future's result, or raise its exception. 223 """ 224 raise NotImplementedError 225 226 def stop(self): 227 """Stop the event loop as soon as reasonable. 228 229 Exactly how soon that is may depend on the implementation, but 230 no more I/O callbacks should be scheduled. 231 """ 232 raise NotImplementedError 233 234 def is_running(self): 235 """Return whether the event loop is currently running.""" 236 raise NotImplementedError 237 238 def is_closed(self): 239 """Returns True if the event loop was closed.""" 240 raise NotImplementedError 241 242 def close(self): 243 """Close the loop. 244 245 The loop should not be running. 246 247 This is idempotent and irreversible. 248 249 No other methods should be called after this one. 250 """ 251 raise NotImplementedError 252 253 async def shutdown_asyncgens(self): 254 """Shutdown all active asynchronous generators.""" 255 raise NotImplementedError 256 257 # Methods scheduling callbacks. All these return Handles. 258 259 def _timer_handle_cancelled(self, handle): 260 """Notification that a TimerHandle has been cancelled.""" 261 raise NotImplementedError 262 263 def call_soon(self, callback, *args): 264 return self.call_later(0, callback, *args) 265 266 def call_later(self, delay, callback, *args): 267 raise NotImplementedError 268 269 def call_at(self, when, callback, *args): 270 raise NotImplementedError 271 272 def time(self): 273 raise NotImplementedError 274 275 def create_future(self): 276 raise NotImplementedError 277 278 # Method scheduling a coroutine object: create a task. 279 280 def create_task(self, coro): 281 raise NotImplementedError 282 283 # Methods for interacting with threads. 284 285 def call_soon_threadsafe(self, callback, *args): 286 raise NotImplementedError 287 288 async def run_in_executor(self, executor, func, *args): 289 raise NotImplementedError 290 291 def set_default_executor(self, executor): 292 raise NotImplementedError 293 294 # Network I/O methods returning Futures. 295 296 async def getaddrinfo(self, host, port, *, 297 family=0, type=0, proto=0, flags=0): 298 raise NotImplementedError 299 300 async def getnameinfo(self, sockaddr, flags=0): 301 raise NotImplementedError 302 303 async def create_connection( 304 self, protocol_factory, host=None, port=None, 305 *, ssl=None, family=0, proto=0, 306 flags=0, sock=None, local_addr=None, 307 server_hostname=None, 308 ssl_handshake_timeout=None): 309 raise NotImplementedError 310 311 async def create_server( 312 self, protocol_factory, host=None, port=None, 313 *, family=socket.AF_UNSPEC, 314 flags=socket.AI_PASSIVE, sock=None, backlog=100, 315 ssl=None, reuse_address=None, reuse_port=None, 316 ssl_handshake_timeout=None, 317 start_serving=True): 318 """A coroutine which creates a TCP server bound to host and port. 319 320 The return value is a Server object which can be used to stop 321 the service. 322 323 If host is an empty string or None all interfaces are assumed 324 and a list of multiple sockets will be returned (most likely 325 one for IPv4 and another one for IPv6). The host parameter can also be 326 a sequence (e.g. list) of hosts to bind to. 327 328 family can be set to either AF_INET or AF_INET6 to force the 329 socket to use IPv4 or IPv6. If not set it will be determined 330 from host (defaults to AF_UNSPEC). 331 332 flags is a bitmask for getaddrinfo(). 333 334 sock can optionally be specified in order to use a preexisting 335 socket object. 336 337 backlog is the maximum number of queued connections passed to 338 listen() (defaults to 100). 339 340 ssl can be set to an SSLContext to enable SSL over the 341 accepted connections. 342 343 reuse_address tells the kernel to reuse a local socket in 344 TIME_WAIT state, without waiting for its natural timeout to 345 expire. If not specified will automatically be set to True on 346 UNIX. 347 348 reuse_port tells the kernel to allow this endpoint to be bound to 349 the same port as other existing endpoints are bound to, so long as 350 they all set this flag when being created. This option is not 351 supported on Windows. 352 353 ssl_handshake_timeout is the time in seconds that an SSL server 354 will wait for completion of the SSL handshake before aborting the 355 connection. Default is 60s. 356 357 start_serving set to True (default) causes the created server 358 to start accepting connections immediately. When set to False, 359 the user should await Server.start_serving() or Server.serve_forever() 360 to make the server to start accepting connections. 361 """ 362 raise NotImplementedError 363 364 async def sendfile(self, transport, file, offset=0, count=None, 365 *, fallback=True): 366 """Send a file through a transport. 367 368 Return an amount of sent bytes. 369 """ 370 raise NotImplementedError 371 372 async def start_tls(self, transport, protocol, sslcontext, *, 373 server_side=False, 374 server_hostname=None, 375 ssl_handshake_timeout=None): 376 """Upgrade a transport to TLS. 377 378 Return a new transport that *protocol* should start using 379 immediately. 380 """ 381 raise NotImplementedError 382 383 async def create_unix_connection( 384 self, protocol_factory, path=None, *, 385 ssl=None, sock=None, 386 server_hostname=None, 387 ssl_handshake_timeout=None): 388 raise NotImplementedError 389 390 async def create_unix_server( 391 self, protocol_factory, path=None, *, 392 sock=None, backlog=100, ssl=None, 393 ssl_handshake_timeout=None, 394 start_serving=True): 395 """A coroutine which creates a UNIX Domain Socket server. 396 397 The return value is a Server object, which can be used to stop 398 the service. 399 400 path is a str, representing a file systsem path to bind the 401 server socket to. 402 403 sock can optionally be specified in order to use a preexisting 404 socket object. 405 406 backlog is the maximum number of queued connections passed to 407 listen() (defaults to 100). 408 409 ssl can be set to an SSLContext to enable SSL over the 410 accepted connections. 411 412 ssl_handshake_timeout is the time in seconds that an SSL server 413 will wait for the SSL handshake to complete (defaults to 60s). 414 415 start_serving set to True (default) causes the created server 416 to start accepting connections immediately. When set to False, 417 the user should await Server.start_serving() or Server.serve_forever() 418 to make the server to start accepting connections. 419 """ 420 raise NotImplementedError 421 422 async def create_datagram_endpoint(self, protocol_factory, 423 local_addr=None, remote_addr=None, *, 424 family=0, proto=0, flags=0, 425 reuse_address=None, reuse_port=None, 426 allow_broadcast=None, sock=None): 427 """A coroutine which creates a datagram endpoint. 428 429 This method will try to establish the endpoint in the background. 430 When successful, the coroutine returns a (transport, protocol) pair. 431 432 protocol_factory must be a callable returning a protocol instance. 433 434 socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on 435 host (or family if specified), socket type SOCK_DGRAM. 436 437 reuse_address tells the kernel to reuse a local socket in 438 TIME_WAIT state, without waiting for its natural timeout to 439 expire. If not specified it will automatically be set to True on 440 UNIX. 441 442 reuse_port tells the kernel to allow this endpoint to be bound to 443 the same port as other existing endpoints are bound to, so long as 444 they all set this flag when being created. This option is not 445 supported on Windows and some UNIX's. If the 446 :py:data:`~socket.SO_REUSEPORT` constant is not defined then this 447 capability is unsupported. 448 449 allow_broadcast tells the kernel to allow this endpoint to send 450 messages to the broadcast address. 451 452 sock can optionally be specified in order to use a preexisting 453 socket object. 454 """ 455 raise NotImplementedError 456 457 # Pipes and subprocesses. 458 459 async def connect_read_pipe(self, protocol_factory, pipe): 460 """Register read pipe in event loop. Set the pipe to non-blocking mode. 461 462 protocol_factory should instantiate object with Protocol interface. 463 pipe is a file-like object. 464 Return pair (transport, protocol), where transport supports the 465 ReadTransport interface.""" 466 # The reason to accept file-like object instead of just file descriptor 467 # is: we need to own pipe and close it at transport finishing 468 # Can got complicated errors if pass f.fileno(), 469 # close fd in pipe transport then close f and vise versa. 470 raise NotImplementedError 471 472 async def connect_write_pipe(self, protocol_factory, pipe): 473 """Register write pipe in event loop. 474 475 protocol_factory should instantiate object with BaseProtocol interface. 476 Pipe is file-like object already switched to nonblocking. 477 Return pair (transport, protocol), where transport support 478 WriteTransport interface.""" 479 # The reason to accept file-like object instead of just file descriptor 480 # is: we need to own pipe and close it at transport finishing 481 # Can got complicated errors if pass f.fileno(), 482 # close fd in pipe transport then close f and vise versa. 483 raise NotImplementedError 484 485 async def subprocess_shell(self, protocol_factory, cmd, *, 486 stdin=subprocess.PIPE, 487 stdout=subprocess.PIPE, 488 stderr=subprocess.PIPE, 489 **kwargs): 490 raise NotImplementedError 491 492 async def subprocess_exec(self, protocol_factory, *args, 493 stdin=subprocess.PIPE, 494 stdout=subprocess.PIPE, 495 stderr=subprocess.PIPE, 496 **kwargs): 497 raise NotImplementedError 498 499 # Ready-based callback registration methods. 500 # The add_*() methods return None. 501 # The remove_*() methods return True if something was removed, 502 # False if there was nothing to delete. 503 504 def add_reader(self, fd, callback, *args): 505 raise NotImplementedError 506 507 def remove_reader(self, fd): 508 raise NotImplementedError 509 510 def add_writer(self, fd, callback, *args): 511 raise NotImplementedError 512 513 def remove_writer(self, fd): 514 raise NotImplementedError 515 516 # Completion based I/O methods returning Futures. 517 518 async def sock_recv(self, sock, nbytes): 519 raise NotImplementedError 520 521 async def sock_recv_into(self, sock, buf): 522 raise NotImplementedError 523 524 async def sock_sendall(self, sock, data): 525 raise NotImplementedError 526 527 async def sock_connect(self, sock, address): 528 raise NotImplementedError 529 530 async def sock_accept(self, sock): 531 raise NotImplementedError 532 533 async def sock_sendfile(self, sock, file, offset=0, count=None, 534 *, fallback=None): 535 raise NotImplementedError 536 537 # Signal handling. 538 539 def add_signal_handler(self, sig, callback, *args): 540 raise NotImplementedError 541 542 def remove_signal_handler(self, sig): 543 raise NotImplementedError 544 545 # Task factory. 546 547 def set_task_factory(self, factory): 548 raise NotImplementedError 549 550 def get_task_factory(self): 551 raise NotImplementedError 552 553 # Error handlers. 554 555 def get_exception_handler(self): 556 raise NotImplementedError 557 558 def set_exception_handler(self, handler): 559 raise NotImplementedError 560 561 def default_exception_handler(self, context): 562 raise NotImplementedError 563 564 def call_exception_handler(self, context): 565 raise NotImplementedError 566 567 # Debug flag management. 568 569 def get_debug(self): 570 raise NotImplementedError 571 572 def set_debug(self, enabled): 573 raise NotImplementedError 574 575 576class AbstractEventLoopPolicy: 577 """Abstract policy for accessing the event loop.""" 578 579 def get_event_loop(self): 580 """Get the event loop for the current context. 581 582 Returns an event loop object implementing the BaseEventLoop interface, 583 or raises an exception in case no event loop has been set for the 584 current context and the current policy does not specify to create one. 585 586 It should never return None.""" 587 raise NotImplementedError 588 589 def set_event_loop(self, loop): 590 """Set the event loop for the current context to loop.""" 591 raise NotImplementedError 592 593 def new_event_loop(self): 594 """Create and return a new event loop object according to this 595 policy's rules. If there's need to set this loop as the event loop for 596 the current context, set_event_loop must be called explicitly.""" 597 raise NotImplementedError 598 599 # Child processes handling (Unix only). 600 601 def get_child_watcher(self): 602 "Get the watcher for child processes." 603 raise NotImplementedError 604 605 def set_child_watcher(self, watcher): 606 """Set the watcher for child processes.""" 607 raise NotImplementedError 608 609 610class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): 611 """Default policy implementation for accessing the event loop. 612 613 In this policy, each thread has its own event loop. However, we 614 only automatically create an event loop by default for the main 615 thread; other threads by default have no event loop. 616 617 Other policies may have different rules (e.g. a single global 618 event loop, or automatically creating an event loop per thread, or 619 using some other notion of context to which an event loop is 620 associated). 621 """ 622 623 _loop_factory = None 624 625 class _Local(threading.local): 626 _loop = None 627 _set_called = False 628 629 def __init__(self): 630 self._local = self._Local() 631 632 def get_event_loop(self): 633 """Get the event loop. 634 635 This may be None or an instance of EventLoop. 636 """ 637 if (self._local._loop is None and 638 not self._local._set_called and 639 isinstance(threading.current_thread(), threading._MainThread)): 640 self.set_event_loop(self.new_event_loop()) 641 642 if self._local._loop is None: 643 raise RuntimeError('There is no current event loop in thread %r.' 644 % threading.current_thread().name) 645 646 return self._local._loop 647 648 def set_event_loop(self, loop): 649 """Set the event loop.""" 650 self._local._set_called = True 651 assert loop is None or isinstance(loop, AbstractEventLoop) 652 self._local._loop = loop 653 654 def new_event_loop(self): 655 """Create a new event loop. 656 657 You must call set_event_loop() to make this the current event 658 loop. 659 """ 660 return self._loop_factory() 661 662 663# Event loop policy. The policy itself is always global, even if the 664# policy's rules say that there is an event loop per thread (or other 665# notion of context). The default policy is installed by the first 666# call to get_event_loop_policy(). 667_event_loop_policy = None 668 669# Lock for protecting the on-the-fly creation of the event loop policy. 670_lock = threading.Lock() 671 672 673# A TLS for the running event loop, used by _get_running_loop. 674class _RunningLoop(threading.local): 675 loop_pid = (None, None) 676 677 678_running_loop = _RunningLoop() 679 680 681def get_running_loop(): 682 """Return the running event loop. Raise a RuntimeError if there is none. 683 684 This function is thread-specific. 685 """ 686 # NOTE: this function is implemented in C (see _asynciomodule.c) 687 loop = _get_running_loop() 688 if loop is None: 689 raise RuntimeError('no running event loop') 690 return loop 691 692 693def _get_running_loop(): 694 """Return the running event loop or None. 695 696 This is a low-level function intended to be used by event loops. 697 This function is thread-specific. 698 """ 699 # NOTE: this function is implemented in C (see _asynciomodule.c) 700 running_loop, pid = _running_loop.loop_pid 701 if running_loop is not None and pid == os.getpid(): 702 return running_loop 703 704 705def _set_running_loop(loop): 706 """Set the running event loop. 707 708 This is a low-level function intended to be used by event loops. 709 This function is thread-specific. 710 """ 711 # NOTE: this function is implemented in C (see _asynciomodule.c) 712 _running_loop.loop_pid = (loop, os.getpid()) 713 714 715def _init_event_loop_policy(): 716 global _event_loop_policy 717 with _lock: 718 if _event_loop_policy is None: # pragma: no branch 719 from . import DefaultEventLoopPolicy 720 _event_loop_policy = DefaultEventLoopPolicy() 721 722 723def get_event_loop_policy(): 724 """Get the current event loop policy.""" 725 if _event_loop_policy is None: 726 _init_event_loop_policy() 727 return _event_loop_policy 728 729 730def set_event_loop_policy(policy): 731 """Set the current event loop policy. 732 733 If policy is None, the default policy is restored.""" 734 global _event_loop_policy 735 assert policy is None or isinstance(policy, AbstractEventLoopPolicy) 736 _event_loop_policy = policy 737 738 739def get_event_loop(): 740 """Return an asyncio event loop. 741 742 When called from a coroutine or a callback (e.g. scheduled with call_soon 743 or similar API), this function will always return the running event loop. 744 745 If there is no running event loop set, the function will return 746 the result of `get_event_loop_policy().get_event_loop()` call. 747 """ 748 # NOTE: this function is implemented in C (see _asynciomodule.c) 749 current_loop = _get_running_loop() 750 if current_loop is not None: 751 return current_loop 752 return get_event_loop_policy().get_event_loop() 753 754 755def set_event_loop(loop): 756 """Equivalent to calling get_event_loop_policy().set_event_loop(loop).""" 757 get_event_loop_policy().set_event_loop(loop) 758 759 760def new_event_loop(): 761 """Equivalent to calling get_event_loop_policy().new_event_loop().""" 762 return get_event_loop_policy().new_event_loop() 763 764 765def get_child_watcher(): 766 """Equivalent to calling get_event_loop_policy().get_child_watcher().""" 767 return get_event_loop_policy().get_child_watcher() 768 769 770def set_child_watcher(watcher): 771 """Equivalent to calling 772 get_event_loop_policy().set_child_watcher(watcher).""" 773 return get_event_loop_policy().set_child_watcher(watcher) 774 775 776# Alias pure-Python implementations for testing purposes. 777_py__get_running_loop = _get_running_loop 778_py__set_running_loop = _set_running_loop 779_py_get_running_loop = get_running_loop 780_py_get_event_loop = get_event_loop 781 782 783try: 784 # get_event_loop() is one of the most frequently called 785 # functions in asyncio. Pure Python implementation is 786 # about 4 times slower than C-accelerated. 787 from _asyncio import (_get_running_loop, _set_running_loop, 788 get_running_loop, get_event_loop) 789except ImportError: 790 pass 791else: 792 # Alias C implementations for testing purposes. 793 _c__get_running_loop = _get_running_loop 794 _c__set_running_loop = _set_running_loop 795 _c_get_running_loop = get_running_loop 796 _c_get_event_loop = get_event_loop 797