#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import types
import weakref
import errno

from queue import Empty, Full

import _multiprocessing

from . import connection
from . import context
_ForkingPickler = context.reduction.ForkingPickler

from .util import debug, info, Finalize, register_after_fork, is_exiting

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):

    def __init__(self, maxsize=0, *, ctx):
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False
        self._reset()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _after_fork(self):
        debug('Queue._after_fork()')
        self._reset(after_fork=True)

    def _reset(self, after_fork=False):
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # deserialize the data after releasing the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        return not self._poll()

    def full(self):
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        return self.get(False)

    def put_nowait(self, obj):
        return self.put(obj, False)

    def close(self):
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                self._close = None
                close()

    def join_thread(self):
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            pass

    def _start_thread(self):
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

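        # Main loop: block on the condition until the buffer has data, then
        # drain it -- each object is pickled and written to the pipe, holding
        # the write lock except where wacquire is None (Windows, where pipe
        # writes are already atomic).  An IndexError from popleft() just
        # means the buffer has emptied, so go back to waiting.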
        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to clean up.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()


_sentinel = object()

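# Usage sketch (not part of the original module): Queue is normally obtained
# through the public API, which binds it to a context, e.g.
# multiprocessing.Queue().  A minimal example, assuming the default start
# method:
#
#     import multiprocessing as mp
#
#     def worker(q):
#         q.put('hello')          # pickled by the feeder thread, sent over the pipe
#
#     if __name__ == '__main__':
#         q = mp.Queue()          # Queue bound to the default context
#         p = mp.Process(target=worker, args=(q,))
#         p.start()
#         print(q.get())          # blocks until the item arrives -> 'hello'
#         p.join()
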
#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow, causing Bad Things
# to happen.
#

class JoinableQueue(Queue):

    def __init__(self, maxsize=0, *, ctx):
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()

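# Usage sketch (not part of the original module): JoinableQueue is normally
# obtained via multiprocessing.JoinableQueue().  A minimal example, assuming
# the default start method:
#
#     import multiprocessing as mp
#
#     def worker(q):
#         while True:
#             item = q.get()
#             # ... process the item ...
#             q.task_done()        # one task_done() per get()
#
#     if __name__ == '__main__':
#         q = mp.JoinableQueue()
#         mp.Process(target=worker, args=(q,), daemon=True).start()
#         for item in range(10):
#             q.put(item)
#         q.join()                 # blocks until every item has been task_done()'d
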
#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):

    def __init__(self, *, ctx):
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def close(self):
        self._reader.close()
        self._writer.close()

    def empty(self):
        return not self._poll()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._poll = self._reader.poll

    def get(self):
        with self._rlock:
            res = self._reader.recv_bytes()
        # deserialize the data after releasing the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)

    __class_getitem__ = classmethod(types.GenericAlias)

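# Usage sketch (not part of the original module): SimpleQueue is normally
# obtained via multiprocessing.SimpleQueue().  It has no maxsize, no feeder
# thread and no timeouts; put() pickles and writes directly under the write
# lock, and get() blocks until data is available:
#
#     import multiprocessing as mp
#
#     def worker(q):
#         q.put({'pid': mp.current_process().pid})
#
#     if __name__ == '__main__':
#         q = mp.SimpleQueue()
#         p = mp.Process(target=worker, args=(q,))
#         p.start()
#         print(q.get())
#         p.join()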