• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
2
3from __future__ import division
4from __future__ import print_function
5
6"""
7ccbench, a Python concurrency benchmark.
8"""
9
10import time
11import os
12import sys
13import itertools
14import threading
15import subprocess
16import socket
17from optparse import OptionParser, SUPPRESS_HELP
18import platform
19
# Compatibility
# Where `xrange` does not exist as a builtin (NameError), alias it to `range`
# so the name is defined either way.
try:
    xrange
except NameError:
    xrange = range

# Where itertools provides `imap`, use it as `map`; where it does not
# (AttributeError), the builtin `map` is kept.
try:
    map = itertools.imap
except AttributeError:
    pass


# Minimum duration (in seconds) of each throughput measurement loop.
THROUGHPUT_DURATION = 2.0

# Delay (in seconds) between two consecutive pings sent by the latency client.
LATENCY_PING_INTERVAL = 0.1
# Divided by LATENCY_PING_INTERVAL to obtain the number of pings to send.
LATENCY_DURATION = 2.0

# Width (in characters) to which each bandwidth packet is right-justified.
BANDWIDTH_PACKET_SIZE = 1024
# Base duration (in seconds) of a bandwidth test; the client keeps sending
# for twice this long.
BANDWIDTH_DURATION = 2.0
39
40
def task_pidigits():
    """Pi calculation (Python)"""
    # Keep references to the helpers in the enclosing scope so the benchmarked
    # function only performs closure lookups.
    lazy_map = map
    counter = itertools.count
    take = itertools.islice

    def calc_ndigits(n):
        # Return the first `n` decimal digits of pi as a list of ints.
        # From http://shootout.alioth.debian.org/
        def transform_stream():
            # Endless stream of 4-tuples (k, 4k+2, 0, 2k+1) for k = 1, 2, ...
            return lazy_map(lambda k: (k, 4*k + 2, 0, 2*k + 1), counter(1))

        def compose(a, b):
            # Product of two 2x2 matrices, each stored as a flat 4-tuple.
            a_q, a_r, a_s, a_t = a
            b_q, b_r, b_s, b_t = b
            top_left = a_q * b_q
            top_right = a_q * b_r + a_r * b_t
            bottom_left = a_s * b_q + a_t * b_s
            bottom_right = a_s * b_r + a_t * b_t
            return (top_left, top_right, bottom_left, bottom_right)

        def extract(z, j):
            # Integer part of the transform `z` applied to `j`.
            q, r, s, t = z
            numerator = q*j + r
            denominator = s*j + t
            return numerator // denominator

        def pi_digits():
            state = (1, 0, 0, 1)
            transforms = transform_stream()
            while True:
                digit = extract(state, 3)
                # Keep consuming terms until evaluating at 3 and at 4 agree
                # on the next digit.
                while digit != extract(state, 4):
                    state = compose(state, next(transforms))
                    digit = extract(state, 3)
                # Remove the digit that is about to be emitted from the state.
                state = compose((10, -10*digit, 0, 1), state)
                yield digit

        return list(take(pi_digits(), n))

    return calc_ndigits, (50, )
78
def task_regex():
    """regular expression (C)"""
    # XXX this task gives horrendous latency results.
    import re
    # The text to search is the beginning of this very file.
    with open(__file__, "r") as source:
        sample = source.read(2000)
    # Pattern taken from the `inspect` module
    matcher = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    return matcher.findall, (sample, )
88
def task_sort():
    """list sorting (C)"""
    def list_sort(values):
        # Work on a reversed copy: the caller's list is never modified, so
        # every call sorts the same descending input.
        descending = values[::-1]
        descending.sort()

    data = list(range(1000))
    return list_sort, (data, )
96
def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    # The payload is the first 5000 bytes of this file, repeated three times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 3

    def compress(s):
        # Round-trip: compress at level 5, then decompress the result.
        packed = zlib.compress(s, 5)
        zlib.decompress(packed)
    return compress, (payload, )
