• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Synchronization primitives."""
2
3__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
4
5import collections
6
7from . import compat
8from . import events
9from . import futures
10from .coroutines import coroutine
11
12
class _ContextManager:
    """Single-use context manager wrapping a lock-like object.

    It exists to support the following idiom for holding a lock around
    a block:

        with (yield from lock):
            <block>

    while making the accidental form fail loudly:

        with lock:
            <block>

    Entering the manager does not acquire anything; leaving it calls
    release() on the wrapped object.
    """

    def __init__(self, lock):
        self._lock = lock

    def __enter__(self):
        """Return None; a lock has nothing useful to bind with "as ...".
        """

    def __exit__(self, *args):
        # Drop our reference before releasing, so it is cleared even when
        # release() raises.  A second __exit__ then fails on None, which
        # crudely prevents reuse of this manager.
        held, self._lock = self._lock, None
        held.release()
41
42
class _ContextManagerMixin:
    """Mixin adding context-manager support to a synchronization primitive.

    The host class is expected to provide an acquire() coroutine and a
    release() method; every hook below is written in terms of those two.
    """

    def __enter__(self):
        # A plain "with obj:" is always a mistake for these primitives.
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        """Do nothing.

        The with-statement requires __exit__ to exist whenever
        __enter__ does, even though __enter__ always raises.
        """

    @coroutine
    def __iter__(self):
        # Not meant to be used as an ordinary coroutine.  Its purpose is
        # to make this work:
        #
        #     with (yield from lock):
        #         <block>
        #
        # which is shorthand for:
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        yield from self.acquire()
        manager = _ContextManager(self)
        return manager

    if compat.PY35:

        def __await__(self):
            # Makes "with await lock" work.
            yield from self.acquire()
            manager = _ContextManager(self)
            return manager

        @coroutine
        def __aenter__(self):
            # Nothing useful to bind with "as ..." in the with
            # statement, so the result is the implicit None.
            yield from self.acquire()

        @coroutine
        def __aexit__(self, exc_type, exc, tb):
            self.release()
87
88
class Lock(_ContextManagerMixin):
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the coroutine that
    has been blocked in acquire() the longest is the one woken up.

    acquire() is a coroutine and should be called with 'yield from'.

    Locks also support the context management protocol.  '(yield from lock)'
    should be used as the context manager expression.

    Usage:

        lock = Lock()
        ...
        yield from lock
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        with (yield from lock):
             ...

    Lock objects can be tested for locking state:

        if not lock.locked():
           yield from lock
        else:
           # lock is acquired
           ...

    """

    def __init__(self, *, loop=None):
        # Futures of coroutines blocked in acquire(), oldest first.
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    @coroutine
    def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the waiting coroutine is cancelled, CancelledError propagates
        to the caller.  Should the cancellation arrive after release()
        has already woken this waiter, the wake-up is passed on to the
        next waiter so that it is not lost.
        """
        # Fast path: the lock is free and nobody with a live claim is
        # queued ahead of us.
        if not self._locked and all(w.cancelled() for w in self._waiters):
            self._locked = True
            return True

        fut = self._loop.create_future()
        self._waiters.append(fut)
        try:
            try:
                yield from fut
            finally:
                self._waiters.remove(fut)
        except futures.CancelledError:
            # We may have been woken by release() and cancelled before
            # getting a chance to run.  The lock is then free but no
            # other waiter knows about it; hand the wake-up over.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the waiter at the head of the queue if it isn't done."""
        try:
            fut = next(iter(self._waiters))
        except StopIteration:
            return

        # A head waiter that is already done will run later on: it either
        # takes the lock or, if it was cancelled while the lock is free,
        # reaches this method again from acquire() and wakes its
        # successor.  Skipping past it here could wake two waiters for a
        # single release.
        if not fut.done():
            fut.set_result(True)
