"""Tests for lock.py"""

import unittest
from unittest import mock
import re

import asyncio
from test.test_asyncio import utils as test_utils

# Pattern the repr() of every primitive exercised below is matched against:
# "<CLASS object at ADDRESS [STATE(, value:N)?(, waiters:N)?]>".
STR_RGX_REPR = (
    r'^<(?P<class>.*?) object at (?P<address>.*?)'
    r'\[(?P<extras>'
    r'(set|unset|locked|unlocked)(, value:\d)?(, waiters:\d+)?'
    r')\]>\Z'
)
RGX_REPR = re.compile(STR_RGX_REPR)


def tearDownModule():
    # Reset the global event loop policy once all tests in this module ran.
    asyncio.set_event_loop_policy(None)


class LockTests(test_utils.TestCase):
    """Tests for asyncio.Lock."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicit loop argument warns and is stored on the lock.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=loop)
        self.assertIs(lock._loop, loop)

        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.assertIs(lock._loop, self.loop)

    def test_ctor_noloop(self):
        # Without a loop argument the current event loop is picked up.
        asyncio.set_event_loop(self.loop)
        lock = asyncio.Lock()
        self.assertIs(lock._loop, self.loop)

    def test_repr(self):
        # repr() reflects the unlocked/locked state.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.assertTrue(repr(lock).endswith('[unlocked]>'))
        self.assertTrue(RGX_REPR.match(repr(lock)))

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def acquire_lock():
                with self.assertWarns(DeprecationWarning):
                    yield from lock

        self.loop.run_until_complete(acquire_lock())
        self.assertTrue(repr(lock).endswith('[locked]>'))
        self.assertTrue(RGX_REPR.match(repr(lock)))

    def test_lock(self):
        # "yield from lock" acquires the lock and warns about deprecation.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

            @asyncio.coroutine
            def acquire_lock():
                with self.assertWarns(DeprecationWarning):
                    return (yield from lock)

        res = self.loop.run_until_complete(acquire_lock())

        self.assertTrue(res)
        self.assertTrue(lock.locked())

        lock.release()
        self.assertFalse(lock.locked())

    def test_lock_by_with_statement(self):
        # "with (yield from primitive)" holds each primitive for the
        # duration of the with block and releases it afterwards.
        loop = asyncio.new_event_loop()  # don't use TestLoop quirks
        self.set_event_loop(loop)
        with self.assertWarns(DeprecationWarning):
            primitives = [
                asyncio.Lock(loop=loop),
                asyncio.Condition(loop=loop),
                asyncio.Semaphore(loop=loop),
                asyncio.BoundedSemaphore(loop=loop),
            ]

            @asyncio.coroutine
            def test(lock):
                yield from asyncio.sleep(0.01)
                self.assertFalse(lock.locked())
                with self.assertWarns(DeprecationWarning):
                    with (yield from lock) as _lock:
                        self.assertIs(_lock, None)
                        self.assertTrue(lock.locked())
                        yield from asyncio.sleep(0.01)
                        self.assertTrue(lock.locked())
                    self.assertFalse(lock.locked())

        for primitive in primitives:
            loop.run_until_complete(test(primitive))
            self.assertFalse(primitive.locked())

    def test_acquire(self):
        # Waiters obtain the lock one at a time, in the order they queued.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        result = []

        self.assertTrue(self.loop.run_until_complete(lock.acquire()))

        async def c1(result):
            if await lock.acquire():
                result.append(1)
            return True

        async def c2(result):
            if await lock.acquire():
                result.append(2)
            return True

        async def c3(result):
            if await lock.acquire():
                result.append(3)
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        t3 = self.loop.create_task(c3(result))

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_acquire_cancel(self):
        # Cancelling a pending acquire() leaves no waiter behind.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.assertTrue(self.loop.run_until_complete(lock.acquire()))

        task = self.loop.create_task(lock.acquire())
        self.loop.call_soon(task.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, task)
        self.assertFalse(lock._waiters)

    def test_cancel_race(self):
        # Several tasks:
        # - A acquires the lock
        # - B is blocked in acquire()
        # - C is blocked in acquire()
        #
        # Now, concurrently:
        # - B is cancelled
        # - A releases the lock
        #
        # If B's waiter is marked cancelled but not yet removed from
        # _waiters, A's release() call will crash when trying to set
        # B's waiter; instead, it should move on to C's waiter.

        # Setup: A has the lock, b and c are waiting.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        async def lockit(name, blocker):
            await lock.acquire()
            try:
                if blocker is not None:
                    await blocker
            finally:
                lock.release()

        fa = self.loop.create_future()
        ta = self.loop.create_task(lockit('A', fa))
        test_utils.run_briefly(self.loop)
        self.assertTrue(lock.locked())
        tb = self.loop.create_task(lockit('B', None))
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 1)
        tc = self.loop.create_task(lockit('C', None))
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 2)

        # Create the race and check.
        # Without the fix this failed at the last assert.
        fa.set_result(None)
        tb.cancel()
        self.assertTrue(lock._waiters[0].cancelled())
        test_utils.run_briefly(self.loop)
        self.assertFalse(lock.locked())
        self.assertTrue(ta.done())
        self.assertTrue(tb.cancelled())
        self.assertTrue(tc.done())

    def test_cancel_release_race(self):
        # Issue 32734
        # Acquire 4 locks, cancel second, release first
        # and 2 locks are taken at once.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        lock_count = 0
        call_count = 0

        async def lockit():
            nonlocal lock_count
            nonlocal call_count
            call_count += 1
            await lock.acquire()
            lock_count += 1

        async def lockandtrigger():
            await lock.acquire()
            self.loop.call_soon(trigger)

        def trigger():
            t1.cancel()
            lock.release()

        t0 = self.loop.create_task(lockandtrigger())
        t1 = self.loop.create_task(lockit())
        t2 = self.loop.create_task(lockit())
        t3 = self.loop.create_task(lockit())

        # First loop acquires all
        test_utils.run_briefly(self.loop)
        self.assertTrue(t0.done())

        # Second loop calls trigger
        test_utils.run_briefly(self.loop)
        # Third loop calls cancellation
        test_utils.run_briefly(self.loop)

        # Make sure only one lock was taken
        self.assertEqual(lock_count, 1)
        # While 3 calls were made to lockit()
        self.assertEqual(call_count, 3)
        self.assertTrue(t1.cancelled() and t2.done())

        # Cleanup the task that is stuck on acquire.
        t3.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(t3.cancelled())

    def test_finished_waiter_cancelled(self):
        # A waiter that was woken up and then cancelled must hand the
        # wake-up over to the next waiter.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        ta = self.loop.create_task(lock.acquire())
        test_utils.run_briefly(self.loop)
        self.assertTrue(lock.locked())

        tb = self.loop.create_task(lock.acquire())
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 1)

        # Create a second waiter, wake up the first, and cancel it.
        # Without the fix, the second was not woken up.
        tc = self.loop.create_task(lock.acquire())
        lock.release()
        tb.cancel()
        test_utils.run_briefly(self.loop)

        self.assertTrue(lock.locked())
        self.assertTrue(ta.done())
        self.assertTrue(tb.cancelled())

    def test_release_not_acquired(self):
        # Releasing a lock that is not held raises RuntimeError.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        self.assertRaises(RuntimeError, lock.release)

    def test_release_no_waiters(self):
        # Releasing with nobody waiting simply unlocks.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.loop.run_until_complete(lock.acquire())
        self.assertTrue(lock.locked())

        lock.release()
        self.assertFalse(lock.locked())

    def test_context_manager(self):
        # The object returned by "yield from lock" works as a context
        # manager that releases the lock on exit.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

            @asyncio.coroutine
            def acquire_lock():
                with self.assertWarns(DeprecationWarning):
                    return (yield from lock)

        with self.loop.run_until_complete(acquire_lock()):
            self.assertTrue(lock.locked())

        self.assertFalse(lock.locked())

    def test_context_manager_cant_reuse(self):
        # The context manager obtained from "yield from lock" is single-use.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

            @asyncio.coroutine
            def acquire_lock():
                with self.assertWarns(DeprecationWarning):
                    return (yield from lock)

        # This spells "yield from lock" outside a generator.
        cm = self.loop.run_until_complete(acquire_lock())
        with cm:
            self.assertTrue(lock.locked())

        self.assertFalse(lock.locked())

        with self.assertRaises(AttributeError):
            with cm:
                pass

    def test_context_manager_no_yield(self):
        # A plain "with lock:" (no "yield from") is rejected.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        try:
            with lock:
                self.fail('RuntimeError is not raised in with expression')
        except RuntimeError as err:
            self.assertEqual(
                str(err),
                '"yield from" should be used as context manager expression')

        self.assertFalse(lock.locked())


class EventTests(test_utils.TestCase):
    """Tests for asyncio.Event."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicit loop argument warns and is stored on the event.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=loop)
        self.assertIs(ev._loop, loop)

        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertIs(ev._loop, self.loop)

    def test_ctor_noloop(self):
        # Without a loop argument the current event loop is picked up.
        asyncio.set_event_loop(self.loop)
        ev = asyncio.Event()
        self.assertIs(ev._loop, self.loop)

    def test_repr(self):
        # repr() reflects the set/unset state and the number of waiters.
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertTrue(repr(ev).endswith('[unset]>'))
        match = RGX_REPR.match(repr(ev))
        self.assertEqual(match.group('extras'), 'unset')

        ev.set()
        self.assertTrue(repr(ev).endswith('[set]>'))
        self.assertTrue(RGX_REPR.match(repr(ev)))

        ev._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(ev))
        self.assertTrue(RGX_REPR.match(repr(ev)))

    def test_wait(self):
        # set() wakes every task blocked in wait().
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertFalse(ev.is_set())

        result = []

        async def c1(result):
            if await ev.wait():
                result.append(1)

        async def c2(result):
            if await ev.wait():
                result.append(2)

        async def c3(result):
            if await ev.wait():
                result.append(3)

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        t3 = self.loop.create_task(c3(result))

        ev.set()
        test_utils.run_briefly(self.loop)
        self.assertEqual([3, 1, 2], result)

        self.assertTrue(t1.done())
        self.assertIsNone(t1.result())
        self.assertTrue(t2.done())
        self.assertIsNone(t2.result())
        self.assertTrue(t3.done())
        self.assertIsNone(t3.result())

    def test_wait_on_set(self):
        # wait() on an already-set event returns a true value.
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        ev.set()

        res = self.loop.run_until_complete(ev.wait())
        self.assertTrue(res)

    def test_wait_cancel(self):
        # Cancelling a pending wait() leaves no waiter behind.
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)

        wait = self.loop.create_task(ev.wait())
        self.loop.call_soon(wait.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, wait)
        self.assertFalse(ev._waiters)

    def test_clear(self):
        # clear() resets the flag raised by set().
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertFalse(ev.is_set())

        ev.set()
        self.assertTrue(ev.is_set())

        ev.clear()
        self.assertFalse(ev.is_set())

    def test_clear_with_waiters(self):
        # A waiter woken by set() still completes even if the event is
        # cleared and set again before the waiter gets to run.
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        result = []

        async def c1(result):
            if await ev.wait():
                result.append(1)
            return True

        t = self.loop.create_task(c1(result))
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        ev.set()
        ev.clear()
        self.assertFalse(ev.is_set())

        ev.set()
        ev.set()
        self.assertEqual(1, len(ev._waiters))

        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertEqual(0, len(ev._waiters))

        self.assertTrue(t.done())
        self.assertTrue(t.result())


