• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import multiprocessing
2import time
3import random
4import sys
5
6#
7# Functions used by test code
8#
9
10def calculate(func, args):
11    result = func(*args)
12    return '%s says that %s%s = %s' % (
13        multiprocessing.current_process().name,
14        func.__name__, args, result
15        )
16
17def calculatestar(args):
18    return calculate(*args)
19
20def mul(a, b):
21    time.sleep(0.5 * random.random())
22    return a * b
23
24def plus(a, b):
25    time.sleep(0.5 * random.random())
26    return a + b
27
28def f(x):
29    return 1.0 / (x - 5.0)
30
31def pow3(x):
32    return x ** 3
33
34def noop(x):
35    pass
36
37#
38# Test code
39#
40
41def test():
42    PROCESSES = 4
43    print('Creating pool with %d processes\n' % PROCESSES)
44
45    with multiprocessing.Pool(PROCESSES) as pool:
46        #
47        # Tests
48        #
49
50        TASKS = [(mul, (i, 7)) for i in range(10)] + \
51                [(plus, (i, 8)) for i in range(10)]
52
53        results = [pool.apply_async(calculate, t) for t in TASKS]
54        imap_it = pool.imap(calculatestar, TASKS)
55        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
56
57        print('Ordered results using pool.apply_async():')
58        for r in results:
59            print('\t', r.get())
60        print()
61
62        print('Ordered results using pool.imap():')
63        for x in imap_it:
64            print('\t', x)
65        print()
66
67        print('Unordered results using pool.imap_unordered():')
68        for x in imap_unordered_it:
69            print('\t', x)
70        print()
71
72        print('Ordered results using pool.map() --- will block till complete:')
73        for x in pool.map(calculatestar, TASKS):
74            print('\t', x)
75        print()
76
77        #
78        # Test error handling
79        #
80
81        print('Testing error handling:')
82
83        try:
84            print(pool.apply(f, (5,)))
85        except ZeroDivisionError:
86            print('\tGot ZeroDivisionError as expected from pool.apply()')
87        else:
88            raise AssertionError('expected ZeroDivisionError')
89
90        try:
91            print(pool.map(f, list(range(10))))
92        except ZeroDivisionError:
93            print('\tGot ZeroDivisionError as expected from pool.map()')
94        else:
95            raise AssertionError('expected ZeroDivisionError')
96
97        try:
98            print(list(pool.imap(f, list(range(10)))))
99        except ZeroDivisionError:
100            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
101        else:
102            raise AssertionError('expected ZeroDivisionError')
103
104        it = pool.imap(f, list(range(10)))
105        for i in range(10):
106            try:
107                x = next(it)
108            except ZeroDivisionError:
109                if i == 5:
110                    pass
111            except StopIteration:
112                break
113            else:
114                if i == 5:
115                    raise AssertionError('expected ZeroDivisionError')
116
117        assert i == 9
118        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
119        print()
120
121        #
122        # Testing timeouts
123        #
124
125        print('Testing ApplyResult.get() with timeout:', end=' ')
126        res = pool.apply_async(calculate, TASKS[0])
127        while 1:
128            sys.stdout.flush()
129            try:
130                sys.stdout.write('\n\t%s' % res.get(0.02))
131                break
132            except multiprocessing.TimeoutError:
133                sys.stdout.write('.')
134        print()
135        print()
136
137        print('Testing IMapIterator.next() with timeout:', end=' ')
138        it = pool.imap(calculatestar, TASKS)
139        while 1:
140            sys.stdout.flush()
141            try:
142                sys.stdout.write('\n\t%s' % it.next(0.02))
143            except StopIteration:
144                break
145            except multiprocessing.TimeoutError:
146                sys.stdout.write('.')
147        print()
148        print()
149
150
151if __name__ == '__main__':
152    multiprocessing.freeze_support()
153    test()
154