.. currentmodule:: asyncio

.. _asyncio-queues:

======
Queues
======

**Source code:** :source:`Lib/asyncio/queues.py`

------------------------------------------------

asyncio queues are designed to be similar to classes of the
:mod:`queue` module. Unlike those classes, asyncio queues are not
thread-safe; they are designed to be used specifically in
async/await code.

Note that methods of asyncio queues don't have a *timeout* parameter;
use the :func:`asyncio.wait_for` function to perform queue operations
with a timeout.

See also the `Examples`_ section below.

Queue
=====

.. class:: Queue(maxsize=0)

   A first in, first out (FIFO) queue.

   If *maxsize* is less than or equal to zero, the queue size is
   infinite. If it is an integer greater than ``0``, then
   ``await put()`` blocks when the queue reaches *maxsize*
   until an item is removed by :meth:`get`.

   Unlike the standard library threading :mod:`queue`, the size of
   the queue is always known and can be returned by calling the
   :meth:`qsize` method.

   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: maxsize

      Number of items allowed in the queue.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise.

   .. method:: full()

      Return ``True`` if there are :attr:`maxsize` items in the queue.

      If the queue was initialized with ``maxsize=0`` (the default),
      then :meth:`full` never returns ``True``.

   .. coroutinemethod:: get()

      Remove and return an item from the queue. If the queue is empty,
      wait until an item is available.

   .. method:: get_nowait()

      Return an item if one is immediately available, else raise
      :exc:`QueueEmpty`.

   .. coroutinemethod:: join()

      Block until all items in the queue have been received and processed.

      The count of unfinished tasks goes up whenever an item is added
      to the queue. The count goes down whenever a consumer coroutine calls
      :meth:`task_done` to indicate that the item was retrieved and all
      work on it is complete. When the count of unfinished tasks drops
      to zero, :meth:`join` unblocks.

   .. coroutinemethod:: put(item)

      Put an item into the queue. If the queue is full, wait until a
      free slot is available before adding the item.

   .. method:: put_nowait(item)

      Put an item into the queue without blocking.

      If no free slot is immediately available, raise :exc:`QueueFull`.

   .. method:: qsize()

      Return the number of items in the queue.

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete.

      Used by queue consumers. For each :meth:`~Queue.get` used to
      fetch a task, a subsequent call to :meth:`task_done` tells the
      queue that the processing on the task is complete.

      If a :meth:`join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done`
      call was received for every item that had been :meth:`~Queue.put`
      into the queue).

      Raises :exc:`ValueError` if called more times than there were
      items placed in the queue.

   .. deprecated-removed:: 3.8 3.10

      The ``loop`` parameter. This function has been implicitly getting the
      current running loop since 3.7. See
      :ref:`What's New in 3.10's Removed section <whatsnew310-removed>`
      for more information.


Priority Queue
==============

.. class:: PriorityQueue

   A variant of :class:`Queue`; retrieves entries in priority order
   (lowest first).

   Entries are typically tuples of the form
   ``(priority_number, data)``.


LIFO Queue
==========

.. class:: LifoQueue

   A variant of :class:`Queue` that retrieves most recently added
   entries first (last in, first out).


Exceptions
==========

.. exception:: QueueEmpty

   This exception is raised when the :meth:`~Queue.get_nowait` method
   is called on an empty queue.


.. exception:: QueueFull

   Exception raised when the :meth:`~Queue.put_nowait` method is called
   on a queue that has reached its *maxsize*.


Examples
========

.. _asyncio_example_queue_dist:

Queues can be used to distribute workload between several
concurrent tasks::

    import asyncio
    import random
    import time


    async def worker(name, queue):
        while True:
            # Get a "work item" out of the queue.
            sleep_for = await queue.get()

            # Sleep for the "sleep_for" seconds.
            await asyncio.sleep(sleep_for)

            # Notify the queue that the "work item" has been processed.
            queue.task_done()

            print(f'{name} has slept for {sleep_for:.2f} seconds')


    async def main():
        # Create a queue that we will use to store our "workload".
        queue = asyncio.Queue()

        # Generate random timings and put them into the queue.
        total_sleep_time = 0
        for _ in range(20):
            sleep_for = random.uniform(0.05, 1.0)
            total_sleep_time += sleep_for
            queue.put_nowait(sleep_for)

        # Create three worker tasks to process the queue concurrently.
        tasks = []
        for i in range(3):
            task = asyncio.create_task(worker(f'worker-{i}', queue))
            tasks.append(task)

        # Wait until the queue is fully processed.
        started_at = time.monotonic()
        await queue.join()
        total_slept_for = time.monotonic() - started_at

        # Cancel our worker tasks.
        for task in tasks:
            task.cancel()
        # Wait until all worker tasks are cancelled.
        await asyncio.gather(*tasks, return_exceptions=True)

        print('====')
        print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
        print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


    asyncio.run(main())
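
Two points made earlier in this document can also be sketched briefly:
queue methods take no *timeout* parameter, so a timed ``get()`` is written
with :func:`asyncio.wait_for`, and a :class:`PriorityQueue` returns
``(priority_number, data)`` entries lowest first. The helper names
``get_with_timeout``, ``drain_by_priority`` and ``demo`` are hypothetical
and exist only for this illustration; returning ``None`` on timeout is an
assumption of the sketch, not part of the queue API.

```python
import asyncio


async def get_with_timeout(queue, timeout):
    """Return the next item, or None if none arrives within `timeout` seconds.

    Hypothetical helper. None is used as a "timed out" marker here, so this
    sketch assumes None is never put into the queue as a real item.
    """
    try:
        # Queue.get() has no timeout parameter, so wrap it in wait_for().
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def drain_by_priority(entries):
    """Put (priority_number, data) tuples into a PriorityQueue and drain it."""
    queue = asyncio.PriorityQueue()
    for entry in entries:
        queue.put_nowait(entry)

    drained = []
    while not queue.empty():
        # Entries come back in priority order, lowest priority_number first.
        drained.append(queue.get_nowait())
    return drained


async def demo():
    # Nothing is ever put into this queue, so the timed get() gives up.
    empty_queue = asyncio.Queue()
    timed_out = await get_with_timeout(empty_queue, 0.05)

    # An item is already waiting, so the timed get() returns it at once.
    ready_queue = asyncio.Queue()
    ready_queue.put_nowait("job")
    fetched = await get_with_timeout(ready_queue, 0.05)

    ordered = await drain_by_priority([(2, "medium"), (3, "low"), (1, "high")])
    return timed_out, fetched, ordered


result = asyncio.run(demo())
print(result)
```

If ``None`` can be a legitimate work item, letting :exc:`asyncio.TimeoutError`
propagate to the caller is the less ambiguous choice.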