# This file should be kept compatible with both Python 2.6 and Python >= 3.0.

"""
ccbench, a Python concurrency benchmark.
"""

# The docstring above must precede the __future__ imports: a string placed
# after them is an ordinary expression statement, not the module __doc__.
from __future__ import division
from __future__ import print_function

import time
import os
import sys
import itertools
import threading
import subprocess
import socket
from optparse import OptionParser, SUPPRESS_HELP
import platform

# Compatibility
try:
    xrange
except NameError:
    xrange = range

try:
    # Python 2: use the lazy iterator version of map().
    map = itertools.imap
except AttributeError:
    pass


THROUGHPUT_DURATION = 2.0

LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0

BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0


# Each task_*() factory returns a ``(func, args)`` pair; the benchmarks call
# ``func(*args)`` repeatedly.  The task docstrings are printed verbatim as
# task titles, so they are kept short.


def task_pidigits():
    """Pi calculation (Python)"""
    _map = map
    _count = itertools.count
    _islice = itertools.islice

    def calc_ndigits(n):
        # Return the first *n* decimal digits of pi as a list of ints.
        # From http://shootout.alioth.debian.org/
        def gen_x():
            return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))

        def compose(a, b):
            aq, ar, as_, at = a
            bq, br, bs, bt = b
            return (aq * bq,
                    aq * br + ar * bt,
                    as_ * bq + at * bs,
                    as_ * br + at * bt)

        def extract(z, j):
            q, r, s, t = z
            return (q*j + r) // (s*j + t)

        def pi_digits():
            z = (1, 0, 0, 1)
            x = gen_x()
            while 1:
                y = extract(z, 3)
                while y != extract(z, 4):
                    z = compose(z, next(x))
                    y = extract(z, 3)
                z = compose((10, -10*y, 0, 1), z)
                yield y

        return list(_islice(pi_digits(), n))

    return calc_ndigits, (50, )


def task_regex():
    """regular expression (C)"""
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    # Use the first 2000 characters of this very file as the subject text.
    with open(__file__, "r") as f:
        arg = f.read(2000)
    return pat.findall, (arg, )


def task_sort():
    """list sorting (C)"""
    def list_sort(items):
        # Sort a reversed copy so that every call does the same work.
        items = items[::-1]
        items.sort()

    return list_sort, (list(range(1000)), )


def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 3

    def compress(s):
        zlib.decompress(zlib.compress(s, 5))
    return compress, (arg, )


def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    with open(__file__, "rb") as f:
        arg = f.read(3000) * 2

    def compress(s):
        bz2.compress(s)
    return compress, (arg, )


def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 30

    def compute(s):
        hashlib.sha1(s).digest()
    return compute, (arg, )


throughput_tasks = [task_pidigits, task_regex]

# bz2 and hashlib are optional: bind each name at module level to the
# imported module, or to None when it is unavailable.
for mod in 'bz2', 'hashlib':
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None
# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is not None:
    throughput_tasks.append(task_compress_bz2)
elif hashlib is not None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_zlib)

latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]


class TimedLoop(object):
    """Call ``func(*args)`` in growing batches until enough time has elapsed."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the loop and return ``(niters, duration)``.

        *start_time* is the shared ``time.time()`` reference from which
        *duration* is measured.  *end_event* is a list shared by all loops of
        a test: a loop appends to it once it has run for at least
        *min_duration* seconds, and a loop that finds it non-empty stops and
        returns its previous complete measurement.  If *do_yield* is true, a
        short sleep is inserted between batches to force thread switches.
        """
        step = 20
        niters = 0
        duration = 0.0
        _time = time.time
        _sleep = time.sleep
        _func = self.func
        _args = self.args
        t1 = start_time
        while True:
            for _ in xrange(step):
                _func(*_args)
            t2 = _time()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return niters, duration
            niters += step
            duration = t2 - start_time
            if duration >= min_duration:
                end_event.append(None)
                return niters, duration
            if t2 - t1 < 0.01:
                # Minimize interference of measurement on overall runtime
                step = step * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                _sleep(0.0001)
            t1 = t2


