.. currentmodule:: asyncio

.. _asyncio-queues:

======
Queues
======

**Source code:** :source:`Lib/asyncio/queues.py`

------------------------------------------------

asyncio queues are designed to be similar to classes of the
:mod:`queue` module.  Although asyncio queues are not thread-safe,
they are designed to be used specifically in async/await code.

Note that methods of asyncio queues don't have a *timeout* parameter;
use :func:`asyncio.wait_for` function to do queue operations with a
timeout.

See also the `Examples`_ section below.

Queue
=====

.. class:: Queue(maxsize=0, \*, loop=None)

   A first in, first out (FIFO) queue.

   If *maxsize* is less than or equal to zero, the queue size is
   infinite.  If it is an integer greater than ``0``, then
   ``await put()`` blocks when the queue reaches *maxsize*
   until an item is removed by :meth:`get`.

   Unlike the standard library threading :mod:`queue`, the size of
   the queue is always known and can be returned by calling the
   :meth:`qsize` method.

   .. deprecated-removed:: 3.8 3.10
      The *loop* parameter.


   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: maxsize

      Number of items allowed in the queue.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise.

   .. method:: full()

      Return ``True`` if there are :attr:`maxsize` items in the queue.

      If the queue was initialized with ``maxsize=0`` (the default),
      then :meth:`full()` never returns ``True``.

   .. coroutinemethod:: get()

      Remove and return an item from the queue. If queue is empty,
      wait until an item is available.

   .. method:: get_nowait()

      Return an item if one is immediately available, else raise
      :exc:`QueueEmpty`.

   .. coroutinemethod:: join()

      Block until all items in the queue have been received and processed.

      The count of unfinished tasks goes up whenever an item is added
      to the queue. The count goes down whenever a consumer coroutine calls
      :meth:`task_done` to indicate that the item was retrieved and all
      work on it is complete.  When the count of unfinished tasks drops
      to zero, :meth:`join` unblocks.

   .. coroutinemethod:: put(item)

      Put an item into the queue. If the queue is full, wait until a
      free slot is available before adding the item.

   .. method:: put_nowait(item)

      Put an item into the queue without blocking.

      If no free slot is immediately available, raise :exc:`QueueFull`.

   .. method:: qsize()

      Return the number of items in the queue.

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete.

      Used by queue consumers. For each :meth:`~Queue.get` used to
      fetch a task, a subsequent call to :meth:`task_done` tells the
      queue that the processing on the task is complete.

      If a :meth:`join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done`
      call was received for every item that had been :meth:`~Queue.put`
      into the queue).

      Raises :exc:`ValueError` if called more times than there were
      items placed in the queue.


Priority Queue
==============

.. class:: PriorityQueue

   A variant of :class:`Queue`; retrieves entries in priority order
   (lowest first).

   Entries are typically tuples of the form
   ``(priority_number, data)``.


LIFO Queue
==========

.. class:: LifoQueue

   A variant of :class:`Queue` that retrieves most recently added
   entries first (last in, first out).


Exceptions
==========

.. exception:: QueueEmpty

   This exception is raised when the :meth:`~Queue.get_nowait` method
   is called on an empty queue.


.. exception:: QueueFull

   Exception raised when the :meth:`~Queue.put_nowait` method is called
   on a queue that has reached its *maxsize*.


Examples
========

.. _asyncio_example_queue_dist:

Queues can be used to distribute workload between several
concurrent tasks::

   import asyncio
   import random
   import time


   async def worker(name, queue):
       while True:
           # Get a "work item" out of the queue.
           sleep_for = await queue.get()

           # Sleep for the "sleep_for" seconds.
           await asyncio.sleep(sleep_for)

           # Notify the queue that the "work item" has been processed.
           queue.task_done()

           print(f'{name} has slept for {sleep_for:.2f} seconds')


   async def main():
       # Create a queue that we will use to store our "workload".
       queue = asyncio.Queue()

       # Generate random timings and put them into the queue.
       total_sleep_time = 0
       for _ in range(20):
           sleep_for = random.uniform(0.05, 1.0)
           total_sleep_time += sleep_for
           queue.put_nowait(sleep_for)

       # Create three worker tasks to process the queue concurrently.
       tasks = []
       for i in range(3):
           task = asyncio.create_task(worker(f'worker-{i}', queue))
           tasks.append(task)

       # Wait until the queue is fully processed.
       started_at = time.monotonic()
       await queue.join()
       total_slept_for = time.monotonic() - started_at

       # Cancel our worker tasks.
       for task in tasks:
           task.cancel()
       # Wait until all worker tasks are cancelled.
       await asyncio.gather(*tasks, return_exceptions=True)

       print('====')
       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


   asyncio.run(main())
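
As a further minimal sketch, the following illustrates two points made
earlier on this page: a :class:`PriorityQueue` returns the entry with the
lowest priority number first, and a timeout can be applied to
:meth:`~Queue.get` by wrapping the call in :func:`asyncio.wait_for`.
The helper names ``drain_by_priority``, ``get_with_timeout``, and ``demo``
are hypothetical and exist only for this illustration; the 0.05 second
timeout is an arbitrary value.

```python
import asyncio


async def drain_by_priority():
    # Hypothetical helper: fill a PriorityQueue out of order, then read
    # the entries back. Entries are (priority_number, data) tuples.
    queue = asyncio.PriorityQueue()
    for entry in [(3, 'low'), (1, 'high'), (2, 'medium')]:
        queue.put_nowait(entry)

    ordered = []
    while not queue.empty():
        priority, data = await queue.get()
        ordered.append(data)
        # Mark the item as processed so that join() could return.
        queue.task_done()
    return ordered


async def get_with_timeout(queue, timeout):
    # Hypothetical helper: Queue.get() has no timeout parameter, so the
    # call is wrapped in asyncio.wait_for(). Returning None on timeout is
    # a choice made for this sketch only.
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    ordered = await drain_by_priority()
    # The queue below is empty, so get() would wait forever without
    # a timeout.
    missing = await get_with_timeout(asyncio.Queue(), 0.05)
    return ordered, missing


ordered, missing = asyncio.run(demo())
print(ordered)
print(missing)
```

Returning ``None`` on timeout is only suitable when ``None`` is never a
valid queue item; code that needs to distinguish the two cases may prefer
to let the timeout exception propagate to the caller.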