#
# Module implementing queues
#
# multiprocessing/queues.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

__all__ = ['Queue', 'SimpleQueue', 'JoinableQueue']

import sys
import os
import threading
import collections
import time
import atexit
import weakref

from Queue import Empty, Full
import _multiprocessing
from . import Pipe
from .synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from .util import debug, info, Finalize, register_after_fork, is_exiting
from .forking import assert_spawning

#
# Queue type using a pipe, buffer and thread
#

class Queue(object):
    """Queue built from a one-way pipe, an in-process buffer and a thread.

    ``put()`` appends to a local deque; a lazily started daemon thread
    (see ``_feed``) moves buffered objects into the pipe.  ``get()`` reads
    from the pipe while holding the reader lock.  A bounded semaphore,
    acquired by ``put()`` and released by ``get()``, enforces ``maxsize``.
    """

    def __init__(self, maxsize=0):
        """Create the pipe, locks and capacity semaphore.

        A ``maxsize`` of zero or less selects
        ``_multiprocessing.SemLock.SEM_VALUE_MAX`` as the capacity.
        """
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        # pid of the process that created the queue
        self._opid = os.getpid()
        if sys.platform == 'win32':
            # No writer lock on win32 (SimpleQueue below notes that writes
            # to a message oriented win32 pipe are atomic).
            self._wlock = None
        else:
            self._wlock = Lock()
        self._sem = BoundedSemaphore(maxsize)

        self._after_fork()

        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

    def __getstate__(self):
        """Return the shareable state; only allowed while spawning."""
        assert_spawning(self)
        return (self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)

    def __setstate__(self, state):
        """Restore the shared state and rebuild the per-process state."""
        (self._maxsize, self._reader, self._writer,
         self._rlock, self._wlock, self._sem, self._opid) = state
        self._after_fork()

    def _after_fork(self):
        """(Re)initialise everything that is private to this process.

        Creates a fresh condition and buffer, forgets any feeder thread
        and finalizers, and rebinds the pipe's send/recv/poll methods.
        """
        debug('Queue._after_fork()')
        self._notempty = threading.Condition(threading.Lock())
        self._buffer = collections.deque()
        self._thread = None
        self._jointhread = None
        self._joincancelled = False
        self._closed = False
        self._close = None
        self._send = self._writer.send
        self._recv = self._reader.recv
        self._poll = self._reader.poll

    def put(self, obj, block=True, timeout=None):
        """Buffer ``obj`` for sending, starting the feeder thread if needed.

        Raises ``Full`` when a capacity slot cannot be acquired with the
        given ``block``/``timeout``.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()
        finally:
            self._notempty.release()

    def get(self, block=True, timeout=None):
        """Receive one object from the pipe and free a capacity slot.

        Raises ``Empty`` when the reader lock cannot be acquired, or no
        data becomes available, within the given ``block``/``timeout``.
        """
        if block and timeout is None:
            # Fully blocking: wait for the lock, then for data.
            self._rlock.acquire()
            try:
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

        else:
            if block:
                deadline = time.time() + timeout
            if not self._rlock.acquire(block, timeout):
                raise Empty
            try:
                if block:
                    # Only the time left after taking the lock may be
                    # spent polling the pipe.
                    timeout = deadline - time.time()
                    if timeout < 0 or not self._poll(timeout):
                        raise Empty
                elif not self._poll():
                    raise Empty
                res = self._recv()
                self._sem.release()
                return res
            finally:
                self._rlock.release()

    def qsize(self):
        """Return ``maxsize`` minus the semaphore's current value."""
        # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
        return self._maxsize - self._sem._semlock._get_value()

    def empty(self):
        """Return True if polling the reader finds no data."""
        return not self._poll()

    def full(self):
        """Return True if the capacity semaphore is at zero."""
        return self._sem._semlock._is_zero()

    def get_nowait(self):
        """Equivalent to ``get(False)``."""
        return self.get(False)

    def put_nowait(self, obj):
        """Equivalent to ``put(obj, False)``."""
        return self.put(obj, False)

    def close(self):
        """Mark the queue closed, close the reader and run the close hook.

        The close finalizer (if the feeder thread was started) is run even
        when closing the reader raises, and is run at most once through
        this method.
        """
        self._closed = True
        try:
            self._reader.close()
        finally:
            close = self._close
            if close:
                self._close = None
                close()

    def join_thread(self):
        """Run the join finalizer, if any; the queue must be closed."""
        debug('Queue.join_thread()')
        assert self._closed
        if self._jointhread:
            self._jointhread()

    def cancel_join_thread(self):
        """Cancel the join finalizer and prevent one from being created."""
        debug('Queue.cancel_join_thread()')
        self._joincancelled = True
        try:
            self._jointhread.cancel()
        except AttributeError:
            # self._jointhread is still None: nothing to cancel.
            pass

    def _start_thread(self):
        """Start the daemon feeder thread and register its finalizers."""
        debug('Queue._start_thread()')

        # Start thread which transfers data from buffer to pipe
        self._buffer.clear()
        self._thread = threading.Thread(
            target=Queue._feed,
            args=(self._buffer, self._notempty, self._send,
                  self._wlock, self._writer.close),
            name='QueueFeederThread'
            )
        self._thread.daemon = True

        debug('doing self._thread.start()')
        self._thread.start()
        debug('... done self._thread.start()')

        # On process exit we will wait for data to be flushed to pipe.
        if not self._joincancelled:
            self._jointhread = Finalize(
                self._thread, Queue._finalize_join,
                [weakref.ref(self._thread)],
                exitpriority=-5
                )

        # Send sentinel to the thread queue object when garbage collected
        self._close = Finalize(
            self, Queue._finalize_close,
            [self._buffer, self._notempty],
            exitpriority=10
            )

    @staticmethod
    def _finalize_join(twr):
        """Join the feeder thread referenced by weakref ``twr``, if alive."""
        debug('joining queue thread')
        thread = twr()
        if thread is not None:
            thread.join()
            debug('... queue thread joined')
        else:
            debug('... queue thread already dead')

    @staticmethod
    def _finalize_close(buffer, notempty):
        """Append the sentinel to ``buffer`` and wake the feeder thread."""
        debug('telling queue thread to quit')
        notempty.acquire()
        try:
            buffer.append(_sentinel)
            notempty.notify()
        finally:
            notempty.release()

    @staticmethod
    def _feed(buffer, notempty, send, writelock, close):
        """Feeder thread body: move objects from ``buffer`` into the pipe.

        Waits on ``notempty`` while the buffer is empty, then drains it,
        sending each object (under ``writelock`` except on win32).  On
        reading the sentinel it calls ``close()`` and returns.
        """
        debug('starting thread to feed data to pipe')
        nacquire = notempty.acquire
        nrelease = notempty.release
        nwait = notempty.wait
        bpopleft = buffer.popleft
        sentinel = _sentinel
        if sys.platform != 'win32':
            wacquire = writelock.acquire
            wrelease = writelock.release
        else:
            wacquire = None

        try:
            while 1:
                nacquire()
                try:
                    if not buffer:
                        nwait()
                finally:
                    nrelease()
                try:
                    while 1:
                        obj = bpopleft()
                        if obj is sentinel:
                            debug('feeder thread got sentinel -- exiting')
                            close()
                            return

                        if wacquire is None:
                            send(obj)
                        else:
                            wacquire()
                            try:
                                send(obj)
                            finally:
                                wrelease()
                except IndexError:
                    # Buffer drained; go back to waiting on notempty.
                    pass
        except Exception as e:
            # Since this runs in a daemon thread the resources it uses
            # may be become unusable while the process is cleaning up.
            # We ignore errors which happen after the process has
            # started to cleanup.
            try:
                if is_exiting():
                    info('error in queue thread: %s', e)
                else:
                    import traceback
                    traceback.print_exc()
            except Exception:
                pass

