#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import types
import weakref
import errno

from queue import Empty, Full

import _multiprocessing

from . import connection
from . import context
_ForkingPickler = context.reduction.ForkingPickler

from .util import debug, info, Finalize, register_after_fork, is_exiting

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):
    """Queue built from a one-way pipe, an in-process buffer and a thread.

    put() appends the object to a local deque; a background "feeder"
    thread (started lazily on the first put) pickles buffered objects and
    writes them to the pipe.  get() reads one message from the pipe and
    unpickles it.  A bounded semaphore created with ``maxsize`` is
    acquired by put() and released by get(), which is what limits the
    number of outstanding items.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Create the pipe, locks and semaphore for a new queue.

        maxsize -- capacity; any value <= 0 is replaced by SEM_VALUE_MAX.
        ctx     -- keyword-only object supplying Lock() and
                   BoundedSemaphore() factories.
        """
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        # pid of the process that created the queue
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No write lock is used on win32; see SimpleQueue.put() for
            # the rationale stated in this file.
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False
        self._reset()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the picklable state; only permitted while spawning."""
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the shared state, then rebuild the per-process state."""
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _after_fork(self):
        """Callback registered with register_after_fork()."""
        debug('Queue._after_fork()')
        self._reset(after_fork=True)

    def _reset(self, after_fork=False):
        """(Re)initialise everything that is local to the current process.

        With after_fork true the existing condition is reinitialised in
        place; otherwise a fresh one is created.  The buffer, the feeder
        thread bookkeeping and the cached bound methods are always reset.
        """
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Add obj to the queue.

        Raises ValueError if the queue has been closed and Full if the
        capacity semaphore cannot be acquired with the given
        block/timeout arguments.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            # The feeder thread is only started on the first put().
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        """Remove and return an object from the queue.

        Raises ValueError if the queue has been closed.  When not
        blocking indefinitely, raises Empty if the read lock cannot be
        acquired or no data is available within the timeout.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only the time left after acquiring the lock may be
                    # spent polling the pipe.
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        """Return maxsize minus the current value of the semaphore."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if polling the reader finds no data."""
        return not self._poll()

    def full(self):
        """Return True if the capacity semaphore is at zero."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to get(False)."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to put(obj, False)."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed, close the reader and run the close hook.

        The hook (set by _start_thread) is run even if closing the reader
        raises, and is cleared first so that it runs at most once.
        """
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                self._close = None
                close()

    def join_thread(self):
        """Run the feeder-thread join finalizer, if one was registered.

        Raises AssertionError if the queue has not been closed.
        """
        debug('Queue.join_thread()')
        # Raised explicitly rather than with an ``assert`` statement so
        # the check is still performed when Python runs with -O.
        if not self._closed:
            raise AssertionError("Queue {0!r} not closed".format(self))
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent one from being created."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # self._jointhread is None when no finalizer was registered.
            pass

    def _start_thread(self):
        """Start the feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._writer.close, self._ignore_epipe,
                  self._on_queue_feeder_error, self._sem),
            name='QueueFeederThread'
        )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        if not self._joincancelled:
            # Only a weak reference to the thread is passed as argument.
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
            )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
        )

    @staticmethod
    def _finalize_join(twr):
        """Join the thread referenced by the weakref twr, if still alive."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to buffer and wake the feeder thread."""
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
              onerror, queue_sem):
        """Feeder thread body: move objects from buffer to the pipe.

        buffer       -- deque filled by put()
        notempty     -- condition notified when buffer gains an item
        send_bytes   -- callable writing one serialized message
        writelock    -- lock held around send_bytes (unused on win32)
        close        -- called once when the sentinel is received
        ignore_epipe -- if true, exit silently on an EPIPE error
        onerror      -- called as onerror(exc, obj) on any other error
        queue_sem    -- capacity semaphore, released when an error occurs
        """
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        # Bound up front so the error handler below can always pass it to
        # onerror(), even if the failure happens before the first item
        # has been popped.
        obj = None

        while True:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    # Drain the buffer; popleft() raising IndexError on an
                    # empty deque is what ends this inner loop.
                    while True:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to clean up.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception.  For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()


# Unique marker appended to a Queue's buffer to tell its feeder thread to exit.
_sentinel = object()

#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#

class JoinableQueue(Queue):
    """Queue that additionally counts unfinished tasks.

    Every put() releases (increments) the ``_unfinished_tasks`` semaphore
    and every task_done() acquires (decrements) it; join() waits on
    ``_cond`` if the counter is not zero.
    """

    def __init__(self, maxsize=0, *, ctx):
        """Initialise the base Queue plus the task counter and condition."""
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        """Return the Queue state extended with the condition and counter."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore the Queue state and the two trailing extra fields."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Add obj to the queue and count it as an unfinished task.

        Raises ValueError if the queue has been closed and Full if the
        capacity semaphore cannot be acquired.
        """
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        # Both locks are held while the item is buffered and counted.
        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
        """Record that one previously put task has been completed.

        Raises ValueError if the counter is already zero.  Waiters on the
        condition are notified when the counter reaches zero.
        """
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        """Wait on the condition if there are unfinished tasks.

        The counter is checked once; it is not re-checked after waking.
        """
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()

#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):
    """A one-way pipe whose reads (and, off win32, writes) are locked."""

    def __init__(self, *, ctx):
        """Create the pipe and locks; ctx supplies the Lock() factory."""
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def close(self):
        """Close both ends of the pipe."""
        # try/finally so the writer is closed even if closing the reader
        # raises; otherwise the write end would be leaked.
        try:
            self._reader.close()
        finally:
            self._writer.close()

    def empty(self):
        """Return True if polling the reader finds no data."""
        return not self._poll()

    def __getstate__(self):
        """Return the picklable state; only permitted while spawning."""
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore the pipe ends and locks and rebind the poll method."""
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._poll = self._reader.poll

    def get(self):
        """Read one message from the pipe and return the unpickled object."""
        with self._rlock:
            res = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        """Pickle obj and write it to the pipe as one message."""
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)

    __class_getitem__ = classmethod(types.GenericAlias)