• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Event loop and event loop policy."""
2
3# Contains code from https://github.com/MagicStack/uvloop/tree/v0.16.0
4# SPDX-License-Identifier: PSF-2.0 AND (MIT OR Apache-2.0)
5# SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc.  http://magic.io
6
# Names exported by a star-import of this module.
__all__ = (
    'AbstractEventLoopPolicy',
    'AbstractEventLoop',
    'AbstractServer',
    'Handle',
    'TimerHandle',
    'get_event_loop_policy',
    'set_event_loop_policy',
    'get_event_loop',
    'set_event_loop',
    'new_event_loop',
    'get_child_watcher',
    'set_child_watcher',
    '_set_running_loop',
    'get_running_loop',
    '_get_running_loop',
)
17
18import contextvars
19import os
20import signal
21import socket
22import subprocess
23import sys
24import threading
25
26from . import format_helpers
27
28
class Handle:
    """Wrapper around a registered callback, returned by registration methods."""

    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
                 '_source_traceback', '_repr', '__weakref__',
                 '_context')

    def __init__(self, callback, args, loop, context=None):
        # Without an explicit context, snapshot the current one.
        self._context = (contextvars.copy_context()
                         if context is None else context)
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False
        self._repr = None
        # In debug mode, remember the stack of the code creating the handle.
        # sys._getframe(1) must be evaluated directly in this method so the
        # frame depth stays correct.
        self._source_traceback = (
            format_helpers.extract_stack(sys._getframe(1))
            if self._loop.get_debug() else None)

    def _repr_info(self):
        """Return the list of text fragments that make up the repr."""
        parts = [self.__class__.__name__]
        if self._cancelled:
            parts.append('cancelled')
        if self._callback is not None:
            debug = self._loop.get_debug()
            described = format_helpers._format_callback_source(
                self._callback, self._args, debug=debug)
            parts.append(described)
        if self._source_traceback:
            last = self._source_traceback[-1]
            parts.append(f'created at {last[0]}:{last[1]}')
        return parts

    def __repr__(self):
        cached = self._repr
        if cached is None:
            return '<{}>'.format(' '.join(self._repr_info()))
        return cached

    def get_context(self):
        """Return the context the callback is run in."""
        return self._context

    def cancel(self):
        """Mark the handle cancelled and drop the callback and its arguments."""
        if self._cancelled:
            return
        self._cancelled = True
        if self._loop.get_debug():
            # Freeze the repr while callback and arguments are still known,
            # e.g. so a warning such as
            # "Executing <Handle...> took 2.5 second" can still show them.
            self._repr = repr(self)
        self._callback = None
        self._args = None

    def cancelled(self):
        """Return True if cancel() has been called on this handle."""
        return self._cancelled

    def _run(self):
        """Invoke the callback inside the stored context.

        SystemExit and KeyboardInterrupt propagate; any other exception is
        passed to the loop's call_exception_handler().
        """
        try:
            self._context.run(self._callback, *self._args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            cb = format_helpers._format_callback_source(
                self._callback, self._args,
                debug=self._loop.get_debug())
            context = {
                'message': f'Exception in callback {cb}',
                'exception': exc,
                'handle': self,
            }
            source_traceback = self._source_traceback
            if source_traceback:
                context['source_traceback'] = source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.
106
107
class TimerHandle(Handle):
    """Handle for a callback registered to run at a specific time."""

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop, context=None):
        super().__init__(callback, args, loop, context)
        stack = self._source_traceback
        if stack:
            # Drop the last recorded frame of the captured stack.
            del stack[-1]
        self._when = when
        self._scheduled = False

    def _repr_info(self):
        """Extend the base fragments with a ``when=...`` entry."""
        info = super()._repr_info()
        # The entry goes right after the class name, or after 'cancelled'
        # when that marker is present.
        if self._cancelled:
            index = 2
        else:
            index = 1
        info.insert(index, f'when={self._when}')
        return info

    def __hash__(self):
        return hash(self._when)

    def __lt__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when < other._when

    def __le__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when < other._when or self.__eq__(other)

    def __gt__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when > other._when

    def __ge__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return self._when > other._when or self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        # Equal only if time, callback, arguments and cancelled state match.
        return (self._when == other._when and
                self._callback == other._callback and
                self._args == other._args and
                self._cancelled == other._cancelled)

    def cancel(self):
        """Cancel the handle, notifying the loop on the first cancellation."""
        if not self._cancelled:
            self._loop._timer_handle_cancelled(self)
        super().cancel()

    def when(self):
        """Return the time the callback is scheduled for.

        It is an absolute timestamp on the same time reference as
        loop.time().
        """
        return self._when
169
170
class AbstractServer:
    """Abstract base for the server object returned by create_server()."""

    def close(self):
        """Stop serving; connections that already exist stay open."""
        raise NotImplementedError

    def close_clients(self):
        """Close every active connection."""
        raise NotImplementedError

    def abort_clients(self):
        """Immediately close every active connection."""
        raise NotImplementedError

    def get_loop(self):
        """Return the event loop this Server object is attached to."""
        raise NotImplementedError

    def is_serving(self):
        """Return True while the server is accepting connections."""
        raise NotImplementedError

    async def start_serving(self):
        """Begin accepting connections.

        The method is idempotent, so calling it on a server that is
        already serving is allowed.
        """
        raise NotImplementedError

    async def serve_forever(self):
        """Accept connections until the coroutine is cancelled.

        Cancelling the coroutine closes the server.
        """
        raise NotImplementedError

    async def wait_closed(self):
        """Coroutine that waits until the service is closed."""
        raise NotImplementedError

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        # Leaving the ``async with`` block closes the server and waits for
        # the close to complete.
        self.close()
        await self.wait_closed()
219
220
class AbstractEventLoop:
    """Abstract event loop.

    Apart from call_soon(), which delegates to call_later(), every
    method here raises NotImplementedError.
    """

    # --- Running and stopping the event loop ---

    def run_forever(self):
        """Run the loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the loop until the given Future is done.

        The Future's result is returned, or its exception is raised.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the loop as soon as is reasonable.

        How soon that is may vary between implementations, but no further
        I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Tell whether the loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Return True if the loop has been closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running when this is called.

        Closing is idempotent and irreversible.

        No other method should be called afterwards.
        """
        raise NotImplementedError

    async def shutdown_asyncgens(self):
        """Shut down all active asynchronous generators."""
        raise NotImplementedError

    async def shutdown_default_executor(self):
        """Schedule the shutdown of the default executor."""
        raise NotImplementedError

    # --- Methods scheduling callbacks; all of them return Handles ---

    def _timer_handle_cancelled(self, handle):
        """Called to notify the loop that a TimerHandle was cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args, context=None):
        # Expressed as call_later() with a delay of zero.
        return self.call_later(0, callback, *args, context=context)

    def call_later(self, delay, callback, *args, context=None):
        raise NotImplementedError

    def call_at(self, when, callback, *args, context=None):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def create_future(self):
        raise NotImplementedError

    # --- Method scheduling a coroutine object: create a task ---

    def create_task(self, coro, *, name=None, context=None):
        raise NotImplementedError

    # --- Methods for interacting with threads ---

    def call_soon_threadsafe(self, callback, *args, context=None):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # --- Network I/O methods returning Futures ---

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(
            self, protocol_factory, host=None, port=None,
            *, ssl=None, family=0, proto=0,
            flags=0, sock=None, local_addr=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            happy_eyeballs_delay=None, interleave=None):
        raise NotImplementedError

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *, family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE, sock=None, backlog=100,
            ssl=None, reuse_address=None, reuse_port=None,
            keep_alive=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """Coroutine creating a TCP server bound to host and port.

        It returns a Server object, which can be used to stop the
        service.

        When host is an empty string or None, all interfaces are assumed
        and a list of multiple sockets will be returned (most likely one
        for IPv4 and another one for IPv6). host may also be a sequence
        (e.g. a list) of hosts to bind to.

        family can be AF_INET or AF_INET6 to force IPv4 or IPv6 on the
        socket. When not set, it is determined from host (the default is
        AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be given to use a preexisting socket object.

        backlog is the maximum number of queued connections passed to
        listen() (100 by default).

        ssl can be an SSLContext, which enables SSL over the accepted
        connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state without waiting for its natural timeout to
        expire. When not specified, it is automatically set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints, provided they all set
        this flag when being created. This option is not supported on
        Windows.

        keep_alive set to True keeps connections active by enabling the
        periodic transmission of messages.

        ssl_handshake_timeout is how long, in seconds, an SSL server waits
        for the SSL handshake to complete before aborting the connection.
        Default is 60s.

        ssl_shutdown_timeout is how long, in seconds, an SSL server waits
        for the SSL shutdown procedure to complete before aborting the
        connection. Default is 30s.

        start_serving set to True (the default) makes the created server
        start accepting connections immediately.  When it is False, the
        user should await Server.start_serving() or
        Server.serve_forever() to make the server start accepting
        connections.
        """
        raise NotImplementedError

    async def sendfile(self, transport, file, offset=0, count=None,
                       *, fallback=True):
        """Send a file through a transport.

        Returns the number of bytes sent.
        """
        raise NotImplementedError

    async def start_tls(self, transport, protocol, sslcontext, *,
                        server_side=False,
                        server_hostname=None,
                        ssl_handshake_timeout=None,
                        ssl_shutdown_timeout=None):
        """Upgrade a transport to TLS.

        Returns a new transport, which *protocol* should start using
        immediately.
        """
        raise NotImplementedError

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        raise NotImplementedError

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """Coroutine creating a UNIX Domain Socket server.

        It returns a Server object, which can be used to stop the
        service.

        path is a str giving the file system path to bind the server
        socket to.

        sock can optionally be given to use a preexisting socket object.

        backlog is the maximum number of queued connections passed to
        listen() (100 by default).

        ssl can be an SSLContext, which enables SSL over the accepted
        connections.

        ssl_handshake_timeout is how long, in seconds, an SSL server waits
        for the SSL handshake to complete (defaults to 60s).

        ssl_shutdown_timeout is how long, in seconds, an SSL server waits
        for the SSL shutdown to finish (defaults to 30s).

        start_serving set to True (the default) makes the created server
        start accepting connections immediately.  When it is False, the
        user should await Server.start_serving() or
        Server.serve_forever() to make the server start accepting
        connections.
        """
        raise NotImplementedError

    async def connect_accepted_socket(
            self, protocol_factory, sock,
            *, ssl=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None):
        """Handle an accepted connection.

        Intended for servers that accept connections outside of asyncio
        but use asyncio to handle them.

        This method is a coroutine.  On completion it returns a
        (transport, protocol) pair.
        """
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """Coroutine creating a datagram endpoint.

        The method tries to establish the endpoint in the background.
        On success, the coroutine returns a (transport, protocol) pair.

        protocol_factory must be a callable returning a protocol instance.

        The socket family is AF_INET, socket.AF_INET6 or socket.AF_UNIX
        depending on host (or family if specified); the socket type is
        SOCK_DGRAM.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state without waiting for its natural timeout to
        expire. When not specified, it is automatically set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints, provided they all set
        this flag when being created. This option is not supported on
        Windows and some UNIX's. If the :py:data:`~socket.SO_REUSEPORT`
        constant is not defined then this capability is unsupported.

        allow_broadcast tells the kernel to allow this endpoint to send
        messages to the broadcast address.

        sock can optionally be given to use a preexisting socket object.
        """
        raise NotImplementedError

    # --- Pipes and subprocesses ---

    async def connect_read_pipe(self, protocol_factory, pipe):
        """Register a read pipe in the event loop and make it non-blocking.

        protocol_factory should instantiate an object with the Protocol
        interface.
        pipe is a file-like object.
        Returns a (transport, protocol) pair, where transport supports the
        ReadTransport interface."""
        # A file-like object is accepted rather than a bare file descriptor
        # because the transport needs to own the pipe and close it when it
        # finishes.  Passing f.fileno() can lead to complicated errors: the
        # pipe transport closes the fd and then f is closed, or vice versa.
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        """Register a write pipe in the event loop.

        protocol_factory should instantiate an object with the
        BaseProtocol interface.
        pipe is a file-like object already switched to nonblocking.
        Returns a (transport, protocol) pair, where transport supports the
        WriteTransport interface."""
        # A file-like object is accepted rather than a bare file descriptor
        # because the transport needs to own the pipe and close it when it
        # finishes.  Passing f.fileno() can lead to complicated errors: the
        # pipe transport closes the fd and then f is closed, or vice versa.
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        raise NotImplementedError

    # --- Ready-based callback registration methods ---
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed and
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # --- Completion based I/O methods returning Futures ---

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_recvfrom(self, sock, bufsize):
        raise NotImplementedError

    async def sock_recvfrom_into(self, sock, buf, nbytes=0):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_sendto(self, sock, data, address):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    async def sock_sendfile(self, sock, file, offset=0, count=None,
                            *, fallback=None):
        raise NotImplementedError

    # --- Signal handling ---

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # --- Task factory ---

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    # --- Error handlers ---

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # --- Debug flag management ---

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError
630
631
class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop.

    Every method here raises NotImplementedError.
    """

    def get_event_loop(self):
        """Return the event loop for the current context.

        The result is an event loop object implementing the
        AbstractEventLoop interface. An exception is raised when no event
        loop has been set for the current context and the current policy
        does not specify to create one.

        None should never be returned."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Make loop the event loop for the current context."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object following this
        policy's rules. When the loop needs to become the event loop for
        the current context, set_event_loop must be called explicitly."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Return the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Install the watcher for child processes."""
        raise NotImplementedError