_sentinel = object()

#
# A queue type which also supports join() and task_done() methods
#
# Note that if you do not call task_done() for each finished task then
# eventually the counter's semaphore may overflow causing Bad Things
# to happen.
#

class JoinableQueue(Queue):
    """Queue that also counts unfinished tasks for join()/task_done().

    Every ``put()`` releases (increments) a semaphore; every
    ``task_done()`` acquires (decrements) it.  ``join()`` waits on a
    condition that is notified when the count reaches zero.
    """

    def __init__(self, maxsize=0):
        """Create the queue plus the task counter and its condition."""
        Queue.__init__(self, maxsize)
        self._unfinished_tasks = Semaphore(0)
        self._cond = Condition()

    def __getstate__(self):
        """Return Queue's state extended with the condition and counter."""
        return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)

    def __setstate__(self, state):
        """Restore Queue's state, then the condition and counter."""
        Queue.__setstate__(self, state[:-2])
        self._cond, self._unfinished_tasks = state[-2:]

    def put(self, obj, block=True, timeout=None):
        """Like ``Queue.put`` but also counts one more unfinished task.

        Raises ``Full`` when a capacity slot cannot be acquired with the
        given ``block``/``timeout``.
        """
        assert not self._closed
        if not self._sem.acquire(block, timeout):
            raise Full

        self._notempty.acquire()
        self._cond.acquire()
        try:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._unfinished_tasks.release()
            self._notempty.notify()
        finally:
            self._cond.release()
            self._notempty.release()

    def task_done(self):
        """Count one task as finished; wake joiners when none remain.

        Raises ``ValueError`` if the counter is already at zero.
        """
        self._cond.acquire()
        try:
            if not self._unfinished_tasks.acquire(False):
                raise ValueError('task_done() called too many times')
            if self._unfinished_tasks._semlock._is_zero():
                self._cond.notify_all()
        finally:
            self._cond.release()

    def join(self):
        """Wait on the condition if the unfinished-task count is not zero."""
        self._cond.acquire()
        try:
            if not self._unfinished_tasks._semlock._is_zero():
                self._cond.wait()
        finally:
            self._cond.release()

