"""Support for tasks, coroutines and the scheduler."""

__all__ = (
    'Task', 'create_task',
    'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
    'wait', 'wait_for', 'as_completed', 'sleep',
    'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
    'current_task', 'all_tasks',
    'create_eager_task_factory', 'eager_task_factory',
    '_register_task', '_unregister_task', '_enter_task', '_leave_task',
)

import concurrent.futures
import contextvars
import functools
import inspect
import itertools
import math
import types
import weakref
from types import GenericAlias

from . import base_tasks
from . import coroutines
from . import events
from . import exceptions
from . import futures
from . import queues
from . import timeouts

# Helper to generate new task names
# This uses itertools.count() instead of a "+= 1" operation because the latter
# is not thread safe.  See bpo-11866 for a longer explanation.
_task_name_counter = itertools.count(1).__next__


def current_task(loop=None):
    """Return a currently executed task."""
    if loop is None:
        loop = events.get_running_loop()
    return _current_tasks.get(loop)


def all_tasks(loop=None):
    """Return a set of all tasks for the loop."""
    if loop is None:
        loop = events.get_running_loop()
    # capturing the set of eager tasks first, so if an eager task "graduates"
    # to a regular task in another thread, we don't risk missing it.
    eager_tasks = list(_eager_tasks)
    # Looping over the WeakSet isn't safe as it can be updated from another
    # thread, therefore we cast it to list prior to filtering.  The list cast
    # itself requires iteration, so we repeat it several times ignoring
    # RuntimeErrors (which are not very likely to occur).
    # See issues 34970 and 36607 for details.
    scheduled_tasks = None
    i = 0
    while True:
        try:
            scheduled_tasks = list(_scheduled_tasks)
        except RuntimeError:
            i += 1
            if i >= 1000:
                raise
        else:
            break
    return {t for t in itertools.chain(scheduled_tasks, eager_tasks)
            if futures._get_loop(t) is loop and not t.done()}


class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    # _fut_waiter is either None or a Future.  The Future
    # can be either done() or not done().
    # The task can be in any of 3 states:
    #
    # - 1: _fut_waiter is not None and not _fut_waiter.done():
    #      __step() is *not* scheduled and the Task is waiting for _fut_waiter.
    # - 2: (_fut_waiter is None or _fut_waiter.done()) and __step() is scheduled:
    #      the Task is waiting for __step() to be executed.
    # - 3: _fut_waiter is None and __step() is *not* scheduled:
    #      the Task is currently executing (in __step()).
    #
    # * In state 1, one of the callbacks of __fut_waiter must be __wakeup().
    # * The transition from 1 to 2 happens when _fut_waiter becomes done(),
    #   as it schedules __wakeup() to be called (which calls __step() so
    #   we say that __step() is scheduled).
    # * It transitions from 2 to 3 when __step() is executed, and it clears
    #   _fut_waiter to None.

    # If False, don't log a message if the task is destroyed while its
    # status is still pending
    _log_destroy_pending = True

    def __init__(self, coro, *, loop=None, name=None, context=None,
                 eager_start=False):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        if name is None:
            self._name = f'Task-{_task_name_counter()}'
        else:
            self._name = str(name)

        self._num_cancels_requested = 0
        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        if context is None:
            self._context = contextvars.copy_context()
        else:
            self._context = context

        if eager_start and self._loop.is_running():
            self.__eager_start()
        else:
            self._loop.call_soon(self.__step, context=self._context)
            _register_task(self)

    def __del__(self):
        if self._state == futures._PENDING and self._log_destroy_pending:
            context = {
                'task': self,
                'message': 'Task was destroyed but it is pending!',
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        super().__del__()

    __class_getitem__ = classmethod(GenericAlias)

    def __repr__(self):
        return base_tasks._task_repr(self)

    def get_coro(self):
        return self._coro

    def get_context(self):
        return self._context

    def get_name(self):
        return self._name

    def set_name(self, value):
        self._name = str(value)

    def set_result(self, result):
        raise RuntimeError('Task does not support set_result operation')

    def set_exception(self, exception):
        raise RuntimeError('Task does not support set_exception operation')

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self, msg=None):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).

        This also increases the task's count of cancellation requests.
        """
        self._log_traceback = False
        if self.done():
            return False
        self._num_cancels_requested += 1
        # These two lines are controversial.  See discussion starting at
        # https://github.com/python/cpython/pull/31394#issuecomment-1053545331
        # Also remember that this is duplicated in _asynciomodule.c.
        # if self._num_cancels_requested > 1:
        #     return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel(msg=msg):
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self.__step is already scheduled.
        self._must_cancel = True
        self._cancel_message = msg
        return True

    def cancelling(self):
        """Return the count of the task's cancellation requests.

        This count is incremented when .cancel() is called
        and may be decremented using .uncancel().
        """
        return self._num_cancels_requested

    def uncancel(self):
        """Decrement the task's count of cancellation requests.

        This should be called by the party that called `cancel()` on the task
        beforehand.

        Returns the remaining number of cancellation requests.
        """
        if self._num_cancels_requested > 0:
            self._num_cancels_requested -= 1
            if self._num_cancels_requested == 0:
                self._must_cancel = False
        return self._num_cancels_requested

    def __eager_start(self):
        prev_task = _swap_current_task(self._loop, self)
        try:
            _register_eager_task(self)
            try:
                self._context.run(self.__step_run_and_handle_result, None)
            finally:
                _unregister_eager_task(self)
        finally:
            try:
                curtask = _swap_current_task(self._loop, prev_task)
                assert curtask is self
            finally:
                if self.done():
                    self._coro = None
                    self = None  # Needed to break cycles when an exception occurs.
                else:
                    _register_task(self)

    def __step(self, exc=None):
        if self.done():
            raise exceptions.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, exceptions.CancelledError):
                exc = self._make_cancelled_error()
            self._must_cancel = False
        self._fut_waiter = None

        _enter_task(self._loop, self)
        try:
            self.__step_run_and_handle_result(exc)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.
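
    # __step_run_and_handle_result() below advances the wrapped coroutine by
    # a single step: it resumes the coroutine with send()/throw(), then either
    # records the coroutine's final result or exception on this Future, or
    # registers __wakeup() on the yielded future and parks the task in
    # _fut_waiter until that future completes.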

    def __step_run_and_handle_result(self, exc):
        coro = self._coro
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so we can chain it later.
            self._cancelled_exc = exc
            super().cancel()  # I.e., Future.cancel(self).
        except (KeyboardInterrupt, SystemExit) as exc:
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:
                    if result is self:
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel(
                                    msg=self._cancel_message):
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)

            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)
        finally:
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        try:
            future.result()
        except BaseException as exc:
            # This may also be a cancellation.
            self.__step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()
        self = None  # Needed to break cycles when an exception occurs.


_PyTask = Task


try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task


def create_task(coro, *, name=None, context=None):
    """Schedule the execution of a coroutine object in a spawned task.

    Return a Task object.
    """
    loop = events.get_running_loop()
    if context is None:
        # Use legacy API if context is not needed
        task = loop.create_task(coro, name=name)
    else:
        task = loop.create_task(coro, name=name, context=context)

    return task


# wait() and as_completed() similar to those in PEP 3148.

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED


async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures or Tasks given by fs to complete.

    The fs iterable must not be empty.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = await asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of Tasks/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    fs = set(fs)

    if any(coroutines.iscoroutine(f) for f in fs):
        raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")

    loop = events.get_running_loop()
    return await _wait(fs, timeout, return_when, loop)


def _release_waiter(waiter, *args):
    if not waiter.done():
        waiter.set_result(None)


async def wait_for(fut, timeout):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    If the task suppresses the cancellation and returns a value instead,
    that value is returned.

    This function is a coroutine.
    """
    # The special case for timeout <= 0 is for the following case:
    #
    # async def test_waitfor():
    #     func_started = False
    #
    #     async def func():
    #         nonlocal func_started
    #         func_started = True
    #
    #     try:
    #         await asyncio.wait_for(func(), 0)
    #     except asyncio.TimeoutError:
    #         assert not func_started
    #     else:
    #         assert False
    #
    # asyncio.run(test_waitfor())

    if timeout is not None and timeout <= 0:
        fut = ensure_future(fut)

        if fut.done():
            return fut.result()

        await _cancel_and_wait(fut)
        try:
            return fut.result()
        except exceptions.CancelledError as exc:
            raise TimeoutError from exc

    async with timeouts.timeout(timeout):
        return await fut


async def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        await waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()
        for f in fs:
            f.remove_done_callback(_on_completion)

    done, pending = set(), set()
    for f in fs:
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending


async def _cancel_and_wait(fut):
    """Cancel the *fut* future or task and wait until it completes."""

    loop = events.get_running_loop()
    waiter = loop.create_future()
    cb = functools.partial(_release_waiter, waiter)
    fut.add_done_callback(cb)

    try:
        fut.cancel()
        # We cannot wait on *fut* directly to make
        # sure _cancel_and_wait itself is reliably cancellable.
        await waiter
    finally:
        fut.remove_done_callback(cb)


class _AsCompletedIterator:
    """Iterator of awaitables representing tasks of asyncio.as_completed.

    As an asynchronous iterator, iteration yields futures as they finish. As a
    plain iterator, new coroutines are yielded that will return or raise the
    result of the next underlying future to complete.
    """
    def __init__(self, aws, timeout):
        self._done = queues.Queue()
        self._timeout_handle = None

        loop = events.get_event_loop()
        todo = {ensure_future(aw, loop=loop) for aw in set(aws)}
        for f in todo:
            f.add_done_callback(self._handle_completion)
        if todo and timeout is not None:
            self._timeout_handle = (
                loop.call_later(timeout, self._handle_timeout)
            )
        self._todo = todo
        self._todo_left = len(todo)

    def __aiter__(self):
        return self

    def __iter__(self):
        return self

    async def __anext__(self):
        if not self._todo_left:
            raise StopAsyncIteration
        assert self._todo_left > 0
        self._todo_left -= 1
        return await self._wait_for_one()

    def __next__(self):
        if not self._todo_left:
            raise StopIteration
        assert self._todo_left > 0
        self._todo_left -= 1
        return self._wait_for_one(resolve=True)

    def _handle_timeout(self):
        for f in self._todo:
            f.remove_done_callback(self._handle_completion)
            self._done.put_nowait(None)  # Sentinel for _wait_for_one().
        self._todo.clear()  # Can't do todo.remove(f) in the loop.

    def _handle_completion(self, f):
        if not self._todo:
            return  # _handle_timeout() was here first.
        self._todo.remove(f)
        self._done.put_nowait(f)
        if not self._todo and self._timeout_handle is not None:
            self._timeout_handle.cancel()

    async def _wait_for_one(self, resolve=False):
        # Wait for the next future to be done and return it unless resolve is
        # set, in which case return either the result of the future or raise
        # an exception.
        f = await self._done.get()
        if f is None:
            # Dummy value from _handle_timeout().
            raise exceptions.TimeoutError
        return f.result() if resolve else f


def as_completed(fs, *, timeout=None):
    """Create an iterator of awaitables or their results in completion order.

    Run the supplied awaitables concurrently. The returned object can be
    iterated to obtain the results of the awaitables as they finish.

    The object returned can be iterated as an asynchronous iterator or a plain
    iterator. When asynchronous iteration is used, the originally-supplied
    awaitables are yielded if they are tasks or futures. This makes it easy to
    correlate previously-scheduled tasks with their results:

        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
        ipv6_connect = create_task(open_connection("::1", 80))
        tasks = [ipv4_connect, ipv6_connect]

        async for earliest_connect in as_completed(tasks):
            # earliest_connect is done. The result can be obtained by
            # awaiting it or calling earliest_connect.result()
            reader, writer = await earliest_connect

            if earliest_connect is ipv6_connect:
                print("IPv6 connection established.")
            else:
                print("IPv4 connection established.")

    During asynchronous iteration, implicitly-created tasks will be yielded for
    supplied awaitables that aren't tasks or futures.

    When used as a plain iterator, each iteration yields a new coroutine that
    returns the result or raises the exception of the next completed awaitable.
    This pattern is compatible with Python versions older than 3.13:

        ipv4_connect = create_task(open_connection("127.0.0.1", 80))
        ipv6_connect = create_task(open_connection("::1", 80))
        tasks = [ipv4_connect, ipv6_connect]

        for next_connect in as_completed(tasks):
            # next_connect is not one of the original task objects. It must be
            # awaited to obtain the result value or raise the exception of the
            # awaitable that finishes next.
            reader, writer = await next_connect

    A TimeoutError is raised if the timeout occurs before all awaitables are
    done. This is raised by the async for loop during asynchronous iteration or
    by the coroutines yielded during plain iteration.
    """
    if inspect.isawaitable(fs):
        raise TypeError(
            f"expects an iterable of awaitables, not {type(fs).__name__}"
        )

    return _AsCompletedIterator(fs, timeout)


@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield


async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result

    if math.isnan(delay):
        raise ValueError("Invalid delay: NaN (not a number)")

    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()


def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future
    should_close = True
    if not coroutines.iscoroutine(coro_or_future):
        if inspect.isawaitable(coro_or_future):
            async def _wrap_awaitable(awaitable):
                return await awaitable

            coro_or_future = _wrap_awaitable(coro_or_future)
            should_close = False
        else:
            raise TypeError('An asyncio.Future, a coroutine or an awaitable '
                            'is required')

    if loop is None:
        loop = events.get_event_loop()
    try:
        return loop.create_task(coro_or_future)
    except RuntimeError:
        if should_close:
            coro_or_future.close()
        raise


class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop):
        assert loop is not None
        super().__init__(loop=loop)
        self._children = children
        self._cancel_requested = False

    def cancel(self, msg=None):
        if self.done():
            return False
        ret = False
        for child in self._children:
            if child.cancel(msg=msg):
                ret = True
        if ret:
            # If any child tasks were actually cancelled, we should
            # propagate the cancellation request regardless of
            # *return_exceptions* argument.  See issue 32684.
            self._cancel_requested = True
        return ret


def gather(*coros_or_futures, return_exceptions=False):
    """Return a future aggregating results from the given coroutines/futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop.  They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child from causing other children
    to be cancelled.)

    If *return_exceptions* is False, cancelling gather() after it
    has been marked done won't cancel any submitted awaitables.
    For instance, gather can be marked done after propagating an
    exception to the caller, therefore, calling ``gather.cancel()``
    after catching an exception (raised by one of the awaitables) from
    gather won't cancel any other awaitables.
    """
    if not coros_or_futures:
        loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1

        if outer is None or outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if not return_exceptions:
            if fut.cancelled():
                # Check if 'fut' is cancelled first, as
                # 'fut.exception()' will *raise* a CancelledError
                # instead of returning it.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
                return
            else:
                exc = fut.exception()
                if exc is not None:
                    outer.set_exception(exc)
                    return

        if nfinished == nfuts:
            # All futures are done; create a list of results
            # and set it to the 'outer' future.
            results = []

            for fut in children:
                if fut.cancelled():
                    # Check if 'fut' is cancelled first, as 'fut.exception()'
                    # will *raise* a CancelledError instead of returning it.
                    # Also, since we're adding the exception return value
                    # to 'results' instead of raising it, don't bother
                    # setting __context__.  This also lets us preserve
                    # calling '_make_cancelled_error()' at most once.
                    res = exceptions.CancelledError(
                        '' if fut._cancel_message is None else
                        fut._cancel_message)
                else:
                    res = fut.exception()
                    if res is None:
                        res = fut.result()
                results.append(res)

            if outer._cancel_requested:
                # If gather is being cancelled we must propagate the
                # cancellation regardless of *return_exceptions* argument.
                # See issue 32684.
                exc = fut._make_cancelled_error()
                outer.set_exception(exc)
            else:
                outer.set_result(results)

    arg_to_fut = {}
    children = []
    nfuts = 0
    nfinished = 0
    done_futs = []
    loop = None
    outer = None  # bpo-46672
    for arg in coros_or_futures:
        if arg not in arg_to_fut:
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = futures._get_loop(fut)
            if fut is not arg:
                # 'arg' was not a Future, therefore, 'fut' is a new
                # Future created specifically for 'arg'.  Since the caller
                # can't control it, disable the "destroy pending task"
                # warning.
                fut._log_destroy_pending = False

            nfuts += 1
            arg_to_fut[arg] = fut
            if fut.done():
                done_futs.append(fut)
            else:
                fut.add_done_callback(_done_callback)

        else:
            # There's a duplicate Future object in coros_or_futures.
            fut = arg_to_fut[arg]

        children.append(fut)

    outer = _GatheringFuture(children, loop=loop)
    # Run done callbacks after GatheringFuture created so any post-processing
    # can be performed at this point
    # optimization: in the special case that *all* futures finished eagerly,
    # this will effectively complete the gather eagerly, with the last
    # callback setting the result (or exception) on outer before returning it
    for fut in done_futs:
        _done_callback(fut)
    return outer


def shield(arg):
    """Wait for a future, shielding it from cancellation.

    The statement

        task = asyncio.create_task(something())
        res = await shield(task)

    is exactly equivalent to the statement

        res = await something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        task = asyncio.create_task(something())
        try:
            res = await shield(task)
        except CancelledError:
            res = None

    Save a reference to tasks passed to this function, to avoid
    a task disappearing mid-execution. The event loop only keeps
    weak references to tasks. A task that isn't referenced elsewhere
    may get garbage collected at any time, even before it's done.
    """
    inner = ensure_future(arg)
    if inner.done():
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()

    def _inner_done_callback(inner):
        if outer.cancelled():
            if not inner.cancelled():
                # Mark inner's result as retrieved.
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
        else:
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())

    def _outer_done_callback(outer):
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer


def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future


def create_eager_task_factory(custom_task_constructor):
    """Create a function suitable for use as a task factory on an event-loop.

    Example usage:

        loop.set_task_factory(
            asyncio.create_eager_task_factory(my_task_constructor))

    Now, tasks created will be started immediately (rather than being first
    scheduled to an event loop).  The constructor argument can be any callable
    that returns a Task-compatible object and has a signature compatible
    with `Task.__init__`; it must have the `eager_start` keyword argument.

    Most applications will use `Task` for `custom_task_constructor` and in
    this case there's no need to call `create_eager_task_factory()`
    directly.  Instead the global `eager_task_factory` instance can be
    used.  E.g. `loop.set_task_factory(asyncio.eager_task_factory)`.
    """

    def factory(loop, coro, *, name=None, context=None):
        return custom_task_constructor(
            coro, loop=loop, name=name, context=context, eager_start=True)

    return factory


eager_task_factory = create_eager_task_factory(Task)


# Collectively these two sets hold references to the complete set of active
# tasks.  Eagerly executed tasks use a faster regular set as an optimization
# but may graduate to a WeakSet if the task blocks on IO.
_scheduled_tasks = weakref.WeakSet()
_eager_tasks = set()

# Dictionary containing tasks that are currently active in
# all running event loops.  {EventLoop: Task}
_current_tasks = {}


def _register_task(task):
    """Register an asyncio Task scheduled to run on an event loop."""
    _scheduled_tasks.add(task)


def _register_eager_task(task):
    """Register an asyncio Task about to be eagerly executed."""
    _eager_tasks.add(task)


def _enter_task(loop, task):
    current_task = _current_tasks.get(loop)
    if current_task is not None:
        raise RuntimeError(f"Cannot enter into task {task!r} while another "
                           f"task {current_task!r} is being executed.")
    _current_tasks[loop] = task


def _leave_task(loop, task):
    current_task = _current_tasks.get(loop)
    if current_task is not task:
        raise RuntimeError(f"Leaving task {task!r} does not match "
                           f"the current task {current_task!r}.")
    del _current_tasks[loop]


def _swap_current_task(loop, task):
    prev_task = _current_tasks.get(loop)
    if task is None:
        del _current_tasks[loop]
    else:
        _current_tasks[loop] = task
    return prev_task


def _unregister_task(task):
    """Unregister a completed, scheduled Task."""
    _scheduled_tasks.discard(task)


def _unregister_eager_task(task):
    """Unregister a task which finished its first eager step."""
    _eager_tasks.discard(task)


_py_current_task = current_task
_py_register_task = _register_task
_py_register_eager_task = _register_eager_task
_py_unregister_task = _unregister_task
_py_unregister_eager_task = _unregister_eager_task
_py_enter_task = _enter_task
_py_leave_task = _leave_task
_py_swap_current_task = _swap_current_task


try:
    from _asyncio import (_register_task, _register_eager_task,
                          _unregister_task, _unregister_eager_task,
                          _enter_task, _leave_task, _swap_current_task,
                          _scheduled_tasks, _eager_tasks, _current_tasks,
                          current_task)
except ImportError:
    pass
else:
    _c_current_task = current_task
    _c_register_task = _register_task
    _c_register_eager_task = _register_eager_task
    _c_unregister_task = _unregister_task
    _c_unregister_eager_task = _unregister_eager_task
    _c_enter_task = _enter_task
    _c_leave_task = _leave_task
    _c_swap_current_task = _swap_current_task
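

# Illustrative sketch only, not part of this module's API: a minimal example
# of how the public helpers above are typically combined from application
# code through the top-level ``asyncio`` package.  The coroutine names below
# (``fetch``, ``main``) are hypothetical.
#
#     import asyncio
#
#     async def fetch(tag, delay):
#         # sleep() returns its ``result`` argument after ``delay`` seconds.
#         return await asyncio.sleep(delay, result=tag)
#
#     async def main():
#         # create_task() schedules coroutines on the running event loop.
#         t1 = asyncio.create_task(fetch('a', 0.1))
#         t2 = asyncio.create_task(fetch('b', 0.2))
#         # gather() returns results in the order the awaitables were passed,
#         # not in completion order.
#         assert await asyncio.gather(t1, t2) == ['a', 'b']
#
#     asyncio.run(main())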