• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Support for tasks, coroutines and the scheduler."""
2
# Names exported by a star-import of this module.  The underscore-prefixed
# helpers on the last row are listed explicitly because a star-import skips
# such names unless they appear in __all__.
__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)
11
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import itertools
17import types
18import warnings
19import weakref
20
21from . import base_tasks
22from . import coroutines
23from . import events
24from . import exceptions
25from . import futures
26from .coroutines import _is_coroutine
27
# Helper to generate new task names: each call returns the next integer,
# starting at 1 (Task.__init__ uses it to build the default 'Task-N' name).
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__
32
33
def current_task(loop=None):
    """Return the task currently running on *loop*, or None if there is none.

    When *loop* is omitted the running event loop is used.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
39
40
def all_tasks(loop=None):
    """Return the set of not-yet-finished tasks that belong to *loop*.

    When *loop* is omitted the running event loop is used.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    # _all_tasks may be mutated from another thread while it is being
    # iterated, so it is snapshotted into a list before filtering.  Building
    # the snapshot is itself an iteration and can (rarely) fail with
    # RuntimeError; the attempt is repeated and the error is only re-raised
    # on the 1000th failure.  See issues 34970 and 36607 for details.
    for attempt in itertools.count(1):
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            if attempt >= 1000:
                raise
        else:
            break
    return {task for task in snapshot
            if futures._get_loop(task) is target_loop and not task.done()}
62
63
def _all_tasks_compat(loop=None):
    # Backs the deprecated "Task.all_tasks()" classmethod.  Unlike
    # "all_tasks()", the result includes *all* Tasks of the loop, even the
    # ones that have already completed.
    target_loop = events.get_event_loop() if loop is None else loop
    # _all_tasks may be mutated from another thread while it is being
    # iterated, so it is snapshotted into a list before filtering.  Building
    # the snapshot is itself an iteration and can (rarely) fail with
    # RuntimeError; the attempt is repeated and the error is only re-raised
    # on the 1000th failure.  See issues 34970 and 36607 for details.
    for attempt in itertools.count(1):
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            if attempt >= 1000:
                raise
        else:
            break
    return {task for task in snapshot
            if futures._get_loop(task) is target_loop}
86
87
def _set_task_name(task, name):
    """Apply *name* to *task* when a name is given and the task supports it.

    Nothing happens when *name* is None or when *task* has no ``set_name``
    attribute.
    """
    if name is None:
        return
    try:
        apply_name = task.set_name
    except AttributeError:
        # Task implementations without set_name() are silently tolerated.
        return
    apply_name(name)
96
97
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future.

    The wrapped coroutine is advanced by the private __step() method, which
    the event loop calls; the Task's own result, exception or cancelled
    state is set when the coroutine finishes.
    """

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.

        Deprecated: emits a DeprecationWarning on every call.
        """
        warnings.warn("Task.current_task() is deprecated since Python 3.7, "
                      "use asyncio.current_task() instead",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.

        Deprecated: emits a DeprecationWarning on every call.  The result
        comes from _all_tasks_compat(), which does not filter out tasks
        that are already done.
        """
        warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
                      "use asyncio.all_tasks() instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* and schedule its first step on the loop.

        *loop* is forwarded to the Future base class.  *name* becomes the
        task name (converted with str()); when it is None a default
        'Task-N' name is generated.  Raises TypeError when *coro* is not a
        coroutine according to coroutines.iscoroutine().
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest recorded entry.
            # NOTE(review): presumably the frame of this constructor itself,
            # so the traceback points at the caller -- confirm in futures.py.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        # Set by cancel() when the cancellation has to be delivered by the
        # next __step() call.
        self._must_cancel = False
        # The future the coroutine is currently waiting on, if any.
        self._fut_waiter = None
        self._coro = coro
        # Every step of the coroutine runs in this copied context.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a task that is destroyed while still pending.

        The report goes to the loop's exception handler and is skipped when
        _log_destroy_pending is false.
        """
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        """Return the repr details computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task to str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Unsupported on tasks: always raises RuntimeError."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Unsupported on tasks: always raises RuntimeError."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False when the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        """Resume the wrapped coroutine once and act on what it yields.

        When *exc* is None the coroutine is resumed with send(None);
        otherwise *exc* is thrown into it.  A pending cancellation request
        (_must_cancel) replaces *exc* with a CancelledError unless *exc*
        already is one.  Raises InvalidStateError if the task is done.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = exceptions.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Mark this task as the current one for the loop while the coroutine
        # runs; undone in the ``finally`` clause below.
        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine returned; exc.value carries its return value.
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel()
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate to
            # the caller of __step().
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            # The coroutine yielded ``result`` and is suspended.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        # Wait for the yielded future: __wakeup() resumes
                        # the coroutine once that future is done.
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was requested while the coroutine
                            # was running; forward it to the new waiter.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        An exception raised by future.result() (including a cancellation)
        is passed to __step() so it gets thrown into the coroutine.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
362
363
# Keep a reference to the Task class defined in this module; the name
# ``Task`` itself may be rebound just below.
_PyTask = Task


# When the optional _asyncio module can be imported, its Task replaces the
# class defined above as the public ``Task``; otherwise nothing changes.
try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
374
375
def create_task(coro, *, name=None):
    """Wrap the coroutine object *coro* in a Task on the running event loop.

    The task is created through the loop's create_task() method and
    returned.  When *name* is not None it is applied to the task, provided
    the task has a set_name() method.
    """
    new_task = events.get_running_loop().create_task(coro)
    _set_task_name(new_task, name)
    return new_task
385
386
# wait() and as_completed() similar to those in PEP 3148.

# Accepted values for the *return_when* argument of wait(); they are the very
# same objects as the constants exposed by concurrent.futures.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
392
393
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures and coroutines in *fs* to complete.

    *fs* must be a non-empty collection; a single future or coroutine is
    rejected with TypeError and an empty collection with ValueError.
    Each distinct element is passed through ensure_future().

    *return_when* must be FIRST_COMPLETED, FIRST_EXCEPTION or
    ALL_COMPLETED, otherwise ValueError is raised.

    Returns two sets of futures: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    accepted_modes = (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED)
    if return_when not in accepted_modes:
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    # Duplicates are collapsed first so each awaitable is wrapped only once.
    wrapped = {ensure_future(item, loop=loop) for item in set(fs)}
    return await _wait(wrapped, timeout, return_when, loop)
427
428
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Extra positional arguments are accepted and ignored so the function
    can be used directly as a callback.
    """
    if waiter.done():
        return
    waiter.set_result(None)
