1"""Tests for tasks.py.""" 2 3import collections 4import contextlib 5import contextvars 6import gc 7import io 8import random 9import re 10import sys 11import traceback 12import types 13import unittest 14from unittest import mock 15from types import GenericAlias 16 17import asyncio 18from asyncio import futures 19from asyncio import tasks 20from test.test_asyncio import utils as test_utils 21from test import support 22from test.support.script_helper import assert_python_ok 23 24 25def tearDownModule(): 26 asyncio.set_event_loop_policy(None) 27 28 29async def coroutine_function(): 30 pass 31 32 33def format_coroutine(qualname, state, src, source_traceback, generator=False): 34 if generator: 35 state = '%s' % state 36 else: 37 state = '%s, defined' % state 38 if source_traceback is not None: 39 frame = source_traceback[-1] 40 return ('coro=<%s() %s at %s> created at %s:%s' 41 % (qualname, state, src, frame[0], frame[1])) 42 else: 43 return 'coro=<%s() %s at %s>' % (qualname, state, src) 44 45 46def get_innermost_context(exc): 47 """ 48 Return information about the innermost exception context in the chain. 
49 """ 50 depth = 0 51 while True: 52 context = exc.__context__ 53 if context is None: 54 break 55 56 exc = context 57 depth += 1 58 59 return (type(exc), exc.args, depth) 60 61 62class Dummy: 63 64 def __repr__(self): 65 return '<Dummy>' 66 67 def __call__(self, *args): 68 pass 69 70 71class CoroLikeObject: 72 def send(self, v): 73 raise StopIteration(42) 74 75 def throw(self, *exc): 76 pass 77 78 def close(self): 79 pass 80 81 def __await__(self): 82 return self 83 84 85class BaseTaskTests: 86 87 Task = None 88 Future = None 89 90 def new_task(self, loop, coro, name='TestTask', context=None): 91 return self.__class__.Task(coro, loop=loop, name=name, context=context) 92 93 def new_future(self, loop): 94 return self.__class__.Future(loop=loop) 95 96 def setUp(self): 97 super().setUp() 98 self.loop = self.new_test_loop() 99 self.loop.set_task_factory(self.new_task) 100 self.loop.create_future = lambda: self.new_future(self.loop) 101 102 def test_generic_alias(self): 103 task = self.__class__.Task[str] 104 self.assertEqual(task.__args__, (str,)) 105 self.assertIsInstance(task, GenericAlias) 106 107 def test_task_cancel_message_getter(self): 108 async def coro(): 109 pass 110 t = self.new_task(self.loop, coro()) 111 self.assertTrue(hasattr(t, '_cancel_message')) 112 self.assertEqual(t._cancel_message, None) 113 114 t.cancel('my message') 115 self.assertEqual(t._cancel_message, 'my message') 116 117 with self.assertRaises(asyncio.CancelledError) as cm: 118 self.loop.run_until_complete(t) 119 120 self.assertEqual('my message', cm.exception.args[0]) 121 122 def test_task_cancel_message_setter(self): 123 async def coro(): 124 pass 125 t = self.new_task(self.loop, coro()) 126 t.cancel('my message') 127 t._cancel_message = 'my new message' 128 self.assertEqual(t._cancel_message, 'my new message') 129 130 with self.assertRaises(asyncio.CancelledError) as cm: 131 self.loop.run_until_complete(t) 132 133 self.assertEqual('my new message', cm.exception.args[0]) 134 135 def 
test_task_del_collect(self): 136 class Evil: 137 def __del__(self): 138 gc.collect() 139 140 async def run(): 141 return Evil() 142 143 self.loop.run_until_complete( 144 asyncio.gather(*[ 145 self.new_task(self.loop, run()) for _ in range(100) 146 ])) 147 148 def test_other_loop_future(self): 149 other_loop = asyncio.new_event_loop() 150 fut = self.new_future(other_loop) 151 152 async def run(fut): 153 await fut 154 155 try: 156 with self.assertRaisesRegex(RuntimeError, 157 r'Task .* got Future .* attached'): 158 self.loop.run_until_complete(run(fut)) 159 finally: 160 other_loop.close() 161 162 def test_task_awaits_on_itself(self): 163 164 async def test(): 165 await task 166 167 task = asyncio.ensure_future(test(), loop=self.loop) 168 169 with self.assertRaisesRegex(RuntimeError, 170 'Task cannot await on itself'): 171 self.loop.run_until_complete(task) 172 173 def test_task_class(self): 174 async def notmuch(): 175 return 'ok' 176 t = self.new_task(self.loop, notmuch()) 177 self.loop.run_until_complete(t) 178 self.assertTrue(t.done()) 179 self.assertEqual(t.result(), 'ok') 180 self.assertIs(t._loop, self.loop) 181 self.assertIs(t.get_loop(), self.loop) 182 183 loop = asyncio.new_event_loop() 184 self.set_event_loop(loop) 185 t = self.new_task(loop, notmuch()) 186 self.assertIs(t._loop, loop) 187 loop.run_until_complete(t) 188 loop.close() 189 190 def test_ensure_future_coroutine(self): 191 async def notmuch(): 192 return 'ok' 193 t = asyncio.ensure_future(notmuch(), loop=self.loop) 194 self.assertIs(t._loop, self.loop) 195 self.loop.run_until_complete(t) 196 self.assertTrue(t.done()) 197 self.assertEqual(t.result(), 'ok') 198 199 a = notmuch() 200 self.addCleanup(a.close) 201 with self.assertRaisesRegex(RuntimeError, 'no current event loop'): 202 asyncio.ensure_future(a) 203 204 async def test(): 205 return asyncio.ensure_future(notmuch()) 206 t = self.loop.run_until_complete(test()) 207 self.assertIs(t._loop, self.loop) 208 self.loop.run_until_complete(t) 209 
self.assertTrue(t.done()) 210 self.assertEqual(t.result(), 'ok') 211 212 # Deprecated in 3.10, undeprecated in 3.12 213 asyncio.set_event_loop(self.loop) 214 self.addCleanup(asyncio.set_event_loop, None) 215 t = asyncio.ensure_future(notmuch()) 216 self.assertIs(t._loop, self.loop) 217 self.loop.run_until_complete(t) 218 self.assertTrue(t.done()) 219 self.assertEqual(t.result(), 'ok') 220 221 def test_ensure_future_future(self): 222 f_orig = self.new_future(self.loop) 223 f_orig.set_result('ko') 224 225 f = asyncio.ensure_future(f_orig) 226 self.loop.run_until_complete(f) 227 self.assertTrue(f.done()) 228 self.assertEqual(f.result(), 'ko') 229 self.assertIs(f, f_orig) 230 231 loop = asyncio.new_event_loop() 232 self.set_event_loop(loop) 233 234 with self.assertRaises(ValueError): 235 f = asyncio.ensure_future(f_orig, loop=loop) 236 237 loop.close() 238 239 f = asyncio.ensure_future(f_orig, loop=self.loop) 240 self.assertIs(f, f_orig) 241 242 def test_ensure_future_task(self): 243 async def notmuch(): 244 return 'ok' 245 t_orig = self.new_task(self.loop, notmuch()) 246 t = asyncio.ensure_future(t_orig) 247 self.loop.run_until_complete(t) 248 self.assertTrue(t.done()) 249 self.assertEqual(t.result(), 'ok') 250 self.assertIs(t, t_orig) 251 252 loop = asyncio.new_event_loop() 253 self.set_event_loop(loop) 254 255 with self.assertRaises(ValueError): 256 t = asyncio.ensure_future(t_orig, loop=loop) 257 258 loop.close() 259 260 t = asyncio.ensure_future(t_orig, loop=self.loop) 261 self.assertIs(t, t_orig) 262 263 def test_ensure_future_awaitable(self): 264 class Aw: 265 def __init__(self, coro): 266 self.coro = coro 267 def __await__(self): 268 return self.coro.__await__() 269 270 async def coro(): 271 return 'ok' 272 273 loop = asyncio.new_event_loop() 274 self.set_event_loop(loop) 275 fut = asyncio.ensure_future(Aw(coro()), loop=loop) 276 loop.run_until_complete(fut) 277 self.assertEqual(fut.result(), 'ok') 278 279 def test_ensure_future_task_awaitable(self): 280 class 
Aw: 281 def __await__(self): 282 return asyncio.sleep(0, result='ok').__await__() 283 284 loop = asyncio.new_event_loop() 285 self.set_event_loop(loop) 286 task = asyncio.ensure_future(Aw(), loop=loop) 287 loop.run_until_complete(task) 288 self.assertTrue(task.done()) 289 self.assertEqual(task.result(), 'ok') 290 self.assertIsInstance(task.get_coro(), types.CoroutineType) 291 loop.close() 292 293 def test_ensure_future_neither(self): 294 with self.assertRaises(TypeError): 295 asyncio.ensure_future('ok') 296 297 def test_ensure_future_error_msg(self): 298 loop = asyncio.new_event_loop() 299 f = self.new_future(self.loop) 300 with self.assertRaisesRegex(ValueError, 'The future belongs to a ' 301 'different loop than the one specified as ' 302 'the loop argument'): 303 asyncio.ensure_future(f, loop=loop) 304 loop.close() 305 306 def test_get_stack(self): 307 T = None 308 309 async def foo(): 310 await bar() 311 312 async def bar(): 313 # test get_stack() 314 f = T.get_stack(limit=1) 315 try: 316 self.assertEqual(f[0].f_code.co_name, 'foo') 317 finally: 318 f = None 319 320 # test print_stack() 321 file = io.StringIO() 322 T.print_stack(limit=1, file=file) 323 file.seek(0) 324 tb = file.read() 325 self.assertRegex(tb, r'foo\(\) running') 326 327 async def runner(): 328 nonlocal T 329 T = asyncio.ensure_future(foo(), loop=self.loop) 330 await T 331 332 self.loop.run_until_complete(runner()) 333 334 def test_task_repr(self): 335 self.loop.set_debug(False) 336 337 async def notmuch(): 338 return 'abc' 339 340 # test coroutine function 341 self.assertEqual(notmuch.__name__, 'notmuch') 342 self.assertRegex(notmuch.__qualname__, 343 r'\w+.test_task_repr.<locals>.notmuch') 344 self.assertEqual(notmuch.__module__, __name__) 345 346 filename, lineno = test_utils.get_function_source(notmuch) 347 src = "%s:%s" % (filename, lineno) 348 349 # test coroutine object 350 gen = notmuch() 351 coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch' 352 
self.assertEqual(gen.__name__, 'notmuch') 353 self.assertEqual(gen.__qualname__, coro_qualname) 354 355 # test pending Task 356 t = self.new_task(self.loop, gen) 357 t.add_done_callback(Dummy()) 358 359 coro = format_coroutine(coro_qualname, 'running', src, 360 t._source_traceback, generator=True) 361 self.assertEqual(repr(t), 362 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 363 364 # test cancelling Task 365 t.cancel() # Does not take immediate effect! 366 self.assertEqual(repr(t), 367 "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro) 368 369 # test cancelled Task 370 self.assertRaises(asyncio.CancelledError, 371 self.loop.run_until_complete, t) 372 coro = format_coroutine(coro_qualname, 'done', src, 373 t._source_traceback) 374 self.assertEqual(repr(t), 375 "<Task cancelled name='TestTask' %s>" % coro) 376 377 # test finished Task 378 t = self.new_task(self.loop, notmuch()) 379 self.loop.run_until_complete(t) 380 coro = format_coroutine(coro_qualname, 'done', src, 381 t._source_traceback) 382 self.assertEqual(repr(t), 383 "<Task finished name='TestTask' %s result='abc'>" % coro) 384 385 def test_task_repr_autogenerated(self): 386 async def notmuch(): 387 return 123 388 389 t1 = self.new_task(self.loop, notmuch(), None) 390 t2 = self.new_task(self.loop, notmuch(), None) 391 self.assertNotEqual(repr(t1), repr(t2)) 392 393 match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1)) 394 self.assertIsNotNone(match1) 395 match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2)) 396 self.assertIsNotNone(match2) 397 398 # Autogenerated task names should have monotonically increasing numbers 399 self.assertLess(int(match1.group(1)), int(match2.group(1))) 400 self.loop.run_until_complete(t1) 401 self.loop.run_until_complete(t2) 402 403 def test_task_set_name_pylong(self): 404 # test that setting the task name to a PyLong explicitly doesn't 405 # incorrectly trigger the deferred name formatting logic 406 async def notmuch(): 407 return 
123 408 409 t = self.new_task(self.loop, notmuch(), name=987654321) 410 self.assertEqual(t.get_name(), '987654321') 411 t.set_name(123456789) 412 self.assertEqual(t.get_name(), '123456789') 413 self.loop.run_until_complete(t) 414 415 def test_task_repr_name_not_str(self): 416 async def notmuch(): 417 return 123 418 419 t = self.new_task(self.loop, notmuch()) 420 t.set_name({6}) 421 self.assertEqual(t.get_name(), '{6}') 422 self.loop.run_until_complete(t) 423 424 def test_task_repr_wait_for(self): 425 self.loop.set_debug(False) 426 427 async def wait_for(fut): 428 return await fut 429 430 fut = self.new_future(self.loop) 431 task = self.new_task(self.loop, wait_for(fut)) 432 test_utils.run_briefly(self.loop) 433 self.assertRegex(repr(task), 434 '<Task .* wait_for=%s>' % re.escape(repr(fut))) 435 436 fut.set_result(None) 437 self.loop.run_until_complete(task) 438 439 def test_task_basics(self): 440 441 async def outer(): 442 a = await inner1() 443 b = await inner2() 444 return a+b 445 446 async def inner1(): 447 return 42 448 449 async def inner2(): 450 return 1000 451 452 t = outer() 453 self.assertEqual(self.loop.run_until_complete(t), 1042) 454 455 def test_exception_chaining_after_await(self): 456 # Test that when awaiting on a task when an exception is already 457 # active, if the task raises an exception it will be chained 458 # with the original. 
459 loop = asyncio.new_event_loop() 460 self.set_event_loop(loop) 461 462 async def raise_error(): 463 raise ValueError 464 465 async def run(): 466 try: 467 raise KeyError(3) 468 except Exception as exc: 469 task = self.new_task(loop, raise_error()) 470 try: 471 await task 472 except Exception as exc: 473 self.assertEqual(type(exc), ValueError) 474 chained = exc.__context__ 475 self.assertEqual((type(chained), chained.args), 476 (KeyError, (3,))) 477 478 try: 479 task = self.new_task(loop, run()) 480 loop.run_until_complete(task) 481 finally: 482 loop.close() 483 484 def test_exception_chaining_after_await_with_context_cycle(self): 485 # Check trying to create an exception context cycle: 486 # https://bugs.python.org/issue40696 487 has_cycle = None 488 loop = asyncio.new_event_loop() 489 self.set_event_loop(loop) 490 491 async def process_exc(exc): 492 raise exc 493 494 async def run(): 495 nonlocal has_cycle 496 try: 497 raise KeyError('a') 498 except Exception as exc: 499 task = self.new_task(loop, process_exc(exc)) 500 try: 501 await task 502 except BaseException as exc: 503 has_cycle = (exc is exc.__context__) 504 # Prevent a hang if has_cycle is True. 505 exc.__context__ = None 506 507 try: 508 task = self.new_task(loop, run()) 509 loop.run_until_complete(task) 510 finally: 511 loop.close() 512 # This also distinguishes from the initial has_cycle=None. 513 self.assertEqual(has_cycle, False) 514 515 516 def test_cancelling(self): 517 loop = asyncio.new_event_loop() 518 519 async def task(): 520 await asyncio.sleep(10) 521 522 try: 523 t = self.new_task(loop, task()) 524 self.assertFalse(t.cancelling()) 525 self.assertNotIn(" cancelling ", repr(t)) 526 self.assertTrue(t.cancel()) 527 self.assertTrue(t.cancelling()) 528 self.assertIn(" cancelling ", repr(t)) 529 530 # Since we commented out two lines from Task.cancel(), 531 # this t.cancel() call now returns True. 
532 # self.assertFalse(t.cancel()) 533 self.assertTrue(t.cancel()) 534 535 with self.assertRaises(asyncio.CancelledError): 536 loop.run_until_complete(t) 537 finally: 538 loop.close() 539 540 def test_uncancel_basic(self): 541 loop = asyncio.new_event_loop() 542 543 async def task(): 544 try: 545 await asyncio.sleep(10) 546 except asyncio.CancelledError: 547 asyncio.current_task().uncancel() 548 await asyncio.sleep(10) 549 550 try: 551 t = self.new_task(loop, task()) 552 loop.run_until_complete(asyncio.sleep(0.01)) 553 554 # Cancel first sleep 555 self.assertTrue(t.cancel()) 556 self.assertIn(" cancelling ", repr(t)) 557 self.assertEqual(t.cancelling(), 1) 558 self.assertFalse(t.cancelled()) # Task is still not complete 559 loop.run_until_complete(asyncio.sleep(0.01)) 560 561 # after .uncancel() 562 self.assertNotIn(" cancelling ", repr(t)) 563 self.assertEqual(t.cancelling(), 0) 564 self.assertFalse(t.cancelled()) # Task is still not complete 565 566 # Cancel second sleep 567 self.assertTrue(t.cancel()) 568 self.assertEqual(t.cancelling(), 1) 569 self.assertFalse(t.cancelled()) # Task is still not complete 570 with self.assertRaises(asyncio.CancelledError): 571 loop.run_until_complete(t) 572 self.assertTrue(t.cancelled()) # Finally, task complete 573 self.assertTrue(t.done()) 574 575 # uncancel is no longer effective after the task is complete 576 t.uncancel() 577 self.assertTrue(t.cancelled()) 578 self.assertTrue(t.done()) 579 finally: 580 loop.close() 581 582 def test_uncancel_structured_blocks(self): 583 # This test recreates the following high-level structure using uncancel():: 584 # 585 # async def make_request_with_timeout(): 586 # try: 587 # async with asyncio.timeout(1): 588 # # Structured block affected by the timeout: 589 # await make_request() 590 # await make_another_request() 591 # except TimeoutError: 592 # pass # There was a timeout 593 # # Outer code not affected by the timeout: 594 # await unrelated_code() 595 596 loop = asyncio.new_event_loop() 
597 598 async def make_request_with_timeout(*, sleep: float, timeout: float): 599 task = asyncio.current_task() 600 loop = task.get_loop() 601 602 timed_out = False 603 structured_block_finished = False 604 outer_code_reached = False 605 606 def on_timeout(): 607 nonlocal timed_out 608 timed_out = True 609 task.cancel() 610 611 timeout_handle = loop.call_later(timeout, on_timeout) 612 try: 613 try: 614 # Structured block affected by the timeout 615 await asyncio.sleep(sleep) 616 structured_block_finished = True 617 finally: 618 timeout_handle.cancel() 619 if ( 620 timed_out 621 and task.uncancel() == 0 622 and type(sys.exception()) is asyncio.CancelledError 623 ): 624 # Note the five rules that are needed here to satisfy proper 625 # uncancellation: 626 # 627 # 1. handle uncancellation in a `finally:` block to allow for 628 # plain returns; 629 # 2. our `timed_out` flag is set, meaning that it was our event 630 # that triggered the need to uncancel the task, regardless of 631 # what exception is raised; 632 # 3. we can call `uncancel()` because *we* called `cancel()` 633 # before; 634 # 4. we call `uncancel()` but we only continue converting the 635 # CancelledError to TimeoutError if `uncancel()` caused the 636 # cancellation request count go down to 0. We need to look 637 # at the counter vs having a simple boolean flag because our 638 # code might have been nested (think multiple timeouts). See 639 # commit 7fce1063b6e5a366f8504e039a8ccdd6944625cd for 640 # details. 641 # 5. we only convert CancelledError to TimeoutError; for other 642 # exceptions raised due to the cancellation (like 643 # a ConnectionLostError from a database client), simply 644 # propagate them. 645 # 646 # Those checks need to take place in this exact order to make 647 # sure the `cancelling()` counter always stays in sync. 648 # 649 # Additionally, the original stimulus to `cancel()` the task 650 # needs to be unscheduled to avoid re-cancelling the task later. 
651 # Here we do it by cancelling `timeout_handle` in the `finally:` 652 # block. 653 raise TimeoutError 654 except TimeoutError: 655 self.assertTrue(timed_out) 656 657 # Outer code not affected by the timeout: 658 outer_code_reached = True 659 await asyncio.sleep(0) 660 return timed_out, structured_block_finished, outer_code_reached 661 662 try: 663 # Test which timed out. 664 t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1)) 665 timed_out, structured_block_finished, outer_code_reached = ( 666 loop.run_until_complete(t1) 667 ) 668 self.assertTrue(timed_out) 669 self.assertFalse(structured_block_finished) # it was cancelled 670 self.assertTrue(outer_code_reached) # task got uncancelled after leaving 671 # the structured block and continued until 672 # completion 673 self.assertEqual(t1.cancelling(), 0) # no pending cancellation of the outer task 674 675 # Test which did not time out. 676 t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0)) 677 timed_out, structured_block_finished, outer_code_reached = ( 678 loop.run_until_complete(t2) 679 ) 680 self.assertFalse(timed_out) 681 self.assertTrue(structured_block_finished) 682 self.assertTrue(outer_code_reached) 683 self.assertEqual(t2.cancelling(), 0) 684 finally: 685 loop.close() 686 687 def test_uncancel_resets_must_cancel(self): 688 689 async def coro(): 690 await fut 691 return 42 692 693 loop = asyncio.new_event_loop() 694 fut = asyncio.Future(loop=loop) 695 task = self.new_task(loop, coro()) 696 loop.run_until_complete(asyncio.sleep(0)) # Get task waiting for fut 697 fut.set_result(None) # Make task runnable 698 try: 699 task.cancel() # Enter cancelled state 700 self.assertEqual(task.cancelling(), 1) 701 self.assertTrue(task._must_cancel) 702 703 task.uncancel() # Undo cancellation 704 self.assertEqual(task.cancelling(), 0) 705 self.assertFalse(task._must_cancel) 706 finally: 707 res = loop.run_until_complete(task) 708 self.assertEqual(res, 42) 709 loop.close() 710 
711 def test_cancel(self): 712 713 def gen(): 714 when = yield 715 self.assertAlmostEqual(10.0, when) 716 yield 0 717 718 loop = self.new_test_loop(gen) 719 720 async def task(): 721 await asyncio.sleep(10.0) 722 return 12 723 724 t = self.new_task(loop, task()) 725 loop.call_soon(t.cancel) 726 with self.assertRaises(asyncio.CancelledError): 727 loop.run_until_complete(t) 728 self.assertTrue(t.done()) 729 self.assertTrue(t.cancelled()) 730 self.assertFalse(t.cancel()) 731 732 def test_cancel_with_message_then_future_result(self): 733 # Test Future.result() after calling cancel() with a message. 734 cases = [ 735 ((), ()), 736 ((None,), ()), 737 (('my message',), ('my message',)), 738 # Non-string values should roundtrip. 739 ((5,), (5,)), 740 ] 741 for cancel_args, expected_args in cases: 742 with self.subTest(cancel_args=cancel_args): 743 loop = asyncio.new_event_loop() 744 self.set_event_loop(loop) 745 746 async def sleep(): 747 await asyncio.sleep(10) 748 749 async def coro(): 750 task = self.new_task(loop, sleep()) 751 await asyncio.sleep(0) 752 task.cancel(*cancel_args) 753 done, pending = await asyncio.wait([task]) 754 task.result() 755 756 task = self.new_task(loop, coro()) 757 with self.assertRaises(asyncio.CancelledError) as cm: 758 loop.run_until_complete(task) 759 exc = cm.exception 760 self.assertEqual(exc.args, expected_args) 761 762 actual = get_innermost_context(exc) 763 self.assertEqual(actual, 764 (asyncio.CancelledError, expected_args, 0)) 765 766 def test_cancel_with_message_then_future_exception(self): 767 # Test Future.exception() after calling cancel() with a message. 768 cases = [ 769 ((), ()), 770 ((None,), ()), 771 (('my message',), ('my message',)), 772 # Non-string values should roundtrip. 
773 ((5,), (5,)), 774 ] 775 for cancel_args, expected_args in cases: 776 with self.subTest(cancel_args=cancel_args): 777 loop = asyncio.new_event_loop() 778 self.set_event_loop(loop) 779 780 async def sleep(): 781 await asyncio.sleep(10) 782 783 async def coro(): 784 task = self.new_task(loop, sleep()) 785 await asyncio.sleep(0) 786 task.cancel(*cancel_args) 787 done, pending = await asyncio.wait([task]) 788 task.exception() 789 790 task = self.new_task(loop, coro()) 791 with self.assertRaises(asyncio.CancelledError) as cm: 792 loop.run_until_complete(task) 793 exc = cm.exception 794 self.assertEqual(exc.args, expected_args) 795 796 actual = get_innermost_context(exc) 797 self.assertEqual(actual, 798 (asyncio.CancelledError, expected_args, 0)) 799 800 def test_cancellation_exception_context(self): 801 loop = asyncio.new_event_loop() 802 self.set_event_loop(loop) 803 fut = loop.create_future() 804 805 async def sleep(): 806 fut.set_result(None) 807 await asyncio.sleep(10) 808 809 async def coro(): 810 inner_task = self.new_task(loop, sleep()) 811 await fut 812 loop.call_soon(inner_task.cancel, 'msg') 813 try: 814 await inner_task 815 except asyncio.CancelledError as ex: 816 raise ValueError("cancelled") from ex 817 818 task = self.new_task(loop, coro()) 819 with self.assertRaises(ValueError) as cm: 820 loop.run_until_complete(task) 821 exc = cm.exception 822 self.assertEqual(exc.args, ('cancelled',)) 823 824 actual = get_innermost_context(exc) 825 self.assertEqual(actual, 826 (asyncio.CancelledError, ('msg',), 1)) 827 828 def test_cancel_with_message_before_starting_task(self): 829 loop = asyncio.new_event_loop() 830 self.set_event_loop(loop) 831 832 async def sleep(): 833 await asyncio.sleep(10) 834 835 async def coro(): 836 task = self.new_task(loop, sleep()) 837 # We deliberately leave out the sleep here. 
838 task.cancel('my message') 839 done, pending = await asyncio.wait([task]) 840 task.exception() 841 842 task = self.new_task(loop, coro()) 843 with self.assertRaises(asyncio.CancelledError) as cm: 844 loop.run_until_complete(task) 845 exc = cm.exception 846 self.assertEqual(exc.args, ('my message',)) 847 848 actual = get_innermost_context(exc) 849 self.assertEqual(actual, 850 (asyncio.CancelledError, ('my message',), 0)) 851 852 def test_cancel_yield(self): 853 async def task(): 854 await asyncio.sleep(0) 855 await asyncio.sleep(0) 856 return 12 857 858 t = self.new_task(self.loop, task()) 859 test_utils.run_briefly(self.loop) # start coro 860 t.cancel() 861 self.assertRaises( 862 asyncio.CancelledError, self.loop.run_until_complete, t) 863 self.assertTrue(t.done()) 864 self.assertTrue(t.cancelled()) 865 self.assertFalse(t.cancel()) 866 867 def test_cancel_inner_future(self): 868 f = self.new_future(self.loop) 869 870 async def task(): 871 await f 872 return 12 873 874 t = self.new_task(self.loop, task()) 875 test_utils.run_briefly(self.loop) # start task 876 f.cancel() 877 with self.assertRaises(asyncio.CancelledError): 878 self.loop.run_until_complete(t) 879 self.assertTrue(f.cancelled()) 880 self.assertTrue(t.cancelled()) 881 882 def test_cancel_both_task_and_inner_future(self): 883 f = self.new_future(self.loop) 884 885 async def task(): 886 await f 887 return 12 888 889 t = self.new_task(self.loop, task()) 890 test_utils.run_briefly(self.loop) 891 892 f.cancel() 893 t.cancel() 894 895 with self.assertRaises(asyncio.CancelledError): 896 self.loop.run_until_complete(t) 897 898 self.assertTrue(t.done()) 899 self.assertTrue(f.cancelled()) 900 self.assertTrue(t.cancelled()) 901 902 def test_cancel_task_catching(self): 903 fut1 = self.new_future(self.loop) 904 fut2 = self.new_future(self.loop) 905 906 async def task(): 907 await fut1 908 try: 909 await fut2 910 except asyncio.CancelledError: 911 return 42 912 913 t = self.new_task(self.loop, task()) 914 
test_utils.run_briefly(self.loop) 915 self.assertIs(t._fut_waiter, fut1) # White-box test. 916 fut1.set_result(None) 917 test_utils.run_briefly(self.loop) 918 self.assertIs(t._fut_waiter, fut2) # White-box test. 919 t.cancel() 920 self.assertTrue(fut2.cancelled()) 921 res = self.loop.run_until_complete(t) 922 self.assertEqual(res, 42) 923 self.assertFalse(t.cancelled()) 924 925 def test_cancel_task_ignoring(self): 926 fut1 = self.new_future(self.loop) 927 fut2 = self.new_future(self.loop) 928 fut3 = self.new_future(self.loop) 929 930 async def task(): 931 await fut1 932 try: 933 await fut2 934 except asyncio.CancelledError: 935 pass 936 res = await fut3 937 return res 938 939 t = self.new_task(self.loop, task()) 940 test_utils.run_briefly(self.loop) 941 self.assertIs(t._fut_waiter, fut1) # White-box test. 942 fut1.set_result(None) 943 test_utils.run_briefly(self.loop) 944 self.assertIs(t._fut_waiter, fut2) # White-box test. 945 t.cancel() 946 self.assertTrue(fut2.cancelled()) 947 test_utils.run_briefly(self.loop) 948 self.assertIs(t._fut_waiter, fut3) # White-box test. 949 fut3.set_result(42) 950 res = self.loop.run_until_complete(t) 951 self.assertEqual(res, 42) 952 self.assertFalse(fut3.cancelled()) 953 self.assertFalse(t.cancelled()) 954 955 def test_cancel_current_task(self): 956 loop = asyncio.new_event_loop() 957 self.set_event_loop(loop) 958 959 async def task(): 960 t.cancel() 961 self.assertTrue(t._must_cancel) # White-box test. 962 # The sleep should be cancelled immediately. 963 await asyncio.sleep(100) 964 return 12 965 966 t = self.new_task(loop, task()) 967 self.assertFalse(t.cancelled()) 968 self.assertRaises( 969 asyncio.CancelledError, loop.run_until_complete, t) 970 self.assertTrue(t.done()) 971 self.assertTrue(t.cancelled()) 972 self.assertFalse(t._must_cancel) # White-box test. 
973 self.assertFalse(t.cancel()) 974 975 def test_cancel_at_end(self): 976 """coroutine end right after task is cancelled""" 977 loop = asyncio.new_event_loop() 978 self.set_event_loop(loop) 979 980 async def task(): 981 t.cancel() 982 self.assertTrue(t._must_cancel) # White-box test. 983 return 12 984 985 t = self.new_task(loop, task()) 986 self.assertFalse(t.cancelled()) 987 self.assertRaises( 988 asyncio.CancelledError, loop.run_until_complete, t) 989 self.assertTrue(t.done()) 990 self.assertTrue(t.cancelled()) 991 self.assertFalse(t._must_cancel) # White-box test. 992 self.assertFalse(t.cancel()) 993 994 def test_cancel_awaited_task(self): 995 # This tests for a relatively rare condition when 996 # a task cancellation is requested for a task which is not 997 # currently blocked, such as a task cancelling itself. 998 # In this situation we must ensure that whatever next future 999 # or task the cancelled task blocks on is cancelled correctly 1000 # as well. See also bpo-34872. 1001 loop = asyncio.new_event_loop() 1002 self.addCleanup(lambda: loop.close()) 1003 1004 task = nested_task = None 1005 fut = self.new_future(loop) 1006 1007 async def nested(): 1008 await fut 1009 1010 async def coro(): 1011 nonlocal nested_task 1012 # Create a sub-task and wait for it to run. 1013 nested_task = self.new_task(loop, nested()) 1014 await asyncio.sleep(0) 1015 1016 # Request the current task to be cancelled. 1017 task.cancel() 1018 # Block on the nested task, which should be immediately 1019 # cancelled. 
1020 await nested_task 1021 1022 task = self.new_task(loop, coro()) 1023 with self.assertRaises(asyncio.CancelledError): 1024 loop.run_until_complete(task) 1025 1026 self.assertTrue(task.cancelled()) 1027 self.assertTrue(nested_task.cancelled()) 1028 self.assertTrue(fut.cancelled()) 1029 1030 def assert_text_contains(self, text, substr): 1031 if substr not in text: 1032 raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<') 1033 1034 def test_cancel_traceback_for_future_result(self): 1035 # When calling Future.result() on a cancelled task, check that the 1036 # line of code that was interrupted is included in the traceback. 1037 loop = asyncio.new_event_loop() 1038 self.set_event_loop(loop) 1039 1040 async def nested(): 1041 # This will get cancelled immediately. 1042 await asyncio.sleep(10) 1043 1044 async def coro(): 1045 task = self.new_task(loop, nested()) 1046 await asyncio.sleep(0) 1047 task.cancel() 1048 await task # search target 1049 1050 task = self.new_task(loop, coro()) 1051 try: 1052 loop.run_until_complete(task) 1053 except asyncio.CancelledError: 1054 tb = traceback.format_exc() 1055 self.assert_text_contains(tb, "await asyncio.sleep(10)") 1056 # The intermediate await should also be included. 1057 self.assert_text_contains(tb, "await task # search target") 1058 else: 1059 self.fail('CancelledError did not occur') 1060 1061 def test_cancel_traceback_for_future_exception(self): 1062 # When calling Future.exception() on a cancelled task, check that the 1063 # line of code that was interrupted is included in the traceback. 1064 loop = asyncio.new_event_loop() 1065 self.set_event_loop(loop) 1066 1067 async def nested(): 1068 # This will get cancelled immediately. 
1069 await asyncio.sleep(10) 1070 1071 async def coro(): 1072 task = self.new_task(loop, nested()) 1073 await asyncio.sleep(0) 1074 task.cancel() 1075 done, pending = await asyncio.wait([task]) 1076 task.exception() # search target 1077 1078 task = self.new_task(loop, coro()) 1079 try: 1080 loop.run_until_complete(task) 1081 except asyncio.CancelledError: 1082 tb = traceback.format_exc() 1083 self.assert_text_contains(tb, "await asyncio.sleep(10)") 1084 # The intermediate await should also be included. 1085 self.assert_text_contains(tb, 1086 "task.exception() # search target") 1087 else: 1088 self.fail('CancelledError did not occur') 1089 1090 def test_stop_while_run_in_complete(self): 1091 1092 def gen(): 1093 when = yield 1094 self.assertAlmostEqual(0.1, when) 1095 when = yield 0.1 1096 self.assertAlmostEqual(0.2, when) 1097 when = yield 0.1 1098 self.assertAlmostEqual(0.3, when) 1099 yield 0.1 1100 1101 loop = self.new_test_loop(gen) 1102 1103 x = 0 1104 1105 async def task(): 1106 nonlocal x 1107 while x < 10: 1108 await asyncio.sleep(0.1) 1109 x += 1 1110 if x == 2: 1111 loop.stop() 1112 1113 t = self.new_task(loop, task()) 1114 with self.assertRaises(RuntimeError) as cm: 1115 loop.run_until_complete(t) 1116 self.assertEqual(str(cm.exception), 1117 'Event loop stopped before Future completed.') 1118 self.assertFalse(t.done()) 1119 self.assertEqual(x, 2) 1120 self.assertAlmostEqual(0.3, loop.time()) 1121 1122 t.cancel() 1123 self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) 1124 1125 def test_log_traceback(self): 1126 async def coro(): 1127 pass 1128 1129 task = self.new_task(self.loop, coro()) 1130 with self.assertRaisesRegex(ValueError, 'can only be set to False'): 1131 task._log_traceback = True 1132 self.loop.run_until_complete(task) 1133 1134 def test_wait(self): 1135 1136 def gen(): 1137 when = yield 1138 self.assertAlmostEqual(0.1, when) 1139 when = yield 0 1140 self.assertAlmostEqual(0.15, when) 1141 yield 0.15 1142 1143 loop = 
def test_wait_errors(self):
    # wait() raises ValueError both for an empty set of awaitables and
    # for an unknown return_when value.
    with self.assertRaises(ValueError):
        self.loop.run_until_complete(asyncio.wait(set()))

    # -1 is an invalid return_when value
    sleep_coro = asyncio.sleep(10.0)
    # Registered up front so the never-awaited coroutine is closed even
    # if the assertion below fails.
    self.addCleanup(sleep_coro.close)
    wait_coro = asyncio.wait([sleep_coro], return_when=-1)
    with self.assertRaises(ValueError):
        self.loop.run_until_complete(wait_coro)
