# Source-viewer navigation residue (not part of the module):
# Home / Line# / Scopes# / Navigate# / Raw / Download
"""A multi-producer, multi-consumer queue."""
2
import heapq
from collections import deque
from time import time as _time

try:
    import threading as _threading
except ImportError:
    import dummy_threading as _threading
10
# Names exported by a star-import of this module.
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
12
class Empty(Exception):
    """Exception raised by Queue.get(block=0)/get_nowait()."""
16
class Full(Exception):
    """Exception raised by Queue.put(block=0)/put_nowait()."""
20
21class Queue:
22    """Create a queue object with a given maximum size.
23
24    If maxsize is <= 0, the queue size is infinite.
25    """
26    def __init__(self, maxsize=0):
27        self.maxsize = maxsize
28        self._init(maxsize)
29        # mutex must be held whenever the queue is mutating.  All methods
30        # that acquire mutex must release it before returning.  mutex
31        # is shared between the three conditions, so acquiring and
32        # releasing the conditions also acquires and releases mutex.
33        self.mutex = _threading.Lock()
34        # Notify not_empty whenever an item is added to the queue; a
35        # thread waiting to get is notified then.
36        self.not_empty = _threading.Condition(self.mutex)
37        # Notify not_full whenever an item is removed from the queue;
38        # a thread waiting to put is notified then.
39        self.not_full = _threading.Condition(self.mutex)
40        # Notify all_tasks_done whenever the number of unfinished tasks
41        # drops to zero; thread waiting to join() is notified to resume
42        self.all_tasks_done = _threading.Condition(self.mutex)
43        self.unfinished_tasks = 0
44
45    def task_done(self):
46        """Indicate that a formerly enqueued task is complete.
47
48        Used by Queue consumer threads.  For each get() used to fetch a task,
49        a subsequent call to task_done() tells the queue that the processing
50        on the task is complete.
51
52        If a join() is currently blocking, it will resume when all items
53        have been processed (meaning that a task_done() call was received
54        for every item that had been put() into the queue).
55
56        Raises a ValueError if called more times than there were items
57        placed in the queue.
58        """
59        self.all_tasks_done.acquire()
60        try:
61            unfinished = self.unfinished_tasks - 1
62            if unfinished <= 0:
63                if unfinished < 0:
64                    raise ValueError('task_done() called too many times')
65                self.all_tasks_done.notify_all()
66            self.unfinished_tasks = unfinished
67        finally:
68            self.all_tasks_done.release()
69
70    def join(self):
71        """Blocks until all items in the Queue have been gotten and processed.
72
73        The count of unfinished tasks goes up whenever an item is added to the
74        queue. The count goes down whenever a consumer thread calls task_done()
75        to indicate the item was retrieved and all work on it is complete.
76
77        When the count of unfinished tasks drops to zero, join() unblocks.
78        """
79        self.all_tasks_done.acquire()
80        try:
81            while self.unfinished_tasks:
82                self.all_tasks_done.wait()
83        finally:
84            self.all_tasks_done.release()
85
86    def qsize(self):
87        """Return the approximate size of the queue (not reliable!)."""
88        self.mutex.acquire()
89        n = self._qsize()
90        self.mutex.release()
91        return n
92
93    def empty(self):
94        """Return True if the queue is empty, False otherwise (not reliable!)."""
95        self.mutex.acquire()
96        n = not self._qsize()
97        self.mutex.release()
98        return n
99
100    def full(self):
101        """Return True if the queue is full, False otherwise (not reliable!)."""
102        self.mutex.acquire()
103        n = 0 < self.maxsize == self._qsize()
104        self.mutex.release()
105        return n
106
107    def put(self, item, block=True, timeout=None):
108        """Put an item into the queue.
109
110        If optional args 'block' is true and 'timeout' is None (the default),
111        block if necessary until a free slot is available. If 'timeout' is
112        a non-negative number, it blocks at most 'timeout' seconds and raises
113        the Full exception if no free slot was available within that time.
114        Otherwise ('block' is false), put an item on the queue if a free slot
115        is immediately available, else raise the Full exception ('timeout'
116        is ignored in that case).
117        """
118        self.not_full.acquire()
119        try:
120            if self.maxsize > 0:
121                if not block:
122                    if self._qsize() == self.maxsize:
123                        raise Full
124                elif timeout is None:
125                    while self._qsize() == self.maxsize:
126                        self.not_full.wait()
127                elif timeout < 0:
128                    raise ValueError("'timeout' must be a non-negative number")
129                else:
130                    endtime = _time() + timeout
131                    while self._qsize() == self.maxsize:
132                        remaining = endtime - _time()
133                        if remaining <= 0.0:
134                            raise Full
135                        self.not_full.wait(remaining)
136            self._put(item)
137            self.unfinished_tasks += 1
138            self.not_empty.notify()
139        finally:
140            self.not_full.release()
141
142    def put_nowait(self, item):
143        """Put an item into the queue without blocking.
144
145        Only enqueue the item if a free slot is immediately available.
146        Otherwise raise the Full exception.
147        """
148        return self.put(item, False)
149
150    def get(self, block=True, timeout=None):
151        """Remove and return an item from the queue.
152
153        If optional args 'block' is true and 'timeout' is None (the default),
154        block if necessary until an item is available. If 'timeout' is
155        a non-negative number, it blocks at most 'timeout' seconds and raises
156        the Empty exception if no item was available within that time.
157        Otherwise ('block' is false), return an item if one is immediately
158        available, else raise the Empty exception ('timeout' is ignored
159        in that case).
160        """
161        self.not_empty.acquire()
162        try:
163            if not block:
164                if not self._qsize():
165                    raise Empty
166            elif timeout is None:
167                while not self._qsize():
168                    self.not_empty.wait()
169            elif timeout < 0:
170                raise ValueError("'timeout' must be a non-negative number")
171            else:
172                endtime = _time() + timeout
173                while not self._qsize():
174                    remaining = endtime - _time()
175                    if remaining <= 0.0:
176                        raise Empty
177                    self.not_empty.wait(remaining)
178            item = self._get()
179            self.not_full.notify()
180            return item
181        finally:
182            self.not_empty.release()
183
184    def get_nowait(self):
185        """Remove and return an item from the queue without blocking.
186
187        Only get an item if one is immediately available. Otherwise
188        raise the Empty exception.
189        """
190        return self.get(False)
191
192    # Override these methods to implement other queue organizations
193    # (e.g. stack or priority queue).
194    # These will only be called with appropriate locks held
195
196    # Initialize the queue representation
197    def _init(self, maxsize):
198        self.queue = deque()
199
200    def _qsize(self, len=len):
201        return len(self.queue)
202
203    # Put a new item in the queue
204    def _put(self, item):
205        self.queue.append(item)
206
207    # Get an item from the queue
208    def _get(self):
209        return self.queue.popleft()
210
211
class PriorityQueue(Queue):
    """Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    """

    def _init(self, maxsize):
        # A plain list that heapq keeps ordered as a heap.
        self.queue = []

    def _qsize(self, len=len):
        return len(self.queue)

    def _put(self, item, heappush=heapq.heappush):
        heappush(self.queue, item)

    def _get(self, heappop=heapq.heappop):
        # heappop returns the smallest item, i.e. the lowest priority number.
        return heappop(self.queue)
229
230
class LifoQueue(Queue):
    """Variant of Queue that retrieves most recently added entries first."""

    def _init(self, maxsize):
        # A list used as a stack: items are appended and popped at the end.
        self.queue = []

    def _qsize(self, len=len):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()
245