"""Support for tasks, coroutines and the scheduler."""

__all__ = ['Task',
           'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
           'wait', 'wait_for', 'as_completed', 'sleep', 'async',
           'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
           ]

import concurrent.futures
import functools
import inspect
import warnings
import weakref

from . import base_tasks
from . import compat
from . import coroutines
from . import events
from . import futures
from .coroutines import coroutine


class Task(futures.Future):
    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task is not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # Weak set containing all tasks alive.
    _all_tasks = weakref.WeakSet()

    # Dictionary containing tasks that are currently active in
    # all running event loops.  {EventLoop: Task}
    _current_tasks = {}

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True

    @classmethod
    def current_task(cls, loop=None):
        """Return the currently running task in an event loop or None.

        By default the current task for the current event loop is returned.

        None is returned when called not in the context of a Task.
        """
        if loop is None:
            loop = events.get_event_loop()
        return cls._current_tasks.get(loop)

    @classmethod
    def all_tasks(cls, loop=None):
        """Return a set of all tasks for an event loop.

        By default all tasks for the current event loop are returned.
        """
        if loop is None:
            loop = events.get_event_loop()
        return {t for t in cls._all_tasks if t._loop is loop}

    def __init__(self, coro, *, loop=None):
        """Wrap the coroutine object *coro* and schedule its first step.

        The first call to _step() is queued on the loop with call_soon(),
        and the new task is registered in the class-wide _all_tasks set.
        """
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the newest recorded entry.
            # NOTE(review): presumably this is the frame of this
            # constructor, so the traceback ends at the caller -- confirm
            # against Future.__init__.
            del self._source_traceback[-1]
        self._coro = coro
        self._fut_waiter = None
        self._must_cancel = False
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)

    # On Python 3.3 or older, objects with a destructor that are part of a
    # reference cycle are never destroyed. That's not the case any more on
    # Python 3.4 thanks to the PEP 442.
    if compat.PY34:
        def __del__(self):
            """Report a task that is destroyed while still pending.

            The report goes to the loop's exception handler and is
            suppressed when _log_destroy_pending is false.
            """
            if self._state == futures._PENDING and self._log_destroy_pending:
                context = {
                    'task': self,
                    'message': 'Task was destroyed but it is pending!',
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            futures.Future.__del__(self)

    def _repr_info(self):
        """Return the repr() fields, as built by base_tasks."""
        return base_tasks._task_repr_info(self)

    def get_stack(self, *, limit=None):
        """Return the list of stack frames for this task's coroutine.

        If the coroutine is not done, this returns the stack where it is
        suspended.  If the coroutine has completed successfully or was
        cancelled, this returns an empty list.  If the coroutine was
        terminated by an exception, this returns the list of traceback
        frames.

        The frames are always ordered from oldest to newest.

        The optional limit gives the maximum number of frames to
        return; by default all available frames are returned.  Its
        meaning differs depending on whether a stack or a traceback is
        returned: the newest frames of a stack are returned, but the
        oldest frames of a traceback are returned.  (This matches the
        behavior of the traceback module.)

        For reasons beyond our control, only one stack frame is
        returned for a suspended coroutine.
        """
        return base_tasks._task_get_stack(self, limit)

    def print_stack(self, *, limit=None, file=None):
        """Print the stack or traceback for this task's coroutine.

        This produces output similar to that of the traceback module,
        for the frames retrieved by get_stack().  The limit argument
        is passed to get_stack().  The file argument is an I/O stream
        to which the output is written; by default output is written
        to sys.stderr.
        """
        return base_tasks._task_print_stack(self, limit, file)

    def cancel(self):
        """Request that this task cancel itself.

        This arranges for a CancelledError to be thrown into the
        wrapped coroutine on the next cycle through the event loop.
        The coroutine then has a chance to clean up or even deny
        the request using try/except/finally.

        Unlike Future.cancel, this does not guarantee that the
        task will be cancelled: the exception might be caught and
        acted upon, delaying cancellation of the task or preventing
        cancellation completely.  The task may also return a value or
        raise a different exception.

        Immediately after this method is called, Task.cancelled() will
        not return True (unless the task was already cancelled).  A
        task will be marked as cancelled when the wrapped coroutine
        terminates with a CancelledError exception (even if cancel()
        was not called).
        """
        if self.done():
            return False
        if self._fut_waiter is not None:
            if self._fut_waiter.cancel():
                # Leave self._fut_waiter; it may be a Task that
                # catches and ignores the cancellation so we may have
                # to cancel it again later.
                return True
        # It must be the case that self._step is already scheduled.
        self._must_cancel = True
        return True

    def _step(self, exc=None):
        """Advance the wrapped coroutine by one step.

        If *exc* is None the coroutine is resumed with send(None);
        otherwise *exc* is thrown into it.  A pending cancellation
        request (_must_cancel) replaces *exc* with a CancelledError.

        Depending on what the coroutine does, the task is completed
        (result, exception or cancellation), parked on the yielded
        future with _wakeup() as its callback, or rescheduled with
        call_soon() -- possibly with a RuntimeError describing a bad
        yield.
        """
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}'.format(self, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        # Publish this task as the loop's current task for the duration
        # of the step; the entry is removed in the finally clause below.
        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            self.set_result(exc.value)
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            self.set_exception(exc)
        except BaseException as exc:
            # Record the exception on the task, then let it propagate
            # to the caller of _step().
            self.set_exception(exc)
            raise
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    else:
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        # cancel() may have been called while the
                        # coroutine was running; forward it to the
                        # future we are now waiting on.
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.

    def _wakeup(self, future):
        """Done-callback for the awaited future: resume the coroutine.

        If *future* finished with an exception (including cancellation)
        it is passed to _step() to be thrown into the coroutine;
        otherwise _step() is called with no argument.
        """
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.


