• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 """Synchronization primitives."""
2 
# Names exported by a star-import of this module; the underscore-prefixed
# context-manager helpers defined below are intentionally not listed.
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
4 
5 import collections
6 import types
7 import warnings
8 
9 from . import events
10 from . import futures
11 from . import exceptions
12 from .import coroutines
13 
14 
class _ContextManager:
    """Plain (synchronous) context manager wrapping an already-held lock.

    Instances are what the legacy idiom evaluates to:

        with (yield from lock):
            <block>

    so that the lock is released when the block exits, while the
    accidental form:

        with lock:
            <block>

    still fails loudly.

    Deprecated, use 'async with' statement:
        async with lock:
            <block>
    """

    def __init__(self, lock):
        # The wrapped object only needs a release() method.
        self._lock = lock

    def __enter__(self):
        """Return None; there is nothing useful to bind with "as ..."."""

    def __exit__(self, *args):
        """Release the wrapped lock and drop the reference to it."""
        held = self._lock
        try:
            held.release()
        finally:
            # Crude guard against reuse: a second exit finds None here and
            # fails on the attribute lookup.
            self._lock = None
47 
48 
class _ContextManagerMixin:
    """Mixin wiring acquire()/release() into the context-manager protocols.

    The host class must provide an awaitable acquire() and a synchronous
    release(); every hook below is implemented in terms of those two.

    Supported forms:

        async with lock:            # __aenter__ / __aexit__
        with (yield from lock):     # __iter__, deprecated
        with await lock:            # __await__, deprecated

    A plain 'with lock:' is rejected by __enter__.
    """

    def __enter__(self):
        """Always raise RuntimeError: a bare 'with' is not supported."""
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        """Do nothing; present only to complete the 'with' protocol."""
        # This must exist because __enter__ exists, even though that
        # always raises; that's how the with-statement works.
        pass

    @types.coroutine
    def __iter__(self):
        """Acquire via 'yield from' and return a releasing context manager.

        Emits a DeprecationWarning on every use.
        """
        # This is not a coroutine.  It is meant to enable the idiom:
        #
        #     with (yield from lock):
        #         <block>
        #
        # as an alternative to:
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        # Deprecated, use 'async with' statement:
        #     async with lock:
        #         <block>
        warnings.warn("'with (yield from lock)' is deprecated "
                      "use 'async with lock' instead",
                      DeprecationWarning, stacklevel=2)
        yield from self.acquire()
        return _ContextManager(self)

    # The flag is needed for legacy asyncio.iscoroutine()
    __iter__._is_coroutine = coroutines._is_coroutine

    async def __acquire_ctx(self):
        """Acquire, then hand back a _ContextManager that will release."""
        await self.acquire()
        return _ContextManager(self)

    def __await__(self):
        """Support the deprecated 'with await lock' form.

        Emits a DeprecationWarning, then delegates to the iterator of the
        private __acquire_ctx() coroutine.
        """
        warnings.warn("'with await lock' is deprecated "
                      "use 'async with lock' instead",
                      DeprecationWarning, stacklevel=2)
        # To make "with await lock" work.
        return self.__acquire_ctx().__await__()

    async def __aenter__(self):
        """Acquire on entry to 'async with'; the bound value is None."""
        await self.acquire()
        # We have no use for the "as ..."  clause in the with
        # statement for locks.
        return None

    async def __aexit__(self, exc_type, exc, tb):
        """Release on exit from 'async with'.

        The exception details are ignored and the implicit None return
        means an exception raised in the block is not suppressed.
        """
        self.release()