def _start_loop_threads(loop, nthreads, min_duration, end_event, do_yield):
    """Start *nthreads* daemon threads that will each run *loop*.

    Each thread signals readiness and then blocks until the returned
    ``release`` callable is invoked, so thread start-up overhead is kept out
    of the measurements.  This function returns only after every thread has
    signalled readiness.

    Returns ``(threads, results, release)``: the started threads, a list
    receiving each thread's ``(niters, duration)`` result, and a callable
    that records the common start time and unblocks all threads.
    """
    results = []
    ready = []
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    # Shared cell holding the start time (Python 2 has no ``nonlocal``).
    start_times = []

    def run():
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not start_times:
                start_cond.wait()
            start_time = start_times[0]
        results.append(loop(start_time, min_duration, end_event,
                            do_yield=do_yield))

    def release():
        with start_cond:
            start_times.append(time.time())
            start_cond.notify_all()

    threads = []
    for _ in xrange(nthreads):
        thread = threading.Thread(target=run)
        # ``daemon`` attribute instead of the deprecated setDaemon().
        thread.daemon = True
        thread.start()
        threads.append(thread)
    # We don't want measurements to include thread startup overhead,
    # so timing starts only once all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    return threads, results, release


def run_throughput_test(func, args, nthreads):
    """Run ``func(*args)`` in *nthreads* threads; return per-thread results.

    Each result is a ``(niters, duration)`` pair as returned by TimedLoop.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        return [loop(start_time, THROUGHPUT_DURATION,
                     end_event, do_yield=False)]

    threads, results, release = _start_loop_threads(
        loop, nthreads, THROUGHPUT_DURATION, end_event, do_yield=True)
    release()
    for t in threads:
        t.join()
    return results


def run_throughput_tests(max_threads):
    """Print the throughput of every task for 1 to *max_threads* threads."""
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in xrange(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            speed = sum(r[0] for r in results) / max(r[1] for r in results)
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" iterations/s.")
                baseline_speed = speed
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()


LAT_END = "END"


def _sendto(sock, s, addr):
    """Send text *s* to *addr* as one ASCII-encoded datagram."""
    sock.sendto(s.encode('ascii'), addr)


def _recv(sock, n):
    """Receive one datagram of at most *n* bytes, decoded as ASCII."""
    return sock.recv(n).decode('ascii')


def latency_client(addr, nb_pings, interval):
    """Send timestamped pings to *addr*, one every *interval* seconds.

    Each ping is the repr() of ``time.time()`` followed by a newline.  An
    initial ping is sent first, then *nb_pings* more, then an LAT_END line.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        _time = time.time
        _sleep = time.sleep

        def _ping():
            _sendto(sock, "%r\n" % _time(), addr)

        # The first ping signals the parent process that we are ready.
        _ping()
        # We give the parent a bit of time to notice.
        _sleep(1.0)
        for _ in xrange(nb_pings):
            _sleep(interval)
            _ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()


def run_latency_client(**kwargs):
    """Spawn this script as a latency client; return the Popen object.

    *kwargs* are passed through the command line as their repr() and parsed
    back by main().
    """
    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
    cmd_line.extend(['--latclient', repr(kwargs)])
    return subprocess.Popen(cmd_line)


def run_latency_test(func, args, nthreads):
    """Measure ping latency while *nthreads* threads run ``func(*args)``.

    Returns a list of ``(send_time, recv_time)`` pairs, one per ping
    received after the background threads were released.
    """
    interval = LATENCY_PING_INTERVAL
    duration = LATENCY_DURATION
    nb_pings = int(duration / interval)

    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        end_event = []
        if nthreads > 0:
            # Warm up
            func(*args)
        threads, _, release = _start_loop_threads(
            TimedLoop(func, args), nthreads, duration * 1.5, end_event,
            do_yield=False)

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        process = run_latency_client(addr=addr, nb_pings=nb_pings,
                                     interval=interval)
        s = _recv(sock, 4096)
        _time = time.time
        release()

        chunks = []
        while LAT_END not in s:
            s = _recv(sock, 4096)
            chunks.append((_time(), s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        # Close the socket even if the test is interrupted.
        sock.close()

    results = []
    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # Each line is the repr() of a float timestamp; float() parses
                # it without eval()-ing whatever arrives on the socket.
                results.append((float(line), recv_time))
    return results


def run_latency_tests(max_threads):
    """Print ping latency for each task with 0 to *max_threads* CPU threads."""
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in xrange(0, max_threads + 1):
            results = run_latency_test(func, args, nthreads)
            n = len(results)
            if not n:
                # Avoid dividing by zero when no ping was timed.
                print("CPU threads=%d: no latency samples received" % nthreads)
                continue
            # We print out milliseconds
            lats = [1000 * (t2 - t1) for (t1, t2) in results]
            avg = sum(lats) / n
            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev))
        print()


