• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Synchronization metaclass.
2
This metaclass makes it possible to declare synchronized methods.
4
5"""
6
7import thread
8
9# First we need to define a reentrant lock.
10# This is generally useful and should probably be in a standard Python
11# library module.  For now, we in-line it.
12
class Lock:

    """Reentrant lock.

    This is a mutex-like object which can be acquired by the same
    thread more than once.  It keeps a reference count of the number
    of times it has been acquired by the same thread.  Each acquire()
    call must be matched by a release() call and only the last
    release() call actually releases the lock for acquisition by
    another thread.

    The implementation uses two locks internally:

    __mutex is a short term lock used to protect the instance variables
    __wait is the lock for which other threads wait

    A thread intending to acquire both locks should acquire __wait
    first.

    The implementation uses two other instance variables, protected by
    locking __mutex:

    __tid is the thread ID of the thread that currently has the lock
    __count is the number of times the current thread has acquired it

    When the lock is released, __tid is None and __count is zero.

    The lock can also be used as a context manager: entering acquires
    it (blocking) and leaving releases it once.

    """

    def __init__(self):
        """Constructor.  Initialize all instance variables."""
        self.__mutex = thread.allocate_lock()
        self.__wait = thread.allocate_lock()
        self.__tid = None
        self.__count = 0

    def acquire(self, flag=1):
        """Acquire the lock.

        If the optional flag argument is false, returns immediately
        when it cannot acquire the __wait lock without blocking (it
        may still block for a little while in order to acquire the
        __mutex lock).

        The return value is only relevant when the flag argument is
        false; it is 1 if the lock is acquired, 0 if not.

        """
        me = thread.get_ident()
        # Re-entrant case: the calling thread already owns the lock, so
        # only the count needs to be bumped.
        self.__mutex.acquire()
        try:
            if self.__tid == me:
                self.__count = self.__count + 1
                return 1
        finally:
            self.__mutex.release()
        # Not the owner: wait (or just try, when flag is false) for
        # __wait.  __mutex is not held here, per the ordering rule in
        # the class docstring.
        locked = self.__wait.acquire(flag)
        if not flag and not locked:
            return 0
        # Take __mutex *before* entering the try block, so that the
        # finally clause can never release a mutex that was not acquired.
        self.__mutex.acquire()
        try:
            # Internal invariants: holding __wait means nobody owns us.
            assert self.__tid is None
            assert self.__count == 0
            self.__tid = me
            self.__count = 1
            return 1
        finally:
            self.__mutex.release()

    def release(self):
        """Release the lock.

        If this thread doesn't currently have the lock, an assertion
        error is raised.

        Only allow another thread to acquire the lock when the count
        reaches zero after decrementing it.

        """
        self.__mutex.acquire()
        try:
            # Raised explicitly rather than via an assert statement so
            # that the ownership check is still enforced when Python
            # runs with -O (which strips assert statements).
            if self.__tid != thread.get_ident():
                raise AssertionError(
                    "release() called by a thread that does not hold the lock")
            assert self.__count > 0     # Follows from owning the lock
            self.__count = self.__count - 1
            if self.__count == 0:
                self.__tid = None
                self.__wait.release()
        finally:
            self.__mutex.release()

    def __enter__(self):
        """Acquire the lock (blocking) and return the lock object."""
        self.acquire()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release one level of the lock; exceptions are not suppressed."""
        self.release()
101
102
def _testLock():
    """Self-test for Lock.

    First exercises nested acquisition within a single thread, then
    starts four worker threads that compete for the lock while the
    main thread polls until every worker has reported completion.
    Progress is printed to standard output.

    """
    import sys
    import time

    done = []                           # One entry per completed f1/f2 call

    def f2(lock):
        lock.acquire()
        try:
            # A single write() call, newline included, so the message
            # cannot be split by output from another thread.
            sys.stdout.write("f2 running in thread %d\n" % thread.get_ident())
        finally:
            lock.release()
        done.append(1)

    def f1(lock, f2=f2):
        # f2 is a parameter so that a caller can substitute f1 itself,
        # adding one more level of nested acquisition.
        lock.acquire()
        try:
            sys.stdout.write("f1 running in thread %d\n" % thread.get_ident())
            f2(lock)                    # Re-acquires the lock we hold
        finally:
            lock.release()
        done.append(1)

    lock = Lock()
    lock.acquire()
    f1(lock)                            # Adds 2 to done
    lock.release()

    # Hold the lock while starting the workers so that they all have
    # to wait until it is released below.
    lock.acquire()

    thread.start_new_thread(f1, (lock,)) # Adds 2
    thread.start_new_thread(f1, (lock, f1)) # Adds 3
    thread.start_new_thread(f2, (lock,)) # Adds 1
    thread.start_new_thread(f2, (lock,)) # Adds 1

    lock.release()
    expected = 2 + 2 + 3 + 1 + 1        # Sum of the "Adds" notes above
    while len(done) < expected:
        print(len(done))
        time.sleep(0.001)
    print(len(done))
