1"""Tests for tasks.py.""" 2 3import collections 4import contextlib 5import contextvars 6import functools 7import gc 8import io 9import random 10import re 11import sys 12import textwrap 13import traceback 14import types 15import unittest 16import weakref 17from unittest import mock 18 19import asyncio 20from asyncio import coroutines 21from asyncio import futures 22from asyncio import tasks 23from test.test_asyncio import utils as test_utils 24from test import support 25from test.support.script_helper import assert_python_ok 26 27 28def tearDownModule(): 29 asyncio.set_event_loop_policy(None) 30 31 32async def coroutine_function(): 33 pass 34 35 36@contextlib.contextmanager 37def set_coroutine_debug(enabled): 38 coroutines = asyncio.coroutines 39 40 old_debug = coroutines._DEBUG 41 try: 42 coroutines._DEBUG = enabled 43 yield 44 finally: 45 coroutines._DEBUG = old_debug 46 47 48def format_coroutine(qualname, state, src, source_traceback, generator=False): 49 if generator: 50 state = '%s' % state 51 else: 52 state = '%s, defined' % state 53 if source_traceback is not None: 54 frame = source_traceback[-1] 55 return ('coro=<%s() %s at %s> created at %s:%s' 56 % (qualname, state, src, frame[0], frame[1])) 57 else: 58 return 'coro=<%s() %s at %s>' % (qualname, state, src) 59 60 61def get_innermost_context(exc): 62 """ 63 Return information about the innermost exception context in the chain. 
64 """ 65 depth = 0 66 while True: 67 context = exc.__context__ 68 if context is None: 69 break 70 71 exc = context 72 depth += 1 73 74 return (type(exc), exc.args, depth) 75 76 77class Dummy: 78 79 def __repr__(self): 80 return '<Dummy>' 81 82 def __call__(self, *args): 83 pass 84 85 86class CoroLikeObject: 87 def send(self, v): 88 raise StopIteration(42) 89 90 def throw(self, *exc): 91 pass 92 93 def close(self): 94 pass 95 96 def __await__(self): 97 return self 98 99 100# The following value can be used as a very small timeout: 101# it passes check "timeout > 0", but has almost 102# no effect on the test performance 103_EPSILON = 0.0001 104 105 106class BaseTaskTests: 107 108 Task = None 109 Future = None 110 111 def new_task(self, loop, coro, name='TestTask'): 112 return self.__class__.Task(coro, loop=loop, name=name) 113 114 def new_future(self, loop): 115 return self.__class__.Future(loop=loop) 116 117 def setUp(self): 118 super().setUp() 119 self.loop = self.new_test_loop() 120 self.loop.set_task_factory(self.new_task) 121 self.loop.create_future = lambda: self.new_future(self.loop) 122 123 def test_task_cancel_message_getter(self): 124 async def coro(): 125 pass 126 t = self.new_task(self.loop, coro()) 127 self.assertTrue(hasattr(t, '_cancel_message')) 128 self.assertEqual(t._cancel_message, None) 129 130 t.cancel('my message') 131 self.assertEqual(t._cancel_message, 'my message') 132 133 with self.assertRaises(asyncio.CancelledError): 134 self.loop.run_until_complete(t) 135 136 def test_task_cancel_message_setter(self): 137 async def coro(): 138 pass 139 t = self.new_task(self.loop, coro()) 140 t.cancel('my message') 141 t._cancel_message = 'my new message' 142 self.assertEqual(t._cancel_message, 'my new message') 143 144 with self.assertRaises(asyncio.CancelledError): 145 self.loop.run_until_complete(t) 146 147 def test_task_del_collect(self): 148 class Evil: 149 def __del__(self): 150 gc.collect() 151 152 async def run(): 153 return Evil() 154 155 
self.loop.run_until_complete( 156 asyncio.gather(*[ 157 self.new_task(self.loop, run()) for _ in range(100) 158 ])) 159 160 def test_other_loop_future(self): 161 other_loop = asyncio.new_event_loop() 162 fut = self.new_future(other_loop) 163 164 async def run(fut): 165 await fut 166 167 try: 168 with self.assertRaisesRegex(RuntimeError, 169 r'Task .* got Future .* attached'): 170 self.loop.run_until_complete(run(fut)) 171 finally: 172 other_loop.close() 173 174 def test_task_awaits_on_itself(self): 175 176 async def test(): 177 await task 178 179 task = asyncio.ensure_future(test(), loop=self.loop) 180 181 with self.assertRaisesRegex(RuntimeError, 182 'Task cannot await on itself'): 183 self.loop.run_until_complete(task) 184 185 def test_task_class(self): 186 async def notmuch(): 187 return 'ok' 188 t = self.new_task(self.loop, notmuch()) 189 self.loop.run_until_complete(t) 190 self.assertTrue(t.done()) 191 self.assertEqual(t.result(), 'ok') 192 self.assertIs(t._loop, self.loop) 193 self.assertIs(t.get_loop(), self.loop) 194 195 loop = asyncio.new_event_loop() 196 self.set_event_loop(loop) 197 t = self.new_task(loop, notmuch()) 198 self.assertIs(t._loop, loop) 199 loop.run_until_complete(t) 200 loop.close() 201 202 def test_ensure_future_coroutine(self): 203 async def notmuch(): 204 return 'ok' 205 t = asyncio.ensure_future(notmuch(), loop=self.loop) 206 self.assertIs(t._loop, self.loop) 207 self.loop.run_until_complete(t) 208 self.assertTrue(t.done()) 209 self.assertEqual(t.result(), 'ok') 210 211 a = notmuch() 212 self.addCleanup(a.close) 213 with self.assertWarns(DeprecationWarning) as cm: 214 with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'): 215 asyncio.ensure_future(a) 216 self.assertEqual(cm.warnings[0].filename, __file__) 217 218 async def test(): 219 return asyncio.ensure_future(notmuch()) 220 t = self.loop.run_until_complete(test()) 221 self.assertIs(t._loop, self.loop) 222 self.loop.run_until_complete(t) 223 
self.assertTrue(t.done()) 224 self.assertEqual(t.result(), 'ok') 225 226 # Deprecated in 3.10 227 asyncio.set_event_loop(self.loop) 228 self.addCleanup(asyncio.set_event_loop, None) 229 with self.assertWarns(DeprecationWarning) as cm: 230 t = asyncio.ensure_future(notmuch()) 231 self.assertEqual(cm.warnings[0].filename, __file__) 232 self.assertIs(t._loop, self.loop) 233 self.loop.run_until_complete(t) 234 self.assertTrue(t.done()) 235 self.assertEqual(t.result(), 'ok') 236 237 def test_ensure_future_coroutine_2(self): 238 with self.assertWarns(DeprecationWarning): 239 @asyncio.coroutine 240 def notmuch(): 241 return 'ok' 242 t = asyncio.ensure_future(notmuch(), loop=self.loop) 243 self.assertIs(t._loop, self.loop) 244 self.loop.run_until_complete(t) 245 self.assertTrue(t.done()) 246 self.assertEqual(t.result(), 'ok') 247 248 a = notmuch() 249 self.addCleanup(a.close) 250 with self.assertWarns(DeprecationWarning) as cm: 251 with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'): 252 asyncio.ensure_future(a) 253 self.assertEqual(cm.warnings[0].filename, __file__) 254 255 async def test(): 256 return asyncio.ensure_future(notmuch()) 257 t = self.loop.run_until_complete(test()) 258 self.assertIs(t._loop, self.loop) 259 self.loop.run_until_complete(t) 260 self.assertTrue(t.done()) 261 self.assertEqual(t.result(), 'ok') 262 263 # Deprecated in 3.10 264 asyncio.set_event_loop(self.loop) 265 self.addCleanup(asyncio.set_event_loop, None) 266 with self.assertWarns(DeprecationWarning) as cm: 267 t = asyncio.ensure_future(notmuch()) 268 self.assertEqual(cm.warnings[0].filename, __file__) 269 self.assertIs(t._loop, self.loop) 270 self.loop.run_until_complete(t) 271 self.assertTrue(t.done()) 272 self.assertEqual(t.result(), 'ok') 273 274 def test_ensure_future_future(self): 275 f_orig = self.new_future(self.loop) 276 f_orig.set_result('ko') 277 278 f = asyncio.ensure_future(f_orig) 279 self.loop.run_until_complete(f) 280 self.assertTrue(f.done()) 281 
self.assertEqual(f.result(), 'ko') 282 self.assertIs(f, f_orig) 283 284 loop = asyncio.new_event_loop() 285 self.set_event_loop(loop) 286 287 with self.assertRaises(ValueError): 288 f = asyncio.ensure_future(f_orig, loop=loop) 289 290 loop.close() 291 292 f = asyncio.ensure_future(f_orig, loop=self.loop) 293 self.assertIs(f, f_orig) 294 295 def test_ensure_future_task(self): 296 async def notmuch(): 297 return 'ok' 298 t_orig = self.new_task(self.loop, notmuch()) 299 t = asyncio.ensure_future(t_orig) 300 self.loop.run_until_complete(t) 301 self.assertTrue(t.done()) 302 self.assertEqual(t.result(), 'ok') 303 self.assertIs(t, t_orig) 304 305 loop = asyncio.new_event_loop() 306 self.set_event_loop(loop) 307 308 with self.assertRaises(ValueError): 309 t = asyncio.ensure_future(t_orig, loop=loop) 310 311 loop.close() 312 313 t = asyncio.ensure_future(t_orig, loop=self.loop) 314 self.assertIs(t, t_orig) 315 316 def test_ensure_future_awaitable(self): 317 class Aw: 318 def __init__(self, coro): 319 self.coro = coro 320 def __await__(self): 321 return (yield from self.coro) 322 323 with self.assertWarns(DeprecationWarning): 324 @asyncio.coroutine 325 def coro(): 326 return 'ok' 327 328 loop = asyncio.new_event_loop() 329 self.set_event_loop(loop) 330 fut = asyncio.ensure_future(Aw(coro()), loop=loop) 331 loop.run_until_complete(fut) 332 self.assertEqual(fut.result(), 'ok') 333 334 def test_ensure_future_neither(self): 335 with self.assertRaises(TypeError): 336 asyncio.ensure_future('ok') 337 338 def test_ensure_future_error_msg(self): 339 loop = asyncio.new_event_loop() 340 f = self.new_future(self.loop) 341 with self.assertRaisesRegex(ValueError, 'The future belongs to a ' 342 'different loop than the one specified as ' 343 'the loop argument'): 344 asyncio.ensure_future(f, loop=loop) 345 loop.close() 346 347 def test_get_stack(self): 348 T = None 349 350 async def foo(): 351 await bar() 352 353 async def bar(): 354 # test get_stack() 355 f = T.get_stack(limit=1) 356 try: 
357 self.assertEqual(f[0].f_code.co_name, 'foo') 358 finally: 359 f = None 360 361 # test print_stack() 362 file = io.StringIO() 363 T.print_stack(limit=1, file=file) 364 file.seek(0) 365 tb = file.read() 366 self.assertRegex(tb, r'foo\(\) running') 367 368 async def runner(): 369 nonlocal T 370 T = asyncio.ensure_future(foo(), loop=self.loop) 371 await T 372 373 self.loop.run_until_complete(runner()) 374 375 def test_task_repr(self): 376 self.loop.set_debug(False) 377 378 async def notmuch(): 379 return 'abc' 380 381 # test coroutine function 382 self.assertEqual(notmuch.__name__, 'notmuch') 383 self.assertRegex(notmuch.__qualname__, 384 r'\w+.test_task_repr.<locals>.notmuch') 385 self.assertEqual(notmuch.__module__, __name__) 386 387 filename, lineno = test_utils.get_function_source(notmuch) 388 src = "%s:%s" % (filename, lineno) 389 390 # test coroutine object 391 gen = notmuch() 392 coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch' 393 self.assertEqual(gen.__name__, 'notmuch') 394 self.assertEqual(gen.__qualname__, coro_qualname) 395 396 # test pending Task 397 t = self.new_task(self.loop, gen) 398 t.add_done_callback(Dummy()) 399 400 coro = format_coroutine(coro_qualname, 'running', src, 401 t._source_traceback, generator=True) 402 self.assertEqual(repr(t), 403 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 404 405 # test cancelling Task 406 t.cancel() # Does not take immediate effect! 
407 self.assertEqual(repr(t), 408 "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro) 409 410 # test cancelled Task 411 self.assertRaises(asyncio.CancelledError, 412 self.loop.run_until_complete, t) 413 coro = format_coroutine(coro_qualname, 'done', src, 414 t._source_traceback) 415 self.assertEqual(repr(t), 416 "<Task cancelled name='TestTask' %s>" % coro) 417 418 # test finished Task 419 t = self.new_task(self.loop, notmuch()) 420 self.loop.run_until_complete(t) 421 coro = format_coroutine(coro_qualname, 'done', src, 422 t._source_traceback) 423 self.assertEqual(repr(t), 424 "<Task finished name='TestTask' %s result='abc'>" % coro) 425 426 def test_task_repr_autogenerated(self): 427 async def notmuch(): 428 return 123 429 430 t1 = self.new_task(self.loop, notmuch(), None) 431 t2 = self.new_task(self.loop, notmuch(), None) 432 self.assertNotEqual(repr(t1), repr(t2)) 433 434 match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1)) 435 self.assertIsNotNone(match1) 436 match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2)) 437 self.assertIsNotNone(match2) 438 439 # Autogenerated task names should have monotonically increasing numbers 440 self.assertLess(int(match1.group(1)), int(match2.group(1))) 441 self.loop.run_until_complete(t1) 442 self.loop.run_until_complete(t2) 443 444 def test_task_repr_name_not_str(self): 445 async def notmuch(): 446 return 123 447 448 t = self.new_task(self.loop, notmuch()) 449 t.set_name({6}) 450 self.assertEqual(t.get_name(), '{6}') 451 self.loop.run_until_complete(t) 452 453 def test_task_repr_coro_decorator(self): 454 self.loop.set_debug(False) 455 456 with self.assertWarns(DeprecationWarning): 457 @asyncio.coroutine 458 def notmuch(): 459 # notmuch() function doesn't use yield from: it will be wrapped by 460 # @coroutine decorator 461 return 123 462 463 # test coroutine function 464 self.assertEqual(notmuch.__name__, 'notmuch') 465 self.assertRegex(notmuch.__qualname__, 466 
r'\w+.test_task_repr_coro_decorator' 467 r'\.<locals>\.notmuch') 468 self.assertEqual(notmuch.__module__, __name__) 469 470 # test coroutine object 471 gen = notmuch() 472 # On Python >= 3.5, generators now inherit the name of the 473 # function, as expected, and have a qualified name (__qualname__ 474 # attribute). 475 coro_name = 'notmuch' 476 coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator' 477 '.<locals>.notmuch') 478 self.assertEqual(gen.__name__, coro_name) 479 self.assertEqual(gen.__qualname__, coro_qualname) 480 481 # test repr(CoroWrapper) 482 if coroutines._DEBUG: 483 # format the coroutine object 484 if coroutines._DEBUG: 485 filename, lineno = test_utils.get_function_source(notmuch) 486 frame = gen._source_traceback[-1] 487 coro = ('%s() running, defined at %s:%s, created at %s:%s' 488 % (coro_qualname, filename, lineno, 489 frame[0], frame[1])) 490 else: 491 code = gen.gi_code 492 coro = ('%s() running at %s:%s' 493 % (coro_qualname, code.co_filename, 494 code.co_firstlineno)) 495 496 self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro) 497 498 # test pending Task 499 t = self.new_task(self.loop, gen) 500 t.add_done_callback(Dummy()) 501 502 # format the coroutine object 503 if coroutines._DEBUG: 504 src = '%s:%s' % test_utils.get_function_source(notmuch) 505 else: 506 code = gen.gi_code 507 src = '%s:%s' % (code.co_filename, code.co_firstlineno) 508 coro = format_coroutine(coro_qualname, 'running', src, 509 t._source_traceback, 510 generator=not coroutines._DEBUG) 511 self.assertEqual(repr(t), 512 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 513 self.loop.run_until_complete(t) 514 515 def test_task_repr_wait_for(self): 516 self.loop.set_debug(False) 517 518 async def wait_for(fut): 519 return await fut 520 521 fut = self.new_future(self.loop) 522 task = self.new_task(self.loop, wait_for(fut)) 523 test_utils.run_briefly(self.loop) 524 self.assertRegex(repr(task), 525 '<Task .* wait_for=%s>' % re.escape(repr(fut))) 526 527 
fut.set_result(None) 528 self.loop.run_until_complete(task) 529 530 def test_task_repr_partial_corowrapper(self): 531 # Issue #222: repr(CoroWrapper) must not fail in debug mode if the 532 # coroutine is a partial function 533 with set_coroutine_debug(True): 534 self.loop.set_debug(True) 535 536 async def func(x, y): 537 await asyncio.sleep(0) 538 539 with self.assertWarns(DeprecationWarning): 540 partial_func = asyncio.coroutine(functools.partial(func, 1)) 541 task = self.loop.create_task(partial_func(2)) 542 543 # make warnings quiet 544 task._log_destroy_pending = False 545 self.addCleanup(task._coro.close) 546 547 coro_repr = repr(task._coro) 548 expected = ( 549 r'<coroutine object \w+\.test_task_repr_partial_corowrapper' 550 r'\.<locals>\.func at' 551 ) 552 self.assertRegex(coro_repr, expected) 553 554 def test_task_basics(self): 555 556 async def outer(): 557 a = await inner1() 558 b = await inner2() 559 return a+b 560 561 async def inner1(): 562 return 42 563 564 async def inner2(): 565 return 1000 566 567 t = outer() 568 self.assertEqual(self.loop.run_until_complete(t), 1042) 569 570 def test_exception_chaining_after_await(self): 571 # Test that when awaiting on a task when an exception is already 572 # active, if the task raises an exception it will be chained 573 # with the original. 
574 loop = asyncio.new_event_loop() 575 self.set_event_loop(loop) 576 577 async def raise_error(): 578 raise ValueError 579 580 async def run(): 581 try: 582 raise KeyError(3) 583 except Exception as exc: 584 task = self.new_task(loop, raise_error()) 585 try: 586 await task 587 except Exception as exc: 588 self.assertEqual(type(exc), ValueError) 589 chained = exc.__context__ 590 self.assertEqual((type(chained), chained.args), 591 (KeyError, (3,))) 592 593 try: 594 task = self.new_task(loop, run()) 595 loop.run_until_complete(task) 596 finally: 597 loop.close() 598 599 def test_exception_chaining_after_await_with_context_cycle(self): 600 # Check trying to create an exception context cycle: 601 # https://bugs.python.org/issue40696 602 has_cycle = None 603 loop = asyncio.new_event_loop() 604 self.set_event_loop(loop) 605 606 async def process_exc(exc): 607 raise exc 608 609 async def run(): 610 nonlocal has_cycle 611 try: 612 raise KeyError('a') 613 except Exception as exc: 614 task = self.new_task(loop, process_exc(exc)) 615 try: 616 await task 617 except BaseException as exc: 618 has_cycle = (exc is exc.__context__) 619 # Prevent a hang if has_cycle is True. 620 exc.__context__ = None 621 622 try: 623 task = self.new_task(loop, run()) 624 loop.run_until_complete(task) 625 finally: 626 loop.close() 627 # This also distinguishes from the initial has_cycle=None. 
628 self.assertEqual(has_cycle, False) 629 630 def test_cancel(self): 631 632 def gen(): 633 when = yield 634 self.assertAlmostEqual(10.0, when) 635 yield 0 636 637 loop = self.new_test_loop(gen) 638 639 async def task(): 640 await asyncio.sleep(10.0) 641 return 12 642 643 t = self.new_task(loop, task()) 644 loop.call_soon(t.cancel) 645 with self.assertRaises(asyncio.CancelledError): 646 loop.run_until_complete(t) 647 self.assertTrue(t.done()) 648 self.assertTrue(t.cancelled()) 649 self.assertFalse(t.cancel()) 650 651 def test_cancel_with_message_then_future_result(self): 652 # Test Future.result() after calling cancel() with a message. 653 cases = [ 654 ((), ()), 655 ((None,), ()), 656 (('my message',), ('my message',)), 657 # Non-string values should roundtrip. 658 ((5,), (5,)), 659 ] 660 for cancel_args, expected_args in cases: 661 with self.subTest(cancel_args=cancel_args): 662 loop = asyncio.new_event_loop() 663 self.set_event_loop(loop) 664 665 async def sleep(): 666 await asyncio.sleep(10) 667 668 async def coro(): 669 task = self.new_task(loop, sleep()) 670 await asyncio.sleep(0) 671 task.cancel(*cancel_args) 672 done, pending = await asyncio.wait([task]) 673 task.result() 674 675 task = self.new_task(loop, coro()) 676 with self.assertRaises(asyncio.CancelledError) as cm: 677 loop.run_until_complete(task) 678 exc = cm.exception 679 self.assertEqual(exc.args, ()) 680 681 actual = get_innermost_context(exc) 682 self.assertEqual(actual, 683 (asyncio.CancelledError, expected_args, 2)) 684 685 def test_cancel_with_message_then_future_exception(self): 686 # Test Future.exception() after calling cancel() with a message. 687 cases = [ 688 ((), ()), 689 ((None,), ()), 690 (('my message',), ('my message',)), 691 # Non-string values should roundtrip. 
692 ((5,), (5,)), 693 ] 694 for cancel_args, expected_args in cases: 695 with self.subTest(cancel_args=cancel_args): 696 loop = asyncio.new_event_loop() 697 self.set_event_loop(loop) 698 699 async def sleep(): 700 await asyncio.sleep(10) 701 702 async def coro(): 703 task = self.new_task(loop, sleep()) 704 await asyncio.sleep(0) 705 task.cancel(*cancel_args) 706 done, pending = await asyncio.wait([task]) 707 task.exception() 708 709 task = self.new_task(loop, coro()) 710 with self.assertRaises(asyncio.CancelledError) as cm: 711 loop.run_until_complete(task) 712 exc = cm.exception 713 self.assertEqual(exc.args, ()) 714 715 actual = get_innermost_context(exc) 716 self.assertEqual(actual, 717 (asyncio.CancelledError, expected_args, 2)) 718 719 def test_cancel_with_message_before_starting_task(self): 720 loop = asyncio.new_event_loop() 721 self.set_event_loop(loop) 722 723 async def sleep(): 724 await asyncio.sleep(10) 725 726 async def coro(): 727 task = self.new_task(loop, sleep()) 728 # We deliberately leave out the sleep here. 
729 task.cancel('my message') 730 done, pending = await asyncio.wait([task]) 731 task.exception() 732 733 task = self.new_task(loop, coro()) 734 with self.assertRaises(asyncio.CancelledError) as cm: 735 loop.run_until_complete(task) 736 exc = cm.exception 737 self.assertEqual(exc.args, ()) 738 739 actual = get_innermost_context(exc) 740 self.assertEqual(actual, 741 (asyncio.CancelledError, ('my message',), 2)) 742 743 def test_cancel_yield(self): 744 with self.assertWarns(DeprecationWarning): 745 @asyncio.coroutine 746 def task(): 747 yield 748 yield 749 return 12 750 751 t = self.new_task(self.loop, task()) 752 test_utils.run_briefly(self.loop) # start coro 753 t.cancel() 754 self.assertRaises( 755 asyncio.CancelledError, self.loop.run_until_complete, t) 756 self.assertTrue(t.done()) 757 self.assertTrue(t.cancelled()) 758 self.assertFalse(t.cancel()) 759 760 def test_cancel_inner_future(self): 761 f = self.new_future(self.loop) 762 763 async def task(): 764 await f 765 return 12 766 767 t = self.new_task(self.loop, task()) 768 test_utils.run_briefly(self.loop) # start task 769 f.cancel() 770 with self.assertRaises(asyncio.CancelledError): 771 self.loop.run_until_complete(t) 772 self.assertTrue(f.cancelled()) 773 self.assertTrue(t.cancelled()) 774 775 def test_cancel_both_task_and_inner_future(self): 776 f = self.new_future(self.loop) 777 778 async def task(): 779 await f 780 return 12 781 782 t = self.new_task(self.loop, task()) 783 test_utils.run_briefly(self.loop) 784 785 f.cancel() 786 t.cancel() 787 788 with self.assertRaises(asyncio.CancelledError): 789 self.loop.run_until_complete(t) 790 791 self.assertTrue(t.done()) 792 self.assertTrue(f.cancelled()) 793 self.assertTrue(t.cancelled()) 794 795 def test_cancel_task_catching(self): 796 fut1 = self.new_future(self.loop) 797 fut2 = self.new_future(self.loop) 798 799 async def task(): 800 await fut1 801 try: 802 await fut2 803 except asyncio.CancelledError: 804 return 42 805 806 t = self.new_task(self.loop, 
task()) 807 test_utils.run_briefly(self.loop) 808 self.assertIs(t._fut_waiter, fut1) # White-box test. 809 fut1.set_result(None) 810 test_utils.run_briefly(self.loop) 811 self.assertIs(t._fut_waiter, fut2) # White-box test. 812 t.cancel() 813 self.assertTrue(fut2.cancelled()) 814 res = self.loop.run_until_complete(t) 815 self.assertEqual(res, 42) 816 self.assertFalse(t.cancelled()) 817 818 def test_cancel_task_ignoring(self): 819 fut1 = self.new_future(self.loop) 820 fut2 = self.new_future(self.loop) 821 fut3 = self.new_future(self.loop) 822 823 async def task(): 824 await fut1 825 try: 826 await fut2 827 except asyncio.CancelledError: 828 pass 829 res = await fut3 830 return res 831 832 t = self.new_task(self.loop, task()) 833 test_utils.run_briefly(self.loop) 834 self.assertIs(t._fut_waiter, fut1) # White-box test. 835 fut1.set_result(None) 836 test_utils.run_briefly(self.loop) 837 self.assertIs(t._fut_waiter, fut2) # White-box test. 838 t.cancel() 839 self.assertTrue(fut2.cancelled()) 840 test_utils.run_briefly(self.loop) 841 self.assertIs(t._fut_waiter, fut3) # White-box test. 842 fut3.set_result(42) 843 res = self.loop.run_until_complete(t) 844 self.assertEqual(res, 42) 845 self.assertFalse(fut3.cancelled()) 846 self.assertFalse(t.cancelled()) 847 848 def test_cancel_current_task(self): 849 loop = asyncio.new_event_loop() 850 self.set_event_loop(loop) 851 852 async def task(): 853 t.cancel() 854 self.assertTrue(t._must_cancel) # White-box test. 855 # The sleep should be cancelled immediately. 856 await asyncio.sleep(100) 857 return 12 858 859 t = self.new_task(loop, task()) 860 self.assertFalse(t.cancelled()) 861 self.assertRaises( 862 asyncio.CancelledError, loop.run_until_complete, t) 863 self.assertTrue(t.done()) 864 self.assertTrue(t.cancelled()) 865 self.assertFalse(t._must_cancel) # White-box test. 
866 self.assertFalse(t.cancel()) 867 868 def test_cancel_at_end(self): 869 """coroutine end right after task is cancelled""" 870 loop = asyncio.new_event_loop() 871 self.set_event_loop(loop) 872 873 async def task(): 874 t.cancel() 875 self.assertTrue(t._must_cancel) # White-box test. 876 return 12 877 878 t = self.new_task(loop, task()) 879 self.assertFalse(t.cancelled()) 880 self.assertRaises( 881 asyncio.CancelledError, loop.run_until_complete, t) 882 self.assertTrue(t.done()) 883 self.assertTrue(t.cancelled()) 884 self.assertFalse(t._must_cancel) # White-box test. 885 self.assertFalse(t.cancel()) 886 887 def test_cancel_awaited_task(self): 888 # This tests for a relatively rare condition when 889 # a task cancellation is requested for a task which is not 890 # currently blocked, such as a task cancelling itself. 891 # In this situation we must ensure that whatever next future 892 # or task the cancelled task blocks on is cancelled correctly 893 # as well. See also bpo-34872. 894 loop = asyncio.new_event_loop() 895 self.addCleanup(lambda: loop.close()) 896 897 task = nested_task = None 898 fut = self.new_future(loop) 899 900 async def nested(): 901 await fut 902 903 async def coro(): 904 nonlocal nested_task 905 # Create a sub-task and wait for it to run. 906 nested_task = self.new_task(loop, nested()) 907 await asyncio.sleep(0) 908 909 # Request the current task to be cancelled. 910 task.cancel() 911 # Block on the nested task, which should be immediately 912 # cancelled. 
913 await nested_task 914 915 task = self.new_task(loop, coro()) 916 with self.assertRaises(asyncio.CancelledError): 917 loop.run_until_complete(task) 918 919 self.assertTrue(task.cancelled()) 920 self.assertTrue(nested_task.cancelled()) 921 self.assertTrue(fut.cancelled()) 922 923 def assert_text_contains(self, text, substr): 924 if substr not in text: 925 raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<') 926 927 def test_cancel_traceback_for_future_result(self): 928 # When calling Future.result() on a cancelled task, check that the 929 # line of code that was interrupted is included in the traceback. 930 loop = asyncio.new_event_loop() 931 self.set_event_loop(loop) 932 933 async def nested(): 934 # This will get cancelled immediately. 935 await asyncio.sleep(10) 936 937 async def coro(): 938 task = self.new_task(loop, nested()) 939 await asyncio.sleep(0) 940 task.cancel() 941 await task # search target 942 943 task = self.new_task(loop, coro()) 944 try: 945 loop.run_until_complete(task) 946 except asyncio.CancelledError: 947 tb = traceback.format_exc() 948 self.assert_text_contains(tb, "await asyncio.sleep(10)") 949 # The intermediate await should also be included. 950 self.assert_text_contains(tb, "await task # search target") 951 else: 952 self.fail('CancelledError did not occur') 953 954 def test_cancel_traceback_for_future_exception(self): 955 # When calling Future.exception() on a cancelled task, check that the 956 # line of code that was interrupted is included in the traceback. 957 loop = asyncio.new_event_loop() 958 self.set_event_loop(loop) 959 960 async def nested(): 961 # This will get cancelled immediately. 
962 await asyncio.sleep(10) 963 964 async def coro(): 965 task = self.new_task(loop, nested()) 966 await asyncio.sleep(0) 967 task.cancel() 968 done, pending = await asyncio.wait([task]) 969 task.exception() # search target 970 971 task = self.new_task(loop, coro()) 972 try: 973 loop.run_until_complete(task) 974 except asyncio.CancelledError: 975 tb = traceback.format_exc() 976 self.assert_text_contains(tb, "await asyncio.sleep(10)") 977 # The intermediate await should also be included. 978 self.assert_text_contains(tb, 979 "task.exception() # search target") 980 else: 981 self.fail('CancelledError did not occur') 982 983 def test_stop_while_run_in_complete(self): 984 985 def gen(): 986 when = yield 987 self.assertAlmostEqual(0.1, when) 988 when = yield 0.1 989 self.assertAlmostEqual(0.2, when) 990 when = yield 0.1 991 self.assertAlmostEqual(0.3, when) 992 yield 0.1 993 994 loop = self.new_test_loop(gen) 995 996 x = 0 997 998 async def task(): 999 nonlocal x 1000 while x < 10: 1001 await asyncio.sleep(0.1) 1002 x += 1 1003 if x == 2: 1004 loop.stop() 1005 1006 t = self.new_task(loop, task()) 1007 with self.assertRaises(RuntimeError) as cm: 1008 loop.run_until_complete(t) 1009 self.assertEqual(str(cm.exception), 1010 'Event loop stopped before Future completed.') 1011 self.assertFalse(t.done()) 1012 self.assertEqual(x, 2) 1013 self.assertAlmostEqual(0.3, loop.time()) 1014 1015 t.cancel() 1016 self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) 1017 1018 def test_log_traceback(self): 1019 async def coro(): 1020 pass 1021 1022 task = self.new_task(self.loop, coro()) 1023 with self.assertRaisesRegex(ValueError, 'can only be set to False'): 1024 task._log_traceback = True 1025 self.loop.run_until_complete(task) 1026 1027 def test_wait_for_timeout_less_then_0_or_0_future_done(self): 1028 def gen(): 1029 when = yield 1030 self.assertAlmostEqual(0, when) 1031 1032 loop = self.new_test_loop(gen) 1033 1034 fut = self.new_future(loop) 1035 
fut.set_result('done') 1036 1037 ret = loop.run_until_complete(asyncio.wait_for(fut, 0)) 1038 1039 self.assertEqual(ret, 'done') 1040 self.assertTrue(fut.done()) 1041 self.assertAlmostEqual(0, loop.time()) 1042 1043 def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self): 1044 def gen(): 1045 when = yield 1046 self.assertAlmostEqual(0, when) 1047 1048 loop = self.new_test_loop(gen) 1049 1050 foo_started = False 1051 1052 async def foo(): 1053 nonlocal foo_started 1054 foo_started = True 1055 1056 with self.assertRaises(asyncio.TimeoutError): 1057 loop.run_until_complete(asyncio.wait_for(foo(), 0)) 1058 1059 self.assertAlmostEqual(0, loop.time()) 1060 self.assertEqual(foo_started, False) 1061 1062 def test_wait_for_timeout_less_then_0_or_0(self): 1063 def gen(): 1064 when = yield 1065 self.assertAlmostEqual(0.2, when) 1066 when = yield 0 1067 self.assertAlmostEqual(0, when) 1068 1069 for timeout in [0, -1]: 1070 with self.subTest(timeout=timeout): 1071 loop = self.new_test_loop(gen) 1072 1073 foo_running = None 1074 1075 async def foo(): 1076 nonlocal foo_running 1077 foo_running = True 1078 try: 1079 await asyncio.sleep(0.2) 1080 finally: 1081 foo_running = False 1082 return 'done' 1083 1084 fut = self.new_task(loop, foo()) 1085 1086 with self.assertRaises(asyncio.TimeoutError): 1087 loop.run_until_complete(asyncio.wait_for(fut, timeout)) 1088 self.assertTrue(fut.done()) 1089 # it should have been cancelled due to the timeout 1090 self.assertTrue(fut.cancelled()) 1091 self.assertAlmostEqual(0, loop.time()) 1092 self.assertEqual(foo_running, False) 1093 1094 def test_wait_for(self): 1095 1096 def gen(): 1097 when = yield 1098 self.assertAlmostEqual(0.2, when) 1099 when = yield 0 1100 self.assertAlmostEqual(0.1, when) 1101 when = yield 0.1 1102 1103 loop = self.new_test_loop(gen) 1104 1105 foo_running = None 1106 1107 async def foo(): 1108 nonlocal foo_running 1109 foo_running = True 1110 try: 1111 await asyncio.sleep(0.2) 1112 finally: 1113 
foo_running = False 1114 return 'done' 1115 1116 fut = self.new_task(loop, foo()) 1117 1118 with self.assertRaises(asyncio.TimeoutError): 1119 loop.run_until_complete(asyncio.wait_for(fut, 0.1)) 1120 self.assertTrue(fut.done()) 1121 # it should have been cancelled due to the timeout 1122 self.assertTrue(fut.cancelled()) 1123 self.assertAlmostEqual(0.1, loop.time()) 1124 self.assertEqual(foo_running, False) 1125 1126 def test_wait_for_blocking(self): 1127 loop = self.new_test_loop() 1128 1129 async def coro(): 1130 return 'done' 1131 1132 res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None)) 1133 self.assertEqual(res, 'done') 1134 1135 def test_wait_for_race_condition(self): 1136 1137 def gen(): 1138 yield 0.1 1139 yield 0.1 1140 yield 0.1 1141 1142 loop = self.new_test_loop(gen) 1143 1144 fut = self.new_future(loop) 1145 task = asyncio.wait_for(fut, timeout=0.2) 1146 loop.call_later(0.1, fut.set_result, "ok") 1147 res = loop.run_until_complete(task) 1148 self.assertEqual(res, "ok") 1149 1150 def test_wait_for_cancellation_race_condition(self): 1151 async def inner(): 1152 with contextlib.suppress(asyncio.CancelledError): 1153 await asyncio.sleep(1) 1154 return 1 1155 1156 async def main(): 1157 result = await asyncio.wait_for(inner(), timeout=.01) 1158 self.assertEqual(result, 1) 1159 1160 asyncio.run(main()) 1161 1162 def test_wait_for_waits_for_task_cancellation(self): 1163 loop = asyncio.new_event_loop() 1164 self.addCleanup(loop.close) 1165 1166 task_done = False 1167 1168 async def foo(): 1169 async def inner(): 1170 nonlocal task_done 1171 try: 1172 await asyncio.sleep(0.2) 1173 except asyncio.CancelledError: 1174 await asyncio.sleep(_EPSILON) 1175 raise 1176 finally: 1177 task_done = True 1178 1179 inner_task = self.new_task(loop, inner()) 1180 1181 await asyncio.wait_for(inner_task, timeout=_EPSILON) 1182 1183 with self.assertRaises(asyncio.TimeoutError) as cm: 1184 loop.run_until_complete(foo()) 1185 1186 self.assertTrue(task_done) 1187 
        chained = cm.exception.__context__
        self.assertEqual(type(chained), asyncio.CancelledError)

    def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
        # Same contract as the non-zero-timeout variant, but the inner task
        # is already running (one sleep has elapsed) when wait_for() is
        # called with timeout=0.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        task_done = False

        async def foo():
            async def inner():
                nonlocal task_done
                try:
                    await asyncio.sleep(10)
                except asyncio.CancelledError:
                    await asyncio.sleep(_EPSILON)
                    raise
                finally:
                    task_done = True

            inner_task = self.new_task(loop, inner())
            # Give inner() a chance to start before timing it out.
            await asyncio.sleep(_EPSILON)
            await asyncio.wait_for(inner_task, timeout=0)

        with self.assertRaises(asyncio.TimeoutError) as cm:
            loop.run_until_complete(foo())

        self.assertTrue(task_done)
        chained = cm.exception.__context__
        self.assertEqual(type(chained), asyncio.CancelledError)

    def test_wait_for_reraises_exception_during_cancellation(self):
        # An exception raised by the task while it is being cancelled on
        # timeout (FooException from its ``finally``) must propagate out of
        # wait_for() in place of TimeoutError.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        class FooException(Exception):
            pass

        async def foo():
            async def inner():
                try:
                    await asyncio.sleep(0.2)
                finally:
                    raise FooException

            inner_task = self.new_task(loop, inner())

            await asyncio.wait_for(inner_task, timeout=_EPSILON)

        with self.assertRaises(FooException):
            loop.run_until_complete(foo())

    def test_wait_for_self_cancellation(self):
        # inner() absorbs two cancellations before returning 42, so the
        # task wrapping wait_for() can be cancelled while inner is still
        # alive; inner must nevertheless finish with its own result.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def foo():
            async def inner():
                try:
                    await asyncio.sleep(0.3)
                except asyncio.CancelledError:
                    try:
                        await asyncio.sleep(0.3)
                    except asyncio.CancelledError:
                        await asyncio.sleep(0.3)

                return 42

            inner_task = self.new_task(loop, inner())

            wait = asyncio.wait_for(inner_task, timeout=0.1)

            # Test that wait_for itself is properly cancellable
            # even when the initial task holds up the
            # initial cancellation.
            task = self.new_task(loop, wait)
            await asyncio.sleep(0.2)
            task.cancel()

            with self.assertRaises(asyncio.CancelledError):
                await task

            self.assertEqual(await inner_task, 42)

        loop.run_until_complete(foo())

    def test_wait(self):
        # wait() with default arguments returns once both tasks are done
        # (loop time 0.15) and reports nothing as pending.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            yield 0.15

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1))
        b = self.new_task(loop, asyncio.sleep(0.15))

        async def foo():
            done, pending = await asyncio.wait([b, a])
            self.assertEqual(done, set([a, b]))
            self.assertEqual(pending, set())
            return 42

        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertEqual(res, 42)
        self.assertAlmostEqual(0.15, loop.time())

        # Doing it again should take no time and exercise a different path.
        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())
        self.assertEqual(res, 42)

    def test_wait_duplicate_coroutines(self):
        # The same coroutine object passed twice to wait() must yield a
        # single result: two distinct results for three arguments.

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s
        c = coro('test')
        task = self.new_task(
            self.loop,
            asyncio.wait([c, c, coro('spam')]))

        with self.assertWarns(DeprecationWarning):
            done, pending = self.loop.run_until_complete(task)

        self.assertFalse(pending)
        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})

    def test_wait_errors(self):
        # An empty set of awaitables is rejected with ValueError.
        self.assertRaises(
            ValueError, self.loop.run_until_complete,
            asyncio.wait(set()))

        # -1 is an invalid return_when value
        sleep_coro = asyncio.sleep(10.0)
        wait_coro = asyncio.wait([sleep_coro], return_when=-1)
        self.assertRaises(ValueError,
                          self.loop.run_until_complete, wait_coro)

        sleep_coro.close()

    def test_wait_first_completed(self):
        # FIRST_COMPLETED: wait() returns at 0.1 with the short sleeper in
        # ``done`` and the 10s sleeper still pending.

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            when = yield 0
            self.assertAlmostEqual(0.1, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(10.0))
        b = self.new_task(loop, asyncio.sleep(0.1))
        task = self.new_task(
            loop,
            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))

        done, pending = loop.run_until_complete(task)
        self.assertEqual({b}, done)
        self.assertEqual({a}, pending)
        self.assertFalse(a.done())
        self.assertTrue(b.done())
        self.assertIsNone(b.result())
        self.assertAlmostEqual(0.1, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_wait_really_done(self):
        # there is possibility that some tasks in the pending list
        # became done but their callbacks haven't all been called yet

        async def coro1():
            await asyncio.sleep(0)

        async def coro2():
            await asyncio.sleep(0)
            await asyncio.sleep(0)

        a = self.new_task(self.loop, coro1())
        b = self.new_task(self.loop, coro2())
        task = self.new_task(
            self.loop,
            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))

        # Both tasks are expected in ``done`` even though FIRST_COMPLETED
        # was requested.
        done, pending = self.loop.run_until_complete(task)
        self.assertEqual({a, b}, done)
        self.assertTrue(a.done())
        self.assertIsNone(a.result())
        self.assertTrue(b.done())
        self.assertIsNone(b.result())

    def test_wait_first_exception(self):
        # FIRST_EXCEPTION when one task fails immediately: wait() returns
        # at loop time 0 with only the failed task in ``done``.

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = self.new_test_loop(gen)

        # first_exception, task already has exception
        a = self.new_task(loop, asyncio.sleep(10.0))

        async def exc():
            raise ZeroDivisionError('err')

        b = self.new_task(loop, exc())
        task = self.new_task(
            loop,
            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))

        done, pending = loop.run_until_complete(task)
        self.assertEqual({b}, done)
        self.assertEqual({a}, pending)
        self.assertAlmostEqual(0, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_wait_first_exception_in_wait(self):
        # FIRST_EXCEPTION when the failure happens after a 0.01 sleep:
        # wait() returns at 0.01 with only the failed task in ``done``.

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            when = yield 0
            self.assertAlmostEqual(0.01, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        # first_exception, exception during waiting
        a = self.new_task(loop, asyncio.sleep(10.0))

        async def exc():
            await asyncio.sleep(0.01)
            raise ZeroDivisionError('err')

        b = self.new_task(loop, exc())
        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)

        done, pending = loop.run_until_complete(task)
        self.assertEqual({b}, done)
        self.assertEqual({a}, pending)
        self.assertAlmostEqual(0.01, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_wait_with_exception(self):
        # With the default return_when, a failing task does not make
        # wait() raise: both tasks land in ``done`` and exactly one of
        # them carries an exception.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            yield 0.15

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1))

        async def sleeper():
            await asyncio.sleep(0.15)
            raise ZeroDivisionError('really')

        b = self.new_task(loop, sleeper())

        async def foo():
            done, pending = await asyncio.wait([b, a])
            self.assertEqual(len(done), 2)
            self.assertEqual(pending, set())
            errors = set(f for f in done if f.exception() is not None)
            self.assertEqual(len(errors), 1)

        loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())

        loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())

    def test_wait_with_timeout(self):
        # A 0.11 timeout falls between the two sleepers: wait() returns at
        # 0.11 with ``a`` done and ``b`` pending.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            when = yield 0
            self.assertAlmostEqual(0.11, when)
            yield 0.11

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1))
        b = self.new_task(loop, asyncio.sleep(0.15))

        async def foo():
            done, pending = await asyncio.wait([b, a], timeout=0.11)
            self.assertEqual(done, set([a]))
            self.assertEqual(pending, set([b]))

        loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.11, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_wait_concurrent_complete(self):
        # The timeout (0.1) coincides with the completion time of ``a``;
        # ``a`` must still be reported as done and ``b`` as pending.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            when = yield 0
            self.assertAlmostEqual(0.1, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1))
        b = self.new_task(loop, asyncio.sleep(0.15))

        done, pending = loop.run_until_complete(
            asyncio.wait([b, a], timeout=0.1))

        self.assertEqual(done, set([a]))
        self.assertEqual(pending, set([b]))
        self.assertAlmostEqual(0.1, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_wait_with_iterator_of_tasks(self):
        # wait() must accept a one-shot iterator of tasks, not only a
        # list or set.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            yield 0.15

        loop = self.new_test_loop(gen)

        a = self.new_task(loop,
asyncio.sleep(0.1)) 1542 b = self.new_task(loop, asyncio.sleep(0.15)) 1543 1544 async def foo(): 1545 done, pending = await asyncio.wait(iter([b, a])) 1546 self.assertEqual(done, set([a, b])) 1547 self.assertEqual(pending, set()) 1548 return 42 1549 1550 res = loop.run_until_complete(self.new_task(loop, foo())) 1551 self.assertEqual(res, 42) 1552 self.assertAlmostEqual(0.15, loop.time()) 1553 1554 def test_as_completed(self): 1555 1556 def gen(): 1557 yield 0 1558 yield 0 1559 yield 0.01 1560 yield 0 1561 1562 loop = self.new_test_loop(gen) 1563 # disable "slow callback" warning 1564 loop.slow_callback_duration = 1.0 1565 completed = set() 1566 time_shifted = False 1567 1568 with self.assertWarns(DeprecationWarning): 1569 @asyncio.coroutine 1570 def sleeper(dt, x): 1571 nonlocal time_shifted 1572 yield from asyncio.sleep(dt) 1573 completed.add(x) 1574 if not time_shifted and 'a' in completed and 'b' in completed: 1575 time_shifted = True 1576 loop.advance_time(0.14) 1577 return x 1578 1579 a = sleeper(0.01, 'a') 1580 b = sleeper(0.01, 'b') 1581 c = sleeper(0.15, 'c') 1582 1583 async def foo(): 1584 values = [] 1585 for f in asyncio.as_completed([b, c, a]): 1586 values.append(await f) 1587 return values 1588 1589 res = loop.run_until_complete(self.new_task(loop, foo())) 1590 self.assertAlmostEqual(0.15, loop.time()) 1591 self.assertTrue('a' in res[:2]) 1592 self.assertTrue('b' in res[:2]) 1593 self.assertEqual(res[2], 'c') 1594 1595 # Doing it again should take no time and exercise a different path. 
        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())

    def test_as_completed_with_timeout(self):
        # With a 0.12 timeout, the 0.1 sleeper yields its value and the
        # 0.15 sleeper's slot raises TimeoutError.

        def gen():
            yield
            yield 0
            yield 0
            yield 0.1

        loop = self.new_test_loop(gen)

        a = loop.create_task(asyncio.sleep(0.1, 'a'))
        b = loop.create_task(asyncio.sleep(0.15, 'b'))

        async def foo():
            # Each entry is (1, value) on success or (2, exc) on timeout.
            values = []
            for f in asyncio.as_completed([a, b], timeout=0.12):
                if values:
                    # After the first result, push the clock to 0.12.
                    loop.advance_time(0.02)
                try:
                    v = await f
                    values.append((1, v))
                except asyncio.TimeoutError as exc:
                    values.append((2, exc))
            return values

        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertEqual(len(res), 2, res)
        self.assertEqual(res[0], (1, 'a'))
        self.assertEqual(res[1][0], 2)
        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
        self.assertAlmostEqual(0.12, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_as_completed_with_unused_timeout(self):
        # A timeout that never fires (1s vs. a 0.01 sleep) must not affect
        # the result.

        def gen():
            yield
            yield 0
            yield 0.01

        loop = self.new_test_loop(gen)

        a = asyncio.sleep(0.01, 'a')

        async def foo():
            for f in asyncio.as_completed([a], timeout=1):
                v = await f
                self.assertEqual(v, 'a')

        loop.run_until_complete(self.new_task(loop, foo()))

    def test_as_completed_reverse_wait(self):
        # Awaiting the as_completed() futures out of order (second one
        # first) must still deliver results in completion order.

        def gen():
            yield 0
            yield 0.05
            yield 0

        loop = self.new_test_loop(gen)

        a = asyncio.sleep(0.05, 'a')
        b = asyncio.sleep(0.10, 'b')
        fs = {a, b}

        async def test():
            futs = list(asyncio.as_completed(fs))
            self.assertEqual(len(futs), 2)

            x = await futs[1]
            self.assertEqual(x, 'a')
            self.assertAlmostEqual(0.05, loop.time())
            loop.advance_time(0.05)
            y = await futs[0]
            self.assertEqual(y, 'b')
            self.assertAlmostEqual(0.10, loop.time())

        loop.run_until_complete(test())

    def test_as_completed_concurrent(self):
        # The coroutines returned by as_completed() can themselves be
        # passed to wait(); both results must come back.

        def gen():
            when = yield
            self.assertAlmostEqual(0.05, when)
            when = yield 0
            self.assertAlmostEqual(0.05, when)
            yield 0.05

        a = asyncio.sleep(0.05, 'a')
        b = asyncio.sleep(0.05, 'b')
        fs = {a, b}

        async def test():
            futs = list(asyncio.as_completed(fs))
            self.assertEqual(len(futs), 2)
            waiter = asyncio.wait(futs)
            # Deprecation from passing coros in futs to asyncio.wait()
            with self.assertWarns(DeprecationWarning) as cm:
                done, pending = await waiter
            # The warning must be attributed to this file (the caller).
            self.assertEqual(cm.warnings[0].filename, __file__)
            self.assertEqual(set(f.result() for f in done), {'a', 'b'})

        loop = self.new_test_loop(gen)
        loop.run_until_complete(test())

    def test_as_completed_duplicate_coroutines(self):
        # The same coroutine object passed twice to as_completed() must
        # produce one result: two results for three arguments.

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def runner():
                result = []
                c = coro('ham')
                for f in asyncio.as_completed([c, c, coro('spam')]):
                    result.append((yield from f))
                return result

        fut = self.new_task(self.loop, runner())
        self.loop.run_until_complete(fut)
        result = fut.result()
        self.assertEqual(set(result), {'ham', 'spam'})
        self.assertEqual(len(result), 2)

    def test_as_completed_coroutine_without_loop(self):
        # Iterating as_completed() over a bare coroutine with no running
        # or current loop must warn and then raise RuntimeError.
        async def coro():
            return 42

        a = coro()
        # The coroutine is never scheduled, so close it explicitly.
        self.addCleanup(a.close)

        futs = asyncio.as_completed([a])
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
                list(futs)
        self.assertEqual(cm.warnings[0].filename, __file__)

    def test_as_completed_coroutine_use_running_loop(self):
        loop = self.new_test_loop()

        async def coro():
            return 42

        async def test():
            futs = list(asyncio.as_completed([coro()]))
            self.assertEqual(len(futs), 1)
            self.assertEqual(await futs[0], 42)

        loop.run_until_complete(test())

    def test_as_completed_coroutine_use_global_loop(self):
        # Deprecated in 3.10
        # Outside a running loop, as_completed() uses the loop installed
        # with set_event_loop(), emitting a DeprecationWarning attributed
        # to this file.
        async def coro():
            return 42

        loop = self.new_test_loop()
        asyncio.set_event_loop(loop)
        self.addCleanup(asyncio.set_event_loop, None)
        futs = asyncio.as_completed([coro()])
        with self.assertWarns(DeprecationWarning) as cm:
            futs = list(futs)
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertEqual(len(futs), 1)
        self.assertEqual(loop.run_until_complete(futs[0]), 42)

    def test_sleep(self):
        # Two consecutive sleeps of dt/2 take dt in total, and
        # sleep(delay, result) returns ``result``.

        def gen():
            when = yield
            self.assertAlmostEqual(0.05, when)
            when = yield 0.05
            self.assertAlmostEqual(0.1, when)
            yield 0.05

        loop = self.new_test_loop(gen)

        async def sleeper(dt, arg):
            await asyncio.sleep(dt/2)
            res = await asyncio.sleep(dt/2, arg)
            return res

        t = self.new_task(loop, sleeper(0.1, 'yeah'))
        loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'yeah')
        self.assertAlmostEqual(0.1, loop.time())

    def test_sleep_cancel(self):
        # Cancelling a task blocked in sleep() must also cancel the timer
        # handle that sleep() registered through loop.call_later().

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = self.new_test_loop(gen)

        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))

        handle = None
        orig_call_later = loop.call_later

        # Wrap call_later() to capture the handle it returns.
        def call_later(delay, callback, *args):
            nonlocal handle
            handle = orig_call_later(delay, callback, *args)
            return handle

        loop.call_later = call_later
        test_utils.run_briefly(loop)

        self.assertFalse(handle._cancelled)

        t.cancel()
        test_utils.run_briefly(loop)
        self.assertTrue(handle._cancelled)

    def test_task_cancel_sleeping_task(self):
        # A task sleeping for 5000s is cancelled by a timer at 0.1; the
        # awaiting coroutine sees CancelledError and the clock stops at 0.1.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(5000, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        async def sleep(dt):
            await asyncio.sleep(dt)

        async def doit():
            sleeper = self.new_task(loop, sleep(5000))
            loop.call_later(0.1, sleeper.cancel)
            try:
                await sleeper
            except asyncio.CancelledError:
                return 'cancelled'
            else:
                return 'slept in'

        doer = doit()
        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
        self.assertAlmostEqual(0.1, loop.time())

    def test_task_cancel_waiter_future(self):
        # Cancelling a task that is awaiting a future cancels that future
        # and clears the task's _fut_waiter reference.
        fut = self.new_future(self.loop)

        async def coro():
            await fut

        task = self.new_task(self.loop, coro())
        test_utils.run_briefly(self.loop)
        self.assertIs(task._fut_waiter, fut)

        task.cancel()
        test_utils.run_briefly(self.loop)
        self.assertRaises(
            asyncio.CancelledError, self.loop.run_until_complete, task)
        self.assertIsNone(task._fut_waiter)
        self.assertTrue(fut.cancelled())

    def test_task_set_methods(self):
        # Tasks reject set_result()/set_exception() with RuntimeError and
        # still complete with the coroutine's own return value.
        async def notmuch():
            return 'ko'

        gen = notmuch()
        task = self.new_task(self.loop, gen)

        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
            task.set_result('ok')

        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
            task.set_exception(ValueError())

        self.assertEqual(
            self.loop.run_until_complete(task),
            'ko')

    def test_step_result(self):
        # Running a generator-based coroutine that yields a plain value
        # (``yield 1``) must fail with RuntimeError.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def notmuch():
                yield None
                yield 1
                return 'ko'

        self.assertRaises(
            RuntimeError, self.loop.run_until_complete, notmuch())

    def test_step_result_future(self):
        # If
        # coroutine returns future, task waits on this future.

        # Future subclass that records whether a done-callback was added.
        class Fut(asyncio.Future):
            def __init__(self, *args, **kwds):
                self.cb_added = False
                super().__init__(*args, **kwds)

            def add_done_callback(self, *args, **kwargs):
                self.cb_added = True
                super().add_done_callback(*args, **kwargs)

        fut = Fut(loop=self.loop)
        result = None

        async def wait_for_future():
            nonlocal result
            result = await fut

        t = self.new_task(self.loop, wait_for_future())
        test_utils.run_briefly(self.loop)
        self.assertTrue(fut.cb_added)

        res = object()
        fut.set_result(res)
        test_utils.run_briefly(self.loop)
        self.assertIs(res, result)
        self.assertTrue(t.done())
        self.assertIsNone(t.result())

    def test_baseexception_during_cancel(self):
        # A BaseException (SystemExit) raised while handling cancellation
        # propagates out of the loop run, and is also stored as the task's
        # exception; the task is done but not "cancelled".

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = self.new_test_loop(gen)

        async def sleeper():
            await asyncio.sleep(10)

        base_exc = SystemExit()

        async def notmutch():
            try:
                await sleeper()
            except asyncio.CancelledError:
                raise base_exc

        task = self.new_task(loop, notmutch())
        test_utils.run_briefly(loop)

        task.cancel()
        # cancel() only requests cancellation; nothing has run yet.
        self.assertFalse(task.done())

        self.assertRaises(SystemExit, test_utils.run_briefly, loop)

        self.assertTrue(task.done())
        self.assertFalse(task.cancelled())
        self.assertIs(task.exception(), base_exc)

    def test_iscoroutinefunction(self):
        # Plain functions, plain generator functions and Mock objects are
        # not coroutine functions; @asyncio.coroutine-decorated ones are.
        def fn():
            pass

        self.assertFalse(asyncio.iscoroutinefunction(fn))

        def fn1():
            yield
        self.assertFalse(asyncio.iscoroutinefunction(fn1))

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def fn2():
                yield
        self.assertTrue(asyncio.iscoroutinefunction(fn2))

        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))

    def test_yield_vs_yield_from(self):
        # ``yield fut`` (instead of ``yield from fut``) in a generator-based
        # coroutine is an error: RuntimeError, and the future is untouched.
        fut = self.new_future(self.loop)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def wait_for_future():
                yield fut

        task = wait_for_future()
        with self.assertRaises(RuntimeError):
            self.loop.run_until_complete(task)

        self.assertFalse(fut.done())

    def test_yield_vs_yield_from_generator(self):
        # Yielding another coroutine object directly (not ``yield from``)
        # must likewise raise RuntimeError.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                yield

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def wait_for_future():
                gen = coro()
                try:
                    yield gen
                finally:
                    gen.close()

        task = wait_for_future()
        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete, task)

    def test_coroutine_non_gen_function(self):
        # @asyncio.coroutine applied to a non-generator function still
        # produces a coroutine function whose result is the return value.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def func():
                return 'test'

        self.assertTrue(asyncio.iscoroutinefunction(func))

        coro = func()
        self.assertTrue(asyncio.iscoroutine(coro))

        res = self.loop.run_until_complete(coro)
        self.assertEqual(res, 'test')

    def test_coroutine_non_gen_function_return_future(self):
        # When the decorated non-generator function returns a future, the
        # task's result is that future's result ('test'), set here by t2.
        fut = self.new_future(self.loop)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def func():
                return fut

        async def coro():
            fut.set_result('test')

        t1 = self.new_task(self.loop, func())
        t2 = self.new_task(self.loop, coro())
        res = self.loop.run_until_complete(t1)
        self.assertEqual(res, 'test')
        self.assertIsNone(t2.result())

    def test_current_task(self):
        # current_task() is None outside any task and returns the running
        # task inside it, with or without an explicit ``None`` loop.
        self.assertIsNone(asyncio.current_task(loop=self.loop))

        async def coro(loop):
            self.assertIs(asyncio.current_task(), task)

            self.assertIs(asyncio.current_task(None), task)
            self.assertIs(asyncio.current_task(), task)

        task = self.new_task(self.loop,
coro(self.loop)) 2047 self.loop.run_until_complete(task) 2048 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2049 2050 def test_current_task_with_interleaving_tasks(self): 2051 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2052 2053 fut1 = self.new_future(self.loop) 2054 fut2 = self.new_future(self.loop) 2055 2056 async def coro1(loop): 2057 self.assertTrue(asyncio.current_task() is task1) 2058 await fut1 2059 self.assertTrue(asyncio.current_task() is task1) 2060 fut2.set_result(True) 2061 2062 async def coro2(loop): 2063 self.assertTrue(asyncio.current_task() is task2) 2064 fut1.set_result(True) 2065 await fut2 2066 self.assertTrue(asyncio.current_task() is task2) 2067 2068 task1 = self.new_task(self.loop, coro1(self.loop)) 2069 task2 = self.new_task(self.loop, coro2(self.loop)) 2070 2071 self.loop.run_until_complete(asyncio.wait((task1, task2))) 2072 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2073 2074 # Some thorough tests for cancellation propagation through 2075 # coroutines, tasks and wait(). 2076 2077 def test_yield_future_passes_cancel(self): 2078 # Cancelling outer() cancels inner() cancels waiter. 2079 proof = 0 2080 waiter = self.new_future(self.loop) 2081 2082 async def inner(): 2083 nonlocal proof 2084 try: 2085 await waiter 2086 except asyncio.CancelledError: 2087 proof += 1 2088 raise 2089 else: 2090 self.fail('got past sleep() in inner()') 2091 2092 async def outer(): 2093 nonlocal proof 2094 try: 2095 await inner() 2096 except asyncio.CancelledError: 2097 proof += 100 # Expect this path. 2098 else: 2099 proof += 10 2100 2101 f = asyncio.ensure_future(outer(), loop=self.loop) 2102 test_utils.run_briefly(self.loop) 2103 f.cancel() 2104 self.loop.run_until_complete(f) 2105 self.assertEqual(proof, 101) 2106 self.assertTrue(waiter.cancelled()) 2107 2108 def test_yield_wait_does_not_shield_cancel(self): 2109 # Cancelling outer() makes wait() return early, leaves inner() 2110 # running. 
        proof = 0
        waiter = self.new_future(self.loop)

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        async def outer():
            nonlocal proof
            with self.assertWarns(DeprecationWarning):
                d, p = await asyncio.wait([inner()])
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        self.assertRaises(
            asyncio.CancelledError, self.loop.run_until_complete, f)
        waiter.set_result(None)
        test_utils.run_briefly(self.loop)
        # Only inner() completed (+1); outer() never got past wait().
        self.assertEqual(proof, 1)

    def test_shield_result(self):
        # The shield's outer future receives the inner future's result.
        inner = self.new_future(self.loop)
        outer = asyncio.shield(inner)
        inner.set_result(42)
        res = self.loop.run_until_complete(outer)
        self.assertEqual(res, 42)

    def test_shield_exception(self):
        # The shield's outer future receives the very same exception
        # object that was set on the inner future.
        inner = self.new_future(self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        exc = RuntimeError('expected')
        inner.set_exception(exc)
        test_utils.run_briefly(self.loop)
        self.assertIs(outer.exception(), exc)

    def test_shield_cancel_inner(self):
        # Cancelling the inner future cancels the outer one as well.
        inner = self.new_future(self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        inner.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(outer.cancelled())

    def test_shield_cancel_outer(self):
        # Cancelling the outer future leaves it cancelled with no
        # callbacks left registered on it.
        inner = self.new_future(self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        outer.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(outer.cancelled())
        # _callbacks may be None or a list depending on the Future class
        # under test; both count as "no callbacks".
        self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))

    def test_shield_shortcut(self):
        # Shielding an already-completed future still yields its result.
        fut = self.new_future(self.loop)
        fut.set_result(42)
        res = self.loop.run_until_complete(asyncio.shield(fut))
        self.assertEqual(res, 42)

    def test_shield_effect(self):
        # Cancelling outer() does not
        # affect inner().
        proof = 0
        waiter = self.new_future(self.loop)

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        async def outer():
            nonlocal proof
            await asyncio.shield(inner())
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(f)
        waiter.set_result(None)
        test_utils.run_briefly(self.loop)
        # inner() ran to completion (+1); outer() was cancelled before +100.
        self.assertEqual(proof, 1)

    def test_shield_gather(self):
        # Cancelling a shield around gather() cancels only the shield;
        # the gather still completes with both children's results.
        child1 = self.new_future(self.loop)
        child2 = self.new_future(self.loop)
        parent = asyncio.gather(child1, child2)
        outer = asyncio.shield(parent)
        test_utils.run_briefly(self.loop)
        outer.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(outer.cancelled())
        child1.set_result(1)
        child2.set_result(2)
        test_utils.run_briefly(self.loop)
        self.assertEqual(parent.result(), [1, 2])

    def test_gather_shield(self):
        child1 = self.new_future(self.loop)
        child2 = self.new_future(self.loop)
        inner1 = asyncio.shield(child1)
        inner2 = asyncio.shield(child2)
        parent = asyncio.gather(inner1, inner2)
        test_utils.run_briefly(self.loop)
        parent.cancel()
        # This should cancel inner1 and inner2 but not child1 and child2.
        test_utils.run_briefly(self.loop)
        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
        self.assertTrue(inner1.cancelled())
        self.assertTrue(inner2.cancelled())
        child1.set_result(1)
        child2.set_result(2)
        test_utils.run_briefly(self.loop)

    def test_shield_coroutine_without_loop(self):
        # shield() on a bare coroutine with no running or current loop
        # must warn and then raise RuntimeError.
        async def coro():
            return 42

        inner = coro()
        # The coroutine is never scheduled, so close it explicitly.
        self.addCleanup(inner.close)
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
                asyncio.shield(inner)
        self.assertEqual(cm.warnings[0].filename, __file__)

    def test_shield_coroutine_use_running_loop(self):
        # Called from inside a running loop, shield() binds the wrapped
        # coroutine to that loop.
        async def coro():
            return 42

        async def test():
            return asyncio.shield(coro())
        outer = self.loop.run_until_complete(test())
        self.assertEqual(outer._loop, self.loop)
        res = self.loop.run_until_complete(outer)
        self.assertEqual(res, 42)

    def test_shield_coroutine_use_global_loop(self):
        # Deprecated in 3.10
        # Outside a running loop, shield() uses the loop installed with
        # set_event_loop(), emitting a DeprecationWarning attributed to
        # this file.
        async def coro():
            return 42

        asyncio.set_event_loop(self.loop)
        self.addCleanup(asyncio.set_event_loop, None)
        with self.assertWarns(DeprecationWarning) as cm:
            outer = asyncio.shield(coro())
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertEqual(outer._loop, self.loop)
        res = self.loop.run_until_complete(outer)
        self.assertEqual(res, 42)

    def test_as_completed_invalid_args(self):
        fut = self.new_future(self.loop)

        # as_completed() expects a list of futures, not a future instance
        self.assertRaises(TypeError, self.loop.run_until_complete,
            asyncio.as_completed(fut))
        coro = coroutine_function()
        self.assertRaises(TypeError, self.loop.run_until_complete,
            asyncio.as_completed(coro))
        # Never awaited above; close it to avoid a "never awaited" warning.
        coro.close()

    def test_wait_invalid_args(self):
        fut = self.new_future(self.loop)

        # wait() expects
        # a list of futures, not a future instance
        self.assertRaises(TypeError, self.loop.run_until_complete,
            asyncio.wait(fut))
        coro = coroutine_function()
        self.assertRaises(TypeError, self.loop.run_until_complete,
            asyncio.wait(coro))
        # Never awaited above; close it to avoid a "never awaited" warning.
        coro.close()

        # wait() expects at least a future
        self.assertRaises(ValueError, self.loop.run_until_complete,
            asyncio.wait([]))

    def test_corowrapper_mocks_generator(self):
        # Whatever @asyncio.coroutine returns (with the debug flag on or
        # off) must expose the generator attributes gi_frame, gi_running
        # and gi_code.

        def check():
            # A function that asserts various things.
            # Called twice, with different debug flag values.

            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def coro():
                    # The actual coroutine.
                    self.assertTrue(gen.gi_running)
                    yield from fut

            # A completed Future used to run the coroutine.
            fut = self.new_future(self.loop)
            fut.set_result(None)

            # Call the coroutine.
            gen = coro()

            # Check some properties.
            self.assertTrue(asyncio.iscoroutine(gen))
            self.assertIsInstance(gen.gi_frame, types.FrameType)
            self.assertFalse(gen.gi_running)
            self.assertIsInstance(gen.gi_code, types.CodeType)

            # Run it.
            self.loop.run_until_complete(gen)

            # The frame should have changed.
            self.assertIsNone(gen.gi_frame)

        # Test with debug flag cleared.
        with set_coroutine_debug(False):
            check()

        # Test with debug flag set.
2327 with set_coroutine_debug(True): 2328 check() 2329 2330 def test_yield_from_corowrapper(self): 2331 with set_coroutine_debug(True): 2332 with self.assertWarns(DeprecationWarning): 2333 @asyncio.coroutine 2334 def t1(): 2335 return (yield from t2()) 2336 2337 with self.assertWarns(DeprecationWarning): 2338 @asyncio.coroutine 2339 def t2(): 2340 f = self.new_future(self.loop) 2341 self.new_task(self.loop, t3(f)) 2342 return (yield from f) 2343 2344 with self.assertWarns(DeprecationWarning): 2345 @asyncio.coroutine 2346 def t3(f): 2347 f.set_result((1, 2, 3)) 2348 2349 task = self.new_task(self.loop, t1()) 2350 val = self.loop.run_until_complete(task) 2351 self.assertEqual(val, (1, 2, 3)) 2352 2353 def test_yield_from_corowrapper_send(self): 2354 def foo(): 2355 a = yield 2356 return a 2357 2358 def call(arg): 2359 cw = asyncio.coroutines.CoroWrapper(foo()) 2360 cw.send(None) 2361 try: 2362 cw.send(arg) 2363 except StopIteration as ex: 2364 return ex.args[0] 2365 else: 2366 raise AssertionError('StopIteration was expected') 2367 2368 self.assertEqual(call((1, 2)), (1, 2)) 2369 self.assertEqual(call('spam'), 'spam') 2370 2371 def test_corowrapper_weakref(self): 2372 wd = weakref.WeakValueDictionary() 2373 def foo(): yield from [] 2374 cw = asyncio.coroutines.CoroWrapper(foo()) 2375 wd['cw'] = cw # Would fail without __weakref__ slot. 2376 cw.gen = None # Suppress warning from __del__. 

    def test_corowrapper_throw(self):
        # Issue 429: CoroWrapper.throw must be compatible with gen.throw
        # Exercises the one-, two- and three-argument forms of throw().
        def foo():
            # Echo generator: yields back whatever it was sent, or the
            # exception instance that was thrown into it.
            value = None
            while True:
                try:
                    value = yield value
                except Exception as e:
                    value = e

        exception = Exception("foo")
        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        self.assertIs(exception, cw.throw(exception))

        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        self.assertIs(exception, cw.throw(Exception, exception))

        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        exception = cw.throw(Exception, "foo")
        self.assertIsInstance(exception, Exception)
        self.assertEqual(exception.args, ("foo", ))

        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        exception = cw.throw(Exception, "foo", None)
        self.assertIsInstance(exception, Exception)
        self.assertEqual(exception.args, ("foo", ))

    def test_log_destroyed_pending_task(self):
        # A task that is garbage-collected while still pending must report
        # 'Task was destroyed but it is pending!' (with its source traceback)
        # to the loop's exception handler.
        # NOTE: the order in which references are dropped below matters.
        Task = self.__class__.Task

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def kill_me(loop):
                future = self.new_future(loop)
                yield from future
                # at this point, the only reference to kill_me() task is
                # the Task._wakeup() method in future._callbacks
                raise Exception("code never reached")

        mock_handler = mock.Mock()
        self.loop.set_debug(True)
        self.loop.set_exception_handler(mock_handler)

        # schedule the task
        coro = kill_me(self.loop)
        task = asyncio.ensure_future(coro, loop=self.loop)

        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})

        asyncio.set_event_loop(None)

        # execute the task so it waits for future
        self.loop._run_once()
        self.assertEqual(len(self.loop._ready), 0)

        # remove the future used in kill_me(), and references to the task
        del coro.gi_frame.f_locals['future']
        coro = None
        # Keep the traceback so it can be compared after the task is gone.
        source_traceback = task._source_traceback
        task = None

        # no more reference to kill_me() task: the task is destroyed by the GC
        support.gc_collect()

        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())

        mock_handler.assert_called_with(self.loop, {
            'message': 'Task was destroyed but it is pending!',
            'task': mock.ANY,
            'source_traceback': source_traceback,
        })
        mock_handler.reset_mock()

    @mock.patch('asyncio.base_events.logger')
    def test_tb_logger_not_called_after_cancel(self, m_log):
        # A task that already failed with TypeError and is then cancelled and
        # dropped must not trigger logger.error on asyncio.base_events.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def coro():
            raise TypeError

        async def runner():
            task = self.new_task(loop, coro())
            await asyncio.sleep(0.05)
            task.cancel()
            # Drop the only local reference to the task.
            task = None

        loop.run_until_complete(runner())
        self.assertFalse(m_log.error.called)

    @mock.patch('asyncio.coroutines.logger')
    def test_coroutine_never_yielded(self, m_log):
        # In debug mode, a generator-based coroutine object that is created
        # and dropped without being used must log an error naming where the
        # function was defined and where the object was created.
        with set_coroutine_debug(True):
            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def coro_noop():
                    pass

        # NOTE: tb_lineno is computed as "this line + 2"; do not insert or
        # remove lines between the f_lineno statement and the coro_noop() call.
        tb_filename = __file__
        tb_lineno = sys._getframe().f_lineno + 2
        # create a coroutine object but don't use it
        coro_noop()
        support.gc_collect()

        self.assertTrue(m_log.error.called)
        message = m_log.error.call_args[0][0]
        func_filename, func_lineno = test_utils.get_function_source(coro_noop)

        # NOTE(review): the ' File' and ' coro_noop' prefixes below carry a
        # single leading space; traceback entries are normally indented by two
        # and four spaces, so these literals may have lost whitespace --
        # confirm against a real logged message before relying on them.
        regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
                    r'was never yielded from\n'
                 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
                 r'.*\n'
                 r' File "%s", line %s, in test_coroutine_never_yielded\n'
                 r' coro_noop\(\)$'
                 % (re.escape(coro_noop.__qualname__),
                    re.escape(func_filename), func_lineno,
                    re.escape(tb_filename), tb_lineno))

        self.assertRegex(message, re.compile(regex, re.DOTALL))

    def test_return_coroutine_from_coroutine(self):
        """Return of @asyncio.coroutine()-wrapped function generator object
        from @asyncio.coroutine()-wrapped function should have same effect as
        returning generator object or Future."""
        def check():
            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def outer_coro():
                    with self.assertWarns(DeprecationWarning):
                        @asyncio.coroutine
                        def inner_coro():
                            return 1

                    return inner_coro()

            result = self.loop.run_until_complete(outer_coro())
            self.assertEqual(result, 1)

        # Test with debug flag cleared.
        with set_coroutine_debug(False):
            check()

        # Test with debug flag set.
        with set_coroutine_debug(True):
            check()

    def test_task_source_traceback(self):
        # In debug mode, a task records where it was created; the entry second
        # from the end of _source_traceback must point at the new_task() call.
        self.loop.set_debug(True)

        # NOTE: lineno is "this line - 1", so the f_lineno statement must stay
        # on the line immediately after the new_task() call.
        task = self.new_task(self.loop, coroutine_function())
        lineno = sys._getframe().f_lineno - 1
        self.assertIsInstance(task._source_traceback, list)
        self.assertEqual(task._source_traceback[-2][:3],
                         (__file__,
                          lineno,
                          'test_task_source_traceback'))
        self.loop.run_until_complete(task)

    def _test_cancel_wait_for(self, timeout):
        """Cancel a wait_for() wrapper and check the wrapped task is cancelled.

        *timeout* is passed straight through to asyncio.wait_for().
        """
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def blocking_coroutine():
            fut = self.new_future(loop)
            # Block: fut result is never set
            await fut

        task = loop.create_task(blocking_coroutine())

        wait = loop.create_task(asyncio.wait_for(task, timeout))
        # Schedule the cancellation so it happens once the loop is running.
        loop.call_soon(wait.cancel)

        self.assertRaises(asyncio.CancelledError,
                          loop.run_until_complete, wait)

        # Python issue #23219: cancelling the wait must also cancel the task
        self.assertTrue(task.cancelled())

    def test_cancel_blocking_wait_for(self):
        # wait_for() with no timeout.
        self._test_cancel_wait_for(None)

    def test_cancel_wait_for(self):
        # wait_for() with a 60 second timeout.
        self._test_cancel_wait_for(60.0)

    def test_cancel_gather_1(self):
        """Ensure that a gathering future refuses to be cancelled once all
        children are done"""
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        fut = self.new_future(loop)
        async def create():
            # The indirection fut->child_coro is needed since otherwise the
            # gathering task is done at the same time as the child future
            def child_coro():
                return (yield from fut)
            gather_future = asyncio.gather(child_coro())
            return asyncio.ensure_future(gather_future)
        gather_task = loop.run_until_complete(create())

        # Records what gather_task.cancel() returned inside the callback.
        cancel_result = None
        def cancelling_callback(_):
            nonlocal cancel_result
            cancel_result = gather_task.cancel()
        fut.add_done_callback(cancelling_callback)

        fut.set_result(42) # calls the cancelling_callback after fut is done()

        # At this point the task should complete.
        loop.run_until_complete(gather_task)

        # Python issue #26923: asyncio.gather drops cancellation
        self.assertEqual(cancel_result, False)
        self.assertFalse(gather_task.cancelled())
        self.assertEqual(gather_task.result(), [42])

    def test_cancel_gather_2(self):
        # Cancelling a task blocked in gather() must propagate CancelledError;
        # the cancel() argument is expected on the innermost exception in the
        # __context__ chain, not on the outermost one.
        cases = [
            ((), ()),
            ((None,), ()),
            (('my message',), ('my message',)),
            # Non-string values should roundtrip.
            ((5,), (5,)),
        ]
        for cancel_args, expected_args in cases:
            with self.subTest(cancel_args=cancel_args):
                loop = asyncio.new_event_loop()
                self.addCleanup(loop.close)

                async def test():
                    # Repeatedly gather a short sleep until more than one
                    # (counted) second has passed.
                    time = 0
                    while True:
                        time += 0.05
                        await asyncio.gather(asyncio.sleep(0.05),
                                             return_exceptions=True)
                        if time > 1:
                            return

                async def main():
                    qwe = self.new_task(loop, test())
                    await asyncio.sleep(0.2)
                    qwe.cancel(*cancel_args)
                    await qwe

                try:
                    loop.run_until_complete(main())
                except asyncio.CancelledError as exc:
                    self.assertEqual(exc.args, ())
                    exc_type, exc_args, depth = get_innermost_context(exc)
                    self.assertEqual((exc_type, exc_args),
                        (asyncio.CancelledError, expected_args))
                    # The exact traceback seems to vary in CI.
                    self.assertIn(depth, (2, 3))
                else:
                    self.fail('gather did not propagate the cancellation '
                              'request')

    def test_exception_traceback(self):
        # See http://bugs.python.org/issue28843
        # The exception stored on a failed task must carry a traceback.

        async def foo():
            1 / 0

        async def main():
            task = self.new_task(self.loop, foo())
            await asyncio.sleep(0)  # skip one loop iteration
            self.assertIsNotNone(task.exception().__traceback__)

        self.loop.run_until_complete(main())

    @mock.patch('asyncio.base_events.logger')
    def test_error_in_call_soon(self, m_log):
        # If loop.call_soon() raises while a task is being created, the error
        # must propagate, the half-built task must be logged as destroyed
        # while pending, and it must not remain in all_tasks().
        def call_soon(callback, *args, **kwargs):
            raise ValueError
        self.loop.call_soon = call_soon

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                pass

        self.assertFalse(m_log.error.called)

        with self.assertRaises(ValueError):
            gen = coro()
            try:
                self.new_task(self.loop, gen)
            finally:
                gen.close()
        gc.collect()  # For PyPy or other GCs.

        self.assertTrue(m_log.error.called)
        message = m_log.error.call_args[0][0]
        self.assertIn('Task was destroyed but it is pending', message)

        self.assertEqual(asyncio.all_tasks(self.loop), set())

    def test_create_task_with_noncoroutine(self):
        # Creating a task from a non-coroutine (the int 123) must raise
        # TypeError with a message naming the offending value.
        with self.assertRaisesRegex(TypeError,
                                    "a coroutine was expected, got 123"):
            self.new_task(self.loop, 123)

        # test it for the second time to ensure that caching
        # in asyncio.iscoroutine() doesn't break things.
        with self.assertRaisesRegex(TypeError,
                                    "a coroutine was expected, got 123"):
            self.new_task(self.loop, 123)

    def test_create_task_with_oldstyle_coroutine(self):
        # Tasks can be created from @asyncio.coroutine generator functions.

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                pass

        task = self.new_task(self.loop, coro())
        self.assertIsInstance(task, self.Task)
        self.loop.run_until_complete(task)

        # test it for the second time to ensure that caching
        # in asyncio.iscoroutine() doesn't break things.
        task = self.new_task(self.loop, coro())
        self.assertIsInstance(task, self.Task)
        self.loop.run_until_complete(task)

    def test_create_task_with_async_function(self):
        # Tasks can be created from native "async def" coroutines.

        async def coro():
            pass

        task = self.new_task(self.loop, coro())
        self.assertIsInstance(task, self.Task)
        self.loop.run_until_complete(task)

        # test it for the second time to ensure that caching
        # in asyncio.iscoroutine() doesn't break things.
        task = self.new_task(self.loop, coro())
        self.assertIsInstance(task, self.Task)
        self.loop.run_until_complete(task)

    def test_create_task_with_asynclike_function(self):
        # Tasks can be created from a CoroLikeObject instance; the task
        # result is expected to be 42.
        task = self.new_task(self.loop, CoroLikeObject())
        self.assertIsInstance(task, self.Task)
        self.assertEqual(self.loop.run_until_complete(task), 42)

        # test it for the second time to ensure that caching
        # in asyncio.iscoroutine() doesn't break things.
        task = self.new_task(self.loop, CoroLikeObject())
        self.assertIsInstance(task, self.Task)
        self.assertEqual(self.loop.run_until_complete(task), 42)

    def test_bare_create_task(self):
        # asyncio.create_task() called inside a running coroutine must
        # produce an instance of the Task class under test.

        async def inner():
            return 1

        async def coro():
            task = asyncio.create_task(inner())
            self.assertIsInstance(task, self.Task)
            ret = await task
            self.assertEqual(1, ret)

        self.loop.run_until_complete(coro())

    def test_bare_create_named_task(self):
        # The name= argument of asyncio.create_task() must be reflected by
        # Task.get_name().

        async def coro_noop():
            pass

        async def coro():
            task = asyncio.create_task(coro_noop(), name='No-op')
            self.assertEqual(task.get_name(), 'No-op')
            await task

        self.loop.run_until_complete(coro())

    def test_context_1(self):
        # A sub-task sees the context variable value from the moment it was
        # created ('nope'), and its own cvar.set() must not leak back into
        # the parent task.
        cvar = contextvars.ContextVar('cvar', default='nope')

        async def sub():
            await asyncio.sleep(0.01)
            self.assertEqual(cvar.get(), 'nope')
            cvar.set('something else')

        async def main():
            self.assertEqual(cvar.get(), 'nope')
            subtask = self.new_task(loop, sub())
            cvar.set('yes')
            self.assertEqual(cvar.get(), 'yes')
            await subtask
            self.assertEqual(cvar.get(), 'yes')

        loop = asyncio.new_event_loop()
        try:
            task = self.new_task(loop, main())
            loop.run_until_complete(task)
        finally:
            loop.close()

    def test_context_2(self):
        # Context variable changes made in a future's done-callback, or
        # inside the task, must not be visible to the task or to the test
        # function's own context respectively.
        cvar = contextvars.ContextVar('cvar', default='nope')

        async def main():
            def fut_on_done(fut):
                # This change must not pollute the context
                # of the "main()" task.
                cvar.set('something else')

            self.assertEqual(cvar.get(), 'nope')

            for j in range(2):
                fut = self.new_future(loop)
                fut.add_done_callback(fut_on_done)
                cvar.set(f'yes{j}')
                loop.call_soon(fut.set_result, None)
                await fut
                self.assertEqual(cvar.get(), f'yes{j}')

                for i in range(3):
                    # Test that task passed its context to add_done_callback:
                    cvar.set(f'yes{i}-{j}')
                    await asyncio.sleep(0.001)
                    self.assertEqual(cvar.get(), f'yes{i}-{j}')

        loop = asyncio.new_event_loop()
        try:
            task = self.new_task(loop, main())
            loop.run_until_complete(task)
        finally:
            loop.close()

        # Nothing set inside the task leaked into this context.
        self.assertEqual(cvar.get(), 'nope')

    def test_context_3(self):
        # Run 100 Tasks in parallel, each modifying cvar.

        cvar = contextvars.ContextVar('cvar', default=-1)

        async def sub(num):
            # Each task must read back exactly the value it just set, even
            # though other tasks run during the sleep.
            for i in range(10):
                cvar.set(num + i)
                await asyncio.sleep(random.uniform(0.001, 0.05))
                self.assertEqual(cvar.get(), num + i)

        async def main():
            tasks = []
            for i in range(100):
                task = loop.create_task(sub(random.randint(0, 10)))
                tasks.append(task)

            await asyncio.gather(*tasks)

        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(main())
        finally:
            loop.close()

        # The default value is still visible outside of the tasks.
        self.assertEqual(cvar.get(), -1)

    def test_get_coro(self):
        # Task.get_coro() must return the very coroutine object the task was
        # created with.
        loop = asyncio.new_event_loop()
        coro = coroutine_function()
        try:
            task = self.new_task(loop, coro)
            loop.run_until_complete(task)
            self.assertIs(task.get_coro(), coro)
        finally:
            loop.close()


