• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
7from thread import start_new_thread, get_ident
8import threading
9import unittest
10
11from test import test_support as support
12
13
14def _wait():
15    # A crude wait/yield function not relying on synchronization primitives.
16    time.sleep(0.01)
17
class Bunch(object):
    """
    A group of low-level threads that all run the same callable.

    Thread identifiers are recorded in `started` when a thread begins and in
    `finished` once its callable has returned (or raised).
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Spawn `n` threads, each of which calls `f()` once.

        When `wait_before_exit` is true, every thread lingers after `f`
        returns and only terminates once do_finish() has been called.
        """
        self.f = f
        self.n = n
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit

        def worker():
            ident = get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                # Record completion first, then park until exit is allowed.
                self.finished.append(ident)
                while not self._can_exit:
                    _wait()

        try:
            for _ in range(n):
                start_new_thread(worker, ())
        except:
            # Spawning failed part-way: let the threads that did start exit,
            # then propagate the original error unchanged.
            self._can_exit = True
            raise

    def wait_for_started(self):
        """Poll until all `n` threads have registered themselves as started."""
        while True:
            if len(self.started) >= self.n:
                return
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` threads have finished running `f`."""
        while True:
            if len(self.finished) >= self.n:
                return
            _wait()

    def do_finish(self):
        """Allow threads held back by `wait_before_exit` to terminate."""
        self._can_exit = True
59
60
class BaseTestCase(unittest.TestCase):
    """
    Shared fixture for the test cases in this module.

    setUp() stores whatever support.threading_setup() returns, and
    tearDown() passes it back to support.threading_cleanup() before calling
    support.reap_children().
    """
    def setUp(self):
        snapshot = support.threading_setup()
        self._threads = snapshot

    def tearDown(self):
        saved = self._threads
        support.threading_cleanup(*saved)
        support.reap_children()
68
69
class BaseLockTests(BaseTestCase):
    """
    Tests common to recursive and non-recursive locks.

    The lock under test is built with `self.locktype()`; that attribute is
    not defined in this class (presumably concrete subclasses supply it).
    """

    def test_constructor(self):
        # Creating and discarding a lock must not raise.
        mutex = self.locktype()
        del mutex

    def test_acquire_destroy(self):
        # A lock may be discarded while it is still held.
        mutex = self.locktype()
        mutex.acquire()
        del mutex

    def test_acquire_release(self):
        # Plain acquire/release round trip, then discard.
        mutex = self.locktype()
        mutex.acquire()
        mutex.release()
        del mutex

    def test_try_acquire(self):
        # A non-blocking acquire on a free lock succeeds.
        mutex = self.locktype()
        acquired = mutex.acquire(False)
        self.assertTrue(acquired)
        mutex.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while we hold it.
        mutex = self.locktype()
        mutex.acquire()
        outcomes = []
        def attempt():
            outcomes.append(mutex.acquire(False))
        Bunch(attempt, 1).wait_for_finished()
        self.assertFalse(outcomes[0])
        mutex.release()

    def test_acquire_contended(self):
        # Blocking acquirers make no progress until the holder releases.
        mutex = self.locktype()
        mutex.acquire()
        nthreads = 5
        def contend():
            mutex.acquire()
            mutex.release()

        bunch = Bunch(contend, nthreads)
        bunch.wait_for_started()
        _wait()
        self.assertEqual(len(bunch.finished), 0)
        mutex.release()
        bunch.wait_for_finished()
        self.assertEqual(len(bunch.finished), nthreads)

    def test_with(self):
        # The lock works as a context manager and is released on both the
        # normal and the exceptional exit path.
        mutex = self.locktype()
        def probe():
            mutex.acquire()
            mutex.release()
        def _with(err=None):
            with mutex:
                if err is not None:
                    raise err
        _with()
        # The lock must be free again: another thread can take it.
        Bunch(probe, 1).wait_for_finished()
        self.assertRaises(TypeError, _with, TypeError)
        # Still free after the exception propagated out of the with block.
        Bunch(probe, 1).wait_for_finished()

    def test_thread_leak(self):
        # Using the lock from foreign (non-threading) threads must not leave
        # extra Thread instances behind in threading.enumerate().
        mutex = self.locktype()
        def probe():
            mutex.acquire()
            mutex.release()
        before = len(threading.enumerate())
        # Many threads are used in the hope that existing thread ids won't
        # simply be recycled.
        Bunch(probe, 15).wait_for_finished()
        self.assertEqual(before, len(threading.enumerate()))
149
150
class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks, i.e. locks that may be acquired in
    one thread and released from another.
    """
    def test_reacquire(self):
        # A second acquire() blocks until the lock has been released.
        mutex = self.locktype()
        progress = []
        def double_acquire():
            mutex.acquire()
            progress.append(None)
            mutex.acquire()
            progress.append(None)
        start_new_thread(double_acquire, ())
        # Wait for the first acquire to go through...
        while len(progress) == 0:
            _wait()
        # ...then give the thread a chance to (wrongly) pass the second one.
        _wait()
        self.assertEqual(len(progress), 1)
        mutex.release()
        while len(progress) == 1:
            _wait()
        self.assertEqual(len(progress), 2)

    def test_different_thread(self):
        # The lock can be released by a thread other than the acquirer.
        mutex = self.locktype()
        mutex.acquire()
        def unlock():
            mutex.release()
        bunch = Bunch(unlock, 1)
        bunch.wait_for_finished()
        # If the release above took effect, this acquire does not block.
        mutex.acquire()
        mutex.release()
185
186
class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        # The owning thread may nest acquire()/release() pairs.
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock, whether it was never taken or
        # has already been fully released.
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock.release)

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
        def f():
            lock.acquire()
        # wait_before_exit=True keeps the worker alive after f() returns, so
        # it still owns the lock when we try to release it.
        b = Bunch(f, 1, True)
        try:
            # Wait until the worker has really acquired the lock.  Without
            # this, release() may run while the lock is still unowned; that
            # raises RuntimeError too, so the test would pass without ever
            # exercising the "owned by another thread" case.
            b.wait_for_finished()
            self.assertRaises(RuntimeError, lock.release)
        finally:
            b.do_finish()

    def test__is_owned(self):
        # _is_owned() reports ownership for the calling thread only.
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        result = []
        def f():
            result.append(lock._is_owned())
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        # One release of a doubly-acquired lock leaves it owned.
        lock.release()
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())
239
240
class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    Events are built with `self.eventtype()`; that attribute is not defined
    in this class (presumably concrete subclasses supply it).
    """

    def test_is_set(self):
        # set() and clear() are idempotent and is_set() reflects the flag.
        event = self.eventtype()
        self.assertFalse(event.is_set())
        for _ in range(2):
            event.set()
            self.assertTrue(event.is_set())
        for _ in range(2):
            event.clear()
            self.assertFalse(event.is_set())

    def _check_notify(self, evt):
        # Every waiting thread is woken by set(), and a further wait() on an
        # already-set event returns True as well.
        nthreads = 5
        first_waits = []
        second_waits = []
        def waiter():
            first_waits.append(evt.wait())
            second_waits.append(evt.wait())
        bunch = Bunch(waiter, nthreads)
        bunch.wait_for_started()
        _wait()
        # Nobody may get past wait() before the event is set.
        self.assertEqual(len(first_waits), 0)
        evt.set()
        bunch.wait_for_finished()
        self.assertEqual(first_waits, [True] * nthreads)
        self.assertEqual(second_waits, [True] * nthreads)

    def test_notify(self):
        event = self.eventtype()
        self._check_notify(event)
        # Once more, after an explicit set()/clear() cycle.
        event.set()
        event.clear()
        self._check_notify(event)

    def test_timeout(self):
        event = self.eventtype()
        results1 = []
        results2 = []
        nthreads = 5
        def timed_wait():
            # Zero timeout first, then a 0.2 second timeout that is timed.
            results1.append(event.wait(0.0))
            began = time.time()
            flag = event.wait(0.2)
            ended = time.time()
            results2.append((flag, ended - began))
        Bunch(timed_wait, nthreads).wait_for_finished()
        self.assertEqual(results1, [False] * nthreads)
        for flag, elapsed in results2:
            self.assertFalse(flag)
            self.assertTrue(elapsed >= 0.2, elapsed)
        # Second round with the event set.  The result lists are rebound on
        # purpose: the closure sees the fresh lists.
        results1 = []
        results2 = []
        event.set()
        Bunch(timed_wait, nthreads).wait_for_finished()
        self.assertEqual(results1, [True] * nthreads)
        for flag, elapsed in results2:
            self.assertTrue(flag)

    def test_reset_internal_locks(self):
        # _reset_internal_locks() swaps in a new lock object of the same type.
        event = self.eventtype()
        def current_lock():
            return event._Event__cond._Condition__lock
        before = current_lock()
        event._reset_internal_locks()
        after = current_lock()
        self.assertIsNot(after, before)
        self.assertIs(type(after), type(before))
315
316
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    Conditions are built with `self.condtype(...)`; that attribute is not
    defined in this class (presumably concrete subclasses supply it).
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit plain Lock, the condition and the lock share state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        # wait() without holding the condition raises RuntimeError.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        # notify() without holding the condition raises RuntimeError.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # N threads each wait on the condition twice.  After every wait a
        # thread records the value of `phase_num` it observes; the main
        # thread bumps `phase_num` while still holding the condition, so the
        # recorded values show which notification released which wait.
        N = 5
        results1 = []
        results2 = []
        phase_num = 0
        def f():
            cond.acquire()
            cond.wait()
            cond.release()
            results1.append(phase_num)
            cond.acquire()
            cond.wait()
            cond.release()
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        # NOTE(review): this assumes one _wait() is enough for every thread
        # to have reached its first cond.wait() -- timing-based, not enforced.
        _wait()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [1] * 3)
        self.assertEqual(results2, [])
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [1] * 3 + [2] * 2)
        self.assertEqual(results2, [2] * 3)
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < 5:
            _wait()
        self.assertEqual(results1, [1] * 3 + [2] * 2)
        self.assertEqual(results2, [2] * 3 + [3] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        # wait(0.2) with nobody notifying must block for at least 0.2 seconds.
        cond = self.condtype()
        results = []
        N = 5
        def f():
            cond.acquire()
            t1 = time.time()
            cond.wait(0.2)
            t2 = time.time()
            cond.release()
            results.append(t2 - t1)
        Bunch(f, N).wait_for_finished()
        # Compare against N rather than repeating the literal thread count.
        self.assertEqual(len(results), N)
        for dt in results:
            self.assertTrue(dt >= 0.2, dt)
420
421
class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    Semaphores are built with `self.semtype(...)`; that attribute is not
    defined in this class (presumably concrete subclasses supply it).
    """

    def test_constructor(self):
        # Negative initial values are rejected.
        self.assertRaises(ValueError, self.semtype, value=-1)
        self.assertRaises(ValueError, self.semtype, value=-sys.maxint)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # A semaphore may be discarded while it is still held.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        # Start at 7 and take one unit here, leaving 6 for the workers.
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        results1 = []
        results2 = []
        phase_num = 0
        def f():
            # Each worker acquires twice and records the phase it observes
            # after each successful acquire.
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)
        # Use N here: the original defined it but passed a literal 10.
        b = Bunch(f, N)
        b.wait_for_started()
        # Phase 0: only the 6 remaining units can be taken.
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        # Phase 1: 7 more units let 7 more acquires through (13 in total).
        phase_num = 1
        for i in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        # Phase 2: 6 more units bring the total to 19 of the 20 acquires.
        phase_num = 2
        for i in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        # 3 units remain after our acquire, so exactly 3 of the 10
        # non-blocking attempts made by the 5 workers succeed.
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        # The worker must still be blocked: we hold the only unit.
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        # Nested `with` blocks each take one unit and give it back on exit,
        # including when an exception propagates.
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()
532
class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Releasing without a prior acquire is permitted and raises the
        # semaphore's value, so two acquires then go through.
        semaphore = self.semtype(1)
        semaphore.release()
        for _ in range(2):
            semaphore.acquire()
        semaphore.release()
545
546
class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to bounded semaphores.
    """

    def test_release_unacquired(self):
        # The value may never exceed its initial value: an extra release()
        # raises ValueError, both on a fresh semaphore and after a balanced
        # acquire/release pair.
        semaphore = self.semtype()
        release = semaphore.release
        self.assertRaises(ValueError, release)
        semaphore.acquire()
        semaphore.release()
        self.assertRaises(ValueError, release)
559