1# 2# Simple benchmarks for the multiprocessing package 3# 4# Copyright (c) 2006-2008, R Oudkerk 5# All rights reserved. 6# 7 8import time, sys, multiprocessing, threading, Queue, gc 9 10if sys.platform == 'win32': 11 _timer = time.clock 12else: 13 _timer = time.time 14 15delta = 1 16 17 18#### TEST_QUEUESPEED 19 20def queuespeed_func(q, c, iterations): 21 a = '0' * 256 22 c.acquire() 23 c.notify() 24 c.release() 25 26 for i in xrange(iterations): 27 q.put(a) 28 29 q.put('STOP') 30 31def test_queuespeed(Process, q, c): 32 elapsed = 0 33 iterations = 1 34 35 while elapsed < delta: 36 iterations *= 2 37 38 p = Process(target=queuespeed_func, args=(q, c, iterations)) 39 c.acquire() 40 p.start() 41 c.wait() 42 c.release() 43 44 result = None 45 t = _timer() 46 47 while result != 'STOP': 48 result = q.get() 49 50 elapsed = _timer() - t 51 52 p.join() 53 54 print iterations, 'objects passed through the queue in', elapsed, 'seconds' 55 print 'average number/sec:', iterations/elapsed 56 57 58#### TEST_PIPESPEED 59 60def pipe_func(c, cond, iterations): 61 a = '0' * 256 62 cond.acquire() 63 cond.notify() 64 cond.release() 65 66 for i in xrange(iterations): 67 c.send(a) 68 69 c.send('STOP') 70 71def test_pipespeed(): 72 c, d = multiprocessing.Pipe() 73 cond = multiprocessing.Condition() 74 elapsed = 0 75 iterations = 1 76 77 while elapsed < delta: 78 iterations *= 2 79 80 p = multiprocessing.Process(target=pipe_func, 81 args=(d, cond, iterations)) 82 cond.acquire() 83 p.start() 84 cond.wait() 85 cond.release() 86 87 result = None 88 t = _timer() 89 90 while result != 'STOP': 91 result = c.recv() 92 93 elapsed = _timer() - t 94 p.join() 95 96 print iterations, 'objects passed through connection in',elapsed,'seconds' 97 print 'average number/sec:', iterations/elapsed 98 99 100#### TEST_SEQSPEED 101 102def test_seqspeed(seq): 103 elapsed = 0 104 iterations = 1 105 106 while elapsed < delta: 107 iterations *= 2 108 109 t = _timer() 110 111 for i in xrange(iterations): 112 a = seq[5] 113 114 elapsed = _timer()-t 115 116 print iterations, 'iterations in', elapsed, 'seconds' 117 print 'average number/sec:', iterations/elapsed 118 119 120#### TEST_LOCK 121 122def test_lockspeed(l): 123 elapsed = 0 124 iterations = 1 125 126 while elapsed < delta: 127 iterations *= 2 128 129 t = _timer() 130 131 for i in xrange(iterations): 132 l.acquire() 133 l.release() 134 135 elapsed = _timer()-t 136 137 print iterations, 'iterations in', elapsed, 'seconds' 138 print 'average number/sec:', iterations/elapsed 139 140 141#### TEST_CONDITION 142 143def conditionspeed_func(c, N): 144 c.acquire() 145 c.notify() 146 147 for i in xrange(N): 148 c.wait() 149 c.notify() 150 151 c.release() 152 153def test_conditionspeed(Process, c): 154 elapsed = 0 155 iterations = 1 156 157 while elapsed < delta: 158 iterations *= 2 159 160 c.acquire() 161 p = Process(target=conditionspeed_func, args=(c, iterations)) 162 p.start() 163 164 c.wait() 165 166 t = _timer() 167 168 for i in xrange(iterations): 169 c.notify() 170 c.wait() 171 172 elapsed = _timer()-t 173 174 c.release() 175 p.join() 176 177 print iterations * 2, 'waits in', elapsed, 'seconds' 178 print 'average number/sec:', iterations * 2 / elapsed 179 180#### 181 182def test(): 183 manager = multiprocessing.Manager() 184 185 gc.disable() 186 187 print '\n\t######## testing Queue.Queue\n' 188 test_queuespeed(threading.Thread, Queue.Queue(), 189 threading.Condition()) 190 print '\n\t######## testing multiprocessing.Queue\n' 191 test_queuespeed(multiprocessing.Process, multiprocessing.Queue(), 192 multiprocessing.Condition()) 193 print '\n\t######## testing Queue managed by server process\n' 194 test_queuespeed(multiprocessing.Process, manager.Queue(), 195 manager.Condition()) 196 print '\n\t######## testing multiprocessing.Pipe\n' 197 test_pipespeed() 198 199 print 200 201 print '\n\t######## testing list\n' 202 test_seqspeed(range(10)) 203 print '\n\t######## testing list managed by server process\n' 204 test_seqspeed(manager.list(range(10))) 205 print '\n\t######## testing Array("i", ..., lock=False)\n' 206 test_seqspeed(multiprocessing.Array('i', range(10), lock=False)) 207 print '\n\t######## testing Array("i", ..., lock=True)\n' 208 test_seqspeed(multiprocessing.Array('i', range(10), lock=True)) 209 210 print 211 212 print '\n\t######## testing threading.Lock\n' 213 test_lockspeed(threading.Lock()) 214 print '\n\t######## testing threading.RLock\n' 215 test_lockspeed(threading.RLock()) 216 print '\n\t######## testing multiprocessing.Lock\n' 217 test_lockspeed(multiprocessing.Lock()) 218 print '\n\t######## testing multiprocessing.RLock\n' 219 test_lockspeed(multiprocessing.RLock()) 220 print '\n\t######## testing lock managed by server process\n' 221 test_lockspeed(manager.Lock()) 222 print '\n\t######## testing rlock managed by server process\n' 223 test_lockspeed(manager.RLock()) 224 225 print 226 227 print '\n\t######## testing threading.Condition\n' 228 test_conditionspeed(threading.Thread, threading.Condition()) 229 print '\n\t######## testing multiprocessing.Condition\n' 230 test_conditionspeed(multiprocessing.Process, multiprocessing.Condition()) 231 print '\n\t######## testing condition managed by a server process\n' 232 test_conditionspeed(multiprocessing.Process, manager.Condition()) 233 234 gc.enable() 235 236if __name__ == '__main__': 237 multiprocessing.freeze_support() 238 test() 239