def add_subclass_tests(cls):
    """Class decorator: run the tests of *cls* against Task/Future subclasses.

    Replaces ``cls.Task`` and ``cls.Future`` with subclasses that count
    ``add_done_callback()`` calls, adds ``test_subclasses_ctask_cfuture`` and
    disables ``test_task_source_traceback``.  Returns *cls* unchanged when
    either ``cls.Task`` or ``cls.Future`` is None.
    """
    BaseTask = cls.Task
    BaseFuture = cls.Future

    if BaseTask is None or BaseFuture is None:
        return cls

    class CommonFuture:
        # Mixin recording how often add_done_callback() is called.
        def __init__(self, *args, **kwargs):
            self.calls = collections.defaultdict(lambda: 0)
            super().__init__(*args, **kwargs)

        def add_done_callback(self, *args, **kwargs):
            self.calls['add_done_callback'] += 1
            return super().add_done_callback(*args, **kwargs)

    class Task(CommonFuture, BaseTask):
        pass

    class Future(CommonFuture, BaseFuture):
        pass

    def test_subclasses_ctask_cfuture(self):
        # Awaiting a subclassed future from a subclassed task must go through
        # the overridden add_done_callback() exactly once on each object.
        fut = self.Future(loop=self.loop)

        async def func():
            self.loop.call_soon(lambda: fut.set_result('spam'))
            return await fut

        task = self.Task(func(), loop=self.loop)

        result = self.loop.run_until_complete(task)

        self.assertEqual(result, 'spam')

        self.assertEqual(
            dict(task.calls),
            {'add_done_callback': 1})

        self.assertEqual(
            dict(fut.calls),
            {'add_done_callback': 1})

    # Add patched Task & Future back to the test case
    cls.Task = Task
    cls.Future = Future

    # Add an extra unit-test
    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture

    # Disable the "test_task_source_traceback" test
    # (the test is hardcoded for a particular call stack, which
    # is slightly different for Task subclasses)
    cls.test_task_source_traceback = None

    return cls


