#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# NOTE: the benchmarks use ``xrange`` and compare the result of ``map()``
# with lists, i.e. this script is written for Python 2.  Every ``print``
# below passes exactly one parenthesised argument so that the statement
# form and the function form emit the same text; ``print('')`` emits a
# single blank line.
#

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    """Call ``func(*args)`` and return a sentence describing the result.

    The sentence names the current process, the function, its argument
    tuple and the value it returned.
    """
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    """Unpack a ``(func, args)`` pair and forward it to `calculate`.

    The pool's map-style methods hand each task over as one argument.
    """
    return calculate(*args)

def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    time.sleep(0.5*random.random())
    return a + b

def f(x):
    """Return ``1.0 / (x - 5.0)``; raises ZeroDivisionError for ``x == 5``."""
    return 1.0 / (x-5.0)

def pow3(x):
    """Return the cube of ``x``."""
    return x**3

def noop(x):
    """Ignore ``x`` and return None (measures pure dispatch overhead)."""
    pass

#
# Test code
#

def _make_tasks():
    """Return the ``(function, args)`` pairs shared by several checks."""
    return ([(mul, (i, 7)) for i in range(10)] +
            [(plus, (i, 8)) for i in range(10)])


def _show_results(pool, tasks):
    """Print the results of ``tasks`` via apply_async, imap, imap_unordered, map."""
    # All three asynchronous batches are submitted before any result is
    # read, exactly as the checks below expect.
    results = [pool.apply_async(calculate, t) for t in tasks]
    imap_it = pool.imap(calculatestar, tasks)
    imap_unordered_it = pool.imap_unordered(calculatestar, tasks)

    print('Ordered results using pool.apply_async():')
    for r in results:
        print('\t%s' % (r.get(),))
    print('')

    print('Ordered results using pool.imap():')
    for x in imap_it:
        print('\t%s' % (x,))
    print('')

    print('Unordered results using pool.imap_unordered():')
    for x in imap_unordered_it:
        print('\t%s' % (x,))
    print('')

    print('Ordered results using pool.map() --- will block till complete:')
    for x in pool.map(calculatestar, tasks):
        print('\t%s' % (x,))
    print('')


def _run_benchmarks(pool):
    """Time builtin map() against pool.map() and pool.imap() and compare results."""
    N = 100000
    print('def pow3(x): return x**3')

    t = time.time()
    serial = map(pow3, xrange(N))
    print('\tmap(pow3, xrange(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    mapped = pool.map(pow3, xrange(N))
    print('\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' %
          (N, time.time() - t))

    t = time.time()
    imapped = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print('\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s'
          ' seconds' % (N, N//8, time.time() - t))

    assert serial == mapped == imapped, \
        (len(serial), len(mapped), len(imapped))
    print('')

    L = [None] * 1000000
    print('def noop(x): pass')
    print('L = [None] * 1000000')

    t = time.time()
    serial = map(noop, L)
    print('\tmap(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    mapped = pool.map(noop, L)
    print('\tpool.map(noop, L):\n\t\t%s seconds' %
          (time.time() - t))

    t = time.time()
    imapped = list(pool.imap(noop, L, chunksize=len(L)//8))
    print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' %
          (len(L)//8, time.time() - t))

    assert serial == mapped == imapped, \
        (len(serial), len(mapped), len(imapped))
    print('')
    # The large lists are released when this function returns.


def _check_error_handling(pool):
    """Check that a ZeroDivisionError raised by `f` reaches the caller."""
    print('Testing error handling:')

    # Each call evaluates f(5) somewhere, which divides by zero.
    cases = [
        ('pool.apply()', lambda: pool.apply(f, (5,))),
        ('pool.map()', lambda: pool.map(f, range(10))),
        ('list(pool.imap())', lambda: list(pool.imap(f, range(10)))),
        ]
    for label, call in cases:
        try:
            print(call())
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from %s' % label)
        else:
            raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            it.next()
        except ZeroDivisionError:
            # Only the item for x == 5 may fail; anything else is a real
            # error and must not be swallowed.
            if i != 5:
                raise
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    # The iterator must have kept producing results after the failure.
    assert i == 9
    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
    print('')


def _check_timeouts(pool, tasks):
    """Poll ``get()`` / ``next()`` with a 0.02s timeout, printing a dot per miss."""
    # Written without a trailing newline or space so the dots and the
    # result follow the heading directly.
    sys.stdout.write('Testing ApplyResult.get() with timeout:')
    res = pool.apply_async(calculate, tasks[0])
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print('')
    print('')

    sys.stdout.write('Testing IMapIterator.next() with timeout:')
    it = pool.imap(calculatestar, tasks)
    while True:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print('')
    print('')


def _check_callbacks(pool):
    """Check that apply_async / map_async pass their results to ``callback``.

    Raises AssertionError when the collected values differ from the
    expected ones, so a failure is not lost in the output.
    """
    print('Testing callback:')

    collected = []
    # 7 * 8 followed by the cubes of 0..9.
    expected = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=collected.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=collected.extend)
    r.wait()

    if collected != expected:
        raise AssertionError(
            'callbacks failed: %s != %s' % (collected, expected))
    print('\tcallbacks succeeded\n')


def _check_close(pool):
    """Check that close() + join() finishes pending work and stops the workers."""
    print('Testing close():')

    for worker in pool._pool:
        assert worker.is_alive()

    # Submitted before close(); it must still run to completion.
    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tclose() succeeded\n')


def _check_terminate():
    """Check that terminate() stops the workers of a pool with queued tasks."""
    print('Testing terminate():')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    pool.apply(pow3, [2])       # result intentionally discarded
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print('\tterminate() succeeded\n')


def _check_garbage_collection():
    """Check that the workers stop once the last reference to a pool is dropped.

    NOTE(review): presumably relies on the pool being finalized as soon as
    it becomes unreachable (reference counting) -- confirm before running
    on another interpreter.
    """
    print('Testing garbage collection:')

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    processes = pool._pool      # keep the workers reachable, not the pool
    pool.apply(pow3, [2])       # result intentionally discarded
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print('\tgarbage collection succeeded\n')


def test():
    """Run every `multiprocessing.Pool` check in sequence, reporting on stdout.

    Uses whatever module the global name ``multiprocessing`` is bound to
    when called, so the same checks run against processes or threads.
    Raises AssertionError as soon as a check fails.
    """
    print('cpu_count() = %d\n' % multiprocessing.cpu_count())

    #
    # Create pool
    #

    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    pool = multiprocessing.Pool(PROCESSES)
    print('pool = %s' % pool)
    print('')

    #
    # Tests
    #

    tasks = _make_tasks()

    _show_results(pool, tasks)
    _run_benchmarks(pool)
    _check_error_handling(pool)
    _check_timeouts(pool, tasks)
    _check_callbacks(pool)

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    _check_close(pool)
    _check_terminate()
    _check_garbage_collection()

# `test` is a slow driver that spawns worker pools, not a unit test: keep
# test collectors from running it when this module is imported.
test.__test__ = False


if __name__ == '__main__':
    multiprocessing.freeze_support()

    argv = sys.argv[1:]

    # Anything but no argument or one known mode gets the usage text.
    if len(argv) > 1 or (argv and argv[0] not in ('processes', 'threads')):
        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
        raise SystemExit(2)

    if argv == ['threads']:
        print(' Using threads '.center(79, '-'))
        # Rebinds the module-level name that the functions above look up.
        import multiprocessing.dummy as multiprocessing
    else:
        print(' Using processes '.center(79, '-'))

    test()