#
# Module implementing synchronization primitives
#
# multiprocessing/synchronize.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = [
    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
    ]

import threading
import sys
import tempfile
import _multiprocessing
import time

from . import context
from . import process
from . import util

# Try to import the mp.synchronize module cleanly, if it fails
# raise ImportError for platforms lacking a working sem_open implementation.
# See issue 3770
try:
    from _multiprocessing import SemLock, sem_unlink
except ImportError:
    raise ImportError("This platform lacks a functioning sem_open" +
                      " implementation, therefore, the required" +
                      " synchronization primitives needed will not" +
                      " function, see issue 3770.")

#
# Constants
#

# Values passed as the `kind` argument of `_multiprocessing.SemLock`.
RECURSIVE_MUTEX, SEMAPHORE = range(2)
SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX

#
# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
#

class SemLock(object):
    """Base class for semaphores and mutexes.

    Wraps a `_multiprocessing.SemLock` instance (stored in `_semlock`) and
    re-exports its `acquire` / `release` methods and context-manager
    protocol.
    """

    # Source of the random suffixes used by `_make_name()`.
    _rand = tempfile._RandomNameSequence()

    def __init__(self, kind, value, maxvalue, *, ctx):
        """Create the underlying semaphore.

        `kind` is RECURSIVE_MUTEX or SEMAPHORE; `value` and `maxvalue` are
        forwarded unchanged to `_multiprocessing.SemLock`.  `ctx` is the
        multiprocessing context; when it is None the default context is
        used.

        Raises FileExistsError if 100 successive generated names all
        collide with existing semaphores.
        """
        if ctx is None:
            ctx = context._default_context.get_context()
        name = ctx.get_start_method()
        # On Windows, or when children are created with fork, the name is
        # flagged to be unlinked immediately.
        unlink_now = sys.platform == 'win32' or name == 'fork'
        for _ in range(100):
            try:
                sl = self._semlock = _multiprocessing.SemLock(
                    kind, value, maxvalue, self._make_name(),
                    unlink_now)
            except FileExistsError:
                # Name collision: try again with a fresh random name.
                pass
            else:
                break
        else:
            raise FileExistsError('cannot find name for semaphore')

        util.debug('created semlock with handle %s' % sl.handle)
        self._make_methods()

        if sys.platform != 'win32':
            def _after_fork(obj):
                obj._semlock._after_fork()
            util.register_after_fork(self, _after_fork)

        if self._semlock.name is not None:
            # We only get here if we are on Unix with forking
            # disabled.  When the object is garbage collected or the
            # process shuts down we unlink the semaphore name
            from .resource_tracker import register
            register(self._semlock.name, "semaphore")
            util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
                          exitpriority=0)

    @staticmethod
    def _cleanup(name):
        """Unlink the semaphore `name` and unregister it from the tracker."""
        from .resource_tracker import unregister
        sem_unlink(name)
        unregister(name, "semaphore")

    def _make_methods(self):
        """Bind `acquire` and `release` straight to the wrapped semlock."""
        self.acquire = self._semlock.acquire
        self.release = self._semlock.release

    def __enter__(self):
        return self._semlock.__enter__()

    def __exit__(self, *args):
        return self._semlock.__exit__(*args)

    def __getstate__(self):
        """Return `(handle, kind, maxvalue, name)` for the child process.

        `context.assert_spawning()` is called first -- presumably it
        rejects pickling outside of process creation (defined elsewhere).
        """
        context.assert_spawning(self)
        sl = self._semlock
        if sys.platform == 'win32':
            # The child needs its own duplicate of the handle.
            h = context.get_spawning_popen().duplicate_for_child(sl.handle)
        else:
            h = sl.handle
        return (h, sl.kind, sl.maxvalue, sl.name)

    def __setstate__(self, state):
        """Rebuild the wrapped semlock from a `__getstate__()` tuple."""
        self._semlock = _multiprocessing.SemLock._rebuild(*state)
        util.debug('recreated blocker with handle %r' % state[0])
        self._make_methods()

    @staticmethod
    def _make_name():
        """Return '<semprefix>-<random suffix>' for a new semaphore."""
        return '%s-%s' % (process.current_process()._config['semprefix'],
                          next(SemLock._rand))