def test_wait_really_done(self):
    # there is possibility that some tasks in the pending list
    # became done but their callbacks haven't all been called yet

    async def one_step():
        await asyncio.sleep(0)

    async def two_steps():
        await asyncio.sleep(0)
        await asyncio.sleep(0)

    loop = self.loop
    a = self.new_task(loop, one_step())
    b = self.new_task(loop, two_steps())
    waiter = self.new_task(
        loop,
        asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))

    done, pending = loop.run_until_complete(waiter)
    # Both tasks are expected in the done set, each with a None result.
    self.assertEqual({a, b}, done)
    for finished in (a, b):
        self.assertTrue(finished.done())
        self.assertIsNone(finished.result())
def test_wait_with_exception(self):
    # wait() called without return_when reports a task that raised as
    # done, next to the one that finished normally.

    def gen():
        sent = yield
        self.assertAlmostEqual(0.1, sent)
        sent = yield 0
        self.assertAlmostEqual(0.15, sent)
        yield 0.15

    loop = self.new_test_loop(gen)

    async def failing_sleeper():
        await asyncio.sleep(0.15)
        raise ZeroDivisionError('really')

    a = self.new_task(loop, asyncio.sleep(0.1))
    b = self.new_task(loop, failing_sleeper())

    async def check_wait():
        finished, unfinished = await asyncio.wait([b, a])
        self.assertEqual(len(finished), 2)
        self.assertEqual(unfinished, set())
        failed = {f for f in finished if f.exception() is not None}
        self.assertEqual(len(failed), 1)

    # The check runs twice; the loop clock is expected to read 0.15
    # after each pass.
    for _ in range(2):
        loop.run_until_complete(self.new_task(loop, check_wait()))
        self.assertAlmostEqual(0.15, loop.time())
