"""
Various tests for synchronization primitives.
"""

import sys
import time
from thread import start_new_thread, get_ident
import threading
import unittest

from test import test_support as support


def _wait():
    # A crude wait/yield function not relying on synchronization primitives.
    time.sleep(0.01)

class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        self.f = f
        self.n = n
        # Thread ids are appended as each worker enters / leaves `f`.
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit
        def task():
            tid = get_ident()
            self.started.append(tid)
            try:
                f()
            finally:
                # Recorded even when `f` raises, so wait_for_finished()
                # cannot wait forever on a failed worker.
                self.finished.append(tid)
                # Keep the thread alive (after `f` is done) until the owner
                # of the bunch allows it to exit.
                while not self._can_exit:
                    _wait()
        try:
            for i in range(n):
                start_new_thread(task, ())
        except:
            # Deliberate catch-all: let the already-started threads exit,
            # then propagate whatever went wrong unchanged.
            self._can_exit = True
            raise

    def wait_for_started(self):
        # Poll until all `n` threads have entered task().
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        # Poll until `f` has returned (or raised) in all `n` threads.
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        # Release threads created with wait_before_exit=True.
        self._can_exit = True


class BaseTestCase(unittest.TestCase):
    """
    Common base class: snapshots thread state before each test and cleans
    up threads and child processes afterwards.
    """
    def setUp(self):
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)
        support.reap_children()


class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.

    These tests call `self.locktype()` to create the lock under test; the
    attribute is not defined here and is expected to be supplied by a
    concrete subclass.
    """

    def test_constructor(self):
        lock = self.locktype()
        del lock

    def test_acquire_destroy(self):
        # Destroying a lock that is still held must not blow up.
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        # A non-blocking acquire succeeds on a free lock.
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while the lock
        # is held here.
        lock = self.locktype()
        lock.acquire()
        result = []
        def f():
            result.append(lock.acquire(False))
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        # Blocking acquires from other threads only complete once the lock
        # is released here.
        lock = self.locktype()
        lock.acquire()
        N = 5
        def f():
            lock.acquire()
            lock.release()

        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(b.finished), 0)
        lock.release()
        b.wait_for_finished()
        self.assertEqual(len(b.finished), N)

    def test_with(self):
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        def _with(err=None):
            with lock:
                if err is not None:
                    raise err
        _with()
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()
        self.assertRaises(TypeError, _with, TypeError)
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        n = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(f, 15).wait_for_finished()
        self.assertEqual(n, len(threading.enumerate()))


class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks
    (which can be acquired and released from different threads).
    """
    def test_reacquire(self):
        # Lock needs to be released before re-acquiring.
        lock = self.locktype()
        phase = []
        def f():
            lock.acquire()
            phase.append(None)
            lock.acquire()
            phase.append(None)
        start_new_thread(f, ())
        while len(phase) == 0:
            _wait()
        _wait()
        # The worker must still be blocked on its second acquire().
        self.assertEqual(len(phase), 1)
        lock.release()
        while len(phase) == 1:
            _wait()
        self.assertEqual(len(phase), 2)

    def test_different_thread(self):
        # Lock can be released from a different thread.
        lock = self.locktype()
        lock.acquire()
        def f():
            lock.release()
        b = Bunch(f, 1)
        b.wait_for_finished()
        # If the worker's release() took effect, this does not block.
        lock.acquire()
        lock.release()


class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock.release)

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
        def f():
            lock.acquire()
        b = Bunch(f, 1, True)
        try:
            # Wait until the worker has actually acquired the lock (its tid
            # is recorded in `finished` once f() returns); otherwise this
            # could merely re-test releasing an unacquired lock.
            b.wait_for_finished()
            self.assertRaises(RuntimeError, lock.release)
        finally:
            b.do_finish()

    def test__is_owned(self):
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        result = []
        def f():
            # Ownership is per-thread: another thread must see False.
            result.append(lock._is_owned())
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())