432
433
async def wait_for(fut, timeout, *, loop=None):
    """Wait for a single future or coroutine to complete, with a timeout.

    With *timeout* None, *fut* is simply awaited.  Otherwise *fut* is
    passed through ensure_future() and its result is returned if it
    finishes in time.  When the timeout expires first, the inner future is
    cancelled and TimeoutError is raised; wrap it in shield() to avoid the
    cancellation.

    If the wait itself is cancelled, the inner future is cancelled too.

    This function is a coroutine.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        # No time to wait: either the result is available right now or the
        # call times out immediately.
        fut = ensure_future(fut, loop=loop)
        if not fut.done():
            fut.cancel()
            raise exceptions.TimeoutError()
        return fut.result()

    wakeup = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, wakeup)
    on_done = functools.partial(_release_waiter, wakeup)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(on_done)

    try:
        # *wakeup* completes when either the future finishes or the timer
        # fires, whichever happens first.
        try:
            await wakeup
        except exceptions.CancelledError:
            fut.remove_done_callback(on_done)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()

        fut.remove_done_callback(on_done)
        # We must ensure that the task is not running
        # after wait_for() returns.
        # See https://bugs.python.org/issue32751
        await _cancel_and_wait(fut, loop=loop)
        raise exceptions.TimeoutError()
    finally:
        timer.cancel()
493
494
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    *fs* must be a non-empty collection of Futures.  Returns the pair
    (done, pending) as two sets, computed once the wait is over.
    """
    assert fs, 'Set of Futures is empty.'
    wakeup = loop.create_future()
    timer = None
    if timeout is not None:
        timer = loop.call_later(timeout, _release_waiter, wakeup)
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        should_wake = remaining <= 0 or return_when == FIRST_COMPLETED
        if not should_wake and return_when == FIRST_EXCEPTION:
            # f.exception() is consulted only in this mode, and only for
            # futures that were not cancelled.
            should_wake = not f.cancelled() and f.exception() is not None
        if not should_wake:
            return
        if timer is not None:
            timer.cancel()
        if not wakeup.done():
            wakeup.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await wakeup
    finally:
        # Runs on normal wake-up, on timeout and on cancellation alike.
        if timer is not None:
            timer.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        (done if f.done() else pending).add(f)
    return done, pending