def test_wait_concurrent_complete(self):
    # wait() with a timeout equal to the shorter sleep: the 0.1s task is
    # expected in the done set and the 0.15s task in the pending set.

    def gen():
        sent = yield
        self.assertAlmostEqual(0.1, sent)
        sent = yield 0
        self.assertAlmostEqual(0.15, sent)
        sent = yield 0
        self.assertAlmostEqual(0.1, sent)
        yield 0.1

    loop = self.new_test_loop(gen)

    a = self.new_task(loop, asyncio.sleep(0.1))
    b = self.new_task(loop, asyncio.sleep(0.15))

    waiter = asyncio.wait([b, a], timeout=0.1)
    done, pending = loop.run_until_complete(waiter)

    self.assertEqual(done, {a})
    self.assertEqual(pending, {b})
    self.assertAlmostEqual(0.1, loop.time())

    # move forward to close generator
    loop.advance_time(10)
    loop.run_until_complete(asyncio.wait([a, b]))
def test_as_completed(self):
    # as_completed() must deliver results in completion order for both
    # the plain-iterator and the async-iterator protocols.

    def gen():
        yield 0
        yield 0
        yield 0.01
        yield 0

    async def sleeper(dt, x):
        nonlocal time_shifted
        await asyncio.sleep(dt)
        completed.add(x)
        # Advance the clock exactly once, as soon as both 'a' and 'b'
        # have been recorded as completed.
        if not time_shifted and 'a' in completed and 'b' in completed:
            time_shifted = True
            loop.advance_time(0.14)
        return x

    async def try_iterator(awaitables):
        values = []
        for f in asyncio.as_completed(awaitables):
            values.append(await f)
        return values

    async def try_async_iterator(awaitables):
        values = []
        async for f in asyncio.as_completed(awaitables):
            values.append(await f)
        return values

    for foo in try_iterator, try_async_iterator:
        with self.subTest(method=foo.__name__):
            loop = self.new_test_loop(gen)
            # disable "slow callback" warning
            loop.slow_callback_duration = 1.0

            # Shared state read and updated by sleeper() above.
            completed = set()
            time_shifted = False

            a = sleeper(0.01, 'a')
            b = sleeper(0.01, 'b')
            c = sleeper(0.15, 'c')

            res = loop.run_until_complete(self.new_task(loop, foo([b, c, a])))
            self.assertAlmostEqual(0.15, loop.time())
            # assertIn reports both operands on failure, unlike
            # assertTrue(x in y).
            self.assertIn('a', res[:2])
            self.assertIn('b', res[:2])
            self.assertEqual(res[2], 'c')
