"""Support for tasks, coroutines and the scheduler."""

__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)

import concurrent.futures
import contextvars
import functools
import inspect
import itertools
import types
import warnings
import weakref
from types import GenericAlias

from . import base_tasks
from . import coroutines
from . import events
from . import exceptions
from . import futures
from .coroutines import _is_coroutine

# Helper to generate new task names
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__


def current_task(loop=None):
    """Return a currently executed task.

    If *loop* is None the running event loop is used.  Returns None when
    no task is recorded as current for that loop.
    """
    if loop is None:
        loop = events.get_running_loop()
    return _current_tasks.get(loop)


def all_tasks(loop=None):
    """Return a set of all tasks for the loop.

    If *loop* is None the running event loop is used.  Only tasks that
    belong to the loop and are not yet done are included.
    """
    if loop is None:
        loop = events.get_running_loop()
    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
    # thread while we do so. Therefore we cast it to list prior to filtering. The list
    # cast itself requires iteration, so we repeat it several times ignoring
    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
    # details.
    i = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            i += 1
            # Give up after 1000 failed snapshots and let the error surface.
            if i >= 1000:
                raise
        else:
            break
    return {t for t in tasks
            if futures._get_loop(t) is loop and not t.done()}


def _set_task_name(task, name):
    """Apply *name* to *task* if a name was given and set_name() exists."""
    if name is not None:
        try:
            set_name = task.set_name
        except AttributeError:
            # Task-like object without set_name(): silently skip naming.
            pass
        else:
            set_name(name)


class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None):
        """Wrap *coro* and schedule its first step on the loop.

        *name* defaults to an auto-generated 'Task-N' string; any other
        value is stored as str(name).  Raises TypeError if *coro* is not
        a coroutine.
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        # Every step of this task runs inside this copied context.
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        # Report tasks that are garbage-collected while still pending,
        # unless logging was disabled through _log_destroy_pending.
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    __class_getitem__ = classmethod(GenericAlias)

    def _repr_info(self):
        """Return the repr pieces computed by base_tasks._task_repr_info()."""
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        """Return the coroutine object wrapped by this task."""
        return self._coro

    def get_name(self):
        """Return the name of this task."""
        return self._name

    def set_name(self, value):
        """Set the name of this task; *value* is stored as str(value)."""
        self._name = str(value)

    def set_result(self, result):
        """Always raise RuntimeError: a Task's result cannot be set externally."""
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        """Always raise RuntimeError: a Task's exception cannot be set externally."""
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        Returns False if the task is already done, True otherwise.
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def __step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        Sends None into the coroutine when *exc* is None, otherwise
        throws *exc* into it, then either finishes the task or arranges
        for the next step depending on what the coroutine yielded.
        Raises InvalidStateError if the task is already done.
        """
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            # A cancellation was requested while a step was scheduled:
            # deliver it now unless a CancelledError is already on its way.
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            # Record the exception on the task, then let it propagate.
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            # cancel() was called during this step; forward
                            # the request to the future we now wait on.
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        If *future* finished with an exception (including cancellation)
        that exception is passed to the next step.
        """
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.


_PyTask = Task


try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task


def create_task(coro, *, name=None):
    """Schedule the execution of a coroutine object in a spawn task.

    The task is created on the running event loop.  If *name* is not
    None it is applied through the task's set_name() when available.

    Return a Task object.
    """
    loop = events.get_running_loop()
    task = loop.create_task(coro)
    _set_task_name(task, name)
    return task


# wait() and as_completed() similar to those in PEP 3148.

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED


async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The fs iterable must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError if fs is itself a future or coroutine, and
    ValueError if fs is empty or return_when is not one of the
    FIRST_COMPLETED / FIRST_EXCEPTION / ALL_COMPLETED constants.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    loop = events.get_running_loop()

    # Materialize first: fs is iterated twice below.
    fs = set(fs)

    if any(coroutines.iscoroutine(f) for f in fs):
        warnings.warn("The explicit passing of coroutine objects to "
                      "asyncio.wait() is deprecated since Python 3.8, and "
                      "scheduled for removal in Python 3.11.",
                      DeprecationWarning, stacklevel=2)

    fs = {ensure_future(f, loop=loop) for f in fs}

    return await _wait(fs, timeout, return_when, loop)


def _release_waiter(waiter, *args):
    """Resolve *waiter* with None unless it is already done.

    Extra positional arguments are ignored so this can be used both as
    a call_later() callback and as a future done-callback.
    """
    if not waiter.done():
        waiter.set_result(None)