140
141
142# Now, the Locking metaclass is a piece of cake.
143# As an example feature, methods whose name begins with exactly one
144# underscore are not synchronized.
145
146from Meta import MetaClass, MetaHelper, MetaMethodWrapper
147
class LockingMethodWrapper(MetaMethodWrapper):

    """Method wrapper that runs the wrapped function under the instance lock.

    Methods whose name begins with exactly one underscore are called
    directly, without locking.  Every other method (including names
    that start with two underscores, such as __init__) is executed
    while holding self.inst.__lock__.

    The attributes func, inst and __name__ are not set here; they are
    presumably provided by MetaMethodWrapper (verify against Meta.py).

    """

    def __call__(self, *args, **kw):
        """Call the wrapped function with the instance prepended to args.

        Returns whatever the wrapped function returns; an exception
        raised by it propagates after the lock has been released.

        """
        name = self.__name__
        # "Exactly one" leading underscore: only the second character
        # matters.  Comparing the whole remainder of the name (name[1:])
        # would also match names such as __init__.
        if name[:1] == '_' and name[1:2] != '_':
            return self.func(self.inst, *args, **kw)
        # Keep one reference so that the lock released is the very
        # lock that was acquired.
        lock = self.inst.__lock__
        lock.acquire()
        try:
            return self.func(self.inst, *args, **kw)
        finally:
            lock.release()
157
class LockingHelper(MetaHelper):

    """Helper whose instances each carry a private reentrant Lock.

    The lock is stored as __lock__, which is the attribute that
    LockingMethodWrapper acquires around synchronized method calls.

    """

    # Wrap methods with the locking wrapper
    __methodwrapper__ = LockingMethodWrapper

    def __helperinit__(self, formalclass):
        """Run the base helper initialization, then create the lock."""
        MetaHelper.__helperinit__(self, formalclass)
        self.__lock__ = Lock()
163
class LockingMetaClass(MetaClass):

    """MetaClass variant that uses LockingHelper as its helper class."""

    __helper__ = LockingHelper
166
# Base "class" to inherit from in order to get synchronized methods
# (see the Buffer class in _test() for an example).
Locking = LockingMetaClass('Locking', (), {})
168
def _test():
    """Exercise the Locking metaclass with a producer and a consumer thread.

    A producer thread puts the integers 0..n-1 into a shared circular
    buffer while a consumer thread takes them out and checks that they
    arrive in order.  The main thread waits for both to finish and then
    prints the final buffer size.

    Each worker releases its completion lock in a finally clause, so
    the main thread is not left blocked forever if a worker dies with
    an exception.

    """
    # For kicks, take away the Locking base class and see it die
    class Buffer(Locking):
        # Circular buffer.  It is empty when first == last and full
        # when advancing last would make it equal to first, so it
        # holds at most size-1 items.
        def __init__(self, initialsize):
            assert initialsize > 0
            self.size = initialsize
            self.buffer = [None]*self.size
            self.first = self.last = 0
        def put(self, item):
            # Do we need to grow the buffer?
            if (self.last+1) % self.size != self.first:
                # Insert the new item
                self.buffer[self.last] = item
                self.last = (self.last+1) % self.size
                return
            # Double the buffer size
            # First normalize it so that first==0 and last==size-1
            print("buffer = %s" % (self.buffer,))
            print("first = %d, last = %d, size = %d" % (
                self.first, self.last, self.size))
            if self.first <= self.last:
                temp = self.buffer[self.first:self.last]
            else:
                temp = self.buffer[self.first:] + self.buffer[:self.last]
            print("temp = %s" % (temp,))
            # temp holds size-1 items; padding with size+1 empty slots
            # gives a list of 2*size entries.
            self.buffer = temp + [None]*(self.size+1)
            self.first = 0
            self.last = self.size-1
            self.size = self.size*2
            print("Buffer size doubled to %s" % (self.size,))
            print("new buffer = %s" % (self.buffer,))
            print("first = %d, last = %d, size = %d" % (
                self.first, self.last, self.size))
            self.put(item)              # Recursive call to test the locking
        def get(self):
            # Is the buffer empty?
            if self.first == self.last:
                raise EOFError          # Avoid defining a new exception
            item = self.buffer[self.first]
            self.first = (self.first+1) % self.size
            return item

    def producer(buf, wait, n=1000):
        # Put 0..n-1 into buf, then signal completion through wait.
        try:
            i = 0
            while i < n:
                print("put %s" % (i,))
                buf.put(i)
                i = i+1
            print("Producer: done producing %s items" % (n,))
        finally:
            wait.release()              # Always wake up the main thread

    def consumer(buf, wait, n=1000):
        # Take n items from buf, checking that they arrive in order;
        # then signal completion through wait.
        import time
        try:
            i = 0
            tout = 0.001
            while i < n:
                try:
                    x = buf.get()
                except EOFError:
                    # Buffer empty: back off, doubling the delay each
                    # time, until the producer catches up.
                    time.sleep(tout)
                    tout = tout*2
                    continue
                if x != i:
                    raise AssertionError(
                        "get() returned %s, expected %s" % (x, i))
                print("got %s" % (i,))
                i = i+1
                tout = 0.001
            print("Consumer: done consuming %s items" % (n,))
        finally:
            wait.release()              # Always wake up the main thread

    # pwait and cwait start out locked; each worker releases its own
    # lock when it is finished, which unblocks the acquire() calls below.
    pwait = thread.allocate_lock()
    pwait.acquire()
    cwait = thread.allocate_lock()
    cwait.acquire()
    shared = Buffer(1)
    n = 1000
    thread.start_new_thread(consumer, (shared, cwait, n))
    thread.start_new_thread(producer, (shared, pwait, n))
    pwait.acquire()
    print("Producer done")
    cwait.acquire()
    print("All done")
    print("buffer size == %s" % (len(shared.buffer),))
253
# When run as a script, exercise the lock first and then the metaclass.
if __name__ == '__main__':
    _testLock()
    _test()
257