# This file should be kept compatible with both Python 2.6 and Python >= 3.0.

"""
ccbench, a Python concurrency benchmark.
"""

# The docstring above must precede the __future__ imports: a string literal
# placed after them is an ordinary expression statement and never becomes the
# module's __doc__.
from __future__ import division
from __future__ import print_function

import time
import os
import sys
import itertools
import threading
import subprocess
import socket
from optparse import OptionParser, SUPPRESS_HELP
import platform

# Compatibility
try:
    xrange
except NameError:
    xrange = range

try:
    # Python 2: use the lazy iterator version of map().
    map = itertools.imap
except AttributeError:
    pass


THROUGHPUT_DURATION = 2.0

LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0

BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0


# Each task_* factory returns a ``(callable, args)`` pair; the benchmark
# repeatedly calls ``callable(*args)``.  The docstrings are printed as task
# titles by the test runners, so they are part of the program's output.

def task_pidigits():
    """Pi calculation (Python)"""
    # Bind to locals so the inner closures avoid global lookups.
    _map = map
    _count = itertools.count
    _islice = itertools.islice

    def calc_ndigits(n):
        # From http://shootout.alioth.debian.org/
        # Spigot algorithm: returns the first n decimal digits of pi as ints.
        def gen_x():
            return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))

        def compose(a, b):
            # Product of two 2x2 matrices stored as (q, r, s, t) tuples.
            aq, ar, as_, at = a
            bq, br, bs, bt = b
            return (aq * bq,
                    aq * br + ar * bt,
                    as_ * bq + at * bs,
                    as_ * br + at * bt)

        def extract(z, j):
            q, r, s, t = z
            return (q*j + r) // (s*j + t)

        def pi_digits():
            z = (1, 0, 0, 1)
            x = gen_x()
            while 1:
                y = extract(z, 3)
                # Consume terms until the next digit is determined.
                while y != extract(z, 4):
                    z = compose(z, next(x))
                    y = extract(z, 3)
                z = compose((10, -10*y, 0, 1), z)
                yield y

        return list(_islice(pi_digits(), n))

    return calc_ndigits, (50, )


def task_regex():
    """regular expression (C)"""
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    # Use (the start of) this very file as the text to search.
    with open(__file__, "r") as f:
        arg = f.read(2000)
    return pat.findall, (arg, )


def task_sort():
    """list sorting (C)"""
    def list_sort(l):
        # Sort a reversed copy so every call does real work.
        l = l[::-1]
        l.sort()

    return list_sort, (list(range(1000)), )


def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 3

    def compress(s):
        # Round-trip: compress at level 5 then decompress.
        zlib.decompress(zlib.compress(s, 5))
    return compress, (arg, )


def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    with open(__file__, "rb") as f:
        arg = f.read(3000) * 2

    def compress(s):
        bz2.compress(s)
    return compress, (arg, )


def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib
    with open(__file__, "rb") as f:
        arg = f.read(5000) * 30

    def compute(s):
        hashlib.sha1(s).digest()
    return compute, (arg, )


throughput_tasks = [task_pidigits, task_regex]

# Optional modules: bind the module-level name to None when the module is
# unavailable, so the task selection below can test ``is not None``.
try:
    import bz2
except ImportError:
    bz2 = None
try:
    import hashlib
except ImportError:
    hashlib = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is not None:
    throughput_tasks.append(task_compress_bz2)
elif hashlib is not None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_zlib)

# The latency tests run the throughput tasks as background CPU load; the
# bandwidth tests only use the pure-Python task.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]


