"""Tests for tasks.py."""

import collections
import contextlib
import contextvars
import functools
import gc
import io
import random
import re
import sys
import textwrap
import types
import unittest
import weakref
from unittest import mock

import asyncio
from asyncio import coroutines
from asyncio import futures
from asyncio import tasks
from test.test_asyncio import utils as test_utils
from test import support
from test.support.script_helper import assert_python_ok


@asyncio.coroutine
def coroutine_function():
    pass


@contextlib.contextmanager
def set_coroutine_debug(enabled):
    """Temporarily set ``asyncio.coroutines._DEBUG`` to *enabled*.

    The previous value is restored when the ``with`` block exits, even if
    the block raises.
    """
    coroutines = asyncio.coroutines

    old_debug = coroutines._DEBUG
    try:
        coroutines._DEBUG = enabled
        yield
    finally:
        coroutines._DEBUG = old_debug


def format_coroutine(qualname, state, src, source_traceback, generator=False):
    """Build the ``coro=<...>`` fragment the tests expect inside a Task repr.

    *state* gets a ``', defined'`` suffix unless *generator* is true.  When
    *source_traceback* is not None, its last frame (filename, lineno) is
    appended as ``created at file:line``.
    """
    if not generator:
        state = '%s, defined' % state
    if source_traceback is not None:
        frame = source_traceback[-1]
        return ('coro=<%s() %s at %s> created at %s:%s'
                % (qualname, state, src, frame[0], frame[1]))
    else:
        return 'coro=<%s() %s at %s>' % (qualname, state, src)


class Dummy:
    # Callable with a fixed repr; used as a done callback so that Task
    # reprs containing ``cb=[...]`` are predictable.

    def __repr__(self):
        return '<Dummy>'

    def __call__(self, *args):
        pass


class CoroLikeObject:
    # Minimal object exposing send/throw/close/__await__; send() finishes
    # immediately by raising StopIteration(42).
    def send(self, v):
        raise StopIteration(42)

    def throw(self, *exc):
        pass

    def close(self):
        pass

    def __await__(self):
        return self


