1"""Tests for tasks.py.""" 2 3import collections 4import contextlib 5import contextvars 6import functools 7import gc 8import io 9import random 10import re 11import sys 12import textwrap 13import traceback 14import types 15import unittest 16import weakref 17from unittest import mock 18from types import GenericAlias 19 20import asyncio 21from asyncio import coroutines 22from asyncio import futures 23from asyncio import tasks 24from test.test_asyncio import utils as test_utils 25from test import support 26from test.support.script_helper import assert_python_ok 27 28 29def tearDownModule(): 30 asyncio.set_event_loop_policy(None) 31 32 33async def coroutine_function(): 34 pass 35 36 37@contextlib.contextmanager 38def set_coroutine_debug(enabled): 39 coroutines = asyncio.coroutines 40 41 old_debug = coroutines._DEBUG 42 try: 43 coroutines._DEBUG = enabled 44 yield 45 finally: 46 coroutines._DEBUG = old_debug 47 48 49def format_coroutine(qualname, state, src, source_traceback, generator=False): 50 if generator: 51 state = '%s' % state 52 else: 53 state = '%s, defined' % state 54 if source_traceback is not None: 55 frame = source_traceback[-1] 56 return ('coro=<%s() %s at %s> created at %s:%s' 57 % (qualname, state, src, frame[0], frame[1])) 58 else: 59 return 'coro=<%s() %s at %s>' % (qualname, state, src) 60 61 62def get_innermost_context(exc): 63 """ 64 Return information about the innermost exception context in the chain. 
65 """ 66 depth = 0 67 while True: 68 context = exc.__context__ 69 if context is None: 70 break 71 72 exc = context 73 depth += 1 74 75 return (type(exc), exc.args, depth) 76 77 78class Dummy: 79 80 def __repr__(self): 81 return '<Dummy>' 82 83 def __call__(self, *args): 84 pass 85 86 87class CoroLikeObject: 88 def send(self, v): 89 raise StopIteration(42) 90 91 def throw(self, *exc): 92 pass 93 94 def close(self): 95 pass 96 97 def __await__(self): 98 return self 99 100 101# The following value can be used as a very small timeout: 102# it passes check "timeout > 0", but has almost 103# no effect on the test performance 104_EPSILON = 0.0001 105 106 107class BaseTaskTests: 108 109 Task = None 110 Future = None 111 112 def new_task(self, loop, coro, name='TestTask'): 113 return self.__class__.Task(coro, loop=loop, name=name) 114 115 def new_future(self, loop): 116 return self.__class__.Future(loop=loop) 117 118 def setUp(self): 119 super().setUp() 120 self.loop = self.new_test_loop() 121 self.loop.set_task_factory(self.new_task) 122 self.loop.create_future = lambda: self.new_future(self.loop) 123 124 125 def test_generic_alias(self): 126 task = self.__class__.Task[str] 127 self.assertEqual(task.__args__, (str,)) 128 self.assertIsInstance(task, GenericAlias) 129 130 def test_task_cancel_message_getter(self): 131 async def coro(): 132 pass 133 t = self.new_task(self.loop, coro()) 134 self.assertTrue(hasattr(t, '_cancel_message')) 135 self.assertEqual(t._cancel_message, None) 136 137 t.cancel('my message') 138 self.assertEqual(t._cancel_message, 'my message') 139 140 with self.assertRaises(asyncio.CancelledError): 141 self.loop.run_until_complete(t) 142 143 def test_task_cancel_message_setter(self): 144 async def coro(): 145 pass 146 t = self.new_task(self.loop, coro()) 147 t.cancel('my message') 148 t._cancel_message = 'my new message' 149 self.assertEqual(t._cancel_message, 'my new message') 150 151 with self.assertRaises(asyncio.CancelledError): 152 
self.loop.run_until_complete(t) 153 154 def test_task_del_collect(self): 155 class Evil: 156 def __del__(self): 157 gc.collect() 158 159 async def run(): 160 return Evil() 161 162 self.loop.run_until_complete( 163 asyncio.gather(*[ 164 self.new_task(self.loop, run()) for _ in range(100) 165 ])) 166 167 def test_other_loop_future(self): 168 other_loop = asyncio.new_event_loop() 169 fut = self.new_future(other_loop) 170 171 async def run(fut): 172 await fut 173 174 try: 175 with self.assertRaisesRegex(RuntimeError, 176 r'Task .* got Future .* attached'): 177 self.loop.run_until_complete(run(fut)) 178 finally: 179 other_loop.close() 180 181 def test_task_awaits_on_itself(self): 182 183 async def test(): 184 await task 185 186 task = asyncio.ensure_future(test(), loop=self.loop) 187 188 with self.assertRaisesRegex(RuntimeError, 189 'Task cannot await on itself'): 190 self.loop.run_until_complete(task) 191 192 def test_task_class(self): 193 async def notmuch(): 194 return 'ok' 195 t = self.new_task(self.loop, notmuch()) 196 self.loop.run_until_complete(t) 197 self.assertTrue(t.done()) 198 self.assertEqual(t.result(), 'ok') 199 self.assertIs(t._loop, self.loop) 200 self.assertIs(t.get_loop(), self.loop) 201 202 loop = asyncio.new_event_loop() 203 self.set_event_loop(loop) 204 t = self.new_task(loop, notmuch()) 205 self.assertIs(t._loop, loop) 206 loop.run_until_complete(t) 207 loop.close() 208 209 def test_ensure_future_coroutine(self): 210 async def notmuch(): 211 return 'ok' 212 t = asyncio.ensure_future(notmuch(), loop=self.loop) 213 self.assertIs(t._loop, self.loop) 214 self.loop.run_until_complete(t) 215 self.assertTrue(t.done()) 216 self.assertEqual(t.result(), 'ok') 217 218 a = notmuch() 219 self.addCleanup(a.close) 220 with self.assertWarns(DeprecationWarning) as cm: 221 with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'): 222 asyncio.ensure_future(a) 223 self.assertEqual(cm.warnings[0].filename, __file__) 224 225 async def test(): 226 
return asyncio.ensure_future(notmuch()) 227 t = self.loop.run_until_complete(test()) 228 self.assertIs(t._loop, self.loop) 229 self.loop.run_until_complete(t) 230 self.assertTrue(t.done()) 231 self.assertEqual(t.result(), 'ok') 232 233 # Deprecated in 3.10 234 asyncio.set_event_loop(self.loop) 235 self.addCleanup(asyncio.set_event_loop, None) 236 with self.assertWarns(DeprecationWarning) as cm: 237 t = asyncio.ensure_future(notmuch()) 238 self.assertEqual(cm.warnings[0].filename, __file__) 239 self.assertIs(t._loop, self.loop) 240 self.loop.run_until_complete(t) 241 self.assertTrue(t.done()) 242 self.assertEqual(t.result(), 'ok') 243 244 def test_ensure_future_coroutine_2(self): 245 with self.assertWarns(DeprecationWarning): 246 @asyncio.coroutine 247 def notmuch(): 248 return 'ok' 249 t = asyncio.ensure_future(notmuch(), loop=self.loop) 250 self.assertIs(t._loop, self.loop) 251 self.loop.run_until_complete(t) 252 self.assertTrue(t.done()) 253 self.assertEqual(t.result(), 'ok') 254 255 a = notmuch() 256 self.addCleanup(a.close) 257 with self.assertWarns(DeprecationWarning) as cm: 258 with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'): 259 asyncio.ensure_future(a) 260 self.assertEqual(cm.warnings[0].filename, __file__) 261 262 async def test(): 263 return asyncio.ensure_future(notmuch()) 264 t = self.loop.run_until_complete(test()) 265 self.assertIs(t._loop, self.loop) 266 self.loop.run_until_complete(t) 267 self.assertTrue(t.done()) 268 self.assertEqual(t.result(), 'ok') 269 270 # Deprecated in 3.10 271 asyncio.set_event_loop(self.loop) 272 self.addCleanup(asyncio.set_event_loop, None) 273 with self.assertWarns(DeprecationWarning) as cm: 274 t = asyncio.ensure_future(notmuch()) 275 self.assertEqual(cm.warnings[0].filename, __file__) 276 self.assertIs(t._loop, self.loop) 277 self.loop.run_until_complete(t) 278 self.assertTrue(t.done()) 279 self.assertEqual(t.result(), 'ok') 280 281 def test_ensure_future_future(self): 282 f_orig = 
self.new_future(self.loop) 283 f_orig.set_result('ko') 284 285 f = asyncio.ensure_future(f_orig) 286 self.loop.run_until_complete(f) 287 self.assertTrue(f.done()) 288 self.assertEqual(f.result(), 'ko') 289 self.assertIs(f, f_orig) 290 291 loop = asyncio.new_event_loop() 292 self.set_event_loop(loop) 293 294 with self.assertRaises(ValueError): 295 f = asyncio.ensure_future(f_orig, loop=loop) 296 297 loop.close() 298 299 f = asyncio.ensure_future(f_orig, loop=self.loop) 300 self.assertIs(f, f_orig) 301 302 def test_ensure_future_task(self): 303 async def notmuch(): 304 return 'ok' 305 t_orig = self.new_task(self.loop, notmuch()) 306 t = asyncio.ensure_future(t_orig) 307 self.loop.run_until_complete(t) 308 self.assertTrue(t.done()) 309 self.assertEqual(t.result(), 'ok') 310 self.assertIs(t, t_orig) 311 312 loop = asyncio.new_event_loop() 313 self.set_event_loop(loop) 314 315 with self.assertRaises(ValueError): 316 t = asyncio.ensure_future(t_orig, loop=loop) 317 318 loop.close() 319 320 t = asyncio.ensure_future(t_orig, loop=self.loop) 321 self.assertIs(t, t_orig) 322 323 def test_ensure_future_awaitable(self): 324 class Aw: 325 def __init__(self, coro): 326 self.coro = coro 327 def __await__(self): 328 return (yield from self.coro) 329 330 with self.assertWarns(DeprecationWarning): 331 @asyncio.coroutine 332 def coro(): 333 return 'ok' 334 335 loop = asyncio.new_event_loop() 336 self.set_event_loop(loop) 337 fut = asyncio.ensure_future(Aw(coro()), loop=loop) 338 loop.run_until_complete(fut) 339 self.assertEqual(fut.result(), 'ok') 340 341 def test_ensure_future_neither(self): 342 with self.assertRaises(TypeError): 343 asyncio.ensure_future('ok') 344 345 def test_ensure_future_error_msg(self): 346 loop = asyncio.new_event_loop() 347 f = self.new_future(self.loop) 348 with self.assertRaisesRegex(ValueError, 'The future belongs to a ' 349 'different loop than the one specified as ' 350 'the loop argument'): 351 asyncio.ensure_future(f, loop=loop) 352 loop.close() 353 
354 def test_get_stack(self): 355 T = None 356 357 async def foo(): 358 await bar() 359 360 async def bar(): 361 # test get_stack() 362 f = T.get_stack(limit=1) 363 try: 364 self.assertEqual(f[0].f_code.co_name, 'foo') 365 finally: 366 f = None 367 368 # test print_stack() 369 file = io.StringIO() 370 T.print_stack(limit=1, file=file) 371 file.seek(0) 372 tb = file.read() 373 self.assertRegex(tb, r'foo\(\) running') 374 375 async def runner(): 376 nonlocal T 377 T = asyncio.ensure_future(foo(), loop=self.loop) 378 await T 379 380 self.loop.run_until_complete(runner()) 381 382 def test_task_repr(self): 383 self.loop.set_debug(False) 384 385 async def notmuch(): 386 return 'abc' 387 388 # test coroutine function 389 self.assertEqual(notmuch.__name__, 'notmuch') 390 self.assertRegex(notmuch.__qualname__, 391 r'\w+.test_task_repr.<locals>.notmuch') 392 self.assertEqual(notmuch.__module__, __name__) 393 394 filename, lineno = test_utils.get_function_source(notmuch) 395 src = "%s:%s" % (filename, lineno) 396 397 # test coroutine object 398 gen = notmuch() 399 coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch' 400 self.assertEqual(gen.__name__, 'notmuch') 401 self.assertEqual(gen.__qualname__, coro_qualname) 402 403 # test pending Task 404 t = self.new_task(self.loop, gen) 405 t.add_done_callback(Dummy()) 406 407 coro = format_coroutine(coro_qualname, 'running', src, 408 t._source_traceback, generator=True) 409 self.assertEqual(repr(t), 410 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 411 412 # test cancelling Task 413 t.cancel() # Does not take immediate effect! 
414 self.assertEqual(repr(t), 415 "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro) 416 417 # test cancelled Task 418 self.assertRaises(asyncio.CancelledError, 419 self.loop.run_until_complete, t) 420 coro = format_coroutine(coro_qualname, 'done', src, 421 t._source_traceback) 422 self.assertEqual(repr(t), 423 "<Task cancelled name='TestTask' %s>" % coro) 424 425 # test finished Task 426 t = self.new_task(self.loop, notmuch()) 427 self.loop.run_until_complete(t) 428 coro = format_coroutine(coro_qualname, 'done', src, 429 t._source_traceback) 430 self.assertEqual(repr(t), 431 "<Task finished name='TestTask' %s result='abc'>" % coro) 432 433 def test_task_repr_autogenerated(self): 434 async def notmuch(): 435 return 123 436 437 t1 = self.new_task(self.loop, notmuch(), None) 438 t2 = self.new_task(self.loop, notmuch(), None) 439 self.assertNotEqual(repr(t1), repr(t2)) 440 441 match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1)) 442 self.assertIsNotNone(match1) 443 match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2)) 444 self.assertIsNotNone(match2) 445 446 # Autogenerated task names should have monotonically increasing numbers 447 self.assertLess(int(match1.group(1)), int(match2.group(1))) 448 self.loop.run_until_complete(t1) 449 self.loop.run_until_complete(t2) 450 451 def test_task_repr_name_not_str(self): 452 async def notmuch(): 453 return 123 454 455 t = self.new_task(self.loop, notmuch()) 456 t.set_name({6}) 457 self.assertEqual(t.get_name(), '{6}') 458 self.loop.run_until_complete(t) 459 460 def test_task_repr_coro_decorator(self): 461 self.loop.set_debug(False) 462 463 with self.assertWarns(DeprecationWarning): 464 @asyncio.coroutine 465 def notmuch(): 466 # notmuch() function doesn't use yield from: it will be wrapped by 467 # @coroutine decorator 468 return 123 469 470 # test coroutine function 471 self.assertEqual(notmuch.__name__, 'notmuch') 472 self.assertRegex(notmuch.__qualname__, 473 
r'\w+.test_task_repr_coro_decorator' 474 r'\.<locals>\.notmuch') 475 self.assertEqual(notmuch.__module__, __name__) 476 477 # test coroutine object 478 gen = notmuch() 479 # On Python >= 3.5, generators now inherit the name of the 480 # function, as expected, and have a qualified name (__qualname__ 481 # attribute). 482 coro_name = 'notmuch' 483 coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator' 484 '.<locals>.notmuch') 485 self.assertEqual(gen.__name__, coro_name) 486 self.assertEqual(gen.__qualname__, coro_qualname) 487 488 # test repr(CoroWrapper) 489 if coroutines._DEBUG: 490 # format the coroutine object 491 if coroutines._DEBUG: 492 filename, lineno = test_utils.get_function_source(notmuch) 493 frame = gen._source_traceback[-1] 494 coro = ('%s() running, defined at %s:%s, created at %s:%s' 495 % (coro_qualname, filename, lineno, 496 frame[0], frame[1])) 497 else: 498 code = gen.gi_code 499 coro = ('%s() running at %s:%s' 500 % (coro_qualname, code.co_filename, 501 code.co_firstlineno)) 502 503 self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro) 504 505 # test pending Task 506 t = self.new_task(self.loop, gen) 507 t.add_done_callback(Dummy()) 508 509 # format the coroutine object 510 if coroutines._DEBUG: 511 src = '%s:%s' % test_utils.get_function_source(notmuch) 512 else: 513 code = gen.gi_code 514 src = '%s:%s' % (code.co_filename, code.co_firstlineno) 515 coro = format_coroutine(coro_qualname, 'running', src, 516 t._source_traceback, 517 generator=not coroutines._DEBUG) 518 self.assertEqual(repr(t), 519 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 520 self.loop.run_until_complete(t) 521 522 def test_task_repr_wait_for(self): 523 self.loop.set_debug(False) 524 525 async def wait_for(fut): 526 return await fut 527 528 fut = self.new_future(self.loop) 529 task = self.new_task(self.loop, wait_for(fut)) 530 test_utils.run_briefly(self.loop) 531 self.assertRegex(repr(task), 532 '<Task .* wait_for=%s>' % re.escape(repr(fut))) 533 534 
fut.set_result(None) 535 self.loop.run_until_complete(task) 536 537 def test_task_repr_partial_corowrapper(self): 538 # Issue #222: repr(CoroWrapper) must not fail in debug mode if the 539 # coroutine is a partial function 540 with set_coroutine_debug(True): 541 self.loop.set_debug(True) 542 543 async def func(x, y): 544 await asyncio.sleep(0) 545 546 with self.assertWarns(DeprecationWarning): 547 partial_func = asyncio.coroutine(functools.partial(func, 1)) 548 task = self.loop.create_task(partial_func(2)) 549 550 # make warnings quiet 551 task._log_destroy_pending = False 552 self.addCleanup(task._coro.close) 553 554 coro_repr = repr(task._coro) 555 expected = ( 556 r'<coroutine object \w+\.test_task_repr_partial_corowrapper' 557 r'\.<locals>\.func at' 558 ) 559 self.assertRegex(coro_repr, expected) 560 561 def test_task_basics(self): 562 563 async def outer(): 564 a = await inner1() 565 b = await inner2() 566 return a+b 567 568 async def inner1(): 569 return 42 570 571 async def inner2(): 572 return 1000 573 574 t = outer() 575 self.assertEqual(self.loop.run_until_complete(t), 1042) 576 577 def test_exception_chaining_after_await(self): 578 # Test that when awaiting on a task when an exception is already 579 # active, if the task raises an exception it will be chained 580 # with the original. 
581 loop = asyncio.new_event_loop() 582 self.set_event_loop(loop) 583 584 async def raise_error(): 585 raise ValueError 586 587 async def run(): 588 try: 589 raise KeyError(3) 590 except Exception as exc: 591 task = self.new_task(loop, raise_error()) 592 try: 593 await task 594 except Exception as exc: 595 self.assertEqual(type(exc), ValueError) 596 chained = exc.__context__ 597 self.assertEqual((type(chained), chained.args), 598 (KeyError, (3,))) 599 600 try: 601 task = self.new_task(loop, run()) 602 loop.run_until_complete(task) 603 finally: 604 loop.close() 605 606 def test_exception_chaining_after_await_with_context_cycle(self): 607 # Check trying to create an exception context cycle: 608 # https://bugs.python.org/issue40696 609 has_cycle = None 610 loop = asyncio.new_event_loop() 611 self.set_event_loop(loop) 612 613 async def process_exc(exc): 614 raise exc 615 616 async def run(): 617 nonlocal has_cycle 618 try: 619 raise KeyError('a') 620 except Exception as exc: 621 task = self.new_task(loop, process_exc(exc)) 622 try: 623 await task 624 except BaseException as exc: 625 has_cycle = (exc is exc.__context__) 626 # Prevent a hang if has_cycle is True. 627 exc.__context__ = None 628 629 try: 630 task = self.new_task(loop, run()) 631 loop.run_until_complete(task) 632 finally: 633 loop.close() 634 # This also distinguishes from the initial has_cycle=None. 
635 self.assertEqual(has_cycle, False) 636 637 def test_cancel(self): 638 639 def gen(): 640 when = yield 641 self.assertAlmostEqual(10.0, when) 642 yield 0 643 644 loop = self.new_test_loop(gen) 645 646 async def task(): 647 await asyncio.sleep(10.0) 648 return 12 649 650 t = self.new_task(loop, task()) 651 loop.call_soon(t.cancel) 652 with self.assertRaises(asyncio.CancelledError): 653 loop.run_until_complete(t) 654 self.assertTrue(t.done()) 655 self.assertTrue(t.cancelled()) 656 self.assertFalse(t.cancel()) 657 658 def test_cancel_with_message_then_future_result(self): 659 # Test Future.result() after calling cancel() with a message. 660 cases = [ 661 ((), ()), 662 ((None,), ()), 663 (('my message',), ('my message',)), 664 # Non-string values should roundtrip. 665 ((5,), (5,)), 666 ] 667 for cancel_args, expected_args in cases: 668 with self.subTest(cancel_args=cancel_args): 669 loop = asyncio.new_event_loop() 670 self.set_event_loop(loop) 671 672 async def sleep(): 673 await asyncio.sleep(10) 674 675 async def coro(): 676 task = self.new_task(loop, sleep()) 677 await asyncio.sleep(0) 678 task.cancel(*cancel_args) 679 done, pending = await asyncio.wait([task]) 680 task.result() 681 682 task = self.new_task(loop, coro()) 683 with self.assertRaises(asyncio.CancelledError) as cm: 684 loop.run_until_complete(task) 685 exc = cm.exception 686 self.assertEqual(exc.args, ()) 687 688 actual = get_innermost_context(exc) 689 self.assertEqual(actual, 690 (asyncio.CancelledError, expected_args, 2)) 691 692 def test_cancel_with_message_then_future_exception(self): 693 # Test Future.exception() after calling cancel() with a message. 694 cases = [ 695 ((), ()), 696 ((None,), ()), 697 (('my message',), ('my message',)), 698 # Non-string values should roundtrip. 
699 ((5,), (5,)), 700 ] 701 for cancel_args, expected_args in cases: 702 with self.subTest(cancel_args=cancel_args): 703 loop = asyncio.new_event_loop() 704 self.set_event_loop(loop) 705 706 async def sleep(): 707 await asyncio.sleep(10) 708 709 async def coro(): 710 task = self.new_task(loop, sleep()) 711 await asyncio.sleep(0) 712 task.cancel(*cancel_args) 713 done, pending = await asyncio.wait([task]) 714 task.exception() 715 716 task = self.new_task(loop, coro()) 717 with self.assertRaises(asyncio.CancelledError) as cm: 718 loop.run_until_complete(task) 719 exc = cm.exception 720 self.assertEqual(exc.args, ()) 721 722 actual = get_innermost_context(exc) 723 self.assertEqual(actual, 724 (asyncio.CancelledError, expected_args, 2)) 725 726 def test_cancel_with_message_before_starting_task(self): 727 loop = asyncio.new_event_loop() 728 self.set_event_loop(loop) 729 730 async def sleep(): 731 await asyncio.sleep(10) 732 733 async def coro(): 734 task = self.new_task(loop, sleep()) 735 # We deliberately leave out the sleep here. 
736 task.cancel('my message') 737 done, pending = await asyncio.wait([task]) 738 task.exception() 739 740 task = self.new_task(loop, coro()) 741 with self.assertRaises(asyncio.CancelledError) as cm: 742 loop.run_until_complete(task) 743 exc = cm.exception 744 self.assertEqual(exc.args, ()) 745 746 actual = get_innermost_context(exc) 747 self.assertEqual(actual, 748 (asyncio.CancelledError, ('my message',), 2)) 749 750 def test_cancel_yield(self): 751 with self.assertWarns(DeprecationWarning): 752 @asyncio.coroutine 753 def task(): 754 yield 755 yield 756 return 12 757 758 t = self.new_task(self.loop, task()) 759 test_utils.run_briefly(self.loop) # start coro 760 t.cancel() 761 self.assertRaises( 762 asyncio.CancelledError, self.loop.run_until_complete, t) 763 self.assertTrue(t.done()) 764 self.assertTrue(t.cancelled()) 765 self.assertFalse(t.cancel()) 766 767 def test_cancel_inner_future(self): 768 f = self.new_future(self.loop) 769 770 async def task(): 771 await f 772 return 12 773 774 t = self.new_task(self.loop, task()) 775 test_utils.run_briefly(self.loop) # start task 776 f.cancel() 777 with self.assertRaises(asyncio.CancelledError): 778 self.loop.run_until_complete(t) 779 self.assertTrue(f.cancelled()) 780 self.assertTrue(t.cancelled()) 781 782 def test_cancel_both_task_and_inner_future(self): 783 f = self.new_future(self.loop) 784 785 async def task(): 786 await f 787 return 12 788 789 t = self.new_task(self.loop, task()) 790 test_utils.run_briefly(self.loop) 791 792 f.cancel() 793 t.cancel() 794 795 with self.assertRaises(asyncio.CancelledError): 796 self.loop.run_until_complete(t) 797 798 self.assertTrue(t.done()) 799 self.assertTrue(f.cancelled()) 800 self.assertTrue(t.cancelled()) 801 802 def test_cancel_task_catching(self): 803 fut1 = self.new_future(self.loop) 804 fut2 = self.new_future(self.loop) 805 806 async def task(): 807 await fut1 808 try: 809 await fut2 810 except asyncio.CancelledError: 811 return 42 812 813 t = self.new_task(self.loop, 
task()) 814 test_utils.run_briefly(self.loop) 815 self.assertIs(t._fut_waiter, fut1) # White-box test. 816 fut1.set_result(None) 817 test_utils.run_briefly(self.loop) 818 self.assertIs(t._fut_waiter, fut2) # White-box test. 819 t.cancel() 820 self.assertTrue(fut2.cancelled()) 821 res = self.loop.run_until_complete(t) 822 self.assertEqual(res, 42) 823 self.assertFalse(t.cancelled()) 824 825 def test_cancel_task_ignoring(self): 826 fut1 = self.new_future(self.loop) 827 fut2 = self.new_future(self.loop) 828 fut3 = self.new_future(self.loop) 829 830 async def task(): 831 await fut1 832 try: 833 await fut2 834 except asyncio.CancelledError: 835 pass 836 res = await fut3 837 return res 838 839 t = self.new_task(self.loop, task()) 840 test_utils.run_briefly(self.loop) 841 self.assertIs(t._fut_waiter, fut1) # White-box test. 842 fut1.set_result(None) 843 test_utils.run_briefly(self.loop) 844 self.assertIs(t._fut_waiter, fut2) # White-box test. 845 t.cancel() 846 self.assertTrue(fut2.cancelled()) 847 test_utils.run_briefly(self.loop) 848 self.assertIs(t._fut_waiter, fut3) # White-box test. 849 fut3.set_result(42) 850 res = self.loop.run_until_complete(t) 851 self.assertEqual(res, 42) 852 self.assertFalse(fut3.cancelled()) 853 self.assertFalse(t.cancelled()) 854 855 def test_cancel_current_task(self): 856 loop = asyncio.new_event_loop() 857 self.set_event_loop(loop) 858 859 async def task(): 860 t.cancel() 861 self.assertTrue(t._must_cancel) # White-box test. 862 # The sleep should be cancelled immediately. 863 await asyncio.sleep(100) 864 return 12 865 866 t = self.new_task(loop, task()) 867 self.assertFalse(t.cancelled()) 868 self.assertRaises( 869 asyncio.CancelledError, loop.run_until_complete, t) 870 self.assertTrue(t.done()) 871 self.assertTrue(t.cancelled()) 872 self.assertFalse(t._must_cancel) # White-box test. 
873 self.assertFalse(t.cancel()) 874 875 def test_cancel_at_end(self): 876 """coroutine end right after task is cancelled""" 877 loop = asyncio.new_event_loop() 878 self.set_event_loop(loop) 879 880 async def task(): 881 t.cancel() 882 self.assertTrue(t._must_cancel) # White-box test. 883 return 12 884 885 t = self.new_task(loop, task()) 886 self.assertFalse(t.cancelled()) 887 self.assertRaises( 888 asyncio.CancelledError, loop.run_until_complete, t) 889 self.assertTrue(t.done()) 890 self.assertTrue(t.cancelled()) 891 self.assertFalse(t._must_cancel) # White-box test. 892 self.assertFalse(t.cancel()) 893 894 def test_cancel_awaited_task(self): 895 # This tests for a relatively rare condition when 896 # a task cancellation is requested for a task which is not 897 # currently blocked, such as a task cancelling itself. 898 # In this situation we must ensure that whatever next future 899 # or task the cancelled task blocks on is cancelled correctly 900 # as well. See also bpo-34872. 901 loop = asyncio.new_event_loop() 902 self.addCleanup(lambda: loop.close()) 903 904 task = nested_task = None 905 fut = self.new_future(loop) 906 907 async def nested(): 908 await fut 909 910 async def coro(): 911 nonlocal nested_task 912 # Create a sub-task and wait for it to run. 913 nested_task = self.new_task(loop, nested()) 914 await asyncio.sleep(0) 915 916 # Request the current task to be cancelled. 917 task.cancel() 918 # Block on the nested task, which should be immediately 919 # cancelled. 
920 await nested_task 921 922 task = self.new_task(loop, coro()) 923 with self.assertRaises(asyncio.CancelledError): 924 loop.run_until_complete(task) 925 926 self.assertTrue(task.cancelled()) 927 self.assertTrue(nested_task.cancelled()) 928 self.assertTrue(fut.cancelled()) 929 930 def assert_text_contains(self, text, substr): 931 if substr not in text: 932 raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<') 933 934 def test_cancel_traceback_for_future_result(self): 935 # When calling Future.result() on a cancelled task, check that the 936 # line of code that was interrupted is included in the traceback. 937 loop = asyncio.new_event_loop() 938 self.set_event_loop(loop) 939 940 async def nested(): 941 # This will get cancelled immediately. 942 await asyncio.sleep(10) 943 944 async def coro(): 945 task = self.new_task(loop, nested()) 946 await asyncio.sleep(0) 947 task.cancel() 948 await task # search target 949 950 task = self.new_task(loop, coro()) 951 try: 952 loop.run_until_complete(task) 953 except asyncio.CancelledError: 954 tb = traceback.format_exc() 955 self.assert_text_contains(tb, "await asyncio.sleep(10)") 956 # The intermediate await should also be included. 957 self.assert_text_contains(tb, "await task # search target") 958 else: 959 self.fail('CancelledError did not occur') 960 961 def test_cancel_traceback_for_future_exception(self): 962 # When calling Future.exception() on a cancelled task, check that the 963 # line of code that was interrupted is included in the traceback. 964 loop = asyncio.new_event_loop() 965 self.set_event_loop(loop) 966 967 async def nested(): 968 # This will get cancelled immediately. 
969 await asyncio.sleep(10) 970 971 async def coro(): 972 task = self.new_task(loop, nested()) 973 await asyncio.sleep(0) 974 task.cancel() 975 done, pending = await asyncio.wait([task]) 976 task.exception() # search target 977 978 task = self.new_task(loop, coro()) 979 try: 980 loop.run_until_complete(task) 981 except asyncio.CancelledError: 982 tb = traceback.format_exc() 983 self.assert_text_contains(tb, "await asyncio.sleep(10)") 984 # The intermediate await should also be included. 985 self.assert_text_contains(tb, 986 "task.exception() # search target") 987 else: 988 self.fail('CancelledError did not occur') 989 990 def test_stop_while_run_in_complete(self): 991 992 def gen(): 993 when = yield 994 self.assertAlmostEqual(0.1, when) 995 when = yield 0.1 996 self.assertAlmostEqual(0.2, when) 997 when = yield 0.1 998 self.assertAlmostEqual(0.3, when) 999 yield 0.1 1000 1001 loop = self.new_test_loop(gen) 1002 1003 x = 0 1004 1005 async def task(): 1006 nonlocal x 1007 while x < 10: 1008 await asyncio.sleep(0.1) 1009 x += 1 1010 if x == 2: 1011 loop.stop() 1012 1013 t = self.new_task(loop, task()) 1014 with self.assertRaises(RuntimeError) as cm: 1015 loop.run_until_complete(t) 1016 self.assertEqual(str(cm.exception), 1017 'Event loop stopped before Future completed.') 1018 self.assertFalse(t.done()) 1019 self.assertEqual(x, 2) 1020 self.assertAlmostEqual(0.3, loop.time()) 1021 1022 t.cancel() 1023 self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) 1024 1025 def test_log_traceback(self): 1026 async def coro(): 1027 pass 1028 1029 task = self.new_task(self.loop, coro()) 1030 with self.assertRaisesRegex(ValueError, 'can only be set to False'): 1031 task._log_traceback = True 1032 self.loop.run_until_complete(task) 1033 1034 def test_wait_for_timeout_less_then_0_or_0_future_done(self): 1035 def gen(): 1036 when = yield 1037 self.assertAlmostEqual(0, when) 1038 1039 loop = self.new_test_loop(gen) 1040 1041 fut = self.new_future(loop) 1042 
# NOTE(review): the chunk opens inside a test method whose `def` line is not visible here; its tail is kept unchanged.
        fut.set_result('done')

        ret = loop.run_until_complete(asyncio.wait_for(fut, 0))

        self.assertEqual(ret, 'done')
        self.assertTrue(fut.done())
        self.assertAlmostEqual(0, loop.time())

    def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
        # wait_for() with timeout 0 is expected to raise TimeoutError
        # without ever starting foo() (foo_started stays False).
        def gen():
            when = yield
            self.assertAlmostEqual(0, when)

        loop = self.new_test_loop(gen)

        foo_started = False

        async def foo():
            nonlocal foo_started
            foo_started = True

        with self.assertRaises(asyncio.TimeoutError):
            loop.run_until_complete(asyncio.wait_for(foo(), 0))

        self.assertAlmostEqual(0, loop.time())
        self.assertEqual(foo_started, False)

    def test_wait_for_timeout_less_then_0_or_0(self):
        # For timeout 0 and -1: wait_for() on a task raises TimeoutError,
        # the task ends up cancelled and the test loop's clock stays at 0.
        def gen():
            when = yield
            self.assertAlmostEqual(0.2, when)
            when = yield 0
            self.assertAlmostEqual(0, when)

        for timeout in [0, -1]:
            with self.subTest(timeout=timeout):
                loop = self.new_test_loop(gen)

                foo_running = None

                async def foo():
                    nonlocal foo_running
                    foo_running = True
                    try:
                        await asyncio.sleep(0.2)
                    finally:
                        foo_running = False
                    return 'done'

                fut = self.new_task(loop, foo())

                with self.assertRaises(asyncio.TimeoutError):
                    loop.run_until_complete(asyncio.wait_for(fut, timeout))
                self.assertTrue(fut.done())
                # it should have been cancelled due to the timeout
                self.assertTrue(fut.cancelled())
                self.assertAlmostEqual(0, loop.time())
                self.assertEqual(foo_running, False)

    def test_wait_for(self):
        # wait_for() with a 0.1 timeout on a task sleeping 0.2: expects
        # TimeoutError, a cancelled task and loop.time() == 0.1.

        def gen():
            when = yield
            self.assertAlmostEqual(0.2, when)
            when = yield 0
            self.assertAlmostEqual(0.1, when)
            when = yield 0.1

        loop = self.new_test_loop(gen)

        foo_running = None

        async def foo():
            nonlocal foo_running
            foo_running = True
            try:
                await asyncio.sleep(0.2)
            finally:
                foo_running = False
            return 'done'

        fut = self.new_task(loop, foo())

        with self.assertRaises(asyncio.TimeoutError):
            loop.run_until_complete(asyncio.wait_for(fut, 0.1))
        self.assertTrue(fut.done())
        # it should have been cancelled due to the timeout
        self.assertTrue(fut.cancelled())
        self.assertAlmostEqual(0.1, loop.time())
        self.assertEqual(foo_running, False)

    def test_wait_for_blocking(self):
        # timeout=None: wait_for() returns the coroutine's result.
        loop = self.new_test_loop()

        async def coro():
            return 'done'

        res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None))
        self.assertEqual(res, 'done')

    def test_wait_for_race_condition(self):
        # The future is resolved through call_later(0.1), before the 0.2
        # timeout; wait_for() must return its result.

        def gen():
            yield 0.1
            yield 0.1
            yield 0.1

        loop = self.new_test_loop(gen)

        fut = self.new_future(loop)
        task = asyncio.wait_for(fut, timeout=0.2)
        loop.call_later(0.1, fut.set_result, "ok")
        res = loop.run_until_complete(task)
        self.assertEqual(res, "ok")

    def test_wait_for_cancellation_race_condition(self):
        # inner() suppresses CancelledError and returns 1; wait_for() is
        # expected to return that value.
        async def inner():
            with contextlib.suppress(asyncio.CancelledError):
                await asyncio.sleep(1)
            return 1

        async def main():
            result = await asyncio.wait_for(inner(), timeout=.01)
            self.assertEqual(result, 1)

        asyncio.run(main())

    def test_wait_for_waits_for_task_cancellation(self):
        # By the time TimeoutError surfaces, the inner task must have
        # finished its cancellation handling (task_done is True), and the
        # TimeoutError's __context__ must be a CancelledError.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        task_done = False

        async def foo():
            async def inner():
                nonlocal task_done
                try:
                    await asyncio.sleep(0.2)
                except asyncio.CancelledError:
                    await asyncio.sleep(_EPSILON)
                    raise
                finally:
                    task_done = True

            inner_task = self.new_task(loop, inner())

            await asyncio.wait_for(inner_task, timeout=_EPSILON)

        with self.assertRaises(asyncio.TimeoutError) as cm:
            loop.run_until_complete(foo())

        self.assertTrue(task_done)
        chained = cm.exception.__context__
        self.assertEqual(type(chained), asyncio.CancelledError)

    def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
        # Same checks as the previous test, with timeout=0 and a
        # sleep(_EPSILON) before wait_for() is called.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        task_done = False

        async def foo():
            async def inner():
                nonlocal task_done
                try:
                    await asyncio.sleep(10)
                except asyncio.CancelledError:
                    await asyncio.sleep(_EPSILON)
                    raise
                finally:
                    task_done = True

            inner_task = self.new_task(loop, inner())
            await asyncio.sleep(_EPSILON)
            await asyncio.wait_for(inner_task, timeout=0)

        with self.assertRaises(asyncio.TimeoutError) as cm:
            loop.run_until_complete(foo())

        self.assertTrue(task_done)
        chained = cm.exception.__context__
        self.assertEqual(type(chained), asyncio.CancelledError)

    def test_wait_for_reraises_exception_during_cancellation(self):
        # FooException raised from the inner task's `finally` is expected
        # to propagate out of wait_for().
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        class FooException(Exception):
            pass

        async def foo():
            async def inner():
                try:
                    await asyncio.sleep(0.2)
                finally:
                    raise FooException

            inner_task = self.new_task(loop, inner())

            await asyncio.wait_for(inner_task, timeout=_EPSILON)

        with self.assertRaises(FooException):
            loop.run_until_complete(foo())

    def test_wait_for_self_cancellation(self):
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def foo():
            async def inner():
                try:
                    await asyncio.sleep(0.3)
                except asyncio.CancelledError:
                    try:
                        await asyncio.sleep(0.3)
                    except asyncio.CancelledError:
                        await asyncio.sleep(0.3)

                return 42

            inner_task = self.new_task(loop, inner())

            wait = asyncio.wait_for(inner_task, timeout=0.1)

            # Test that wait_for itself is properly cancellable
            # even when the initial task holds up the initial cancellation.
            task = self.new_task(loop, wait)
            await asyncio.sleep(0.2)
            task.cancel()

            with self.assertRaises(asyncio.CancelledError):
                await task

            self.assertEqual(await inner_task, 42)

        loop.run_until_complete(foo())

    def test_wait(self):
        # wait() on two sleeping tasks reports both as done; a second call
        # on the finished tasks must not advance the clock.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            yield 0.15

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1))
        b = self.new_task(loop, asyncio.sleep(0.15))

        async def foo():
            done, pending = await asyncio.wait([b, a])
            self.assertEqual(done, set([a, b]))
            self.assertEqual(pending, set())
            return 42

        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertEqual(res, 42)
        self.assertAlmostEqual(0.15, loop.time())

        # Doing it again should take no time and exercise a different path.
        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())
        self.assertEqual(res, 42)

    def test_wait_duplicate_coroutines(self):
        # The same coroutine object is passed twice to wait().

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s
        c = coro('test')
        task = self.new_task(
            self.loop,
            asyncio.wait([c, c, coro('spam')]))

        with self.assertWarns(DeprecationWarning):
            done, pending = self.loop.run_until_complete(task)

        self.assertFalse(pending)
        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})

    def test_wait_errors(self):
        # wait() raises ValueError for an empty set and for an invalid
        # return_when value.
        self.assertRaises(
            ValueError, self.loop.run_until_complete,
            asyncio.wait(set()))

        # -1 is an invalid return_when value
        sleep_coro = asyncio.sleep(10.0)
        wait_coro = asyncio.wait([sleep_coro], return_when=-1)
        self.assertRaises(ValueError,
                          self.loop.run_until_complete, wait_coro)

        sleep_coro.close()

    def test_wait_first_completed(self):
        # FIRST_COMPLETED: returns once the 0.1 task is done and leaves
        # the 10.0 task pending.

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            when = yield 0
            self.assertAlmostEqual(0.1, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(10.0))
        b = self.new_task(loop, asyncio.sleep(0.1))
        task = self.new_task(
            loop,
            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))

        done, pending = loop.run_until_complete(task)
        self.assertEqual({b}, done)
        self.assertEqual({a}, pending)
        self.assertFalse(a.done())
        self.assertTrue(b.done())
        self.assertIsNone(b.result())
        self.assertAlmostEqual(0.1, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_wait_really_done(self):
        # there is possibility that some tasks in the pending list
        # became done but their callbacks haven't all been called yet

        async def coro1():
            await asyncio.sleep(0)

        async def coro2():
            await asyncio.sleep(0)
            await asyncio.sleep(0)

        a = self.new_task(self.loop, coro1())
        b = self.new_task(self.loop, coro2())
        task = self.new_task(
            self.loop,
            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))

        done, pending = self.loop.run_until_complete(task)
        self.assertEqual({a, b}, done)
        self.assertTrue(a.done())
        self.assertIsNone(a.result())
        self.assertTrue(b.done())
        self.assertIsNone(b.result())

    def test_wait_first_exception(self):

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = self.new_test_loop(gen)

        # first_exception, task already has exception
        a = self.new_task(loop, asyncio.sleep(10.0))

        async def exc():
            raise ZeroDivisionError('err')

        b = self.new_task(loop, exc())
        task = self.new_task(
            loop,
            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))

        done, pending = loop.run_until_complete(task)
        self.assertEqual({b}, done)
        self.assertEqual({a}, pending)
        self.assertAlmostEqual(0, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_wait_first_exception_in_wait(self):

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            when = yield 0
            self.assertAlmostEqual(0.01, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        # first_exception, exception during waiting
        a = self.new_task(loop, asyncio.sleep(10.0))

        async def exc():
            await asyncio.sleep(0.01)
            raise ZeroDivisionError('err')

        b = self.new_task(loop, exc())
        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)

        done, pending = loop.run_until_complete(task)
        self.assertEqual({b}, done)
        self.assertEqual({a}, pending)
        self.assertAlmostEqual(0.01, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_wait_with_exception(self):
        # A task that raises still ends up in `done`; exactly one of the
        # two done tasks carries an exception.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            yield 0.15

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1))

        async def sleeper():
            await asyncio.sleep(0.15)
            raise ZeroDivisionError('really')

        b = self.new_task(loop, sleeper())

        async def foo():
            done, pending = await asyncio.wait([b, a])
            self.assertEqual(len(done), 2)
            self.assertEqual(pending, set())
            errors = set(f for f in done if f.exception() is not None)
            self.assertEqual(len(errors), 1)

        loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())

        loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())

    def test_wait_with_timeout(self):
        # timeout=0.11: only the 0.1 task is done, the 0.15 task stays
        # pending.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            when = yield 0
            self.assertAlmostEqual(0.11, when)
            yield 0.11

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1))
        b = self.new_task(loop, asyncio.sleep(0.15))

        async def foo():
            done, pending = await asyncio.wait([b, a], timeout=0.11)
            self.assertEqual(done, set([a]))
            self.assertEqual(pending, set([b]))

        loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.11, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_wait_concurrent_complete(self):
        # The timeout equals the first task's delay (0.1): `a` is reported
        # done and `b` pending.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            when = yield 0
            self.assertAlmostEqual(0.1, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1))
        b = self.new_task(loop, asyncio.sleep(0.15))

        done, pending = loop.run_until_complete(
            asyncio.wait([b, a], timeout=0.1))

        self.assertEqual(done, set([a]))
        self.assertEqual(pending, set([b]))
        self.assertAlmostEqual(0.1, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_wait_with_iterator_of_tasks(self):
        # wait() is given iter([...]) instead of a list.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            yield 0.15

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1))
        b = self.new_task(loop, asyncio.sleep(0.15))

        async def foo():
            done, pending = await asyncio.wait(iter([b, a]))
            self.assertEqual(done, set([a, b]))
            self.assertEqual(pending, set())
            return 42

        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertEqual(res, 42)
        self.assertAlmostEqual(0.15, loop.time())

    def test_as_completed(self):
        # 'a' and 'b' (0.01) must be among the first two results and 'c'
        # (0.15) must come last.

        def gen():
            yield 0
            yield 0
            yield 0.01
            yield 0

        loop = self.new_test_loop(gen)
        # disable "slow callback" warning
        loop.slow_callback_duration = 1.0
        completed = set()
        time_shifted = False

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def sleeper(dt, x):
                nonlocal time_shifted
                yield from asyncio.sleep(dt)
                completed.add(x)
                if not time_shifted and 'a' in completed and 'b' in completed:
                    time_shifted = True
                    loop.advance_time(0.14)
                return x

        a = sleeper(0.01, 'a')
        b = sleeper(0.01, 'b')
        c = sleeper(0.15, 'c')

        async def foo():
            values = []
            for f in asyncio.as_completed([b, c, a]):
                values.append(await f)
            return values

        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())
        self.assertTrue('a' in res[:2])
        self.assertTrue('b' in res[:2])
        self.assertEqual(res[2], 'c')

        # Doing it again should take no time and exercise a different path.
        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())

    def test_as_completed_with_timeout(self):
        # With timeout=0.12, 'a' is delivered and awaiting the second item
        # raises TimeoutError.

        def gen():
            yield
            yield 0
            yield 0
            yield 0.1

        loop = self.new_test_loop(gen)

        a = loop.create_task(asyncio.sleep(0.1, 'a'))
        b = loop.create_task(asyncio.sleep(0.15, 'b'))

        async def foo():
            values = []
            for f in asyncio.as_completed([a, b], timeout=0.12):
                if values:
                    loop.advance_time(0.02)
                try:
                    v = await f
                    values.append((1, v))
                except asyncio.TimeoutError as exc:
                    values.append((2, exc))
            return values

        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertEqual(len(res), 2, res)
        self.assertEqual(res[0], (1, 'a'))
        self.assertEqual(res[1][0], 2)
        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
        self.assertAlmostEqual(0.12, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b]))

    def test_as_completed_with_unused_timeout(self):
        # The timeout (1) is longer than the sleep (0.01); the result is
        # still 'a'.

        def gen():
            yield
            yield 0
            yield 0.01

        loop = self.new_test_loop(gen)

        a = asyncio.sleep(0.01, 'a')

        async def foo():
            for f in asyncio.as_completed([a], timeout=1):
                v = await f
                self.assertEqual(v, 'a')

        loop.run_until_complete(self.new_task(loop, foo()))

    def test_as_completed_reverse_wait(self):
        # Awaiting the as_completed() items in reverse order still yields
        # 'a' first and 'b' second.

        def gen():
            yield 0
            yield 0.05
            yield 0

        loop = self.new_test_loop(gen)

        a = asyncio.sleep(0.05, 'a')
        b = asyncio.sleep(0.10, 'b')
        fs = {a, b}

        async def test():
            futs = list(asyncio.as_completed(fs))
            self.assertEqual(len(futs), 2)

            x = await futs[1]
            self.assertEqual(x, 'a')
            self.assertAlmostEqual(0.05, loop.time())
            loop.advance_time(0.05)
            y = await futs[0]
            self.assertEqual(y, 'b')
            self.assertAlmostEqual(0.10, loop.time())

        loop.run_until_complete(test())

    def test_as_completed_concurrent(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.05, when)
            when = yield 0
            self.assertAlmostEqual(0.05, when)
            yield 0.05

        a = asyncio.sleep(0.05, 'a')
        b = asyncio.sleep(0.05, 'b')
        fs = {a, b}

        async def test():
            futs = list(asyncio.as_completed(fs))
            self.assertEqual(len(futs), 2)
            waiter = asyncio.wait(futs)
            # Deprecation from passing coros in futs to asyncio.wait()
            with self.assertWarns(DeprecationWarning) as cm:
                done, pending = await waiter
            self.assertEqual(cm.warnings[0].filename, __file__)
            self.assertEqual(set(f.result() for f in done), {'a', 'b'})

        loop = self.new_test_loop(gen)
        loop.run_until_complete(test())

    def test_as_completed_duplicate_coroutines(self):
        # The coroutine passed twice contributes a single result: two
        # results in total, 'ham' and 'spam'.

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def runner():
                result = []
                c = coro('ham')
                for f in asyncio.as_completed([c, c, coro('spam')]):
                    result.append((yield from f))
                return result

        fut = self.new_task(self.loop, runner())
        self.loop.run_until_complete(fut)
        result = fut.result()
        self.assertEqual(set(result), {'ham', 'spam'})
        self.assertEqual(len(result), 2)

    def test_as_completed_coroutine_without_loop(self):
        # Consuming the iterator is expected to emit a DeprecationWarning
        # attributed to this file and to raise RuntimeError.
        async def coro():
            return 42

        a = coro()
        self.addCleanup(a.close)

        futs = asyncio.as_completed([a])
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
                list(futs)
        self.assertEqual(cm.warnings[0].filename, __file__)

    def test_as_completed_coroutine_use_running_loop(self):
        # Called from inside a running coroutine, as_completed() accepts a
        # bare coroutine and yields its result.
        loop = self.new_test_loop()

        async def coro():
            return 42

        async def test():
            futs = list(asyncio.as_completed([coro()]))
            self.assertEqual(len(futs), 1)
            self.assertEqual(await futs[0], 42)

        loop.run_until_complete(test())

    def test_as_completed_coroutine_use_global_loop(self):
        # Deprecated in 3.10
        async def coro():
            return 42

        loop = self.new_test_loop()
        asyncio.set_event_loop(loop)
        self.addCleanup(asyncio.set_event_loop, None)
        futs = asyncio.as_completed([coro()])
        with self.assertWarns(DeprecationWarning) as cm:
            futs = list(futs)
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertEqual(len(futs), 1)
        self.assertEqual(loop.run_until_complete(futs[0]), 42)

    def test_sleep(self):
        # Two consecutive sleep(dt/2) calls; the second one returns `arg`.

        def gen():
            when = yield
            self.assertAlmostEqual(0.05, when)
            when = yield 0.05
            self.assertAlmostEqual(0.1, when)
            yield 0.05

        loop = self.new_test_loop(gen)

        async def sleeper(dt, arg):
            await asyncio.sleep(dt/2)
            res = await asyncio.sleep(dt/2, arg)
            return res

        t = self.new_task(loop, sleeper(0.1, 'yeah'))
        loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'yeah')
        self.assertAlmostEqual(0.1, loop.time())

    def test_sleep_cancel(self):
        # Cancelling the sleeping task must cancel the timer handle that
        # was captured from loop.call_later().

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = self.new_test_loop(gen)

        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))

        handle = None
        orig_call_later = loop.call_later

        # Wrap call_later so the handle it returns can be inspected below.
        def call_later(delay, callback, *args):
            nonlocal handle
            handle = orig_call_later(delay, callback, *args)
            return handle

        loop.call_later = call_later
        test_utils.run_briefly(loop)

        self.assertFalse(handle._cancelled)

        t.cancel()
        test_utils.run_briefly(loop)
        self.assertTrue(handle._cancelled)

    def test_task_cancel_sleeping_task(self):
        # A task cancelled through call_later(0.1) while sleeping raises
        # CancelledError in its awaiter; the clock ends at 0.1.

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(5000, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        async def sleep(dt):
            await asyncio.sleep(dt)

        async def doit():
            sleeper = self.new_task(loop, sleep(5000))
            loop.call_later(0.1, sleeper.cancel)
            try:
                await sleeper
            except asyncio.CancelledError:
                return 'cancelled'
            else:
                return 'slept in'

        doer = doit()
        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
        self.assertAlmostEqual(0.1, loop.time())

    def test_task_cancel_waiter_future(self):
        # Cancelling a task cancels the future it is waiting on and resets
        # task._fut_waiter to None.
        fut = self.new_future(self.loop)

        async def coro():
            await fut

        task = self.new_task(self.loop, coro())
        test_utils.run_briefly(self.loop)
        self.assertIs(task._fut_waiter, fut)

        task.cancel()
        test_utils.run_briefly(self.loop)
        self.assertRaises(
            asyncio.CancelledError, self.loop.run_until_complete, task)
        self.assertIsNone(task._fut_waiter)
        self.assertTrue(fut.cancelled())

    def test_task_set_methods(self):
        # set_result()/set_exception() on a task raise RuntimeError; the
        # task still completes with the coroutine's own result.
        async def notmuch():
            return 'ko'

        gen = notmuch()
        task = self.new_task(self.loop, gen)

        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
            task.set_result('ok')

        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
            task.set_exception(ValueError())

        self.assertEqual(
            self.loop.run_until_complete(task),
            'ko')

    def test_step_result(self):
        # Running a generator-based coroutine that yields plain values is
        # expected to end in RuntimeError.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def notmuch():
                yield None
                yield 1
                return 'ko'

        self.assertRaises(
            RuntimeError, self.loop.run_until_complete, notmuch())

    def test_step_result_future(self):
        # If coroutine returns future, task waits on this future.

        class Fut(asyncio.Future):
            def __init__(self, *args, **kwds):
                self.cb_added = False
                super().__init__(*args, **kwds)

            def add_done_callback(self, *args, **kwargs):
                self.cb_added = True
                super().add_done_callback(*args, **kwargs)

        fut = Fut(loop=self.loop)
        result = None

        async def wait_for_future():
            nonlocal result
            result = await fut

        t = self.new_task(self.loop, wait_for_future())
        test_utils.run_briefly(self.loop)
        self.assertTrue(fut.cb_added)

        res = object()
        fut.set_result(res)
        test_utils.run_briefly(self.loop)
        self.assertIs(res, result)
        self.assertTrue(t.done())
        self.assertIsNone(t.result())

    def test_baseexception_during_cancel(self):
        # SystemExit raised while handling CancelledError escapes the loop
        # run and is stored as the task's exception; the task is done but
        # not marked cancelled.

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = self.new_test_loop(gen)

        async def sleeper():
            await asyncio.sleep(10)

        base_exc = SystemExit()

        async def notmutch():
            try:
                await sleeper()
            except asyncio.CancelledError:
                raise base_exc

        task = self.new_task(loop, notmutch())
        test_utils.run_briefly(loop)

        task.cancel()
        self.assertFalse(task.done())

        self.assertRaises(SystemExit, test_utils.run_briefly, loop)

        self.assertTrue(task.done())
        self.assertFalse(task.cancelled())
        self.assertIs(task.exception(), base_exc)

    def test_iscoroutinefunction(self):
        # Plain functions, plain generator functions and Mock objects are
        # not coroutine functions; an @asyncio.coroutine function is.
        def fn():
            pass

        self.assertFalse(asyncio.iscoroutinefunction(fn))

        def fn1():
            yield
        self.assertFalse(asyncio.iscoroutinefunction(fn1))

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def fn2():
                yield
        self.assertTrue(asyncio.iscoroutinefunction(fn2))

        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))

    def test_yield_vs_yield_from(self):
        # `yield fut` (instead of `yield from fut`) is expected to raise
        # RuntimeError and leave the future pending.
        fut = self.new_future(self.loop)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def wait_for_future():
                yield fut

        task = wait_for_future()
        with self.assertRaises(RuntimeError):
            self.loop.run_until_complete(task)

        self.assertFalse(fut.done())

    def test_yield_vs_yield_from_generator(self):
        # Yielding a coroutine object directly is expected to raise
        # RuntimeError.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                yield

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def wait_for_future():
                gen = coro()
                try:
                    yield gen
                finally:
                    gen.close()

        task = wait_for_future()
        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete, task)

    def test_coroutine_non_gen_function(self):
        # @asyncio.coroutine applied to a plain (non-generator) function.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def func():
                return 'test'

        self.assertTrue(asyncio.iscoroutinefunction(func))

        coro = func()
        self.assertTrue(asyncio.iscoroutine(coro))

        res = self.loop.run_until_complete(coro)
        self.assertEqual(res, 'test')

    def test_coroutine_non_gen_function_return_future(self):
        # A plain function returning a future: the task's result is the
        # future's result.
        fut = self.new_future(self.loop)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def func():
                return fut

        async def coro():
            fut.set_result('test')

        t1 = self.new_task(self.loop, func())
        t2 = self.new_task(self.loop, coro())
        res = self.loop.run_until_complete(t1)
        self.assertEqual(res, 'test')
        self.assertIsNone(t2.result())

    def test_current_task(self):
        # current_task() is None outside a task and returns the running
        # task from inside it.
        self.assertIsNone(asyncio.current_task(loop=self.loop))

        async def coro(loop):
            self.assertIs(asyncio.current_task(), task)

            self.assertIs(asyncio.current_task(None), task)
            self.assertIs(asyncio.current_task(), task)

        task = self.new_task(self.loop, coro(self.loop))
        self.loop.run_until_complete(task)
        self.assertIsNone(asyncio.current_task(loop=self.loop))

    def test_current_task_with_interleaving_tasks(self):
        # Each coroutine must see its own task as current, both before and
        # after being suspended on a future.
        self.assertIsNone(asyncio.current_task(loop=self.loop))

        fut1 = self.new_future(self.loop)
        fut2 = self.new_future(self.loop)

        async def coro1(loop):
            self.assertTrue(asyncio.current_task() is task1)
            await fut1
            self.assertTrue(asyncio.current_task() is task1)
            fut2.set_result(True)

        async def coro2(loop):
            self.assertTrue(asyncio.current_task() is task2)
            fut1.set_result(True)
            await fut2
            self.assertTrue(asyncio.current_task() is task2)

        task1 = self.new_task(self.loop, coro1(self.loop))
        task2 = self.new_task(self.loop, coro2(self.loop))

        self.loop.run_until_complete(asyncio.wait((task1, task2)))
        self.assertIsNone(asyncio.current_task(loop=self.loop))

    # Some thorough tests for cancellation propagation through
    # coroutines, tasks and wait().

    def test_yield_future_passes_cancel(self):
        # Cancelling outer() cancels inner() cancels waiter.
        proof = 0
        waiter = self.new_future(self.loop)

        async def inner():
            nonlocal proof
            try:
                await waiter
            except asyncio.CancelledError:
                proof += 1
                raise
            else:
                self.fail('got past sleep() in inner()')

        async def outer():
            nonlocal proof
            try:
                await inner()
            except asyncio.CancelledError:
                proof += 100  # Expect this path.
            else:
                proof += 10

        f = asyncio.ensure_future(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        self.loop.run_until_complete(f)
        self.assertEqual(proof, 101)
        self.assertTrue(waiter.cancelled())

    def test_yield_wait_does_not_shield_cancel(self):
        # Cancelling outer() makes wait() return early, leaves inner()
        # running.
        proof = 0
        waiter = self.new_future(self.loop)

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        async def outer():
            nonlocal proof
            with self.assertWarns(DeprecationWarning):
                d, p = await asyncio.wait([inner()])
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        self.assertRaises(
            asyncio.CancelledError, self.loop.run_until_complete, f)
        waiter.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertEqual(proof, 1)

    def test_shield_result(self):
        # The shield future delivers the inner future's result.
        inner = self.new_future(self.loop)
        outer = asyncio.shield(inner)
        inner.set_result(42)
        res = self.loop.run_until_complete(outer)
        self.assertEqual(res, 42)

    def test_shield_exception(self):
        # The shield future exposes the very exception set on the inner
        # future.
        inner = self.new_future(self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        exc = RuntimeError('expected')
        inner.set_exception(exc)
        test_utils.run_briefly(self.loop)
        self.assertIs(outer.exception(), exc)

    def test_shield_cancel_inner(self):
        # Cancelling the inner future cancels the shield future.
        inner = self.new_future(self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        inner.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(outer.cancelled())

    def test_shield_cancel_outer(self):
        # After cancelling the shield future, it is cancelled and has no
        # callbacks left (either None or an empty list).
        inner = self.new_future(self.loop)
        outer = asyncio.shield(inner)
        test_utils.run_briefly(self.loop)
        outer.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(outer.cancelled())
        self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))

    def test_shield_shortcut(self):
        # shield() applied to an already completed future.
        fut = self.new_future(self.loop)
        fut.set_result(42)
        res = self.loop.run_until_complete(asyncio.shield(fut))
        self.assertEqual(res, 42)

    def test_shield_effect(self):
        # Cancelling outer() does not affect inner().
        proof = 0
        waiter = self.new_future(self.loop)

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        async def outer():
            nonlocal proof
            await asyncio.shield(inner())
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.loop)
        test_utils.run_briefly(self.loop)
        f.cancel()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(f)
        waiter.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertEqual(proof, 1)

    def test_shield_gather(self):
        # Cancelling the shield future leaves the shielded gather() able to
        # complete with both children's results.
        child1 = self.new_future(self.loop)
        child2 = self.new_future(self.loop)
        parent = asyncio.gather(child1, child2)
        outer = asyncio.shield(parent)
        test_utils.run_briefly(self.loop)
        outer.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(outer.cancelled())
        child1.set_result(1)
        child2.set_result(2)
        test_utils.run_briefly(self.loop)
        self.assertEqual(parent.result(), [1, 2])

    def test_gather_shield(self):
        child1 = self.new_future(self.loop)
        child2 = self.new_future(self.loop)
        inner1 = asyncio.shield(child1)
        inner2 = asyncio.shield(child2)
        parent = asyncio.gather(inner1, inner2)
        test_utils.run_briefly(self.loop)
        parent.cancel()
        # This should cancel inner1 and inner2 but not child1 and child2.
        test_utils.run_briefly(self.loop)
        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
        self.assertTrue(inner1.cancelled())
        self.assertTrue(inner2.cancelled())
        child1.set_result(1)
        child2.set_result(2)
        test_utils.run_briefly(self.loop)

    def test_shield_coroutine_without_loop(self):
        # shield() on a bare coroutine here is expected to emit a
        # DeprecationWarning attributed to this file and raise RuntimeError.
        async def coro():
            return 42

        inner = coro()
        self.addCleanup(inner.close)
        with self.assertWarns(DeprecationWarning) as cm:
            with self.assertRaisesRegex(RuntimeError, 'There is no current event loop'):
                asyncio.shield(inner)
        self.assertEqual(cm.warnings[0].filename, __file__)

    def test_shield_coroutine_use_running_loop(self):
        # shield() called from inside a running coroutine binds to
        # self.loop.
        async def coro():
            return 42

        async def test():
            return asyncio.shield(coro())
        outer = self.loop.run_until_complete(test())
        self.assertEqual(outer._loop, self.loop)
        res = self.loop.run_until_complete(outer)
        self.assertEqual(res, 42)

    def test_shield_coroutine_use_global_loop(self):
        # Deprecated in 3.10
        async def coro():
            return 42

        asyncio.set_event_loop(self.loop)
        self.addCleanup(asyncio.set_event_loop, None)
        with self.assertWarns(DeprecationWarning) as cm:
            outer = asyncio.shield(coro())
        self.assertEqual(cm.warnings[0].filename, __file__)
        self.assertEqual(outer._loop, self.loop)
        res = self.loop.run_until_complete(outer)
        self.assertEqual(res, 42)

    def test_as_completed_invalid_args(self):
        fut = self.new_future(self.loop)

        # as_completed() expects a list of futures, not a future instance
        self.assertRaises(TypeError, self.loop.run_until_complete,
            asyncio.as_completed(fut))
        coro = coroutine_function()
        self.assertRaises(TypeError, self.loop.run_until_complete,
            asyncio.as_completed(coro))
        coro.close()

    def test_wait_invalid_args(self):
        fut = self.new_future(self.loop)

        # wait() expects a list of futures, not a future instance
        self.assertRaises(TypeError, self.loop.run_until_complete,
            asyncio.wait(fut))
        coro = coroutine_function()
        self.assertRaises(TypeError, self.loop.run_until_complete,
            asyncio.wait(coro))
        coro.close()

        # wait() expects at least a future
        self.assertRaises(ValueError, self.loop.run_until_complete,
            asyncio.wait([]))

    def test_corowrapper_mocks_generator(self):

        def check():
            # A function that asserts various things.
            # Called twice, with different debug flag values.

            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def coro():
                    # The actual coroutine.
                    self.assertTrue(gen.gi_running)
                    yield from fut

            # A completed Future used to run the coroutine.
            fut = self.new_future(self.loop)
            fut.set_result(None)

            # Call the coroutine.
            gen = coro()

            # Check some properties.
            self.assertTrue(asyncio.iscoroutine(gen))
            self.assertIsInstance(gen.gi_frame, types.FrameType)
            self.assertFalse(gen.gi_running)
            self.assertIsInstance(gen.gi_code, types.CodeType)

            # Run it.
            self.loop.run_until_complete(gen)

            # The frame should have changed.
            self.assertIsNone(gen.gi_frame)

        # Test with debug flag cleared.
        with set_coroutine_debug(False):
            check()

        # Test with debug flag set.
        with set_coroutine_debug(True):
            check()

    def test_yield_from_corowrapper(self):
        # With coroutine debug enabled, a chain of generator-based
        # coroutines (t1 -> t2 -> future set by t3) returns t3's value.
        with set_coroutine_debug(True):
            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def t1():
                    return (yield from t2())

            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def t2():
                    f = self.new_future(self.loop)
                    self.new_task(self.loop, t3(f))
                    return (yield from f)

            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def t3(f):
                    f.set_result((1, 2, 3))

            task = self.new_task(self.loop, t1())
            val = self.loop.run_until_complete(task)
            self.assertEqual(val, (1, 2, 3))

    def test_yield_from_corowrapper_send(self):
        # A value sent through CoroWrapper.send() comes back unchanged as
        # the generator's return value, including a tuple argument.
        def foo():
            a = yield
            return a

        def call(arg):
            cw = asyncio.coroutines.CoroWrapper(foo())
            cw.send(None)
            try:
                cw.send(arg)
            except StopIteration as ex:
                return ex.args[0]
            else:
                raise AssertionError('StopIteration was expected')

        self.assertEqual(call((1, 2)), (1, 2))
        self.assertEqual(call('spam'), 'spam')

    def test_corowrapper_weakref(self):
        wd = weakref.WeakValueDictionary()
        def foo(): yield from []
        cw = asyncio.coroutines.CoroWrapper(foo())
        wd['cw'] = cw  # Would fail without __weakref__ slot.
        cw.gen = None  # Suppress warning from __del__.

    def test_corowrapper_throw(self):
        # Issue 429: CoroWrapper.throw must be compatible with gen.throw
        def foo():
            value = None
            while True:
                try:
                    value = yield value
                except Exception as e:
                    value = e

        exception = Exception("foo")
        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        self.assertIs(exception, cw.throw(exception))

        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        self.assertIs(exception, cw.throw(Exception, exception))

        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        exception = cw.throw(Exception, "foo")
        self.assertIsInstance(exception, Exception)
        self.assertEqual(exception.args, ("foo", ))

        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        exception = cw.throw(Exception, "foo", None)
        self.assertIsInstance(exception, Exception)
        self.assertEqual(exception.args, ("foo", ))

    def test_log_destroyed_pending_task(self):
        # NOTE(review): this method continues past the end of the visible
        # chunk; only its opening part is reproduced here, unchanged.
        Task = self.__class__.Task

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def kill_me(loop):
                future = self.new_future(loop)
                yield from future
                # at this point, the only reference to kill_me() task is
                # the Task._wakeup() method in future._callbacks
                raise Exception("code never reached")

        mock_handler = mock.Mock()
        self.loop.set_debug(True)
        self.loop.set_exception_handler(mock_handler)

        # schedule the task
        coro = kill_me(self.loop)
        task = asyncio.ensure_future(coro, loop=self.loop)

        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})

        asyncio.set_event_loop(None)

        # execute the task so it waits for future
        self.loop._run_once()
        self.assertEqual(len(self.loop._ready), 0)

        # remove the future used in kill_me(), and references to the task
        del coro.gi_frame.f_locals['future']
        coro = None
