• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Some simple queue module tests, plus some failure conditions
2# to ensure the Queue locks remain stable.
3import itertools
4import random
5import sys
6import threading
7import time
8import unittest
9import weakref
10from test.support import gc_collect
11from test.support import import_helper
12from test.support import threading_helper
13
14# queue module depends on threading primitives
15threading_helper.requires_working_threading(module=True)
16
17py_queue = import_helper.import_fresh_module('queue', blocked=['_queue'])
18c_queue = import_helper.import_fresh_module('queue', fresh=['_queue'])
19need_c_queue = unittest.skipUnless(c_queue, "No _queue module found")
20
21QUEUE_SIZE = 5
22
23def qfull(q):
24    return q.maxsize > 0 and q.qsize() == q.maxsize
25
26# A thread to run a function that unclogs a blocked Queue.
27class _TriggerThread(threading.Thread):
28    def __init__(self, fn, args):
29        self.fn = fn
30        self.args = args
31        self.startedEvent = threading.Event()
32        threading.Thread.__init__(self)
33
34    def run(self):
35        # The sleep isn't necessary, but is intended to give the blocking
36        # function in the main thread a chance at actually blocking before
37        # we unclog it.  But if the sleep is longer than the timeout-based
38        # tests wait in their blocking functions, those tests will fail.
39        # So we give them much longer timeout values compared to the
40        # sleep here (I aimed at 10 seconds for blocking functions --
41        # they should never actually wait that long - they should make
42        # progress as soon as we call self.fn()).
43        time.sleep(0.1)
44        self.startedEvent.set()
45        self.fn(*self.args)
46
47
48# Execute a function that blocks, and in a separate thread, a function that
49# triggers the release.  Returns the result of the blocking function.  Caution:
50# block_func must guarantee to block until trigger_func is called, and
51# trigger_func must guarantee to change queue state so that block_func can make
52# enough progress to return.  In particular, a block_func that just raises an
53# exception regardless of whether trigger_func is called will lead to
54# timing-dependent sporadic failures, and one of those went rarely seen but
55# undiagnosed for years.  Now block_func must be unexceptional.  If block_func
56# is supposed to raise an exception, call do_exceptional_blocking_test()
57# instead.
58
59class BlockingTestMixin:
60
61    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
62        thread = _TriggerThread(trigger_func, trigger_args)
63        thread.start()
64        try:
65            self.result = block_func(*block_args)
66            # If block_func returned before our thread made the call, we failed!
67            if not thread.startedEvent.is_set():
68                self.fail("blocking function %r appeared not to block" %
69                          block_func)
70            return self.result
71        finally:
72            threading_helper.join_thread(thread) # make sure the thread terminates
73
74    # Call this instead if block_func is supposed to raise an exception.
75    def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
76                                   trigger_args, expected_exception_class):
77        thread = _TriggerThread(trigger_func, trigger_args)
78        thread.start()
79        try:
80            try:
81                block_func(*block_args)
82            except expected_exception_class:
83                raise
84            else:
85                self.fail("expected exception of kind %r" %
86                                 expected_exception_class)
87        finally:
88            threading_helper.join_thread(thread) # make sure the thread terminates
89            if not thread.startedEvent.is_set():
90                self.fail("trigger thread ended but event never set")
91
92
93class BaseQueueTestMixin(BlockingTestMixin):
94    def setUp(self):
95        self.cum = 0
96        self.cumlock = threading.Lock()
97
98    def basic_queue_test(self, q):
99        if q.qsize():
100            raise RuntimeError("Call this function with an empty queue")
101        self.assertTrue(q.empty())
102        self.assertFalse(q.full())
103        # I guess we better check things actually queue correctly a little :)
104        q.put(111)
105        q.put(333)
106        q.put(222)
107        target_order = dict(Queue = [111, 333, 222],
108                            LifoQueue = [222, 333, 111],
109                            PriorityQueue = [111, 222, 333])
110        actual_order = [q.get(), q.get(), q.get()]
111        self.assertEqual(actual_order, target_order[q.__class__.__name__],
112                         "Didn't seem to queue the correct data!")
113        for i in range(QUEUE_SIZE-1):
114            q.put(i)
115            self.assertTrue(q.qsize(), "Queue should not be empty")
116        self.assertTrue(not qfull(q), "Queue should not be full")
117        last = 2 * QUEUE_SIZE
118        full = 3 * 2 * QUEUE_SIZE
119        q.put(last)
120        self.assertTrue(qfull(q), "Queue should be full")
121        self.assertFalse(q.empty())
122        self.assertTrue(q.full())
123        try:
124            q.put(full, block=0)
125            self.fail("Didn't appear to block with a full queue")
126        except self.queue.Full:
127            pass
128        try:
129            q.put(full, timeout=0.01)
130            self.fail("Didn't appear to time-out with a full queue")
131        except self.queue.Full:
132            pass
133        # Test a blocking put
134        self.do_blocking_test(q.put, (full,), q.get, ())
135        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
136        # Empty it
137        for i in range(QUEUE_SIZE):
138            q.get()
139        self.assertTrue(not q.qsize(), "Queue should be empty")
140        try:
141            q.get(block=0)
142            self.fail("Didn't appear to block with an empty queue")
143        except self.queue.Empty:
144            pass
145        try:
146            q.get(timeout=0.01)
147            self.fail("Didn't appear to time-out with an empty queue")
148        except self.queue.Empty:
149            pass
150        # Test a blocking get
151        self.do_blocking_test(q.get, (), q.put, ('empty',))
152        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
153
154
155    def worker(self, q):
156        while True:
157            x = q.get()
158            if x < 0:
159                q.task_done()
160                return
161            with self.cumlock:
162                self.cum += x
163            q.task_done()
164
165    def queue_join_test(self, q):
166        self.cum = 0
167        threads = []
168        for i in (0,1):
169            thread = threading.Thread(target=self.worker, args=(q,))
170            thread.start()
171            threads.append(thread)
172        for i in range(100):
173            q.put(i)
174        q.join()
175        self.assertEqual(self.cum, sum(range(100)),
176                         "q.join() did not block until all tasks were done")
177        for i in (0,1):
178            q.put(-1)         # instruct the threads to close
179        q.join()                # verify that you can join twice
180        for thread in threads:
181            thread.join()
182
183    def test_queue_task_done(self):
184        # Test to make sure a queue task completed successfully.
185        q = self.type2test()
186        try:
187            q.task_done()
188        except ValueError:
189            pass
190        else:
191            self.fail("Did not detect task count going negative")
192
193    def test_queue_join(self):
194        # Test that a queue join()s successfully, and before anything else
195        # (done twice for insurance).
196        q = self.type2test()
197        self.queue_join_test(q)
198        self.queue_join_test(q)
199        try:
200            q.task_done()
201        except ValueError:
202            pass
203        else:
204            self.fail("Did not detect task count going negative")
205
206    def test_basic(self):
207        # Do it a couple of times on the same queue.
208        # Done twice to make sure works with same instance reused.
209        q = self.type2test(QUEUE_SIZE)
210        self.basic_queue_test(q)
211        self.basic_queue_test(q)
212
213    def test_negative_timeout_raises_exception(self):
214        q = self.type2test(QUEUE_SIZE)
215        with self.assertRaises(ValueError):
216            q.put(1, timeout=-1)
217        with self.assertRaises(ValueError):
218            q.get(1, timeout=-1)
219
220    def test_nowait(self):
221        q = self.type2test(QUEUE_SIZE)
222        for i in range(QUEUE_SIZE):
223            q.put_nowait(1)
224        with self.assertRaises(self.queue.Full):
225            q.put_nowait(1)
226
227        for i in range(QUEUE_SIZE):
228            q.get_nowait()
229        with self.assertRaises(self.queue.Empty):
230            q.get_nowait()
231
232    def test_shrinking_queue(self):
233        # issue 10110
234        q = self.type2test(3)
235        q.put(1)
236        q.put(2)
237        q.put(3)
238        with self.assertRaises(self.queue.Full):
239            q.put_nowait(4)
240        self.assertEqual(q.qsize(), 3)
241        q.maxsize = 2                       # shrink the queue
242        with self.assertRaises(self.queue.Full):
243            q.put_nowait(4)
244
245    def test_shutdown_empty(self):
246        q = self.type2test()
247        q.shutdown()
248        with self.assertRaises(self.queue.ShutDown):
249            q.put("data")
250        with self.assertRaises(self.queue.ShutDown):
251            q.get()
252
253    def test_shutdown_nonempty(self):
254        q = self.type2test()
255        q.put("data")
256        q.shutdown()
257        q.get()
258        with self.assertRaises(self.queue.ShutDown):
259            q.get()
260
261    def test_shutdown_immediate(self):
262        q = self.type2test()
263        q.put("data")
264        q.shutdown(immediate=True)
265        with self.assertRaises(self.queue.ShutDown):
266            q.get()
267
268    def test_shutdown_allowed_transitions(self):
269        # allowed transitions would be from alive via shutdown to immediate
270        q = self.type2test()
271        self.assertFalse(q.is_shutdown)
272
273        q.shutdown()
274        self.assertTrue(q.is_shutdown)
275
276        q.shutdown(immediate=True)
277        self.assertTrue(q.is_shutdown)
278
279        q.shutdown(immediate=False)
280
281    def _shutdown_all_methods_in_one_thread(self, immediate):
282        q = self.type2test(2)
283        q.put("L")
284        q.put_nowait("O")
285        q.shutdown(immediate)
286
287        with self.assertRaises(self.queue.ShutDown):
288            q.put("E")
289        with self.assertRaises(self.queue.ShutDown):
290            q.put_nowait("W")
291        if immediate:
292            with self.assertRaises(self.queue.ShutDown):
293                q.get()
294            with self.assertRaises(self.queue.ShutDown):
295                q.get_nowait()
296            with self.assertRaises(ValueError):
297                q.task_done()
298            q.join()
299        else:
300            self.assertIn(q.get(), "LO")
301            q.task_done()
302            self.assertIn(q.get(), "LO")
303            q.task_done()
304            q.join()
305            # on shutdown(immediate=False)
306            # when queue is empty, should raise ShutDown Exception
307            with self.assertRaises(self.queue.ShutDown):
308                q.get() # p.get(True)
309            with self.assertRaises(self.queue.ShutDown):
310                q.get_nowait() # p.get(False)
311            with self.assertRaises(self.queue.ShutDown):
312                q.get(True, 1.0)
313
314    def test_shutdown_all_methods_in_one_thread(self):
315        return self._shutdown_all_methods_in_one_thread(False)
316
317    def test_shutdown_immediate_all_methods_in_one_thread(self):
318        return self._shutdown_all_methods_in_one_thread(True)
319
320    def _write_msg_thread(self, q, n, results,
321                            i_when_exec_shutdown, event_shutdown,
322                            barrier_start):
323        # All `write_msg_threads`
324        # put several items into the queue.
325        for i in range(0, i_when_exec_shutdown//2):
326            q.put((i, 'LOYD'))
327        # Wait for the barrier to be complete.
328        barrier_start.wait()
329
330        for i in range(i_when_exec_shutdown//2, n):
331            try:
332                q.put((i, "YDLO"))
333            except self.queue.ShutDown:
334                results.append(False)
335                break
336
337            # Trigger queue shutdown.
338            if i == i_when_exec_shutdown:
339                # Only one thread should call shutdown().
340                if not event_shutdown.is_set():
341                    event_shutdown.set()
342                    results.append(True)
343
344    def _read_msg_thread(self, q, results, barrier_start):
345        # Get at least one item.
346        q.get(True)
347        q.task_done()
348        # Wait for the barrier to be complete.
349        barrier_start.wait()
350        while True:
351            try:
352                q.get(False)
353                q.task_done()
354            except self.queue.ShutDown:
355                results.append(True)
356                break
357            except self.queue.Empty:
358                pass
359
360    def _shutdown_thread(self, q, results, event_end, immediate):
361        event_end.wait()
362        q.shutdown(immediate)
363        results.append(q.qsize() == 0)
364
365    def _join_thread(self, q, barrier_start):
366        # Wait for the barrier to be complete.
367        barrier_start.wait()
368        q.join()
369
370    def _shutdown_all_methods_in_many_threads(self, immediate):
371        # Run a 'multi-producers/consumers queue' use case,
372        # with enough items into the queue.
373        # When shutdown, all running threads will be joined.
374        q = self.type2test()
375        ps = []
376        res_puts = []
377        res_gets = []
378        res_shutdown = []
379        write_threads = 4
380        read_threads = 6
381        join_threads = 2
382        nb_msgs = 1024*64
383        nb_msgs_w = nb_msgs // write_threads
384        when_exec_shutdown = nb_msgs_w // 2
385        # Use of a Barrier to ensure that
386        # - all write threads put all their items into the queue,
387        # - all read thread get at least one item from the queue,
388        #   and keep on running until shutdown.
389        # The join thread is started only when shutdown is immediate.
390        nparties = write_threads + read_threads
391        if immediate:
392            nparties += join_threads
393        barrier_start = threading.Barrier(nparties)
394        ev_exec_shutdown = threading.Event()
395        lprocs = [
396            (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,
397                                            when_exec_shutdown, ev_exec_shutdown,
398                                            barrier_start)),
399            (self._read_msg_thread, read_threads, (q, res_gets, barrier_start)),
400            (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)),
401            ]
402        if immediate:
403            lprocs.append((self._join_thread, join_threads, (q, barrier_start)))
404        # start all threads.
405        for func, n, args in lprocs:
406            for i in range(n):
407                ps.append(threading.Thread(target=func, args=args))
408                ps[-1].start()
409        for thread in ps:
410            thread.join()
411
412        self.assertTrue(True in res_puts)
413        self.assertEqual(res_gets.count(True), read_threads)
414        if immediate:
415            self.assertListEqual(res_shutdown, [True])
416            self.assertTrue(q.empty())
417
418    def test_shutdown_all_methods_in_many_threads(self):
419        return self._shutdown_all_methods_in_many_threads(False)
420
421    def test_shutdown_immediate_all_methods_in_many_threads(self):
422        return self._shutdown_all_methods_in_many_threads(True)
423
424    def _get(self, q, go, results, shutdown=False):
425        go.wait()
426        try:
427            msg = q.get()
428            results.append(not shutdown)
429            return not shutdown
430        except self.queue.ShutDown:
431            results.append(shutdown)
432            return shutdown
433
434    def _get_shutdown(self, q, go, results):
435        return self._get(q, go, results, True)
436
437    def _get_task_done(self, q, go, results):
438        go.wait()
439        try:
440            msg = q.get()
441            q.task_done()
442            results.append(True)
443            return msg
444        except self.queue.ShutDown:
445            results.append(False)
446            return False
447
448    def _put(self, q, msg, go, results, shutdown=False):
449        go.wait()
450        try:
451            q.put(msg)
452            results.append(not shutdown)
453            return not shutdown
454        except self.queue.ShutDown:
455            results.append(shutdown)
456            return shutdown
457
458    def _put_shutdown(self, q, msg, go, results):
459        return self._put(q, msg, go, results, True)
460
461    def _join(self, q, results, shutdown=False):
462        try:
463            q.join()
464            results.append(not shutdown)
465            return not shutdown
466        except self.queue.ShutDown:
467            results.append(shutdown)
468            return shutdown
469
470    def _join_shutdown(self, q, results):
471        return self._join(q, results, True)
472
473    def _shutdown_get(self, immediate):
474        q = self.type2test(2)
475        results = []
476        go = threading.Event()
477        q.put("Y")
478        q.put("D")
479        # queue full
480
481        if immediate:
482            thrds = (
483                (self._get_shutdown, (q, go, results)),
484                (self._get_shutdown, (q, go, results)),
485            )
486        else:
487            thrds = (
488                # on shutdown(immediate=False)
489                # one of these threads should raise Shutdown
490                (self._get, (q, go, results)),
491                (self._get, (q, go, results)),
492                (self._get, (q, go, results)),
493            )
494        threads = []
495        for func, params in thrds:
496            threads.append(threading.Thread(target=func, args=params))
497            threads[-1].start()
498        q.shutdown(immediate)
499        go.set()
500        for t in threads:
501            t.join()
502        if immediate:
503            self.assertListEqual(results, [True, True])
504        else:
505            self.assertListEqual(sorted(results), [False] + [True]*(len(thrds)-1))
506
507    def test_shutdown_get(self):
508        return self._shutdown_get(False)
509
510    def test_shutdown_immediate_get(self):
511        return self._shutdown_get(True)
512
513    def _shutdown_put(self, immediate):
514        q = self.type2test(2)
515        results = []
516        go = threading.Event()
517        q.put("Y")
518        q.put("D")
519        # queue fulled
520
521        thrds = (
522            (self._put_shutdown, (q, "E", go, results)),
523            (self._put_shutdown, (q, "W", go, results)),
524        )
525        threads = []
526        for func, params in thrds:
527            threads.append(threading.Thread(target=func, args=params))
528            threads[-1].start()
529        q.shutdown()
530        go.set()
531        for t in threads:
532            t.join()
533
534        self.assertEqual(results, [True]*len(thrds))
535
536    def test_shutdown_put(self):
537        return self._shutdown_put(False)
538
539    def test_shutdown_immediate_put(self):
540        return self._shutdown_put(True)
541
542    def _shutdown_join(self, immediate):
543        q = self.type2test()
544        results = []
545        q.put("Y")
546        go = threading.Event()
547        nb = q.qsize()
548
549        thrds = (
550            (self._join, (q, results)),
551            (self._join, (q, results)),
552        )
553        threads = []
554        for func, params in thrds:
555            threads.append(threading.Thread(target=func, args=params))
556            threads[-1].start()
557        if not immediate:
558            res = []
559            for i in range(nb):
560                threads.append(threading.Thread(target=self._get_task_done, args=(q, go, res)))
561                threads[-1].start()
562        q.shutdown(immediate)
563        go.set()
564        for t in threads:
565            t.join()
566
567        self.assertEqual(results, [True]*len(thrds))
568
569    def test_shutdown_immediate_join(self):
570        return self._shutdown_join(True)
571
572    def test_shutdown_join(self):
573        return self._shutdown_join(False)
574
575    def _shutdown_put_join(self, immediate):
576        q = self.type2test(2)
577        results = []
578        go = threading.Event()
579        q.put("Y")
580        # queue not fulled
581
582        thrds = (
583            (self._put_shutdown, (q, "E", go, results)),
584            (self._join, (q, results)),
585        )
586        threads = []
587        for func, params in thrds:
588            threads.append(threading.Thread(target=func, args=params))
589            threads[-1].start()
590        self.assertEqual(q.unfinished_tasks, 1)
591
592        q.shutdown(immediate)
593        go.set()
594
595        if immediate:
596            with self.assertRaises(self.queue.ShutDown):
597                q.get_nowait()
598        else:
599            result = q.get()
600            self.assertEqual(result, "Y")
601            q.task_done()
602
603        for t in threads:
604            t.join()
605
606        self.assertEqual(results, [True]*len(thrds))
607
608    def test_shutdown_immediate_put_join(self):
609        return self._shutdown_put_join(True)
610
611    def test_shutdown_put_join(self):
612        return self._shutdown_put_join(False)
613
614    def test_shutdown_get_task_done_join(self):
615        q = self.type2test(2)
616        results = []
617        go = threading.Event()
618        q.put("Y")
619        q.put("D")
620        self.assertEqual(q.unfinished_tasks, q.qsize())
621
622        thrds = (
623            (self._get_task_done, (q, go, results)),
624            (self._get_task_done, (q, go, results)),
625            (self._join, (q, results)),
626            (self._join, (q, results)),
627        )
628        threads = []
629        for func, params in thrds:
630            threads.append(threading.Thread(target=func, args=params))
631            threads[-1].start()
632        go.set()
633        q.shutdown(False)
634        for t in threads:
635            t.join()
636
637        self.assertEqual(results, [True]*len(thrds))
638
639    def test_shutdown_pending_get(self):
640        def get():
641            try:
642                results.append(q.get())
643            except Exception as e:
644                results.append(e)
645
646        q = self.type2test()
647        results = []
648        get_thread = threading.Thread(target=get)
649        get_thread.start()
650        q.shutdown(immediate=False)
651        get_thread.join(timeout=10.0)
652        self.assertFalse(get_thread.is_alive())
653        self.assertEqual(len(results), 1)
654        self.assertIsInstance(results[0], self.queue.ShutDown)
655
656
657class QueueTest(BaseQueueTestMixin):
658
659    def setUp(self):
660        self.type2test = self.queue.Queue
661        super().setUp()
662
663class PyQueueTest(QueueTest, unittest.TestCase):
664    queue = py_queue
665
666
667@need_c_queue
668class CQueueTest(QueueTest, unittest.TestCase):
669    queue = c_queue
670
671
672class LifoQueueTest(BaseQueueTestMixin):
673
674    def setUp(self):
675        self.type2test = self.queue.LifoQueue
676        super().setUp()
677
678
679class PyLifoQueueTest(LifoQueueTest, unittest.TestCase):
680    queue = py_queue
681
682
683@need_c_queue
684class CLifoQueueTest(LifoQueueTest, unittest.TestCase):
685    queue = c_queue
686
687
688class PriorityQueueTest(BaseQueueTestMixin):
689
690    def setUp(self):
691        self.type2test = self.queue.PriorityQueue
692        super().setUp()
693
694
695class PyPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
696    queue = py_queue
697
698
699@need_c_queue
700class CPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
701    queue = c_queue
702
703
704# A Queue subclass that can provoke failure at a moment's notice :)
705class FailingQueueException(Exception): pass
706
707
708class FailingQueueTest(BlockingTestMixin):
709
710    def setUp(self):
711
712        Queue = self.queue.Queue
713
714        class FailingQueue(Queue):
715            def __init__(self, *args):
716                self.fail_next_put = False
717                self.fail_next_get = False
718                Queue.__init__(self, *args)
719            def _put(self, item):
720                if self.fail_next_put:
721                    self.fail_next_put = False
722                    raise FailingQueueException("You Lose")
723                return Queue._put(self, item)
724            def _get(self):
725                if self.fail_next_get:
726                    self.fail_next_get = False
727                    raise FailingQueueException("You Lose")
728                return Queue._get(self)
729
730        self.FailingQueue = FailingQueue
731
732        super().setUp()
733
734    def failing_queue_test(self, q):
735        if q.qsize():
736            raise RuntimeError("Call this function with an empty queue")
737        for i in range(QUEUE_SIZE-1):
738            q.put(i)
739        # Test a failing non-blocking put.
740        q.fail_next_put = True
741        try:
742            q.put("oops", block=0)
743            self.fail("The queue didn't fail when it should have")
744        except FailingQueueException:
745            pass
746        q.fail_next_put = True
747        try:
748            q.put("oops", timeout=0.1)
749            self.fail("The queue didn't fail when it should have")
750        except FailingQueueException:
751            pass
752        q.put("last")
753        self.assertTrue(qfull(q), "Queue should be full")
754        # Test a failing blocking put
755        q.fail_next_put = True
756        try:
757            self.do_blocking_test(q.put, ("full",), q.get, ())
758            self.fail("The queue didn't fail when it should have")
759        except FailingQueueException:
760            pass
761        # Check the Queue isn't damaged.
762        # put failed, but get succeeded - re-add
763        q.put("last")
764        # Test a failing timeout put
765        q.fail_next_put = True
766        try:
767            self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
768                                              FailingQueueException)
769            self.fail("The queue didn't fail when it should have")
770        except FailingQueueException:
771            pass
772        # Check the Queue isn't damaged.
773        # put failed, but get succeeded - re-add
774        q.put("last")
775        self.assertTrue(qfull(q), "Queue should be full")
776        q.get()
777        self.assertTrue(not qfull(q), "Queue should not be full")
778        q.put("last")
779        self.assertTrue(qfull(q), "Queue should be full")
780        # Test a blocking put
781        self.do_blocking_test(q.put, ("full",), q.get, ())
782        # Empty it
783        for i in range(QUEUE_SIZE):
784            q.get()
785        self.assertTrue(not q.qsize(), "Queue should be empty")
786        q.put("first")
787        q.fail_next_get = True
788        try:
789            q.get()
790            self.fail("The queue didn't fail when it should have")
791        except FailingQueueException:
792            pass
793        self.assertTrue(q.qsize(), "Queue should not be empty")
794        q.fail_next_get = True
795        try:
796            q.get(timeout=0.1)
797            self.fail("The queue didn't fail when it should have")
798        except FailingQueueException:
799            pass
800        self.assertTrue(q.qsize(), "Queue should not be empty")
801        q.get()
802        self.assertTrue(not q.qsize(), "Queue should be empty")
803        q.fail_next_get = True
804        try:
805            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
806                                              FailingQueueException)
807            self.fail("The queue didn't fail when it should have")
808        except FailingQueueException:
809            pass
810        # put succeeded, but get failed.
811        self.assertTrue(q.qsize(), "Queue should not be empty")
812        q.get()
813        self.assertTrue(not q.qsize(), "Queue should be empty")
814
815    def test_failing_queue(self):
816
817        # Test to make sure a queue is functioning correctly.
818        # Done twice to the same instance.
819        q = self.FailingQueue(QUEUE_SIZE)
820        self.failing_queue_test(q)
821        self.failing_queue_test(q)
822
823
824
825class PyFailingQueueTest(FailingQueueTest, unittest.TestCase):
826    queue = py_queue
827
828
829@need_c_queue
830class CFailingQueueTest(FailingQueueTest, unittest.TestCase):
831    queue = c_queue
832
833
834class BaseSimpleQueueTest:
835
836    def setUp(self):
837        self.q = self.type2test()
838
839    def feed(self, q, seq, rnd, sentinel):
840        while True:
841            try:
842                val = seq.pop()
843            except IndexError:
844                q.put(sentinel)
845                return
846            q.put(val)
847            if rnd.random() > 0.5:
848                time.sleep(rnd.random() * 1e-3)
849
850    def consume(self, q, results, sentinel):
851        while True:
852            val = q.get()
853            if val == sentinel:
854                return
855            results.append(val)
856
857    def consume_nonblock(self, q, results, sentinel):
858        while True:
859            while True:
860                try:
861                    val = q.get(block=False)
862                except self.queue.Empty:
863                    time.sleep(1e-5)
864                else:
865                    break
866            if val == sentinel:
867                return
868            results.append(val)
869
870    def consume_timeout(self, q, results, sentinel):
871        while True:
872            while True:
873                try:
874                    val = q.get(timeout=1e-5)
875                except self.queue.Empty:
876                    pass
877                else:
878                    break
879            if val == sentinel:
880                return
881            results.append(val)
882
883    def run_threads(self, n_threads, q, inputs, feed_func, consume_func):
884        results = []
885        sentinel = None
886        seq = inputs.copy()
887        seq.reverse()
888        rnd = random.Random(42)
889
890        exceptions = []
891        def log_exceptions(f):
892            def wrapper(*args, **kwargs):
893                try:
894                    f(*args, **kwargs)
895                except BaseException as e:
896                    exceptions.append(e)
897            return wrapper
898
899        feeders = [threading.Thread(target=log_exceptions(feed_func),
900                                    args=(q, seq, rnd, sentinel))
901                   for i in range(n_threads)]
902        consumers = [threading.Thread(target=log_exceptions(consume_func),
903                                      args=(q, results, sentinel))
904                     for i in range(n_threads)]
905
906        with threading_helper.start_threads(feeders + consumers):
907            pass
908
909        self.assertFalse(exceptions)
910        self.assertTrue(q.empty())
911        self.assertEqual(q.qsize(), 0)
912
913        return results
914
915    def test_basic(self):
916        # Basic tests for get(), put() etc.
917        q = self.q
918        self.assertTrue(q.empty())
919        self.assertEqual(q.qsize(), 0)
920        q.put(1)
921        self.assertFalse(q.empty())
922        self.assertEqual(q.qsize(), 1)
923        q.put(2)
924        q.put_nowait(3)
925        q.put(4)
926        self.assertFalse(q.empty())
927        self.assertEqual(q.qsize(), 4)
928
929        self.assertEqual(q.get(), 1)
930        self.assertEqual(q.qsize(), 3)
931
932        self.assertEqual(q.get_nowait(), 2)
933        self.assertEqual(q.qsize(), 2)
934
935        self.assertEqual(q.get(block=False), 3)
936        self.assertFalse(q.empty())
937        self.assertEqual(q.qsize(), 1)
938
939        self.assertEqual(q.get(timeout=0.1), 4)
940        self.assertTrue(q.empty())
941        self.assertEqual(q.qsize(), 0)
942
943        with self.assertRaises(self.queue.Empty):
944            q.get(block=False)
945        with self.assertRaises(self.queue.Empty):
946            q.get(timeout=1e-3)
947        with self.assertRaises(self.queue.Empty):
948            q.get_nowait()
949        self.assertTrue(q.empty())
950        self.assertEqual(q.qsize(), 0)
951
952    def test_negative_timeout_raises_exception(self):
953        q = self.q
954        q.put(1)
955        with self.assertRaises(ValueError):
956            q.get(timeout=-1)
957
958    def test_order(self):
959        # Test a pair of concurrent put() and get()
960        q = self.q
961        inputs = list(range(100))
962        results = self.run_threads(1, q, inputs, self.feed, self.consume)
963
964        # One producer, one consumer => results appended in well-defined order
965        self.assertEqual(results, inputs)
966
967    def test_many_threads(self):
968        # Test multiple concurrent put() and get()
969        N = 50
970        q = self.q
971        inputs = list(range(10000))
972        results = self.run_threads(N, q, inputs, self.feed, self.consume)
973
974        # Multiple consumers without synchronization append the
975        # results in random order
976        self.assertEqual(sorted(results), inputs)
977
978    def test_many_threads_nonblock(self):
979        # Test multiple concurrent put() and get(block=False)
980        N = 50
981        q = self.q
982        inputs = list(range(10000))
983        results = self.run_threads(N, q, inputs,
984                                   self.feed, self.consume_nonblock)
985
986        self.assertEqual(sorted(results), inputs)
987
988    def test_many_threads_timeout(self):
989        # Test multiple concurrent put() and get(timeout=...)
990        N = 50
991        q = self.q
992        inputs = list(range(1000))
993        results = self.run_threads(N, q, inputs,
994                                   self.feed, self.consume_timeout)
995
996        self.assertEqual(sorted(results), inputs)
997
998    def test_references(self):
999        # The queue should lose references to each item as soon as
1000        # it leaves the queue.
1001        class C:
1002            pass
1003
1004        N = 20
1005        q = self.q
1006        for i in range(N):
1007            q.put(C())
1008        for i in range(N):
1009            wr = weakref.ref(q.get())
1010            gc_collect()  # For PyPy or other GCs.
1011            self.assertIsNone(wr())
1012
1013
1014class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
1015
1016    queue = py_queue
1017    def setUp(self):
1018        self.type2test = self.queue._PySimpleQueue
1019        super().setUp()
1020
1021
1022@need_c_queue
1023class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
1024
1025    queue = c_queue
1026
1027    def setUp(self):
1028        self.type2test = self.queue.SimpleQueue
1029        super().setUp()
1030
1031    def test_is_default(self):
1032        self.assertIs(self.type2test, self.queue.SimpleQueue)
1033        self.assertIs(self.type2test, self.queue.SimpleQueue)
1034
1035    def test_reentrancy(self):
1036        # bpo-14976: put() may be called reentrantly in an asynchronous
1037        # callback.
1038        q = self.q
1039        gen = itertools.count()
1040        N = 10000
1041        results = []
1042
1043        # This test exploits the fact that __del__ in a reference cycle
1044        # can be called any time the GC may run.
1045
1046        class Circular(object):
1047            def __init__(self):
1048                self.circular = self
1049
1050            def __del__(self):
1051                q.put(next(gen))
1052
1053        while True:
1054            o = Circular()
1055            q.put(next(gen))
1056            del o
1057            results.append(q.get())
1058            if results[-1] >= N:
1059                break
1060
1061        self.assertEqual(results, list(range(N + 1)))
1062
1063
1064if __name__ == "__main__":
1065    unittest.main()
1066