202
203
class Event:
    """Asynchronous equivalent to threading.Event.

    An event object wraps a boolean flag.  set() turns the flag on,
    clear() turns it off, and wait() blocks until the flag is on.  A
    freshly created event has its flag off.
    """

    def __init__(self, *, loop=None):
        # Futures of coroutines currently blocked in wait().
        self._waiters = collections.deque()
        self._value = False
        self._loop = events.get_event_loop() if loop is None else loop

    def __repr__(self):
        state = 'set' if self._value else 'unset'
        if self._waiters:
            state = '{},waiters:{}'.format(state, len(self._waiters))
        base = super().__repr__()
        return '<{} [{}]>'.format(base[1:-1], state)

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Turn the internal flag on and wake every coroutine in wait().

        Coroutines that call wait() while the flag is on do not block
        at all.  Calling set() on an event that is already set does
        nothing.
        """
        if self._value:
            return

        self._value = True
        for waiter in self._waiters:
            if not waiter.done():
                waiter.set_result(True)

    def clear(self):
        """Turn the internal flag off.

        From now on, coroutines calling wait() block until set() is
        called to turn the flag on again.
        """
        self._value = False

    @coroutine
    def wait(self):
        """Block until the internal flag is true.

        Return True straight away when the flag is already on.
        Otherwise suspend until another coroutine calls set(), then
        return True.
        """
        if self._value:
            return True

        waiter = self._loop.create_future()
        self._waiters.append(waiter)
        try:
            yield from waiter
        finally:
            # Unregister whether we were woken normally or interrupted.
            self._waiters.remove(waiter)
        return True
268
269
class Condition(_ContextManagerMixin):
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects. A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If no lock is given, a new Lock object is created and used as the
    underlying lock.  A lock passed in explicitly must use the same
    event loop as the condition.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

        if lock is None:
            lock = Lock(loop=self._loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # Futures of coroutines blocked in wait(), oldest first.
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    @coroutine
    def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        The lock is re-acquired even when the wait is cancelled; in
        that case CancelledError is raised once the lock is held again.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = self._loop.create_future()
            self._waiters.append(fut)
            try:
                yield from fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # Must reacquire lock even if wait is cancelled.  Remember
            # a cancellation that arrives while doing so instead of
            # silently dropping it.
            cancelled = False
            while True:
                try:
                    yield from self.acquire()
                    break
                except futures.CancelledError:
                    cancelled = True

            if cancelled:
                raise futures.CancelledError

    @coroutine
    def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable which result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            yield from self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock. Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Waiters that are already done do not count towards n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition. This method acts
        like notify(), but wakes up all waiting coroutines instead of one. If
        the calling coroutine has not acquired the lock when this method is
        called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))
385
386
class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.

    A semaphore keeps an internal counter.  Each acquire() call
    decrements it and each release() call increments it.  The counter
    never drops below zero: when acquire() finds it at zero, the caller
    blocks until some other coroutine calls release().

    Semaphores also support the context management protocol.

    The optional argument is the initial value of the counter and
    defaults to 1.  A value less than 0 raises ValueError.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        # Futures of coroutines blocked in acquire(), oldest first.
        self._waiters = collections.deque()
        self._loop = events.get_event_loop() if loop is None else loop

    def __repr__(self):
        if self.locked():
            state = 'locked'
        else:
            state = 'unlocked,value:{}'.format(self._value)
        if self._waiters:
            state = '{},waiters:{}'.format(state, len(self._waiters))
        base = super().__repr__()
        return '<{} [{}]>'.format(base[1:-1], state)

    def _wake_up_next(self):
        # Pop waiters off the front until one is found that is still
        # pending; wake that one and stop.
        queue = self._waiters
        while queue:
            candidate = queue.popleft()
            if candidate.done():
                continue
            candidate.set_result(None)
            return

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    @coroutine
    def acquire(self):
        """Acquire a semaphore.

        When the internal counter is above zero on entry, decrement it
        and return True immediately.  When it is zero, block until some
        other coroutine has called release() to raise it above zero,
        and then return True.
        """
        while self._value <= 0:
            waiter = self._loop.create_future()
            self._waiters.append(waiter)
            try:
                yield from waiter
            except BaseException:
                # See the similar code in Queue.get.
                waiter.cancel()
                # If cancel() had no effect we had already been woken
                # up; pass that wake-up on while the counter allows it.
                if self._value > 0 and not waiter.cancelled():
                    self._wake_up_next()
                raise
        self._value -= 1
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.

        If a coroutine is waiting for the counter to become larger than
        zero again, wake it up.
        """
        self._value += 1
        self._wake_up_next()
462
463
class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    release() raises ValueError when it would push the counter above
    the value the semaphore was created with.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the ceiling before the base class validates and
        # stores the initial counter value.
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        """Release the semaphore, refusing to exceed the initial value.

        Raises ValueError if the counter is already at its bound.
        """
        if self._value >= self._bound_value:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()
479