def test_as_completed_with_timeout(self):
    # Run as_completed(..., timeout=0.12) through both iteration
    # protocols.  The 0.1s sleeper is expected to deliver 'a'; the 0.15s
    # sleeper is expected to surface as asyncio.TimeoutError.

    def gen():
        yield
        yield 0
        yield 0
        yield 0.1

    async def try_iterator():
        # Collects (1, value) for each result and (2, exc) for a timeout.
        values = []
        for f in asyncio.as_completed([a, b], timeout=0.12):
            if values:
                # Only after the first entry was recorded: push the
                # clock forward before awaiting the next future.
                loop.advance_time(0.02)
            try:
                v = await f
                values.append((1, v))
            except asyncio.TimeoutError as exc:
                values.append((2, exc))
        return values

    async def try_async_iterator():
        # Same bookkeeping as try_iterator(); here the TimeoutError is
        # caught around the whole ``async for`` loop.
        values = []
        try:
            async for f in asyncio.as_completed([a, b], timeout=0.12):
                v = await f
                values.append((1, v))
                loop.advance_time(0.02)
        except asyncio.TimeoutError as exc:
            values.append((2, exc))
        return values

    for foo in try_iterator, try_async_iterator:
        with self.subTest(method=foo.__name__):
            # Fresh loop and tasks per sub-test; the helpers above read
            # loop, a and b from this enclosing scope.
            loop = self.new_test_loop(gen)
            a = loop.create_task(asyncio.sleep(0.1, 'a'))
            b = loop.create_task(asyncio.sleep(0.15, 'b'))

            res = loop.run_until_complete(self.new_task(loop, foo()))
            self.assertEqual(len(res), 2, res)
            self.assertEqual(res[0], (1, 'a'))
            self.assertEqual(res[1][0], 2)
            self.assertIsInstance(res[1][1], asyncio.TimeoutError)
            self.assertAlmostEqual(0.12, loop.time())

            # move forward to close generator
            loop.advance_time(10)
            loop.run_until_complete(asyncio.wait([a, b]))
def test_as_completed_reverse_wait(self):
    # Plain-iterator as_completed(): whichever yielded future is awaited
    # first must resolve to the first awaitable to complete, even when
    # it was not the first future that as_completed produced.
    def gen():
        yield 0
        yield 0.05
        yield 0

    loop = self.new_test_loop(gen)

    a = asyncio.sleep(0.05, 'a')
    b = asyncio.sleep(0.10, 'b')
    fs = {a, b}

    async def check():
        futs = list(asyncio.as_completed(fs))
        self.assertEqual(len(futs), 2)

        # Await the two futures in the reverse of their yield order.
        first_result = await futs[1]
        self.assertEqual(first_result, 'a')
        self.assertAlmostEqual(0.05, loop.time())

        loop.advance_time(0.05)
        second_result = await futs[0]
        self.assertEqual(second_result, 'b')
        self.assertAlmostEqual(0.10, loop.time())

    loop.run_until_complete(check())
