# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import logging
import math
import numbers
import re
import time
import os.path

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import path_utils


class NetperfResult(object):
    """Encapsulates logic to parse and represent netperf results."""

    @staticmethod
    def from_netperf_results(test_type, results, duration_seconds):
        """Parse the text output of netperf and return a NetperfResult.

        @param test_type string one of NetperfConfig.TEST_TYPE_* below.
        @param results string raw results from netperf.
        @param duration_seconds float number of seconds the test ran for.
        @return NetperfResult result, or None if |results| contains no
                line starting with a number.
        @raises error.TestFail if |test_type| is not a known test type.

        """
        # Include only results lines, which should start with a number.  This
        # helps eliminate inconsistent output, e.g., from benign warnings
        # like:
        #   catcher: timer popped with times_up != 0
        lines = [line for line in results.splitlines()
                 if re.match(r'[0-9]+', line.strip())]

        if test_type in NetperfConfig.TCP_STREAM_TESTS:
            # Parses the following (works for both TCP_STREAM, TCP_MAERTS and
            # TCP_SENDFILE) and returns a singleton containing throughput.
            #
            # TCP STREAM TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET to
            #         foo.bar.com (10.10.10.3) port 0 AF_INET
            # Recv   Send    Send
            # Socket Socket  Message  Elapsed
            # Size   Size    Size     Time     Throughput
            # bytes  bytes   bytes    secs.    10^6bits/sec
            #
            # 87380  16384  16384    2.00      941.28
            if not lines:
                return None

            result = NetperfResult(test_type, duration_seconds,
                                   throughput=float(lines[0].split()[4]))
        elif test_type in NetperfConfig.UDP_STREAM_TESTS:
            # Parses the following and returns a tuple containing throughput
            # and the number of errors.
            #
            # UDP UNIDIRECTIONAL SEND TEST from 0.0.0.0 (0.0.0.0) port 0
            #         AF_INET to foo.bar.com (10.10.10.3) port 0 AF_INET
            # Socket  Message  Elapsed      Messages
            # Size    Size     Time         Okay Errors   Throughput
            # bytes   bytes    secs            #      #   10^6bits/sec
            #
            # 129024   65507   2.00         3673      0     961.87
            # 131072           2.00         3673            961.87
            if not lines:
                return None

            udp_tokens = lines[0].split()
            result = NetperfResult(test_type, duration_seconds,
                                   throughput=float(udp_tokens[5]),
                                   errors=float(udp_tokens[4]))
        elif test_type in NetperfConfig.REQUEST_RESPONSE_TESTS:
            # Parses the following which works for both rr (TCP and UDP) and
            # crr tests and returns a singleton containing transfer rate.
            #
            # TCP REQUEST/RESPONSE TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET
            #         to foo.bar.com (10.10.10.3) port 0 AF_INET
            # Local /Remote
            # Socket Size   Request  Resp.   Elapsed  Trans.
            # Send   Recv   Size     Size    Time     Rate
            # bytes  Bytes  bytes    bytes   secs.    per sec
            #
            # 16384  87380  1        1       2.00     14118.53
            # 16384  87380
            if not lines:
                return None

            result = NetperfResult(
                    test_type, duration_seconds,
                    transaction_rate=float(lines[0].split()[5]))
        else:
            raise error.TestFail('Invalid netperf test type: %r.' % test_type)

        logging.info('%r', result)
        return result

    @staticmethod
    def _get_stats(samples, field_name):
        """Compute the mean and sample standard deviation of one field.

        @param samples list of NetperfResult objects.
        @param field_name string name of the attribute to aggregate.
        @return tuple (mean, deviation).  Both are None if |samples| is empty
                or any sample lacks a value for the field; deviation is None
                when there is only a single sample.

        """
        # Materialize the values once: they are iterated several times below,
        # which would silently exhaust a lazy map() object.
        values = [getattr(sample, field_name) for sample in samples]
        if not values or any(value is None for value in values):
            return (None, None)

        count = len(values)
        mean = math.fsum(values) / count
        deviation = None
        if count > 1:
            squared_differences = [math.pow(mean - value, 2)
                                   for value in values]
            # Bessel-corrected (N - 1) sample standard deviation.
            deviation = math.sqrt(math.fsum(squared_differences) /
                                  (count - 1))
        return mean, deviation

    @staticmethod
    def from_samples(samples):
        """Build an averaged NetperfResult from |samples|.

        Calculate a representative sample with averaged values
        and standard deviation from samples.

        @param samples list of NetperfResult objects.
        @return NetperfResult object, or None if |samples| is empty or mixes
                several test types.

        """
        if len(set(sample.test_type for sample in samples)) != 1:
            # We have either no samples or multiple test types.
            return None

        duration_seconds, duration_seconds_dev = NetperfResult._get_stats(
                samples, 'duration_seconds')
        throughput, throughput_dev = NetperfResult._get_stats(
                samples, 'throughput')
        errors, errors_dev = NetperfResult._get_stats(samples, 'errors')
        transaction_rate, transaction_rate_dev = NetperfResult._get_stats(
                samples, 'transaction_rate')
        return NetperfResult(
                samples[0].test_type,
                duration_seconds,
                duration_seconds_dev=duration_seconds_dev,
                throughput=throughput,
                throughput_dev=throughput_dev,
                errors=errors,
                errors_dev=errors_dev,
                transaction_rate=transaction_rate,
                transaction_rate_dev=transaction_rate_dev)

    @property
    def human_readable_tag(self):
        """@return string human readable test description."""
        return NetperfConfig.test_type_to_human_readable_tag(self.test_type)

    @property
    def tag(self):
        """@return string very short test description."""
        return NetperfConfig.test_type_to_tag(self.test_type)

    def __init__(self, test_type, duration_seconds, duration_seconds_dev=None,
                 throughput=None, throughput_dev=None,
                 errors=None, errors_dev=None,
                 transaction_rate=None, transaction_rate_dev=None):
        """Construct a NetperfResult.

        @param test_type string one of NetperfConfig.TEST_TYPE_*.
        @param duration_seconds float how long the test took.
        @param duration_seconds_dev float deviation of the duration, if known.
        @param throughput float test throughput in Mbps.
        @param throughput_dev float deviation of the throughput, if known.
        @param errors int number of UDP errors in test.
        @param errors_dev float deviation of the error count, if known.
        @param transaction_rate float transactions per second.
        @param transaction_rate_dev float deviation of the transaction rate,
                if known.

        """
        self.test_type = test_type
        self.duration_seconds = duration_seconds
        self.duration_seconds_dev = duration_seconds_dev
        self.throughput = throughput
        self.throughput_dev = throughput_dev
        self.errors = errors
        self.errors_dev = errors_dev
        self.transaction_rate = transaction_rate
        self.transaction_rate_dev = transaction_rate_dev
        if throughput is None and transaction_rate is None and errors is None:
            logging.error('Created a NetperfResult with no data.')

    def __repr__(self):
        fields = ['test_type=%s' % self.test_type]
        # items() works on both Python 2 and 3; sorting keeps the output
        # deterministic regardless of dict ordering.
        fields += ['%s=%0.2f' % item
                   for item in sorted(vars(self).items())
                   if item[1] is not None
                   and isinstance(item[1], numbers.Number)]
        return '%s(%s)' % (self.__class__.__name__, ', '.join(fields))

    def all_deviations_less_than_fraction(self, fraction):
        """Check that this result is "accurate" enough.

        We say that a NetperfResult is "accurate" enough when for each
        measurement X with standard deviation d(X), d(X)/X <= |fraction|.

        @param fraction float used in constraint above.
        @return True on above condition.

        """
        for measurement in ['throughput', 'errors', 'transaction_rate']:
            value = getattr(self, measurement)
            dev = getattr(self, measurement + '_dev')
            if value is None or dev is None:
                continue

            if not dev and not value:
                # 0/0 is undefined, but take this to be good for our purposes.
                continue

            if dev and not value:
                # Deviation is non-zero, but the average is 0.  Deviation
                # as a fraction of the value is undefined but in theory
                # a "very large number."
                return False

            # float() guards against integer division on Python 2.
            if float(dev) / value > fraction:
                return False

        return True

    def get_keyval(self, prefix='', suffix=''):
        """Format the measurements of this result as a keyval dictionary.

        @param prefix string prepended (with '_') to every key, if non-empty.
        @param suffix string appended (with '_') to every key, if non-empty.
        @return dict mapping measurement names to strings of the form
                'value' or 'value+-deviation'.  Measurements without a value
                are omitted.

        """
        ret = {}
        if prefix:
            prefix = prefix + '_'
        if suffix:
            suffix = '_' + suffix

        for measurement in ['throughput', 'errors', 'transaction_rate']:
            value = getattr(self, measurement)
            dev = getattr(self, measurement + '_dev')
            if dev is None:
                margin = ''
            else:
                margin = '+-%0.2f' % dev
            if value is not None:
                ret[prefix + measurement + suffix] = '%0.2f%s' % (value,
                                                                  margin)
        return ret


