#
# Module implementing synchronization primitives
#
# multiprocessing/synchronize.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

__all__ = [
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
    ]

import threading
import os
import sys

from time import time as _time, sleep as _sleep

import _multiprocessing
from multiprocessing.process import current_process
from multiprocessing.util import Finalize, register_after_fork, debug
from multiprocessing.forking import assert_spawning, Popen

# Probe for the native SemLock type up front.  When the import fails the
# module re-raises ImportError with a message that points at the missing
# sem_open support (see issue 3770).
try:
    from _multiprocessing import SemLock
except ImportError:
    raise ImportError("This platform lacks a functioning sem_open"
                      " implementation, therefore, the required"
                      " synchronization primitives needed will not"
                      " function, see issue 3770.")

#
# Constants
#

# `kind` values handed to `_multiprocessing.SemLock`
RECURSIVE_MUTEX, SEMAPHORE = range(2)
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX

#
# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
#

class SemLock(object):
    # Thin Python wrapper around a `_multiprocessing.SemLock` instance kept
    # in `self._semlock`.  `acquire` and `release` are bound straight to the
    # native object's methods by `_make_methods()`.

    def __init__(self, kind, value, maxvalue):
        # Build the native object first, then publish it on the instance.
        native = _multiprocessing.SemLock(kind, value, maxvalue)
        self._semlock = native
        debug('created semlock with handle %s' % native.handle)
        self._make_methods()

        # The after-fork hook is only registered off Windows.
        if sys.platform == 'win32':
            return

        def _after_fork(obj):
            # Delegate to the native object's own `_after_fork()` hook.
            obj._semlock._after_fork()
        register_after_fork(self, _after_fork)

    def _make_methods(self):
        # Expose the native acquire/release as instance attributes.
        native = self._semlock
        self.acquire, self.release = native.acquire, native.release

    def __enter__(self):
        # Context-manager entry is delegated to the native object.
        return self._semlock.__enter__()

    def __exit__(self, *args):
        # Context-manager exit is delegated to the native object.
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        # Pickled state is (duplicated handle, kind, maxvalue).
        # `assert_spawning` is called before any state is produced.
        assert_spawning(self)
        native = self._semlock
        child_handle = Popen.duplicate_for_child(native.handle)
        return (child_handle, native.kind, native.maxvalue)

    def __setstate__(self, state):
        # Rebuild the native object from the pickled tuple and rebind
        # acquire/release to it.
        rebuilt = _multiprocessing.SemLock._rebuild(*state)
        self._semlock = rebuilt
        debug('recreated blocker with handle %r' % state[0])
        self._make_methods()
#
# Semaphore
#

class Semaphore(SemLock):
    # Counting semaphore: a SemLock of kind SEMAPHORE whose maximum value
    # is SEM_VALUE_MAX.

    def __init__(self, value=1):
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)

    def get_value(self):
        # Current counter as reported by the native object.
        return self._semlock._get_value()

    def __repr__(self):
        try:
            value = self._semlock._get_value()
        except Exception:
            # Best effort: repr() must not raise if the value is unavailable.
            value = 'unknown'
        return '<Semaphore(value=%s)>' % value

#
# Bounded semaphore
#

class BoundedSemaphore(Semaphore):
    # Semaphore whose maximum value equals its initial value.

    def __init__(self, value=1):
        SemLock.__init__(self, SEMAPHORE, value, value)

    def __repr__(self):
        try:
            value = self._semlock._get_value()
        except Exception:
            value = 'unknown'
        return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
               (value, self._semlock.maxvalue)

#
# Non-recursive lock
#

class Lock(SemLock):
    # Non-recursive lock: a SemLock of kind SEMAPHORE with initial and
    # maximum value 1.

    def __init__(self):
        SemLock.__init__(self, SEMAPHORE, 1, 1)

    def __repr__(self):
        # Reports the owner as: this process (plus thread name when not the
        # main thread), 'None' when the value is 1, otherwise some other
        # thread or some other process depending on the native `_count()`.
        try:
            if self._semlock._is_mine():
                name = current_process().name
                if threading.current_thread().name != 'MainThread':
                    name += '|' + threading.current_thread().name
            elif self._semlock._get_value() == 1:
                name = 'None'
            elif self._semlock._count() > 0:
                name = 'SomeOtherThread'
            else:
                name = 'SomeOtherProcess'
        except Exception:
            name = 'unknown'
        return '<Lock(owner=%s)>' % name