def test_sleep(self):
    # Two back-to-back sleeps of dt/2 each; the second one carries the
    # value that the task is expected to return.

    def gen():
        sent = yield
        self.assertAlmostEqual(0.05, sent)
        sent = yield 0.05
        self.assertAlmostEqual(0.1, sent)
        yield 0.05

    loop = self.new_test_loop(gen)

    async def sleeper(dt, arg):
        half = dt/2
        await asyncio.sleep(half)
        return await asyncio.sleep(half, arg)

    sleeping_task = self.new_task(loop, sleeper(0.1, 'yeah'))
    loop.run_until_complete(sleeping_task)
    self.assertTrue(sleeping_task.done())
    self.assertEqual(sleeping_task.result(), 'yeah')
    self.assertAlmostEqual(0.1, loop.time())
def test_task_cancel_waiter_future(self):
    # Cancelling a task that is blocked on a future is expected to
    # cancel that future and reset the task's _fut_waiter to None.
    loop = self.loop
    waiter = self.new_future(loop)

    async def wait_on_future():
        await waiter

    task = self.new_task(loop, wait_on_future())
    test_utils.run_briefly(loop)
    self.assertIs(task._fut_waiter, waiter)

    task.cancel()
    test_utils.run_briefly(loop)
    with self.assertRaises(asyncio.CancelledError):
        loop.run_until_complete(task)
    self.assertIsNone(task._fut_waiter)
    self.assertTrue(waiter.cancelled())
def test_baseexception_during_cancel(self):
    # A SystemExit raised while the task handles its CancelledError is
    # expected to escape from the loop run and to be stored as the
    # task's exception; the task must not count as cancelled.

    def gen():
        sent = yield
        self.assertAlmostEqual(10.0, sent)
        yield 0

    loop = self.new_test_loop(gen)
    base_exc = SystemExit()

    async def sleeper():
        await asyncio.sleep(10)

    async def raise_on_cancel():
        try:
            await sleeper()
        except asyncio.CancelledError:
            raise base_exc

    task = self.new_task(loop, raise_on_cancel())
    test_utils.run_briefly(loop)

    task.cancel()
    self.assertFalse(task.done())

    with self.assertRaises(SystemExit):
        test_utils.run_briefly(loop)

    self.assertTrue(task.done())
    self.assertFalse(task.cancelled())
    self.assertIs(task.exception(), base_exc)
def test_current_task_with_interleaving_tasks(self):
    # Two tasks hand control back and forth through a pair of futures;
    # current_task() must name the task that is executing at each step
    # and be None again once the loop run has finished.
    self.assertIsNone(asyncio.current_task(loop=self.loop))

    fut1 = self.new_future(self.loop)
    fut2 = self.new_future(self.loop)

    async def coro1(loop):
        # assertIs (as in test_current_task) shows both objects when
        # the check fails, unlike assertTrue(a is b).
        self.assertIs(asyncio.current_task(), task1)
        await fut1
        self.assertIs(asyncio.current_task(), task1)
        fut2.set_result(True)

    async def coro2(loop):
        self.assertIs(asyncio.current_task(), task2)
        fut1.set_result(True)
        await fut2
        self.assertIs(asyncio.current_task(), task2)

    task1 = self.new_task(self.loop, coro1(self.loop))
    task2 = self.new_task(self.loop, coro2(self.loop))

    self.loop.run_until_complete(asyncio.wait((task1, task2)))
    self.assertIsNone(asyncio.current_task(loop=self.loop))
def test_shield_cancel_outer(self):
    # After the outer future returned by shield() is cancelled, it must
    # report cancelled() and have no callbacks left registered.
    loop = self.loop
    inner = self.new_future(loop)
    outer = asyncio.shield(inner)
    test_utils.run_briefly(loop)

    outer.cancel()
    test_utils.run_briefly(loop)
    self.assertTrue(outer.cancelled())

    # _callbacks may be None; that counts as zero callbacks.
    callbacks = outer._callbacks
    if callbacks is None:
        remaining = 0
    else:
        remaining = len(callbacks)
    self.assertEqual(0, remaining)
def test_gather_shield(self):
    # Cancelling a gather() of shielded futures is expected to cancel
    # the shield wrappers, while the wrapped futures still accept
    # results afterwards.
    loop = self.loop
    children = [self.new_future(loop) for _ in range(2)]
    shields = [asyncio.shield(child) for child in children]
    parent = asyncio.gather(*shields)
    test_utils.run_briefly(loop)

    parent.cancel()
    # This should cancel both shields but not the children.
    test_utils.run_briefly(loop)
    self.assertIsInstance(parent.exception(), asyncio.CancelledError)
    for shield in shields:
        self.assertTrue(shield.cancelled())

    for value, child in enumerate(children, start=1):
        child.set_result(value)
    test_utils.run_briefly(loop)
def test_wait_invalid_args(self):
    # wait() must reject a bare future or coroutine with TypeError and
    # an empty collection with ValueError.
    run = self.loop.run_until_complete

    # wait() expects a list of futures, not a future instance
    fut = self.new_future(self.loop)
    with self.assertRaises(TypeError):
        run(asyncio.wait(fut))

    coro = coroutine_function()
    # Registered up front so the never-awaited coroutine is closed even
    # if the assertion below fails.
    self.addCleanup(coro.close)
    with self.assertRaises(TypeError):
        run(asyncio.wait(coro))

    # wait() expects at least a future
    with self.assertRaises(ValueError):
        run(asyncio.wait([]))