class NetperfAssertion(object):
    """Defines a set of expectations for netperf results."""

    # Pair of (lower, upper) limits; None means "unbounded" on that side.
    _Bound = collections.namedtuple('Bound', ['lower', 'upper'])

    def _passes(self, result, field):
        """Check a single field of |result| against its bounds.

        The value is given the benefit of its standard deviation: it passes
        if any point in [value - deviation, value + deviation] is in bounds.

        @param result NetperfResult object produced by a test.
        @param field string name of the measurement to check.
        @return True iff the field satisfies the configured bounds.

        """
        value = getattr(result, field)
        deviation = getattr(result, field + '_dev')
        bounds = getattr(self, field + '_bounds')
        if bounds[0] is None and bounds[1] is None:
            return True

        if value is None:
            # We have bounds requirements, but no value to check?
            return False

        if deviation is None:
            # Results built from a single run carry no deviation; treat the
            # measurement as exact instead of failing on None arithmetic.
            deviation = 0

        if bounds[0] is not None and bounds[0] > value + deviation:
            return False

        if bounds[1] is not None and bounds[1] < value - deviation:
            return False

        return True

    def __init__(self, duration_seconds_min=None, duration_seconds_max=None,
                 throughput_min=None, throughput_max=None,
                 error_min=None, error_max=None,
                 transaction_rate_min=None, transaction_rate_max=None):
        """Construct a NetperfAssertion.

        Leaving bounds undefined sets them to values which are permissive.

        @param duration_seconds_min float minimal test duration in seconds.
        @param duration_seconds_max float maximal test duration in seconds.
        @param throughput_min float minimal throughput in Mbps.
        @param throughput_max float maximal throughput in Mbps.
        @param error_min int minimal number of UDP frame errors.
        @param error_max int max number of UDP frame errors.
        @param transaction_rate_min float minimal number of transactions
                per second.
        @param transaction_rate_max float max number of transactions per
                second.

        """
        Bound = self._Bound
        self.duration_seconds_bounds = Bound(duration_seconds_min,
                                             duration_seconds_max)
        self.throughput_bounds = Bound(throughput_min, throughput_max)
        self.errors_bounds = Bound(error_min, error_max)
        self.transaction_rate_bounds = Bound(transaction_rate_min,
                                             transaction_rate_max)

    def passes(self, result):
        """Check that a result matches the given assertion.

        @param result NetperfResult object produced by a test.
        @return True iff all this assertion passes for the give result.

        """
        return all(self._passes(result, field)
                   for field in ['duration_seconds', 'throughput',
                                 'errors', 'transaction_rate'])

    def __repr__(self):
        fields = {'duration_seconds_min': self.duration_seconds_bounds.lower,
                  'duration_seconds_max': self.duration_seconds_bounds.upper,
                  'throughput_min': self.throughput_bounds.lower,
                  'throughput_max': self.throughput_bounds.upper,
                  'error_min': self.errors_bounds.lower,
                  'error_max': self.errors_bounds.upper,
                  'transaction_rate_min': self.transaction_rate_bounds.lower,
                  'transaction_rate_max': self.transaction_rate_bounds.upper}
        # items() works on both Python 2 and 3; sorting keeps the output
        # deterministic regardless of dict ordering.
        return '%s(%s)' % (self.__class__.__name__,
                           ', '.join(['%s=%r' % item
                                      for item in sorted(fields.items())
                                      if item[1] is not None]))