#
# Semaphore
#

class Semaphore(SemLock):
    """Counting semaphore whose maximum value is SEM_VALUE_MAX."""

    def __init__(self, value=1, *, ctx):
        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)

    def get_value(self):
        """Return the value reported by the wrapped semlock."""
        return self._semlock._get_value()

    def __repr__(self):
        try:
            value = self._semlock._get_value()
        except Exception:
            # Best effort: repr() must not fail if the value is unavailable.
            value = 'unknown'
        return '<%s(value=%s)>' % (self.__class__.__name__, value)

#
# Bounded semaphore
#

class BoundedSemaphore(Semaphore):
    """Semaphore whose maximum value equals its initial value."""

    def __init__(self, value=1, *, ctx):
        SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)

    def __repr__(self):
        try:
            value = self._semlock._get_value()
        except Exception:
            # Best effort: repr() must not fail if the value is unavailable.
            value = 'unknown'
        return '<%s(value=%s, maxvalue=%s)>' % \
               (self.__class__.__name__, value, self._semlock.maxvalue)

#
# Non-recursive lock
#

class Lock(SemLock):
    """Non-recursive lock: a SEMAPHORE with initial and maximum value 1."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)

    def __repr__(self):
        try:
            if self._semlock._is_mine():
                name = process.current_process().name
                if threading.current_thread().name != 'MainThread':
                    name += '|' + threading.current_thread().name
            # The maximum value is 1, so "not zero" means "free".  Using
            # _is_zero() instead of _get_value() keeps the owner meaningful
            # on platforms where sem_getvalue() is not implemented.
            elif not self._semlock._is_zero():
                name = 'None'
            elif self._semlock._count() > 0:
                name = 'SomeOtherThread'
            else:
                name = 'SomeOtherProcess'
        except Exception:
            name = 'unknown'
        return '<%s(owner=%s)>' % (self.__class__.__name__, name)

#
# Recursive lock
#

class RLock(SemLock):
    """Recursive lock: a RECURSIVE_MUTEX with initial and maximum value 1."""

    def __init__(self, *, ctx):
        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)

    def __repr__(self):
        try:
            if self._semlock._is_mine():
                name = process.current_process().name
                if threading.current_thread().name != 'MainThread':
                    name += '|' + threading.current_thread().name
                count = self._semlock._count()
            # See Lock.__repr__: _is_zero() avoids depending on
            # sem_getvalue(), which some platforms do not implement.
            elif not self._semlock._is_zero():
                name, count = 'None', 0
            elif self._semlock._count() > 0:
                name, count = 'SomeOtherThread', 'nonzero'
            else:
                name, count = 'SomeOtherProcess', 'nonzero'
        except Exception:
            name, count = 'unknown', 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)

#
# Condition variable
#

class Condition(object):
    """Condition variable built from a lock and three semaphores.

    `_sleeping_count` is released once by every waiter before it sleeps,
    `_woken_count` once by every waiter after it wakes, and
    `_wait_semaphore` is what waiters actually block on.
    """

    def __init__(self, lock=None, *, ctx):
        """Use `lock` if given (and truthy), otherwise a new `ctx.RLock()`."""
        self._lock = lock or ctx.RLock()
        self._sleeping_count = ctx.Semaphore(0)
        self._woken_count = ctx.Semaphore(0)
        self._wait_semaphore = ctx.Semaphore(0)
        self._make_methods()

    def __getstate__(self):
        context.assert_spawning(self)
        return (self._lock, self._sleeping_count,
                self._woken_count, self._wait_semaphore)

    def __setstate__(self, state):
        (self._lock, self._sleeping_count,
         self._woken_count, self._wait_semaphore) = state
        self._make_methods()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def _make_methods(self):
        """Bind `acquire` and `release` straight to the underlying lock."""
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __repr__(self):
        try:
            num_waiters = (self._sleeping_count._semlock._get_value() -
                           self._woken_count._semlock._get_value())
        except Exception:
            # Best effort: repr() must not fail if the values are unavailable.
            num_waiters = 'unknown'
        return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)

    def wait(self, timeout=None):
        """Release the lock, block until notified or `timeout`, reacquire.

        Returns the result of acquiring the wait semaphore.  The lock must
        be held by the caller (checked with an assert).
        """
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock (once per level of ownership reported by _count())
        count = self._lock._semlock._count()
        for _ in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock
            for _ in range(count):
                self._lock.acquire()

    def notify(self, n=1):
        """Wake up at most `n` waiters; the lock must be held by the caller."""
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(
            False), ('notify: Should not have been able to acquire '
                     + '_wait_semaphore')

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res, ('notify: Bug in sleeping_count.acquire'
                         + '- res should not be False')

        sleepers = 0
        while sleepers < n and self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1

        if sleepers:
            for _ in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

    def notify_all(self):
        """Wake up every waiter (notify with an effectively unbounded n)."""
        self.notify(n=sys.maxsize)

    def wait_for(self, predicate, timeout=None):
        """Wait until `predicate()` is true or `timeout` seconds elapse.

        Returns the last value returned by `predicate()`.
        """
        result = predicate()
        if result:
            return result
        if timeout is not None:
            endtime = time.monotonic() + timeout
        else:
            endtime = None
            waittime = None
        while not result:
            if endtime is not None:
                # Only wait for whatever remains of the overall timeout.
                waittime = endtime - time.monotonic()
                if waittime <= 0:
                    break
            self.wait(waittime)
            result = predicate()
        return result

#
# Event
#

class Event(object):
    """Event flag built from a Condition and a Semaphore.

    The flag counts as set while `_flag` can be acquired without blocking.
    """

    def __init__(self, *, ctx):
        self._cond = ctx.Condition(ctx.Lock())
        self._flag = ctx.Semaphore(0)

    def is_set(self):
        """Return True if the flag is set."""
        with self._cond:
            # Probe the flag without consuming it.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False

    def set(self):
        """Set the flag and wake every waiter."""
        with self._cond:
            # Drain first so the semaphore value never exceeds 1.
            self._flag.acquire(False)
            self._flag.release()
            self._cond.notify_all()

    def clear(self):
        """Reset the flag."""
        with self._cond:
            self._flag.acquire(False)

    def wait(self, timeout=None):
        """Block until the flag is set or `timeout` elapses.

        Returns True if the flag is set on return, False otherwise.
        """
        with self._cond:
            if self._flag.acquire(False):
                self._flag.release()
            else:
                self._cond.wait(timeout)

            # Re-check: the wait may have ended because of the timeout.
            if self._flag.acquire(False):
                self._flag.release()
                return True
            return False

#
# Barrier
#

class Barrier(threading.Barrier):
    """`threading.Barrier` whose state and count live in a two-int buffer.

    The buffer comes from `heap.BufferWrapper` -- presumably shared between
    processes (defined elsewhere; not visible from this module).
    """

    def __init__(self, parties, action=None, timeout=None, *, ctx):
        import struct
        from .heap import BufferWrapper
        # Room for two C ints: index 0 is _state, index 1 is _count.
        wrapper = BufferWrapper(struct.calcsize('i') * 2)
        cond = ctx.Condition()
        self.__setstate__((parties, action, timeout, cond, wrapper))
        self._state = 0
        self._count = 0

    def __setstate__(self, state):
        (self._parties, self._action, self._timeout,
         self._cond, self._wrapper) = state
        self._array = self._wrapper.create_memoryview().cast('i')

    def __getstate__(self):
        return (self._parties, self._action, self._timeout,
                self._cond, self._wrapper)

    @property
    def _state(self):
        return self._array[0]

    @_state.setter
    def _state(self, value):
        self._array[0] = value

    @property
    def _count(self):
        return self._array[1]

    @_count.setter
    def _count(self, value):
        self._array[1] = value