537
538
async def _cancel_and_wait(fut, loop):
    """Request cancellation of *fut* and suspend until it has finished."""
    finished = loop.create_future()
    notify = functools.partial(_release_waiter, finished)
    fut.add_done_callback(notify)

    try:
        fut.cancel()
        # A separate future is awaited instead of *fut* itself so that
        # _cancel_and_wait stays reliably cancellable.
        await finished
    finally:
        fut.remove_done_callback(notify)
553
554
# This is *not* a @coroutine!  It is a plain generator function; iterating it
# produces coroutine objects that the caller has to await.
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Awaiting the yielded coroutines gives the results (or raises the
    exceptions!) of the original futures or coroutines, in the order in
    which they complete and as soon as they do.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()
    outstanding = {ensure_future(item, loop=loop) for item in set(fs)}
    timer = None

    def _on_timeout():
        for f in outstanding:
            f.remove_done_callback(_on_completion)
            # One dummy entry per unfinished future for _wait_for_one().
            finished.put_nowait(None)
        # Emptied in one go; removing entries while iterating is not possible.
        outstanding.clear()

    def _on_completion(f):
        if not outstanding:
            return  # _on_timeout() was here first.
        outstanding.remove(f)
        finished.put_nowait(f)
        if not outstanding and timer is not None:
            timer.cancel()

    async def _wait_for_one():
        f = await finished.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return f.result()  # May raise f.exception().

    for f in outstanding:
        f.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    # The number of coroutines to hand out is fixed up front, because
    # *outstanding* shrinks as futures complete.
    to_yield = len(outstanding)
    while to_yield > 0:
        yield _wait_for_one()
        to_yield -= 1
616
617
@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is zero or negative.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield
628
629
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.  A
    *delay* of zero or less only gives up control for one event loop
    iteration and creates no timer.

    Passing *loop* is deprecated and emits a DeprecationWarning, whatever
    the value of *delay*.
    """
    # Warn before the zero-delay shortcut, otherwise sleep(0, loop=...)
    # would skip the deprecation notice that every other path emits.
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        # Drop the timer if the wait ended early (e.g. on cancellation).
        h.cancel()
651
652
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    A coroutine is turned into a task on *loop* (or on the loop returned
    by get_event_loop() when *loop* is None).  A Future is returned
    directly, after checking that it belongs to *loop* when one is given
    (ValueError otherwise).  Any other awaitable is wrapped in a coroutine
    first.  Everything else is rejected with TypeError.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        wrong_loop = (loop is not None
                      and loop is not futures._get_loop(coro_or_future))
        if wrong_loop:
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')
    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
675
676
@types.coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Delegates to the iterator returned by ``awaitable.__await__()`` and
    returns its final value, which gives ensure_future() a coroutine
    that it can wrap in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome
685
# Tag the helper with the marker object imported from .coroutines.
# NOTE(review): presumably this is what lets asyncio's coroutine-function
# detection recognise _wrap_awaitable -- confirm in coroutines.py.
_wrap_awaitable._is_coroutine = _is_coroutine
687
688
class _GatheringFuture(futures.Future):
    """Helper for gather().

    cancel() is overridden so that it cancels the children instead of
    this future.  Like Task.cancel(), it therefore does not mark the
    future itself as cancelled immediately.
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

    def cancel(self):
        """Forward the cancellation request to every child.

        Returns True if at least one child accepted it, False otherwise
        (including when this future is already done).
        """
        if self.done():
            return False
        # A list (not a generator) so that every child receives the
        # request, even after one of them has already accepted it.
        accepted = [child.cancel() for child in self._children]
        if not any(accepted):
            return False
        # If any child tasks were actually cancelled, we should
        # propagate the cancellation request regardless of
        # *return_exceptions* argument.  See issue 32684.
        self._cancel_requested = True
        return True
