# -*- coding: utf-8 -*-
# This file should be kept compatible with both Python 2.6 and Python >= 3.0.

"""
ccbench, a Python concurrency benchmark.
"""

# NOTE: the docstring above must precede the __future__ imports, otherwise
# it is just an expression statement and the module's __doc__ is None.
from __future__ import division
from __future__ import print_function

import time
import os
import sys
import functools
import itertools
import threading
import subprocess
import socket
from optparse import OptionParser, SUPPRESS_HELP
import platform

# Compatibility
try:
    xrange
except NameError:
    xrange = range

try:
    # Python 2: use the lazy map so task_pidigits can map over an
    # infinite itertools.count().
    map = itertools.imap
except AttributeError:
    pass


# Minimum duration (seconds) of each throughput measurement.
THROUGHPUT_DURATION = 2.0

# Delay (seconds) between two pings sent by the latency client, and total
# duration (seconds) of the pinging.
LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0

# Size of each packet exchanged by the bandwidth client, and base duration
# (seconds) of the exchange.
BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0


# Each task_* factory returns a ``(func, args)`` pair; the benchmarks call
# ``func(*args)`` repeatedly.  The task docstrings are printed as labels in
# the benchmark output, so they must stay short one-liners.

def task_pidigits():
    """Pi calculation (Python)"""
    _map = map
    _count = itertools.count
    _islice = itertools.islice

    def calc_ndigits(n):
        # Return the first `n` decimal digits of pi as a list of ints.
        # From http://shootout.alioth.debian.org/
        def gen_x():
            return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))

        def compose(a, b):
            aq, ar, as_, at = a
            bq, br, bs, bt = b
            return (aq * bq,
                    aq * br + ar * bt,
                    as_ * bq + at * bs,
                    as_ * br + at * bt)

        def extract(z, j):
            q, r, s, t = z
            return (q*j + r) // (s*j + t)

        def pi_digits():
            z = (1, 0, 0, 1)
            x = gen_x()
            while 1:
                y = extract(z, 3)
                while y != extract(z, 4):
                    z = compose(z, next(x))
                    y = extract(z, 3)
                z = compose((10, -10*y, 0, 1), z)
                yield y

        return list(_islice(pi_digits(), n))

    return calc_ndigits, (50, )

def task_regex():
    """regular expression (C)"""
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    # Search the beginning of this very file.
    with open(__file__, "r") as f:
        arg = f.read(2000)
    return pat.findall, (arg, )

def task_sort():
    """list sorting (C)"""
    def list_sort(l):
        # Sort a reversed copy, leaving the shared argument list untouched.
        l = l[::-1]
        l.sort()

    return list_sort, (list(range(1000)), )

def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 3

    def compress(s):
        # Round-trip so both compression and decompression are measured.
        zlib.decompress(zlib.compress(s, 5))
    return compress, (arg, )

def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    with open(__file__, "rb") as f:
        arg = f.read(3000) * 2

    def compress(s):
        bz2.compress(s)
    return compress, (arg, )

def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 30

    def compute(s):
        hashlib.sha1(s).digest()
    return compute, (arg, )


throughput_tasks = [task_pidigits, task_regex]

# Optional modules: bind the module-level names `bz2` / `hashlib` to the
# module, or to None when it is unavailable in this Python build.
try:
    import bz2
except ImportError:
    bz2 = None
try:
    import hashlib
except ImportError:
    hashlib = None

# For whatever reason, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is not None:
    throughput_tasks.append(task_compress_bz2)
elif hashlib is not None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_zlib)

# NOTE: this is the same list object as `throughput_tasks`, not a copy.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]


class TimedLoop:
    """Call ``func(*args)`` repeatedly while measuring elapsed time."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the loop and return ``(niters, duration)``.

        The loop stops once ``min_duration`` seconds have elapsed since
        ``start_time`` (it then appends to ``end_event``), or as soon as
        ``end_event`` is found non-empty, i.e. another loop or the caller
        asked every loop to stop.  ``niters`` is the number of calls
        completed and ``duration`` the seconds elapsed since ``start_time``
        at the last valid measurement.  With ``do_yield`` the thread sleeps
        briefly between batches to force thread switches.
        """
        step = 20
        niters = 0
        duration = 0.0
        _time = time.time
        _sleep = time.sleep
        _func = self.func
        _args = self.args
        t1 = start_time
        while True:
            for i in range(step):
                _func(*_args)
            t2 = _time()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return niters, duration
            niters += step
            duration = t2 - start_time
            if duration >= min_duration:
                end_event.append(None)
                return niters, duration
            if t2 - t1 < 0.01:
                # Minimize interference of measurement on overall runtime
                step = step * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                _sleep(0.0001)
            t1 = t2


