• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Support for tasks, coroutines and the scheduler."""
2
# Names bound by ``from <this module> import *``.  The underscore-prefixed
# helpers on the last line are listed explicitly because a star-import would
# otherwise skip names that start with an underscore.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
11
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import itertools
17import types
18import warnings
19import weakref
20
21from . import base_tasks
22from . import coroutines
23from . import events
24from . import exceptions
25from . import futures
26from .coroutines import _is_coroutine
27
# Helper to generate new task names: each call returns the next integer,
# starting at 1.  Task.__init__ uses it to build default names of the form
# 'Task-<n>'.
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__
32
33
def current_task(loop=None):
    """Return the task registered as currently running on *loop*.

    When *loop* is None the running event loop is looked up first.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
39
40
def all_tasks(loop=None):
    """Return the set of tasks of *loop* that are not done yet.

    When *loop* is None the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    # _all_tasks can be updated from another thread while it is being
    # iterated, in which case list() raises RuntimeError.  Take a snapshot
    # with list() and simply retry on failure; after 1000 failed attempts
    # the RuntimeError is re-raised.  See issues 34970 and 36607 for
    # details.
    failures = 0
    snapshot = None
    while snapshot is None:
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            failures += 1
            if failures >= 1000:
                raise
    unfinished = set()
    for task in snapshot:
        if futures._get_loop(task) is loop and not task.done():
            unfinished.add(task)
    return unfinished
62
63
def _all_tasks_compat(loop=None):
    # Unlike all_tasks(), this returns *all* tasks of the loop, including
    # the completed ones, and falls back to events.get_event_loop() when no
    # loop is given.  Used to implement the deprecated "Task.all_tasks()"
    # method.
    if loop is None:
        loop = events.get_event_loop()
    # _all_tasks can be updated from another thread while it is being
    # iterated, in which case list() raises RuntimeError.  Take a snapshot
    # with list() and simply retry on failure; after 1000 failed attempts
    # the RuntimeError is re-raised.  See issues 34970 and 36607 for
    # details.
    failures = 0
    snapshot = None
    while snapshot is None:
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            failures += 1
            if failures >= 1000:
                raise
    owned = set()
    for task in snapshot:
        if futures._get_loop(task) is loop:
            owned.add(task)
    return owned
86
87
88def _set_task_name(task, name):
89    if name is not None:
90        try:
91            set_name = task.set_name
92        except AttributeError:
93            pass
94        else:
95            set_name(name)
96
97
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().
    #
    # ("_step" and "_wakeup" above refer to the private __step() and
    # __wakeup() methods defined below.)

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* and schedule its first step on the loop.

        *name* becomes the task name (converted with str()); when it is
        None a name of the form 'Task-<n>' is generated.  Raises TypeError
        if *coro* is not a coroutine.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest recorded entry (presumably this constructor's
            # own frame -- TODO confirm against Future.__init__).
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        # Set by cancel() when the cancellation has to be delivered by the
        # next __step() call.
        self._must_cancel = False
        # The future this task is currently waiting on, if any.
        self._fut_waiter = None
        self._coro = coro
        # Every step of the coroutine is run in this copied context.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        # Report a task that is destroyed while still pending to the loop's
        # exception handler, unless that logging has been switched off via
        # _log_destroy_pending.
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def __class_getitem__(cls, type):
        # Subscripting (Task[X]) is accepted and returns the class itself;
        # the subscript is ignored.
        return cls

    def _repr_info(self):
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: a Task's result cannot be set."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: a Task's exception cannot be set."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def __step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        If *exc* is None the coroutine is resumed with send(None),
        otherwise *exc* is thrown into it.  The outcome is either recorded
        on this task (result, exception or cancellation) or used to
        arrange the next step.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A cancellation is pending: deliver it now, unless the
            # exception being thrown already is a CancelledError.
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value carries its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate to
            # whoever called this step.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            # The coroutine yielded `result`; decide how to continue.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Wait for the yielded future: __wakeup() will run
                        # the next step once it is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was called during this step; forward
                            # the request to the future we now wait on.
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: run the next step.

        If *future* finished with an exception (or was cancelled), that
        exception is passed on to __step() so it is thrown into the
        coroutine.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
341
342
# Keep a reference to the pure-Python implementation, because the name
# ``Task`` may be rebound to the C implementation just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # The _asyncio module is not available: keep the pure-Python Task.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
353
354
def create_task(coro, *, name=None):
    """Wrap *coro* in a task scheduled on the running event loop.

    If *name* is not None it is applied to the new task through
    _set_task_name().  Return the Task object.
    """
    task = events.get_running_loop().create_task(coro)
    _set_task_name(task, name)
    return task
364
365
366# wait() and as_completed() similar to those in PEP 3148.
367
# Accepted values for the *return_when* argument of wait().  They are the
# very same objects as the concurrent.futures constants of the same names.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
371
372
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The fs iterable must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError if fs is itself a future or a coroutine, and
    ValueError if fs contains nothing to wait for or return_when is not
    one of FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    fs = set(fs)
    if not fs:
        # Iterators and generators are always truthy, so an exhausted one
        # passes the check above.  Without this second check the empty set
        # would reach _wait(), which asserts on it (and, with assertions
        # disabled and no timeout, would wait forever).
        raise ValueError('Set of coroutines/Futures is empty.')

    if any(coroutines.iscoroutine(f) for f in fs):
        warnings.warn("The explicit passing of coroutine objects to "
                      "asyncio.wait() is deprecated since Python 3.8, and "
                      "scheduled for removal in Python 3.11.",
                      DeprecationWarning, stacklevel=2)

    # Wrap anything that is not already a future (e.g. coroutines).
    fs = {ensure_future(f, loop=loop) for f in fs}

    return await _wait(fs, timeout, return_when, loop)
414
415
416def _release_waiter(waiter, *args):
417    if not waiter.done():
418        waiter.set_result(None)
419
420
async def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    If timeout is None, fut is simply awaited with no time limit.  If
    timeout is <= 0, the result is returned only when fut is already
    done; otherwise fut is cancelled right away and TimeoutError is
    raised.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if timeout is None:
        # No time limit: nothing to schedule, just await the awaitable.
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        # Not done and no time left: cancel and wait for it to finish.
        await _cancel_and_wait(fut, loop=loop)
        try:
            # Any exception other than CancelledError raised by result()
            # propagates to the caller as is.
            fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc
        else:
            raise exceptions.TimeoutError()

    # 'waiter' is released by whichever happens first: the timer firing
    # or 'fut' completing (through the 'cb' done-callback).
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            # wait_for() itself was cancelled while waiting.
            if fut.done():
                # NOTE(review): returning the result here means the
                # CancelledError is not re-raised in this case -- confirm
                # that this is intended.
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                fut.cancel()
                raise

        if fut.done():
            return fut.result()
        else:
            # The timer released the waiter first: this is a timeout.
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            # In case task cancellation failed with some
            # exception, we should re-raise it
            # See https://bugs.python.org/issue40607
            try:
                fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
            else:
                raise exceptions.TimeoutError()
    finally:
        # The timer is no longer needed on any exit path.
        timeout_handle.cancel()
496
497
498async def _wait(fs, timeout, return_when, loop):
499    """Internal helper for wait().
500
501    The fs argument must be a collection of Futures.
502    """
503    assert fs, 'Set of Futures is empty.'
504    waiter = loop.create_future()
505    timeout_handle = None
506    if timeout is not None:
507        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
508    counter = len(fs)
509
510    def _on_completion(f):
511        nonlocal counter
512        counter -= 1
513        if (counter <= 0 or
514            return_when == FIRST_COMPLETED or
515            return_when == FIRST_EXCEPTION and (not f.cancelled() and
516                                                f.exception() is not None)):
517            if timeout_handle is not None:
518                timeout_handle.cancel()
519            if not waiter.done():
520                waiter.set_result(None)
521
522    for f in fs:
523        f.add_done_callback(_on_completion)
524
525    try:
526        await waiter
527    finally:
528        if timeout_handle is not None:
529            timeout_handle.cancel()
530        for f in fs:
531            f.remove_done_callback(_on_completion)
532
533    done, pending = set(), set()
534    for f in fs:
535        if f.done():
536            done.add(f)
537        else:
538            pending.add(f)
539    return done, pending
540
541
async def _cancel_and_wait(fut, loop):
    """Request cancellation of *fut* and wait until it is done."""

    finished = loop.create_future()
    wake = functools.partial(_release_waiter, finished)
    # Register the callback before cancelling so completion is not missed.
    fut.add_done_callback(wake)

    try:
        fut.cancel()
        # Await a private future instead of *fut* itself, so that
        # _cancel_and_wait() remains reliably cancellable.
        await finished
    finally:
        fut.remove_done_callback(wake)
556
557
558# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator of coroutines, one per Future/coroutine in fs.

    Awaiting the yielded coroutines gives the results (or raises the
    exceptions!) of the original Futures (or coroutines), in the order
    in which, and as soon as, they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()
    outstanding = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        # Detach from every unfinished future and queue one dummy value
        # per future so each remaining _wait_for_one() wakes up.
        for f in outstanding:
            f.remove_done_callback(_on_completion)
            finished.put_nowait(None)
        # Emptied afterwards; the set cannot shrink while iterated.
        outstanding.clear()

    def _on_completion(f):
        if not outstanding:
            return  # _on_timeout() was here first.
        outstanding.remove(f)
        finished.put_nowait(f)
        if timeout_handle is not None and not outstanding:
            timeout_handle.cancel()

    async def _wait_for_one():
        f = await finished.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return f.result()  # May raise f.exception().

    for f in outstanding:
        f.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    for _ in range(len(outstanding)):
        yield _wait_for_one()
619
620
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    # Task.__step sees the yielded None and reschedules itself with
    # call_soon().
    yield
631
632
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after *delay* seconds, returning *result*.

    A delay of zero or less only yields to the event loop once.
    """
    if delay <= 0:
        await __sleep0()
        return result

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    wakeup = loop.create_future()
    timer = loop.call_later(
        delay, futures._set_result_unless_cancelled, wakeup, result)
    try:
        return await wakeup
    finally:
        # Cancel the timer on every exit path, including cancellation.
        timer.cancel()
