1'''A multi-producer, multi-consumer queue.''' 2 3try: 4 import threading 5except ImportError: 6 import dummy_threading as threading 7from collections import deque 8from heapq import heappush, heappop 9from time import monotonic as time 10 11__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] 12 13class Empty(Exception): 14 'Exception raised by Queue.get(block=0)/get_nowait().' 15 pass 16 17class Full(Exception): 18 'Exception raised by Queue.put(block=0)/put_nowait().' 19 pass 20 21class Queue: 22 '''Create a queue object with a given maximum size. 23 24 If maxsize is <= 0, the queue size is infinite. 25 ''' 26 27 def __init__(self, maxsize=0): 28 self.maxsize = maxsize 29 self._init(maxsize) 30 31 # mutex must be held whenever the queue is mutating. All methods 32 # that acquire mutex must release it before returning. mutex 33 # is shared between the three conditions, so acquiring and 34 # releasing the conditions also acquires and releases mutex. 35 self.mutex = threading.Lock() 36 37 # Notify not_empty whenever an item is added to the queue; a 38 # thread waiting to get is notified then. 39 self.not_empty = threading.Condition(self.mutex) 40 41 # Notify not_full whenever an item is removed from the queue; 42 # a thread waiting to put is notified then. 43 self.not_full = threading.Condition(self.mutex) 44 45 # Notify all_tasks_done whenever the number of unfinished tasks 46 # drops to zero; thread waiting to join() is notified to resume 47 self.all_tasks_done = threading.Condition(self.mutex) 48 self.unfinished_tasks = 0 49 50 def task_done(self): 51 '''Indicate that a formerly enqueued task is complete. 52 53 Used by Queue consumer threads. For each get() used to fetch a task, 54 a subsequent call to task_done() tells the queue that the processing 55 on the task is complete. 56 57 If a join() is currently blocking, it will resume when all items 58 have been processed (meaning that a task_done() call was received 59 for every item that had been put() into the queue). 60 61 Raises a ValueError if called more times than there were items 62 placed in the queue. 63 ''' 64 with self.all_tasks_done: 65 unfinished = self.unfinished_tasks - 1 66 if unfinished <= 0: 67 if unfinished < 0: 68 raise ValueError('task_done() called too many times') 69 self.all_tasks_done.notify_all() 70 self.unfinished_tasks = unfinished 71 72 def join(self): 73 '''Blocks until all items in the Queue have been gotten and processed. 74 75 The count of unfinished tasks goes up whenever an item is added to the 76 queue. The count goes down whenever a consumer thread calls task_done() 77 to indicate the item was retrieved and all work on it is complete. 78 79 When the count of unfinished tasks drops to zero, join() unblocks. 80 ''' 81 with self.all_tasks_done: 82 while self.unfinished_tasks: 83 self.all_tasks_done.wait() 84 85 def qsize(self): 86 '''Return the approximate size of the queue (not reliable!).''' 87 with self.mutex: 88 return self._qsize() 89 90 def empty(self): 91 '''Return True if the queue is empty, False otherwise (not reliable!). 92 93 This method is likely to be removed at some point. Use qsize() == 0 94 as a direct substitute, but be aware that either approach risks a race 95 condition where a queue can grow before the result of empty() or 96 qsize() can be used. 97 98 To create code that needs to wait for all queued tasks to be 99 completed, the preferred technique is to use the join() method. 100 ''' 101 with self.mutex: 102 return not self._qsize() 103 104 def full(self): 105 '''Return True if the queue is full, False otherwise (not reliable!). 106 107 This method is likely to be removed at some point. Use qsize() >= n 108 as a direct substitute, but be aware that either approach risks a race 109 condition where a queue can shrink before the result of full() or 110 qsize() can be used. 111 ''' 112 with self.mutex: 113 return 0 < self.maxsize <= self._qsize() 114 115 def put(self, item, block=True, timeout=None): 116 '''Put an item into the queue. 117 118 If optional args 'block' is true and 'timeout' is None (the default), 119 block if necessary until a free slot is available. If 'timeout' is 120 a non-negative number, it blocks at most 'timeout' seconds and raises 121 the Full exception if no free slot was available within that time. 122 Otherwise ('block' is false), put an item on the queue if a free slot 123 is immediately available, else raise the Full exception ('timeout' 124 is ignored in that case). 125 ''' 126 with self.not_full: 127 if self.maxsize > 0: 128 if not block: 129 if self._qsize() >= self.maxsize: 130 raise Full 131 elif timeout is None: 132 while self._qsize() >= self.maxsize: 133 self.not_full.wait() 134 elif timeout < 0: 135 raise ValueError("'timeout' must be a non-negative number") 136 else: 137 endtime = time() + timeout 138 while self._qsize() >= self.maxsize: 139 remaining = endtime - time() 140 if remaining <= 0.0: 141 raise Full 142 self.not_full.wait(remaining) 143 self._put(item) 144 self.unfinished_tasks += 1 145 self.not_empty.notify() 146 147 def get(self, block=True, timeout=None): 148 '''Remove and return an item from the queue. 149 150 If optional args 'block' is true and 'timeout' is None (the default), 151 block if necessary until an item is available. If 'timeout' is 152 a non-negative number, it blocks at most 'timeout' seconds and raises 153 the Empty exception if no item was available within that time. 154 Otherwise ('block' is false), return an item if one is immediately 155 available, else raise the Empty exception ('timeout' is ignored 156 in that case). 157 ''' 158 with self.not_empty: 159 if not block: 160 if not self._qsize(): 161 raise Empty 162 elif timeout is None: 163 while not self._qsize(): 164 self.not_empty.wait() 165 elif timeout < 0: 166 raise ValueError("'timeout' must be a non-negative number") 167 else: 168 endtime = time() + timeout 169 while not self._qsize(): 170 remaining = endtime - time() 171 if remaining <= 0.0: 172 raise Empty 173 self.not_empty.wait(remaining) 174 item = self._get() 175 self.not_full.notify() 176 return item 177 178 def put_nowait(self, item): 179 '''Put an item into the queue without blocking. 180 181 Only enqueue the item if a free slot is immediately available. 182 Otherwise raise the Full exception. 183 ''' 184 return self.put(item, block=False) 185 186 def get_nowait(self): 187 '''Remove and return an item from the queue without blocking. 188 189 Only get an item if one is immediately available. Otherwise 190 raise the Empty exception. 191 ''' 192 return self.get(block=False) 193 194 # Override these methods to implement other queue organizations 195 # (e.g. stack or priority queue). 196 # These will only be called with appropriate locks held 197 198 # Initialize the queue representation 199 def _init(self, maxsize): 200 self.queue = deque() 201 202 def _qsize(self): 203 return len(self.queue) 204 205 # Put a new item in the queue 206 def _put(self, item): 207 self.queue.append(item) 208 209 # Get an item from the queue 210 def _get(self): 211 return self.queue.popleft() 212 213 214class PriorityQueue(Queue): 215 '''Variant of Queue that retrieves open entries in priority order (lowest first). 216 217 Entries are typically tuples of the form: (priority number, data). 218 ''' 219 220 def _init(self, maxsize): 221 self.queue = [] 222 223 def _qsize(self): 224 return len(self.queue) 225 226 def _put(self, item): 227 heappush(self.queue, item) 228 229 def _get(self): 230 return heappop(self.queue) 231 232 233class LifoQueue(Queue): 234 '''Variant of Queue that retrieves most recently added entries first.''' 235 236 def _init(self, maxsize): 237 self.queue = [] 238 239 def _qsize(self): 240 return len(self.queue) 241 242 def _put(self, item): 243 self.queue.append(item) 244 245 def _get(self): 246 return self.queue.pop() 247