source_traceback = task._source_traceback 2448 task = None 2449 2450 # no more reference to kill_me() task: the task is destroyed by the GC 2451 support.gc_collect() 2452 2453 self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) 2454 2455 mock_handler.assert_called_with(self.loop, { 2456 'message': 'Task was destroyed but it is pending!', 2457 'task': mock.ANY, 2458 'source_traceback': source_traceback, 2459 }) 2460 mock_handler.reset_mock() 2461 2462 @mock.patch('asyncio.base_events.logger') 2463 def test_tb_logger_not_called_after_cancel(self, m_log): 2464 loop = asyncio.new_event_loop() 2465 self.set_event_loop(loop) 2466 2467 async def coro(): 2468 raise TypeError 2469 2470 async def runner(): 2471 task = self.new_task(loop, coro()) 2472 await asyncio.sleep(0.05) 2473 task.cancel() 2474 task = None 2475 2476 loop.run_until_complete(runner()) 2477 self.assertFalse(m_log.error.called) 2478 2479 @mock.patch('asyncio.coroutines.logger') 2480 def test_coroutine_never_yielded(self, m_log): 2481 with set_coroutine_debug(True): 2482 with self.assertWarns(DeprecationWarning): 2483 @asyncio.coroutine 2484 def coro_noop(): 2485 pass 2486 2487 tb_filename = __file__ 2488 tb_lineno = sys._getframe().f_lineno + 2 2489 # create a coroutine object but don't use it 2490 coro_noop() 2491 support.gc_collect() 2492 2493 self.assertTrue(m_log.error.called) 2494 message = m_log.error.call_args[0][0] 2495 func_filename, func_lineno = test_utils.get_function_source(coro_noop) 2496 2497 regex = (r'^<CoroWrapper %s\(?\)? 
.* at %s:%s, .*> ' 2498 r'was never yielded from\n' 2499 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n' 2500 r'.*\n' 2501 r' File "%s", line %s, in test_coroutine_never_yielded\n' 2502 r' coro_noop\(\)$' 2503 % (re.escape(coro_noop.__qualname__), 2504 re.escape(func_filename), func_lineno, 2505 re.escape(tb_filename), tb_lineno)) 2506 2507 self.assertRegex(message, re.compile(regex, re.DOTALL)) 2508 2509 def test_return_coroutine_from_coroutine(self): 2510 """Return of @asyncio.coroutine()-wrapped function generator object 2511 from @asyncio.coroutine()-wrapped function should have same effect as 2512 returning generator object or Future.""" 2513 def check(): 2514 with self.assertWarns(DeprecationWarning): 2515 @asyncio.coroutine 2516 def outer_coro(): 2517 with self.assertWarns(DeprecationWarning): 2518 @asyncio.coroutine 2519 def inner_coro(): 2520 return 1 2521 2522 return inner_coro() 2523 2524 result = self.loop.run_until_complete(outer_coro()) 2525 self.assertEqual(result, 1) 2526 2527 # Test with debug flag cleared. 2528 with set_coroutine_debug(False): 2529 check() 2530 2531 # Test with debug flag set. 
2532 with set_coroutine_debug(True): 2533 check() 2534 2535 def test_task_source_traceback(self): 2536 self.loop.set_debug(True) 2537 2538 task = self.new_task(self.loop, coroutine_function()) 2539 lineno = sys._getframe().f_lineno - 1 2540 self.assertIsInstance(task._source_traceback, list) 2541 self.assertEqual(task._source_traceback[-2][:3], 2542 (__file__, 2543 lineno, 2544 'test_task_source_traceback')) 2545 self.loop.run_until_complete(task) 2546 2547 def _test_cancel_wait_for(self, timeout): 2548 loop = asyncio.new_event_loop() 2549 self.addCleanup(loop.close) 2550 2551 async def blocking_coroutine(): 2552 fut = self.new_future(loop) 2553 # Block: fut result is never set 2554 await fut 2555 2556 task = loop.create_task(blocking_coroutine()) 2557 2558 wait = loop.create_task(asyncio.wait_for(task, timeout)) 2559 loop.call_soon(wait.cancel) 2560 2561 self.assertRaises(asyncio.CancelledError, 2562 loop.run_until_complete, wait) 2563 2564 # Python issue #23219: cancelling the wait must also cancel the task 2565 self.assertTrue(task.cancelled()) 2566 2567 def test_cancel_blocking_wait_for(self): 2568 self._test_cancel_wait_for(None) 2569 2570 def test_cancel_wait_for(self): 2571 self._test_cancel_wait_for(60.0) 2572 2573 def test_cancel_gather_1(self): 2574 """Ensure that a gathering future refuses to be cancelled once all 2575 children are done""" 2576 loop = asyncio.new_event_loop() 2577 self.addCleanup(loop.close) 2578 2579 fut = self.new_future(loop) 2580 async def create(): 2581 # The indirection fut->child_coro is needed since otherwise the 2582 # gathering task is done at the same time as the child future 2583 def child_coro(): 2584 return (yield from fut) 2585 gather_future = asyncio.gather(child_coro()) 2586 return asyncio.ensure_future(gather_future) 2587 gather_task = loop.run_until_complete(create()) 2588 2589 cancel_result = None 2590 def cancelling_callback(_): 2591 nonlocal cancel_result 2592 cancel_result = gather_task.cancel() 2593 
fut.add_done_callback(cancelling_callback) 2594 2595 fut.set_result(42) # calls the cancelling_callback after fut is done() 2596 2597 # At this point the task should complete. 2598 loop.run_until_complete(gather_task) 2599 2600 # Python issue #26923: asyncio.gather drops cancellation 2601 self.assertEqual(cancel_result, False) 2602 self.assertFalse(gather_task.cancelled()) 2603 self.assertEqual(gather_task.result(), [42]) 2604 2605 def test_cancel_gather_2(self): 2606 cases = [ 2607 ((), ()), 2608 ((None,), ()), 2609 (('my message',), ('my message',)), 2610 # Non-string values should roundtrip. 2611 ((5,), (5,)), 2612 ] 2613 for cancel_args, expected_args in cases: 2614 with self.subTest(cancel_args=cancel_args): 2615 loop = asyncio.new_event_loop() 2616 self.addCleanup(loop.close) 2617 2618 async def test(): 2619 time = 0 2620 while True: 2621 time += 0.05 2622 await asyncio.gather(asyncio.sleep(0.05), 2623 return_exceptions=True) 2624 if time > 1: 2625 return 2626 2627 async def main(): 2628 qwe = self.new_task(loop, test()) 2629 await asyncio.sleep(0.2) 2630 qwe.cancel(*cancel_args) 2631 await qwe 2632 2633 try: 2634 loop.run_until_complete(main()) 2635 except asyncio.CancelledError as exc: 2636 self.assertEqual(exc.args, ()) 2637 exc_type, exc_args, depth = get_innermost_context(exc) 2638 self.assertEqual((exc_type, exc_args), 2639 (asyncio.CancelledError, expected_args)) 2640 # The exact traceback seems to vary in CI. 
2641 self.assertIn(depth, (2, 3)) 2642 else: 2643 self.fail('gather did not propagate the cancellation ' 2644 'request') 2645 2646 def test_exception_traceback(self): 2647 # See http://bugs.python.org/issue28843 2648 2649 async def foo(): 2650 1 / 0 2651 2652 async def main(): 2653 task = self.new_task(self.loop, foo()) 2654 await asyncio.sleep(0) # skip one loop iteration 2655 self.assertIsNotNone(task.exception().__traceback__) 2656 2657 self.loop.run_until_complete(main()) 2658 2659 @mock.patch('asyncio.base_events.logger') 2660 def test_error_in_call_soon(self, m_log): 2661 def call_soon(callback, *args, **kwargs): 2662 raise ValueError 2663 self.loop.call_soon = call_soon 2664 2665 with self.assertWarns(DeprecationWarning): 2666 @asyncio.coroutine 2667 def coro(): 2668 pass 2669 2670 self.assertFalse(m_log.error.called) 2671 2672 with self.assertRaises(ValueError): 2673 gen = coro() 2674 try: 2675 self.new_task(self.loop, gen) 2676 finally: 2677 gen.close() 2678 gc.collect() # For PyPy or other GCs. 2679 2680 self.assertTrue(m_log.error.called) 2681 message = m_log.error.call_args[0][0] 2682 self.assertIn('Task was destroyed but it is pending', message) 2683 2684 self.assertEqual(asyncio.all_tasks(self.loop), set()) 2685 2686 def test_create_task_with_noncoroutine(self): 2687 with self.assertRaisesRegex(TypeError, 2688 "a coroutine was expected, got 123"): 2689 self.new_task(self.loop, 123) 2690 2691 # test it for the second time to ensure that caching 2692 # in asyncio.iscoroutine() doesn't break things. 
2693 with self.assertRaisesRegex(TypeError, 2694 "a coroutine was expected, got 123"): 2695 self.new_task(self.loop, 123) 2696 2697 def test_create_task_with_oldstyle_coroutine(self): 2698 2699 with self.assertWarns(DeprecationWarning): 2700 @asyncio.coroutine 2701 def coro(): 2702 pass 2703 2704 task = self.new_task(self.loop, coro()) 2705 self.assertIsInstance(task, self.Task) 2706 self.loop.run_until_complete(task) 2707 2708 # test it for the second time to ensure that caching 2709 # in asyncio.iscoroutine() doesn't break things. 2710 task = self.new_task(self.loop, coro()) 2711 self.assertIsInstance(task, self.Task) 2712 self.loop.run_until_complete(task) 2713 2714 def test_create_task_with_async_function(self): 2715 2716 async def coro(): 2717 pass 2718 2719 task = self.new_task(self.loop, coro()) 2720 self.assertIsInstance(task, self.Task) 2721 self.loop.run_until_complete(task) 2722 2723 # test it for the second time to ensure that caching 2724 # in asyncio.iscoroutine() doesn't break things. 2725 task = self.new_task(self.loop, coro()) 2726 self.assertIsInstance(task, self.Task) 2727 self.loop.run_until_complete(task) 2728 2729 def test_create_task_with_asynclike_function(self): 2730 task = self.new_task(self.loop, CoroLikeObject()) 2731 self.assertIsInstance(task, self.Task) 2732 self.assertEqual(self.loop.run_until_complete(task), 42) 2733 2734 # test it for the second time to ensure that caching 2735 # in asyncio.iscoroutine() doesn't break things. 
2736 task = self.new_task(self.loop, CoroLikeObject()) 2737 self.assertIsInstance(task, self.Task) 2738 self.assertEqual(self.loop.run_until_complete(task), 42) 2739 2740 def test_bare_create_task(self): 2741 2742 async def inner(): 2743 return 1 2744 2745 async def coro(): 2746 task = asyncio.create_task(inner()) 2747 self.assertIsInstance(task, self.Task) 2748 ret = await task 2749 self.assertEqual(1, ret) 2750 2751 self.loop.run_until_complete(coro()) 2752 2753 def test_bare_create_named_task(self): 2754 2755 async def coro_noop(): 2756 pass 2757 2758 async def coro(): 2759 task = asyncio.create_task(coro_noop(), name='No-op') 2760 self.assertEqual(task.get_name(), 'No-op') 2761 await task 2762 2763 self.loop.run_until_complete(coro()) 2764 2765 def test_context_1(self): 2766 cvar = contextvars.ContextVar('cvar', default='nope') 2767 2768 async def sub(): 2769 await asyncio.sleep(0.01) 2770 self.assertEqual(cvar.get(), 'nope') 2771 cvar.set('something else') 2772 2773 async def main(): 2774 self.assertEqual(cvar.get(), 'nope') 2775 subtask = self.new_task(loop, sub()) 2776 cvar.set('yes') 2777 self.assertEqual(cvar.get(), 'yes') 2778 await subtask 2779 self.assertEqual(cvar.get(), 'yes') 2780 2781 loop = asyncio.new_event_loop() 2782 try: 2783 task = self.new_task(loop, main()) 2784 loop.run_until_complete(task) 2785 finally: 2786 loop.close() 2787 2788 def test_context_2(self): 2789 cvar = contextvars.ContextVar('cvar', default='nope') 2790 2791 async def main(): 2792 def fut_on_done(fut): 2793 # This change must not pollute the context 2794 # of the "main()" task. 
2795 cvar.set('something else') 2796 2797 self.assertEqual(cvar.get(), 'nope') 2798 2799 for j in range(2): 2800 fut = self.new_future(loop) 2801 fut.add_done_callback(fut_on_done) 2802 cvar.set(f'yes{j}') 2803 loop.call_soon(fut.set_result, None) 2804 await fut 2805 self.assertEqual(cvar.get(), f'yes{j}') 2806 2807 for i in range(3): 2808 # Test that task passed its context to add_done_callback: 2809 cvar.set(f'yes{i}-{j}') 2810 await asyncio.sleep(0.001) 2811 self.assertEqual(cvar.get(), f'yes{i}-{j}') 2812 2813 loop = asyncio.new_event_loop() 2814 try: 2815 task = self.new_task(loop, main()) 2816 loop.run_until_complete(task) 2817 finally: 2818 loop.close() 2819 2820 self.assertEqual(cvar.get(), 'nope') 2821 2822 def test_context_3(self): 2823 # Run 100 Tasks in parallel, each modifying cvar. 2824 2825 cvar = contextvars.ContextVar('cvar', default=-1) 2826 2827 async def sub(num): 2828 for i in range(10): 2829 cvar.set(num + i) 2830 await asyncio.sleep(random.uniform(0.001, 0.05)) 2831 self.assertEqual(cvar.get(), num + i) 2832 2833 async def main(): 2834 tasks = [] 2835 for i in range(100): 2836 task = loop.create_task(sub(random.randint(0, 10))) 2837 tasks.append(task) 2838 2839 await asyncio.gather(*tasks) 2840 2841 loop = asyncio.new_event_loop() 2842 try: 2843 loop.run_until_complete(main()) 2844 finally: 2845 loop.close() 2846 2847 self.assertEqual(cvar.get(), -1) 2848 2849 def test_get_coro(self): 2850 loop = asyncio.new_event_loop() 2851 coro = coroutine_function() 2852 try: 2853 task = self.new_task(loop, coro) 2854 loop.run_until_complete(task) 2855 self.assertIs(task.get_coro(), coro) 2856 finally: 2857 loop.close() 2858 2859 2860def add_subclass_tests(cls): 2861 BaseTask = cls.Task 2862 BaseFuture = cls.Future 2863 2864 if BaseTask is None or BaseFuture is None: 2865 return cls 2866 2867 class CommonFuture: 2868 def __init__(self, *args, **kwargs): 2869 self.calls = collections.defaultdict(lambda: 0) 2870 super().__init__(*args, **kwargs) 2871 2872 
def add_done_callback(self, *args, **kwargs): 2873 self.calls['add_done_callback'] += 1 2874 return super().add_done_callback(*args, **kwargs) 2875 2876 class Task(CommonFuture, BaseTask): 2877 pass 2878 2879 class Future(CommonFuture, BaseFuture): 2880 pass 2881 2882 def test_subclasses_ctask_cfuture(self): 2883 fut = self.Future(loop=self.loop) 2884 2885 async def func(): 2886 self.loop.call_soon(lambda: fut.set_result('spam')) 2887 return await fut 2888 2889 task = self.Task(func(), loop=self.loop) 2890 2891 result = self.loop.run_until_complete(task) 2892 2893 self.assertEqual(result, 'spam') 2894 2895 self.assertEqual( 2896 dict(task.calls), 2897 {'add_done_callback': 1}) 2898 2899 self.assertEqual( 2900 dict(fut.calls), 2901 {'add_done_callback': 1}) 2902 2903 # Add patched Task & Future back to the test case 2904 cls.Task = Task 2905 cls.Future = Future 2906 2907 # Add an extra unit-test 2908 cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture 2909 2910 # Disable the "test_task_source_traceback" test 2911 # (the test is hardcoded for a particular call stack, which 2912 # is slightly different for Task subclasses) 2913 cls.test_task_source_traceback = None 2914 2915 return cls 2916 2917 2918class SetMethodsTest: 2919 2920 def test_set_result_causes_invalid_state(self): 2921 Future = type(self).Future 2922 self.loop.call_exception_handler = exc_handler = mock.Mock() 2923 2924 async def foo(): 2925 await asyncio.sleep(0.1) 2926 return 10 2927 2928 coro = foo() 2929 task = self.new_task(self.loop, coro) 2930 Future.set_result(task, 'spam') 2931 2932 self.assertEqual( 2933 self.loop.run_until_complete(task), 2934 'spam') 2935 2936 exc_handler.assert_called_once() 2937 exc = exc_handler.call_args[0][0]['exception'] 2938 with self.assertRaisesRegex(asyncio.InvalidStateError, 2939 r'step\(\): already done'): 2940 raise exc 2941 2942 coro.close() 2943 2944 def test_set_exception_causes_invalid_state(self): 2945 class MyExc(Exception): 2946 pass 2947 
2948 Future = type(self).Future 2949 self.loop.call_exception_handler = exc_handler = mock.Mock() 2950 2951 async def foo(): 2952 await asyncio.sleep(0.1) 2953 return 10 2954 2955 coro = foo() 2956 task = self.new_task(self.loop, coro) 2957 Future.set_exception(task, MyExc()) 2958 2959 with self.assertRaises(MyExc): 2960 self.loop.run_until_complete(task) 2961 2962 exc_handler.assert_called_once() 2963 exc = exc_handler.call_args[0][0]['exception'] 2964 with self.assertRaisesRegex(asyncio.InvalidStateError, 2965 r'step\(\): already done'): 2966 raise exc 2967 2968 coro.close() 2969 2970 2971@unittest.skipUnless(hasattr(futures, '_CFuture') and 2972 hasattr(tasks, '_CTask'), 2973 'requires the C _asyncio module') 2974class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest, 2975 test_utils.TestCase): 2976 2977 Task = getattr(tasks, '_CTask', None) 2978 Future = getattr(futures, '_CFuture', None) 2979 2980 @support.refcount_test 2981 def test_refleaks_in_task___init__(self): 2982 gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount') 2983 async def coro(): 2984 pass 2985 task = self.new_task(self.loop, coro()) 2986 self.loop.run_until_complete(task) 2987 refs_before = gettotalrefcount() 2988 for i in range(100): 2989 task.__init__(coro(), loop=self.loop) 2990 self.loop.run_until_complete(task) 2991 self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10) 2992 2993 def test_del__log_destroy_pending_segfault(self): 2994 async def coro(): 2995 pass 2996 task = self.new_task(self.loop, coro()) 2997 self.loop.run_until_complete(task) 2998 with self.assertRaises(AttributeError): 2999 del task._log_destroy_pending 3000 3001 3002@unittest.skipUnless(hasattr(futures, '_CFuture') and 3003 hasattr(tasks, '_CTask'), 3004 'requires the C _asyncio module') 3005@add_subclass_tests 3006class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 3007 3008 Task = getattr(tasks, '_CTask', None) 3009 Future = getattr(futures, '_CFuture', None) 3010 3011 
# The classes below run the shared BaseTaskTests suite against different
# combinations of the C and pure-Python Task/Future implementations.
# getattr() with a None default keeps each class body importable when the C
# accelerator is absent; the skipUnless decorator then skips the class.

