"""Tests for queues.py"""

import unittest
from unittest import mock

import asyncio
from test.test_asyncio import utils as test_utils


def tearDownModule():
    """Reset the asyncio event loop policy after this module's tests."""
    asyncio.set_event_loop_policy(None)


class _QueueTestBase(test_utils.TestCase):
    """Shared fixture: stores a loop from new_test_loop() in self.loop."""

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()


class QueueBasicTests(_QueueTestBase):
    """Tests for repr()/str(), empty(), full(), ordering and maxsize."""

    def _test_repr_or_str(self, fn, expect_id):
        """Test Queue's repr or str.

        fn is repr or str. expect_id is True if we expect the Queue's id to
        appear in fn(Queue()).
        """
        def gen():
            # Timer script handed to new_test_loop(); it checks every
            # deadline it is sent.
            # NOTE(review): the yielded numbers are presumably the clock
            # advances consumed by the test loop -- confirm in test_utils.
            when = yield
            self.assertAlmostEqual(0.1, when)
            when = yield 0.1
            self.assertAlmostEqual(0.2, when)
            yield 0.1

        loop = self.new_test_loop(gen)

        q = asyncio.Queue()
        self.assertTrue(fn(q).startswith('<Queue'), fn(q))
        id_is_present = hex(id(q)) in fn(q)
        self.assertEqual(expect_id, id_is_present)

        async def add_getter():
            # Uses its own queue (shadows the outer q on purpose).
            q = asyncio.Queue()
            # Start a task that waits to get.
            loop.create_task(q.get())
            # Let it start waiting.
            await asyncio.sleep(0.1)
            self.assertIn('_getters[1]', fn(q))
            # resume q.get coroutine to finish generator
            q.put_nowait(0)

        loop.run_until_complete(add_getter())

        async def add_putter():
            q = asyncio.Queue(maxsize=1)
            q.put_nowait(1)
            # Start a task that waits to put.
            loop.create_task(q.put(2))
            # Let it start waiting.
            await asyncio.sleep(0.1)
            self.assertIn('_putters[1]', fn(q))
            # resume q.put coroutine to finish generator
            q.get_nowait()

        loop.run_until_complete(add_putter())
        q = asyncio.Queue()
        q.put_nowait(1)
        self.assertIn('_queue=[1]', fn(q))

    def test_repr(self):
        # repr() is expected to contain the queue's id.
        self._test_repr_or_str(repr, True)

    def test_str(self):
        # str() is expected not to contain the queue's id.
        self._test_repr_or_str(str, False)

    def test_empty(self):
        # empty() tracks whether any item is currently queued.
        q = asyncio.Queue()
        self.assertTrue(q.empty())
        q.put_nowait(1)
        self.assertFalse(q.empty())
        self.assertEqual(1, q.get_nowait())
        self.assertTrue(q.empty())

    def test_full(self):
        # A queue created without maxsize reports not full.
        q = asyncio.Queue()
        self.assertFalse(q.full())

        # A maxsize=1 queue is full after a single item.
        q = asyncio.Queue(maxsize=1)
        q.put_nowait(1)
        self.assertTrue(q.full())

    def test_order(self):
        # Items come back in insertion order.
        q = asyncio.Queue()
        for i in [1, 3, 2]:
            q.put_nowait(i)

        items = [q.get_nowait() for _ in range(3)]
        self.assertEqual([1, 3, 2], items)

    def test_maxsize(self):
        # A putter blocks once maxsize items are queued and resumes after
        # one item has been taken out.

        def gen():
            when = yield
            self.assertAlmostEqual(0.01, when)
            when = yield 0.01
            self.assertAlmostEqual(0.02, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        q = asyncio.Queue(maxsize=2)
        self.assertEqual(2, q.maxsize)
        have_been_put = []

        async def putter():
            for i in range(3):
                await q.put(i)
                have_been_put.append(i)
            return True

        async def test():
            t = loop.create_task(putter())
            await asyncio.sleep(0.01)

            # The putter is blocked after putting two items.
            self.assertEqual([0, 1], have_been_put)
            self.assertEqual(0, q.get_nowait())

            # Let the putter resume and put last item.
            await asyncio.sleep(0.01)
            self.assertEqual([0, 1, 2], have_been_put)
            self.assertEqual(1, q.get_nowait())
            self.assertEqual(2, q.get_nowait())

            self.assertTrue(t.done())
            self.assertTrue(t.result())

        loop.run_until_complete(test())
        self.assertAlmostEqual(0.02, loop.time())


class QueueGetTests(_QueueTestBase):
    """Tests for Queue.get() and Queue.get_nowait()."""

    def test_blocking_get(self):
        # get() returns an item that is already queued.
        q = asyncio.Queue()
        q.put_nowait(1)

        async def queue_get():
            return await q.get()

        res = self.loop.run_until_complete(queue_get())
        self.assertEqual(1, res)

    def test_get_with_putters(self):
        # get() completes a waiter that was registered in q._putters.
        q = asyncio.Queue(1)
        q.put_nowait(1)

        waiter = self.loop.create_future()
        q._putters.append(waiter)

        res = self.loop.run_until_complete(q.get())
        self.assertEqual(1, res)
        self.assertTrue(waiter.done())
        self.assertIsNone(waiter.result())

    def test_blocking_get_wait(self):
        # get() on an empty queue waits until a timer puts an item.

        def gen():
            when = yield
            self.assertAlmostEqual(0.01, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        q = asyncio.Queue()
        started = asyncio.Event()
        finished = False

        async def queue_get():
            nonlocal finished
            started.set()
            res = await q.get()
            finished = True
            return res

        async def queue_put():
            # The item only arrives 0.01 later, so the getter must wait.
            loop.call_later(0.01, q.put_nowait, 1)
            queue_get_task = loop.create_task(queue_get())
            await started.wait()
            self.assertFalse(finished)
            res = await queue_get_task
            self.assertTrue(finished)
            return res

        res = loop.run_until_complete(queue_put())
        self.assertEqual(1, res)
        self.assertAlmostEqual(0.01, loop.time())

    def test_nonblocking_get(self):
        q = asyncio.Queue()
        q.put_nowait(1)
        self.assertEqual(1, q.get_nowait())

    def test_nonblocking_get_exception(self):
        # get_nowait() on an empty queue raises QueueEmpty.
        q = asyncio.Queue()
        self.assertRaises(asyncio.QueueEmpty, q.get_nowait)

    def test_get_cancelled(self):
        # get() wrapped in wait_for(..., 0.051) still returns the item that
        # is put after 0.01.

        def gen():
            when = yield
            self.assertAlmostEqual(0.01, when)
            when = yield 0.01
            self.assertAlmostEqual(0.061, when)
            yield 0.05

        loop = self.new_test_loop(gen)

        q = asyncio.Queue()

        async def queue_get():
            return await asyncio.wait_for(q.get(), 0.051)

        async def test():
            get_task = loop.create_task(queue_get())
            await asyncio.sleep(0.01)  # let the task start
            q.put_nowait(1)
            return await get_task

        self.assertEqual(1, loop.run_until_complete(test()))
        self.assertAlmostEqual(0.06, loop.time())

    def test_get_cancelled_race(self):
        # Cancelling the first of two waiting getters must not lose the
        # item: the second getter receives it.
        q = asyncio.Queue()

        t1 = self.loop.create_task(q.get())
        t2 = self.loop.create_task(q.get())

        test_utils.run_briefly(self.loop)
        t1.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(t1.done())
        q.put_nowait('a')
        test_utils.run_briefly(self.loop)
        self.assertEqual(t2.result(), 'a')

    def test_get_with_waiting_putters(self):
        # Two puts on a maxsize=1 queue are drained in order by get().
        q = asyncio.Queue(maxsize=1)
        self.loop.create_task(q.put('a'))
        self.loop.create_task(q.put('b'))
        test_utils.run_briefly(self.loop)
        self.assertEqual(self.loop.run_until_complete(q.get()), 'a')
        self.assertEqual(self.loop.run_until_complete(q.get()), 'b')

    def test_why_are_getters_waiting(self):
        # From issue #268.
        asyncio.set_event_loop(self.loop)

        async def consumer(queue, num_expected):
            for _ in range(num_expected):
                await queue.get()

        async def producer(queue, num_items):
            for i in range(num_items):
                await queue.put(i)

        queue_size = 1
        producer_num_items = 5

        async def create_queue():
            queue = asyncio.Queue(queue_size)
            queue._get_loop()
            return queue

        async def test():
            q = await create_queue()
            await asyncio.gather(producer(q, producer_num_items),
                                 consumer(q, producer_num_items))

        self.loop.run_until_complete(test())

    def test_cancelled_getters_not_being_held_in_self_getters(self):
        # After a get() has timed out, queue._getters must be empty.
        def a_generator():
            yield 0.1
            yield 0.2

        self.loop = self.new_test_loop(a_generator)

        async def consumer(queue):
            try:
                # The result is irrelevant; only the timeout path matters.
                await asyncio.wait_for(queue.get(), 0.1)
            except asyncio.TimeoutError:
                pass

        queue = asyncio.Queue(maxsize=5)
        self.loop.run_until_complete(self.loop.create_task(consumer(queue)))
        self.assertEqual(len(queue._getters), 0)


class QueuePutTests(_QueueTestBase):
    """Tests for Queue.put() and Queue.put_nowait()."""

    def test_blocking_put(self):
        q = asyncio.Queue()

        async def queue_put():
            # No maxsize, won't block.
            await q.put(1)

        self.loop.run_until_complete(queue_put())

    def test_blocking_put_wait(self):
        # The second put() on a maxsize=1 queue waits until a timer takes
        # an item out.

        def gen():
            when = yield
            self.assertAlmostEqual(0.01, when)
            yield 0.01

        loop = self.new_test_loop(gen)

        q = asyncio.Queue(maxsize=1)
        started = asyncio.Event()
        finished = False

        async def queue_put():
            nonlocal finished
            started.set()
            await q.put(1)
            await q.put(2)
            finished = True

        async def queue_get():
            # Space only becomes available 0.01 later.
            loop.call_later(0.01, q.get_nowait)
            queue_put_task = loop.create_task(queue_put())
            await started.wait()
            self.assertFalse(finished)
            await queue_put_task
            self.assertTrue(finished)

        loop.run_until_complete(queue_get())
        self.assertAlmostEqual(0.01, loop.time())

    def test_nonblocking_put(self):
        q = asyncio.Queue()
        q.put_nowait(1)
        self.assertEqual(1, q.get_nowait())

    def test_get_cancel_drop_one_pending_reader(self):
        # A reader cancelled after items arrived must not consume one.
        def gen():
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)

        q = asyncio.Queue()

        reader = loop.create_task(q.get())

        loop.run_until_complete(asyncio.sleep(0.01))

        q.put_nowait(1)
        q.put_nowait(2)
        reader.cancel()

        try:
            loop.run_until_complete(reader)
        except asyncio.CancelledError:
            # try again
            reader = loop.create_task(q.get())
            loop.run_until_complete(reader)

        result = reader.result()
        # if we get 2, it means 1 got dropped!
        self.assertEqual(1, result)

    def test_get_cancel_drop_many_pending_readers(self):
        # With three readers and two items, cancelling one reader must
        # leave both items for the other two.
        def gen():
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)
        loop.set_debug(True)

        q = asyncio.Queue()

        reader1 = loop.create_task(q.get())
        reader2 = loop.create_task(q.get())
        reader3 = loop.create_task(q.get())

        loop.run_until_complete(asyncio.sleep(0.01))

        q.put_nowait(1)
        q.put_nowait(2)
        reader1.cancel()

        try:
            loop.run_until_complete(reader1)
        except asyncio.CancelledError:
            pass

        loop.run_until_complete(reader3)

        # It is undefined in which order concurrent readers receive results.
        self.assertEqual({reader2.result(), reader3.result()}, {1, 2})

    def test_put_cancel_drop(self):
        # A cancelled put() must not leave its item in the queue.

        def gen():
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)

        q = asyncio.Queue(1)

        q.put_nowait(1)

        # putting a second item in the queue has to block (qsize=1)
        writer = loop.create_task(q.put(2))
        loop.run_until_complete(asyncio.sleep(0.01))

        value1 = q.get_nowait()
        self.assertEqual(value1, 1)

        writer.cancel()
        try:
            loop.run_until_complete(writer)
        except asyncio.CancelledError:
            # try again
            writer = loop.create_task(q.put(2))
            loop.run_until_complete(writer)

        value2 = q.get_nowait()
        self.assertEqual(value2, 2)
        self.assertEqual(q.qsize(), 0)

    def test_nonblocking_put_exception(self):
        # put_nowait() on a full queue raises QueueFull.
        q = asyncio.Queue(maxsize=1)
        q.put_nowait(1)
        self.assertRaises(asyncio.QueueFull, q.put_nowait, 2)

    def test_float_maxsize(self):
        # With maxsize=1.3 the queue accepts two items and is then full.
        q = asyncio.Queue(maxsize=1.3)
        q.put_nowait(1)
        q.put_nowait(2)
        self.assertTrue(q.full())
        self.assertRaises(asyncio.QueueFull, q.put_nowait, 3)

        q = asyncio.Queue(maxsize=1.3)

        async def queue_put():
            await q.put(1)
            await q.put(2)
            self.assertTrue(q.full())
        self.loop.run_until_complete(queue_put())

    def test_put_cancelled(self):
        # NOTE(review): nothing is cancelled here despite the name; the
        # test only checks that a put() task on a queue without maxsize
        # completes and that its item can be fetched.
        q = asyncio.Queue()

        async def queue_put():
            await q.put(1)
            return True

        async def test():
            return await q.get()

        t = self.loop.create_task(queue_put())
        self.assertEqual(1, self.loop.run_until_complete(test()))
        self.assertTrue(t.done())
        self.assertTrue(t.result())

    def test_put_cancelled_race(self):
        # Cancelling one of two blocked putters must not stop the other
        # from delivering its item.
        q = asyncio.Queue(maxsize=1)

        put_a = self.loop.create_task(q.put('a'))
        put_b = self.loop.create_task(q.put('b'))
        put_c = self.loop.create_task(q.put('X'))

        test_utils.run_briefly(self.loop)
        self.assertTrue(put_a.done())
        self.assertFalse(put_b.done())

        put_c.cancel()
        test_utils.run_briefly(self.loop)
        self.assertTrue(put_c.done())
        self.assertEqual(q.get_nowait(), 'a')
        test_utils.run_briefly(self.loop)
        self.assertEqual(q.get_nowait(), 'b')

        self.loop.run_until_complete(put_b)

    def test_put_with_waiting_getters(self):
        # put() hands its item to a getter that is already waiting.
        q = asyncio.Queue()
        t = self.loop.create_task(q.get())
        test_utils.run_briefly(self.loop)
        self.loop.run_until_complete(q.put('a'))
        self.assertEqual(self.loop.run_until_complete(t), 'a')

    def test_why_are_putters_waiting(self):
        # From issue #265.
        asyncio.set_event_loop(self.loop)

        async def create_queue():
            q = asyncio.Queue(2)
            q._get_loop()
            return q

        queue = self.loop.run_until_complete(create_queue())

        async def putter(item):
            await queue.put(item)

        async def getter():
            await asyncio.sleep(0)
            num = queue.qsize()
            # Drain whatever is queued at this point; values are unused.
            for _ in range(num):
                queue.get_nowait()

        async def test():
            t0 = putter(0)
            t1 = putter(1)
            t2 = putter(2)
            t3 = putter(3)
            await asyncio.gather(getter(), t0, t1, t2, t3)

        self.loop.run_until_complete(test())

    def test_cancelled_puts_not_being_held_in_self_putters(self):
        def a_generator():
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(a_generator)

        # Full queue.
        queue = asyncio.Queue(maxsize=1)
        queue.put_nowait(1)

        # Task waiting for space to put an item in the queue.
        put_task = loop.create_task(queue.put(1))
        loop.run_until_complete(asyncio.sleep(0.01))

        # Check that the putter is correctly removed from queue._putters when
        # the task is canceled.
        self.assertEqual(len(queue._putters), 1)
        put_task.cancel()
        with self.assertRaises(asyncio.CancelledError):
            loop.run_until_complete(put_task)
        self.assertEqual(len(queue._putters), 0)

    def test_cancelled_put_silence_value_error_exception(self):
        def gen():
            yield 0.01
            yield 0.1

        loop = self.new_test_loop(gen)

        # Full Queue.
        queue = asyncio.Queue(1)
        queue.put_nowait(1)

        # Task waiting for space to put a item in the queue.
        put_task = loop.create_task(queue.put(1))
        loop.run_until_complete(asyncio.sleep(0.01))

        # get_nowait() remove the future of put_task from queue._putters.
        queue.get_nowait()
        # When canceled, queue.put is going to remove its future from
        # self._putters but it was removed previously by queue.get_nowait().
        put_task.cancel()

        # The ValueError exception triggered by queue._putters.remove(putter)
        # inside queue.put should be silenced.
        # If the ValueError is silenced we should catch a CancelledError.
        with self.assertRaises(asyncio.CancelledError):
            loop.run_until_complete(put_task)