#
# Simplified Queue type -- really just a locked pipe
#

class SimpleQueue(object):
    """A pipe whose ends are guarded by locks; no buffer or thread.

    ``get`` and ``put`` are instance attributes built by
    ``_make_methods`` rather than ordinary methods.
    """

    def __init__(self):
        """Create the pipe and locks, then build ``get``/``put``."""
        self._reader, self._writer = Pipe(duplex=False)
        self._rlock = Lock()
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = Lock()
        self._make_methods()

    def empty(self):
        """Return True if polling the reader finds no data."""
        return not self._reader.poll()

    def __getstate__(self):
        """Return the pipe ends and locks; only allowed while spawning."""
        assert_spawning(self)
        return (self._reader, self._writer, self._rlock, self._wlock)

    def __setstate__(self, state):
        """Restore the pipe ends and locks and rebuild ``get``/``put``."""
        (self._reader, self._writer, self._rlock, self._wlock) = state
        self._make_methods()

    def _make_methods(self):
        """Bind ``self.get`` and ``self.put`` as closures over the pipe."""
        recv = self._reader.recv
        racquire, rrelease = self._rlock.acquire, self._rlock.release
        def get():
            racquire()
            try:
                return recv()
            finally:
                rrelease()
        self.get = get

        if self._wlock is None:
            # writes to a message oriented win32 pipe are atomic
            self.put = self._writer.send
        else:
            send = self._writer.send
            wacquire, wrelease = self._wlock.acquire, self._wlock.release
            def put(obj):
                wacquire()
                try:
                    return send(obj)
                finally:
                    wrelease()
            self.put = put