• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = ['Task',
4           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
5           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
6           'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
7           ]
8
9import concurrent.futures
10import functools
11import inspect
12import warnings
13import weakref
14
15from . import base_tasks
16from . import compat
17from . import coroutines
18from . import events
19from . import futures
20from .coroutines import coroutine
21
22
23class Task(futures.Future):
24    """A coroutine wrapped in a Future."""
25
26    # An important invariant maintained while a Task not done:
27    #
28    # - Either _fut_waiter is None, and _step() is scheduled;
29    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
30    #
31    # The only transition from the latter to the former is through
32    # _wakeup().  When _fut_waiter is not None, one of its callbacks
33    # must be _wakeup().
34
35    # Weak set containing all tasks alive.
36    _all_tasks = weakref.WeakSet()
37
38    # Dictionary containing tasks that are currently active in
39    # all running event loops.  {EventLoop: Task}
40    _current_tasks = {}
41
42    # If False, don't log a message if the task is destroyed whereas its
43    # status is still pending
44    _log_destroy_pending = True
45
46    @classmethod
47    def current_task(cls, loop=None):
48        """Return the currently running task in an event loop or None.
49
50        By default the current task for the current event loop is returned.
51
52        None is returned when called not in the context of a Task.
53        """
54        if loop is None:
55            loop = events.get_event_loop()
56        return cls._current_tasks.get(loop)
57
58    @classmethod
59    def all_tasks(cls, loop=None):
60        """Return a set of all tasks for an event loop.
61
62        By default all tasks for the current event loop are returned.
63        """
64        if loop is None:
65            loop = events.get_event_loop()
66        return {t for t in cls._all_tasks if t._loop is loop}
67
68    def __init__(self, coro, *, loop=None):
69        assert coroutines.iscoroutine(coro), repr(coro)
70        super().__init__(loop=loop)
71        if self._source_traceback:
72            del self._source_traceback[-1]
73        self._coro = coro
74        self._fut_waiter = None
75        self._must_cancel = False
76        self._loop.call_soon(self._step)
77        self.__class__._all_tasks.add(self)
78
79    # On Python 3.3 or older, objects with a destructor that are part of a
80    # reference cycle are never destroyed. That's not the case any more on
81    # Python 3.4 thanks to the PEP 442.
82    if compat.PY34:
83        def __del__(self):
84            if self._state == futures._PENDING and self._log_destroy_pending:
85                context = {
86                    'task': self,
87                    'message': 'Task was destroyed but it is pending!',
88                }
89                if self._source_traceback:
90                    context['source_traceback'] = self._source_traceback
91                self._loop.call_exception_handler(context)
92            futures.Future.__del__(self)
93
94    def _repr_info(self):
95        return base_tasks._task_repr_info(self)
96
97    def get_stack(self, *, limit=None):
98        """Return the list of stack frames for this task's coroutine.
99
100        If the coroutine is not done, this returns the stack where it is
101        suspended.  If the coroutine has completed successfully or was
102        cancelled, this returns an empty list.  If the coroutine was
103        terminated by an exception, this returns the list of traceback
104        frames.
105
106        The frames are always ordered from oldest to newest.
107
108        The optional limit gives the maximum number of frames to
109        return; by default all available frames are returned.  Its
110        meaning differs depending on whether a stack or a traceback is
111        returned: the newest frames of a stack are returned, but the
112        oldest frames of a traceback are returned.  (This matches the
113        behavior of the traceback module.)
114
115        For reasons beyond our control, only one stack frame is
116        returned for a suspended coroutine.
117        """
118        return base_tasks._task_get_stack(self, limit)
119
120    def print_stack(self, *, limit=None, file=None):
121        """Print the stack or traceback for this task's coroutine.
122
123        This produces output similar to that of the traceback module,
124        for the frames retrieved by get_stack().  The limit argument
125        is passed to get_stack().  The file argument is an I/O stream
126        to which the output is written; by default output is written
127        to sys.stderr.
128        """
129        return base_tasks._task_print_stack(self, limit, file)
130
131    def cancel(self):
132        """Request that this task cancel itself.
133
134        This arranges for a CancelledError to be thrown into the
135        wrapped coroutine on the next cycle through the event loop.
136        The coroutine then has a chance to clean up or even deny
137        the request using try/except/finally.
138
139        Unlike Future.cancel, this does not guarantee that the
140        task will be cancelled: the exception might be caught and
141        acted upon, delaying cancellation of the task or preventing
142        cancellation completely.  The task may also return a value or
143        raise a different exception.
144
145        Immediately after this method is called, Task.cancelled() will
146        not return True (unless the task was already cancelled).  A
147        task will be marked as cancelled when the wrapped coroutine
148        terminates with a CancelledError exception (even if cancel()
149        was not called).
150        """
151        if self.done():
152            return False
153        if self._fut_waiter is not None:
154            if self._fut_waiter.cancel():
155                # Leave self._fut_waiter; it may be a Task that
156                # catches and ignores the cancellation so we may have
157                # to cancel it again later.
158                return True
159        # It must be the case that self._step is already scheduled.
160        self._must_cancel = True
161        return True
162
163    def _step(self, exc=None):
164        assert not self.done(), \
165            '_step(): already done: {!r}, {!r}'.format(self, exc)
166        if self._must_cancel:
167            if not isinstance(exc, futures.CancelledError):
168                exc = futures.CancelledError()
169            self._must_cancel = False
170        coro = self._coro
171        self._fut_waiter = None
172
173        self.__class__._current_tasks[self._loop] = self
174        # Call either coro.throw(exc) or coro.send(None).
175        try:
176            if exc is None:
177                # We use the `send` method directly, because coroutines
178                # don't have `__iter__` and `__next__` methods.
179                result = coro.send(None)
180            else:
181                result = coro.throw(exc)
182        except StopIteration as exc:
183            self.set_result(exc.value)
184        except futures.CancelledError:
185            super().cancel()  # I.e., Future.cancel(self).
186        except Exception as exc:
187            self.set_exception(exc)
188        except BaseException as exc:
189            self.set_exception(exc)
190            raise
191        else:
192            blocking = getattr(result, '_asyncio_future_blocking', None)
193            if blocking is not None:
194                # Yielded Future must come from Future.__iter__().
195                if result._loop is not self._loop:
196                    self._loop.call_soon(
197                        self._step,
198                        RuntimeError(
199                            'Task {!r} got Future {!r} attached to a '
200                            'different loop'.format(self, result)))
201                elif blocking:
202                    if result is self:
203                        self._loop.call_soon(
204                            self._step,
205                            RuntimeError(
206                                'Task cannot await on itself: {!r}'.format(
207                                    self)))
208                    else:
209                        result._asyncio_future_blocking = False
210                        result.add_done_callback(self._wakeup)
211                        self._fut_waiter = result
212                        if self._must_cancel:
213                            if self._fut_waiter.cancel():
214                                self._must_cancel = False
215                else:
216                    self._loop.call_soon(
217                        self._step,
218                        RuntimeError(
219                            'yield was used instead of yield from '
220                            'in task {!r} with {!r}'.format(self, result)))
221            elif result is None:
222                # Bare yield relinquishes control for one event loop iteration.
223                self._loop.call_soon(self._step)
224            elif inspect.isgenerator(result):
225                # Yielding a generator is just wrong.
226                self._loop.call_soon(
227                    self._step,
228                    RuntimeError(
229                        'yield was used instead of yield from for '
230                        'generator in task {!r} with {}'.format(
231                            self, result)))
232            else:
233                # Yielding something else is an error.
234                self._loop.call_soon(
235                    self._step,
236                    RuntimeError(
237                        'Task got bad yield: {!r}'.format(result)))
238        finally:
239            self.__class__._current_tasks.pop(self._loop)
240            self = None  # Needed to break cycles when an exception occurs.
241
242    def _wakeup(self, future):
243        try:
244            future.result()
245        except Exception as exc:
246            # This may also be a cancellation.
247            self._step(exc)
248        else:
249            # Don't pass the value of `future.result()` explicitly,
250            # as `Future.__iter__` and `Future.__await__` don't need it.
251            # If we call `_step(value, None)` instead of `_step()`,
252            # Python eval loop would use `.send(value)` method call,
253            # instead of `__next__()`, which is slower for futures
254            # that return non-generator iterators from their `__iter__`.
255            self._step()
256        self = None  # Needed to break cycles when an exception occurs.
257
258
259_PyTask = Task
260
261
262try:
263    import _asyncio
264except ImportError:
265    pass
266else:
267    # _CTask is needed for tests.
268    Task = _CTask = _asyncio.Task
269
270
271# wait() and as_completed() similar to those in PEP 3148.
272
273FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
274FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
275ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
276
277
278@coroutine
279def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
280    """Wait for the Futures and coroutines given by fs to complete.
281
282    The sequence futures must not be empty.
283
284    Coroutines will be wrapped in Tasks.
285
286    Returns two sets of Future: (done, pending).
287
288    Usage:
289
290        done, pending = yield from asyncio.wait(fs)
291
292    Note: This does not raise TimeoutError! Futures that aren't done
293    when the timeout occurs are returned in the second set.
294    """
295    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
296        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
297    if not fs:
298        raise ValueError('Set of coroutines/Futures is empty.')
299    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
300        raise ValueError('Invalid return_when value: {}'.format(return_when))
301
302    if loop is None:
303        loop = events.get_event_loop()
304
305    fs = {ensure_future(f, loop=loop) for f in set(fs)}
306
307    return (yield from _wait(fs, timeout, return_when, loop))
308
309
310def _release_waiter(waiter, *args):
311    if not waiter.done():
312        waiter.set_result(None)
313
314
315@coroutine
316def wait_for(fut, timeout, *, loop=None):
317    """Wait for the single Future or coroutine to complete, with timeout.
318
319    Coroutine will be wrapped in Task.
320
321    Returns result of the Future or coroutine.  When a timeout occurs,
322    it cancels the task and raises TimeoutError.  To avoid the task
323    cancellation, wrap it in shield().
324
325    If the wait is cancelled, the task is also cancelled.
326
327    This function is a coroutine.
328    """
329    if loop is None:
330        loop = events.get_event_loop()
331
332    if timeout is None:
333        return (yield from fut)
334
335    waiter = loop.create_future()
336    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
337    cb = functools.partial(_release_waiter, waiter)
338
339    fut = ensure_future(fut, loop=loop)
340    fut.add_done_callback(cb)
341
342    try:
343        # wait until the future completes or the timeout
344        try:
345            yield from waiter
346        except futures.CancelledError:
347            fut.remove_done_callback(cb)
348            fut.cancel()
349            raise
350
351        if fut.done():
352            return fut.result()
353        else:
354            fut.remove_done_callback(cb)
355            fut.cancel()
356            raise futures.TimeoutError()
357    finally:
358        timeout_handle.cancel()
359
360
361@coroutine
362def _wait(fs, timeout, return_when, loop):
363    """Internal helper for wait() and wait_for().
364
365    The fs argument must be a collection of Futures.
366    """
367    assert fs, 'Set of Futures is empty.'
368    waiter = loop.create_future()
369    timeout_handle = None
370    if timeout is not None:
371        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
372    counter = len(fs)
373
374    def _on_completion(f):
375        nonlocal counter
376        counter -= 1
377        if (counter <= 0 or
378            return_when == FIRST_COMPLETED or
379            return_when == FIRST_EXCEPTION and (not f.cancelled() and
380                                                f.exception() is not None)):
381            if timeout_handle is not None:
382                timeout_handle.cancel()
383            if not waiter.done():
384                waiter.set_result(None)
385
386    for f in fs:
387        f.add_done_callback(_on_completion)
388
389    try:
390        yield from waiter
391    finally:
392        if timeout_handle is not None:
393            timeout_handle.cancel()
394
395    done, pending = set(), set()
396    for f in fs:
397        f.remove_done_callback(_on_completion)
398        if f.done():
399            done.add(f)
400        else:
401            pending.add(f)
402    return done, pending
403
404
405# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
406def as_completed(fs, *, loop=None, timeout=None):
407    """Return an iterator whose values are coroutines.
408
409    When waiting for the yielded coroutines you'll get the results (or
410    exceptions!) of the original Futures (or coroutines), in the order
411    in which and as soon as they complete.
412
413    This differs from PEP 3148; the proper way to use this is:
414
415        for f in as_completed(fs):
416            result = yield from f  # The 'yield from' may raise.
417            # Use result.
418
419    If a timeout is specified, the 'yield from' will raise
420    TimeoutError when the timeout occurs before all Futures are done.
421
422    Note: The futures 'f' are not necessarily members of fs.
423    """
424    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
425        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
426    loop = loop if loop is not None else events.get_event_loop()
427    todo = {ensure_future(f, loop=loop) for f in set(fs)}
428    from .queues import Queue  # Import here to avoid circular import problem.
429    done = Queue(loop=loop)
430    timeout_handle = None
431
432    def _on_timeout():
433        for f in todo:
434            f.remove_done_callback(_on_completion)
435            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
436        todo.clear()  # Can't do todo.remove(f) in the loop.
437
438    def _on_completion(f):
439        if not todo:
440            return  # _on_timeout() was here first.
441        todo.remove(f)
442        done.put_nowait(f)
443        if not todo and timeout_handle is not None:
444            timeout_handle.cancel()
445
446    @coroutine
447    def _wait_for_one():
448        f = yield from done.get()
449        if f is None:
450            # Dummy value from _on_timeout().
451            raise futures.TimeoutError
452        return f.result()  # May raise f.exception().
453
454    for f in todo:
455        f.add_done_callback(_on_completion)
456    if todo and timeout is not None:
457        timeout_handle = loop.call_later(timeout, _on_timeout)
458    for _ in range(len(todo)):
459        yield _wait_for_one()
460
461
462@coroutine
463def sleep(delay, result=None, *, loop=None):
464    """Coroutine that completes after a given time (in seconds)."""
465    if delay == 0:
466        yield
467        return result
468
469    if loop is None:
470        loop = events.get_event_loop()
471    future = loop.create_future()
472    h = future._loop.call_later(delay,
473                                futures._set_result_unless_cancelled,
474                                future, result)
475    try:
476        return (yield from future)
477    finally:
478        h.cancel()
479
480
481def async_(coro_or_future, *, loop=None):
482    """Wrap a coroutine in a future.
483
484    If the argument is a Future, it is returned directly.
485
486    This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
487    """
488
489    warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
490                  DeprecationWarning,
491                  stacklevel=2)
492
493    return ensure_future(coro_or_future, loop=loop)
494
495# Silence DeprecationWarning:
496globals()['async'] = async_
497async_.__name__ = 'async'
498del async_
499
500
501def ensure_future(coro_or_future, *, loop=None):
502    """Wrap a coroutine or an awaitable in a future.
503
504    If the argument is a Future, it is returned directly.
505    """
506    if futures.isfuture(coro_or_future):
507        if loop is not None and loop is not coro_or_future._loop:
508            raise ValueError('loop argument must agree with Future')
509        return coro_or_future
510    elif coroutines.iscoroutine(coro_or_future):
511        if loop is None:
512            loop = events.get_event_loop()
513        task = loop.create_task(coro_or_future)
514        if task._source_traceback:
515            del task._source_traceback[-1]
516        return task
517    elif compat.PY35 and inspect.isawaitable(coro_or_future):
518        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
519    else:
520        raise TypeError('A Future, a coroutine or an awaitable is required')
521
522
523@coroutine
524def _wrap_awaitable(awaitable):
525    """Helper for asyncio.ensure_future().
526
527    Wraps awaitable (an object with __await__) into a coroutine
528    that will later be wrapped in a Task by ensure_future().
529    """
530    return (yield from awaitable.__await__())
531
532
533class _GatheringFuture(futures.Future):
534    """Helper for gather().
535
536    This overrides cancel() to cancel all the children and act more
537    like Task.cancel(), which doesn't immediately mark itself as
538    cancelled.
539    """
540
541    def __init__(self, children, *, loop=None):
542        super().__init__(loop=loop)
543        self._children = children
544
545    def cancel(self):
546        if self.done():
547            return False
548        ret = False
549        for child in self._children:
550            if child.cancel():
551                ret = True
552        return ret
553
554
555def gather(*coros_or_futures, loop=None, return_exceptions=False):
556    """Return a future aggregating results from the given coroutines
557    or futures.
558
559    Coroutines will be wrapped in a future and scheduled in the event
560    loop. They will not necessarily be scheduled in the same order as
561    passed in.
562
563    All futures must share the same event loop.  If all the tasks are
564    done successfully, the returned future's result is the list of
565    results (in the order of the original sequence, not necessarily
566    the order of results arrival).  If *return_exceptions* is True,
567    exceptions in the tasks are treated the same as successful
568    results, and gathered in the result list; otherwise, the first
569    raised exception will be immediately propagated to the returned
570    future.
571
572    Cancellation: if the outer Future is cancelled, all children (that
573    have not completed yet) are also cancelled.  If any child is
574    cancelled, this is treated as if it raised CancelledError --
575    the outer Future is *not* cancelled in this case.  (This is to
576    prevent the cancellation of one child to cause other children to
577    be cancelled.)
578    """
579    if not coros_or_futures:
580        if loop is None:
581            loop = events.get_event_loop()
582        outer = loop.create_future()
583        outer.set_result([])
584        return outer
585
586    arg_to_fut = {}
587    for arg in set(coros_or_futures):
588        if not futures.isfuture(arg):
589            fut = ensure_future(arg, loop=loop)
590            if loop is None:
591                loop = fut._loop
592            # The caller cannot control this future, the "destroy pending task"
593            # warning should not be emitted.
594            fut._log_destroy_pending = False
595        else:
596            fut = arg
597            if loop is None:
598                loop = fut._loop
599            elif fut._loop is not loop:
600                raise ValueError("futures are tied to different event loops")
601        arg_to_fut[arg] = fut
602
603    children = [arg_to_fut[arg] for arg in coros_or_futures]
604    nchildren = len(children)
605    outer = _GatheringFuture(children, loop=loop)
606    nfinished = 0
607    results = [None] * nchildren
608
609    def _done_callback(i, fut):
610        nonlocal nfinished
611        if outer.done():
612            if not fut.cancelled():
613                # Mark exception retrieved.
614                fut.exception()
615            return
616
617        if fut.cancelled():
618            res = futures.CancelledError()
619            if not return_exceptions:
620                outer.set_exception(res)
621                return
622        elif fut._exception is not None:
623            res = fut.exception()  # Mark exception retrieved.
624            if not return_exceptions:
625                outer.set_exception(res)
626                return
627        else:
628            res = fut._result
629        results[i] = res
630        nfinished += 1
631        if nfinished == nchildren:
632            outer.set_result(results)
633
634    for i, fut in enumerate(children):
635        fut.add_done_callback(functools.partial(_done_callback, i))
636    return outer
637
638
639def shield(arg, *, loop=None):
640    """Wait for a future, shielding it from cancellation.
641
642    The statement
643
644        res = yield from shield(something())
645
646    is exactly equivalent to the statement
647
648        res = yield from something()
649
650    *except* that if the coroutine containing it is cancelled, the
651    task running in something() is not cancelled.  From the POV of
652    something(), the cancellation did not happen.  But its caller is
653    still cancelled, so the yield-from expression still raises
654    CancelledError.  Note: If something() is cancelled by other means
655    this will still cancel shield().
656
657    If you want to completely ignore cancellation (not recommended)
658    you can combine shield() with a try/except clause, as follows:
659
660        try:
661            res = yield from shield(something())
662        except CancelledError:
663            res = None
664    """
665    inner = ensure_future(arg, loop=loop)
666    if inner.done():
667        # Shortcut.
668        return inner
669    loop = inner._loop
670    outer = loop.create_future()
671
672    def _done_callback(inner):
673        if outer.cancelled():
674            if not inner.cancelled():
675                # Mark inner's result as retrieved.
676                inner.exception()
677            return
678
679        if inner.cancelled():
680            outer.cancel()
681        else:
682            exc = inner.exception()
683            if exc is not None:
684                outer.set_exception(exc)
685            else:
686                outer.set_result(inner.result())
687
688    inner.add_done_callback(_done_callback)
689    return outer
690
691
692def run_coroutine_threadsafe(coro, loop):
693    """Submit a coroutine object to a given event loop.
694
695    Return a concurrent.futures.Future to access the result.
696    """
697    if not coroutines.iscoroutine(coro):
698        raise TypeError('A coroutine object is required')
699    future = concurrent.futures.Future()
700
701    def callback():
702        try:
703            futures._chain_future(ensure_future(coro, loop=loop), future)
704        except Exception as exc:
705            if future.set_running_or_notify_cancel():
706                future.set_exception(exc)
707            raise
708
709    loop.call_soon_threadsafe(callback)
710    return future
711