async def wait_for(fut, timeout):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    If *timeout* is None, *fut* is simply awaited.  If *timeout* is
    <= 0 and *fut* is not already done, it is cancelled immediately.

    This function is a coroutine.
    """
    loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        await _cancel_and_wait(fut, loop=loop)
        try:
            return fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc

    # `waiter` is resolved by whichever comes first: the timeout timer
    # or the completion of `fut`.
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            # In case task cancellation failed with some
            # exception, we should re-raise it
            # See https://bugs.python.org/issue40607
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    finally:
        timeout_handle.cancel()


async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.

    Returns a (done, pending) pair of sets.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        # Wake the waiter when everything finished, on the first
        # completion, or on the first real (non-cancellation) exception,
        # depending on return_when.
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending


async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    waiter = loop.create_future()
    cb = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(cb)

    try:
        fut.cancel()
        # We cannot wait on *fut* directly to make
        # sure _cancel_and_wait itself is reliably cancellable.
        await waiter
    finally:
        fut.remove_done_callback(cb)


# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue()

    loop = events._get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    async def _wait_for_one():
        f = await done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    # One awaitable per input future; each resolves to the next one to finish.
    for _ in range(len(todo)):
        yield _wait_for_one()


@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield


async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds).

    Returns *result*.  A *delay* <= 0 yields to the event loop once
    and returns without scheduling a timer.
    """
    if delay <= 0:
        await __sleep0()
        return result

    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        # Drop the timer if we leave early (e.g. on cancellation).
        h.cancel()


def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    return _ensure_future(coro_or_future, loop=loop)


def _ensure_future(coro_or_future, *, loop=None):
    """Implementation of ensure_future().

    Raises ValueError if a Future is passed together with a different
    *loop*, and TypeError if the argument is neither a Future, a
    coroutine nor an awaitable.
    """
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                            'the one specified as the loop argument')
        return coro_or_future
    called_wrap_awaitable = False
    if not coroutines.iscoroutine(coro_or_future):
        if inspect.isawaitable(coro_or_future):
            coro_or_future = _wrap_awaitable(coro_or_future)
            called_wrap_awaitable = True
        else:
            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
                            'is required')

    if loop is None:
        loop = events._get_event_loop(stacklevel=4)
    try:
        return loop.create_task(coro_or_future)
    except RuntimeError:
        # Close a caller-supplied coroutine that could not be scheduled;
        # the wrapper created above for a plain awaitable is left alone.
        if not called_wrap_awaitable:
            coro_or_future.close()
        raise


@types.coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().
    """
    return (yield from awaitable.__await__())

_wrap_awaitable._is_coroutine = _is_coroutine


class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop):
        assert loop is not None
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

    def cancel(self, msg=None):
        """Cancel every child; return True if any child accepted it.

        Returns False without touching the children when this future
        is already done.
        """
        if self.done():
            return False
        ret = False
        for child in self._children:
            if child.cancel(msg=msg):
                ret = True
        if ret:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return ret


def gather(*coros_or_futures, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    If *return_exceptions* is False, cancelling gather() after it
    has been marked done won't cancel any submitted awaitables.
    For instance, gather can be marked done after propagating an
    exception to the caller, therefore, calling ``gather.cancel()``
    after catching an exception (raised by one of the awaitables) from
    gather won't cancel any other awaitables.
    """
    if not coros_or_futures:
        # Nothing to wait for: hand back an already-resolved future.
        loop = events._get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if outer is None or outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as 'fut.exception()'
                    # will *raise* a CancelledError instead of returning it.
                    # Also, since we're adding the exception return value
                    # to 'results' instead of raising it, don't bother
                    # setting __context__.  This also lets us preserve
                    # calling '_make_cancelled_error()' at most once.
                    res = exceptions.CancelledError(
                        '' if fut._cancel_message is None else
                        fut._cancel_message)
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
            else:
                outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    loop = None
    outer = None  # bpo-46672
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = _ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer


def shield(arg):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    inner = _ensure_future(arg)
    if inner.done():
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _inner_done_callback(inner):
        if outer.cancelled():
            if not inner.cancelled():
                # Mark inner's result as retrieved.
                inner.exception()
            return

        # Mirror inner's outcome onto outer.
        if inner.cancelled():
            outer.cancel()
        else:
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())


    def _outer_done_callback(outer):
        # outer finished first (e.g. it was cancelled): stop mirroring.
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer


def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.

    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # Report the failure through the returned future, then
            # re-raise it from the callback as well.
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future


# WeakSet containing all alive tasks.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.
# Current task of each event loop, as a mapping: {EventLoop: Task}
_current_tasks = {}


def _register_task(task):
    """Register a new task in asyncio as executed by loop."""
    _all_tasks.add(task)


def _enter_task(loop, task):
    """Record *task* as the task currently executing on *loop*.

    Raises RuntimeError if another task is already recorded for *loop*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is not None:
        raise RuntimeError(f"Cannot enter into task {task!r} while another "
                           f"task {current_task!r} is being executed.")
    _current_tasks[loop] = task


def _leave_task(loop, task):
    """Clear *task* as the task currently executing on *loop*.

    Raises RuntimeError if *task* is not the one recorded for *loop*.
    """
    current_task = _current_tasks.get(loop)
    if current_task is not task:
        raise RuntimeError(f"Leaving task {task!r} does not match "
                           f"the current task {current_task!r}.")
    del _current_tasks[loop]


def _unregister_task(task):
    """Unregister a task."""
    _all_tasks.discard(task)


# Keep the pure-Python implementations reachable under _py_* names,
# since the import below may rebind the plain names.
_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    # Expose the _asyncio-provided implementations under _c_* names too.
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task