2291 }) 2292 mock_handler.reset_mock() 2293 2294 @mock.patch('asyncio.base_events.logger') 2295 def test_tb_logger_not_called_after_cancel(self, m_log): 2296 loop = asyncio.new_event_loop() 2297 self.set_event_loop(loop) 2298 2299 async def coro(): 2300 raise TypeError 2301 2302 async def runner(): 2303 task = self.new_task(loop, coro()) 2304 await asyncio.sleep(0.05) 2305 task.cancel() 2306 task = None 2307 2308 loop.run_until_complete(runner()) 2309 self.assertFalse(m_log.error.called) 2310 2311 def test_task_source_traceback(self): 2312 self.loop.set_debug(True) 2313 2314 task = self.new_task(self.loop, coroutine_function()) 2315 lineno = sys._getframe().f_lineno - 1 2316 self.assertIsInstance(task._source_traceback, list) 2317 self.assertEqual(task._source_traceback[-2][:3], 2318 (__file__, 2319 lineno, 2320 'test_task_source_traceback')) 2321 self.loop.run_until_complete(task) 2322 2323 def test_cancel_gather_1(self): 2324 """Ensure that a gathering future refuses to be cancelled once all 2325 children are done""" 2326 loop = asyncio.new_event_loop() 2327 self.addCleanup(loop.close) 2328 2329 fut = self.new_future(loop) 2330 async def create(): 2331 # The indirection fut->child_coro is needed since otherwise the 2332 # gathering task is done at the same time as the child future 2333 async def child_coro(): 2334 return await fut 2335 gather_future = asyncio.gather(child_coro()) 2336 return asyncio.ensure_future(gather_future) 2337 gather_task = loop.run_until_complete(create()) 2338 2339 cancel_result = None 2340 def cancelling_callback(_): 2341 nonlocal cancel_result 2342 cancel_result = gather_task.cancel() 2343 fut.add_done_callback(cancelling_callback) 2344 2345 fut.set_result(42) # calls the cancelling_callback after fut is done() 2346 2347 # At this point the task should complete. 
2348 loop.run_until_complete(gather_task) 2349 2350 # Python issue #26923: asyncio.gather drops cancellation 2351 self.assertEqual(cancel_result, False) 2352 self.assertFalse(gather_task.cancelled()) 2353 self.assertEqual(gather_task.result(), [42]) 2354 2355 def test_cancel_gather_2(self): 2356 cases = [ 2357 ((), ()), 2358 ((None,), ()), 2359 (('my message',), ('my message',)), 2360 # Non-string values should roundtrip. 2361 ((5,), (5,)), 2362 ] 2363 for cancel_args, expected_args in cases: 2364 with self.subTest(cancel_args=cancel_args): 2365 loop = asyncio.new_event_loop() 2366 self.addCleanup(loop.close) 2367 2368 async def test(): 2369 time = 0 2370 while True: 2371 time += 0.05 2372 await asyncio.gather(asyncio.sleep(0.05), 2373 return_exceptions=True) 2374 if time > 1: 2375 return 2376 2377 async def main(): 2378 qwe = self.new_task(loop, test()) 2379 await asyncio.sleep(0.2) 2380 qwe.cancel(*cancel_args) 2381 await qwe 2382 2383 try: 2384 loop.run_until_complete(main()) 2385 except asyncio.CancelledError as exc: 2386 self.assertEqual(exc.args, expected_args) 2387 actual = get_innermost_context(exc) 2388 self.assertEqual( 2389 actual, 2390 (asyncio.CancelledError, expected_args, 0), 2391 ) 2392 else: 2393 self.fail( 2394 'gather() does not propagate CancelledError ' 2395 'raised by inner task to the gather() caller.' 
2396 ) 2397 2398 def test_exception_traceback(self): 2399 # See http://bugs.python.org/issue28843 2400 2401 async def foo(): 2402 1 / 0 2403 2404 async def main(): 2405 task = self.new_task(self.loop, foo()) 2406 await asyncio.sleep(0) # skip one loop iteration 2407 self.assertIsNotNone(task.exception().__traceback__) 2408 2409 self.loop.run_until_complete(main()) 2410 2411 @mock.patch('asyncio.base_events.logger') 2412 def test_error_in_call_soon(self, m_log): 2413 def call_soon(callback, *args, **kwargs): 2414 raise ValueError 2415 self.loop.call_soon = call_soon 2416 2417 async def coro(): 2418 pass 2419 2420 self.assertFalse(m_log.error.called) 2421 2422 with self.assertRaises(ValueError): 2423 gen = coro() 2424 try: 2425 self.new_task(self.loop, gen) 2426 finally: 2427 gen.close() 2428 gc.collect() # For PyPy or other GCs. 2429 2430 self.assertTrue(m_log.error.called) 2431 message = m_log.error.call_args[0][0] 2432 self.assertIn('Task was destroyed but it is pending', message) 2433 2434 self.assertEqual(asyncio.all_tasks(self.loop), set()) 2435 2436 def test_create_task_with_noncoroutine(self): 2437 with self.assertRaisesRegex(TypeError, 2438 "a coroutine was expected, got 123"): 2439 self.new_task(self.loop, 123) 2440 2441 # test it for the second time to ensure that caching 2442 # in asyncio.iscoroutine() doesn't break things. 2443 with self.assertRaisesRegex(TypeError, 2444 "a coroutine was expected, got 123"): 2445 self.new_task(self.loop, 123) 2446 2447 def test_create_task_with_async_function(self): 2448 2449 async def coro(): 2450 pass 2451 2452 task = self.new_task(self.loop, coro()) 2453 self.assertIsInstance(task, self.Task) 2454 self.loop.run_until_complete(task) 2455 2456 # test it for the second time to ensure that caching 2457 # in asyncio.iscoroutine() doesn't break things. 
2458 task = self.new_task(self.loop, coro()) 2459 self.assertIsInstance(task, self.Task) 2460 self.loop.run_until_complete(task) 2461 2462 def test_create_task_with_asynclike_function(self): 2463 task = self.new_task(self.loop, CoroLikeObject()) 2464 self.assertIsInstance(task, self.Task) 2465 self.assertEqual(self.loop.run_until_complete(task), 42) 2466 2467 # test it for the second time to ensure that caching 2468 # in asyncio.iscoroutine() doesn't break things. 2469 task = self.new_task(self.loop, CoroLikeObject()) 2470 self.assertIsInstance(task, self.Task) 2471 self.assertEqual(self.loop.run_until_complete(task), 42) 2472 2473 def test_bare_create_task(self): 2474 2475 async def inner(): 2476 return 1 2477 2478 async def coro(): 2479 task = asyncio.create_task(inner()) 2480 self.assertIsInstance(task, self.Task) 2481 ret = await task 2482 self.assertEqual(1, ret) 2483 2484 self.loop.run_until_complete(coro()) 2485 2486 def test_bare_create_named_task(self): 2487 2488 async def coro_noop(): 2489 pass 2490 2491 async def coro(): 2492 task = asyncio.create_task(coro_noop(), name='No-op') 2493 self.assertEqual(task.get_name(), 'No-op') 2494 await task 2495 2496 self.loop.run_until_complete(coro()) 2497 2498 def test_context_1(self): 2499 cvar = contextvars.ContextVar('cvar', default='nope') 2500 2501 async def sub(): 2502 await asyncio.sleep(0.01) 2503 self.assertEqual(cvar.get(), 'nope') 2504 cvar.set('something else') 2505 2506 async def main(): 2507 self.assertEqual(cvar.get(), 'nope') 2508 subtask = self.new_task(loop, sub()) 2509 cvar.set('yes') 2510 self.assertEqual(cvar.get(), 'yes') 2511 await subtask 2512 self.assertEqual(cvar.get(), 'yes') 2513 2514 loop = asyncio.new_event_loop() 2515 try: 2516 task = self.new_task(loop, main()) 2517 loop.run_until_complete(task) 2518 finally: 2519 loop.close() 2520 2521 def test_context_2(self): 2522 cvar = contextvars.ContextVar('cvar', default='nope') 2523 2524 async def main(): 2525 def fut_on_done(fut): 2526 # This 
change must not pollute the context 2527 # of the "main()" task. 2528 cvar.set('something else') 2529 2530 self.assertEqual(cvar.get(), 'nope') 2531 2532 for j in range(2): 2533 fut = self.new_future(loop) 2534 fut.add_done_callback(fut_on_done) 2535 cvar.set(f'yes{j}') 2536 loop.call_soon(fut.set_result, None) 2537 await fut 2538 self.assertEqual(cvar.get(), f'yes{j}') 2539 2540 for i in range(3): 2541 # Test that task passed its context to add_done_callback: 2542 cvar.set(f'yes{i}-{j}') 2543 await asyncio.sleep(0.001) 2544 self.assertEqual(cvar.get(), f'yes{i}-{j}') 2545 2546 loop = asyncio.new_event_loop() 2547 try: 2548 task = self.new_task(loop, main()) 2549 loop.run_until_complete(task) 2550 finally: 2551 loop.close() 2552 2553 self.assertEqual(cvar.get(), 'nope') 2554 2555 def test_context_3(self): 2556 # Run 100 Tasks in parallel, each modifying cvar. 2557 2558 cvar = contextvars.ContextVar('cvar', default=-1) 2559 2560 async def sub(num): 2561 for i in range(10): 2562 cvar.set(num + i) 2563 await asyncio.sleep(random.uniform(0.001, 0.05)) 2564 self.assertEqual(cvar.get(), num + i) 2565 2566 async def main(): 2567 tasks = [] 2568 for i in range(100): 2569 task = loop.create_task(sub(random.randint(0, 10))) 2570 tasks.append(task) 2571 2572 await asyncio.gather(*tasks) 2573 2574 loop = asyncio.new_event_loop() 2575 try: 2576 loop.run_until_complete(main()) 2577 finally: 2578 loop.close() 2579 2580 self.assertEqual(cvar.get(), -1) 2581 2582 def test_context_4(self): 2583 cvar = contextvars.ContextVar('cvar') 2584 2585 async def coro(val): 2586 await asyncio.sleep(0) 2587 cvar.set(val) 2588 2589 async def main(): 2590 ret = [] 2591 ctx = contextvars.copy_context() 2592 ret.append(ctx.get(cvar)) 2593 t1 = self.new_task(loop, coro(1), context=ctx) 2594 await t1 2595 ret.append(ctx.get(cvar)) 2596 t2 = self.new_task(loop, coro(2), context=ctx) 2597 await t2 2598 ret.append(ctx.get(cvar)) 2599 return ret 2600 2601 loop = asyncio.new_event_loop() 2602 try: 2603 
task = self.new_task(loop, main()) 2604 ret = loop.run_until_complete(task) 2605 finally: 2606 loop.close() 2607 2608 self.assertEqual([None, 1, 2], ret) 2609 2610 def test_context_5(self): 2611 cvar = contextvars.ContextVar('cvar') 2612 2613 async def coro(val): 2614 await asyncio.sleep(0) 2615 cvar.set(val) 2616 2617 async def main(): 2618 ret = [] 2619 ctx = contextvars.copy_context() 2620 ret.append(ctx.get(cvar)) 2621 t1 = asyncio.create_task(coro(1), context=ctx) 2622 await t1 2623 ret.append(ctx.get(cvar)) 2624 t2 = asyncio.create_task(coro(2), context=ctx) 2625 await t2 2626 ret.append(ctx.get(cvar)) 2627 return ret 2628 2629 loop = asyncio.new_event_loop() 2630 try: 2631 task = self.new_task(loop, main()) 2632 ret = loop.run_until_complete(task) 2633 finally: 2634 loop.close() 2635 2636 self.assertEqual([None, 1, 2], ret) 2637 2638 def test_context_6(self): 2639 cvar = contextvars.ContextVar('cvar') 2640 2641 async def coro(val): 2642 await asyncio.sleep(0) 2643 cvar.set(val) 2644 2645 async def main(): 2646 ret = [] 2647 ctx = contextvars.copy_context() 2648 ret.append(ctx.get(cvar)) 2649 t1 = loop.create_task(coro(1), context=ctx) 2650 await t1 2651 ret.append(ctx.get(cvar)) 2652 t2 = loop.create_task(coro(2), context=ctx) 2653 await t2 2654 ret.append(ctx.get(cvar)) 2655 return ret 2656 2657 loop = asyncio.new_event_loop() 2658 try: 2659 task = loop.create_task(main()) 2660 ret = loop.run_until_complete(task) 2661 finally: 2662 loop.close() 2663 2664 self.assertEqual([None, 1, 2], ret) 2665 2666 def test_get_coro(self): 2667 loop = asyncio.new_event_loop() 2668 coro = coroutine_function() 2669 try: 2670 task = self.new_task(loop, coro) 2671 loop.run_until_complete(task) 2672 self.assertIs(task.get_coro(), coro) 2673 finally: 2674 loop.close() 2675 2676 def test_get_context(self): 2677 loop = asyncio.new_event_loop() 2678 coro = coroutine_function() 2679 context = contextvars.copy_context() 2680 try: 2681 task = self.new_task(loop, coro, context=context) 
2682 loop.run_until_complete(task) 2683 self.assertIs(task.get_context(), context) 2684 finally: 2685 loop.close() 2686 2687 def test_proper_refcounts(self): 2688 # see: https://github.com/python/cpython/issues/126083 2689 class Break: 2690 def __str__(self): 2691 raise RuntimeError("break") 2692 2693 obj = object() 2694 initial_refcount = sys.getrefcount(obj) 2695 2696 coro = coroutine_function() 2697 loop = asyncio.new_event_loop() 2698 task = asyncio.Task.__new__(asyncio.Task) 2699 2700 for _ in range(5): 2701 with self.assertRaisesRegex(RuntimeError, 'break'): 2702 task.__init__(coro, loop=loop, context=obj, name=Break()) 2703 2704 coro.close() 2705 del task 2706 2707 self.assertEqual(sys.getrefcount(obj), initial_refcount) 2708 2709 2710def add_subclass_tests(cls): 2711 BaseTask = cls.Task 2712 BaseFuture = cls.Future 2713 2714 if BaseTask is None or BaseFuture is None: 2715 return cls 2716 2717 class CommonFuture: 2718 def __init__(self, *args, **kwargs): 2719 self.calls = collections.defaultdict(lambda: 0) 2720 super().__init__(*args, **kwargs) 2721 2722 def add_done_callback(self, *args, **kwargs): 2723 self.calls['add_done_callback'] += 1 2724 return super().add_done_callback(*args, **kwargs) 2725 2726 class Task(CommonFuture, BaseTask): 2727 pass 2728 2729 class Future(CommonFuture, BaseFuture): 2730 pass 2731 2732 def test_subclasses_ctask_cfuture(self): 2733 fut = self.Future(loop=self.loop) 2734 2735 async def func(): 2736 self.loop.call_soon(lambda: fut.set_result('spam')) 2737 return await fut 2738 2739 task = self.Task(func(), loop=self.loop) 2740 2741 result = self.loop.run_until_complete(task) 2742 2743 self.assertEqual(result, 'spam') 2744 2745 self.assertEqual( 2746 dict(task.calls), 2747 {'add_done_callback': 1}) 2748 2749 self.assertEqual( 2750 dict(fut.calls), 2751 {'add_done_callback': 1}) 2752 2753 # Add patched Task & Future back to the test case 2754 cls.Task = Task 2755 cls.Future = Future 2756 2757 # Add an extra unit-test 2758 
cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture 2759 2760 # Disable the "test_task_source_traceback" test 2761 # (the test is hardcoded for a particular call stack, which 2762 # is slightly different for Task subclasses) 2763 cls.test_task_source_traceback = None 2764 2765 return cls 2766 2767 2768class SetMethodsTest: 2769 2770 def test_set_result_causes_invalid_state(self): 2771 Future = type(self).Future 2772 self.loop.call_exception_handler = exc_handler = mock.Mock() 2773 2774 async def foo(): 2775 await asyncio.sleep(0.1) 2776 return 10 2777 2778 coro = foo() 2779 task = self.new_task(self.loop, coro) 2780 Future.set_result(task, 'spam') 2781 2782 self.assertEqual( 2783 self.loop.run_until_complete(task), 2784 'spam') 2785 2786 exc_handler.assert_called_once() 2787 exc = exc_handler.call_args[0][0]['exception'] 2788 with self.assertRaisesRegex(asyncio.InvalidStateError, 2789 r'step\(\): already done'): 2790 raise exc 2791 2792 coro.close() 2793 2794 def test_set_exception_causes_invalid_state(self): 2795 class MyExc(Exception): 2796 pass 2797 2798 Future = type(self).Future 2799 self.loop.call_exception_handler = exc_handler = mock.Mock() 2800 2801 async def foo(): 2802 await asyncio.sleep(0.1) 2803 return 10 2804 2805 coro = foo() 2806 task = self.new_task(self.loop, coro) 2807 Future.set_exception(task, MyExc()) 2808 2809 with self.assertRaises(MyExc): 2810 self.loop.run_until_complete(task) 2811 2812 exc_handler.assert_called_once() 2813 exc = exc_handler.call_args[0][0]['exception'] 2814 with self.assertRaisesRegex(asyncio.InvalidStateError, 2815 r'step\(\): already done'): 2816 raise exc 2817 2818 coro.close() 2819 2820 2821@unittest.skipUnless(hasattr(futures, '_CFuture') and 2822 hasattr(tasks, '_CTask'), 2823 'requires the C _asyncio module') 2824class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest, 2825 test_utils.TestCase): 2826 2827 Task = getattr(tasks, '_CTask', None) 2828 Future = getattr(futures, '_CFuture', None) 2829 2830 
    @support.refcount_test
    def test_refleaks_in_task___init__(self):
        # Re-running Task.__init__() on an existing task 100 times must leave
        # the interpreter's total refcount within a delta of 10.
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        refs_before = gettotalrefcount()
        for i in range(100):
            task.__init__(coro(), loop=self.loop)
            self.loop.run_until_complete(task)
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)

    def test_del__log_destroy_pending_segfault(self):
        # Deleting the _log_destroy_pending attribute must raise
        # AttributeError (the test name indicates it used to segfault).
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        with self.assertRaises(AttributeError):
            del task._log_destroy_pending


