• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# -*- coding: utf-8 -*-
# This file should be kept compatible with both Python 2.6 and Python >= 3.0.

"""
ccbench, a Python concurrency benchmark.

Runs throughput, latency and I/O bandwidth measurements while a varying
number of threads execute CPU-bound tasks.
"""

# NOTE: the docstring has to come before the __future__ imports; a string
# placed after them is a plain expression and leaves __doc__ unset.
from __future__ import division
from __future__ import print_function
10
11import time
12import os
13import sys
14import functools
15import itertools
16import threading
17import subprocess
18import socket
19from optparse import OptionParser, SUPPRESS_HELP
20import platform
21
# Compatibility
# Make the name ``xrange`` available everywhere: when the builtin is missing
# (NameError), fall back to ``range``.
try:
    xrange
except NameError:
    xrange = range

# Where ``itertools.imap`` exists, rebind ``map`` to it so that ``map`` is
# lazy (task_pidigits maps over an infinite counter).  When the attribute is
# missing, the builtin ``map`` is left untouched.
try:
    map = itertools.imap
except AttributeError:
    pass
32
33
# Minimum running time handed to each TimedLoop in the throughput test
# (compared against time.time() differences, i.e. seconds).
THROUGHPUT_DURATION = 2.0

# Delay the latency client sleeps between two pings.
LATENCY_PING_INTERVAL = 0.1
# Together with the interval above, determines the number of pings sent
# (duration / interval); background loops run for 1.5 times this value.
LATENCY_DURATION = 2.0

# Size every bandwidth packet is padded to, and the size passed to recv().
BANDWIDTH_PACKET_SIZE = 1024
# The bandwidth client keeps sending for twice this value; background loops
# run for 1.5 times this value.
BANDWIDTH_DURATION = 2.0
41
42
def task_pidigits():
    """Pi calculation (Python)"""
    # NOTE: the docstring above is printed as the task label by the test
    # runners, hence its brevity.
    # Returns (callable, args): the callable computes the first n digits of pi.
    lazy_map = map
    counter = itertools.count
    take = itertools.islice

    def calc_ndigits(n):
        # From http://shootout.alioth.debian.org/
        def transformations():
            # Endless stream of (q, r, s, t) tuples, one for each k >= 1.
            return lazy_map(lambda k: (k, 4*k + 2, 0, 2*k + 1), counter(1))

        def compose(a, b):
            # Combine two transformations given as (q, r, s, t) tuples.
            a_q, a_r, a_s, a_t = a
            b_q, b_r, b_s, b_t = b
            top_left = a_q * b_q
            top_right = a_q * b_r + a_r * b_t
            bottom_left = a_s * b_q + a_t * b_s
            bottom_right = a_s * b_r + a_t * b_t
            return (top_left, top_right, bottom_left, bottom_right)

        def extract(z, j):
            # Integer part of the transformation z applied to j.
            q, r, s, t = z
            return (q*j + r) // (s*j + t)

        def pi_digits():
            state = (1, 0, 0, 1)
            terms = transformations()
            while True:
                digit = extract(state, 3)
                # Consume terms until the candidate digit is settled.
                while digit != extract(state, 4):
                    state = compose(state, next(terms))
                    digit = extract(state, 3)
                state = compose((10, -10*digit, 0, 1), state)
                yield digit

        return list(take(pi_digits(), n))

    return calc_ndigits, (50, )
80
def task_regex():
    """regular expression (C)"""
    # NOTE: the docstring above is printed as the task label by the test
    # runners, hence its brevity.
    # XXX this task gives horrendous latency results.
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    # The input is the first 2000 characters of this very file.
    with open(__file__, "r") as f:
        arg = f.read(2000)

    # Return the pattern's bound findall() together with its single argument.
    # (An unused timing wrapper around findall() used to be defined here.)
    return pat.findall, (arg, )
97
def task_sort():
    """list sorting (C)"""
    # NOTE: the docstring above is kept short because the test runners print
    # task docstrings as labels.
    def list_sort(l):
        # Work on a reversed copy: the caller's list stays untouched, so
        # every call sorts the same descending input.
        descending = list(reversed(l))
        descending.sort()

    ascending = list(range(1000))
    return list_sort, (ascending, )