_PyTask = Task


try:
    import _asyncio
except ImportError:
    pass
else:
    # _CTask is needed for tests.
    Task = _CTask = _asyncio.Task


# wait() and as_completed() similar to those in PEP 3148.

FIRST_COMPLETED = concurrent.futures.FIRST_COMPLETED
FIRST_EXCEPTION = concurrent.futures.FIRST_EXCEPTION
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED


@coroutine
def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the Futures and coroutines given by fs to complete.

    The sequence futures must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

        done, pending = yield from asyncio.wait(fs)

    Note: This does not raise TimeoutError! Futures that aren't done
    when the timeout occurs are returned in the second set.

    Raises TypeError if fs is itself a future or a coroutine, and
    ValueError if fs is empty or return_when is not one of
    FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError('Invalid return_when value: {}'.format(return_when))

    if loop is None:
        loop = events.get_event_loop()

    # Deduplicate the inputs first, then wrap coroutines in tasks.
    fs = {ensure_future(f, loop=loop) for f in set(fs)}

    return (yield from _wait(fs, timeout, return_when, loop))


def _release_waiter(waiter, *args):
    """Resolve *waiter* with None unless it is already done.

    Extra positional arguments are ignored, so this can serve both as a
    timer callback (no arguments) and as a future done-callback (which
    receives the finished future).
    """
    if not waiter.done():
        waiter.set_result(None)


