#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import types
import weakref
import errno

from queue import Empty, Full

from . import connection
from . import context
_ForkingPickler = context.reduction.ForkingPickler

from .util import debug, info, Finalize, register_after_fork, is_exiting

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):

    def __init__(self, maxsize=0, *, ctx):
        if maxsize <= 0:
            # Can raise ImportError (see issues #3770 and #23400)
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._opid = os.getpid()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
        self._sem = ctx.BoundedSemaphore(maxsize)
        # For use by concurrent.futures
        self._ignore_epipe = False
        self._reset()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        (self._ignore_epipe, self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._reset()

    def _after_fork(self):
        debug('Queue._after_fork()')
        self._reset(after_fork=True)

    def _reset(self, after_fork=False):
        if after_fork:
            self._notempty._at_fork_reinit()
        else:
            self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send_bytes = self._writer.send_bytes
        self._recv_bytes = self._reader.recv_bytes
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is None:
            with self._rlock:
                res = self._recv_bytes()
            self._sem.release()
        else:
            if block:
                deadline = time.monotonic() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    timeout = deadline - time.monotonic()
                    if not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv_bytes()
                self._sem.release()
            finally:
                self._rlock.release()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def qsize(self):
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        return not self._poll()

    def full(self):
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        return self.get(False)

    def put_nowait(self, obj):
        return self.put(obj, False)

    def close(self):
        self._closed = True
        close = self._close
        if close:
            self._close = None
            close()

    def join_thread(self):
        debug('Queue.join_thread()')
        assert self._closed, "Queue {0!r} not closed".format(self)
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            pass

    def _terminate_broken(self):
        # Close a Queue on error.

        # gh-94777: Prevent queue writing to a pipe which is no longer read.
        self._reader.close()

        # gh-107219: Close the connection writer which can unblock
        # Queue._feed() if it was stuck in send_bytes().
        if sys.platform == 'win32':
            self._writer.close()

        self.close()
        self.join_thread()

    def _start_thread(self):
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send_bytes,
                  self._wlock, self._reader.close, self._writer.close,
                  self._ignore_epipe, self._on_queue_feeder_error,
                  self._sem),
            name='QueueFeederThread',
            daemon=True,
        )

        try:
            debug('doing self._thread.start()')
            self._thread.start()
            debug('... done self._thread.start()')
        except:
            # gh-109047: During Python finalization, creating a thread
            # can fail with RuntimeError.
            self._thread = None
            raise

        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        debug('telling queue thread to quit')
        with notempty:
            buffer.append(_sentinel)
            notempty.notify()

    @staticmethod
    def _feed(buffer, notempty, send_bytes, writelock, reader_close,
              writer_close, ignore_epipe, onerror, queue_sem):
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        while 1:
            try:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            reader_close()
                            writer_close()
                            return

                        # serialize the data before acquiring the lock
                        obj = _ForkingPickler.dumps(obj)
                        if wacquire is None:
                            send_bytes(obj)
                        else:
                            wacquire()
                            try:
                                send_bytes(obj)
                            finally:
                                wrelease()
                except IndexError:
                    pass
            except Exception as e:
                if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
                    return
                # Since this runs in a daemon thread the resources it uses
                # may become unusable while the process is cleaning up.
                # We ignore errors which happen after the process has
                # started to cleanup.
                if is_exiting():
                    info('error in queue thread: %s', e)
                    return
                else:
                    # Since the object has not been sent in the queue, we need
                    # to decrease the size of the queue. The error acts as
                    # if the object had been silently removed from the queue
                    # and this step is necessary to have a properly working
                    # queue.
                    queue_sem.release()
                    onerror(e, obj)

    @staticmethod
    def _on_queue_feeder_error(e, obj):
        """
        Private API hook called when feeding data in the background thread
        raises an exception. For overriding by concurrent.futures.
        """
        import traceback
        traceback.print_exc()

    __class_getitem__ = classmethod(types.GenericAlias)


_sentinel = object()

#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#

class JoinableQueue(Queue):

    def __init__(self, maxsize=0, *, ctx):
        Queue.__init__(self, maxsize, ctx=ctx)
        self._unfinished_tasks = ctx.Semaphore(0)
        self._cond = ctx.Condition()

    def __getstate__(self):
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if not self._sem.acquire(block, timeout):
            raise Full

        with self._notempty, self._cond:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()

    def task_done(self):
        with self._cond:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()

    def join(self):
        with self._cond:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()

#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):

    def __init__(self, *, ctx):
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()

    def close(self):
        self._reader.close()
        self._writer.close()

    def empty(self):
        return not self._poll()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._poll = self._reader.poll

    def get(self):
        with self._rlock:
            res = self._reader.recv_bytes()
        # unserialize the data after having released the lock
        return _ForkingPickler.loads(res)

    def put(self, obj):
        # serialize the data before acquiring the lock
        obj = _ForkingPickler.dumps(obj)
        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self._writer.send_bytes(obj)
        else:
            with self._wlock:
                self._writer.send_bytes(obj)

    __class_getitem__ = classmethod(types.GenericAlias)
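
#
# Editorial usage sketch -- not part of the upstream module.  A minimal,
# single-process illustration of the task_done()/join() accounting noted in
# the comment above JoinableQueue: every put() must eventually be matched by
# exactly one task_done(), or join() never unblocks.  It is written as user
# code against the public multiprocessing API (multiprocessing.JoinableQueue);
# in real programs the get()/task_done() side normally runs in worker
# processes, and is kept in-process here only so the sketch stays
# self-contained.  The guard keeps it from running when this module is
# imported as multiprocessing.queues.
#

if __name__ == '__main__':
    import multiprocessing

    jq = multiprocessing.JoinableQueue()
    for i in range(3):
        jq.put(i)          # each put() releases the unfinished-tasks semaphore

    for _ in range(3):
        item = jq.get()    # received via the pipe fed by the feeder thread
        print('processed', item)
        jq.task_done()     # one acknowledgement per put(); join() needs all three

    jq.join()              # returns at once: the unfinished-tasks count is zero
    jq.close()             # no more puts; ask the feeder thread to exit
    jq.join_thread()       # wait for the feeder thread to flush and finish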