#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Each test below exercises one synchronisation primitive (Value, Queue,
# Condition, Semaphore, Event, shared Value/Array) or Process.join().  The
# primitives are always looked up through the module-level name
# `multiprocessing`, which test() rebinds so the same tests can be run
# against real processes, a manager, or the thread-based dummy module.
#
# The file uses only constructs that are valid on both Python 2.6+ and
# Python 3: output goes through single-argument print(...) calls or
# sys.stdout.write().
#

import random
import sys
import time

try:
    from queue import Empty     # Python 3
except ImportError:
    from Queue import Empty     # Python 2

import multiprocessing      # may get overwritten


#### TEST_VALUE

def value_func(running, mutex):
    """Sleep for a random time (under 4s), then decrement `running`.

    `running` is a shared integer value and `mutex` the lock guarding both
    the counter and the progress output.
    """
    # NOTE(review): presumably re-seeded so that forked children do not all
    # share the parent's random state - confirm.
    random.seed()
    time.sleep(random.random()*4)

    # `with` guarantees the lock is released even if printing fails.
    with mutex:
        print('\n\t\t\t' + str(multiprocessing.current_process())
              + ' has finished')
        running.value -= 1


def test_value():
    """Start TASKS children and poll a shared counter until all finish."""
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    processes = [
        multiprocessing.Process(target=value_func, args=(running, mutex))
        for _ in range(TASKS)
    ]
    for p in processes:
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        with mutex:
            # Progress is written on a single line, hence no newline.
            sys.stdout.write('%s ' % running.value)
            sys.stdout.flush()

    print('')
    print('No more running processes')

    # The counter reaching zero only means every child has passed its
    # decrement; join so that none is left running when the next test starts.
    for p in processes:
        p.join()


#### TEST_QUEUE

def queue_func(queue):
    """Put the squares of 0..29 on `queue` at random intervals, then 'STOP'."""
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')


def test_queue():
    """Consume items from a child process, reporting every get() timeout."""
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    item = None
    while item != 'STOP':
        try:
            item = q.get(timeout=0.3)
        except Empty:
            print('TIMEOUT')
        else:
            sys.stdout.write('%s ' % (item,))
            sys.stdout.flush()

    print('')

    # 'STOP' is the last thing the child puts, so it is about to exit.
    p.join()


#### TEST_CONDITION

def condition_func(cond):
    """Hold `cond` for two seconds, then notify one waiter and release it."""
    with cond:
        print('\t' + str(cond))
        time.sleep(2)
        print('\tchild is notifying')
        print('\t' + str(cond))
        cond.notify()


def test_condition():
    """Wait on a condition that is notified by a child process.

    The condition is deliberately acquired twice and printed after every
    step so that the change in its state can be observed; this relies on
    the condition's underlying lock being re-entrant.
    """
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print(cond)

    cond.acquire()
    print(cond)
    cond.acquire()
    print(cond)

    p.start()

    print('main is waiting')
    cond.wait()
    print('main has woken up')

    print(cond)
    cond.release()
    print(cond)
    cond.release()

    p.join()
    print(cond)


#### TEST_SEMAPHORE

def semaphore_func(sema, mutex, running):
    """Run a short random-length task while holding one slot of `sema`.

    `running` counts the tasks currently inside the semaphore; `mutex`
    guards that counter and the output.
    """
    with sema:
        with mutex:
            running.value += 1
            print('%s tasks are running' % running.value)

        random.seed()
        time.sleep(random.random()*2)

        with mutex:
            running.value -= 1
            print('%s has finished' % multiprocessing.current_process())


