• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
7from _thread import start_new_thread, TIMEOUT_MAX
8import threading
9import unittest
10import weakref
11
12from test import support
13
14
15def _wait():
16    # A crude wait/yield function not relying on synchronization primitives.
17    time.sleep(0.01)
18
class Bunch(object):
    """
    A group of raw threads that all run the same callable.
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Start `n` threads, each of which calls `f` exactly once.

        When `wait_before_exit` is true, every thread lingers after `f`
        has returned (or raised) until do_finish() is called.
        """
        self.f = f
        self.n = n
        # Thread identifiers, appended by each thread as it starts/finishes.
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit
        # The context is entered here and only left in wait_for_finished().
        self.wait_thread = support.wait_threads_exit()
        self.wait_thread.__enter__()

        def task():
            ident = threading.get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                # Record completion even if f() raised, then poll until the
                # thread is allowed to terminate.
                self.finished.append(ident)
                while not self._can_exit:
                    _wait()

        try:
            for _ in range(n):
                start_new_thread(task, ())
        except BaseException:
            # Let the threads that did get started terminate, then propagate.
            self._can_exit = True
            raise

    def wait_for_started(self):
        """Block (by polling) until all `n` threads have recorded a start."""
        while self.n > len(self.started):
            _wait()

    def wait_for_finished(self):
        """Block until all `n` threads recorded completion, then leave the
        thread-exit context entered by the constructor."""
        while self.n > len(self.finished):
            _wait()
        # Wait for threads exit
        self.wait_thread.__exit__(None, None, None)

    def do_finish(self):
        """Allow threads created with `wait_before_exit` to terminate."""
        self._can_exit = True
66
67
class BaseTestCase(unittest.TestCase):
    """
    Common base class: brackets each test with the test.support threading
    setup/cleanup helpers and provides a tolerant timeout assertion.
    """

    def setUp(self):
        # Keep whatever threading_setup() returns; tearDown hands it back.
        self._threads = support.threading_setup()

    def tearDown(self):
        saved = self._threads
        support.threading_cleanup(*saved)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        # Waiting and/or time.monotonic() can be imprecise, so an exact
        # comparison against the expected value would sometimes fail
        # (especially under Windows).  Accept anything from 60% of the
        # expected duration upwards...
        lower_bound = expected * 0.6
        self.assertGreaterEqual(actual, lower_bound)
        # ...as long as nothing insane happened (strictly under 10x).
        upper_bound = expected * 10.0
        self.assertLess(actual, upper_bound)