class SetMethodsTest:
    """Mixin: calling Future.set_result()/set_exception() directly on a Task.

    Expects the host class to provide ``Future``, ``loop`` and ``new_task``.
    """

    def test_set_result_causes_invalid_state(self):
        # Forcing a result onto a task via Future.set_result() makes the task
        # report that result; the later step must send an InvalidStateError
        # ("step(): already done") to the loop's exception handler.
        Future = type(self).Future
        self.loop.call_exception_handler = exc_handler = mock.Mock()

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        Future.set_result(task, 'spam')

        self.assertEqual(
            self.loop.run_until_complete(task),
            'spam')

        exc_handler.assert_called_once()
        exc = exc_handler.call_args[0][0]['exception']
        # Re-raise the captured exception to check its type and message.
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise exc

        coro.close()

    def test_set_exception_causes_invalid_state(self):
        # Same as above, but forcing an exception via Future.set_exception().
        class MyExc(Exception):
            pass

        Future = type(self).Future
        self.loop.call_exception_handler = exc_handler = mock.Mock()

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        Future.set_exception(task, MyExc())

        with self.assertRaises(MyExc):
            self.loop.run_until_complete(task)

        exc_handler.assert_called_once()
        exc = exc_handler.call_args[0][0]['exception']
        # Re-raise the captured exception to check its type and message.
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise exc

        coro.close()