@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):

    Task = getattr(tasks, '_CTask', None)
    Future = futures._PyFuture


@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask


@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):

    Task = getattr(tasks, '_CTask', None)
    Future = futures._PyFuture


@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):

    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)


class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
                            test_utils.TestCase):

    Task = tasks._PyTask
    Future = futures._PyFuture


@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    Task = tasks._PyTask
    Future = futures._PyFuture


@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_Future_Tests(test_utils.TestCase):

    def test_foobar(self):
        # A Future subclass whose get_loop attribute raises AttributeError
        # on access must still be awaitable from a task.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def coro():
            await fut
            return 'spam'

        self.loop = asyncio.new_event_loop()
        try:
            fut = Fut(loop=self.loop)
            self.loop.call_later(0.1, fut.set_result, 1)
            task = self.loop.create_task(coro())
            res = self.loop.run_until_complete(task)
        finally:
            self.loop.close()

        self.assertEqual(res, 'spam')


class BaseTaskIntrospectionTests:
    # Mixin exercising the task registry hooks.  Concrete subclasses bind
    # the four callables below to the Python or C implementation.
    _register_task = None
    _unregister_task = None
    _enter_task = None
    _leave_task = None

    def test__register_task_1(self):
        # Task-like object exposing its loop only through a _loop attribute.
        class TaskLike:
            @property
            def _loop(self):
                return loop

            def done(self):
                return False

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_2(self):
        # Task-like object exposing its loop through get_loop().
        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return False

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_3(self):
        # A registered task that reports done() is not listed by all_tasks().
        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return True

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
        self._unregister_task(task)

    def test__enter_task(self):
        task = mock.Mock()
        loop = mock.Mock()
        self.assertIsNone(asyncio.current_task(loop))
        self._enter_task(loop, task)
        self.assertIs(asyncio.current_task(loop), task)
        self._leave_task(loop, task)

    def test__enter_task_failure(self):
        # Entering a second task while one is current must fail and leave
        # the current task untouched.
        task1 = mock.Mock()
        task2 = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task1)
        with self.assertRaises(RuntimeError):
            self._enter_task(loop, task2)
        self.assertIs(asyncio.current_task(loop), task1)
        self._leave_task(loop, task1)

    def test__leave_task(self):
        task = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task)
        self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__leave_task_failure1(self):
        # Leaving a task other than the current one must fail.
        task1 = mock.Mock()
        task2 = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task1)
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task2)
        self.assertIs(asyncio.current_task(loop), task1)
        self._leave_task(loop, task1)

    def test__leave_task_failure2(self):
        # Leaving when no task is current must fail.
        task = mock.Mock()
        loop = mock.Mock()
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__unregister_task(self):
        task = mock.Mock()
        loop = mock.Mock()
        task.get_loop = lambda: loop
        self._register_task(task)
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())

    def test__unregister_task_not_registered(self):
        # Unregistering an unknown task is a silent no-op.
        task = mock.Mock()
        loop = mock.Mock()
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())