105
def task_compress_zlib():
    """zlib compression (C)"""
    # NOTE: the docstring above is printed as the task label by the test
    # runners, hence its brevity.
    import zlib
    # The payload is the first 5000 bytes of this file, repeated three times.
    with open(__file__, "rb") as source:
        head = source.read(5000)
    payload = head * 3

    def compress(s):
        # Round trip: compress at level 5, then decompress; result discarded.
        packed = zlib.compress(s, 5)
        zlib.decompress(packed)

    return compress, (payload, )
115
def task_compress_bz2():
    """bz2 compression (C)"""
    # NOTE: the docstring above is printed as the task label by the test
    # runners, hence its brevity.
    import bz2
    # The payload is the first 3000 bytes of this file, repeated twice.
    with open(__file__, "rb") as source:
        head = source.read(3000)
    payload = head * 2

    def compress(s):
        # Only the compression work matters; the output is discarded.
        bz2.compress(s)

    return compress, (payload, )
125
def task_hashing():
    """SHA1 hashing (C)"""
    # NOTE: the docstring above is printed as the task label by the test
    # runners, hence its brevity.
    import hashlib
    # The payload is the first 5000 bytes of this file, repeated 30 times.
    with open(__file__, "rb") as source:
        head = source.read(5000)
    payload = head * 30

    def compute(s):
        # Hash the whole buffer in one go; the digest is discarded.
        hasher = hashlib.sha1(s)
        hasher.digest()

    return compute, (payload, )
135
136
# Tasks run by the throughput tests; one more C-implemented task is
# appended below depending on which modules can be imported.
throughput_tasks = [task_pidigits, task_regex]

# bz2 and hashlib are optional: bind each name at module level to the
# imported module, or to None when the import fails.
for mod in ('bz2', 'hashlib'):
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# For whatever reasons, zlib gives irregular results, so it is only used as
# the fallback when neither bz2 nor hashlib is available.
# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)
if bz2 is None and hashlib is None:
    throughput_tasks.append(task_compress_zlib)
elif bz2 is None:
    throughput_tasks.append(task_hashing)
else:
    throughput_tasks.append(task_compress_bz2)

# The latency tests use the very same list object as the throughput tests.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
156
157
class TimedLoop:
    """Callable that runs ``func(*args)`` repeatedly and times the batches."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run batches of calls until *min_duration* has elapsed.

        *end_event* is a list used as a shared flag: a non-empty list means
        some loop has already finished.  Returns ``(niters, duration)``,
        where *duration* is measured from *start_time*.
        """
        batch = 20
        completed = 0
        elapsed = 0.0
        # Local aliases for everything used inside the loop.
        now = time.time
        pause = time.sleep
        work = self.func
        work_args = self.args
        previous = start_time
        while True:
            for _ in range(batch):
                work(*work_args)
            current = now()
            # If another thread terminated, the current measurement is invalid
            # => return the previous one.
            if end_event:
                return completed, elapsed
            completed += batch
            elapsed = current - start_time
            if elapsed >= min_duration:
                # Flag completion so that the other loops stop as well.
                end_event.append(None)
                return completed, elapsed
            if current - previous < 0.01:
                # Minimize interference of measurement on overall runtime
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                pause(0.0001)
            previous = current