106
def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    # The payload is the first 3000 bytes of this file, repeated twice.
    with open(__file__, "rb") as source:
        payload = source.read(3000) * 2

    def compress(data):
        # Only compression is exercised; the result is discarded.
        bz2.compress(data)
    return compress, (payload, )
116
def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib
    # The payload is the first 5000 bytes of this file, repeated 30 times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 30

    def compute(data):
        # Hash the whole buffer in one go; the digest is discarded.
        hasher = hashlib.sha1(data)
        hasher.digest()
    return compute, (payload, )
126
127
# Task factories used by the throughput tests. One compression/hashing task
# is appended below, depending on which optional modules can be imported.
throughput_tasks = [task_pidigits, task_regex]
# Bind each optional module to a module-level global of the same name, or to
# None when importing it fails.
for mod in 'bz2', 'hashlib':
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reasons, zlib gives irregular results, so we prefer bz2 or
# hashlib if available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is not None:
    throughput_tasks.append(task_compress_bz2)
elif hashlib is not None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_zlib)

# The latency tests use the very same list object as the throughput tests.
latency_tasks = throughput_tasks
# The bandwidth tests only use the pure-Python task as background load.
bandwidth_tasks = [task_pidigits]
147
148
class TimedLoop:
    """Callable that runs ``func(*args)`` in batches until a deadline."""

    def __init__(self, func, args):
        self.func, self.args = func, args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the task repeatedly and return ``(iterations, duration)``.

        `start_time` is the time.time() value measurements are relative to,
        and `min_duration` the number of seconds to run for at least.
        `end_event` is a list shared between all loops: a non-empty list
        means some loop has finished, and this loop appends to it when it
        reaches `min_duration` itself. With `do_yield` true, the loop sleeps
        briefly between batches that took 10 ms or more.
        """
        batch_size = 20
        completed = 0
        elapsed = 0.0
        # Local aliases keep the per-batch bookkeeping cheap.
        clock = time.time
        nap = time.sleep
        task = self.func
        task_args = self.args
        previous = start_time
        while True:
            for _ in range(batch_size):
                task(*task_args)
            now = clock()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return completed, elapsed
            completed += batch_size
            elapsed = now - start_time
            if elapsed >= min_duration:
                end_event.append(None)
                return completed, elapsed
            if now - previous < 0.01:
                # Minimize interference of measurement on overall runtime
                batch_size = batch_size * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                nap(0.0001)
            previous = now
185
186
def run_throughput_test(func, args, nthreads):
    """Run ``func(*args)`` in `nthreads` concurrent timed loops.

    Returns a list holding one ``(iterations, duration)`` pair per thread.
    With a single thread the loop runs directly in the calling thread.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    timed_loop = TimedLoop(func, args)
    results = []
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        measurement = timed_loop(start_time, THROUGHPUT_DURATION,
                                 end_event, do_yield=False)
        results.append(measurement)
        return results

    started = False
    ready = []
    ready_cond = threading.Condition()
    start_cond = threading.Condition()

    def run():
        # Report readiness, then block until the main thread gives the
        # start signal.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        measurement = timed_loop(start_time, THROUGHPUT_DURATION,
                                 end_event, do_yield=True)
        results.append(measurement)

    threads = [threading.Thread(target=run) for _ in range(nthreads)]
    for worker in threads:
        worker.daemon = True
        worker.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for worker in threads:
        worker.join()

    return results
239
def run_throughput_tests(max_threads):
    """Run and report every throughput task with 1 to `max_threads` threads.

    The single-thread speed of each task is printed in iterations per second;
    results for more threads are printed together with their percentage of
    that baseline.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            total_iterations = sum(r[0] for r in results)
            longest_duration = max(r[1] for r in results)
            speed = total_iterations / longest_duration
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" iterations/s.")
                baseline_speed = speed
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()
260
261
262LAT_END = "END"
263
def _sendto(sock, s, addr):
    """Encode the text `s` as ASCII and send it to `addr` through `sock`."""
    payload = s.encode('ascii')
    sock.sendto(payload, addr)
266
def _recv(sock, n):
    """Receive at most `n` bytes from `sock` and return them as ASCII text."""
    data = sock.recv(n)
    return data.decode('ascii')
269
def latency_client(addr, nb_pings, interval):
    """Send timestamped UDP pings to `addr`.

    One initial ping is sent right away, then `nb_pings` more, each preceded
    by a sleep of `interval` seconds. Every ping carries the repr() of the
    current time.time(); a final LAT_END line marks the end of the series.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        now = time.time
        pause = time.sleep

        def send_timestamp():
            _sendto(sock, "%r\n" % now(), addr)

        # The first ping signals the parent process that we are ready.
        send_timestamp()
        # We give the parent a bit of time to notice.
        pause(1.0)
        for _ in range(nb_pings):
            pause(interval)
            send_timestamp()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()