104 
105 
class Lock(_ContextManagerMixin):
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the coroutine that has
    been blocked in acquire() the longest is the one woken up.

    acquire() is a coroutine and should be called with 'await'.

    Locks also support the asynchronous context management protocol.
    'async with lock' statement should be used.

    Usage:

        lock = Lock()
        ...
        await lock.acquire()
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        async with lock:
             ...

    Lock objects can be tested for locking state:

        if not lock.locked():
           await lock.acquire()
        else:
           # lock is acquired
           ...

    """

    def __init__(self, *, loop=None):
        """Create an unlocked lock.

        Passing *loop* is deprecated and emits a DeprecationWarning; when
        it is omitted the loop comes from events.get_event_loop().
        """
        # The waiter deque is created lazily by the first contended
        # acquire(); None means "never contended so far".
        self._waiters = None
        self._locked = False
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

    def __repr__(self):
        """Return the default repr extended with state and waiter count."""
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = f'{extra}, waiters:{len(self._waiters)}'
        return f'<{res[1:-1]} [{extra}]>'

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    async def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.
        """
        # Fast path: the lock is free and nobody with a live (not
        # cancelled) future is queued ahead of us.
        if (not self._locked and (self._waiters is None or
                all(w.cancelled() for w in self._waiters))):
            self._locked = True
            return True

        if self._waiters is None:
            self._waiters = collections.deque()
        fut = self._loop.create_future()
        self._waiters.append(fut)

        # Finally block should be called before the CancelledError
        # handling as we don't want CancelledError to call
        # _wake_up_first() and attempt to wake up itself.
        try:
            try:
                await fut
            finally:
                self._waiters.remove(fut)
        except exceptions.CancelledError:
            # We were cancelled while queued.  If the lock is free, pass the
            # wake-up on so the next waiter is not left sleeping.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the first waiter if it isn't done."""
        # Covers both "deque never created" (None) and "deque empty", so the
        # head lookup below cannot fail.
        if not self._waiters:
            return
        fut = self._waiters[0]

        # .done() necessarily means that a waiter will wake up later on and
        # either take the lock, or, if it was cancelled and lock wasn't
        # taken already, will hit this again and wake up a new waiter.
        if not fut.done():
            fut.set_result(True)
