1"""Synchronization metaclass. 2 3This metaclass makes it possible to declare synchronized methods. 4 5""" 6 7import thread 8 9# First we need to define a reentrant lock. 10# This is generally useful and should probably be in a standard Python 11# library module. For now, we in-line it. 12 13class Lock: 14 15 """Reentrant lock. 16 17 This is a mutex-like object which can be acquired by the same 18 thread more than once. It keeps a reference count of the number 19 of times it has been acquired by the same thread. Each acquire() 20 call must be matched by a release() call and only the last 21 release() call actually releases the lock for acquisition by 22 another thread. 23 24 The implementation uses two locks internally: 25 26 __mutex is a short term lock used to protect the instance variables 27 __wait is the lock for which other threads wait 28 29 A thread intending to acquire both locks should acquire __wait 30 first. 31 32 The implementation uses two other instance variables, protected by 33 locking __mutex: 34 35 __tid is the thread ID of the thread that currently has the lock 36 __count is the number of times the current thread has acquired it 37 38 When the lock is released, __tid is None and __count is zero. 39 40 """ 41 42 def __init__(self): 43 """Constructor. Initialize all instance variables.""" 44 self.__mutex = thread.allocate_lock() 45 self.__wait = thread.allocate_lock() 46 self.__tid = None 47 self.__count = 0 48 49 def acquire(self, flag=1): 50 """Acquire the lock. 51 52 If the optional flag argument is false, returns immediately 53 when it cannot acquire the __wait lock without blocking (it 54 may still block for a little while in order to acquire the 55 __mutex lock). 56 57 The return value is only relevant when the flag argument is 58 false; it is 1 if the lock is acquired, 0 if not. 59 60 """ 61 self.__mutex.acquire() 62 try: 63 if self.__tid == thread.get_ident(): 64 self.__count = self.__count + 1 65 return 1 66 finally: 67 self.__mutex.release() 68 locked = self.__wait.acquire(flag) 69 if not flag and not locked: 70 return 0 71 try: 72 self.__mutex.acquire() 73 assert self.__tid == None 74 assert self.__count == 0 75 self.__tid = thread.get_ident() 76 self.__count = 1 77 return 1 78 finally: 79 self.__mutex.release() 80 81 def release(self): 82 """Release the lock. 83 84 If this thread doesn't currently have the lock, an assertion 85 error is raised. 86 87 Only allow another thread to acquire the lock when the count 88 reaches zero after decrementing it. 89 90 """ 91 self.__mutex.acquire() 92 try: 93 assert self.__tid == thread.get_ident() 94 assert self.__count > 0 95 self.__count = self.__count - 1 96 if self.__count == 0: 97 self.__tid = None 98 self.__wait.release() 99 finally: 100 self.__mutex.release() 101 102 103def _testLock(): 104 105 done = [] 106 107 def f2(lock, done=done): 108 lock.acquire() 109 print "f2 running in thread %d\n" % thread.get_ident(), 110 lock.release() 111 done.append(1) 112 113 def f1(lock, f2=f2, done=done): 114 lock.acquire() 115 print "f1 running in thread %d\n" % thread.get_ident(), 116 try: 117 f2(lock) 118 finally: 119 lock.release() 120 done.append(1) 121 122 lock = Lock() 123 lock.acquire() 124 f1(lock) # Adds 2 to done 125 lock.release() 126 127 lock.acquire() 128 129 thread.start_new_thread(f1, (lock,)) # Adds 2 130 thread.start_new_thread(f1, (lock, f1)) # Adds 3 131 thread.start_new_thread(f2, (lock,)) # Adds 1 132 thread.start_new_thread(f2, (lock,)) # Adds 1 133 134 lock.release() 135 import time 136 while len(done) < 9: 137 print len(done) 138 time.sleep(0.001) 139 print len(done) 140 141 142# Now, the Locking metaclass is a piece of cake. 143# As an example feature, methods whose name begins with exactly one 144# underscore are not synchronized. 145 146from Meta import MetaClass, MetaHelper, MetaMethodWrapper 147 148class LockingMethodWrapper(MetaMethodWrapper): 149 def __call__(self, *args, **kw): 150 if self.__name__[:1] == '_' and self.__name__[1:] != '_': 151 return apply(self.func, (self.inst,) + args, kw) 152 self.inst.__lock__.acquire() 153 try: 154 return apply(self.func, (self.inst,) + args, kw) 155 finally: 156 self.inst.__lock__.release() 157 158class LockingHelper(MetaHelper): 159 __methodwrapper__ = LockingMethodWrapper 160 def __helperinit__(self, formalclass): 161 MetaHelper.__helperinit__(self, formalclass) 162 self.__lock__ = Lock() 163 164class LockingMetaClass(MetaClass): 165 __helper__ = LockingHelper 166 167Locking = LockingMetaClass('Locking', (), {}) 168 169def _test(): 170 # For kicks, take away the Locking base class and see it die 171 class Buffer(Locking): 172 def __init__(self, initialsize): 173 assert initialsize > 0 174 self.size = initialsize 175 self.buffer = [None]*self.size 176 self.first = self.last = 0 177 def put(self, item): 178 # Do we need to grow the buffer? 179 if (self.last+1) % self.size != self.first: 180 # Insert the new item 181 self.buffer[self.last] = item 182 self.last = (self.last+1) % self.size 183 return 184 # Double the buffer size 185 # First normalize it so that first==0 and last==size-1 186 print "buffer =", self.buffer 187 print "first = %d, last = %d, size = %d" % ( 188 self.first, self.last, self.size) 189 if self.first <= self.last: 190 temp = self.buffer[self.first:self.last] 191 else: 192 temp = self.buffer[self.first:] + self.buffer[:self.last] 193 print "temp =", temp 194 self.buffer = temp + [None]*(self.size+1) 195 self.first = 0 196 self.last = self.size-1 197 self.size = self.size*2 198 print "Buffer size doubled to", self.size 199 print "new buffer =", self.buffer 200 print "first = %d, last = %d, size = %d" % ( 201 self.first, self.last, self.size) 202 self.put(item) # Recursive call to test the locking 203 def get(self): 204 # Is the buffer empty? 205 if self.first == self.last: 206 raise EOFError # Avoid defining a new exception 207 item = self.buffer[self.first] 208 self.first = (self.first+1) % self.size 209 return item 210 211 def producer(buffer, wait, n=1000): 212 import time 213 i = 0 214 while i < n: 215 print "put", i 216 buffer.put(i) 217 i = i+1 218 print "Producer: done producing", n, "items" 219 wait.release() 220 221 def consumer(buffer, wait, n=1000): 222 import time 223 i = 0 224 tout = 0.001 225 while i < n: 226 try: 227 x = buffer.get() 228 if x != i: 229 raise AssertionError, \ 230 "get() returned %s, expected %s" % (x, i) 231 print "got", i 232 i = i+1 233 tout = 0.001 234 except EOFError: 235 time.sleep(tout) 236 tout = tout*2 237 print "Consumer: done consuming", n, "items" 238 wait.release() 239 240 pwait = thread.allocate_lock() 241 pwait.acquire() 242 cwait = thread.allocate_lock() 243 cwait.acquire() 244 buffer = Buffer(1) 245 n = 1000 246 thread.start_new_thread(consumer, (buffer, cwait, n)) 247 thread.start_new_thread(producer, (buffer, pwait, n)) 248 pwait.acquire() 249 print "Producer done" 250 cwait.acquire() 251 print "All done" 252 print "buffer size ==", len(buffer.buffer) 253 254if __name__ == '__main__': 255 _testLock() 256 _test() 257