• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests for queues.py"""
2
3import unittest
4from unittest import mock
5
6import asyncio
7from test.test_asyncio import utils as test_utils
8
9
10def tearDownModule():
11    asyncio.set_event_loop_policy(None)
12
13
14class _QueueTestBase(test_utils.TestCase):
15
16    def setUp(self):
17        super().setUp()
18        self.loop = self.new_test_loop()
19
20
21class QueueBasicTests(_QueueTestBase):
22
23    def _test_repr_or_str(self, fn, expect_id):
24        """Test Queue's repr or str.
25
26        fn is repr or str. expect_id is True if we expect the Queue's id to
27        appear in fn(Queue()).
28        """
29        def gen():
30            when = yield
31            self.assertAlmostEqual(0.1, when)
32            when = yield 0.1
33            self.assertAlmostEqual(0.2, when)
34            yield 0.1
35
36        loop = self.new_test_loop(gen)
37
38        with self.assertWarns(DeprecationWarning):
39            q = asyncio.Queue(loop=loop)
40        self.assertTrue(fn(q).startswith('<Queue'), fn(q))
41        id_is_present = hex(id(q)) in fn(q)
42        self.assertEqual(expect_id, id_is_present)
43
44        async def add_getter():
45            q = asyncio.Queue(loop=loop)
46            # Start a task that waits to get.
47            loop.create_task(q.get())
48            # Let it start waiting.
49            await asyncio.sleep(0.1)
50            self.assertTrue('_getters[1]' in fn(q))
51            # resume q.get coroutine to finish generator
52            q.put_nowait(0)
53
54        with self.assertWarns(DeprecationWarning):
55            loop.run_until_complete(add_getter())
56
57        async def add_putter():
58            q = asyncio.Queue(maxsize=1, loop=loop)
59            q.put_nowait(1)
60            # Start a task that waits to put.
61            loop.create_task(q.put(2))
62            # Let it start waiting.
63            await asyncio.sleep(0.1)
64            self.assertTrue('_putters[1]' in fn(q))
65            # resume q.put coroutine to finish generator
66            q.get_nowait()
67
68        with self.assertWarns(DeprecationWarning):
69            loop.run_until_complete(add_putter())
70            q = asyncio.Queue(loop=loop)
71        q.put_nowait(1)
72        self.assertTrue('_queue=[1]' in fn(q))
73
74    def test_ctor_loop(self):
75        loop = mock.Mock()
76        with self.assertWarns(DeprecationWarning):
77            q = asyncio.Queue(loop=loop)
78        self.assertIs(q._loop, loop)
79
80        with self.assertWarns(DeprecationWarning):
81            q = asyncio.Queue(loop=self.loop)
82        self.assertIs(q._loop, self.loop)
83
84    def test_ctor_noloop(self):
85        asyncio.set_event_loop(self.loop)
86        q = asyncio.Queue()
87        self.assertIs(q._loop, self.loop)
88
89    def test_repr(self):
90        self._test_repr_or_str(repr, True)
91
92    def test_str(self):
93        self._test_repr_or_str(str, False)
94
95    def test_empty(self):
96        with self.assertWarns(DeprecationWarning):
97            q = asyncio.Queue(loop=self.loop)
98        self.assertTrue(q.empty())
99        q.put_nowait(1)
100        self.assertFalse(q.empty())
101        self.assertEqual(1, q.get_nowait())
102        self.assertTrue(q.empty())
103
104    def test_full(self):
105        with self.assertWarns(DeprecationWarning):
106            q = asyncio.Queue(loop=self.loop)
107        self.assertFalse(q.full())
108
109        with self.assertWarns(DeprecationWarning):
110            q = asyncio.Queue(maxsize=1, loop=self.loop)
111        q.put_nowait(1)
112        self.assertTrue(q.full())
113
114    def test_order(self):
115        with self.assertWarns(DeprecationWarning):
116            q = asyncio.Queue(loop=self.loop)
117        for i in [1, 3, 2]:
118            q.put_nowait(i)
119
120        items = [q.get_nowait() for _ in range(3)]
121        self.assertEqual([1, 3, 2], items)
122
123    def test_maxsize(self):
124
125        def gen():
126            when = yield
127            self.assertAlmostEqual(0.01, when)
128            when = yield 0.01
129            self.assertAlmostEqual(0.02, when)
130            yield 0.01
131
132        loop = self.new_test_loop(gen)
133
134        with self.assertWarns(DeprecationWarning):
135            q = asyncio.Queue(maxsize=2, loop=loop)
136        self.assertEqual(2, q.maxsize)
137        have_been_put = []
138
139        async def putter():
140            for i in range(3):
141                await q.put(i)
142                have_been_put.append(i)
143            return True
144
145        async def test():
146            t = loop.create_task(putter())
147            await asyncio.sleep(0.01)
148
149            # The putter is blocked after putting two items.
150            self.assertEqual([0, 1], have_been_put)
151            self.assertEqual(0, q.get_nowait())
152
153            # Let the putter resume and put last item.
154            await asyncio.sleep(0.01)
155            self.assertEqual([0, 1, 2], have_been_put)
156            self.assertEqual(1, q.get_nowait())
157            self.assertEqual(2, q.get_nowait())
158
159            self.assertTrue(t.done())
160            self.assertTrue(t.result())
161
162        loop.run_until_complete(test())
163        self.assertAlmostEqual(0.02, loop.time())
164
165
166class QueueGetTests(_QueueTestBase):
167
168    def test_blocking_get(self):
169        with self.assertWarns(DeprecationWarning):
170            q = asyncio.Queue(loop=self.loop)
171        q.put_nowait(1)
172
173        async def queue_get():
174            return await q.get()
175
176        res = self.loop.run_until_complete(queue_get())
177        self.assertEqual(1, res)
178
179    def test_get_with_putters(self):
180        with self.assertWarns(DeprecationWarning):
181            q = asyncio.Queue(1, loop=self.loop)
182        q.put_nowait(1)
183
184        waiter = self.loop.create_future()
185        q._putters.append(waiter)
186
187        res = self.loop.run_until_complete(q.get())
188        self.assertEqual(1, res)
189        self.assertTrue(waiter.done())
190        self.assertIsNone(waiter.result())
191
192    def test_blocking_get_wait(self):
193
194        def gen():
195            when = yield
196            self.assertAlmostEqual(0.01, when)
197            yield 0.01
198
199        loop = self.new_test_loop(gen)
200
201        with self.assertWarns(DeprecationWarning):
202            q = asyncio.Queue(loop=loop)
203            started = asyncio.Event(loop=loop)
204        finished = False
205
206        async def queue_get():
207            nonlocal finished
208            started.set()
209            res = await q.get()
210            finished = True
211            return res
212
213        async def queue_put():
214            loop.call_later(0.01, q.put_nowait, 1)
215            queue_get_task = loop.create_task(queue_get())
216            await started.wait()
217            self.assertFalse(finished)
218            res = await queue_get_task
219            self.assertTrue(finished)
220            return res
221
222        res = loop.run_until_complete(queue_put())
223        self.assertEqual(1, res)
224        self.assertAlmostEqual(0.01, loop.time())
225
226    def test_nonblocking_get(self):
227        with self.assertWarns(DeprecationWarning):
228            q = asyncio.Queue(loop=self.loop)
229        q.put_nowait(1)
230        self.assertEqual(1, q.get_nowait())
231
232    def test_nonblocking_get_exception(self):
233        with self.assertWarns(DeprecationWarning):
234            q = asyncio.Queue(loop=self.loop)
235        self.assertRaises(asyncio.QueueEmpty, q.get_nowait)
236
237    def test_get_cancelled(self):
238
239        def gen():
240            when = yield
241            self.assertAlmostEqual(0.01, when)
242            when = yield 0.01
243            self.assertAlmostEqual(0.061, when)
244            yield 0.05
245
246        loop = self.new_test_loop(gen)
247
248        with self.assertWarns(DeprecationWarning):
249            q = asyncio.Queue(loop=loop)
250
251        async def queue_get():
252            return await asyncio.wait_for(q.get(), 0.051)
253
254        async def test():
255            get_task = loop.create_task(queue_get())
256            await asyncio.sleep(0.01)  # let the task start
257            q.put_nowait(1)
258            return await get_task
259
260        self.assertEqual(1, loop.run_until_complete(test()))
261        self.assertAlmostEqual(0.06, loop.time())
262
263    def test_get_cancelled_race(self):
264        with self.assertWarns(DeprecationWarning):
265            q = asyncio.Queue(loop=self.loop)
266
267        t1 = self.loop.create_task(q.get())
268        t2 = self.loop.create_task(q.get())
269
270        test_utils.run_briefly(self.loop)
271        t1.cancel()
272        test_utils.run_briefly(self.loop)
273        self.assertTrue(t1.done())
274        q.put_nowait('a')
275        test_utils.run_briefly(self.loop)
276        self.assertEqual(t2.result(), 'a')
277
278    def test_get_with_waiting_putters(self):
279        with self.assertWarns(DeprecationWarning):
280            q = asyncio.Queue(loop=self.loop, maxsize=1)
281        self.loop.create_task(q.put('a'))
282        self.loop.create_task(q.put('b'))
283        test_utils.run_briefly(self.loop)
284        self.assertEqual(self.loop.run_until_complete(q.get()), 'a')
285        self.assertEqual(self.loop.run_until_complete(q.get()), 'b')
286
287    def test_why_are_getters_waiting(self):
288        # From issue #268.
289
290        async def consumer(queue, num_expected):
291            for _ in range(num_expected):
292                await queue.get()
293
294        async def producer(queue, num_items):
295            for i in range(num_items):
296                await queue.put(i)
297
298        queue_size = 1
299        producer_num_items = 5
300
301        with self.assertWarns(DeprecationWarning):
302            q = asyncio.Queue(queue_size, loop=self.loop)
303
304        self.loop.run_until_complete(
305            asyncio.gather(producer(q, producer_num_items),
306                           consumer(q, producer_num_items),
307                           loop=self.loop),
308            )
309
310    def test_cancelled_getters_not_being_held_in_self_getters(self):
311        def a_generator():
312            yield 0.1
313            yield 0.2
314
315        self.loop = self.new_test_loop(a_generator)
316
317        async def consumer(queue):
318            try:
319                item = await asyncio.wait_for(queue.get(), 0.1)
320            except asyncio.TimeoutError:
321                pass
322
323        with self.assertWarns(DeprecationWarning):
324            queue = asyncio.Queue(loop=self.loop, maxsize=5)
325        self.loop.run_until_complete(self.loop.create_task(consumer(queue)))
326        self.assertEqual(len(queue._getters), 0)
327
328
329class QueuePutTests(_QueueTestBase):
330
331    def test_blocking_put(self):
332        with self.assertWarns(DeprecationWarning):
333            q = asyncio.Queue(loop=self.loop)
334
335        async def queue_put():
336            # No maxsize, won't block.
337            await q.put(1)
338
339        self.loop.run_until_complete(queue_put())
340
341    def test_blocking_put_wait(self):
342
343        def gen():
344            when = yield
345            self.assertAlmostEqual(0.01, when)
346            yield 0.01
347
348        loop = self.new_test_loop(gen)
349
350        with self.assertWarns(DeprecationWarning):
351            q = asyncio.Queue(maxsize=1, loop=loop)
352            started = asyncio.Event(loop=loop)
353        finished = False
354
355        async def queue_put():
356            nonlocal finished
357            started.set()
358            await q.put(1)
359            await q.put(2)
360            finished = True
361
362        async def queue_get():
363            loop.call_later(0.01, q.get_nowait)
364            queue_put_task = loop.create_task(queue_put())
365            await started.wait()
366            self.assertFalse(finished)
367            await queue_put_task
368            self.assertTrue(finished)
369
370        loop.run_until_complete(queue_get())
371        self.assertAlmostEqual(0.01, loop.time())
372
373    def test_nonblocking_put(self):
374        with self.assertWarns(DeprecationWarning):
375            q = asyncio.Queue(loop=self.loop)
376        q.put_nowait(1)
377        self.assertEqual(1, q.get_nowait())
378
379    def test_get_cancel_drop_one_pending_reader(self):
380        def gen():
381            yield 0.01
382            yield 0.1
383
384        loop = self.new_test_loop(gen)
385
386        with self.assertWarns(DeprecationWarning):
387            q = asyncio.Queue(loop=loop)
388
389        reader = loop.create_task(q.get())
390
391        loop.run_until_complete(asyncio.sleep(0.01))
392
393        q.put_nowait(1)
394        q.put_nowait(2)
395        reader.cancel()
396
397        try:
398            loop.run_until_complete(reader)
399        except asyncio.CancelledError:
400            # try again
401            reader = loop.create_task(q.get())
402            loop.run_until_complete(reader)
403
404        result = reader.result()
405        # if we get 2, it means 1 got dropped!
406        self.assertEqual(1, result)
407
408    def test_get_cancel_drop_many_pending_readers(self):
409        def gen():
410            yield 0.01
411            yield 0.1
412
413        loop = self.new_test_loop(gen)
414        loop.set_debug(True)
415
416        with self.assertWarns(DeprecationWarning):
417            q = asyncio.Queue(loop=loop)
418
419        reader1 = loop.create_task(q.get())
420        reader2 = loop.create_task(q.get())
421        reader3 = loop.create_task(q.get())
422
423        loop.run_until_complete(asyncio.sleep(0.01))
424
425        q.put_nowait(1)
426        q.put_nowait(2)
427        reader1.cancel()
428
429        try:
430            loop.run_until_complete(reader1)
431        except asyncio.CancelledError:
432            pass
433
434        loop.run_until_complete(reader3)
435
436        # It is undefined in which order concurrent readers receive results.
437        self.assertEqual({reader2.result(), reader3.result()}, {1, 2})
438
439    def test_put_cancel_drop(self):
440
441        def gen():
442            yield 0.01
443            yield 0.1
444
445        loop = self.new_test_loop(gen)
446
447        with self.assertWarns(DeprecationWarning):
448            q = asyncio.Queue(1, loop=loop)
449
450        q.put_nowait(1)
451
452        # putting a second item in the queue has to block (qsize=1)
453        writer = loop.create_task(q.put(2))
454        loop.run_until_complete(asyncio.sleep(0.01))
455
456        value1 = q.get_nowait()
457        self.assertEqual(value1, 1)
458
459        writer.cancel()
460        try:
461            loop.run_until_complete(writer)
462        except asyncio.CancelledError:
463            # try again
464            writer = loop.create_task(q.put(2))
465            loop.run_until_complete(writer)
466
467        value2 = q.get_nowait()
468        self.assertEqual(value2, 2)
469        self.assertEqual(q.qsize(), 0)
470
471    def test_nonblocking_put_exception(self):
472        with self.assertWarns(DeprecationWarning):
473            q = asyncio.Queue(maxsize=1, loop=self.loop)
474        q.put_nowait(1)
475        self.assertRaises(asyncio.QueueFull, q.put_nowait, 2)
476
477    def test_float_maxsize(self):
478        with self.assertWarns(DeprecationWarning):
479            q = asyncio.Queue(maxsize=1.3, loop=self.loop)
480        q.put_nowait(1)
481        q.put_nowait(2)
482        self.assertTrue(q.full())
483        self.assertRaises(asyncio.QueueFull, q.put_nowait, 3)
484
485        with self.assertWarns(DeprecationWarning):
486            q = asyncio.Queue(maxsize=1.3, loop=self.loop)
487
488        async def queue_put():
489            await q.put(1)
490            await q.put(2)
491            self.assertTrue(q.full())
492        self.loop.run_until_complete(queue_put())
493
494    def test_put_cancelled(self):
495        with self.assertWarns(DeprecationWarning):
496            q = asyncio.Queue(loop=self.loop)
497
498        async def queue_put():
499            await q.put(1)
500            return True
501
502        async def test():
503            return await q.get()
504
505        t = self.loop.create_task(queue_put())
506        self.assertEqual(1, self.loop.run_until_complete(test()))
507        self.assertTrue(t.done())
508        self.assertTrue(t.result())
509
510    def test_put_cancelled_race(self):
511        with self.assertWarns(DeprecationWarning):
512            q = asyncio.Queue(loop=self.loop, maxsize=1)
513
514        put_a = self.loop.create_task(q.put('a'))
515        put_b = self.loop.create_task(q.put('b'))
516        put_c = self.loop.create_task(q.put('X'))
517
518        test_utils.run_briefly(self.loop)
519        self.assertTrue(put_a.done())
520        self.assertFalse(put_b.done())
521
522        put_c.cancel()
523        test_utils.run_briefly(self.loop)
524        self.assertTrue(put_c.done())
525        self.assertEqual(q.get_nowait(), 'a')
526        test_utils.run_briefly(self.loop)
527        self.assertEqual(q.get_nowait(), 'b')
528
529        self.loop.run_until_complete(put_b)
530
531    def test_put_with_waiting_getters(self):
532        with self.assertWarns(DeprecationWarning):
533            q = asyncio.Queue(loop=self.loop)
534        t = self.loop.create_task(q.get())
535        test_utils.run_briefly(self.loop)
536        self.loop.run_until_complete(q.put('a'))
537        self.assertEqual(self.loop.run_until_complete(t), 'a')
538
539    def test_why_are_putters_waiting(self):
540        # From issue #265.
541
542        with self.assertWarns(DeprecationWarning):
543            queue = asyncio.Queue(2, loop=self.loop)
544
545        async def putter(item):
546            await queue.put(item)
547
548        async def getter():
549            await asyncio.sleep(0)
550            num = queue.qsize()
551            for _ in range(num):
552                item = queue.get_nowait()
553
554        t0 = putter(0)
555        t1 = putter(1)
556        t2 = putter(2)
557        t3 = putter(3)
558        self.loop.run_until_complete(
559            asyncio.gather(getter(), t0, t1, t2, t3, loop=self.loop))
560
561    def test_cancelled_puts_not_being_held_in_self_putters(self):
562        def a_generator():
563            yield 0.01
564            yield 0.1
565
566        loop = self.new_test_loop(a_generator)
567
568        # Full queue.
569        with self.assertWarns(DeprecationWarning):
570            queue = asyncio.Queue(loop=loop, maxsize=1)
571        queue.put_nowait(1)
572
573        # Task waiting for space to put an item in the queue.
574        put_task = loop.create_task(queue.put(1))
575        loop.run_until_complete(asyncio.sleep(0.01))
576
577        # Check that the putter is correctly removed from queue._putters when
578        # the task is canceled.
579        self.assertEqual(len(queue._putters), 1)
580        put_task.cancel()
581        with self.assertRaises(asyncio.CancelledError):
582            loop.run_until_complete(put_task)
583        self.assertEqual(len(queue._putters), 0)
584
585    def test_cancelled_put_silence_value_error_exception(self):
586        def gen():
587            yield 0.01
588            yield 0.1
589
590        loop = self.new_test_loop(gen)
591
592        # Full Queue.
593        with self.assertWarns(DeprecationWarning):
594            queue = asyncio.Queue(1, loop=loop)
595        queue.put_nowait(1)
596
597        # Task waiting for space to put a item in the queue.
598        put_task = loop.create_task(queue.put(1))
599        loop.run_until_complete(asyncio.sleep(0.01))
600
601        # get_nowait() remove the future of put_task from queue._putters.
602        queue.get_nowait()
603        # When canceled, queue.put is going to remove its future from
604        # self._putters but it was removed previously by queue.get_nowait().
605        put_task.cancel()
606
607        # The ValueError exception triggered by queue._putters.remove(putter)
608        # inside queue.put should be silenced.
609        # If the ValueError is silenced we should catch a CancelledError.
610        with self.assertRaises(asyncio.CancelledError):
611            loop.run_until_complete(put_task)
612
613
614class LifoQueueTests(_QueueTestBase):
615
616    def test_order(self):
617        with self.assertWarns(DeprecationWarning):
618            q = asyncio.LifoQueue(loop=self.loop)
619        for i in [1, 3, 2]:
620            q.put_nowait(i)
621
622        items = [q.get_nowait() for _ in range(3)]
623        self.assertEqual([2, 3, 1], items)
624
625
626class PriorityQueueTests(_QueueTestBase):
627
628    def test_order(self):
629        with self.assertWarns(DeprecationWarning):
630            q = asyncio.PriorityQueue(loop=self.loop)
631        for i in [1, 3, 2]:
632            q.put_nowait(i)
633
634        items = [q.get_nowait() for _ in range(3)]
635        self.assertEqual([1, 2, 3], items)
636
637
638class _QueueJoinTestMixin:
639
640    q_class = None
641
642    def test_task_done_underflow(self):
643        with self.assertWarns(DeprecationWarning):
644            q = self.q_class(loop=self.loop)
645        self.assertRaises(ValueError, q.task_done)
646
647    def test_task_done(self):
648        with self.assertWarns(DeprecationWarning):
649            q = self.q_class(loop=self.loop)
650        for i in range(100):
651            q.put_nowait(i)
652
653        accumulator = 0
654
655        # Two workers get items from the queue and call task_done after each.
656        # Join the queue and assert all items have been processed.
657        running = True
658
659        async def worker():
660            nonlocal accumulator
661
662            while running:
663                item = await q.get()
664                accumulator += item
665                q.task_done()
666
667        async def test():
668            tasks = [self.loop.create_task(worker())
669                     for index in range(2)]
670
671            await q.join()
672            return tasks
673
674        tasks = self.loop.run_until_complete(test())
675        self.assertEqual(sum(range(100)), accumulator)
676
677        # close running generators
678        running = False
679        for i in range(len(tasks)):
680            q.put_nowait(0)
681        self.loop.run_until_complete(asyncio.wait(tasks))
682
683    def test_join_empty_queue(self):
684        with self.assertWarns(DeprecationWarning):
685            q = self.q_class(loop=self.loop)
686
687        # Test that a queue join()s successfully, and before anything else
688        # (done twice for insurance).
689
690        async def join():
691            await q.join()
692            await q.join()
693
694        self.loop.run_until_complete(join())
695
696    def test_format(self):
697        with self.assertWarns(DeprecationWarning):
698            q = self.q_class(loop=self.loop)
699        self.assertEqual(q._format(), 'maxsize=0')
700
701        q._unfinished_tasks = 2
702        self.assertEqual(q._format(), 'maxsize=0 tasks=2')
703
704
705class QueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
706    q_class = asyncio.Queue
707
708
709class LifoQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
710    q_class = asyncio.LifoQueue
711
712
713class PriorityQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
714    q_class = asyncio.PriorityQueue
715
716
717if __name__ == '__main__':
718    unittest.main()
719