.. currentmodule:: asyncio

.. _asyncio-queues:

======
Queues
======

**Source code:** :source:`Lib/asyncio/queues.py`

------------------------------------------------

asyncio queues are designed to be similar to classes of the
:mod:`queue` module.  Although asyncio queues are not thread-safe,
they are designed to be used specifically in async/await code.

Note that methods of asyncio queues don't have a *timeout* parameter;
use the :func:`asyncio.wait_for` function to do queue operations with a
timeout.

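As a minimal sketch of the timeout pattern described above, a ``get()``
call can be wrapped in :func:`asyncio.wait_for`. The helper name
``get_with_timeout`` and the 0.1 second timeout are illustrative
assumptions, not part of the asyncio API:

```python
import asyncio


async def get_with_timeout(queue, timeout):
    """Hypothetical helper: return an item, or None if the wait times out."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def main():
    queue = asyncio.Queue()
    # Nothing has been queued yet, so this wait times out.
    first = await get_with_timeout(queue, 0.1)
    queue.put_nowait('job')
    # An item is now available, so get() returns right away.
    second = await get_with_timeout(queue, 0.1)
    return first, second


result = asyncio.run(main())
print(result)
```

Returning ``None`` on timeout is only suitable when ``None`` is never a
legitimate queue item; otherwise letting the exception propagate is the
safer choice.
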
See also the `Examples`_ section below.

Queue
=====

.. class:: Queue(maxsize=0)

   A first in, first out (FIFO) queue.

   If *maxsize* is less than or equal to zero, the queue size is
   infinite.  If it is an integer greater than ``0``, then
   ``await put()`` blocks when the queue reaches *maxsize*
   until an item is removed by :meth:`get`.

   Unlike the standard library threading :mod:`queue`, the size of
   the queue is always known and can be returned by calling the
   :meth:`qsize` method.

   .. versionchanged:: 3.10
      Removed the *loop* parameter.


   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: maxsize

      Number of items allowed in the queue.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise.

   .. method:: full()

      Return ``True`` if there are :attr:`maxsize` items in the queue.

      If the queue was initialized with ``maxsize=0`` (the default),
      then :meth:`full` never returns ``True``.

   .. coroutinemethod:: get()

      Remove and return an item from the queue. If the queue is empty,
      wait until an item is available.

      Raises :exc:`QueueShutDown` if the queue has been shut down and
      is empty, or if the queue has been shut down immediately.

   .. method:: get_nowait()

      Return an item if one is immediately available, else raise
      :exc:`QueueEmpty`.

   .. coroutinemethod:: join()

      Block until all items in the queue have been received and processed.

      The count of unfinished tasks goes up whenever an item is added
      to the queue. The count goes down whenever a consumer coroutine calls
      :meth:`task_done` to indicate that the item was retrieved and all
      work on it is complete.  When the count of unfinished tasks drops
      to zero, :meth:`join` unblocks.

   .. coroutinemethod:: put(item)

      Put an item into the queue. If the queue is full, wait until a
      free slot is available before adding the item.

      Raises :exc:`QueueShutDown` if the queue has been shut down.

   .. method:: put_nowait(item)

      Put an item into the queue without blocking.

      If no free slot is immediately available, raise :exc:`QueueFull`.

   .. method:: qsize()

      Return the number of items in the queue.

   .. method:: shutdown(immediate=False)

      Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put`
      raise :exc:`QueueShutDown`.

      By default, :meth:`~Queue.get` on a shut down queue will only
      raise once the queue is empty. Set *immediate* to true to make
      :meth:`~Queue.get` raise immediately instead.

      All blocked callers of :meth:`~Queue.put` and :meth:`~Queue.get`
      will be unblocked. If *immediate* is true, a task will be marked
      as done for each remaining item in the queue, which may unblock
      callers of :meth:`~Queue.join`.

      .. versionadded:: 3.13

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete.

      Used by queue consumers. For each :meth:`~Queue.get` used to
      fetch a task, a subsequent call to :meth:`task_done` tells the
      queue that the processing on the task is complete.

      If a :meth:`join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done`
      call was received for every item that had been :meth:`~Queue.put`
      into the queue).

      ``shutdown(immediate=True)`` calls :meth:`task_done` for each
      remaining item in the queue.

      Raises :exc:`ValueError` if called more times than there were
      items placed in the queue.

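The interplay between a bounded queue, :meth:`~Queue.put`,
:meth:`~Queue.task_done` and :meth:`~Queue.join` described above can be
sketched as follows. The ``producer`` and ``consumer`` coroutines, the
``events`` list and the choice of ``maxsize=1`` are illustrative
assumptions, not part of the asyncio API:

```python
import asyncio


async def producer(queue, events):
    for item in range(3):
        # With maxsize=1, put() waits whenever the previous item
        # has not been taken out of the queue yet.
        await queue.put(item)
        events.append(f'put {item}')


async def consumer(queue, events):
    while True:
        item = await queue.get()
        events.append(f'got {item}')
        # Balance every get() with exactly one task_done().
        queue.task_done()


async def main():
    events = []
    queue = asyncio.Queue(maxsize=1)
    consumer_task = asyncio.create_task(consumer(queue, events))
    await producer(queue, events)
    # join() returns once task_done() has been called for every item.
    await queue.join()
    consumer_task.cancel()
    await asyncio.gather(consumer_task, return_exceptions=True)
    return events, queue.qsize()


events, remaining = asyncio.run(main())
print(events, remaining)
```

The exact interleaving of the ``put`` and ``got`` entries depends on task
scheduling, but items are retrieved in FIFO order and ``join()`` does not
return before the last ``task_done()`` call.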

Priority Queue
==============

.. class:: PriorityQueue

   A variant of :class:`Queue`; retrieves entries in priority order
   (lowest first).

   Entries are typically tuples of the form
   ``(priority_number, data)``.

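A short sketch of the ``(priority_number, data)`` convention, using
made-up task names as sample data:

```python
import asyncio


async def main():
    queue = asyncio.PriorityQueue()
    # Entries are (priority_number, data) tuples; lower numbers come out first.
    for entry in [(3, 'write report'), (1, 'fix outage'), (2, 'review patch')]:
        queue.put_nowait(entry)

    ordered = []
    while not queue.empty():
        ordered.append(await queue.get())
        queue.task_done()
    return ordered


ordered = asyncio.run(main())
print(ordered)
# [(1, 'fix outage'), (2, 'review patch'), (3, 'write report')]
```

Because tuples compare element by element, two entries with the same
priority number fall back to comparing their data, so the data must be
orderable in that case.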

LIFO Queue
==========

.. class:: LifoQueue

   A variant of :class:`Queue` that retrieves most recently added
   entries first (last in, first out).

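The last in, first out ordering can be illustrated with a small sketch;
the item names are arbitrary sample values:

```python
import asyncio


async def main():
    queue = asyncio.LifoQueue()
    for item in ['first', 'second', 'third']:
        queue.put_nowait(item)
    # get_nowait() is safe here because exactly qsize() items are queued.
    return [queue.get_nowait() for _ in range(queue.qsize())]


order = asyncio.run(main())
print(order)
# ['third', 'second', 'first']
```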

Exceptions
==========

.. exception:: QueueEmpty

   This exception is raised when the :meth:`~Queue.get_nowait` method
   is called on an empty queue.

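One way :exc:`QueueEmpty` tends to be used is to drain a queue without
waiting. The sketch below assumes a hypothetical ``drain`` helper, which
is not part of asyncio:

```python
import asyncio


def drain(queue):
    """Hypothetical helper: pop items until the queue reports it is empty."""
    items = []
    while True:
        try:
            items.append(queue.get_nowait())
        except asyncio.QueueEmpty:
            return items


async def main():
    queue = asyncio.Queue()
    for number in range(3):
        queue.put_nowait(number)
    return drain(queue), queue.empty()


drained, is_empty = asyncio.run(main())
print(drained, is_empty)
# [0, 1, 2] True
```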

.. exception:: QueueFull

   Exception raised when the :meth:`~Queue.put_nowait` method is called
   on a queue that has reached its *maxsize*.

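A minimal sketch of handling :exc:`QueueFull`, assuming the caller would
rather drop an item than wait for a free slot. The ``accepted`` and
``rejected`` lists are illustrative bookkeeping only:

```python
import asyncio


async def main():
    queue = asyncio.Queue(maxsize=2)
    accepted, rejected = [], []
    for item in ['a', 'b', 'c']:
        try:
            queue.put_nowait(item)
            accepted.append(item)
        except asyncio.QueueFull:
            # The queue already holds maxsize items; drop instead of waiting.
            rejected.append(item)
    return accepted, rejected, queue.full()


accepted, rejected, is_full = asyncio.run(main())
print(accepted, rejected, is_full)
# ['a', 'b'] ['c'] True
```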

.. exception:: QueueShutDown

   Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is
   called on a queue which has been shut down.

   .. versionadded:: 3.13

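The following sketch shows one possible way to stop a consumer with a
non-immediate :meth:`~Queue.shutdown`. The ``consumer`` coroutine and the
``seen`` list are illustrative assumptions, and the version check is
there only so that the snippet does nothing on interpreters older than
3.13, where ``shutdown()`` is unavailable:

```python
import asyncio
import sys


async def consumer(queue, seen):
    while True:
        try:
            item = await queue.get()
        except asyncio.QueueShutDown:
            # Raised once the queue is shut down and has been emptied.
            return
        seen.append(item)
        queue.task_done()


async def main():
    seen = []
    queue = asyncio.Queue()
    for item in range(3):
        queue.put_nowait(item)
    task = asyncio.create_task(consumer(queue, seen))
    # Non-immediate shutdown: already queued items can still be retrieved.
    queue.shutdown()
    await task
    return seen


# shutdown() and QueueShutDown exist only on Python 3.13 and later.
seen = asyncio.run(main()) if sys.version_info >= (3, 13) else None
print(seen)
```

Compared with cancelling worker tasks, this lets the consumer finish the
items that were already queued before it exits.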

Examples
========

.. _asyncio_example_queue_dist:

Queues can be used to distribute workload between several
concurrent tasks::

   import asyncio
   import random
   import time


   async def worker(name, queue):
       while True:
           # Get a "work item" out of the queue.
           sleep_for = await queue.get()

           # Sleep for the "sleep_for" seconds.
           await asyncio.sleep(sleep_for)

           # Notify the queue that the "work item" has been processed.
           queue.task_done()

           print(f'{name} has slept for {sleep_for:.2f} seconds')


   async def main():
       # Create a queue that we will use to store our "workload".
       queue = asyncio.Queue()

       # Generate random timings and put them into the queue.
       total_sleep_time = 0
       for _ in range(20):
           sleep_for = random.uniform(0.05, 1.0)
           total_sleep_time += sleep_for
           queue.put_nowait(sleep_for)

       # Create three worker tasks to process the queue concurrently.
       tasks = []
       for i in range(3):
           task = asyncio.create_task(worker(f'worker-{i}', queue))
           tasks.append(task)

       # Wait until the queue is fully processed.
       started_at = time.monotonic()
       await queue.join()
       total_slept_for = time.monotonic() - started_at

       # Cancel our worker tasks.
       for task in tasks:
           task.cancel()
       # Wait until all worker tasks are cancelled.
       await asyncio.gather(*tasks, return_exceptions=True)

       print('====')
       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


   asyncio.run(main())