1#!/usr/bin/env python3 2# 3# Copyright 2016 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import json 18import logging 19import math 20import os 21import shlex 22import subprocess 23import threading 24import time 25 26from acts import context 27from acts import logger as acts_logger 28from acts import utils 29from acts.controllers.android_device import AndroidDevice 30from acts.controllers.utils_lib.ssh import connection 31from acts.controllers.utils_lib.ssh import settings 32from acts.event import event_bus 33from acts.event.decorators import subscribe_static 34from acts.event.event import TestClassBeginEvent 35from acts.event.event import TestClassEndEvent 36from acts.libs.proc import job 37 38MOBLY_CONTROLLER_CONFIG_NAME = 'IPerfServer' 39ACTS_CONTROLLER_REFERENCE_NAME = 'iperf_servers' 40KILOBITS = 1024 41MEGABITS = KILOBITS * 1024 42GIGABITS = MEGABITS * 1024 43BITS_IN_BYTE = 8 44 45 46def create(configs): 47 """ Factory method for iperf servers. 48 49 The function creates iperf servers based on at least one config. 50 If configs only specify a port number, a regular local IPerfServer object 51 will be created. If configs contains ssh settings or and AndroidDevice, 52 remote iperf servers will be started on those devices 53 54 Args: 55 configs: config parameters for the iperf server 56 """ 57 results = [] 58 for c in configs: 59 if type(c) in (str, int) and str(c).isdigit(): 60 results.append(IPerfServer(int(c))) 61 elif type(c) is dict and 'AndroidDevice' in c and 'port' in c: 62 results.append(IPerfServerOverAdb(c['AndroidDevice'], c['port'])) 63 elif type(c) is dict and 'ssh_config' in c and 'port' in c: 64 results.append( 65 IPerfServerOverSsh(c['ssh_config'], 66 c['port'], 67 test_interface=c.get('test_interface'), 68 use_killall=c.get('use_killall'))) 69 else: 70 raise ValueError( 71 'Config entry %s in %s is not a valid IPerfServer ' 72 'config.' % (repr(c), configs)) 73 return results 74 75 76def get_info(iperf_servers): 77 """Placeholder for info about iperf servers 78 79 Returns: 80 None 81 """ 82 return None 83 84 85def destroy(iperf_server_list): 86 for iperf_server in iperf_server_list: 87 try: 88 iperf_server.stop() 89 except Exception: 90 logging.exception('Unable to properly clean up %s.' % iperf_server) 91 92 93class IPerfResult(object): 94 95 def __init__(self, result_path, reporting_speed_units='Mbytes'): 96 """Loads iperf result from file. 97 98 Loads iperf result from JSON formatted server log. File can be accessed 99 before or after server is stopped. Note that only the first JSON object 100 will be loaded and this funtion is not intended to be used with files 101 containing multiple iperf client runs. 102 """ 103 # if result_path isn't a path, treat it as JSON 104 self.reporting_speed_units = reporting_speed_units 105 if not os.path.exists(result_path): 106 self.result = json.loads(result_path) 107 else: 108 try: 109 with open(result_path, 'r') as f: 110 iperf_output = f.readlines() 111 if '}\n' in iperf_output: 112 iperf_output = iperf_output[:iperf_output.index('}\n' 113 ) + 1] 114 iperf_string = ''.join(iperf_output) 115 iperf_string = iperf_string.replace('nan', '0') 116 self.result = json.loads(iperf_string) 117 except ValueError: 118 with open(result_path, 'r') as f: 119 # Possibly a result from interrupted iperf run, 120 # skip first line and try again. 121 lines = f.readlines()[1:] 122 self.result = json.loads(''.join(lines)) 123 124 def _has_data(self): 125 """Checks if the iperf result has valid throughput data. 126 127 Returns: 128 True if the result contains throughput data. False otherwise. 129 """ 130 return ('end' in self.result) and ('sum_received' in self.result['end'] 131 or 'sum' in self.result['end']) 132 133 def _get_reporting_speed(self, network_speed_in_bits_per_second): 134 """Sets the units for the network speed reporting based on how the 135 object was initiated. Defaults to Megabytes per second. Currently 136 supported, bits per second (bits), kilobits per second (kbits), megabits 137 per second (mbits), gigabits per second (gbits), bytes per second 138 (bytes), kilobits per second (kbytes), megabits per second (mbytes), 139 gigabytes per second (gbytes). 140 141 Args: 142 network_speed_in_bits_per_second: The network speed from iperf in 143 bits per second. 144 145 Returns: 146 The value of the throughput in the appropriate units. 147 """ 148 speed_divisor = 1 149 if self.reporting_speed_units[1:].lower() == 'bytes': 150 speed_divisor = speed_divisor * BITS_IN_BYTE 151 if self.reporting_speed_units[0:1].lower() == 'k': 152 speed_divisor = speed_divisor * KILOBITS 153 if self.reporting_speed_units[0:1].lower() == 'm': 154 speed_divisor = speed_divisor * MEGABITS 155 if self.reporting_speed_units[0:1].lower() == 'g': 156 speed_divisor = speed_divisor * GIGABITS 157 return network_speed_in_bits_per_second / speed_divisor 158 159 def get_json(self): 160 """Returns the raw json output from iPerf.""" 161 return self.result 162 163 @property 164 def error(self): 165 return self.result.get('error', None) 166 167 @property 168 def avg_rate(self): 169 """Average UDP rate in MB/s over the entire run. 170 171 This is the average UDP rate observed at the terminal the iperf result 172 is pulled from. According to iperf3 documentation this is calculated 173 based on bytes sent and thus is not a good representation of the 174 quality of the link. If the result is not from a success run, this 175 property is None. 176 """ 177 if not self._has_data() or 'sum' not in self.result['end']: 178 return None 179 bps = self.result['end']['sum']['bits_per_second'] 180 return self._get_reporting_speed(bps) 181 182 @property 183 def avg_receive_rate(self): 184 """Average receiving rate in MB/s over the entire run. 185 186 This data may not exist if iperf was interrupted. If the result is not 187 from a success run, this property is None. 188 """ 189 if not self._has_data() or 'sum_received' not in self.result['end']: 190 return None 191 bps = self.result['end']['sum_received']['bits_per_second'] 192 return self._get_reporting_speed(bps) 193 194 @property 195 def avg_send_rate(self): 196 """Average sending rate in MB/s over the entire run. 197 198 This data may not exist if iperf was interrupted. If the result is not 199 from a success run, this property is None. 200 """ 201 if not self._has_data() or 'sum_sent' not in self.result['end']: 202 return None 203 bps = self.result['end']['sum_sent']['bits_per_second'] 204 return self._get_reporting_speed(bps) 205 206 @property 207 def instantaneous_rates(self): 208 """Instantaneous received rate in MB/s over entire run. 209 210 This data may not exist if iperf was interrupted. If the result is not 211 from a success run, this property is None. 212 """ 213 if not self._has_data(): 214 return None 215 intervals = [ 216 self._get_reporting_speed(interval['sum']['bits_per_second']) 217 for interval in self.result['intervals'] 218 ] 219 return intervals 220 221 @property 222 def std_deviation(self): 223 """Standard deviation of rates in MB/s over entire run. 224 225 This data may not exist if iperf was interrupted. If the result is not 226 from a success run, this property is None. 227 """ 228 return self.get_std_deviation(0) 229 230 def get_std_deviation(self, iperf_ignored_interval): 231 """Standard deviation of rates in MB/s over entire run. 232 233 This data may not exist if iperf was interrupted. If the result is not 234 from a success run, this property is None. A configurable number of 235 beginning (and the single last) intervals are ignored in the 236 calculation as they are inaccurate (e.g. the last is from a very small 237 interval) 238 239 Args: 240 iperf_ignored_interval: number of iperf interval to ignored in 241 calculating standard deviation 242 243 Returns: 244 The standard deviation. 245 """ 246 if not self._has_data(): 247 return None 248 instantaneous_rates = self.instantaneous_rates[ 249 iperf_ignored_interval:-1] 250 avg_rate = math.fsum(instantaneous_rates) / len(instantaneous_rates) 251 sqd_deviations = ([(rate - avg_rate)**2 252 for rate in instantaneous_rates]) 253 std_dev = math.sqrt( 254 math.fsum(sqd_deviations) / (len(sqd_deviations) - 1)) 255 return std_dev 256 257 258class IPerfServerBase(object): 259 # Keeps track of the number of IPerfServer logs to prevent file name 260 # collisions. 261 __log_file_counter = 0 262 263 __log_file_lock = threading.Lock() 264 265 def __init__(self, port): 266 self._port = port 267 # TODO(markdr): We shouldn't be storing the log files in an array like 268 # this. Nobody should be reading this property either. Instead, the 269 # IPerfResult should be returned in stop() with all the necessary info. 270 # See aosp/1012824 for a WIP implementation. 271 self.log_files = [] 272 273 @property 274 def port(self): 275 raise NotImplementedError('port must be specified.') 276 277 @property 278 def started(self): 279 raise NotImplementedError('started must be specified.') 280 281 def start(self, extra_args='', tag=''): 282 """Starts an iperf3 server. 283 284 Args: 285 extra_args: A string representing extra arguments to start iperf 286 server with. 287 tag: Appended to log file name to identify logs from different 288 iperf runs. 289 """ 290 raise NotImplementedError('start() must be specified.') 291 292 def stop(self): 293 """Stops the iperf server. 294 295 Returns: 296 The name of the log file generated from the terminated session. 297 """ 298 raise NotImplementedError('stop() must be specified.') 299 300 def _get_full_file_path(self, tag=None): 301 """Returns the full file path for the IPerfServer log file. 302 303 Note: If the directory for the file path does not exist, it will be 304 created. 305 306 Args: 307 tag: The tag passed in to the server run. 308 """ 309 out_dir = self.log_path 310 311 with IPerfServerBase.__log_file_lock: 312 tags = [tag, IPerfServerBase.__log_file_counter] 313 out_file_name = 'IPerfServer,%s.log' % (','.join( 314 [str(x) for x in tags if x != '' and x is not None])) 315 IPerfServerBase.__log_file_counter += 1 316 317 file_path = os.path.join(out_dir, out_file_name) 318 self.log_files.append(file_path) 319 return file_path 320 321 @property 322 def log_path(self): 323 current_context = context.get_current_context() 324 full_out_dir = os.path.join(current_context.get_full_output_path(), 325 'IPerfServer%s' % self.port) 326 327 # Ensure the directory exists. 328 os.makedirs(full_out_dir, exist_ok=True) 329 330 return full_out_dir 331 332 333def _get_port_from_ss_output(ss_output, pid): 334 pid = str(pid) 335 lines = ss_output.split('\n') 336 for line in lines: 337 if pid in line: 338 # Expected format: 339 # tcp LISTEN 0 5 *:<PORT> *:* users:(("cmd",pid=<PID>,fd=3)) 340 return line.split()[4].split(':')[-1] 341 else: 342 raise ProcessLookupError('Could not find started iperf3 process.') 343 344 345class IPerfServer(IPerfServerBase): 346 """Class that handles iperf server commands on localhost.""" 347 348 def __init__(self, port=5201): 349 super().__init__(port) 350 self._hinted_port = port 351 self._current_log_file = None 352 self._iperf_process = None 353 self._last_opened_file = None 354 355 @property 356 def port(self): 357 return self._port 358 359 @property 360 def started(self): 361 return self._iperf_process is not None 362 363 def start(self, extra_args='', tag=''): 364 """Starts iperf server on local machine. 365 366 Args: 367 extra_args: A string representing extra arguments to start iperf 368 server with. 369 tag: Appended to log file name to identify logs from different 370 iperf runs. 371 """ 372 if self._iperf_process is not None: 373 return 374 375 self._current_log_file = self._get_full_file_path(tag) 376 377 # Run an iperf3 server on the hinted port with JSON output. 378 command = ['iperf3', '-s', '-p', str(self._hinted_port), '-J'] 379 380 command.extend(shlex.split(extra_args)) 381 382 if self._last_opened_file: 383 self._last_opened_file.close() 384 self._last_opened_file = open(self._current_log_file, 'w') 385 self._iperf_process = subprocess.Popen(command, 386 stdout=self._last_opened_file, 387 stderr=subprocess.DEVNULL) 388 for attempts_left in reversed(range(3)): 389 try: 390 self._port = int( 391 _get_port_from_ss_output( 392 job.run('ss -l -p -n | grep iperf').stdout, 393 self._iperf_process.pid)) 394 break 395 except ProcessLookupError: 396 if attempts_left == 0: 397 raise 398 logging.debug('iperf3 process not started yet.') 399 time.sleep(.01) 400 401 def stop(self): 402 """Stops the iperf server. 403 404 Returns: 405 The name of the log file generated from the terminated session. 406 """ 407 if self._iperf_process is None: 408 return 409 410 if self._last_opened_file: 411 self._last_opened_file.close() 412 self._last_opened_file = None 413 414 self._iperf_process.terminate() 415 self._iperf_process = None 416 417 return self._current_log_file 418 419 def __del__(self): 420 self.stop() 421 422 423class IPerfServerOverSsh(IPerfServerBase): 424 """Class that handles iperf3 operations on remote machines.""" 425 426 def __init__(self, 427 ssh_config, 428 port, 429 test_interface=None, 430 use_killall=False): 431 super().__init__(port) 432 self.ssh_settings = settings.from_config(ssh_config) 433 self.log = acts_logger.create_tagged_trace_logger( 434 f'IPerfServer | {self.ssh_settings.hostname}') 435 self._ssh_session = None 436 self.start_ssh() 437 438 self._iperf_pid = None 439 self._current_tag = None 440 self.hostname = self.ssh_settings.hostname 441 self._use_killall = str(use_killall).lower() == 'true' 442 try: 443 # A test interface can only be found if an ip address is specified. 444 # A fully qualified hostname will return None for the 445 # test_interface. 446 self.test_interface = test_interface if test_interface else utils.get_interface_based_on_ip( 447 self._ssh_session, self.hostname) 448 except Exception as e: 449 self.log.warning(e) 450 self.test_interface = None 451 452 @property 453 def port(self): 454 return self._port 455 456 @property 457 def started(self): 458 return self._iperf_pid is not None 459 460 def _get_remote_log_path(self): 461 return '/tmp/iperf_server_port%s.log' % self.port 462 463 def get_interface_ip_addresses(self, interface): 464 """Gets all of the ip addresses, ipv4 and ipv6, associated with a 465 particular interface name. 466 467 Args: 468 interface: The interface name on the device, ie eth0 469 470 Returns: 471 A list of dictionaries of the various IP addresses. See 472 utils.get_interface_ip_addresses. 473 """ 474 if not self._ssh_session: 475 self.start_ssh() 476 477 return utils.get_interface_ip_addresses(self._ssh_session, interface) 478 479 def renew_test_interface_ip_address(self): 480 """Renews the test interface's IPv4 address. 481 482 Necessary for changing DHCP scopes during a test. 483 """ 484 if not self._ssh_session: 485 self.start_ssh() 486 utils.renew_linux_ip_address(self._ssh_session, self.test_interface) 487 488 def get_addr(self, addr_type='ipv4_private', timeout_sec=None): 489 """Wait until a type of IP address on the test interface is available 490 then return it. 491 """ 492 if not self._ssh_session: 493 self.start_ssh() 494 return utils.get_addr(self._ssh_session, self.test_interface, 495 addr_type, timeout_sec) 496 497 def _cleanup_iperf_port(self): 498 """Checks and kills zombie iperf servers occupying intended port.""" 499 iperf_check_cmd = ('netstat -tulpn | grep LISTEN | grep iperf3' 500 ' | grep :{}').format(self.port) 501 iperf_check = self._ssh_session.run(iperf_check_cmd, 502 ignore_status=True) 503 iperf_check = iperf_check.stdout 504 if iperf_check: 505 logging.debug('Killing zombie server on port {}'.format(self.port)) 506 iperf_pid = iperf_check.split(' ')[-1].split('/')[0] 507 self._ssh_session.run('kill -9 {}'.format(str(iperf_pid))) 508 509 def start(self, extra_args='', tag='', iperf_binary=None): 510 """Starts iperf server on specified machine and port. 511 512 Args: 513 extra_args: A string representing extra arguments to start iperf 514 server with. 515 tag: Appended to log file name to identify logs from different 516 iperf runs. 517 iperf_binary: Location of iperf3 binary. If none, it is assumed the 518 the binary is in the path. 519 """ 520 if self.started: 521 return 522 523 if not self._ssh_session: 524 self.start_ssh() 525 self._cleanup_iperf_port() 526 if not iperf_binary: 527 logging.debug('No iperf3 binary specified. ' 528 'Assuming iperf3 is in the path.') 529 iperf_binary = 'iperf3' 530 else: 531 logging.debug('Using iperf3 binary located at %s' % iperf_binary) 532 iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port) 533 534 cmd = '{cmd} {extra_flags} > {log_file}'.format( 535 cmd=iperf_command, 536 extra_flags=extra_args, 537 log_file=self._get_remote_log_path()) 538 539 job_result = self._ssh_session.run_async(cmd) 540 self._iperf_pid = job_result.stdout 541 self._current_tag = tag 542 543 def stop(self): 544 """Stops the iperf server. 545 546 Returns: 547 The name of the log file generated from the terminated session. 548 """ 549 if not self.started: 550 return 551 552 if self._use_killall: 553 self._ssh_session.run('killall iperf3', ignore_status=True) 554 else: 555 self._ssh_session.run_async('kill -9 {}'.format( 556 str(self._iperf_pid))) 557 558 iperf_result = self._ssh_session.run('cat {}'.format( 559 self._get_remote_log_path())) 560 561 log_file = self._get_full_file_path(self._current_tag) 562 with open(log_file, 'w') as f: 563 f.write(iperf_result.stdout) 564 565 self._ssh_session.run_async('rm {}'.format( 566 self._get_remote_log_path())) 567 self._iperf_pid = None 568 return log_file 569 570 def start_ssh(self): 571 """Starts an ssh session to the iperf server.""" 572 if not self._ssh_session: 573 self._ssh_session = connection.SshConnection(self.ssh_settings) 574 575 def close_ssh(self): 576 """Closes the ssh session to the iperf server, if one exists, preventing 577 connection reset errors when rebooting server device. 578 """ 579 if self.started: 580 self.stop() 581 if self._ssh_session: 582 self._ssh_session.close() 583 self._ssh_session = None 584 585 586# TODO(markdr): Remove this after automagic controller creation has been 587# removed. 588class _AndroidDeviceBridge(object): 589 """A helper class for connecting serial numbers to AndroidDevices.""" 590 591 _test_class = None 592 593 @staticmethod 594 @subscribe_static(TestClassBeginEvent) 595 def on_test_begin(event): 596 _AndroidDeviceBridge._test_class = event.test_class 597 598 @staticmethod 599 @subscribe_static(TestClassEndEvent) 600 def on_test_end(_): 601 _AndroidDeviceBridge._test_class = None 602 603 @staticmethod 604 def android_devices(): 605 """A dict of serial -> AndroidDevice, where AndroidDevice is a device 606 found in the current TestClass's controllers. 607 """ 608 if not _AndroidDeviceBridge._test_class: 609 return {} 610 return { 611 device.serial: device 612 for device in _AndroidDeviceBridge._test_class.android_devices 613 } 614 615 616event_bus.register_subscription( 617 _AndroidDeviceBridge.on_test_begin.subscription) 618event_bus.register_subscription(_AndroidDeviceBridge.on_test_end.subscription) 619 620 621class IPerfServerOverAdb(IPerfServerBase): 622 """Class that handles iperf3 operations over ADB devices.""" 623 624 def __init__(self, android_device_or_serial, port): 625 """Creates a new IPerfServerOverAdb object. 626 627 Args: 628 android_device_or_serial: Either an AndroidDevice object, or the 629 serial that corresponds to the AndroidDevice. Note that the 630 serial must be present in an AndroidDevice entry in the ACTS 631 config. 632 port: The port number to open the iperf server on. 633 """ 634 super().__init__(port) 635 self._android_device_or_serial = android_device_or_serial 636 637 self._iperf_process = None 638 self._current_tag = '' 639 640 @property 641 def port(self): 642 return self._port 643 644 @property 645 def started(self): 646 return self._iperf_process is not None 647 648 @property 649 def _android_device(self): 650 if isinstance(self._android_device_or_serial, AndroidDevice): 651 return self._android_device_or_serial 652 else: 653 return _AndroidDeviceBridge.android_devices()[ 654 self._android_device_or_serial] 655 656 def _get_device_log_path(self): 657 return '~/data/iperf_server_port%s.log' % self.port 658 659 def start(self, extra_args='', tag='', iperf_binary=None): 660 """Starts iperf server on an ADB device. 661 662 Args: 663 extra_args: A string representing extra arguments to start iperf 664 server with. 665 tag: Appended to log file name to identify logs from different 666 iperf runs. 667 iperf_binary: Location of iperf3 binary. If none, it is assumed the 668 the binary is in the path. 669 """ 670 if self._iperf_process is not None: 671 return 672 673 if not iperf_binary: 674 logging.debug('No iperf3 binary specified. ' 675 'Assuming iperf3 is in the path.') 676 iperf_binary = 'iperf3' 677 else: 678 logging.debug('Using iperf3 binary located at %s' % iperf_binary) 679 iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port) 680 681 self._iperf_process = self._android_device.adb.shell_nb( 682 '{cmd} {extra_flags} > {log_file}'.format( 683 cmd=iperf_command, 684 extra_flags=extra_args, 685 log_file=self._get_device_log_path())) 686 687 self._iperf_process_adb_pid = '' 688 while len(self._iperf_process_adb_pid) == 0: 689 self._iperf_process_adb_pid = self._android_device.adb.shell( 690 'pgrep iperf3 -n') 691 692 self._current_tag = tag 693 694 def stop(self): 695 """Stops the iperf server. 696 697 Returns: 698 The name of the log file generated from the terminated session. 699 """ 700 if self._iperf_process is None: 701 return 702 703 job.run('kill -9 {}'.format(self._iperf_process.pid)) 704 705 # TODO(markdr): update with definitive kill method 706 while True: 707 iperf_process_list = self._android_device.adb.shell('pgrep iperf3') 708 if iperf_process_list.find(self._iperf_process_adb_pid) == -1: 709 break 710 else: 711 self._android_device.adb.shell("kill -9 {}".format( 712 self._iperf_process_adb_pid)) 713 714 iperf_result = self._android_device.adb.shell('cat {}'.format( 715 self._get_device_log_path())) 716 717 log_file = self._get_full_file_path(self._current_tag) 718 with open(log_file, 'w') as f: 719 f.write(iperf_result) 720 721 self._android_device.adb.shell('rm {}'.format( 722 self._get_device_log_path())) 723 724 self._iperf_process = None 725 return log_file 726