1import multiprocessing 2import time 3import random 4import sys 5 6# 7# Functions used by test code 8# 9 10def calculate(func, args): 11 result = func(*args) 12 return '%s says that %s%s = %s' % ( 13 multiprocessing.current_process().name, 14 func.__name__, args, result 15 ) 16 17def calculatestar(args): 18 return calculate(*args) 19 20def mul(a, b): 21 time.sleep(0.5 * random.random()) 22 return a * b 23 24def plus(a, b): 25 time.sleep(0.5 * random.random()) 26 return a + b 27 28def f(x): 29 return 1.0 / (x - 5.0) 30 31def pow3(x): 32 return x ** 3 33 34def noop(x): 35 pass 36 37# 38# Test code 39# 40 41def test(): 42 PROCESSES = 4 43 print('Creating pool with %d processes\n' % PROCESSES) 44 45 with multiprocessing.Pool(PROCESSES) as pool: 46 # 47 # Tests 48 # 49 50 TASKS = [(mul, (i, 7)) for i in range(10)] + \ 51 [(plus, (i, 8)) for i in range(10)] 52 53 results = [pool.apply_async(calculate, t) for t in TASKS] 54 imap_it = pool.imap(calculatestar, TASKS) 55 imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) 56 57 print('Ordered results using pool.apply_async():') 58 for r in results: 59 print('\t', r.get()) 60 print() 61 62 print('Ordered results using pool.imap():') 63 for x in imap_it: 64 print('\t', x) 65 print() 66 67 print('Unordered results using pool.imap_unordered():') 68 for x in imap_unordered_it: 69 print('\t', x) 70 print() 71 72 print('Ordered results using pool.map() --- will block till complete:') 73 for x in pool.map(calculatestar, TASKS): 74 print('\t', x) 75 print() 76 77 # 78 # Test error handling 79 # 80 81 print('Testing error handling:') 82 83 try: 84 print(pool.apply(f, (5,))) 85 except ZeroDivisionError: 86 print('\tGot ZeroDivisionError as expected from pool.apply()') 87 else: 88 raise AssertionError('expected ZeroDivisionError') 89 90 try: 91 print(pool.map(f, list(range(10)))) 92 except ZeroDivisionError: 93 print('\tGot ZeroDivisionError as expected from pool.map()') 94 else: 95 raise AssertionError('expected ZeroDivisionError') 96 97 try: 98 print(list(pool.imap(f, list(range(10))))) 99 except ZeroDivisionError: 100 print('\tGot ZeroDivisionError as expected from list(pool.imap())') 101 else: 102 raise AssertionError('expected ZeroDivisionError') 103 104 it = pool.imap(f, list(range(10))) 105 for i in range(10): 106 try: 107 x = next(it) 108 except ZeroDivisionError: 109 if i == 5: 110 pass 111 except StopIteration: 112 break 113 else: 114 if i == 5: 115 raise AssertionError('expected ZeroDivisionError') 116 117 assert i == 9 118 print('\tGot ZeroDivisionError as expected from IMapIterator.next()') 119 print() 120 121 # 122 # Testing timeouts 123 # 124 125 print('Testing ApplyResult.get() with timeout:', end=' ') 126 res = pool.apply_async(calculate, TASKS[0]) 127 while 1: 128 sys.stdout.flush() 129 try: 130 sys.stdout.write('\n\t%s' % res.get(0.02)) 131 break 132 except multiprocessing.TimeoutError: 133 sys.stdout.write('.') 134 print() 135 print() 136 137 print('Testing IMapIterator.next() with timeout:', end=' ') 138 it = pool.imap(calculatestar, TASKS) 139 while 1: 140 sys.stdout.flush() 141 try: 142 sys.stdout.write('\n\t%s' % it.next(0.02)) 143 except StopIteration: 144 break 145 except multiprocessing.TimeoutError: 146 sys.stdout.write('.') 147 print() 148 print() 149 150 151if __name__ == '__main__': 152 multiprocessing.freeze_support() 153 test() 154