@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
                          test_utils.TestCase):
    """Run the shared task tests with tasks._CTask and futures._CFuture."""

    # getattr() with a default keeps the class body importable when the C
    # implementations are absent (the class is skipped in that case).
    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)

    @support.refcount_test
    def test_refleaks_in_task___init__(self):
        # Re-running __init__ on an existing task 100 times must leave the
        # total reference count within 10 of its starting value.
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        refs_before = gettotalrefcount()
        for i in range(100):
            task.__init__(coro(), loop=self.loop)
            self.loop.run_until_complete(task)
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)

    def test_del__log_destroy_pending_segfault(self):
        # Deleting _log_destroy_pending must raise AttributeError.
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        with self.assertRaises(AttributeError):
            del task._log_destroy_pending


@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    """Shared task tests against subclasses of _CTask and _CFuture."""

    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)


@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """Shared task tests: subclass of _CTask with subclass of _PyFuture."""

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)


@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
    """Shared task tests: subclass of _PyTask with subclass of _CFuture."""

    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)


@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """Shared task tests: _CTask with _PyFuture."""

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)


@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """Shared task tests: _PyTask with _CFuture."""

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask


class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
                            test_utils.TestCase):
    """Shared task tests plus the set_result/set_exception tests, pure Python."""

    Future = futures._PyFuture
    Task = tasks._PyTask