class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    These tests call `self.eventtype()` to create the event under test; the
    attribute is not defined here and is expected to be supplied by a
    concrete subclass.
    """

    def test_is_set(self):
        # set() and clear() are idempotent.
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        results1 = []
        results2 = []
        def f():
            results1.append(evt.wait())
            results2.append(evt.wait())
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        # Nobody may get past the first wait() before the event is set.
        self.assertEqual(len(results1), 0)
        evt.set()
        b.wait_for_finished()
        self.assertEqual(results1, [True] * N)
        self.assertEqual(results2, [True] * N)

    def test_notify(self):
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        evt = self.eventtype()
        results1 = []
        results2 = []
        N = 5
        def f():
            results1.append(evt.wait(0.0))
            t1 = time.time()
            r = evt.wait(0.2)
            t2 = time.time()
            results2.append((r, t2 - t1))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [False] * N)
        for r, dt in results2:
            self.assertFalse(r)
            self.assertTrue(dt >= 0.2, dt)
        # The event is set
        results1 = []
        results2 = []
        evt.set()
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [True] * N)
        for r, dt in results2:
            self.assertTrue(r)

    def test_reset_internal_locks(self):
        # Reaches into name-mangled private attributes to check that
        # _reset_internal_locks() installs a fresh lock of the same type.
        evt = self.eventtype()
        old_lock = evt._Event__cond._Condition__lock
        evt._reset_internal_locks()
        new_lock = evt._Event__cond._Condition__lock
        self.assertIsNot(new_lock, old_lock)
        self.assertIs(type(new_lock), type(old_lock))


class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    These tests call `self.condtype()` to create the condition under test;
    the attribute is not defined here and is expected to be supplied by a
    concrete subclass.
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit lock, the condition and the lock share state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        N = 5
        results1 = []
        results2 = []
        # Read by the workers through the closure; each worker records the
        # value it sees when it wakes up.
        phase_num = 0
        def f():
            cond.acquire()
            cond.wait()
            cond.release()
            results1.append(phase_num)
            cond.acquire()
            cond.wait()
            cond.release()
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [1] * 3)
        self.assertEqual(results2, [])
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [1] * 3 + [2] * 2)
        self.assertEqual(results2, [2] * 3)
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < 5:
            _wait()
        self.assertEqual(results1, [1] * 3 + [2] * 2)
        self.assertEqual(results2, [2] * 3 + [3] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        # wait(timeout) returns after at least `timeout` seconds when
        # nobody notifies.
        cond = self.condtype()
        results = []
        N = 5
        def f():
            cond.acquire()
            t1 = time.time()
            cond.wait(0.2)
            t2 = time.time()
            cond.release()
            results.append(t2 - t1)
        Bunch(f, N).wait_for_finished()
        self.assertEqual(len(results), N)
        for dt in results:
            self.assertTrue(dt >= 0.2, dt)


class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    These tests call `self.semtype(...)` to create the semaphore under
    test; the attribute is not defined here and is expected to be supplied
    by a concrete subclass.
    """

    def test_constructor(self):
        self.assertRaises(ValueError, self.semtype, value=-1)
        self.assertRaises(ValueError, self.semtype, value=-sys.maxint)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        # 7 slots minus the one taken here leaves 6; the N workers need
        # 2 * N = 20 acquires in total, fed by the releases below
        # (6 + 7 + 6 + 1).
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        results1 = []
        results2 = []
        # Read by the workers through the closure.
        phase_num = 0
        def f():
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for i in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for i in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2),
                         [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        # 4 slots minus the one taken here leaves 3 for the 10 non-blocking
        # attempts made by the workers.
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                # One slot is still free inside the first `with`...
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    # ...and none inside the nested one.
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()

class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests for unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Unbounded releases are allowed and increment the semaphore's value
        sem = self.semtype(1)
        sem.release()
        sem.acquire()
        sem.acquire()
        sem.release()


class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests for bounded semaphores.
    """

    def test_release_unacquired(self):
        # Cannot go past the initial value
        sem = self.semtype()
        self.assertRaises(ValueError, sem.release)
        sem.acquire()
        sem.release()
        self.assertRaises(ValueError, sem.release)