1"""Synchronization primitives.""" 2 3__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore') 4 5import collections 6import warnings 7 8from . import events 9from . import exceptions 10 11 12class _ContextManagerMixin: 13 async def __aenter__(self): 14 await self.acquire() 15 # We have no use for the "as ..." clause in the with 16 # statement for locks. 17 return None 18 19 async def __aexit__(self, exc_type, exc, tb): 20 self.release() 21 22 23class Lock(_ContextManagerMixin): 24 """Primitive lock objects. 25 26 A primitive lock is a synchronization primitive that is not owned 27 by a particular coroutine when locked. A primitive lock is in one 28 of two states, 'locked' or 'unlocked'. 29 30 It is created in the unlocked state. It has two basic methods, 31 acquire() and release(). When the state is unlocked, acquire() 32 changes the state to locked and returns immediately. When the 33 state is locked, acquire() blocks until a call to release() in 34 another coroutine changes it to unlocked, then the acquire() call 35 resets it to locked and returns. The release() method should only 36 be called in the locked state; it changes the state to unlocked 37 and returns immediately. If an attempt is made to release an 38 unlocked lock, a RuntimeError will be raised. 39 40 When more than one coroutine is blocked in acquire() waiting for 41 the state to turn to unlocked, only one coroutine proceeds when a 42 release() call resets the state to unlocked; first coroutine which 43 is blocked in acquire() is being processed. 44 45 acquire() is a coroutine and should be called with 'await'. 46 47 Locks also support the asynchronous context management protocol. 48 'async with lock' statement should be used. 49 50 Usage: 51 52 lock = Lock() 53 ... 54 await lock.acquire() 55 try: 56 ... 57 finally: 58 lock.release() 59 60 Context manager usage: 61 62 lock = Lock() 63 ... 64 async with lock: 65 ... 66 67 Lock objects can be tested for locking state: 68 69 if not lock.locked(): 70 await lock.acquire() 71 else: 72 # lock is acquired 73 ... 74 75 """ 76 77 def __init__(self, *, loop=None): 78 self._waiters = None 79 self._locked = False 80 if loop is None: 81 self._loop = events.get_event_loop() 82 else: 83 self._loop = loop 84 warnings.warn("The loop argument is deprecated since Python 3.8, " 85 "and scheduled for removal in Python 3.10.", 86 DeprecationWarning, stacklevel=2) 87 88 def __repr__(self): 89 res = super().__repr__() 90 extra = 'locked' if self._locked else 'unlocked' 91 if self._waiters: 92 extra = f'{extra}, waiters:{len(self._waiters)}' 93 return f'<{res[1:-1]} [{extra}]>' 94 95 def locked(self): 96 """Return True if lock is acquired.""" 97 return self._locked 98 99 async def acquire(self): 100 """Acquire a lock. 101 102 This method blocks until the lock is unlocked, then sets it to 103 locked and returns True. 104 """ 105 if (not self._locked and (self._waiters is None or 106 all(w.cancelled() for w in self._waiters))): 107 self._locked = True 108 return True 109 110 if self._waiters is None: 111 self._waiters = collections.deque() 112 fut = self._loop.create_future() 113 self._waiters.append(fut) 114 115 # Finally block should be called before the CancelledError 116 # handling as we don't want CancelledError to call 117 # _wake_up_first() and attempt to wake up itself. 118 try: 119 try: 120 await fut 121 finally: 122 self._waiters.remove(fut) 123 except exceptions.CancelledError: 124 if not self._locked: 125 self._wake_up_first() 126 raise 127 128 self._locked = True 129 return True 130 131 def release(self): 132 """Release a lock. 133 134 When the lock is locked, reset it to unlocked, and return. 135 If any other coroutines are blocked waiting for the lock to become 136 unlocked, allow exactly one of them to proceed. 137 138 When invoked on an unlocked lock, a RuntimeError is raised. 139 140 There is no return value. 141 """ 142 if self._locked: 143 self._locked = False 144 self._wake_up_first() 145 else: 146 raise RuntimeError('Lock is not acquired.') 147 148 def _wake_up_first(self): 149 """Wake up the first waiter if it isn't done.""" 150 if not self._waiters: 151 return 152 try: 153 fut = next(iter(self._waiters)) 154 except StopIteration: 155 return 156 157 # .done() necessarily means that a waiter will wake up later on and 158 # either take the lock, or, if it was cancelled and lock wasn't 159 # taken already, will hit this again and wake up a new waiter. 160 if not fut.done(): 161 fut.set_result(True) 162 163 164class Event: 165 """Asynchronous equivalent to threading.Event. 166 167 Class implementing event objects. An event manages a flag that can be set 168 to true with the set() method and reset to false with the clear() method. 169 The wait() method blocks until the flag is true. The flag is initially 170 false. 171 """ 172 173 def __init__(self, *, loop=None): 174 self._waiters = collections.deque() 175 self._value = False 176 if loop is None: 177 self._loop = events.get_event_loop() 178 else: 179 self._loop = loop 180 warnings.warn("The loop argument is deprecated since Python 3.8, " 181 "and scheduled for removal in Python 3.10.", 182 DeprecationWarning, stacklevel=2) 183 184 def __repr__(self): 185 res = super().__repr__() 186 extra = 'set' if self._value else 'unset' 187 if self._waiters: 188 extra = f'{extra}, waiters:{len(self._waiters)}' 189 return f'<{res[1:-1]} [{extra}]>' 190 191 def is_set(self): 192 """Return True if and only if the internal flag is true.""" 193 return self._value 194 195 def set(self): 196 """Set the internal flag to true. All coroutines waiting for it to 197 become true are awakened. Coroutine that call wait() once the flag is 198 true will not block at all. 199 """ 200 if not self._value: 201 self._value = True 202 203 for fut in self._waiters: 204 if not fut.done(): 205 fut.set_result(True) 206 207 def clear(self): 208 """Reset the internal flag to false. Subsequently, coroutines calling 209 wait() will block until set() is called to set the internal flag 210 to true again.""" 211 self._value = False 212 213 async def wait(self): 214 """Block until the internal flag is true. 215 216 If the internal flag is true on entry, return True 217 immediately. Otherwise, block until another coroutine calls 218 set() to set the flag to true, then return True. 219 """ 220 if self._value: 221 return True 222 223 fut = self._loop.create_future() 224 self._waiters.append(fut) 225 try: 226 await fut 227 return True 228 finally: 229 self._waiters.remove(fut) 230 231 232class Condition(_ContextManagerMixin): 233 """Asynchronous equivalent to threading.Condition. 234 235 This class implements condition variable objects. A condition variable 236 allows one or more coroutines to wait until they are notified by another 237 coroutine. 238 239 A new Lock object is created and used as the underlying lock. 240 """ 241 242 def __init__(self, lock=None, *, loop=None): 243 if loop is None: 244 self._loop = events.get_event_loop() 245 else: 246 self._loop = loop 247 warnings.warn("The loop argument is deprecated since Python 3.8, " 248 "and scheduled for removal in Python 3.10.", 249 DeprecationWarning, stacklevel=2) 250 251 if lock is None: 252 lock = Lock(loop=loop) 253 elif lock._loop is not self._loop: 254 raise ValueError("loop argument must agree with lock") 255 256 self._lock = lock 257 # Export the lock's locked(), acquire() and release() methods. 258 self.locked = lock.locked 259 self.acquire = lock.acquire 260 self.release = lock.release 261 262 self._waiters = collections.deque() 263 264 def __repr__(self): 265 res = super().__repr__() 266 extra = 'locked' if self.locked() else 'unlocked' 267 if self._waiters: 268 extra = f'{extra}, waiters:{len(self._waiters)}' 269 return f'<{res[1:-1]} [{extra}]>' 270 271 async def wait(self): 272 """Wait until notified. 273 274 If the calling coroutine has not acquired the lock when this 275 method is called, a RuntimeError is raised. 276 277 This method releases the underlying lock, and then blocks 278 until it is awakened by a notify() or notify_all() call for 279 the same condition variable in another coroutine. Once 280 awakened, it re-acquires the lock and returns True. 281 """ 282 if not self.locked(): 283 raise RuntimeError('cannot wait on un-acquired lock') 284 285 self.release() 286 try: 287 fut = self._loop.create_future() 288 self._waiters.append(fut) 289 try: 290 await fut 291 return True 292 finally: 293 self._waiters.remove(fut) 294 295 finally: 296 # Must reacquire lock even if wait is cancelled 297 cancelled = False 298 while True: 299 try: 300 await self.acquire() 301 break 302 except exceptions.CancelledError: 303 cancelled = True 304 305 if cancelled: 306 raise exceptions.CancelledError 307 308 async def wait_for(self, predicate): 309 """Wait until a predicate becomes true. 310 311 The predicate should be a callable which result will be 312 interpreted as a boolean value. The final predicate value is 313 the return value. 314 """ 315 result = predicate() 316 while not result: 317 await self.wait() 318 result = predicate() 319 return result 320 321 def notify(self, n=1): 322 """By default, wake up one coroutine waiting on this condition, if any. 323 If the calling coroutine has not acquired the lock when this method 324 is called, a RuntimeError is raised. 325 326 This method wakes up at most n of the coroutines waiting for the 327 condition variable; it is a no-op if no coroutines are waiting. 328 329 Note: an awakened coroutine does not actually return from its 330 wait() call until it can reacquire the lock. Since notify() does 331 not release the lock, its caller should. 332 """ 333 if not self.locked(): 334 raise RuntimeError('cannot notify on un-acquired lock') 335 336 idx = 0 337 for fut in self._waiters: 338 if idx >= n: 339 break 340 341 if not fut.done(): 342 idx += 1 343 fut.set_result(False) 344 345 def notify_all(self): 346 """Wake up all threads waiting on this condition. This method acts 347 like notify(), but wakes up all waiting threads instead of one. If the 348 calling thread has not acquired the lock when this method is called, 349 a RuntimeError is raised. 350 """ 351 self.notify(len(self._waiters)) 352 353 354class Semaphore(_ContextManagerMixin): 355 """A Semaphore implementation. 356 357 A semaphore manages an internal counter which is decremented by each 358 acquire() call and incremented by each release() call. The counter 359 can never go below zero; when acquire() finds that it is zero, it blocks, 360 waiting until some other thread calls release(). 361 362 Semaphores also support the context management protocol. 363 364 The optional argument gives the initial value for the internal 365 counter; it defaults to 1. If the value given is less than 0, 366 ValueError is raised. 367 """ 368 369 def __init__(self, value=1, *, loop=None): 370 if value < 0: 371 raise ValueError("Semaphore initial value must be >= 0") 372 self._value = value 373 self._waiters = collections.deque() 374 if loop is None: 375 self._loop = events.get_event_loop() 376 else: 377 self._loop = loop 378 warnings.warn("The loop argument is deprecated since Python 3.8, " 379 "and scheduled for removal in Python 3.10.", 380 DeprecationWarning, stacklevel=2) 381 382 def __repr__(self): 383 res = super().__repr__() 384 extra = 'locked' if self.locked() else f'unlocked, value:{self._value}' 385 if self._waiters: 386 extra = f'{extra}, waiters:{len(self._waiters)}' 387 return f'<{res[1:-1]} [{extra}]>' 388 389 def _wake_up_next(self): 390 while self._waiters: 391 waiter = self._waiters.popleft() 392 if not waiter.done(): 393 waiter.set_result(None) 394 return 395 396 def locked(self): 397 """Returns True if semaphore can not be acquired immediately.""" 398 return self._value == 0 399 400 async def acquire(self): 401 """Acquire a semaphore. 402 403 If the internal counter is larger than zero on entry, 404 decrement it by one and return True immediately. If it is 405 zero on entry, block, waiting until some other coroutine has 406 called release() to make it larger than 0, and then return 407 True. 408 """ 409 while self._value <= 0: 410 fut = self._loop.create_future() 411 self._waiters.append(fut) 412 try: 413 await fut 414 except: 415 # See the similar code in Queue.get. 416 fut.cancel() 417 if self._value > 0 and not fut.cancelled(): 418 self._wake_up_next() 419 raise 420 self._value -= 1 421 return True 422 423 def release(self): 424 """Release a semaphore, incrementing the internal counter by one. 425 When it was zero on entry and another coroutine is waiting for it to 426 become larger than zero again, wake up that coroutine. 427 """ 428 self._value += 1 429 self._wake_up_next() 430 431 432class BoundedSemaphore(Semaphore): 433 """A bounded semaphore implementation. 434 435 This raises ValueError in release() if it would increase the value 436 above the initial value. 437 """ 438 439 def __init__(self, value=1, *, loop=None): 440 if loop: 441 warnings.warn("The loop argument is deprecated since Python 3.8, " 442 "and scheduled for removal in Python 3.10.", 443 DeprecationWarning, stacklevel=2) 444 445 self._bound_value = value 446 super().__init__(value, loop=loop) 447 448 def release(self): 449 if self._value >= self._bound_value: 450 raise ValueError('BoundedSemaphore released too many times') 451 super().release() 452