654
655
def ensure_future(coro_or_future, *, loop=None):
    """Return a future for a coroutine, a future or another awaitable.

    A coroutine is wrapped in a new task; a Future is returned as is
    (ValueError if it belongs to a loop other than *loop*); any other
    awaitable is first wrapped in a coroutine.  Anything else raises
    TypeError.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None:
            owner = futures._get_loop(coro_or_future)
            if owner is not loop:
                raise ValueError('The future belongs to a different loop than '
                                 'the one specified as the loop argument')
        return coro_or_future

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

    # Generic awaitable: turn it into a coroutine and start over.
    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
678
679
@types.coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().
    """
    # Delegate to the iterator returned by __await__() and pass its final
    # value through as this coroutine's return value.
    return (yield from awaitable.__await__())

# Tag the wrapper with the marker object imported from .coroutines
# (presumably so that coroutine checks recognise it -- TODO confirm).
_wrap_awaitable._is_coroutine = _is_coroutine
690
691
class _GatheringFuture(futures.Future):
    """Future used by gather() that forwards cancel() to its children.

    Like Task.cancel(), cancel() here only requests cancellation: it
    asks all the children to cancel and does not immediately mark this
    future as cancelled.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._cancel_requested = False
        self._children = children

    def cancel(self, msg=None):
        if self.done():
            return False
        # Ask every child to cancel (no short-circuiting), then check
        # whether at least one of them accepted the request.
        outcomes = [child.cancel(msg=msg) for child in self._children]
        any_cancelled = any(outcomes)
        if any_cancelled:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return any_cancelled
718
719
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    If *return_exceptions* is False, cancelling gather() after it
    has been marked done won't cancel any submitted awaitables.
    For instance, gather can be marked done after propagating an
    exception to the caller, therefore, calling ``gather.cancel()``
    after catching an exception (raised by one of the awaitables) from
    gather won't cancel any other awaitables.

    When called without any awaitables, the returned future is already
    done and its result is an empty list.
    """
    if not coros_or_futures:
        # Nothing to gather: hand back a future already resolved to [].
        if loop is None:
            loop = events.get_event_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        # Done-callback attached to every distinct child future.  'outer',
        # 'children', 'nfuts' and 'nfinished' are variables of the
        # enclosing gather() call, assigned further down.
        nonlocal nfinished
        nfinished += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            # Note: this loop rebinds the name 'fut' (the callback's
            # argument) to each child in turn.
            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as 'fut.exception()'
                    # will *raise* a CancelledError instead of returning it.
                    # Also, since we're adding the exception return value
                    # to 'results' instead of raising it, don't bother
                    # setting __context__.  This also lets us preserve
                    # calling '_make_cancelled_error()' at most once.
                    res = exceptions.CancelledError(
                        '' if fut._cancel_message is None else
                        fut._cancel_message)
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                # NOTE(review): 'fut' is now the last element of 'children'
                # (rebound by the loop above), not the future this callback
                # was invoked for -- confirm that this is intended.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
            else:
                outer.set_result(results)

    # Maps each distinct argument to its future, so that an argument passed
    # more than once is wrapped and counted only once.
    arg_to_fut = {}
    # One entry per positional argument, in call order (duplicates included).
    children = []
    # Number of distinct futures, and how many of them have finished so far.
    nfuts = 0
    nfinished = 0
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                # Adopt the loop of the first future for the rest.
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
844
845
def shield(arg, *, loop=None):
    """Wrap an awaitable so that cancelling the waiter leaves it running.

    Awaiting the returned future, as in

        res = await shield(something())

    produces the same result as

        res = await something()

    with one difference: when the coroutine doing the awaiting is
    cancelled, the task running something() is left alone.  As far as
    something() can tell, no cancellation took place.  The awaiting
    coroutine is nevertheless cancelled, so the await expression still
    raises CancelledError.  Note: if something() is cancelled through
    any other route, shield() is cancelled as well.

    To swallow the cancellation entirely (not recommended), wrap the
    call in a try/except clause:

        try:
            res = await shield(something())
        except CancelledError:
            res = None

    Passing a non-None *loop* emits a DeprecationWarning.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Nothing left to protect: hand back the finished future itself.
        return inner

    # The outer future is what the caller awaits (and may cancel); it is
    # created on the same loop the inner future belongs to.
    outer = futures._get_loop(inner).create_future()

    def _inner_done_callback(fut):
        # Runs when the protected future finishes; mirrors its outcome
        # onto 'outer' unless the waiter has already been cancelled.
        if outer.cancelled():
            if not fut.cancelled():
                # Mark the inner result as retrieved.
                fut.exception()
            return

        if fut.cancelled():
            outer.cancel()
            return

        error = fut.exception()
        if error is None:
            outer.set_result(fut.result())
        else:
            outer.set_exception(error)

    def _outer_done_callback(_fut):
        # Once 'outer' is done there is nobody left to forward a result
        # to, so detach from an inner future that is still pending.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
907
908
def run_coroutine_threadsafe(coro, loop):
    """Schedule a coroutine object on *loop* via call_soon_threadsafe().

    Returns a concurrent.futures.Future that is chained to the
    scheduled coroutine, giving access to its result.

    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')

    result_future = concurrent.futures.Future()

    def callback():
        # Wrap the coroutine and chain its outcome to 'result_future'.
        try:
            wrapped = ensure_future(coro, loop=loop)
            futures._chain_future(wrapped, result_future)
        except BaseException as exc:
            # SystemExit and KeyboardInterrupt pass straight through;
            # any other failure is also recorded on the returned future
            # (unless it was cancelled) before being re-raised.
            if not isinstance(exc, (SystemExit, KeyboardInterrupt)):
                if result_future.set_running_or_notify_cancel():
                    result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future
