• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Support for tasks, coroutines and the scheduler."""
2
3__all__ = (
4    'Task', 'create_task',
5    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
6    'wait', 'wait_for', 'as_completed', 'sleep',
7    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
8    'current_task', 'all_tasks',
9    'create_eager_task_factory', 'eager_task_factory',
10    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
11)
12
13import concurrent.futures
14import contextvars
15import functools
16import inspect
17import itertools
18import math
19import types
20import weakref
21from types import GenericAlias
22
23from . import base_tasks
24from . import coroutines
25from . import events
26from . import exceptions
27from . import futures
28from . import queues
29from . import timeouts
30
31# Helper to generate new task names
32# This uses itertools.count() instead of a "+= 1" operation because the latter
33# is not thread safe. See bpo-11866 for a longer explanation.
34_task_name_counter = itertools.count(1).__next__
35
36
37def current_task(loop=None):
38    """Return a currently executed task."""
39    if loop is None:
40        loop = events.get_running_loop()
41    return _current_tasks.get(loop)
42
43
44def all_tasks(loop=None):
45    """Return a set of all tasks for the loop."""
46    if loop is None:
47        loop = events.get_running_loop()
48    # capturing the set of eager tasks first, so if an eager task "graduates"
49    # to a regular task in another thread, we don't risk missing it.
50    eager_tasks = list(_eager_tasks)
51    # Looping over the WeakSet isn't safe as it can be updated from another
52    # thread, therefore we cast it to list prior to filtering. The list cast
53    # itself requires iteration, so we repeat it several times ignoring
54    # RuntimeErrors (which are not very likely to occur).
55    # See issues 34970 and 36607 for details.
56    scheduled_tasks = None
57    i = 0
58    while True:
59        try:
60            scheduled_tasks = list(_scheduled_tasks)
61        except RuntimeError:
62            i += 1
63            if i >= 1000:
64                raise
65        else:
66            break
67    return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
68            if futures._get_loop(t) is loop and not t.done()}
69
70
71class Task(futures._PyFuture):  # Inherit Python Task implementation
72                                # from a Python Future implementation.
73
74    """A coroutine wrapped in a Future."""
75
76    # An important invariant maintained while a Task not done:
77    # _fut_waiter is either None or a Future.  The Future
78    # can be either done() or not done().
79    # The task can be in any of 3 states:
80    #
81    # - 1: _fut_waiter is not None and not _fut_waiter.done():
82    #      __step() is *not* scheduled and the Task is waiting for _fut_waiter.
83    # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled:
84    #       the Task is waiting for __step() to be executed.
85    # - 3:  _fut_waiter is None and __step() is *not* scheduled:
86    #       the Task is currently executing (in __step()).
87    #
88    # * In state 1, one of the callbacks of __fut_waiter must be __wakeup().
89    # * The transition from 1 to 2 happens when _fut_waiter becomes done(),
90    #   as it schedules __wakeup() to be called (which calls __step() so
91    #   we way that __step() is scheduled).
92    # * It transitions from 2 to 3 when __step() is executed, and it clears
93    #   _fut_waiter to None.
94
95    # If False, don't log a message if the task is destroyed while its
96    # status is still pending
97    _log_destroy_pending = True
98
99    def __init__(self, coro, *, loop=None, name=None, context=None,
100                 eager_start=False):
101        super().__init__(loop=loop)
102        if self._source_traceback:
103            del self._source_traceback[-1]
104        if not coroutines.iscoroutine(coro):
105            # raise after Future.__init__(), attrs are required for __del__
106            # prevent logging for pending task in __del__
107            self._log_destroy_pending = False
108            raise TypeError(f"a coroutine was expected, got {coro!r}")
109
110        if name is None:
111            self._name = f'Task-{_task_name_counter()}'
112        else:
113            self._name = str(name)
114
115        self._num_cancels_requested = 0
116        self._must_cancel = False
117        self._fut_waiter = None
118        self._coro = coro
119        if context is None:
120            self._context = contextvars.copy_context()
121        else:
122            self._context = context
123
124        if eager_start and self._loop.is_running():
125            self.__eager_start()
126        else:
127            self._loop.call_soon(self.__step, context=self._context)
128            _register_task(self)
129
130    def __del__(self):
131        if self._state == futures._PENDING and self._log_destroy_pending:
132            context = {
133                'task': self,
134                'message': 'Task was destroyed but it is pending!',
135            }
136            if self._source_traceback:
137                context['source_traceback'] = self._source_traceback
138            self._loop.call_exception_handler(context)
139        super().__del__()
140
141    __class_getitem__ = classmethod(GenericAlias)
142
143    def __repr__(self):
144        return base_tasks._task_repr(self)
145
146    def get_coro(self):
147        return self._coro
148
149    def get_context(self):
150        return self._context
151
152    def get_name(self):
153        return self._name
154
155    def set_name(self, value):
156        self._name = str(value)
157
158    def set_result(self, result):
159        raise RuntimeError('Task does not support set_result operation')
160
161    def set_exception(self, exception):
162        raise RuntimeError('Task does not support set_exception operation')
163
164    def get_stack(self, *, limit=None):
165        """Return the list of stack frames for this task's coroutine.
166
167        If the coroutine is not done, this returns the stack where it is
168        suspended.  If the coroutine has completed successfully or was
169        cancelled, this returns an empty list.  If the coroutine was
170        terminated by an exception, this returns the list of traceback
171        frames.
172
173        The frames are always ordered from oldest to newest.
174
175        The optional limit gives the maximum number of frames to
176        return; by default all available frames are returned.  Its
177        meaning differs depending on whether a stack or a traceback is
178        returned: the newest frames of a stack are returned, but the
179        oldest frames of a traceback are returned.  (This matches the
180        behavior of the traceback module.)
181
182        For reasons beyond our control, only one stack frame is
183        returned for a suspended coroutine.
184        """
185        return base_tasks._task_get_stack(self, limit)
186
187    def print_stack(self, *, limit=None, file=None):
188        """Print the stack or traceback for this task's coroutine.
189
190        This produces output similar to that of the traceback module,
191        for the frames retrieved by get_stack().  The limit argument
192        is passed to get_stack().  The file argument is an I/O stream
193        to which the output is written; by default output is written
194        to sys.stderr.
195        """
196        return base_tasks._task_print_stack(self, limit, file)
197
198    def cancel(self, msg=None):
199        """Request that this task cancel itself.
200
201        This arranges for a CancelledError to be thrown into the
202        wrapped coroutine on the next cycle through the event loop.
203        The coroutine then has a chance to clean up or even deny
204        the request using try/except/finally.
205
206        Unlike Future.cancel, this does not guarantee that the
207        task will be cancelled: the exception might be caught and
208        acted upon, delaying cancellation of the task or preventing
209        cancellation completely.  The task may also return a value or
210        raise a different exception.
211
212        Immediately after this method is called, Task.cancelled() will
213        not return True (unless the task was already cancelled).  A
214        task will be marked as cancelled when the wrapped coroutine
215        terminates with a CancelledError exception (even if cancel()
216        was not called).
217
218        This also increases the task's count of cancellation requests.
219        """
220        self._log_traceback = False
221        if self.done():
222            return False
223        self._num_cancels_requested += 1
224        # These two lines are controversial.  See discussion starting at
225        # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
226        # Also remember that this is duplicated in _asynciomodule.c.
227        # if self._num_cancels_requested > 1:
228        #     return False
229        if self._fut_waiter is not None:
230            if self._fut_waiter.cancel(msg=msg):
231                # Leave self._fut_waiter; it may be a Task that
232                # catches and ignores the cancellation so we may have
233                # to cancel it again later.
234                return True
235        # It must be the case that self.__step is already scheduled.
236        self._must_cancel = True
237        self._cancel_message = msg
238        return True
239
240    def cancelling(self):
241        """Return the count of the task's cancellation requests.
242
243        This count is incremented when .cancel() is called
244        and may be decremented using .uncancel().
245        """
246        return self._num_cancels_requested
247
248    def uncancel(self):
249        """Decrement the task's count of cancellation requests.
250
251        This should be called by the party that called `cancel()` on the task
252        beforehand.
253
254        Returns the remaining number of cancellation requests.
255        """
256        if self._num_cancels_requested > 0:
257            self._num_cancels_requested -= 1
258            if self._num_cancels_requested == 0:
259                self._must_cancel = False
260        return self._num_cancels_requested
261
262    def __eager_start(self):
263        prev_task = _swap_current_task(self._loop, self)
264        try:
265            _register_eager_task(self)
266            try:
267                self._context.run(self.__step_run_and_handle_result, None)
268            finally:
269                _unregister_eager_task(self)
270        finally:
271            try:
272                curtask = _swap_current_task(self._loop, prev_task)
273                assert curtask is self
274            finally:
275                if self.done():
276                    self._coro = None
277                    self = None  # Needed to break cycles when an exception occurs.
278                else:
279                    _register_task(self)
280
281    def __step(self, exc=None):
282        if self.done():
283            raise exceptions.InvalidStateError(
284                f'_step(): already done: {self!r}, {exc!r}')
285        if self._must_cancel:
286            if not isinstance(exc, exceptions.CancelledError):
287                exc = self._make_cancelled_error()
288            self._must_cancel = False
289        self._fut_waiter = None
290
291        _enter_task(self._loop, self)
292        try:
293            self.__step_run_and_handle_result(exc)
294        finally:
295            _leave_task(self._loop, self)
296            self = None  # Needed to break cycles when an exception occurs.
297
298    def __step_run_and_handle_result(self, exc):
299        coro = self._coro
300        try:
301            if exc is None:
302                # We use the `send` method directly, because coroutines
303                # don't have `__iter__` and `__next__` methods.
304                result = coro.send(None)
305            else:
306                result = coro.throw(exc)
307        except StopIteration as exc:
308            if self._must_cancel:
309                # Task is cancelled right before coro stops.
310                self._must_cancel = False
311                super().cancel(msg=self._cancel_message)
312            else:
313                super().set_result(exc.value)
314        except exceptions.CancelledError as exc:
315            # Save the original exception so we can chain it later.
316            self._cancelled_exc = exc
317            super().cancel()  # I.e., Future.cancel(self).
318        except (KeyboardInterrupt, SystemExit) as exc:
319            super().set_exception(exc)
320            raise
321        except BaseException as exc:
322            super().set_exception(exc)
323        else:
324            blocking = getattr(result, '_asyncio_future_blocking', None)
325            if blocking is not None:
326                # Yielded Future must come from Future.__iter__().
327                if futures._get_loop(result) is not self._loop:
328                    new_exc = RuntimeError(
329                        f'Task {self!r} got Future '
330                        f'{result!r} attached to a different loop')
331                    self._loop.call_soon(
332                        self.__step, new_exc, context=self._context)
333                elif blocking:
334                    if result is self:
335                        new_exc = RuntimeError(
336                            f'Task cannot await on itself: {self!r}')
337                        self._loop.call_soon(
338                            self.__step, new_exc, context=self._context)
339                    else:
340                        result._asyncio_future_blocking = False
341                        result.add_done_callback(
342                            self.__wakeup, context=self._context)
343                        self._fut_waiter = result
344                        if self._must_cancel:
345                            if self._fut_waiter.cancel(
346                                    msg=self._cancel_message):
347                                self._must_cancel = False
348                else:
349                    new_exc = RuntimeError(
350                        f'yield was used instead of yield from '
351                        f'in task {self!r} with {result!r}')
352                    self._loop.call_soon(
353                        self.__step, new_exc, context=self._context)
354
355            elif result is None:
356                # Bare yield relinquishes control for one event loop iteration.
357                self._loop.call_soon(self.__step, context=self._context)
358            elif inspect.isgenerator(result):
359                # Yielding a generator is just wrong.
360                new_exc = RuntimeError(
361                    f'yield was used instead of yield from for '
362                    f'generator in task {self!r} with {result!r}')
363                self._loop.call_soon(
364                    self.__step, new_exc, context=self._context)
365            else:
366                # Yielding something else is an error.
367                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
368                self._loop.call_soon(
369                    self.__step, new_exc, context=self._context)
370        finally:
371            self = None  # Needed to break cycles when an exception occurs.
372
373    def __wakeup(self, future):
374        try:
375            future.result()
376        except BaseException as exc:
377            # This may also be a cancellation.
378            self.__step(exc)
379        else:
380            # Don't pass the value of `future.result()` explicitly,
381            # as `Future.__iter__` and `Future.__await__` don't need it.
382            # If we call `_step(value, None)` instead of `_step()`,
383            # Python eval loop would use `.send(value)` method call,
384            # instead of `__next__()`, which is slower for futures
385            # that return non-generator iterators from their `__iter__`.
386            self.__step()
387        self = None  # Needed to break cycles when an exception occurs.
388
389
390_PyTask = Task
391
392
393try:
394    import _asyncio
395except ImportError:
396    pass
397else:
398    # _CTask is needed for tests.
399    Task = _CTask = _asyncio.Task
400
401
402def create_task(coro, *, name=None, context=None):
403    """Schedule the execution of a coroutine object in a spawn task.
404
405    Return a Task object.
406    """
407    loop = events.get_running_loop()
408    if context is None:
409        # Use legacy API if context is not needed
410        task = loop.create_task(coro, name=name)
411    else:
412        task = loop.create_task(coro, name=name, context=context)
413
414    return task
415
416
417# wait() and as_completed() similar to those in PEP 3148.
418
419FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
420FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
421ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
422
423
424async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
425    """Wait for the Futures or Tasks given by fs to complete.
426
427    The fs iterable must not be empty.
428
429    Returns two sets of Future: (done, pending).
430
431    Usage:
432
433        done, pending = await asyncio.wait(fs)
434
435    Note: This does not raise TimeoutError! Futures that aren't done
436    when the timeout occurs are returned in the second set.
437    """
438    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
439        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
440    if not fs:
441        raise ValueError('Set of Tasks/Futures is empty.')
442    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
443        raise ValueError(f'Invalid return_when value: {return_when}')
444
445    fs = set(fs)
446
447    if any(coroutines.iscoroutine(f) for f in fs):
448        raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
449
450    loop = events.get_running_loop()
451    return await _wait(fs, timeout, return_when, loop)
452
453
454def _release_waiter(waiter, *args):
455    if not waiter.done():
456        waiter.set_result(None)
457
458
459async def wait_for(fut, timeout):
460    """Wait for the single Future or coroutine to complete, with timeout.
461
462    Coroutine will be wrapped in Task.
463
464    Returns result of the Future or coroutine.  When a timeout occurs,
465    it cancels the task and raises TimeoutError.  To avoid the task
466    cancellation, wrap it in shield().
467
468    If the wait is cancelled, the task is also cancelled.
469
470    If the task suppresses the cancellation and returns a value instead,
471    that value is returned.
472
473    This function is a coroutine.
474    """
475    # The special case for timeout <= 0 is for the following case:
476    #
477    # async def test_waitfor():
478    #     func_started = False
479    #
480    #     async def func():
481    #         nonlocal func_started
482    #         func_started = True
483    #
484    #     try:
485    #         await asyncio.wait_for(func(), 0)
486    #     except asyncio.TimeoutError:
487    #         assert not func_started
488    #     else:
489    #         assert False
490    #
491    # asyncio.run(test_waitfor())
492
493
494    if timeout is not None and timeout <= 0:
495        fut = ensure_future(fut)
496
497        if fut.done():
498            return fut.result()
499
500        await _cancel_and_wait(fut)
501        try:
502            return fut.result()
503        except exceptions.CancelledError as exc:
504            raise TimeoutError from exc
505
506    async with timeouts.timeout(timeout):
507        return await fut
508
509async def _wait(fs, timeout, return_when, loop):
510    """Internal helper for wait().
511
512    The fs argument must be a collection of Futures.
513    """
514    assert fs, 'Set of Futures is empty.'
515    waiter = loop.create_future()
516    timeout_handle = None
517    if timeout is not None:
518        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
519    counter = len(fs)
520
521    def _on_completion(f):
522        nonlocal counter
523        counter -= 1
524        if (counter <= 0 or
525            return_when == FIRST_COMPLETED or
526            return_when == FIRST_EXCEPTION and (not f.cancelled() and
527                                                f.exception() is not None)):
528            if timeout_handle is not None:
529                timeout_handle.cancel()
530            if not waiter.done():
531                waiter.set_result(None)
532
533    for f in fs:
534        f.add_done_callback(_on_completion)
535
536    try:
537        await waiter
538    finally:
539        if timeout_handle is not None:
540            timeout_handle.cancel()
541        for f in fs:
542            f.remove_done_callback(_on_completion)
543
544    done, pending = set(), set()
545    for f in fs:
546        if f.done():
547            done.add(f)
548        else:
549            pending.add(f)
550    return done, pending
551
552
553async def _cancel_and_wait(fut):
554    """Cancel the *fut* future or task and wait until it completes."""
555
556    loop = events.get_running_loop()
557    waiter = loop.create_future()
558    cb = functools.partial(_release_waiter, waiter)
559    fut.add_done_callback(cb)
560
561    try:
562        fut.cancel()
563        # We cannot wait on *fut* directly to make
564        # sure _cancel_and_wait itself is reliably cancellable.
565        await waiter
566    finally:
567        fut.remove_done_callback(cb)
568
569
570class _AsCompletedIterator:
571    """Iterator of awaitables representing tasks of asyncio.as_completed.
572
573    As an asynchronous iterator, iteration yields futures as they finish. As a
574    plain iterator, new coroutines are yielded that will return or raise the
575    result of the next underlying future to complete.
576    """
577    def __init__(self, aws, timeout):
578        self._done = queues.Queue()
579        self._timeout_handle = None
580
581        loop = events.get_event_loop()
582        todo = {ensure_future(aw, loop=loop) for aw in set(aws)}
583        for f in todo:
584            f.add_done_callback(self._handle_completion)
585        if todo and timeout is not None:
586            self._timeout_handle = (
587                loop.call_later(timeout, self._handle_timeout)
588            )
589        self._todo = todo
590        self._todo_left = len(todo)
591
592    def __aiter__(self):
593        return self
594
595    def __iter__(self):
596        return self
597
598    async def __anext__(self):
599        if not self._todo_left:
600            raise StopAsyncIteration
601        assert self._todo_left > 0
602        self._todo_left -= 1
603        return await self._wait_for_one()
604
605    def __next__(self):
606        if not self._todo_left:
607            raise StopIteration
608        assert self._todo_left > 0
609        self._todo_left -= 1
610        return self._wait_for_one(resolve=True)
611
612    def _handle_timeout(self):
613        for f in self._todo:
614            f.remove_done_callback(self._handle_completion)
615            self._done.put_nowait(None)  # Sentinel for _wait_for_one().
616        self._todo.clear()  # Can't do todo.remove(f) in the loop.
617
618    def _handle_completion(self, f):
619        if not self._todo:
620            return  # _handle_timeout() was here first.
621        self._todo.remove(f)
622        self._done.put_nowait(f)
623        if not self._todo and self._timeout_handle is not None:
624            self._timeout_handle.cancel()
625
626    async def _wait_for_one(self, resolve=False):
627        # Wait for the next future to be done and return it unless resolve is
628        # set, in which case return either the result of the future or raise
629        # an exception.
630        f = await self._done.get()
631        if f is None:
632            # Dummy value from _handle_timeout().
633            raise exceptions.TimeoutError
634        return f.result() if resolve else f
635
636
637def as_completed(fs, *, timeout=None):
638    """Create an iterator of awaitables or their results in completion order.
639
640    Run the supplied awaitables concurrently. The returned object can be
641    iterated to obtain the results of the awaitables as they finish.
642
643    The object returned can be iterated as an asynchronous iterator or a plain
644    iterator. When asynchronous iteration is used, the originally-supplied
645    awaitables are yielded if they are tasks or futures. This makes it easy to
646    correlate previously-scheduled tasks with their results:
647
648        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
649        ipv6_connect = create_task(open_connection("::1", 80))
650        tasks = [ipv4_connect, ipv6_connect]
651
652        async for earliest_connect in as_completed(tasks):
653            # earliest_connect is done. The result can be obtained by
654            # awaiting it or calling earliest_connect.result()
655            reader, writer = await earliest_connect
656
657            if earliest_connect is ipv6_connect:
658                print("IPv6 connection established.")
659            else:
660                print("IPv4 connection established.")
661
662    During asynchronous iteration, implicitly-created tasks will be yielded for
663    supplied awaitables that aren't tasks or futures.
664
665    When used as a plain iterator, each iteration yields a new coroutine that
666    returns the result or raises the exception of the next completed awaitable.
667    This pattern is compatible with Python versions older than 3.13:
668
669        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
670        ipv6_connect = create_task(open_connection("::1", 80))
671        tasks = [ipv4_connect, ipv6_connect]
672
673        for next_connect in as_completed(tasks):
674            # next_connect is not one of the original task objects. It must be
675            # awaited to obtain the result value or raise the exception of the
676            # awaitable that finishes next.
677            reader, writer = await next_connect
678
679    A TimeoutError is raised if the timeout occurs before all awaitables are
680    done. This is raised by the async for loop during asynchronous iteration or
681    by the coroutines yielded during plain iteration.
682    """
683    if inspect.isawaitable(fs):
684        raise TypeError(
685            f"expects an iterable of awaitables, not {type(fs).__name__}"
686        )
687
688    return _AsCompletedIterator(fs, timeout)
689
690
691@types.coroutine
692def __sleep0():
693    """Skip one event loop run cycle.
694
695    This is a private helper for 'asyncio.sleep()', used
696    when the 'delay' is set to 0.  It uses a bare 'yield'
697    expression (which Task.__step knows how to handle)
698    instead of creating a Future object.
699    """
700    yield
701
702
703async def sleep(delay, result=None):
704    """Coroutine that completes after a given time (in seconds)."""
705    if delay <= 0:
706        await __sleep0()
707        return result
708
709    if math.isnan(delay):
710        raise ValueError("Invalid delay: NaN (not a number)")
711
712    loop = events.get_running_loop()
713    future = loop.create_future()
714    h = loop.call_later(delay,
715                        futures._set_result_unless_cancelled,
716                        future, result)
717    try:
718        return await future
719    finally:
720        h.cancel()
721
722
723def ensure_future(coro_or_future, *, loop=None):
724    """Wrap a coroutine or an awaitable in a future.
725
726    If the argument is a Future, it is returned directly.
727    """
728    if futures.isfuture(coro_or_future):
729        if loop is not None and loop is not futures._get_loop(coro_or_future):
730            raise ValueError('The future belongs to a different loop than '
731                            'the one specified as the loop argument')
732        return coro_or_future
733    should_close = True
734    if not coroutines.iscoroutine(coro_or_future):
735        if inspect.isawaitable(coro_or_future):
736            async def _wrap_awaitable(awaitable):
737                return await awaitable
738
739            coro_or_future = _wrap_awaitable(coro_or_future)
740            should_close = False
741        else:
742            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
743                            'is required')
744
745    if loop is None:
746        loop = events.get_event_loop()
747    try:
748        return loop.create_task(coro_or_future)
749    except RuntimeError:
750        if should_close:
751            coro_or_future.close()
752        raise
753
754
755class _GatheringFuture(futures.Future):
756    """Helper for gather().
757
758    This overrides cancel() to cancel all the children and act more
759    like Task.cancel(), which doesn't immediately mark itself as
760    cancelled.
761    """
762
763    def __init__(self, children, *, loop):
764        assert loop is not None
765        super().__init__(loop=loop)
766        self._children = children
767        self._cancel_requested = False
768
769    def cancel(self, msg=None):
770        if self.done():
771            return False
772        ret = False
773        for child in self._children:
774            if child.cancel(msg=msg):
775                ret = True
776        if ret:
777            # If any child tasks were actually cancelled, we should
778            # propagate the cancellation request regardless of
779            # *return_exceptions* argument.  See issue 32684.
780            self._cancel_requested = True
781        return ret
782
783
784def gather(*coros_or_futures, return_exceptions=False):
785    """Return a future aggregating results from the given coroutines/futures.
786
787    Coroutines will be wrapped in a future and scheduled in the event
788    loop. They will not necessarily be scheduled in the same order as
789    passed in.
790
791    All futures must share the same event loop.  If all the tasks are
792    done successfully, the returned future's result is the list of
793    results (in the order of the original sequence, not necessarily
794    the order of results arrival).  If *return_exceptions* is True,
795    exceptions in the tasks are treated the same as successful
796    results, and gathered in the result list; otherwise, the first
797    raised exception will be immediately propagated to the returned
798    future.
799
800    Cancellation: if the outer Future is cancelled, all children (that
801    have not completed yet) are also cancelled.  If any child is
802    cancelled, this is treated as if it raised CancelledError --
803    the outer Future is *not* cancelled in this case.  (This is to
804    prevent the cancellation of one child to cause other children to
805    be cancelled.)
806
807    If *return_exceptions* is False, cancelling gather() after it
808    has been marked done won't cancel any submitted awaitables.
809    For instance, gather can be marked done after propagating an
810    exception to the caller, therefore, calling ``gather.cancel()``
811    after catching an exception (raised by one of the awaitables) from
812    gather won't cancel any other awaitables.
813    """
814    if not coros_or_futures:
815        loop = events.get_event_loop()
816        outer = loop.create_future()
817        outer.set_result([])
818        return outer
819
820    def _done_callback(fut):
821        nonlocal nfinished
822        nfinished += 1
823
824        if outer is None or outer.done():
825            if not fut.cancelled():
826                # Mark exception retrieved.
827                fut.exception()
828            return
829
830        if not return_exceptions:
831            if fut.cancelled():
832                # Check if 'fut' is cancelled first, as
833                # 'fut.exception()' will *raise* a CancelledError
834                # instead of returning it.
835                exc = fut._make_cancelled_error()
836                outer.set_exception(exc)
837                return
838            else:
839                exc = fut.exception()
840                if exc is not None:
841                    outer.set_exception(exc)
842                    return
843
844        if nfinished == nfuts:
845            # All futures are done; create a list of results
846            # and set it to the 'outer' future.
847            results = []
848
849            for fut in children:
850                if fut.cancelled():
851                    # Check if 'fut' is cancelled first, as 'fut.exception()'
852                    # will *raise* a CancelledError instead of returning it.
853                    # Also, since we're adding the exception return value
854                    # to 'results' instead of raising it, don't bother
855                    # setting __context__.  This also lets us preserve
856                    # calling '_make_cancelled_error()' at most once.
857                    res = exceptions.CancelledError(
858                        '' if fut._cancel_message is None else
859                        fut._cancel_message)
860                else:
861                    res = fut.exception()
862                    if res is None:
863                        res = fut.result()
864                results.append(res)
865
866            if outer._cancel_requested:
867                # If gather is being cancelled we must propagate the
868                # cancellation regardless of *return_exceptions* argument.
869                # See issue 32684.
870                exc = fut._make_cancelled_error()
871                outer.set_exception(exc)
872            else:
873                outer.set_result(results)
874
875    arg_to_fut = {}
876    children = []
877    nfuts = 0
878    nfinished = 0
879    done_futs = []
880    loop = None
881    outer = None  # bpo-46672
882    for arg in coros_or_futures:
883        if arg not in arg_to_fut:
884            fut = ensure_future(arg, loop=loop)
885            if loop is None:
886                loop = futures._get_loop(fut)
887            if fut is not arg:
888                # 'arg' was not a Future, therefore, 'fut' is a new
889                # Future created specifically for 'arg'.  Since the caller
890                # can't control it, disable the "destroy pending task"
891                # warning.
892                fut._log_destroy_pending = False
893
894            nfuts += 1
895            arg_to_fut[arg] = fut
896            if fut.done():
897                done_futs.append(fut)
898            else:
899                fut.add_done_callback(_done_callback)
900
901        else:
902            # There's a duplicate Future object in coros_or_futures.
903            fut = arg_to_fut[arg]
904
905        children.append(fut)
906
907    outer = _GatheringFuture(children, loop=loop)
908    # Run done callbacks after GatheringFuture created so any post-processing
909    # can be performed at this point
910    # optimization: in the special case that *all* futures finished eagerly,
911    # this will effectively complete the gather eagerly, with the last
912    # callback setting the result (or exception) on outer before returning it
913    for fut in done_futs:
914        _done_callback(fut)
915    return outer
916
917
918def shield(arg):
919    """Wait for a future, shielding it from cancellation.
920
921    The statement
922
923        task = asyncio.create_task(something())
924        res = await shield(task)
925
926    is exactly equivalent to the statement
927
928        res = await something()
929
930    *except* that if the coroutine containing it is cancelled, the
931    task running in something() is not cancelled.  From the POV of
932    something(), the cancellation did not happen.  But its caller is
933    still cancelled, so the yield-from expression still raises
934    CancelledError.  Note: If something() is cancelled by other means
935    this will still cancel shield().
936
937    If you want to completely ignore cancellation (not recommended)
938    you can combine shield() with a try/except clause, as follows:
939
940        task = asyncio.create_task(something())
941        try:
942            res = await shield(task)
943        except CancelledError:
944            res = None
945
946    Save a reference to tasks passed to this function, to avoid
947    a task disappearing mid-execution. The event loop only keeps
948    weak references to tasks. A task that isn't referenced elsewhere
949    may get garbage collected at any time, even before it's done.
950    """
951    inner = ensure_future(arg)
952    if inner.done():
953        # Shortcut.
954        return inner
955    loop = futures._get_loop(inner)
956    outer = loop.create_future()
957
958    def _inner_done_callback(inner):
959        if outer.cancelled():
960            if not inner.cancelled():
961                # Mark inner's result as retrieved.
962                inner.exception()
963            return
964
965        if inner.cancelled():
966            outer.cancel()
967        else:
968            exc = inner.exception()
969            if exc is not None:
970                outer.set_exception(exc)
971            else:
972                outer.set_result(inner.result())
973
974
975    def _outer_done_callback(outer):
976        if not inner.done():
977            inner.remove_done_callback(_inner_done_callback)
978
979    inner.add_done_callback(_inner_done_callback)
980    outer.add_done_callback(_outer_done_callback)
981    return outer
982
983
984def run_coroutine_threadsafe(coro, loop):
985    """Submit a coroutine object to a given event loop.
986
987    Return a concurrent.futures.Future to access the result.
988    """
989    if not coroutines.iscoroutine(coro):
990        raise TypeError('A coroutine object is required')
991    future = concurrent.futures.Future()
992
993    def callback():
994        try:
995            futures._chain_future(ensure_future(coro, loop=loop), future)
996        except (SystemExit, KeyboardInterrupt):
997            raise
998        except BaseException as exc:
999            if future.set_running_or_notify_cancel():
1000                future.set_exception(exc)
1001            raise
1002
1003    loop.call_soon_threadsafe(callback)
1004    return future
1005
1006
1007def create_eager_task_factory(custom_task_constructor):
1008    """Create a function suitable for use as a task factory on an event-loop.
1009
1010        Example usage:
1011
1012            loop.set_task_factory(
1013                asyncio.create_eager_task_factory(my_task_constructor))
1014
1015        Now, tasks created will be started immediately (rather than being first
1016        scheduled to an event loop). The constructor argument can be any callable
1017        that returns a Task-compatible object and has a signature compatible
1018        with `Task.__init__`; it must have the `eager_start` keyword argument.
1019
1020        Most applications will use `Task` for `custom_task_constructor` and in
1021        this case there's no need to call `create_eager_task_factory()`
1022        directly. Instead the  global `eager_task_factory` instance can be
1023        used. E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
1024        """
1025
1026    def factory(loop, coro, *, name=None, context=None):
1027        return custom_task_constructor(
1028            coro, loop=loop, name=name, context=context, eager_start=True)
1029
1030    return factory
1031
1032
1033eager_task_factory = create_eager_task_factory(Task)
1034
1035
1036# Collectively these two sets hold references to the complete set of active
1037# tasks. Eagerly executed tasks use a faster regular set as an optimization
1038# but may graduate to a WeakSet if the task blocks on IO.
1039_scheduled_tasks = weakref.WeakSet()
1040_eager_tasks = set()
1041
1042# Dictionary containing tasks that are currently active in
1043# all running event loops.  {EventLoop: Task}
1044_current_tasks = {}
1045
1046
1047def _register_task(task):
1048    """Register an asyncio Task scheduled to run on an event loop."""
1049    _scheduled_tasks.add(task)
1050
1051
1052def _register_eager_task(task):
1053    """Register an asyncio Task about to be eagerly executed."""
1054    _eager_tasks.add(task)
1055
1056
1057def _enter_task(loop, task):
1058    current_task = _current_tasks.get(loop)
1059    if current_task is not None:
1060        raise RuntimeError(f"Cannot enter into task {task!r} while another "
1061                           f"task {current_task!r} is being executed.")
1062    _current_tasks[loop] = task
1063
1064
1065def _leave_task(loop, task):
1066    current_task = _current_tasks.get(loop)
1067    if current_task is not task:
1068        raise RuntimeError(f"Leaving task {task!r} does not match "
1069                           f"the current task {current_task!r}.")
1070    del _current_tasks[loop]
1071
1072
1073def _swap_current_task(loop, task):
1074    prev_task = _current_tasks.get(loop)
1075    if task is None:
1076        del _current_tasks[loop]
1077    else:
1078        _current_tasks[loop] = task
1079    return prev_task
1080
1081
1082def _unregister_task(task):
1083    """Unregister a completed, scheduled Task."""
1084    _scheduled_tasks.discard(task)
1085
1086
1087def _unregister_eager_task(task):
1088    """Unregister a task which finished its first eager step."""
1089    _eager_tasks.discard(task)
1090
1091
1092_py_current_task = current_task
1093_py_register_task = _register_task
1094_py_register_eager_task = _register_eager_task
1095_py_unregister_task = _unregister_task
1096_py_unregister_eager_task = _unregister_eager_task
1097_py_enter_task = _enter_task
1098_py_leave_task = _leave_task
1099_py_swap_current_task = _swap_current_task
1100
1101
1102try:
1103    from _asyncio import (_register_task, _register_eager_task,
1104                          _unregister_task, _unregister_eager_task,
1105                          _enter_task, _leave_task, _swap_current_task,
1106                          _scheduled_tasks, _eager_tasks, _current_tasks,
1107                          current_task)
1108except ImportError:
1109    pass
1110else:
1111    _c_current_task = current_task
1112    _c_register_task = _register_task
1113    _c_register_eager_task = _register_eager_task
1114    _c_unregister_task = _unregister_task
1115    _c_unregister_eager_task = _unregister_eager_task
1116    _c_enter_task = _enter_task
1117    _c_leave_task = _leave_task
1118    _c_swap_current_task = _swap_current_task
1119