@coroutine
def wait_for(fut, timeout, *, loop=None):
    """Wait for the single Future or coroutine to complete, with timeout.

    Coroutine will be wrapped in Task.

    Returns result of the Future or coroutine.  When a timeout occurs,
    it cancels the task and raises TimeoutError.  To avoid the task
    cancellation, wrap it in shield().

    If the wait is cancelled, the task is also cancelled.

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    if timeout is None:
        return (yield from fut)

    # `waiter` is released either by the timer or by `fut` completing,
    # whichever happens first.
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # wait until the future completes or the timeout
        try:
            yield from waiter
        except futures.CancelledError:
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            fut.cancel()
            raise futures.TimeoutError()
    finally:
        timeout_handle.cancel()


@coroutine
def _wait(fs, timeout, return_when, loop):
    """Internal helper for wait().

    The fs argument must be a collection of Futures.

    Returns the pair of sets (done, pending).
    """
    assert fs, 'Set of Futures is empty.'
    waiter = loop.create_future()
    timeout_handle = None
    if timeout is not None:
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    counter = len(fs)

    def _on_completion(f):
        # Release the waiter once the condition selected by return_when
        # is met, or once every future has completed.
        nonlocal counter
        counter -= 1
        if (counter <= 0 or
            return_when == FIRST_COMPLETED or
            return_when == FIRST_EXCEPTION and (not f.cancelled() and
                                                f.exception() is not None)):
            if timeout_handle is not None:
                timeout_handle.cancel()
            if not waiter.done():
                waiter.set_result(None)

    for f in fs:
        f.add_done_callback(_on_completion)

    try:
        yield from waiter
    finally:
        if timeout_handle is not None:
            timeout_handle.cancel()

    done, pending = set(), set()
    for f in fs:
        f.remove_done_callback(_on_completion)
        if f.done():
            done.add(f)
        else:
            pending.add(f)
    return done, pending


# This is *not* a @coroutine!  It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
    """Return an iterator whose values are coroutines.

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.

    This differs from PEP 3148; the proper way to use this is:

        for f in as_completed(fs):
            result = yield from f  # The 'yield from' may raise.
            # Use result.

    If a timeout is specified, the 'yield from' will raise
    TimeoutError when the timeout occurs before all Futures are done.

    Note: The futures 'f' are not necessarily members of fs.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
    loop = loop if loop is not None else events.get_event_loop()
    todo = {ensure_future(f, loop=loop) for f in set(fs)}
    from .queues import Queue  # Import here to avoid circular import problem.
    done = Queue(loop=loop)
    timeout_handle = None

    def _on_timeout():
        # Detach from every unfinished future and queue one None per
        # future so that each outstanding _wait_for_one() wakes up.
        for f in todo:
            f.remove_done_callback(_on_completion)
            done.put_nowait(None)  # Queue a dummy value for _wait_for_one().
        todo.clear()  # Can't do todo.remove(f) in the loop.

    def _on_completion(f):
        if not todo:
            return  # _on_timeout() was here first.
        todo.remove(f)
        done.put_nowait(f)
        if not todo and timeout_handle is not None:
            timeout_handle.cancel()

    @coroutine
    def _wait_for_one():
        f = yield from done.get()
        if f is None:
            # Dummy value from _on_timeout().
            raise futures.TimeoutError
        return f.result()  # May raise f.exception().

    for f in todo:
        f.add_done_callback(_on_completion)
    if todo and timeout is not None:
        timeout_handle = loop.call_later(timeout, _on_timeout)
    for _ in range(len(todo)):
        yield _wait_for_one()


@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds).

    The value of *result* is returned to the caller once the delay has
    elapsed.  A delay equal to 0 yields to the event loop once and then
    returns without setting up a timer.
    """
    if delay == 0:
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()
    future = loop.create_future()
    h = future._loop.call_later(delay,
                                futures._set_result_unless_cancelled,
                                future, result)
    try:
        return (yield from future)
    finally:
        # Cancel the timer on every exit path, including when the wait
        # is interrupted by an exception.
        h.cancel()


def async_(coro_or_future, *, loop=None):
    """Wrap a coroutine in a future.

    If the argument is a Future, it is returned directly.

    This function is deprecated in 3.5. Use asyncio.ensure_future() instead.
    """

    warnings.warn("asyncio.async() function is deprecated, use ensure_future()",
                  DeprecationWarning,
                  stacklevel=2)

    return ensure_future(coro_or_future, loop=loop)

# Silence DeprecationWarning:
globals()['async'] = async_
async_.__name__ = 'async'
del async_


def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.

    Raises ValueError if a Future is passed together with a *loop* that
    is not the Future's own loop, and TypeError if the argument is not
    a Future, a coroutine or an awaitable.
    """
    if futures.isfuture(coro_or_future):
        if loop is not None and loop is not coro_or_future._loop:
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            # Drop the newest recorded entry of the task's traceback.
            del task._source_traceback[-1]
        return task
    elif compat.PY35 and inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('A Future, a coroutine or an awaitable is required')


@coroutine
def _wrap_awaitable(awaitable):
    """Helper for asyncio.ensure_future().

    Wraps awaitable (an object with __await__) into a coroutine
    that will later be wrapped in a Task by ensure_future().
    """
    return (yield from awaitable.__await__())


class _GatheringFuture(futures.Future):
    """Helper for gather().

    This overrides cancel() to cancel all the children and act more
    like Task.cancel(), which doesn't immediately mark itself as
    cancelled.
    """

    def __init__(self, children, *, loop=None):
        """Remember *children*, the futures that cancel() forwards to."""
        super().__init__(loop=loop)
        self._children = children

    def cancel(self):
        """Cancel every child future.

        Returns False if this future is already done; otherwise returns
        True if at least one child accepted the cancellation request.
        This future's own state is not changed here.
        """
        if self.done():
            return False
        ret = False
        for child in self._children:
            if child.cancel():
                ret = True
        return ret