664
665
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable invoked by new_event_loop().  It is None here, so it has to
    # be replaced before new_event_loop() can succeed.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the thread's loop, and whether set_event_loop()
        # has been called in that thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop for the current context.

        In the main thread, when no loop is set and set_event_loop() has
        not been called yet, a DeprecationWarning is emitted and a new loop
        is created and installed.

        Returns an instance of EventLoop.

        Raises:
            RuntimeError: if the current thread has no event loop.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
            stacklevel = 2
            try:
                f = sys._getframe(1)
            except AttributeError:
                # sys._getframe is unavailable; keep the default stacklevel.
                pass
            else:
                # Move up the call stack so that the warning is attached
                # to the line outside asyncio itself.
                while f:
                    module = f.f_globals.get('__name__')
                    # A frame whose globals carry no usable __name__ (for
                    # example code run through exec() with a bare dict) is
                    # not part of asyncio; stop there instead of failing
                    # on None.startswith().
                    if not isinstance(module, str) or not (
                            module == 'asyncio' or
                            module.startswith('asyncio.')):
                        break
                    f = f.f_back
                    stacklevel += 1
            # Imported lazily: only needed on this deprecated path.
            import warnings
            warnings.warn('There is no current event loop',
                          DeprecationWarning, stacklevel=stacklevel)
            self.set_event_loop(self.new_event_loop())

        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop for the current thread.

        Raises:
            TypeError: if loop is neither None nor an AbstractEventLoop.
        """
        # Recorded before validation, so even a rejected call disables the
        # automatic loop creation in get_event_loop().
        self._local._set_called = True
        if loop is not None and not isinstance(loop, AbstractEventLoop):
            raise TypeError(f"loop must be an instance of AbstractEventLoop or None, not '{type(loop).__name__}'")
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()
735
736
# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy(); None means no policy is installed yet.
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy
# (held by _init_event_loop_policy() while it installs the default).
_lock = threading.Lock()
745
746
# A TLS for the running event loop, used by _get_running_loop.
class _RunningLoop(threading.local):
    """Thread-local holder of the running loop and the pid that set it."""

    # (loop, pid) pair written by _set_running_loop(); (None, None) until
    # a loop has been set in the current thread.
    loop_pid = (None, None)


# Module-wide instance read and written by the running-loop helpers below.
_running_loop = _RunningLoop()
753
754
def get_running_loop():
    """Return the running event loop, raising RuntimeError when there is none.

    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    running = _get_running_loop()
    if running is not None:
        return running
    raise RuntimeError('no running event loop')