194
195
def run_throughput_test(func, args, nthreads):
    """Run ``func(*args)`` in timed loops on *nthreads* threads.

    Every thread executes the same TimedLoop with THROUGHPUT_DURATION as
    its minimum duration.  Returns a list holding one ``(niters, duration)``
    tuple per thread.  *nthreads* must be at least 1.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Report readiness, then block until the main thread gives the go.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = [threading.Thread(target=run) for _ in range(nthreads)]
    for t in threads:
        # The ``daemon`` attribute replaces the deprecated setDaemon().
        t.daemon = True
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results
248
def run_throughput_tests(max_threads):
    """Print the throughput of every task for 1 up to *max_threads* threads.

    The single-threaded speed is printed in iterations per second; every
    further line is printed as a percentage of that baseline.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            total_iterations = sum(niters for niters, _ in results)
            # Taking the max duration rather than average gives pessimistic
            # results rather than optimistic.
            longest = max(duration for _, duration in results)
            speed = total_iterations / longest
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is None:
                baseline_speed = speed
                print(" iterations/s.")
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()
269
270
# Marker the latency client sends after its last ping; the parent stops
# reading as soon as a received chunk contains it.
LAT_END = "END"
272
273def _sendto(sock, s, addr):
274    sock.sendto(s.encode('ascii'), addr)
275
276def _recv(sock, n):
277    return sock.recv(n).decode('ascii')
278
def latency_client(addr, nb_pings, interval):
    """Send timestamped pings to *addr* over UDP.

    One initial ping is sent, then, after a one second pause, *nb_pings*
    further pings separated by *interval* seconds, and finally LAT_END.
    Each ping is the repr() of the current time followed by a newline.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Close the socket on every exit path, as bandwidth_client() does.
    try:
        _time = time.time
        _sleep = time.sleep
        def _ping():
            _sendto(sock, "%r\n" % _time(), addr)
        # The first ping signals the parent process that we are ready.
        _ping()
        # We give the parent a bit of time to notice.
        _sleep(1.0)
        for i in range(nb_pings):
            _sleep(interval)
            _ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()
293
def run_latency_client(**kwargs):
    """Start this script as a latency client and return its Popen object.

    The keyword arguments reach the child as their repr(), passed through
    the hidden --latclient option.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--latclient', repr(kwargs)]
    # No pipes are requested, so the child shares the parent's standard streams.
    return subprocess.Popen(cmd_line)
299
def run_latency_test(func, args, nthreads):
    """Collect ping timings while *nthreads* threads run ``func(*args)``.

    A client process sends timestamped UDP pings to a local socket.
    Returns a list of ``(send_time, recv_time)`` tuples, one per ping
    received after the background threads were unblocked.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # The socket is closed on every exit path; this function is called once
    # per task and thread count.
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        results = []
        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                # Report readiness, then wait for the go signal.
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                # The ``daemon`` attribute replaces the deprecated setDaemon().
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        while LAT_END not in s:
            s = _recv(sock, 4096)
            chunks.append((_time(), s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        sock.close()

    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # The client sends repr(time.time()).  float() parses it
                # without evaluating text that arrived over a socket.
                send_time = float(line)
                results.append((send_time, recv_time))

    return results
379
def run_latency_tests(max_threads):
    """Print ping latency for every task with 0 up to *max_threads* threads.

    For each thread count the average latency and its standard deviation
    are printed in milliseconds.
    """
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        nthreads = 0
        while nthreads <= max_threads:
            results = run_latency_test(func, args, nthreads)
            n = len(results)
            if n == 0:
                # Without samples the statistics below would divide by zero.
                print("CPU threads=%d: no ping received" % nthreads)
            else:
                # We print out milliseconds
                lats = [1000 * (t2 - t1) for (t1, t2) in results]
                avg = sum(lats) / n
                dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
                print("CPU threads=%d: %d ms. (std dev: %d ms.)"
                      % (nthreads, avg, dev))
            nthreads += 1
        print()
399
400
# Marker the bandwidth client sends as its final message; the parent stops
# echoing as soon as a received packet contains it.
BW_END = "END"
402
def bandwidth_client(addr, packet_size, duration):
    """Exchange padded UDP packets with the server at *addr*.

    After a one second pause, numbered packets are sent for twice
    *duration* seconds.  Each one is followed by a receive of exactly
    *packet_size* characters; a final BW_END packet closes the exchange.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    local_addr = sock.getsockname()
    now = time.time

    def send_padded(msg):
        # Each packet carries this socket's address, '#' and the message,
        # right-justified to the packet size.
        packet = "%r#%s\n" % (local_addr, msg)
        _sendto(sock, packet.rjust(packet_size), addr)

    # We give the parent some time to be ready.
    time.sleep(1.0)
    try:
        deadline = now() + duration * 2.0
        seq = 0
        while now() < deadline:
            send_padded(str(seq))
            echoed = _recv(sock, packet_size)
            assert len(echoed) == packet_size
            seq += 1
        send_padded(BW_END)
    finally:
        sock.close()
