• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
9
10__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']
11
12import sys
13import os
14import threading
15import collections
16import time
17import types
18import weakref
19import errno
20
21from queue import Empty, Full
22
23from . import connection
24from . import context
25_ForkingPickler = context.reduction.ForkingPickler
26
27from .util import debug, info, Finalize, register_after_fork, is_exiting
28
#
# Queue type using a pipe, buffer and thread
#
32
class Queue(object):
    """Queue built from a pipe, an in-process buffer and a feeder thread.

    ``put()`` appends the object to a local deque; a lazily started daemon
    thread (``_feed``) pickles buffered objects and writes them to the pipe.
    ``get()`` reads one message from the pipe while holding the reader lock
    and unpickles it afterwards.  A bounded semaphore created with
    ``maxsize`` is acquired by every ``put()`` and released by every
    ``get()``.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Create the pipe, locks and semaphore backing the queue.

        maxsize -- capacity of the queue; a value <= 0 is replaced by
                   ``SEM_VALUE_MAX`` from the ``synchronize`` module.
        ctx     -- object providing ``Lock()`` and ``BoundedSemaphore()``.
        """
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No write lock on win32; _feed() then sends without locking.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False
        self._reset()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the shareable state; only allowed while spawning."""
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the shared state and rebuild the per-process state."""
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _after_fork(self):
        """Hook registered with register_after_fork(); resets local state."""
        debug('Queue._after_fork()')
        self._reset(after_fork=True)

    def _reset(self, after_fork=False):
        """(Re)initialise the state that is local to the current process.

        With ``after_fork`` true the existing condition is re-initialised in
        place via ``_at_fork_reinit()``; otherwise a new one is created.
        """
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        # Cache the bound connection methods used by get()/empty()/_feed().
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Append ``obj`` to the buffer for the feeder thread to send.

        Raises ValueError if the queue has been closed and ``Full`` if a
        slot cannot be acquired from the semaphore (``block``/``timeout``
        are passed straight to its ``acquire()``).  Any error raised while
        starting the feeder thread is propagated.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                try:
                    self._start_thread()
                except BaseException:
                    # The object was never buffered, so hand the slot back;
                    # otherwise every failed put() would permanently shrink
                    # the capacity of the queue.
                    self._sem.release()
                    raise
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Read one message from the pipe and return the unpickled object.

        With ``block`` true and no ``timeout`` this waits in ``recv_bytes()``
        under the reader lock.  Otherwise ``Empty`` is raised when the
        reader lock cannot be acquired or the pipe has no data; for a
        blocking call the deadline is computed once, so ``timeout`` covers
        both the lock acquisition and the poll.  Raises ValueError if the
        queue has been closed.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only wait for whatever is left of the original timeout.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        """Return ``maxsize`` minus the current value of the semaphore."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if polling the reader finds no data."""
        return not self._poll()

    def full(self):
        """Return True if the semaphore has no free slot left."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to ``get(False)``."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to ``put(obj, False)``."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed and run the pending close finalizer once."""
        self._closed = True
        close = self._close
        if close:
            # Clear the attribute first so a second close() does not call it.
            self._close = None
            close()

    def join_thread(self):
        """Run the finalizer that joins the feeder thread, if there is one.

        The queue must have been closed first.
        """
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent one from being created."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # _jointhread is None until the feeder thread has been started.
            pass

    def _terminate_broken(self):
        """Close a Queue on error."""

        # gh-94777: Prevent queue writing to a pipe which is no longer read.
        self._reader.close()

        # gh-107219: Close the connection writer which can unblock
        # Queue._feed() if it was stuck in send_bytes().
        if sys.platform == 'win32':
            self._writer.close()

        self.close()
        self.join_thread()

    def _start_thread(self):
        """Start the feeder thread and register its finalizers.

        If starting the thread fails, ``self._thread`` is reset to None and
        the exception is re-raised.
        """
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._reader.close, self._writer.close,
                  self._ignore_epipe, self._on_queue_feeder_error,
                  self._sem),
            name='QueueFeederThread',
            daemon=True,
        )

        try:
            debug('doing self._thread.start()')
            self._thread.start()
            debug('... done self._thread.start()')
        except BaseException:
            # gh-109047: During Python finalization, creating a thread
            # can fail with RuntimeError.
            self._thread = None
            raise

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        """Join the feeder thread referenced by the weak reference ``twr``."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to ``buffer`` and wake the feeder thread."""
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, reader_close,
              writer_close, ignore_epipe, onerror, queue_sem):
        """Feeder thread body: move objects from ``buffer`` to the pipe.

        buffer       -- deque filled by put() and _finalize_close().
        notempty     -- condition notified whenever ``buffer`` gets an item.
        send_bytes   -- callable writing one pickled object to the pipe.
        writelock    -- lock held around send_bytes(); unused on win32.
        reader_close, writer_close -- called when the sentinel is received.
        ignore_epipe -- if true, exit silently on an error whose errno is
                        EPIPE.
        onerror      -- called as onerror(exc, obj) for any other error
                        raised while the process is not exiting.
        queue_sem    -- semaphore released when an object could not be sent.
        """
        debug('starting thread to feed data to pipe')
        # Bind the frequently used callables to local names once.
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                # Sleep until the buffer has something in it.
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    # Drain the buffer; popleft() raises IndexError once it
                    # is empty, which ends this inner loop.
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            reader_close()
                            writer_close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may be become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()

    __class_getitem__ = classmethod(types.GenericAlias)