BW_END = "END"


def bandwidth_client(addr, packet_size, duration):
    """Ping-pong *packet_size*-byte packets with *addr* for 2 * *duration* s.

    Each packet is ``"<client address repr>#<payload>\\n"`` left-padded with
    spaces to *packet_size*; the final payload is BW_END.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        local_addr = sock.getsockname()
        _time = time.time
        _sleep = time.sleep

        def _send_chunk(msg):
            _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)

        # We give the parent some time to be ready.
        _sleep(1.0)
        end_time = _time() + duration * 2.0
        i = 0
        while _time() < end_time:
            _send_chunk(str(i))
            s = _recv(sock, packet_size)
            assert len(s) == packet_size
            i += 1
        _send_chunk(BW_END)
    finally:
        sock.close()


def run_bandwidth_client(**kwargs):
    """Spawn this script as a bandwidth client; return the Popen object.

    *kwargs* are passed through the command line as their repr() and parsed
    back by main().
    """
    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
    cmd_line.extend(['--bwclient', repr(kwargs)])
    return subprocess.Popen(cmd_line)


def run_bandwidth_test(func, args, nthreads):
    """Measure UDP echo rate while *nthreads* threads run ``func(*args)``.

    Returns the number of echoed packets per second, or 0.0 if no timed
    round trip completed.
    """
    import ast

    duration = BANDWIDTH_DURATION
    packet_size = BANDWIDTH_PACKET_SIZE

    # Create a listening socket to receive the packets. We use UDP which
    # should be painlessly cross-platform.  Socket objects only support the
    # ``with`` statement from Python 3.2, so close it explicitly to keep the
    # file's Python 2 compatibility.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        end_event = []
        if nthreads > 0:
            # Warm up
            func(*args)
        threads, _, release = _start_loop_threads(
            TimedLoop(func, args), nthreads, duration * 1.5, end_event,
            do_yield=False)

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        _time = time.time
        # This also waits until the client has started sending.
        s = _recv(sock, packet_size)
        # The packet starts with the space-padded repr() of the client
        # address; parse it as a literal instead of eval()-ing socket data.
        remote_addr = ast.literal_eval(s.partition('#')[0].strip())
        release()

        n = 0
        first_time = None
        while not end_event and BW_END not in s:
            _sendto(sock, s, remote_addr)
            s = _recv(sock, packet_size)
            if first_time is None:
                first_time = _time()
            n += 1
        end_time = _time()

        end_event.append(None)
        for t in threads:
            t.join()
        process.kill()
        # Reap the killed client so it does not linger as a zombie.
        process.wait()
    finally:
        sock.close()

    if first_time is None or end_time <= first_time:
        return 0.0
    # The first round trip only starts the clock, hence ``n - 1``.
    return (n - 1) / (end_time - first_time)


def run_bandwidth_tests(max_threads):
    """Print echo rate for each task with 0 to *max_threads* CPU threads."""
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in xrange(0, max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" packets/s.")
                baseline_speed = speed
            elif baseline_speed:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                # No meaningful percentage against a zero baseline.
                print()
        print()


def main():
    """Parse the command line and run the selected benchmarks.

    The hidden --latclient / --bwclient options make this process act as the
    client subprocess of a latency or bandwidth test instead.
    """
    import ast

    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # The client options carry the repr() of a kwargs dict; parse it as a
    # literal rather than eval()-ing arbitrary command-line text.
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    if options.check_interval:
        if not hasattr(sys, "setcheckinterval"):
            parser.error("sys.setcheckinterval() is not available on this Python")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        if not hasattr(sys, "setswitchinterval"):
            parser.error("sys.setswitchinterval() is not available on this Python")
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)


if __name__ == "__main__":
    main()