1# Some simple queue module tests, plus some failure conditions 2# to ensure the Queue locks remain stable. 3import itertools 4import random 5import threading 6import time 7import unittest 8import weakref 9from test.support import gc_collect 10from test.support import import_helper 11from test.support import threading_helper 12 13 14py_queue = import_helper.import_fresh_module('queue', blocked=['_queue']) 15c_queue = import_helper.import_fresh_module('queue', fresh=['_queue']) 16need_c_queue = unittest.skipUnless(c_queue, "No _queue module found") 17 18QUEUE_SIZE = 5 19 20def qfull(q): 21 return q.maxsize > 0 and q.qsize() == q.maxsize 22 23# A thread to run a function that unclogs a blocked Queue. 24class _TriggerThread(threading.Thread): 25 def __init__(self, fn, args): 26 self.fn = fn 27 self.args = args 28 self.startedEvent = threading.Event() 29 threading.Thread.__init__(self) 30 31 def run(self): 32 # The sleep isn't necessary, but is intended to give the blocking 33 # function in the main thread a chance at actually blocking before 34 # we unclog it. But if the sleep is longer than the timeout-based 35 # tests wait in their blocking functions, those tests will fail. 36 # So we give them much longer timeout values compared to the 37 # sleep here (I aimed at 10 seconds for blocking functions -- 38 # they should never actually wait that long - they should make 39 # progress as soon as we call self.fn()). 40 time.sleep(0.1) 41 self.startedEvent.set() 42 self.fn(*self.args) 43 44 45# Execute a function that blocks, and in a separate thread, a function that 46# triggers the release. Returns the result of the blocking function. Caution: 47# block_func must guarantee to block until trigger_func is called, and 48# trigger_func must guarantee to change queue state so that block_func can make 49# enough progress to return. In particular, a block_func that just raises an 50# exception regardless of whether trigger_func is called will lead to 51# timing-dependent sporadic failures, and one of those went rarely seen but 52# undiagnosed for years. Now block_func must be unexceptional. If block_func 53# is supposed to raise an exception, call do_exceptional_blocking_test() 54# instead. 55 56class BlockingTestMixin: 57 58 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args): 59 thread = _TriggerThread(trigger_func, trigger_args) 60 thread.start() 61 try: 62 self.result = block_func(*block_args) 63 # If block_func returned before our thread made the call, we failed! 64 if not thread.startedEvent.is_set(): 65 self.fail("blocking function %r appeared not to block" % 66 block_func) 67 return self.result 68 finally: 69 threading_helper.join_thread(thread) # make sure the thread terminates 70 71 # Call this instead if block_func is supposed to raise an exception. 72 def do_exceptional_blocking_test(self,block_func, block_args, trigger_func, 73 trigger_args, expected_exception_class): 74 thread = _TriggerThread(trigger_func, trigger_args) 75 thread.start() 76 try: 77 try: 78 block_func(*block_args) 79 except expected_exception_class: 80 raise 81 else: 82 self.fail("expected exception of kind %r" % 83 expected_exception_class) 84 finally: 85 threading_helper.join_thread(thread) # make sure the thread terminates 86 if not thread.startedEvent.is_set(): 87 self.fail("trigger thread ended but event never set") 88 89 90class BaseQueueTestMixin(BlockingTestMixin): 91 def setUp(self): 92 self.cum = 0 93 self.cumlock = threading.Lock() 94 95 def basic_queue_test(self, q): 96 if q.qsize(): 97 raise RuntimeError("Call this function with an empty queue") 98 self.assertTrue(q.empty()) 99 self.assertFalse(q.full()) 100 # I guess we better check things actually queue correctly a little :) 101 q.put(111) 102 q.put(333) 103 q.put(222) 104 target_order = dict(Queue = [111, 333, 222], 105 LifoQueue = [222, 333, 111], 106 PriorityQueue = [111, 222, 333]) 107 actual_order = [q.get(), q.get(), q.get()] 108 self.assertEqual(actual_order, target_order[q.__class__.__name__], 109 "Didn't seem to queue the correct data!") 110 for i in range(QUEUE_SIZE-1): 111 q.put(i) 112 self.assertTrue(q.qsize(), "Queue should not be empty") 113 self.assertTrue(not qfull(q), "Queue should not be full") 114 last = 2 * QUEUE_SIZE 115 full = 3 * 2 * QUEUE_SIZE 116 q.put(last) 117 self.assertTrue(qfull(q), "Queue should be full") 118 self.assertFalse(q.empty()) 119 self.assertTrue(q.full()) 120 try: 121 q.put(full, block=0) 122 self.fail("Didn't appear to block with a full queue") 123 except self.queue.Full: 124 pass 125 try: 126 q.put(full, timeout=0.01) 127 self.fail("Didn't appear to time-out with a full queue") 128 except self.queue.Full: 129 pass 130 # Test a blocking put 131 self.do_blocking_test(q.put, (full,), q.get, ()) 132 self.do_blocking_test(q.put, (full, True, 10), q.get, ()) 133 # Empty it 134 for i in range(QUEUE_SIZE): 135 q.get() 136 self.assertTrue(not q.qsize(), "Queue should be empty") 137 try: 138 q.get(block=0) 139 self.fail("Didn't appear to block with an empty queue") 140 except self.queue.Empty: 141 pass 142 try: 143 q.get(timeout=0.01) 144 self.fail("Didn't appear to time-out with an empty queue") 145 except self.queue.Empty: 146 pass 147 # Test a blocking get 148 self.do_blocking_test(q.get, (), q.put, ('empty',)) 149 self.do_blocking_test(q.get, (True, 10), q.put, ('empty',)) 150 151 152 def worker(self, q): 153 while True: 154 x = q.get() 155 if x < 0: 156 q.task_done() 157 return 158 with self.cumlock: 159 self.cum += x 160 q.task_done() 161 162 def queue_join_test(self, q): 163 self.cum = 0 164 threads = [] 165 for i in (0,1): 166 thread = threading.Thread(target=self.worker, args=(q,)) 167 thread.start() 168 threads.append(thread) 169 for i in range(100): 170 q.put(i) 171 q.join() 172 self.assertEqual(self.cum, sum(range(100)), 173 "q.join() did not block until all tasks were done") 174 for i in (0,1): 175 q.put(-1) # instruct the threads to close 176 q.join() # verify that you can join twice 177 for thread in threads: 178 thread.join() 179 180 def test_queue_task_done(self): 181 # Test to make sure a queue task completed successfully. 182 q = self.type2test() 183 try: 184 q.task_done() 185 except ValueError: 186 pass 187 else: 188 self.fail("Did not detect task count going negative") 189 190 def test_queue_join(self): 191 # Test that a queue join()s successfully, and before anything else 192 # (done twice for insurance). 193 q = self.type2test() 194 self.queue_join_test(q) 195 self.queue_join_test(q) 196 try: 197 q.task_done() 198 except ValueError: 199 pass 200 else: 201 self.fail("Did not detect task count going negative") 202 203 def test_basic(self): 204 # Do it a couple of times on the same queue. 205 # Done twice to make sure works with same instance reused. 206 q = self.type2test(QUEUE_SIZE) 207 self.basic_queue_test(q) 208 self.basic_queue_test(q) 209 210 def test_negative_timeout_raises_exception(self): 211 q = self.type2test(QUEUE_SIZE) 212 with self.assertRaises(ValueError): 213 q.put(1, timeout=-1) 214 with self.assertRaises(ValueError): 215 q.get(1, timeout=-1) 216 217 def test_nowait(self): 218 q = self.type2test(QUEUE_SIZE) 219 for i in range(QUEUE_SIZE): 220 q.put_nowait(1) 221 with self.assertRaises(self.queue.Full): 222 q.put_nowait(1) 223 224 for i in range(QUEUE_SIZE): 225 q.get_nowait() 226 with self.assertRaises(self.queue.Empty): 227 q.get_nowait() 228 229 def test_shrinking_queue(self): 230 # issue 10110 231 q = self.type2test(3) 232 q.put(1) 233 q.put(2) 234 q.put(3) 235 with self.assertRaises(self.queue.Full): 236 q.put_nowait(4) 237 self.assertEqual(q.qsize(), 3) 238 q.maxsize = 2 # shrink the queue 239 with self.assertRaises(self.queue.Full): 240 q.put_nowait(4) 241 242class QueueTest(BaseQueueTestMixin): 243 244 def setUp(self): 245 self.type2test = self.queue.Queue 246 super().setUp() 247 248class PyQueueTest(QueueTest, unittest.TestCase): 249 queue = py_queue 250 251 252@need_c_queue 253class CQueueTest(QueueTest, unittest.TestCase): 254 queue = c_queue 255 256 257class LifoQueueTest(BaseQueueTestMixin): 258 259 def setUp(self): 260 self.type2test = self.queue.LifoQueue 261 super().setUp() 262 263 264class PyLifoQueueTest(LifoQueueTest, unittest.TestCase): 265 queue = py_queue 266 267 268@need_c_queue 269class CLifoQueueTest(LifoQueueTest, unittest.TestCase): 270 queue = c_queue 271 272 273class PriorityQueueTest(BaseQueueTestMixin): 274 275 def setUp(self): 276 self.type2test = self.queue.PriorityQueue 277 super().setUp() 278 279 280class PyPriorityQueueTest(PriorityQueueTest, unittest.TestCase): 281 queue = py_queue 282 283 284@need_c_queue 285class CPriorityQueueTest(PriorityQueueTest, unittest.TestCase): 286 queue = c_queue 287 288 289# A Queue subclass that can provoke failure at a moment's notice :) 290class FailingQueueException(Exception): pass 291 292class FailingQueueTest(BlockingTestMixin): 293 294 def setUp(self): 295 296 Queue = self.queue.Queue 297 298 class FailingQueue(Queue): 299 def __init__(self, *args): 300 self.fail_next_put = False 301 self.fail_next_get = False 302 Queue.__init__(self, *args) 303 def _put(self, item): 304 if self.fail_next_put: 305 self.fail_next_put = False 306 raise FailingQueueException("You Lose") 307 return Queue._put(self, item) 308 def _get(self): 309 if self.fail_next_get: 310 self.fail_next_get = False 311 raise FailingQueueException("You Lose") 312 return Queue._get(self) 313 314 self.FailingQueue = FailingQueue 315 316 super().setUp() 317 318 def failing_queue_test(self, q): 319 if q.qsize(): 320 raise RuntimeError("Call this function with an empty queue") 321 for i in range(QUEUE_SIZE-1): 322 q.put(i) 323 # Test a failing non-blocking put. 324 q.fail_next_put = True 325 try: 326 q.put("oops", block=0) 327 self.fail("The queue didn't fail when it should have") 328 except FailingQueueException: 329 pass 330 q.fail_next_put = True 331 try: 332 q.put("oops", timeout=0.1) 333 self.fail("The queue didn't fail when it should have") 334 except FailingQueueException: 335 pass 336 q.put("last") 337 self.assertTrue(qfull(q), "Queue should be full") 338 # Test a failing blocking put 339 q.fail_next_put = True 340 try: 341 self.do_blocking_test(q.put, ("full",), q.get, ()) 342 self.fail("The queue didn't fail when it should have") 343 except FailingQueueException: 344 pass 345 # Check the Queue isn't damaged. 346 # put failed, but get succeeded - re-add 347 q.put("last") 348 # Test a failing timeout put 349 q.fail_next_put = True 350 try: 351 self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (), 352 FailingQueueException) 353 self.fail("The queue didn't fail when it should have") 354 except FailingQueueException: 355 pass 356 # Check the Queue isn't damaged. 357 # put failed, but get succeeded - re-add 358 q.put("last") 359 self.assertTrue(qfull(q), "Queue should be full") 360 q.get() 361 self.assertTrue(not qfull(q), "Queue should not be full") 362 q.put("last") 363 self.assertTrue(qfull(q), "Queue should be full") 364 # Test a blocking put 365 self.do_blocking_test(q.put, ("full",), q.get, ()) 366 # Empty it 367 for i in range(QUEUE_SIZE): 368 q.get() 369 self.assertTrue(not q.qsize(), "Queue should be empty") 370 q.put("first") 371 q.fail_next_get = True 372 try: 373 q.get() 374 self.fail("The queue didn't fail when it should have") 375 except FailingQueueException: 376 pass 377 self.assertTrue(q.qsize(), "Queue should not be empty") 378 q.fail_next_get = True 379 try: 380 q.get(timeout=0.1) 381 self.fail("The queue didn't fail when it should have") 382 except FailingQueueException: 383 pass 384 self.assertTrue(q.qsize(), "Queue should not be empty") 385 q.get() 386 self.assertTrue(not q.qsize(), "Queue should be empty") 387 q.fail_next_get = True 388 try: 389 self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',), 390 FailingQueueException) 391 self.fail("The queue didn't fail when it should have") 392 except FailingQueueException: 393 pass 394 # put succeeded, but get failed. 395 self.assertTrue(q.qsize(), "Queue should not be empty") 396 q.get() 397 self.assertTrue(not q.qsize(), "Queue should be empty") 398 399 def test_failing_queue(self): 400 401 # Test to make sure a queue is functioning correctly. 402 # Done twice to the same instance. 403 q = self.FailingQueue(QUEUE_SIZE) 404 self.failing_queue_test(q) 405 self.failing_queue_test(q) 406 407 408 409class PyFailingQueueTest(FailingQueueTest, unittest.TestCase): 410 queue = py_queue 411 412 413@need_c_queue 414class CFailingQueueTest(FailingQueueTest, unittest.TestCase): 415 queue = c_queue 416 417 418class BaseSimpleQueueTest: 419 420 def setUp(self): 421 self.q = self.type2test() 422 423 def feed(self, q, seq, rnd, sentinel): 424 while True: 425 try: 426 val = seq.pop() 427 except IndexError: 428 q.put(sentinel) 429 return 430 q.put(val) 431 if rnd.random() > 0.5: 432 time.sleep(rnd.random() * 1e-3) 433 434 def consume(self, q, results, sentinel): 435 while True: 436 val = q.get() 437 if val == sentinel: 438 return 439 results.append(val) 440 441 def consume_nonblock(self, q, results, sentinel): 442 while True: 443 while True: 444 try: 445 val = q.get(block=False) 446 except self.queue.Empty: 447 time.sleep(1e-5) 448 else: 449 break 450 if val == sentinel: 451 return 452 results.append(val) 453 454 def consume_timeout(self, q, results, sentinel): 455 while True: 456 while True: 457 try: 458 val = q.get(timeout=1e-5) 459 except self.queue.Empty: 460 pass 461 else: 462 break 463 if val == sentinel: 464 return 465 results.append(val) 466 467 def run_threads(self, n_threads, q, inputs, feed_func, consume_func): 468 results = [] 469 sentinel = None 470 seq = inputs.copy() 471 seq.reverse() 472 rnd = random.Random(42) 473 474 exceptions = [] 475 def log_exceptions(f): 476 def wrapper(*args, **kwargs): 477 try: 478 f(*args, **kwargs) 479 except BaseException as e: 480 exceptions.append(e) 481 return wrapper 482 483 feeders = [threading.Thread(target=log_exceptions(feed_func), 484 args=(q, seq, rnd, sentinel)) 485 for i in range(n_threads)] 486 consumers = [threading.Thread(target=log_exceptions(consume_func), 487 args=(q, results, sentinel)) 488 for i in range(n_threads)] 489 490 with threading_helper.start_threads(feeders + consumers): 491 pass 492 493 self.assertFalse(exceptions) 494 self.assertTrue(q.empty()) 495 self.assertEqual(q.qsize(), 0) 496 497 return results 498 499 def test_basic(self): 500 # Basic tests for get(), put() etc. 501 q = self.q 502 self.assertTrue(q.empty()) 503 self.assertEqual(q.qsize(), 0) 504 q.put(1) 505 self.assertFalse(q.empty()) 506 self.assertEqual(q.qsize(), 1) 507 q.put(2) 508 q.put_nowait(3) 509 q.put(4) 510 self.assertFalse(q.empty()) 511 self.assertEqual(q.qsize(), 4) 512 513 self.assertEqual(q.get(), 1) 514 self.assertEqual(q.qsize(), 3) 515 516 self.assertEqual(q.get_nowait(), 2) 517 self.assertEqual(q.qsize(), 2) 518 519 self.assertEqual(q.get(block=False), 3) 520 self.assertFalse(q.empty()) 521 self.assertEqual(q.qsize(), 1) 522 523 self.assertEqual(q.get(timeout=0.1), 4) 524 self.assertTrue(q.empty()) 525 self.assertEqual(q.qsize(), 0) 526 527 with self.assertRaises(self.queue.Empty): 528 q.get(block=False) 529 with self.assertRaises(self.queue.Empty): 530 q.get(timeout=1e-3) 531 with self.assertRaises(self.queue.Empty): 532 q.get_nowait() 533 self.assertTrue(q.empty()) 534 self.assertEqual(q.qsize(), 0) 535 536 def test_negative_timeout_raises_exception(self): 537 q = self.q 538 q.put(1) 539 with self.assertRaises(ValueError): 540 q.get(timeout=-1) 541 542 def test_order(self): 543 # Test a pair of concurrent put() and get() 544 q = self.q 545 inputs = list(range(100)) 546 results = self.run_threads(1, q, inputs, self.feed, self.consume) 547 548 # One producer, one consumer => results appended in well-defined order 549 self.assertEqual(results, inputs) 550 551 def test_many_threads(self): 552 # Test multiple concurrent put() and get() 553 N = 50 554 q = self.q 555 inputs = list(range(10000)) 556 results = self.run_threads(N, q, inputs, self.feed, self.consume) 557 558 # Multiple consumers without synchronization append the 559 # results in random order 560 self.assertEqual(sorted(results), inputs) 561 562 def test_many_threads_nonblock(self): 563 # Test multiple concurrent put() and get(block=False) 564 N = 50 565 q = self.q 566 inputs = list(range(10000)) 567 results = self.run_threads(N, q, inputs, 568 self.feed, self.consume_nonblock) 569 570 self.assertEqual(sorted(results), inputs) 571 572 def test_many_threads_timeout(self): 573 # Test multiple concurrent put() and get(timeout=...) 574 N = 50 575 q = self.q 576 inputs = list(range(1000)) 577 results = self.run_threads(N, q, inputs, 578 self.feed, self.consume_timeout) 579 580 self.assertEqual(sorted(results), inputs) 581 582 def test_references(self): 583 # The queue should lose references to each item as soon as 584 # it leaves the queue. 585 class C: 586 pass 587 588 N = 20 589 q = self.q 590 for i in range(N): 591 q.put(C()) 592 for i in range(N): 593 wr = weakref.ref(q.get()) 594 gc_collect() # For PyPy or other GCs. 595 self.assertIsNone(wr()) 596 597 598class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase): 599 600 queue = py_queue 601 def setUp(self): 602 self.type2test = self.queue._PySimpleQueue 603 super().setUp() 604 605 606@need_c_queue 607class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase): 608 609 queue = c_queue 610 611 def setUp(self): 612 self.type2test = self.queue.SimpleQueue 613 super().setUp() 614 615 def test_is_default(self): 616 self.assertIs(self.type2test, self.queue.SimpleQueue) 617 self.assertIs(self.type2test, self.queue.SimpleQueue) 618 619 def test_reentrancy(self): 620 # bpo-14976: put() may be called reentrantly in an asynchronous 621 # callback. 622 q = self.q 623 gen = itertools.count() 624 N = 10000 625 results = [] 626 627 # This test exploits the fact that __del__ in a reference cycle 628 # can be called any time the GC may run. 629 630 class Circular(object): 631 def __init__(self): 632 self.circular = self 633 634 def __del__(self): 635 q.put(next(gen)) 636 637 while True: 638 o = Circular() 639 q.put(next(gen)) 640 del o 641 results.append(q.get()) 642 if results[-1] >= N: 643 break 644 645 self.assertEqual(results, list(range(N + 1))) 646 647 648if __name__ == "__main__": 649 unittest.main() 650