"""Tests for lock.py"""

import unittest
from unittest import mock
import re

import asyncio
from test.test_asyncio import utils as test_utils

# Shape of repr() expected from every primitive under test:
#   <CLASS object at ADDRESS [STATE(, value:N)?(, waiters:N)?]>
# where STATE is one of set/unset (Event) or locked/unlocked (the others).
STR_RGX_REPR = (
    r'^<(?P<class>.*?) object at (?P<address>.*?)'
    r'\[(?P<extras>'
    r'(set|unset|locked|unlocked)(, value:\d)?(, waiters:\d+)?'
    r')\]>\Z'
)
RGX_REPR = re.compile(STR_RGX_REPR)


def tearDownModule():
    """Reset the global event loop policy after the module's tests."""
    asyncio.set_event_loop_policy(None)


class LockTests(test_utils.TestCase):
    """Tests for asyncio.Lock."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicit loop= is stored on the lock but is deprecated.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=loop)
        self.assertIs(lock._loop, loop)

        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.assertIs(lock._loop, self.loop)

    def test_ctor_noloop(self):
        # Without loop=, the lock picks up the current event loop.
        asyncio.set_event_loop(self.loop)
        lock = asyncio.Lock()
        self.assertIs(lock._loop, self.loop)

    def test_repr(self):
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.assertTrue(repr(lock).endswith('[unlocked]>'))
        self.assertTrue(RGX_REPR.match(repr(lock)))

        self.loop.run_until_complete(lock.acquire())
        self.assertTrue(repr(lock).endswith('[locked]>'))
        self.assertTrue(RGX_REPR.match(repr(lock)))

    def test_lock(self):
        # "yield from lock" is not supported: it must raise TypeError and
        # leave the lock unlocked.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

            # Decorating with asyncio.coroutine is itself deprecated, so the
            # definition lives inside the assertWarns block.
            @asyncio.coroutine
            def acquire_lock():
                return (yield from lock)

        with self.assertRaisesRegex(
            TypeError,
            "object is not iterable"
        ):
            self.loop.run_until_complete(acquire_lock())

        self.assertFalse(lock.locked())

    def test_lock_by_with_statement(self):
        # "with (yield from primitive)" must raise TypeError for every
        # lock-like primitive and must not leave it locked.
        loop = asyncio.new_event_loop()  # don't use TestLoop quirks
        self.set_event_loop(loop)
        with self.assertWarns(DeprecationWarning):
            primitives = [
                asyncio.Lock(loop=loop),
                asyncio.Condition(loop=loop),
                asyncio.Semaphore(loop=loop),
                asyncio.BoundedSemaphore(loop=loop),
            ]

            @asyncio.coroutine
            def test(lock):
                yield from asyncio.sleep(0.01)
                self.assertFalse(lock.locked())
                with self.assertRaisesRegex(
                    TypeError,
                    "object is not iterable"
                ):
                    with (yield from lock):
                        pass
                self.assertFalse(lock.locked())

        for primitive in primitives:
            loop.run_until_complete(test(primitive))
            self.assertFalse(primitive.locked())

    def test_acquire(self):
        # Waiters are granted the lock one at a time, in FIFO order, each
        # time the current holder releases it.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        result = []

        self.assertTrue(self.loop.run_until_complete(lock.acquire()))

        async def c1(result):
            if await lock.acquire():
                result.append(1)
            return True

        async def c2(result):
            if await lock.acquire():
                result.append(2)
            return True

        async def c3(result):
            if await lock.acquire():
                result.append(3)
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        # Nothing else may progress until the lock is released again.
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        t3 = self.loop.create_task(c3(result))

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)

        lock.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_acquire_cancel(self):
        # Cancelling a pending acquire() must remove its waiter.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.assertTrue(self.loop.run_until_complete(lock.acquire()))

        task = self.loop.create_task(lock.acquire())
        self.loop.call_soon(task.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, task)
        self.assertFalse(lock._waiters)

    def test_cancel_race(self):
        # Several tasks:
        # - A acquires the lock
        # - B is blocked in acquire()
        # - C is blocked in acquire()
        #
        # Now, concurrently:
        # - B is cancelled
        # - A releases the lock
        #
        # If B's waiter is marked cancelled but not yet removed from
        # _waiters, A's release() call will crash when trying to set
        # B's waiter; instead, it should move on to C's waiter.

        # Setup: A has the lock, b and c are waiting.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        async def lockit(name, blocker):
            await lock.acquire()
            try:
                if blocker is not None:
                    await blocker
            finally:
                lock.release()

        fa = self.loop.create_future()
        ta = self.loop.create_task(lockit('A', fa))
        test_utils.run_briefly(self.loop)
        self.assertTrue(lock.locked())
        tb = self.loop.create_task(lockit('B', None))
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 1)
        tc = self.loop.create_task(lockit('C', None))
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 2)

        # Create the race and check.
        # Without the fix this failed at the last assert.
        fa.set_result(None)
        tb.cancel()
        self.assertTrue(lock._waiters[0].cancelled())
        test_utils.run_briefly(self.loop)
        self.assertFalse(lock.locked())
        self.assertTrue(ta.done())
        self.assertTrue(tb.cancelled())
        self.assertTrue(tc.done())

    def test_cancel_release_race(self):
        # Issue 32734
        # Acquire 4 locks, cancel second, release first
        # and 2 locks are taken at once.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        lock_count = 0
        call_count = 0

        async def lockit():
            nonlocal lock_count
            nonlocal call_count
            call_count += 1
            await lock.acquire()
            lock_count += 1

        async def lockandtrigger():
            await lock.acquire()
            self.loop.call_soon(trigger)

        def trigger():
            # Cancel a waiter and release the lock in the same loop
            # iteration: this is the race being exercised.
            t1.cancel()
            lock.release()

        t0 = self.loop.create_task(lockandtrigger())
        t1 = self.loop.create_task(lockit())
        t2 = self.loop.create_task(lockit())
        t3 = self.loop.create_task(lockit())

        # First loop acquires all
        test_utils.run_briefly(self.loop)
        self.assertTrue(t0.done())

        # Second loop calls trigger
        test_utils.run_briefly(self.loop)
        # Third loop calls cancellation
        test_utils.run_briefly(self.loop)

        # Make sure only one lock was taken
        self.assertEqual(lock_count, 1)
        # While 3 calls were made to lockit()
        self.assertEqual(call_count, 3)
        self.assertTrue(t1.cancelled() and t2.done())

        # Cleanup the task that is stuck on acquire.
        t3.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(t3.cancelled())

    def test_finished_waiter_cancelled(self):
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        ta = self.loop.create_task(lock.acquire())
        test_utils.run_briefly(self.loop)
        self.assertTrue(lock.locked())

        tb = self.loop.create_task(lock.acquire())
        test_utils.run_briefly(self.loop)
        self.assertEqual(len(lock._waiters), 1)

        # Create a second waiter, wake up the first, and cancel it.
        # Without the fix, the second was not woken up.
        tc = self.loop.create_task(lock.acquire())
        lock.release()
        tb.cancel()
        test_utils.run_briefly(self.loop)

        self.assertTrue(lock.locked())
        self.assertTrue(ta.done())
        self.assertTrue(tb.cancelled())

    def test_release_not_acquired(self):
        # Releasing an unlocked lock is an error.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)

        self.assertRaises(RuntimeError, lock.release)

    def test_release_no_waiters(self):
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
        self.loop.run_until_complete(lock.acquire())
        self.assertTrue(lock.locked())

        lock.release()
        self.assertFalse(lock.locked())

    def test_context_manager(self):
        # "async with lock" holds the lock only for the body of the block.
        async def f():
            lock = asyncio.Lock()
            self.assertFalse(lock.locked())

            async with lock:
                self.assertTrue(lock.locked())

            self.assertFalse(lock.locked())

        self.loop.run_until_complete(f())


class EventTests(test_utils.TestCase):
    """Tests for asyncio.Event."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicit loop= is stored on the event but is deprecated.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=loop)
        self.assertIs(ev._loop, loop)

        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertIs(ev._loop, self.loop)

    def test_ctor_noloop(self):
        # Without loop=, the event picks up the current event loop.
        asyncio.set_event_loop(self.loop)
        ev = asyncio.Event()
        self.assertIs(ev._loop, self.loop)

    def test_repr(self):
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertTrue(repr(ev).endswith('[unset]>'))
        match = RGX_REPR.match(repr(ev))
        self.assertEqual(match.group('extras'), 'unset')

        ev.set()
        self.assertTrue(repr(ev).endswith('[set]>'))
        self.assertTrue(RGX_REPR.match(repr(ev)))

        ev._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(ev))
        self.assertTrue(RGX_REPR.match(repr(ev)))

    def test_wait(self):
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertFalse(ev.is_set())

        result = []

        async def c1(result):
            if await ev.wait():
                result.append(1)

        async def c2(result):
            if await ev.wait():
                result.append(2)

        async def c3(result):
            if await ev.wait():
                result.append(3)

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        t3 = self.loop.create_task(c3(result))

        ev.set()
        test_utils.run_briefly(self.loop)
        # t3 had not started waiting when the event was set, so it sees the
        # flag immediately and finishes ahead of the already-blocked waiters.
        self.assertEqual([3, 1, 2], result)

        self.assertTrue(t1.done())
        self.assertIsNone(t1.result())
        self.assertTrue(t2.done())
        self.assertIsNone(t2.result())
        self.assertTrue(t3.done())
        self.assertIsNone(t3.result())

    def test_wait_on_set(self):
        # wait() on an already-set event returns True.
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        ev.set()

        res = self.loop.run_until_complete(ev.wait())
        self.assertTrue(res)

    def test_wait_cancel(self):
        # Cancelling a pending wait() must remove its waiter.
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)

        wait = self.loop.create_task(ev.wait())
        self.loop.call_soon(wait.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, wait)
        self.assertFalse(ev._waiters)

    def test_clear(self):
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        self.assertFalse(ev.is_set())

        ev.set()
        self.assertTrue(ev.is_set())

        ev.clear()
        self.assertFalse(ev.is_set())

    def test_clear_with_waiters(self):
        with self.assertWarns(DeprecationWarning):
            ev = asyncio.Event(loop=self.loop)
        result = []

        async def c1(result):
            if await ev.wait():
                result.append(1)
            return True

        t = self.loop.create_task(c1(result))
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        ev.set()
        ev.clear()
        self.assertFalse(ev.is_set())

        # Setting repeatedly must not duplicate or drop the single waiter
        # before the loop has had a chance to run it.
        ev.set()
        ev.set()
        self.assertEqual(1, len(ev._waiters))

        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertEqual(0, len(ev._waiters))

        self.assertTrue(t.done())
        self.assertTrue(t.result())