def test_semaphore():
    """Run ten children, at most three of which hold the semaphore at once."""
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    processes = [
        multiprocessing.Process(target=semaphore_func,
                                args=(sema, mutex, running))
        for _ in range(10)
    ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


#### TEST_JOIN_TIMEOUT

def join_timeout_func():
    """Sleep long enough for the parent's join() to time out several times."""
    print('\tchild sleeping')
    time.sleep(5.5)
    print('\n\tchild terminating')


def test_join_timeout():
    """Join a child with a one second timeout until it has terminated."""
    p = multiprocessing.Process(target=join_timeout_func)
    p.start()

    print('waiting for process to finish')

    while True:
        p.join(timeout=1)
        if not p.is_alive():
            break
        # One dot per timed-out join, all on the same line.
        sys.stdout.write('. ')
        sys.stdout.flush()


#### TEST_EVENT

def event_func(event):
    """Block until `event` is set, reporting before and after."""
    print('\t%r is waiting' % multiprocessing.current_process())
    event.wait()
    print('\t%r has woken up' % multiprocessing.current_process())


def test_event():
    """Release five waiting children at once by setting an event."""
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for _ in range(5)]

    for p in processes:
        p.start()

    print('main is sleeping')
    time.sleep(2)

    print('main is setting event')
    event.set()

    for p in processes:
        p.join()


#### TEST_SHAREDVALUES

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    """Check that the shared objects hold the data they were created from.

    `values` and `arrays` are lists of (typecode, data) pairs;
    `shared_values` and `shared_arrays` are the corresponding shared
    objects, in the same order.  Raises AssertionError on any mismatch.
    """
    # zip() would silently drop unmatched trailing items, so check first.
    assert len(values) == len(shared_values)
    assert len(arrays) == len(shared_arrays)

    for (_, expected), shared in zip(values, shared_values):
        assert expected == shared.value

    # Iterate over the arrays themselves; their number need not be the
    # same as the number of values.
    for (_, expected), shared in zip(arrays, shared_arrays):
        assert expected == list(shared[:])

    print('Tests passed')


def test_sharedvalues():
    """Create shared values and arrays and have a child verify them."""
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    # Real lists, so that they compare equal to list(shared[:]) on
    # Python 3 as well, where range() is not a list.
    arrays = [
        ('i', list(range(100))),
        ('d', [0.25 * i for i in range(100)]),
        ('H', list(range(1000)))
        ]

    shared_values = [multiprocessing.Value(code, v) for code, v in values]
    shared_arrays = [multiprocessing.Array(code, a) for code, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    assert p.exitcode == 0


####

def test(namespace=multiprocessing):
    """Run every test using the primitives provided by `namespace`.

    `namespace` replaces the module-level name `multiprocessing`, which is
    where all the tests look up Process, Lock, Value and so on.  Raises
    ValueError if the namespace has a `_debug_info()` method and it
    returns a non-empty result once the tests have run.
    """
    global multiprocessing

    multiprocessing = namespace

    for func in [ test_value, test_queue, test_condition,
                  test_semaphore, test_join_timeout, test_event,
                  test_sharedvalues ]:

        print('\n\t######## %s\n' % func.__name__)
        func()

    multiprocessing.active_children()      # cleanup any old processes
    if hasattr(multiprocessing, '_debug_info'):
        info = multiprocessing._debug_info()
        if info:
            print(info)
            raise ValueError('there should be no positive refcounts left')


if __name__ == '__main__':
    multiprocessing.freeze_support()

    # At most one argument is accepted; anything else falls through to the
    # usage message below rather than failing an assertion.
    cli_args = sys.argv[1:]
    if not cli_args:
        mode = 'processes'
    elif len(cli_args) == 1:
        mode = cli_args[0]
    else:
        mode = None

    if mode == 'processes':
        print(' Using processes '.center(79, '-'))
        namespace = multiprocessing
    elif mode == 'manager':
        print(' Using processes and a manager '.center(79, '-'))
        namespace = multiprocessing.Manager()
        # The tests also look these up on the namespace, so copy them
        # from the real module.
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif mode == 'threads':
        print(' Using threads '.center(79, '-'))
        import multiprocessing.dummy as namespace
    else:
        print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
        raise SystemExit(2)

    test(namespace)