class BaseTaskTests:
    # Mixin holding the Task/Future test cases.  ``Task`` and ``Future`` are
    # None here and are looked up on the concrete class by new_task() and
    # new_future().  ``new_test_loop`` and ``set_event_loop`` are not defined
    # in this class; presumably they come from a test_utils.TestCase base --
    # confirm there.
    # NOTE(review): the ``gen()`` generators passed to new_test_loop() appear
    # to receive each scheduled deadline and yield the amount by which the
    # test loop's clock should advance -- confirm in test_utils.

    Task = None
    Future = None

    def new_task(self, loop, coro):
        return self.__class__.Task(coro, loop=loop)

    def new_future(self, loop):
        return self.__class__.Future(loop=loop)

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        # Route task and future creation on the loop through the classes
        # under test.
        self.loop.set_task_factory(self.new_task)
        self.loop.create_future = lambda: self.new_future(self.loop)

    def test_task_del_collect(self):
        # Results whose __del__ triggers gc.collect() must not break the
        # tasks that produced them.
        class Evil:
            def __del__(self):
                gc.collect()

        @asyncio.coroutine
        def run():
            return Evil()

        self.loop.run_until_complete(
            asyncio.gather(*[
                self.new_task(self.loop, run()) for _ in range(100)
            ], loop=self.loop))

    def test_other_loop_future(self):
        # Awaiting a future that belongs to another loop raises RuntimeError.
        other_loop = asyncio.new_event_loop()
        fut = self.new_future(other_loop)

        async def run(fut):
            await fut

        try:
            with self.assertRaisesRegex(RuntimeError,
                                        r'Task .* got Future .* attached'):
                self.loop.run_until_complete(run(fut))
        finally:
            other_loop.close()

    def test_task_awaits_on_itself(self):

        async def test():
            await task

        task = asyncio.ensure_future(test(), loop=self.loop)

        with self.assertRaisesRegex(RuntimeError,
                                    'Task cannot await on itself'):
            self.loop.run_until_complete(task)

    def test_task_class(self):
        @asyncio.coroutine
        def notmuch():
            return 'ok'
        t = self.new_task(self.loop, notmuch())
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
        self.assertIs(t._loop, self.loop)
        self.assertIs(t.get_loop(), self.loop)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)
        t = self.new_task(loop, notmuch())
        self.assertIs(t._loop, loop)
        loop.run_until_complete(t)
        loop.close()

    def test_ensure_future_coroutine(self):
        @asyncio.coroutine
        def notmuch():
            return 'ok'
        t = asyncio.ensure_future(notmuch(), loop=self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
        self.assertIs(t._loop, self.loop)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)
        t = asyncio.ensure_future(notmuch(), loop=loop)
        self.assertIs(t._loop, loop)
        loop.run_until_complete(t)
        loop.close()

    def test_ensure_future_future(self):
        f_orig = self.new_future(self.loop)
        f_orig.set_result('ko')

        # A future is returned unchanged.
        f = asyncio.ensure_future(f_orig)
        self.loop.run_until_complete(f)
        self.assertTrue(f.done())
        self.assertEqual(f.result(), 'ko')
        self.assertIs(f, f_orig)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        # Passing a loop other than the future's own loop is an error.
        with self.assertRaises(ValueError):
            f = asyncio.ensure_future(f_orig, loop=loop)

        loop.close()

        f = asyncio.ensure_future(f_orig, loop=self.loop)
        self.assertIs(f, f_orig)

    def test_ensure_future_task(self):
        @asyncio.coroutine
        def notmuch():
            return 'ok'
        t_orig = self.new_task(self.loop, notmuch())
        t = asyncio.ensure_future(t_orig)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
        self.assertIs(t, t_orig)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        with self.assertRaises(ValueError):
            t = asyncio.ensure_future(t_orig, loop=loop)

        loop.close()

        t = asyncio.ensure_future(t_orig, loop=self.loop)
        self.assertIs(t, t_orig)

    def test_ensure_future_awaitable(self):
        class Aw:
            def __init__(self, coro):
                self.coro = coro
            def __await__(self):
                return (yield from self.coro)

        @asyncio.coroutine
        def coro():
            return 'ok'

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)
        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
        loop.run_until_complete(fut)
        # Use a unittest assertion: a bare ``assert`` is stripped under -O.
        self.assertEqual(fut.result(), 'ok')

    def test_ensure_future_neither(self):
        with self.assertRaises(TypeError):
            asyncio.ensure_future('ok')

    def test_get_stack(self):
        T = None

        async def foo():
            await bar()

        async def bar():
            # test get_stack()
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the reference to the frame list.
                f = None

            # test print_stack()
            file = io.StringIO()
            T.print_stack(limit=1, file=file)
            file.seek(0)
            tb = file.read()
            self.assertRegex(tb, r'foo\(\) running')

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())

    def test_task_repr(self):
        self.loop.set_debug(False)

        @asyncio.coroutine
        def notmuch():
            yield from []
            return 'abc'

        # test coroutine function
        self.assertEqual(notmuch.__name__, 'notmuch')
        # Dots are escaped so they only match a literal '.'.
        self.assertRegex(notmuch.__qualname__,
                         r'\w+\.test_task_repr\.<locals>\.notmuch')
        self.assertEqual(notmuch.__module__, __name__)

        filename, lineno = test_utils.get_function_source(notmuch)
        src = "%s:%s" % (filename, lineno)

        # test coroutine object
        gen = notmuch()
        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
        self.assertEqual(gen.__name__, 'notmuch')
        self.assertEqual(gen.__qualname__, coro_qualname)

        # test pending Task
        t = self.new_task(self.loop, gen)
        t.add_done_callback(Dummy())

        coro = format_coroutine(coro_qualname, 'running', src,
                                t._source_traceback, generator=True)
        self.assertEqual(repr(t),
                         '<Task pending %s cb=[<Dummy>()]>' % coro)

        # test cancelling Task
        t.cancel()  # Does not take immediate effect!
        self.assertEqual(repr(t),
                         '<Task cancelling %s cb=[<Dummy>()]>' % coro)

        # test cancelled Task
        self.assertRaises(asyncio.CancelledError,
                          self.loop.run_until_complete, t)
        coro = format_coroutine(coro_qualname, 'done', src,
                                t._source_traceback)
        self.assertEqual(repr(t),
                         '<Task cancelled %s>' % coro)

        # test finished Task
        t = self.new_task(self.loop, notmuch())
        self.loop.run_until_complete(t)
        coro = format_coroutine(coro_qualname, 'done', src,
                                t._source_traceback)
        self.assertEqual(repr(t),
                         "<Task finished %s result='abc'>" % coro)

    def test_task_repr_coro_decorator(self):
        self.loop.set_debug(False)

        @asyncio.coroutine
        def notmuch():
            # notmuch() function doesn't use yield from: it will be wrapped by
            # @coroutine decorator
            return 123

        # test coroutine function
        self.assertEqual(notmuch.__name__, 'notmuch')
        self.assertRegex(notmuch.__qualname__,
                         r'\w+.test_task_repr_coro_decorator'
                         r'\.<locals>\.notmuch')
        self.assertEqual(notmuch.__module__, __name__)

        # test coroutine object
        gen = notmuch()
        # On Python >= 3.5, generators now inherit the name of the
        # function, as expected, and have a qualified name (__qualname__
        # attribute).
        coro_name = 'notmuch'
        coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
                         '.<locals>.notmuch')
        self.assertEqual(gen.__name__, coro_name)
        self.assertEqual(gen.__qualname__, coro_qualname)

        # test repr(CoroWrapper)
        if coroutines._DEBUG:
            # format the coroutine object
            filename, lineno = test_utils.get_function_source(notmuch)
            frame = gen._source_traceback[-1]
            coro = ('%s() running, defined at %s:%s, created at %s:%s'
                    % (coro_qualname, filename, lineno,
                       frame[0], frame[1]))

            self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)

        # test pending Task
        t = self.new_task(self.loop, gen)
        t.add_done_callback(Dummy())

        # format the coroutine object
        if coroutines._DEBUG:
            src = '%s:%s' % test_utils.get_function_source(notmuch)
        else:
            code = gen.gi_code
            src = '%s:%s' % (code.co_filename, code.co_firstlineno)
        coro = format_coroutine(coro_qualname, 'running', src,
                                t._source_traceback,
                                generator=not coroutines._DEBUG)
        self.assertEqual(repr(t),
                         '<Task pending %s cb=[<Dummy>()]>' % coro)
        self.loop.run_until_complete(t)

    def test_task_repr_wait_for(self):
        self.loop.set_debug(False)

        async def wait_for(fut):
            return await fut

        fut = self.new_future(self.loop)
        task = self.new_task(self.loop, wait_for(fut))
        test_utils.run_briefly(self.loop)
        self.assertRegex(repr(task),
                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))

        fut.set_result(None)
        self.loop.run_until_complete(task)

    def test_task_repr_partial_corowrapper(self):
        # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
        # coroutine is a partial function
        with set_coroutine_debug(True):
            self.loop.set_debug(True)

            async def func(x, y):
                await asyncio.sleep(0)

            partial_func = asyncio.coroutine(functools.partial(func, 1))
            task = self.loop.create_task(partial_func(2))

            # make warnings quiet
            task._log_destroy_pending = False
            self.addCleanup(task._coro.close)

        coro_repr = repr(task._coro)
        expected = (
            r'<CoroWrapper \w+.test_task_repr_partial_corowrapper'
            r'\.<locals>\.func\(1\)\(\) running, '
        )
        self.assertRegex(coro_repr, expected)

    def test_task_basics(self):

        async def outer():
            a = await inner1()
            b = await inner2()
            return a+b

        async def inner1():
            return 42

        async def inner2():
            return 1000

        t = outer()
        self.assertEqual(self.loop.run_until_complete(t), 1042)

    def test_cancel(self):

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = self.new_test_loop(gen)

        async def task():
            await asyncio.sleep(10.0, loop=loop)
            return 12

        t = self.new_task(loop, task())
        loop.call_soon(t.cancel)
        with self.assertRaises(asyncio.CancelledError):
            loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertTrue(t.cancelled())
        # Cancelling a task that is already done reports False.
        self.assertFalse(t.cancel())

    def test_cancel_yield(self):
        @asyncio.coroutine
        def task():
            yield
            yield
            return 12

        t = self.new_task(self.loop, task())
        test_utils.run_briefly(self.loop)  # start coro
        t.cancel()
        self.assertRaises(
            asyncio.CancelledError, self.loop.run_until_complete, t)
        self.assertTrue(t.done())
        self.assertTrue(t.cancelled())
        self.assertFalse(t.cancel())

    def test_cancel_inner_future(self):
        # Cancelling the future a task awaits cancels the task as well.
        f = self.new_future(self.loop)

        async def task():
            await f
            return 12

        t = self.new_task(self.loop, task())
        test_utils.run_briefly(self.loop)  # start task
        f.cancel()
        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(t)
        self.assertTrue(f.cancelled())
        self.assertTrue(t.cancelled())

    def test_cancel_both_task_and_inner_future(self):
        f = self.new_future(self.loop)

        async def task():
            await f
            return 12

        t = self.new_task(self.loop, task())
        test_utils.run_briefly(self.loop)

        f.cancel()
        t.cancel()

        with self.assertRaises(asyncio.CancelledError):
            self.loop.run_until_complete(t)

        self.assertTrue(t.done())
        self.assertTrue(f.cancelled())
        self.assertTrue(t.cancelled())

    def test_cancel_task_catching(self):
        # A task that catches CancelledError and returns is not cancelled.
        fut1 = self.new_future(self.loop)
        fut2 = self.new_future(self.loop)

        async def task():
            await fut1
            try:
                await fut2
            except asyncio.CancelledError:
                return 42

        t = self.new_task(self.loop, task())
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut1)  # White-box test.
        fut1.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut2)  # White-box test.
        t.cancel()
        self.assertTrue(fut2.cancelled())
        res = self.loop.run_until_complete(t)
        self.assertEqual(res, 42)
        self.assertFalse(t.cancelled())

    def test_cancel_task_ignoring(self):
        # A task that swallows CancelledError keeps running to completion.
        fut1 = self.new_future(self.loop)
        fut2 = self.new_future(self.loop)
        fut3 = self.new_future(self.loop)

        async def task():
            await fut1
            try:
                await fut2
            except asyncio.CancelledError:
                pass
            res = await fut3
            return res

        t = self.new_task(self.loop, task())
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut1)  # White-box test.
        fut1.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut2)  # White-box test.
        t.cancel()
        self.assertTrue(fut2.cancelled())
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut3)  # White-box test.
        fut3.set_result(42)
        res = self.loop.run_until_complete(t)
        self.assertEqual(res, 42)
        self.assertFalse(fut3.cancelled())
        self.assertFalse(t.cancelled())

    def test_cancel_current_task(self):
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def task():
            t.cancel()
            self.assertTrue(t._must_cancel)  # White-box test.
            # The sleep should be cancelled immediately.
            await asyncio.sleep(100, loop=loop)
            return 12

        t = self.new_task(loop, task())
        self.assertRaises(
            asyncio.CancelledError, loop.run_until_complete, t)
        self.assertTrue(t.done())
        self.assertFalse(t._must_cancel)  # White-box test.
        self.assertFalse(t.cancel())

    def test_cancel_at_end(self):
        """coroutine end right after task is cancelled"""
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        @asyncio.coroutine
        def task():
            t.cancel()
            self.assertTrue(t._must_cancel)  # White-box test.
            return 12

        t = self.new_task(loop, task())
        self.assertRaises(
            asyncio.CancelledError, loop.run_until_complete, t)
        self.assertTrue(t.done())
        self.assertFalse(t._must_cancel)  # White-box test.
        self.assertFalse(t.cancel())

    def test_cancel_awaited_task(self):
        # This tests for a relatively rare condition when
        # a task cancellation is requested for a task which is not
        # currently blocked, such as a task cancelling itself.
        # In this situation we must ensure that whatever next future
        # or task the cancelled task blocks on is cancelled correctly
        # as well.  See also bpo-34872.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        task = nested_task = None
        fut = self.new_future(loop)

        async def nested():
            await fut

        async def coro():
            nonlocal nested_task
            # Create a sub-task and wait for it to run.
            nested_task = self.new_task(loop, nested())
            await asyncio.sleep(0)

            # Request the current task to be cancelled.
            task.cancel()
            # Block on the nested task, which should be immediately
            # cancelled.
            await nested_task

        task = self.new_task(loop, coro())
        with self.assertRaises(asyncio.CancelledError):
            loop.run_until_complete(task)

        self.assertTrue(task.cancelled())
        self.assertTrue(nested_task.cancelled())
        self.assertTrue(fut.cancelled())

    def test_stop_while_run_in_complete(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0.1
            self.assertAlmostEqual(0.2, when)
            when = yield 0.1
            self.assertAlmostEqual(0.3, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        x = 0

        async def task():
            nonlocal x
            while x < 10:
                await asyncio.sleep(0.1, loop=loop)
                x += 1
                if x == 2:
                    loop.stop()

        t = self.new_task(loop, task())
        with self.assertRaises(RuntimeError) as cm:
            loop.run_until_complete(t)
        self.assertEqual(str(cm.exception),
                         'Event loop stopped before Future completed.')
        self.assertFalse(t.done())
        self.assertEqual(x, 2)
        self.assertAlmostEqual(0.3, loop.time())

        # Cancel the still-pending task and let the cancellation complete.
        t.cancel()
        self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)

    def test_log_traceback(self):
        async def coro():
            pass

        task = self.new_task(self.loop, coro())
        with self.assertRaisesRegex(ValueError, 'can only be set to False'):
            task._log_traceback = True
        self.loop.run_until_complete(task)

    def test_wait_for_timeout_less_then_0_or_0_future_done(self):
        def gen():
            when = yield
            self.assertAlmostEqual(0, when)

        loop = self.new_test_loop(gen)

        fut = self.new_future(loop)
        fut.set_result('done')

        ret = loop.run_until_complete(asyncio.wait_for(fut, 0, loop=loop))

        self.assertEqual(ret, 'done')
        self.assertTrue(fut.done())
        self.assertAlmostEqual(0, loop.time())

    def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
        def gen():
            when = yield
            self.assertAlmostEqual(0, when)

        loop = self.new_test_loop(gen)

        foo_started = False

        @asyncio.coroutine
        def foo():
            nonlocal foo_started
            foo_started = True

        with self.assertRaises(asyncio.TimeoutError):
            loop.run_until_complete(asyncio.wait_for(foo(), 0, loop=loop))

        self.assertAlmostEqual(0, loop.time())
        self.assertEqual(foo_started, False)

    def test_wait_for_timeout_less_then_0_or_0(self):
        def gen():
            when = yield
            self.assertAlmostEqual(0.2, when)
            when = yield 0
            self.assertAlmostEqual(0, when)

        for timeout in [0, -1]:
            with self.subTest(timeout=timeout):
                loop = self.new_test_loop(gen)

                foo_running = None

                async def foo():
                    nonlocal foo_running
                    foo_running = True
                    try:
                        await asyncio.sleep(0.2, loop=loop)
                    finally:
                        foo_running = False
                    return 'done'

                fut = self.new_task(loop, foo())

                with self.assertRaises(asyncio.TimeoutError):
                    loop.run_until_complete(asyncio.wait_for(
                        fut, timeout, loop=loop))
                self.assertTrue(fut.done())
                # it should have been cancelled due to the timeout
                self.assertTrue(fut.cancelled())
                self.assertAlmostEqual(0, loop.time())
                self.assertEqual(foo_running, False)

    def test_wait_for(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.2, when)
            when = yield 0
            self.assertAlmostEqual(0.1, when)
            when = yield 0.1

        loop = self.new_test_loop(gen)

        foo_running = None

        async def foo():
            nonlocal foo_running
            foo_running = True
            try:
                await asyncio.sleep(0.2, loop=loop)
            finally:
                foo_running = False
            return 'done'

        fut = self.new_task(loop, foo())

        with self.assertRaises(asyncio.TimeoutError):
            loop.run_until_complete(asyncio.wait_for(fut, 0.1, loop=loop))
        self.assertTrue(fut.done())
        # it should have been cancelled due to the timeout
        self.assertTrue(fut.cancelled())
        self.assertAlmostEqual(0.1, loop.time())
        self.assertEqual(foo_running, False)

    def test_wait_for_blocking(self):
        loop = self.new_test_loop()

        @asyncio.coroutine
        def coro():
            return 'done'

        res = loop.run_until_complete(asyncio.wait_for(coro(),
                                                       timeout=None,
                                                       loop=loop))
        self.assertEqual(res, 'done')

    def test_wait_for_with_global_loop(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.2, when)
            when = yield 0
            self.assertAlmostEqual(0.01, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        async def foo():
            await asyncio.sleep(0.2, loop=loop)
            return 'done'

        asyncio.set_event_loop(loop)
        try:
            fut = self.new_task(loop, foo())
            with self.assertRaises(asyncio.TimeoutError):
                loop.run_until_complete(asyncio.wait_for(fut, 0.01))
        finally:
            asyncio.set_event_loop(None)

        self.assertAlmostEqual(0.01, loop.time())
        self.assertTrue(fut.done())
        self.assertTrue(fut.cancelled())

    def test_wait_for_race_condition(self):

        def gen():
            yield 0.1
            yield 0.1
            yield 0.1

        loop = self.new_test_loop(gen)

        fut = self.new_future(loop)
        task = asyncio.wait_for(fut, timeout=0.2, loop=loop)
        loop.call_later(0.1, fut.set_result, "ok")
        res = loop.run_until_complete(task)
        self.assertEqual(res, "ok")

    def test_wait_for_waits_for_task_cancellation(self):
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        task_done = False

        async def foo():
            async def inner():
                nonlocal task_done
                try:
                    await asyncio.sleep(0.2, loop=loop)
                finally:
                    task_done = True

            inner_task = self.new_task(loop, inner())

            with self.assertRaises(asyncio.TimeoutError):
                await asyncio.wait_for(inner_task, timeout=0.1, loop=loop)

            # inner()'s ``finally`` must already have run by the time
            # wait_for() raised.
            self.assertTrue(task_done)

        loop.run_until_complete(foo())

    def test_wait_for_self_cancellation(self):
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def foo():
            async def inner():
                try:
                    await asyncio.sleep(0.3, loop=loop)
                except asyncio.CancelledError:
                    try:
                        await asyncio.sleep(0.3, loop=loop)
                    except asyncio.CancelledError:
                        await asyncio.sleep(0.3, loop=loop)

                return 42

            inner_task = self.new_task(loop, inner())

            wait = asyncio.wait_for(inner_task, timeout=0.1, loop=loop)

            # Test that wait_for itself is properly cancellable
            # even when the initial task holds up the initial cancellation.
            task = self.new_task(loop, wait)
            await asyncio.sleep(0.2, loop=loop)
            task.cancel()

            with self.assertRaises(asyncio.CancelledError):
                await task

            self.assertEqual(await inner_task, 42)

        loop.run_until_complete(foo())

    def test_wait(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            yield 0.15

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1, loop=loop))
        b = self.new_task(loop, asyncio.sleep(0.15, loop=loop))

        async def foo():
            done, pending = await asyncio.wait([b, a], loop=loop)
            self.assertEqual(done, {a, b})
            self.assertEqual(pending, set())
            return 42

        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertEqual(res, 42)
        self.assertAlmostEqual(0.15, loop.time())

        # Doing it again should take no time and exercise a different path.
        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())
        self.assertEqual(res, 42)

    def test_wait_with_global_loop(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.01, when)
            when = yield 0
            self.assertAlmostEqual(0.015, when)
            yield 0.015

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.01, loop=loop))
        b = self.new_task(loop, asyncio.sleep(0.015, loop=loop))

        async def foo():
            # No loop= argument: wait() has to pick up the global loop.
            done, pending = await asyncio.wait([b, a])
            self.assertEqual(done, {a, b})
            self.assertEqual(pending, set())
            return 42

        asyncio.set_event_loop(loop)
        try:
            res = loop.run_until_complete(
                self.new_task(loop, foo()))
        finally:
            # Do not leak the global loop into later tests.
            asyncio.set_event_loop(None)

        self.assertEqual(res, 42)

    def test_wait_duplicate_coroutines(self):

        @asyncio.coroutine
        def coro(s):
            return s
        c = coro('test')

        task = self.new_task(
            self.loop,
            asyncio.wait([c, c, coro('spam')], loop=self.loop))

        done, pending = self.loop.run_until_complete(task)

        self.assertFalse(pending)
        self.assertEqual({f.result() for f in done}, {'test', 'spam'})

    def test_wait_errors(self):
        self.assertRaises(
            ValueError, self.loop.run_until_complete,
            asyncio.wait(set(), loop=self.loop))

        # -1 is an invalid return_when value
        sleep_coro = asyncio.sleep(10.0, loop=self.loop)
        wait_coro = asyncio.wait([sleep_coro], return_when=-1, loop=self.loop)
        self.assertRaises(ValueError,
                          self.loop.run_until_complete, wait_coro)

        sleep_coro.close()

    def test_wait_first_completed(self):

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            when = yield 0
            self.assertAlmostEqual(0.1, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(10.0, loop=loop))
        b = self.new_task(loop, asyncio.sleep(0.1, loop=loop))
        task = self.new_task(
            loop,
            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED,
                         loop=loop))

        done, pending = loop.run_until_complete(task)
        self.assertEqual({b}, done)
        self.assertEqual({a}, pending)
        self.assertFalse(a.done())
        self.assertTrue(b.done())
        self.assertIsNone(b.result())
        self.assertAlmostEqual(0.1, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b], loop=loop))

    def test_wait_really_done(self):
        # there is possibility that some tasks in the pending list
        # became done but their callbacks haven't all been called yet

        @asyncio.coroutine
        def coro1():
            yield

        @asyncio.coroutine
        def coro2():
            yield
            yield

        a = self.new_task(self.loop, coro1())
        b = self.new_task(self.loop, coro2())
        task = self.new_task(
            self.loop,
            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED,
                         loop=self.loop))

        done, pending = self.loop.run_until_complete(task)
        self.assertEqual({a, b}, done)
        self.assertTrue(a.done())
        self.assertIsNone(a.result())
        self.assertTrue(b.done())
        self.assertIsNone(b.result())

    def test_wait_first_exception(self):

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = self.new_test_loop(gen)

        # first_exception, task already has exception
        a = self.new_task(loop, asyncio.sleep(10.0, loop=loop))

        @asyncio.coroutine
        def exc():
            raise ZeroDivisionError('err')

        b = self.new_task(loop, exc())
        task = self.new_task(
            loop,
            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION,
                         loop=loop))

        done, pending = loop.run_until_complete(task)
        self.assertEqual({b}, done)
        self.assertEqual({a}, pending)
        self.assertAlmostEqual(0, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b], loop=loop))

    def test_wait_first_exception_in_wait(self):

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            when = yield 0
            self.assertAlmostEqual(0.01, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        # first_exception, exception during waiting
        a = self.new_task(loop, asyncio.sleep(10.0, loop=loop))

        async def exc():
            await asyncio.sleep(0.01, loop=loop)
            raise ZeroDivisionError('err')

        b = self.new_task(loop, exc())
        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION,
                            loop=loop)

        done, pending = loop.run_until_complete(task)
        self.assertEqual({b}, done)
        self.assertEqual({a}, pending)
        self.assertAlmostEqual(0.01, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b], loop=loop))

    def test_wait_with_exception(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            yield 0.15

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1, loop=loop))

        @asyncio.coroutine
        def sleeper():
            yield from asyncio.sleep(0.15, loop=loop)
            raise ZeroDivisionError('really')

        b = self.new_task(loop, sleeper())

        async def foo():
            done, pending = await asyncio.wait([b, a], loop=loop)
            self.assertEqual(len(done), 2)
            self.assertEqual(pending, set())
            errors = {f for f in done if f.exception() is not None}
            self.assertEqual(len(errors), 1)

        loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())

        loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())

    def test_wait_with_timeout(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            when = yield 0
            self.assertAlmostEqual(0.11, when)
            yield 0.11

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1, loop=loop))
        b = self.new_task(loop, asyncio.sleep(0.15, loop=loop))

        async def foo():
            done, pending = await asyncio.wait([b, a], timeout=0.11,
                                               loop=loop)
            self.assertEqual(done, {a})
            self.assertEqual(pending, {b})

        loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.11, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b], loop=loop))

    def test_wait_concurrent_complete(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(0.15, when)
            when = yield 0
            self.assertAlmostEqual(0.1, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        a = self.new_task(loop, asyncio.sleep(0.1, loop=loop))
        b = self.new_task(loop, asyncio.sleep(0.15, loop=loop))

        done, pending = loop.run_until_complete(
            asyncio.wait([b, a], timeout=0.1, loop=loop))

        self.assertEqual(done, {a})
        self.assertEqual(pending, {b})
        self.assertAlmostEqual(0.1, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b], loop=loop))

    def test_as_completed(self):

        def gen():
            yield 0
            yield 0
            yield 0.01
            yield 0

        loop = self.new_test_loop(gen)
        # disable "slow callback" warning
        loop.slow_callback_duration = 1.0
        completed = set()
        time_shifted = False

        @asyncio.coroutine
        def sleeper(dt, x):
            nonlocal time_shifted
            yield from asyncio.sleep(dt, loop=loop)
            completed.add(x)
            # Once both short sleepers are done, advance the clock once so
            # that the long sleeper can finish.
            if not time_shifted and 'a' in completed and 'b' in completed:
                time_shifted = True
                loop.advance_time(0.14)
            return x

        a = sleeper(0.01, 'a')
        b = sleeper(0.01, 'b')
        c = sleeper(0.15, 'c')

        @asyncio.coroutine
        def foo():
            values = []
            for f in asyncio.as_completed([b, c, a], loop=loop):
                values.append((yield from f))
            return values

        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())
        self.assertIn('a', res[:2])
        self.assertIn('b', res[:2])
        self.assertEqual(res[2], 'c')

        # Doing it again should take no time and exercise a different path.
        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())

    def test_as_completed_with_timeout(self):

        def gen():
            yield
            yield 0
            yield 0
            yield 0.1

        loop = self.new_test_loop(gen)

        a = loop.create_task(asyncio.sleep(0.1, 'a', loop=loop))
        b = loop.create_task(asyncio.sleep(0.15, 'b', loop=loop))

        async def foo():
            values = []
            for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
                if values:
                    loop.advance_time(0.02)
                try:
                    v = await f
                    values.append((1, v))
                except asyncio.TimeoutError as exc:
                    values.append((2, exc))
            return values

        res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertEqual(len(res), 2, res)
        self.assertEqual(res[0], (1, 'a'))
        self.assertEqual(res[1][0], 2)
        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
        self.assertAlmostEqual(0.12, loop.time())

        # move forward to close generator
        loop.advance_time(10)
        loop.run_until_complete(asyncio.wait([a, b], loop=loop))

    def test_as_completed_with_unused_timeout(self):

        def gen():
            yield
            yield 0
            yield 0.01

        loop = self.new_test_loop(gen)

        a = asyncio.sleep(0.01, 'a', loop=loop)

        async def foo():
            for f in asyncio.as_completed([a], timeout=1, loop=loop):
                v = await f
                self.assertEqual(v, 'a')

        loop.run_until_complete(self.new_task(loop, foo()))

    def test_as_completed_reverse_wait(self):

        def gen():
            yield 0
            yield 0.05
            yield 0

        loop = self.new_test_loop(gen)

        a = asyncio.sleep(0.05, 'a', loop=loop)
        b = asyncio.sleep(0.10, 'b', loop=loop)
        fs = {a, b}
        futs = list(asyncio.as_completed(fs, loop=loop))
        self.assertEqual(len(futs), 2)

        # Waiting on the second returned awaitable first still yields the
        # earliest result ('a').
        x = loop.run_until_complete(futs[1])
        self.assertEqual(x, 'a')
        self.assertAlmostEqual(0.05, loop.time())
        loop.advance_time(0.05)
        y = loop.run_until_complete(futs[0])
        self.assertEqual(y, 'b')
        self.assertAlmostEqual(0.10, loop.time())

    def test_as_completed_concurrent(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.05, when)
            when = yield 0
            self.assertAlmostEqual(0.05, when)
            yield 0.05

        loop = self.new_test_loop(gen)

        a = asyncio.sleep(0.05, 'a', loop=loop)
        b = asyncio.sleep(0.05, 'b', loop=loop)
        fs = {a, b}
        futs = list(asyncio.as_completed(fs, loop=loop))
        self.assertEqual(len(futs), 2)
        waiter = asyncio.wait(futs, loop=loop)
        done, pending = loop.run_until_complete(waiter)
        self.assertEqual({f.result() for f in done}, {'a', 'b'})

    def test_as_completed_duplicate_coroutines(self):

        @asyncio.coroutine
        def coro(s):
            return s

        @asyncio.coroutine
        def runner():
            result = []
            c = coro('ham')
            for f in asyncio.as_completed([c, c, coro('spam')],
                                          loop=self.loop):
                result.append((yield from f))
            return result

        fut = self.new_task(self.loop, runner())
        self.loop.run_until_complete(fut)
        result = fut.result()
        # The duplicated coroutine must be run (and reported) only once.
        self.assertEqual(set(result), {'ham', 'spam'})
        self.assertEqual(len(result), 2)

    def test_sleep(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.05, when)
            when = yield 0.05
            self.assertAlmostEqual(0.1, when)
            yield 0.05

        loop = self.new_test_loop(gen)

        @asyncio.coroutine
        def sleeper(dt, arg):
            yield from asyncio.sleep(dt/2, loop=loop)
            res = yield from asyncio.sleep(dt/2, arg, loop=loop)
            return res

        t = self.new_task(loop, sleeper(0.1, 'yeah'))
        loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'yeah')
        self.assertAlmostEqual(0.1, loop.time())

    def test_sleep_cancel(self):

        def gen():
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = self.new_test_loop(gen)

        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah', loop=loop))

        handle = None
        orig_call_later = loop.call_later

        # Wrap call_later() to capture the timer handle sleep() creates.
        def call_later(delay, callback, *args):
            nonlocal handle
            handle = orig_call_later(delay, callback, *args)
            return handle

        loop.call_later = call_later
        test_utils.run_briefly(loop)

        self.assertFalse(handle._cancelled)

        t.cancel()
        test_utils.run_briefly(loop)
        self.assertTrue(handle._cancelled)

    def test_task_cancel_sleeping_task(self):

        def gen():
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0
            self.assertAlmostEqual(5000, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        @asyncio.coroutine
        def sleep(dt):
            yield from asyncio.sleep(dt, loop=loop)

        @asyncio.coroutine
        def doit():
            sleeper = self.new_task(loop, sleep(5000))
            loop.call_later(0.1, sleeper.cancel)
            try:
                yield from sleeper
            except asyncio.CancelledError:
                return 'cancelled'
            else:
                return 'slept in'

        doer = doit()
self.assertEqual(loop.run_until_complete(doer), 'cancelled') 1418 self.assertAlmostEqual(0.1, loop.time()) 1419 1420 def test_task_cancel_waiter_future(self): 1421 fut = self.new_future(self.loop) 1422 1423 @asyncio.coroutine 1424 def coro(): 1425 yield from fut 1426 1427 task = self.new_task(self.loop, coro()) 1428 test_utils.run_briefly(self.loop) 1429 self.assertIs(task._fut_waiter, fut) 1430 1431 task.cancel() 1432 test_utils.run_briefly(self.loop) 1433 self.assertRaises( 1434 asyncio.CancelledError, self.loop.run_until_complete, task) 1435 self.assertIsNone(task._fut_waiter) 1436 self.assertTrue(fut.cancelled()) 1437 1438 def test_task_set_methods(self): 1439 @asyncio.coroutine 1440 def notmuch(): 1441 return 'ko' 1442 1443 gen = notmuch() 1444 task = self.new_task(self.loop, gen) 1445 1446 with self.assertRaisesRegex(RuntimeError, 'not support set_result'): 1447 task.set_result('ok') 1448 1449 with self.assertRaisesRegex(RuntimeError, 'not support set_exception'): 1450 task.set_exception(ValueError()) 1451 1452 self.assertEqual( 1453 self.loop.run_until_complete(task), 1454 'ko') 1455 1456 def test_step_result(self): 1457 @asyncio.coroutine 1458 def notmuch(): 1459 yield None 1460 yield 1 1461 return 'ko' 1462 1463 self.assertRaises( 1464 RuntimeError, self.loop.run_until_complete, notmuch()) 1465 1466 def test_step_result_future(self): 1467 # If coroutine returns future, task waits on this future. 
1468 1469 class Fut(asyncio.Future): 1470 def __init__(self, *args, **kwds): 1471 self.cb_added = False 1472 super().__init__(*args, **kwds) 1473 1474 def add_done_callback(self, *args, **kwargs): 1475 self.cb_added = True 1476 super().add_done_callback(*args, **kwargs) 1477 1478 fut = Fut(loop=self.loop) 1479 result = None 1480 1481 @asyncio.coroutine 1482 def wait_for_future(): 1483 nonlocal result 1484 result = yield from fut 1485 1486 t = self.new_task(self.loop, wait_for_future()) 1487 test_utils.run_briefly(self.loop) 1488 self.assertTrue(fut.cb_added) 1489 1490 res = object() 1491 fut.set_result(res) 1492 test_utils.run_briefly(self.loop) 1493 self.assertIs(res, result) 1494 self.assertTrue(t.done()) 1495 self.assertIsNone(t.result()) 1496 1497 def test_baseexception_during_cancel(self): 1498 1499 def gen(): 1500 when = yield 1501 self.assertAlmostEqual(10.0, when) 1502 yield 0 1503 1504 loop = self.new_test_loop(gen) 1505 1506 @asyncio.coroutine 1507 def sleeper(): 1508 yield from asyncio.sleep(10, loop=loop) 1509 1510 base_exc = BaseException() 1511 1512 @asyncio.coroutine 1513 def notmutch(): 1514 try: 1515 yield from sleeper() 1516 except asyncio.CancelledError: 1517 raise base_exc 1518 1519 task = self.new_task(loop, notmutch()) 1520 test_utils.run_briefly(loop) 1521 1522 task.cancel() 1523 self.assertFalse(task.done()) 1524 1525 self.assertRaises(BaseException, test_utils.run_briefly, loop) 1526 1527 self.assertTrue(task.done()) 1528 self.assertFalse(task.cancelled()) 1529 self.assertIs(task.exception(), base_exc) 1530 1531 def test_iscoroutinefunction(self): 1532 def fn(): 1533 pass 1534 1535 self.assertFalse(asyncio.iscoroutinefunction(fn)) 1536 1537 def fn1(): 1538 yield 1539 self.assertFalse(asyncio.iscoroutinefunction(fn1)) 1540 1541 @asyncio.coroutine 1542 def fn2(): 1543 yield 1544 self.assertTrue(asyncio.iscoroutinefunction(fn2)) 1545 1546 self.assertFalse(asyncio.iscoroutinefunction(mock.Mock())) 1547 1548 def test_yield_vs_yield_from(self): 
1549 fut = self.new_future(self.loop) 1550 1551 @asyncio.coroutine 1552 def wait_for_future(): 1553 yield fut 1554 1555 task = wait_for_future() 1556 with self.assertRaises(RuntimeError): 1557 self.loop.run_until_complete(task) 1558 1559 self.assertFalse(fut.done()) 1560 1561 def test_yield_vs_yield_from_generator(self): 1562 @asyncio.coroutine 1563 def coro(): 1564 yield 1565 1566 @asyncio.coroutine 1567 def wait_for_future(): 1568 gen = coro() 1569 try: 1570 yield gen 1571 finally: 1572 gen.close() 1573 1574 task = wait_for_future() 1575 self.assertRaises( 1576 RuntimeError, 1577 self.loop.run_until_complete, task) 1578 1579 def test_coroutine_non_gen_function(self): 1580 @asyncio.coroutine 1581 def func(): 1582 return 'test' 1583 1584 self.assertTrue(asyncio.iscoroutinefunction(func)) 1585 1586 coro = func() 1587 self.assertTrue(asyncio.iscoroutine(coro)) 1588 1589 res = self.loop.run_until_complete(coro) 1590 self.assertEqual(res, 'test') 1591 1592 def test_coroutine_non_gen_function_return_future(self): 1593 fut = self.new_future(self.loop) 1594 1595 @asyncio.coroutine 1596 def func(): 1597 return fut 1598 1599 @asyncio.coroutine 1600 def coro(): 1601 fut.set_result('test') 1602 1603 t1 = self.new_task(self.loop, func()) 1604 t2 = self.new_task(self.loop, coro()) 1605 res = self.loop.run_until_complete(t1) 1606 self.assertEqual(res, 'test') 1607 self.assertIsNone(t2.result()) 1608 1609 1610 def test_current_task_deprecated(self): 1611 Task = self.__class__.Task 1612 1613 with self.assertWarns(PendingDeprecationWarning): 1614 self.assertIsNone(Task.current_task(loop=self.loop)) 1615 1616 async def coro(loop): 1617 with self.assertWarns(PendingDeprecationWarning): 1618 self.assertIs(Task.current_task(loop=loop), task) 1619 1620 # See http://bugs.python.org/issue29271 for details: 1621 asyncio.set_event_loop(loop) 1622 try: 1623 with self.assertWarns(PendingDeprecationWarning): 1624 self.assertIs(Task.current_task(None), task) 1625 with 
self.assertWarns(PendingDeprecationWarning): 1626 self.assertIs(Task.current_task(), task) 1627 finally: 1628 asyncio.set_event_loop(None) 1629 1630 task = self.new_task(self.loop, coro(self.loop)) 1631 self.loop.run_until_complete(task) 1632 with self.assertWarns(PendingDeprecationWarning): 1633 self.assertIsNone(Task.current_task(loop=self.loop)) 1634 1635 def test_current_task(self): 1636 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1637 1638 async def coro(loop): 1639 self.assertIs(asyncio.current_task(loop=loop), task) 1640 1641 self.assertIs(asyncio.current_task(None), task) 1642 self.assertIs(asyncio.current_task(), task) 1643 1644 task = self.new_task(self.loop, coro(self.loop)) 1645 self.loop.run_until_complete(task) 1646 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1647 1648 def test_current_task_with_interleaving_tasks(self): 1649 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1650 1651 fut1 = self.new_future(self.loop) 1652 fut2 = self.new_future(self.loop) 1653 1654 async def coro1(loop): 1655 self.assertTrue(asyncio.current_task(loop=loop) is task1) 1656 await fut1 1657 self.assertTrue(asyncio.current_task(loop=loop) is task1) 1658 fut2.set_result(True) 1659 1660 async def coro2(loop): 1661 self.assertTrue(asyncio.current_task(loop=loop) is task2) 1662 fut1.set_result(True) 1663 await fut2 1664 self.assertTrue(asyncio.current_task(loop=loop) is task2) 1665 1666 task1 = self.new_task(self.loop, coro1(self.loop)) 1667 task2 = self.new_task(self.loop, coro2(self.loop)) 1668 1669 self.loop.run_until_complete(asyncio.wait((task1, task2), 1670 loop=self.loop)) 1671 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1672 1673 # Some thorough tests for cancellation propagation through 1674 # coroutines, tasks and wait(). 1675 1676 def test_yield_future_passes_cancel(self): 1677 # Cancelling outer() cancels inner() cancels waiter. 
1678 proof = 0 1679 waiter = self.new_future(self.loop) 1680 1681 async def inner(): 1682 nonlocal proof 1683 try: 1684 await waiter 1685 except asyncio.CancelledError: 1686 proof += 1 1687 raise 1688 else: 1689 self.fail('got past sleep() in inner()') 1690 1691 async def outer(): 1692 nonlocal proof 1693 try: 1694 await inner() 1695 except asyncio.CancelledError: 1696 proof += 100 # Expect this path. 1697 else: 1698 proof += 10 1699 1700 f = asyncio.ensure_future(outer(), loop=self.loop) 1701 test_utils.run_briefly(self.loop) 1702 f.cancel() 1703 self.loop.run_until_complete(f) 1704 self.assertEqual(proof, 101) 1705 self.assertTrue(waiter.cancelled()) 1706 1707 def test_yield_wait_does_not_shield_cancel(self): 1708 # Cancelling outer() makes wait() return early, leaves inner() 1709 # running. 1710 proof = 0 1711 waiter = self.new_future(self.loop) 1712 1713 async def inner(): 1714 nonlocal proof 1715 await waiter 1716 proof += 1 1717 1718 async def outer(): 1719 nonlocal proof 1720 d, p = await asyncio.wait([inner()], loop=self.loop) 1721 proof += 100 1722 1723 f = asyncio.ensure_future(outer(), loop=self.loop) 1724 test_utils.run_briefly(self.loop) 1725 f.cancel() 1726 self.assertRaises( 1727 asyncio.CancelledError, self.loop.run_until_complete, f) 1728 waiter.set_result(None) 1729 test_utils.run_briefly(self.loop) 1730 self.assertEqual(proof, 1) 1731 1732 def test_shield_result(self): 1733 inner = self.new_future(self.loop) 1734 outer = asyncio.shield(inner) 1735 inner.set_result(42) 1736 res = self.loop.run_until_complete(outer) 1737 self.assertEqual(res, 42) 1738 1739 def test_shield_exception(self): 1740 inner = self.new_future(self.loop) 1741 outer = asyncio.shield(inner) 1742 test_utils.run_briefly(self.loop) 1743 exc = RuntimeError('expected') 1744 inner.set_exception(exc) 1745 test_utils.run_briefly(self.loop) 1746 self.assertIs(outer.exception(), exc) 1747 1748 def test_shield_cancel(self): 1749 inner = self.new_future(self.loop) 1750 outer = 
asyncio.shield(inner) 1751 test_utils.run_briefly(self.loop) 1752 inner.cancel() 1753 test_utils.run_briefly(self.loop) 1754 self.assertTrue(outer.cancelled()) 1755 1756 def test_shield_shortcut(self): 1757 fut = self.new_future(self.loop) 1758 fut.set_result(42) 1759 res = self.loop.run_until_complete(asyncio.shield(fut)) 1760 self.assertEqual(res, 42) 1761 1762 def test_shield_effect(self): 1763 # Cancelling outer() does not affect inner(). 1764 proof = 0 1765 waiter = self.new_future(self.loop) 1766 1767 async def inner(): 1768 nonlocal proof 1769 await waiter 1770 proof += 1 1771 1772 async def outer(): 1773 nonlocal proof 1774 await asyncio.shield(inner(), loop=self.loop) 1775 proof += 100 1776 1777 f = asyncio.ensure_future(outer(), loop=self.loop) 1778 test_utils.run_briefly(self.loop) 1779 f.cancel() 1780 with self.assertRaises(asyncio.CancelledError): 1781 self.loop.run_until_complete(f) 1782 waiter.set_result(None) 1783 test_utils.run_briefly(self.loop) 1784 self.assertEqual(proof, 1) 1785 1786 def test_shield_gather(self): 1787 child1 = self.new_future(self.loop) 1788 child2 = self.new_future(self.loop) 1789 parent = asyncio.gather(child1, child2, loop=self.loop) 1790 outer = asyncio.shield(parent, loop=self.loop) 1791 test_utils.run_briefly(self.loop) 1792 outer.cancel() 1793 test_utils.run_briefly(self.loop) 1794 self.assertTrue(outer.cancelled()) 1795 child1.set_result(1) 1796 child2.set_result(2) 1797 test_utils.run_briefly(self.loop) 1798 self.assertEqual(parent.result(), [1, 2]) 1799 1800 def test_gather_shield(self): 1801 child1 = self.new_future(self.loop) 1802 child2 = self.new_future(self.loop) 1803 inner1 = asyncio.shield(child1, loop=self.loop) 1804 inner2 = asyncio.shield(child2, loop=self.loop) 1805 parent = asyncio.gather(inner1, inner2, loop=self.loop) 1806 test_utils.run_briefly(self.loop) 1807 parent.cancel() 1808 # This should cancel inner1 and inner2 but bot child1 and child2. 
1809 test_utils.run_briefly(self.loop) 1810 self.assertIsInstance(parent.exception(), asyncio.CancelledError) 1811 self.assertTrue(inner1.cancelled()) 1812 self.assertTrue(inner2.cancelled()) 1813 child1.set_result(1) 1814 child2.set_result(2) 1815 test_utils.run_briefly(self.loop) 1816 1817 def test_as_completed_invalid_args(self): 1818 fut = self.new_future(self.loop) 1819 1820 # as_completed() expects a list of futures, not a future instance 1821 self.assertRaises(TypeError, self.loop.run_until_complete, 1822 asyncio.as_completed(fut, loop=self.loop)) 1823 coro = coroutine_function() 1824 self.assertRaises(TypeError, self.loop.run_until_complete, 1825 asyncio.as_completed(coro, loop=self.loop)) 1826 coro.close() 1827 1828 def test_wait_invalid_args(self): 1829 fut = self.new_future(self.loop) 1830 1831 # wait() expects a list of futures, not a future instance 1832 self.assertRaises(TypeError, self.loop.run_until_complete, 1833 asyncio.wait(fut, loop=self.loop)) 1834 coro = coroutine_function() 1835 self.assertRaises(TypeError, self.loop.run_until_complete, 1836 asyncio.wait(coro, loop=self.loop)) 1837 coro.close() 1838 1839 # wait() expects at least a future 1840 self.assertRaises(ValueError, self.loop.run_until_complete, 1841 asyncio.wait([], loop=self.loop)) 1842 1843 def test_corowrapper_mocks_generator(self): 1844 1845 def check(): 1846 # A function that asserts various things. 1847 # Called twice, with different debug flag values. 1848 1849 @asyncio.coroutine 1850 def coro(): 1851 # The actual coroutine. 1852 self.assertTrue(gen.gi_running) 1853 yield from fut 1854 1855 # A completed Future used to run the coroutine. 1856 fut = self.new_future(self.loop) 1857 fut.set_result(None) 1858 1859 # Call the coroutine. 1860 gen = coro() 1861 1862 # Check some properties. 
1863 self.assertTrue(asyncio.iscoroutine(gen)) 1864 self.assertIsInstance(gen.gi_frame, types.FrameType) 1865 self.assertFalse(gen.gi_running) 1866 self.assertIsInstance(gen.gi_code, types.CodeType) 1867 1868 # Run it. 1869 self.loop.run_until_complete(gen) 1870 1871 # The frame should have changed. 1872 self.assertIsNone(gen.gi_frame) 1873 1874 # Test with debug flag cleared. 1875 with set_coroutine_debug(False): 1876 check() 1877 1878 # Test with debug flag set. 1879 with set_coroutine_debug(True): 1880 check() 1881 1882 def test_yield_from_corowrapper(self): 1883 with set_coroutine_debug(True): 1884 @asyncio.coroutine 1885 def t1(): 1886 return (yield from t2()) 1887 1888 @asyncio.coroutine 1889 def t2(): 1890 f = self.new_future(self.loop) 1891 self.new_task(self.loop, t3(f)) 1892 return (yield from f) 1893 1894 @asyncio.coroutine 1895 def t3(f): 1896 f.set_result((1, 2, 3)) 1897 1898 task = self.new_task(self.loop, t1()) 1899 val = self.loop.run_until_complete(task) 1900 self.assertEqual(val, (1, 2, 3)) 1901 1902 def test_yield_from_corowrapper_send(self): 1903 def foo(): 1904 a = yield 1905 return a 1906 1907 def call(arg): 1908 cw = asyncio.coroutines.CoroWrapper(foo()) 1909 cw.send(None) 1910 try: 1911 cw.send(arg) 1912 except StopIteration as ex: 1913 return ex.args[0] 1914 else: 1915 raise AssertionError('StopIteration was expected') 1916 1917 self.assertEqual(call((1, 2)), (1, 2)) 1918 self.assertEqual(call('spam'), 'spam') 1919 1920 def test_corowrapper_weakref(self): 1921 wd = weakref.WeakValueDictionary() 1922 def foo(): yield from [] 1923 cw = asyncio.coroutines.CoroWrapper(foo()) 1924 wd['cw'] = cw # Would fail without __weakref__ slot. 1925 cw.gen = None # Suppress warning from __del__. 
1926 1927 def test_corowrapper_throw(self): 1928 # Issue 429: CoroWrapper.throw must be compatible with gen.throw 1929 def foo(): 1930 value = None 1931 while True: 1932 try: 1933 value = yield value 1934 except Exception as e: 1935 value = e 1936 1937 exception = Exception("foo") 1938 cw = asyncio.coroutines.CoroWrapper(foo()) 1939 cw.send(None) 1940 self.assertIs(exception, cw.throw(exception)) 1941 1942 cw = asyncio.coroutines.CoroWrapper(foo()) 1943 cw.send(None) 1944 self.assertIs(exception, cw.throw(Exception, exception)) 1945 1946 cw = asyncio.coroutines.CoroWrapper(foo()) 1947 cw.send(None) 1948 exception = cw.throw(Exception, "foo") 1949 self.assertIsInstance(exception, Exception) 1950 self.assertEqual(exception.args, ("foo", )) 1951 1952 cw = asyncio.coroutines.CoroWrapper(foo()) 1953 cw.send(None) 1954 exception = cw.throw(Exception, "foo", None) 1955 self.assertIsInstance(exception, Exception) 1956 self.assertEqual(exception.args, ("foo", )) 1957 1958 def test_all_tasks_deprecated(self): 1959 Task = self.__class__.Task 1960 1961 async def coro(): 1962 with self.assertWarns(PendingDeprecationWarning): 1963 assert Task.all_tasks(self.loop) == {t} 1964 1965 t = self.new_task(self.loop, coro()) 1966 self.loop.run_until_complete(t) 1967 1968 def test_log_destroyed_pending_task(self): 1969 Task = self.__class__.Task 1970 1971 @asyncio.coroutine 1972 def kill_me(loop): 1973 future = self.new_future(loop) 1974 yield from future 1975 # at this point, the only reference to kill_me() task is 1976 # the Task._wakeup() method in future._callbacks 1977 raise Exception("code never reached") 1978 1979 mock_handler = mock.Mock() 1980 self.loop.set_debug(True) 1981 self.loop.set_exception_handler(mock_handler) 1982 1983 # schedule the task 1984 coro = kill_me(self.loop) 1985 task = asyncio.ensure_future(coro, loop=self.loop) 1986 1987 self.assertEqual(asyncio.all_tasks(loop=self.loop), {task}) 1988 1989 # See http://bugs.python.org/issue29271 for details: 1990 
asyncio.set_event_loop(self.loop) 1991 try: 1992 with self.assertWarns(PendingDeprecationWarning): 1993 self.assertEqual(Task.all_tasks(), {task}) 1994 with self.assertWarns(PendingDeprecationWarning): 1995 self.assertEqual(Task.all_tasks(None), {task}) 1996 finally: 1997 asyncio.set_event_loop(None) 1998 1999 # execute the task so it waits for future 2000 self.loop._run_once() 2001 self.assertEqual(len(self.loop._ready), 0) 2002 2003 # remove the future used in kill_me(), and references to the task 2004 del coro.gi_frame.f_locals['future'] 2005 coro = None 2006 source_traceback = task._source_traceback 2007 task = None 2008 2009 # no more reference to kill_me() task: the task is destroyed by the GC 2010 support.gc_collect() 2011 2012 self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) 2013 2014 mock_handler.assert_called_with(self.loop, { 2015 'message': 'Task was destroyed but it is pending!', 2016 'task': mock.ANY, 2017 'source_traceback': source_traceback, 2018 }) 2019 mock_handler.reset_mock() 2020 2021 @mock.patch('asyncio.base_events.logger') 2022 def test_tb_logger_not_called_after_cancel(self, m_log): 2023 loop = asyncio.new_event_loop() 2024 self.set_event_loop(loop) 2025 2026 @asyncio.coroutine 2027 def coro(): 2028 raise TypeError 2029 2030 @asyncio.coroutine 2031 def runner(): 2032 task = self.new_task(loop, coro()) 2033 yield from asyncio.sleep(0.05, loop=loop) 2034 task.cancel() 2035 task = None 2036 2037 loop.run_until_complete(runner()) 2038 self.assertFalse(m_log.error.called) 2039 2040 @mock.patch('asyncio.coroutines.logger') 2041 def test_coroutine_never_yielded(self, m_log): 2042 with set_coroutine_debug(True): 2043 @asyncio.coroutine 2044 def coro_noop(): 2045 pass 2046 2047 tb_filename = __file__ 2048 tb_lineno = sys._getframe().f_lineno + 2 2049 # create a coroutine object but don't use it 2050 coro_noop() 2051 support.gc_collect() 2052 2053 self.assertTrue(m_log.error.called) 2054 message = m_log.error.call_args[0][0] 2055 
func_filename, func_lineno = test_utils.get_function_source(coro_noop) 2056 2057 regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> ' 2058 r'was never yielded from\n' 2059 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n' 2060 r'.*\n' 2061 r' File "%s", line %s, in test_coroutine_never_yielded\n' 2062 r' coro_noop\(\)$' 2063 % (re.escape(coro_noop.__qualname__), 2064 re.escape(func_filename), func_lineno, 2065 re.escape(tb_filename), tb_lineno)) 2066 2067 self.assertRegex(message, re.compile(regex, re.DOTALL)) 2068 2069 def test_return_coroutine_from_coroutine(self): 2070 """Return of @asyncio.coroutine()-wrapped function generator object 2071 from @asyncio.coroutine()-wrapped function should have same effect as 2072 returning generator object or Future.""" 2073 def check(): 2074 @asyncio.coroutine 2075 def outer_coro(): 2076 @asyncio.coroutine 2077 def inner_coro(): 2078 return 1 2079 2080 return inner_coro() 2081 2082 result = self.loop.run_until_complete(outer_coro()) 2083 self.assertEqual(result, 1) 2084 2085 # Test with debug flag cleared. 2086 with set_coroutine_debug(False): 2087 check() 2088 2089 # Test with debug flag set. 
2090 with set_coroutine_debug(True): 2091 check() 2092 2093 def test_task_source_traceback(self): 2094 self.loop.set_debug(True) 2095 2096 task = self.new_task(self.loop, coroutine_function()) 2097 lineno = sys._getframe().f_lineno - 1 2098 self.assertIsInstance(task._source_traceback, list) 2099 self.assertEqual(task._source_traceback[-2][:3], 2100 (__file__, 2101 lineno, 2102 'test_task_source_traceback')) 2103 self.loop.run_until_complete(task) 2104 2105 def _test_cancel_wait_for(self, timeout): 2106 loop = asyncio.new_event_loop() 2107 self.addCleanup(loop.close) 2108 2109 @asyncio.coroutine 2110 def blocking_coroutine(): 2111 fut = self.new_future(loop) 2112 # Block: fut result is never set 2113 yield from fut 2114 2115 task = loop.create_task(blocking_coroutine()) 2116 2117 wait = loop.create_task(asyncio.wait_for(task, timeout, loop=loop)) 2118 loop.call_soon(wait.cancel) 2119 2120 self.assertRaises(asyncio.CancelledError, 2121 loop.run_until_complete, wait) 2122 2123 # Python issue #23219: cancelling the wait must also cancel the task 2124 self.assertTrue(task.cancelled()) 2125 2126 def test_cancel_blocking_wait_for(self): 2127 self._test_cancel_wait_for(None) 2128 2129 def test_cancel_wait_for(self): 2130 self._test_cancel_wait_for(60.0) 2131 2132 def test_cancel_gather_1(self): 2133 """Ensure that a gathering future refuses to be cancelled once all 2134 children are done""" 2135 loop = asyncio.new_event_loop() 2136 self.addCleanup(loop.close) 2137 2138 fut = self.new_future(loop) 2139 # The indirection fut->child_coro is needed since otherwise the 2140 # gathering task is done at the same time as the child future 2141 def child_coro(): 2142 return (yield from fut) 2143 gather_future = asyncio.gather(child_coro(), loop=loop) 2144 gather_task = asyncio.ensure_future(gather_future, loop=loop) 2145 2146 cancel_result = None 2147 def cancelling_callback(_): 2148 nonlocal cancel_result 2149 cancel_result = gather_task.cancel() 2150 
fut.add_done_callback(cancelling_callback) 2151 2152 fut.set_result(42) # calls the cancelling_callback after fut is done() 2153 2154 # At this point the task should complete. 2155 loop.run_until_complete(gather_task) 2156 2157 # Python issue #26923: asyncio.gather drops cancellation 2158 self.assertEqual(cancel_result, False) 2159 self.assertFalse(gather_task.cancelled()) 2160 self.assertEqual(gather_task.result(), [42]) 2161 2162 def test_cancel_gather_2(self): 2163 loop = asyncio.new_event_loop() 2164 self.addCleanup(loop.close) 2165 2166 async def test(): 2167 time = 0 2168 while True: 2169 time += 0.05 2170 await asyncio.gather(asyncio.sleep(0.05, loop=loop), 2171 return_exceptions=True, 2172 loop=loop) 2173 if time > 1: 2174 return 2175 2176 async def main(): 2177 qwe = self.new_task(loop, test()) 2178 await asyncio.sleep(0.2, loop=loop) 2179 qwe.cancel() 2180 try: 2181 await qwe 2182 except asyncio.CancelledError: 2183 pass 2184 else: 2185 self.fail('gather did not propagate the cancellation request') 2186 2187 loop.run_until_complete(main()) 2188 2189 def test_exception_traceback(self): 2190 # See http://bugs.python.org/issue28843 2191 2192 @asyncio.coroutine 2193 def foo(): 2194 1 / 0 2195 2196 @asyncio.coroutine 2197 def main(): 2198 task = self.new_task(self.loop, foo()) 2199 yield # skip one loop iteration 2200 self.assertIsNotNone(task.exception().__traceback__) 2201 2202 self.loop.run_until_complete(main()) 2203 2204 @mock.patch('asyncio.base_events.logger') 2205 def test_error_in_call_soon(self, m_log): 2206 def call_soon(callback, *args, **kwargs): 2207 raise ValueError 2208 self.loop.call_soon = call_soon 2209 2210 @asyncio.coroutine 2211 def coro(): 2212 pass 2213 2214 self.assertFalse(m_log.error.called) 2215 2216 with self.assertRaises(ValueError): 2217 gen = coro() 2218 try: 2219 self.new_task(self.loop, gen) 2220 finally: 2221 gen.close() 2222 2223 self.assertTrue(m_log.error.called) 2224 message = m_log.error.call_args[0][0] 2225 
self.assertIn('Task was destroyed but it is pending', message) 2226 2227 self.assertEqual(asyncio.all_tasks(self.loop), set()) 2228 2229 def test_create_task_with_noncoroutine(self): 2230 with self.assertRaisesRegex(TypeError, 2231 "a coroutine was expected, got 123"): 2232 self.new_task(self.loop, 123) 2233 2234 # test it for the second time to ensure that caching 2235 # in asyncio.iscoroutine() doesn't break things. 2236 with self.assertRaisesRegex(TypeError, 2237 "a coroutine was expected, got 123"): 2238 self.new_task(self.loop, 123) 2239 2240 def test_create_task_with_oldstyle_coroutine(self): 2241 2242 @asyncio.coroutine 2243 def coro(): 2244 pass 2245 2246 task = self.new_task(self.loop, coro()) 2247 self.assertIsInstance(task, self.Task) 2248 self.loop.run_until_complete(task) 2249 2250 # test it for the second time to ensure that caching 2251 # in asyncio.iscoroutine() doesn't break things. 2252 task = self.new_task(self.loop, coro()) 2253 self.assertIsInstance(task, self.Task) 2254 self.loop.run_until_complete(task) 2255 2256 def test_create_task_with_async_function(self): 2257 2258 async def coro(): 2259 pass 2260 2261 task = self.new_task(self.loop, coro()) 2262 self.assertIsInstance(task, self.Task) 2263 self.loop.run_until_complete(task) 2264 2265 # test it for the second time to ensure that caching 2266 # in asyncio.iscoroutine() doesn't break things. 2267 task = self.new_task(self.loop, coro()) 2268 self.assertIsInstance(task, self.Task) 2269 self.loop.run_until_complete(task) 2270 2271 def test_create_task_with_asynclike_function(self): 2272 task = self.new_task(self.loop, CoroLikeObject()) 2273 self.assertIsInstance(task, self.Task) 2274 self.assertEqual(self.loop.run_until_complete(task), 42) 2275 2276 # test it for the second time to ensure that caching 2277 # in asyncio.iscoroutine() doesn't break things. 
2278 task = self.new_task(self.loop, CoroLikeObject()) 2279 self.assertIsInstance(task, self.Task) 2280 self.assertEqual(self.loop.run_until_complete(task), 42) 2281 2282 def test_bare_create_task(self): 2283 2284 async def inner(): 2285 return 1 2286 2287 async def coro(): 2288 task = asyncio.create_task(inner()) 2289 self.assertIsInstance(task, self.Task) 2290 ret = await task 2291 self.assertEqual(1, ret) 2292 2293 self.loop.run_until_complete(coro()) 2294 2295 def test_context_1(self): 2296 cvar = contextvars.ContextVar('cvar', default='nope') 2297 2298 async def sub(): 2299 await asyncio.sleep(0.01, loop=loop) 2300 self.assertEqual(cvar.get(), 'nope') 2301 cvar.set('something else') 2302 2303 async def main(): 2304 self.assertEqual(cvar.get(), 'nope') 2305 subtask = self.new_task(loop, sub()) 2306 cvar.set('yes') 2307 self.assertEqual(cvar.get(), 'yes') 2308 await subtask 2309 self.assertEqual(cvar.get(), 'yes') 2310 2311 loop = asyncio.new_event_loop() 2312 try: 2313 task = self.new_task(loop, main()) 2314 loop.run_until_complete(task) 2315 finally: 2316 loop.close() 2317 2318 def test_context_2(self): 2319 cvar = contextvars.ContextVar('cvar', default='nope') 2320 2321 async def main(): 2322 def fut_on_done(fut): 2323 # This change must not pollute the context 2324 # of the "main()" task. 
2325 cvar.set('something else') 2326 2327 self.assertEqual(cvar.get(), 'nope') 2328 2329 for j in range(2): 2330 fut = self.new_future(loop) 2331 fut.add_done_callback(fut_on_done) 2332 cvar.set(f'yes{j}') 2333 loop.call_soon(fut.set_result, None) 2334 await fut 2335 self.assertEqual(cvar.get(), f'yes{j}') 2336 2337 for i in range(3): 2338 # Test that task passed its context to add_done_callback: 2339 cvar.set(f'yes{i}-{j}') 2340 await asyncio.sleep(0.001, loop=loop) 2341 self.assertEqual(cvar.get(), f'yes{i}-{j}') 2342 2343 loop = asyncio.new_event_loop() 2344 try: 2345 task = self.new_task(loop, main()) 2346 loop.run_until_complete(task) 2347 finally: 2348 loop.close() 2349 2350 self.assertEqual(cvar.get(), 'nope') 2351 2352 def test_context_3(self): 2353 # Run 100 Tasks in parallel, each modifying cvar. 2354 2355 cvar = contextvars.ContextVar('cvar', default=-1) 2356 2357 async def sub(num): 2358 for i in range(10): 2359 cvar.set(num + i) 2360 await asyncio.sleep( 2361 random.uniform(0.001, 0.05), loop=loop) 2362 self.assertEqual(cvar.get(), num + i) 2363 2364 async def main(): 2365 tasks = [] 2366 for i in range(100): 2367 task = loop.create_task(sub(random.randint(0, 10))) 2368 tasks.append(task) 2369 2370 await asyncio.gather(*tasks, loop=loop) 2371 2372 loop = asyncio.new_event_loop() 2373 try: 2374 loop.run_until_complete(main()) 2375 finally: 2376 loop.close() 2377 2378 self.assertEqual(cvar.get(), -1) 2379 2380 2381def add_subclass_tests(cls): 2382 BaseTask = cls.Task 2383 BaseFuture = cls.Future 2384 2385 if BaseTask is None or BaseFuture is None: 2386 return cls 2387 2388 class CommonFuture: 2389 def __init__(self, *args, **kwargs): 2390 self.calls = collections.defaultdict(lambda: 0) 2391 super().__init__(*args, **kwargs) 2392 2393 def add_done_callback(self, *args, **kwargs): 2394 self.calls['add_done_callback'] += 1 2395 return super().add_done_callback(*args, **kwargs) 2396 2397 class Task(CommonFuture, BaseTask): 2398 pass 2399 2400 class 
Future(CommonFuture, BaseFuture): 2401 pass 2402 2403 def test_subclasses_ctask_cfuture(self): 2404 fut = self.Future(loop=self.loop) 2405 2406 async def func(): 2407 self.loop.call_soon(lambda: fut.set_result('spam')) 2408 return await fut 2409 2410 task = self.Task(func(), loop=self.loop) 2411 2412 result = self.loop.run_until_complete(task) 2413 2414 self.assertEqual(result, 'spam') 2415 2416 self.assertEqual( 2417 dict(task.calls), 2418 {'add_done_callback': 1}) 2419 2420 self.assertEqual( 2421 dict(fut.calls), 2422 {'add_done_callback': 1}) 2423 2424 # Add patched Task & Future back to the test case 2425 cls.Task = Task 2426 cls.Future = Future 2427 2428 # Add an extra unit-test 2429 cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture 2430 2431 # Disable the "test_task_source_traceback" test 2432 # (the test is hardcoded for a particular call stack, which 2433 # is slightly different for Task subclasses) 2434 cls.test_task_source_traceback = None 2435 2436 return cls 2437 2438 2439class SetMethodsTest: 2440 2441 def test_set_result_causes_invalid_state(self): 2442 Future = type(self).Future 2443 self.loop.call_exception_handler = exc_handler = mock.Mock() 2444 2445 async def foo(): 2446 await asyncio.sleep(0.1, loop=self.loop) 2447 return 10 2448 2449 coro = foo() 2450 task = self.new_task(self.loop, coro) 2451 Future.set_result(task, 'spam') 2452 2453 self.assertEqual( 2454 self.loop.run_until_complete(task), 2455 'spam') 2456 2457 exc_handler.assert_called_once() 2458 exc = exc_handler.call_args[0][0]['exception'] 2459 with self.assertRaisesRegex(asyncio.InvalidStateError, 2460 r'step\(\): already done'): 2461 raise exc 2462 2463 coro.close() 2464 2465 def test_set_exception_causes_invalid_state(self): 2466 class MyExc(Exception): 2467 pass 2468 2469 Future = type(self).Future 2470 self.loop.call_exception_handler = exc_handler = mock.Mock() 2471 2472 async def foo(): 2473 await asyncio.sleep(0.1, loop=self.loop) 2474 return 10 2475 2476 coro = 
foo() 2477 task = self.new_task(self.loop, coro) 2478 Future.set_exception(task, MyExc()) 2479 2480 with self.assertRaises(MyExc): 2481 self.loop.run_until_complete(task) 2482 2483 exc_handler.assert_called_once() 2484 exc = exc_handler.call_args[0][0]['exception'] 2485 with self.assertRaisesRegex(asyncio.InvalidStateError, 2486 r'step\(\): already done'): 2487 raise exc 2488 2489 coro.close() 2490 2491 2492@unittest.skipUnless(hasattr(futures, '_CFuture') and 2493 hasattr(tasks, '_CTask'), 2494 'requires the C _asyncio module') 2495class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest, 2496 test_utils.TestCase): 2497 2498 Task = getattr(tasks, '_CTask', None) 2499 Future = getattr(futures, '_CFuture', None) 2500 2501 @support.refcount_test 2502 def test_refleaks_in_task___init__(self): 2503 gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount') 2504 @asyncio.coroutine 2505 def coro(): 2506 pass 2507 task = self.new_task(self.loop, coro()) 2508 self.loop.run_until_complete(task) 2509 refs_before = gettotalrefcount() 2510 for i in range(100): 2511 task.__init__(coro(), loop=self.loop) 2512 self.loop.run_until_complete(task) 2513 self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10) 2514 2515 def test_del__log_destroy_pending_segfault(self): 2516 @asyncio.coroutine 2517 def coro(): 2518 pass 2519 task = self.new_task(self.loop, coro()) 2520 self.loop.run_until_complete(task) 2521 with self.assertRaises(AttributeError): 2522 del task._log_destroy_pending 2523 2524 2525@unittest.skipUnless(hasattr(futures, '_CFuture') and 2526 hasattr(tasks, '_CTask'), 2527 'requires the C _asyncio module') 2528@add_subclass_tests 2529class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 2530 2531 Task = getattr(tasks, '_CTask', None) 2532 Future = getattr(futures, '_CFuture', None) 2533 2534 2535@unittest.skipUnless(hasattr(tasks, '_CTask'), 2536 'requires the C _asyncio module') 2537@add_subclass_tests 2538class 
CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 2539 2540 Task = getattr(tasks, '_CTask', None) 2541 Future = futures._PyFuture 2542 2543 2544@unittest.skipUnless(hasattr(futures, '_CFuture'), 2545 'requires the C _asyncio module') 2546@add_subclass_tests 2547class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase): 2548 2549 Future = getattr(futures, '_CFuture', None) 2550 Task = tasks._PyTask 2551 2552 2553@unittest.skipUnless(hasattr(tasks, '_CTask'), 2554 'requires the C _asyncio module') 2555class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 2556 2557 Task = getattr(tasks, '_CTask', None) 2558 Future = futures._PyFuture 2559 2560 2561@unittest.skipUnless(hasattr(futures, '_CFuture'), 2562 'requires the C _asyncio module') 2563class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): 2564 2565 Task = tasks._PyTask 2566 Future = getattr(futures, '_CFuture', None) 2567 2568 2569class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest, 2570 test_utils.TestCase): 2571 2572 Task = tasks._PyTask 2573 Future = futures._PyFuture 2574 2575 2576@add_subclass_tests 2577class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 2578 Task = tasks._PyTask 2579 Future = futures._PyFuture 2580 2581 2582@unittest.skipUnless(hasattr(tasks, '_CTask'), 2583 'requires the C _asyncio module') 2584class CTask_Future_Tests(test_utils.TestCase): 2585 2586 def test_foobar(self): 2587 class Fut(asyncio.Future): 2588 @property 2589 def get_loop(self): 2590 raise AttributeError 2591 2592 async def coro(): 2593 await fut 2594 return 'spam' 2595 2596 self.loop = asyncio.new_event_loop() 2597 try: 2598 fut = Fut(loop=self.loop) 2599 self.loop.call_later(0.1, fut.set_result, 1) 2600 task = asyncio.Task(coro(), loop=self.loop) 2601 res = self.loop.run_until_complete(task) 2602 finally: 2603 self.loop.close() 2604 2605 self.assertEqual(res, 'spam') 2606 2607 2608class BaseTaskIntrospectionTests: 2609 _register_task = None 2610 
_unregister_task = None 2611 _enter_task = None 2612 _leave_task = None 2613 2614 def test__register_task_1(self): 2615 class TaskLike: 2616 @property 2617 def _loop(self): 2618 return loop 2619 2620 def done(self): 2621 return False 2622 2623 task = TaskLike() 2624 loop = mock.Mock() 2625 2626 self.assertEqual(asyncio.all_tasks(loop), set()) 2627 self._register_task(task) 2628 self.assertEqual(asyncio.all_tasks(loop), {task}) 2629 self._unregister_task(task) 2630 2631 def test__register_task_2(self): 2632 class TaskLike: 2633 def get_loop(self): 2634 return loop 2635 2636 def done(self): 2637 return False 2638 2639 task = TaskLike() 2640 loop = mock.Mock() 2641 2642 self.assertEqual(asyncio.all_tasks(loop), set()) 2643 self._register_task(task) 2644 self.assertEqual(asyncio.all_tasks(loop), {task}) 2645 self._unregister_task(task) 2646 2647 def test__register_task_3(self): 2648 class TaskLike: 2649 def get_loop(self): 2650 return loop 2651 2652 def done(self): 2653 return True 2654 2655 task = TaskLike() 2656 loop = mock.Mock() 2657 2658 self.assertEqual(asyncio.all_tasks(loop), set()) 2659 self._register_task(task) 2660 self.assertEqual(asyncio.all_tasks(loop), set()) 2661 with self.assertWarns(PendingDeprecationWarning): 2662 self.assertEqual(asyncio.Task.all_tasks(loop), {task}) 2663 self._unregister_task(task) 2664 2665 def test__enter_task(self): 2666 task = mock.Mock() 2667 loop = mock.Mock() 2668 self.assertIsNone(asyncio.current_task(loop)) 2669 self._enter_task(loop, task) 2670 self.assertIs(asyncio.current_task(loop), task) 2671 self._leave_task(loop, task) 2672 2673 def test__enter_task_failure(self): 2674 task1 = mock.Mock() 2675 task2 = mock.Mock() 2676 loop = mock.Mock() 2677 self._enter_task(loop, task1) 2678 with self.assertRaises(RuntimeError): 2679 self._enter_task(loop, task2) 2680 self.assertIs(asyncio.current_task(loop), task1) 2681 self._leave_task(loop, task1) 2682 2683 def test__leave_task(self): 2684 task = mock.Mock() 2685 loop = 
mock.Mock() 2686 self._enter_task(loop, task) 2687 self._leave_task(loop, task) 2688 self.assertIsNone(asyncio.current_task(loop)) 2689 2690 def test__leave_task_failure1(self): 2691 task1 = mock.Mock() 2692 task2 = mock.Mock() 2693 loop = mock.Mock() 2694 self._enter_task(loop, task1) 2695 with self.assertRaises(RuntimeError): 2696 self._leave_task(loop, task2) 2697 self.assertIs(asyncio.current_task(loop), task1) 2698 self._leave_task(loop, task1) 2699 2700 def test__leave_task_failure2(self): 2701 task = mock.Mock() 2702 loop = mock.Mock() 2703 with self.assertRaises(RuntimeError): 2704 self._leave_task(loop, task) 2705 self.assertIsNone(asyncio.current_task(loop)) 2706 2707 def test__unregister_task(self): 2708 task = mock.Mock() 2709 loop = mock.Mock() 2710 task.get_loop = lambda: loop 2711 self._register_task(task) 2712 self._unregister_task(task) 2713 self.assertEqual(asyncio.all_tasks(loop), set()) 2714 2715 def test__unregister_task_not_registered(self): 2716 task = mock.Mock() 2717 loop = mock.Mock() 2718 self._unregister_task(task) 2719 self.assertEqual(asyncio.all_tasks(loop), set()) 2720 2721 2722class PyIntrospectionTests(unittest.TestCase, BaseTaskIntrospectionTests): 2723 _register_task = staticmethod(tasks._py_register_task) 2724 _unregister_task = staticmethod(tasks._py_unregister_task) 2725 _enter_task = staticmethod(tasks._py_enter_task) 2726 _leave_task = staticmethod(tasks._py_leave_task) 2727 2728 2729@unittest.skipUnless(hasattr(tasks, '_c_register_task'), 2730 'requires the C _asyncio module') 2731class CIntrospectionTests(unittest.TestCase, BaseTaskIntrospectionTests): 2732 if hasattr(tasks, '_c_register_task'): 2733 _register_task = staticmethod(tasks._c_register_task) 2734 _unregister_task = staticmethod(tasks._c_unregister_task) 2735 _enter_task = staticmethod(tasks._c_enter_task) 2736 _leave_task = staticmethod(tasks._c_leave_task) 2737 else: 2738 _register_task = _unregister_task = _enter_task = _leave_task = None 2739 2740 2741class 
BaseCurrentLoopTests: 2742 2743 def setUp(self): 2744 super().setUp() 2745 self.loop = asyncio.new_event_loop() 2746 asyncio.set_event_loop(self.loop) 2747 2748 def tearDown(self): 2749 self.loop.close() 2750 asyncio.set_event_loop(None) 2751 super().tearDown() 2752 2753 def new_task(self, coro): 2754 raise NotImplementedError 2755 2756 def test_current_task_no_running_loop(self): 2757 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2758 2759 def test_current_task_no_running_loop_implicit(self): 2760 with self.assertRaises(RuntimeError): 2761 asyncio.current_task() 2762 2763 def test_current_task_with_implicit_loop(self): 2764 async def coro(): 2765 self.assertIs(asyncio.current_task(loop=self.loop), task) 2766 2767 self.assertIs(asyncio.current_task(None), task) 2768 self.assertIs(asyncio.current_task(), task) 2769 2770 task = self.new_task(coro()) 2771 self.loop.run_until_complete(task) 2772 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2773 2774 2775class PyCurrentLoopTests(BaseCurrentLoopTests, unittest.TestCase): 2776 2777 def new_task(self, coro): 2778 return tasks._PyTask(coro, loop=self.loop) 2779 2780 2781@unittest.skipUnless(hasattr(tasks, '_CTask'), 2782 'requires the C _asyncio module') 2783class CCurrentLoopTests(BaseCurrentLoopTests, unittest.TestCase): 2784 2785 def new_task(self, coro): 2786 return getattr(tasks, '_CTask')(coro, loop=self.loop) 2787 2788 2789class GenericTaskTests(test_utils.TestCase): 2790 2791 def test_future_subclass(self): 2792 self.assertTrue(issubclass(asyncio.Task, asyncio.Future)) 2793 2794 def test_asyncio_module_compiled(self): 2795 # Because of circular imports it's easy to make _asyncio 2796 # module non-importable. This is a simple test that will 2797 # fail on systems where C modules were successfully compiled 2798 # (hence the test for _functools), but _asyncio somehow didn't. 
2799 try: 2800 import _functools 2801 except ImportError: 2802 pass 2803 else: 2804 try: 2805 import _asyncio 2806 except ImportError: 2807 self.fail('_asyncio module is missing') 2808 2809 2810class GatherTestsBase: 2811 2812 def setUp(self): 2813 super().setUp() 2814 self.one_loop = self.new_test_loop() 2815 self.other_loop = self.new_test_loop() 2816 self.set_event_loop(self.one_loop, cleanup=False) 2817 2818 def _run_loop(self, loop): 2819 while loop._ready: 2820 test_utils.run_briefly(loop) 2821 2822 def _check_success(self, **kwargs): 2823 a, b, c = [asyncio.Future(loop=self.one_loop) for i in range(3)] 2824 fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs) 2825 cb = test_utils.MockCallback() 2826 fut.add_done_callback(cb) 2827 b.set_result(1) 2828 a.set_result(2) 2829 self._run_loop(self.one_loop) 2830 self.assertEqual(cb.called, False) 2831 self.assertFalse(fut.done()) 2832 c.set_result(3) 2833 self._run_loop(self.one_loop) 2834 cb.assert_called_once_with(fut) 2835 self.assertEqual(fut.result(), [2, 1, 3]) 2836 2837 def test_success(self): 2838 self._check_success() 2839 self._check_success(return_exceptions=False) 2840 2841 def test_result_exception_success(self): 2842 self._check_success(return_exceptions=True) 2843 2844 def test_one_exception(self): 2845 a, b, c, d, e = [asyncio.Future(loop=self.one_loop) for i in range(5)] 2846 fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e)) 2847 cb = test_utils.MockCallback() 2848 fut.add_done_callback(cb) 2849 exc = ZeroDivisionError() 2850 a.set_result(1) 2851 b.set_exception(exc) 2852 self._run_loop(self.one_loop) 2853 self.assertTrue(fut.done()) 2854 cb.assert_called_once_with(fut) 2855 self.assertIs(fut.exception(), exc) 2856 # Does nothing 2857 c.set_result(3) 2858 d.cancel() 2859 e.set_exception(RuntimeError()) 2860 e.exception() 2861 2862 def test_return_exceptions(self): 2863 a, b, c, d = [asyncio.Future(loop=self.one_loop) for i in range(4)] 2864 fut = asyncio.gather(*self.wrap_futures(a, b, 
c, d), 2865 return_exceptions=True) 2866 cb = test_utils.MockCallback() 2867 fut.add_done_callback(cb) 2868 exc = ZeroDivisionError() 2869 exc2 = RuntimeError() 2870 b.set_result(1) 2871 c.set_exception(exc) 2872 a.set_result(3) 2873 self._run_loop(self.one_loop) 2874 self.assertFalse(fut.done()) 2875 d.set_exception(exc2) 2876 self._run_loop(self.one_loop) 2877 self.assertTrue(fut.done()) 2878 cb.assert_called_once_with(fut) 2879 self.assertEqual(fut.result(), [3, 1, exc, exc2]) 2880 2881 def test_env_var_debug(self): 2882 code = '\n'.join(( 2883 'import asyncio.coroutines', 2884 'print(asyncio.coroutines._DEBUG)')) 2885 2886 # Test with -E to not fail if the unit test was run with 2887 # PYTHONASYNCIODEBUG set to a non-empty string 2888 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 2889 self.assertEqual(stdout.rstrip(), b'False') 2890 2891 sts, stdout, stderr = assert_python_ok('-c', code, 2892 PYTHONASYNCIODEBUG='', 2893 PYTHONDEVMODE='') 2894 self.assertEqual(stdout.rstrip(), b'False') 2895 2896 sts, stdout, stderr = assert_python_ok('-c', code, 2897 PYTHONASYNCIODEBUG='1', 2898 PYTHONDEVMODE='') 2899 self.assertEqual(stdout.rstrip(), b'True') 2900 2901 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 2902 PYTHONASYNCIODEBUG='1', 2903 PYTHONDEVMODE='') 2904 self.assertEqual(stdout.rstrip(), b'False') 2905 2906 # -X dev 2907 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 2908 '-c', code) 2909 self.assertEqual(stdout.rstrip(), b'True') 2910 2911 2912class FutureGatherTests(GatherTestsBase, test_utils.TestCase): 2913 2914 def wrap_futures(self, *futures): 2915 return futures 2916 2917 def _check_empty_sequence(self, seq_or_iter): 2918 asyncio.set_event_loop(self.one_loop) 2919 self.addCleanup(asyncio.set_event_loop, None) 2920 fut = asyncio.gather(*seq_or_iter) 2921 self.assertIsInstance(fut, asyncio.Future) 2922 self.assertIs(fut._loop, self.one_loop) 2923 self._run_loop(self.one_loop) 2924 self.assertTrue(fut.done()) 2925 
self.assertEqual(fut.result(), []) 2926 fut = asyncio.gather(*seq_or_iter, loop=self.other_loop) 2927 self.assertIs(fut._loop, self.other_loop) 2928 2929 def test_constructor_empty_sequence(self): 2930 self._check_empty_sequence([]) 2931 self._check_empty_sequence(()) 2932 self._check_empty_sequence(set()) 2933 self._check_empty_sequence(iter("")) 2934 2935 def test_constructor_heterogenous_futures(self): 2936 fut1 = asyncio.Future(loop=self.one_loop) 2937 fut2 = asyncio.Future(loop=self.other_loop) 2938 with self.assertRaises(ValueError): 2939 asyncio.gather(fut1, fut2) 2940 with self.assertRaises(ValueError): 2941 asyncio.gather(fut1, loop=self.other_loop) 2942 2943 def test_constructor_homogenous_futures(self): 2944 children = [asyncio.Future(loop=self.other_loop) for i in range(3)] 2945 fut = asyncio.gather(*children) 2946 self.assertIs(fut._loop, self.other_loop) 2947 self._run_loop(self.other_loop) 2948 self.assertFalse(fut.done()) 2949 fut = asyncio.gather(*children, loop=self.other_loop) 2950 self.assertIs(fut._loop, self.other_loop) 2951 self._run_loop(self.other_loop) 2952 self.assertFalse(fut.done()) 2953 2954 def test_one_cancellation(self): 2955 a, b, c, d, e = [asyncio.Future(loop=self.one_loop) for i in range(5)] 2956 fut = asyncio.gather(a, b, c, d, e) 2957 cb = test_utils.MockCallback() 2958 fut.add_done_callback(cb) 2959 a.set_result(1) 2960 b.cancel() 2961 self._run_loop(self.one_loop) 2962 self.assertTrue(fut.done()) 2963 cb.assert_called_once_with(fut) 2964 self.assertFalse(fut.cancelled()) 2965 self.assertIsInstance(fut.exception(), asyncio.CancelledError) 2966 # Does nothing 2967 c.set_result(3) 2968 d.cancel() 2969 e.set_exception(RuntimeError()) 2970 e.exception() 2971 2972 def test_result_exception_one_cancellation(self): 2973 a, b, c, d, e, f = [asyncio.Future(loop=self.one_loop) 2974 for i in range(6)] 2975 fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True) 2976 cb = test_utils.MockCallback() 2977 fut.add_done_callback(cb) 
2978 a.set_result(1) 2979 zde = ZeroDivisionError() 2980 b.set_exception(zde) 2981 c.cancel() 2982 self._run_loop(self.one_loop) 2983 self.assertFalse(fut.done()) 2984 d.set_result(3) 2985 e.cancel() 2986 rte = RuntimeError() 2987 f.set_exception(rte) 2988 res = self.one_loop.run_until_complete(fut) 2989 self.assertIsInstance(res[2], asyncio.CancelledError) 2990 self.assertIsInstance(res[4], asyncio.CancelledError) 2991 res[2] = res[4] = None 2992 self.assertEqual(res, [1, zde, None, 3, None, rte]) 2993 cb.assert_called_once_with(fut) 2994 2995 2996class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): 2997 2998 def setUp(self): 2999 super().setUp() 3000 asyncio.set_event_loop(self.one_loop) 3001 3002 def wrap_futures(self, *futures): 3003 coros = [] 3004 for fut in futures: 3005 @asyncio.coroutine 3006 def coro(fut=fut): 3007 return (yield from fut) 3008 coros.append(coro()) 3009 return coros 3010 3011 def test_constructor_loop_selection(self): 3012 @asyncio.coroutine 3013 def coro(): 3014 return 'abc' 3015 gen1 = coro() 3016 gen2 = coro() 3017 fut = asyncio.gather(gen1, gen2) 3018 self.assertIs(fut._loop, self.one_loop) 3019 self.one_loop.run_until_complete(fut) 3020 3021 self.set_event_loop(self.other_loop, cleanup=False) 3022 gen3 = coro() 3023 gen4 = coro() 3024 fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop) 3025 self.assertIs(fut2._loop, self.other_loop) 3026 self.other_loop.run_until_complete(fut2) 3027 3028 def test_duplicate_coroutines(self): 3029 @asyncio.coroutine 3030 def coro(s): 3031 return s 3032 c = coro('abc') 3033 fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop) 3034 self._run_loop(self.one_loop) 3035 self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc']) 3036 3037 def test_cancellation_broadcast(self): 3038 # Cancelling outer() cancels all children. 
3039 proof = 0 3040 waiter = asyncio.Future(loop=self.one_loop) 3041 3042 @asyncio.coroutine 3043 def inner(): 3044 nonlocal proof 3045 yield from waiter 3046 proof += 1 3047 3048 child1 = asyncio.ensure_future(inner(), loop=self.one_loop) 3049 child2 = asyncio.ensure_future(inner(), loop=self.one_loop) 3050 gatherer = None 3051 3052 @asyncio.coroutine 3053 def outer(): 3054 nonlocal proof, gatherer 3055 gatherer = asyncio.gather(child1, child2, loop=self.one_loop) 3056 yield from gatherer 3057 proof += 100 3058 3059 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3060 test_utils.run_briefly(self.one_loop) 3061 self.assertTrue(f.cancel()) 3062 with self.assertRaises(asyncio.CancelledError): 3063 self.one_loop.run_until_complete(f) 3064 self.assertFalse(gatherer.cancel()) 3065 self.assertTrue(waiter.cancelled()) 3066 self.assertTrue(child1.cancelled()) 3067 self.assertTrue(child2.cancelled()) 3068 test_utils.run_briefly(self.one_loop) 3069 self.assertEqual(proof, 0) 3070 3071 def test_exception_marking(self): 3072 # Test for the first line marked "Mark exception retrieved." 
3073 3074 @asyncio.coroutine 3075 def inner(f): 3076 yield from f 3077 raise RuntimeError('should not be ignored') 3078 3079 a = asyncio.Future(loop=self.one_loop) 3080 b = asyncio.Future(loop=self.one_loop) 3081 3082 @asyncio.coroutine 3083 def outer(): 3084 yield from asyncio.gather(inner(a), inner(b), loop=self.one_loop) 3085 3086 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3087 test_utils.run_briefly(self.one_loop) 3088 a.set_result(None) 3089 test_utils.run_briefly(self.one_loop) 3090 b.set_result(None) 3091 test_utils.run_briefly(self.one_loop) 3092 self.assertIsInstance(f.exception(), RuntimeError) 3093 3094 3095class RunCoroutineThreadsafeTests(test_utils.TestCase): 3096 """Test case for asyncio.run_coroutine_threadsafe.""" 3097 3098 def setUp(self): 3099 super().setUp() 3100 self.loop = asyncio.new_event_loop() 3101 self.set_event_loop(self.loop) # Will cleanup properly 3102 3103 @asyncio.coroutine 3104 def add(self, a, b, fail=False, cancel=False): 3105 """Wait 0.05 second and return a + b.""" 3106 yield from asyncio.sleep(0.05, loop=self.loop) 3107 if fail: 3108 raise RuntimeError("Fail!") 3109 if cancel: 3110 asyncio.current_task(self.loop).cancel() 3111 yield 3112 return a + b 3113 3114 def target(self, fail=False, cancel=False, timeout=None, 3115 advance_coro=False): 3116 """Run add coroutine in the event loop.""" 3117 coro = self.add(1, 2, fail=fail, cancel=cancel) 3118 future = asyncio.run_coroutine_threadsafe(coro, self.loop) 3119 if advance_coro: 3120 # this is for test_run_coroutine_threadsafe_task_factory_exception; 3121 # otherwise it spills errors and breaks **other** unittests, since 3122 # 'target' is interacting with threads. 3123 3124 # With this call, `coro` will be advanced, so that 3125 # CoroWrapper.__del__ won't do anything when asyncio tests run 3126 # in debug mode. 
3127 self.loop.call_soon_threadsafe(coro.send, None) 3128 try: 3129 return future.result(timeout) 3130 finally: 3131 future.done() or future.cancel() 3132 3133 def test_run_coroutine_threadsafe(self): 3134 """Test coroutine submission from a thread to an event loop.""" 3135 future = self.loop.run_in_executor(None, self.target) 3136 result = self.loop.run_until_complete(future) 3137 self.assertEqual(result, 3) 3138 3139 def test_run_coroutine_threadsafe_with_exception(self): 3140 """Test coroutine submission from a thread to an event loop 3141 when an exception is raised.""" 3142 future = self.loop.run_in_executor(None, self.target, True) 3143 with self.assertRaises(RuntimeError) as exc_context: 3144 self.loop.run_until_complete(future) 3145 self.assertIn("Fail!", exc_context.exception.args) 3146 3147 def test_run_coroutine_threadsafe_with_timeout(self): 3148 """Test coroutine submission from a thread to an event loop 3149 when a timeout is raised.""" 3150 callback = lambda: self.target(timeout=0) 3151 future = self.loop.run_in_executor(None, callback) 3152 with self.assertRaises(asyncio.TimeoutError): 3153 self.loop.run_until_complete(future) 3154 test_utils.run_briefly(self.loop) 3155 # Check that there's no pending task (add has been cancelled) 3156 for task in asyncio.all_tasks(self.loop): 3157 self.assertTrue(task.done()) 3158 3159 def test_run_coroutine_threadsafe_task_cancelled(self): 3160 """Test coroutine submission from a tread to an event loop 3161 when the task is cancelled.""" 3162 callback = lambda: self.target(cancel=True) 3163 future = self.loop.run_in_executor(None, callback) 3164 with self.assertRaises(asyncio.CancelledError): 3165 self.loop.run_until_complete(future) 3166 3167 def test_run_coroutine_threadsafe_task_factory_exception(self): 3168 """Test coroutine submission from a tread to an event loop 3169 when the task factory raise an exception.""" 3170 3171 def task_factory(loop, coro): 3172 raise NameError 3173 3174 run = 
self.loop.run_in_executor( 3175 None, lambda: self.target(advance_coro=True)) 3176 3177 # Set exception handler 3178 callback = test_utils.MockCallback() 3179 self.loop.set_exception_handler(callback) 3180 3181 # Set corrupted task factory 3182 self.loop.set_task_factory(task_factory) 3183 3184 # Run event loop 3185 with self.assertRaises(NameError) as exc_context: 3186 self.loop.run_until_complete(run) 3187 3188 # Check exceptions 3189 self.assertEqual(len(callback.call_args_list), 1) 3190 (loop, context), kwargs = callback.call_args 3191 self.assertEqual(context['exception'], exc_context.exception) 3192 3193 3194class SleepTests(test_utils.TestCase): 3195 def setUp(self): 3196 super().setUp() 3197 self.loop = asyncio.new_event_loop() 3198 asyncio.set_event_loop(None) 3199 3200 def tearDown(self): 3201 self.loop.close() 3202 self.loop = None 3203 super().tearDown() 3204 3205 def test_sleep_zero(self): 3206 result = 0 3207 3208 def inc_result(num): 3209 nonlocal result 3210 result += num 3211 3212 @asyncio.coroutine 3213 def coro(): 3214 self.loop.call_soon(inc_result, 1) 3215 self.assertEqual(result, 0) 3216 num = yield from asyncio.sleep(0, loop=self.loop, result=10) 3217 self.assertEqual(result, 1) # inc'ed by call_soon 3218 inc_result(num) # num should be 11 3219 3220 self.loop.run_until_complete(coro()) 3221 self.assertEqual(result, 11) 3222 3223 3224class CompatibilityTests(test_utils.TestCase): 3225 # Tests for checking a bridge between old-styled coroutines 3226 # and async/await syntax 3227 3228 def setUp(self): 3229 super().setUp() 3230 self.loop = asyncio.new_event_loop() 3231 asyncio.set_event_loop(None) 3232 3233 def tearDown(self): 3234 self.loop.close() 3235 self.loop = None 3236 super().tearDown() 3237 3238 def test_yield_from_awaitable(self): 3239 3240 @asyncio.coroutine 3241 def coro(): 3242 yield from asyncio.sleep(0, loop=self.loop) 3243 return 'ok' 3244 3245 result = self.loop.run_until_complete(coro()) 3246 self.assertEqual('ok', result) 3247 
3248 def test_await_old_style_coro(self): 3249 3250 @asyncio.coroutine 3251 def coro1(): 3252 return 'ok1' 3253 3254 @asyncio.coroutine 3255 def coro2(): 3256 yield from asyncio.sleep(0, loop=self.loop) 3257 return 'ok2' 3258 3259 async def inner(): 3260 return await asyncio.gather(coro1(), coro2(), loop=self.loop) 3261 3262 result = self.loop.run_until_complete(inner()) 3263 self.assertEqual(['ok1', 'ok2'], result) 3264 3265 def test_debug_mode_interop(self): 3266 # https://bugs.python.org/issue32636 3267 code = textwrap.dedent(""" 3268 import asyncio 3269 3270 async def native_coro(): 3271 pass 3272 3273 @asyncio.coroutine 3274 def old_style_coro(): 3275 yield from native_coro() 3276 3277 asyncio.run(old_style_coro()) 3278 """) 3279 assert_python_ok("-c", code, PYTHONASYNCIODEBUG="1") 3280 3281 3282if __name__ == '__main__': 3283 unittest.main() 3284