• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = (
4    'Task', 'create_task',
5    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6    'wait', 'wait_for', 'as_completed', 'sleep',
7    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8    'current_task', 'all_tasks',
9    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
10)
11
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import itertools
17import types
18import warnings
19import weakref
20
21from . import base_tasks
22from . import coroutines
23from . import events
24from . import exceptions
25from . import futures
26from .coroutines import _is_coroutine
27
28# Helper to generate new task names
29# This uses itertools.count() instead of a "+= 1" operation because the latter
30# is not thread safe. See bpo-11866 for a longer explanation.
31_task_name_counter = itertools.count(1).__next__
32
33
34def current_task(loop=None):
35    """Return a currently executed task."""
36    if loop is None:
37        loop = events.get_running_loop()
38    return _current_tasks.get(loop)
39
40
41def all_tasks(loop=None):
42    """Return a set of all tasks for the loop."""
43    if loop is None:
44        loop = events.get_running_loop()
45    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
46    # thread while we do so. Therefore we cast it to list prior to filtering. The list
47    # cast itself requires iteration, so we repeat it several times ignoring
48    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
49    # details.
50    i = 0
51    while True:
52        try:
53            tasks = list(_all_tasks)
54        except RuntimeError:
55            i += 1
56            if i >= 1000:
57                raise
58        else:
59            break
60    return {t for t in tasks
61            if futures._get_loop(t) is loop and not t.done()}
62
63
64def _all_tasks_compat(loop=None):
65    # Different from "all_task()" by returning *all* Tasks, including
66    # the completed ones.  Used to implement deprecated "Tasks.all_task()"
67    # method.
68    if loop is None:
69        loop = events.get_event_loop()
70    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
71    # thread while we do so. Therefore we cast it to list prior to filtering. The list
72    # cast itself requires iteration, so we repeat it several times ignoring
73    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
74    # details.
75    i = 0
76    while True:
77        try:
78            tasks = list(_all_tasks)
79        except RuntimeError:
80            i += 1
81            if i >= 1000:
82                raise
83        else:
84            break
85    return {t for t in tasks if futures._get_loop(t) is loop}
86
87
88def _set_task_name(task, name):
89    if name is not None:
90        try:
91            set_name = task.set_name
92        except AttributeError:
93            pass
94        else:
95            set_name(name)
96
97
98class Task(futures._PyFuture):  # Inherit Python Task implementation
99                                # from a Python Future implementation.
100
101    """A coroutine wrapped in a Future."""
102
103    # An important invariant maintained while a Task not done:
104    #
105    # - Either _fut_waiter is None, and _step() is scheduled;
106    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
107    #
108    # The only transition from the latter to the former is through
109    # _wakeup().  When _fut_waiter is not None, one of its callbacks
110    # must be _wakeup().
111
112    # If False, don't log a message if the task is destroyed whereas its
113    # status is still pending
114    _log_destroy_pending = True
115
116    def __init__(self, coro, *, loop=None, name=None):
117        super().__init__(loop=loop)
118        if self._source_traceback:
119            del self._source_traceback[-1]
120        if not coroutines.iscoroutine(coro):
121            # raise after Future.__init__(), attrs are required for __del__
122            # prevent logging for pending task in __del__
123            self._log_destroy_pending = False
124            raise TypeError(f"a coroutine was expected, got {coro!r}")
125
126        if name is None:
127            self._name = f'Task-{_task_name_counter()}'
128        else:
129            self._name = str(name)
130
131        self._must_cancel = False
132        self._fut_waiter = None
133        self._coro = coro
134        self._context = contextvars.copy_context()
135
136        self._loop.call_soon(self.__step, context=self._context)
137        _register_task(self)
138
139    def __del__(self):
140        if self._state == futures._PENDING and self._log_destroy_pending:
141            context = {
142                'task': self,
143                'message': 'Task was destroyed but it is pending!',
144            }
145            if self._source_traceback:
146                context['source_traceback'] = self._source_traceback
147            self._loop.call_exception_handler(context)
148        super().__del__()
149
150    def __class_getitem__(cls, type):
151        return cls
152
153    def _repr_info(self):
154        return base_tasks._task_repr_info(self)
155
156    def get_coro(self):
157        return self._coro
158
159    def get_name(self):
160        return self._name
161
162    def set_name(self, value):
163        self._name = str(value)
164
165    def set_result(self, result):
166        raise RuntimeError('Task does not support set_result operation')
167
168    def set_exception(self, exception):
169        raise RuntimeError('Task does not support set_exception operation')
170
171    def get_stack(self, *, limit=None):
172        """Return the list of stack frames for this task's coroutine.
173
174        If the coroutine is not done, this returns the stack where it is
175        suspended.  If the coroutine has completed successfully or was
176        cancelled, this returns an empty list.  If the coroutine was
177        terminated by an exception, this returns the list of traceback
178        frames.
179
180        The frames are always ordered from oldest to newest.
181
182        The optional limit gives the maximum number of frames to
183        return; by default all available frames are returned.  Its
184        meaning differs depending on whether a stack or a traceback is
185        returned: the newest frames of a stack are returned, but the
186        oldest frames of a traceback are returned.  (This matches the
187        behavior of the traceback module.)
188
189        For reasons beyond our control, only one stack frame is
190        returned for a suspended coroutine.
191        """
192        return base_tasks._task_get_stack(self, limit)
193
194    def print_stack(self, *, limit=None, file=None):
195        """Print the stack or traceback for this task's coroutine.
196
197        This produces output similar to that of the traceback module,
198        for the frames retrieved by get_stack().  The limit argument
199        is passed to get_stack().  The file argument is an I/O stream
200        to which the output is written; by default output is written
201        to sys.stderr.
202        """
203        return base_tasks._task_print_stack(self, limit, file)
204
205    def cancel(self, msg=None):
206        """Request that this task cancel itself.
207
208        This arranges for a CancelledError to be thrown into the
209        wrapped coroutine on the next cycle through the event loop.
210        The coroutine then has a chance to clean up or even deny
211        the request using try/except/finally.
212
213        Unlike Future.cancel, this does not guarantee that the
214        task will be cancelled: the exception might be caught and
215        acted upon, delaying cancellation of the task or preventing
216        cancellation completely.  The task may also return a value or
217        raise a different exception.
218
219        Immediately after this method is called, Task.cancelled() will
220        not return True (unless the task was already cancelled).  A
221        task will be marked as cancelled when the wrapped coroutine
222        terminates with a CancelledError exception (even if cancel()
223        was not called).
224        """
225        self._log_traceback = False
226        if self.done():
227            return False
228        if self._fut_waiter is not None:
229            if self._fut_waiter.cancel(msg=msg):
230                # Leave self._fut_waiter; it may be a Task that
231                # catches and ignores the cancellation so we may have
232                # to cancel it again later.
233                return True
234        # It must be the case that self.__step is already scheduled.
235        self._must_cancel = True
236        self._cancel_message = msg
237        return True
238
239    def __step(self, exc=None):
240        if self.done():
241            raise exceptions.InvalidStateError(
242                f'_step(): already done: {self!r}, {exc!r}')
243        if self._must_cancel:
244            if not isinstance(exc, exceptions.CancelledError):
245                exc = self._make_cancelled_error()
246            self._must_cancel = False
247        coro = self._coro
248        self._fut_waiter = None
249
250        _enter_task(self._loop, self)
251        # Call either coro.throw(exc) or coro.send(None).
252        try:
253            if exc is None:
254                # We use the `send` method directly, because coroutines
255                # don't have `__iter__` and `__next__` methods.
256                result = coro.send(None)
257            else:
258                result = coro.throw(exc)
259        except StopIteration as exc:
260            if self._must_cancel:
261                # Task is cancelled right before coro stops.
262                self._must_cancel = False
263                super().cancel(msg=self._cancel_message)
264            else:
265                super().set_result(exc.value)
266        except exceptions.CancelledError as exc:
267            # Save the original exception so we can chain it later.
268            self._cancelled_exc = exc
269            super().cancel()  # I.e., Future.cancel(self).
270        except (KeyboardInterrupt, SystemExit) as exc:
271            super().set_exception(exc)
272            raise
273        except BaseException as exc:
274            super().set_exception(exc)
275        else:
276            blocking = getattr(result, '_asyncio_future_blocking', None)
277            if blocking is not None:
278                # Yielded Future must come from Future.__iter__().
279                if futures._get_loop(result) is not self._loop:
280                    new_exc = RuntimeError(
281                        f'Task {self!r} got Future '
282                        f'{result!r} attached to a different loop')
283                    self._loop.call_soon(
284                        self.__step, new_exc, context=self._context)
285                elif blocking:
286                    if result is self:
287                        new_exc = RuntimeError(
288                            f'Task cannot await on itself: {self!r}')
289                        self._loop.call_soon(
290                            self.__step, new_exc, context=self._context)
291                    else:
292                        result._asyncio_future_blocking = False
293                        result.add_done_callback(
294                            self.__wakeup, context=self._context)
295                        self._fut_waiter = result
296                        if self._must_cancel:
297                            if self._fut_waiter.cancel(
298                                    msg=self._cancel_message):
299                                self._must_cancel = False
300                else:
301                    new_exc = RuntimeError(
302                        f'yield was used instead of yield from '
303                        f'in task {self!r} with {result!r}')
304                    self._loop.call_soon(
305                        self.__step, new_exc, context=self._context)
306
307            elif result is None:
308                # Bare yield relinquishes control for one event loop iteration.
309                self._loop.call_soon(self.__step, context=self._context)
310            elif inspect.isgenerator(result):
311                # Yielding a generator is just wrong.
312                new_exc = RuntimeError(
313                    f'yield was used instead of yield from for '
314                    f'generator in task {self!r} with {result!r}')
315                self._loop.call_soon(
316                    self.__step, new_exc, context=self._context)
317            else:
318                # Yielding something else is an error.
319                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
320                self._loop.call_soon(
321                    self.__step, new_exc, context=self._context)
322        finally:
323            _leave_task(self._loop, self)
324            self = None  # Needed to break cycles when an exception occurs.
325
326    def __wakeup(self, future):
327        try:
328            future.result()
329        except BaseException as exc:
330            # This may also be a cancellation.
331            self.__step(exc)
332        else:
333            # Don't pass the value of `future.result()` explicitly,
334            # as `Future.__iter__` and `Future.__await__` don't need it.
335            # If we call `_step(value, None)` instead of `_step()`,
336            # Python eval loop would use `.send(value)` method call,
337            # instead of `__next__()`, which is slower for futures
338            # that return non-generator iterators from their `__iter__`.
339            self.__step()
340        self = None  # Needed to break cycles when an exception occurs.
341
342
343_PyTask = Task
344
345
346try:
347    import _asyncio
348except ImportError:
349    pass
350else:
351    # _CTask is needed for tests.
352    Task = _CTask = _asyncio.Task
353
354
355def create_task(coro, *, name=None):
356    """Schedule the execution of a coroutine object in a spawn task.
357
358    Return a Task object.
359    """
360    loop = events.get_running_loop()
361    task = loop.create_task(coro)
362    _set_task_name(task, name)
363    return task
364
365
366# wait() and as_completed() similar to those in PEP 3148.
367
368FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
369FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
370ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
371
372
373async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
374    """Wait for the Futures and coroutines given by fs to complete.
375
376    The fs iterable must not be empty.
377
378    Coroutines will be wrapped in Tasks.
379
380    Returns two sets of Future: (done, pending).
381
382    Usage:
383
384        done, pending = await asyncio.wait(fs)
385
386    Note: This does not raise TimeoutError! Futures that aren't done
387    when the timeout occurs are returned in the second set.
388    """
389    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
390        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
391    if not fs:
392        raise ValueError('Set of coroutines/Futures is empty.')
393    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
394        raise ValueError(f'Invalid return_when value: {return_when}')
395
396    if loop is None:
397        loop = events.get_running_loop()
398    else:
399        warnings.warn("The loop argument is deprecated since Python 3.8, "
400                      "and scheduled for removal in Python 3.10.",
401                      DeprecationWarning, stacklevel=2)
402
403    fs = set(fs)
404
405    if any(coroutines.iscoroutine(f) for f in fs):
406        warnings.warn("The explicit passing of coroutine objects to "
407                      "asyncio.wait() is deprecated since Python 3.8, and "
408                      "scheduled for removal in Python 3.11.",
409                      DeprecationWarning, stacklevel=2)
410
411    fs = {ensure_future(f, loop=loop) for f in fs}
412
413    return await _wait(fs, timeout, return_when, loop)
414
415
416def _release_waiter(waiter, *args):
417    if not waiter.done():
418        waiter.set_result(None)
419
420
421async def wait_for(fut, timeout, *, loop=None):
422    """Wait for the single Future or coroutine to complete, with timeout.
423
424    Coroutine will be wrapped in Task.
425
426    Returns result of the Future or coroutine.  When a timeout occurs,
427    it cancels the task and raises TimeoutError.  To avoid the task
428    cancellation, wrap it in shield().
429
430    If the wait is cancelled, the task is also cancelled.
431
432    This function is a coroutine.
433    """
434    if loop is None:
435        loop = events.get_running_loop()
436    else:
437        warnings.warn("The loop argument is deprecated since Python 3.8, "
438                      "and scheduled for removal in Python 3.10.",
439                      DeprecationWarning, stacklevel=2)
440
441    if timeout is None:
442        return await fut
443
444    if timeout <= 0:
445        fut = ensure_future(fut, loop=loop)
446
447        if fut.done():
448            return fut.result()
449
450        await _cancel_and_wait(fut, loop=loop)
451        try:
452            fut.result()
453        except exceptions.CancelledError as exc:
454            raise exceptions.TimeoutError() from exc
455        else:
456            raise exceptions.TimeoutError()
457
458    waiter = loop.create_future()
459    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
460    cb = functools.partial(_release_waiter, waiter)
461
462    fut = ensure_future(fut, loop=loop)
463    fut.add_done_callback(cb)
464
465    try:
466        # wait until the future completes or the timeout
467        try:
468            await waiter
469        except exceptions.CancelledError:
470            if fut.done():
471                return fut.result()
472            else:
473                fut.remove_done_callback(cb)
474                # We must ensure that the task is not running
475                # after wait_for() returns.
476                # See https://bugs.python.org/issue32751
477                await _cancel_and_wait(fut, loop=loop)
478                raise
479
480        if fut.done():
481            return fut.result()
482        else:
483            fut.remove_done_callback(cb)
484            # We must ensure that the task is not running
485            # after wait_for() returns.
486            # See https://bugs.python.org/issue32751
487            await _cancel_and_wait(fut, loop=loop)
488            # In case task cancellation failed with some
489            # exception, we should re-raise it
490            # See https://bugs.python.org/issue40607
491            try:
492                fut.result()
493            except exceptions.CancelledError as exc:
494                raise exceptions.TimeoutError() from exc
495            else:
496                raise exceptions.TimeoutError()
497    finally:
498        timeout_handle.cancel()
499
500
501async def _wait(fs, timeout, return_when, loop):
502    """Internal helper for wait().
503
504    The fs argument must be a collection of Futures.
505    """
506    assert fs, 'Set of Futures is empty.'
507    waiter = loop.create_future()
508    timeout_handle = None
509    if timeout is not None:
510        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
511    counter = len(fs)
512
513    def _on_completion(f):
514        nonlocal counter
515        counter -= 1
516        if (counter <= 0 or
517            return_when == FIRST_COMPLETED or
518            return_when == FIRST_EXCEPTION and (not f.cancelled() and
519                                                f.exception() is not None)):
520            if timeout_handle is not None:
521                timeout_handle.cancel()
522            if not waiter.done():
523                waiter.set_result(None)
524
525    for f in fs:
526        f.add_done_callback(_on_completion)
527
528    try:
529        await waiter
530    finally:
531        if timeout_handle is not None:
532            timeout_handle.cancel()
533        for f in fs:
534            f.remove_done_callback(_on_completion)
535
536    done, pending = set(), set()
537    for f in fs:
538        if f.done():
539            done.add(f)
540        else:
541            pending.add(f)
542    return done, pending
543
544
545async def _cancel_and_wait(fut, loop):
546    """Cancel the *fut* future or task and wait until it completes."""
547
548    waiter = loop.create_future()
549    cb = functools.partial(_release_waiter, waiter)
550    fut.add_done_callback(cb)
551
552    try:
553        fut.cancel()
554        # We cannot wait on *fut* directly to make
555        # sure _cancel_and_wait itself is reliably cancellable.
556        await waiter
557    finally:
558        fut.remove_done_callback(cb)
559
560
561# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
562def as_completed(fs, *, loop=None, timeout=None):
563    """Return an iterator whose values are coroutines.
564
565    When waiting for the yielded coroutines you'll get the results (or
566    exceptions!) of the original Futures (or coroutines), in the order
567    in which and as soon as they complete.
568
569    This differs from PEP 3148; the proper way to use this is:
570
571        for f in as_completed(fs):
572            result = await f  # The 'await' may raise.
573            # Use result.
574
575    If a timeout is specified, the 'await' will raise
576    TimeoutError when the timeout occurs before all Futures are done.
577
578    Note: The futures 'f' are not necessarily members of fs.
579    """
580    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
581        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
582
583    from .queues import Queue  # Import here to avoid circular import problem.
584    done = Queue(loop=loop)
585
586    if loop is None:
587        loop = events.get_event_loop()
588    else:
589        warnings.warn("The loop argument is deprecated since Python 3.8, "
590                      "and scheduled for removal in Python 3.10.",
591                      DeprecationWarning, stacklevel=2)
592    todo = {ensure_future(f, loop=loop) for f in set(fs)}
593    timeout_handle = None
594
595    def _on_timeout():
596        for f in todo:
597            f.remove_done_callback(_on_completion)
598            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
599        todo.clear()  # Can't do todo.remove(f) in the loop.
600
601    def _on_completion(f):
602        if not todo:
603            return  # _on_timeout() was here first.
604        todo.remove(f)
605        done.put_nowait(f)
606        if not todo and timeout_handle is not None:
607            timeout_handle.cancel()
608
609    async def _wait_for_one():
610        f = await done.get()
611        if f is None:
612            # Dummy value from _on_timeout().
613            raise exceptions.TimeoutError
614        return f.result()  # May raise f.exception().
615
616    for f in todo:
617        f.add_done_callback(_on_completion)
618    if todo and timeout is not None:
619        timeout_handle = loop.call_later(timeout, _on_timeout)
620    for _ in range(len(todo)):
621        yield _wait_for_one()
622
623
624@types.coroutine
625def __sleep0():
626    """Skip one event loop run cycle.
627
628    This is a private helper for 'asyncio.sleep()', used
629    when the 'delay' is set to 0.  It uses a bare 'yield'
630    expression (which Task.__step knows how to handle)
631    instead of creating a Future object.
632    """
633    yield
634
635
636async def sleep(delay, result=None, *, loop=None):
637    """Coroutine that completes after a given time (in seconds)."""
638    if delay <= 0:
639        await __sleep0()
640        return result
641
642    if loop is None:
643        loop = events.get_running_loop()
644    else:
645        warnings.warn("The loop argument is deprecated since Python 3.8, "
646                      "and scheduled for removal in Python 3.10.",
647                      DeprecationWarning, stacklevel=2)
648
649    future = loop.create_future()
650    h = loop.call_later(delay,
651                        futures._set_result_unless_cancelled,
652                        future, result)
653    try:
654        return await future
655    finally:
656        h.cancel()
657
658
659def ensure_future(coro_or_future, *, loop=None):
660    """Wrap a coroutine or an awaitable in a future.
661
662    If the argument is a Future, it is returned directly.
663    """
664    if coroutines.iscoroutine(coro_or_future):
665        if loop is None:
666            loop = events.get_event_loop()
667        task = loop.create_task(coro_or_future)
668        if task._source_traceback:
669            del task._source_traceback[-1]
670        return task
671    elif futures.isfuture(coro_or_future):
672        if loop is not None and loop is not futures._get_loop(coro_or_future):
673            raise ValueError('The future belongs to a different loop than '
674                             'the one specified as the loop argument')
675        return coro_or_future
676    elif inspect.isawaitable(coro_or_future):
677        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
678    else:
679        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
680                        'required')
681
682
683@types.coroutine
684def _wrap_awaitable(awaitable):
685    """Helper for asyncio.ensure_future().
686
687    Wraps awaitable (an object with __await__) into a coroutine
688    that will later be wrapped in a Task by ensure_future().
689    """
690    return (yield from awaitable.__await__())
691
692_wrap_awaitable._is_coroutine = _is_coroutine
693
694
695class _GatheringFuture(futures.Future):
696    """Helper for gather().
697
698    This overrides cancel() to cancel all the children and act more
699    like Task.cancel(), which doesn't immediately mark itself as
700    cancelled.
701    """
702
703    def __init__(self, children, *, loop=None):
704        super().__init__(loop=loop)
705        self._children = children
706        self._cancel_requested = False
707
708    def cancel(self, msg=None):
709        if self.done():
710            return False
711        ret = False
712        for child in self._children:
713            if child.cancel(msg=msg):
714                ret = True
715        if ret:
716            # If any child tasks were actually cancelled, we should
717            # propagate the cancellation request regardless of
718            # *return_exceptions* argument.  See issue 32684.
719            self._cancel_requested = True
720        return ret
721
722
723def gather(*coros_or_futures, loop=None, return_exceptions=False):
724    """Return a future aggregating results from the given coroutines/futures.
725
726    Coroutines will be wrapped in a future and scheduled in the event
727    loop. They will not necessarily be scheduled in the same order as
728    passed in.
729
730    All futures must share the same event loop.  If all the tasks are
731    done successfully, the returned future's result is the list of
732    results (in the order of the original sequence, not necessarily
733    the order of results arrival).  If *return_exceptions* is True,
734    exceptions in the tasks are treated the same as successful
735    results, and gathered in the result list; otherwise, the first
736    raised exception will be immediately propagated to the returned
737    future.
738
739    Cancellation: if the outer Future is cancelled, all children (that
740    have not completed yet) are also cancelled.  If any child is
741    cancelled, this is treated as if it raised CancelledError --
742    the outer Future is *not* cancelled in this case.  (This is to
743    prevent the cancellation of one child to cause other children to
744    be cancelled.)
745
746    If *return_exceptions* is False, cancelling gather() after it
747    has been marked done won't cancel any submitted awaitables.
748    For instance, gather can be marked done after propagating an
749    exception to the caller, therefore, calling ``gather.cancel()``
750    after catching an exception (raised by one of the awaitables) from
751    gather won't cancel any other awaitables.
752    """
753    if not coros_or_futures:
754        if loop is None:
755            loop = events.get_event_loop()
756        else:
757            warnings.warn("The loop argument is deprecated since Python 3.8, "
758                          "and scheduled for removal in Python 3.10.",
759                          DeprecationWarning, stacklevel=2)
760        outer = loop.create_future()
761        outer.set_result([])
762        return outer
763
764    def _done_callback(fut):
765        nonlocal nfinished
766        nfinished += 1
767
768        if outer.done():
769            if not fut.cancelled():
770                # Mark exception retrieved.
771                fut.exception()
772            return
773
774        if not return_exceptions:
775            if fut.cancelled():
776                # Check if 'fut' is cancelled first, as
777                # 'fut.exception()' will *raise* a CancelledError
778                # instead of returning it.
779                exc = fut._make_cancelled_error()
780                outer.set_exception(exc)
781                return
782            else:
783                exc = fut.exception()
784                if exc is not None:
785                    outer.set_exception(exc)
786                    return
787
788        if nfinished == nfuts:
789            # All futures are done; create a list of results
790            # and set it to the 'outer' future.
791            results = []
792
793            for fut in children:
794                if fut.cancelled():
795                    # Check if 'fut' is cancelled first, as 'fut.exception()'
796                    # will *raise* a CancelledError instead of returning it.
797                    # Also, since we're adding the exception return value
798                    # to 'results' instead of raising it, don't bother
799                    # setting __context__.  This also lets us preserve
800                    # calling '_make_cancelled_error()' at most once.
801                    res = exceptions.CancelledError(
802                        '' if fut._cancel_message is None else
803                        fut._cancel_message)
804                else:
805                    res = fut.exception()
806                    if res is None:
807                        res = fut.result()
808                results.append(res)
809
810            if outer._cancel_requested:
811                # If gather is being cancelled we must propagate the
812                # cancellation regardless of *return_exceptions* argument.
813                # See issue 32684.
814                exc = fut._make_cancelled_error()
815                outer.set_exception(exc)
816            else:
817                outer.set_result(results)
818
819    arg_to_fut = {}
820    children = []
821    nfuts = 0
822    nfinished = 0
823    for arg in coros_or_futures:
824        if arg not in arg_to_fut:
825            fut = ensure_future(arg, loop=loop)
826            if loop is None:
827                loop = futures._get_loop(fut)
828            if fut is not arg:
829                # 'arg' was not a Future, therefore, 'fut' is a new
830                # Future created specifically for 'arg'.  Since the caller
831                # can't control it, disable the "destroy pending task"
832                # warning.
833                fut._log_destroy_pending = False
834
835            nfuts += 1
836            arg_to_fut[arg] = fut
837            fut.add_done_callback(_done_callback)
838
839        else:
840            # There's a duplicate Future object in coros_or_futures.
841            fut = arg_to_fut[arg]
842
843        children.append(fut)
844
845    outer = _GatheringFuture(children, loop=loop)
846    return outer
847
848
849def shield(arg, *, loop=None):
850    """Wait for a future, shielding it from cancellation.
851
852    The statement
853
854        res = await shield(something())
855
856    is exactly equivalent to the statement
857
858        res = await something()
859
860    *except* that if the coroutine containing it is cancelled, the
861    task running in something() is not cancelled.  From the POV of
862    something(), the cancellation did not happen.  But its caller is
863    still cancelled, so the yield-from expression still raises
864    CancelledError.  Note: If something() is cancelled by other means
865    this will still cancel shield().
866
867    If you want to completely ignore cancellation (not recommended)
868    you can combine shield() with a try/except clause, as follows:
869
870        try:
871            res = await shield(something())
872        except CancelledError:
873            res = None
874    """
875    if loop is not None:
876        warnings.warn("The loop argument is deprecated since Python 3.8, "
877                      "and scheduled for removal in Python 3.10.",
878                      DeprecationWarning, stacklevel=2)
879    inner = ensure_future(arg, loop=loop)
880    if inner.done():
881        # Shortcut.
882        return inner
883    loop = futures._get_loop(inner)
884    outer = loop.create_future()
885
886    def _inner_done_callback(inner):
887        if outer.cancelled():
888            if not inner.cancelled():
889                # Mark inner's result as retrieved.
890                inner.exception()
891            return
892
893        if inner.cancelled():
894            outer.cancel()
895        else:
896            exc = inner.exception()
897            if exc is not None:
898                outer.set_exception(exc)
899            else:
900                outer.set_result(inner.result())
901
902
903    def _outer_done_callback(outer):
904        if not inner.done():
905            inner.remove_done_callback(_inner_done_callback)
906
907    inner.add_done_callback(_inner_done_callback)
908    outer.add_done_callback(_outer_done_callback)
909    return outer
910
911
912def run_coroutine_threadsafe(coro, loop):
913    """Submit a coroutine object to a given event loop.
914
915    Return a concurrent.futures.Future to access the result.
916    """
917    if not coroutines.iscoroutine(coro):
918        raise TypeError('A coroutine object is required')
919    future = concurrent.futures.Future()
920
921    def callback():
922        try:
923            futures._chain_future(ensure_future(coro, loop=loop), future)
924        except (SystemExit, KeyboardInterrupt):
925            raise
926        except BaseException as exc:
927            if future.set_running_or_notify_cancel():
928                future.set_exception(exc)
929            raise
930
931    loop.call_soon_threadsafe(callback)
932    return future
933
934
935# WeakSet containing all alive tasks.
936_all_tasks = weakref.WeakSet()
937
938# Dictionary containing tasks that are currently active in
939# all running event loops.  {EventLoop: Task}
940_current_tasks = {}
941
942
943def _register_task(task):
944    """Register a new task in asyncio as executed by loop."""
945    _all_tasks.add(task)
946
947
948def _enter_task(loop, task):
949    current_task = _current_tasks.get(loop)
950    if current_task is not None:
951        raise RuntimeError(f"Cannot enter into task {task!r} while another "
952                           f"task {current_task!r} is being executed.")
953    _current_tasks[loop] = task
954
955
956def _leave_task(loop, task):
957    current_task = _current_tasks.get(loop)
958    if current_task is not task:
959        raise RuntimeError(f"Leaving task {task!r} does not match "
960                           f"the current task {current_task!r}.")
961    del _current_tasks[loop]
962
963
964def _unregister_task(task):
965    """Unregister a task."""
966    _all_tasks.discard(task)
967
968
969_py_register_task = _register_task
970_py_unregister_task = _unregister_task
971_py_enter_task = _enter_task
972_py_leave_task = _leave_task
973
974
975try:
976    from _asyncio import (_register_task, _unregister_task,
977                          _enter_task, _leave_task,
978                          _all_tasks, _current_tasks)
979except ImportError:
980    pass
981else:
982    _c_register_task = _register_task
983    _c_unregister_task = _unregister_task
984    _c_enter_task = _enter_task
985    _c_leave_task = _leave_task
986