765
766
def _get_running_loop():
    """Return the running event loop, or None when there is none.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    loop, owner_pid = _running_loop.loop_pid
    # A loop recorded under a different process id is not reported.
    if loop is None or owner_pid != os.getpid():
        return None
    return loop
777
778
def _set_running_loop(loop):
    """Record loop as the running event loop.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    current_pid = os.getpid()
    # The loop is stored together with the pid of the process setting it.
    _running_loop.loop_pid = (loop, current_pid)
787
788
def _init_event_loop_policy():
    """Install the default event loop policy unless one is already set."""
    global _event_loop_policy
    with _lock:
        # Re-check under the lock; another caller may have installed a
        # policy in the meantime.
        if _event_loop_policy is not None:
            return
        from . import DefaultEventLoopPolicy
        _event_loop_policy = DefaultEventLoopPolicy()
795
796
def get_event_loop_policy():
    """Return the current event loop policy, installing the default if unset."""
    if _event_loop_policy is not None:
        return _event_loop_policy
    _init_event_loop_policy()
    return _event_loop_policy
802
803
def set_event_loop_policy(policy):
    """Install policy as the current event loop policy.

    Passing None restores the default policy.

    Raises TypeError when policy is neither None nor an
    AbstractEventLoopPolicy instance."""
    global _event_loop_policy
    acceptable = policy is None or isinstance(policy, AbstractEventLoopPolicy)
    if not acceptable:
        raise TypeError(f"policy must be an instance of AbstractEventLoopPolicy or None, not '{type(policy).__name__}'")
    _event_loop_policy = policy
