"""Event loop and event loop policy."""

# Contains code from https://github.com/MagicStack/uvloop/tree/v0.16.0
# SPDX-License-Identifier: PSF-2.0 AND (MIT OR Apache-2.0)
# SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc. http://magic.io

__all__ = (
    'AbstractEventLoopPolicy',
    'AbstractEventLoop', 'AbstractServer',
    'Handle', 'TimerHandle',
    'get_event_loop_policy', 'set_event_loop_policy',
    'get_event_loop', 'set_event_loop', 'new_event_loop',
    'get_child_watcher', 'set_child_watcher',
    '_set_running_loop', 'get_running_loop',
    '_get_running_loop',
)

import contextvars
import os
import signal
import socket
import subprocess
import sys
import threading

from . import format_helpers


class Handle:
    """Object returned by callback registration methods."""

    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
                 '_source_traceback', '_repr', '__weakref__',
                 '_context')

    def __init__(self, callback, args, loop, context=None):
        """Store the callback, its arguments, the loop and the context.

        When *context* is None a copy of the current contextvars context
        is taken.  If the loop is in debug mode, the stack of the caller
        is recorded in ``_source_traceback``.
        """
        if context is None:
            context = contextvars.copy_context()
        self._context = context
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False
        self._repr = None
        if self._loop.get_debug():
            # Frame 1 is the caller of __init__; do not wrap this call in a
            # helper, or the recorded stack would start one frame too deep.
            self._source_traceback = format_helpers.extract_stack(
                sys._getframe(1))
        else:
            self._source_traceback = None

    def _repr_info(self):
        """Return the list of strings that make up the repr."""
        info = [self.__class__.__name__]
        if self._cancelled:
            info.append('cancelled')
        if self._callback is not None:
            info.append(format_helpers._format_callback_source(
                self._callback, self._args,
                debug=self._loop.get_debug()))
        if self._source_traceback:
            frame = self._source_traceback[-1]
            info.append(f'created at {frame[0]}:{frame[1]}')
        return info

    def __repr__(self):
        # A repr cached by cancel() (debug mode only) takes precedence.
        if self._repr is not None:
            return self._repr
        info = self._repr_info()
        return '<{}>'.format(' '.join(info))

    def get_context(self):
        """Return the context object the callback is run in."""
        return self._context

    def cancel(self):
        """Mark the handle as cancelled and drop the callback and args.

        Calling this more than once has no further effect.
        """
        if not self._cancelled:
            self._cancelled = True
            if self._loop.get_debug():
                # Keep a representation in debug mode to keep callback and
                # parameters. For example, to log the warning
                # "Executing <Handle...> took 2.5 second"
                self._repr = repr(self)
            self._callback = None
            self._args = None

    def cancelled(self):
        """Return True if cancel() has been called."""
        return self._cancelled

    def _run(self):
        """Run the callback inside the stored context.

        SystemExit and KeyboardInterrupt propagate; any other exception
        is reported through the loop's call_exception_handler().
        """
        try:
            self._context.run(self._callback, *self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            cb = format_helpers._format_callback_source(
                self._callback, self._args,
                debug=self._loop.get_debug())
            msg = f'Exception in callback {cb}'
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.


class TimerHandle(Handle):
    """Object returned by timed callback registration methods."""

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop, context=None):
        """Create a handle for a callback scheduled at time *when*."""
        super().__init__(callback, args, loop, context)
        if self._source_traceback:
            # Remove the last recorded stack entry captured by
            # Handle.__init__.
            del self._source_traceback[-1]
        self._when = when
        self._scheduled = False

    def _repr_info(self):
        info = super()._repr_info()
        # Insert after the class name, and after 'cancelled' if present.
        pos = 2 if self._cancelled else 1
        info.insert(pos, f'when={self._when}')
        return info

    def __hash__(self):
        return hash(self._when)

    # Ordering compares only the scheduled time; equality additionally
    # compares callback, args and cancellation state.

    def __lt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when or self.__eq__(other)
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented

    def __ge__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when or self.__eq__(other)
        return NotImplemented

    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def cancel(self):
        """Cancel the handle, notifying the loop on the first call."""
        if not self._cancelled:
            self._loop._timer_handle_cancelled(self)
        super().cancel()

    def when(self):
        """Return a scheduled callback time.

        The time is an absolute timestamp, using the same time
        reference as loop.time().
        """
        return self._when