# The classes below run the shared BaseTaskTests suite against the different
# combinations of C/Python Task and Future implementations.

@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):

    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)


@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):

    Task = getattr(tasks, '_CTask', None)
    Future = futures._PyFuture


@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask


@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):

    Task = getattr(tasks, '_CTask', None)
    Future = futures._PyFuture


@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):

    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)


class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
                            test_utils.TestCase):

    Task = tasks._PyTask
    Future = futures._PyFuture


@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    Task = tasks._PyTask
    Future = futures._PyFuture


@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_Future_Tests(test_utils.TestCase):

    def test_foobar(self):
        # A task awaiting a Future subclass whose ``get_loop`` attribute
        # raises AttributeError must still run to completion.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def coro():
            await fut
            return 'spam'

        self.loop = asyncio.new_event_loop()
        try:
            fut = Fut(loop=self.loop)
            self.loop.call_later(0.1, fut.set_result, 1)
            task = self.loop.create_task(coro())
            res = self.loop.run_until_complete(task)
        finally:
            self.loop.close()

        self.assertEqual(res, 'spam')


class BaseTaskIntrospectionTests:
    # Shared tests for the task-registry helpers.  Concrete subclasses bind
    # the four hooks below to an actual implementation.
    _register_task = None
    _unregister_task = None
    _enter_task = None
    _leave_task = None

    def test__register_task_1(self):
        # Task-like object that exposes its loop only through ``_loop``.
        class TaskLike:
            @property
            def _loop(self):
                return loop

            def done(self):
                return False

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_2(self):
        # Task-like object that exposes its loop through get_loop().
        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return False

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_3(self):
        # A registered task whose done() returns True is not reported by
        # all_tasks().
        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return True

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
        self._unregister_task(task)

    def test__enter_task(self):
        # After _enter_task(), current_task(loop) returns the entered task.
        task = mock.Mock()
        loop = mock.Mock()
        self.assertIsNone(asyncio.current_task(loop))
        self._enter_task(loop, task)
        self.assertIs(asyncio.current_task(loop), task)
        self._leave_task(loop, task)

    def test__enter_task_failure(self):
        # Entering a second task while one is current raises RuntimeError and
        # leaves the first task current.
        task1 = mock.Mock()
        task2 = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task1)
        with self.assertRaises(RuntimeError):
            self._enter_task(loop, task2)
        self.assertIs(asyncio.current_task(loop), task1)
        self._leave_task(loop, task1)

    def test__leave_task(self):
        # After a matching _leave_task(), no task is current for the loop.
        task = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task)
        self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__leave_task_failure1(self):
        # Leaving a task other than the current one raises RuntimeError and
        # keeps the current task in place.
        task1 = mock.Mock()
        task2 = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task1)
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task2)
        self.assertIs(asyncio.current_task(loop), task1)
        self._leave_task(loop, task1)

    def test__leave_task_failure2(self):
        # Leaving when no task was entered raises RuntimeError.
        task = mock.Mock()
        loop = mock.Mock()
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__unregister_task(self):
        # A task that was registered and then unregistered is no longer
        # reported by all_tasks().
        task = mock.Mock()
        loop = mock.Mock()
        task.get_loop = lambda: loop
        self._register_task(task)
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())

    def test__unregister_task_not_registered(self):
        # Unregistering a task that was never registered must not raise.
        task = mock.Mock()
        loop = mock.Mock()
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())


class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # staticmethod() stops the plain functions from being bound as methods.
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)
    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)


@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
                     'requires the C _asyncio module')
class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # The class body runs even when the class is skipped, so the C helpers
    # are only looked up when they exist.
    if hasattr(tasks, '_c_register_task'):
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)
    else:
        _register_task = _unregister_task = _enter_task = _leave_task = None


class BaseCurrentLoopTests:
    # Shared tests for current_task(); subclasses provide the implementation
    # under test and a matching new_task().
    current_task = None

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def new_task(self, coro):
        raise NotImplementedError

    def test_current_task_no_running_loop(self):
        # With an explicit loop that is not running, there is no current task.
        self.assertIsNone(self.current_task(loop=self.loop))

    def test_current_task_no_running_loop_implicit(self):
        # Without a loop argument and without a running loop, current_task()
        # raises RuntimeError.
        with self.assertRaisesRegex(RuntimeError, 'no running event loop'):
            self.current_task()

    def test_current_task_with_implicit_loop(self):
        # Inside a running task, current_task() finds the task whether the
        # loop is passed explicitly, passed as None, or omitted.
        async def coro():
            self.assertIs(self.current_task(loop=self.loop), task)

            self.assertIs(self.current_task(None), task)
            self.assertIs(self.current_task(), task)

        task = self.new_task(coro())
        self.loop.run_until_complete(task)
        self.assertIsNone(self.current_task(loop=self.loop))


class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    current_task = staticmethod(tasks._py_current_task)

    def new_task(self, coro):
        return tasks._PyTask(coro, loop=self.loop)


@unittest.skipUnless(hasattr(tasks, '_CTask') and
                     hasattr(tasks, '_c_current_task'),
                     'requires the C _asyncio module')
class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    # The class body runs even when the class is skipped, so the C helper is
    # only looked up when it exists.
    if hasattr(tasks, '_c_current_task'):
        current_task = staticmethod(tasks._c_current_task)
    else:
        current_task = None

    def new_task(self, coro):
        return getattr(tasks, '_CTask')(coro, loop=self.loop)


class GenericTaskTests(test_utils.TestCase):

    def test_future_subclass(self):
        self.assertTrue(issubclass(asyncio.Task, asyncio.Future))

    @support.cpython_only
    def test_asyncio_module_compiled(self):
        # Because of circular imports it's easy to make _asyncio
        # module non-importable.  This is a simple test that will
        # fail on systems where C modules were successfully compiled
        # (hence the test for _functools etc), but _asyncio somehow didn't.
        try:
            import _functools
            import _json
            import _pickle
        except ImportError:
            self.skipTest('C modules are not available')
        else:
            try:
                import _asyncio
            except ImportError:
                self.fail('_asyncio module is missing')


class GatherTestsBase:
    # Shared gather() tests.  Subclasses supply wrap_futures() and _gather()
    # to run them against plain futures or against coroutines.

    def setUp(self):
        super().setUp()
        self.one_loop = self.new_test_loop()
        self.other_loop = self.new_test_loop()
        self.set_event_loop(self.one_loop, cleanup=False)

    def _run_loop(self, loop):
        # Keep stepping the loop until its ready queue is empty.
        while loop._ready:
            test_utils.run_briefly(loop)

    def _check_success(self, **kwargs):
        # gather() completes only after every child is done, and its result
        # follows argument order (a, b, c), not completion order.
        a, b, c = [self.one_loop.create_future() for i in range(3)]
        fut = self._gather(*self.wrap_futures(a, b, c), **kwargs)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        b.set_result(1)
        a.set_result(2)
        self._run_loop(self.one_loop)
        self.assertEqual(cb.called, False)
        self.assertFalse(fut.done())
        c.set_result(3)
        self._run_loop(self.one_loop)
        cb.assert_called_once_with(fut)
        self.assertEqual(fut.result(), [2, 1, 3])

    def test_success(self):
        self._check_success()
        self._check_success(return_exceptions=False)

    def test_result_exception_success(self):
        self._check_success(return_exceptions=True)

    def test_one_exception(self):
        # Without return_exceptions, the first child exception completes the
        # gather future with that same exception object.
        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
        fut = self._gather(*self.wrap_futures(a, b, c, d, e))
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        exc = ZeroDivisionError()
        a.set_result(1)
        b.set_exception(exc)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertIs(fut.exception(), exc)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_return_exceptions(self):
        # With return_exceptions=True, child exceptions are returned in the
        # result list and the gather future waits for every child.
        a, b, c, d = [self.one_loop.create_future() for i in range(4)]
        fut = self._gather(*self.wrap_futures(a, b, c, d),
                           return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        exc = ZeroDivisionError()
        exc2 = RuntimeError()
        b.set_result(1)
        c.set_exception(exc)
        a.set_result(3)
        self._run_loop(self.one_loop)
        self.assertFalse(fut.done())
        d.set_exception(exc2)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertEqual(fut.result(), [3, 1, exc, exc2])

    def test_env_var_debug(self):
        code = '\n'.join((
            'import asyncio.coroutines',
            'print(asyncio.coroutines._is_debug_mode())'))

        # Test with -E to not fail if the unit test was run with
        # PYTHONASYNCIODEBUG set to a non-empty string
        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
        self.assertEqual(stdout.rstrip(), b'False')

        sts, stdout, stderr = assert_python_ok('-c', code,
                                               PYTHONASYNCIODEBUG='',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'False')

        sts, stdout, stderr = assert_python_ok('-c', code,
                                               PYTHONASYNCIODEBUG='1',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'True')

        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
                                               PYTHONASYNCIODEBUG='1',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'False')

        # -X dev
        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
                                               '-c', code)
        self.assertEqual(stdout.rstrip(), b'True')