812
813
def get_event_loop():
    """Return an asyncio event loop.

    Inside a coroutine or a callback (e.g. one scheduled with call_soon
    or a similar API), the running event loop is always returned.

    When no running event loop is set, the result of
    `get_event_loop_policy().get_event_loop()` is returned instead.
    """
    # NOTE: this function is implemented in C (see _asynciomodule.c)
    running = _get_running_loop()
    if running is None:
        return get_event_loop_policy().get_event_loop()
    return running
828
829
def set_event_loop(loop):
    """Shortcut for get_event_loop_policy().set_event_loop(loop)."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
833
834
def new_event_loop():
    """Shortcut for get_event_loop_policy().new_event_loop()."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
838
839
def get_child_watcher():
    """Shortcut for get_event_loop_policy().get_child_watcher()."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
843
844
def set_child_watcher(watcher):
    """Shortcut for
    get_event_loop_policy().set_child_watcher(watcher)."""
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)
849
850
# Alias pure-Python implementations for testing purposes.
# These aliases must be bound before the import below, which rebinds the
# same four names when the C accelerator module is importable.
_py__get_running_loop = _get_running_loop
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop


try:
    # get_event_loop() is one of the most frequently called
    # functions in asyncio.  Pure Python implementation is
    # about 4 times slower than C-accelerated.
    from _asyncio import (_get_running_loop, _set_running_loop,
                          get_running_loop, get_event_loop)
except ImportError:
    # No C accelerator available: the pure-Python definitions stay in use.
    pass
else:
    # Alias C implementations for testing purposes.
    _c__get_running_loop = _get_running_loop
    _c__set_running_loop = _set_running_loop
    _c_get_running_loop = get_running_loop
    _c_get_event_loop = get_event_loop
872
873
if hasattr(os, 'fork'):
    def on_fork():
        """Reset the loop state and the wakeup fd in a forked child process."""
        policy = _event_loop_policy
        if policy is not None:
            # Give the installed policy fresh thread-local state.
            policy._local = BaseDefaultEventLoopPolicy._Local()
        _set_running_loop(None)
        signal.set_wakeup_fd(-1)

    # Registered to run in the child process after a fork.
    os.register_at_fork(after_in_child=on_fork)
883