1"""Tests for tasks.py.""" 2 3import collections 4import contextlib 5import contextvars 6import functools 7import gc 8import io 9import random 10import re 11import sys 12import textwrap 13import types 14import unittest 15import weakref 16from unittest import mock 17 18import asyncio 19from asyncio import coroutines 20from asyncio import futures 21from asyncio import tasks 22from test.test_asyncio import utils as test_utils 23from test import support 24from test.support.script_helper import assert_python_ok 25 26 27def tearDownModule(): 28 asyncio.set_event_loop_policy(None) 29 30 31async def coroutine_function(): 32 pass 33 34 35@contextlib.contextmanager 36def set_coroutine_debug(enabled): 37 coroutines = asyncio.coroutines 38 39 old_debug = coroutines._DEBUG 40 try: 41 coroutines._DEBUG = enabled 42 yield 43 finally: 44 coroutines._DEBUG = old_debug 45 46 47def format_coroutine(qualname, state, src, source_traceback, generator=False): 48 if generator: 49 state = '%s' % state 50 else: 51 state = '%s, defined' % state 52 if source_traceback is not None: 53 frame = source_traceback[-1] 54 return ('coro=<%s() %s at %s> created at %s:%s' 55 % (qualname, state, src, frame[0], frame[1])) 56 else: 57 return 'coro=<%s() %s at %s>' % (qualname, state, src) 58 59 60class Dummy: 61 62 def __repr__(self): 63 return '<Dummy>' 64 65 def __call__(self, *args): 66 pass 67 68 69class CoroLikeObject: 70 def send(self, v): 71 raise StopIteration(42) 72 73 def throw(self, *exc): 74 pass 75 76 def close(self): 77 pass 78 79 def __await__(self): 80 return self 81 82 83class BaseTaskTests: 84 85 Task = None 86 Future = None 87 88 def new_task(self, loop, coro, name='TestTask'): 89 return self.__class__.Task(coro, loop=loop, name=name) 90 91 def new_future(self, loop): 92 return self.__class__.Future(loop=loop) 93 94 def setUp(self): 95 super().setUp() 96 self.loop = self.new_test_loop() 97 self.loop.set_task_factory(self.new_task) 98 self.loop.create_future = lambda: self.new_future(self.loop) 99 100 def test_task_del_collect(self): 101 class Evil: 102 def __del__(self): 103 gc.collect() 104 105 async def run(): 106 return Evil() 107 108 self.loop.run_until_complete( 109 asyncio.gather(*[ 110 self.new_task(self.loop, run()) for _ in range(100) 111 ], loop=self.loop)) 112 113 def test_other_loop_future(self): 114 other_loop = asyncio.new_event_loop() 115 fut = self.new_future(other_loop) 116 117 async def run(fut): 118 await fut 119 120 try: 121 with self.assertRaisesRegex(RuntimeError, 122 r'Task .* got Future .* attached'): 123 self.loop.run_until_complete(run(fut)) 124 finally: 125 other_loop.close() 126 127 def test_task_awaits_on_itself(self): 128 129 async def test(): 130 await task 131 132 task = asyncio.ensure_future(test(), loop=self.loop) 133 134 with self.assertRaisesRegex(RuntimeError, 135 'Task cannot await on itself'): 136 self.loop.run_until_complete(task) 137 138 def test_task_class(self): 139 async def notmuch(): 140 return 'ok' 141 t = self.new_task(self.loop, notmuch()) 142 self.loop.run_until_complete(t) 143 self.assertTrue(t.done()) 144 self.assertEqual(t.result(), 'ok') 145 self.assertIs(t._loop, self.loop) 146 self.assertIs(t.get_loop(), self.loop) 147 148 loop = asyncio.new_event_loop() 149 self.set_event_loop(loop) 150 t = self.new_task(loop, notmuch()) 151 self.assertIs(t._loop, loop) 152 loop.run_until_complete(t) 153 loop.close() 154 155 def test_ensure_future_coroutine(self): 156 with self.assertWarns(DeprecationWarning): 157 @asyncio.coroutine 158 def notmuch(): 159 return 'ok' 160 t = asyncio.ensure_future(notmuch(), loop=self.loop) 161 self.loop.run_until_complete(t) 162 self.assertTrue(t.done()) 163 self.assertEqual(t.result(), 'ok') 164 self.assertIs(t._loop, self.loop) 165 166 loop = asyncio.new_event_loop() 167 self.set_event_loop(loop) 168 t = asyncio.ensure_future(notmuch(), loop=loop) 169 self.assertIs(t._loop, loop) 170 loop.run_until_complete(t) 171 loop.close() 172 173 def test_ensure_future_future(self): 174 f_orig = self.new_future(self.loop) 175 f_orig.set_result('ko') 176 177 f = asyncio.ensure_future(f_orig) 178 self.loop.run_until_complete(f) 179 self.assertTrue(f.done()) 180 self.assertEqual(f.result(), 'ko') 181 self.assertIs(f, f_orig) 182 183 loop = asyncio.new_event_loop() 184 self.set_event_loop(loop) 185 186 with self.assertRaises(ValueError): 187 f = asyncio.ensure_future(f_orig, loop=loop) 188 189 loop.close() 190 191 f = asyncio.ensure_future(f_orig, loop=self.loop) 192 self.assertIs(f, f_orig) 193 194 def test_ensure_future_task(self): 195 async def notmuch(): 196 return 'ok' 197 t_orig = self.new_task(self.loop, notmuch()) 198 t = asyncio.ensure_future(t_orig) 199 self.loop.run_until_complete(t) 200 self.assertTrue(t.done()) 201 self.assertEqual(t.result(), 'ok') 202 self.assertIs(t, t_orig) 203 204 loop = asyncio.new_event_loop() 205 self.set_event_loop(loop) 206 207 with self.assertRaises(ValueError): 208 t = asyncio.ensure_future(t_orig, loop=loop) 209 210 loop.close() 211 212 t = asyncio.ensure_future(t_orig, loop=self.loop) 213 self.assertIs(t, t_orig) 214 215 def test_ensure_future_awaitable(self): 216 class Aw: 217 def __init__(self, coro): 218 self.coro = coro 219 def __await__(self): 220 return (yield from self.coro) 221 222 with self.assertWarns(DeprecationWarning): 223 @asyncio.coroutine 224 def coro(): 225 return 'ok' 226 227 loop = asyncio.new_event_loop() 228 self.set_event_loop(loop) 229 fut = asyncio.ensure_future(Aw(coro()), loop=loop) 230 loop.run_until_complete(fut) 231 assert fut.result() == 'ok' 232 233 def test_ensure_future_neither(self): 234 with self.assertRaises(TypeError): 235 asyncio.ensure_future('ok') 236 237 def test_ensure_future_error_msg(self): 238 loop = asyncio.new_event_loop() 239 f = self.new_future(self.loop) 240 with self.assertRaisesRegex(ValueError, 'The future belongs to a ' 241 'different loop than the one specified as ' 242 'the loop argument'): 243 asyncio.ensure_future(f, loop=loop) 244 loop.close() 245 246 def test_get_stack(self): 247 T = None 248 249 async def foo(): 250 await bar() 251 252 async def bar(): 253 # test get_stack() 254 f = T.get_stack(limit=1) 255 try: 256 self.assertEqual(f[0].f_code.co_name, 'foo') 257 finally: 258 f = None 259 260 # test print_stack() 261 file = io.StringIO() 262 T.print_stack(limit=1, file=file) 263 file.seek(0) 264 tb = file.read() 265 self.assertRegex(tb, r'foo\(\) running') 266 267 async def runner(): 268 nonlocal T 269 T = asyncio.ensure_future(foo(), loop=self.loop) 270 await T 271 272 self.loop.run_until_complete(runner()) 273 274 def test_task_repr(self): 275 self.loop.set_debug(False) 276 277 async def notmuch(): 278 return 'abc' 279 280 # test coroutine function 281 self.assertEqual(notmuch.__name__, 'notmuch') 282 self.assertRegex(notmuch.__qualname__, 283 r'\w+.test_task_repr.<locals>.notmuch') 284 self.assertEqual(notmuch.__module__, __name__) 285 286 filename, lineno = test_utils.get_function_source(notmuch) 287 src = "%s:%s" % (filename, lineno) 288 289 # test coroutine object 290 gen = notmuch() 291 coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch' 292 self.assertEqual(gen.__name__, 'notmuch') 293 self.assertEqual(gen.__qualname__, coro_qualname) 294 295 # test pending Task 296 t = self.new_task(self.loop, gen) 297 t.add_done_callback(Dummy()) 298 299 coro = format_coroutine(coro_qualname, 'running', src, 300 t._source_traceback, generator=True) 301 self.assertEqual(repr(t), 302 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 303 304 # test cancelling Task 305 t.cancel() # Does not take immediate effect! 306 self.assertEqual(repr(t), 307 "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro) 308 309 # test cancelled Task 310 self.assertRaises(asyncio.CancelledError, 311 self.loop.run_until_complete, t) 312 coro = format_coroutine(coro_qualname, 'done', src, 313 t._source_traceback) 314 self.assertEqual(repr(t), 315 "<Task cancelled name='TestTask' %s>" % coro) 316 317 # test finished Task 318 t = self.new_task(self.loop, notmuch()) 319 self.loop.run_until_complete(t) 320 coro = format_coroutine(coro_qualname, 'done', src, 321 t._source_traceback) 322 self.assertEqual(repr(t), 323 "<Task finished name='TestTask' %s result='abc'>" % coro) 324 325 def test_task_repr_autogenerated(self): 326 async def notmuch(): 327 return 123 328 329 t1 = self.new_task(self.loop, notmuch(), None) 330 t2 = self.new_task(self.loop, notmuch(), None) 331 self.assertNotEqual(repr(t1), repr(t2)) 332 333 match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1)) 334 self.assertIsNotNone(match1) 335 match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2)) 336 self.assertIsNotNone(match2) 337 338 # Autogenerated task names should have monotonically increasing numbers 339 self.assertLess(int(match1.group(1)), int(match2.group(1))) 340 self.loop.run_until_complete(t1) 341 self.loop.run_until_complete(t2) 342 343 def test_task_repr_name_not_str(self): 344 async def notmuch(): 345 return 123 346 347 t = self.new_task(self.loop, notmuch()) 348 t.set_name({6}) 349 self.assertEqual(t.get_name(), '{6}') 350 self.loop.run_until_complete(t) 351 352 def test_task_repr_coro_decorator(self): 353 self.loop.set_debug(False) 354 355 with self.assertWarns(DeprecationWarning): 356 @asyncio.coroutine 357 def notmuch(): 358 # notmuch() function doesn't use yield from: it will be wrapped by 359 # @coroutine decorator 360 return 123 361 362 # test coroutine function 363 self.assertEqual(notmuch.__name__, 'notmuch') 364 self.assertRegex(notmuch.__qualname__, 365 r'\w+.test_task_repr_coro_decorator' 366 r'\.<locals>\.notmuch') 367 self.assertEqual(notmuch.__module__, __name__) 368 369 # test coroutine object 370 gen = notmuch() 371 # On Python >= 3.5, generators now inherit the name of the 372 # function, as expected, and have a qualified name (__qualname__ 373 # attribute). 374 coro_name = 'notmuch' 375 coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator' 376 '.<locals>.notmuch') 377 self.assertEqual(gen.__name__, coro_name) 378 self.assertEqual(gen.__qualname__, coro_qualname) 379 380 # test repr(CoroWrapper) 381 if coroutines._DEBUG: 382 # format the coroutine object 383 if coroutines._DEBUG: 384 filename, lineno = test_utils.get_function_source(notmuch) 385 frame = gen._source_traceback[-1] 386 coro = ('%s() running, defined at %s:%s, created at %s:%s' 387 % (coro_qualname, filename, lineno, 388 frame[0], frame[1])) 389 else: 390 code = gen.gi_code 391 coro = ('%s() running at %s:%s' 392 % (coro_qualname, code.co_filename, 393 code.co_firstlineno)) 394 395 self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro) 396 397 # test pending Task 398 t = self.new_task(self.loop, gen) 399 t.add_done_callback(Dummy()) 400 401 # format the coroutine object 402 if coroutines._DEBUG: 403 src = '%s:%s' % test_utils.get_function_source(notmuch) 404 else: 405 code = gen.gi_code 406 src = '%s:%s' % (code.co_filename, code.co_firstlineno) 407 coro = format_coroutine(coro_qualname, 'running', src, 408 t._source_traceback, 409 generator=not coroutines._DEBUG) 410 self.assertEqual(repr(t), 411 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 412 self.loop.run_until_complete(t) 413 414 def test_task_repr_wait_for(self): 415 self.loop.set_debug(False) 416 417 async def wait_for(fut): 418 return await fut 419 420 fut = self.new_future(self.loop) 421 task = self.new_task(self.loop, wait_for(fut)) 422 test_utils.run_briefly(self.loop) 423 self.assertRegex(repr(task), 424 '<Task .* wait_for=%s>' % re.escape(repr(fut))) 425 426 fut.set_result(None) 427 self.loop.run_until_complete(task) 428 429 def test_task_repr_partial_corowrapper(self): 430 # Issue #222: repr(CoroWrapper) must not fail in debug mode if the 431 # coroutine is a partial function 432 with set_coroutine_debug(True): 433 self.loop.set_debug(True) 434 435 async def func(x, y): 436 await asyncio.sleep(0) 437 438 with self.assertWarns(DeprecationWarning): 439 partial_func = asyncio.coroutine(functools.partial(func, 1)) 440 task = self.loop.create_task(partial_func(2)) 441 442 # make warnings quiet 443 task._log_destroy_pending = False 444 self.addCleanup(task._coro.close) 445 446 coro_repr = repr(task._coro) 447 expected = ( 448 r'<coroutine object \w+\.test_task_repr_partial_corowrapper' 449 r'\.<locals>\.func at' 450 ) 451 self.assertRegex(coro_repr, expected) 452 453 def test_task_basics(self): 454 455 async def outer(): 456 a = await inner1() 457 b = await inner2() 458 return a+b 459 460 async def inner1(): 461 return 42 462 463 async def inner2(): 464 return 1000 465 466 t = outer() 467 self.assertEqual(self.loop.run_until_complete(t), 1042) 468 469 def test_cancel(self): 470 471 def gen(): 472 when = yield 473 self.assertAlmostEqual(10.0, when) 474 yield 0 475 476 loop = self.new_test_loop(gen) 477 478 async def task(): 479 await asyncio.sleep(10.0) 480 return 12 481 482 t = self.new_task(loop, task()) 483 loop.call_soon(t.cancel) 484 with self.assertRaises(asyncio.CancelledError): 485 loop.run_until_complete(t) 486 self.assertTrue(t.done()) 487 self.assertTrue(t.cancelled()) 488 self.assertFalse(t.cancel()) 489 490 def test_cancel_yield(self): 491 with self.assertWarns(DeprecationWarning): 492 @asyncio.coroutine 493 def task(): 494 yield 495 yield 496 return 12 497 498 t = self.new_task(self.loop, task()) 499 test_utils.run_briefly(self.loop) # start coro 500 t.cancel() 501 self.assertRaises( 502 asyncio.CancelledError, self.loop.run_until_complete, t) 503 self.assertTrue(t.done()) 504 self.assertTrue(t.cancelled()) 505 self.assertFalse(t.cancel()) 506 507 def test_cancel_inner_future(self): 508 f = self.new_future(self.loop) 509 510 async def task(): 511 await f 512 return 12 513 514 t = self.new_task(self.loop, task()) 515 test_utils.run_briefly(self.loop) # start task 516 f.cancel() 517 with self.assertRaises(asyncio.CancelledError): 518 self.loop.run_until_complete(t) 519 self.assertTrue(f.cancelled()) 520 self.assertTrue(t.cancelled()) 521 522 def test_cancel_both_task_and_inner_future(self): 523 f = self.new_future(self.loop) 524 525 async def task(): 526 await f 527 return 12 528 529 t = self.new_task(self.loop, task()) 530 test_utils.run_briefly(self.loop) 531 532 f.cancel() 533 t.cancel() 534 535 with self.assertRaises(asyncio.CancelledError): 536 self.loop.run_until_complete(t) 537 538 self.assertTrue(t.done()) 539 self.assertTrue(f.cancelled()) 540 self.assertTrue(t.cancelled()) 541 542 def test_cancel_task_catching(self): 543 fut1 = self.new_future(self.loop) 544 fut2 = self.new_future(self.loop) 545 546 async def task(): 547 await fut1 548 try: 549 await fut2 550 except asyncio.CancelledError: 551 return 42 552 553 t = self.new_task(self.loop, task()) 554 test_utils.run_briefly(self.loop) 555 self.assertIs(t._fut_waiter, fut1) # White-box test. 556 fut1.set_result(None) 557 test_utils.run_briefly(self.loop) 558 self.assertIs(t._fut_waiter, fut2) # White-box test. 559 t.cancel() 560 self.assertTrue(fut2.cancelled()) 561 res = self.loop.run_until_complete(t) 562 self.assertEqual(res, 42) 563 self.assertFalse(t.cancelled()) 564 565 def test_cancel_task_ignoring(self): 566 fut1 = self.new_future(self.loop) 567 fut2 = self.new_future(self.loop) 568 fut3 = self.new_future(self.loop) 569 570 async def task(): 571 await fut1 572 try: 573 await fut2 574 except asyncio.CancelledError: 575 pass 576 res = await fut3 577 return res 578 579 t = self.new_task(self.loop, task()) 580 test_utils.run_briefly(self.loop) 581 self.assertIs(t._fut_waiter, fut1) # White-box test. 582 fut1.set_result(None) 583 test_utils.run_briefly(self.loop) 584 self.assertIs(t._fut_waiter, fut2) # White-box test. 585 t.cancel() 586 self.assertTrue(fut2.cancelled()) 587 test_utils.run_briefly(self.loop) 588 self.assertIs(t._fut_waiter, fut3) # White-box test. 589 fut3.set_result(42) 590 res = self.loop.run_until_complete(t) 591 self.assertEqual(res, 42) 592 self.assertFalse(fut3.cancelled()) 593 self.assertFalse(t.cancelled()) 594 595 def test_cancel_current_task(self): 596 loop = asyncio.new_event_loop() 597 self.set_event_loop(loop) 598 599 async def task(): 600 t.cancel() 601 self.assertTrue(t._must_cancel) # White-box test. 602 # The sleep should be cancelled immediately. 603 await asyncio.sleep(100) 604 return 12 605 606 t = self.new_task(loop, task()) 607 self.assertFalse(t.cancelled()) 608 self.assertRaises( 609 asyncio.CancelledError, loop.run_until_complete, t) 610 self.assertTrue(t.done()) 611 self.assertTrue(t.cancelled()) 612 self.assertFalse(t._must_cancel) # White-box test. 613 self.assertFalse(t.cancel()) 614 615 def test_cancel_at_end(self): 616 """coroutine end right after task is cancelled""" 617 loop = asyncio.new_event_loop() 618 self.set_event_loop(loop) 619 620 async def task(): 621 t.cancel() 622 self.assertTrue(t._must_cancel) # White-box test. 623 return 12 624 625 t = self.new_task(loop, task()) 626 self.assertFalse(t.cancelled()) 627 self.assertRaises( 628 asyncio.CancelledError, loop.run_until_complete, t) 629 self.assertTrue(t.done()) 630 self.assertTrue(t.cancelled()) 631 self.assertFalse(t._must_cancel) # White-box test. 632 self.assertFalse(t.cancel()) 633 634 def test_cancel_awaited_task(self): 635 # This tests for a relatively rare condition when 636 # a task cancellation is requested for a task which is not 637 # currently blocked, such as a task cancelling itself. 638 # In this situation we must ensure that whatever next future 639 # or task the cancelled task blocks on is cancelled correctly 640 # as well. See also bpo-34872. 641 loop = asyncio.new_event_loop() 642 self.addCleanup(lambda: loop.close()) 643 644 task = nested_task = None 645 fut = self.new_future(loop) 646 647 async def nested(): 648 await fut 649 650 async def coro(): 651 nonlocal nested_task 652 # Create a sub-task and wait for it to run. 653 nested_task = self.new_task(loop, nested()) 654 await asyncio.sleep(0) 655 656 # Request the current task to be cancelled. 657 task.cancel() 658 # Block on the nested task, which should be immediately 659 # cancelled. 660 await nested_task 661 662 task = self.new_task(loop, coro()) 663 with self.assertRaises(asyncio.CancelledError): 664 loop.run_until_complete(task) 665 666 self.assertTrue(task.cancelled()) 667 self.assertTrue(nested_task.cancelled()) 668 self.assertTrue(fut.cancelled()) 669 670 def test_stop_while_run_in_complete(self): 671 672 def gen(): 673 when = yield 674 self.assertAlmostEqual(0.1, when) 675 when = yield 0.1 676 self.assertAlmostEqual(0.2, when) 677 when = yield 0.1 678 self.assertAlmostEqual(0.3, when) 679 yield 0.1 680 681 loop = self.new_test_loop(gen) 682 683 x = 0 684 685 async def task(): 686 nonlocal x 687 while x < 10: 688 await asyncio.sleep(0.1) 689 x += 1 690 if x == 2: 691 loop.stop() 692 693 t = self.new_task(loop, task()) 694 with self.assertRaises(RuntimeError) as cm: 695 loop.run_until_complete(t) 696 self.assertEqual(str(cm.exception), 697 'Event loop stopped before Future completed.') 698 self.assertFalse(t.done()) 699 self.assertEqual(x, 2) 700 self.assertAlmostEqual(0.3, loop.time()) 701 702 t.cancel() 703 self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) 704 705 def test_log_traceback(self): 706 async def coro(): 707 pass 708 709 task = self.new_task(self.loop, coro()) 710 with self.assertRaisesRegex(ValueError, 'can only be set to False'): 711 task._log_traceback = True 712 self.loop.run_until_complete(task) 713 714 def test_wait_for_timeout_less_then_0_or_0_future_done(self): 715 def gen(): 716 when = yield 717 self.assertAlmostEqual(0, when) 718 719 loop = self.new_test_loop(gen) 720 721 fut = self.new_future(loop) 722 fut.set_result('done') 723 724 ret = loop.run_until_complete(asyncio.wait_for(fut, 0)) 725 726 self.assertEqual(ret, 'done') 727 self.assertTrue(fut.done()) 728 self.assertAlmostEqual(0, loop.time()) 729 730 def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self): 731 def gen(): 732 when = yield 733 self.assertAlmostEqual(0, when) 734 735 loop = self.new_test_loop(gen) 736 737 foo_started = False 738 739 async def foo(): 740 nonlocal foo_started 741 foo_started = True 742 743 with self.assertRaises(asyncio.TimeoutError): 744 loop.run_until_complete(asyncio.wait_for(foo(), 0)) 745 746 self.assertAlmostEqual(0, loop.time()) 747 self.assertEqual(foo_started, False) 748 749 def test_wait_for_timeout_less_then_0_or_0(self): 750 def gen(): 751 when = yield 752 self.assertAlmostEqual(0.2, when) 753 when = yield 0 754 self.assertAlmostEqual(0, when) 755 756 for timeout in [0, -1]: 757 with self.subTest(timeout=timeout): 758 loop = self.new_test_loop(gen) 759 760 foo_running = None 761 762 async def foo(): 763 nonlocal foo_running 764 foo_running = True 765 try: 766 await asyncio.sleep(0.2) 767 finally: 768 foo_running = False 769 return 'done' 770 771 fut = self.new_task(loop, foo()) 772 773 with self.assertRaises(asyncio.TimeoutError): 774 loop.run_until_complete(asyncio.wait_for(fut, timeout)) 775 self.assertTrue(fut.done()) 776 # it should have been cancelled due to the timeout 777 self.assertTrue(fut.cancelled()) 778 self.assertAlmostEqual(0, loop.time()) 779 self.assertEqual(foo_running, False) 780 781 def test_wait_for(self): 782 783 def gen(): 784 when = yield 785 self.assertAlmostEqual(0.2, when) 786 when = yield 0 787 self.assertAlmostEqual(0.1, when) 788 when = yield 0.1 789 790 loop = self.new_test_loop(gen) 791 792 foo_running = None 793 794 async def foo(): 795 nonlocal foo_running 796 foo_running = True 797 try: 798 await asyncio.sleep(0.2) 799 finally: 800 foo_running = False 801 return 'done' 802 803 fut = self.new_task(loop, foo()) 804 805 with self.assertRaises(asyncio.TimeoutError): 806 loop.run_until_complete(asyncio.wait_for(fut, 0.1)) 807 self.assertTrue(fut.done()) 808 # it should have been cancelled due to the timeout 809 self.assertTrue(fut.cancelled()) 810 self.assertAlmostEqual(0.1, loop.time()) 811 self.assertEqual(foo_running, False) 812 813 def test_wait_for_blocking(self): 814 loop = self.new_test_loop() 815 816 async def coro(): 817 return 'done' 818 819 res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None)) 820 self.assertEqual(res, 'done') 821 822 def test_wait_for_with_global_loop(self): 823 824 def gen(): 825 when = yield 826 self.assertAlmostEqual(0.2, when) 827 when = yield 0 828 self.assertAlmostEqual(0.01, when) 829 yield 0.01 830 831 loop = self.new_test_loop(gen) 832 833 async def foo(): 834 await asyncio.sleep(0.2) 835 return 'done' 836 837 asyncio.set_event_loop(loop) 838 try: 839 fut = self.new_task(loop, foo()) 840 with self.assertRaises(asyncio.TimeoutError): 841 loop.run_until_complete(asyncio.wait_for(fut, 0.01)) 842 finally: 843 asyncio.set_event_loop(None) 844 845 self.assertAlmostEqual(0.01, loop.time()) 846 self.assertTrue(fut.done()) 847 self.assertTrue(fut.cancelled()) 848 849 def test_wait_for_race_condition(self): 850 851 def gen(): 852 yield 0.1 853 yield 0.1 854 yield 0.1 855 856 loop = self.new_test_loop(gen) 857 858 fut = self.new_future(loop) 859 task = asyncio.wait_for(fut, timeout=0.2) 860 loop.call_later(0.1, fut.set_result, "ok") 861 res = loop.run_until_complete(task) 862 self.assertEqual(res, "ok") 863 864 def test_wait_for_waits_for_task_cancellation(self): 865 loop = asyncio.new_event_loop() 866 self.addCleanup(loop.close) 867 868 task_done = False 869 870 async def foo(): 871 async def inner(): 872 nonlocal task_done 873 try: 874 await asyncio.sleep(0.2) 875 finally: 876 task_done = True 877 878 inner_task = self.new_task(loop, inner()) 879 880 with self.assertRaises(asyncio.TimeoutError): 881 await asyncio.wait_for(inner_task, timeout=0.1) 882 883 self.assertTrue(task_done) 884 885 loop.run_until_complete(foo()) 886 887 def test_wait_for_self_cancellation(self): 888 loop = asyncio.new_event_loop() 889 self.addCleanup(loop.close) 890 891 async def foo(): 892 async def inner(): 893 try: 894 await asyncio.sleep(0.3) 895 except asyncio.CancelledError: 896 try: 897 await asyncio.sleep(0.3) 898 except asyncio.CancelledError: 899 await asyncio.sleep(0.3) 900 901 return 42 902 903 inner_task = self.new_task(loop, inner()) 904 905 wait = asyncio.wait_for(inner_task, timeout=0.1) 906 907 # Test that wait_for itself is properly cancellable 908 # even when the initial task holds up the initial cancellation. 909 task = self.new_task(loop, wait) 910 await asyncio.sleep(0.2) 911 task.cancel() 912 913 with self.assertRaises(asyncio.CancelledError): 914 await task 915 916 self.assertEqual(await inner_task, 42) 917 918 loop.run_until_complete(foo()) 919 920 def test_wait(self): 921 922 def gen(): 923 when = yield 924 self.assertAlmostEqual(0.1, when) 925 when = yield 0 926 self.assertAlmostEqual(0.15, when) 927 yield 0.15 928 929 loop = self.new_test_loop(gen) 930 931 a = self.new_task(loop, asyncio.sleep(0.1)) 932 b = self.new_task(loop, asyncio.sleep(0.15)) 933 934 async def foo(): 935 done, pending = await asyncio.wait([b, a]) 936 self.assertEqual(done, set([a, b])) 937 self.assertEqual(pending, set()) 938 return 42 939 940 res = loop.run_until_complete(self.new_task(loop, foo())) 941 self.assertEqual(res, 42) 942 self.assertAlmostEqual(0.15, loop.time()) 943 944 # Doing it again should take no time and exercise a different path. 945 res = loop.run_until_complete(self.new_task(loop, foo())) 946 self.assertAlmostEqual(0.15, loop.time()) 947 self.assertEqual(res, 42) 948 949 def test_wait_with_global_loop(self): 950 951 def gen(): 952 when = yield 953 self.assertAlmostEqual(0.01, when) 954 when = yield 0 955 self.assertAlmostEqual(0.015, when) 956 yield 0.015 957 958 loop = self.new_test_loop(gen) 959 960 a = self.new_task(loop, asyncio.sleep(0.01)) 961 b = self.new_task(loop, asyncio.sleep(0.015)) 962 963 async def foo(): 964 done, pending = await asyncio.wait([b, a]) 965 self.assertEqual(done, set([a, b])) 966 self.assertEqual(pending, set()) 967 return 42 968 969 asyncio.set_event_loop(loop) 970 res = loop.run_until_complete( 971 self.new_task(loop, foo())) 972 973 self.assertEqual(res, 42) 974 975 def test_wait_duplicate_coroutines(self): 976 977 with self.assertWarns(DeprecationWarning): 978 @asyncio.coroutine 979 def coro(s): 980 return s 981 c = coro('test') 982 983 task =self.new_task( 984 self.loop, 985 asyncio.wait([c, c, coro('spam')])) 986 987 done, pending = self.loop.run_until_complete(task) 988 989 self.assertFalse(pending) 990 self.assertEqual(set(f.result() for f in done), {'test', 'spam'}) 991 992 def test_wait_errors(self): 993 self.assertRaises( 994 ValueError, self.loop.run_until_complete, 995 asyncio.wait(set())) 996 997 # -1 is an invalid return_when value 998 sleep_coro = asyncio.sleep(10.0) 999 wait_coro = asyncio.wait([sleep_coro], return_when=-1) 1000 self.assertRaises(ValueError, 1001 self.loop.run_until_complete, wait_coro) 1002 1003 sleep_coro.close() 1004 1005 def test_wait_first_completed(self): 1006 1007 def gen(): 1008 when = yield 1009 self.assertAlmostEqual(10.0, when) 1010 when = yield 0 1011 self.assertAlmostEqual(0.1, when) 1012 yield 0.1 1013 1014 loop = self.new_test_loop(gen) 1015 1016 a = self.new_task(loop, asyncio.sleep(10.0)) 1017 b = self.new_task(loop, asyncio.sleep(0.1)) 1018 task = self.new_task( 1019 loop, 1020 asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 1021 1022 done, pending = loop.run_until_complete(task) 1023 self.assertEqual({b}, done) 1024 self.assertEqual({a}, pending) 1025 self.assertFalse(a.done()) 1026 self.assertTrue(b.done()) 1027 self.assertIsNone(b.result()) 1028 self.assertAlmostEqual(0.1, loop.time()) 1029 1030 # move forward to close generator 1031 loop.advance_time(10) 1032 loop.run_until_complete(asyncio.wait([a, b])) 1033 1034 def test_wait_really_done(self): 1035 # there is possibility that some tasks in the pending list 1036 # became done but their callbacks haven't all been called yet 1037 1038 async def coro1(): 1039 await asyncio.sleep(0) 1040 1041 async def coro2(): 1042 await asyncio.sleep(0) 1043 await asyncio.sleep(0) 1044 1045 a = self.new_task(self.loop, coro1()) 1046 b = self.new_task(self.loop, coro2()) 1047 task = self.new_task( 1048 self.loop, 1049 asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 1050 1051 done, pending = self.loop.run_until_complete(task) 1052 self.assertEqual({a, b}, done) 1053 self.assertTrue(a.done()) 1054 self.assertIsNone(a.result()) 1055 self.assertTrue(b.done()) 1056 self.assertIsNone(b.result()) 1057 1058 def test_wait_first_exception(self): 1059 1060 def gen(): 1061 when = yield 1062 self.assertAlmostEqual(10.0, when) 1063 yield 0 1064 1065 loop = self.new_test_loop(gen) 1066 1067 # first_exception, task already has exception 1068 a = self.new_task(loop, asyncio.sleep(10.0)) 1069 1070 async def exc(): 1071 raise ZeroDivisionError('err') 1072 1073 b = self.new_task(loop, exc()) 1074 task = self.new_task( 1075 loop, 1076 asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)) 1077 1078 done, pending = loop.run_until_complete(task) 1079 self.assertEqual({b}, done) 1080 self.assertEqual({a}, pending) 1081 self.assertAlmostEqual(0, loop.time()) 1082 1083 # move forward to close generator 1084 loop.advance_time(10) 1085 loop.run_until_complete(asyncio.wait([a, b])) 1086 1087 def test_wait_first_exception_in_wait(self): 1088 1089 def gen(): 1090 when = yield 1091 self.assertAlmostEqual(10.0, when) 1092 when = yield 0 1093 self.assertAlmostEqual(0.01, when) 1094 yield 0.01 1095 1096 loop = self.new_test_loop(gen) 1097 1098 # first_exception, exception during waiting 1099 a = self.new_task(loop, asyncio.sleep(10.0)) 1100 1101 async def exc(): 1102 await asyncio.sleep(0.01) 1103 raise ZeroDivisionError('err') 1104 1105 b = self.new_task(loop, exc()) 1106 task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION) 1107 1108 done, pending = loop.run_until_complete(task) 1109 self.assertEqual({b}, done) 1110 self.assertEqual({a}, pending) 1111 self.assertAlmostEqual(0.01, loop.time()) 1112 1113 # move forward to close generator 1114 loop.advance_time(10) 1115 loop.run_until_complete(asyncio.wait([a, b])) 1116 1117 def test_wait_with_exception(self): 1118 1119 def gen(): 1120 when = yield 1121 self.assertAlmostEqual(0.1, when) 1122 when = yield 0 1123 self.assertAlmostEqual(0.15, when) 1124 yield 0.15 1125 1126 loop = self.new_test_loop(gen) 1127 1128 a = self.new_task(loop, asyncio.sleep(0.1)) 1129 1130 async def sleeper(): 1131 await asyncio.sleep(0.15) 1132 raise ZeroDivisionError('really') 1133 1134 b = self.new_task(loop, sleeper()) 1135 1136 async def foo(): 1137 done, pending = await asyncio.wait([b, a]) 1138 self.assertEqual(len(done), 2) 1139 self.assertEqual(pending, set()) 1140 errors = set(f for f in done if f.exception() is not None) 1141 self.assertEqual(len(errors), 1) 1142 1143 loop.run_until_complete(self.new_task(loop, foo())) 1144 self.assertAlmostEqual(0.15, loop.time()) 1145 1146 loop.run_until_complete(self.new_task(loop, foo())) 1147 self.assertAlmostEqual(0.15, loop.time()) 1148 1149 def test_wait_with_timeout(self): 1150 1151 def gen(): 1152 when = yield 1153 self.assertAlmostEqual(0.1, when) 1154 when = yield 0 1155 self.assertAlmostEqual(0.15, when) 1156 when = yield 0 1157 self.assertAlmostEqual(0.11, when) 1158 yield 0.11 1159 1160 loop = self.new_test_loop(gen) 1161 1162 a = self.new_task(loop, asyncio.sleep(0.1)) 1163 b = self.new_task(loop, asyncio.sleep(0.15)) 1164 1165 async def foo(): 1166 done, pending = await asyncio.wait([b, a], timeout=0.11) 1167 self.assertEqual(done, set([a])) 1168 self.assertEqual(pending, set([b])) 1169 1170 loop.run_until_complete(self.new_task(loop, foo())) 1171 self.assertAlmostEqual(0.11, loop.time()) 1172 1173 # move forward to close generator 1174 loop.advance_time(10) 1175 loop.run_until_complete(asyncio.wait([a, b])) 1176 1177 def test_wait_concurrent_complete(self): 1178 1179 def gen(): 1180 when = yield 1181 self.assertAlmostEqual(0.1, when) 1182 when = yield 0 1183 self.assertAlmostEqual(0.15, when) 1184 when = yield 0 1185 self.assertAlmostEqual(0.1, when) 1186 yield 0.1 1187 1188 loop = self.new_test_loop(gen) 1189 1190 a = self.new_task(loop, asyncio.sleep(0.1)) 1191 b = self.new_task(loop, asyncio.sleep(0.15)) 1192 1193 done, pending = loop.run_until_complete( 1194 asyncio.wait([b, a], timeout=0.1)) 1195 1196 self.assertEqual(done, set([a])) 1197 self.assertEqual(pending, set([b])) 1198 self.assertAlmostEqual(0.1, loop.time()) 1199 1200 # move forward to close generator 1201 loop.advance_time(10) 1202 loop.run_until_complete(asyncio.wait([a, b])) 1203 1204 def test_as_completed(self): 1205 1206 def gen(): 1207 yield 0 1208 yield 0 1209 yield 0.01 1210 yield 0 1211 1212 loop = self.new_test_loop(gen) 1213 # disable "slow callback" warning 1214 loop.slow_callback_duration = 1.0 1215 completed = set() 1216 time_shifted = False 1217 1218 with self.assertWarns(DeprecationWarning): 1219 @asyncio.coroutine 1220 def sleeper(dt, x): 1221 nonlocal time_shifted 1222 yield from asyncio.sleep(dt) 1223 completed.add(x) 1224 if not time_shifted and 'a' in completed and 'b' in completed: 1225 time_shifted = True 1226 loop.advance_time(0.14) 1227 return x 1228 1229 a = sleeper(0.01, 'a') 1230 b = sleeper(0.01, 'b') 1231 c = sleeper(0.15, 'c') 1232 1233 async def foo(): 1234 values = [] 1235 for f in asyncio.as_completed([b, c, a], loop=loop): 1236 values.append(await f) 1237 return values 1238 with self.assertWarns(DeprecationWarning): 1239 res = loop.run_until_complete(self.new_task(loop, foo())) 1240 self.assertAlmostEqual(0.15, loop.time()) 1241 self.assertTrue('a' in res[:2]) 1242 self.assertTrue('b' in res[:2]) 1243 self.assertEqual(res[2], 'c') 1244 1245 # Doing it again should take no time and exercise a different path. 1246 with self.assertWarns(DeprecationWarning): 1247 res = loop.run_until_complete(self.new_task(loop, foo())) 1248 self.assertAlmostEqual(0.15, loop.time()) 1249 1250 def test_as_completed_with_timeout(self): 1251 1252 def gen(): 1253 yield 1254 yield 0 1255 yield 0 1256 yield 0.1 1257 1258 loop = self.new_test_loop(gen) 1259 1260 a = loop.create_task(asyncio.sleep(0.1, 'a')) 1261 b = loop.create_task(asyncio.sleep(0.15, 'b')) 1262 1263 async def foo(): 1264 values = [] 1265 for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop): 1266 if values: 1267 loop.advance_time(0.02) 1268 try: 1269 v = await f 1270 values.append((1, v)) 1271 except asyncio.TimeoutError as exc: 1272 values.append((2, exc)) 1273 return values 1274 1275 with self.assertWarns(DeprecationWarning): 1276 res = loop.run_until_complete(self.new_task(loop, foo())) 1277 self.assertEqual(len(res), 2, res) 1278 self.assertEqual(res[0], (1, 'a')) 1279 self.assertEqual(res[1][0], 2) 1280 self.assertIsInstance(res[1][1], asyncio.TimeoutError) 1281 self.assertAlmostEqual(0.12, loop.time()) 1282 1283 # move forward to close generator 1284 loop.advance_time(10) 1285 loop.run_until_complete(asyncio.wait([a, b])) 1286 1287 def test_as_completed_with_unused_timeout(self): 1288 1289 def gen(): 1290 yield 1291 yield 0 1292 yield 0.01 1293 1294 loop = self.new_test_loop(gen) 1295 1296 a = asyncio.sleep(0.01, 'a') 1297 1298 async def foo(): 1299 for f in asyncio.as_completed([a], timeout=1, loop=loop): 1300 v = await f 1301 self.assertEqual(v, 'a') 1302 1303 with self.assertWarns(DeprecationWarning): 1304 loop.run_until_complete(self.new_task(loop, foo())) 1305 1306 def test_as_completed_reverse_wait(self): 1307 1308 def gen(): 1309 yield 0 1310 yield 0.05 1311 yield 0 1312 1313 loop = self.new_test_loop(gen) 1314 1315 a = asyncio.sleep(0.05, 'a') 1316 b = asyncio.sleep(0.10, 'b') 1317 fs = {a, b} 1318 1319 with self.assertWarns(DeprecationWarning): 1320 futs = list(asyncio.as_completed(fs, loop=loop)) 1321 self.assertEqual(len(futs), 2) 1322 1323 x = loop.run_until_complete(futs[1]) 1324 self.assertEqual(x, 'a') 1325 self.assertAlmostEqual(0.05, loop.time()) 1326 loop.advance_time(0.05) 1327 y = loop.run_until_complete(futs[0]) 1328 self.assertEqual(y, 'b') 1329 self.assertAlmostEqual(0.10, loop.time()) 1330 1331 def test_as_completed_concurrent(self): 1332 1333 def gen(): 1334 when = yield 1335 self.assertAlmostEqual(0.05, when) 1336 when = yield 0 1337 self.assertAlmostEqual(0.05, when) 1338 yield 0.05 1339 1340 loop = self.new_test_loop(gen) 1341 1342 a = asyncio.sleep(0.05, 'a') 1343 b = asyncio.sleep(0.05, 'b') 1344 fs = {a, b} 1345 with self.assertWarns(DeprecationWarning): 1346 futs = list(asyncio.as_completed(fs, loop=loop)) 1347 self.assertEqual(len(futs), 2) 1348 waiter = asyncio.wait(futs) 1349 done, pending = loop.run_until_complete(waiter) 1350 self.assertEqual(set(f.result() for f in done), {'a', 'b'}) 1351 1352 def test_as_completed_duplicate_coroutines(self): 1353 1354 with self.assertWarns(DeprecationWarning): 1355 @asyncio.coroutine 1356 def coro(s): 1357 return s 1358 1359 with self.assertWarns(DeprecationWarning): 1360 @asyncio.coroutine 1361 def runner(): 1362 result = [] 1363 c = coro('ham') 1364 for f in asyncio.as_completed([c, c, coro('spam')], 1365 loop=self.loop): 1366 result.append((yield from f)) 1367 return result 1368 1369 with self.assertWarns(DeprecationWarning): 1370 fut = self.new_task(self.loop, runner()) 1371 self.loop.run_until_complete(fut) 1372 result = fut.result() 1373 self.assertEqual(set(result), {'ham', 'spam'}) 1374 self.assertEqual(len(result), 2) 1375 1376 def test_sleep(self): 1377 1378 def gen(): 1379 when = yield 1380 self.assertAlmostEqual(0.05, when) 1381 when = yield 0.05 1382 self.assertAlmostEqual(0.1, when) 1383 yield 0.05 1384 1385 loop = self.new_test_loop(gen) 1386 1387 async def sleeper(dt, arg): 1388 await asyncio.sleep(dt/2) 1389 res = await asyncio.sleep(dt/2, arg) 1390 return res 1391 1392 t = self.new_task(loop, sleeper(0.1, 'yeah')) 1393 loop.run_until_complete(t) 1394 self.assertTrue(t.done()) 1395 self.assertEqual(t.result(), 'yeah') 1396 self.assertAlmostEqual(0.1, loop.time()) 1397 1398 def test_sleep_cancel(self): 1399 1400 def gen(): 1401 when = yield 1402 self.assertAlmostEqual(10.0, when) 1403 yield 0 1404 1405 loop = self.new_test_loop(gen) 1406 1407 t = self.new_task(loop, asyncio.sleep(10.0, 'yeah')) 1408 1409 handle = None 1410 orig_call_later = loop.call_later 1411 1412 def call_later(delay, callback, *args): 1413 nonlocal handle 1414 handle = orig_call_later(delay, callback, *args) 1415 return handle 1416 1417 loop.call_later = call_later 1418 test_utils.run_briefly(loop) 1419 1420 self.assertFalse(handle._cancelled) 1421 1422 t.cancel() 1423 test_utils.run_briefly(loop) 1424 self.assertTrue(handle._cancelled) 1425 1426 def test_task_cancel_sleeping_task(self): 1427 1428 def gen(): 1429 when = yield 1430 self.assertAlmostEqual(0.1, when) 1431 when = yield 0 1432 self.assertAlmostEqual(5000, when) 1433 yield 0.1 1434 1435 loop = self.new_test_loop(gen) 1436 1437 async def sleep(dt): 1438 await asyncio.sleep(dt) 1439 1440 async def doit(): 1441 sleeper = self.new_task(loop, sleep(5000)) 1442 loop.call_later(0.1, sleeper.cancel) 1443 try: 1444 await sleeper 1445 except asyncio.CancelledError: 1446 return 'cancelled' 1447 else: 1448 return 'slept in' 1449 1450 doer = doit() 1451 self.assertEqual(loop.run_until_complete(doer), 'cancelled') 1452 self.assertAlmostEqual(0.1, loop.time()) 1453 1454 def test_task_cancel_waiter_future(self): 1455 fut = self.new_future(self.loop) 1456 1457 async def coro(): 1458 await fut 1459 1460 task = self.new_task(self.loop, coro()) 1461 test_utils.run_briefly(self.loop) 1462 self.assertIs(task._fut_waiter, fut) 1463 1464 task.cancel() 1465 test_utils.run_briefly(self.loop) 1466 self.assertRaises( 1467 asyncio.CancelledError, self.loop.run_until_complete, task) 1468 self.assertIsNone(task._fut_waiter) 1469 self.assertTrue(fut.cancelled()) 1470 1471 def test_task_set_methods(self): 1472 async def notmuch(): 1473 return 'ko' 1474 1475 gen = notmuch() 1476 task = self.new_task(self.loop, gen) 1477 1478 with self.assertRaisesRegex(RuntimeError, 'not support set_result'): 1479 task.set_result('ok') 1480 1481 with self.assertRaisesRegex(RuntimeError, 'not support set_exception'): 1482 task.set_exception(ValueError()) 1483 1484 self.assertEqual( 1485 self.loop.run_until_complete(task), 1486 'ko') 1487 1488 def test_step_result(self): 1489 with self.assertWarns(DeprecationWarning): 1490 @asyncio.coroutine 1491 def notmuch(): 1492 yield None 1493 yield 1 1494 return 'ko' 1495 1496 self.assertRaises( 1497 RuntimeError, self.loop.run_until_complete, notmuch()) 1498 1499 def test_step_result_future(self): 1500 # If coroutine returns future, task waits on this future. 1501 1502 class Fut(asyncio.Future): 1503 def __init__(self, *args, **kwds): 1504 self.cb_added = False 1505 super().__init__(*args, **kwds) 1506 1507 def add_done_callback(self, *args, **kwargs): 1508 self.cb_added = True 1509 super().add_done_callback(*args, **kwargs) 1510 1511 fut = Fut(loop=self.loop) 1512 result = None 1513 1514 async def wait_for_future(): 1515 nonlocal result 1516 result = await fut 1517 1518 t = self.new_task(self.loop, wait_for_future()) 1519 test_utils.run_briefly(self.loop) 1520 self.assertTrue(fut.cb_added) 1521 1522 res = object() 1523 fut.set_result(res) 1524 test_utils.run_briefly(self.loop) 1525 self.assertIs(res, result) 1526 self.assertTrue(t.done()) 1527 self.assertIsNone(t.result()) 1528 1529 def test_baseexception_during_cancel(self): 1530 1531 def gen(): 1532 when = yield 1533 self.assertAlmostEqual(10.0, when) 1534 yield 0 1535 1536 loop = self.new_test_loop(gen) 1537 1538 async def sleeper(): 1539 await asyncio.sleep(10) 1540 1541 base_exc = SystemExit() 1542 1543 async def notmutch(): 1544 try: 1545 await sleeper() 1546 except asyncio.CancelledError: 1547 raise base_exc 1548 1549 task = self.new_task(loop, notmutch()) 1550 test_utils.run_briefly(loop) 1551 1552 task.cancel() 1553 self.assertFalse(task.done()) 1554 1555 self.assertRaises(SystemExit, test_utils.run_briefly, loop) 1556 1557 self.assertTrue(task.done()) 1558 self.assertFalse(task.cancelled()) 1559 self.assertIs(task.exception(), base_exc) 1560 1561 def test_iscoroutinefunction(self): 1562 def fn(): 1563 pass 1564 1565 self.assertFalse(asyncio.iscoroutinefunction(fn)) 1566 1567 def fn1(): 1568 yield 1569 self.assertFalse(asyncio.iscoroutinefunction(fn1)) 1570 1571 with self.assertWarns(DeprecationWarning): 1572 @asyncio.coroutine 1573 def fn2(): 1574 yield 1575 self.assertTrue(asyncio.iscoroutinefunction(fn2)) 1576 1577 self.assertFalse(asyncio.iscoroutinefunction(mock.Mock())) 1578 1579 def test_yield_vs_yield_from(self): 1580 fut = self.new_future(self.loop) 1581 1582 with self.assertWarns(DeprecationWarning): 1583 @asyncio.coroutine 1584 def wait_for_future(): 1585 yield fut 1586 1587 task = wait_for_future() 1588 with self.assertRaises(RuntimeError): 1589 self.loop.run_until_complete(task) 1590 1591 self.assertFalse(fut.done()) 1592 1593 def test_yield_vs_yield_from_generator(self): 1594 with self.assertWarns(DeprecationWarning): 1595 @asyncio.coroutine 1596 def coro(): 1597 yield 1598 1599 with self.assertWarns(DeprecationWarning): 1600 @asyncio.coroutine 1601 def wait_for_future(): 1602 gen = coro() 1603 try: 1604 yield gen 1605 finally: 1606 gen.close() 1607 1608 task = wait_for_future() 1609 self.assertRaises( 1610 RuntimeError, 1611 self.loop.run_until_complete, task) 1612 1613 def test_coroutine_non_gen_function(self): 1614 with self.assertWarns(DeprecationWarning): 1615 @asyncio.coroutine 1616 def func(): 1617 return 'test' 1618 1619 self.assertTrue(asyncio.iscoroutinefunction(func)) 1620 1621 coro = func() 1622 self.assertTrue(asyncio.iscoroutine(coro)) 1623 1624 res = self.loop.run_until_complete(coro) 1625 self.assertEqual(res, 'test') 1626 1627 def test_coroutine_non_gen_function_return_future(self): 1628 fut = self.new_future(self.loop) 1629 1630 with self.assertWarns(DeprecationWarning): 1631 @asyncio.coroutine 1632 def func(): 1633 return fut 1634 1635 async def coro(): 1636 fut.set_result('test') 1637 1638 t1 = self.new_task(self.loop, func()) 1639 t2 = self.new_task(self.loop, coro()) 1640 res = self.loop.run_until_complete(t1) 1641 self.assertEqual(res, 'test') 1642 self.assertIsNone(t2.result()) 1643 1644 1645 def test_current_task_deprecated(self): 1646 Task = self.__class__.Task 1647 1648 with self.assertWarns(DeprecationWarning): 1649 self.assertIsNone(Task.current_task(loop=self.loop)) 1650 1651 async def coro(loop): 1652 with self.assertWarns(DeprecationWarning): 1653 self.assertIs(Task.current_task(loop=loop), task) 1654 1655 # See http://bugs.python.org/issue29271 for details: 1656 asyncio.set_event_loop(loop) 1657 try: 1658 with self.assertWarns(DeprecationWarning): 1659 self.assertIs(Task.current_task(None), task) 1660 with self.assertWarns(DeprecationWarning): 1661 self.assertIs(Task.current_task(), task) 1662 finally: 1663 asyncio.set_event_loop(None) 1664 1665 task = self.new_task(self.loop, coro(self.loop)) 1666 self.loop.run_until_complete(task) 1667 with self.assertWarns(DeprecationWarning): 1668 self.assertIsNone(Task.current_task(loop=self.loop)) 1669 1670 def test_current_task(self): 1671 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1672 1673 async def coro(loop): 1674 self.assertIs(asyncio.current_task(loop=loop), task) 1675 1676 self.assertIs(asyncio.current_task(None), task) 1677 self.assertIs(asyncio.current_task(), task) 1678 1679 task = self.new_task(self.loop, coro(self.loop)) 1680 self.loop.run_until_complete(task) 1681 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1682 1683 def test_current_task_with_interleaving_tasks(self): 1684 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1685 1686 fut1 = self.new_future(self.loop) 1687 fut2 = self.new_future(self.loop) 1688 1689 async def coro1(loop): 1690 self.assertTrue(asyncio.current_task(loop=loop) is task1) 1691 await fut1 1692 self.assertTrue(asyncio.current_task(loop=loop) is task1) 1693 fut2.set_result(True) 1694 1695 async def coro2(loop): 1696 self.assertTrue(asyncio.current_task(loop=loop) is task2) 1697 fut1.set_result(True) 1698 await fut2 1699 self.assertTrue(asyncio.current_task(loop=loop) is task2) 1700 1701 task1 = self.new_task(self.loop, coro1(self.loop)) 1702 task2 = self.new_task(self.loop, coro2(self.loop)) 1703 1704 self.loop.run_until_complete(asyncio.wait((task1, task2))) 1705 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1706 1707 # Some thorough tests for cancellation propagation through 1708 # coroutines, tasks and wait(). 1709 1710 def test_yield_future_passes_cancel(self): 1711 # Cancelling outer() cancels inner() cancels waiter. 1712 proof = 0 1713 waiter = self.new_future(self.loop) 1714 1715 async def inner(): 1716 nonlocal proof 1717 try: 1718 await waiter 1719 except asyncio.CancelledError: 1720 proof += 1 1721 raise 1722 else: 1723 self.fail('got past sleep() in inner()') 1724 1725 async def outer(): 1726 nonlocal proof 1727 try: 1728 await inner() 1729 except asyncio.CancelledError: 1730 proof += 100 # Expect this path. 1731 else: 1732 proof += 10 1733 1734 f = asyncio.ensure_future(outer(), loop=self.loop) 1735 test_utils.run_briefly(self.loop) 1736 f.cancel() 1737 self.loop.run_until_complete(f) 1738 self.assertEqual(proof, 101) 1739 self.assertTrue(waiter.cancelled()) 1740 1741 def test_yield_wait_does_not_shield_cancel(self): 1742 # Cancelling outer() makes wait() return early, leaves inner() 1743 # running. 1744 proof = 0 1745 waiter = self.new_future(self.loop) 1746 1747 async def inner(): 1748 nonlocal proof 1749 await waiter 1750 proof += 1 1751 1752 async def outer(): 1753 nonlocal proof 1754 d, p = await asyncio.wait([inner()]) 1755 proof += 100 1756 1757 f = asyncio.ensure_future(outer(), loop=self.loop) 1758 test_utils.run_briefly(self.loop) 1759 f.cancel() 1760 self.assertRaises( 1761 asyncio.CancelledError, self.loop.run_until_complete, f) 1762 waiter.set_result(None) 1763 test_utils.run_briefly(self.loop) 1764 self.assertEqual(proof, 1) 1765 1766 def test_shield_result(self): 1767 inner = self.new_future(self.loop) 1768 outer = asyncio.shield(inner) 1769 inner.set_result(42) 1770 res = self.loop.run_until_complete(outer) 1771 self.assertEqual(res, 42) 1772 1773 def test_shield_exception(self): 1774 inner = self.new_future(self.loop) 1775 outer = asyncio.shield(inner) 1776 test_utils.run_briefly(self.loop) 1777 exc = RuntimeError('expected') 1778 inner.set_exception(exc) 1779 test_utils.run_briefly(self.loop) 1780 self.assertIs(outer.exception(), exc) 1781 1782 def test_shield_cancel_inner(self): 1783 inner = self.new_future(self.loop) 1784 outer = asyncio.shield(inner) 1785 test_utils.run_briefly(self.loop) 1786 inner.cancel() 1787 test_utils.run_briefly(self.loop) 1788 self.assertTrue(outer.cancelled()) 1789 1790 def test_shield_cancel_outer(self): 1791 inner = self.new_future(self.loop) 1792 outer = asyncio.shield(inner) 1793 test_utils.run_briefly(self.loop) 1794 outer.cancel() 1795 test_utils.run_briefly(self.loop) 1796 self.assertTrue(outer.cancelled()) 1797 self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks)) 1798 1799 def test_shield_shortcut(self): 1800 fut = self.new_future(self.loop) 1801 fut.set_result(42) 1802 res = self.loop.run_until_complete(asyncio.shield(fut)) 1803 self.assertEqual(res, 42) 1804 1805 def test_shield_effect(self): 1806 # Cancelling outer() does not affect inner(). 1807 proof = 0 1808 waiter = self.new_future(self.loop) 1809 1810 async def inner(): 1811 nonlocal proof 1812 await waiter 1813 proof += 1 1814 1815 async def outer(): 1816 nonlocal proof 1817 await asyncio.shield(inner()) 1818 proof += 100 1819 1820 f = asyncio.ensure_future(outer(), loop=self.loop) 1821 test_utils.run_briefly(self.loop) 1822 f.cancel() 1823 with self.assertRaises(asyncio.CancelledError): 1824 self.loop.run_until_complete(f) 1825 waiter.set_result(None) 1826 test_utils.run_briefly(self.loop) 1827 self.assertEqual(proof, 1) 1828 1829 def test_shield_gather(self): 1830 child1 = self.new_future(self.loop) 1831 child2 = self.new_future(self.loop) 1832 parent = asyncio.gather(child1, child2) 1833 outer = asyncio.shield(parent) 1834 test_utils.run_briefly(self.loop) 1835 outer.cancel() 1836 test_utils.run_briefly(self.loop) 1837 self.assertTrue(outer.cancelled()) 1838 child1.set_result(1) 1839 child2.set_result(2) 1840 test_utils.run_briefly(self.loop) 1841 self.assertEqual(parent.result(), [1, 2]) 1842 1843 def test_gather_shield(self): 1844 child1 = self.new_future(self.loop) 1845 child2 = self.new_future(self.loop) 1846 inner1 = asyncio.shield(child1) 1847 inner2 = asyncio.shield(child2) 1848 parent = asyncio.gather(inner1, inner2) 1849 test_utils.run_briefly(self.loop) 1850 parent.cancel() 1851 # This should cancel inner1 and inner2 but bot child1 and child2. 1852 test_utils.run_briefly(self.loop) 1853 self.assertIsInstance(parent.exception(), asyncio.CancelledError) 1854 self.assertTrue(inner1.cancelled()) 1855 self.assertTrue(inner2.cancelled()) 1856 child1.set_result(1) 1857 child2.set_result(2) 1858 test_utils.run_briefly(self.loop) 1859 1860 def test_as_completed_invalid_args(self): 1861 fut = self.new_future(self.loop) 1862 1863 # as_completed() expects a list of futures, not a future instance 1864 self.assertRaises(TypeError, self.loop.run_until_complete, 1865 asyncio.as_completed(fut, loop=self.loop)) 1866 coro = coroutine_function() 1867 self.assertRaises(TypeError, self.loop.run_until_complete, 1868 asyncio.as_completed(coro, loop=self.loop)) 1869 coro.close() 1870 1871 def test_wait_invalid_args(self): 1872 fut = self.new_future(self.loop) 1873 1874 # wait() expects a list of futures, not a future instance 1875 self.assertRaises(TypeError, self.loop.run_until_complete, 1876 asyncio.wait(fut)) 1877 coro = coroutine_function() 1878 self.assertRaises(TypeError, self.loop.run_until_complete, 1879 asyncio.wait(coro)) 1880 coro.close() 1881 1882 # wait() expects at least a future 1883 self.assertRaises(ValueError, self.loop.run_until_complete, 1884 asyncio.wait([])) 1885 1886 def test_corowrapper_mocks_generator(self): 1887 1888 def check(): 1889 # A function that asserts various things. 1890 # Called twice, with different debug flag values. 1891 1892 with self.assertWarns(DeprecationWarning): 1893 @asyncio.coroutine 1894 def coro(): 1895 # The actual coroutine. 1896 self.assertTrue(gen.gi_running) 1897 yield from fut 1898 1899 # A completed Future used to run the coroutine. 1900 fut = self.new_future(self.loop) 1901 fut.set_result(None) 1902 1903 # Call the coroutine. 1904 gen = coro() 1905 1906 # Check some properties. 1907 self.assertTrue(asyncio.iscoroutine(gen)) 1908 self.assertIsInstance(gen.gi_frame, types.FrameType) 1909 self.assertFalse(gen.gi_running) 1910 self.assertIsInstance(gen.gi_code, types.CodeType) 1911 1912 # Run it. 1913 self.loop.run_until_complete(gen) 1914 1915 # The frame should have changed. 1916 self.assertIsNone(gen.gi_frame) 1917 1918 # Test with debug flag cleared. 1919 with set_coroutine_debug(False): 1920 check() 1921 1922 # Test with debug flag set. 1923 with set_coroutine_debug(True): 1924 check() 1925 1926 def test_yield_from_corowrapper(self): 1927 with set_coroutine_debug(True): 1928 with self.assertWarns(DeprecationWarning): 1929 @asyncio.coroutine 1930 def t1(): 1931 return (yield from t2()) 1932 1933 with self.assertWarns(DeprecationWarning): 1934 @asyncio.coroutine 1935 def t2(): 1936 f = self.new_future(self.loop) 1937 self.new_task(self.loop, t3(f)) 1938 return (yield from f) 1939 1940 with self.assertWarns(DeprecationWarning): 1941 @asyncio.coroutine 1942 def t3(f): 1943 f.set_result((1, 2, 3)) 1944 1945 task = self.new_task(self.loop, t1()) 1946 val = self.loop.run_until_complete(task) 1947 self.assertEqual(val, (1, 2, 3)) 1948 1949 def test_yield_from_corowrapper_send(self): 1950 def foo(): 1951 a = yield 1952 return a 1953 1954 def call(arg): 1955 cw = asyncio.coroutines.CoroWrapper(foo()) 1956 cw.send(None) 1957 try: 1958 cw.send(arg) 1959 except StopIteration as ex: 1960 return ex.args[0] 1961 else: 1962 raise AssertionError('StopIteration was expected') 1963 1964 self.assertEqual(call((1, 2)), (1, 2)) 1965 self.assertEqual(call('spam'), 'spam') 1966 1967 def test_corowrapper_weakref(self): 1968 wd = weakref.WeakValueDictionary() 1969 def foo(): yield from [] 1970 cw = asyncio.coroutines.CoroWrapper(foo()) 1971 wd['cw'] = cw # Would fail without __weakref__ slot. 1972 cw.gen = None # Suppress warning from __del__. 1973 1974 def test_corowrapper_throw(self): 1975 # Issue 429: CoroWrapper.throw must be compatible with gen.throw 1976 def foo(): 1977 value = None 1978 while True: 1979 try: 1980 value = yield value 1981 except Exception as e: 1982 value = e 1983 1984 exception = Exception("foo") 1985 cw = asyncio.coroutines.CoroWrapper(foo()) 1986 cw.send(None) 1987 self.assertIs(exception, cw.throw(exception)) 1988 1989 cw = asyncio.coroutines.CoroWrapper(foo()) 1990 cw.send(None) 1991 self.assertIs(exception, cw.throw(Exception, exception)) 1992 1993 cw = asyncio.coroutines.CoroWrapper(foo()) 1994 cw.send(None) 1995 exception = cw.throw(Exception, "foo") 1996 self.assertIsInstance(exception, Exception) 1997 self.assertEqual(exception.args, ("foo", )) 1998 1999 cw = asyncio.coroutines.CoroWrapper(foo()) 2000 cw.send(None) 2001 exception = cw.throw(Exception, "foo", None) 2002 self.assertIsInstance(exception, Exception) 2003 self.assertEqual(exception.args, ("foo", )) 2004 2005 def test_all_tasks_deprecated(self): 2006 Task = self.__class__.Task 2007 2008 async def coro(): 2009 with self.assertWarns(DeprecationWarning): 2010 assert Task.all_tasks(self.loop) == {t} 2011 2012 t = self.new_task(self.loop, coro()) 2013 self.loop.run_until_complete(t) 2014 2015 def test_log_destroyed_pending_task(self): 2016 Task = self.__class__.Task 2017 2018 with self.assertWarns(DeprecationWarning): 2019 @asyncio.coroutine 2020 def kill_me(loop): 2021 future = self.new_future(loop) 2022 yield from future 2023 # at this point, the only reference to kill_me() task is 2024 # the Task._wakeup() method in future._callbacks 2025 raise Exception("code never reached") 2026 2027 mock_handler = mock.Mock() 2028 self.loop.set_debug(True) 2029 self.loop.set_exception_handler(mock_handler) 2030 2031 # schedule the task 2032 coro = kill_me(self.loop) 2033 task = asyncio.ensure_future(coro, loop=self.loop) 2034 2035 self.assertEqual(asyncio.all_tasks(loop=self.loop), {task}) 2036 2037 # See http://bugs.python.org/issue29271 for details: 2038 asyncio.set_event_loop(self.loop) 2039 try: 2040 with self.assertWarns(DeprecationWarning): 2041 self.assertEqual(Task.all_tasks(), {task}) 2042 with self.assertWarns(DeprecationWarning): 2043 self.assertEqual(Task.all_tasks(None), {task}) 2044 finally: 2045 asyncio.set_event_loop(None) 2046 2047 # execute the task so it waits for future 2048 self.loop._run_once() 2049 self.assertEqual(len(self.loop._ready), 0) 2050 2051 # remove the future used in kill_me(), and references to the task 2052 del coro.gi_frame.f_locals['future'] 2053 coro = None 2054 source_traceback = task._source_traceback 2055 task = None 2056 2057 # no more reference to kill_me() task: the task is destroyed by the GC 2058 support.gc_collect() 2059 2060 self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) 2061 2062 mock_handler.assert_called_with(self.loop, { 2063 'message': 'Task was destroyed but it is pending!', 2064 'task': mock.ANY, 2065 'source_traceback': source_traceback, 2066 }) 2067 mock_handler.reset_mock() 2068 2069 @mock.patch('asyncio.base_events.logger') 2070 def test_tb_logger_not_called_after_cancel(self, m_log): 2071 loop = asyncio.new_event_loop() 2072 self.set_event_loop(loop) 2073 2074 async def coro(): 2075 raise TypeError 2076 2077 async def runner(): 2078 task = self.new_task(loop, coro()) 2079 await asyncio.sleep(0.05) 2080 task.cancel() 2081 task = None 2082 2083 loop.run_until_complete(runner()) 2084 self.assertFalse(m_log.error.called) 2085 2086 @mock.patch('asyncio.coroutines.logger') 2087 def test_coroutine_never_yielded(self, m_log): 2088 with set_coroutine_debug(True): 2089 with self.assertWarns(DeprecationWarning): 2090 @asyncio.coroutine 2091 def coro_noop(): 2092 pass 2093 2094 tb_filename = __file__ 2095 tb_lineno = sys._getframe().f_lineno + 2 2096 # create a coroutine object but don't use it 2097 coro_noop() 2098 support.gc_collect() 2099 2100 self.assertTrue(m_log.error.called) 2101 message = m_log.error.call_args[0][0] 2102 func_filename, func_lineno = test_utils.get_function_source(coro_noop) 2103 2104 regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> ' 2105 r'was never yielded from\n' 2106 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n' 2107 r'.*\n' 2108 r' File "%s", line %s, in test_coroutine_never_yielded\n' 2109 r' coro_noop\(\)$' 2110 % (re.escape(coro_noop.__qualname__), 2111 re.escape(func_filename), func_lineno, 2112 re.escape(tb_filename), tb_lineno)) 2113 2114 self.assertRegex(message, re.compile(regex, re.DOTALL)) 2115 2116 def test_return_coroutine_from_coroutine(self): 2117 """Return of @asyncio.coroutine()-wrapped function generator object 2118 from @asyncio.coroutine()-wrapped function should have same effect as 2119 returning generator object or Future.""" 2120 def check(): 2121 with self.assertWarns(DeprecationWarning): 2122 @asyncio.coroutine 2123 def outer_coro(): 2124 with self.assertWarns(DeprecationWarning): 2125 @asyncio.coroutine 2126 def inner_coro(): 2127 return 1 2128 2129 return inner_coro() 2130 2131 result = self.loop.run_until_complete(outer_coro()) 2132 self.assertEqual(result, 1) 2133 2134 # Test with debug flag cleared. 2135 with set_coroutine_debug(False): 2136 check() 2137 2138 # Test with debug flag set. 2139 with set_coroutine_debug(True): 2140 check() 2141 2142 def test_task_source_traceback(self): 2143 self.loop.set_debug(True) 2144 2145 task = self.new_task(self.loop, coroutine_function()) 2146 lineno = sys._getframe().f_lineno - 1 2147 self.assertIsInstance(task._source_traceback, list) 2148 self.assertEqual(task._source_traceback[-2][:3], 2149 (__file__, 2150 lineno, 2151 'test_task_source_traceback')) 2152 self.loop.run_until_complete(task) 2153 2154 def _test_cancel_wait_for(self, timeout): 2155 loop = asyncio.new_event_loop() 2156 self.addCleanup(loop.close) 2157 2158 async def blocking_coroutine(): 2159 fut = self.new_future(loop) 2160 # Block: fut result is never set 2161 await fut 2162 2163 task = loop.create_task(blocking_coroutine()) 2164 2165 wait = loop.create_task(asyncio.wait_for(task, timeout)) 2166 loop.call_soon(wait.cancel) 2167 2168 self.assertRaises(asyncio.CancelledError, 2169 loop.run_until_complete, wait) 2170 2171 # Python issue #23219: cancelling the wait must also cancel the task 2172 self.assertTrue(task.cancelled()) 2173 2174 def test_cancel_blocking_wait_for(self): 2175 self._test_cancel_wait_for(None) 2176 2177 def test_cancel_wait_for(self): 2178 self._test_cancel_wait_for(60.0) 2179 2180 def test_cancel_gather_1(self): 2181 """Ensure that a gathering future refuses to be cancelled once all 2182 children are done""" 2183 loop = asyncio.new_event_loop() 2184 self.addCleanup(loop.close) 2185 2186 fut = self.new_future(loop) 2187 # The indirection fut->child_coro is needed since otherwise the 2188 # gathering task is done at the same time as the child future 2189 def child_coro(): 2190 return (yield from fut) 2191 gather_future = asyncio.gather(child_coro(), loop=loop) 2192 gather_task = asyncio.ensure_future(gather_future, loop=loop) 2193 2194 cancel_result = None 2195 def cancelling_callback(_): 2196 nonlocal cancel_result 2197 cancel_result = gather_task.cancel() 2198 fut.add_done_callback(cancelling_callback) 2199 2200 fut.set_result(42) # calls the cancelling_callback after fut is done() 2201 2202 # At this point the task should complete. 2203 loop.run_until_complete(gather_task) 2204 2205 # Python issue #26923: asyncio.gather drops cancellation 2206 self.assertEqual(cancel_result, False) 2207 self.assertFalse(gather_task.cancelled()) 2208 self.assertEqual(gather_task.result(), [42]) 2209 2210 def test_cancel_gather_2(self): 2211 loop = asyncio.new_event_loop() 2212 self.addCleanup(loop.close) 2213 2214 async def test(): 2215 time = 0 2216 while True: 2217 time += 0.05 2218 await asyncio.gather(asyncio.sleep(0.05), 2219 return_exceptions=True, 2220 loop=loop) 2221 if time > 1: 2222 return 2223 2224 async def main(): 2225 qwe = self.new_task(loop, test()) 2226 await asyncio.sleep(0.2) 2227 qwe.cancel() 2228 try: 2229 await qwe 2230 except asyncio.CancelledError: 2231 pass 2232 else: 2233 self.fail('gather did not propagate the cancellation request') 2234 2235 loop.run_until_complete(main()) 2236 2237 def test_exception_traceback(self): 2238 # See http://bugs.python.org/issue28843 2239 2240 async def foo(): 2241 1 / 0 2242 2243 async def main(): 2244 task = self.new_task(self.loop, foo()) 2245 await asyncio.sleep(0) # skip one loop iteration 2246 self.assertIsNotNone(task.exception().__traceback__) 2247 2248 self.loop.run_until_complete(main()) 2249 2250 @mock.patch('asyncio.base_events.logger') 2251 def test_error_in_call_soon(self, m_log): 2252 def call_soon(callback, *args, **kwargs): 2253 raise ValueError 2254 self.loop.call_soon = call_soon 2255 2256 with self.assertWarns(DeprecationWarning): 2257 @asyncio.coroutine 2258 def coro(): 2259 pass 2260 2261 self.assertFalse(m_log.error.called) 2262 2263 with self.assertRaises(ValueError): 2264 gen = coro() 2265 try: 2266 self.new_task(self.loop, gen) 2267 finally: 2268 gen.close() 2269 2270 self.assertTrue(m_log.error.called) 2271 message = m_log.error.call_args[0][0] 2272 self.assertIn('Task was destroyed but it is pending', message) 2273 2274 self.assertEqual(asyncio.all_tasks(self.loop), set()) 2275 2276 def test_create_task_with_noncoroutine(self): 2277 with self.assertRaisesRegex(TypeError, 2278 "a coroutine was expected, got 123"): 2279 self.new_task(self.loop, 123) 2280 2281 # test it for the second time to ensure that caching 2282 # in asyncio.iscoroutine() doesn't break things. 2283 with self.assertRaisesRegex(TypeError, 2284 "a coroutine was expected, got 123"): 2285 self.new_task(self.loop, 123) 2286 2287 def test_create_task_with_oldstyle_coroutine(self): 2288 2289 with self.assertWarns(DeprecationWarning): 2290 @asyncio.coroutine 2291 def coro(): 2292 pass 2293 2294 task = self.new_task(self.loop, coro()) 2295 self.assertIsInstance(task, self.Task) 2296 self.loop.run_until_complete(task) 2297 2298 # test it for the second time to ensure that caching 2299 # in asyncio.iscoroutine() doesn't break things. 2300 task = self.new_task(self.loop, coro()) 2301 self.assertIsInstance(task, self.Task) 2302 self.loop.run_until_complete(task) 2303 2304 def test_create_task_with_async_function(self): 2305 2306 async def coro(): 2307 pass 2308 2309 task = self.new_task(self.loop, coro()) 2310 self.assertIsInstance(task, self.Task) 2311 self.loop.run_until_complete(task) 2312 2313 # test it for the second time to ensure that caching 2314 # in asyncio.iscoroutine() doesn't break things. 2315 task = self.new_task(self.loop, coro()) 2316 self.assertIsInstance(task, self.Task) 2317 self.loop.run_until_complete(task) 2318 2319 def test_create_task_with_asynclike_function(self): 2320 task = self.new_task(self.loop, CoroLikeObject()) 2321 self.assertIsInstance(task, self.Task) 2322 self.assertEqual(self.loop.run_until_complete(task), 42) 2323 2324 # test it for the second time to ensure that caching 2325 # in asyncio.iscoroutine() doesn't break things. 2326 task = self.new_task(self.loop, CoroLikeObject()) 2327 self.assertIsInstance(task, self.Task) 2328 self.assertEqual(self.loop.run_until_complete(task), 42) 2329 2330 def test_bare_create_task(self): 2331 2332 async def inner(): 2333 return 1 2334 2335 async def coro(): 2336 task = asyncio.create_task(inner()) 2337 self.assertIsInstance(task, self.Task) 2338 ret = await task 2339 self.assertEqual(1, ret) 2340 2341 self.loop.run_until_complete(coro()) 2342 2343 def test_bare_create_named_task(self): 2344 2345 async def coro_noop(): 2346 pass 2347 2348 async def coro(): 2349 task = asyncio.create_task(coro_noop(), name='No-op') 2350 self.assertEqual(task.get_name(), 'No-op') 2351 await task 2352 2353 self.loop.run_until_complete(coro()) 2354 2355 def test_context_1(self): 2356 cvar = contextvars.ContextVar('cvar', default='nope') 2357 2358 async def sub(): 2359 await asyncio.sleep(0.01) 2360 self.assertEqual(cvar.get(), 'nope') 2361 cvar.set('something else') 2362 2363 async def main(): 2364 self.assertEqual(cvar.get(), 'nope') 2365 subtask = self.new_task(loop, sub()) 2366 cvar.set('yes') 2367 self.assertEqual(cvar.get(), 'yes') 2368 await subtask 2369 self.assertEqual(cvar.get(), 'yes') 2370 2371 loop = asyncio.new_event_loop() 2372 try: 2373 task = self.new_task(loop, main()) 2374 loop.run_until_complete(task) 2375 finally: 2376 loop.close() 2377 2378 def test_context_2(self): 2379 cvar = contextvars.ContextVar('cvar', default='nope') 2380 2381 async def main(): 2382 def fut_on_done(fut): 2383 # This change must not pollute the context 2384 # of the "main()" task. 2385 cvar.set('something else') 2386 2387 self.assertEqual(cvar.get(), 'nope') 2388 2389 for j in range(2): 2390 fut = self.new_future(loop) 2391 fut.add_done_callback(fut_on_done) 2392 cvar.set(f'yes{j}') 2393 loop.call_soon(fut.set_result, None) 2394 await fut 2395 self.assertEqual(cvar.get(), f'yes{j}') 2396 2397 for i in range(3): 2398 # Test that task passed its context to add_done_callback: 2399 cvar.set(f'yes{i}-{j}') 2400 await asyncio.sleep(0.001) 2401 self.assertEqual(cvar.get(), f'yes{i}-{j}') 2402 2403 loop = asyncio.new_event_loop() 2404 try: 2405 task = self.new_task(loop, main()) 2406 loop.run_until_complete(task) 2407 finally: 2408 loop.close() 2409 2410 self.assertEqual(cvar.get(), 'nope') 2411 2412 def test_context_3(self): 2413 # Run 100 Tasks in parallel, each modifying cvar. 2414 2415 cvar = contextvars.ContextVar('cvar', default=-1) 2416 2417 async def sub(num): 2418 for i in range(10): 2419 cvar.set(num + i) 2420 await asyncio.sleep(random.uniform(0.001, 0.05)) 2421 self.assertEqual(cvar.get(), num + i) 2422 2423 async def main(): 2424 tasks = [] 2425 for i in range(100): 2426 task = loop.create_task(sub(random.randint(0, 10))) 2427 tasks.append(task) 2428 2429 await asyncio.gather(*tasks, loop=loop) 2430 2431 loop = asyncio.new_event_loop() 2432 try: 2433 loop.run_until_complete(main()) 2434 finally: 2435 loop.close() 2436 2437 self.assertEqual(cvar.get(), -1) 2438 2439 def test_get_coro(self): 2440 loop = asyncio.new_event_loop() 2441 coro = coroutine_function() 2442 try: 2443 task = self.new_task(loop, coro) 2444 loop.run_until_complete(task) 2445 self.assertIs(task.get_coro(), coro) 2446 finally: 2447 loop.close() 2448 2449 2450def add_subclass_tests(cls): 2451 BaseTask = cls.Task 2452 BaseFuture = cls.Future 2453 2454 if BaseTask is None or BaseFuture is None: 2455 return cls 2456 2457 class CommonFuture: 2458 def __init__(self, *args, **kwargs): 2459 self.calls = collections.defaultdict(lambda: 0) 2460 super().__init__(*args, **kwargs) 2461 2462 def add_done_callback(self, *args, **kwargs): 2463 self.calls['add_done_callback'] += 1 2464 return super().add_done_callback(*args, **kwargs) 2465 2466 class Task(CommonFuture, BaseTask): 2467 pass 2468 2469 class Future(CommonFuture, BaseFuture): 2470 pass 2471 2472 def test_subclasses_ctask_cfuture(self): 2473 fut = self.Future(loop=self.loop) 2474 2475 async def func(): 2476 self.loop.call_soon(lambda: fut.set_result('spam')) 2477 return await fut 2478 2479 task = self.Task(func(), loop=self.loop) 2480 2481 result = self.loop.run_until_complete(task) 2482 2483 self.assertEqual(result, 'spam') 2484 2485 self.assertEqual( 2486 dict(task.calls), 2487 {'add_done_callback': 1}) 2488 2489 self.assertEqual( 2490 dict(fut.calls), 2491 {'add_done_callback': 1}) 2492 2493 # Add patched Task & Future back to the test case 2494 cls.Task = Task 2495 cls.Future = Future 2496 2497 # Add an extra unit-test 2498 cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture 2499 2500 # Disable the "test_task_source_traceback" test 2501 # (the test is hardcoded for a particular call stack, which 2502 # is slightly different for Task subclasses) 2503 cls.test_task_source_traceback = None 2504 2505 return cls 2506 2507 2508class SetMethodsTest: 2509 2510 def test_set_result_causes_invalid_state(self): 2511 Future = type(self).Future 2512 self.loop.call_exception_handler = exc_handler = mock.Mock() 2513 2514 async def foo(): 2515 await asyncio.sleep(0.1) 2516 return 10 2517 2518 coro = foo() 2519 task = self.new_task(self.loop, coro) 2520 Future.set_result(task, 'spam') 2521 2522 self.assertEqual( 2523 self.loop.run_until_complete(task), 2524 'spam') 2525 2526 exc_handler.assert_called_once() 2527 exc = exc_handler.call_args[0][0]['exception'] 2528 with self.assertRaisesRegex(asyncio.InvalidStateError, 2529 r'step\(\): already done'): 2530 raise exc 2531 2532 coro.close() 2533 2534 def test_set_exception_causes_invalid_state(self): 2535 class MyExc(Exception): 2536 pass 2537 2538 Future = type(self).Future 2539 self.loop.call_exception_handler = exc_handler = mock.Mock() 2540 2541 async def foo(): 2542 await asyncio.sleep(0.1) 2543 return 10 2544 2545 coro = foo() 2546 task = self.new_task(self.loop, coro) 2547 Future.set_exception(task, MyExc()) 2548 2549 with self.assertRaises(MyExc): 2550 self.loop.run_until_complete(task) 2551 2552 exc_handler.assert_called_once() 2553 exc = exc_handler.call_args[0][0]['exception'] 2554 with self.assertRaisesRegex(asyncio.InvalidStateError, 2555 r'step\(\): already done'): 2556 raise exc 2557 2558 coro.close() 2559 2560 2561@unittest.skipUnless(hasattr(futures, '_CFuture') and 2562 hasattr(tasks, '_CTask'), 2563 'requires the C _asyncio module') 2564class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest, 2565 test_utils.TestCase): 2566 2567 Task = getattr(tasks, '_CTask', None) 2568 Future = getattr(futures, '_CFuture', None) 2569 2570 @support.refcount_test 2571 def test_refleaks_in_task___init__(self): 2572 gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount') 2573 async def coro(): 2574 pass 2575 task = self.new_task(self.loop, coro()) 2576 self.loop.run_until_complete(task) 2577 refs_before = gettotalrefcount() 2578 for i in range(100): 2579 task.__init__(coro(), loop=self.loop) 2580 self.loop.run_until_complete(task) 2581 self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10) 2582 2583 def test_del__log_destroy_pending_segfault(self): 2584 async def coro(): 2585 pass 2586 task = self.new_task(self.loop, coro()) 2587 self.loop.run_until_complete(task) 2588 with self.assertRaises(AttributeError): 2589 del task._log_destroy_pending 2590 2591 2592@unittest.skipUnless(hasattr(futures, '_CFuture') and 2593 hasattr(tasks, '_CTask'), 2594 'requires the C _asyncio module') 2595@add_subclass_tests 2596class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 2597 2598 Task = getattr(tasks, '_CTask', None) 2599 Future = getattr(futures, '_CFuture', None) 2600 2601 2602@unittest.skipUnless(hasattr(tasks, '_CTask'), 2603 'requires the C _asyncio module') 2604@add_subclass_tests 2605class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 2606 2607 Task = getattr(tasks, '_CTask', None) 2608 Future = futures._PyFuture 2609 2610 2611@unittest.skipUnless(hasattr(futures, '_CFuture'), 2612 'requires the C _asyncio module') 2613@add_subclass_tests 2614class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase): 2615 2616 Future = getattr(futures, '_CFuture', None) 2617 Task = tasks._PyTask 2618 2619 2620@unittest.skipUnless(hasattr(tasks, '_CTask'), 2621 'requires the C _asyncio module') 2622class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 2623 2624 Task = getattr(tasks, '_CTask', None) 2625 Future = futures._PyFuture 2626 2627 2628@unittest.skipUnless(hasattr(futures, '_CFuture'), 2629 'requires the C _asyncio module') 2630class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): 2631 2632 Task = tasks._PyTask 2633 Future = getattr(futures, '_CFuture', None) 2634 2635 2636class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest, 2637 test_utils.TestCase): 2638 2639 Task = tasks._PyTask 2640 Future = futures._PyFuture 2641 2642 2643@add_subclass_tests 2644class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 2645 Task = tasks._PyTask 2646 Future = futures._PyFuture 2647 2648 2649@unittest.skipUnless(hasattr(tasks, '_CTask'), 2650 'requires the C _asyncio module') 2651class CTask_Future_Tests(test_utils.TestCase): 2652 2653 def test_foobar(self): 2654 class Fut(asyncio.Future): 2655 @property 2656 def get_loop(self): 2657 raise AttributeError 2658 2659 async def coro(): 2660 await fut 2661 return 'spam' 2662 2663 self.loop = asyncio.new_event_loop() 2664 try: 2665 fut = Fut(loop=self.loop) 2666 self.loop.call_later(0.1, fut.set_result, 1) 2667 task = self.loop.create_task(coro()) 2668 res = self.loop.run_until_complete(task) 2669 finally: 2670 self.loop.close() 2671 2672 self.assertEqual(res, 'spam') 2673 2674 2675class BaseTaskIntrospectionTests: 2676 _register_task = None 2677 _unregister_task = None 2678 _enter_task = None 2679 _leave_task = None 2680 2681 def test__register_task_1(self): 2682 class TaskLike: 2683 @property 2684 def _loop(self): 2685 return loop 2686 2687 def done(self): 2688 return False 2689 2690 task = TaskLike() 2691 loop = mock.Mock() 2692 2693 self.assertEqual(asyncio.all_tasks(loop), set()) 2694 self._register_task(task) 2695 self.assertEqual(asyncio.all_tasks(loop), {task}) 2696 self._unregister_task(task) 2697 2698 def test__register_task_2(self): 2699 class TaskLike: 2700 def get_loop(self): 2701 return loop 2702 2703 def done(self): 2704 return False 2705 2706 task = TaskLike() 2707 loop = mock.Mock() 2708 2709 self.assertEqual(asyncio.all_tasks(loop), set()) 2710 self._register_task(task) 2711 self.assertEqual(asyncio.all_tasks(loop), {task}) 2712 self._unregister_task(task) 2713 2714 def test__register_task_3(self): 2715 class TaskLike: 2716 def get_loop(self): 2717 return loop 2718 2719 def done(self): 2720 return True 2721 2722 task = TaskLike() 2723 loop = mock.Mock() 2724 2725 self.assertEqual(asyncio.all_tasks(loop), set()) 2726 self._register_task(task) 2727 self.assertEqual(asyncio.all_tasks(loop), set()) 2728 with self.assertWarns(DeprecationWarning): 2729 self.assertEqual(asyncio.Task.all_tasks(loop), {task}) 2730 self._unregister_task(task) 2731 2732 def test__enter_task(self): 2733 task = mock.Mock() 2734 loop = mock.Mock() 2735 self.assertIsNone(asyncio.current_task(loop)) 2736 self._enter_task(loop, task) 2737 self.assertIs(asyncio.current_task(loop), task) 2738 self._leave_task(loop, task) 2739 2740 def test__enter_task_failure(self): 2741 task1 = mock.Mock() 2742 task2 = mock.Mock() 2743 loop = mock.Mock() 2744 self._enter_task(loop, task1) 2745 with self.assertRaises(RuntimeError): 2746 self._enter_task(loop, task2) 2747 self.assertIs(asyncio.current_task(loop), task1) 2748 self._leave_task(loop, task1) 2749 2750 def test__leave_task(self): 2751 task = mock.Mock() 2752 loop = mock.Mock() 2753 self._enter_task(loop, task) 2754 self._leave_task(loop, task) 2755 self.assertIsNone(asyncio.current_task(loop)) 2756 2757 def test__leave_task_failure1(self): 2758 task1 = mock.Mock() 2759 task2 = mock.Mock() 2760 loop = mock.Mock() 2761 self._enter_task(loop, task1) 2762 with self.assertRaises(RuntimeError): 2763 self._leave_task(loop, task2) 2764 self.assertIs(asyncio.current_task(loop), task1) 2765 self._leave_task(loop, task1) 2766 2767 def test__leave_task_failure2(self): 2768 task = mock.Mock() 2769 loop = mock.Mock() 2770 with self.assertRaises(RuntimeError): 2771 self._leave_task(loop, task) 2772 self.assertIsNone(asyncio.current_task(loop)) 2773 2774 def test__unregister_task(self): 2775 task = mock.Mock() 2776 loop = mock.Mock() 2777 task.get_loop = lambda: loop 2778 self._register_task(task) 2779 self._unregister_task(task) 2780 self.assertEqual(asyncio.all_tasks(loop), set()) 2781 2782 def test__unregister_task_not_registered(self): 2783 task = mock.Mock() 2784 loop = mock.Mock() 2785 self._unregister_task(task) 2786 self.assertEqual(asyncio.all_tasks(loop), set()) 2787 2788 2789class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): 2790 _register_task = staticmethod(tasks._py_register_task) 2791 _unregister_task = staticmethod(tasks._py_unregister_task) 2792 _enter_task = staticmethod(tasks._py_enter_task) 2793 _leave_task = staticmethod(tasks._py_leave_task) 2794 2795 2796@unittest.skipUnless(hasattr(tasks, '_c_register_task'), 2797 'requires the C _asyncio module') 2798class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): 2799 if hasattr(tasks, '_c_register_task'): 2800 _register_task = staticmethod(tasks._c_register_task) 2801 _unregister_task = staticmethod(tasks._c_unregister_task) 2802 _enter_task = staticmethod(tasks._c_enter_task) 2803 _leave_task = staticmethod(tasks._c_leave_task) 2804 else: 2805 _register_task = _unregister_task = _enter_task = _leave_task = None 2806 2807 2808class BaseCurrentLoopTests: 2809 2810 def setUp(self): 2811 super().setUp() 2812 self.loop = asyncio.new_event_loop() 2813 self.set_event_loop(self.loop) 2814 2815 def new_task(self, coro): 2816 raise NotImplementedError 2817 2818 def test_current_task_no_running_loop(self): 2819 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2820 2821 def test_current_task_no_running_loop_implicit(self): 2822 with self.assertRaises(RuntimeError): 2823 asyncio.current_task() 2824 2825 def test_current_task_with_implicit_loop(self): 2826 async def coro(): 2827 self.assertIs(asyncio.current_task(loop=self.loop), task) 2828 2829 self.assertIs(asyncio.current_task(None), task) 2830 self.assertIs(asyncio.current_task(), task) 2831 2832 task = self.new_task(coro()) 2833 self.loop.run_until_complete(task) 2834 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2835 2836 2837class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 2838 2839 def new_task(self, coro): 2840 return tasks._PyTask(coro, loop=self.loop) 2841 2842 2843@unittest.skipUnless(hasattr(tasks, '_CTask'), 2844 'requires the C _asyncio module') 2845class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 2846 2847 def new_task(self, coro): 2848 return getattr(tasks, '_CTask')(coro, loop=self.loop) 2849 2850 2851class GenericTaskTests(test_utils.TestCase): 2852 2853 def test_future_subclass(self): 2854 self.assertTrue(issubclass(asyncio.Task, asyncio.Future)) 2855 2856 def test_asyncio_module_compiled(self): 2857 # Because of circular imports it's easy to make _asyncio 2858 # module non-importable. This is a simple test that will 2859 # fail on systems where C modules were successfully compiled 2860 # (hence the test for _functools), but _asyncio somehow didn't. 2861 try: 2862 import _functools 2863 except ImportError: 2864 pass 2865 else: 2866 try: 2867 import _asyncio 2868 except ImportError: 2869 self.fail('_asyncio module is missing') 2870 2871 2872class GatherTestsBase: 2873 2874 def setUp(self): 2875 super().setUp() 2876 self.one_loop = self.new_test_loop() 2877 self.other_loop = self.new_test_loop() 2878 self.set_event_loop(self.one_loop, cleanup=False) 2879 2880 def _run_loop(self, loop): 2881 while loop._ready: 2882 test_utils.run_briefly(loop) 2883 2884 def _check_success(self, **kwargs): 2885 a, b, c = [self.one_loop.create_future() for i in range(3)] 2886 fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs) 2887 cb = test_utils.MockCallback() 2888 fut.add_done_callback(cb) 2889 b.set_result(1) 2890 a.set_result(2) 2891 self._run_loop(self.one_loop) 2892 self.assertEqual(cb.called, False) 2893 self.assertFalse(fut.done()) 2894 c.set_result(3) 2895 self._run_loop(self.one_loop) 2896 cb.assert_called_once_with(fut) 2897 self.assertEqual(fut.result(), [2, 1, 3]) 2898 2899 def test_success(self): 2900 self._check_success() 2901 self._check_success(return_exceptions=False) 2902 2903 def test_result_exception_success(self): 2904 self._check_success(return_exceptions=True) 2905 2906 def test_one_exception(self): 2907 a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 2908 fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e)) 2909 cb = test_utils.MockCallback() 2910 fut.add_done_callback(cb) 2911 exc = ZeroDivisionError() 2912 a.set_result(1) 2913 b.set_exception(exc) 2914 self._run_loop(self.one_loop) 2915 self.assertTrue(fut.done()) 2916 cb.assert_called_once_with(fut) 2917 self.assertIs(fut.exception(), exc) 2918 # Does nothing 2919 c.set_result(3) 2920 d.cancel() 2921 e.set_exception(RuntimeError()) 2922 e.exception() 2923 2924 def test_return_exceptions(self): 2925 a, b, c, d = [self.one_loop.create_future() for i in range(4)] 2926 fut = asyncio.gather(*self.wrap_futures(a, b, c, d), 2927 return_exceptions=True) 2928 cb = test_utils.MockCallback() 2929 fut.add_done_callback(cb) 2930 exc = ZeroDivisionError() 2931 exc2 = RuntimeError() 2932 b.set_result(1) 2933 c.set_exception(exc) 2934 a.set_result(3) 2935 self._run_loop(self.one_loop) 2936 self.assertFalse(fut.done()) 2937 d.set_exception(exc2) 2938 self._run_loop(self.one_loop) 2939 self.assertTrue(fut.done()) 2940 cb.assert_called_once_with(fut) 2941 self.assertEqual(fut.result(), [3, 1, exc, exc2]) 2942 2943 def test_env_var_debug(self): 2944 code = '\n'.join(( 2945 'import asyncio.coroutines', 2946 'print(asyncio.coroutines._DEBUG)')) 2947 2948 # Test with -E to not fail if the unit test was run with 2949 # PYTHONASYNCIODEBUG set to a non-empty string 2950 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 2951 self.assertEqual(stdout.rstrip(), b'False') 2952 2953 sts, stdout, stderr = assert_python_ok('-c', code, 2954 PYTHONASYNCIODEBUG='', 2955 PYTHONDEVMODE='') 2956 self.assertEqual(stdout.rstrip(), b'False') 2957 2958 sts, stdout, stderr = assert_python_ok('-c', code, 2959 PYTHONASYNCIODEBUG='1', 2960 PYTHONDEVMODE='') 2961 self.assertEqual(stdout.rstrip(), b'True') 2962 2963 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 2964 PYTHONASYNCIODEBUG='1', 2965 PYTHONDEVMODE='') 2966 self.assertEqual(stdout.rstrip(), b'False') 2967 2968 # -X dev 2969 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 2970 '-c', code) 2971 self.assertEqual(stdout.rstrip(), b'True') 2972 2973 2974class FutureGatherTests(GatherTestsBase, test_utils.TestCase): 2975 2976 def wrap_futures(self, *futures): 2977 return futures 2978 2979 def _check_empty_sequence(self, seq_or_iter): 2980 asyncio.set_event_loop(self.one_loop) 2981 self.addCleanup(asyncio.set_event_loop, None) 2982 fut = asyncio.gather(*seq_or_iter) 2983 self.assertIsInstance(fut, asyncio.Future) 2984 self.assertIs(fut._loop, self.one_loop) 2985 self._run_loop(self.one_loop) 2986 self.assertTrue(fut.done()) 2987 self.assertEqual(fut.result(), []) 2988 with self.assertWarns(DeprecationWarning): 2989 fut = asyncio.gather(*seq_or_iter, loop=self.other_loop) 2990 self.assertIs(fut._loop, self.other_loop) 2991 2992 def test_constructor_empty_sequence(self): 2993 self._check_empty_sequence([]) 2994 self._check_empty_sequence(()) 2995 self._check_empty_sequence(set()) 2996 self._check_empty_sequence(iter("")) 2997 2998 def test_constructor_heterogenous_futures(self): 2999 fut1 = self.one_loop.create_future() 3000 fut2 = self.other_loop.create_future() 3001 with self.assertRaises(ValueError): 3002 asyncio.gather(fut1, fut2) 3003 with self.assertRaises(ValueError): 3004 asyncio.gather(fut1, loop=self.other_loop) 3005 3006 def test_constructor_homogenous_futures(self): 3007 children = [self.other_loop.create_future() for i in range(3)] 3008 fut = asyncio.gather(*children) 3009 self.assertIs(fut._loop, self.other_loop) 3010 self._run_loop(self.other_loop) 3011 self.assertFalse(fut.done()) 3012 fut = asyncio.gather(*children, loop=self.other_loop) 3013 self.assertIs(fut._loop, self.other_loop) 3014 self._run_loop(self.other_loop) 3015 self.assertFalse(fut.done()) 3016 3017 def test_one_cancellation(self): 3018 a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 3019 fut = asyncio.gather(a, b, c, d, e) 3020 cb = test_utils.MockCallback() 3021 fut.add_done_callback(cb) 3022 a.set_result(1) 3023 b.cancel() 3024 self._run_loop(self.one_loop) 3025 self.assertTrue(fut.done()) 3026 cb.assert_called_once_with(fut) 3027 self.assertFalse(fut.cancelled()) 3028 self.assertIsInstance(fut.exception(), asyncio.CancelledError) 3029 # Does nothing 3030 c.set_result(3) 3031 d.cancel() 3032 e.set_exception(RuntimeError()) 3033 e.exception() 3034 3035 def test_result_exception_one_cancellation(self): 3036 a, b, c, d, e, f = [self.one_loop.create_future() 3037 for i in range(6)] 3038 fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True) 3039 cb = test_utils.MockCallback() 3040 fut.add_done_callback(cb) 3041 a.set_result(1) 3042 zde = ZeroDivisionError() 3043 b.set_exception(zde) 3044 c.cancel() 3045 self._run_loop(self.one_loop) 3046 self.assertFalse(fut.done()) 3047 d.set_result(3) 3048 e.cancel() 3049 rte = RuntimeError() 3050 f.set_exception(rte) 3051 res = self.one_loop.run_until_complete(fut) 3052 self.assertIsInstance(res[2], asyncio.CancelledError) 3053 self.assertIsInstance(res[4], asyncio.CancelledError) 3054 res[2] = res[4] = None 3055 self.assertEqual(res, [1, zde, None, 3, None, rte]) 3056 cb.assert_called_once_with(fut) 3057 3058 3059class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): 3060 3061 def setUp(self): 3062 super().setUp() 3063 asyncio.set_event_loop(self.one_loop) 3064 3065 def wrap_futures(self, *futures): 3066 coros = [] 3067 for fut in futures: 3068 async def coro(fut=fut): 3069 return await fut 3070 coros.append(coro()) 3071 return coros 3072 3073 def test_constructor_loop_selection(self): 3074 async def coro(): 3075 return 'abc' 3076 gen1 = coro() 3077 gen2 = coro() 3078 fut = asyncio.gather(gen1, gen2) 3079 self.assertIs(fut._loop, self.one_loop) 3080 self.one_loop.run_until_complete(fut) 3081 3082 self.set_event_loop(self.other_loop, cleanup=False) 3083 gen3 = coro() 3084 gen4 = coro() 3085 fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop) 3086 self.assertIs(fut2._loop, self.other_loop) 3087 self.other_loop.run_until_complete(fut2) 3088 3089 def test_duplicate_coroutines(self): 3090 with self.assertWarns(DeprecationWarning): 3091 @asyncio.coroutine 3092 def coro(s): 3093 return s 3094 c = coro('abc') 3095 fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop) 3096 self._run_loop(self.one_loop) 3097 self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc']) 3098 3099 def test_cancellation_broadcast(self): 3100 # Cancelling outer() cancels all children. 3101 proof = 0 3102 waiter = self.one_loop.create_future() 3103 3104 async def inner(): 3105 nonlocal proof 3106 await waiter 3107 proof += 1 3108 3109 child1 = asyncio.ensure_future(inner(), loop=self.one_loop) 3110 child2 = asyncio.ensure_future(inner(), loop=self.one_loop) 3111 gatherer = None 3112 3113 async def outer(): 3114 nonlocal proof, gatherer 3115 gatherer = asyncio.gather(child1, child2, loop=self.one_loop) 3116 await gatherer 3117 proof += 100 3118 3119 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3120 test_utils.run_briefly(self.one_loop) 3121 self.assertTrue(f.cancel()) 3122 with self.assertRaises(asyncio.CancelledError): 3123 self.one_loop.run_until_complete(f) 3124 self.assertFalse(gatherer.cancel()) 3125 self.assertTrue(waiter.cancelled()) 3126 self.assertTrue(child1.cancelled()) 3127 self.assertTrue(child2.cancelled()) 3128 test_utils.run_briefly(self.one_loop) 3129 self.assertEqual(proof, 0) 3130 3131 def test_exception_marking(self): 3132 # Test for the first line marked "Mark exception retrieved." 3133 3134 async def inner(f): 3135 await f 3136 raise RuntimeError('should not be ignored') 3137 3138 a = self.one_loop.create_future() 3139 b = self.one_loop.create_future() 3140 3141 async def outer(): 3142 await asyncio.gather(inner(a), inner(b), loop=self.one_loop) 3143 3144 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3145 test_utils.run_briefly(self.one_loop) 3146 a.set_result(None) 3147 test_utils.run_briefly(self.one_loop) 3148 b.set_result(None) 3149 test_utils.run_briefly(self.one_loop) 3150 self.assertIsInstance(f.exception(), RuntimeError) 3151 3152 3153class RunCoroutineThreadsafeTests(test_utils.TestCase): 3154 """Test case for asyncio.run_coroutine_threadsafe.""" 3155 3156 def setUp(self): 3157 super().setUp() 3158 self.loop = asyncio.new_event_loop() 3159 self.set_event_loop(self.loop) # Will cleanup properly 3160 3161 async def add(self, a, b, fail=False, cancel=False): 3162 """Wait 0.05 second and return a + b.""" 3163 await asyncio.sleep(0.05) 3164 if fail: 3165 raise RuntimeError("Fail!") 3166 if cancel: 3167 asyncio.current_task(self.loop).cancel() 3168 await asyncio.sleep(0) 3169 return a + b 3170 3171 def target(self, fail=False, cancel=False, timeout=None, 3172 advance_coro=False): 3173 """Run add coroutine in the event loop.""" 3174 coro = self.add(1, 2, fail=fail, cancel=cancel) 3175 future = asyncio.run_coroutine_threadsafe(coro, self.loop) 3176 if advance_coro: 3177 # this is for test_run_coroutine_threadsafe_task_factory_exception; 3178 # otherwise it spills errors and breaks **other** unittests, since 3179 # 'target' is interacting with threads. 3180 3181 # With this call, `coro` will be advanced, so that 3182 # CoroWrapper.__del__ won't do anything when asyncio tests run 3183 # in debug mode. 3184 self.loop.call_soon_threadsafe(coro.send, None) 3185 try: 3186 return future.result(timeout) 3187 finally: 3188 future.done() or future.cancel() 3189 3190 def test_run_coroutine_threadsafe(self): 3191 """Test coroutine submission from a thread to an event loop.""" 3192 future = self.loop.run_in_executor(None, self.target) 3193 result = self.loop.run_until_complete(future) 3194 self.assertEqual(result, 3) 3195 3196 def test_run_coroutine_threadsafe_with_exception(self): 3197 """Test coroutine submission from a thread to an event loop 3198 when an exception is raised.""" 3199 future = self.loop.run_in_executor(None, self.target, True) 3200 with self.assertRaises(RuntimeError) as exc_context: 3201 self.loop.run_until_complete(future) 3202 self.assertIn("Fail!", exc_context.exception.args) 3203 3204 def test_run_coroutine_threadsafe_with_timeout(self): 3205 """Test coroutine submission from a thread to an event loop 3206 when a timeout is raised.""" 3207 callback = lambda: self.target(timeout=0) 3208 future = self.loop.run_in_executor(None, callback) 3209 with self.assertRaises(asyncio.TimeoutError): 3210 self.loop.run_until_complete(future) 3211 test_utils.run_briefly(self.loop) 3212 # Check that there's no pending task (add has been cancelled) 3213 for task in asyncio.all_tasks(self.loop): 3214 self.assertTrue(task.done()) 3215 3216 def test_run_coroutine_threadsafe_task_cancelled(self): 3217 """Test coroutine submission from a tread to an event loop 3218 when the task is cancelled.""" 3219 callback = lambda: self.target(cancel=True) 3220 future = self.loop.run_in_executor(None, callback) 3221 with self.assertRaises(asyncio.CancelledError): 3222 self.loop.run_until_complete(future) 3223 3224 def test_run_coroutine_threadsafe_task_factory_exception(self): 3225 """Test coroutine submission from a tread to an event loop 3226 when the task factory raise an exception.""" 3227 3228 def task_factory(loop, coro): 3229 raise NameError 3230 3231 run = self.loop.run_in_executor( 3232 None, lambda: self.target(advance_coro=True)) 3233 3234 # Set exception handler 3235 callback = test_utils.MockCallback() 3236 self.loop.set_exception_handler(callback) 3237 3238 # Set corrupted task factory 3239 self.loop.set_task_factory(task_factory) 3240 3241 # Run event loop 3242 with self.assertRaises(NameError) as exc_context: 3243 self.loop.run_until_complete(run) 3244 3245 # Check exceptions 3246 self.assertEqual(len(callback.call_args_list), 1) 3247 (loop, context), kwargs = callback.call_args 3248 self.assertEqual(context['exception'], exc_context.exception) 3249 3250 3251class SleepTests(test_utils.TestCase): 3252 def setUp(self): 3253 super().setUp() 3254 self.loop = asyncio.new_event_loop() 3255 self.set_event_loop(self.loop) 3256 3257 def tearDown(self): 3258 self.loop.close() 3259 self.loop = None 3260 super().tearDown() 3261 3262 def test_sleep_zero(self): 3263 result = 0 3264 3265 def inc_result(num): 3266 nonlocal result 3267 result += num 3268 3269 async def coro(): 3270 self.loop.call_soon(inc_result, 1) 3271 self.assertEqual(result, 0) 3272 num = await asyncio.sleep(0, result=10) 3273 self.assertEqual(result, 1) # inc'ed by call_soon 3274 inc_result(num) # num should be 11 3275 3276 self.loop.run_until_complete(coro()) 3277 self.assertEqual(result, 11) 3278 3279 def test_loop_argument_is_deprecated(self): 3280 # Remove test when loop argument is removed in Python 3.10 3281 with self.assertWarns(DeprecationWarning): 3282 self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop)) 3283 3284 3285class WaitTests(test_utils.TestCase): 3286 def setUp(self): 3287 super().setUp() 3288 self.loop = asyncio.new_event_loop() 3289 self.set_event_loop(self.loop) 3290 3291 def tearDown(self): 3292 self.loop.close() 3293 self.loop = None 3294 super().tearDown() 3295 3296 def test_loop_argument_is_deprecated_in_wait(self): 3297 # Remove test when loop argument is removed in Python 3.10 3298 with self.assertWarns(DeprecationWarning): 3299 self.loop.run_until_complete( 3300 asyncio.wait([coroutine_function()], loop=self.loop)) 3301 3302 def test_loop_argument_is_deprecated_in_wait_for(self): 3303 # Remove test when loop argument is removed in Python 3.10 3304 with self.assertWarns(DeprecationWarning): 3305 self.loop.run_until_complete( 3306 asyncio.wait_for(coroutine_function(), 0.01, loop=self.loop)) 3307 3308 3309class CompatibilityTests(test_utils.TestCase): 3310 # Tests for checking a bridge between old-styled coroutines 3311 # and async/await syntax 3312 3313 def setUp(self): 3314 super().setUp() 3315 self.loop = asyncio.new_event_loop() 3316 self.set_event_loop(self.loop) 3317 3318 def tearDown(self): 3319 self.loop.close() 3320 self.loop = None 3321 super().tearDown() 3322 3323 def test_yield_from_awaitable(self): 3324 3325 with self.assertWarns(DeprecationWarning): 3326 @asyncio.coroutine 3327 def coro(): 3328 yield from asyncio.sleep(0) 3329 return 'ok' 3330 3331 result = self.loop.run_until_complete(coro()) 3332 self.assertEqual('ok', result) 3333 3334 def test_await_old_style_coro(self): 3335 3336 with self.assertWarns(DeprecationWarning): 3337 @asyncio.coroutine 3338 def coro1(): 3339 return 'ok1' 3340 3341 with self.assertWarns(DeprecationWarning): 3342 @asyncio.coroutine 3343 def coro2(): 3344 yield from asyncio.sleep(0) 3345 return 'ok2' 3346 3347 async def inner(): 3348 return await asyncio.gather(coro1(), coro2(), loop=self.loop) 3349 3350 result = self.loop.run_until_complete(inner()) 3351 self.assertEqual(['ok1', 'ok2'], result) 3352 3353 def test_debug_mode_interop(self): 3354 # https://bugs.python.org/issue32636 3355 code = textwrap.dedent(""" 3356 import asyncio 3357 3358 async def native_coro(): 3359 pass 3360 3361 @asyncio.coroutine 3362 def old_style_coro(): 3363 yield from native_coro() 3364 3365 asyncio.run(old_style_coro()) 3366 """) 3367 3368 assert_python_ok("-Wignore::DeprecationWarning", "-c", code, 3369 PYTHONASYNCIODEBUG="1") 3370 3371 3372if __name__ == '__main__': 3373 unittest.main() 3374