• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = (
4    'Task', 'create_task',
5    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6    'wait', 'wait_for', 'as_completed', 'sleep',
7    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8    'current_task', 'all_tasks',
9    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
10)
11
12import concurrent.futures
13import contextvars
14import functools
15import inspect
16import itertools
17import types
18import warnings
19import weakref
20from types import GenericAlias
21
22from . import base_tasks
23from . import coroutines
24from . import events
25from . import exceptions
26from . import futures
27from .coroutines import _is_coroutine
28
29# Helper to generate new task names
30# This uses itertools.count() instead of a "+= 1" operation because the latter
31# is not thread safe. See bpo-11866 for a longer explanation.
32_task_name_counter = itertools.count(1).__next__
33
34
35def current_task(loop=None):
36    """Return a currently executed task."""
37    if loop is None:
38        loop = events.get_running_loop()
39    return _current_tasks.get(loop)
40
41
42def all_tasks(loop=None):
43    """Return a set of all tasks for the loop."""
44    if loop is None:
45        loop = events.get_running_loop()
46    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
47    # thread while we do so. Therefore we cast it to list prior to filtering. The list
48    # cast itself requires iteration, so we repeat it several times ignoring
49    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
50    # details.
51    i = 0
52    while True:
53        try:
54            tasks = list(_all_tasks)
55        except RuntimeError:
56            i += 1
57            if i >= 1000:
58                raise
59        else:
60            break
61    return {t for t in tasks
62            if futures._get_loop(t) is loop and not t.done()}
63
64
65def _set_task_name(task, name):
66    if name is not None:
67        try:
68            set_name = task.set_name
69        except AttributeError:
70            pass
71        else:
72            set_name(name)
73
74
75class Task(futures._PyFuture):  # Inherit Python Task implementation
76                                # from a Python Future implementation.
77
78    """A coroutine wrapped in a Future."""
79
80    # An important invariant maintained while a Task not done:
81    #
82    # - Either _fut_waiter is None, and _step() is scheduled;
83    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
84    #
85    # The only transition from the latter to the former is through
86    # _wakeup().  When _fut_waiter is not None, one of its callbacks
87    # must be _wakeup().
88
89    # If False, don't log a message if the task is destroyed whereas its
90    # status is still pending
91    _log_destroy_pending = True
92
93    def __init__(self, coro, *, loop=None, name=None):
94        super().__init__(loop=loop)
95        if self._source_traceback:
96            del self._source_traceback[-1]
97        if not coroutines.iscoroutine(coro):
98            # raise after Future.__init__(), attrs are required for __del__
99            # prevent logging for pending task in __del__
100            self._log_destroy_pending = False
101            raise TypeError(f"a coroutine was expected, got {coro!r}")
102
103        if name is None:
104            self._name = f'Task-{_task_name_counter()}'
105        else:
106            self._name = str(name)
107
108        self._must_cancel = False
109        self._fut_waiter = None
110        self._coro = coro
111        self._context = contextvars.copy_context()
112
113        self._loop.call_soon(self.__step, context=self._context)
114        _register_task(self)
115
116    def __del__(self):
117        if self._state == futures._PENDING and self._log_destroy_pending:
118            context = {
119                'task': self,
120                'message': 'Task was destroyed but it is pending!',
121            }
122            if self._source_traceback:
123                context['source_traceback'] = self._source_traceback
124            self._loop.call_exception_handler(context)
125        super().__del__()
126
127    __class_getitem__ = classmethod(GenericAlias)
128
129    def _repr_info(self):
130        return base_tasks._task_repr_info(self)
131
132    def get_coro(self):
133        return self._coro
134
135    def get_name(self):
136        return self._name
137
138    def set_name(self, value):
139        self._name = str(value)
140
141    def set_result(self, result):
142        raise RuntimeError('Task does not support set_result operation')
143
144    def set_exception(self, exception):
145        raise RuntimeError('Task does not support set_exception operation')
146
147    def get_stack(self, *, limit=None):
148        """Return the list of stack frames for this task's coroutine.
149
150        If the coroutine is not done, this returns the stack where it is
151        suspended.  If the coroutine has completed successfully or was
152        cancelled, this returns an empty list.  If the coroutine was
153        terminated by an exception, this returns the list of traceback
154        frames.
155
156        The frames are always ordered from oldest to newest.
157
158        The optional limit gives the maximum number of frames to
159        return; by default all available frames are returned.  Its
160        meaning differs depending on whether a stack or a traceback is
161        returned: the newest frames of a stack are returned, but the
162        oldest frames of a traceback are returned.  (This matches the
163        behavior of the traceback module.)
164
165        For reasons beyond our control, only one stack frame is
166        returned for a suspended coroutine.
167        """
168        return base_tasks._task_get_stack(self, limit)
169
170    def print_stack(self, *, limit=None, file=None):
171        """Print the stack or traceback for this task's coroutine.
172
173        This produces output similar to that of the traceback module,
174        for the frames retrieved by get_stack().  The limit argument
175        is passed to get_stack().  The file argument is an I/O stream
176        to which the output is written; by default output is written
177        to sys.stderr.
178        """
179        return base_tasks._task_print_stack(self, limit, file)
180
181    def cancel(self, msg=None):
182        """Request that this task cancel itself.
183
184        This arranges for a CancelledError to be thrown into the
185        wrapped coroutine on the next cycle through the event loop.
186        The coroutine then has a chance to clean up or even deny
187        the request using try/except/finally.
188
189        Unlike Future.cancel, this does not guarantee that the
190        task will be cancelled: the exception might be caught and
191        acted upon, delaying cancellation of the task or preventing
192        cancellation completely.  The task may also return a value or
193        raise a different exception.
194
195        Immediately after this method is called, Task.cancelled() will
196        not return True (unless the task was already cancelled).  A
197        task will be marked as cancelled when the wrapped coroutine
198        terminates with a CancelledError exception (even if cancel()
199        was not called).
200        """
201        self._log_traceback = False
202        if self.done():
203            return False
204        if self._fut_waiter is not None:
205            if self._fut_waiter.cancel(msg=msg):
206                # Leave self._fut_waiter; it may be a Task that
207                # catches and ignores the cancellation so we may have
208                # to cancel it again later.
209                return True
210        # It must be the case that self.__step is already scheduled.
211        self._must_cancel = True
212        self._cancel_message = msg
213        return True
214
215    def __step(self, exc=None):
216        if self.done():
217            raise exceptions.InvalidStateError(
218                f'_step(): already done: {self!r}, {exc!r}')
219        if self._must_cancel:
220            if not isinstance(exc, exceptions.CancelledError):
221                exc = self._make_cancelled_error()
222            self._must_cancel = False
223        coro = self._coro
224        self._fut_waiter = None
225
226        _enter_task(self._loop, self)
227        # Call either coro.throw(exc) or coro.send(None).
228        try:
229            if exc is None:
230                # We use the `send` method directly, because coroutines
231                # don't have `__iter__` and `__next__` methods.
232                result = coro.send(None)
233            else:
234                result = coro.throw(exc)
235        except StopIteration as exc:
236            if self._must_cancel:
237                # Task is cancelled right before coro stops.
238                self._must_cancel = False
239                super().cancel(msg=self._cancel_message)
240            else:
241                super().set_result(exc.value)
242        except exceptions.CancelledError as exc:
243            # Save the original exception so we can chain it later.
244            self._cancelled_exc = exc
245            super().cancel()  # I.e., Future.cancel(self).
246        except (KeyboardInterrupt, SystemExit) as exc:
247            super().set_exception(exc)
248            raise
249        except BaseException as exc:
250            super().set_exception(exc)
251        else:
252            blocking = getattr(result, '_asyncio_future_blocking', None)
253            if blocking is not None:
254                # Yielded Future must come from Future.__iter__().
255                if futures._get_loop(result) is not self._loop:
256                    new_exc = RuntimeError(
257                        f'Task {self!r} got Future '
258                        f'{result!r} attached to a different loop')
259                    self._loop.call_soon(
260                        self.__step, new_exc, context=self._context)
261                elif blocking:
262                    if result is self:
263                        new_exc = RuntimeError(
264                            f'Task cannot await on itself: {self!r}')
265                        self._loop.call_soon(
266                            self.__step, new_exc, context=self._context)
267                    else:
268                        result._asyncio_future_blocking = False
269                        result.add_done_callback(
270                            self.__wakeup, context=self._context)
271                        self._fut_waiter = result
272                        if self._must_cancel:
273                            if self._fut_waiter.cancel(
274                                    msg=self._cancel_message):
275                                self._must_cancel = False
276                else:
277                    new_exc = RuntimeError(
278                        f'yield was used instead of yield from '
279                        f'in task {self!r} with {result!r}')
280                    self._loop.call_soon(
281                        self.__step, new_exc, context=self._context)
282
283            elif result is None:
284                # Bare yield relinquishes control for one event loop iteration.
285                self._loop.call_soon(self.__step, context=self._context)
286            elif inspect.isgenerator(result):
287                # Yielding a generator is just wrong.
288                new_exc = RuntimeError(
289                    f'yield was used instead of yield from for '
290                    f'generator in task {self!r} with {result!r}')
291                self._loop.call_soon(
292                    self.__step, new_exc, context=self._context)
293            else:
294                # Yielding something else is an error.
295                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
296                self._loop.call_soon(
297                    self.__step, new_exc, context=self._context)
298        finally:
299            _leave_task(self._loop, self)
300            self = None  # Needed to break cycles when an exception occurs.
301
302    def __wakeup(self, future):
303        try:
304            future.result()
305        except BaseException as exc:
306            # This may also be a cancellation.
307            self.__step(exc)
308        else:
309            # Don't pass the value of `future.result()` explicitly,
310            # as `Future.__iter__` and `Future.__await__` don't need it.
311            # If we call `_step(value, None)` instead of `_step()`,
312            # Python eval loop would use `.send(value)` method call,
313            # instead of `__next__()`, which is slower for futures
314            # that return non-generator iterators from their `__iter__`.
315            self.__step()
316        self = None  # Needed to break cycles when an exception occurs.
317
318
319_PyTask = Task
320
321
322try:
323    import _asyncio
324except ImportError:
325    pass
326else:
327    # _CTask is needed for tests.
328    Task = _CTask = _asyncio.Task
329
330
331def create_task(coro, *, name=None):
332    """Schedule the execution of a coroutine object in a spawn task.
333
334    Return a Task object.
335    """
336    loop = events.get_running_loop()
337    task = loop.create_task(coro)
338    _set_task_name(task, name)
339    return task
340
341
342# wait() and as_completed() similar to those in PEP 3148.
343
344FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
345FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
346ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
347
348
349async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
350    """Wait for the Futures and coroutines given by fs to complete.
351
352    The fs iterable must not be empty.
353
354    Coroutines will be wrapped in Tasks.
355
356    Returns two sets of Future: (done, pending).
357
358    Usage:
359
360        done, pending = await asyncio.wait(fs)
361
362    Note: This does not raise TimeoutError! Futures that aren't done
363    when the timeout occurs are returned in the second set.
364    """
365    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
366        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
367    if not fs:
368        raise ValueError('Set of coroutines/Futures is empty.')
369    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
370        raise ValueError(f'Invalid return_when value: {return_when}')
371
372    loop = events.get_running_loop()
373
374    fs = set(fs)
375
376    if any(coroutines.iscoroutine(f) for f in fs):
377        warnings.warn("The explicit passing of coroutine objects to "
378                      "asyncio.wait() is deprecated since Python 3.8, and "
379                      "scheduled for removal in Python 3.11.",
380                      DeprecationWarning, stacklevel=2)
381
382    fs = {ensure_future(f, loop=loop) for f in fs}
383
384    return await _wait(fs, timeout, return_when, loop)
385
386
387def _release_waiter(waiter, *args):
388    if not waiter.done():
389        waiter.set_result(None)
390
391
392async def wait_for(fut, timeout):
393    """Wait for the single Future or coroutine to complete, with timeout.
394
395    Coroutine will be wrapped in Task.
396
397    Returns result of the Future or coroutine.  When a timeout occurs,
398    it cancels the task and raises TimeoutError.  To avoid the task
399    cancellation, wrap it in shield().
400
401    If the wait is cancelled, the task is also cancelled.
402
403    This function is a coroutine.
404    """
405    loop = events.get_running_loop()
406
407    if timeout is None:
408        return await fut
409
410    if timeout <= 0:
411        fut = ensure_future(fut, loop=loop)
412
413        if fut.done():
414            return fut.result()
415
416        await _cancel_and_wait(fut, loop=loop)
417        try:
418            return fut.result()
419        except exceptions.CancelledError as exc:
420            raise exceptions.TimeoutError() from exc
421
422    waiter = loop.create_future()
423    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
424    cb = functools.partial(_release_waiter, waiter)
425
426    fut = ensure_future(fut, loop=loop)
427    fut.add_done_callback(cb)
428
429    try:
430        # wait until the future completes or the timeout
431        try:
432            await waiter
433        except exceptions.CancelledError:
434            if fut.done():
435                return fut.result()
436            else:
437                fut.remove_done_callback(cb)
438                # We must ensure that the task is not running
439                # after wait_for() returns.
440                # See https://bugs.python.org/issue32751
441                await _cancel_and_wait(fut, loop=loop)
442                raise
443
444        if fut.done():
445            return fut.result()
446        else:
447            fut.remove_done_callback(cb)
448            # We must ensure that the task is not running
449            # after wait_for() returns.
450            # See https://bugs.python.org/issue32751
451            await _cancel_and_wait(fut, loop=loop)
452            # In case task cancellation failed with some
453            # exception, we should re-raise it
454            # See https://bugs.python.org/issue40607
455            try:
456                return fut.result()
457            except exceptions.CancelledError as exc:
458                raise exceptions.TimeoutError() from exc
459    finally:
460        timeout_handle.cancel()
461
462
463async def _wait(fs, timeout, return_when, loop):
464    """Internal helper for wait().
465
466    The fs argument must be a collection of Futures.
467    """
468    assert fs, 'Set of Futures is empty.'
469    waiter = loop.create_future()
470    timeout_handle = None
471    if timeout is not None:
472        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
473    counter = len(fs)
474
475    def _on_completion(f):
476        nonlocal counter
477        counter -= 1
478        if (counter <= 0 or
479            return_when == FIRST_COMPLETED or
480            return_when == FIRST_EXCEPTION and (not f.cancelled() and
481                                                f.exception() is not None)):
482            if timeout_handle is not None:
483                timeout_handle.cancel()
484            if not waiter.done():
485                waiter.set_result(None)
486
487    for f in fs:
488        f.add_done_callback(_on_completion)
489
490    try:
491        await waiter
492    finally:
493        if timeout_handle is not None:
494            timeout_handle.cancel()
495        for f in fs:
496            f.remove_done_callback(_on_completion)
497
498    done, pending = set(), set()
499    for f in fs:
500        if f.done():
501            done.add(f)
502        else:
503            pending.add(f)
504    return done, pending
505
506
507async def _cancel_and_wait(fut, loop):
508    """Cancel the *fut* future or task and wait until it completes."""
509
510    waiter = loop.create_future()
511    cb = functools.partial(_release_waiter, waiter)
512    fut.add_done_callback(cb)
513
514    try:
515        fut.cancel()
516        # We cannot wait on *fut* directly to make
517        # sure _cancel_and_wait itself is reliably cancellable.
518        await waiter
519    finally:
520        fut.remove_done_callback(cb)
521
522
523# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
524def as_completed(fs, *, timeout=None):
525    """Return an iterator whose values are coroutines.
526
527    When waiting for the yielded coroutines you'll get the results (or
528    exceptions!) of the original Futures (or coroutines), in the order
529    in which and as soon as they complete.
530
531    This differs from PEP 3148; the proper way to use this is:
532
533        for f in as_completed(fs):
534            result = await f  # The 'await' may raise.
535            # Use result.
536
537    If a timeout is specified, the 'await' will raise
538    TimeoutError when the timeout occurs before all Futures are done.
539
540    Note: The futures 'f' are not necessarily members of fs.
541    """
542    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
543        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
544
545    from .queues import Queue  # Import here to avoid circular import problem.
546    done = Queue()
547
548    loop = events._get_event_loop()
549    todo = {ensure_future(f, loop=loop) for f in set(fs)}
550    timeout_handle = None
551
552    def _on_timeout():
553        for f in todo:
554            f.remove_done_callback(_on_completion)
555            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
556        todo.clear()  # Can't do todo.remove(f) in the loop.
557
558    def _on_completion(f):
559        if not todo:
560            return  # _on_timeout() was here first.
561        todo.remove(f)
562        done.put_nowait(f)
563        if not todo and timeout_handle is not None:
564            timeout_handle.cancel()
565
566    async def _wait_for_one():
567        f = await done.get()
568        if f is None:
569            # Dummy value from _on_timeout().
570            raise exceptions.TimeoutError
571        return f.result()  # May raise f.exception().
572
573    for f in todo:
574        f.add_done_callback(_on_completion)
575    if todo and timeout is not None:
576        timeout_handle = loop.call_later(timeout, _on_timeout)
577    for _ in range(len(todo)):
578        yield _wait_for_one()
579
580
581@types.coroutine
582def __sleep0():
583    """Skip one event loop run cycle.
584
585    This is a private helper for 'asyncio.sleep()', used
586    when the 'delay' is set to 0.  It uses a bare 'yield'
587    expression (which Task.__step knows how to handle)
588    instead of creating a Future object.
589    """
590    yield
591
592
593async def sleep(delay, result=None):
594    """Coroutine that completes after a given time (in seconds)."""
595    if delay <= 0:
596        await __sleep0()
597        return result
598
599    loop = events.get_running_loop()
600    future = loop.create_future()
601    h = loop.call_later(delay,
602                        futures._set_result_unless_cancelled,
603                        future, result)
604    try:
605        return await future
606    finally:
607        h.cancel()
608
609
610def ensure_future(coro_or_future, *, loop=None):
611    """Wrap a coroutine or an awaitable in a future.
612
613    If the argument is a Future, it is returned directly.
614    """
615    return _ensure_future(coro_or_future, loop=loop)
616
617
618def _ensure_future(coro_or_future, *, loop=None):
619    if futures.isfuture(coro_or_future):
620        if loop is not None and loop is not futures._get_loop(coro_or_future):
621            raise ValueError('The future belongs to a different loop than '
622                            'the one specified as the loop argument')
623        return coro_or_future
624    called_wrap_awaitable = False
625    if not coroutines.iscoroutine(coro_or_future):
626        if inspect.isawaitable(coro_or_future):
627            coro_or_future = _wrap_awaitable(coro_or_future)
628            called_wrap_awaitable = True
629        else:
630            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
631                            'is required')
632
633    if loop is None:
634        loop = events._get_event_loop(stacklevel=4)
635    try:
636        return loop.create_task(coro_or_future)
637    except RuntimeError:
638        if not called_wrap_awaitable:
639            coro_or_future.close()
640        raise
641
642
643@types.coroutine
644def _wrap_awaitable(awaitable):
645    """Helper for asyncio.ensure_future().
646
647    Wraps awaitable (an object with __await__) into a coroutine
648    that will later be wrapped in a Task by ensure_future().
649    """
650    return (yield from awaitable.__await__())
651
652_wrap_awaitable._is_coroutine = _is_coroutine
653
654
655class _GatheringFuture(futures.Future):
656    """Helper for gather().
657
658    This overrides cancel() to cancel all the children and act more
659    like Task.cancel(), which doesn't immediately mark itself as
660    cancelled.
661    """
662
663    def __init__(self, children, *, loop):
664        assert loop is not None
665        super().__init__(loop=loop)
666        self._children = children
667        self._cancel_requested = False
668
669    def cancel(self, msg=None):
670        if self.done():
671            return False
672        ret = False
673        for child in self._children:
674            if child.cancel(msg=msg):
675                ret = True
676        if ret:
677            # If any child tasks were actually cancelled, we should
678            # propagate the cancellation request regardless of
679            # *return_exceptions* argument.  See issue 32684.
680            self._cancel_requested = True
681        return ret
682
683
684def gather(*coros_or_futures, return_exceptions=False):
685    """Return a future aggregating results from the given coroutines/futures.
686
687    Coroutines will be wrapped in a future and scheduled in the event
688    loop. They will not necessarily be scheduled in the same order as
689    passed in.
690
691    All futures must share the same event loop.  If all the tasks are
692    done successfully, the returned future's result is the list of
693    results (in the order of the original sequence, not necessarily
694    the order of results arrival).  If *return_exceptions* is True,
695    exceptions in the tasks are treated the same as successful
696    results, and gathered in the result list; otherwise, the first
697    raised exception will be immediately propagated to the returned
698    future.
699
700    Cancellation: if the outer Future is cancelled, all children (that
701    have not completed yet) are also cancelled.  If any child is
702    cancelled, this is treated as if it raised CancelledError --
703    the outer Future is *not* cancelled in this case.  (This is to
704    prevent the cancellation of one child to cause other children to
705    be cancelled.)
706
707    If *return_exceptions* is False, cancelling gather() after it
708    has been marked done won't cancel any submitted awaitables.
709    For instance, gather can be marked done after propagating an
710    exception to the caller, therefore, calling ``gather.cancel()``
711    after catching an exception (raised by one of the awaitables) from
712    gather won't cancel any other awaitables.
713    """
714    if not coros_or_futures:
715        loop = events._get_event_loop()
716        outer = loop.create_future()
717        outer.set_result([])
718        return outer
719
720    def _done_callback(fut):
721        nonlocal nfinished
722        nfinished += 1
723
724        if outer is None or outer.done():
725            if not fut.cancelled():
726                # Mark exception retrieved.
727                fut.exception()
728            return
729
730        if not return_exceptions:
731            if fut.cancelled():
732                # Check if 'fut' is cancelled first, as
733                # 'fut.exception()' will *raise* a CancelledError
734                # instead of returning it.
735                exc = fut._make_cancelled_error()
736                outer.set_exception(exc)
737                return
738            else:
739                exc = fut.exception()
740                if exc is not None:
741                    outer.set_exception(exc)
742                    return
743
744        if nfinished == nfuts:
745            # All futures are done; create a list of results
746            # and set it to the 'outer' future.
747            results = []
748
749            for fut in children:
750                if fut.cancelled():
751                    # Check if 'fut' is cancelled first, as 'fut.exception()'
752                    # will *raise* a CancelledError instead of returning it.
753                    # Also, since we're adding the exception return value
754                    # to 'results' instead of raising it, don't bother
755                    # setting __context__.  This also lets us preserve
756                    # calling '_make_cancelled_error()' at most once.
757                    res = exceptions.CancelledError(
758                        '' if fut._cancel_message is None else
759                        fut._cancel_message)
760                else:
761                    res = fut.exception()
762                    if res is None:
763                        res = fut.result()
764                results.append(res)
765
766            if outer._cancel_requested:
767                # If gather is being cancelled we must propagate the
768                # cancellation regardless of *return_exceptions* argument.
769                # See issue 32684.
770                exc = fut._make_cancelled_error()
771                outer.set_exception(exc)
772            else:
773                outer.set_result(results)
774
775    arg_to_fut = {}
776    children = []
777    nfuts = 0
778    nfinished = 0
779    loop = None
780    outer = None  # bpo-46672
781    for arg in coros_or_futures:
782        if arg not in arg_to_fut:
783            fut = _ensure_future(arg, loop=loop)
784            if loop is None:
785                loop = futures._get_loop(fut)
786            if fut is not arg:
787                # 'arg' was not a Future, therefore, 'fut' is a new
788                # Future created specifically for 'arg'.  Since the caller
789                # can't control it, disable the "destroy pending task"
790                # warning.
791                fut._log_destroy_pending = False
792
793            nfuts += 1
794            arg_to_fut[arg] = fut
795            fut.add_done_callback(_done_callback)
796
797        else:
798            # There's a duplicate Future object in coros_or_futures.
799            fut = arg_to_fut[arg]
800
801        children.append(fut)
802
803    outer = _GatheringFuture(children, loop=loop)
804    return outer
805
806
807def shield(arg):
808    """Wait for a future, shielding it from cancellation.
809
810    The statement
811
812        res = await shield(something())
813
814    is exactly equivalent to the statement
815
816        res = await something()
817
818    *except* that if the coroutine containing it is cancelled, the
819    task running in something() is not cancelled.  From the POV of
820    something(), the cancellation did not happen.  But its caller is
821    still cancelled, so the yield-from expression still raises
822    CancelledError.  Note: If something() is cancelled by other means
823    this will still cancel shield().
824
825    If you want to completely ignore cancellation (not recommended)
826    you can combine shield() with a try/except clause, as follows:
827
828        try:
829            res = await shield(something())
830        except CancelledError:
831            res = None
832    """
833    inner = _ensure_future(arg)
834    if inner.done():
835        # Shortcut.
836        return inner
837    loop = futures._get_loop(inner)
838    outer = loop.create_future()
839
840    def _inner_done_callback(inner):
841        if outer.cancelled():
842            if not inner.cancelled():
843                # Mark inner's result as retrieved.
844                inner.exception()
845            return
846
847        if inner.cancelled():
848            outer.cancel()
849        else:
850            exc = inner.exception()
851            if exc is not None:
852                outer.set_exception(exc)
853            else:
854                outer.set_result(inner.result())
855
856
857    def _outer_done_callback(outer):
858        if not inner.done():
859            inner.remove_done_callback(_inner_done_callback)
860
861    inner.add_done_callback(_inner_done_callback)
862    outer.add_done_callback(_outer_done_callback)
863    return outer
864
865
866def run_coroutine_threadsafe(coro, loop):
867    """Submit a coroutine object to a given event loop.
868
869    Return a concurrent.futures.Future to access the result.
870    """
871    if not coroutines.iscoroutine(coro):
872        raise TypeError('A coroutine object is required')
873    future = concurrent.futures.Future()
874
875    def callback():
876        try:
877            futures._chain_future(ensure_future(coro, loop=loop), future)
878        except (SystemExit, KeyboardInterrupt):
879            raise
880        except BaseException as exc:
881            if future.set_running_or_notify_cancel():
882                future.set_exception(exc)
883            raise
884
885    loop.call_soon_threadsafe(callback)
886    return future
887
888
889# WeakSet containing all alive tasks.
890_all_tasks = weakref.WeakSet()
891
892# Dictionary containing tasks that are currently active in
893# all running event loops.  {EventLoop: Task}
894_current_tasks = {}
895
896
897def _register_task(task):
898    """Register a new task in asyncio as executed by loop."""
899    _all_tasks.add(task)
900
901
902def _enter_task(loop, task):
903    current_task = _current_tasks.get(loop)
904    if current_task is not None:
905        raise RuntimeError(f"Cannot enter into task {task!r} while another "
906                           f"task {current_task!r} is being executed.")
907    _current_tasks[loop] = task
908
909
910def _leave_task(loop, task):
911    current_task = _current_tasks.get(loop)
912    if current_task is not task:
913        raise RuntimeError(f"Leaving task {task!r} does not match "
914                           f"the current task {current_task!r}.")
915    del _current_tasks[loop]
916
917
918def _unregister_task(task):
919    """Unregister a task."""
920    _all_tasks.discard(task)
921
922
923_py_register_task = _register_task
924_py_unregister_task = _unregister_task
925_py_enter_task = _enter_task
926_py_leave_task = _leave_task
927
928
929try:
930    from _asyncio import (_register_task, _unregister_task,
931                          _enter_task, _leave_task,
932                          _all_tasks, _current_tasks)
933except ImportError:
934    pass
935else:
936    _c_register_task = _register_task
937    _c_unregister_task = _unregister_task
938    _c_enter_task = _enter_task
939    _c_leave_task = _leave_task
940