#
# Recursive lock
#

class RLock(SemLock):
    # Recursive lock: a SemLock of kind RECURSIVE_MUTEX with initial and
    # maximum value 1.

    def __init__(self):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)

    def __repr__(self):
        # Same owner classification as Lock.__repr__, plus the count taken
        # from the native `_count()` when this thread is the owner.
        try:
            if self._semlock._is_mine():
                name = current_process().name
                if threading.current_thread().name != 'MainThread':
                    name += '|' + threading.current_thread().name
                count = self._semlock._count()
            elif self._semlock._get_value() == 1:
                name, count = 'None', 0
            elif self._semlock._count() > 0:
                name, count = 'SomeOtherThread', 'nonzero'
            else:
                name, count = 'SomeOtherProcess', 'nonzero'
        except Exception:
            name, count = 'unknown', 'unknown'
        return '<RLock(%s, %s)>' % (name, count)

#
# Condition variable
#

class Condition(object):
    # Condition variable built from a lock and three semaphores:
    #   _sleeping_count  released once by every thread entering wait()
    #   _woken_count     released once by every thread leaving wait()
    #   _wait_semaphore  released by notify()/notify_all() to wake a waiter
    # `acquire` and `release` are bound to the underlying lock's methods.

    def __init__(self, lock=None):
        # When no lock is supplied a fresh RLock is created.
        self._lock = lock or RLock()
        self._sleeping_count = Semaphore(0)
        self._woken_count = Semaphore(0)
        self._wait_semaphore = Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _make_methods(self):
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        try:
            # Waiters still asleep = entered wait() minus already woken.
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            num_waiters = 'unknown'
        return '<Condition(%s, %s)>' % (self._lock, num_waiters)

    def wait(self, timeout=None):
        # Release the lock, block on the wait semaphore, then reacquire the
        # lock as many times as it was held on entry.
        #
        # `timeout` is passed unchanged to the wait semaphore's
        # `acquire(True, timeout)`.
        #
        # Returns the result of that acquire, so a caller can tell a
        # notification (true) from an expired timeout (false).  Earlier
        # revisions discarded the result and always returned None.
        #
        # The caller must own the lock; this is checked with an assert.
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock (once per level it is currently held)
        count = self._lock._semlock._count()
        for i in xrange(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock
            for i in xrange(count):
                self._lock.acquire()

    def notify(self):
        # Wake at most one thread blocked in wait().  The caller must own
        # the lock.
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        if self._sleeping_count.acquire(False): # try grabbing a sleeper
            self._wait_semaphore.release()      # wake up one sleeper
            self._woken_count.acquire()         # wait for the sleeper to wake

            # rezero _wait_semaphore in case a timeout just happened
            self._wait_semaphore.acquire(False)

    def notify_all(self):
        # Wake every thread currently blocked in wait().  The caller must
        # own the lock.
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(False)

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res

        sleepers = 0
        while self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for i in xrange(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

#
# Event
#

class Event(object):
    # Event flag.  The flag is stored in `_flag`, a Semaphore that is
    # non-zero when the event is set, and is guarded by `_cond`, a
    # Condition built on a plain Lock.

    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = Semaphore(0)

    def is_set(self):
        # Probe the flag with a non-blocking acquire and put it back on
        # success, so the probe leaves the flag unchanged.
        self._cond.acquire()
        try:
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False
        finally:
            self._cond.release()

    def set(self):
        self._cond.acquire()
        try:
            # Drain first so the flag is released exactly once even when
            # the event is already set.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()
        finally:
            self._cond.release()

    def clear(self):
        self._cond.acquire()
        try:
            # Non-blocking acquire: takes the flag if set, no-op otherwise.
            self._flag.acquire(False)
        finally:
            self._cond.release()

    def wait(self, timeout=None):
        # Block until the flag is set or the condition wait returns, then
        # report the flag's state at that moment (True if set, else False).
        self._cond.acquire()
        try:
            if self._flag.acquire(False):
                self._flag.release()
            else:
                self._cond.wait(timeout)

            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False
        finally:
            self._cond.release()