.. currentmodule:: asyncio

.. _asyncio-queues:

======
Queues
======

**Source code:** :source:`Lib/asyncio/queues.py`

------------------------------------------------

asyncio queues are designed to be similar to the classes of the
:mod:`queue` module.  Unlike those classes, asyncio queues are not
thread-safe; they are designed specifically for use in async/await code.

Note that methods of asyncio queues don't have a *timeout* parameter;
use the :func:`asyncio.wait_for` function to perform queue operations
with a timeout.

See also the `Examples`_ section below.

Queue
=====

.. class:: Queue(maxsize=0)

   A first in, first out (FIFO) queue.

   If *maxsize* is less than or equal to zero, the queue size is
   infinite.  If it is an integer greater than ``0``, then
   ``await put()`` blocks when the queue reaches *maxsize*
   until an item is removed by :meth:`get`.

   Unlike the standard library threading :mod:`queue`, the size of
   the queue is always known and can be returned by calling the
   :meth:`qsize` method.

   .. versionchanged:: 3.10
      Removed the *loop* parameter.


   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: maxsize

      Number of items allowed in the queue.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise.

   .. method:: full()

      Return ``True`` if there are :attr:`maxsize` items in the queue.

      If the queue was initialized with ``maxsize=0`` (the default),
      then :meth:`full` never returns ``True``.

   .. coroutinemethod:: get()

      Remove and return an item from the queue. If the queue is empty,
      wait until an item is available.

      Raises :exc:`QueueShutDown` if the queue has been shut down and
      is empty, or if the queue has been shut down immediately.

   .. method:: get_nowait()

      Return an item if one is immediately available, else raise
      :exc:`QueueEmpty`.

   .. coroutinemethod:: join()

      Block until all items in the queue have been received and processed.

      The count of unfinished tasks goes up whenever an item is added
      to the queue. The count goes down whenever a consumer coroutine calls
      :meth:`task_done` to indicate that the item was retrieved and all
      work on it is complete.  When the count of unfinished tasks drops
      to zero, :meth:`join` unblocks.

   .. coroutinemethod:: put(item)

      Put an item into the queue. If the queue is full, wait until a
      free slot is available before adding the item.

      Raises :exc:`QueueShutDown` if the queue has been shut down.

   .. method:: put_nowait(item)

      Put an item into the queue without blocking.

      If no free slot is immediately available, raise :exc:`QueueFull`.

   .. method:: qsize()

      Return the number of items in the queue.

   .. method:: shutdown(immediate=False)

      Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put`
      raise :exc:`QueueShutDown`.

      By default, :meth:`~Queue.get` on a shut down queue will only
      raise once the queue is empty. Set *immediate* to true to make
      :meth:`~Queue.get` raise immediately instead.

      All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get`
      will be unblocked. If *immediate* is true, a task will be marked
      as done for each remaining item in the queue, which may unblock
      callers of :meth:`~Queue.join`.

      .. versionadded:: 3.13

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete.

      Used by queue consumers. For each :meth:`~Queue.get` used to
      fetch a task, a subsequent call to :meth:`task_done` tells the
      queue that the processing on the task is complete.

      If a :meth:`join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done`
      call was received for every item that had been :meth:`~Queue.put`
      into the queue).

      ``shutdown(immediate=True)`` calls :meth:`task_done` for each
      remaining item in the queue.

      Raises :exc:`ValueError` if called more times than there were
      items placed in the queue.


Priority Queue
==============

.. class:: PriorityQueue

   A variant of :class:`Queue`; retrieves entries in priority order
   (lowest first).

   Entries are typically tuples of the form
   ``(priority_number, data)``.


LIFO Queue
==========

.. class:: LifoQueue

   A variant of :class:`Queue` that retrieves most recently added
   entries first (last in, first out).


Exceptions
==========

.. exception:: QueueEmpty

   This exception is raised when the :meth:`~Queue.get_nowait` method
   is called on an empty queue.


.. exception:: QueueFull

   Exception raised when the :meth:`~Queue.put_nowait` method is called
   on a queue that has reached its *maxsize*.


.. exception:: QueueShutDown

   Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is
   called on a queue which has been shut down.

   .. versionadded:: 3.13


Examples
========

.. _asyncio_example_queue_dist:

Queues can be used to distribute workload between several
concurrent tasks::

   import asyncio
   import random
   import time


   async def worker(name, queue):
       while True:
           # Get a "work item" out of the queue.
           sleep_for = await queue.get()

           # Sleep for "sleep_for" seconds.
           await asyncio.sleep(sleep_for)

           # Notify the queue that the "work item" has been processed.
           queue.task_done()

           print(f'{name} has slept for {sleep_for:.2f} seconds')


   async def main():
       # Create a queue that we will use to store our "workload".
       queue = asyncio.Queue()

       # Generate random timings and put them into the queue.
       total_sleep_time = 0
       for _ in range(20):
           sleep_for = random.uniform(0.05, 1.0)
           total_sleep_time += sleep_for
           queue.put_nowait(sleep_for)

       # Create three worker tasks to process the queue concurrently.
       tasks = []
       for i in range(3):
           task = asyncio.create_task(worker(f'worker-{i}', queue))
           tasks.append(task)

       # Wait until the queue is fully processed.
       started_at = time.monotonic()
       await queue.join()
       total_slept_for = time.monotonic() - started_at

       # Cancel our worker tasks.
       for task in tasks:
           task.cancel()
       # Wait until all worker tasks are cancelled.
       await asyncio.gather(*tasks, return_exceptions=True)

       print('====')
       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


   asyncio.run(main())
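
Because queue methods have no *timeout* parameter, a bounded wait can be
built by wrapping the call in :func:`asyncio.wait_for`. The following is a
minimal sketch rather than part of the asyncio API: ``get_with_timeout`` is a
hypothetical helper name, and returning ``None`` on timeout is an assumption
made only for this illustration.

```python
import asyncio


async def get_with_timeout(queue, timeout):
    """Hypothetical helper: return the next item, or None on timeout."""
    try:
        # wait_for() cancels the pending get() if the timeout expires.
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        # Assumption for this sketch: signal "no item" with None.
        return None


async def main():
    queue = asyncio.Queue()
    queue.put_nowait('job-1')

    # The first call finds an item immediately.
    first = await get_with_timeout(queue, 0.1)
    # The queue is now empty, so the second call times out.
    second = await get_with_timeout(queue, 0.1)
    return first, second


print(asyncio.run(main()))
```

Since ``None`` could also be a legitimate queue item, real code might prefer
to let the timeout exception propagate or to return a dedicated sentinel
object instead.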