@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    """Shared task tests: subclasses of _PyTask and _PyFuture."""

    Future = futures._PyFuture
    Task = tasks._PyTask


@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_Future_Tests(test_utils.TestCase):

    def test_foobar(self):
        # A task must still be able to await a future whose ``get_loop``
        # attribute raises AttributeError on access.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def await_pending():
            await pending
            return 'spam'

        loop = self.loop = asyncio.new_event_loop()
        try:
            pending = Fut(loop=loop)
            loop.call_later(0.1, pending.set_result, 1)
            runner = loop.create_task(await_pending())
            outcome = loop.run_until_complete(runner)
        finally:
            loop.close()

        self.assertEqual(outcome, 'spam')


class BaseTaskIntrospectionTests:
    """Tests for the task registry hooks, shared by the Python and C variants.

    Concrete subclasses bind the four hook attributes below.
    """

    _register_task = _unregister_task = None
    _enter_task = _leave_task = None

    def test__register_task_1(self):
        # A task-like object exposing its loop only as ``_loop``.
        fake_loop = mock.Mock()

        class LoopAttrTask:
            @property
            def _loop(self):
                return fake_loop

            def done(self):
                return False

        pending = LoopAttrTask()

        self.assertEqual(asyncio.all_tasks(fake_loop), set())
        self._register_task(pending)
        self.assertEqual(asyncio.all_tasks(fake_loop), {pending})
        self._unregister_task(pending)

    def test__register_task_2(self):
        # A task-like object exposing its loop through ``get_loop()``.
        fake_loop = mock.Mock()

        class GetLoopTask:
            def get_loop(self):
                return fake_loop

            def done(self):
                return False

        pending = GetLoopTask()

        self.assertEqual(asyncio.all_tasks(fake_loop), set())
        self._register_task(pending)
        self.assertEqual(asyncio.all_tasks(fake_loop), {pending})
        self._unregister_task(pending)

    def test__register_task_3(self):
        # A registered task that reports done() is left out of all_tasks().
        fake_loop = mock.Mock()

        class FinishedTask:
            def get_loop(self):
                return fake_loop

            def done(self):
                return True

        finished = FinishedTask()

        self.assertEqual(asyncio.all_tasks(fake_loop), set())
        self._register_task(finished)
        self.assertEqual(asyncio.all_tasks(fake_loop), set())
        self._unregister_task(finished)

    def test__enter_task(self):
        # Entering a task makes it the current task of that loop.
        fake_loop = mock.Mock()
        fake_task = mock.Mock()
        self.assertIsNone(asyncio.current_task(fake_loop))
        self._enter_task(fake_loop, fake_task)
        self.assertIs(asyncio.current_task(fake_loop), fake_task)
        self._leave_task(fake_loop, fake_task)

    def test__enter_task_failure(self):
        # Entering a second task while one is current raises RuntimeError
        # and leaves the first task current.
        fake_loop = mock.Mock()
        first = mock.Mock()
        second = mock.Mock()
        self._enter_task(fake_loop, first)
        with self.assertRaises(RuntimeError):
            self._enter_task(fake_loop, second)
        self.assertIs(asyncio.current_task(fake_loop), first)
        self._leave_task(fake_loop, first)

    def test__leave_task(self):
        # Leaving the current task resets the loop's current task to None.
        fake_loop = mock.Mock()
        fake_task = mock.Mock()
        self._enter_task(fake_loop, fake_task)
        self._leave_task(fake_loop, fake_task)
        self.assertIsNone(asyncio.current_task(fake_loop))

    def test__leave_task_failure1(self):
        # Leaving a task other than the current one raises RuntimeError
        # and leaves the current task untouched.
        fake_loop = mock.Mock()
        first = mock.Mock()
        second = mock.Mock()
        self._enter_task(fake_loop, first)
        with self.assertRaises(RuntimeError):
            self._leave_task(fake_loop, second)
        self.assertIs(asyncio.current_task(fake_loop), first)
        self._leave_task(fake_loop, first)

    def test__leave_task_failure2(self):
        # Leaving when no task was entered raises RuntimeError.
        fake_loop = mock.Mock()
        fake_task = mock.Mock()
        with self.assertRaises(RuntimeError):
            self._leave_task(fake_loop, fake_task)
        self.assertIsNone(asyncio.current_task(fake_loop))

    def test__unregister_task(self):
        # A task that was registered and then unregistered is not reported.
        fake_loop = mock.Mock()
        fake_task = mock.Mock()

        def get_loop():
            return fake_loop

        fake_task.get_loop = get_loop
        self._register_task(fake_task)
        self._unregister_task(fake_task)
        self.assertEqual(asyncio.all_tasks(fake_loop), set())

    def test__unregister_task_not_registered(self):
        # Unregistering a task that was never registered must not raise.
        fake_loop = mock.Mock()
        fake_task = mock.Mock()
        self._unregister_task(fake_task)
        self.assertEqual(asyncio.all_tasks(fake_loop), set())