class AbstractServer:
    """Abstract server returned by create_server()."""

    def close(self):
        """Stop serving.  This leaves existing connections open."""
        raise NotImplementedError

    def close_clients(self):
        """Close all active connections."""
        raise NotImplementedError

    def abort_clients(self):
        """Close all active connections immediately."""
        raise NotImplementedError

    def get_loop(self):
        """Get the event loop the Server object is attached to."""
        raise NotImplementedError

    def is_serving(self):
        """Return True if the server is accepting connections."""
        raise NotImplementedError

    async def start_serving(self):
        """Start accepting connections.

        This method is idempotent, so it can be called when
        the server is already serving.
        """
        raise NotImplementedError

    async def serve_forever(self):
        """Start accepting connections until the coroutine is cancelled.

        The server is closed when the coroutine is cancelled.
        """
        raise NotImplementedError

    async def wait_closed(self):
        """Coroutine to wait until service is closed."""
        raise NotImplementedError

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        # Leaving the ``async with`` block closes the server and waits
        # for the close to complete.
        self.close()
        await self.wait_closed()


class AbstractEventLoop:
    """Abstract event loop."""

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Returns True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    async def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        raise NotImplementedError

    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args, context=None):
        # Default implementation: delegate to call_later() with delay 0.
        return self.call_later(0, callback, *args, context=context)

    def call_later(self, delay, callback, *args, context=None):
        raise NotImplementedError

    def call_at(self, when, callback, *args, context=None):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def create_future(self):
        raise NotImplementedError

    # Method scheduling a coroutine object: create a task.

    def create_task(self, coro, *, name=None, context=None):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args, context=None):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # Network I/O methods returning Futures.

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            keep_alive=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6). The host parameter can also be
        a sequence (e.g. list) of hosts to bind to.

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows.

        keep_alive set to True keeps connections active by enabling the
        periodic transmission of messages.

        ssl_handshake_timeout is the time in seconds that an SSL server
        will wait for completion of the SSL handshake before aborting the
        connection. Default is 60s.

        ssl_shutdown_timeout is the time in seconds that an SSL server
        will wait for completion of the SSL shutdown procedure
        before aborting the connection. Default is 30s.

        start_serving set to True (default) causes the created server
        to start accepting connections immediately.  When set to False,
        the user should await Server.start_serving() or Server.serve_forever()
        to make the server start accepting connections.
        """
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        """Send a file through a transport.

        Return an amount of sent bytes.
        """
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        """Upgrade a transport to TLS.

        Return a new transport that *protocol* should start using
        immediately.
        """
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        ssl_handshake_timeout is the time in seconds that an SSL server
        will wait for the SSL handshake to complete (defaults to 60s).

        ssl_shutdown_timeout is the time in seconds that an SSL server
        will wait for the SSL shutdown to finish (defaults to 30s).

        start_serving set to True (default) causes the created server
        to start accepting connections immediately.  When set to False,
        the user should await Server.start_serving() or Server.serve_forever()
        to make the server start accepting connections.
        """
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        """Handle an accepted connection.

        This is used by servers that accept connections outside of
        asyncio, but use asyncio to handle connections.

        This method is a coroutine.  When completed, the coroutine
        returns a (transport, protocol) pair.
        """
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """A coroutine which creates a datagram endpoint.

        This method will try to establish the endpoint in the background.
        When successful, the coroutine returns a (transport, protocol) pair.

        protocol_factory must be a callable returning a protocol instance.

        socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
        host (or family if specified), socket type SOCK_DGRAM.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows and some UNIX's. If the
        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
        capability is unsupported.

        allow_broadcast tells the kernel to allow this endpoint to send
        messages to the broadcast address.

        sock can optionally be specified in order to use a preexisting
        socket object.
        """
        raise NotImplementedError

    # Pipes and subprocesses.

    async def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate object with Protocol interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # The reason to accept file-like object instead of just file descriptor
        # is: we need to own pipe and close it at transport finishing
        # Can got complicated errors if pass f.fileno(),
        # close fd in pipe transport then close f and vice versa.
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate object with BaseProtocol interface.
        Pipe is file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport support
        WriteTransport interface."""
        # The reason to accept file-like object instead of just file descriptor
        # is: we need to own pipe and close it at transport finishing
        # Can got complicated errors if pass f.fileno(),
        # close fd in pipe transport then close f and vice versa.
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_recvfrom(self, sock, bufsize):
        raise NotImplementedError

    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_sendto(self, sock, data, address):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=None):
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # Task factory.

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    # Error handlers.

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError


class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop."""

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an event loop object implementing the AbstractEventLoop interface,
        or raises an exception in case no event loop has been set for the
        current context and the current policy does not specify to create one.

        It should never return None."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop for the current context to loop."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object according to this
        policy's rules. If there's need to set this loop as the event loop for
        the current context, set_event_loop must be called explicitly."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Get the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""
        raise NotImplementedError