# Unique marker appended to the buffer by Queue._finalize_close() to tell
# the feeder thread to close the pipe ends and exit.
_sentinel = object()
305
#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#
313
class JoinableQueue(Queue):
    """Queue that also supports ``task_done()`` and ``join()``.

    Every ``put()`` releases the ``_unfinished_tasks`` semaphore once and
    every ``task_done()`` acquires it once; ``join()`` waits on ``_cond``
    while that semaphore is not zero.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Initialise the base queue plus the task counter and condition.

        maxsize -- passed through to ``Queue.__init__``.
        ctx     -- object providing ``Semaphore()`` and ``Condition()`` in
                   addition to what ``Queue.__init__`` needs.
        """
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        """Return the base queue state followed by the two extra fields."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore the base queue state, then the two trailing fields."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Buffer ``obj`` and count it as an unfinished task.

        Raises ValueError if the queue has been closed and ``Full`` if a
        slot cannot be acquired from the semaphore.  Any error raised while
        starting the feeder thread is propagated.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty, self._cond:
            if self._thread is None:
                try:
                    self._start_thread()
                except BaseException:
                    # The object was never buffered, so hand the slot back;
                    # otherwise every failed put() would permanently shrink
                    # the capacity of the queue.
                    self._sem.release()
                    raise
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
        """Mark one task as finished.

        Raises ValueError if there is no unfinished task left to account
        for.  Waiters in ``join()`` are notified when the count hits zero.
        """
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        """Wait on the condition if there are unfinished tasks."""
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()
352
#
# Simplified Queue type -- really just a locked pipe
#
356
class SimpleQueue(object):
    """Minimal queue: a one-way pipe guarded by a read and a write lock."""

    def __init__(self, *, ctx):
        """Open the pipe and create the locks using ``ctx.Lock()``."""
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        # No write lock is created on win32; see put().
        self._wlock = None if sys.platform == 'win32' else ctx.Lock()

    def close(self):
        """Close the read end and then the write end of the pipe."""
        for end in (self._reader, self._writer):
            end.close()

    def empty(self):
        """Return True if polling the reader finds no data."""
        has_data = self._poll()
        return not has_data

    def __getstate__(self):
        """Return the pipe ends and locks; only allowed while spawning."""
        context.assert_spawning(self)
        state = (self._reader, self._writer, self._rlock, self._wlock)
        return state

    def __setstate__(self, state):
        """Restore pipe ends and locks and re-bind the cached poll method."""
        reader, writer, rlock, wlock = state
        self._reader = reader
        self._writer = writer
        self._rlock = rlock
        self._wlock = wlock
        self._poll = self._reader.poll

    def get(self):
        """Receive one message under the read lock and unpickle it."""
        with self._rlock:
            payload = self._reader.recv_bytes()
        # The lock is already released here, so unpickling does not hold up
        # other readers.
        return _ForkingPickler.loads(payload)

    def put(self, obj):
        """Pickle ``obj`` and send it through the pipe."""
        # Pickle first so the write lock is only held for the actual send.
        payload = _ForkingPickler.dumps(obj)
        write_lock = self._wlock
        if write_lock is not None:
            with write_lock:
                self._writer.send_bytes(payload)
            return
        # writes to a message oriented win32 pipe are atomic
        self._writer.send_bytes(payload)

    __class_getitem__ = classmethod(types.GenericAlias)
400