715
716
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already-completed future.
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            loop = events.get_event_loop()
        empty = loop.create_future()
        empty.set_result([])
        return empty

    def _outcome(child):
        # Entry of the result list for one finished child.  cancelled() is
        # checked first, as child.exception() would *raise* a
        # CancelledError instead of returning it.
        if child.cancelled():
            return exceptions.CancelledError()
        error = child.exception()
        if error is None:
            return child.result()
        return error

    def _done_callback(fut):
        nonlocal finished_count
        finished_count += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                outer.set_exception(exceptions.CancelledError())
                return
            error = fut.exception()
            if error is not None:
                outer.set_exception(error)
                return

        if finished_count != unique_count:
            return

        # All futures are done; build the list of results and publish it
        # on the 'outer' future.
        results = [_outcome(child) for child in children]
        if outer._cancel_requested:
            # If gather is being cancelled we must propagate the
            # cancellation regardless of *return_exceptions* argument.
            # See issue 32684.
            outer.set_exception(exceptions.CancelledError())
        else:
            outer.set_result(results)

    fut_by_arg = {}
    children = []
    unique_count = 0
    finished_count = 0
    for arg in coros_or_futures:
        if arg in fut_by_arg:
            # There's a duplicate Future object in coros_or_futures.
            fut = fut_by_arg[arg]
        else:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            unique_count += 1
            fut_by_arg[arg] = fut
            fut.add_done_callback(_done_callback)

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
828
829
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the await expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut: nothing left to protect.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _propagate_inner(finished_inner):
        # Copy the final state of the inner future onto the outer one.
        if outer.cancelled():
            if not finished_inner.cancelled():
                # Mark inner's result as retrieved.
                finished_inner.exception()
            return

        if finished_inner.cancelled():
            outer.cancel()
            return

        error = finished_inner.exception()
        if error is None:
            outer.set_result(finished_inner.result())
        else:
            outer.set_exception(error)

    def _detach_inner(_finished_outer):
        # The outer future finished first: stop listening to the inner one.
        if not inner.done():
            inner.remove_done_callback(_propagate_inner)

    inner.add_done_callback(_propagate_inner)
    outer.add_done_callback(_detach_inner)
    return outer
891
892
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    The coroutine is handed to *loop* through call_soon_threadsafe().
    Returns a concurrent.futures.Future that gives access to the result.
    Raises TypeError when *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    result_future = concurrent.futures.Future()

    def callback():
        try:
            task = ensure_future(coro, loop=loop)
            futures._chain_future(task, result_future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Report the failure through the returned future (unless it
            # was cancelled meanwhile), then re-raise it.
            if result_future.set_running_or_notify_cancel():
                result_future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future
914
915
# WeakSet containing all alive tasks.  Being weak, it does not by itself keep
# a task alive.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
# Entries are added by _enter_task() and removed by _leave_task().
_current_tasks = {}
922
923
def _register_task(task):
    """Register a new task in asyncio as executed by loop.

    The task is added to the module-level _all_tasks collection.
    """
    _all_tasks.add(task)
927
928
def _enter_task(loop, task):
    """Record *task* as the task currently running on *loop*.

    Raises RuntimeError if another task is already recorded for *loop*.
    """
    running = _current_tasks.get(loop)
    if running is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {running!r} is being executed.")
935
936
def _leave_task(loop, task):
    """Clear the current-task record of *loop*, which must be *task*.

    Raises RuntimeError if the task recorded for *loop* is not *task*.
    """
    running = _current_tasks.get(loop)
    if running is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {running!r}.")
943
944
def _unregister_task(task):
    """Unregister a task.

    The task is discarded from the module-level _all_tasks collection;
    a task that was never registered is ignored.
    """
    _all_tasks.discard(task)
948
949
# Keep the functions defined in this module reachable under _py_* names; the
# plain names may be rebound by the import just below.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


# When the optional _asyncio module can be imported, its implementations (and
# its _all_tasks / _current_tasks objects) replace the ones defined above and
# are additionally exposed under _c_* names.
try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task
967