class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable used by new_event_loop(); None here, so a subclass is
    # expected to provide one.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the current loop, and whether set_event_loop()
        # has been called in this thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an instance of EventLoop or raises an exception.

        In the main thread, if no loop is set and set_event_loop() was
        never called, a DeprecationWarning is issued and a new loop is
        created and installed.  Otherwise RuntimeError is raised when no
        loop is set.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
            stacklevel = 2
            try:
                f = sys._getframe(1)
            except AttributeError:
                pass
            else:
                # Move up the call stack so that the warning is attached
                # to the line outside asyncio itself.
                while f:
                    module = f.f_globals.get('__name__')
                    # A frame's globals may lack '__name__' (or hold a
                    # non-string), e.g. code run through exec() with a
                    # bare dict; such a frame is outside asyncio.
                    if not (isinstance(module, str) and
                            (module == 'asyncio' or
                             module.startswith('asyncio.'))):
                        break
                    f = f.f_back
                    stacklevel += 1
            import warnings
            warnings.warn('There is no current event loop',
                          DeprecationWarning, stacklevel=stacklevel)
            self.set_event_loop(self.new_event_loop())

        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop.

        Raises TypeError if *loop* is neither None nor an
        AbstractEventLoop instance.
        """
        # The flag is set before validation, so even a rejected call
        # disables the automatic loop creation in get_event_loop().
        self._local._set_called = True
        if loop is not None and not isinstance(loop, AbstractEventLoop):
            raise TypeError(
                "loop must be an instance of AbstractEventLoop or None, "
                f"not '{type(loop).__name__}'")
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()


# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy.
_lock = threading.Lock()


# A TLS for the running event loop, used by _get_running_loop.
class _RunningLoop(threading.local):
    # (loop, pid) pair for the current thread; the pid is the process in
    # which the loop was registered.
    loop_pid = (None, None)


_running_loop = _RunningLoop()


def get_running_loop():
    """Return the running event loop.  Raise a RuntimeError if there is none.

    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    loop = _get_running_loop()
    if loop is None:
        raise RuntimeError('no running event loop')
    return loop


def _get_running_loop():
    """Return the running event loop or None.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    running_loop, pid = _running_loop.loop_pid
    # A loop registered under a different pid (i.e. before a fork) is
    # not reported as running in this process.
    if running_loop is not None and pid == os.getpid():
        return running_loop
    return None


def _set_running_loop(loop):
    """Set the running event loop.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    _running_loop.loop_pid = (loop, os.getpid())


def _init_event_loop_policy():
    """Install the default policy if none has been set yet."""
    global _event_loop_policy
    with _lock:
        # Re-check under the lock: another thread may have installed the
        # policy between the caller's check and acquiring the lock.
        if _event_loop_policy is None:  # pragma: no branch
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()


def get_event_loop_policy():
    """Get the current event loop policy."""
    if _event_loop_policy is None:
        _init_event_loop_policy()
    return _event_loop_policy


def set_event_loop_policy(policy):
    """Set the current event loop policy.

    If policy is None, the default policy is restored."""
    global _event_loop_policy
    if policy is not None and not isinstance(policy, AbstractEventLoopPolicy):
        raise TypeError(
            "policy must be an instance of AbstractEventLoopPolicy or None, "
            f"not '{type(policy).__name__}'")
    _event_loop_policy = policy


def get_event_loop():
    """Return an asyncio event loop.

    When called from a coroutine or a callback (e.g. scheduled with call_soon
    or similar API), this function will always return the running event loop.

    If there is no running event loop set, the function will return
    the result of `get_event_loop_policy().get_event_loop()` call.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    return get_event_loop_policy().get_event_loop()


def set_event_loop(loop):
    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
    get_event_loop_policy().set_event_loop(loop)


def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    return get_event_loop_policy().new_event_loop()


def get_child_watcher():
    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
    return get_event_loop_policy().get_child_watcher()


def set_child_watcher(watcher):
    """Equivalent to calling
    get_event_loop_policy().set_child_watcher(watcher)."""
    return get_event_loop_policy().set_child_watcher(watcher)


# Alias pure-Python implementations for testing purposes.
_py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop


try:
    # get_event_loop() is one of the most frequently called
    # functions in asyncio.  Pure Python implementation is
    # about 4 times slower than C-accelerated.
    from _asyncio import (_get_running_loop, _set_running_loop,
                          get_running_loop, get_event_loop)
except ImportError:
    # No C accelerator available: keep the pure-Python definitions above.
    pass
else:
    # Alias C implementations for testing purposes.
    _c__get_running_loop = _get_running_loop
    _c__set_running_loop = _set_running_loop
    _c_get_running_loop = get_running_loop
    _c_get_event_loop = get_event_loop


if hasattr(os, 'fork'):
    def on_fork():
        """Reset inherited event loop state in a forked child process."""
        # Reset the loop and wakeupfd in the forked child process.
        if _event_loop_policy is not None:
            _event_loop_policy._local = BaseDefaultEventLoopPolicy._Local()
        _set_running_loop(None)
        signal.set_wakeup_fd(-1)

    os.register_at_fork(after_in_child=on_fork)