class LifoQueueTests(_QueueTestBase):
    """Ordering test for asyncio.LifoQueue."""

    def test_order(self):
        # Items come back in reverse insertion order.
        q = asyncio.LifoQueue()
        for i in [1, 3, 2]:
            q.put_nowait(i)

        items = [q.get_nowait() for _ in range(3)]
        self.assertEqual([2, 3, 1], items)


class PriorityQueueTests(_QueueTestBase):
    """Ordering test for asyncio.PriorityQueue."""

    def test_order(self):
        # Items come back in ascending order.
        q = asyncio.PriorityQueue()
        for i in [1, 3, 2]:
            q.put_nowait(i)

        items = [q.get_nowait() for _ in range(3)]
        self.assertEqual([1, 2, 3], items)


class _QueueJoinTestMixin:
    """task_done()/join() tests; concrete subclasses set q_class."""

    # Queue class under test, overridden by each concrete subclass.
    q_class = None

    def test_task_done_underflow(self):
        # task_done() without a matching get() raises ValueError.
        q = self.q_class()
        self.assertRaises(ValueError, q.task_done)

    def test_task_done(self):
        q = self.q_class()
        for i in range(100):
            q.put_nowait(i)

        accumulator = 0

        # Two workers get items from the queue and call task_done after each.
        # Join the queue and assert all items have been processed.
        running = True

        async def worker():
            nonlocal accumulator

            while running:
                item = await q.get()
                accumulator += item
                q.task_done()

        async def test():
            tasks = [self.loop.create_task(worker())
                     for _ in range(2)]

            await q.join()
            return tasks

        tasks = self.loop.run_until_complete(test())
        self.assertEqual(sum(range(100)), accumulator)

        # close running generators
        running = False
        # One wake-up item per worker so each blocked get() returns and the
        # worker sees running == False.
        for _ in tasks:
            q.put_nowait(0)
        self.loop.run_until_complete(asyncio.wait(tasks))

    def test_join_empty_queue(self):
        q = self.q_class()

        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).

        async def join():
            await q.join()
            await q.join()

        self.loop.run_until_complete(join())

    def test_format(self):
        # _format() mentions unfinished tasks only when there are some.
        q = self.q_class()
        self.assertEqual(q._format(), 'maxsize=0')

        q._unfinished_tasks = 2
        self.assertEqual(q._format(), 'maxsize=0 tasks=2')


class QueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
    q_class = asyncio.Queue


class LifoQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
    q_class = asyncio.LifoQueue


class PriorityQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase):
    q_class = asyncio.PriorityQueue


if __name__ == '__main__':
    unittest.main()
649 650 async def join(): 651 await q.join() 652 await q.join() 653 654 self.loop.run_until_complete(join()) 655 656 def test_format(self): 657 q = self.q_class() 658 self.assertEqual(q._format(), 'maxsize=0') 659 660 q._unfinished_tasks = 2 661 self.assertEqual(q._format(), 'maxsize=0 tasks=2') 662 663 664class QueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): 665 q_class = asyncio.Queue 666 667 668class LifoQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): 669 q_class = asyncio.LifoQueue 670 671 672class PriorityQueueJoinTests(_QueueJoinTestMixin, _QueueTestBase): 673 q_class = asyncio.PriorityQueue 674 675 676if __name__ == '__main__': 677 unittest.main() 678