def _spawn_waiting_threads(nthreads, work):
    """Start `nthreads` daemon threads that wait for a common start signal.

    Each thread calls ``work(start_time)`` once released.  This function
    returns only after every thread has started and is waiting, so thread
    startup overhead stays out of the measurements.

    Returns ``(threads, release)``: ``release()`` records the common start
    time with ``time.time()``, wakes up all threads and returns that time.
    """
    ready = []
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    # Filled in by release(); a dict is used because Python 2 closures
    # cannot rebind variables of an enclosing scope.
    state = {}

    def run():
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while 'start_time' not in state:
                start_cond.wait()
            start_time = state['start_time']
        # Run the workload outside the lock so that threads run concurrently.
        work(start_time)

    threads = [threading.Thread(target=run) for _ in range(nthreads)]
    for t in threads:
        # `daemon` attribute rather than the deprecated setDaemon().
        t.daemon = True
        t.start()
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()

    def release():
        with start_cond:
            start_time = time.time()
            state['start_time'] = start_time
            start_cond.notify_all()
        return start_time

    return threads, release


def run_throughput_test(func, args, nthreads):
    """Measure the throughput of ``func(*args)`` run by `nthreads` threads.

    Returns a list of ``(niters, duration)`` tuples, one per thread, as
    produced by ``TimedLoop.__call__``.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    def work(start_time):
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    # We don't want measurements to include thread startup overhead,
    # so timing starts only after all threads are ready.
    threads, release = _spawn_waiting_threads(nthreads, work)
    release()
    for t in threads:
        t.join()

    return results

def run_throughput_tests(max_threads):
    """Print the throughput of each task with 1..max_threads threads."""
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            speed = sum(r[0] for r in results) / max(r[1] for r in results)
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" iterations/s.")
                baseline_speed = speed
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()


LAT_END = "END"

def _sendto(sock, s, addr):
    """Send text `s`, ASCII-encoded, as one datagram to `addr`."""
    sock.sendto(s.encode('ascii'), addr)

def _recv(sock, n):
    """Receive one datagram of at most `n` bytes, decoded as ASCII."""
    return sock.recv(n).decode('ascii')

def latency_client(addr, nb_pings, interval):
    """Ping `addr` every `interval` seconds (runs in a child process).

    Each ping is a line holding ``repr(time.time())`` at send time.  The
    first ping only tells the parent that we are ready; it is followed by
    `nb_pings` timed pings and a final ``LAT_END`` line.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    _time = time.time
    _sleep = time.sleep

    def _ping():
        _sendto(sock, "%r\n" % _time(), addr)

    try:
        # The first ping signals the parent process that we are ready.
        _ping()
        # We give the parent a bit of time to notice.
        _sleep(1.0)
        for i in range(nb_pings):
            _sleep(interval)
            _ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()

def _run_client(option, kwargs):
    """Re-run this script in a child process as the client selected by the
    hidden command-line `option`, configured with `kwargs`.

    `kwargs` travels as its repr() and is parsed back with
    ast.literal_eval() in main(), so it must only contain Python literals.
    Returns the subprocess.Popen object.
    """
    cmd_line = [sys.executable, '-E', os.path.abspath(__file__),
                option, repr(kwargs)]
    return subprocess.Popen(cmd_line)

def run_latency_client(**kwargs):
    """Start ``latency_client(**kwargs)`` in a child process."""
    return _run_client('--latclient', kwargs)