class NetperfConfig(object):
    """Defines a single netperf run."""

    DEFAULT_TEST_TIME = 10
    # Measures how many times we can connect, request a byte, and receive a
    # byte per second.
    TEST_TYPE_TCP_CRR = 'TCP_CRR'
    # MAERTS is stream backwards.  Measure bitrate of a stream from the netperf
    # server to the client.
    TEST_TYPE_TCP_MAERTS = 'TCP_MAERTS'
    # Measures how many times we can request a byte and receive a byte per
    # second.
    TEST_TYPE_TCP_RR = 'TCP_RR'
    # This is like a TCP_STREAM test except that the netperf client will use
    # a platform dependent call like sendfile() rather than the simple send()
    # call.  This can result in better performance.
    TEST_TYPE_TCP_SENDFILE = 'TCP_SENDFILE'
    # Measures throughput sending bytes from the client to the server in a
    # TCP stream.
    TEST_TYPE_TCP_STREAM = 'TCP_STREAM'
    # Measures how many times we can request a byte from the client and receive
    # a byte from the server.  If any datagram is dropped, the client or server
    # will block indefinitely.  This failure is not evident except as a low
    # transaction rate.
    TEST_TYPE_UDP_RR = 'UDP_RR'
    # Test UDP throughput sending from the client to the server.  There is no
    # flow control here, and generally sending is easier than receiving, so
    # there will be two types of throughput, both receiving and sending.
    TEST_TYPE_UDP_STREAM = 'UDP_STREAM'
    # This isn't a real test type, but we can emulate a UDP stream from the
    # server to the DUT by running the netperf server on the DUT and the
    # client on the server and then doing a UDP_STREAM test.
    TEST_TYPE_UDP_MAERTS = 'UDP_MAERTS'
    # Different kinds of tests have different output formats.
    REQUEST_RESPONSE_TESTS = [ TEST_TYPE_TCP_CRR,
                               TEST_TYPE_TCP_RR,
                               TEST_TYPE_UDP_RR ]
    TCP_STREAM_TESTS = [ TEST_TYPE_TCP_MAERTS,
                         TEST_TYPE_TCP_SENDFILE,
                         TEST_TYPE_TCP_STREAM ]
    UDP_STREAM_TESTS = [ TEST_TYPE_UDP_STREAM,
                         TEST_TYPE_UDP_MAERTS ]

    SHORT_TAGS = { TEST_TYPE_TCP_CRR: 'tcp_crr',
                   TEST_TYPE_TCP_MAERTS: 'tcp_rx',
                   TEST_TYPE_TCP_RR: 'tcp_rr',
                   TEST_TYPE_TCP_SENDFILE: 'tcp_stx',
                   TEST_TYPE_TCP_STREAM: 'tcp_tx',
                   TEST_TYPE_UDP_RR: 'udp_rr',
                   TEST_TYPE_UDP_STREAM: 'udp_tx',
                   TEST_TYPE_UDP_MAERTS: 'udp_rx' }

    READABLE_TAGS = { TEST_TYPE_TCP_CRR: 'tcp_connect_roundtrip_rate',
                      TEST_TYPE_TCP_MAERTS: 'tcp_downstream',
                      TEST_TYPE_TCP_RR: 'tcp_roundtrip_rate',
                      TEST_TYPE_TCP_SENDFILE: 'tcp_upstream_sendfile',
                      TEST_TYPE_TCP_STREAM: 'tcp_upstream',
                      TEST_TYPE_UDP_RR: 'udp_roundtrip',
                      TEST_TYPE_UDP_STREAM: 'udp_upstream',
                      TEST_TYPE_UDP_MAERTS: 'udp_downstream' }

    @staticmethod
    def _assert_is_valid_test_type(test_type):
        """Assert that |test_type| is one of TEST_TYPE_* above.

        @param test_type string test type.
        @raises error.TestFail if |test_type| is not a known test type.

        """
        if (test_type not in NetperfConfig.REQUEST_RESPONSE_TESTS and
            test_type not in NetperfConfig.TCP_STREAM_TESTS and
            test_type not in NetperfConfig.UDP_STREAM_TESTS):
            raise error.TestFail('Invalid netperf test type: %r.' % test_type)

    @staticmethod
    def test_type_to_tag(test_type):
        """Convert a test type to a concise unique tag.

        @param test_type string, one of TEST_TYPE_* above.
        @return string very short test description.

        """
        return NetperfConfig.SHORT_TAGS.get(test_type, 'unknown')

    @staticmethod
    def test_type_to_human_readable_tag(test_type):
        """Convert a test type to a unique human readable tag.

        @param test_type string, one of TEST_TYPE_* above.
        @return string human readable test description.

        """
        return NetperfConfig.READABLE_TAGS.get(test_type, 'unknown')

    @property
    def human_readable_tag(self):
        """@return string human readable test description."""
        return self.test_type_to_human_readable_tag(self.test_type)

    @property
    def netperf_test_type(self):
        """@return string test type suitable for passing to netperf."""
        # UDP_MAERTS is emulated by swapping roles and running UDP_STREAM.
        if self.test_type == self.TEST_TYPE_UDP_MAERTS:
            return self.TEST_TYPE_UDP_STREAM

        return self.test_type

    @property
    def server_serves(self):
        """False iff the server and DUT should switch roles for running netperf.

        @return True iff netserv should be run on server host.  When false
                this indicates that the DUT should run netserv and netperf
                should be run on the server against the client.

        """
        return self.test_type != self.TEST_TYPE_UDP_MAERTS

    @property
    def tag(self):
        """@return string very short test description."""
        return self.test_type_to_tag(self.test_type)

    def __init__(self, test_type, test_time=DEFAULT_TEST_TIME):
        """Construct a NetperfConfig.

        @param test_type string one of TEST_TYPE_* above.
        @param test_time int number of seconds to run the test for.
        @raises error.TestFail if |test_type| is not a known test type.

        """
        self.test_type = test_type
        self.test_time = test_time
        self._assert_is_valid_test_type(self.netperf_test_type)

    def __repr__(self):
        return '%s(test_type=%r, test_time=%r)' % (
                self.__class__.__name__,
                self.test_type,
                self.test_time)


