"""Support for tasks, coroutines and the scheduler."""

__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)

import concurrent.futures
import contextvars
import functools
import inspect
import itertools
import types
import warnings
import weakref

from . import base_tasks
from . import coroutines
from . import events
from . import exceptions
from . import futures
from .coroutines import _is_coroutine

# Helper to generate new task names
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe. See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__


def current_task(loop=None):
    """Return a currently executed task."""
    if loop is None:
        loop = events.get_running_loop()
    return _current_tasks.get(loop)


def all_tasks(loop=None):
    """Return a set of all tasks for the loop."""
    if loop is None:
        loop = events.get_running_loop()
    # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
    # thread while we do so. Therefore we cast it to list prior to filtering. The list
    # cast itself requires iteration, so we repeat it several times ignoring
    # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
    # details.
    i = 0
    while True:
        try:
            tasks = list(_all_tasks)
        except RuntimeError:
            i += 1
            if i >= 1000:
                raise
        else:
            break
    return {t for t in tasks
            if futures._get_loop(t) is loop and not t.done()}


def _set_task_name(task, name):
    """Set *name* on *task* when a name is given and set_name() exists.

    Task-like objects without a set_name() method are silently left
    unnamed.
    """
    if name is not None:
        try:
            set_name = task.set_name
        except AttributeError:
            pass
        else:
            set_name(name)


class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()

        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)

    def __del__(self):
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    def __class_getitem__(cls, type):
        return cls

    def _repr_info(self):
        return base_tasks._task_repr_info(self)

    def get_coro(self):
        return self._coro

    def get_name(self):
        return self._name

    def set_name(self, value):
        self._name = str(value)

    def set_result(self, result):
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).
        """
        self._log_traceback = False
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def __step(self, exc=None):
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.


_PyTask = Task


try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task


def create_task(coro, *, name=None):
    """Schedule the execution of a coroutine object in a spawn task.

    Return a Task object.
    """
    loop = events.get_running_loop()
    task = loop.create_task(coro)
    _set_task_name(task, name)
    return task


# wait() and as_completed() similar to those in PEP 3148.

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED


async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The fs iterable must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError if fs is itself a future or coroutine, and
    ValueError if fs is empty or return_when is not one of the
    FIRST_COMPLETED / FIRST_EXCEPTION / ALL_COMPLETED constants.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    loop = events.get_running_loop()

    fs = set(fs)
    if not fs:
        # One-shot iterators (e.g. iter([])) are always truthy, so the
        # check above cannot detect that they yield nothing.  Re-check
        # the materialized set so the caller gets the documented
        # ValueError instead of tripping the assertion in _wait().
        raise ValueError('Set of coroutines/Futures is empty.')

    if any(coroutines.iscoroutine(f) for f in fs):
        warnings.warn("The explicit passing of coroutine objects to "
                      "asyncio.wait() is deprecated since Python 3.8, and "
                      "scheduled for removal in Python 3.11.",
                      DeprecationWarning, stacklevel=2)

    fs = {ensure_future(f, loop=loop) for f in fs}

    return await _wait(fs, timeout, return_when, loop)


def _release_waiter(waiter, *args):
    """Resolve *waiter* with None unless it is already done.

    Extra positional arguments are accepted and ignored so the function
    can be used (via functools.partial) as a future done-callback as
    well as a plain timer callback.
    """
    if not waiter.done():
        waiter.set_result(None)


async def wait_for(fut, timeout):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    This function is a coroutine.
    """
    loop = events.get_running_loop()

    if timeout is None:
        return await fut

    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        await _cancel_and_wait(fut, loop=loop)
        try:
            return fut.result()
        except exceptions.CancelledError as exc:
            raise exceptions.TimeoutError() from exc

    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            await waiter
        except exceptions.CancelledError:
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
                raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # We must ensure that the task is not running
            # after wait_for() returns.
            # See https://bugs.python.org/issue32751
            await _cancel_and_wait(fut, loop=loop)
            # In case task cancellation failed with some
            # exception, we should re-raise it
            # See https://bugs.python.org/issue40607
            try:
                return fut.result()
            except exceptions.CancelledError as exc:
                raise exceptions.TimeoutError() from exc
    finally:
        timeout_handle.cancel()


async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending


async def _cancel_and_wait(fut, loop):
    """Cancel the *fut* future or task and wait until it completes."""

    waiter = loop.create_future()
    cb = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(cb)

    try:
        fut.cancel()
        # We cannot wait on *fut* directly to make
        # sure _cancel_and_wait itself is reliably cancellable.
        await waiter
    finally:
        fut.remove_done_callback(cb)


# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = await f  # The 'await' may raise.
            # Use result.

    If a timeout is specified, the 'await' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")

    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue()

    loop = events._get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    timeout_handle = None

    def _on_timeout():
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    async def _wait_for_one():
        f = await done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise exceptions.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    for _ in range(len(todo)):
        yield _wait_for_one()


@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield


async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()


def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    return _ensure_future(coro_or_future, loop=loop)


def _ensure_future(coro_or_future, *, loop=None):
    """Implementation of ensure_future().

    Futures are returned unchanged (ValueError if *loop* is given and
    differs from the future's loop).  Coroutines are scheduled as tasks
    on *loop*; other awaitables are first wrapped in a coroutine.
    Anything else raises TypeError.
    """
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                            'the one specified as the loop argument')
        return coro_or_future

    if not coroutines.iscoroutine(coro_or_future):
        if inspect.isawaitable(coro_or_future):
            coro_or_future = _wrap_awaitable(coro_or_future)
        else:
            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
                            'is required')

    if loop is None:
        loop = events._get_event_loop(stacklevel=4)
    return loop.create_task(coro_or_future)


@types.coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().
    """
    return (yield from awaitable.__await__())

_wrap_awaitable._is_coroutine = _is_coroutine


class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop):
        assert loop is not None
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

    def cancel(self, msg=None):
        if self.done():
            return False
        ret = False
        for child in self._children:
            if child.cancel(msg=msg):
                ret = True
        if ret:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return ret


def gather(*coros_or_futures, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)

    If *return_exceptions* is False, cancelling gather() after it
    has been marked done won't cancel any submitted awaitables.
    For instance, gather can be marked done after propagating an
    exception to the caller, therefore, calling ``gather.cancel()``
    after catching an exception (raised by one of the awaitables) from
    gather won't cancel any other awaitables.
    """
    if not coros_or_futures:
        loop = events._get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        # 'outer' is still None when gather() itself failed part-way
        # through wrapping its arguments (bpo-46672); in that case there
        # is nothing to report to, so just consume the child's result.
        if outer is None or outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as 'fut.exception()'
                    # will *raise* a CancelledError instead of returning it.
                    # Also, since we're adding the exception return value
                    # to 'results' instead of raising it, don't bother
                    # setting __context__.  This also lets us preserve
                    # calling '_make_cancelled_error()' at most once.
                    res = exceptions.CancelledError(
                        '' if fut._cancel_message is None else
                        fut._cancel_message)
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
            else:
                outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    loop = None
    # Bind 'outer' before any callback can be registered so that
    # _done_callback never sees an unbound closure variable if
    # _ensure_future() raises for a later argument (bpo-46672).
    outer = None
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = _ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    return outer


def shield(arg):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = await shield(something())

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = await shield(something())
        except CancelledError:
            res = None
    """
    inner = _ensure_future(arg)
    if inner.done():
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _inner_done_callback(inner):
        if outer.cancelled():
            if not inner.cancelled():
                # Mark inner's result as retrieved.
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
        else:
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())


    def _outer_done_callback(outer):
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer


def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future


# WeakSet containing all alive tasks.
_all_tasks = weakref.WeakSet()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
_current_tasks = {}


def _register_task(task):
    """Register a new task in asyncio as executed by loop."""
    _all_tasks.add(task)


def _enter_task(loop, task):
    """Record *task* as the task currently running on *loop*.

    Raises RuntimeError if another task is already recorded as current
    for that loop.
    """
    current_task = _current_tasks.get(loop)
    if current_task is not None:
        raise RuntimeError(f"Cannot enter into task {task!r} while another "
                           f"task {current_task!r} is being executed.")
    _current_tasks[loop] = task


def _leave_task(loop, task):
    """Clear *task* as the task currently running on *loop*.

    Raises RuntimeError if *task* is not the one recorded as current
    for that loop.
    """
    current_task = _current_tasks.get(loop)
    if current_task is not task:
        raise RuntimeError(f"Leaving task {task!r} does not match "
                           f"the current task {current_task!r}.")
    del _current_tasks[loop]


def _unregister_task(task):
    """Unregister a task."""
    _all_tasks.discard(task)


_py_register_task = _register_task
_py_unregister_task = _unregister_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task


try:
    from _asyncio import (_register_task, _unregister_task,
                          _enter_task, _leave_task,
                          _all_tasks, _current_tasks)
except ImportError:
    pass
else:
    _c_register_task = _register_task
    _c_unregister_task = _unregister_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task