1#!/usr/bin/env python3 2# 3# Copyright 2016 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import json 18import logging 19import math 20import os 21import shlex 22import subprocess 23import threading 24import time 25 26from acts import context 27from acts import utils 28from acts.controllers.android_device import AndroidDevice 29from acts.controllers.utils_lib.ssh import connection 30from acts.controllers.utils_lib.ssh import settings 31from acts.event import event_bus 32from acts.event.decorators import subscribe_static 33from acts.event.event import TestClassBeginEvent 34from acts.event.event import TestClassEndEvent 35from acts.libs.proc import job 36 37MOBLY_CONTROLLER_CONFIG_NAME = 'IPerfServer' 38ACTS_CONTROLLER_REFERENCE_NAME = 'iperf_servers' 39KILOBITS = 1024 40MEGABITS = KILOBITS * 1024 41GIGABITS = MEGABITS * 1024 42BITS_IN_BYTE = 8 43 44 45def create(configs): 46 """ Factory method for iperf servers. 47 48 The function creates iperf servers based on at least one config. 49 If configs only specify a port number, a regular local IPerfServer object 50 will be created. If configs contains ssh settings or and AndroidDevice, 51 remote iperf servers will be started on those devices 52 53 Args: 54 configs: config parameters for the iperf server 55 """ 56 results = [] 57 for c in configs: 58 if type(c) in (str, int) and str(c).isdigit(): 59 results.append(IPerfServer(int(c))) 60 elif type(c) is dict and 'AndroidDevice' in c and 'port' in c: 61 results.append(IPerfServerOverAdb(c['AndroidDevice'], c['port'])) 62 elif type(c) is dict and 'ssh_config' in c and 'port' in c: 63 results.append( 64 IPerfServerOverSsh(c['ssh_config'], 65 c['port'], 66 test_interface=c.get('test_interface'), 67 use_killall=c.get('use_killall'))) 68 else: 69 raise ValueError( 70 'Config entry %s in %s is not a valid IPerfServer ' 71 'config.' % (repr(c), configs)) 72 return results 73 74 75def get_info(iperf_servers): 76 """Placeholder for info about iperf servers 77 78 Returns: 79 None 80 """ 81 return None 82 83 84def destroy(iperf_server_list): 85 for iperf_server in iperf_server_list: 86 try: 87 iperf_server.stop() 88 except Exception: 89 logging.exception('Unable to properly clean up %s.' % iperf_server) 90 91 92class IPerfResult(object): 93 def __init__(self, result_path, reporting_speed_units='Mbytes'): 94 """Loads iperf result from file. 95 96 Loads iperf result from JSON formatted server log. File can be accessed 97 before or after server is stopped. Note that only the first JSON object 98 will be loaded and this funtion is not intended to be used with files 99 containing multiple iperf client runs. 100 """ 101 # if result_path isn't a path, treat it as JSON 102 self.reporting_speed_units = reporting_speed_units 103 if not os.path.exists(result_path): 104 self.result = json.loads(result_path) 105 else: 106 try: 107 with open(result_path, 'r') as f: 108 iperf_output = f.readlines() 109 if '}\n' in iperf_output: 110 iperf_output = iperf_output[:iperf_output.index('}\n' 111 ) + 1] 112 iperf_string = ''.join(iperf_output) 113 iperf_string = iperf_string.replace('nan', '0') 114 self.result = json.loads(iperf_string) 115 except ValueError: 116 with open(result_path, 'r') as f: 117 # Possibly a result from interrupted iperf run, 118 # skip first line and try again. 119 lines = f.readlines()[1:] 120 self.result = json.loads(''.join(lines)) 121 122 def _has_data(self): 123 """Checks if the iperf result has valid throughput data. 124 125 Returns: 126 True if the result contains throughput data. False otherwise. 127 """ 128 return ('end' in self.result) and ('sum_received' in self.result['end'] 129 or 'sum' in self.result['end']) 130 131 def _get_reporting_speed(self, network_speed_in_bits_per_second): 132 """Sets the units for the network speed reporting based on how the 133 object was initiated. Defaults to Megabytes per second. Currently 134 supported, bits per second (bits), kilobits per second (kbits), megabits 135 per second (mbits), gigabits per second (gbits), bytes per second 136 (bytes), kilobits per second (kbytes), megabits per second (mbytes), 137 gigabytes per second (gbytes). 138 139 Args: 140 network_speed_in_bits_per_second: The network speed from iperf in 141 bits per second. 142 143 Returns: 144 The value of the throughput in the appropriate units. 145 """ 146 speed_divisor = 1 147 if self.reporting_speed_units[1:].lower() == 'bytes': 148 speed_divisor = speed_divisor * BITS_IN_BYTE 149 if self.reporting_speed_units[0:1].lower() == 'k': 150 speed_divisor = speed_divisor * KILOBITS 151 if self.reporting_speed_units[0:1].lower() == 'm': 152 speed_divisor = speed_divisor * MEGABITS 153 if self.reporting_speed_units[0:1].lower() == 'g': 154 speed_divisor = speed_divisor * GIGABITS 155 return network_speed_in_bits_per_second / speed_divisor 156 157 def get_json(self): 158 """Returns the raw json output from iPerf.""" 159 return self.result 160 161 @property 162 def error(self): 163 return self.result.get('error', None) 164 165 @property 166 def avg_rate(self): 167 """Average UDP rate in MB/s over the entire run. 168 169 This is the average UDP rate observed at the terminal the iperf result 170 is pulled from. According to iperf3 documentation this is calculated 171 based on bytes sent and thus is not a good representation of the 172 quality of the link. If the result is not from a success run, this 173 property is None. 174 """ 175 if not self._has_data() or 'sum' not in self.result['end']: 176 return None 177 bps = self.result['end']['sum']['bits_per_second'] 178 return self._get_reporting_speed(bps) 179 180 @property 181 def avg_receive_rate(self): 182 """Average receiving rate in MB/s over the entire run. 183 184 This data may not exist if iperf was interrupted. If the result is not 185 from a success run, this property is None. 186 """ 187 if not self._has_data() or 'sum_received' not in self.result['end']: 188 return None 189 bps = self.result['end']['sum_received']['bits_per_second'] 190 return self._get_reporting_speed(bps) 191 192 @property 193 def avg_send_rate(self): 194 """Average sending rate in MB/s over the entire run. 195 196 This data may not exist if iperf was interrupted. If the result is not 197 from a success run, this property is None. 198 """ 199 if not self._has_data() or 'sum_sent' not in self.result['end']: 200 return None 201 bps = self.result['end']['sum_sent']['bits_per_second'] 202 return self._get_reporting_speed(bps) 203 204 @property 205 def instantaneous_rates(self): 206 """Instantaneous received rate in MB/s over entire run. 207 208 This data may not exist if iperf was interrupted. If the result is not 209 from a success run, this property is None. 210 """ 211 if not self._has_data(): 212 return None 213 intervals = [ 214 self._get_reporting_speed(interval['sum']['bits_per_second']) 215 for interval in self.result['intervals'] 216 ] 217 return intervals 218 219 @property 220 def std_deviation(self): 221 """Standard deviation of rates in MB/s over entire run. 222 223 This data may not exist if iperf was interrupted. If the result is not 224 from a success run, this property is None. 225 """ 226 return self.get_std_deviation(0) 227 228 def get_std_deviation(self, iperf_ignored_interval): 229 """Standard deviation of rates in MB/s over entire run. 230 231 This data may not exist if iperf was interrupted. If the result is not 232 from a success run, this property is None. A configurable number of 233 beginning (and the single last) intervals are ignored in the 234 calculation as they are inaccurate (e.g. the last is from a very small 235 interval) 236 237 Args: 238 iperf_ignored_interval: number of iperf interval to ignored in 239 calculating standard deviation 240 241 Returns: 242 The standard deviation. 243 """ 244 if not self._has_data(): 245 return None 246 instantaneous_rates = self.instantaneous_rates[ 247 iperf_ignored_interval:-1] 248 avg_rate = math.fsum(instantaneous_rates) / len(instantaneous_rates) 249 sqd_deviations = ([(rate - avg_rate)**2 250 for rate in instantaneous_rates]) 251 std_dev = math.sqrt( 252 math.fsum(sqd_deviations) / (len(sqd_deviations) - 1)) 253 return std_dev 254 255 256class IPerfServerBase(object): 257 # Keeps track of the number of IPerfServer logs to prevent file name 258 # collisions. 259 __log_file_counter = 0 260 261 __log_file_lock = threading.Lock() 262 263 def __init__(self, port): 264 self._port = port 265 # TODO(markdr): We shouldn't be storing the log files in an array like 266 # this. Nobody should be reading this property either. Instead, the 267 # IPerfResult should be returned in stop() with all the necessary info. 268 # See aosp/1012824 for a WIP implementation. 269 self.log_files = [] 270 271 @property 272 def port(self): 273 raise NotImplementedError('port must be specified.') 274 275 @property 276 def started(self): 277 raise NotImplementedError('started must be specified.') 278 279 def start(self, extra_args='', tag=''): 280 """Starts an iperf3 server. 281 282 Args: 283 extra_args: A string representing extra arguments to start iperf 284 server with. 285 tag: Appended to log file name to identify logs from different 286 iperf runs. 287 """ 288 raise NotImplementedError('start() must be specified.') 289 290 def stop(self): 291 """Stops the iperf server. 292 293 Returns: 294 The name of the log file generated from the terminated session. 295 """ 296 raise NotImplementedError('stop() must be specified.') 297 298 def _get_full_file_path(self, tag=None): 299 """Returns the full file path for the IPerfServer log file. 300 301 Note: If the directory for the file path does not exist, it will be 302 created. 303 304 Args: 305 tag: The tag passed in to the server run. 306 """ 307 out_dir = self.log_path 308 309 with IPerfServerBase.__log_file_lock: 310 tags = [tag, IPerfServerBase.__log_file_counter] 311 out_file_name = 'IPerfServer,%s.log' % (','.join( 312 [str(x) for x in tags if x != '' and x is not None])) 313 IPerfServerBase.__log_file_counter += 1 314 315 file_path = os.path.join(out_dir, out_file_name) 316 self.log_files.append(file_path) 317 return file_path 318 319 @property 320 def log_path(self): 321 current_context = context.get_current_context() 322 full_out_dir = os.path.join(current_context.get_full_output_path(), 323 'IPerfServer%s' % self.port) 324 325 # Ensure the directory exists. 326 os.makedirs(full_out_dir, exist_ok=True) 327 328 return full_out_dir 329 330 331def _get_port_from_ss_output(ss_output, pid): 332 pid = str(pid) 333 lines = ss_output.split('\n') 334 for line in lines: 335 if pid in line: 336 # Expected format: 337 # tcp LISTEN 0 5 *:<PORT> *:* users:(("cmd",pid=<PID>,fd=3)) 338 return line.split()[4].split(':')[-1] 339 else: 340 raise ProcessLookupError('Could not find started iperf3 process.') 341 342 343class IPerfServer(IPerfServerBase): 344 """Class that handles iperf server commands on localhost.""" 345 def __init__(self, port=5201): 346 super().__init__(port) 347 self._hinted_port = port 348 self._current_log_file = None 349 self._iperf_process = None 350 self._last_opened_file = None 351 352 @property 353 def port(self): 354 return self._port 355 356 @property 357 def started(self): 358 return self._iperf_process is not None 359 360 def start(self, extra_args='', tag=''): 361 """Starts iperf server on local machine. 362 363 Args: 364 extra_args: A string representing extra arguments to start iperf 365 server with. 366 tag: Appended to log file name to identify logs from different 367 iperf runs. 368 """ 369 if self._iperf_process is not None: 370 return 371 372 self._current_log_file = self._get_full_file_path(tag) 373 374 # Run an iperf3 server on the hinted port with JSON output. 375 command = ['iperf3', '-s', '-p', str(self._hinted_port), '-J'] 376 377 command.extend(shlex.split(extra_args)) 378 379 if self._last_opened_file: 380 self._last_opened_file.close() 381 self._last_opened_file = open(self._current_log_file, 'w') 382 self._iperf_process = subprocess.Popen(command, 383 stdout=self._last_opened_file, 384 stderr=subprocess.DEVNULL) 385 for attempts_left in reversed(range(3)): 386 try: 387 self._port = int( 388 _get_port_from_ss_output( 389 job.run('ss -l -p -n | grep iperf').stdout, 390 self._iperf_process.pid)) 391 break 392 except ProcessLookupError: 393 if attempts_left == 0: 394 raise 395 logging.debug('iperf3 process not started yet.') 396 time.sleep(.01) 397 398 def stop(self): 399 """Stops the iperf server. 400 401 Returns: 402 The name of the log file generated from the terminated session. 403 """ 404 if self._iperf_process is None: 405 return 406 407 if self._last_opened_file: 408 self._last_opened_file.close() 409 self._last_opened_file = None 410 411 self._iperf_process.terminate() 412 self._iperf_process = None 413 414 return self._current_log_file 415 416 def __del__(self): 417 self.stop() 418 419 420class IPerfServerOverSsh(IPerfServerBase): 421 """Class that handles iperf3 operations on remote machines.""" 422 def __init__(self, 423 ssh_config, 424 port, 425 test_interface=None, 426 use_killall=False): 427 super().__init__(port) 428 self.ssh_settings = settings.from_config(ssh_config) 429 self._ssh_session = None 430 self.start_ssh() 431 432 self._iperf_pid = None 433 self._current_tag = None 434 self.hostname = self.ssh_settings.hostname 435 self._use_killall = str(use_killall).lower() == 'true' 436 try: 437 # A test interface can only be found if an ip address is specified. 438 # A fully qualified hostname will return None for the 439 # test_interface. 440 self.test_interface = self._get_test_interface_based_on_ip( 441 test_interface) 442 except Exception: 443 self.test_interface = None 444 445 @property 446 def port(self): 447 return self._port 448 449 @property 450 def started(self): 451 return self._iperf_pid is not None 452 453 def _get_remote_log_path(self): 454 return '/tmp/iperf_server_port%s.log' % self.port 455 456 def _get_test_interface_based_on_ip(self, test_interface): 457 """Gets the test interface for a particular IP if the test interface 458 passed in test_interface is None 459 460 Args: 461 test_interface: Either a interface name, ie eth0, or None 462 463 Returns: 464 The name of the test interface. 465 """ 466 if test_interface: 467 return test_interface 468 return utils.get_interface_based_on_ip(self._ssh_session, 469 self.hostname) 470 471 def get_interface_ip_addresses(self, interface): 472 """Gets all of the ip addresses, ipv4 and ipv6, associated with a 473 particular interface name. 474 475 Args: 476 interface: The interface name on the device, ie eth0 477 478 Returns: 479 A list of dictionaries of the the various IP addresses: 480 ipv4_private_local_addresses: Any 192.168, 172.16, or 10 481 addresses 482 ipv4_public_addresses: Any IPv4 public addresses 483 ipv6_link_local_addresses: Any fe80:: addresses 484 ipv6_private_local_addresses: Any fd00:: addresses 485 ipv6_public_addresses: Any publicly routable addresses 486 """ 487 if not self._ssh_session: 488 self.start_ssh() 489 490 return utils.get_interface_ip_addresses(self._ssh_session, interface) 491 492 def renew_test_interface_ip_address(self): 493 """Renews the test interface's IP address. Necessary for changing 494 DHCP scopes during a test. 495 """ 496 if not self._ssh_session: 497 self.start_ssh() 498 utils.renew_linux_ip_address(self._ssh_session, self.test_interface) 499 500 def _cleanup_iperf_port(self): 501 """Checks and kills zombie iperf servers occupying intended port.""" 502 iperf_check_cmd = ('netstat -tulpn | grep LISTEN | grep iperf3' 503 ' | grep :{}').format(self.port) 504 iperf_check = self._ssh_session.run(iperf_check_cmd, 505 ignore_status=True) 506 iperf_check = iperf_check.stdout 507 if iperf_check: 508 logging.debug('Killing zombie server on port {}'.format(self.port)) 509 iperf_pid = iperf_check.split(' ')[-1].split('/')[0] 510 self._ssh_session.run('kill -9 {}'.format(str(iperf_pid))) 511 512 def start(self, extra_args='', tag='', iperf_binary=None): 513 """Starts iperf server on specified machine and port. 514 515 Args: 516 extra_args: A string representing extra arguments to start iperf 517 server with. 518 tag: Appended to log file name to identify logs from different 519 iperf runs. 520 iperf_binary: Location of iperf3 binary. If none, it is assumed the 521 the binary is in the path. 522 """ 523 if self.started: 524 return 525 526 if not self._ssh_session: 527 self.start_ssh() 528 self._cleanup_iperf_port() 529 if not iperf_binary: 530 logging.debug('No iperf3 binary specified. ' 531 'Assuming iperf3 is in the path.') 532 iperf_binary = 'iperf3' 533 else: 534 logging.debug('Using iperf3 binary located at %s' % iperf_binary) 535 iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port) 536 537 cmd = '{cmd} {extra_flags} > {log_file}'.format( 538 cmd=iperf_command, 539 extra_flags=extra_args, 540 log_file=self._get_remote_log_path()) 541 542 job_result = self._ssh_session.run_async(cmd) 543 self._iperf_pid = job_result.stdout 544 self._current_tag = tag 545 546 def stop(self): 547 """Stops the iperf server. 548 549 Returns: 550 The name of the log file generated from the terminated session. 551 """ 552 if not self.started: 553 return 554 555 if self._use_killall: 556 self._ssh_session.run('killall iperf3', ignore_status=True) 557 else: 558 self._ssh_session.run_async('kill -9 {}'.format( 559 str(self._iperf_pid))) 560 561 iperf_result = self._ssh_session.run('cat {}'.format( 562 self._get_remote_log_path())) 563 564 log_file = self._get_full_file_path(self._current_tag) 565 with open(log_file, 'w') as f: 566 f.write(iperf_result.stdout) 567 568 self._ssh_session.run_async('rm {}'.format( 569 self._get_remote_log_path())) 570 self._iperf_pid = None 571 return log_file 572 573 def start_ssh(self): 574 """Starts an ssh session to the iperf server.""" 575 if not self._ssh_session: 576 self._ssh_session = connection.SshConnection(self.ssh_settings) 577 578 def close_ssh(self): 579 """Closes the ssh session to the iperf server, if one exists, preventing 580 connection reset errors when rebooting server device. 581 """ 582 if self.started: 583 self.stop() 584 if self._ssh_session: 585 self._ssh_session.close() 586 self._ssh_session = None 587 588 589# TODO(markdr): Remove this after automagic controller creation has been 590# removed. 591class _AndroidDeviceBridge(object): 592 """A helper class for connecting serial numbers to AndroidDevices.""" 593 594 _test_class = None 595 596 @staticmethod 597 @subscribe_static(TestClassBeginEvent) 598 def on_test_begin(event): 599 _AndroidDeviceBridge._test_class = event.test_class 600 601 @staticmethod 602 @subscribe_static(TestClassEndEvent) 603 def on_test_end(_): 604 _AndroidDeviceBridge._test_class = None 605 606 @staticmethod 607 def android_devices(): 608 """A dict of serial -> AndroidDevice, where AndroidDevice is a device 609 found in the current TestClass's controllers. 610 """ 611 if not _AndroidDeviceBridge._test_class: 612 return {} 613 return { 614 device.serial: device 615 for device in _AndroidDeviceBridge._test_class.android_devices 616 } 617 618 619event_bus.register_subscription( 620 _AndroidDeviceBridge.on_test_begin.subscription) 621event_bus.register_subscription(_AndroidDeviceBridge.on_test_end.subscription) 622 623 624class IPerfServerOverAdb(IPerfServerBase): 625 """Class that handles iperf3 operations over ADB devices.""" 626 def __init__(self, android_device_or_serial, port): 627 """Creates a new IPerfServerOverAdb object. 628 629 Args: 630 android_device_or_serial: Either an AndroidDevice object, or the 631 serial that corresponds to the AndroidDevice. Note that the 632 serial must be present in an AndroidDevice entry in the ACTS 633 config. 634 port: The port number to open the iperf server on. 635 """ 636 super().__init__(port) 637 self._android_device_or_serial = android_device_or_serial 638 639 self._iperf_process = None 640 self._current_tag = '' 641 642 @property 643 def port(self): 644 return self._port 645 646 @property 647 def started(self): 648 return self._iperf_process is not None 649 650 @property 651 def _android_device(self): 652 if isinstance(self._android_device_or_serial, AndroidDevice): 653 return self._android_device_or_serial 654 else: 655 return _AndroidDeviceBridge.android_devices()[ 656 self._android_device_or_serial] 657 658 def _get_device_log_path(self): 659 return '~/data/iperf_server_port%s.log' % self.port 660 661 def start(self, extra_args='', tag='', iperf_binary=None): 662 """Starts iperf server on an ADB device. 663 664 Args: 665 extra_args: A string representing extra arguments to start iperf 666 server with. 667 tag: Appended to log file name to identify logs from different 668 iperf runs. 669 iperf_binary: Location of iperf3 binary. If none, it is assumed the 670 the binary is in the path. 671 """ 672 if self._iperf_process is not None: 673 return 674 675 if not iperf_binary: 676 logging.debug('No iperf3 binary specified. ' 677 'Assuming iperf3 is in the path.') 678 iperf_binary = 'iperf3' 679 else: 680 logging.debug('Using iperf3 binary located at %s' % iperf_binary) 681 iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port) 682 683 self._iperf_process = self._android_device.adb.shell_nb( 684 '{cmd} {extra_flags} > {log_file}'.format( 685 cmd=iperf_command, 686 extra_flags=extra_args, 687 log_file=self._get_device_log_path())) 688 689 self._iperf_process_adb_pid = '' 690 while len(self._iperf_process_adb_pid) == 0: 691 self._iperf_process_adb_pid = self._android_device.adb.shell( 692 'pgrep iperf3 -n') 693 694 self._current_tag = tag 695 696 def stop(self): 697 """Stops the iperf server. 698 699 Returns: 700 The name of the log file generated from the terminated session. 701 """ 702 if self._iperf_process is None: 703 return 704 705 job.run('kill -9 {}'.format(self._iperf_process.pid)) 706 707 # TODO(markdr): update with definitive kill method 708 while True: 709 iperf_process_list = self._android_device.adb.shell('pgrep iperf3') 710 if iperf_process_list.find(self._iperf_process_adb_pid) == -1: 711 break 712 else: 713 self._android_device.adb.shell("kill -9 {}".format( 714 self._iperf_process_adb_pid)) 715 716 iperf_result = self._android_device.adb.shell('cat {}'.format( 717 self._get_device_log_path())) 718 719 log_file = self._get_full_file_path(self._current_tag) 720 with open(log_file, 'w') as f: 721 f.write(iperf_result) 722 723 self._android_device.adb.shell('rm {}'.format( 724 self._get_device_log_path())) 725 726 self._iperf_process = None 727 return log_file 728