class ConditionTests(test_utils.TestCase):
    """Tests for asyncio.Condition."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicit loop argument warns and is stored on the condition.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=loop)
            self.assertIs(cond._loop, loop)

            cond = asyncio.Condition(loop=self.loop)
            self.assertIs(cond._loop, self.loop)

    def test_ctor_noloop(self):
        # Without a loop argument the current event loop is picked up.
        asyncio.set_event_loop(self.loop)
        cond = asyncio.Condition()
        self.assertIs(cond._loop, self.loop)

    def test_wait(self):
        # Notified waiters resume one at a time, each only after the
        # underlying lock has been released again.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
            return True

        async def c3(result):
            await cond.acquire()
            if await cond.wait():
                result.append(3)
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)
        self.assertFalse(cond.locked())

        self.assertTrue(self.loop.run_until_complete(cond.acquire()))
        cond.notify()
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(cond.locked())

        cond.notify(2)
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)
        self.assertTrue(cond.locked())

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_wait_cancel(self):
        # A cancelled wait() removes its waiter and leaves the lock held.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.loop.run_until_complete(cond.acquire())

        wait = self.loop.create_task(cond.wait())
        self.loop.call_soon(wait.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, wait)
        self.assertFalse(cond._waiters)
        self.assertTrue(cond.locked())

    def test_wait_cancel_contested(self):
        # Cancelling a notified waiter while the lock is contested still
        # ends with the lock held.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        self.loop.run_until_complete(cond.acquire())
        self.assertTrue(cond.locked())

        wait_task = self.loop.create_task(cond.wait())
        test_utils.run_briefly(self.loop)
        self.assertFalse(cond.locked())

        # Notify, but contest the lock before cancelling
        self.loop.run_until_complete(cond.acquire())
        self.assertTrue(cond.locked())
        cond.notify()
        self.loop.call_soon(wait_task.cancel)
        self.loop.call_soon(cond.release)

        try:
            self.loop.run_until_complete(wait_task)
        except asyncio.CancelledError:
            # Should not happen, since no cancellation points
            pass

        self.assertTrue(cond.locked())

    def test_wait_cancel_after_notify(self):
        # See bpo-32841
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        waited = False

        async def wait_on_cond():
            nonlocal waited
            async with cond:
                waited = True  # Make sure this area was reached
                await cond.wait()

        waiter = asyncio.ensure_future(wait_on_cond(), loop=self.loop)
        test_utils.run_briefly(self.loop)  # Start waiting

        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        test_utils.run_briefly(self.loop)  # Get to acquire()
        waiter.cancel()
        test_utils.run_briefly(self.loop)  # Activate cancellation
        cond.release()
        test_utils.run_briefly(self.loop)  # Cancellation should occur

        self.assertTrue(waiter.cancelled())
        self.assertTrue(waited)

    def test_wait_unacquired(self):
        # wait() without holding the lock raises RuntimeError.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete, cond.wait())

    def test_wait_for(self):
        # wait_for() only returns once the predicate is true at the time
        # of a notification.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        presult = False

        def predicate():
            return presult

        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait_for(predicate):
                result.append(1)
                cond.release()
            return True

        t = self.loop.create_task(c1(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        presult = True
        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        self.assertTrue(t.done())
        self.assertTrue(t.result())

    def test_wait_for_unacquired(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        # predicate can return true immediately
        res = self.loop.run_until_complete(cond.wait_for(lambda: [1, 2, 3]))
        self.assertEqual([1, 2, 3], res)

        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete,
            cond.wait_for(lambda: False))

    def test_notify(self):
        # notify(n) wakes at most n waiters; a count larger than the
        # number of waiters wakes all that remain.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
                cond.release()
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
                cond.release()
            return True

        async def c3(result):
            await cond.acquire()
            if await cond.wait():
                result.append(3)
                cond.release()
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify(1)
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify(1)
        cond.notify(2048)
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_notify_all(self):
        # notify_all() wakes every waiter.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
                cond.release()
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
                cond.release()
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify_all()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())

    def test_notify_unacquired(self):
        # notify() without holding the lock raises RuntimeError.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(RuntimeError, cond.notify)

    def test_notify_all_unacquired(self):
        # notify_all() without holding the lock raises RuntimeError.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(RuntimeError, cond.notify_all)

    def test_repr(self):
        # repr() reflects the lock state and the number of waiters.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertIn('unlocked', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

        self.loop.run_until_complete(cond.acquire())
        self.assertIn('locked', repr(cond))

        cond._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

        cond._waiters.append(mock.Mock())
        self.assertIn('waiters:2', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

    def test_context_manager(self):
        # The object returned by "yield from cond" works as a context
        # manager that releases the condition's lock on exit.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def acquire_cond():
                with self.assertWarns(DeprecationWarning):
                    return (yield from cond)

        with self.loop.run_until_complete(acquire_cond()):
            self.assertTrue(cond.locked())

        self.assertFalse(cond.locked())

    def test_context_manager_no_yield(self):
        # A plain "with cond:" (no "yield from") is rejected.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        try:
            with cond:
                self.fail('RuntimeError is not raised in with expression')
        except RuntimeError as err:
            self.assertEqual(
                str(err),
                '"yield from" should be used as context manager expression')

        self.assertFalse(cond.locked())

    def test_explicit_lock(self):
        # A lock passed to the constructor is used as-is.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
            cond = asyncio.Condition(lock, loop=self.loop)

        self.assertIs(cond._lock, lock)
        self.assertIs(cond._loop, lock._loop)

    def test_ambiguous_loops(self):
        # A lock bound to a different loop than the one given is rejected.
        loop = self.new_test_loop()
        self.addCleanup(loop.close)
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
            with self.assertRaises(ValueError):
                asyncio.Condition(lock, loop=loop)

    def test_timeout_in_block(self):
        # wait() timing out inside "async with condition" surfaces as
        # asyncio.TimeoutError within the block.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def task_timeout():
            condition = asyncio.Condition(loop=loop)
            async with condition:
                with self.assertRaises(asyncio.TimeoutError):
                    await asyncio.wait_for(condition.wait(), timeout=0.5)

        with self.assertWarns(DeprecationWarning):
            loop.run_until_complete(task_timeout())


class SemaphoreTests(test_utils.TestCase):
    """Tests for asyncio.Semaphore and asyncio.BoundedSemaphore."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicit loop argument warns and is stored on the semaphore.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=loop)
        self.assertIs(sem._loop, loop)

        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.assertIs(sem._loop, self.loop)

    def test_ctor_noloop(self):
        # Without a loop argument the current event loop is picked up.
        asyncio.set_event_loop(self.loop)
        sem = asyncio.Semaphore()
        self.assertIs(sem._loop, self.loop)

    def test_initial_value_zero(self):
        # A semaphore created with value 0 starts out locked.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(0, loop=self.loop)
        self.assertTrue(sem.locked())

    def test_repr(self):
        # repr() reflects state, value and number of waiters.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        self.loop.run_until_complete(sem.acquire())
        self.assertTrue(repr(sem).endswith('[locked]>'))
        self.assertNotIn('waiters', repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        sem._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        sem._waiters.append(mock.Mock())
        self.assertIn('waiters:2', repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

    def test_semaphore(self):
        # "yield from sem" acquires the semaphore and warns about
        # deprecation; release() restores the value.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.assertEqual(1, sem._value)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def acquire_lock():
                with self.assertWarns(DeprecationWarning):
                    return (yield from sem)

        res = self.loop.run_until_complete(acquire_lock())

        self.assertTrue(res)
        self.assertTrue(sem.locked())
        self.assertEqual(0, sem._value)

        sem.release()
        self.assertFalse(sem.locked())
        self.assertEqual(1, sem._value)

    def test_semaphore_value(self):
        # A negative initial value is rejected.
        self.assertRaises(ValueError, asyncio.Semaphore, -1)

    def test_acquire(self):
        # With 3 slots, two taken up front: one task gets the last slot,
        # the others queue up and proceed as slots are released.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(3, loop=self.loop)
        result = []

        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
        self.assertFalse(sem.locked())

        async def c1(result):
            await sem.acquire()
            result.append(1)
            return True

        async def c2(result):
            await sem.acquire()
            result.append(2)
            return True

        async def c3(result):
            await sem.acquire()
            result.append(3)
            return True

        async def c4(result):
            await sem.acquire()
            result.append(4)
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(sem.locked())
        self.assertEqual(2, len(sem._waiters))
        self.assertEqual(0, sem._value)

        t4 = self.loop.create_task(c4(result))

        sem.release()
        sem.release()
        self.assertEqual(2, sem._value)

        test_utils.run_briefly(self.loop)
        self.assertEqual(0, sem._value)
        self.assertEqual(3, len(result))
        self.assertTrue(sem.locked())
        self.assertEqual(1, len(sem._waiters))
        self.assertEqual(0, sem._value)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        race_tasks = [t2, t3, t4]
        done_tasks = [t for t in race_tasks if t.done() and t.result()]
        # assertEqual, not assertTrue: assertTrue(2, ...) would treat the
        # length as the failure message and could never fail.
        self.assertEqual(2, len(done_tasks))

        # cleanup locked semaphore
        sem.release()
        self.loop.run_until_complete(asyncio.gather(*race_tasks))

    def test_acquire_cancel(self):
        # After cancelling a pending acquire(), no live waiter remains.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.loop.run_until_complete(sem.acquire())

        acquire = self.loop.create_task(sem.acquire())
        self.loop.call_soon(acquire.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, acquire)
        self.assertTrue((not sem._waiters) or
                        all(waiter.done() for waiter in sem._waiters))

    def test_acquire_cancel_before_awoken(self):
        # A release whose first waiters get cancelled before running must
        # be passed on to exactly one of the remaining waiters.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(value=0, loop=self.loop)

        t1 = self.loop.create_task(sem.acquire())
        t2 = self.loop.create_task(sem.acquire())
        t3 = self.loop.create_task(sem.acquire())
        t4 = self.loop.create_task(sem.acquire())

        test_utils.run_briefly(self.loop)

        sem.release()
        t1.cancel()
        t2.cancel()

        test_utils.run_briefly(self.loop)
        num_done = sum(t.done() for t in [t3, t4])
        self.assertEqual(num_done, 1)

        t3.cancel()
        t4.cancel()
        test_utils.run_briefly(self.loop)

    def test_acquire_hang(self):
        # After a release followed by cancelling the first waiter, the
        # semaphore ends up locked again.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(value=0, loop=self.loop)

        t1 = self.loop.create_task(sem.acquire())
        t2 = self.loop.create_task(sem.acquire())

        test_utils.run_briefly(self.loop)

        sem.release()
        t1.cancel()

        test_utils.run_briefly(self.loop)
        self.assertTrue(sem.locked())

    def test_release_not_acquired(self):
        # Releasing a BoundedSemaphore that was not acquired raises
        # ValueError.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.BoundedSemaphore(loop=self.loop)

        self.assertRaises(ValueError, sem.release)

    def test_release_no_waiters(self):
        # Releasing with nobody waiting simply unlocks.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.loop.run_until_complete(sem.acquire())
        self.assertTrue(sem.locked())

        sem.release()
        self.assertFalse(sem.locked())

    def test_context_manager(self):
        # Nested context managers each take one slot and give it back on
        # exit.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(2, loop=self.loop)

            @asyncio.coroutine
            def acquire_lock():
                with self.assertWarns(DeprecationWarning):
                    return (yield from sem)

        with self.loop.run_until_complete(acquire_lock()):
            self.assertFalse(sem.locked())
            self.assertEqual(1, sem._value)

            with self.loop.run_until_complete(acquire_lock()):
                self.assertTrue(sem.locked())

        self.assertEqual(2, sem._value)

    def test_context_manager_no_yield(self):
        # A plain "with sem:" (no "yield from") is rejected and leaves the
        # value untouched.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(2, loop=self.loop)

        try:
            with sem:
                self.fail('RuntimeError is not raised in with expression')
        except RuntimeError as err:
            self.assertEqual(
                str(err),
                '"yield from" should be used as context manager expression')

        self.assertEqual(2, sem._value)


if __name__ == '__main__':
    unittest.main()