• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = (
4    'Task', 'create_task',
5    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6    'wait', 'wait_for', 'as_completed', 'sleep',
7    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8    'current_task', 'all_tasks',
9    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
10)
11
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import itertools
17import types
18import warnings
19import weakref
20
21from . import base_tasks
22from . import coroutines
23from . import events
24from . import exceptions
25from . import futures
26from .coroutines import _is_coroutine
27
# Helper that generates the numeric suffix of default task names
# ("Task-<n>").  Each call returns the next integer, starting at 1.
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__
32
33
def current_task(loop=None):
    """Return the task currently being executed on *loop*.

    When *loop* is omitted, the running event loop is used.
    """
    target_loop = events.get_running_loop() if loop is None else loop
    return _current_tasks.get(target_loop)
39
40
def all_tasks(loop=None):
    """Return the set of not-yet-done tasks that belong to *loop*.

    When *loop* is omitted, the running event loop is used.
    """
    if loop is None:
        loop = events.get_running_loop()
    # _all_tasks is a WeakSet that another thread may update while we walk
    # it, so it is copied into a list first and filtered afterwards.  The
    # copy itself iterates the set and may therefore raise RuntimeError;
    # that is unlikely, so we just retry and re-raise on the 1000th failure.
    # See issues 34970 and 36607 for details.
    for retries_left in range(999, -1, -1):
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            if not retries_left:
                raise
        else:
            break
    return {
        task for task in snapshot
        if futures._get_loop(task) is loop and not task.done()
    }
62
63
def _all_tasks_compat(loop=None):
    """Return the set of *all* tasks for *loop*, completed ones included.

    Unlike all_tasks(), finished tasks are not filtered out.  Used to
    implement the deprecated Task.all_tasks() class method.  When *loop*
    is omitted, events.get_event_loop() supplies it.
    """
    if loop is None:
        loop = events.get_event_loop()
    # _all_tasks is a WeakSet that another thread may update while we walk
    # it, so it is copied into a list first and filtered afterwards.  The
    # copy itself iterates the set and may therefore raise RuntimeError;
    # that is unlikely, so we just retry and re-raise on the 1000th failure.
    # See issues 34970 and 36607 for details.
    for retries_left in range(999, -1, -1):
        try:
            snapshot = list(_all_tasks)
        except RuntimeError:
            if not retries_left:
                raise
        else:
            break
    return {task for task in snapshot if futures._get_loop(task) is loop}
86
87
def _set_task_name(task, name):
    """Apply *name* to *task* when a name was given and the task supports it.

    Nothing happens if *name* is None or if *task* has no ``set_name``
    attribute.
    """
    if name is None:
        return
    try:
        rename = task.set_name
    except AttributeError:
        # Task implementations without set_name() are silently tolerated.
        return
    rename(name)