83
84
class BaseLockTests(BaseTestCase):
    """
    Tests common to recursive and non-recursive locks.

    The lock factory is read from `self.locktype`, which is not defined in
    this class (presumably supplied by the concrete test classes).
    """

    def test_constructor(self):
        # Creating and then dropping a lock must not raise.
        lk = self.locktype()
        del lk

    def test_repr(self):
        # A fresh lock describes itself as unlocked.
        lk = self.locktype()
        text = repr(lk)
        self.assertRegex(text, "<unlocked .* object (.*)?at .*>")
        del lk

    def test_locked_repr(self):
        # A held lock describes itself as locked.
        lk = self.locktype()
        lk.acquire()
        text = repr(lk)
        self.assertRegex(text, "<locked .* object (.*)?at .*>")
        del lk

    def test_acquire_destroy(self):
        # Dropping a lock that is still held must not raise.
        lk = self.locktype()
        lk.acquire()
        del lk

    def test_acquire_release(self):
        # Plain acquire/release round trip.
        lk = self.locktype()
        lk.acquire()
        lk.release()
        del lk

    def test_try_acquire(self):
        # A non-blocking acquire of a free lock succeeds.
        lk = self.locktype()
        acquired = lk.acquire(False)
        self.assertTrue(acquired)
        lk.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while we hold
        # the lock.
        lk = self.locktype()
        lk.acquire()
        outcomes = []
        def contender():
            outcomes.append(lk.acquire(False))
        Bunch(contender, 1).wait_for_finished()
        self.assertFalse(outcomes[0])
        lk.release()

    def test_acquire_contended(self):
        # Blocking acquirers make no progress until the holder releases.
        lk = self.locktype()
        lk.acquire()
        N = 5
        def contender():
            lk.acquire()
            lk.release()

        bunch = Bunch(contender, N)
        bunch.wait_for_started()
        _wait()
        self.assertEqual(len(bunch.finished), 0)
        lk.release()
        bunch.wait_for_finished()
        self.assertEqual(len(bunch.finished), N)

    def test_with(self):
        # The lock is released on leaving a `with` block, whether the block
        # exits normally or through an exception.
        lk = self.locktype()
        def probe():
            lk.acquire()
            lk.release()
        def _with(err=None):
            with lk:
                if err is not None:
                    raise err
        _with()
        # Check the lock is unacquired
        Bunch(probe, 1).wait_for_finished()
        with self.assertRaises(TypeError):
            _with(TypeError)
        # Check the lock is unacquired
        Bunch(probe, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lk = self.locktype()
        def user():
            lk.acquire()
            lk.release()
        baseline = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(user, 15).wait_for_finished()
        if len(threading.enumerate()) != baseline:
            # There is a small window during which a Thread instance's
            # target function has finished running, but the Thread is still
            # alive and registered.  Avoid spurious failures by waiting a
            # bit more (seen on a buildbot).
            time.sleep(0.4)
            self.assertEqual(baseline, len(threading.enumerate()))

    def test_timeout(self):
        lk = self.locktype()
        # Can't set timeout if not blocking
        with self.assertRaises(ValueError):
            lk.acquire(0, 1)
        # Invalid timeout values
        with self.assertRaises(ValueError):
            lk.acquire(timeout=-100)
        with self.assertRaises(OverflowError):
            lk.acquire(timeout=1e100)
        beyond_max = TIMEOUT_MAX + 1
        with self.assertRaises(OverflowError):
            lk.acquire(timeout=beyond_max)
        # TIMEOUT_MAX is ok
        lk.acquire(timeout=TIMEOUT_MAX)
        lk.release()
        begin = time.monotonic()
        self.assertTrue(lk.acquire(timeout=5))
        end = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(end - begin, 5)
        # The lock is held by this thread now: a timed acquire from another
        # thread must give up after roughly the requested delay.
        observed = []
        def timed_contender():
            started = time.monotonic()
            observed.append(lk.acquire(timeout=0.5))
            stopped = time.monotonic()
            observed.append(stopped - started)
        Bunch(timed_contender, 1).wait_for_finished()
        self.assertFalse(observed[0])
        self.assertTimeout(observed[1], 0.5)

    def test_weakref_exists(self):
        # A weak reference resolves while the lock is alive.
        lk = self.locktype()
        ref = weakref.ref(lk)
        self.assertIsNotNone(ref())

    def test_weakref_deleted(self):
        # A weak reference is cleared once the lock has been dropped.
        lk = self.locktype()
        ref = weakref.ref(lk)
        del lk
        self.assertIsNone(ref())
218
219
class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks
    (locks that may be acquired in one thread and released in another).
    """
    def test_reacquire(self):
        # Lock needs to be released before re-acquiring.
        mutex = self.locktype()
        phase = []

        def worker():
            mutex.acquire()
            phase.append(None)
            # Blocks until the main thread releases the lock.
            mutex.acquire()
            phase.append(None)

        with support.wait_threads_exit():
            start_new_thread(worker, ())
            # Wait for the first acquire, then give the worker a chance to
            # (incorrectly) get past the second one.
            while len(phase) == 0:
                _wait()
            _wait()
            self.assertEqual(len(phase), 1)
            mutex.release()
            while len(phase) == 1:
                _wait()
            self.assertEqual(len(phase), 2)

    def test_different_thread(self):
        # Lock can be released from a different thread.
        mutex = self.locktype()
        mutex.acquire()
        def releaser():
            mutex.release()
        bunch = Bunch(releaser, 1)
        bunch.wait_for_finished()
        # The lock is free again, so this must not block.
        mutex.acquire()
        mutex.release()

    def test_state_after_timeout(self):
        # Issue #11618: check that lock is in a proper state after a
        # (non-zero) timeout.
        mutex = self.locktype()
        mutex.acquire()
        timed_out = mutex.acquire(timeout=0.01)
        self.assertFalse(timed_out)
        mutex.release()
        self.assertFalse(mutex.locked())
        self.assertTrue(mutex.acquire(blocking=False))
267
268
class RLockTests(BaseLockTests):
    """
    Tests specific to recursive (re-entrant) locks.
    """
    def test_reacquire(self):
        # The owning thread may nest acquire/release pairs.
        rlock = self.locktype()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        rlock = self.locktype()
        with self.assertRaises(RuntimeError):
            rlock.release()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()
        # Every acquire has been matched, so one more release is an error.
        with self.assertRaises(RuntimeError):
            rlock.release()

    def test_release_save_unacquired(self):
        # Cannot _release_save an unacquired lock
        rlock = self.locktype()
        with self.assertRaises(RuntimeError):
            rlock._release_save()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()
        # Every acquire has been matched, so the lock is unowned again.
        with self.assertRaises(RuntimeError):
            rlock._release_save()

    def test_different_thread(self):
        # Cannot release from a different thread
        rlock = self.locktype()
        def owner():
            rlock.acquire()
        # Keep the owning thread alive until do_finish() is called.
        bunch = Bunch(owner, 1, True)
        try:
            with self.assertRaises(RuntimeError):
                rlock.release()
        finally:
            bunch.do_finish()
        bunch.wait_for_finished()

    def test__is_owned(self):
        # _is_owned() is true only in the thread currently holding the lock.
        rlock = self.locktype()
        self.assertFalse(rlock._is_owned())
        rlock.acquire()
        self.assertTrue(rlock._is_owned())
        rlock.acquire()
        self.assertTrue(rlock._is_owned())
        seen = []
        def other_thread():
            seen.append(rlock._is_owned())
        Bunch(other_thread, 1).wait_for_finished()
        self.assertFalse(seen[0])
        rlock.release()
        self.assertTrue(rlock._is_owned())
        rlock.release()
        self.assertFalse(rlock._is_owned())
334
335
class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    The event factory is read from `self.eventtype`, which is not defined
    in this class (presumably supplied by the concrete test classes).
    """

    def test_is_set(self):
        # set() and clear() are idempotent and reflected by is_set().
        event = self.eventtype()
        self.assertFalse(event.is_set())
        event.set()
        self.assertTrue(event.is_set())
        event.set()
        self.assertTrue(event.is_set())
        event.clear()
        self.assertFalse(event.is_set())
        event.clear()
        self.assertFalse(event.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        first_waits = []
        second_waits = []
        def waiter():
            first_waits.append(evt.wait())
            second_waits.append(evt.wait())
        bunch = Bunch(waiter, N)
        bunch.wait_for_started()
        _wait()
        # Nobody may get past the first wait() before the event is set.
        self.assertEqual(len(first_waits), 0)
        evt.set()
        bunch.wait_for_finished()
        self.assertEqual(first_waits, [True] * N)
        self.assertEqual(second_waits, [True] * N)

    def test_notify(self):
        event = self.eventtype()
        self._check_notify(event)
        # Another time, after an explicit clear()
        event.set()
        event.clear()
        self._check_notify(event)

    def test_timeout(self):
        event = self.eventtype()
        results1 = []
        results2 = []
        N = 5
        def waiter():
            # A zero timeout simply polls the flag.
            results1.append(event.wait(0.0))
            begin = time.monotonic()
            flag = event.wait(0.5)
            end = time.monotonic()
            results2.append((flag, end - begin))
        Bunch(waiter, N).wait_for_finished()
        self.assertEqual(results1, [False] * N)
        for flag, elapsed in results2:
            self.assertFalse(flag)
            self.assertTimeout(elapsed, 0.5)
        # The event is set
        results1 = []
        results2 = []
        event.set()
        Bunch(waiter, N).wait_for_finished()
        self.assertEqual(results1, [True] * N)
        for flag, _ in results2:
            self.assertTrue(flag)

    def test_set_and_clear(self):
        # Issue #13502: check that wait() returns true even when the event is
        # cleared before the waiting thread is woken up.
        event = self.eventtype()
        outcomes = []
        timeout = 0.250
        N = 5
        def waiter():
            outcomes.append(event.wait(timeout * 4))
        bunch = Bunch(waiter, N)
        bunch.wait_for_started()
        time.sleep(timeout)
        event.set()
        event.clear()
        bunch.wait_for_finished()
        self.assertEqual(outcomes, [True] * N)

    def test_reset_internal_locks(self):
        # ensure that condition is still using a Lock after reset
        event = self.eventtype()
        with event._cond:
            # Re-acquiring inside the `with` must fail (non-recursive lock).
            self.assertFalse(event._cond.acquire(False))
        event._reset_internal_locks()
        with event._cond:
            self.assertFalse(event._cond.acquire(False))
428
429
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    The condition factory is read from `self.condtype`, which is not defined
    in this class (presumably supplied by the concrete test classes).
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit plain Lock, the condition and the lock share state.
        plain = threading.Lock()
        cond = self.condtype(plain)
        cond.acquire()
        self.assertFalse(plain.acquire(False))
        cond.release()
        self.assertTrue(plain.acquire(False))
        self.assertFalse(cond.acquire(False))
        plain.release()
        with cond:
            self.assertFalse(plain.acquire(False))

    def test_unacquired_wait(self):
        # wait() without holding the condition is an error.
        cond = self.condtype()
        with self.assertRaises(RuntimeError):
            cond.wait()

    def test_unacquired_notify(self):
        # notify() without holding the condition is an error.
        cond = self.condtype()
        with self.assertRaises(RuntimeError):
            cond.notify()

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # _wait() statements to try to make sure that it doesn't race ahead
        # of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an
        # implementation detail of Condition Variables in current CPython,
        # but in general, not a guaranteed property of condition variables as
        # a programming construct.  In particular, it is possible that this
        # can no longer be conveniently guaranteed should their implementation
        # ever change.
        N = 5
        ready = []
        results1 = []
        results2 = []
        # Rebound by the main thread below; workers read the current value.
        phase_num = 0
        def worker():
            cond.acquire()
            ready.append(phase_num)
            woken = cond.wait()
            cond.release()
            results1.append((woken, phase_num))
            cond.acquire()
            ready.append(phase_num)
            woken = cond.wait()
            cond.release()
            results2.append((woken, phase_num))
        bunch = Bunch(worker, N)
        bunch.wait_for_started()
        # first wait, to ensure all workers settle into cond.wait() before
        # we continue. See issues #8799 and #30727.
        while len(ready) < N:
            _wait()
        ready.clear()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3)
        self.assertEqual(results2, [])
        # make sure all awaken workers settle into cond.wait()
        while len(ready) < 3:
            _wait()
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3)
        # make sure all workers settle into cond.wait()
        while len(ready) < N:
            _wait()
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < N:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
        bunch.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        measurements = []
        N = 5
        def waiter():
            cond.acquire()
            begin = time.monotonic()
            woken = cond.wait(0.5)
            end = time.monotonic()
            cond.release()
            measurements.append((end - begin, woken))
        Bunch(waiter, N).wait_for_finished()
        self.assertEqual(len(measurements), N)
        for elapsed, woken in measurements:
            self.assertTimeout(elapsed, 0.5)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(woken)

    def test_waitfor(self):
        # wait_for() returns once the predicate holds (state reaches 4).
        cond = self.condtype()
        state = 0
        def waiter():
            with cond:
                satisfied = cond.wait_for(lambda: state == 4)
                self.assertTrue(satisfied)
                self.assertEqual(state, 4)
        bunch = Bunch(waiter, 1)
        bunch.wait_for_started()
        for _ in range(4):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        bunch.wait_for_finished()

    def test_waitfor_timeout(self):
        # wait_for() gives up after its timeout if the predicate never holds.
        cond = self.condtype()
        state = 0
        success = []
        def waiter():
            with cond:
                begin = time.monotonic()
                satisfied = cond.wait_for(lambda: state == 4, timeout=0.1)
                elapsed = time.monotonic() - begin
                self.assertFalse(satisfied)
                self.assertTimeout(elapsed, 0.1)
                success.append(None)
        bunch = Bunch(waiter, 1)
        bunch.wait_for_started()
        # Only increment 3 times, so state == 4 is never reached.
        for _ in range(3):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        bunch.wait_for_finished()
        self.assertEqual(len(success), 1)
