1""" 2Various tests for synchronization primitives. 3""" 4 5import sys 6import time 7from _thread import start_new_thread, TIMEOUT_MAX 8import threading 9import unittest 10import weakref 11 12from test import support 13 14 15def _wait(): 16 # A crude wait/yield function not relying on synchronization primitives. 17 time.sleep(0.01) 18 19class Bunch(object): 20 """ 21 A bunch of threads. 22 """ 23 def __init__(self, f, n, wait_before_exit=False): 24 """ 25 Construct a bunch of `n` threads running the same function `f`. 26 If `wait_before_exit` is True, the threads won't terminate until 27 do_finish() is called. 28 """ 29 self.f = f 30 self.n = n 31 self.started = [] 32 self.finished = [] 33 self._can_exit = not wait_before_exit 34 self.wait_thread = support.wait_threads_exit() 35 self.wait_thread.__enter__() 36 37 def task(): 38 tid = threading.get_ident() 39 self.started.append(tid) 40 try: 41 f() 42 finally: 43 self.finished.append(tid) 44 while not self._can_exit: 45 _wait() 46 47 try: 48 for i in range(n): 49 start_new_thread(task, ()) 50 except: 51 self._can_exit = True 52 raise 53 54 def wait_for_started(self): 55 while len(self.started) < self.n: 56 _wait() 57 58 def wait_for_finished(self): 59 while len(self.finished) < self.n: 60 _wait() 61 # Wait for threads exit 62 self.wait_thread.__exit__(None, None, None) 63 64 def do_finish(self): 65 self._can_exit = True 66 67 68class BaseTestCase(unittest.TestCase): 69 def setUp(self): 70 self._threads = support.threading_setup() 71 72 def tearDown(self): 73 support.threading_cleanup(*self._threads) 74 support.reap_children() 75 76 def assertTimeout(self, actual, expected): 77 # The waiting and/or time.monotonic() can be imprecise, which 78 # is why comparing to the expected value would sometimes fail 79 # (especially under Windows). 80 self.assertGreaterEqual(actual, expected * 0.6) 81 # Test nothing insane happened 82 self.assertLess(actual, expected * 10.0) 83 84 85class BaseLockTests(BaseTestCase): 86 """ 87 Tests for both recursive and non-recursive locks. 88 """ 89 90 def test_constructor(self): 91 lock = self.locktype() 92 del lock 93 94 def test_repr(self): 95 lock = self.locktype() 96 self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>") 97 del lock 98 99 def test_locked_repr(self): 100 lock = self.locktype() 101 lock.acquire() 102 self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>") 103 del lock 104 105 def test_acquire_destroy(self): 106 lock = self.locktype() 107 lock.acquire() 108 del lock 109 110 def test_acquire_release(self): 111 lock = self.locktype() 112 lock.acquire() 113 lock.release() 114 del lock 115 116 def test_try_acquire(self): 117 lock = self.locktype() 118 self.assertTrue(lock.acquire(False)) 119 lock.release() 120 121 def test_try_acquire_contended(self): 122 lock = self.locktype() 123 lock.acquire() 124 result = [] 125 def f(): 126 result.append(lock.acquire(False)) 127 Bunch(f, 1).wait_for_finished() 128 self.assertFalse(result[0]) 129 lock.release() 130 131 def test_acquire_contended(self): 132 lock = self.locktype() 133 lock.acquire() 134 N = 5 135 def f(): 136 lock.acquire() 137 lock.release() 138 139 b = Bunch(f, N) 140 b.wait_for_started() 141 _wait() 142 self.assertEqual(len(b.finished), 0) 143 lock.release() 144 b.wait_for_finished() 145 self.assertEqual(len(b.finished), N) 146 147 def test_with(self): 148 lock = self.locktype() 149 def f(): 150 lock.acquire() 151 lock.release() 152 def _with(err=None): 153 with lock: 154 if err is not None: 155 raise err 156 _with() 157 # Check the lock is unacquired 158 Bunch(f, 1).wait_for_finished() 159 self.assertRaises(TypeError, _with, TypeError) 160 # Check the lock is unacquired 161 Bunch(f, 1).wait_for_finished() 162 163 def test_thread_leak(self): 164 # The lock shouldn't leak a Thread instance when used from a foreign 165 # (non-threading) thread. 166 lock = self.locktype() 167 def f(): 168 lock.acquire() 169 lock.release() 170 n = len(threading.enumerate()) 171 # We run many threads in the hope that existing threads ids won't 172 # be recycled. 173 Bunch(f, 15).wait_for_finished() 174 if len(threading.enumerate()) != n: 175 # There is a small window during which a Thread instance's 176 # target function has finished running, but the Thread is still 177 # alive and registered. Avoid spurious failures by waiting a 178 # bit more (seen on a buildbot). 179 time.sleep(0.4) 180 self.assertEqual(n, len(threading.enumerate())) 181 182 def test_timeout(self): 183 lock = self.locktype() 184 # Can't set timeout if not blocking 185 self.assertRaises(ValueError, lock.acquire, 0, 1) 186 # Invalid timeout values 187 self.assertRaises(ValueError, lock.acquire, timeout=-100) 188 self.assertRaises(OverflowError, lock.acquire, timeout=1e100) 189 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1) 190 # TIMEOUT_MAX is ok 191 lock.acquire(timeout=TIMEOUT_MAX) 192 lock.release() 193 t1 = time.monotonic() 194 self.assertTrue(lock.acquire(timeout=5)) 195 t2 = time.monotonic() 196 # Just a sanity test that it didn't actually wait for the timeout. 197 self.assertLess(t2 - t1, 5) 198 results = [] 199 def f(): 200 t1 = time.monotonic() 201 results.append(lock.acquire(timeout=0.5)) 202 t2 = time.monotonic() 203 results.append(t2 - t1) 204 Bunch(f, 1).wait_for_finished() 205 self.assertFalse(results[0]) 206 self.assertTimeout(results[1], 0.5) 207 208 def test_weakref_exists(self): 209 lock = self.locktype() 210 ref = weakref.ref(lock) 211 self.assertIsNotNone(ref()) 212 213 def test_weakref_deleted(self): 214 lock = self.locktype() 215 ref = weakref.ref(lock) 216 del lock 217 self.assertIsNone(ref()) 218 219 220class LockTests(BaseLockTests): 221 """ 222 Tests for non-recursive, weak locks 223 (which can be acquired and released from different threads). 224 """ 225 def test_reacquire(self): 226 # Lock needs to be released before re-acquiring. 227 lock = self.locktype() 228 phase = [] 229 230 def f(): 231 lock.acquire() 232 phase.append(None) 233 lock.acquire() 234 phase.append(None) 235 236 with support.wait_threads_exit(): 237 start_new_thread(f, ()) 238 while len(phase) == 0: 239 _wait() 240 _wait() 241 self.assertEqual(len(phase), 1) 242 lock.release() 243 while len(phase) == 1: 244 _wait() 245 self.assertEqual(len(phase), 2) 246 247 def test_different_thread(self): 248 # Lock can be released from a different thread. 249 lock = self.locktype() 250 lock.acquire() 251 def f(): 252 lock.release() 253 b = Bunch(f, 1) 254 b.wait_for_finished() 255 lock.acquire() 256 lock.release() 257 258 def test_state_after_timeout(self): 259 # Issue #11618: check that lock is in a proper state after a 260 # (non-zero) timeout. 261 lock = self.locktype() 262 lock.acquire() 263 self.assertFalse(lock.acquire(timeout=0.01)) 264 lock.release() 265 self.assertFalse(lock.locked()) 266 self.assertTrue(lock.acquire(blocking=False)) 267 268 269class RLockTests(BaseLockTests): 270 """ 271 Tests for recursive locks. 272 """ 273 def test_reacquire(self): 274 lock = self.locktype() 275 lock.acquire() 276 lock.acquire() 277 lock.release() 278 lock.acquire() 279 lock.release() 280 lock.release() 281 282 def test_release_unacquired(self): 283 # Cannot release an unacquired lock 284 lock = self.locktype() 285 self.assertRaises(RuntimeError, lock.release) 286 lock.acquire() 287 lock.acquire() 288 lock.release() 289 lock.acquire() 290 lock.release() 291 lock.release() 292 self.assertRaises(RuntimeError, lock.release) 293 294 def test_release_save_unacquired(self): 295 # Cannot _release_save an unacquired lock 296 lock = self.locktype() 297 self.assertRaises(RuntimeError, lock._release_save) 298 lock.acquire() 299 lock.acquire() 300 lock.release() 301 lock.acquire() 302 lock.release() 303 lock.release() 304 self.assertRaises(RuntimeError, lock._release_save) 305 306 def test_different_thread(self): 307 # Cannot release from a different thread 308 lock = self.locktype() 309 def f(): 310 lock.acquire() 311 b = Bunch(f, 1, True) 312 try: 313 self.assertRaises(RuntimeError, lock.release) 314 finally: 315 b.do_finish() 316 b.wait_for_finished() 317 318 def test__is_owned(self): 319 lock = self.locktype() 320 self.assertFalse(lock._is_owned()) 321 lock.acquire() 322 self.assertTrue(lock._is_owned()) 323 lock.acquire() 324 self.assertTrue(lock._is_owned()) 325 result = [] 326 def f(): 327 result.append(lock._is_owned()) 328 Bunch(f, 1).wait_for_finished() 329 self.assertFalse(result[0]) 330 lock.release() 331 self.assertTrue(lock._is_owned()) 332 lock.release() 333 self.assertFalse(lock._is_owned()) 334 335 336class EventTests(BaseTestCase): 337 """ 338 Tests for Event objects. 339 """ 340 341 def test_is_set(self): 342 evt = self.eventtype() 343 self.assertFalse(evt.is_set()) 344 evt.set() 345 self.assertTrue(evt.is_set()) 346 evt.set() 347 self.assertTrue(evt.is_set()) 348 evt.clear() 349 self.assertFalse(evt.is_set()) 350 evt.clear() 351 self.assertFalse(evt.is_set()) 352 353 def _check_notify(self, evt): 354 # All threads get notified 355 N = 5 356 results1 = [] 357 results2 = [] 358 def f(): 359 results1.append(evt.wait()) 360 results2.append(evt.wait()) 361 b = Bunch(f, N) 362 b.wait_for_started() 363 _wait() 364 self.assertEqual(len(results1), 0) 365 evt.set() 366 b.wait_for_finished() 367 self.assertEqual(results1, [True] * N) 368 self.assertEqual(results2, [True] * N) 369 370 def test_notify(self): 371 evt = self.eventtype() 372 self._check_notify(evt) 373 # Another time, after an explicit clear() 374 evt.set() 375 evt.clear() 376 self._check_notify(evt) 377 378 def test_timeout(self): 379 evt = self.eventtype() 380 results1 = [] 381 results2 = [] 382 N = 5 383 def f(): 384 results1.append(evt.wait(0.0)) 385 t1 = time.monotonic() 386 r = evt.wait(0.5) 387 t2 = time.monotonic() 388 results2.append((r, t2 - t1)) 389 Bunch(f, N).wait_for_finished() 390 self.assertEqual(results1, [False] * N) 391 for r, dt in results2: 392 self.assertFalse(r) 393 self.assertTimeout(dt, 0.5) 394 # The event is set 395 results1 = [] 396 results2 = [] 397 evt.set() 398 Bunch(f, N).wait_for_finished() 399 self.assertEqual(results1, [True] * N) 400 for r, dt in results2: 401 self.assertTrue(r) 402 403 def test_set_and_clear(self): 404 # Issue #13502: check that wait() returns true even when the event is 405 # cleared before the waiting thread is woken up. 406 evt = self.eventtype() 407 results = [] 408 timeout = 0.250 409 N = 5 410 def f(): 411 results.append(evt.wait(timeout * 4)) 412 b = Bunch(f, N) 413 b.wait_for_started() 414 time.sleep(timeout) 415 evt.set() 416 evt.clear() 417 b.wait_for_finished() 418 self.assertEqual(results, [True] * N) 419 420 def test_reset_internal_locks(self): 421 # ensure that condition is still using a Lock after reset 422 evt = self.eventtype() 423 with evt._cond: 424 self.assertFalse(evt._cond.acquire(False)) 425 evt._reset_internal_locks() 426 with evt._cond: 427 self.assertFalse(evt._cond.acquire(False)) 428 429 430class ConditionTests(BaseTestCase): 431 """ 432 Tests for condition variables. 433 """ 434 435 def test_acquire(self): 436 cond = self.condtype() 437 # Be default we have an RLock: the condition can be acquired multiple 438 # times. 439 cond.acquire() 440 cond.acquire() 441 cond.release() 442 cond.release() 443 lock = threading.Lock() 444 cond = self.condtype(lock) 445 cond.acquire() 446 self.assertFalse(lock.acquire(False)) 447 cond.release() 448 self.assertTrue(lock.acquire(False)) 449 self.assertFalse(cond.acquire(False)) 450 lock.release() 451 with cond: 452 self.assertFalse(lock.acquire(False)) 453 454 def test_unacquired_wait(self): 455 cond = self.condtype() 456 self.assertRaises(RuntimeError, cond.wait) 457 458 def test_unacquired_notify(self): 459 cond = self.condtype() 460 self.assertRaises(RuntimeError, cond.notify) 461 462 def _check_notify(self, cond): 463 # Note that this test is sensitive to timing. If the worker threads 464 # don't execute in a timely fashion, the main thread may think they 465 # are further along then they are. The main thread therefore issues 466 # _wait() statements to try to make sure that it doesn't race ahead 467 # of the workers. 468 # Secondly, this test assumes that condition variables are not subject 469 # to spurious wakeups. The absence of spurious wakeups is an implementation 470 # detail of Condition Variables in current CPython, but in general, not 471 # a guaranteed property of condition variables as a programming 472 # construct. In particular, it is possible that this can no longer 473 # be conveniently guaranteed should their implementation ever change. 474 N = 5 475 ready = [] 476 results1 = [] 477 results2 = [] 478 phase_num = 0 479 def f(): 480 cond.acquire() 481 ready.append(phase_num) 482 result = cond.wait() 483 cond.release() 484 results1.append((result, phase_num)) 485 cond.acquire() 486 ready.append(phase_num) 487 result = cond.wait() 488 cond.release() 489 results2.append((result, phase_num)) 490 b = Bunch(f, N) 491 b.wait_for_started() 492 # first wait, to ensure all workers settle into cond.wait() before 493 # we continue. See issues #8799 and #30727. 494 while len(ready) < 5: 495 _wait() 496 ready.clear() 497 self.assertEqual(results1, []) 498 # Notify 3 threads at first 499 cond.acquire() 500 cond.notify(3) 501 _wait() 502 phase_num = 1 503 cond.release() 504 while len(results1) < 3: 505 _wait() 506 self.assertEqual(results1, [(True, 1)] * 3) 507 self.assertEqual(results2, []) 508 # make sure all awaken workers settle into cond.wait() 509 while len(ready) < 3: 510 _wait() 511 # Notify 5 threads: they might be in their first or second wait 512 cond.acquire() 513 cond.notify(5) 514 _wait() 515 phase_num = 2 516 cond.release() 517 while len(results1) + len(results2) < 8: 518 _wait() 519 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2) 520 self.assertEqual(results2, [(True, 2)] * 3) 521 # make sure all workers settle into cond.wait() 522 while len(ready) < 5: 523 _wait() 524 # Notify all threads: they are all in their second wait 525 cond.acquire() 526 cond.notify_all() 527 _wait() 528 phase_num = 3 529 cond.release() 530 while len(results2) < 5: 531 _wait() 532 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2) 533 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2) 534 b.wait_for_finished() 535 536 def test_notify(self): 537 cond = self.condtype() 538 self._check_notify(cond) 539 # A second time, to check internal state is still ok. 540 self._check_notify(cond) 541 542 def test_timeout(self): 543 cond = self.condtype() 544 results = [] 545 N = 5 546 def f(): 547 cond.acquire() 548 t1 = time.monotonic() 549 result = cond.wait(0.5) 550 t2 = time.monotonic() 551 cond.release() 552 results.append((t2 - t1, result)) 553 Bunch(f, N).wait_for_finished() 554 self.assertEqual(len(results), N) 555 for dt, result in results: 556 self.assertTimeout(dt, 0.5) 557 # Note that conceptually (that"s the condition variable protocol) 558 # a wait() may succeed even if no one notifies us and before any 559 # timeout occurs. Spurious wakeups can occur. 560 # This makes it hard to verify the result value. 561 # In practice, this implementation has no spurious wakeups. 562 self.assertFalse(result) 563 564 def test_waitfor(self): 565 cond = self.condtype() 566 state = 0 567 def f(): 568 with cond: 569 result = cond.wait_for(lambda : state==4) 570 self.assertTrue(result) 571 self.assertEqual(state, 4) 572 b = Bunch(f, 1) 573 b.wait_for_started() 574 for i in range(4): 575 time.sleep(0.01) 576 with cond: 577 state += 1 578 cond.notify() 579 b.wait_for_finished() 580 581 def test_waitfor_timeout(self): 582 cond = self.condtype() 583 state = 0 584 success = [] 585 def f(): 586 with cond: 587 dt = time.monotonic() 588 result = cond.wait_for(lambda : state==4, timeout=0.1) 589 dt = time.monotonic() - dt 590 self.assertFalse(result) 591 self.assertTimeout(dt, 0.1) 592 success.append(None) 593 b = Bunch(f, 1) 594 b.wait_for_started() 595 # Only increment 3 times, so state == 4 is never reached. 596 for i in range(3): 597 time.sleep(0.01) 598 with cond: 599 state += 1 600 cond.notify() 601 b.wait_for_finished() 602 self.assertEqual(len(success), 1) 603 604 605class BaseSemaphoreTests(BaseTestCase): 606 """ 607 Common tests for {bounded, unbounded} semaphore objects. 608 """ 609 610 def test_constructor(self): 611 self.assertRaises(ValueError, self.semtype, value = -1) 612 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize) 613 614 def test_acquire(self): 615 sem = self.semtype(1) 616 sem.acquire() 617 sem.release() 618 sem = self.semtype(2) 619 sem.acquire() 620 sem.acquire() 621 sem.release() 622 sem.release() 623 624 def test_acquire_destroy(self): 625 sem = self.semtype() 626 sem.acquire() 627 del sem 628 629 def test_acquire_contended(self): 630 sem = self.semtype(7) 631 sem.acquire() 632 N = 10 633 sem_results = [] 634 results1 = [] 635 results2 = [] 636 phase_num = 0 637 def f(): 638 sem_results.append(sem.acquire()) 639 results1.append(phase_num) 640 sem_results.append(sem.acquire()) 641 results2.append(phase_num) 642 b = Bunch(f, 10) 643 b.wait_for_started() 644 while len(results1) + len(results2) < 6: 645 _wait() 646 self.assertEqual(results1 + results2, [0] * 6) 647 phase_num = 1 648 for i in range(7): 649 sem.release() 650 while len(results1) + len(results2) < 13: 651 _wait() 652 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) 653 phase_num = 2 654 for i in range(6): 655 sem.release() 656 while len(results1) + len(results2) < 19: 657 _wait() 658 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) 659 # The semaphore is still locked 660 self.assertFalse(sem.acquire(False)) 661 # Final release, to let the last thread finish 662 sem.release() 663 b.wait_for_finished() 664 self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1)) 665 666 def test_try_acquire(self): 667 sem = self.semtype(2) 668 self.assertTrue(sem.acquire(False)) 669 self.assertTrue(sem.acquire(False)) 670 self.assertFalse(sem.acquire(False)) 671 sem.release() 672 self.assertTrue(sem.acquire(False)) 673 674 def test_try_acquire_contended(self): 675 sem = self.semtype(4) 676 sem.acquire() 677 results = [] 678 def f(): 679 results.append(sem.acquire(False)) 680 results.append(sem.acquire(False)) 681 Bunch(f, 5).wait_for_finished() 682 # There can be a thread switch between acquiring the semaphore and 683 # appending the result, therefore results will not necessarily be 684 # ordered. 685 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 ) 686 687 def test_acquire_timeout(self): 688 sem = self.semtype(2) 689 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0) 690 self.assertTrue(sem.acquire(timeout=0.005)) 691 self.assertTrue(sem.acquire(timeout=0.005)) 692 self.assertFalse(sem.acquire(timeout=0.005)) 693 sem.release() 694 self.assertTrue(sem.acquire(timeout=0.005)) 695 t = time.monotonic() 696 self.assertFalse(sem.acquire(timeout=0.5)) 697 dt = time.monotonic() - t 698 self.assertTimeout(dt, 0.5) 699 700 def test_default_value(self): 701 # The default initial value is 1. 702 sem = self.semtype() 703 sem.acquire() 704 def f(): 705 sem.acquire() 706 sem.release() 707 b = Bunch(f, 1) 708 b.wait_for_started() 709 _wait() 710 self.assertFalse(b.finished) 711 sem.release() 712 b.wait_for_finished() 713 714 def test_with(self): 715 sem = self.semtype(2) 716 def _with(err=None): 717 with sem: 718 self.assertTrue(sem.acquire(False)) 719 sem.release() 720 with sem: 721 self.assertFalse(sem.acquire(False)) 722 if err: 723 raise err 724 _with() 725 self.assertTrue(sem.acquire(False)) 726 sem.release() 727 self.assertRaises(TypeError, _with, TypeError) 728 self.assertTrue(sem.acquire(False)) 729 sem.release() 730 731class SemaphoreTests(BaseSemaphoreTests): 732 """ 733 Tests for unbounded semaphores. 734 """ 735 736 def test_release_unacquired(self): 737 # Unbounded releases are allowed and increment the semaphore's value 738 sem = self.semtype(1) 739 sem.release() 740 sem.acquire() 741 sem.acquire() 742 sem.release() 743 744 745class BoundedSemaphoreTests(BaseSemaphoreTests): 746 """ 747 Tests for bounded semaphores. 748 """ 749 750 def test_release_unacquired(self): 751 # Cannot go past the initial value 752 sem = self.semtype() 753 self.assertRaises(ValueError, sem.release) 754 sem.acquire() 755 sem.release() 756 self.assertRaises(ValueError, sem.release) 757 758 759class BarrierTests(BaseTestCase): 760 """ 761 Tests for Barrier objects. 762 """ 763 N = 5 764 defaultTimeout = 2.0 765 766 def setUp(self): 767 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout) 768 def tearDown(self): 769 self.barrier.abort() 770 771 def run_threads(self, f): 772 b = Bunch(f, self.N-1) 773 f() 774 b.wait_for_finished() 775 776 def multipass(self, results, n): 777 m = self.barrier.parties 778 self.assertEqual(m, self.N) 779 for i in range(n): 780 results[0].append(True) 781 self.assertEqual(len(results[1]), i * m) 782 self.barrier.wait() 783 results[1].append(True) 784 self.assertEqual(len(results[0]), (i + 1) * m) 785 self.barrier.wait() 786 self.assertEqual(self.barrier.n_waiting, 0) 787 self.assertFalse(self.barrier.broken) 788 789 def test_barrier(self, passes=1): 790 """ 791 Test that a barrier is passed in lockstep 792 """ 793 results = [[],[]] 794 def f(): 795 self.multipass(results, passes) 796 self.run_threads(f) 797 798 def test_barrier_10(self): 799 """ 800 Test that a barrier works for 10 consecutive runs 801 """ 802 return self.test_barrier(10) 803 804 def test_wait_return(self): 805 """ 806 test the return value from barrier.wait 807 """ 808 results = [] 809 def f(): 810 r = self.barrier.wait() 811 results.append(r) 812 813 self.run_threads(f) 814 self.assertEqual(sum(results), sum(range(self.N))) 815 816 def test_action(self): 817 """ 818 Test the 'action' callback 819 """ 820 results = [] 821 def action(): 822 results.append(True) 823 barrier = self.barriertype(self.N, action) 824 def f(): 825 barrier.wait() 826 self.assertEqual(len(results), 1) 827 828 self.run_threads(f) 829 830 def test_abort(self): 831 """ 832 Test that an abort will put the barrier in a broken state 833 """ 834 results1 = [] 835 results2 = [] 836 def f(): 837 try: 838 i = self.barrier.wait() 839 if i == self.N//2: 840 raise RuntimeError 841 self.barrier.wait() 842 results1.append(True) 843 except threading.BrokenBarrierError: 844 results2.append(True) 845 except RuntimeError: 846 self.barrier.abort() 847 pass 848 849 self.run_threads(f) 850 self.assertEqual(len(results1), 0) 851 self.assertEqual(len(results2), self.N-1) 852 self.assertTrue(self.barrier.broken) 853 854 def test_reset(self): 855 """ 856 Test that a 'reset' on a barrier frees the waiting threads 857 """ 858 results1 = [] 859 results2 = [] 860 results3 = [] 861 def f(): 862 i = self.barrier.wait() 863 if i == self.N//2: 864 # Wait until the other threads are all in the barrier. 865 while self.barrier.n_waiting < self.N-1: 866 time.sleep(0.001) 867 self.barrier.reset() 868 else: 869 try: 870 self.barrier.wait() 871 results1.append(True) 872 except threading.BrokenBarrierError: 873 results2.append(True) 874 # Now, pass the barrier again 875 self.barrier.wait() 876 results3.append(True) 877 878 self.run_threads(f) 879 self.assertEqual(len(results1), 0) 880 self.assertEqual(len(results2), self.N-1) 881 self.assertEqual(len(results3), self.N) 882 883 884 def test_abort_and_reset(self): 885 """ 886 Test that a barrier can be reset after being broken. 887 """ 888 results1 = [] 889 results2 = [] 890 results3 = [] 891 barrier2 = self.barriertype(self.N) 892 def f(): 893 try: 894 i = self.barrier.wait() 895 if i == self.N//2: 896 raise RuntimeError 897 self.barrier.wait() 898 results1.append(True) 899 except threading.BrokenBarrierError: 900 results2.append(True) 901 except RuntimeError: 902 self.barrier.abort() 903 pass 904 # Synchronize and reset the barrier. Must synchronize first so 905 # that everyone has left it when we reset, and after so that no 906 # one enters it before the reset. 907 if barrier2.wait() == self.N//2: 908 self.barrier.reset() 909 barrier2.wait() 910 self.barrier.wait() 911 results3.append(True) 912 913 self.run_threads(f) 914 self.assertEqual(len(results1), 0) 915 self.assertEqual(len(results2), self.N-1) 916 self.assertEqual(len(results3), self.N) 917 918 def test_timeout(self): 919 """ 920 Test wait(timeout) 921 """ 922 def f(): 923 i = self.barrier.wait() 924 if i == self.N // 2: 925 # One thread is late! 926 time.sleep(1.0) 927 # Default timeout is 2.0, so this is shorter. 928 self.assertRaises(threading.BrokenBarrierError, 929 self.barrier.wait, 0.5) 930 self.run_threads(f) 931 932 def test_default_timeout(self): 933 """ 934 Test the barrier's default timeout 935 """ 936 # create a barrier with a low default timeout 937 barrier = self.barriertype(self.N, timeout=0.3) 938 def f(): 939 i = barrier.wait() 940 if i == self.N // 2: 941 # One thread is later than the default timeout of 0.3s. 942 time.sleep(1.0) 943 self.assertRaises(threading.BrokenBarrierError, barrier.wait) 944 self.run_threads(f) 945 946 def test_single_thread(self): 947 b = self.barriertype(1) 948 b.wait() 949 b.wait() 950