• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'''A multi-producer, multi-consumer queue.'''
2
3import threading
4import types
5from collections import deque
6from heapq import heappush, heappop
7from time import monotonic as time
8try:
9    from _queue import SimpleQueue
10except ImportError:
11    SimpleQueue = None
12
13__all__ = [
14    'Empty',
15    'Full',
16    'ShutDown',
17    'Queue',
18    'PriorityQueue',
19    'LifoQueue',
20    'SimpleQueue',
21]
22
23
24try:
25    from _queue import Empty
26except ImportError:
27    class Empty(Exception):
28        'Exception raised by Queue.get(block=0)/get_nowait().'
29        pass
30
31class Full(Exception):
32    'Exception raised by Queue.put(block=0)/put_nowait().'
33    pass
34
35
36class ShutDown(Exception):
37    '''Raised when put/get with shut-down queue.'''
38
39
40class Queue:
41    '''Create a queue object with a given maximum size.
42
43    If maxsize is <= 0, the queue size is infinite.
44    '''
45
46    def __init__(self, maxsize=0):
47        self.maxsize = maxsize
48        self._init(maxsize)
49
50        # mutex must be held whenever the queue is mutating.  All methods
51        # that acquire mutex must release it before returning.  mutex
52        # is shared between the three conditions, so acquiring and
53        # releasing the conditions also acquires and releases mutex.
54        self.mutex = threading.Lock()
55
56        # Notify not_empty whenever an item is added to the queue; a
57        # thread waiting to get is notified then.
58        self.not_empty = threading.Condition(self.mutex)
59
60        # Notify not_full whenever an item is removed from the queue;
61        # a thread waiting to put is notified then.
62        self.not_full = threading.Condition(self.mutex)
63
64        # Notify all_tasks_done whenever the number of unfinished tasks
65        # drops to zero; thread waiting to join() is notified to resume
66        self.all_tasks_done = threading.Condition(self.mutex)
67        self.unfinished_tasks = 0
68
69        # Queue shutdown state
70        self.is_shutdown = False
71
72    def task_done(self):
73        '''Indicate that a formerly enqueued task is complete.
74
75        Used by Queue consumer threads.  For each get() used to fetch a task,
76        a subsequent call to task_done() tells the queue that the processing
77        on the task is complete.
78
79        If a join() is currently blocking, it will resume when all items
80        have been processed (meaning that a task_done() call was received
81        for every item that had been put() into the queue).
82
83        shutdown(immediate=True) calls task_done() for each remaining item in
84        the queue.
85
86        Raises a ValueError if called more times than there were items
87        placed in the queue.
88        '''
89        with self.all_tasks_done:
90            unfinished = self.unfinished_tasks - 1
91            if unfinished <= 0:
92                if unfinished < 0:
93                    raise ValueError('task_done() called too many times')
94                self.all_tasks_done.notify_all()
95            self.unfinished_tasks = unfinished
96
97    def join(self):
98        '''Blocks until all items in the Queue have been gotten and processed.
99
100        The count of unfinished tasks goes up whenever an item is added to the
101        queue. The count goes down whenever a consumer thread calls task_done()
102        to indicate the item was retrieved and all work on it is complete.
103
104        When the count of unfinished tasks drops to zero, join() unblocks.
105        '''
106        with self.all_tasks_done:
107            while self.unfinished_tasks:
108                self.all_tasks_done.wait()
109
110    def qsize(self):
111        '''Return the approximate size of the queue (not reliable!).'''
112        with self.mutex:
113            return self._qsize()
114
115    def empty(self):
116        '''Return True if the queue is empty, False otherwise (not reliable!).
117
118        This method is likely to be removed at some point.  Use qsize() == 0
119        as a direct substitute, but be aware that either approach risks a race
120        condition where a queue can grow before the result of empty() or
121        qsize() can be used.
122
123        To create code that needs to wait for all queued tasks to be
124        completed, the preferred technique is to use the join() method.
125        '''
126        with self.mutex:
127            return not self._qsize()
128
129    def full(self):
130        '''Return True if the queue is full, False otherwise (not reliable!).
131
132        This method is likely to be removed at some point.  Use qsize() >= n
133        as a direct substitute, but be aware that either approach risks a race
134        condition where a queue can shrink before the result of full() or
135        qsize() can be used.
136        '''
137        with self.mutex:
138            return 0 < self.maxsize <= self._qsize()
139
140    def put(self, item, block=True, timeout=None):
141        '''Put an item into the queue.
142
143        If optional args 'block' is true and 'timeout' is None (the default),
144        block if necessary until a free slot is available. If 'timeout' is
145        a non-negative number, it blocks at most 'timeout' seconds and raises
146        the Full exception if no free slot was available within that time.
147        Otherwise ('block' is false), put an item on the queue if a free slot
148        is immediately available, else raise the Full exception ('timeout'
149        is ignored in that case).
150
151        Raises ShutDown if the queue has been shut down.
152        '''
153        with self.not_full:
154            if self.is_shutdown:
155                raise ShutDown
156            if self.maxsize > 0:
157                if not block:
158                    if self._qsize() >= self.maxsize:
159                        raise Full
160                elif timeout is None:
161                    while self._qsize() >= self.maxsize:
162                        self.not_full.wait()
163                        if self.is_shutdown:
164                            raise ShutDown
165                elif timeout < 0:
166                    raise ValueError("'timeout' must be a non-negative number")
167                else:
168                    endtime = time() + timeout
169                    while self._qsize() >= self.maxsize:
170                        remaining = endtime - time()
171                        if remaining <= 0.0:
172                            raise Full
173                        self.not_full.wait(remaining)
174                        if self.is_shutdown:
175                            raise ShutDown
176            self._put(item)
177            self.unfinished_tasks += 1
178            self.not_empty.notify()
179
180    def get(self, block=True, timeout=None):
181        '''Remove and return an item from the queue.
182
183        If optional args 'block' is true and 'timeout' is None (the default),
184        block if necessary until an item is available. If 'timeout' is
185        a non-negative number, it blocks at most 'timeout' seconds and raises
186        the Empty exception if no item was available within that time.
187        Otherwise ('block' is false), return an item if one is immediately
188        available, else raise the Empty exception ('timeout' is ignored
189        in that case).
190
191        Raises ShutDown if the queue has been shut down and is empty,
192        or if the queue has been shut down immediately.
193        '''
194        with self.not_empty:
195            if self.is_shutdown and not self._qsize():
196                raise ShutDown
197            if not block:
198                if not self._qsize():
199                    raise Empty
200            elif timeout is None:
201                while not self._qsize():
202                    self.not_empty.wait()
203                    if self.is_shutdown and not self._qsize():
204                        raise ShutDown
205            elif timeout < 0:
206                raise ValueError("'timeout' must be a non-negative number")
207            else:
208                endtime = time() + timeout
209                while not self._qsize():
210                    remaining = endtime - time()
211                    if remaining <= 0.0:
212                        raise Empty
213                    self.not_empty.wait(remaining)
214                    if self.is_shutdown and not self._qsize():
215                        raise ShutDown
216            item = self._get()
217            self.not_full.notify()
218            return item
219
220    def put_nowait(self, item):
221        '''Put an item into the queue without blocking.
222
223        Only enqueue the item if a free slot is immediately available.
224        Otherwise raise the Full exception.
225        '''
226        return self.put(item, block=False)
227
228    def get_nowait(self):
229        '''Remove and return an item from the queue without blocking.
230
231        Only get an item if one is immediately available. Otherwise
232        raise the Empty exception.
233        '''
234        return self.get(block=False)
235
236    def shutdown(self, immediate=False):
237        '''Shut-down the queue, making queue gets and puts raise ShutDown.
238
239        By default, gets will only raise once the queue is empty. Set
240        'immediate' to True to make gets raise immediately instead.
241
242        All blocked callers of put() and get() will be unblocked. If
243        'immediate', a task is marked as done for each item remaining in
244        the queue, which may unblock callers of join().
245        '''
246        with self.mutex:
247            self.is_shutdown = True
248            if immediate:
249                while self._qsize():
250                    self._get()
251                    if self.unfinished_tasks > 0:
252                        self.unfinished_tasks -= 1
253                # release all blocked threads in `join()`
254                self.all_tasks_done.notify_all()
255            # All getters need to re-check queue-empty to raise ShutDown
256            self.not_empty.notify_all()
257            self.not_full.notify_all()
258
259    # Override these methods to implement other queue organizations
260    # (e.g. stack or priority queue).
261    # These will only be called with appropriate locks held
262
263    # Initialize the queue representation
264    def _init(self, maxsize):
265        self.queue = deque()
266
267    def _qsize(self):
268        return len(self.queue)
269
270    # Put a new item in the queue
271    def _put(self, item):
272        self.queue.append(item)
273
274    # Get an item from the queue
275    def _get(self):
276        return self.queue.popleft()
277
278    __class_getitem__ = classmethod(types.GenericAlias)
279
280
281class PriorityQueue(Queue):
282    '''Variant of Queue that retrieves open entries in priority order (lowest first).
283
284    Entries are typically tuples of the form:  (priority number, data).
285    '''
286
287    def _init(self, maxsize):
288        self.queue = []
289
290    def _qsize(self):
291        return len(self.queue)
292
293    def _put(self, item):
294        heappush(self.queue, item)
295
296    def _get(self):
297        return heappop(self.queue)
298
299
300class LifoQueue(Queue):
301    '''Variant of Queue that retrieves most recently added entries first.'''
302
303    def _init(self, maxsize):
304        self.queue = []
305
306    def _qsize(self):
307        return len(self.queue)
308
309    def _put(self, item):
310        self.queue.append(item)
311
312    def _get(self):
313        return self.queue.pop()
314
315
316class _PySimpleQueue:
317    '''Simple, unbounded FIFO queue.
318
319    This pure Python implementation is not reentrant.
320    '''
321    # Note: while this pure Python version provides fairness
322    # (by using a threading.Semaphore which is itself fair, being based
323    #  on threading.Condition), fairness is not part of the API contract.
324    # This allows the C version to use a different implementation.
325
326    def __init__(self):
327        self._queue = deque()
328        self._count = threading.Semaphore(0)
329
330    def put(self, item, block=True, timeout=None):
331        '''Put the item on the queue.
332
333        The optional 'block' and 'timeout' arguments are ignored, as this method
334        never blocks.  They are provided for compatibility with the Queue class.
335        '''
336        self._queue.append(item)
337        self._count.release()
338
339    def get(self, block=True, timeout=None):
340        '''Remove and return an item from the queue.
341
342        If optional args 'block' is true and 'timeout' is None (the default),
343        block if necessary until an item is available. If 'timeout' is
344        a non-negative number, it blocks at most 'timeout' seconds and raises
345        the Empty exception if no item was available within that time.
346        Otherwise ('block' is false), return an item if one is immediately
347        available, else raise the Empty exception ('timeout' is ignored
348        in that case).
349        '''
350        if timeout is not None and timeout < 0:
351            raise ValueError("'timeout' must be a non-negative number")
352        if not self._count.acquire(block, timeout):
353            raise Empty
354        return self._queue.popleft()
355
356    def put_nowait(self, item):
357        '''Put an item into the queue without blocking.
358
359        This is exactly equivalent to `put(item, block=False)` and is only provided
360        for compatibility with the Queue class.
361        '''
362        return self.put(item, block=False)
363
364    def get_nowait(self):
365        '''Remove and return an item from the queue without blocking.
366
367        Only get an item if one is immediately available. Otherwise
368        raise the Empty exception.
369        '''
370        return self.get(block=False)
371
372    def empty(self):
373        '''Return True if the queue is empty, False otherwise (not reliable!).'''
374        return len(self._queue) == 0
375
376    def qsize(self):
377        '''Return the approximate size of the queue (not reliable!).'''
378        return len(self._queue)
379
380    __class_getitem__ = classmethod(types.GenericAlias)
381
382
383if SimpleQueue is None:
384    SimpleQueue = _PySimpleQueue
385