'''A multi-producer, multi-consumer queue.'''

import threading
import types
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time
try:
    from _queue import SimpleQueue
except ImportError:
    # Rebound to the pure-Python _PySimpleQueue at the bottom of this file.
    SimpleQueue = None

__all__ = [
    'Empty',
    'Full',
    'ShutDown',
    'Queue',
    'PriorityQueue',
    'LifoQueue',
    'SimpleQueue',
]


try:
    from _queue import Empty
except ImportError:
    class Empty(Exception):
        'Exception raised by Queue.get(block=0)/get_nowait().'
        pass

class Full(Exception):
    'Exception raised by Queue.put(block=0)/put_nowait().'
    pass


class ShutDown(Exception):
    '''Raised when put/get with shut-down queue.'''


class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

        # Queue shutdown state
        self.is_shutdown = False

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        shutdown(immediate=True) discards each remaining item in the queue
        and decrements the unfinished-task count for it, as if task_done()
        had been called.

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # Raise before storing, so the counter never goes
                    # negative.
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            # A maxsize <= 0 means "unbounded", so such a queue is never full.
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises ShutDown if the queue has been shut down.

        Raises ValueError if 'block' is true and 'timeout' is negative; this
        is only checked for a bounded queue (maxsize > 0).
        '''
        with self.not_full:
            if self.is_shutdown:
                raise ShutDown
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                        # shutdown() wakes all waiters; re-check after waking.
                        if self.is_shutdown:
                            raise ShutDown
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
                        if self.is_shutdown:
                            raise ShutDown
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ShutDown if the queue has been shut down and is empty,
        or if the queue has been shut down immediately.

        Raises ValueError if 'block' is true and 'timeout' is negative.
        '''
        with self.not_empty:
            if self.is_shutdown and not self._qsize():
                raise ShutDown
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
                    # Items still queued after a shutdown remain gettable;
                    # only an empty, shut-down queue raises.
                    if self.is_shutdown and not self._qsize():
                        raise ShutDown
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
                    if self.is_shutdown and not self._qsize():
                        raise ShutDown
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    def shutdown(self, immediate=False):
        '''Shut-down the queue, making queue gets and puts raise ShutDown.

        By default, gets will only raise once the queue is empty. Set
        'immediate' to True to make gets raise immediately instead.

        All blocked callers of put() and get() will be unblocked. If
        'immediate', a task is marked as done for each item remaining in
        the queue, which may unblock callers of join().
        '''
        # The three conditions share self.mutex, so holding it is enough
        # to notify on each of them.
        with self.mutex:
            self.is_shutdown = True
            if immediate:
                while self._qsize():
                    self._get()
                    if self.unfinished_tasks > 0:
                        self.unfinished_tasks -= 1
                # release all blocked threads in `join()`
                self.all_tasks_done.notify_all()
            # All getters need to re-check queue-empty to raise ShutDown
            self.not_empty.notify_all()
            self.not_full.notify_all()

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()

    __class_getitem__ = classmethod(types.GenericAlias)


class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        # A plain list maintained as a heap by heappush/heappop.
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)


class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        # Pop from the same end that _put() appends to.
        return self.queue.pop()


class _PySimpleQueue:
    '''Simple, unbounded FIFO queue.

    This pure Python implementation is not reentrant.
    '''
    # Note: while this pure Python version provides fairness
    # (by using a threading.Semaphore which is itself fair, being based
    # on threading.Condition), fairness is not part of the API contract.
    # This allows the C version to use a different implementation.

    def __init__(self):
        self._queue = deque()
        # Released once per put() and acquired once per successful get().
        self._count = threading.Semaphore(0)

    def put(self, item, block=True, timeout=None):
        '''Put the item on the queue.

        The optional 'block' and 'timeout' arguments are ignored, as this method
        never blocks.  They are provided for compatibility with the Queue class.
        '''
        self._queue.append(item)
        self._count.release()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises ValueError if 'block' is true and 'timeout' is negative.
        '''
        if not block:
            # Semaphore.acquire() rejects a timeout combined with a
            # non-blocking acquire, so drop it here to honour the documented
            # "timeout is ignored" contract (as Queue.get() does).
            timeout = None
        elif timeout is not None and timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        if not self._count.acquire(block, timeout):
            raise Empty
        return self._queue.popleft()

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        This is exactly equivalent to `put(item, block=False)` and is only provided
        for compatibility with the Queue class.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available.  Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).'''
        return len(self._queue) == 0

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        return len(self._queue)

    __class_getitem__ = classmethod(types.GenericAlias)


# Fall back to the pure-Python implementation when `_queue` did not
# provide SimpleQueue.
if SimpleQueue is None:
    SimpleQueue = _PySimpleQueue