class ConditionTests(test_utils.TestCase):
    """Tests for asyncio.Condition."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicit loop= is stored on the condition but is deprecated;
        # each construction is checked for the warning separately, matching
        # the equivalent Lock/Event/Semaphore tests.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=loop)
        self.assertIs(cond._loop, loop)

        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertIs(cond._loop, self.loop)

    def test_ctor_noloop(self):
        # Without loop=, the condition picks up the current event loop.
        asyncio.set_event_loop(self.loop)
        cond = asyncio.Condition()
        self.assertIs(cond._loop, self.loop)

    def test_wait(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        result = []

        # The waiters below never release the condition after waking, so
        # the test releases it on their behalf between steps.
        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
            return True

        async def c3(result):
            await cond.acquire()
            if await cond.wait():
                result.append(3)
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)
        self.assertFalse(cond.locked())

        self.assertTrue(self.loop.run_until_complete(cond.acquire()))
        cond.notify()
        test_utils.run_briefly(self.loop)
        # The notified waiter cannot proceed while the test holds the lock.
        self.assertEqual([], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(cond.locked())

        cond.notify(2)
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)
        self.assertTrue(cond.locked())

        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)
        self.assertTrue(cond.locked())

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_wait_cancel(self):
        # A cancelled wait() removes its waiter and leaves the lock held.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.loop.run_until_complete(cond.acquire())

        wait = self.loop.create_task(cond.wait())
        self.loop.call_soon(wait.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, wait)
        self.assertFalse(cond._waiters)
        self.assertTrue(cond.locked())

    def test_wait_cancel_contested(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        self.loop.run_until_complete(cond.acquire())
        self.assertTrue(cond.locked())

        wait_task = self.loop.create_task(cond.wait())
        test_utils.run_briefly(self.loop)
        self.assertFalse(cond.locked())

        # Notify, but contest the lock before cancelling
        self.loop.run_until_complete(cond.acquire())
        self.assertTrue(cond.locked())
        cond.notify()
        self.loop.call_soon(wait_task.cancel)
        self.loop.call_soon(cond.release)

        try:
            self.loop.run_until_complete(wait_task)
        except asyncio.CancelledError:
            # Should not happen, since no cancellation points
            pass

        self.assertTrue(cond.locked())

    def test_wait_cancel_after_notify(self):
        # See bpo-32841
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        waited = False

        async def wait_on_cond():
            nonlocal waited
            async with cond:
                waited = True  # Make sure this area was reached
                await cond.wait()

        waiter = asyncio.ensure_future(wait_on_cond(), loop=self.loop)
        test_utils.run_briefly(self.loop)  # Start waiting

        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        test_utils.run_briefly(self.loop)  # Get to acquire()
        waiter.cancel()
        test_utils.run_briefly(self.loop)  # Activate cancellation
        cond.release()
        test_utils.run_briefly(self.loop)  # Cancellation should occur

        self.assertTrue(waiter.cancelled())
        self.assertTrue(waited)

    def test_wait_unacquired(self):
        # wait() without holding the lock is an error.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete, cond.wait())

    def test_wait_for(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        presult = False

        def predicate():
            # Reads the enclosing variable at call time, so rebinding
            # presult below changes what the waiter observes.
            return presult

        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait_for(predicate):
                result.append(1)
                cond.release()
            return True

        t = self.loop.create_task(c1(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        # Notified while the predicate is still false: keeps waiting.
        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        presult = True
        self.loop.run_until_complete(cond.acquire())
        cond.notify()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        self.assertTrue(t.done())
        self.assertTrue(t.result())

    def test_wait_for_unacquired(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        # predicate can return true immediately
        res = self.loop.run_until_complete(cond.wait_for(lambda: [1, 2, 3]))
        self.assertEqual([1, 2, 3], res)

        self.assertRaises(
            RuntimeError,
            self.loop.run_until_complete,
            cond.wait_for(lambda: False))

    def test_notify(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
                cond.release()
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
                cond.release()
            return True

        async def c3(result):
            await cond.acquire()
            if await cond.wait():
                result.append(3)
                cond.release()
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify(1)
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)

        # Asking for more wake-ups than there are waiters is fine.
        self.loop.run_until_complete(cond.acquire())
        cond.notify(1)
        cond.notify(2048)
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2, 3], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())
        self.assertTrue(t3.done())
        self.assertTrue(t3.result())

    def test_notify_all(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)

        result = []

        async def c1(result):
            await cond.acquire()
            if await cond.wait():
                result.append(1)
                cond.release()
            return True

        async def c2(result):
            await cond.acquire()
            if await cond.wait():
                result.append(2)
                cond.release()
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))

        test_utils.run_briefly(self.loop)
        self.assertEqual([], result)

        self.loop.run_until_complete(cond.acquire())
        cond.notify_all()
        cond.release()
        test_utils.run_briefly(self.loop)
        self.assertEqual([1, 2], result)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        self.assertTrue(t2.done())
        self.assertTrue(t2.result())

    def test_notify_unacquired(self):
        # notify() without holding the lock is an error.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(RuntimeError, cond.notify)

    def test_notify_all_unacquired(self):
        # notify_all() without holding the lock is an error.
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertRaises(RuntimeError, cond.notify_all)

    def test_repr(self):
        with self.assertWarns(DeprecationWarning):
            cond = asyncio.Condition(loop=self.loop)
        self.assertIn('unlocked', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

        self.loop.run_until_complete(cond.acquire())
        self.assertIn('locked', repr(cond))
        # 'locked' is a substring of 'unlocked', so the check above cannot
        # fail on its own; also require that the unlocked state is gone.
        self.assertNotIn('unlocked', repr(cond))

        cond._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

        cond._waiters.append(mock.Mock())
        self.assertIn('waiters:2', repr(cond))
        self.assertTrue(RGX_REPR.match(repr(cond)))

    def test_context_manager(self):
        # "async with cond" holds the lock only for the body of the block.
        async def f():
            cond = asyncio.Condition()
            self.assertFalse(cond.locked())
            async with cond:
                self.assertTrue(cond.locked())
            self.assertFalse(cond.locked())

        self.loop.run_until_complete(f())

    def test_explicit_lock(self):
        # A Condition built around an existing Lock shares it and its loop.
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
            cond = asyncio.Condition(lock, loop=self.loop)

        self.assertIs(cond._lock, lock)
        self.assertIs(cond._loop, lock._loop)

    def test_ambiguous_loops(self):
        # A lock bound to one loop cannot back a Condition bound to another.
        loop = self.new_test_loop()
        self.addCleanup(loop.close)
        with self.assertWarns(DeprecationWarning):
            lock = asyncio.Lock(loop=self.loop)
            with self.assertRaises(ValueError):
                asyncio.Condition(lock, loop=loop)

    def test_timeout_in_block(self):
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def task_timeout():
            condition = asyncio.Condition(loop=loop)
            async with condition:
                with self.assertRaises(asyncio.TimeoutError):
                    await asyncio.wait_for(condition.wait(), timeout=0.5)

        # The deprecated loop= is only used once the coroutine actually
        # runs, so the warning is expected here rather than at definition.
        with self.assertWarns(DeprecationWarning):
            loop.run_until_complete(task_timeout())


class SemaphoreTests(test_utils.TestCase):
    """Tests for asyncio.Semaphore and asyncio.BoundedSemaphore."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()

    def test_ctor_loop(self):
        # An explicit loop= is stored on the semaphore but is deprecated.
        loop = mock.Mock()
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=loop)
        self.assertIs(sem._loop, loop)

        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.assertIs(sem._loop, self.loop)

    def test_ctor_noloop(self):
        # Without loop=, the semaphore picks up the current event loop.
        asyncio.set_event_loop(self.loop)
        sem = asyncio.Semaphore()
        self.assertIs(sem._loop, self.loop)

    def test_initial_value_zero(self):
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(0, loop=self.loop)
        self.assertTrue(sem.locked())

    def test_repr(self):
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.assertTrue(repr(sem).endswith('[unlocked, value:1]>'))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        self.loop.run_until_complete(sem.acquire())
        self.assertTrue(repr(sem).endswith('[locked]>'))
        self.assertNotIn('waiters', repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        sem._waiters.append(mock.Mock())
        self.assertIn('waiters:1', repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

        sem._waiters.append(mock.Mock())
        self.assertIn('waiters:2', repr(sem))
        self.assertTrue(RGX_REPR.match(repr(sem)))

    def test_semaphore(self):
        # "yield from sem" is not supported: it must raise TypeError and
        # leave the semaphore's value untouched.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.assertEqual(1, sem._value)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def acquire_lock():
                return (yield from sem)

        with self.assertRaisesRegex(
            TypeError,
            "'Semaphore' object is not iterable",
        ):
            self.loop.run_until_complete(acquire_lock())

        self.assertFalse(sem.locked())
        self.assertEqual(1, sem._value)

    def test_semaphore_value(self):
        # A negative initial value is rejected.
        self.assertRaises(ValueError, asyncio.Semaphore, -1)

    def test_acquire(self):
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(3, loop=self.loop)
        result = []

        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
        self.assertTrue(self.loop.run_until_complete(sem.acquire()))
        self.assertFalse(sem.locked())

        async def c1(result):
            await sem.acquire()
            result.append(1)
            return True

        async def c2(result):
            await sem.acquire()
            result.append(2)
            return True

        async def c3(result):
            await sem.acquire()
            result.append(3)
            return True

        async def c4(result):
            await sem.acquire()
            result.append(4)
            return True

        t1 = self.loop.create_task(c1(result))
        t2 = self.loop.create_task(c2(result))
        t3 = self.loop.create_task(c3(result))

        # Only one slot was left, so only the first task gets through.
        test_utils.run_briefly(self.loop)
        self.assertEqual([1], result)
        self.assertTrue(sem.locked())
        self.assertEqual(2, len(sem._waiters))
        self.assertEqual(0, sem._value)

        t4 = self.loop.create_task(c4(result))

        sem.release()
        sem.release()
        self.assertEqual(2, sem._value)

        test_utils.run_briefly(self.loop)
        self.assertEqual(0, sem._value)
        self.assertEqual(3, len(result))
        self.assertTrue(sem.locked())
        self.assertEqual(1, len(sem._waiters))
        self.assertEqual(0, sem._value)

        self.assertTrue(t1.done())
        self.assertTrue(t1.result())
        # Two slots were released, so exactly two of the three racing tasks
        # must have completed; which two is not asserted.
        race_tasks = [t2, t3, t4]
        done_tasks = [t for t in race_tasks if t.done() and t.result()]
        self.assertEqual(2, len(done_tasks))

        # cleanup locked semaphore
        sem.release()
        self.loop.run_until_complete(asyncio.gather(*race_tasks))

    def test_acquire_cancel(self):
        # After a cancelled acquire(), no live waiter may be left behind.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.loop.run_until_complete(sem.acquire())

        acquire = self.loop.create_task(sem.acquire())
        self.loop.call_soon(acquire.cancel)
        self.assertRaises(
            asyncio.CancelledError,
            self.loop.run_until_complete, acquire)
        self.assertTrue((not sem._waiters) or
                        all(waiter.done() for waiter in sem._waiters))

    def test_acquire_cancel_before_awoken(self):
        # A release followed by cancellation of the first waiters must
        # still hand the slot to exactly one of the remaining waiters.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(value=0, loop=self.loop)

        t1 = self.loop.create_task(sem.acquire())
        t2 = self.loop.create_task(sem.acquire())
        t3 = self.loop.create_task(sem.acquire())
        t4 = self.loop.create_task(sem.acquire())

        test_utils.run_briefly(self.loop)

        sem.release()
        t1.cancel()
        t2.cancel()

        test_utils.run_briefly(self.loop)
        num_done = sum(t.done() for t in [t3, t4])
        self.assertEqual(num_done, 1)

        t3.cancel()
        t4.cancel()
        test_utils.run_briefly(self.loop)

    def test_acquire_hang(self):
        # The slot released to a waiter that is then cancelled must be
        # passed on, leaving the semaphore locked again.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(value=0, loop=self.loop)

        t1 = self.loop.create_task(sem.acquire())
        t2 = self.loop.create_task(sem.acquire())

        test_utils.run_briefly(self.loop)

        sem.release()
        t1.cancel()

        test_utils.run_briefly(self.loop)
        self.assertTrue(sem.locked())

    def test_release_not_acquired(self):
        # Releasing a BoundedSemaphore above its initial value is an error.
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.BoundedSemaphore(loop=self.loop)

        self.assertRaises(ValueError, sem.release)

    def test_release_no_waiters(self):
        with self.assertWarns(DeprecationWarning):
            sem = asyncio.Semaphore(loop=self.loop)
        self.loop.run_until_complete(sem.acquire())
        self.assertTrue(sem.locked())

        sem.release()
        self.assertFalse(sem.locked())


if __name__ == '__main__':
    unittest.main()