def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Return a future aggregating results from the given coroutines
    or futures.

    Coroutines will be wrapped in a future and scheduled in the event
    loop. They will not necessarily be scheduled in the same order as
    passed in.

    All futures must share the same event loop.  If all the tasks are
    done successfully, the returned future's result is the list of
    results (in the order of the original sequence, not necessarily
    the order of results arrival).  If *return_exceptions* is True,
    exceptions in the tasks are treated the same as successful
    results, and gathered in the result list; otherwise, the first
    raised exception will be immediately propagated to the returned
    future.

    Cancellation: if the outer Future is cancelled, all children (that
    have not completed yet) are also cancelled.  If any child is
    cancelled, this is treated as if it raised CancelledError --
    the outer Future is *not* cancelled in this case.  (This is to
    prevent the cancellation of one child to cause other children to
    be cancelled.)
    """
    if not coros_or_futures:
        if loop is None:
            loop = events.get_event_loop()
        outer = loop.create_future()
        outer.set_result([])
        return outer

    # Map each distinct argument to one future, so an argument passed
    # several times is wrapped only once.
    arg_to_fut = {}
    for arg in set(coros_or_futures):
        if not futures.isfuture(arg):
            fut = ensure_future(arg, loop=loop)
            if loop is None:
                loop = fut._loop
            # The caller cannot control this future, the "destroy pending task"
            # warning should not be emitted.
            fut._log_destroy_pending = False
        else:
            fut = arg
            if loop is None:
                loop = fut._loop
            elif fut._loop is not loop:
                raise ValueError("futures are tied to different event loops")
        arg_to_fut[arg] = fut

    children = [arg_to_fut[arg] for arg in coros_or_futures]
    nchildren = len(children)
    outer = _GatheringFuture(children, loop=loop)
    nfinished = 0
    results = [None] * nchildren

    def _done_callback(i, fut):
        # Record the outcome of child number `i`; complete `outer` when
        # all children are in, or on the first failure when
        # return_exceptions is false.
        nonlocal nfinished
        if outer.done():
            if not fut.cancelled():
                # Mark exception retrieved.
                fut.exception()
            return

        if fut.cancelled():
            res = futures.CancelledError()
            if not return_exceptions:
                outer.set_exception(res)
                return
        elif fut._exception is not None:
            res = fut.exception()  # Mark exception retrieved.
            if not return_exceptions:
                outer.set_exception(res)
                return
        else:
            res = fut._result
        results[i] = res
        nfinished += 1
        if nfinished == nchildren:
            outer.set_result(results)

    for i, fut in enumerate(children):
        fut.add_done_callback(functools.partial(_done_callback, i))
    return outer


def shield(arg, *, loop=None):
    """Wait for a future, shielding it from cancellation.

    The statement

        res = yield from shield(something())

    is exactly equivalent to the statement

        res = yield from something()

    *except* that if the coroutine containing it is cancelled, the
    task running in something() is not cancelled.  From the POV of
    something(), the cancellation did not happen.  But its caller is
    still cancelled, so the yield-from expression still raises
    CancelledError.  Note: If something() is cancelled by other means
    this will still cancel shield().

    If you want to completely ignore cancellation (not recommended)
    you can combine shield() with a try/except clause, as follows:

        try:
            res = yield from shield(something())
        except CancelledError:
            res = None
    """
    inner = ensure_future(arg, loop=loop)
    if inner.done():
        # Shortcut.
        return inner
    loop = inner._loop
    outer = loop.create_future()

    def _done_callback(inner):
        # Copy the outcome of `inner` onto `outer`, unless `outer` was
        # cancelled in the meantime.
        if outer.cancelled():
            if not inner.cancelled():
                # Mark inner's result as retrieved.
                inner.exception()
            return

        if inner.cancelled():
            outer.cancel()
        else:
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())

    inner.add_done_callback(_done_callback)
    return outer


def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.

    Raises TypeError if *coro* is not a coroutine object.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        # Scheduled on the loop via call_soon_threadsafe() below.
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except Exception as exc:
            # Hand the failure to the waiting caller (unless the future
            # was already cancelled), then re-raise it.
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future