287
def run_latency_client(**kwargs):
    """Start this script in a child process acting as the latency client.

    The keyword arguments are passed as their repr() through the hidden
    --latclient option. The child's standard streams are not redirected.
    Returns the subprocess.Popen object.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--latclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)
293
def run_latency_test(func, args, nthreads):
    """Measure ping latency while `nthreads` threads run ``func(*args)``.

    A child process (see run_latency_client) sends timestamped UDP pings to
    a socket bound on 127.0.0.1, and the reception time of each one is
    recorded. The background threads are released once the first ping has
    arrived, and stopped when the end marker is received.

    Returns a list of ``(send_time, recv_time)`` pairs, one for each ping
    received after the initial "ready" ping.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        results = []
        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for thread in threads:
                thread.daemon = True
                thread.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        while LAT_END not in s:
            s = _recv(sock, 4096)
            recv_time = _time()
            chunks.append((recv_time, s))

        # Tell the background threads to stop.
        end_event.append(None)
        for thread in threads:
            thread.join()
        process.wait()
    finally:
        # Release the socket even if the test is interrupted or fails.
        sock.close()

    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # The text comes from a UDP socket that any local process
                # can write to, so it is parsed with float() and never
                # evaluated as code. Malformed data raises ValueError.
                send_time = float(line)
                results.append((send_time, recv_time))

    return results
374
def run_latency_tests(max_threads):
    """Run and report every latency task with 0 to `max_threads` CPU threads.

    For each thread count, the average latency and its standard deviation
    are printed in milliseconds. If no ping sample was collected, a notice
    is printed instead of the statistics.
    """
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            results = run_latency_test(func, args, nthreads)
            n = len(results)
            if n == 0:
                # Without samples the statistics below would divide by zero.
                print("CPU threads=%d: no ping received" % nthreads)
                continue
            # We print out milliseconds
            lats = [1000 * (t2 - t1) for (t1, t2) in results]
            avg = sum(lats) / n
            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev))
        print()
394
395
396BW_END = "END"
397
def bandwidth_client(addr, packet_size, duration):
    """Exchange UDP packets with the server at `addr` as fast as possible.

    Each packet holds the repr() of this client's own address, a '#', and a
    sequence number, right-justified to `packet_size` characters. After
    every packet sent, one reply of `packet_size` characters is awaited.
    This goes on for twice `duration` seconds, then a BW_END packet is sent.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    local_addr = sock.getsockname()
    now = time.time

    def send_packet(msg):
        text = "%r#%s\n" % (local_addr, msg)
        _sendto(sock, text.rjust(packet_size), addr)

    # We give the parent some time to be ready.
    time.sleep(1.0)
    try:
        deadline = now() + duration * 2.0
        sequence = 0
        while now() < deadline:
            send_packet(str(sequence))
            reply = _recv(sock, packet_size)
            assert len(reply) == packet_size
            sequence += 1
        send_packet(BW_END)
    finally:
        sock.close()
420
def run_bandwidth_client(**kwargs):
    """Start this script in a child process acting as the bandwidth client.

    The keyword arguments are passed as their repr() through the hidden
    --bwclient option. The child's standard streams are not redirected.
    Returns the subprocess.Popen object.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--bwclient', repr(kwargs)]
    return subprocess.Popen(cmd_line)