96
97
class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message when the task is destroyed while its
    # status is still pending.
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        warnings.warn("Task.current_task() is deprecated since Python 3.7, "
                      "use asyncio.current_task() instead",
                      DeprecationWarning,
                      stacklevel=2)
        if loop is None:
            loop = events.get_event_loop()
        return current_task(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        warnings.warn("Task.all_tasks() is deprecated since Python 3.7, "
                      "use asyncio.all_tasks() instead",
                      DeprecationWarning,
                      stacklevel=2)
        return _all_tasks_compat(loop)

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* in a task and schedule its first step on the loop.

        *loop* is forwarded to the Future base class.  *name* is stored
        as ``str(name)``; when it is None a default ``'Task-<n>'`` name
        is generated.

        Raises TypeError if *coro* is not a coroutine.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last entry of the traceback recorded by the base class.
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        # Every step of the coroutine runs in this copy of the context.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        """Report a task that is destroyed while still pending.

        The report goes to the loop's exception handler and is skipped
        when _log_destroy_pending is false.
        """
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def _repr_info(self):
        """Delegate to base_tasks._task_repr_info() for the repr details."""
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task to ``str(value)``."""
        self._name = str(value)

    def set_result(self, result):
        """Unsupported on tasks; always raises RuntimeError."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Unsupported on tasks; always raises RuntimeError."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        return True

    def __step(self, exc=None):
        """Run one step of the wrapped coroutine.

        If *exc* is None the coroutine is resumed with ``send(None)``;
        otherwise *exc* is thrown into it.  A pending cancellation
        request (``_must_cancel``) replaces *exc* with a CancelledError
        unless it already is one.

        Depending on how the coroutine responds, the task's result or
        exception is set, the task is cancelled, a wakeup callback is
        attached to the yielded future, or another step is scheduled
        with call_soon().

        Raises InvalidStateError if the task is already done.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = exceptions.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel()
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # A cancellation was requested while the
                            # coroutine was running; forward it to the
                            # future we are now waiting on.
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Resume the task once the awaited *future* has completed.

        Registered as a done callback by __step().  If *future* finished
        with an exception (including cancellation), that exception is
        passed to __step(); otherwise __step() is called with no argument.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.
362
363
# Keep the pure-Python class reachable under a private name, because the
# public name ``Task`` may be rebound just below.
_PyTask = Task


try:
    import _asyncio
except ImportError:
    # The optional _asyncio module is unavailable; keep the Python Task.
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task
374
375
def create_task(coro, *, name=None):
    """Wrap the coroutine *coro* in a Task scheduled on the running loop.

    If *name* is not None it is applied to the new task through
    _set_task_name().

    Return the Task object.
    """
    new_task = events.get_running_loop().create_task(coro)
    _set_task_name(new_task, name)
    return new_task
385
386
# wait() and as_completed() are similar to those in PEP 3148.

# Values accepted by the return_when argument of wait(); these are the
# same objects as the concurrent.futures constants of the same names.
FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
392
393
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The iterable *fs* must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    *timeout*, if not None, is the maximum number of seconds to wait.
    *return_when* must be one of FIRST_COMPLETED, FIRST_EXCEPTION or
    ALL_COMPLETED.  Passing *loop* is deprecated and emits a
    DeprecationWarning.

    Raises TypeError if *fs* is itself a single future or coroutine, and
    ValueError if *fs* is empty or *return_when* is not a valid value.

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    # A generator (or any other one-shot iterable) is always truthy, so it
    # passes the emptiness check above even when it yields nothing.
    # Materialize it and check again so that case raises ValueError here
    # instead of tripping the assertion inside _wait().
    fs = set(fs)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    fs = {ensure_future(f, loop=loop) for f in fs}

    return await _wait(fs, timeout, return_when, loop)
427
428
def _release_waiter(waiter, *args):
    """Complete *waiter* with a None result unless it is already done.

    Any extra positional arguments are accepted and ignored.
    """
    if waiter.done():
        return
    waiter.set_result(None)
432
433
async def wait_for(fut, timeout, *, loop=None):
    """Wait for a single Future or coroutine to complete, with a timeout.

    A coroutine is wrapped in a Task.

    Return the result of the Future or coroutine.  If the timeout
    expires first, the task is cancelled and TimeoutError is raised;
    wrap the awaitable in shield() to keep it from being cancelled.
    A *timeout* of None waits without any limit.

    If the wait itself is cancelled, the task is cancelled too.

    Passing *loop* is deprecated and emits a DeprecationWarning.

    This function is a coroutine.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        # No time budget at all: hand back a result that is already
        # available, otherwise cancel and time out immediately.
        fut = ensure_future(fut, loop=loop)

        if not fut.done():
            fut.cancel()
            raise exceptions.TimeoutError()

        return fut.result()

    # *wakeup* is completed by whichever happens first: the timer firing
    # or *fut* finishing.
    wakeup = loop.create_future()
    timer = loop.call_later(timeout, _release_waiter, wakeup)
    release = functools.partial(_release_waiter, wakeup)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(release)

    try:
        # Block until the future completes or the timeout expires.
        try:
            await wakeup
        except exceptions.CancelledError:
            fut.remove_done_callback(release)
            fut.cancel()
            raise

        if not fut.done():
            fut.remove_done_callback(release)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            raise exceptions.TimeoutError()

        return fut.result()
    finally:
        timer.cancel()
493
494
async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    *fs* must be a non-empty collection of Futures.  Returns the pair
    of sets (done, pending) as observed once waiting has finished.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = (
        loop.call_later(timeout, _release_waiter, waiter)
        if timeout is not None else None
    )
    remaining = len(fs)

    def _on_completion(f):
        nonlocal remaining
        remaining -= 1
        # Keep waiting unless every future has finished, the caller asked
        # for the first completion, or -- in FIRST_EXCEPTION mode -- this
        # future ended with an exception (a cancelled one does not count).
        if remaining > 0 and return_when != FIRST_COMPLETED:
            if return_when != FIRST_EXCEPTION:
                return
            if f.cancelled() or f.exception() is None:
                return
        if timeout_handle is not None:
            timeout_handle.cancel()
        if not waiter.done():
            waiter.set_result(None)

    for fut in fs:
        fut.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        # Always undo the bookkeeping, even if this wait was cancelled.
        if timeout_handle is not None:
            timeout_handle.cancel()
        for fut in fs:
            fut.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for fut in fs:
        (done if fut.done() else pending).add(fut)
    return done, pending
537
538
async def _cancel_and_wait(fut, loop):
    """Cancel the future or task *fut* and wait until it has completed."""

    finished = loop.create_future()
    on_done = functools.partial(_release_waiter, finished)
    fut.add_done_callback(on_done)

    try:
        fut.cancel()
        # Await a separate future rather than *fut* itself so that
        # _cancel_and_wait stays reliably cancellable.
        await finished
    finally:
        fut.remove_done_callback(on_done)
553
554
555# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    Awaiting the yielded coroutines gives the results (or raises the
    exceptions!) of the original Futures (or coroutines), in the order
    in which they complete and as soon as they do.

    This differs from PEP 3148; the intended usage is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is given, the 'await' raises TimeoutError when the
    timeout expires before all Futures are done.

    Passing *loop* is deprecated and emits a DeprecationWarning.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    finished = Queue(loop=loop)

    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    else:
        loop = events.get_event_loop()
    outstanding = {ensure_future(f, loop=loop) for f in set(fs)}
    timer = None

    def _on_timeout():
        for fut in outstanding:
            fut.remove_done_callback(_on_completion)
            # One dummy entry per unfinished future, for _wait_for_one().
            finished.put_nowait(None)
        # Emptied after the loop: the set can't shrink while iterated.
        outstanding.clear()

    def _on_completion(fut):
        if not outstanding:
            return  # _on_timeout() already ran.
        outstanding.remove(fut)
        finished.put_nowait(fut)
        if not outstanding and timer is not None:
            timer.cancel()

    async def _wait_for_one():
        fut = await finished.get()
        if fut is None:
            # Dummy entry queued by _on_timeout().
            raise exceptions.TimeoutError
        return fut.result()  # May raise fut.exception().

    for fut in outstanding:
        fut.add_done_callback(_on_completion)
    if outstanding and timeout is not None:
        timer = loop.call_later(timeout, _on_timeout)
    for _ in range(len(outstanding)):
        yield _wait_for_one()
616
617
@types.coroutine
def __sleep0():
    """Give up control for exactly one event loop iteration.

    Private helper for 'asyncio.sleep()', used when 'delay' is zero
    or negative.  It yields None -- the same as a bare 'yield', which
    Task.__step knows how to handle -- instead of creating a Future
    object.
    """
    yield None
628
629
async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    *result* is returned to the caller once the delay has elapsed.
    When *delay* is zero or negative the coroutine only yields to the
    event loop once and then returns *result*.

    Passing *loop* is deprecated and emits a DeprecationWarning.
    """
    # Warn before the fast path below, so that a deprecated *loop*
    # argument is reported whatever the value of *delay*.
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    if delay <= 0:
        await __sleep0()
        return result

    if loop is None:
        loop = events.get_running_loop()

    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        # Drop the timer if we were cancelled before it fired.
        h.cancel()
651
652
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    A coroutine is scheduled as a Task on *loop* (or on the loop from
    events.get_event_loop() when *loop* is None).  A Future is returned
    unchanged.  Any other awaitable is first wrapped in a coroutine and
    then scheduled as a Task.

    Raises ValueError if a Future is given together with a *loop* it does
    not belong to, and TypeError if the argument is none of the above.
    """
    if coroutines.iscoroutine(coro_or_future):
        owner = events.get_event_loop() if loop is None else loop
        task = owner.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future

    if not inspect.isawaitable(coro_or_future):
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

    return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
675
676
@types.coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Turns an awaitable (an object with __await__) into a coroutine,
    which ensure_future() then wraps in a Task.
    """
    outcome = yield from awaitable.__await__()
    return outcome
685
# Tag the helper with the _is_coroutine marker imported from .coroutines.
# NOTE(review): presumably this lets asyncio's own coroutine-function
# detection recognize it -- confirm against coroutines.py.
_wrap_awaitable._is_coroutine = _is_coroutine
687
688
class _GatheringFuture(futures.Future):
    """Helper for gather().

    Its cancel() forwards the request to all the children instead of
    marking the future itself as cancelled right away, which makes it
    behave more like Task.cancel().
    """

    def __init__(self, children, *, loop=None):
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

    def cancel(self):
        """Ask every child to cancel; return True if any of them accepted."""
        if self.done():
            return False
        # Build the full list first so that every child is asked, with no
        # short-circuiting after the first success.
        outcomes = [child.cancel() for child in self._children]
        any_cancelled = any(outcomes)
        if any_cancelled:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return any_cancelled
715
716
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines are wrapped in a future and scheduled in the event loop.
    They are not necessarily scheduled in the order they were passed in.

    All futures must share the same event loop.  When every task
    finishes successfully, the returned future's result is the list of
    results, in the order of the original arguments (not necessarily the
    order in which the results arrived).  With *return_exceptions* set
    to True, exceptions raised by the tasks are treated like successful
    results and collected in the result list; otherwise the first
    exception raised is immediately propagated to the returned future.

    Cancellation: cancelling the outer Future also cancels all children
    that have not completed yet.  A cancelled child is treated as if it
    had raised CancelledError -- the outer Future is *not* cancelled in
    that case, so that cancelling one child does not cause the other
    children to be cancelled.

    If *return_exceptions* is False, cancelling gather() after it has
    been marked done won't cancel any submitted awaitables.  For
    instance, gather can be marked done after propagating an exception
    to the caller; calling ``gather.cancel()`` after catching an
    exception (raised by one of the awaitables) from gather therefore
    won't cancel any other awaitables.
    """
    if not coros_or_futures:
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            loop = events.get_event_loop()
        empty = loop.create_future()
        empty.set_result([])
        return empty

    def _outcome_of(child):
        # Result of *child*, or its exception object if it failed.
        # cancelled() is checked first because 'child.exception()' would
        # *raise* a CancelledError instead of returning it.
        if child.cancelled():
            return exceptions.CancelledError()
        outcome = child.exception()
        if outcome is None:
            outcome = child.result()
        return outcome

    def _done_callback(fut):
        nonlocal finished_count
        finished_count += 1

        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            # cancelled() is checked first because 'fut.exception()'
            # would *raise* a CancelledError instead of returning it.
            if fut.cancelled():
                outer.set_exception(exceptions.CancelledError())
                return
            exc = fut.exception()
            if exc is not None:
                outer.set_exception(exc)
                return

        if finished_count != total:
            return

        # All futures are done; build the list of results and hand it
        # to the 'outer' future.
        results = [_outcome_of(child) for child in children]

        if outer._cancel_requested:
            # If gather is being cancelled we must propagate the
            # cancellation regardless of *return_exceptions* argument.
            # See issue 32684.
            outer.set_exception(exceptions.CancelledError())
        else:
            outer.set_result(results)

    arg_to_fut = {}
    children = []
    total = 0
    finished_count = 0
    for arg in coros_or_futures:
        fut = arg_to_fut.get(arg)
        if fut is None:
            # First occurrence of this argument (a repeated one reuses
            # the future already created for it).
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            total += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer
835
836
def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the task
    running in something() is not cancelled.  From something()'s point
    of view the cancellation did not happen, but its caller is still
    cancelled, so the await expression still raises CancelledError.
    Note: If something() is cancelled by other means this will still
    cancel shield().

    To ignore cancellation completely (not recommended), combine
    shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None

    Passing *loop* is deprecated and emits a DeprecationWarning.
    """
    if loop is not None:
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    outer = futures._get_loop(inner).create_future()

    def _relay_inner_outcome(finished_inner):
        if outer.cancelled():
            if not finished_inner.cancelled():
                # Mark inner's result as retrieved.
                finished_inner.exception()
            return

        if finished_inner.cancelled():
            outer.cancel()
            return

        exc = finished_inner.exception()
        if exc is None:
            outer.set_result(finished_inner.result())
        else:
            outer.set_exception(exc)

    def _detach_from_inner(_finished_outer):
        # Once the outer future is done there is nothing left to relay.
        if not inner.done():
            inner.remove_done_callback(_relay_inner_outcome)

    inner.add_done_callback(_relay_inner_outcome)
    outer.add_done_callback(_detach_from_inner)
    return outer
898
899
def run_coroutine_threadsafe(coro, loop):
    """Schedule a coroutine object on *loop* via call_soon_threadsafe().

    Return a concurrent.futures.Future through which the result can be
    accessed.

    Raise TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')

    result_future = concurrent.futures.Future()

    def callback():
        # Executed by the loop: wrap the coroutine and link the resulting
        # future to the concurrent future handed back to the caller.
        try:
            wrapped = ensure_future(coro, loop=loop)
            futures._chain_future(wrapped, result_future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as err:
            # Record the failure on the returned future unless
            # set_running_or_notify_cancel() reports it was cancelled,
            # then let the exception continue to propagate.
            accepted = result_future.set_running_or_notify_cancel()
            if accepted:
                result_future.set_exception(err)
            raise

    loop.call_soon_threadsafe(callback)
    return result_future
921
922
# WeakSet containing all alive tasks.
# _register_task() adds to it and _unregister_task() discards from it;
# all_tasks() takes a list snapshot of it and filters that by event loop.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
# Entries are set by _enter_task(), deleted by _leave_task() and looked up
# by current_task().
_current_tasks = {}
929
930
def _register_task(task):
    """Add *task* to the module-level set of known tasks (_all_tasks)."""
    _all_tasks.add(task)
934
935
def _enter_task(loop, task):
    """Record *task* as the task currently executing on *loop*.

    Raise RuntimeError if a task is already recorded for *loop*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is None:
        _current_tasks[loop] = task
        return
    raise RuntimeError(f"Cannot enter into task {task!r} while another "
                       f"task {current_task!r} is being executed.")
942
943
def _leave_task(loop, task):
    """Clear the task currently recorded for *loop*.

    Raise RuntimeError if the recorded task is not *task*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is task:
        del _current_tasks[loop]
        return
    raise RuntimeError(f"Leaving task {task!r} does not match "
                       f"the current task {current_task!r}.")
950
951
def _unregister_task(task):
    """Drop *task* from _all_tasks; a task that is not present is ignored."""
    _all_tasks.discard(task)
955
956
# Keep references to the pure-Python implementations under _py_* names
# before the import below gets a chance to rebind the unprefixed names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    # If the _asyncio extension module is importable, its versions replace
    # the definitions above.  _all_tasks and _current_tasks are rebound too;
    # since functions resolve globals at call time, the _py_* helpers then
    # operate on the containers exported by _asyncio.
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    # No _asyncio module available: keep the pure-Python definitions.
    pass
else:
    # Aliases for the _asyncio implementations; these names exist only when
    # the import succeeded.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task
974