1"""Support for tasks, coroutines and the scheduler.""" 2 3__all__ = ( 4 'Task', 'create_task', 5 'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED', 6 'wait', 'wait_for', 'as_completed', 'sleep', 7 'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe', 8 'current_task', 'all_tasks', 9 '_register_task', '_unregister_task', '_enter_task', '_leave_task', 10) 11 12import concurrent.futures 13import contextvars 14import functools 15import inspect 16import itertools 17import types 18import warnings 19import weakref 20 21from . import base_tasks 22from . import coroutines 23from . import events 24from . import exceptions 25from . import futures 26from .coroutines import _is_coroutine 27 28# Helper to generate new task names 29# This uses itertools.count() instead of a "+= 1" operation because the latter 30# is not thread safe. See bpo-11866 for a longer explanation. 31_task_name_counter = itertools.count(1).__next__ 32 33 34def current_task(loop=None): 35 """Return a currently executed task.""" 36 if loop is None: 37 loop = events.get_running_loop() 38 return _current_tasks.get(loop) 39 40 41def all_tasks(loop=None): 42 """Return a set of all tasks for the loop.""" 43 if loop is None: 44 loop = events.get_running_loop() 45 # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another 46 # thread while we do so. Therefore we cast it to list prior to filtering. The list 47 # cast itself requires iteration, so we repeat it several times ignoring 48 # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for 49 # details. 50 i = 0 51 while True: 52 try: 53 tasks = list(_all_tasks) 54 except RuntimeError: 55 i += 1 56 if i >= 1000: 57 raise 58 else: 59 break 60 return {t for t in tasks 61 if futures._get_loop(t) is loop and not t.done()} 62 63 64def _all_tasks_compat(loop=None): 65 # Different from "all_task()" by returning *all* Tasks, including 66 # the completed ones. Used to implement deprecated "Tasks.all_task()" 67 # method. 68 if loop is None: 69 loop = events.get_event_loop() 70 # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another 71 # thread while we do so. Therefore we cast it to list prior to filtering. The list 72 # cast itself requires iteration, so we repeat it several times ignoring 73 # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for 74 # details. 75 i = 0 76 while True: 77 try: 78 tasks = list(_all_tasks) 79 except RuntimeError: 80 i += 1 81 if i >= 1000: 82 raise 83 else: 84 break 85 return {t for t in tasks if futures._get_loop(t) is loop} 86 87 88def _set_task_name(task, name): 89 if name is not None: 90 try: 91 set_name = task.set_name 92 except AttributeError: 93 pass 94 else: 95 set_name(name) 96 97 98class Task(futures._PyFuture): # Inherit Python Task implementation 99 # from a Python Future implementation. 100 101 """A coroutine wrapped in a Future.""" 102 103 # An important invariant maintained while a Task not done: 104 # 105 # - Either _fut_waiter is None, and _step() is scheduled; 106 # - or _fut_waiter is some Future, and _step() is *not* scheduled. 107 # 108 # The only transition from the latter to the former is through 109 # _wakeup(). When _fut_waiter is not None, one of its callbacks 110 # must be _wakeup(). 111 112 # If False, don't log a message if the task is destroyed whereas its 113 # status is still pending 114 _log_destroy_pending = True 115 116 @classmethod 117 def current_task(cls, loop=None): 118 """Return the currently running task in an event loop or None. 119 120 By default the current task for the current event loop is returned. 121 122 None is returned when called not in the context of a Task. 123 """ 124 warnings.warn("Task.current_task() is deprecated since Python 3.7, " 125 "use asyncio.current_task() instead", 126 DeprecationWarning, 127 stacklevel=2) 128 if loop is None: 129 loop = events.get_event_loop() 130 return current_task(loop) 131 132 @classmethod 133 def all_tasks(cls, loop=None): 134 """Return a set of all tasks for an event loop. 135 136 By default all tasks for the current event loop are returned. 137 """ 138 warnings.warn("Task.all_tasks() is deprecated since Python 3.7, " 139 "use asyncio.all_tasks() instead", 140 DeprecationWarning, 141 stacklevel=2) 142 return _all_tasks_compat(loop) 143 144 def __init__(self, coro, *, loop=None, name=None): 145 super().__init__(loop=loop) 146 if self._source_traceback: 147 del self._source_traceback[-1] 148 if not coroutines.iscoroutine(coro): 149 # raise after Future.__init__(), attrs are required for __del__ 150 # prevent logging for pending task in __del__ 151 self._log_destroy_pending = False 152 raise TypeError(f"a coroutine was expected, got {coro!r}") 153 154 if name is None: 155 self._name = f'Task-{_task_name_counter()}' 156 else: 157 self._name = str(name) 158 159 self._must_cancel = False 160 self._fut_waiter = None 161 self._coro = coro 162 self._context = contextvars.copy_context() 163 164 self._loop.call_soon(self.__step, context=self._context) 165 _register_task(self) 166 167 def __del__(self): 168 if self._state == futures._PENDING and self._log_destroy_pending: 169 context = { 170 'task': self, 171 'message': 'Task was destroyed but it is pending!', 172 } 173 if self._source_traceback: 174 context['source_traceback'] = self._source_traceback 175 self._loop.call_exception_handler(context) 176 super().__del__() 177 178 def _repr_info(self): 179 return base_tasks._task_repr_info(self) 180 181 def get_coro(self): 182 return self._coro 183 184 def get_name(self): 185 return self._name 186 187 def set_name(self, value): 188 self._name = str(value) 189 190 def set_result(self, result): 191 raise RuntimeError('Task does not support set_result operation') 192 193 def set_exception(self, exception): 194 raise RuntimeError('Task does not support set_exception operation') 195 196 def get_stack(self, *, limit=None): 197 """Return the list of stack frames for this task's coroutine. 198 199 If the coroutine is not done, this returns the stack where it is 200 suspended. If the coroutine has completed successfully or was 201 cancelled, this returns an empty list. If the coroutine was 202 terminated by an exception, this returns the list of traceback 203 frames. 204 205 The frames are always ordered from oldest to newest. 206 207 The optional limit gives the maximum number of frames to 208 return; by default all available frames are returned. Its 209 meaning differs depending on whether a stack or a traceback is 210 returned: the newest frames of a stack are returned, but the 211 oldest frames of a traceback are returned. (This matches the 212 behavior of the traceback module.) 213 214 For reasons beyond our control, only one stack frame is 215 returned for a suspended coroutine. 216 """ 217 return base_tasks._task_get_stack(self, limit) 218 219 def print_stack(self, *, limit=None, file=None): 220 """Print the stack or traceback for this task's coroutine. 221 222 This produces output similar to that of the traceback module, 223 for the frames retrieved by get_stack(). The limit argument 224 is passed to get_stack(). The file argument is an I/O stream 225 to which the output is written; by default output is written 226 to sys.stderr. 227 """ 228 return base_tasks._task_print_stack(self, limit, file) 229 230 def cancel(self): 231 """Request that this task cancel itself. 232 233 This arranges for a CancelledError to be thrown into the 234 wrapped coroutine on the next cycle through the event loop. 235 The coroutine then has a chance to clean up or even deny 236 the request using try/except/finally. 237 238 Unlike Future.cancel, this does not guarantee that the 239 task will be cancelled: the exception might be caught and 240 acted upon, delaying cancellation of the task or preventing 241 cancellation completely. The task may also return a value or 242 raise a different exception. 243 244 Immediately after this method is called, Task.cancelled() will 245 not return True (unless the task was already cancelled). A 246 task will be marked as cancelled when the wrapped coroutine 247 terminates with a CancelledError exception (even if cancel() 248 was not called). 249 """ 250 self._log_traceback = False 251 if self.done(): 252 return False 253 if self._fut_waiter is not None: 254 if self._fut_waiter.cancel(): 255 # Leave self._fut_waiter; it may be a Task that 256 # catches and ignores the cancellation so we may have 257 # to cancel it again later. 258 return True 259 # It must be the case that self.__step is already scheduled. 260 self._must_cancel = True 261 return True 262 263 def __step(self, exc=None): 264 if self.done(): 265 raise exceptions.InvalidStateError( 266 f'_step(): already done: {self!r}, {exc!r}') 267 if self._must_cancel: 268 if not isinstance(exc, exceptions.CancelledError): 269 exc = exceptions.CancelledError() 270 self._must_cancel = False 271 coro = self._coro 272 self._fut_waiter = None 273 274 _enter_task(self._loop, self) 275 # Call either coro.throw(exc) or coro.send(None). 276 try: 277 if exc is None: 278 # We use the `send` method directly, because coroutines 279 # don't have `__iter__` and `__next__` methods. 280 result = coro.send(None) 281 else: 282 result = coro.throw(exc) 283 except StopIteration as exc: 284 if self._must_cancel: 285 # Task is cancelled right before coro stops. 286 self._must_cancel = False 287 super().cancel() 288 else: 289 super().set_result(exc.value) 290 except exceptions.CancelledError: 291 super().cancel() # I.e., Future.cancel(self). 292 except (KeyboardInterrupt, SystemExit) as exc: 293 super().set_exception(exc) 294 raise 295 except BaseException as exc: 296 super().set_exception(exc) 297 else: 298 blocking = getattr(result, '_asyncio_future_blocking', None) 299 if blocking is not None: 300 # Yielded Future must come from Future.__iter__(). 301 if futures._get_loop(result) is not self._loop: 302 new_exc = RuntimeError( 303 f'Task {self!r} got Future ' 304 f'{result!r} attached to a different loop') 305 self._loop.call_soon( 306 self.__step, new_exc, context=self._context) 307 elif blocking: 308 if result is self: 309 new_exc = RuntimeError( 310 f'Task cannot await on itself: {self!r}') 311 self._loop.call_soon( 312 self.__step, new_exc, context=self._context) 313 else: 314 result._asyncio_future_blocking = False 315 result.add_done_callback( 316 self.__wakeup, context=self._context) 317 self._fut_waiter = result 318 if self._must_cancel: 319 if self._fut_waiter.cancel(): 320 self._must_cancel = False 321 else: 322 new_exc = RuntimeError( 323 f'yield was used instead of yield from ' 324 f'in task {self!r} with {result!r}') 325 self._loop.call_soon( 326 self.__step, new_exc, context=self._context) 327 328 elif result is None: 329 # Bare yield relinquishes control for one event loop iteration. 330 self._loop.call_soon(self.__step, context=self._context) 331 elif inspect.isgenerator(result): 332 # Yielding a generator is just wrong. 333 new_exc = RuntimeError( 334 f'yield was used instead of yield from for ' 335 f'generator in task {self!r} with {result!r}') 336 self._loop.call_soon( 337 self.__step, new_exc, context=self._context) 338 else: 339 # Yielding something else is an error. 340 new_exc = RuntimeError(f'Task got bad yield: {result!r}') 341 self._loop.call_soon( 342 self.__step, new_exc, context=self._context) 343 finally: 344 _leave_task(self._loop, self) 345 self = None # Needed to break cycles when an exception occurs. 346 347 def __wakeup(self, future): 348 try: 349 future.result() 350 except BaseException as exc: 351 # This may also be a cancellation. 352 self.__step(exc) 353 else: 354 # Don't pass the value of `future.result()` explicitly, 355 # as `Future.__iter__` and `Future.__await__` don't need it. 356 # If we call `_step(value, None)` instead of `_step()`, 357 # Python eval loop would use `.send(value)` method call, 358 # instead of `__next__()`, which is slower for futures 359 # that return non-generator iterators from their `__iter__`. 360 self.__step() 361 self = None # Needed to break cycles when an exception occurs. 362 363 364_PyTask = Task 365 366 367try: 368 import _asyncio 369except ImportError: 370 pass 371else: 372 # _CTask is needed for tests. 373 Task = _CTask = _asyncio.Task 374 375 376def create_task(coro, *, name=None): 377 """Schedule the execution of a coroutine object in a spawn task. 378 379 Return a Task object. 380 """ 381 loop = events.get_running_loop() 382 task = loop.create_task(coro) 383 _set_task_name(task, name) 384 return task 385 386 387# wait() and as_completed() similar to those in PEP 3148. 388 389FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED 390FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION 391ALL_COMPLETED = concurrent.futures.ALL_COMPLETED 392 393 394async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED): 395 """Wait for the Futures and coroutines given by fs to complete. 396 397 The sequence futures must not be empty. 398 399 Coroutines will be wrapped in Tasks. 400 401 Returns two sets of Future: (done, pending). 402 403 Usage: 404 405 done, pending = await asyncio.wait(fs) 406 407 Note: This does not raise TimeoutError! Futures that aren't done 408 when the timeout occurs are returned in the second set. 409 """ 410 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 411 raise TypeError(f"expect a list of futures, not {type(fs).__name__}") 412 if not fs: 413 raise ValueError('Set of coroutines/Futures is empty.') 414 if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED): 415 raise ValueError(f'Invalid return_when value: {return_when}') 416 417 if loop is None: 418 loop = events.get_running_loop() 419 else: 420 warnings.warn("The loop argument is deprecated since Python 3.8, " 421 "and scheduled for removal in Python 3.10.", 422 DeprecationWarning, stacklevel=2) 423 424 fs = {ensure_future(f, loop=loop) for f in set(fs)} 425 426 return await _wait(fs, timeout, return_when, loop) 427 428 429def _release_waiter(waiter, *args): 430 if not waiter.done(): 431 waiter.set_result(None) 432 433 434async def wait_for(fut, timeout, *, loop=None): 435 """Wait for the single Future or coroutine to complete, with timeout. 436 437 Coroutine will be wrapped in Task. 438 439 Returns result of the Future or coroutine. When a timeout occurs, 440 it cancels the task and raises TimeoutError. To avoid the task 441 cancellation, wrap it in shield(). 442 443 If the wait is cancelled, the task is also cancelled. 444 445 This function is a coroutine. 446 """ 447 if loop is None: 448 loop = events.get_running_loop() 449 else: 450 warnings.warn("The loop argument is deprecated since Python 3.8, " 451 "and scheduled for removal in Python 3.10.", 452 DeprecationWarning, stacklevel=2) 453 454 if timeout is None: 455 return await fut 456 457 if timeout <= 0: 458 fut = ensure_future(fut, loop=loop) 459 460 if fut.done(): 461 return fut.result() 462 463 fut.cancel() 464 raise exceptions.TimeoutError() 465 466 waiter = loop.create_future() 467 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 468 cb = functools.partial(_release_waiter, waiter) 469 470 fut = ensure_future(fut, loop=loop) 471 fut.add_done_callback(cb) 472 473 try: 474 # wait until the future completes or the timeout 475 try: 476 await waiter 477 except exceptions.CancelledError: 478 fut.remove_done_callback(cb) 479 fut.cancel() 480 raise 481 482 if fut.done(): 483 return fut.result() 484 else: 485 fut.remove_done_callback(cb) 486 # We must ensure that the task is not running 487 # after wait_for() returns. 488 # See https://bugs.python.org/issue32751 489 await _cancel_and_wait(fut, loop=loop) 490 raise exceptions.TimeoutError() 491 finally: 492 timeout_handle.cancel() 493 494 495async def _wait(fs, timeout, return_when, loop): 496 """Internal helper for wait(). 497 498 The fs argument must be a collection of Futures. 499 """ 500 assert fs, 'Set of Futures is empty.' 501 waiter = loop.create_future() 502 timeout_handle = None 503 if timeout is not None: 504 timeout_handle = loop.call_later(timeout, _release_waiter, waiter) 505 counter = len(fs) 506 507 def _on_completion(f): 508 nonlocal counter 509 counter -= 1 510 if (counter <= 0 or 511 return_when == FIRST_COMPLETED or 512 return_when == FIRST_EXCEPTION and (not f.cancelled() and 513 f.exception() is not None)): 514 if timeout_handle is not None: 515 timeout_handle.cancel() 516 if not waiter.done(): 517 waiter.set_result(None) 518 519 for f in fs: 520 f.add_done_callback(_on_completion) 521 522 try: 523 await waiter 524 finally: 525 if timeout_handle is not None: 526 timeout_handle.cancel() 527 for f in fs: 528 f.remove_done_callback(_on_completion) 529 530 done, pending = set(), set() 531 for f in fs: 532 if f.done(): 533 done.add(f) 534 else: 535 pending.add(f) 536 return done, pending 537 538 539async def _cancel_and_wait(fut, loop): 540 """Cancel the *fut* future or task and wait until it completes.""" 541 542 waiter = loop.create_future() 543 cb = functools.partial(_release_waiter, waiter) 544 fut.add_done_callback(cb) 545 546 try: 547 fut.cancel() 548 # We cannot wait on *fut* directly to make 549 # sure _cancel_and_wait itself is reliably cancellable. 550 await waiter 551 finally: 552 fut.remove_done_callback(cb) 553 554 555# This is *not* a @coroutine! It is just an iterator (yielding Futures). 556def as_completed(fs, *, loop=None, timeout=None): 557 """Return an iterator whose values are coroutines. 558 559 When waiting for the yielded coroutines you'll get the results (or 560 exceptions!) of the original Futures (or coroutines), in the order 561 in which and as soon as they complete. 562 563 This differs from PEP 3148; the proper way to use this is: 564 565 for f in as_completed(fs): 566 result = await f # The 'await' may raise. 567 # Use result. 568 569 If a timeout is specified, the 'await' will raise 570 TimeoutError when the timeout occurs before all Futures are done. 571 572 Note: The futures 'f' are not necessarily members of fs. 573 """ 574 if futures.isfuture(fs) or coroutines.iscoroutine(fs): 575 raise TypeError(f"expect a list of futures, not {type(fs).__name__}") 576 577 from .queues import Queue # Import here to avoid circular import problem. 578 done = Queue(loop=loop) 579 580 if loop is None: 581 loop = events.get_event_loop() 582 else: 583 warnings.warn("The loop argument is deprecated since Python 3.8, " 584 "and scheduled for removal in Python 3.10.", 585 DeprecationWarning, stacklevel=2) 586 todo = {ensure_future(f, loop=loop) for f in set(fs)} 587 timeout_handle = None 588 589 def _on_timeout(): 590 for f in todo: 591 f.remove_done_callback(_on_completion) 592 done.put_nowait(None) # Queue a dummy value for _wait_for_one(). 593 todo.clear() # Can't do todo.remove(f) in the loop. 594 595 def _on_completion(f): 596 if not todo: 597 return # _on_timeout() was here first. 598 todo.remove(f) 599 done.put_nowait(f) 600 if not todo and timeout_handle is not None: 601 timeout_handle.cancel() 602 603 async def _wait_for_one(): 604 f = await done.get() 605 if f is None: 606 # Dummy value from _on_timeout(). 607 raise exceptions.TimeoutError 608 return f.result() # May raise f.exception(). 609 610 for f in todo: 611 f.add_done_callback(_on_completion) 612 if todo and timeout is not None: 613 timeout_handle = loop.call_later(timeout, _on_timeout) 614 for _ in range(len(todo)): 615 yield _wait_for_one() 616 617 618@types.coroutine 619def __sleep0(): 620 """Skip one event loop run cycle. 621 622 This is a private helper for 'asyncio.sleep()', used 623 when the 'delay' is set to 0. It uses a bare 'yield' 624 expression (which Task.__step knows how to handle) 625 instead of creating a Future object. 626 """ 627 yield 628 629 630async def sleep(delay, result=None, *, loop=None): 631 """Coroutine that completes after a given time (in seconds).""" 632 if delay <= 0: 633 await __sleep0() 634 return result 635 636 if loop is None: 637 loop = events.get_running_loop() 638 else: 639 warnings.warn("The loop argument is deprecated since Python 3.8, " 640 "and scheduled for removal in Python 3.10.", 641 DeprecationWarning, stacklevel=2) 642 643 future = loop.create_future() 644 h = loop.call_later(delay, 645 futures._set_result_unless_cancelled, 646 future, result) 647 try: 648 return await future 649 finally: 650 h.cancel() 651 652 653def ensure_future(coro_or_future, *, loop=None): 654 """Wrap a coroutine or an awaitable in a future. 655 656 If the argument is a Future, it is returned directly. 657 """ 658 if coroutines.iscoroutine(coro_or_future): 659 if loop is None: 660 loop = events.get_event_loop() 661 task = loop.create_task(coro_or_future) 662 if task._source_traceback: 663 del task._source_traceback[-1] 664 return task 665 elif futures.isfuture(coro_or_future): 666 if loop is not None and loop is not futures._get_loop(coro_or_future): 667 raise ValueError('The future belongs to a different loop than ' 668 'the one specified as the loop argument') 669 return coro_or_future 670 elif inspect.isawaitable(coro_or_future): 671 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop) 672 else: 673 raise TypeError('An asyncio.Future, a coroutine or an awaitable is ' 674 'required') 675 676 677@types.coroutine 678def _wrap_awaitable(awaitable): 679 """Helper for asyncio.ensure_future(). 680 681 Wraps awaitable (an object with __await__) into a coroutine 682 that will later be wrapped in a Task by ensure_future(). 683 """ 684 return (yield from awaitable.__await__()) 685 686_wrap_awaitable._is_coroutine = _is_coroutine 687 688 689class _GatheringFuture(futures.Future): 690 """Helper for gather(). 691 692 This overrides cancel() to cancel all the children and act more 693 like Task.cancel(), which doesn't immediately mark itself as 694 cancelled. 695 """ 696 697 def __init__(self, children, *, loop=None): 698 super().__init__(loop=loop) 699 self._children = children 700 self._cancel_requested = False 701 702 def cancel(self): 703 if self.done(): 704 return False 705 ret = False 706 for child in self._children: 707 if child.cancel(): 708 ret = True 709 if ret: 710 # If any child tasks were actually cancelled, we should 711 # propagate the cancellation request regardless of 712 # *return_exceptions* argument. See issue 32684. 713 self._cancel_requested = True 714 return ret 715 716 717def gather(*coros_or_futures, loop=None, return_exceptions=False): 718 """Return a future aggregating results from the given coroutines/futures. 719 720 Coroutines will be wrapped in a future and scheduled in the event 721 loop. They will not necessarily be scheduled in the same order as 722 passed in. 723 724 All futures must share the same event loop. If all the tasks are 725 done successfully, the returned future's result is the list of 726 results (in the order of the original sequence, not necessarily 727 the order of results arrival). If *return_exceptions* is True, 728 exceptions in the tasks are treated the same as successful 729 results, and gathered in the result list; otherwise, the first 730 raised exception will be immediately propagated to the returned 731 future. 732 733 Cancellation: if the outer Future is cancelled, all children (that 734 have not completed yet) are also cancelled. If any child is 735 cancelled, this is treated as if it raised CancelledError -- 736 the outer Future is *not* cancelled in this case. (This is to 737 prevent the cancellation of one child to cause other children to 738 be cancelled.) 739 """ 740 if not coros_or_futures: 741 if loop is None: 742 loop = events.get_event_loop() 743 else: 744 warnings.warn("The loop argument is deprecated since Python 3.8, " 745 "and scheduled for removal in Python 3.10.", 746 DeprecationWarning, stacklevel=2) 747 outer = loop.create_future() 748 outer.set_result([]) 749 return outer 750 751 def _done_callback(fut): 752 nonlocal nfinished 753 nfinished += 1 754 755 if outer.done(): 756 if not fut.cancelled(): 757 # Mark exception retrieved. 758 fut.exception() 759 return 760 761 if not return_exceptions: 762 if fut.cancelled(): 763 # Check if 'fut' is cancelled first, as 764 # 'fut.exception()' will *raise* a CancelledError 765 # instead of returning it. 766 exc = exceptions.CancelledError() 767 outer.set_exception(exc) 768 return 769 else: 770 exc = fut.exception() 771 if exc is not None: 772 outer.set_exception(exc) 773 return 774 775 if nfinished == nfuts: 776 # All futures are done; create a list of results 777 # and set it to the 'outer' future. 778 results = [] 779 780 for fut in children: 781 if fut.cancelled(): 782 # Check if 'fut' is cancelled first, as 783 # 'fut.exception()' will *raise* a CancelledError 784 # instead of returning it. 785 res = exceptions.CancelledError() 786 else: 787 res = fut.exception() 788 if res is None: 789 res = fut.result() 790 results.append(res) 791 792 if outer._cancel_requested: 793 # If gather is being cancelled we must propagate the 794 # cancellation regardless of *return_exceptions* argument. 795 # See issue 32684. 796 outer.set_exception(exceptions.CancelledError()) 797 else: 798 outer.set_result(results) 799 800 arg_to_fut = {} 801 children = [] 802 nfuts = 0 803 nfinished = 0 804 for arg in coros_or_futures: 805 if arg not in arg_to_fut: 806 fut = ensure_future(arg, loop=loop) 807 if loop is None: 808 loop = futures._get_loop(fut) 809 if fut is not arg: 810 # 'arg' was not a Future, therefore, 'fut' is a new 811 # Future created specifically for 'arg'. Since the caller 812 # can't control it, disable the "destroy pending task" 813 # warning. 814 fut._log_destroy_pending = False 815 816 nfuts += 1 817 arg_to_fut[arg] = fut 818 fut.add_done_callback(_done_callback) 819 820 else: 821 # There's a duplicate Future object in coros_or_futures. 822 fut = arg_to_fut[arg] 823 824 children.append(fut) 825 826 outer = _GatheringFuture(children, loop=loop) 827 return outer 828 829 830def shield(arg, *, loop=None): 831 """Wait for a future, shielding it from cancellation. 832 833 The statement 834 835 res = await shield(something()) 836 837 is exactly equivalent to the statement 838 839 res = await something() 840 841 *except* that if the coroutine containing it is cancelled, the 842 task running in something() is not cancelled. From the POV of 843 something(), the cancellation did not happen. But its caller is 844 still cancelled, so the yield-from expression still raises 845 CancelledError. Note: If something() is cancelled by other means 846 this will still cancel shield(). 847 848 If you want to completely ignore cancellation (not recommended) 849 you can combine shield() with a try/except clause, as follows: 850 851 try: 852 res = await shield(something()) 853 except CancelledError: 854 res = None 855 """ 856 if loop is not None: 857 warnings.warn("The loop argument is deprecated since Python 3.8, " 858 "and scheduled for removal in Python 3.10.", 859 DeprecationWarning, stacklevel=2) 860 inner = ensure_future(arg, loop=loop) 861 if inner.done(): 862 # Shortcut. 863 return inner 864 loop = futures._get_loop(inner) 865 outer = loop.create_future() 866 867 def _inner_done_callback(inner): 868 if outer.cancelled(): 869 if not inner.cancelled(): 870 # Mark inner's result as retrieved. 871 inner.exception() 872 return 873 874 if inner.cancelled(): 875 outer.cancel() 876 else: 877 exc = inner.exception() 878 if exc is not None: 879 outer.set_exception(exc) 880 else: 881 outer.set_result(inner.result()) 882 883 884 def _outer_done_callback(outer): 885 if not inner.done(): 886 inner.remove_done_callback(_inner_done_callback) 887 888 inner.add_done_callback(_inner_done_callback) 889 outer.add_done_callback(_outer_done_callback) 890 return outer 891 892 893def run_coroutine_threadsafe(coro, loop): 894 """Submit a coroutine object to a given event loop. 895 896 Return a concurrent.futures.Future to access the result. 897 """ 898 if not coroutines.iscoroutine(coro): 899 raise TypeError('A coroutine object is required') 900 future = concurrent.futures.Future() 901 902 def callback(): 903 try: 904 futures._chain_future(ensure_future(coro, loop=loop), future) 905 except (SystemExit, KeyboardInterrupt): 906 raise 907 except BaseException as exc: 908 if future.set_running_or_notify_cancel(): 909 future.set_exception(exc) 910 raise 911 912 loop.call_soon_threadsafe(callback) 913 return future 914 915 916# WeakSet containing all alive tasks. 917_all_tasks = weakref.WeakSet() 918 919# Dictionary containing tasks that are currently active in 920# all running event loops. {EventLoop: Task} 921_current_tasks = {} 922 923 924def _register_task(task): 925 """Register a new task in asyncio as executed by loop.""" 926 _all_tasks.add(task) 927 928 929def _enter_task(loop, task): 930 current_task = _current_tasks.get(loop) 931 if current_task is not None: 932 raise RuntimeError(f"Cannot enter into task {task!r} while another " 933 f"task {current_task!r} is being executed.") 934 _current_tasks[loop] = task 935 936 937def _leave_task(loop, task): 938 current_task = _current_tasks.get(loop) 939 if current_task is not task: 940 raise RuntimeError(f"Leaving task {task!r} does not match " 941 f"the current task {current_task!r}.") 942 del _current_tasks[loop] 943 944 945def _unregister_task(task): 946 """Unregister a task.""" 947 _all_tasks.discard(task) 948 949 950_py_register_task = _register_task 951_py_unregister_task = _unregister_task 952_py_enter_task = _enter_task 953_py_leave_task = _leave_task 954 955 956try: 957 from _asyncio import (_register_task, _unregister_task, 958 _enter_task, _leave_task, 959 _all_tasks, _current_tasks) 960except ImportError: 961 pass 962else: 963 _c_register_task = _register_task 964 _c_unregister_task = _unregister_task 965 _c_enter_task = _enter_task 966 _c_leave_task = _leave_task 967