426
def run_bandwidth_test(func, args, nthreads):
    """Measure UDP echo bandwidth while `nthreads` threads run ``func(*args)``.

    A child process (see run_bandwidth_client) sends packets to a socket
    bound on 127.0.0.1; each packet is echoed back to the address announced
    in the first packet. The exchange stops when the client sends BW_END or
    when a background loop signals that it has finished.

    Returns the number of packets echoed per second, measured from the first
    reply received.
    """
    # Imported locally so this function does not depend on the module's
    # import block.
    import ast

    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for thread in threads:
                thread.daemon = True
                thread.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        _time = time.time
        # This will also wait for the parent to be ready
        s = _recv(sock, packet_size)
        # The packet comes from a UDP socket that any local process can write
        # to, so the address prefix is parsed as a literal and never
        # evaluated as code. The padding added by the client is stripped
        # first because leading whitespace is not accepted by every version
        # of ast.literal_eval().
        remote_addr = ast.literal_eval(s.partition('#')[0].strip())

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        n = 0
        first_time = None
        while not end_event and BW_END not in s:
            _sendto(sock, s, remote_addr)
            s = _recv(sock, packet_size)
            if first_time is None:
                first_time = _time()
            n += 1
        end_time = _time()

    end_event.append(None)
    for thread in threads:
        thread.join()
    process.kill()
    # Reap the child so that no terminated-but-unwaited process is left over.
    process.wait()

    return (n - 1) / (end_time - first_time)
501
def run_bandwidth_tests(max_threads):
    """Run and report every bandwidth task with 0 to `max_threads` CPU threads.

    The speed without background threads is printed in packets per second;
    results with background threads are printed together with their
    percentage of that baseline.
    """
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is None:
                print(" packets/s.")
                baseline_speed = speed
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()
521
522
def main():
    """Parse the command line and run the selected benchmarks.

    When none of -t/-l/-b is given, all three test families are run. The
    hidden --latclient and --bwclient options make the process act as the
    ping or bandwidth client spawned by the tests, and then return.
    """
    parser = OptionParser(usage="usage: %prog [-h|--help] [options]")

    # Test selection
    parser.add_option(
        "-t", "--throughput", action="store_true", dest="throughput",
        default=False, help="run throughput tests")
    parser.add_option(
        "-l", "--latency", action="store_true", dest="latency",
        default=False, help="run latency tests")
    parser.add_option(
        "-b", "--bandwidth", action="store_true", dest="bandwidth",
        default=False, help="run I/O bandwidth tests")

    # Interpreter tuning and test size
    parser.add_option(
        "-i", "--interval", action="store", type="int",
        dest="check_interval", default=None,
        help="sys.setcheckinterval() value "
             "(Python 3.8 and older)")
    parser.add_option(
        "-I", "--switch-interval", action="store", type="float",
        dest="switch_interval", default=None,
        help="sys.setswitchinterval() value "
             "(Python 3.2 and newer)")
    parser.add_option(
        "-n", "--num-threads", action="store", type="int",
        dest="nthreads", default=4,
        help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option(
        "", "--latclient", action="store", dest="latclient",
        default=None, help=SUPPRESS_HELP)
    parser.add_option(
        "", "--bwclient", action="store", dest="bwclient",
        default=None, help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # Client modes: the option value is evaluated to obtain the keyword
    # arguments of the client function.
    if options.latclient:
        client_kwargs = eval(options.latclient)
        latency_client(**client_kwargs)
        return

    if options.bwclient:
        client_kwargs = eval(options.bwclient)
        bandwidth_client(**client_kwargs)
        return

    # No explicit selection means "run everything".
    if not (options.throughput or options.latency or options.bandwidth):
        options.throughput = options.latency = options.bandwidth = True
    if options.check_interval:
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        sys.setswitchinterval(options.switch_interval)

    implementation = platform.python_implementation()
    version = platform.python_version()
    build = platform.python_build()[0]
    print("== %s %s (%s) ==" % (implementation, version, build))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (platform.machine(), platform.system(), cpu))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)
604
# Run the benchmark (or one of the hidden client modes) only when this file
# is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
607