class NetperfRunner(object):
    """Delegate to run netperf on a client/server pair."""

    NETPERF_DATA_PORT = 12866
    NETPERF_PORT = 12865
    NETSERV_STARTUP_WAIT_TIME = 3
    NETPERF_COMMAND_TIMEOUT_MARGIN = 60

    def __init__(self, client_proxy, server_proxy, config):
        """Construct a NetperfRunner.

        @param client_proxy proxy for the client (DUT); must expose host,
                wifi_ip, firewall_open() and firewall_cleanup().
        @param server_proxy proxy for the server; must expose host and
                wifi_ip.
        @param config NetperfConfig describing the test to run.

        """
        self._client_proxy = client_proxy
        self._server_proxy = server_proxy
        if config.server_serves:
            self._server_host = server_proxy.host
            self._client_host = client_proxy.host
            self._target_ip = server_proxy.wifi_ip
        else:
            # Roles are swapped: netserver runs on the DUT and netperf runs
            # on the server, targeting the DUT.
            self._server_host = client_proxy.host
            self._client_host = server_proxy.host
            self._target_ip = client_proxy.wifi_ip

        # Assume minijail0 is on ${PATH}, but raise exception if it's not
        # available on both server and client.
        self._minijail = 'minijail0'
        path_utils.must_be_installed(self._minijail, host=self._server_host)
        path_utils.must_be_installed(self._minijail, host=self._client_host)
        # Bind mount a tmpfs over /tmp, since netserver hard-codes the /tmp
        # path. netserver's log files aren't useful anyway.
        self._minijail = ("%s -v -k 'tmpfs,/tmp,tmpfs,"
                          "MS_NODEV|MS_NOEXEC|MS_NOSUID,mode=755,size=10M'"
                          % self._minijail)

        self._command_netserv = path_utils.must_be_installed(
                'netserver', host=self._server_host)
        self._command_netperf = path_utils.must_be_installed(
                'netperf', host=self._client_host)
        self._config = config

    def __enter__(self):
        self._restart_netserv()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._client_proxy.firewall_cleanup()
        self._kill_netserv()

    def _kill_netserv(self):
        """Kills any existing netserv process on the serving host."""
        # ignore_status: pkill exits non-zero when nothing matched, which is
        # fine here.
        self._server_host.run('pkill %s' %
                              os.path.basename(self._command_netserv),
                              ignore_status=True)

    def _restart_netserv(self):
        """Kill any running netserver, start a fresh one and wait for it."""
        logging.info('Starting netserver...')
        self._kill_netserv()
        self._server_host.run('%s %s -p %d' % (
                self._minijail, self._command_netserv, self.NETPERF_PORT))
        startup_time = time.time()
        self._client_proxy.firewall_open('tcp', self._server_proxy.wifi_ip)
        self._client_proxy.firewall_open('udp', self._server_proxy.wifi_ip)
        # Wait for the netserv to come up.
        while time.time() - startup_time < self.NETSERV_STARTUP_WAIT_TIME:
            time.sleep(0.1)

    def run(self, ignore_failures=False, retry_count=3):
        """Run netperf and take a performance measurement.

        @param ignore_failures bool True iff netperf runs that fail should be
                ignored.  If this happens, run will return a None value rather
                than a NetperfResult.
        @param retry_count int number of times to retry the netperf command if
                it fails due to an internal timeout within netperf.
        @return NetperfResult summarizing a netperf run.
        @raises error.TestFail if no result was obtained and failures are not
                ignored.
        @raises error.CmdError if the final attempt exited non-zero and
                failures are not ignored.

        """
        netperf = '%s -H %s -p %s -t %s -l %d -- -P 0,%d' % (
                self._command_netperf,
                self._target_ip,
                self.NETPERF_PORT,
                self._config.netperf_test_type,
                self._config.test_time,
                self.NETPERF_DATA_PORT)
        logging.debug('Running netperf client.')
        logging.info('Running netperf for %d seconds.', self._config.test_time)
        timeout = self._config.test_time + self.NETPERF_COMMAND_TIMEOUT_MARGIN
        # Pre-initialize so that a non-positive retry_count is reported as
        # "no results" below rather than as a NameError.
        result = None
        start_time = time.time()
        for _ in range(retry_count):
            start_time = time.time()
            result = self._client_host.run(netperf, ignore_status=True,
                                           ignore_timeout=ignore_failures,
                                           timeout=timeout)
            if not result:
                logging.info('Retrying netperf after empty result.')
                continue

            # Exit retry loop on success.
            if not result.exit_status:
                break

            # Only retry for known retryable conditions.
            if 'Interrupted system call' in result.stderr:
                logging.info('Retrying netperf after internal timeout error.')
                continue

            if 'establish the control connection' in result.stdout:
                logging.info('Restarting netserv after client failed connect.')
                self._restart_netserv()
                continue

            # We are in an unhandled error case.
            logging.info('Retrying netperf after an unknown error.')

        if ignore_failures and (result is None or result.exit_status):
            return None

        if result is None:
            raise error.TestFail("No results; cmd: %s" % netperf)

        if result.exit_status:
            raise error.CmdError(netperf, result,
                                 "Command returned non-zero exit status")

        # Duration covers only the final (successful) attempt.
        duration = time.time() - start_time
        return NetperfResult.from_netperf_results(
                self._config.test_type, result.stdout, duration)