• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Event loop and event loop policy."""
2
3__all__ = ['AbstractEventLoopPolicy',
4           'AbstractEventLoop', 'AbstractServer',
5           'Handle', 'TimerHandle',
6           'get_event_loop_policy', 'set_event_loop_policy',
7           'get_event_loop', 'set_event_loop', 'new_event_loop',
8           'get_child_watcher', 'set_child_watcher',
9           '_set_running_loop', '_get_running_loop',
10           ]
11
12import functools
13import inspect
14import os
15import reprlib
16import socket
17import subprocess
18import sys
19import threading
20import traceback
21
22from asyncio import compat
23
24
25def _get_function_source(func):
26    if compat.PY34:
27        func = inspect.unwrap(func)
28    elif hasattr(func, '__wrapped__'):
29        func = func.__wrapped__
30    if inspect.isfunction(func):
31        code = func.__code__
32        return (code.co_filename, code.co_firstlineno)
33    if isinstance(func, functools.partial):
34        return _get_function_source(func.func)
35    if compat.PY34 and isinstance(func, functools.partialmethod):
36        return _get_function_source(func.func)
37    return None
38
39
40def _format_args_and_kwargs(args, kwargs):
41    """Format function arguments and keyword arguments.
42
43    Special case for a single parameter: ('hello',) is formatted as ('hello').
44    """
45    # use reprlib to limit the length of the output
46    items = []
47    if args:
48        items.extend(reprlib.repr(arg) for arg in args)
49    if kwargs:
50        items.extend('{}={}'.format(k, reprlib.repr(v))
51                     for k, v in kwargs.items())
52    return '(' + ', '.join(items) + ')'
53
54
55def _format_callback(func, args, kwargs, suffix=''):
56    if isinstance(func, functools.partial):
57        suffix = _format_args_and_kwargs(args, kwargs) + suffix
58        return _format_callback(func.func, func.args, func.keywords, suffix)
59
60    if hasattr(func, '__qualname__'):
61        func_repr = getattr(func, '__qualname__')
62    elif hasattr(func, '__name__'):
63        func_repr = getattr(func, '__name__')
64    else:
65        func_repr = repr(func)
66
67    func_repr += _format_args_and_kwargs(args, kwargs)
68    if suffix:
69        func_repr += suffix
70    return func_repr
71
72def _format_callback_source(func, args):
73    func_repr = _format_callback(func, args, None)
74    source = _get_function_source(func)
75    if source:
76        func_repr += ' at %s:%s' % source
77    return func_repr
78
79
80class Handle:
81    """Object returned by callback registration methods."""
82
83    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
84                 '_source_traceback', '_repr', '__weakref__')
85
86    def __init__(self, callback, args, loop):
87        self._loop = loop
88        self._callback = callback
89        self._args = args
90        self._cancelled = False
91        self._repr = None
92        if self._loop.get_debug():
93            self._source_traceback = traceback.extract_stack(sys._getframe(1))
94        else:
95            self._source_traceback = None
96
97    def _repr_info(self):
98        info = [self.__class__.__name__]
99        if self._cancelled:
100            info.append('cancelled')
101        if self._callback is not None:
102            info.append(_format_callback_source(self._callback, self._args))
103        if self._source_traceback:
104            frame = self._source_traceback[-1]
105            info.append('created at %s:%s' % (frame[0], frame[1]))
106        return info
107
108    def __repr__(self):
109        if self._repr is not None:
110            return self._repr
111        info = self._repr_info()
112        return '<%s>' % ' '.join(info)
113
114    def cancel(self):
115        if not self._cancelled:
116            self._cancelled = True
117            if self._loop.get_debug():
118                # Keep a representation in debug mode to keep callback and
119                # parameters. For example, to log the warning
120                # "Executing <Handle...> took 2.5 second"
121                self._repr = repr(self)
122            self._callback = None
123            self._args = None
124
125    def _run(self):
126        try:
127            self._callback(*self._args)
128        except Exception as exc:
129            cb = _format_callback_source(self._callback, self._args)
130            msg = 'Exception in callback {}'.format(cb)
131            context = {
132                'message': msg,
133                'exception': exc,
134                'handle': self,
135            }
136            if self._source_traceback:
137                context['source_traceback'] = self._source_traceback
138            self._loop.call_exception_handler(context)
139        self = None  # Needed to break cycles when an exception occurs.
140
141
142class TimerHandle(Handle):
143    """Object returned by timed callback registration methods."""
144
145    __slots__ = ['_scheduled', '_when']
146
147    def __init__(self, when, callback, args, loop):
148        assert when is not None
149        super().__init__(callback, args, loop)
150        if self._source_traceback:
151            del self._source_traceback[-1]
152        self._when = when
153        self._scheduled = False
154
155    def _repr_info(self):
156        info = super()._repr_info()
157        pos = 2 if self._cancelled else 1
158        info.insert(pos, 'when=%s' % self._when)
159        return info
160
161    def __hash__(self):
162        return hash(self._when)
163
164    def __lt__(self, other):
165        return self._when < other._when
166
167    def __le__(self, other):
168        if self._when < other._when:
169            return True
170        return self.__eq__(other)
171
172    def __gt__(self, other):
173        return self._when > other._when
174
175    def __ge__(self, other):
176        if self._when > other._when:
177            return True
178        return self.__eq__(other)
179
180    def __eq__(self, other):
181        if isinstance(other, TimerHandle):
182            return (self._when == other._when and
183                    self._callback == other._callback and
184                    self._args == other._args and
185                    self._cancelled == other._cancelled)
186        return NotImplemented
187
188    def __ne__(self, other):
189        equal = self.__eq__(other)
190        return NotImplemented if equal is NotImplemented else not equal
191
192    def cancel(self):
193        if not self._cancelled:
194            self._loop._timer_handle_cancelled(self)
195        super().cancel()
196
197
198class AbstractServer:
199    """Abstract server returned by create_server()."""
200
201    def close(self):
202        """Stop serving.  This leaves existing connections open."""
203        return NotImplemented
204
205    def wait_closed(self):
206        """Coroutine to wait until service is closed."""
207        return NotImplemented
208
209
210class AbstractEventLoop:
211    """Abstract event loop."""
212
213    # Running and stopping the event loop.
214
215    def run_forever(self):
216        """Run the event loop until stop() is called."""
217        raise NotImplementedError
218
219    def run_until_complete(self, future):
220        """Run the event loop until a Future is done.
221
222        Return the Future's result, or raise its exception.
223        """
224        raise NotImplementedError
225
226    def stop(self):
227        """Stop the event loop as soon as reasonable.
228
229        Exactly how soon that is may depend on the implementation, but
230        no more I/O callbacks should be scheduled.
231        """
232        raise NotImplementedError
233
234    def is_running(self):
235        """Return whether the event loop is currently running."""
236        raise NotImplementedError
237
238    def is_closed(self):
239        """Returns True if the event loop was closed."""
240        raise NotImplementedError
241
242    def close(self):
243        """Close the loop.
244
245        The loop should not be running.
246
247        This is idempotent and irreversible.
248
249        No other methods should be called after this one.
250        """
251        raise NotImplementedError
252
253    def shutdown_asyncgens(self):
254        """Shutdown all active asynchronous generators."""
255        raise NotImplementedError
256
257    # Methods scheduling callbacks.  All these return Handles.
258
259    def _timer_handle_cancelled(self, handle):
260        """Notification that a TimerHandle has been cancelled."""
261        raise NotImplementedError
262
263    def call_soon(self, callback, *args):
264        return self.call_later(0, callback, *args)
265
266    def call_later(self, delay, callback, *args):
267        raise NotImplementedError
268
269    def call_at(self, when, callback, *args):
270        raise NotImplementedError
271
272    def time(self):
273        raise NotImplementedError
274
275    def create_future(self):
276        raise NotImplementedError
277
278    # Method scheduling a coroutine object: create a task.
279
280    def create_task(self, coro):
281        raise NotImplementedError
282
283    # Methods for interacting with threads.
284
285    def call_soon_threadsafe(self, callback, *args):
286        raise NotImplementedError
287
288    def run_in_executor(self, executor, func, *args):
289        raise NotImplementedError
290
291    def set_default_executor(self, executor):
292        raise NotImplementedError
293
294    # Network I/O methods returning Futures.
295
296    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
297        raise NotImplementedError
298
299    def getnameinfo(self, sockaddr, flags=0):
300        raise NotImplementedError
301
302    def create_connection(self, protocol_factory, host=None, port=None, *,
303                          ssl=None, family=0, proto=0, flags=0, sock=None,
304                          local_addr=None, server_hostname=None):
305        raise NotImplementedError
306
307    def create_server(self, protocol_factory, host=None, port=None, *,
308                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
309                      sock=None, backlog=100, ssl=None, reuse_address=None,
310                      reuse_port=None):
311        """A coroutine which creates a TCP server bound to host and port.
312
313        The return value is a Server object which can be used to stop
314        the service.
315
316        If host is an empty string or None all interfaces are assumed
317        and a list of multiple sockets will be returned (most likely
318        one for IPv4 and another one for IPv6). The host parameter can also be a
319        sequence (e.g. list) of hosts to bind to.
320
321        family can be set to either AF_INET or AF_INET6 to force the
322        socket to use IPv4 or IPv6. If not set it will be determined
323        from host (defaults to AF_UNSPEC).
324
325        flags is a bitmask for getaddrinfo().
326
327        sock can optionally be specified in order to use a preexisting
328        socket object.
329
330        backlog is the maximum number of queued connections passed to
331        listen() (defaults to 100).
332
333        ssl can be set to an SSLContext to enable SSL over the
334        accepted connections.
335
336        reuse_address tells the kernel to reuse a local socket in
337        TIME_WAIT state, without waiting for its natural timeout to
338        expire. If not specified will automatically be set to True on
339        UNIX.
340
341        reuse_port tells the kernel to allow this endpoint to be bound to
342        the same port as other existing endpoints are bound to, so long as
343        they all set this flag when being created. This option is not
344        supported on Windows.
345        """
346        raise NotImplementedError
347
348    def create_unix_connection(self, protocol_factory, path, *,
349                               ssl=None, sock=None,
350                               server_hostname=None):
351        raise NotImplementedError
352
353    def create_unix_server(self, protocol_factory, path, *,
354                           sock=None, backlog=100, ssl=None):
355        """A coroutine which creates a UNIX Domain Socket server.
356
357        The return value is a Server object, which can be used to stop
358        the service.
359
360        path is a str, representing a file systsem path to bind the
361        server socket to.
362
363        sock can optionally be specified in order to use a preexisting
364        socket object.
365
366        backlog is the maximum number of queued connections passed to
367        listen() (defaults to 100).
368
369        ssl can be set to an SSLContext to enable SSL over the
370        accepted connections.
371        """
372        raise NotImplementedError
373
374    def create_datagram_endpoint(self, protocol_factory,
375                                 local_addr=None, remote_addr=None, *,
376                                 family=0, proto=0, flags=0,
377                                 reuse_address=None, reuse_port=None,
378                                 allow_broadcast=None, sock=None):
379        """A coroutine which creates a datagram endpoint.
380
381        This method will try to establish the endpoint in the background.
382        When successful, the coroutine returns a (transport, protocol) pair.
383
384        protocol_factory must be a callable returning a protocol instance.
385
386        socket family AF_INET or socket.AF_INET6 depending on host (or
387        family if specified), socket type SOCK_DGRAM.
388
389        reuse_address tells the kernel to reuse a local socket in
390        TIME_WAIT state, without waiting for its natural timeout to
391        expire. If not specified it will automatically be set to True on
392        UNIX.
393
394        reuse_port tells the kernel to allow this endpoint to be bound to
395        the same port as other existing endpoints are bound to, so long as
396        they all set this flag when being created. This option is not
397        supported on Windows and some UNIX's. If the
398        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
399        capability is unsupported.
400
401        allow_broadcast tells the kernel to allow this endpoint to send
402        messages to the broadcast address.
403
404        sock can optionally be specified in order to use a preexisting
405        socket object.
406        """
407        raise NotImplementedError
408
409    # Pipes and subprocesses.
410
411    def connect_read_pipe(self, protocol_factory, pipe):
412        """Register read pipe in event loop. Set the pipe to non-blocking mode.
413
414        protocol_factory should instantiate object with Protocol interface.
415        pipe is a file-like object.
416        Return pair (transport, protocol), where transport supports the
417        ReadTransport interface."""
418        # The reason to accept file-like object instead of just file descriptor
419        # is: we need to own pipe and close it at transport finishing
420        # Can got complicated errors if pass f.fileno(),
421        # close fd in pipe transport then close f and vise versa.
422        raise NotImplementedError
423
424    def connect_write_pipe(self, protocol_factory, pipe):
425        """Register write pipe in event loop.
426
427        protocol_factory should instantiate object with BaseProtocol interface.
428        Pipe is file-like object already switched to nonblocking.
429        Return pair (transport, protocol), where transport support
430        WriteTransport interface."""
431        # The reason to accept file-like object instead of just file descriptor
432        # is: we need to own pipe and close it at transport finishing
433        # Can got complicated errors if pass f.fileno(),
434        # close fd in pipe transport then close f and vise versa.
435        raise NotImplementedError
436
437    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
438                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
439                         **kwargs):
440        raise NotImplementedError
441
442    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
443                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
444                        **kwargs):
445        raise NotImplementedError
446
447    # Ready-based callback registration methods.
448    # The add_*() methods return None.
449    # The remove_*() methods return True if something was removed,
450    # False if there was nothing to delete.
451
452    def add_reader(self, fd, callback, *args):
453        raise NotImplementedError
454
455    def remove_reader(self, fd):
456        raise NotImplementedError
457
458    def add_writer(self, fd, callback, *args):
459        raise NotImplementedError
460
461    def remove_writer(self, fd):
462        raise NotImplementedError
463
464    # Completion based I/O methods returning Futures.
465
466    def sock_recv(self, sock, nbytes):
467        raise NotImplementedError
468
469    def sock_sendall(self, sock, data):
470        raise NotImplementedError
471
472    def sock_connect(self, sock, address):
473        raise NotImplementedError
474
475    def sock_accept(self, sock):
476        raise NotImplementedError
477
478    # Signal handling.
479
480    def add_signal_handler(self, sig, callback, *args):
481        raise NotImplementedError
482
483    def remove_signal_handler(self, sig):
484        raise NotImplementedError
485
486    # Task factory.
487
488    def set_task_factory(self, factory):
489        raise NotImplementedError
490
491    def get_task_factory(self):
492        raise NotImplementedError
493
494    # Error handlers.
495
496    def get_exception_handler(self):
497        raise NotImplementedError
498
499    def set_exception_handler(self, handler):
500        raise NotImplementedError
501
502    def default_exception_handler(self, context):
503        raise NotImplementedError
504
505    def call_exception_handler(self, context):
506        raise NotImplementedError
507
508    # Debug flag management.
509
510    def get_debug(self):
511        raise NotImplementedError
512
513    def set_debug(self, enabled):
514        raise NotImplementedError
515
516
517class AbstractEventLoopPolicy:
518    """Abstract policy for accessing the event loop."""
519
520    def get_event_loop(self):
521        """Get the event loop for the current context.
522
523        Returns an event loop object implementing the BaseEventLoop interface,
524        or raises an exception in case no event loop has been set for the
525        current context and the current policy does not specify to create one.
526
527        It should never return None."""
528        raise NotImplementedError
529
530    def set_event_loop(self, loop):
531        """Set the event loop for the current context to loop."""
532        raise NotImplementedError
533
534    def new_event_loop(self):
535        """Create and return a new event loop object according to this
536        policy's rules. If there's need to set this loop as the event loop for
537        the current context, set_event_loop must be called explicitly."""
538        raise NotImplementedError
539
540    # Child processes handling (Unix only).
541
542    def get_child_watcher(self):
543        "Get the watcher for child processes."
544        raise NotImplementedError
545
546    def set_child_watcher(self, watcher):
547        """Set the watcher for child processes."""
548        raise NotImplementedError
549
550
551class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
552    """Default policy implementation for accessing the event loop.
553
554    In this policy, each thread has its own event loop.  However, we
555    only automatically create an event loop by default for the main
556    thread; other threads by default have no event loop.
557
558    Other policies may have different rules (e.g. a single global
559    event loop, or automatically creating an event loop per thread, or
560    using some other notion of context to which an event loop is
561    associated).
562    """
563
564    _loop_factory = None
565
566    class _Local(threading.local):
567        _loop = None
568        _set_called = False
569
570    def __init__(self):
571        self._local = self._Local()
572
573    def get_event_loop(self):
574        """Get the event loop.
575
576        This may be None or an instance of EventLoop.
577        """
578        if (self._local._loop is None and
579            not self._local._set_called and
580            isinstance(threading.current_thread(), threading._MainThread)):
581            self.set_event_loop(self.new_event_loop())
582        if self._local._loop is None:
583            raise RuntimeError('There is no current event loop in thread %r.'
584                               % threading.current_thread().name)
585        return self._local._loop
586
587    def set_event_loop(self, loop):
588        """Set the event loop."""
589        self._local._set_called = True
590        assert loop is None or isinstance(loop, AbstractEventLoop)
591        self._local._loop = loop
592
593    def new_event_loop(self):
594        """Create a new event loop.
595
596        You must call set_event_loop() to make this the current event
597        loop.
598        """
599        return self._loop_factory()
600
601
602# Event loop policy.  The policy itself is always global, even if the
603# policy's rules say that there is an event loop per thread (or other
604# notion of context).  The default policy is installed by the first
605# call to get_event_loop_policy().
606_event_loop_policy = None
607
608# Lock for protecting the on-the-fly creation of the event loop policy.
609_lock = threading.Lock()
610
611
612# A TLS for the running event loop, used by _get_running_loop.
613class _RunningLoop(threading.local):
614    _loop = None
615    _pid = None
616
617
618_running_loop = _RunningLoop()
619
620
621def _get_running_loop():
622    """Return the running event loop or None.
623
624    This is a low-level function intended to be used by event loops.
625    This function is thread-specific.
626    """
627    running_loop = _running_loop._loop
628    if running_loop is not None and _running_loop._pid == os.getpid():
629        return running_loop
630
631
632def _set_running_loop(loop):
633    """Set the running event loop.
634
635    This is a low-level function intended to be used by event loops.
636    This function is thread-specific.
637    """
638    _running_loop._pid = os.getpid()
639    _running_loop._loop = loop
640
641
642def _init_event_loop_policy():
643    global _event_loop_policy
644    with _lock:
645        if _event_loop_policy is None:  # pragma: no branch
646            from . import DefaultEventLoopPolicy
647            _event_loop_policy = DefaultEventLoopPolicy()
648
649
650def get_event_loop_policy():
651    """Get the current event loop policy."""
652    if _event_loop_policy is None:
653        _init_event_loop_policy()
654    return _event_loop_policy
655
656
657def set_event_loop_policy(policy):
658    """Set the current event loop policy.
659
660    If policy is None, the default policy is restored."""
661    global _event_loop_policy
662    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
663    _event_loop_policy = policy
664
665
666def get_event_loop():
667    """Return an asyncio event loop.
668
669    When called from a coroutine or a callback (e.g. scheduled with call_soon
670    or similar API), this function will always return the running event loop.
671
672    If there is no running event loop set, the function will return
673    the result of `get_event_loop_policy().get_event_loop()` call.
674    """
675    current_loop = _get_running_loop()
676    if current_loop is not None:
677        return current_loop
678    return get_event_loop_policy().get_event_loop()
679
680
681def set_event_loop(loop):
682    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
683    get_event_loop_policy().set_event_loop(loop)
684
685
686def new_event_loop():
687    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
688    return get_event_loop_policy().new_event_loop()
689
690
691def get_child_watcher():
692    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
693    return get_event_loop_policy().get_child_watcher()
694
695
696def set_child_watcher(watcher):
697    """Equivalent to calling
698    get_event_loop_policy().set_child_watcher(watcher)."""
699    return get_event_loop_policy().set_child_watcher(watcher)
700