603
604
class BaseSemaphoreTests(BaseTestCase):
    """
    Tests shared by bounded and unbounded semaphore objects.

    The semaphore factory is read from `self.semtype`, which is not defined
    in this class (presumably supplied by the concrete test classes).
    """

    def test_constructor(self):
        # Negative initial values are rejected.
        with self.assertRaises(ValueError):
            self.semtype(value=-1)
        with self.assertRaises(ValueError):
            self.semtype(value=-sys.maxsize)

    def test_acquire(self):
        # A semaphore can be acquired as many times as its initial value.
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # Dropping an acquired semaphore must not raise.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        # Initial value 7, one unit taken here: 6 acquires can succeed
        # before the worker threads start blocking.
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        sem_results = []
        results1 = []
        results2 = []
        # Rebound by the main thread below; workers read the current value.
        phase_num = 0
        def worker():
            sem_results.append(sem.acquire())
            results1.append(phase_num)
            sem_results.append(sem.acquire())
            results2.append(phase_num)
        bunch = Bunch(worker, N)
        bunch.wait_for_started()
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for _ in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for _ in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        bunch.wait_for_finished()
        self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))

    def test_try_acquire(self):
        # Non-blocking acquires succeed until the counter is exhausted.
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        # 3 units remain for 10 non-blocking attempts across 5 threads.
        sem = self.semtype(4)
        sem.acquire()
        outcomes = []
        def worker():
            outcomes.append(sem.acquire(False))
            outcomes.append(sem.acquire(False))
        Bunch(worker, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(outcomes), [False] * 7 + [True] * 3)

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        # A timeout cannot be combined with a non-blocking acquire.
        with self.assertRaises(ValueError):
            sem.acquire(False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        # Exhausted again: a timed acquire waits roughly the full timeout.
        begin = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        elapsed = time.monotonic() - begin
        self.assertTimeout(elapsed, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def worker():
            sem.acquire()
            sem.release()
        bunch = Bunch(worker, 1)
        bunch.wait_for_started()
        _wait()
        self.assertFalse(bunch.finished)
        sem.release()
        bunch.wait_for_finished()

    def test_with(self):
        # Each `with` level takes one unit and gives it back on exit, even
        # when the block is left through an exception.
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        self.assertTrue(sem.acquire(False))
        sem.release()
        with self.assertRaises(TypeError):
            _with(TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()
730
class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Unbounded releases are allowed and increment the semaphore's value
        semaphore = self.semtype(1)
        semaphore.release()
        # The extra release above makes two acquires possible.
        semaphore.acquire()
        semaphore.acquire()
        semaphore.release()
743
744
class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to bounded semaphores.
    """

    def test_release_unacquired(self):
        # Cannot go past the initial value
        semaphore = self.semtype()
        with self.assertRaises(ValueError):
            semaphore.release()
        semaphore.acquire()
        semaphore.release()
        # Back at the initial value: one more release is an error again.
        with self.assertRaises(ValueError):
            semaphore.release()
757
758
class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.

    The barrier factory is read from `self.barriertype`, which is not
    defined in this class (presumably supplied by the concrete test classes).
    """
    # Number of parties of the shared barrier (workers + the main thread).
    N = 5
    # Default timeout, in seconds, of the barrier created in setUp().
    defaultTimeout = 2.0

    def setUp(self):
        # Chain up so that BaseTestCase's threading setup still runs;
        # overriding setUp() without doing so would silently skip it.
        super().setUp()
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort the barrier first, then always run BaseTestCase's cleanup,
        # even if abort() raises.
        try:
            self.barrier.abort()
        finally:
            super().tearDown()

    def run_threads(self, f):
        """Run `f` in N-1 worker threads and once in the calling thread."""
        b = Bunch(f, self.N-1)
        f()
        b.wait_for_finished()

    def multipass(self, results, n):
        """Pass the shared barrier 2*n times, checking lockstep progress.

        `results` is a pair of lists shared by all parties; each party
        appends to results[0] before the first wait of a round and to
        results[1] between the two waits.
        """
        m = self.barrier.parties
        self.assertEqual(m, self.N)
        for i in range(n):
            results[0].append(True)
            self.assertEqual(len(results[1]), i * m)
            self.barrier.wait()
            results[1].append(True)
            self.assertEqual(len(results[0]), (i + 1) * m)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[], []]
        def f():
            self.multipass(results, passes)
        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        results = []
        def f():
            r = self.barrier.wait()
            results.append(r)

        self.run_threads(f)
        # The parties must have received each index in range(N) once.
        self.assertEqual(sum(results), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = []
        def action():
            results.append(True)
        barrier = self.barriertype(self.N, action)
        def f():
            barrier.wait()
            self.assertEqual(len(results), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = []
        results2 = []
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                # Exactly one party takes this path and breaks the barrier
                # for everybody else.
                self.barrier.abort()

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = []
        results2 = []
        results3 = []
        def f():
            i = self.barrier.wait()
            if i == self.N//2:
                # Wait until the other threads are all in the barrier.
                while self.barrier.n_waiting < self.N-1:
                    time.sleep(0.001)
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    results1.append(True)
                except threading.BrokenBarrierError:
                    results2.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = []
        results2 = []
        results3 = []
        # Second barrier, used only to synchronize around the reset.
        barrier2 = self.barriertype(self.N)
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N//2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            i = self.barrier.wait()
            if i == self.N // 2:
                # One thread is late!
                time.sleep(1.0)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, 0.5)
        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        # create a barrier with a low default timeout
        barrier = self.barriertype(self.N, timeout=0.3)
        def f():
            i = barrier.wait()
            if i == self.N // 2:
                # One thread is later than the default timeout of 0.3s.
                time.sleep(1.0)
            self.assertRaises(threading.BrokenBarrierError, barrier.wait)
        self.run_threads(f)

    def test_single_thread(self):
        # A one-party barrier never blocks, so it can be passed repeatedly.
        b = self.barriertype(1)
        b.wait()
        b.wait()
950