'''A multi-producer, multi-consumer queue.'''
2
import threading
import types
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time
try:
    # Use the SimpleQueue provided by the _queue extension module when it
    # can be imported.
    from _queue import SimpleQueue
except ImportError:
    # Placeholder: the end of this module rebinds the name to the pure
    # Python implementation when it is still None.
    SimpleQueue = None
12
# Names exported by a star-import of this module.
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']
14
15
try:
    # Take the Empty exception from the _queue extension module when it
    # can be imported.
    from _queue import Empty
except ImportError:
    # No extension module: define the exception here instead.
    class Empty(Exception):
        'Exception raised by Queue.get(block=0)/get_nowait().'
        pass
22
class Full(Exception):
    'Exception raised by Queue.put(block=0)/put_nowait().'
    pass
26
27
class Queue:
    '''Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.

    The storage itself is handled by the _init(), _qsize(), _put() and
    _get() hooks, which subclasses override to change the ordering.
    '''

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)

        # mutex must be held whenever the queue is mutating.  All methods
        # that acquire mutex must release it before returning.  mutex
        # is shared between the three conditions, so acquiring and
        # releasing the conditions also acquires and releases mutex.
        self.mutex = threading.Lock()

        # Notify not_empty whenever an item is added to the queue; a
        # thread waiting to get is notified then.
        self.not_empty = threading.Condition(self.mutex)

        # Notify not_full whenever an item is removed from the queue;
        # a thread waiting to put is notified then.
        self.not_full = threading.Condition(self.mutex)

        # Notify all_tasks_done whenever the number of unfinished tasks
        # drops to zero; thread waiting to join() is notified to resume
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        '''Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        '''
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    # Raised before the counter is stored, so an excess
                    # call leaves unfinished_tasks unchanged.
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        '''Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        '''
        with self.all_tasks_done:
            # Re-check the counter after every wake-up.
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        '''Return the approximate size of the queue (not reliable!).'''
        with self.mutex:
            return self._qsize()

    def empty(self):
        '''Return True if the queue is empty, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() == 0
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can grow before the result of empty() or
        qsize() can be used.

        To create code that needs to wait for all queued tasks to be
        completed, the preferred technique is to use the join() method.
        '''
        with self.mutex:
            return not self._qsize()

    def full(self):
        '''Return True if the queue is full, False otherwise (not reliable!).

        This method is likely to be removed at some point.  Use qsize() >= n
        as a direct substitute, but be aware that either approach risks a race
        condition where a queue can shrink before the result of full() or
        qsize() can be used.
        '''
        with self.mutex:
            # A maxsize <= 0 means unbounded, so such a queue is never full.
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        '''Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).

        Raises a ValueError if the queue is bounded, 'block' is true and
        'timeout' is a negative number.
        '''
        with self.not_full:
            # The capacity checks (and the timeout validation) only apply
            # to bounded queues.
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    # Wait against a fixed deadline so repeated wake-ups
                    # do not extend the total time spent blocking.
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        '''Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).

        Raises a ValueError if 'block' is true and 'timeout' is a negative
        number.
        '''
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                # Wait against a fixed deadline so repeated wake-ups do
                # not extend the total time spent blocking.
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        '''Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        '''
        return self.put(item, block=False)

    def get_nowait(self):
        '''Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        '''
        return self.get(block=False)

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.popleft()

    # Allow subscripting the class (e.g. Queue[int]).
    __class_getitem__ = classmethod(types.GenericAlias)
221
222
class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    # The storage is a plain list kept in heap order by heappush/heappop.
    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    # heappop() removes and returns the smallest entry.
    def _get(self):
        return heappop(self.queue)
240
241
class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    # The storage is a plain list used as a stack: append and pop both
    # work on its right-hand end.
    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()
256
257
258class _PySimpleQueue:
259    '''Simple, unbounded FIFO queue.
260
261    This pure Python implementation is not reentrant.
262    '''
263    # Note: while this pure Python version provides fairness
264    # (by using a threading.Semaphore which is itself fair, being based
265    #  on threading.Condition), fairness is not part of the API contract.
266    # This allows the C version to use a different implementation.
267
268    def __init__(self):
269        self._queue = deque()
270        self._count = threading.Semaphore(0)
271
272    def put(self, item, block=True, timeout=None):
273        '''Put the item on the queue.
274
275        The optional 'block' and 'timeout' arguments are ignored, as this method
276        never blocks.  They are provided for compatibility with the Queue class.
277        '''
278        self._queue.append(item)
279        self._count.release()
280
281    def get(self, block=True, timeout=None):
282        '''Remove and return an item from the queue.
283
284        If optional args 'block' is true and 'timeout' is None (the default),
285        block if necessary until an item is available. If 'timeout' is
286        a non-negative number, it blocks at most 'timeout' seconds and raises
287        the Empty exception if no item was available within that time.
288        Otherwise ('block' is false), return an item if one is immediately
289        available, else raise the Empty exception ('timeout' is ignored
290        in that case).
291        '''
292        if timeout is not None and timeout < 0:
293            raise ValueError("'timeout' must be a non-negative number")
294        if not self._count.acquire(block, timeout):
295            raise Empty
296        return self._queue.popleft()
297
298    def put_nowait(self, item):
299        '''Put an item into the queue without blocking.
300
301        This is exactly equivalent to `put(item)` and is only provided
302        for compatibility with the Queue class.
303        '''
304        return self.put(item, block=False)
305
306    def get_nowait(self):
307        '''Remove and return an item from the queue without blocking.
308
309        Only get an item if one is immediately available. Otherwise
310        raise the Empty exception.
311        '''
312        return self.get(block=False)
313
314    def empty(self):
315        '''Return True if the queue is empty, False otherwise (not reliable!).'''
316        return len(self._queue) == 0
317
318    def qsize(self):
319        '''Return the approximate size of the queue (not reliable!).'''
320        return len(self._queue)
321
322    __class_getitem__ = classmethod(types.GenericAlias)
323
324
# Fall back to the pure Python implementation when SimpleQueue is still the
# None placeholder left by the failed _queue import.
if SimpleQueue is None:
    SimpleQueue = _PySimpleQueue
327