#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import weakref
import errno

from queue import Empty, Full

import _multiprocessing

from . import connection
from . import context
_ForkingPickler = context.reduction.ForkingPickler

from .util import debug, info, Finalize, register_after_fork, is_exiting

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):
    """Queue built from a one-way pipe, a local buffer and a feeder thread.

    put() appends objects to an in-process deque; a background thread
    (started on the first put()) pickles them and writes them to the pipe.
    get() reads bytes from the pipe and unpickles them.  A bounded
    semaphore created with ``maxsize`` counts the free slots.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Create the pipe, the locks and the slot-counting semaphore.

        maxsize -- capacity of the queue; a value <= 0 is replaced by
                   SEM_VALUE_MAX from the synchronize module.
        ctx     -- context object supplying Lock() and BoundedSemaphore().
        """
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No write lock is used on win32.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the state tuple to pickle; only allowed while spawning."""
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the pickled state, then rebuild the per-process state."""
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        """(Re)initialise the state that is private to the current process.

        Creates a fresh condition and buffer, forgets any feeder thread and
        finalizers, marks the queue as open and caches the bound pipe
        methods used by put()/get().
        """
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Append obj to the buffer for the feeder thread to send.

        Raises ValueError if the queue has been closed, and Full if a slot
        cannot be acquired from the semaphore with the given block/timeout.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            # The feeder thread is started lazily, on the first put().
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Read one pickled object from the pipe and return it unpickled.

        Raises ValueError if the queue has been closed.  When not blocking
        indefinitely, raises Empty if the read lock cannot be acquired or
        no data is available within the allowed time.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            # Blocking without a deadline: just wait for data.
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only the time left after acquiring the lock may be
                    # spent polling the pipe.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        """Return maxsize minus the current value of the slot semaphore."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if polling the read end of the pipe finds no data."""
        return not self._poll()

    def full(self):
        """Return True if the slot semaphore has reached zero."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to put(obj, False)."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed, close the reader and run the close hook.

        The close hook (set by _start_thread()) is run at most once, even
        if closing the reader raises.
        """
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                self._close = None
                close()

    def join_thread(self):
        """Run the join finalizer, if one is registered.

        The queue must already have been closed.
        """
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Prevent the feeder thread from being joined.

        Records the cancellation so that _start_thread() does not register
        a join finalizer later, and cancels the finalizer if one already
        exists.
        """
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        # _jointhread is None until the feeder thread has been started;
        # test for that explicitly instead of swallowing AttributeError,
        # which would also hide unrelated attribute errors.
        jointhread = self._jointhread
        if jointhread is not None:
            jointhread.cancel()

    def _start_thread(self):
        """Start the feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        if not self._joincancelled:
            # Only a weak reference to the thread is handed to the callback.
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
            )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
        )

    @staticmethod
    def _finalize_join(twr):
        """Join the thread referenced by the weak reference twr, if alive."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to buffer and wake the feeder thread."""
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        """Feeder thread body: move objects from buffer to the pipe.

        buffer       -- deque filled by put().
        notempty     -- condition notified when buffer gains an item.
        send_bytes   -- callable writing a bytes payload to the pipe.
        writelock    -- lock held around send_bytes; not used on win32.
        close        -- callable invoked once the sentinel is received.
        ignore_epipe -- if true, exit silently on an EPIPE error.
        onerror      -- callable(exception, obj) reporting other errors.
        queue_sem    -- semaphore released when an object could not be sent.
        """
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            # Reset on every pass so that the error handler below never
            # sees an unbound name or the payload of a previous item.
            obj = None
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    # Drain the buffer; IndexError from popleft() means it
                    # is empty and we go back to waiting.
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()


# Unique marker appended to a Queue's buffer to tell its feeder thread to exit.
_sentinel = object()

#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#

class JoinableQueue(Queue):
    """Queue that additionally counts unfinished tasks.

    Every put() releases the ``_unfinished_tasks`` semaphore once and every
    task_done() acquires it once; join() waits on a condition that is
    notified when the semaphore reaches zero.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Initialise the base Queue plus the task counter and condition."""
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        """Return the Queue state extended with the condition and counter."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore the Queue state and the two trailing extra items."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Like Queue.put(), but also count obj as an unfinished task.

        Raises ValueError if the queue has been closed, and Full if a slot
        cannot be acquired from the semaphore with the given block/timeout.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
        """Record that one previously queued task has been completed.

        Raises ValueError if there is no unfinished task left to account
        for.  Wakes all join() waiters once the counter reaches zero.
        """
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        """Wait on the condition unless the task counter is already zero."""
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()

#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):
    """Minimal queue: a one-way pipe guarded by a read and a write lock."""

    def __init__(self, *, ctx):
        """Create the pipe and the locks; ctx supplies Lock()."""
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def close(self):
        """Close both ends of the underlying pipe.

        Gives callers an explicit way to release the pipe; the queue must
        not be used after it has been closed.
        """
        self._reader.close()
        self._writer.close()

    def empty(self):
        """Return True if polling the read end of the pipe finds no data."""
        return not self._poll()

    def __getstate__(self):
        """Return the state tuple to pickle; only allowed while spawning."""
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore the pickled state and re-cache the reader's poll method."""
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._poll = self._reader.poll

    def get(self):
        """Read one pickled object from the pipe and return it unpickled."""
        with self._rlock:
            res = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        """Pickle obj and write it to the pipe."""
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)