930
931
# WeakSet containing all alive tasks.  Members are referenced weakly, so
# being in this set does not by itself keep a task object alive.
# Filled by _register_task(), emptied by _unregister_task() and read by
# all_tasks().
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
# Written by _enter_task()/_leave_task() and read by current_task().
_current_tasks = {}
938
939
def _register_task(task):
    """Register a new task by adding it to the module-level _all_tasks set."""
    _all_tasks.add(task)
943
944
def _enter_task(loop, task):
    """Record *task* as the task currently being executed on *loop*.

    Raises RuntimeError if another task is already recorded for *loop*.
    """
    running = _current_tasks.get(loop)
    if running is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {running!r} is being executed.")
951
952
def _leave_task(loop, task):
    """Clear the current-task entry for *loop*, which must be *task*.

    Raises RuntimeError if the task recorded for *loop* is not *task*.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")
959
960
def _unregister_task(task):
    """Unregister a task by discarding it from the module-level _all_tasks set.

    Uses discard(), so a task that is not in the set is silently ignored.
    """
    _all_tasks.discard(task)
964
965
# Keep the pure-Python implementations reachable under _py_* names,
# because the import below may rebind the unprefixed names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    # On success this rebinds the four helpers *and* the two module-level
    # containers (_all_tasks, _current_tasks) to the objects provided by
    # the _asyncio module.  The _py_* functions look those containers up
    # as globals at call time, so they will then operate on the imported
    # containers as well.
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # _asyncio is not available: keep the definitions made above.
    pass
else:
    # Expose the imported versions under _c_* names (presumably the C
    # implementations, going by the prefix).
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task
983