1"""Tests for lock.py""" 2 3import unittest 4from unittest import mock 5import re 6 7import asyncio 8 9STR_RGX_REPR = ( 10 r'^<(?P<class>.*?) object at (?P<address>.*?)' 11 r'\[(?P<extras>' 12 r'(set|unset|locked|unlocked)(, value:\d)?(, waiters:\d+)?' 13 r')\]>\Z' 14) 15RGX_REPR = re.compile(STR_RGX_REPR) 16 17 18def tearDownModule(): 19 asyncio.set_event_loop_policy(None) 20 21 22class LockTests(unittest.IsolatedAsyncioTestCase): 23 24 async def test_repr(self): 25 lock = asyncio.Lock() 26 self.assertTrue(repr(lock).endswith('[unlocked]>')) 27 self.assertTrue(RGX_REPR.match(repr(lock))) 28 29 await lock.acquire() 30 self.assertTrue(repr(lock).endswith('[locked]>')) 31 self.assertTrue(RGX_REPR.match(repr(lock))) 32 33 async def test_lock(self): 34 lock = asyncio.Lock() 35 36 with self.assertWarns(DeprecationWarning): 37 @asyncio.coroutine 38 def acquire_lock(): 39 return (yield from lock) 40 41 with self.assertRaisesRegex( 42 TypeError, 43 "object is not iterable" 44 ): 45 await acquire_lock() 46 47 self.assertFalse(lock.locked()) 48 49 async def test_lock_doesnt_accept_loop_parameter(self): 50 primitives_cls = [ 51 asyncio.Lock, 52 asyncio.Condition, 53 asyncio.Event, 54 asyncio.Semaphore, 55 asyncio.BoundedSemaphore, 56 ] 57 58 loop = asyncio.get_running_loop() 59 60 for cls in primitives_cls: 61 with self.assertRaisesRegex( 62 TypeError, 63 rf'As of 3.10, the \*loop\* parameter was removed from ' 64 rf'{cls.__name__}\(\) since it is no longer necessary' 65 ): 66 cls(loop=loop) 67 68 async def test_lock_by_with_statement(self): 69 primitives = [ 70 asyncio.Lock(), 71 asyncio.Condition(), 72 asyncio.Semaphore(), 73 asyncio.BoundedSemaphore(), 74 ] 75 76 with self.assertWarns(DeprecationWarning): 77 @asyncio.coroutine 78 def test(lock): 79 yield from asyncio.sleep(0.01) 80 self.assertFalse(lock.locked()) 81 with self.assertRaisesRegex( 82 TypeError, 83 "object is not iterable" 84 ): 85 with (yield from lock): 86 pass 87 self.assertFalse(lock.locked()) 88 89 for lock in primitives: 90 await test(lock) 91 self.assertFalse(lock.locked()) 92 93 async def test_acquire(self): 94 lock = asyncio.Lock() 95 result = [] 96 97 self.assertTrue(await lock.acquire()) 98 99 async def c1(result): 100 if await lock.acquire(): 101 result.append(1) 102 return True 103 104 async def c2(result): 105 if await lock.acquire(): 106 result.append(2) 107 return True 108 109 async def c3(result): 110 if await lock.acquire(): 111 result.append(3) 112 return True 113 114 t1 = asyncio.create_task(c1(result)) 115 t2 = asyncio.create_task(c2(result)) 116 117 await asyncio.sleep(0) 118 self.assertEqual([], result) 119 120 lock.release() 121 await asyncio.sleep(0) 122 self.assertEqual([1], result) 123 124 await asyncio.sleep(0) 125 self.assertEqual([1], result) 126 127 t3 = asyncio.create_task(c3(result)) 128 129 lock.release() 130 await asyncio.sleep(0) 131 self.assertEqual([1, 2], result) 132 133 lock.release() 134 await asyncio.sleep(0) 135 self.assertEqual([1, 2, 3], result) 136 137 self.assertTrue(t1.done()) 138 self.assertTrue(t1.result()) 139 self.assertTrue(t2.done()) 140 self.assertTrue(t2.result()) 141 self.assertTrue(t3.done()) 142 self.assertTrue(t3.result()) 143 144 async def test_acquire_cancel(self): 145 lock = asyncio.Lock() 146 self.assertTrue(await lock.acquire()) 147 148 task = asyncio.create_task(lock.acquire()) 149 asyncio.get_running_loop().call_soon(task.cancel) 150 with self.assertRaises(asyncio.CancelledError): 151 await task 152 self.assertFalse(lock._waiters) 153 154 async def test_cancel_race(self): 155 # Several tasks: 156 # - A acquires the lock 157 # - B is blocked in acquire() 158 # - C is blocked in acquire() 159 # 160 # Now, concurrently: 161 # - B is cancelled 162 # - A releases the lock 163 # 164 # If B's waiter is marked cancelled but not yet removed from 165 # _waiters, A's release() call will crash when trying to set 166 # B's waiter; instead, it should move on to C's waiter. 167 168 # Setup: A has the lock, b and c are waiting. 169 lock = asyncio.Lock() 170 171 async def lockit(name, blocker): 172 await lock.acquire() 173 try: 174 if blocker is not None: 175 await blocker 176 finally: 177 lock.release() 178 179 fa = asyncio.get_running_loop().create_future() 180 ta = asyncio.create_task(lockit('A', fa)) 181 await asyncio.sleep(0) 182 self.assertTrue(lock.locked()) 183 tb = asyncio.create_task(lockit('B', None)) 184 await asyncio.sleep(0) 185 self.assertEqual(len(lock._waiters), 1) 186 tc = asyncio.create_task(lockit('C', None)) 187 await asyncio.sleep(0) 188 self.assertEqual(len(lock._waiters), 2) 189 190 # Create the race and check. 191 # Without the fix this failed at the last assert. 192 fa.set_result(None) 193 tb.cancel() 194 self.assertTrue(lock._waiters[0].cancelled()) 195 await asyncio.sleep(0) 196 self.assertFalse(lock.locked()) 197 self.assertTrue(ta.done()) 198 self.assertTrue(tb.cancelled()) 199 await tc 200 201 async def test_cancel_release_race(self): 202 # Issue 32734 203 # Acquire 4 locks, cancel second, release first 204 # and 2 locks are taken at once. 205 loop = asyncio.get_running_loop() 206 lock = asyncio.Lock() 207 lock_count = 0 208 call_count = 0 209 210 async def lockit(): 211 nonlocal lock_count 212 nonlocal call_count 213 call_count += 1 214 await lock.acquire() 215 lock_count += 1 216 217 def trigger(): 218 t1.cancel() 219 lock.release() 220 221 await lock.acquire() 222 223 t1 = asyncio.create_task(lockit()) 224 t2 = asyncio.create_task(lockit()) 225 t3 = asyncio.create_task(lockit()) 226 227 # Start scheduled tasks 228 await asyncio.sleep(0) 229 230 loop.call_soon(trigger) 231 with self.assertRaises(asyncio.CancelledError): 232 # Wait for cancellation 233 await t1 234 235 # Make sure only one lock was taken 236 self.assertEqual(lock_count, 1) 237 # While 3 calls were made to lockit() 238 self.assertEqual(call_count, 3) 239 self.assertTrue(t1.cancelled() and t2.done()) 240 241 # Cleanup the task that is stuck on acquire. 242 t3.cancel() 243 await asyncio.sleep(0) 244 self.assertTrue(t3.cancelled()) 245 246 async def test_finished_waiter_cancelled(self): 247 lock = asyncio.Lock() 248 249 await lock.acquire() 250 self.assertTrue(lock.locked()) 251 252 tb = asyncio.create_task(lock.acquire()) 253 await asyncio.sleep(0) 254 self.assertEqual(len(lock._waiters), 1) 255 256 # Create a second waiter, wake up the first, and cancel it. 257 # Without the fix, the second was not woken up. 258 tc = asyncio.create_task(lock.acquire()) 259 tb.cancel() 260 lock.release() 261 await asyncio.sleep(0) 262 263 self.assertTrue(lock.locked()) 264 self.assertTrue(tb.cancelled()) 265 266 # Cleanup 267 await tc 268 269 async def test_release_not_acquired(self): 270 lock = asyncio.Lock() 271 272 self.assertRaises(RuntimeError, lock.release) 273 274 async def test_release_no_waiters(self): 275 lock = asyncio.Lock() 276 await lock.acquire() 277 self.assertTrue(lock.locked()) 278 279 lock.release() 280 self.assertFalse(lock.locked()) 281 282 async def test_context_manager(self): 283 lock = asyncio.Lock() 284 self.assertFalse(lock.locked()) 285 286 async with lock: 287 self.assertTrue(lock.locked()) 288 289 self.assertFalse(lock.locked()) 290 291 292class EventTests(unittest.IsolatedAsyncioTestCase): 293 294 def test_repr(self): 295 ev = asyncio.Event() 296 self.assertTrue(repr(ev).endswith('[unset]>')) 297 match = RGX_REPR.match(repr(ev)) 298 self.assertEqual(match.group('extras'), 'unset') 299 300 ev.set() 301 self.assertTrue(repr(ev).endswith('[set]>')) 302 self.assertTrue(RGX_REPR.match(repr(ev))) 303 304 ev._waiters.append(mock.Mock()) 305 self.assertTrue('waiters:1' in repr(ev)) 306 self.assertTrue(RGX_REPR.match(repr(ev))) 307 308 async def test_wait(self): 309 ev = asyncio.Event() 310 self.assertFalse(ev.is_set()) 311 312 result = [] 313 314 async def c1(result): 315 if await ev.wait(): 316 result.append(1) 317 318 async def c2(result): 319 if await ev.wait(): 320 result.append(2) 321 322 async def c3(result): 323 if await ev.wait(): 324 result.append(3) 325 326 t1 = asyncio.create_task(c1(result)) 327 t2 = asyncio.create_task(c2(result)) 328 329 await asyncio.sleep(0) 330 self.assertEqual([], result) 331 332 t3 = asyncio.create_task(c3(result)) 333 334 ev.set() 335 await asyncio.sleep(0) 336 self.assertEqual([3, 1, 2], result) 337 338 self.assertTrue(t1.done()) 339 self.assertIsNone(t1.result()) 340 self.assertTrue(t2.done()) 341 self.assertIsNone(t2.result()) 342 self.assertTrue(t3.done()) 343 self.assertIsNone(t3.result()) 344 345 async def test_wait_on_set(self): 346 ev = asyncio.Event() 347 ev.set() 348 349 res = await ev.wait() 350 self.assertTrue(res) 351 352 async def test_wait_cancel(self): 353 ev = asyncio.Event() 354 355 wait = asyncio.create_task(ev.wait()) 356 asyncio.get_running_loop().call_soon(wait.cancel) 357 with self.assertRaises(asyncio.CancelledError): 358 await wait 359 self.assertFalse(ev._waiters) 360 361 async def test_clear(self): 362 ev = asyncio.Event() 363 self.assertFalse(ev.is_set()) 364 365 ev.set() 366 self.assertTrue(ev.is_set()) 367 368 ev.clear() 369 self.assertFalse(ev.is_set()) 370 371 async def test_clear_with_waiters(self): 372 ev = asyncio.Event() 373 result = [] 374 375 async def c1(result): 376 if await ev.wait(): 377 result.append(1) 378 return True 379 380 t = asyncio.create_task(c1(result)) 381 await asyncio.sleep(0) 382 self.assertEqual([], result) 383 384 ev.set() 385 ev.clear() 386 self.assertFalse(ev.is_set()) 387 388 ev.set() 389 ev.set() 390 self.assertEqual(1, len(ev._waiters)) 391 392 await asyncio.sleep(0) 393 self.assertEqual([1], result) 394 self.assertEqual(0, len(ev._waiters)) 395 396 self.assertTrue(t.done()) 397 self.assertTrue(t.result()) 398 399 400class ConditionTests(unittest.IsolatedAsyncioTestCase): 401 402 async def test_wait(self): 403 cond = asyncio.Condition() 404 result = [] 405 406 async def c1(result): 407 await cond.acquire() 408 if await cond.wait(): 409 result.append(1) 410 return True 411 412 async def c2(result): 413 await cond.acquire() 414 if await cond.wait(): 415 result.append(2) 416 return True 417 418 async def c3(result): 419 await cond.acquire() 420 if await cond.wait(): 421 result.append(3) 422 return True 423 424 t1 = asyncio.create_task(c1(result)) 425 t2 = asyncio.create_task(c2(result)) 426 t3 = asyncio.create_task(c3(result)) 427 428 await asyncio.sleep(0) 429 self.assertEqual([], result) 430 self.assertFalse(cond.locked()) 431 432 self.assertTrue(await cond.acquire()) 433 cond.notify() 434 await asyncio.sleep(0) 435 self.assertEqual([], result) 436 self.assertTrue(cond.locked()) 437 438 cond.release() 439 await asyncio.sleep(0) 440 self.assertEqual([1], result) 441 self.assertTrue(cond.locked()) 442 443 cond.notify(2) 444 await asyncio.sleep(0) 445 self.assertEqual([1], result) 446 self.assertTrue(cond.locked()) 447 448 cond.release() 449 await asyncio.sleep(0) 450 self.assertEqual([1, 2], result) 451 self.assertTrue(cond.locked()) 452 453 cond.release() 454 await asyncio.sleep(0) 455 self.assertEqual([1, 2, 3], result) 456 self.assertTrue(cond.locked()) 457 458 self.assertTrue(t1.done()) 459 self.assertTrue(t1.result()) 460 self.assertTrue(t2.done()) 461 self.assertTrue(t2.result()) 462 self.assertTrue(t3.done()) 463 self.assertTrue(t3.result()) 464 465 async def test_wait_cancel(self): 466 cond = asyncio.Condition() 467 await cond.acquire() 468 469 wait = asyncio.create_task(cond.wait()) 470 asyncio.get_running_loop().call_soon(wait.cancel) 471 with self.assertRaises(asyncio.CancelledError): 472 await wait 473 self.assertFalse(cond._waiters) 474 self.assertTrue(cond.locked()) 475 476 async def test_wait_cancel_contested(self): 477 cond = asyncio.Condition() 478 479 await cond.acquire() 480 self.assertTrue(cond.locked()) 481 482 wait_task = asyncio.create_task(cond.wait()) 483 await asyncio.sleep(0) 484 self.assertFalse(cond.locked()) 485 486 # Notify, but contest the lock before cancelling 487 await cond.acquire() 488 self.assertTrue(cond.locked()) 489 cond.notify() 490 asyncio.get_running_loop().call_soon(wait_task.cancel) 491 asyncio.get_running_loop().call_soon(cond.release) 492 493 try: 494 await wait_task 495 except asyncio.CancelledError: 496 # Should not happen, since no cancellation points 497 pass 498 499 self.assertTrue(cond.locked()) 500 501 async def test_wait_cancel_after_notify(self): 502 # See bpo-32841 503 waited = False 504 505 cond = asyncio.Condition() 506 507 async def wait_on_cond(): 508 nonlocal waited 509 async with cond: 510 waited = True # Make sure this area was reached 511 await cond.wait() 512 513 waiter = asyncio.create_task(wait_on_cond()) 514 await asyncio.sleep(0) # Start waiting 515 516 await cond.acquire() 517 cond.notify() 518 await asyncio.sleep(0) # Get to acquire() 519 waiter.cancel() 520 await asyncio.sleep(0) # Activate cancellation 521 cond.release() 522 await asyncio.sleep(0) # Cancellation should occur 523 524 self.assertTrue(waiter.cancelled()) 525 self.assertTrue(waited) 526 527 async def test_wait_unacquired(self): 528 cond = asyncio.Condition() 529 with self.assertRaises(RuntimeError): 530 await cond.wait() 531 532 async def test_wait_for(self): 533 cond = asyncio.Condition() 534 presult = False 535 536 def predicate(): 537 return presult 538 539 result = [] 540 541 async def c1(result): 542 await cond.acquire() 543 if await cond.wait_for(predicate): 544 result.append(1) 545 cond.release() 546 return True 547 548 t = asyncio.create_task(c1(result)) 549 550 await asyncio.sleep(0) 551 self.assertEqual([], result) 552 553 await cond.acquire() 554 cond.notify() 555 cond.release() 556 await asyncio.sleep(0) 557 self.assertEqual([], result) 558 559 presult = True 560 await cond.acquire() 561 cond.notify() 562 cond.release() 563 await asyncio.sleep(0) 564 self.assertEqual([1], result) 565 566 self.assertTrue(t.done()) 567 self.assertTrue(t.result()) 568 569 async def test_wait_for_unacquired(self): 570 cond = asyncio.Condition() 571 572 # predicate can return true immediately 573 res = await cond.wait_for(lambda: [1, 2, 3]) 574 self.assertEqual([1, 2, 3], res) 575 576 with self.assertRaises(RuntimeError): 577 await cond.wait_for(lambda: False) 578 579 async def test_notify(self): 580 cond = asyncio.Condition() 581 result = [] 582 583 async def c1(result): 584 await cond.acquire() 585 if await cond.wait(): 586 result.append(1) 587 cond.release() 588 return True 589 590 async def c2(result): 591 await cond.acquire() 592 if await cond.wait(): 593 result.append(2) 594 cond.release() 595 return True 596 597 async def c3(result): 598 await cond.acquire() 599 if await cond.wait(): 600 result.append(3) 601 cond.release() 602 return True 603 604 t1 = asyncio.create_task(c1(result)) 605 t2 = asyncio.create_task(c2(result)) 606 t3 = asyncio.create_task(c3(result)) 607 608 await asyncio.sleep(0) 609 self.assertEqual([], result) 610 611 await cond.acquire() 612 cond.notify(1) 613 cond.release() 614 await asyncio.sleep(0) 615 self.assertEqual([1], result) 616 617 await cond.acquire() 618 cond.notify(1) 619 cond.notify(2048) 620 cond.release() 621 await asyncio.sleep(0) 622 self.assertEqual([1, 2, 3], result) 623 624 self.assertTrue(t1.done()) 625 self.assertTrue(t1.result()) 626 self.assertTrue(t2.done()) 627 self.assertTrue(t2.result()) 628 self.assertTrue(t3.done()) 629 self.assertTrue(t3.result()) 630 631 async def test_notify_all(self): 632 cond = asyncio.Condition() 633 634 result = [] 635 636 async def c1(result): 637 await cond.acquire() 638 if await cond.wait(): 639 result.append(1) 640 cond.release() 641 return True 642 643 async def c2(result): 644 await cond.acquire() 645 if await cond.wait(): 646 result.append(2) 647 cond.release() 648 return True 649 650 t1 = asyncio.create_task(c1(result)) 651 t2 = asyncio.create_task(c2(result)) 652 653 await asyncio.sleep(0) 654 self.assertEqual([], result) 655 656 await cond.acquire() 657 cond.notify_all() 658 cond.release() 659 await asyncio.sleep(0) 660 self.assertEqual([1, 2], result) 661 662 self.assertTrue(t1.done()) 663 self.assertTrue(t1.result()) 664 self.assertTrue(t2.done()) 665 self.assertTrue(t2.result()) 666 667 def test_notify_unacquired(self): 668 cond = asyncio.Condition() 669 self.assertRaises(RuntimeError, cond.notify) 670 671 def test_notify_all_unacquired(self): 672 cond = asyncio.Condition() 673 self.assertRaises(RuntimeError, cond.notify_all) 674 675 async def test_repr(self): 676 cond = asyncio.Condition() 677 self.assertTrue('unlocked' in repr(cond)) 678 self.assertTrue(RGX_REPR.match(repr(cond))) 679 680 await cond.acquire() 681 self.assertTrue('locked' in repr(cond)) 682 683 cond._waiters.append(mock.Mock()) 684 self.assertTrue('waiters:1' in repr(cond)) 685 self.assertTrue(RGX_REPR.match(repr(cond))) 686 687 cond._waiters.append(mock.Mock()) 688 self.assertTrue('waiters:2' in repr(cond)) 689 self.assertTrue(RGX_REPR.match(repr(cond))) 690 691 async def test_context_manager(self): 692 cond = asyncio.Condition() 693 self.assertFalse(cond.locked()) 694 async with cond: 695 self.assertTrue(cond.locked()) 696 self.assertFalse(cond.locked()) 697 698 async def test_explicit_lock(self): 699 async def f(lock=None, cond=None): 700 if lock is None: 701 lock = asyncio.Lock() 702 if cond is None: 703 cond = asyncio.Condition(lock) 704 self.assertIs(cond._lock, lock) 705 self.assertFalse(lock.locked()) 706 self.assertFalse(cond.locked()) 707 async with cond: 708 self.assertTrue(lock.locked()) 709 self.assertTrue(cond.locked()) 710 self.assertFalse(lock.locked()) 711 self.assertFalse(cond.locked()) 712 async with lock: 713 self.assertTrue(lock.locked()) 714 self.assertTrue(cond.locked()) 715 self.assertFalse(lock.locked()) 716 self.assertFalse(cond.locked()) 717 718 # All should work in the same way. 719 await f() 720 await f(asyncio.Lock()) 721 lock = asyncio.Lock() 722 await f(lock, asyncio.Condition(lock)) 723 724 async def test_ambiguous_loops(self): 725 loop = asyncio.new_event_loop() 726 self.addCleanup(loop.close) 727 728 async def wrong_loop_in_lock(): 729 with self.assertRaises(TypeError): 730 asyncio.Lock(loop=loop) # actively disallowed since 3.10 731 lock = asyncio.Lock() 732 lock._loop = loop # use private API for testing 733 async with lock: 734 # acquired immediately via the fast-path 735 # without interaction with any event loop. 736 cond = asyncio.Condition(lock) 737 # cond.acquire() will trigger waiting on the lock 738 # and it will discover the event loop mismatch. 739 with self.assertRaisesRegex( 740 RuntimeError, 741 "is bound to a different event loop", 742 ): 743 await cond.acquire() 744 745 async def wrong_loop_in_cond(): 746 # Same analogy here with the condition's loop. 747 lock = asyncio.Lock() 748 async with lock: 749 with self.assertRaises(TypeError): 750 asyncio.Condition(lock, loop=loop) 751 cond = asyncio.Condition(lock) 752 cond._loop = loop 753 with self.assertRaisesRegex( 754 RuntimeError, 755 "is bound to a different event loop", 756 ): 757 await cond.wait() 758 759 await wrong_loop_in_lock() 760 await wrong_loop_in_cond() 761 762 async def test_timeout_in_block(self): 763 condition = asyncio.Condition() 764 async with condition: 765 with self.assertRaises(asyncio.TimeoutError): 766 await asyncio.wait_for(condition.wait(), timeout=0.5) 767 768 769class SemaphoreTests(unittest.IsolatedAsyncioTestCase): 770 771 def test_initial_value_zero(self): 772 sem = asyncio.Semaphore(0) 773 self.assertTrue(sem.locked()) 774 775 async def test_repr(self): 776 sem = asyncio.Semaphore() 777 self.assertTrue(repr(sem).endswith('[unlocked, value:1]>')) 778 self.assertTrue(RGX_REPR.match(repr(sem))) 779 780 await sem.acquire() 781 self.assertTrue(repr(sem).endswith('[locked]>')) 782 self.assertTrue('waiters' not in repr(sem)) 783 self.assertTrue(RGX_REPR.match(repr(sem))) 784 785 sem._waiters.append(mock.Mock()) 786 self.assertTrue('waiters:1' in repr(sem)) 787 self.assertTrue(RGX_REPR.match(repr(sem))) 788 789 sem._waiters.append(mock.Mock()) 790 self.assertTrue('waiters:2' in repr(sem)) 791 self.assertTrue(RGX_REPR.match(repr(sem))) 792 793 async def test_semaphore(self): 794 sem = asyncio.Semaphore() 795 self.assertEqual(1, sem._value) 796 797 with self.assertWarns(DeprecationWarning): 798 @asyncio.coroutine 799 def acquire_lock(): 800 return (yield from sem) 801 802 with self.assertRaisesRegex( 803 TypeError, 804 "'Semaphore' object is not iterable", 805 ): 806 await acquire_lock() 807 808 self.assertFalse(sem.locked()) 809 self.assertEqual(1, sem._value) 810 811 def test_semaphore_value(self): 812 self.assertRaises(ValueError, asyncio.Semaphore, -1) 813 814 async def test_acquire(self): 815 sem = asyncio.Semaphore(3) 816 result = [] 817 818 self.assertTrue(await sem.acquire()) 819 self.assertTrue(await sem.acquire()) 820 self.assertFalse(sem.locked()) 821 822 async def c1(result): 823 await sem.acquire() 824 result.append(1) 825 return True 826 827 async def c2(result): 828 await sem.acquire() 829 result.append(2) 830 return True 831 832 async def c3(result): 833 await sem.acquire() 834 result.append(3) 835 return True 836 837 async def c4(result): 838 await sem.acquire() 839 result.append(4) 840 return True 841 842 t1 = asyncio.create_task(c1(result)) 843 t2 = asyncio.create_task(c2(result)) 844 t3 = asyncio.create_task(c3(result)) 845 846 await asyncio.sleep(0) 847 self.assertEqual([1], result) 848 self.assertTrue(sem.locked()) 849 self.assertEqual(2, len(sem._waiters)) 850 self.assertEqual(0, sem._value) 851 852 t4 = asyncio.create_task(c4(result)) 853 854 sem.release() 855 sem.release() 856 self.assertEqual(2, sem._value) 857 858 await asyncio.sleep(0) 859 self.assertEqual(0, sem._value) 860 self.assertEqual(3, len(result)) 861 self.assertTrue(sem.locked()) 862 self.assertEqual(1, len(sem._waiters)) 863 self.assertEqual(0, sem._value) 864 865 self.assertTrue(t1.done()) 866 self.assertTrue(t1.result()) 867 race_tasks = [t2, t3, t4] 868 done_tasks = [t for t in race_tasks if t.done() and t.result()] 869 self.assertTrue(2, len(done_tasks)) 870 871 # cleanup locked semaphore 872 sem.release() 873 await asyncio.gather(*race_tasks) 874 875 async def test_acquire_cancel(self): 876 sem = asyncio.Semaphore() 877 await sem.acquire() 878 879 acquire = asyncio.create_task(sem.acquire()) 880 asyncio.get_running_loop().call_soon(acquire.cancel) 881 with self.assertRaises(asyncio.CancelledError): 882 await acquire 883 self.assertTrue((not sem._waiters) or 884 all(waiter.done() for waiter in sem._waiters)) 885 886 async def test_acquire_cancel_before_awoken(self): 887 sem = asyncio.Semaphore(value=0) 888 889 t1 = asyncio.create_task(sem.acquire()) 890 t2 = asyncio.create_task(sem.acquire()) 891 t3 = asyncio.create_task(sem.acquire()) 892 t4 = asyncio.create_task(sem.acquire()) 893 894 await asyncio.sleep(0) 895 896 t1.cancel() 897 t2.cancel() 898 sem.release() 899 900 await asyncio.sleep(0) 901 num_done = sum(t.done() for t in [t3, t4]) 902 self.assertEqual(num_done, 1) 903 self.assertTrue(t3.done()) 904 self.assertFalse(t4.done()) 905 906 t3.cancel() 907 t4.cancel() 908 await asyncio.sleep(0) 909 910 async def test_acquire_hang(self): 911 sem = asyncio.Semaphore(value=0) 912 913 t1 = asyncio.create_task(sem.acquire()) 914 t2 = asyncio.create_task(sem.acquire()) 915 await asyncio.sleep(0) 916 917 t1.cancel() 918 sem.release() 919 await asyncio.sleep(0) 920 self.assertTrue(sem.locked()) 921 self.assertTrue(t2.done()) 922 923 def test_release_not_acquired(self): 924 sem = asyncio.BoundedSemaphore() 925 926 self.assertRaises(ValueError, sem.release) 927 928 async def test_release_no_waiters(self): 929 sem = asyncio.Semaphore() 930 await sem.acquire() 931 self.assertTrue(sem.locked()) 932 933 sem.release() 934 self.assertFalse(sem.locked()) 935 936 937if __name__ == '__main__': 938 unittest.main() 939