• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#
2# Module implementing synchronization primitives
3#
4# multiprocessing/synchronize.py
5#
6# Copyright (c) 2006-2008, R Oudkerk
7# Licensed to PSF under a Contributor Agreement.
8#
9
10__all__ = [
11    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
12    ]
13
14import threading
15import sys
16import tempfile
17import _multiprocessing
18import time
19
20from . import context
21from . import process
22from . import util
23
24# Try to import the mp.synchronize module cleanly, if it fails
25# raise ImportError for platforms lacking a working sem_open implementation.
26# See issue 3770
27try:
28    from _multiprocessing import SemLock, sem_unlink
29except (ImportError):
30    raise ImportError("This platform lacks a functioning sem_open" +
31                      " implementation, therefore, the required" +
32                      " synchronization primitives needed will not" +
33                      " function, see issue 3770.")
34
35#
36# Constants
37#
38
39RECURSIVE_MUTEX, SEMAPHORE = list(range(2))
40SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
41
42#
43# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
44#
45
46class SemLock(object):
47
48    _rand = tempfile._RandomNameSequence()
49
50    def __init__(self, kind, value, maxvalue, *, ctx):
51        if ctx is None:
52            ctx = context._default_context.get_context()
53        name = ctx.get_start_method()
54        unlink_now = sys.platform == 'win32' or name == 'fork'
55        for i in range(100):
56            try:
57                sl = self._semlock = _multiprocessing.SemLock(
58                    kind, value, maxvalue, self._make_name(),
59                    unlink_now)
60            except FileExistsError:
61                pass
62            else:
63                break
64        else:
65            raise FileExistsError('cannot find name for semaphore')
66
67        util.debug('created semlock with handle %s' % sl.handle)
68        self._make_methods()
69
70        if sys.platform != 'win32':
71            def _after_fork(obj):
72                obj._semlock._after_fork()
73            util.register_after_fork(self, _after_fork)
74
75        if self._semlock.name is not None:
76            # We only get here if we are on Unix with forking
77            # disabled.  When the object is garbage collected or the
78            # process shuts down we unlink the semaphore name
79            from .resource_tracker import register
80            register(self._semlock.name, "semaphore")
81            util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
82                          exitpriority=0)
83
84    @staticmethod
85    def _cleanup(name):
86        from .resource_tracker import unregister
87        sem_unlink(name)
88        unregister(name, "semaphore")
89
90    def _make_methods(self):
91        self.acquire = self._semlock.acquire
92        self.release = self._semlock.release
93
94    def __enter__(self):
95        return self._semlock.__enter__()
96
97    def __exit__(self, *args):
98        return self._semlock.__exit__(*args)
99
100    def __getstate__(self):
101        context.assert_spawning(self)
102        sl = self._semlock
103        if sys.platform == 'win32':
104            h = context.get_spawning_popen().duplicate_for_child(sl.handle)
105        else:
106            h = sl.handle
107        return (h, sl.kind, sl.maxvalue, sl.name)
108
109    def __setstate__(self, state):
110        self._semlock = _multiprocessing.SemLock._rebuild(*state)
111        util.debug('recreated blocker with handle %r' % state[0])
112        self._make_methods()
113
114    @staticmethod
115    def _make_name():
116        return '%s-%s' % (process.current_process()._config['semprefix'],
117                          next(SemLock._rand))
118
119#
120# Semaphore
121#
122
123class Semaphore(SemLock):
124
125    def __init__(self, value=1, *, ctx):
126        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
127
128    def get_value(self):
129        return self._semlock._get_value()
130
131    def __repr__(self):
132        try:
133            value = self._semlock._get_value()
134        except Exception:
135            value = 'unknown'
136        return '<%s(value=%s)>' % (self.__class__.__name__, value)
137
138#
139# Bounded semaphore
140#
141
142class BoundedSemaphore(Semaphore):
143
144    def __init__(self, value=1, *, ctx):
145        SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)
146
147    def __repr__(self):
148        try:
149            value = self._semlock._get_value()
150        except Exception:
151            value = 'unknown'
152        return '<%s(value=%s, maxvalue=%s)>' % \
153               (self.__class__.__name__, value, self._semlock.maxvalue)
154
155#
156# Non-recursive lock
157#
158
159class Lock(SemLock):
160
161    def __init__(self, *, ctx):
162        SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
163
164    def __repr__(self):
165        try:
166            if self._semlock._is_mine():
167                name = process.current_process().name
168                if threading.current_thread().name != 'MainThread':
169                    name += '|' + threading.current_thread().name
170            elif self._semlock._get_value() == 1:
171                name = 'None'
172            elif self._semlock._count() > 0:
173                name = 'SomeOtherThread'
174            else:
175                name = 'SomeOtherProcess'
176        except Exception:
177            name = 'unknown'
178        return '<%s(owner=%s)>' % (self.__class__.__name__, name)
179
180#
181# Recursive lock
182#
183
184class RLock(SemLock):
185
186    def __init__(self, *, ctx):
187        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)
188
189    def __repr__(self):
190        try:
191            if self._semlock._is_mine():
192                name = process.current_process().name
193                if threading.current_thread().name != 'MainThread':
194                    name += '|' + threading.current_thread().name
195                count = self._semlock._count()
196            elif self._semlock._get_value() == 1:
197                name, count = 'None', 0
198            elif self._semlock._count() > 0:
199                name, count = 'SomeOtherThread', 'nonzero'
200            else:
201                name, count = 'SomeOtherProcess', 'nonzero'
202        except Exception:
203            name, count = 'unknown', 'unknown'
204        return '<%s(%s, %s)>' % (self.__class__.__name__, name, count)
205
206#
207# Condition variable
208#
209
210class Condition(object):
211
212    def __init__(self, lock=None, *, ctx):
213        self._lock = lock or ctx.RLock()
214        self._sleeping_count = ctx.Semaphore(0)
215        self._woken_count = ctx.Semaphore(0)
216        self._wait_semaphore = ctx.Semaphore(0)
217        self._make_methods()
218
219    def __getstate__(self):
220        context.assert_spawning(self)
221        return (self._lock, self._sleeping_count,
222                self._woken_count, self._wait_semaphore)
223
224    def __setstate__(self, state):
225        (self._lock, self._sleeping_count,
226         self._woken_count, self._wait_semaphore) = state
227        self._make_methods()
228
229    def __enter__(self):
230        return self._lock.__enter__()
231
232    def __exit__(self, *args):
233        return self._lock.__exit__(*args)
234
235    def _make_methods(self):
236        self.acquire = self._lock.acquire
237        self.release = self._lock.release
238
239    def __repr__(self):
240        try:
241            num_waiters = (self._sleeping_count._semlock._get_value() -
242                           self._woken_count._semlock._get_value())
243        except Exception:
244            num_waiters = 'unknown'
245        return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)
246
247    def wait(self, timeout=None):
248        assert self._lock._semlock._is_mine(), \
249               'must acquire() condition before using wait()'
250
251        # indicate that this thread is going to sleep
252        self._sleeping_count.release()
253
254        # release lock
255        count = self._lock._semlock._count()
256        for i in range(count):
257            self._lock.release()
258
259        try:
260            # wait for notification or timeout
261            return self._wait_semaphore.acquire(True, timeout)
262        finally:
263            # indicate that this thread has woken
264            self._woken_count.release()
265
266            # reacquire lock
267            for i in range(count):
268                self._lock.acquire()
269
270    def notify(self, n=1):
271        assert self._lock._semlock._is_mine(), 'lock is not owned'
272        assert not self._wait_semaphore.acquire(
273            False), ('notify: Should not have been able to acquire '
274                     + '_wait_semaphore')
275
276        # to take account of timeouts since last notify*() we subtract
277        # woken_count from sleeping_count and rezero woken_count
278        while self._woken_count.acquire(False):
279            res = self._sleeping_count.acquire(False)
280            assert res, ('notify: Bug in sleeping_count.acquire'
281                         + '- res should not be False')
282
283        sleepers = 0
284        while sleepers < n and self._sleeping_count.acquire(False):
285            self._wait_semaphore.release()        # wake up one sleeper
286            sleepers += 1
287
288        if sleepers:
289            for i in range(sleepers):
290                self._woken_count.acquire()       # wait for a sleeper to wake
291
292            # rezero wait_semaphore in case some timeouts just happened
293            while self._wait_semaphore.acquire(False):
294                pass
295
296    def notify_all(self):
297        self.notify(n=sys.maxsize)
298
299    def wait_for(self, predicate, timeout=None):
300        result = predicate()
301        if result:
302            return result
303        if timeout is not None:
304            endtime = time.monotonic() + timeout
305        else:
306            endtime = None
307            waittime = None
308        while not result:
309            if endtime is not None:
310                waittime = endtime - time.monotonic()
311                if waittime <= 0:
312                    break
313            self.wait(waittime)
314            result = predicate()
315        return result
316
317#
318# Event
319#
320
321class Event(object):
322
323    def __init__(self, *, ctx):
324        self._cond = ctx.Condition(ctx.Lock())
325        self._flag = ctx.Semaphore(0)
326
327    def is_set(self):
328        with self._cond:
329            if self._flag.acquire(False):
330                self._flag.release()
331                return True
332            return False
333
334    def set(self):
335        with self._cond:
336            self._flag.acquire(False)
337            self._flag.release()
338            self._cond.notify_all()
339
340    def clear(self):
341        with self._cond:
342            self._flag.acquire(False)
343
344    def wait(self, timeout=None):
345        with self._cond:
346            if self._flag.acquire(False):
347                self._flag.release()
348            else:
349                self._cond.wait(timeout)
350
351            if self._flag.acquire(False):
352                self._flag.release()
353                return True
354            return False
355
356#
357# Barrier
358#
359
360class Barrier(threading.Barrier):
361
362    def __init__(self, parties, action=None, timeout=None, *, ctx):
363        import struct
364        from .heap import BufferWrapper
365        wrapper = BufferWrapper(struct.calcsize('i') * 2)
366        cond = ctx.Condition()
367        self.__setstate__((parties, action, timeout, cond, wrapper))
368        self._state = 0
369        self._count = 0
370
371    def __setstate__(self, state):
372        (self._parties, self._action, self._timeout,
373         self._cond, self._wrapper) = state
374        self._array = self._wrapper.create_memoryview().cast('i')
375
376    def __getstate__(self):
377        return (self._parties, self._action, self._timeout,
378                self._cond, self._wrapper)
379
380    @property
381    def _state(self):
382        return self._array[0]
383
384    @_state.setter
385    def _state(self, value):
386        self._array[0] = value
387
388    @property
389    def _count(self):
390        return self._array[1]
391
392    @_count.setter
393    def _count(self, value):
394        self._array[1] = value
395