425
def run_bandwidth_client(**kwargs):
    """Start this script as a bandwidth client and return its Popen object.

    The keyword arguments reach the child as their repr(), passed through
    the hidden --bwclient option.
    """
    script = os.path.abspath(__file__)
    cmd_line = [sys.executable, '-E', script, '--bwclient', repr(kwargs)]
    # No pipes are requested, so the child shares the parent's standard streams.
    return subprocess.Popen(cmd_line)
431
def run_bandwidth_test(func, args, nthreads):
    """Measure UDP echo rate while *nthreads* threads run ``func(*args)``.

    A client process sends packets to a local socket and every packet is
    echoed back to it.  Returns the number of packets per second, computed
    from the first packet received after the background threads started.
    """
    # Local import: only needed to decode the client's address.
    import ast

    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # The socket is closed on every exit path; this function is called once
    # per task and thread count.
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                # Report readiness, then wait for the go signal.
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                threads.append(threading.Thread(target=run))
            for t in threads:
                # The ``daemon`` attribute replaces the deprecated setDaemon().
                t.daemon = True
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        _time = time.time
        # This will also wait for the parent to be ready
        s = _recv(sock, packet_size)
        # The packet starts with the repr() of the client's address, padded
        # on the left.  literal_eval() accepts only literals, unlike eval(),
        # and the padding is stripped because older versions reject it.
        remote_addr = ast.literal_eval(s.partition('#')[0].strip())

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        n = 0
        first_time = None
        while not end_event and BW_END not in s:
            _sendto(sock, s, remote_addr)
            s = _recv(sock, packet_size)
            if first_time is None:
                first_time = _time()
            n += 1
        end_time = _time()

        end_event.append(None)
        for t in threads:
            t.join()
        process.kill()
        # Reap the killed child so that it does not linger as a zombie.
        process.wait()
    finally:
        sock.close()

    return (n - 1) / (end_time - first_time)
506
def run_bandwidth_tests(max_threads):
    """Print the packet rate of every task with 0 up to *max_threads* threads.

    The rate without background threads is printed in packets per second;
    every further line is printed as a percentage of that baseline.
    """
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is None:
                baseline_speed = speed
                print(" packets/s.")
            else:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
        print()
526
527
def main():
    """Parse the command line and run the selected benchmarks.

    When none of -t, -l and -b is given, all three test families run.
    The hidden --latclient and --bwclient options make the process act as
    the helper client spawned by the latency and bandwidth tests.
    """
    # Local import: only needed to decode the hidden client options.
    import ast

    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)

    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    if options.latclient:
        # The parent passes repr(kwargs); literal_eval() accepts only
        # Python literals, unlike eval().
        kwargs = ast.literal_eval(options.latclient.strip())
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient.strip())
        bandwidth_client(**kwargs)
        return

    if not options.throughput and not options.latency and not options.bandwidth:
        options.throughput = options.latency = options.bandwidth = True
    # Neither interval setter exists on every interpreter version, so report
    # a usage error instead of failing with AttributeError.
    if options.check_interval is not None:
        if not hasattr(sys, "setcheckinterval"):
            parser.error("-i/--interval is not supported by this interpreter")
        sys.setcheckinterval(options.check_interval)
    # A zero switch interval is treated as "not given", as before.
    if options.switch_interval:
        if not hasattr(sys, "setswitchinterval"):
            parser.error("-I/--switch-interval is not supported by this "
                         "interpreter")
        sys.setswitchinterval(options.switch_interval)

    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)
607
# Entry point when run as a script.  The latency and bandwidth tests also
# re-run this file in a child process with a hidden client option.
if __name__ == "__main__":
    main()
610