class TimedLoop:
    """Repeatedly call ``func(*args)`` until a minimum duration has elapsed.

    Several loops may run concurrently in different threads.  They share an
    ``end_event`` list: the first loop to reach its minimum duration appends
    to it, and the other loops stop when they see it is non-empty.
    """

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the loop and return ``(niters, duration)``.

        start_time: common reference time, as returned by ``time.time()``.
        min_duration: seconds after start_time at which this loop finishes
            and appends to end_event.
        end_event: list used as a stop flag shared between loops.
        do_yield: if true, sleep briefly between batches to force a thread
            switch.
        """
        step = 20
        niters = 0
        duration = 0.0
        _time = time.time
        _sleep = time.sleep
        _func = self.func
        _args = self.args
        t1 = start_time
        while True:
            for _ in range(step):
                _func(*_args)
            t2 = _time()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return niters, duration
            niters += step
            duration = t2 - start_time
            if duration >= min_duration:
                end_event.append(None)
                return niters, duration
            if t2 - t1 < 0.01:
                # Minimize interference of measurement on overall runtime
                step = step * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                _sleep(0.0001)
            t1 = t2


def run_throughput_test(func, args, nthreads):
    """Run ``func(*args)`` in a TimedLoop on ``nthreads`` threads.

    Returns a list with one ``(niters, duration)`` tuple per loop.  With a
    single thread the loop runs in the calling thread.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Announce readiness, then block until the main thread publishes the
        # common start time.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = [threading.Thread(target=run) for _ in range(nthreads)]
    for t in threads:
        t.daemon = True
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results


def run_throughput_tests(max_threads):
    """Print throughput for every throughput task with 1..max_threads threads.

    The single-thread speed is printed in iterations/s; multi-thread speeds
    are printed as a percentage of it.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            speed = sum(r[0] for r in results) / max(r[1] for r in results)
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" iterations/s.")
                baseline_speed = speed
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()


LAT_END = "END"


def _sendto(sock, s, addr):
    """Send the text ``s`` (ASCII-encoded) as one datagram to ``addr``."""
    sock.sendto(s.encode('ascii'), addr)


def _recv(sock, n):
    """Receive up to ``n`` bytes from ``sock`` and decode them as ASCII."""
    return sock.recv(n).decode('ascii')


def latency_client(addr, nb_pings, interval):
    """Child-process side of the latency test.

    Sends a first "ready" ping, waits one second, then sends ``nb_pings``
    pings spaced ``interval`` seconds apart and finally LAT_END.  Each ping
    is the repr() of the sending timestamp followed by a newline.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        _time = time.time
        _sleep = time.sleep

        def _ping():
            _sendto(sock, "%r\n" % _time(), addr)

        # The first ping signals the parent process that we are ready.
        _ping()
        # We give the parent a bit of time to notice.
        _sleep(1.0)
        for _ in range(nb_pings):
            _sleep(interval)
            _ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()


def run_latency_client(**kwargs):
    """Spawn this script as a child running ``latency_client(**kwargs)``.

    The keyword arguments are passed on the command line as their repr().
    """
    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
    cmd_line.extend(['--latclient', repr(kwargs)])
    return subprocess.Popen(cmd_line)


def run_latency_test(func, args, nthreads):
    """Measure ping latency while ``nthreads`` threads run ``func(*args)``.

    Returns a list of ``(send_time, recv_time)`` tuples, one per ping
    received from the client process (excluding the initial "ready" ping).
    """
    results = []
    chunks = []
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                # Run longer than the ping phase; end_event stops us early.
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            threads = [threading.Thread(target=run) for _ in range(nthreads)]
            for t in threads:
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        while LAT_END not in s:
            s = _recv(sock, 4096)
            t = _time()
            chunks.append((t, s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        # Close the socket even if the client or a task raised.
        sock.close()

    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # NOTE(review): eval() on datagram contents; any local process
                # can send to this loopback port.  The client only sends
                # repr(float), so float()/ast.literal_eval would suffice.
                send_time = eval(line)
                assert isinstance(send_time, float)
                results.append((send_time, recv_time))

    return results


def run_latency_tests(max_threads):
    """Print average ping latency and its standard deviation (in ms) for
    each latency task with 0..max_threads background CPU threads."""
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            results = run_latency_test(func, args, nthreads)
            n = len(results)
            if not n:
                # Avoid a ZeroDivisionError when no ping was received.
                print("CPU threads=%d: no latency samples received" % nthreads)
                continue
            # We print out milliseconds
            lats = [1000 * (t2 - t1) for (t1, t2) in results]
            avg = sum(lats) / n
            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev))
        print()


BW_END = "END"


