• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
"""Tests for queues.py"""
2
3import unittest
4from unittest import mock
5
6import asyncio
7from test.test_asyncio import utils as test_utils
8
9
def tearDownModule():
    """Module-level unittest fixture: reset the asyncio event loop policy.

    Passing None to set_event_loop_policy() restores the default policy.
    """
    asyncio.set_event_loop_policy(None)
12
13
class _QueueTestBase(test_utils.TestCase):
    """Common base class for the queue test cases in this module."""

    def setUp(self):
        """Run the parent setUp, then expose a new test loop as self.loop."""
        super().setUp()
        test_loop = self.new_test_loop()
        self.loop = test_loop
19
20
class QueueBasicTests(_QueueTestBase):
    """Basic Queue behaviour: repr/str, empty(), full(), ordering, maxsize."""

    def _test_repr_or_str(self, fn, expect_id):
        """Test Queue's repr or str.

        fn is repr or str. expect_id is True if we expect the Queue's id to
        appear in fn(Queue()).
        """
        def gen():
            # Time generator for the test loop: each helper coroutine below
            # sleeps for 0.1 once.
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0.1
            self.assertAlmostEqual(0.2, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        q = asyncio.Queue()
        text = fn(q)
        self.assertTrue(text.startswith('<Queue'), text)
        id_is_present = hex(id(q)) in text
        self.assertEqual(expect_id, id_is_present)

        async def add_getter():
            getter_q = asyncio.Queue()
            # Start a task that waits to get.
            loop.create_task(getter_q.get())
            # Let it start waiting.
            await asyncio.sleep(0.1)
            # assertIn reports the actual text when the marker is missing.
            self.assertIn('_getters[1]', fn(getter_q))
            # resume q.get coroutine to finish generator
            getter_q.put_nowait(0)

        loop.run_until_complete(add_getter())

        async def add_putter():
            putter_q = asyncio.Queue(maxsize=1)
            putter_q.put_nowait(1)
            # Start a task that waits to put.
            loop.create_task(putter_q.put(2))
            # Let it start waiting.
            await asyncio.sleep(0.1)
            self.assertIn('_putters[1]', fn(putter_q))
            # resume q.put coroutine to finish generator
            putter_q.get_nowait()

        loop.run_until_complete(add_putter())

        q = asyncio.Queue()
        q.put_nowait(1)
        self.assertIn('_queue=[1]', fn(q))

    def test_repr(self):
        """repr() of a Queue contains the object's id."""
        self._test_repr_or_str(repr, True)

    def test_str(self):
        """str() of a Queue does not contain the object's id."""
        self._test_repr_or_str(str, False)

    def test_empty(self):
        """empty() follows the queue contents across a put and a get."""
        q = asyncio.Queue()
        self.assertTrue(q.empty())
        q.put_nowait(1)
        self.assertFalse(q.empty())
        self.assertEqual(1, q.get_nowait())
        self.assertTrue(q.empty())

    def test_full(self):
        """A default queue is not full; a maxsize=1 queue is full at one item."""
        q = asyncio.Queue()
        self.assertFalse(q.full())

        q = asyncio.Queue(maxsize=1)
        q.put_nowait(1)
        self.assertTrue(q.full())

    def test_order(self):
        """Items come back out in insertion order."""
        q = asyncio.Queue()
        for i in [1, 3, 2]:
            q.put_nowait(i)

        items = [q.get_nowait() for _ in range(3)]
        self.assertEqual([1, 3, 2], items)

    def test_maxsize(self):
        """put() blocks once maxsize items are queued and resumes after a get."""

        def gen():
            # Time generator for the test loop: test() sleeps for 0.01 twice.
            when = yield
            self.assertAlmostEqual(0.01, when)
            when = yield 0.01
            self.assertAlmostEqual(0.02, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        q = asyncio.Queue(maxsize=2)
        self.assertEqual(2, q.maxsize)
        have_been_put = []

        async def putter():
            for i in range(3):
                await q.put(i)
                have_been_put.append(i)
            return True

        async def test():
            t = loop.create_task(putter())
            await asyncio.sleep(0.01)

            # The putter is blocked after putting two items.
            self.assertEqual([0, 1], have_been_put)
            self.assertEqual(0, q.get_nowait())

            # Let the putter resume and put last item.
            await asyncio.sleep(0.01)
            self.assertEqual([0, 1, 2], have_been_put)
            self.assertEqual(1, q.get_nowait())
            self.assertEqual(2, q.get_nowait())

            self.assertTrue(t.done())
            self.assertTrue(t.result())

        loop.run_until_complete(test())
        self.assertAlmostEqual(0.02, loop.time())
141
142
class QueueGetTests(_QueueTestBase):
    """Tests for Queue.get() and Queue.get_nowait()."""

    def test_blocking_get(self):
        """get() returns an item that is already in the queue."""
        q = asyncio.Queue()
        q.put_nowait(1)

        async def queue_get():
            return await q.get()

        res = self.loop.run_until_complete(queue_get())
        self.assertEqual(1, res)

    def test_get_with_putters(self):
        """get() on a full queue completes a future parked in _putters."""
        q = asyncio.Queue(1)
        q.put_nowait(1)

        waiter = self.loop.create_future()
        q._putters.append(waiter)

        res = self.loop.run_until_complete(q.get())
        self.assertEqual(1, res)
        self.assertTrue(waiter.done())
        self.assertIsNone(waiter.result())

    def test_blocking_get_wait(self):
        """get() on an empty queue waits until an item is put."""

        def gen():
            # Time generator for the test loop: one call_later(0.01, ...).
            when = yield
            self.assertAlmostEqual(0.01, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        q = asyncio.Queue()
        started = asyncio.Event()
        finished = False

        async def queue_get():
            nonlocal finished
            started.set()
            res = await q.get()
            finished = True
            return res

        async def queue_put():
            loop.call_later(0.01, q.put_nowait, 1)
            queue_get_task = loop.create_task(queue_get())
            await started.wait()
            # The getter has started but must still be waiting for an item.
            self.assertFalse(finished)
            res = await queue_get_task
            self.assertTrue(finished)
            return res

        res = loop.run_until_complete(queue_put())
        self.assertEqual(1, res)
        self.assertAlmostEqual(0.01, loop.time())

    def test_nonblocking_get(self):
        """get_nowait() returns an item that is already queued."""
        q = asyncio.Queue()
        q.put_nowait(1)
        self.assertEqual(1, q.get_nowait())

    def test_nonblocking_get_exception(self):
        """get_nowait() on an empty queue raises QueueEmpty."""
        q = asyncio.Queue()
        with self.assertRaises(asyncio.QueueEmpty):
            q.get_nowait()

    def test_get_cancelled(self):
        """get() under wait_for() returns an item put before the timeout."""

        def gen():
            # Time generator for the test loop: sleep(0.01) in test(), then
            # the wait_for() timeout scheduled for 0.01 + 0.051.
            when = yield
            self.assertAlmostEqual(0.01, when)
            when = yield 0.01
            self.assertAlmostEqual(0.061, when)
            yield 0.05

        loop = self.new_test_loop(gen)

        q = asyncio.Queue()

        async def queue_get():
            return await asyncio.wait_for(q.get(), 0.051)

        async def test():
            get_task = loop.create_task(queue_get())
            await asyncio.sleep(0.01)  # let the task start
            q.put_nowait(1)
            return await get_task

        self.assertEqual(1, loop.run_until_complete(test()))
        self.assertAlmostEqual(0.06, loop.time())

    def test_get_cancelled_race(self):
        """An item put after one getter is cancelled goes to the other."""
        q = asyncio.Queue()

        t1 = self.loop.create_task(q.get())
        t2 = self.loop.create_task(q.get())

        test_utils.run_briefly(self.loop)
        t1.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(t1.done())
        # done() alone would also be true for a normal return; make sure the
        # task really ended by cancellation.
        self.assertTrue(t1.cancelled())
        q.put_nowait('a')
        test_utils.run_briefly(self.loop)
        self.assertEqual(t2.result(), 'a')

    def test_get_with_waiting_putters(self):
        """Two puts into a maxsize=1 queue are received in order by get()."""
        q = asyncio.Queue(maxsize=1)
        self.loop.create_task(q.put('a'))
        self.loop.create_task(q.put('b'))
        test_utils.run_briefly(self.loop)
        self.assertEqual(self.loop.run_until_complete(q.get()), 'a')
        self.assertEqual(self.loop.run_until_complete(q.get()), 'b')

    def test_why_are_getters_waiting(self):
        """A producer and a consumer sharing a size-1 queue both finish."""
        # From issue #268.
        asyncio.set_event_loop(self.loop)

        async def consumer(queue, num_expected):
            for _ in range(num_expected):
                await queue.get()

        async def producer(queue, num_items):
            for i in range(num_items):
                await queue.put(i)

        queue_size = 1
        producer_num_items = 5

        async def create_queue():
            queue = asyncio.Queue(queue_size)
            queue._get_loop()
            return queue

        async def test():
            q = await create_queue()
            await asyncio.gather(producer(q, producer_num_items),
                                 consumer(q, producer_num_items))

        self.loop.run_until_complete(test())

    def test_cancelled_getters_not_being_held_in_self_getters(self):
        """A get() that times out leaves nothing behind in queue._getters."""
        def a_generator():
            yield 0.1
            yield 0.2

        loop = self.new_test_loop(a_generator)

        async def consumer(queue):
            # Nothing is ever put, so the wait_for() timeout is the expected
            # outcome; assert it instead of silently swallowing it.
            with self.assertRaises(asyncio.TimeoutError):
                await asyncio.wait_for(queue.get(), 0.1)

        queue = asyncio.Queue(maxsize=5)
        loop.run_until_complete(loop.create_task(consumer(queue)))
        self.assertEqual(len(queue._getters), 0)
299
300
class QueuePutTests(_QueueTestBase):
    """Tests for Queue.put() and Queue.put_nowait().

    Two get()-cancellation tests (test_get_cancel_drop_*) also live here.
    """

    def test_blocking_put(self):
        """put() on a queue without maxsize completes."""
        q = asyncio.Queue()

        async def queue_put():
            # No maxsize, won't block.
            await q.put(1)

        self.loop.run_until_complete(queue_put())

    def test_blocking_put_wait(self):
        """put() on a full queue waits until an item is removed."""

        def gen():
            # Time generator for the test loop: one call_later(0.01, ...).
            when = yield
            self.assertAlmostEqual(0.01, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        q = asyncio.Queue(maxsize=1)
        started = asyncio.Event()
        finished = False

        async def queue_put():
            nonlocal finished
            started.set()
            await q.put(1)
            await q.put(2)
            finished = True

        async def queue_get():
            loop.call_later(0.01, q.get_nowait)
            queue_put_task = loop.create_task(queue_put())
            await started.wait()
            # The putter has started but its second put() must still block.
            self.assertFalse(finished)
            await queue_put_task
            self.assertTrue(finished)

        loop.run_until_complete(queue_get())
        self.assertAlmostEqual(0.01, loop.time())

    def test_nonblocking_put(self):
        """put_nowait() stores an item that get_nowait() returns."""
        q = asyncio.Queue()
        q.put_nowait(1)
        self.assertEqual(1, q.get_nowait())

    def test_get_cancel_drop_one_pending_reader(self):
        """Cancelling a woken reader must not lose the item meant for it."""
        def gen():
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)

        q = asyncio.Queue()

        reader = loop.create_task(q.get())

        loop.run_until_complete(asyncio.sleep(0.01))

        q.put_nowait(1)
        q.put_nowait(2)
        reader.cancel()

        try:
            loop.run_until_complete(reader)
        except asyncio.CancelledError:
            # try again
            reader = loop.create_task(q.get())
            loop.run_until_complete(reader)

        result = reader.result()
        # if we get 2, it means 1 got dropped!
        self.assertEqual(1, result)

    def test_get_cancel_drop_many_pending_readers(self):
        """With one of three readers cancelled, the others get both items."""
        def gen():
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)
        loop.set_debug(True)

        q = asyncio.Queue()

        reader1 = loop.create_task(q.get())
        reader2 = loop.create_task(q.get())
        reader3 = loop.create_task(q.get())

        loop.run_until_complete(asyncio.sleep(0.01))

        q.put_nowait(1)
        q.put_nowait(2)
        reader1.cancel()

        try:
            loop.run_until_complete(reader1)
        except asyncio.CancelledError:
            pass

        loop.run_until_complete(reader3)

        # It is undefined in which order concurrent readers receive results.
        self.assertEqual({reader2.result(), reader3.result()}, {1, 2})

    def test_put_cancel_drop(self):
        """After a blocked put() is cancelled, a retried put() stores the item."""

        def gen():
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)

        q = asyncio.Queue(1)

        q.put_nowait(1)

        # putting a second item in the queue has to block (qsize=1)
        writer = loop.create_task(q.put(2))
        loop.run_until_complete(asyncio.sleep(0.01))

        value1 = q.get_nowait()
        self.assertEqual(value1, 1)

        writer.cancel()
        try:
            loop.run_until_complete(writer)
        except asyncio.CancelledError:
            # try again
            writer = loop.create_task(q.put(2))
            loop.run_until_complete(writer)

        value2 = q.get_nowait()
        self.assertEqual(value2, 2)
        self.assertEqual(q.qsize(), 0)

    def test_nonblocking_put_exception(self):
        """put_nowait() on a full queue raises QueueFull."""
        q = asyncio.Queue(maxsize=1)
        q.put_nowait(1)
        with self.assertRaises(asyncio.QueueFull):
            q.put_nowait(2)

    def test_float_maxsize(self):
        """A maxsize of 1.3 admits two items before the queue is full."""
        q = asyncio.Queue(maxsize=1.3)
        q.put_nowait(1)
        q.put_nowait(2)
        self.assertTrue(q.full())
        with self.assertRaises(asyncio.QueueFull):
            q.put_nowait(3)

        q = asyncio.Queue(maxsize=1.3)

        async def queue_put():
            await q.put(1)
            await q.put(2)
            self.assertTrue(q.full())

        self.loop.run_until_complete(queue_put())

    def test_put_cancelled(self):
        """A put() task finishes and its item is delivered to a later get()."""
        q = asyncio.Queue()

        async def queue_put():
            await q.put(1)
            return True

        async def test():
            return await q.get()

        t = self.loop.create_task(queue_put())
        self.assertEqual(1, self.loop.run_until_complete(test()))
        self.assertTrue(t.done())
        self.assertTrue(t.result())

    def test_put_cancelled_race(self):
        """Cancelling one blocked putter does not disturb the other one."""
        q = asyncio.Queue(maxsize=1)

        put_a = self.loop.create_task(q.put('a'))
        put_b = self.loop.create_task(q.put('b'))
        put_c = self.loop.create_task(q.put('X'))

        test_utils.run_briefly(self.loop)
        self.assertTrue(put_a.done())
        self.assertFalse(put_b.done())

        put_c.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(put_c.done())
        self.assertEqual(q.get_nowait(), 'a')
        test_utils.run_briefly(self.loop)
        self.assertEqual(q.get_nowait(), 'b')

        self.loop.run_until_complete(put_b)

    def test_put_with_waiting_getters(self):
        """put() hands its item to a getter that is already waiting."""
        q = asyncio.Queue()
        t = self.loop.create_task(q.get())
        test_utils.run_briefly(self.loop)
        self.loop.run_until_complete(q.put('a'))
        self.assertEqual(self.loop.run_until_complete(t), 'a')

    def test_why_are_putters_waiting(self):
        """Four putters on a size-2 queue all finish alongside one getter."""
        # From issue #265.
        asyncio.set_event_loop(self.loop)

        async def create_queue():
            q = asyncio.Queue(2)
            q._get_loop()
            return q

        queue = self.loop.run_until_complete(create_queue())

        async def putter(item):
            await queue.put(item)

        async def getter():
            await asyncio.sleep(0)
            # Drain what is queued right now; the values are not needed.
            for _ in range(queue.qsize()):
                queue.get_nowait()

        async def test():
            t0 = putter(0)
            t1 = putter(1)
            t2 = putter(2)
            t3 = putter(3)
            await asyncio.gather(getter(), t0, t1, t2, t3)

        self.loop.run_until_complete(test())

    def test_cancelled_puts_not_being_held_in_self_putters(self):
        """A cancelled put() is removed from queue._putters."""
        def a_generator():
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(a_generator)

        # Full queue.
        queue = asyncio.Queue(maxsize=1)
        queue.put_nowait(1)

        # Task waiting for space to put an item in the queue.
        put_task = loop.create_task(queue.put(1))
        loop.run_until_complete(asyncio.sleep(0.01))

        # Check that the putter is correctly removed from queue._putters when
        # the task is canceled.
        self.assertEqual(len(queue._putters), 1)
        put_task.cancel()
        with self.assertRaises(asyncio.CancelledError):
            loop.run_until_complete(put_task)
        self.assertEqual(len(queue._putters), 0)

    def test_cancelled_put_silence_value_error_exception(self):
        """Cancelling a put() whose future was already removed still cancels."""
        def gen():
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)

        # Full Queue.
        queue = asyncio.Queue(1)
        queue.put_nowait(1)

        # Task waiting for space to put an item in the queue.
        put_task = loop.create_task(queue.put(1))
        loop.run_until_complete(asyncio.sleep(0.01))

        # get_nowait() removes the future of put_task from queue._putters.
        queue.get_nowait()
        # When canceled, queue.put is going to remove its future from
        # self._putters but it was removed previously by queue.get_nowait().
        put_task.cancel()

        # The ValueError exception triggered by queue._putters.remove(putter)
        # inside queue.put should be silenced.
        # If the ValueError is silenced we should catch a CancelledError.
        with self.assertRaises(asyncio.CancelledError):
            loop.run_until_complete(put_task)