245 
246 
class Event:
    """Asynchronous equivalent to threading.Event.

    An event wraps a boolean flag, initially false.  set() turns the flag
    on and wakes every coroutine blocked in wait(); clear() turns it off
    again; wait() blocks for as long as the flag is false.
    """

    def __init__(self, *, loop=None):
        self._waiters = collections.deque()
        self._value = False
        if loop is not None:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        base = super().__repr__()
        details = ['set' if self._value else 'unset']
        if self._waiters:
            details.append(f'waiters:{len(self._waiters)}')
        return f"<{base[1:-1]} [{', '.join(details)}]>"

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Turn the flag on and wake all coroutines blocked in wait().

        Calling set() while the flag is already true does nothing.  A
        coroutine calling wait() while the flag is true does not block.
        """
        if self._value:
            return
        self._value = True

        for waiter in self._waiters:
            # Skip futures that were already resolved or cancelled.
            if not waiter.done():
                waiter.set_result(True)

    def clear(self):
        """Turn the flag off.

        Coroutines calling wait() afterwards block until set() is
        called again.
        """
        self._value = False

    async def wait(self):
        """Block until the internal flag is true, then return True.

        Returns immediately when the flag is already true on entry.
        """
        if self._value:
            return True

        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        try:
            await waiter
        finally:
            # Always unregister, including when the wait is cancelled.
            self._waiters.remove(waiter)
        return True
313 
314 
class Condition(_ContextManagerMixin):
    """Asynchronous equivalent to threading.Condition.

    A condition variable lets one or more coroutines sleep until another
    coroutine notifies them.

    The condition works on top of a Lock: either the one passed in as
    *lock*, or a freshly created one when *lock* is omitted.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
        else:
            self._loop = events.get_event_loop()

        if lock is not None:
            # A caller-supplied lock must live on the same loop as we do.
            if lock._loop is not self._loop:
                raise ValueError("loop argument must agree with lock")
        else:
            lock = Lock(loop=loop)

        self._lock = lock
        # Re-export the underlying lock's API as instance attributes.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        self._waiters = collections.deque()

    def __repr__(self):
        base = super().__repr__()
        details = ['locked' if self.locked() else 'unlocked']
        if self._waiters:
            details.append(f'waiters:{len(self._waiters)}')
        return f"<{base[1:-1]} [{', '.join(details)}]>"

    async def wait(self):
        """Wait until notified.

        Raises RuntimeError when the underlying lock is not held at the
        time of the call.

        The lock is released, the caller sleeps until notify() or
        notify_all() resolves its future, and the lock is then taken
        again before True is returned.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            waiter = self._loop.create_future()
            self._waiters.append(waiter)
            try:
                await waiter
                return True
            finally:
                self._waiters.remove(waiter)

        finally:
            # The lock has to be held again on the way out, even when the
            # wait (or the re-acquire itself) gets cancelled: keep retrying
            # and re-raise the cancellation once the lock is ours.
            interrupted = False
            while True:
                try:
                    await self.acquire()
                except exceptions.CancelledError:
                    interrupted = True
                else:
                    break

            if interrupted:
                raise exceptions.CancelledError

    async def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        *predicate* is a callable whose result is interpreted as a
        boolean.  The first truthy result is returned as-is.
        """
        while True:
            outcome = predicate()
            if outcome:
                return outcome
            await self.wait()

    def notify(self, n=1):
        """Wake up at most *n* coroutines waiting on this condition.

        By default a single waiter is woken; nothing happens when no
        coroutine is waiting.  Raises RuntimeError when the underlying
        lock is not held at the time of the call.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock. Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        woken = 0
        for waiter in self._waiters:
            if woken >= n:
                break

            # Futures that are already done do not count towards n.
            if not waiter.done():
                woken += 1
                waiter.set_result(False)

    def notify_all(self):
        """Wake up every coroutine currently waiting on this condition.

        Behaves like notify() with n set to the current number of
        waiters, so it raises RuntimeError under the same circumstances.
        """
        self.notify(len(self._waiters))
435 
436 
class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other coroutine calls release().

    Semaphores also support the asynchronous context management protocol
    ('async with').

    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=None):
        """Create a semaphore with *value* free slots.

        Raises ValueError for a negative *value*.  Passing *loop* is
        deprecated and emits a DeprecationWarning; when it is omitted the
        loop comes from events.get_event_loop().
        """
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._waiters = collections.deque()
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

    def __repr__(self):
        """Return the default repr extended with state and waiter count."""
        res = super().__repr__()
        extra = 'locked' if self.locked() else f'unlocked, value:{self._value}'
        if self._waiters:
            extra = f'{extra}, waiters:{len(self._waiters)}'
        return f'<{res[1:-1]} [{extra}]>'

    def _wake_up_next(self):
        """Resolve the oldest waiter future that is not already done.

        Futures that are already done (for example cancelled ones) are
        popped and discarded along the way.
        """
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                return

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    async def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.
        """
        # NOTE(review): being woken is not a reservation.  The counter is
        # re-checked after every wake-up, so a coroutine that calls
        # acquire() between release() and our resumption can take the slot;
        # we then queue again with a new future at the back.
        while self._value <= 0:
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
                await fut
            except BaseException:
                # BaseException on purpose: the handler has to run for
                # cancellation as well, and everything is re-raised.
                # See the similar code in Queue.get.
                fut.cancel()
                # cancel() has no effect on a future that already holds a
                # result, so "not cancelled" here means we had been woken;
                # pass that wake-up on if a slot is still free.
                if self._value > 0 and not fut.cancelled():
                    self._wake_up_next()
                raise
        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_next()
513 
514 
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    This raises ValueError in release() if it would increase the value
    above the initial value.
    """

    def __init__(self, value=1, *, loop=None):
        """Create a semaphore whose counter may never exceed *value*.

        Passing *loop* is deprecated and emits a DeprecationWarning.
        """
        # Test against None, as the other primitives in this module do, so
        # that a loop object which happens to be falsy is still treated as
        # "a loop was passed".
        #
        # Semaphore.__init__ issues the same warning, but with stacklevel=2
        # that one is attributed to this method; the warning below is the
        # one attributed to our caller.
        if loop is not None:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

        # Remember the ceiling before the base class sets the live counter.
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        """Release one slot.

        Raises ValueError when the counter is already at (or above) the
        initial value, i.e. on more release() calls than acquire() calls.
        """
        if self._value >= self._bound_value:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()
535