1"""Tests for queues.py""" 2 3import unittest 4from unittest import mock 5 6import asyncio 7from test.test_asyncio import utils as test_utils 8 9 10def tearDownModule(): 11 asyncio.set_event_loop_policy(None) 12 13 14class _QueueTestBase(test_utils.TestCase): 15 16 def setUp(self): 17 super().setUp() 18 self.loop = self.new_test_loop() 19 20 21class QueueBasicTests(_QueueTestBase): 22 23 def _test_repr_or_str(self, fn, expect_id): 24 """Test Queue's repr or str. 25 26 fn is repr or str. expect_id is True if we expect the Queue's id to 27 appear in fn(Queue()). 28 """ 29 def gen(): 30 when = yield 31 self.assertAlmostEqual(0.1, when) 32 when = yield 0.1 33 self.assertAlmostEqual(0.2, when) 34 yield 0.1 35 36 loop = self.new_test_loop(gen) 37 38 with self.assertWarns(DeprecationWarning): 39 q = asyncio.Queue(loop=loop) 40 self.assertTrue(fn(q).startswith('<Queue'), fn(q)) 41 id_is_present = hex(id(q)) in fn(q) 42 self.assertEqual(expect_id, id_is_present) 43 44 async def add_getter(): 45 q = asyncio.Queue(loop=loop) 46 # Start a task that waits to get. 47 loop.create_task(q.get()) 48 # Let it start waiting. 49 await asyncio.sleep(0.1) 50 self.assertTrue('_getters[1]' in fn(q)) 51 # resume q.get coroutine to finish generator 52 q.put_nowait(0) 53 54 with self.assertWarns(DeprecationWarning): 55 loop.run_until_complete(add_getter()) 56 57 async def add_putter(): 58 q = asyncio.Queue(maxsize=1, loop=loop) 59 q.put_nowait(1) 60 # Start a task that waits to put. 61 loop.create_task(q.put(2)) 62 # Let it start waiting. 63 await asyncio.sleep(0.1) 64 self.assertTrue('_putters[1]' in fn(q)) 65 # resume q.put coroutine to finish generator 66 q.get_nowait() 67 68 with self.assertWarns(DeprecationWarning): 69 loop.run_until_complete(add_putter()) 70 q = asyncio.Queue(loop=loop) 71 q.put_nowait(1) 72 self.assertTrue('_queue=[1]' in fn(q)) 73 74 def test_ctor_loop(self): 75 loop = mock.Mock() 76 with self.assertWarns(DeprecationWarning): 77 q = asyncio.Queue(loop=loop) 78 self.assertIs(q._loop, loop) 79 80 with self.assertWarns(DeprecationWarning): 81 q = asyncio.Queue(loop=self.loop) 82 self.assertIs(q._loop, self.loop) 83 84 def test_ctor_noloop(self): 85 asyncio.set_event_loop(self.loop) 86 q = asyncio.Queue() 87 self.assertIs(q._loop, self.loop) 88 89 def test_repr(self): 90 self._test_repr_or_str(repr, True) 91 92 def test_str(self): 93 self._test_repr_or_str(str, False) 94 95 def test_empty(self): 96 with self.assertWarns(DeprecationWarning): 97 q = asyncio.Queue(loop=self.loop) 98 self.assertTrue(q.empty()) 99 q.put_nowait(1) 100 self.assertFalse(q.empty()) 101 self.assertEqual(1, q.get_nowait()) 102 self.assertTrue(q.empty()) 103 104 def test_full(self): 105 with self.assertWarns(DeprecationWarning): 106 q = asyncio.Queue(loop=self.loop) 107 self.assertFalse(q.full()) 108 109 with self.assertWarns(DeprecationWarning): 110 q = asyncio.Queue(maxsize=1, loop=self.loop) 111 q.put_nowait(1) 112 self.assertTrue(q.full()) 113 114 def test_order(self): 115 with self.assertWarns(DeprecationWarning): 116 q = asyncio.Queue(loop=self.loop) 117 for i in [1, 3, 2]: 118 q.put_nowait(i) 119 120 items = [q.get_nowait() for _ in range(3)] 121 self.assertEqual([1, 3, 2], items) 122 123 def test_maxsize(self): 124 125 def gen(): 126 when = yield 127 self.assertAlmostEqual(0.01, when) 128 when = yield 0.01 129 self.assertAlmostEqual(0.02, when) 130 yield 0.01 131 132 loop = self.new_test_loop(gen) 133 134 with self.assertWarns(DeprecationWarning): 135 q = asyncio.Queue(maxsize=2, loop=loop) 136 self.assertEqual(2, q.maxsize) 137 have_been_put = [] 138 139 async def putter(): 140 for i in range(3): 141 await q.put(i) 142 have_been_put.append(i) 143 return True 144 145 async def test(): 146 t = loop.create_task(putter()) 147 await asyncio.sleep(0.01) 148 149 # The putter is blocked after putting two items. 150 self.assertEqual([0, 1], have_been_put) 151 self.assertEqual(0, q.get_nowait()) 152 153 # Let the putter resume and put last item. 154 await asyncio.sleep(0.01) 155 self.assertEqual([0, 1, 2], have_been_put) 156 self.assertEqual(1, q.get_nowait()) 157 self.assertEqual(2, q.get_nowait()) 158 159 self.assertTrue(t.done()) 160 self.assertTrue(t.result()) 161 162 loop.run_until_complete(test()) 163 self.assertAlmostEqual(0.02, loop.time()) 164 165 166class QueueGetTests(_QueueTestBase): 167 168 def test_blocking_get(self): 169 with self.assertWarns(DeprecationWarning): 170 q = asyncio.Queue(loop=self.loop) 171 q.put_nowait(1) 172 173 async def queue_get(): 174 return await q.get() 175 176 res = self.loop.run_until_complete(queue_get()) 177 self.assertEqual(1, res) 178 179 def test_get_with_putters(self): 180 with self.assertWarns(DeprecationWarning): 181 q = asyncio.Queue(1, loop=self.loop) 182 q.put_nowait(1) 183 184 waiter = self.loop.create_future() 185 q._putters.append(waiter) 186 187 res = self.loop.run_until_complete(q.get()) 188 self.assertEqual(1, res) 189 self.assertTrue(waiter.done()) 190 self.assertIsNone(waiter.result()) 191 192 def test_blocking_get_wait(self): 193 194 def gen(): 195 when = yield 196 self.assertAlmostEqual(0.01, when) 197 yield 0.01 198 199 loop = self.new_test_loop(gen) 200 201 with self.assertWarns(DeprecationWarning): 202 q = asyncio.Queue(loop=loop) 203 started = asyncio.Event(loop=loop) 204 finished = False 205 206 async def queue_get(): 207 nonlocal finished 208 started.set() 209 res = await q.get() 210 finished = True 211 return res 212 213 async def queue_put(): 214 loop.call_later(0.01, q.put_nowait, 1) 215 queue_get_task = loop.create_task(queue_get()) 216 await started.wait() 217 self.assertFalse(finished) 218 res = await queue_get_task 219 self.assertTrue(finished) 220 return res 221 222 res = loop.run_until_complete(queue_put()) 223 self.assertEqual(1, res) 224 self.assertAlmostEqual(0.01, loop.time()) 225 226 def test_nonblocking_get(self): 227 with self.assertWarns(DeprecationWarning): 228 q = asyncio.Queue(loop=self.loop) 229 q.put_nowait(1) 230 self.assertEqual(1, q.get_nowait()) 231 232 def test_nonblocking_get_exception(self): 233 with self.assertWarns(DeprecationWarning): 234 q = asyncio.Queue(loop=self.loop) 235 self.assertRaises(asyncio.QueueEmpty, q.get_nowait) 236 237 def test_get_cancelled(self): 238 239 def gen(): 240 when = yield 241 self.assertAlmostEqual(0.01, when) 242 when = yield 0.01 243 self.assertAlmostEqual(0.061, when) 244 yield 0.05 245 246 loop = self.new_test_loop(gen) 247 248 with self.assertWarns(DeprecationWarning): 249 q = asyncio.Queue(loop=loop) 250 251 async def queue_get(): 252 return await asyncio.wait_for(q.get(), 0.051) 253 254 async def test(): 255 get_task = loop.create_task(queue_get()) 256 await asyncio.sleep(0.01) # let the task start 257 q.put_nowait(1) 258 return await get_task 259 260 self.assertEqual(1, loop.run_until_complete(test())) 261 self.assertAlmostEqual(0.06, loop.time()) 262 263 def test_get_cancelled_race(self): 264 with self.assertWarns(DeprecationWarning): 265 q = asyncio.Queue(loop=self.loop) 266 267 t1 = self.loop.create_task(q.get()) 268 t2 = self.loop.create_task(q.get()) 269 270 test_utils.run_briefly(self.loop) 271 t1.cancel() 272 test_utils.run_briefly(self.loop) 273 self.assertTrue(t1.done()) 274 q.put_nowait('a') 275 test_utils.run_briefly(self.loop) 276 self.assertEqual(t2.result(), 'a') 277 278 def test_get_with_waiting_putters(self): 279 with self.assertWarns(DeprecationWarning): 280 q = asyncio.Queue(loop=self.loop, maxsize=1) 281 self.loop.create_task(q.put('a')) 282 self.loop.create_task(q.put('b')) 283 test_utils.run_briefly(self.loop) 284 self.assertEqual(self.loop.run_until_complete(q.get()), 'a') 285 self.assertEqual(self.loop.run_until_complete(q.get()), 'b') 286 287 def test_why_are_getters_waiting(self): 288 # From issue #268. 289 290 async def consumer(queue, num_expected): 291 for _ in range(num_expected): 292 await queue.get() 293 294 async def producer(queue, num_items): 295 for i in range(num_items): 296 await queue.put(i) 297 298 queue_size = 1 299 producer_num_items = 5 300 301 with self.assertWarns(DeprecationWarning): 302 q = asyncio.Queue(queue_size, loop=self.loop) 303 304 self.loop.run_until_complete( 305 asyncio.gather(producer(q, producer_num_items), 306 consumer(q, producer_num_items), 307 loop=self.loop), 308 ) 309 310 def test_cancelled_getters_not_being_held_in_self_getters(self): 311 def a_generator(): 312 yield 0.1 313 yield 0.2 314 315 self.loop = self.new_test_loop(a_generator) 316 317 async def consumer(queue): 318 try: 319 item = await asyncio.wait_for(queue.get(), 0.1) 320 except asyncio.TimeoutError: 321 pass 322 323 with self.assertWarns(DeprecationWarning): 324 queue = asyncio.Queue(loop=self.loop, maxsize=5) 325 self.loop.run_until_complete(self.loop.create_task(consumer(queue))) 326 self.assertEqual(len(queue._getters), 0) 327 328 329class QueuePutTests(_QueueTestBase): 330 331 def test_blocking_put(self): 332 with self.assertWarns(DeprecationWarning): 333 q = asyncio.Queue(loop=self.loop) 334 335 async def queue_put(): 336 # No maxsize, won't block. 337 await q.put(1) 338 339 self.loop.run_until_complete(queue_put()) 340 341 def test_blocking_put_wait(self): 342 343 def gen(): 344 when = yield 345 self.assertAlmostEqual(0.01, when) 346 yield 0.01 347 348 loop = self.new_test_loop(gen) 349 350 with self.assertWarns(DeprecationWarning): 351 q = asyncio.Queue(maxsize=1, loop=loop) 352 started = asyncio.Event(loop=loop) 353 finished = False 354 355 async def queue_put(): 356 nonlocal finished 357 started.set() 358 await q.put(1) 359 await q.put(2) 360 finished = True 361 362 async def queue_get(): 363 loop.call_later(0.01, q.get_nowait) 364 queue_put_task = loop.create_task(queue_put()) 365 await started.wait() 366 self.assertFalse(finished) 367 await queue_put_task 368 self.assertTrue(finished) 369 370 loop.run_until_complete(queue_get()) 371 self.assertAlmostEqual(0.01, loop.time()) 372 373 def test_nonblocking_put(self): 374 with self.assertWarns(DeprecationWarning): 375 q = asyncio.Queue(loop=self.loop) 376 q.put_nowait(1) 377 self.assertEqual(1, q.get_nowait()) 378 379 def test_get_cancel_drop_one_pending_reader(self): 380 def gen(): 381 yield 0.01 382 yield 0.1 383 384 loop = self.new_test_loop(gen) 385 386 with self.assertWarns(DeprecationWarning): 387 q = asyncio.Queue(loop=loop) 388 389 reader = loop.create_task(q.get()) 390 391 loop.run_until_complete(asyncio.sleep(0.01)) 392 393 q.put_nowait(1) 394 q.put_nowait(2) 395 reader.cancel() 396 397 try: 398 loop.run_until_complete(reader) 399 except asyncio.CancelledError: 400 # try again 401 reader = loop.create_task(q.get()) 402 loop.run_until_complete(reader) 403 404 result = reader.result() 405 # if we get 2, it means 1 got dropped! 406 self.assertEqual(1, result) 407 408 def test_get_cancel_drop_many_pending_readers(self): 409 def gen(): 410 yield 0.01 411 yield 0.1 412 413 loop = self.new_test_loop(gen) 414 loop.set_debug(True) 415 416 with self.assertWarns(DeprecationWarning): 417 q = asyncio.Queue(loop=loop) 418 419 reader1 = loop.create_task(q.get()) 420 reader2 = loop.create_task(q.get()) 421 reader3 = loop.create_task(q.get()) 422 423 loop.run_until_complete(asyncio.sleep(0.01)) 424 425 q.put_nowait(1) 426 q.put_nowait(2) 427 reader1.cancel() 428 429 try: 430 loop.run_until_complete(reader1) 431 except asyncio.CancelledError: 432 pass 433 434 loop.run_until_complete(reader3) 435 436 # It is undefined in which order concurrent readers receive results. 437 self.assertEqual({reader2.result(), reader3.result()}, {1, 2}) 438 439 def test_put_cancel_drop(self): 440 441 def gen(): 442 yield 0.01 443 yield 0.1 444 445 loop = self.new_test_loop(gen) 446 447 with self.assertWarns(DeprecationWarning): 448 q = asyncio.Queue(1, loop=loop) 449 450 q.put_nowait(1) 451 452 # putting a second item in the queue has to block (qsize=1) 453 writer = loop.create_task(q.put(2)) 454 loop.run_until_complete(asyncio.sleep(0.01)) 455 456 value1 = q.get_nowait() 457 self.assertEqual(value1, 1) 458 459 writer.cancel() 460 try: 461 loop.run_until_complete(writer) 462 except asyncio.CancelledError: 463 # try again 464 writer = loop.create_task(q.put(2)) 465 loop.run_until_complete(writer) 466 467 value2 = q.get_nowait() 468 self.assertEqual(value2, 2) 469 self.assertEqual(q.qsize(), 0) 470 471 def test_nonblocking_put_exception(self): 472 with self.assertWarns(DeprecationWarning): 473 q = asyncio.Queue(maxsize=1, loop=self.loop) 474 q.put_nowait(1) 475 self.assertRaises(asyncio.QueueFull, q.put_nowait, 2) 476 477 def test_float_maxsize(self): 478 with self.assertWarns(DeprecationWarning): 479 q = asyncio.Queue(maxsize=1.3, loop=self.loop) 480 q.put_nowait(1) 481 q.put_nowait(2) 482 self.assertTrue(q.full()) 483 self.assertRaises(asyncio.QueueFull, q.put_nowait, 3) 484 485 with self.assertWarns(DeprecationWarning): 486 q = asyncio.Queue(maxsize=1.3, loop=self.loop) 487 488 async def queue_put(): 489 await q.put(1) 490 await q.put(2) 491 self.assertTrue(q.full()) 492 self.loop.run_until_complete(queue_put()) 493 494 def test_put_cancelled(self): 495 with self.assertWarns(DeprecationWarning): 496 q = asyncio.Queue(loop=self.loop) 497 498 async def queue_put(): 499 await q.put(1) 500 return True 501 502 async def test(): 503 return await q.get() 504 505 t = self.loop.create_task(queue_put()) 506 self.assertEqual(1, self.loop.run_until_complete(test())) 507 self.assertTrue(t.done()) 508 self.assertTrue(t.result()) 509 510 def test_put_cancelled_race(self): 511 with self.assertWarns(DeprecationWarning): 512 q = asyncio.Queue(loop=self.loop, maxsize=1) 513 514 put_a = self.loop.create_task(q.put('a')) 515 put_b = self.loop.create_task(q.put('b')) 516 put_c = self.loop.create_task(q.put('X')) 517 518 test_utils.run_briefly(self.loop) 519 self.assertTrue(put_a.done()) 520 self.assertFalse(put_b.done()) 521 522 put_c.cancel() 523 test_utils.run_briefly(self.loop) 524 self.assertTrue(put_c.done()) 525 self.assertEqual(q.get_nowait(), 'a') 526 test_utils.run_briefly(self.loop) 527 self.assertEqual(q.get_nowait(), 'b') 528 529 self.loop.run_until_complete(put_b) 530 531 def test_put_with_waiting_getters(self): 532 with self.assertWarns(DeprecationWarning): 533 q = asyncio.Queue(loop=self.loop) 534 t = self.loop.create_task(q.get()) 535 test_utils.run_briefly(self.loop) 536 self.loop.run_until_complete(q.put('a')) 537 self.assertEqual(self.loop.run_until_complete(t), 'a') 538 539 def test_why_are_putters_waiting(self): 540 # From issue #265. 541 542 with self.assertWarns(DeprecationWarning): 543 queue = asyncio.Queue(2, loop=self.loop) 544 545 async def putter(item): 546 await queue.put(item) 547 548 async def getter(): 549 await asyncio.sleep(0) 550 num = queue.qsize() 551 for _ in range(num): 552 item = queue.get_nowait() 553 554 t0 = putter(0) 555 t1 = putter(1) 556 t2 = putter(2) 557 t3 = putter(3) 558 self.loop.run_until_complete( 559 asyncio.gather(getter(), t0, t1, t2, t3, loop=self.loop)) 560 561 def test_cancelled_puts_not_being_held_in_self_putters(self): 562 def a_generator(): 563 yield 0.01 564 yield 0.1 565 566 loop = self.new_test_loop(a_generator) 567 568 # Full queue. 569 with self.assertWarns(DeprecationWarning): 570 queue = asyncio.Queue(loop=loop, maxsize=1) 571 queue.put_nowait(1) 572 573 # Task waiting for space to put an item in the queue. 574 put_task = loop.create_task(queue.put(1)) 575 loop.run_until_complete(asyncio.sleep(0.01)) 576 577 # Check that the putter is correctly removed from queue._putters when 578 # the task is canceled. 579 self.assertEqual(len(queue._putters), 1) 580 put_task.cancel() 581 with self.assertRaises(asyncio.CancelledError): 582 loop.run_until_complete(put_task) 583 self.assertEqual(len(queue._putters), 0) 584 585 def test_cancelled_put_silence_value_error_exception(self): 586 def gen(): 587 yield 0.01 588 yield 0.1 589 590 loop = self.new_test_loop(gen) 591 592 # Full Queue. 593 with self.assertWarns(DeprecationWarning): 594 queue = asyncio.Queue(1, loop=loop) 595 queue.put_nowait(1) 596 597 # Task waiting for space to put a item in the queue. 598 put_task = loop.create_task(queue.put(1)) 599 loop.run_until_complete(asyncio.sleep(0.01)) 600 601 # get_nowait() remove the future of put_task from queue._putters. 602 queue.get_nowait() 603 # When canceled, queue.put is going to remove its future from 604 # self._putters but it was removed previously by queue.get_nowait(). 605 put_task.cancel() 606 607 # The ValueError exception triggered by queue._putters.remove(putter) 608 # inside queue.put should be silenced. 609 # If the ValueError is silenced we should catch a CancelledError. 610 with self.assertRaises(asyncio.CancelledError): 611 loop.run_until_complete(put_task) 612 613 614class LifoQueueTests(_QueueTestBase): 615 616 def test_order(self): 617 with self.assertWarns(DeprecationWarning): 618 q = asyncio.LifoQueue(loop=self.loop) 619 for i in [1, 3, 2]: 620 q.put_nowait(i) 621 622 items = [q.get_nowait() for _ in range(3)] 623 self.assertEqual([2, 3, 1], items) 624 625 626class PriorityQueueTests(_QueueTestBase): 627 628 def test_order(self): 629 with self.assertWarns(DeprecationWarning): 630 q = asyncio.PriorityQueue(loop=self.loop) 631 for i in [1, 3, 2]: 632 q.put_nowait(i) 633 634 items = [q.get_nowait() for _ in range(3)] 635 self.assertEqual([1, 2, 3], items) 636 637 638class _QueueJoinTestMixin: 639 640 q_class = None 641 642 def test_task_done_underflow(self): 643 with self.assertWarns(DeprecationWarning): 644 q = self.q_class(loop=self.loop) 645 self.assertRaises(ValueError, q.task_done) 646 647 def test_task_done(self): 648 with self.assertWarns(DeprecationWarning): 649 q = self.q_class(loop=self.loop) 650 for i in range(100): 651 q.put_nowait(i) 652 653 accumulator = 0 654 655 # Two workers get items from the queue and call task_done after each. 656 # Join the queue and assert all items have been processed. 657 running = True 658 659 async def worker(): 660 nonlocal accumulator 661 662 while running: 663 item = await q.get() 664 accumulator += item 665 q.task_done() 666 667 async def test(): 668 tasks = [self.loop.create_task(worker()) 669 for index in range(2)] 670 671 await q.join() 672 return tasks 673 674 tasks = self.loop.run_until_complete(test()) 675 self.assertEqual(sum(range(100)), accumulator) 676 677 # close running generators 678 running = False 679 for i in range(len(tasks)): 680 q.put_nowait(0) 681 self.loop.run_until_complete(asyncio.wait(tasks)) 682 683 def test_join_empty_queue(self): 684 with self.assertWarns(DeprecationWarning): 685 q = self.q_class(loop=self.loop) 686 687 # Test that a queue join()s successfully, and before anything else 688 # (done twice for insurance). 689 690 async def join(): 691 await q.join() 692 await q.join() 693 694 self.loop.run_until_complete(join()) 695 696 def test_format(self): 697 with self.assertWarns(DeprecationWarning): 698 q = self.q_class(loop=self.loop) 699 self.assertEqual(q._format(), 'maxsize=0') 700 701 q._unfinished_tasks = 2 702 self.assertEqual(q._format(), 'maxsize=0 tasks=2') 703 704 705class QueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): 706 q_class = asyncio.Queue 707 708 709class LifoQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): 710 q_class = asyncio.LifoQueue 711 712 713class PriorityQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): 714 q_class = asyncio.PriorityQueue 715 716 717if __name__ == '__main__': 718 unittest.main() 719