.. currentmodule:: asyncio

.. _asyncio-queues:

======
Queues
======

**Source code:** :source:`Lib/asyncio/queues.py`

------------------------------------------------

asyncio queues are designed to be similar to the classes of the
:mod:`queue` module.  Although asyncio queues are not thread-safe,
they are designed to be used specifically in async/await code.

Note that methods of asyncio queues don't have a *timeout* parameter;
use the :func:`asyncio.wait_for` function to do queue operations with a
timeout.

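
As a minimal sketch of the timeout pattern mentioned above, the following
wraps :meth:`Queue.get` in :func:`asyncio.wait_for`.  The helper name
``get_with_timeout`` and the choice to return ``None`` on timeout are
illustrative assumptions, not part of asyncio; code that stores ``None``
in the queue would need a different signal.

```python
import asyncio


async def get_with_timeout(queue, timeout):
    """Return the next item, or None if nothing arrives within *timeout* seconds."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for() cancels the pending get() before raising.
        return None


async def main():
    queue = asyncio.Queue()

    # The queue is empty, so this call gives up after 0.1 seconds.
    first = await get_with_timeout(queue, timeout=0.1)

    queue.put_nowait("job")
    # An item is already waiting, so it is returned without delay.
    second = await get_with_timeout(queue, timeout=0.1)
    return first, second


print(asyncio.run(main()))  # (None, 'job')
```
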
See also the `Examples`_ section below.

Queue
=====

.. class:: Queue(maxsize=0)

   A first in, first out (FIFO) queue.

   If *maxsize* is less than or equal to zero, the queue size is
   infinite.  If it is an integer greater than ``0``, then
   ``await put()`` blocks when the queue reaches *maxsize*
   until an item is removed by :meth:`get`.

   Unlike the standard library threading :mod:`queue`, the size of
   the queue is always known and can be returned by calling the
   :meth:`qsize` method.

   This class is :ref:`not thread safe <asyncio-multithreading>`.

   .. attribute:: maxsize

      Number of items allowed in the queue.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise.

   .. method:: full()

      Return ``True`` if there are :attr:`maxsize` items in the queue.

      If the queue was initialized with ``maxsize=0`` (the default),
      then :meth:`full` never returns ``True``.

   .. coroutinemethod:: get()

      Remove and return an item from the queue. If the queue is empty,
      wait until an item is available.

   .. method:: get_nowait()

      Return an item if one is immediately available, else raise
      :exc:`QueueEmpty`.

   .. coroutinemethod:: join()

      Block until all items in the queue have been received and processed.

      The count of unfinished tasks goes up whenever an item is added
      to the queue. The count goes down whenever a consumer coroutine calls
      :meth:`task_done` to indicate that the item was retrieved and all
      work on it is complete.  When the count of unfinished tasks drops
      to zero, :meth:`join` unblocks.

   .. coroutinemethod:: put(item)

      Put an item into the queue. If the queue is full, wait until a
      free slot is available before adding the item.

   .. method:: put_nowait(item)

      Put an item into the queue without blocking.

      If no free slot is immediately available, raise :exc:`QueueFull`.

   .. method:: qsize()

      Return the number of items in the queue.

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete.

      Used by queue consumers. For each :meth:`~Queue.get` used to
      fetch a task, a subsequent call to :meth:`task_done` tells the
      queue that the processing on the task is complete.

      If a :meth:`join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done`
      call was received for every item that had been :meth:`~Queue.put`
      into the queue).

      Raises :exc:`ValueError` if called more times than there were
      items placed in the queue.

   .. deprecated-removed:: 3.8 3.10

      The ``loop`` parameter.  This class has been implicitly getting the
      current running loop since 3.7.  See
      :ref:`What's New in 3.10's Removed section <whatsnew310-removed>`
      for more information.

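
The effect of *maxsize* described above can be sketched roughly as
follows.  The ``producer`` coroutine, the ``events`` list and the
0.05 second pause are illustrative choices only; the point is that
``await queue.put()`` waits once two items are queued and resumes as
soon as :meth:`~Queue.get` frees a slot.

```python
import asyncio


async def main():
    queue = asyncio.Queue(maxsize=2)
    events = []

    async def producer():
        for item in range(4):
            await queue.put(item)  # waits while the queue is full
            events.append(f"put {item}")

    task = asyncio.create_task(producer())

    # Give the producer time to fill the queue and block on its third put().
    await asyncio.sleep(0.05)
    events.append(f"full={queue.full()} qsize={queue.qsize()}")

    # Each get() frees a slot, which lets the blocked producer continue.
    for _ in range(4):
        item = await queue.get()
        events.append(f"got {item}")
        queue.task_done()

    await task
    return events


for line in asyncio.run(main()):
    print(line)
```

The first three lines printed are ``put 0``, ``put 1`` and
``full=True qsize=2``: the producer cannot add a third item until the
consumer starts draining the queue.
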

Priority Queue
==============

.. class:: PriorityQueue

   A variant of :class:`Queue`; retrieves entries in priority order
   (lowest first).

   Entries are typically tuples of the form
   ``(priority_number, data)``.

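
A short sketch of the ``(priority_number, data)`` convention, with
made-up task names.  Note that when two entries share a priority number,
tuple comparison falls through to the data element, so this simple form
assumes the data values can be compared with each other.

```python
import asyncio


async def main():
    queue = asyncio.PriorityQueue()

    # Entries are (priority_number, data) tuples; lower numbers come out first.
    queue.put_nowait((2, "write report"))
    queue.put_nowait((1, "fix outage"))
    queue.put_nowait((3, "tidy backlog"))

    order = []
    while not queue.empty():
        priority, task = await queue.get()
        order.append(task)
    return order


print(asyncio.run(main()))  # ['fix outage', 'write report', 'tidy backlog']
```
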

LIFO Queue
==========

.. class:: LifoQueue

   A variant of :class:`Queue` that retrieves most recently added
   entries first (last in, first out).

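
For comparison with the FIFO behaviour of :class:`Queue`, this small
sketch (item names are arbitrary) shows entries leaving a
:class:`LifoQueue` in the reverse of their insertion order.

```python
import asyncio


async def main():
    queue = asyncio.LifoQueue()
    for item in ("first", "second", "third"):
        queue.put_nowait(item)

    # qsize() is read once, before any item is removed.
    return [queue.get_nowait() for _ in range(queue.qsize())]


print(asyncio.run(main()))  # ['third', 'second', 'first']
```
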

Exceptions
==========

.. exception:: QueueEmpty

   This exception is raised when the :meth:`~Queue.get_nowait` method
   is called on an empty queue.


.. exception:: QueueFull

   This exception is raised when the :meth:`~Queue.put_nowait` method
   is called on a queue that has reached its *maxsize*.

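
Both exceptions can be triggered with a queue of size one, as in this
minimal sketch.  The ``outcomes`` list is only there to record which
branch ran.

```python
import asyncio


async def main():
    queue = asyncio.Queue(maxsize=1)
    outcomes = []

    try:
        queue.get_nowait()  # nothing has been queued yet
    except asyncio.QueueEmpty:
        outcomes.append("empty")

    queue.put_nowait("only item")
    try:
        queue.put_nowait("one too many")  # the single slot is taken
    except asyncio.QueueFull:
        outcomes.append("full")

    return outcomes


print(asyncio.run(main()))  # ['empty', 'full']
```
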

Examples
========

.. _asyncio_example_queue_dist:

Queues can be used to distribute workload between several
concurrent tasks::

   import asyncio
   import random
   import time


   async def worker(name, queue):
       while True:
           # Get a "work item" out of the queue.
           sleep_for = await queue.get()

           # Sleep for the "sleep_for" seconds.
           await asyncio.sleep(sleep_for)

           # Notify the queue that the "work item" has been processed.
           queue.task_done()

           print(f'{name} has slept for {sleep_for:.2f} seconds')


   async def main():
       # Create a queue that we will use to store our "workload".
       queue = asyncio.Queue()

       # Generate random timings and put them into the queue.
       total_sleep_time = 0
       for _ in range(20):
           sleep_for = random.uniform(0.05, 1.0)
           total_sleep_time += sleep_for
           queue.put_nowait(sleep_for)

       # Create three worker tasks to process the queue concurrently.
       tasks = []
       for i in range(3):
           task = asyncio.create_task(worker(f'worker-{i}', queue))
           tasks.append(task)

       # Wait until the queue is fully processed.
       started_at = time.monotonic()
       await queue.join()
       total_slept_for = time.monotonic() - started_at

       # Cancel our worker tasks.
       for task in tasks:
           task.cancel()
       # Wait until all worker tasks are cancelled.
       await asyncio.gather(*tasks, return_exceptions=True)

       print('====')
       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


   asyncio.run(main())

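
The example above relies on :meth:`~Queue.task_done` being called exactly
once per retrieved item.  As a small sketch of the bookkeeping involved
(the item values are arbitrary), the following shows :meth:`~Queue.join`
returning once every item is accounted for, and the :exc:`ValueError`
raised by one call too many.

```python
import asyncio


async def main():
    queue = asyncio.Queue()
    queue.put_nowait("a")
    queue.put_nowait("b")

    for _ in range(2):
        await queue.get()
        queue.task_done()

    # Both items are accounted for, so join() returns without waiting.
    await queue.join()

    try:
        queue.task_done()  # one call more than there were items
    except ValueError as exc:
        return f"ValueError: {exc}"
    return "no error"


print(asyncio.run(main()))
```
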