class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    """Introspection tests bound to the pure-Python hooks."""

    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)


@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
                     'requires the C _asyncio module')
class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    """Introspection tests bound to the C hooks, when they exist."""

    # The class body runs even when the class ends up skipped, so the C
    # hooks may only be looked up when they are actually present.
    if not hasattr(tasks, '_c_register_task'):
        _register_task = _unregister_task = _enter_task = _leave_task = None
    else:
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)


class BaseCurrentLoopTests:
    """Mixin testing asyncio.current_task(); subclasses supply new_task()."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def new_task(self, coro):
        """Create a task for *coro*; must be overridden by subclasses."""
        raise NotImplementedError

    def test_current_task_no_running_loop(self):
        # With an explicit loop that is not running, there is no current task.
        self.assertIsNone(asyncio.current_task(loop=self.loop))

    def test_current_task_no_running_loop_implicit(self):
        # Without a loop argument and outside a running loop, current_task()
        # raises RuntimeError.
        with self.assertRaises(RuntimeError):
            asyncio.current_task()

    def test_current_task_with_implicit_loop(self):
        # Inside a running task, current_task() returns that task whether the
        # loop is passed explicitly, passed as None, or omitted.
        async def coro():
            self.assertIs(asyncio.current_task(loop=self.loop), task)

            self.assertIs(asyncio.current_task(None), task)
            self.assertIs(asyncio.current_task(), task)

        task = self.new_task(coro())
        self.loop.run_until_complete(task)
        self.assertIsNone(asyncio.current_task(loop=self.loop))


class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    """current_task() tests using tasks._PyTask."""

    def new_task(self, coro):
        return tasks._PyTask(coro, loop=self.loop)


@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    """current_task() tests using tasks._CTask."""

    def new_task(self, coro):
        return getattr(tasks, '_CTask')(coro, loop=self.loop)


class GenericTaskTests(test_utils.TestCase):

    def test_future_subclass(self):
        # asyncio.Task must be a subclass of asyncio.Future.
        self.assertTrue(issubclass(asyncio.Task, asyncio.Future))

    @support.cpython_only
    def test_asyncio_module_compiled(self):
        # Because of circular imports it's easy to make _asyncio
        # module non-importable.  This is a simple test that will
        # fail on systems where C modules were successfully compiled
        # (hence the test for _functools etc), but _asyncio somehow didn't.
        try:
            import _functools
            import _json
            import _pickle
        except ImportError:
            self.skipTest('C modules are not available')
        else:
            try:
                import _asyncio
            except ImportError:
                self.fail('_asyncio module is missing')


class GatherTestsBase:
    """Mixin with gather() tests shared by the future and coroutine variants.

    Subclasses provide ``wrap_futures()`` and ``_gather()``.
    """

    def setUp(self):
        super().setUp()
        self.one_loop = self.new_test_loop()
        self.other_loop = self.new_test_loop()
        self.set_event_loop(self.one_loop, cleanup=False)

    def _run_loop(self, loop):
        """Run *loop* briefly, repeatedly, until its _ready queue is empty."""
        while loop._ready:
            test_utils.run_briefly(loop)

    def _check_success(self, **kwargs):
        """Gather three futures and check the result once all are done.

        **kwargs is forwarded to ``_gather()``.
        """
        a, b, c = [self.one_loop.create_future() for i in range(3)]
        fut = self._gather(*self.wrap_futures(a, b, c), **kwargs)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        # Complete two children out of order; gather must not be done yet.
        b.set_result(1)
        a.set_result(2)
        self._run_loop(self.one_loop)
        self.assertEqual(cb.called, False)
        self.assertFalse(fut.done())
        c.set_result(3)
        self._run_loop(self.one_loop)
        cb.assert_called_once_with(fut)
        # Results follow argument order, not completion order.
        self.assertEqual(fut.result(), [2, 1, 3])

    def test_success(self):
        self._check_success()
        self._check_success(return_exceptions=False)

    def test_result_exception_success(self):
        self._check_success(return_exceptions=True)

    def test_one_exception(self):
        # Without return_exceptions, the first child exception completes the
        # gather future with that same exception object.
        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
        fut = self._gather(*self.wrap_futures(a, b, c, d, e))
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        exc = ZeroDivisionError()
        a.set_result(1)
        b.set_exception(exc)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertIs(fut.exception(), exc)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        # Retrieve e's exception explicitly.
        e.exception()

    def test_return_exceptions(self):
        # With return_exceptions=True, gather waits for every child and
        # returns exceptions in place, in argument order.
        a, b, c, d = [self.one_loop.create_future() for i in range(4)]
        fut = self._gather(*self.wrap_futures(a, b, c, d),
                           return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        exc = ZeroDivisionError()
        exc2 = RuntimeError()
        b.set_result(1)
        c.set_exception(exc)
        a.set_result(3)
        self._run_loop(self.one_loop)
        self.assertFalse(fut.done())
        d.set_exception(exc2)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertEqual(fut.result(), [3, 1, exc, exc2])

    def test_env_var_debug(self):
        # Check asyncio.coroutines._DEBUG in a child interpreter under several
        # combinations of -E, -X dev, PYTHONASYNCIODEBUG and PYTHONDEVMODE.
        code = '\n'.join((
            'import asyncio.coroutines',
            'print(asyncio.coroutines._DEBUG)'))

        # Test with -E to not fail if the unit test was run with
        # PYTHONASYNCIODEBUG set to a non-empty string
        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
        self.assertEqual(stdout.rstrip(), b'False')

        sts, stdout, stderr = assert_python_ok('-c', code,
                                               PYTHONASYNCIODEBUG='',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'False')

        sts, stdout, stderr = assert_python_ok('-c', code,
                                               PYTHONASYNCIODEBUG='1',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'True')

        # Run with -E while PYTHONASYNCIODEBUG=1 is set; expect False.
        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
                                               PYTHONASYNCIODEBUG='1',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'False')

        # -X dev
        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
                                               '-c', code)
        self.assertEqual(stdout.rstrip(), b'True')


class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
    """gather() tests where the children are passed as plain futures."""

    def wrap_futures(self, *futures):
        """Return the futures unchanged."""
        return futures

    def _gather(self, *args, **kwargs):
        """Call asyncio.gather() directly, outside of any running loop."""
        return asyncio.gather(*args, **kwargs)

    def test_constructor_empty_sequence_without_loop(self):
        # gather() with no arguments here must warn (attributed to this file)
        # and raise RuntimeError.
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaises(RuntimeError):
                asyncio.gather()
        self.assertEqual(cm.warnings[0].filename, __file__)

    def test_constructor_empty_sequence_use_running_loop(self):
        # gather() with no arguments inside a running coroutine returns a
        # future on that loop which resolves to an empty list.
        async def gather():
            return asyncio.gather()
        fut = self.one_loop.run_until_complete(gather())
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIs(fut._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        self.assertEqual(fut.result(), [])

    def test_constructor_empty_sequence_use_global_loop(self):
        # Deprecated in 3.10
        asyncio.set_event_loop(self.one_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        with self.assertWarns(DeprecationWarning) as cm:
            fut = asyncio.gather()
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIs(fut._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        self.assertEqual(fut.result(), [])

    def test_constructor_heterogenous_futures(self):
        # Futures bound to different loops cannot be gathered together.
        fut1 = self.one_loop.create_future()
        fut2 = self.other_loop.create_future()
        with self.assertRaises(ValueError):
            asyncio.gather(fut1, fut2)

    def test_constructor_homogenous_futures(self):
        # The gather future takes its loop from the children (other_loop),
        # and the same children can be gathered more than once.
        children = [self.other_loop.create_future() for i in range(3)]
        fut = asyncio.gather(*children)
        self.assertIs(fut._loop, self.other_loop)
        self._run_loop(self.other_loop)
        self.assertFalse(fut.done())
        fut = asyncio.gather(*children)
        self.assertIs(fut._loop, self.other_loop)
        self._run_loop(self.other_loop)
        self.assertFalse(fut.done())

    def test_one_cancellation(self):
        # A cancelled child completes the gather future with a
        # CancelledError exception, without marking it as cancelled.
        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
        fut = asyncio.gather(a, b, c, d, e)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        b.cancel()
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertFalse(fut.cancelled())
        self.assertIsInstance(fut.exception(), asyncio.CancelledError)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        # Retrieve e's exception explicitly.
        e.exception()

    def test_result_exception_one_cancellation(self):
        # With return_exceptions=True, cancelled children show up as
        # CancelledError instances in the result list.
        a, b, c, d, e, f = [self.one_loop.create_future()
                            for i in range(6)]
        fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        zde = ZeroDivisionError()
        b.set_exception(zde)
        c.cancel()
        self._run_loop(self.one_loop)
        self.assertFalse(fut.done())
        d.set_result(3)
        e.cancel()
        rte = RuntimeError()
        f.set_exception(rte)
        res = self.one_loop.run_until_complete(fut)
        self.assertIsInstance(res[2], asyncio.CancelledError)
        self.assertIsInstance(res[4], asyncio.CancelledError)
        # Blank out the CancelledError slots so the rest can be compared by
        # equality.
        res[2] = res[4] = None
        self.assertEqual(res, [1, zde, None, 3, None, rte])
        cb.assert_called_once_with(fut)


class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
    """gather() tests where each child future is wrapped in a coroutine."""

    def wrap_futures(self, *futures):
        """Return one coroutine object per future, each awaiting its future."""
        coros = []
        for fut in futures:
            # fut=fut binds the current future to this coroutine function.
            async def coro(fut=fut):
                return await fut
            coros.append(coro())
        return coros

    def _gather(self, *args, **kwargs):
        """Call asyncio.gather() from a coroutine running on one_loop."""
        async def coro():
            return asyncio.gather(*args, **kwargs)
        return self.one_loop.run_until_complete(coro())

    def test_constructor_without_loop(self):
        # gather() on bare coroutines here must warn (attributed to this
        # file) and raise RuntimeError.
        async def coro():
            return 'abc'
        gen1 = coro()
        self.addCleanup(gen1.close)
        gen2 = coro()
        self.addCleanup(gen2.close)
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaises(RuntimeError):
                asyncio.gather(gen1, gen2)
        self.assertEqual(cm.warnings[0].filename, __file__)

    def test_constructor_use_running_loop(self):
        # gather() on coroutines inside a running coroutine binds to that
        # loop.
        async def coro():
            return 'abc'
        gen1 = coro()
        gen2 = coro()
        async def gather():
            return asyncio.gather(gen1, gen2)
        fut = self.one_loop.run_until_complete(gather())
        self.assertIs(fut._loop, self.one_loop)
        self.one_loop.run_until_complete(fut)

    def test_constructor_use_global_loop(self):
        # Deprecated in 3.10
        async def coro():
            return 'abc'
        asyncio.set_event_loop(self.other_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        gen1 = coro()
        gen2 = coro()
        with self.assertWarns(DeprecationWarning) as cm:
            fut = asyncio.gather(gen1, gen2)
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertIs(fut._loop, self.other_loop)
        self.other_loop.run_until_complete(fut)

    def test_duplicate_coroutines(self):
        # The same coroutine object may appear several times in one gather()
        # call; its result is repeated at each position.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s
        c = coro('abc')
        fut = self._gather(c, c, coro('def'), c)
        self._run_loop(self.one_loop)
        self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])

    def test_cancellation_broadcast(self):
        # Cancelling outer() cancels all children.
3538 proof = 0 3539 waiter = self.one_loop.create_future() 3540 3541 async def inner(): 3542 nonlocal proof 3543 await waiter 3544 proof += 1 3545 3546 child1 = asyncio.ensure_future(inner(), loop=self.one_loop) 3547 child2 = asyncio.ensure_future(inner(), loop=self.one_loop) 3548 gatherer = None 3549 3550 async def outer(): 3551 nonlocal proof, gatherer 3552 gatherer = asyncio.gather(child1, child2) 3553 await gatherer 3554 proof += 100 3555 3556 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3557 test_utils.run_briefly(self.one_loop) 3558 self.assertTrue(f.cancel()) 3559 with self.assertRaises(asyncio.CancelledError): 3560 self.one_loop.run_until_complete(f) 3561 self.assertFalse(gatherer.cancel()) 3562 self.assertTrue(waiter.cancelled()) 3563 self.assertTrue(child1.cancelled()) 3564 self.assertTrue(child2.cancelled()) 3565 test_utils.run_briefly(self.one_loop) 3566 self.assertEqual(proof, 0) 3567 3568 def test_exception_marking(self): 3569 # Test for the first line marked "Mark exception retrieved." 
3570 3571 async def inner(f): 3572 await f 3573 raise RuntimeError('should not be ignored') 3574 3575 a = self.one_loop.create_future() 3576 b = self.one_loop.create_future() 3577 3578 async def outer(): 3579 await asyncio.gather(inner(a), inner(b)) 3580 3581 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3582 test_utils.run_briefly(self.one_loop) 3583 a.set_result(None) 3584 test_utils.run_briefly(self.one_loop) 3585 b.set_result(None) 3586 test_utils.run_briefly(self.one_loop) 3587 self.assertIsInstance(f.exception(), RuntimeError) 3588 3589 3590class RunCoroutineThreadsafeTests(test_utils.TestCase): 3591 """Test case for asyncio.run_coroutine_threadsafe.""" 3592 3593 def setUp(self): 3594 super().setUp() 3595 self.loop = asyncio.new_event_loop() 3596 self.set_event_loop(self.loop) # Will cleanup properly 3597 3598 async def add(self, a, b, fail=False, cancel=False): 3599 """Wait 0.05 second and return a + b.""" 3600 await asyncio.sleep(0.05) 3601 if fail: 3602 raise RuntimeError("Fail!") 3603 if cancel: 3604 asyncio.current_task(self.loop).cancel() 3605 await asyncio.sleep(0) 3606 return a + b 3607 3608 def target(self, fail=False, cancel=False, timeout=None, 3609 advance_coro=False): 3610 """Run add coroutine in the event loop.""" 3611 coro = self.add(1, 2, fail=fail, cancel=cancel) 3612 future = asyncio.run_coroutine_threadsafe(coro, self.loop) 3613 if advance_coro: 3614 # this is for test_run_coroutine_threadsafe_task_factory_exception; 3615 # otherwise it spills errors and breaks **other** unittests, since 3616 # 'target' is interacting with threads. 3617 3618 # With this call, `coro` will be advanced, so that 3619 # CoroWrapper.__del__ won't do anything when asyncio tests run 3620 # in debug mode. 
3621 self.loop.call_soon_threadsafe(coro.send, None) 3622 try: 3623 return future.result(timeout) 3624 finally: 3625 future.done() or future.cancel() 3626 3627 def test_run_coroutine_threadsafe(self): 3628 """Test coroutine submission from a thread to an event loop.""" 3629 future = self.loop.run_in_executor(None, self.target) 3630 result = self.loop.run_until_complete(future) 3631 self.assertEqual(result, 3) 3632 3633 def test_run_coroutine_threadsafe_with_exception(self): 3634 """Test coroutine submission from a thread to an event loop 3635 when an exception is raised.""" 3636 future = self.loop.run_in_executor(None, self.target, True) 3637 with self.assertRaises(RuntimeError) as exc_context: 3638 self.loop.run_until_complete(future) 3639 self.assertIn("Fail!", exc_context.exception.args) 3640 3641 def test_run_coroutine_threadsafe_with_timeout(self): 3642 """Test coroutine submission from a thread to an event loop 3643 when a timeout is raised.""" 3644 callback = lambda: self.target(timeout=0) 3645 future = self.loop.run_in_executor(None, callback) 3646 with self.assertRaises(asyncio.TimeoutError): 3647 self.loop.run_until_complete(future) 3648 test_utils.run_briefly(self.loop) 3649 # Check that there's no pending task (add has been cancelled) 3650 for task in asyncio.all_tasks(self.loop): 3651 self.assertTrue(task.done()) 3652 3653 def test_run_coroutine_threadsafe_task_cancelled(self): 3654 """Test coroutine submission from a thread to an event loop 3655 when the task is cancelled.""" 3656 callback = lambda: self.target(cancel=True) 3657 future = self.loop.run_in_executor(None, callback) 3658 with self.assertRaises(asyncio.CancelledError): 3659 self.loop.run_until_complete(future) 3660 3661 def test_run_coroutine_threadsafe_task_factory_exception(self): 3662 """Test coroutine submission from a thread to an event loop 3663 when the task factory raise an exception.""" 3664 3665 def task_factory(loop, coro): 3666 raise NameError 3667 3668 run = 
class SleepTests(test_utils.TestCase):
    """Tests for asyncio.sleep(), run on a fresh event loop per test."""

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_sleep_zero(self):
        total = 0

        def bump(amount):
            nonlocal total
            total += amount

        async def scenario():
            self.loop.call_soon(bump, 1)
            # The scheduled callback has not run yet.
            self.assertEqual(total, 0)
            returned = await asyncio.sleep(0, result=10)
            # The call_soon() callback ran while this coroutine slept.
            self.assertEqual(total, 1)
            # sleep() handed back its `result` argument (10), which takes
            # the total from 1 to 11.
            bump(returned)

        self.loop.run_until_complete(scenario())
        self.assertEqual(total, 11)
class CompatibilityTests(test_utils.TestCase):
    # Tests for checking a bridge between old-styled coroutines
    # (functions decorated with @asyncio.coroutine) and async/await syntax.

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def tearDown(self):
        self.loop.close()
        self.loop = None
        super().tearDown()

    def test_yield_from_awaitable(self):
        # Applying the decorator is itself expected to warn.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def legacy():
                yield from asyncio.sleep(0)
                return 'ok'

        outcome = self.loop.run_until_complete(legacy())
        self.assertEqual('ok', outcome)

    def test_await_old_style_coro(self):
        # Each decoration is checked for its own DeprecationWarning.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def legacy_plain():
                return 'ok1'

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def legacy_yielding():
                yield from asyncio.sleep(0)
                return 'ok2'

        async def gather_both():
            # Await both old-style coroutines from a native coroutine.
            both = asyncio.gather(legacy_plain(), legacy_yielding())
            return await both

        outcome = self.loop.run_until_complete(gather_both())
        self.assertEqual(['ok1', 'ok2'], outcome)

    def test_debug_mode_interop(self):
        # https://bugs.python.org/issue32636
        script = textwrap.dedent("""
            import asyncio

            async def native_coro():
                pass

            @asyncio.coroutine
            def old_style_coro():
                yield from native_coro()

            asyncio.run(old_style_coro())
        """)

        # Run the script in a child interpreter with PYTHONASYNCIODEBUG=1
        # and DeprecationWarning ignored.
        interpreter_args = ("-Wignore::DeprecationWarning", "-c", script)
        environment = {"PYTHONASYNCIODEBUG": "1"}
        assert_python_ok(*interpreter_args, **environment)