def bandwidth_client(addr, packet_size, duration):
    """Child-process side of the bandwidth test.

    After a one-second pause, repeatedly sends a ``packet_size``-long packet
    to ``addr`` and waits for it to be echoed back, for ``2 * duration``
    seconds, then sends a BW_END packet.  Each packet carries the repr() of
    this client's address so the parent knows where to reply.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        local_addr = sock.getsockname()
        _time = time.time
        _sleep = time.sleep

        def _send_chunk(msg):
            _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)

        # We give the parent some time to be ready.
        _sleep(1.0)
        start_time = _time()
        end_time = start_time + duration * 2.0
        i = 0
        while _time() < end_time:
            _send_chunk(str(i))
            s = _recv(sock, packet_size)
            assert len(s) == packet_size
            i += 1
        _send_chunk(BW_END)
    finally:
        sock.close()


def run_bandwidth_client(**kwargs):
    """Spawn this script as a child running ``bandwidth_client(**kwargs)``.

    The keyword arguments are passed on the command line as their repr().
    """
    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]
    cmd_line.extend(['--bwclient', repr(kwargs)])
    return subprocess.Popen(cmd_line)


def run_bandwidth_test(func, args, nthreads):
    """Measure UDP echo throughput while ``nthreads`` threads run ``func``.

    Returns the number of packets echoed per second, or 0.0 when fewer than
    two round trips could be timed.
    """
    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    # Sockets only became context managers in Python 3.2; use try/finally to
    # keep the Python 2.6 compatibility this file promises.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            threads = [threading.Thread(target=run) for _ in range(nthreads)]
            for t in threads:
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        try:
            _time = time.time
            # This will also wait for the parent to be ready
            s = _recv(sock, packet_size)
            # NOTE(review): eval() on datagram contents; any local process can
            # send to this loopback port.  ast.literal_eval would be safer.
            remote_addr = eval(s.partition('#')[0])

            with start_cond:
                start_time = _time()
                started = True
                start_cond.notify(nthreads)

            n = 0
            first_time = None
            while not end_event and BW_END not in s:
                _sendto(sock, s, remote_addr)
                s = _recv(sock, packet_size)
                if first_time is None:
                    first_time = _time()
                n += 1
            end_time = _time()

            end_event.append(None)
            for t in threads:
                t.join()
        finally:
            # Stop the client and reap it so no zombie process is left behind.
            process.kill()
            process.wait()
    finally:
        sock.close()

    # first_time is taken after the first echo, so only n - 1 round trips
    # fall inside the [first_time, end_time] window.
    if first_time is None or n < 2 or end_time <= first_time:
        return 0.0
    return (n - 1) / (end_time - first_time)


def run_bandwidth_tests(max_threads):
    """Print echo throughput for each bandwidth task with 0..max_threads
    background CPU threads; later results are shown relative to the first."""
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" packets/s.")
                baseline_speed = speed
            elif baseline_speed:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                # No baseline rate was measured; a ratio would be meaningless.
                print()
        print()


def main():
    """Parse the command line and run the selected benchmarks.

    The hidden --latclient / --bwclient options make this process act as the
    client side of the latency / bandwidth tests instead.
    """
    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value "
                           "(Python 3.8 and older)")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value "
                           "(Python 3.2 and newer)")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # NOTE(review): the client options are eval()'d; they are produced by
    # run_latency_client / run_bandwidth_client via repr() of plain literals,
    # so ast.literal_eval would be a safer drop-in.
    if options.latclient:
        kwargs = eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    # These sys functions only exist on some Python versions; report a usage
    # error instead of crashing with AttributeError.
    if options.check_interval:
        if not hasattr(sys, "setcheckinterval"):
            parser.error("--interval requires sys.setcheckinterval() "
                         "(Python 3.8 and older)")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        if not hasattr(sys, "setswitchinterval"):
            parser.error("--switch-interval requires sys.setswitchinterval() "
                         "(Python 3.2 and newer)")
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)

# Script entry point: run main() only when this file is executed directly
# (this is also how the latency/bandwidth client child processes start),
# never on import.
if __name__ == "__main__":
    main()