class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
    # Runs the shared gather() tests on plain futures.

    def wrap_futures(self, *futures):
        return futures

    def _gather(self, *args, **kwargs):
        return asyncio.gather(*args, **kwargs)

    def test_constructor_empty_sequence_without_loop(self):
        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
            asyncio.gather()

    def test_constructor_empty_sequence_use_running_loop(self):
        # An empty gather() created inside a running coroutine is bound to
        # that loop and resolves to an empty list.
        async def gather():
            return asyncio.gather()
        fut = self.one_loop.run_until_complete(gather())
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIs(fut._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        self.assertEqual(fut.result(), [])

    def test_constructor_empty_sequence_use_global_loop(self):
        # Deprecated in 3.10, undeprecated in 3.12
        asyncio.set_event_loop(self.one_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        fut = asyncio.gather()
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIs(fut._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        self.assertEqual(fut.result(), [])

    def test_constructor_heterogenous_futures(self):
        # Futures that belong to different loops cannot be gathered together.
        fut1 = self.one_loop.create_future()
        fut2 = self.other_loop.create_future()
        with self.assertRaises(ValueError):
            asyncio.gather(fut1, fut2)

    def test_constructor_homogenous_futures(self):
        # gather() takes its loop from the children (other_loop here); the
        # same children can be gathered a second time.
        children = [self.other_loop.create_future() for i in range(3)]
        fut = asyncio.gather(*children)
        self.assertIs(fut._loop, self.other_loop)
        self._run_loop(self.other_loop)
        self.assertFalse(fut.done())
        fut = asyncio.gather(*children)
        self.assertIs(fut._loop, self.other_loop)
        self._run_loop(self.other_loop)
        self.assertFalse(fut.done())

    def test_one_cancellation(self):
        # A cancelled child completes the gather future with a CancelledError
        # stored as its exception; the gather future itself is not cancelled.
        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
        fut = asyncio.gather(a, b, c, d, e)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        b.cancel()
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertFalse(fut.cancelled())
        self.assertIsInstance(fut.exception(), asyncio.CancelledError)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_result_exception_one_cancellation(self):
        # With return_exceptions=True, cancelled children appear in the
        # result list as CancelledError instances.
        a, b, c, d, e, f = [self.one_loop.create_future()
                            for i in range(6)]
        fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        zde = ZeroDivisionError()
        b.set_exception(zde)
        c.cancel()
        self._run_loop(self.one_loop)
        self.assertFalse(fut.done())
        d.set_result(3)
        e.cancel()
        rte = RuntimeError()
        f.set_exception(rte)
        res = self.one_loop.run_until_complete(fut)
        self.assertIsInstance(res[2], asyncio.CancelledError)
        self.assertIsInstance(res[4], asyncio.CancelledError)
        # Blank out the CancelledError slots so the list can be compared by
        # equality.
        res[2] = res[4] = None
        self.assertEqual(res, [1, zde, None, 3, None, rte])
        cb.assert_called_once_with(fut)


class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
    # Runs the shared gather() tests on coroutines that await the futures.

    def wrap_futures(self, *futures):
        coros = []
        for fut in futures:
            # ``fut=fut`` binds the current future to this coroutine.
            async def coro(fut=fut):
                return await fut
            coros.append(coro())
        return coros

    def _gather(self, *args, **kwargs):
        # Call gather() from inside a coroutine running on one_loop.
        async def coro():
            return asyncio.gather(*args, **kwargs)
        return self.one_loop.run_until_complete(coro())

    def test_constructor_without_loop(self):
        async def coro():
            return 'abc'
        gen1 = coro()
        self.addCleanup(gen1.close)
        gen2 = coro()
        self.addCleanup(gen2.close)
        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
            asyncio.gather(gen1, gen2)

    def test_constructor_use_running_loop(self):
        async def coro():
            return 'abc'
        gen1 = coro()
        gen2 = coro()
        async def gather():
            return asyncio.gather(gen1, gen2)
        fut = self.one_loop.run_until_complete(gather())
        self.assertIs(fut._loop, self.one_loop)
        self.one_loop.run_until_complete(fut)

    def test_constructor_use_global_loop(self):
        # Deprecated in 3.10, undeprecated in 3.12
        async def coro():
            return 'abc'
        asyncio.set_event_loop(self.other_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        gen1 = coro()
        gen2 = coro()
        fut = asyncio.gather(gen1, gen2)
        self.assertIs(fut._loop, self.other_loop)
        self.other_loop.run_until_complete(fut)

    def test_duplicate_coroutines(self):
        # The same coroutine object may appear several times; each position
        # receives its result.
        async def coro(s):
            return s
        c = coro('abc')
        fut = self._gather(c, c, coro('def'), c)
        self._run_loop(self.one_loop)
        self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])

    def test_cancellation_broadcast(self):
        # Cancelling outer() cancels all children.
        proof = 0
        waiter = self.one_loop.create_future()

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
        child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
        gatherer = None

        async def outer():
            nonlocal proof, gatherer
            gatherer = asyncio.gather(child1, child2)
            await gatherer
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        self.assertTrue(f.cancel())
        with self.assertRaises(asyncio.CancelledError):
            self.one_loop.run_until_complete(f)
        self.assertFalse(gatherer.cancel())
        self.assertTrue(waiter.cancelled())
        self.assertTrue(child1.cancelled())
        self.assertTrue(child2.cancelled())
        test_utils.run_briefly(self.one_loop)
        # Neither inner() nor outer() got past its await.
        self.assertEqual(proof, 0)

    def test_exception_marking(self):
        # Test for the first line marked "Mark exception retrieved."

        async def inner(f):
            await f
            raise RuntimeError('should not be ignored')

        a = self.one_loop.create_future()
        b = self.one_loop.create_future()

        async def outer():
            await asyncio.gather(inner(a), inner(b))

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        a.set_result(None)
        test_utils.run_briefly(self.one_loop)
        b.set_result(None)
        test_utils.run_briefly(self.one_loop)
        self.assertIsInstance(f.exception(), RuntimeError)

    def test_issue46672(self):
        # gather() given a non-awaitable argument must raise TypeError
        # without the loop's exception handler being called.
        with mock.patch(
            'asyncio.base_events.BaseEventLoop.call_exception_handler',
        ):
            async def coro(s):
                return s
            c = coro('abc')

            with self.assertRaises(TypeError):
                self._gather(c, {})
            self._run_loop(self.one_loop)
            # NameError should not happen:
            self.one_loop.call_exception_handler.assert_not_called()


class RunCoroutineThreadsafeTests(test_utils.TestCase):
    """Test case for asyncio.run_coroutine_threadsafe."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop) # Will cleanup properly

    async def add(self, a, b, fail=False, cancel=False):
        """Wait 0.05 second and return a + b."""
        await asyncio.sleep(0.05)
        if fail:
            raise RuntimeError("Fail!")
        if cancel:
            # Cancel the running task itself; the following sleep(0) gives
            # the cancellation a point at which to be raised.
            asyncio.current_task(self.loop).cancel()
            await asyncio.sleep(0)
        return a + b

    def target(self, fail=False, cancel=False, timeout=None,
               advance_coro=False):
        """Run add coroutine in the event loop."""
        coro = self.add(1, 2, fail=fail, cancel=cancel)
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        if advance_coro:
            # this is for test_run_coroutine_threadsafe_task_factory_exception;
            # otherwise it spills errors and breaks **other** unittests, since
            # 'target' is interacting with threads.
3485 3486 # With this call, `coro` will be advanced. 3487 self.loop.call_soon_threadsafe(coro.send, None) 3488 try: 3489 return future.result(timeout) 3490 finally: 3491 future.done() or future.cancel() 3492 3493 def test_run_coroutine_threadsafe(self): 3494 """Test coroutine submission from a thread to an event loop.""" 3495 future = self.loop.run_in_executor(None, self.target) 3496 result = self.loop.run_until_complete(future) 3497 self.assertEqual(result, 3) 3498 3499 def test_run_coroutine_threadsafe_with_exception(self): 3500 """Test coroutine submission from a thread to an event loop 3501 when an exception is raised.""" 3502 future = self.loop.run_in_executor(None, self.target, True) 3503 with self.assertRaises(RuntimeError) as exc_context: 3504 self.loop.run_until_complete(future) 3505 self.assertIn("Fail!", exc_context.exception.args) 3506 3507 def test_run_coroutine_threadsafe_with_timeout(self): 3508 """Test coroutine submission from a thread to an event loop 3509 when a timeout is raised.""" 3510 callback = lambda: self.target(timeout=0) 3511 future = self.loop.run_in_executor(None, callback) 3512 with self.assertRaises(asyncio.TimeoutError): 3513 self.loop.run_until_complete(future) 3514 test_utils.run_briefly(self.loop) 3515 # Check that there's no pending task (add has been cancelled) 3516 for task in asyncio.all_tasks(self.loop): 3517 self.assertTrue(task.done()) 3518 3519 def test_run_coroutine_threadsafe_task_cancelled(self): 3520 """Test coroutine submission from a thread to an event loop 3521 when the task is cancelled.""" 3522 callback = lambda: self.target(cancel=True) 3523 future = self.loop.run_in_executor(None, callback) 3524 with self.assertRaises(asyncio.CancelledError): 3525 self.loop.run_until_complete(future) 3526 3527 def test_run_coroutine_threadsafe_task_factory_exception(self): 3528 """Test coroutine submission from a thread to an event loop 3529 when the task factory raise an exception.""" 3530 3531 def task_factory(loop, coro): 
3532 raise NameError 3533 3534 run = self.loop.run_in_executor( 3535 None, lambda: self.target(advance_coro=True)) 3536 3537 # Set exception handler 3538 callback = test_utils.MockCallback() 3539 self.loop.set_exception_handler(callback) 3540 3541 # Set corrupted task factory 3542 self.addCleanup(self.loop.set_task_factory, 3543 self.loop.get_task_factory()) 3544 self.loop.set_task_factory(task_factory) 3545 3546 # Run event loop 3547 with self.assertRaises(NameError) as exc_context: 3548 self.loop.run_until_complete(run) 3549 3550 # Check exceptions 3551 self.assertEqual(len(callback.call_args_list), 1) 3552 (loop, context), kwargs = callback.call_args 3553 self.assertEqual(context['exception'], exc_context.exception) 3554 3555 3556class SleepTests(test_utils.TestCase): 3557 def setUp(self): 3558 super().setUp() 3559 self.loop = asyncio.new_event_loop() 3560 self.set_event_loop(self.loop) 3561 3562 def tearDown(self): 3563 self.loop.close() 3564 self.loop = None 3565 super().tearDown() 3566 3567 def test_sleep_zero(self): 3568 result = 0 3569 3570 def inc_result(num): 3571 nonlocal result 3572 result += num 3573 3574 async def coro(): 3575 self.loop.call_soon(inc_result, 1) 3576 self.assertEqual(result, 0) 3577 num = await asyncio.sleep(0, result=10) 3578 self.assertEqual(result, 1) # inc'ed by call_soon 3579 inc_result(num) # num should be 11 3580 3581 self.loop.run_until_complete(coro()) 3582 self.assertEqual(result, 11) 3583 3584 3585class CompatibilityTests(test_utils.TestCase): 3586 # Tests for checking a bridge between old-styled coroutines 3587 # and async/await syntax 3588 3589 def setUp(self): 3590 super().setUp() 3591 self.loop = asyncio.new_event_loop() 3592 self.set_event_loop(self.loop) 3593 3594 def tearDown(self): 3595 self.loop.close() 3596 self.loop = None 3597 super().tearDown() 3598 3599 3600if __name__ == '__main__': 3601 unittest.main() 3602