def run_latency_test(func, args, nthreads):
    """Measure ping latency while `nthreads` threads run ``func(*args)``.

    Returns a list of ``(send_time, recv_time)`` pairs (``time.time()``
    values), one per ping received.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        threads = []
        release = None
        end_event = []
        if nthreads > 0:
            # Warm up
            func(*args)
            loop = TimedLoop(func, args)

            def work(start_time):
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            # Returns once all threads are ready, but not yet running.
            threads, release = _spawn_waiting_threads(nthreads, work)

        # Run the client and wait for the first ping to arrive before
        # unblocking the background threads.
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        if release is not None:
            release()

        chunks = []
        while LAT_END not in s:
            s = _recv(sock, 4096)
            chunks.append((time.time(), s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        sock.close()

    results = []
    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # float() rather than eval(): any local process can send
                # datagrams to this port.
                results.append((float(line), recv_time))

    return results

def run_latency_tests(max_threads):
    """Print ping latency statistics with 0..max_threads background threads."""
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            results = run_latency_test(func, args, nthreads)
            n = len(results)
            if not n:
                # Guard the divisions below.
                print("CPU threads=%d: no pings received" % nthreads)
                continue
            # We print out milliseconds
            lats = [1000 * (t2 - t1) for (t1, t2) in results]
            avg = sum(lats) / n
            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)"
                  % (nthreads, avg, dev))
        print()


BW_END = "END"

def bandwidth_client(addr, packet_size, duration):
    """Exchange fixed-size packets with `addr` (runs in a child process).

    Each packet is the repr() of this client's address, a '#', a message
    and a newline, left-padded with spaces to `packet_size` characters, so
    that the parent knows where to echo it.  After each send the client
    waits for the echo.  It sends packets for ``2 * duration`` seconds and
    then a ``BW_END`` packet, unless the parent kills it first.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        local_addr = sock.getsockname()
        _time = time.time

        def _send_chunk(msg):
            _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size),
                    addr)

        # We give the parent some time to be ready.
        time.sleep(1.0)
        end_time = _time() + duration * 2.0
        i = 0
        while _time() < end_time:
            _send_chunk(str(i))
            s = _recv(sock, packet_size)
            assert len(s) == packet_size
            i += 1
        _send_chunk(BW_END)
    finally:
        sock.close()

def run_bandwidth_client(**kwargs):
    """Start ``bandwidth_client(**kwargs)`` in a child process."""
    return _run_client('--bwclient', kwargs)

def run_bandwidth_test(func, args, nthreads):
    """Measure the UDP echo rate while `nthreads` threads run ``func(*args)``.

    Returns the number of packets echoed per second, or 0.0 when too few
    packets were exchanged to compute a rate.
    """
    import ast

    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        release = None
        end_event = []
        if nthreads > 0:
            # Warm up
            func(*args)
            loop = TimedLoop(func, args)

            def work(start_time):
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            # Returns once all threads are ready, but not yet running.
            threads, release = _spawn_waiting_threads(nthreads, work)

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        try:
            # This also waits for the client to be ready.
            s = _recv(sock, packet_size)
            # The packet starts with the space-padded repr() of the client's
            # address.  literal_eval() rather than eval(): any local process
            # can send datagrams to this port.
            remote_addr = ast.literal_eval(s.partition('#')[0].strip())
            if release is not None:
                release()

            n = 0
            first_time = None
            while not end_event and BW_END not in s:
                _sendto(sock, s, remote_addr)
                s = _recv(sock, packet_size)
                if first_time is None:
                    first_time = time.time()
                n += 1
            end_time = time.time()
        finally:
            # The client may still be blocked waiting for an echo: stop it,
            # then reap it so it does not linger as a zombie.
            process.kill()
            process.wait()

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
    finally:
        sock.close()

    if first_time is None or n < 2 or end_time <= first_time:
        return 0.0
    return (n - 1) / (end_time - first_time)

def run_bandwidth_tests(max_threads):
    """Print the UDP echo rate with 0..max_threads background threads."""
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" packets/s.")
                baseline_speed = speed
            elif baseline_speed > 0:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                # No meaningful baseline to compare against.
                print()
        print()


def main():
    """Parse the command line, then run the selected benchmarks or, in the
    hidden client modes, act as a latency/bandwidth client."""
    import ast

    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # Client modes: the parent passes the repr() of a kwargs dict; parse it
    # with literal_eval() instead of eval().
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    if options.check_interval:
        # sys.setcheckinterval() was removed in Python 3.9.
        if not hasattr(sys, "setcheckinterval"):
            parser.error("--interval is not supported by this Python version; "
                         "use --switch-interval")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        # sys.setswitchinterval() only exists in Python >= 3.2.
        if not hasattr(sys, "setswitchinterval"):
            parser.error("--switch-interval is not supported by this Python "
                         "version; use --interval")
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)

if __name__ == "__main__":
    main()