# Asynchronous FIFO, priority and LIFO queues whose blocking operations are
# implemented with futures created on an event loop.
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')

import collections
import heapq
import warnings

from . import events
from . import locks


class QueueEmpty(Exception):
    """Raised when Queue.get_nowait() is called on an empty Queue."""
    pass


class QueueFull(Exception):
    """Raised when the Queue.put_nowait() method is called on a full Queue."""
    pass


class Queue:
    """A queue, useful for coordinating producer and consumer coroutines.

    If maxsize is less than or equal to zero, the queue size is infinite. If it
    is an integer greater than 0, then "await put()" will block when the
    queue reaches maxsize, until an item is removed by get().

    Unlike the standard library Queue, you can reliably know this Queue's size
    with qsize(), since your single-threaded asyncio application won't be
    interrupted between calling qsize() and doing an operation on the Queue.
    """

    def __init__(self, maxsize=0, *, loop=None):
        """Initialize an empty queue.

        maxsize is the capacity; a value less than or equal to zero means
        the queue is never considered full (see full()).

        loop is deprecated.  When it is given, it is used to create the
        waiter futures and a DeprecationWarning is emitted; when it is
        omitted, the loop returned by events.get_event_loop() is used.
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        self._maxsize = maxsize

        # Futures of get() callers waiting for an item to arrive.
        self._getters = collections.deque()
        # Futures of put() callers waiting for a free slot.
        self._putters = collections.deque()
        # Incremented by put_nowait(), decremented by task_done().
        self._unfinished_tasks = 0
        # Event that join() waits on.  It starts out set and is cleared by
        # put_nowait(); task_done() sets it again once the unfinished-task
        # count drops back to zero.
        # NOTE(review): the caller's `loop` argument (possibly None) is
        # forwarded here rather than self._loop -- presumably so that
        # locks.Event applies its own default; confirm against locks.Event.
        self._finished = locks.Event(loop=loop)
        self._finished.set()
        self._init(maxsize)

    # These three are overridable in subclasses.

    def _init(self, maxsize):
        """Create the underlying container; maxsize is not used here."""
        self._queue = collections.deque()

    def _get(self):
        """Remove and return the item at the left end (oldest first)."""
        return self._queue.popleft()

    def _put(self, item):
        """Append item at the right end of the underlying container."""
        self._queue.append(item)

    # End of the overridable methods.

    def _wakeup_next(self, waiters):
        """Wake up the first waiter in the deque that is not already done.

        Waiters are popped from the left.  Those that are already done are
        simply discarded; the first one that is not done gets its result set
        to None and the search stops.
        """
        # Wake up the next waiter (if any) that isn't cancelled.
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def __repr__(self):
        return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'

    def __str__(self):
        return f'<{type(self).__name__} {self._format()}>'

    def __class_getitem__(cls, type):
        """Allow subscription such as Queue[T].

        The subscript is ignored and the class itself is returned.
        """
        return cls

    def _format(self):
        """Return the state summary shared by __repr__() and __str__().

        maxsize is always included; the queue contents, the number of
        waiting getters/putters and the unfinished-task count are appended
        only when they are non-empty / non-zero.
        """
        result = f'maxsize={self._maxsize!r}'
        # getattr() tolerates an instance on which _queue has not been set.
        if getattr(self, '_queue', None):
            result += f' _queue={list(self._queue)!r}'
        if self._getters:
            result += f' _getters[{len(self._getters)}]'
        if self._putters:
            result += f' _putters[{len(self._putters)}]'
        if self._unfinished_tasks:
            result += f' tasks={self._unfinished_tasks}'
        return result

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return not self._queue

    def full(self):
        """Return True if there are maxsize items in the queue.

        Note: if the Queue was initialized with maxsize=0 (the default),
        then full() is never True.
        """
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    async def put(self, item):
        """Put an item into the queue.

        If the queue is full, wait until a free slot is available before
        adding item.
        """
        # Loop rather than test once: after being woken up the queue may
        # be full again, in which case a new waiter future is queued.
        while self.full():
            putter = self._loop.create_future()
            self._putters.append(putter)
            try:
                await putter
            # The bare except catches every exception raised by the await;
            # it is always re-raised after the clean-up below.
            except:
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Clean self._putters from canceled putters.
                    self._putters.remove(putter)
                except ValueError:
                    # The putter could be removed from self._putters by a
                    # previous get_nowait call.
                    pass
                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise QueueFull.
        """
        if self.full():
            raise QueueFull
        self._put(item)
        # Each stored item counts as an unfinished task until task_done()
        # is called for it; clearing the event makes join() wait.
        self._unfinished_tasks += 1
        self._finished.clear()
        # An item is now available: wake one waiting getter, if any.
        self._wakeup_next(self._getters)

    async def get(self):
        """Remove and return an item from the queue.

        If queue is empty, wait until an item is available.
        """
        # Loop rather than test once: after being woken up the queue may
        # be empty again, in which case a new waiter future is queued.
        while self.empty():
            getter = self._loop.create_future()
            self._getters.append(getter)
            try:
                await getter
            # The bare except catches every exception raised by the await;
            # it is always re-raised after the clean-up below.
            except:
                getter.cancel()  # Just in case getter is not done yet.
                try:
                    # Clean self._getters from canceled getters.
                    self._getters.remove(getter)
                except ValueError:
                    # The getter could be removed from self._getters by a
                    # previous put_nowait call.
                    pass
                if not self.empty() and not getter.cancelled():
                    # We were woken up by put_nowait(), but can't take
                    # the call.  Wake up the next in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

    def get_nowait(self):
        """Remove and return an item from the queue.

        Return an item if one is immediately available, else raise QueueEmpty.
        """
        if self.empty():
            raise QueueEmpty
        item = self._get()
        # A slot was freed: wake one waiting putter, if any.  The
        # unfinished-task count is not touched here; only task_done()
        # decrements it.
        self._wakeup_next(self._putters)
        return item

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            # Last outstanding task finished: release anything in join().
            self._finished.set()

    async def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        # Nothing to wait for when no tasks are outstanding.
        if self._unfinished_tasks > 0:
            await self._finished.wait()


class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).

    Entries are typically tuples of the form: (priority number, data).
    """

    def _init(self, maxsize):
        """Create the list used as a heap; maxsize is not used here."""
        self._queue = []

    # heapq.heappush / heapq.heappop are bound as default arguments when the
    # methods are defined (presumably to avoid a module attribute lookup on
    # each call).
    def _put(self, item, heappush=heapq.heappush):
        """Push item onto the heap."""
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        """Pop and return the smallest item from the heap."""
        return heappop(self._queue)


class LifoQueue(Queue):
    """A subclass of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        """Create the list used as a stack; maxsize is not used here."""
        self._queue = []

    def _put(self, item):
        """Push item onto the end of the list."""
        self._queue.append(item)

    def _get(self):
        """Pop and return the item at the end of the list (newest first)."""
        return self._queue.pop()