577
578
class LifoQueueTests(_QueueTestBase):
    """Ordering test for asyncio.LifoQueue."""

    def test_order(self):
        """Items are retrieved in reverse insertion order."""
        lifo = asyncio.LifoQueue()
        inserted = [1, 3, 2]
        for value in inserted:
            lifo.put_nowait(value)

        # One get_nowait() per inserted item.
        retrieved = [lifo.get_nowait() for _ in inserted]
        self.assertEqual([2, 3, 1], retrieved)
588
589
class PriorityQueueTests(_QueueTestBase):
    """Ordering test for asyncio.PriorityQueue."""

    def test_order(self):
        """Items are retrieved smallest first, whatever the insertion order."""
        pq = asyncio.PriorityQueue()
        inserted = [1, 3, 2]
        for value in inserted:
            pq.put_nowait(value)

        # One get_nowait() per inserted item.
        retrieved = [pq.get_nowait() for _ in inserted]
        self.assertEqual([1, 2, 3], retrieved)
599
600
class _QueueJoinTestMixin:
    """Shared task_done()/join() tests, parameterised by queue class.

    Concrete test cases set q_class to the queue type under test. They must
    also supply self.loop (an event loop) and the unittest assertion
    methods, since this mixin defines neither.
    """

    # Queue class under test; each concrete subclass overrides this.
    q_class = None

    def test_task_done_underflow(self):
        """task_done() on a freshly created queue raises ValueError."""
        q = self.q_class()
        with self.assertRaises(ValueError):
            q.task_done()

    def test_task_done(self):
        """join() returns once every queued item has been marked done."""
        q = self.q_class()
        for i in range(100):
            q.put_nowait(i)

        accumulator = 0

        # Two workers get items from the queue and call task_done after each.
        # Join the queue and assert all items have been processed.
        running = True

        async def worker():
            nonlocal accumulator

            while running:
                item = await q.get()
                accumulator += item
                q.task_done()

        async def test():
            tasks = [self.loop.create_task(worker())
                     for _ in range(2)]

            await q.join()
            return tasks

        tasks = self.loop.run_until_complete(test())
        self.assertEqual(sum(range(100)), accumulator)

        # close running generators
        running = False
        # Each worker is blocked in q.get(); one extra item per worker lets
        # it wake up, see running == False and return.
        for _ in tasks:
            q.put_nowait(0)
        self.loop.run_until_complete(asyncio.wait(tasks))

    def test_join_empty_queue(self):
        """join() on an empty queue completes, also when awaited twice."""
        q = self.q_class()

        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).

        async def join():
            await q.join()
            await q.join()

        self.loop.run_until_complete(join())

    def test_format(self):
        """_format() appends ' tasks=N' when _unfinished_tasks is non-zero."""
        q = self.q_class()
        self.assertEqual(q._format(), 'maxsize=0')

        q._unfinished_tasks = 2
        self.assertEqual(q._format(), 'maxsize=0 tasks=2')
662
663
class QueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
    """Runs the shared join/task_done tests against asyncio.Queue."""

    q_class = asyncio.Queue
666
667
class LifoQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
    """Runs the shared join/task_done tests against asyncio.LifoQueue."""

    q_class = asyncio.LifoQueue
670
671
class PriorityQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
    """Runs the shared join/task_done tests against asyncio.PriorityQueue."""

    q_class = asyncio.PriorityQueue
674
675
if __name__ == '__main__':
    # Executed as a script: hand control to the unittest runner.
    unittest.main()
678