class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # staticmethod() stops the plain functions from being bound as methods.
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)
    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)


@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
                     'requires the C _asyncio module')
class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # The class body still executes when the class is skipped, so guard the
    # attribute lookups on the C helpers.
    if hasattr(tasks, '_c_register_task'):
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)
    else:
        _register_task = _unregister_task = _enter_task = _leave_task = None


3216class BaseCurrentLoopTests: 3217 3218 def setUp(self): 3219 super().setUp() 3220 self.loop = asyncio.new_event_loop() 3221 self.set_event_loop(self.loop) 3222 3223 def new_task(self, coro): 3224 raise NotImplementedError 3225 3226 def test_current_task_no_running_loop(self): 3227 self.assertIsNone(asyncio.current_task(loop=self.loop)) 3228 3229 def test_current_task_no_running_loop_implicit(self): 3230 with self.assertRaises(RuntimeError): 3231 asyncio.current_task() 3232 3233 def test_current_task_with_implicit_loop(self): 3234 async def coro(): 3235 self.assertIs(asyncio.current_task(loop=self.loop), task) 3236 3237 self.assertIs(asyncio.current_task(None), task) 3238 self.assertIs(asyncio.current_task(), task) 3239 3240 task = self.new_task(coro()) 3241 self.loop.run_until_complete(task) 3242 self.assertIsNone(asyncio.current_task(loop=self.loop)) 3243 3244 3245class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 3246 3247 def new_task(self, coro): 3248 return tasks._PyTask(coro, loop=self.loop) 3249 3250 3251@unittest.skipUnless(hasattr(tasks, '_CTask'), 3252 'requires the C _asyncio module') 3253class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 3254 3255 def new_task(self, coro): 3256 return getattr(tasks, '_CTask')(coro, loop=self.loop) 3257 3258 3259class GenericTaskTests(test_utils.TestCase): 3260 3261 def test_future_subclass(self): 3262 self.assertTrue(issubclass(asyncio.Task, asyncio.Future)) 3263 3264 @support.cpython_only 3265 def test_asyncio_module_compiled(self): 3266 # Because of circular imports it's easy to make _asyncio 3267 # module non-importable. This is a simple test that will 3268 # fail on systems where C modules were successfully compiled 3269 # (hence the test for _functools etc), but _asyncio somehow didn't. 
3270 try: 3271 import _functools 3272 import _json 3273 import _pickle 3274 except ImportError: 3275 self.skipTest('C modules are not available') 3276 else: 3277 try: 3278 import _asyncio 3279 except ImportError: 3280 self.fail('_asyncio module is missing') 3281 3282 3283class GatherTestsBase: 3284 3285 def setUp(self): 3286 super().setUp() 3287 self.one_loop = self.new_test_loop() 3288 self.other_loop = self.new_test_loop() 3289 self.set_event_loop(self.one_loop, cleanup=False) 3290 3291 def _run_loop(self, loop): 3292 while loop._ready: 3293 test_utils.run_briefly(loop) 3294 3295 def _check_success(self, **kwargs): 3296 a, b, c = [self.one_loop.create_future() for i in range(3)] 3297 fut = self._gather(*self.wrap_futures(a, b, c), **kwargs) 3298 cb = test_utils.MockCallback() 3299 fut.add_done_callback(cb) 3300 b.set_result(1) 3301 a.set_result(2) 3302 self._run_loop(self.one_loop) 3303 self.assertEqual(cb.called, False) 3304 self.assertFalse(fut.done()) 3305 c.set_result(3) 3306 self._run_loop(self.one_loop) 3307 cb.assert_called_once_with(fut) 3308 self.assertEqual(fut.result(), [2, 1, 3]) 3309 3310 def test_success(self): 3311 self._check_success() 3312 self._check_success(return_exceptions=False) 3313 3314 def test_result_exception_success(self): 3315 self._check_success(return_exceptions=True) 3316 3317 def test_one_exception(self): 3318 a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 3319 fut = self._gather(*self.wrap_futures(a, b, c, d, e)) 3320 cb = test_utils.MockCallback() 3321 fut.add_done_callback(cb) 3322 exc = ZeroDivisionError() 3323 a.set_result(1) 3324 b.set_exception(exc) 3325 self._run_loop(self.one_loop) 3326 self.assertTrue(fut.done()) 3327 cb.assert_called_once_with(fut) 3328 self.assertIs(fut.exception(), exc) 3329 # Does nothing 3330 c.set_result(3) 3331 d.cancel() 3332 e.set_exception(RuntimeError()) 3333 e.exception() 3334 3335 def test_return_exceptions(self): 3336 a, b, c, d = [self.one_loop.create_future() for i in 
range(4)] 3337 fut = self._gather(*self.wrap_futures(a, b, c, d), 3338 return_exceptions=True) 3339 cb = test_utils.MockCallback() 3340 fut.add_done_callback(cb) 3341 exc = ZeroDivisionError() 3342 exc2 = RuntimeError() 3343 b.set_result(1) 3344 c.set_exception(exc) 3345 a.set_result(3) 3346 self._run_loop(self.one_loop) 3347 self.assertFalse(fut.done()) 3348 d.set_exception(exc2) 3349 self._run_loop(self.one_loop) 3350 self.assertTrue(fut.done()) 3351 cb.assert_called_once_with(fut) 3352 self.assertEqual(fut.result(), [3, 1, exc, exc2]) 3353 3354 def test_env_var_debug(self): 3355 code = '\n'.join(( 3356 'import asyncio.coroutines', 3357 'print(asyncio.coroutines._DEBUG)')) 3358 3359 # Test with -E to not fail if the unit test was run with 3360 # PYTHONASYNCIODEBUG set to a non-empty string 3361 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 3362 self.assertEqual(stdout.rstrip(), b'False') 3363 3364 sts, stdout, stderr = assert_python_ok('-c', code, 3365 PYTHONASYNCIODEBUG='', 3366 PYTHONDEVMODE='') 3367 self.assertEqual(stdout.rstrip(), b'False') 3368 3369 sts, stdout, stderr = assert_python_ok('-c', code, 3370 PYTHONASYNCIODEBUG='1', 3371 PYTHONDEVMODE='') 3372 self.assertEqual(stdout.rstrip(), b'True') 3373 3374 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 3375 PYTHONASYNCIODEBUG='1', 3376 PYTHONDEVMODE='') 3377 self.assertEqual(stdout.rstrip(), b'False') 3378 3379 # -X dev 3380 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 3381 '-c', code) 3382 self.assertEqual(stdout.rstrip(), b'True') 3383 3384 3385class FutureGatherTests(GatherTestsBase, test_utils.TestCase): 3386 3387 def wrap_futures(self, *futures): 3388 return futures 3389 3390 def _gather(self, *args, **kwargs): 3391 return asyncio.gather(*args, **kwargs) 3392 3393 def test_constructor_empty_sequence_without_loop(self): 3394 with self.assertWarns(DeprecationWarning) as cm: 3395 with self.assertRaises(RuntimeError): 3396 asyncio.gather() 3397 
self.assertEqual(cm.warnings[0].filename, __file__) 3398 3399 def test_constructor_empty_sequence_use_running_loop(self): 3400 async def gather(): 3401 return asyncio.gather() 3402 fut = self.one_loop.run_until_complete(gather()) 3403 self.assertIsInstance(fut, asyncio.Future) 3404 self.assertIs(fut._loop, self.one_loop) 3405 self._run_loop(self.one_loop) 3406 self.assertTrue(fut.done()) 3407 self.assertEqual(fut.result(), []) 3408 3409 def test_constructor_empty_sequence_use_global_loop(self): 3410 # Deprecated in 3.10 3411 asyncio.set_event_loop(self.one_loop) 3412 self.addCleanup(asyncio.set_event_loop, None) 3413 with self.assertWarns(DeprecationWarning) as cm: 3414 fut = asyncio.gather() 3415 self.assertEqual(cm.warnings[0].filename, __file__) 3416 self.assertIsInstance(fut, asyncio.Future) 3417 self.assertIs(fut._loop, self.one_loop) 3418 self._run_loop(self.one_loop) 3419 self.assertTrue(fut.done()) 3420 self.assertEqual(fut.result(), []) 3421 3422 def test_constructor_heterogenous_futures(self): 3423 fut1 = self.one_loop.create_future() 3424 fut2 = self.other_loop.create_future() 3425 with self.assertRaises(ValueError): 3426 asyncio.gather(fut1, fut2) 3427 3428 def test_constructor_homogenous_futures(self): 3429 children = [self.other_loop.create_future() for i in range(3)] 3430 fut = asyncio.gather(*children) 3431 self.assertIs(fut._loop, self.other_loop) 3432 self._run_loop(self.other_loop) 3433 self.assertFalse(fut.done()) 3434 fut = asyncio.gather(*children) 3435 self.assertIs(fut._loop, self.other_loop) 3436 self._run_loop(self.other_loop) 3437 self.assertFalse(fut.done()) 3438 3439 def test_one_cancellation(self): 3440 a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 3441 fut = asyncio.gather(a, b, c, d, e) 3442 cb = test_utils.MockCallback() 3443 fut.add_done_callback(cb) 3444 a.set_result(1) 3445 b.cancel() 3446 self._run_loop(self.one_loop) 3447 self.assertTrue(fut.done()) 3448 cb.assert_called_once_with(fut) 3449 
self.assertFalse(fut.cancelled()) 3450 self.assertIsInstance(fut.exception(), asyncio.CancelledError) 3451 # Does nothing 3452 c.set_result(3) 3453 d.cancel() 3454 e.set_exception(RuntimeError()) 3455 e.exception() 3456 3457 def test_result_exception_one_cancellation(self): 3458 a, b, c, d, e, f = [self.one_loop.create_future() 3459 for i in range(6)] 3460 fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True) 3461 cb = test_utils.MockCallback() 3462 fut.add_done_callback(cb) 3463 a.set_result(1) 3464 zde = ZeroDivisionError() 3465 b.set_exception(zde) 3466 c.cancel() 3467 self._run_loop(self.one_loop) 3468 self.assertFalse(fut.done()) 3469 d.set_result(3) 3470 e.cancel() 3471 rte = RuntimeError() 3472 f.set_exception(rte) 3473 res = self.one_loop.run_until_complete(fut) 3474 self.assertIsInstance(res[2], asyncio.CancelledError) 3475 self.assertIsInstance(res[4], asyncio.CancelledError) 3476 res[2] = res[4] = None 3477 self.assertEqual(res, [1, zde, None, 3, None, rte]) 3478 cb.assert_called_once_with(fut) 3479 3480 3481class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): 3482 3483 def wrap_futures(self, *futures): 3484 coros = [] 3485 for fut in futures: 3486 async def coro(fut=fut): 3487 return await fut 3488 coros.append(coro()) 3489 return coros 3490 3491 def _gather(self, *args, **kwargs): 3492 async def coro(): 3493 return asyncio.gather(*args, **kwargs) 3494 return self.one_loop.run_until_complete(coro()) 3495 3496 def test_constructor_without_loop(self): 3497 async def coro(): 3498 return 'abc' 3499 gen1 = coro() 3500 self.addCleanup(gen1.close) 3501 gen2 = coro() 3502 self.addCleanup(gen2.close) 3503 with self.assertWarns(DeprecationWarning) as cm: 3504 with self.assertRaises(RuntimeError): 3505 asyncio.gather(gen1, gen2) 3506 self.assertEqual(cm.warnings[0].filename, __file__) 3507 3508 def test_constructor_use_running_loop(self): 3509 async def coro(): 3510 return 'abc' 3511 gen1 = coro() 3512 gen2 = coro() 3513 async def gather(): 
3514 return asyncio.gather(gen1, gen2) 3515 fut = self.one_loop.run_until_complete(gather()) 3516 self.assertIs(fut._loop, self.one_loop) 3517 self.one_loop.run_until_complete(fut) 3518 3519 def test_constructor_use_global_loop(self): 3520 # Deprecated in 3.10 3521 async def coro(): 3522 return 'abc' 3523 asyncio.set_event_loop(self.other_loop) 3524 self.addCleanup(asyncio.set_event_loop, None) 3525 gen1 = coro() 3526 gen2 = coro() 3527 with self.assertWarns(DeprecationWarning) as cm: 3528 fut = asyncio.gather(gen1, gen2) 3529 self.assertEqual(cm.warnings[0].filename, __file__) 3530 self.assertIs(fut._loop, self.other_loop) 3531 self.other_loop.run_until_complete(fut) 3532 3533 def test_duplicate_coroutines(self): 3534 with self.assertWarns(DeprecationWarning): 3535 @asyncio.coroutine 3536 def coro(s): 3537 return s 3538 c = coro('abc') 3539 fut = self._gather(c, c, coro('def'), c) 3540 self._run_loop(self.one_loop) 3541 self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc']) 3542 3543 def test_cancellation_broadcast(self): 3544 # Cancelling outer() cancels all children. 
3545 proof = 0 3546 waiter = self.one_loop.create_future() 3547 3548 async def inner(): 3549 nonlocal proof 3550 await waiter 3551 proof += 1 3552 3553 child1 = asyncio.ensure_future(inner(), loop=self.one_loop) 3554 child2 = asyncio.ensure_future(inner(), loop=self.one_loop) 3555 gatherer = None 3556 3557 async def outer(): 3558 nonlocal proof, gatherer 3559 gatherer = asyncio.gather(child1, child2) 3560 await gatherer 3561 proof += 100 3562 3563 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3564 test_utils.run_briefly(self.one_loop) 3565 self.assertTrue(f.cancel()) 3566 with self.assertRaises(asyncio.CancelledError): 3567 self.one_loop.run_until_complete(f) 3568 self.assertFalse(gatherer.cancel()) 3569 self.assertTrue(waiter.cancelled()) 3570 self.assertTrue(child1.cancelled()) 3571 self.assertTrue(child2.cancelled()) 3572 test_utils.run_briefly(self.one_loop) 3573 self.assertEqual(proof, 0) 3574 3575 def test_exception_marking(self): 3576 # Test for the first line marked "Mark exception retrieved." 
3577 3578 async def inner(f): 3579 await f 3580 raise RuntimeError('should not be ignored') 3581 3582 a = self.one_loop.create_future() 3583 b = self.one_loop.create_future() 3584 3585 async def outer(): 3586 await asyncio.gather(inner(a), inner(b)) 3587 3588 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3589 test_utils.run_briefly(self.one_loop) 3590 a.set_result(None) 3591 test_utils.run_briefly(self.one_loop) 3592 b.set_result(None) 3593 test_utils.run_briefly(self.one_loop) 3594 self.assertIsInstance(f.exception(), RuntimeError) 3595 3596 def test_issue46672(self): 3597 with mock.patch( 3598 'asyncio.base_events.BaseEventLoop.call_exception_handler', 3599 ): 3600 async def coro(s): 3601 return s 3602 c = coro('abc') 3603 3604 with self.assertRaises(TypeError): 3605 self._gather(c, {}) 3606 self._run_loop(self.one_loop) 3607 # NameError should not happen: 3608 self.one_loop.call_exception_handler.assert_not_called() 3609 3610 3611class RunCoroutineThreadsafeTests(test_utils.TestCase): 3612 """Test case for asyncio.run_coroutine_threadsafe.""" 3613 3614 def setUp(self): 3615 super().setUp() 3616 self.loop = asyncio.new_event_loop() 3617 self.set_event_loop(self.loop) # Will cleanup properly 3618 3619 async def add(self, a, b, fail=False, cancel=False): 3620 """Wait 0.05 second and return a + b.""" 3621 await asyncio.sleep(0.05) 3622 if fail: 3623 raise RuntimeError("Fail!") 3624 if cancel: 3625 asyncio.current_task(self.loop).cancel() 3626 await asyncio.sleep(0) 3627 return a + b 3628 3629 def target(self, fail=False, cancel=False, timeout=None, 3630 advance_coro=False): 3631 """Run add coroutine in the event loop.""" 3632 coro = self.add(1, 2, fail=fail, cancel=cancel) 3633 future = asyncio.run_coroutine_threadsafe(coro, self.loop) 3634 if advance_coro: 3635 # this is for test_run_coroutine_threadsafe_task_factory_exception; 3636 # otherwise it spills errors and breaks **other** unittests, since 3637 # 'target' is interacting with threads. 
3638 3639 # With this call, `coro` will be advanced, so that 3640 # CoroWrapper.__del__ won't do anything when asyncio tests run 3641 # in debug mode. 3642 self.loop.call_soon_threadsafe(coro.send, None) 3643 try: 3644 return future.result(timeout) 3645 finally: 3646 future.done() or future.cancel() 3647 3648 def test_run_coroutine_threadsafe(self): 3649 """Test coroutine submission from a thread to an event loop.""" 3650 future = self.loop.run_in_executor(None, self.target) 3651 result = self.loop.run_until_complete(future) 3652 self.assertEqual(result, 3) 3653 3654 def test_run_coroutine_threadsafe_with_exception(self): 3655 """Test coroutine submission from a thread to an event loop 3656 when an exception is raised.""" 3657 future = self.loop.run_in_executor(None, self.target, True) 3658 with self.assertRaises(RuntimeError) as exc_context: 3659 self.loop.run_until_complete(future) 3660 self.assertIn("Fail!", exc_context.exception.args) 3661 3662 def test_run_coroutine_threadsafe_with_timeout(self): 3663 """Test coroutine submission from a thread to an event loop 3664 when a timeout is raised.""" 3665 callback = lambda: self.target(timeout=0) 3666 future = self.loop.run_in_executor(None, callback) 3667 with self.assertRaises(asyncio.TimeoutError): 3668 self.loop.run_until_complete(future) 3669 test_utils.run_briefly(self.loop) 3670 # Check that there's no pending task (add has been cancelled) 3671 for task in asyncio.all_tasks(self.loop): 3672 self.assertTrue(task.done()) 3673 3674 def test_run_coroutine_threadsafe_task_cancelled(self): 3675 """Test coroutine submission from a thread to an event loop 3676 when the task is cancelled.""" 3677 callback = lambda: self.target(cancel=True) 3678 future = self.loop.run_in_executor(None, callback) 3679 with self.assertRaises(asyncio.CancelledError): 3680 self.loop.run_until_complete(future) 3681 3682 def test_run_coroutine_threadsafe_task_factory_exception(self): 3683 """Test coroutine submission from a thread to an event 
loop 3684 when the task factory raise an exception.""" 3685 3686 def task_factory(loop, coro): 3687 raise NameError 3688 3689 run = self.loop.run_in_executor( 3690 None, lambda: self.target(advance_coro=True)) 3691 3692 # Set exception handler 3693 callback = test_utils.MockCallback() 3694 self.loop.set_exception_handler(callback) 3695 3696 # Set corrupted task factory 3697 self.addCleanup(self.loop.set_task_factory, 3698 self.loop.get_task_factory()) 3699 self.loop.set_task_factory(task_factory) 3700 3701 # Run event loop 3702 with self.assertRaises(NameError) as exc_context: 3703 self.loop.run_until_complete(run) 3704 3705 # Check exceptions 3706 self.assertEqual(len(callback.call_args_list), 1) 3707 (loop, context), kwargs = callback.call_args 3708 self.assertEqual(context['exception'], exc_context.exception) 3709 3710 3711class SleepTests(test_utils.TestCase): 3712 def setUp(self): 3713 super().setUp() 3714 self.loop = asyncio.new_event_loop() 3715 self.set_event_loop(self.loop) 3716 3717 def tearDown(self): 3718 self.loop.close() 3719 self.loop = None 3720 super().tearDown() 3721 3722 def test_sleep_zero(self): 3723 result = 0 3724 3725 def inc_result(num): 3726 nonlocal result 3727 result += num 3728 3729 async def coro(): 3730 self.loop.call_soon(inc_result, 1) 3731 self.assertEqual(result, 0) 3732 num = await asyncio.sleep(0, result=10) 3733 self.assertEqual(result, 1) # inc'ed by call_soon 3734 inc_result(num) # num should be 11 3735 3736 self.loop.run_until_complete(coro()) 3737 self.assertEqual(result, 11) 3738 3739 3740class WaitTests(test_utils.TestCase): 3741 def setUp(self): 3742 super().setUp() 3743 self.loop = asyncio.new_event_loop() 3744 self.set_event_loop(self.loop) 3745 3746 def tearDown(self): 3747 self.loop.close() 3748 self.loop = None 3749 super().tearDown() 3750 3751 def test_coro_is_deprecated_in_wait(self): 3752 # Remove test when passing coros to asyncio.wait() is removed in 3.11 3753 with self.assertWarns(DeprecationWarning): 3754 
self.loop.run_until_complete( 3755 asyncio.wait([coroutine_function()])) 3756 3757 task = self.loop.create_task(coroutine_function()) 3758 with self.assertWarns(DeprecationWarning): 3759 self.loop.run_until_complete( 3760 asyncio.wait([task, coroutine_function()])) 3761 3762 3763class CompatibilityTests(test_utils.TestCase): 3764 # Tests for checking a bridge between old-styled coroutines 3765 # and async/await syntax 3766 3767 def setUp(self): 3768 super().setUp() 3769 self.loop = asyncio.new_event_loop() 3770 self.set_event_loop(self.loop) 3771 3772 def tearDown(self): 3773 self.loop.close() 3774 self.loop = None 3775 super().tearDown() 3776 3777 def test_yield_from_awaitable(self): 3778 3779 with self.assertWarns(DeprecationWarning): 3780 @asyncio.coroutine 3781 def coro(): 3782 yield from asyncio.sleep(0) 3783 return 'ok' 3784 3785 result = self.loop.run_until_complete(coro()) 3786 self.assertEqual('ok', result) 3787 3788 def test_await_old_style_coro(self): 3789 3790 with self.assertWarns(DeprecationWarning): 3791 @asyncio.coroutine 3792 def coro1(): 3793 return 'ok1' 3794 3795 with self.assertWarns(DeprecationWarning): 3796 @asyncio.coroutine 3797 def coro2(): 3798 yield from asyncio.sleep(0) 3799 return 'ok2' 3800 3801 async def inner(): 3802 return await asyncio.gather(coro1(), coro2()) 3803 3804 result = self.loop.run_until_complete(inner()) 3805 self.assertEqual(['ok1', 'ok2'], result) 3806 3807 def test_debug_mode_interop(self): 3808 # https://bugs.python.org/issue32636 3809 code = textwrap.dedent(""" 3810 import asyncio 3811 3812 async def native_coro(): 3813 pass 3814 3815 @asyncio.coroutine 3816 def old_style_coro(): 3817 yield from native_coro() 3818 3819 asyncio.run(old_style_coro()) 3820 """) 3821 3822 assert_python_ok("-Wignore::DeprecationWarning", "-c", code, 3823 PYTHONASYNCIODEBUG="1") 3824 3825 3826if __name__ == '__main__': 3827 unittest.main() 3828