#!/usr/bin/env python3
#
#   Copyright 2016 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import json
import logging
import math
import os
import re
import shlex
import subprocess
import threading
import time

from acts import context
from acts import utils
from acts.controllers.android_device import AndroidDevice
from acts.controllers.utils_lib.ssh import connection
from acts.controllers.utils_lib.ssh import settings
from acts.event import event_bus
from acts.event.decorators import subscribe_static
from acts.event.event import TestClassBeginEvent
from acts.event.event import TestClassEndEvent
from acts.libs.proc import job

MOBLY_CONTROLLER_CONFIG_NAME = 'IPerfServer'
ACTS_CONTROLLER_REFERENCE_NAME = 'iperf_servers'
KILOBITS = 1024
MEGABITS = KILOBITS * 1024
GIGABITS = MEGABITS * 1024
BITS_IN_BYTE = 8

# Maximum time to wait for a local iperf3 process to exit after SIGTERM before
# it is forcibly killed.
_LOCAL_STOP_TIMEOUT_SECONDS = 5


def create(configs):
    """Factory method for iperf servers.

    The function creates iperf servers based on at least one config.
    If a config only specifies a port number, a regular local IPerfServer
    object will be created. If a config contains ssh settings or an
    AndroidDevice, a remote iperf server object for that device is created.

    Args:
        configs: An iterable of config entries. Each entry is either a port
            number (int or digit-only str), a dict with 'AndroidDevice' and
            'port' keys, or a dict with 'ssh_config' and 'port' keys (and
            optionally 'test_interface' and 'use_killall').

    Returns:
        A list of iperf server objects, one per config entry.

    Raises:
        ValueError: If a config entry matches none of the supported shapes.
    """
    results = []
    for c in configs:
        if isinstance(c, (str, int)) and str(c).isdigit():
            results.append(IPerfServer(int(c)))
        elif isinstance(c, dict) and 'AndroidDevice' in c and 'port' in c:
            results.append(IPerfServerOverAdb(c['AndroidDevice'], c['port']))
        elif isinstance(c, dict) and 'ssh_config' in c and 'port' in c:
            results.append(
                IPerfServerOverSsh(c['ssh_config'],
                                   c['port'],
                                   test_interface=c.get('test_interface'),
                                   use_killall=c.get('use_killall')))
        else:
            raise ValueError(
                'Config entry %s in %s is not a valid IPerfServer '
                'config.' % (repr(c), configs))
    return results


def get_info(iperf_servers):
    """Placeholder for info about iperf servers.

    Returns:
        None
    """
    return None


def destroy(iperf_server_list):
    """Stops every server in the list, logging (not raising) any failure.

    Args:
        iperf_server_list: The iperf server objects to stop.
    """
    for iperf_server in iperf_server_list:
        try:
            iperf_server.stop()
        except Exception:
            # Best-effort cleanup: one failing server must not prevent the
            # remaining servers from being stopped.
            logging.exception('Unable to properly clean up %s.', iperf_server)


class IPerfResult(object):
    """Parsed iperf3 JSON result with convenience accessors for throughput."""

    def __init__(self, result_path, reporting_speed_units='Mbytes'):
        """Loads iperf result from file.

        Loads iperf result from JSON formatted server log. File can be accessed
        before or after server is stopped. Note that only the first JSON object
        will be loaded and this function is not intended to be used with files
        containing multiple iperf client runs.

        Args:
            result_path: Path to the iperf JSON log. If no file exists at this
                path, the value itself is parsed as a JSON string.
            reporting_speed_units: Units used when reporting rates, e.g.
                'bits', 'kbits', 'Mbits', 'Gbits', 'bytes', 'kbytes', 'Mbytes',
                'Gbytes' (case-insensitive).
        """
        self.reporting_speed_units = reporting_speed_units
        # if result_path isn't a path, treat it as JSON
        if not os.path.exists(result_path):
            self.result = json.loads(result_path)
        else:
            try:
                with open(result_path, 'r') as f:
                    iperf_output = f.readlines()
                    # Keep only the first top-level JSON object: it ends at
                    # the first line that is exactly a closing brace.
                    if '}\n' in iperf_output:
                        iperf_output = iperf_output[:iperf_output.index('}\n')
                                                    + 1]
                    iperf_string = ''.join(iperf_output)
                    # 'nan' is not valid JSON; substitute a parseable value.
                    iperf_string = iperf_string.replace('nan', '0')
                    self.result = json.loads(iperf_string)
            except ValueError:
                with open(result_path, 'r') as f:
                    # Possibly a result from interrupted iperf run,
                    # skip first line and try again.
                    lines = f.readlines()[1:]
                    self.result = json.loads(''.join(lines))

    def _has_data(self):
        """Checks if the iperf result has valid throughput data.

        Returns:
            True if the result contains throughput data. False otherwise.
        """
        return ('end' in self.result) and ('sum_received' in self.result['end']
                                           or 'sum' in self.result['end'])

    def _get_reporting_speed(self, network_speed_in_bits_per_second):
        """Converts a bits-per-second rate into the configured reporting units.

        The units are taken from reporting_speed_units (case-insensitive).
        Currently supported: bits per second (bits), kilobits per second
        (kbits), megabits per second (mbits), gigabits per second (gbits),
        bytes per second (bytes), kilobytes per second (kbytes), megabytes per
        second (mbytes), gigabytes per second (gbytes). The k/m/g prefixes use
        the module's KILOBITS/MEGABITS/GIGABITS multipliers.

        Args:
            network_speed_in_bits_per_second: The network speed from iperf in
                bits per second.

        Returns:
            The value of the throughput in the appropriate units.
        """
        units = self.reporting_speed_units.lower()
        speed_divisor = 1
        # Match on the suffix so that the un-prefixed unit 'bytes' is also
        # recognised as a byte-based unit.
        if units.endswith('bytes'):
            speed_divisor *= BITS_IN_BYTE
        prefix_multipliers = {'k': KILOBITS, 'm': MEGABITS, 'g': GIGABITS}
        speed_divisor *= prefix_multipliers.get(units[:1], 1)
        return network_speed_in_bits_per_second / speed_divisor

    def get_json(self):
        """Returns the raw json output from iPerf."""
        return self.result

    @property
    def error(self):
        """The 'error' entry of the iperf result, or None if absent."""
        return self.result.get('error', None)

    @property
    def avg_rate(self):
        """Average UDP rate over the entire run, in the reporting units.

        This is the average UDP rate observed at the terminal the iperf result
        is pulled from. According to iperf3 documentation this is calculated
        based on bytes sent and thus is not a good representation of the
        quality of the link. If the result is not from a success run, this
        property is None.
        """
        if not self._has_data() or 'sum' not in self.result['end']:
            return None
        bps = self.result['end']['sum']['bits_per_second']
        return self._get_reporting_speed(bps)

    @property
    def avg_receive_rate(self):
        """Average receiving rate over the entire run, in the reporting units.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data() or 'sum_received' not in self.result['end']:
            return None
        bps = self.result['end']['sum_received']['bits_per_second']
        return self._get_reporting_speed(bps)

    @property
    def avg_send_rate(self):
        """Average sending rate over the entire run, in the reporting units.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data() or 'sum_sent' not in self.result['end']:
            return None
        bps = self.result['end']['sum_sent']['bits_per_second']
        return self._get_reporting_speed(bps)

    @property
    def instantaneous_rates(self):
        """Per-interval rates over the entire run, in the reporting units.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data():
            return None
        intervals = [
            self._get_reporting_speed(interval['sum']['bits_per_second'])
            for interval in self.result['intervals']
        ]
        return intervals

    @property
    def std_deviation(self):
        """Standard deviation of the per-interval rates over the entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        return self.get_std_deviation(0)

    def get_std_deviation(self, iperf_ignored_interval):
        """Sample standard deviation of the per-interval rates.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, the return value is None. A configurable number of
        beginning (and the single last) intervals are ignored in the
        calculation as they are inaccurate (e.g. the last is from a very small
        interval).

        Args:
            iperf_ignored_interval: number of leading iperf intervals to ignore
                when calculating the standard deviation.

        Returns:
            The standard deviation in the reporting units, or None if there is
            no data or fewer than two intervals remain after trimming.
        """
        if not self._has_data():
            return None
        rates = self.instantaneous_rates[iperf_ignored_interval:-1]
        # The sample standard deviation divides by (n - 1); it is undefined
        # for fewer than two samples.
        if len(rates) < 2:
            return None
        avg_rate = math.fsum(rates) / len(rates)
        sqd_deviations = [(rate - avg_rate)**2 for rate in rates]
        return math.sqrt(math.fsum(sqd_deviations) / (len(sqd_deviations) - 1))


class IPerfServerBase(object):
    """Common interface and log-file bookkeeping for iperf servers."""

    # Keeps track of the number of IPerfServer logs to prevent file name
    # collisions.
    __log_file_counter = 0

    __log_file_lock = threading.Lock()

    def __init__(self, port):
        self._port = port
        # TODO(markdr): We shouldn't be storing the log files in an array like
        # this. Nobody should be reading this property either. Instead, the
        # IPerfResult should be returned in stop() with all the necessary info.
        # See aosp/1012824 for a WIP implementation.
        self.log_files = []

    @property
    def port(self):
        raise NotImplementedError('port must be specified.')

    @property
    def started(self):
        raise NotImplementedError('started must be specified.')

    def start(self, extra_args='', tag=''):
        """Starts an iperf3 server.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
        """
        raise NotImplementedError('start() must be specified.')

    def stop(self):
        """Stops the iperf server.

        Returns:
            The name of the log file generated from the terminated session.
        """
        raise NotImplementedError('stop() must be specified.')

    def _get_full_file_path(self, tag=None):
        """Returns the full file path for the IPerfServer log file.

        Note: If the directory for the file path does not exist, it will be
        created.

        Args:
            tag: The tag passed in to the server run.
        """
        out_dir = self.log_path

        # The counter is shared by all servers; the lock keeps the generated
        # file names unique across threads.
        with IPerfServerBase.__log_file_lock:
            tags = [tag, IPerfServerBase.__log_file_counter]
            out_file_name = 'IPerfServer,%s.log' % (','.join(
                [str(x) for x in tags if x != '' and x is not None]))
            IPerfServerBase.__log_file_counter += 1

        file_path = os.path.join(out_dir, out_file_name)
        self.log_files.append(file_path)
        return file_path

    @property
    def log_path(self):
        """Output directory for this server's logs; created if missing."""
        current_context = context.get_current_context()
        full_out_dir = os.path.join(current_context.get_full_output_path(),
                                    'IPerfServer%s' % self.port)

        # Ensure the directory exists.
        os.makedirs(full_out_dir, exist_ok=True)

        return full_out_dir


def _get_port_from_ss_output(ss_output, pid):
    """Extracts the listening port of the process `pid` from `ss` output.

    Args:
        ss_output: The text output of `ss -l -p -n`.
        pid: The process id to look for.

    Returns:
        The port, as a string, from the first line that mentions the pid.

    Raises:
        ProcessLookupError: If no line mentions the pid.
    """
    # Match the pid as a whole number so that e.g. pid 123 does not match a
    # line that belongs to pid 1234.
    pid_pattern = re.compile(r'(?<!\d)%s(?!\d)' % re.escape(str(pid)))
    for line in ss_output.split('\n'):
        if pid_pattern.search(line):
            # Expected format:
            # tcp LISTEN 0 5 *:<PORT> *:* users:(("cmd",pid=<PID>,fd=3))
            return line.split()[4].split(':')[-1]
    raise ProcessLookupError('Could not find started iperf3 process.')


class IPerfServer(IPerfServerBase):
    """Class that handles iperf server commands on localhost."""

    def __init__(self, port=5201):
        super().__init__(port)
        self._hinted_port = port
        self._current_log_file = None
        self._iperf_process = None
        self._last_opened_file = None

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_process is not None

    def start(self, extra_args='', tag=''):
        """Starts iperf server on local machine.

        Does nothing if the server is already running.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.

        Raises:
            ProcessLookupError: If the started process cannot be found in the
                `ss` output after all retries.
        """
        if self._iperf_process is not None:
            return

        self._current_log_file = self._get_full_file_path(tag)

        # Run an iperf3 server on the hinted port with JSON output.
        command = ['iperf3', '-s', '-p', str(self._hinted_port), '-J']

        command.extend(shlex.split(extra_args))

        if self._last_opened_file:
            self._last_opened_file.close()
        self._last_opened_file = open(self._current_log_file, 'w')
        self._iperf_process = subprocess.Popen(command,
                                               stdout=self._last_opened_file,
                                               stderr=subprocess.DEVNULL)
        # Read back the port the process is actually listening on; retry
        # briefly because the socket may not be open immediately.
        # NOTE(review): if job.run raises on grep's non-zero exit status (no
        # match yet), that error is not retried here -- confirm against the
        # job module.
        for attempts_left in reversed(range(3)):
            try:
                self._port = int(
                    _get_port_from_ss_output(
                        job.run('ss -l -p -n | grep iperf').stdout,
                        self._iperf_process.pid))
                break
            except ProcessLookupError:
                if attempts_left == 0:
                    raise
                logging.debug('iperf3 process not started yet.')
                time.sleep(.01)

    def stop(self):
        """Stops the iperf server.

        Returns:
            The name of the log file generated from the terminated session, or
            None if the server was not running.
        """
        if self._iperf_process is None:
            return

        # Terminate and reap the child so it does not linger as a zombie and
        # so its output is complete before the log file is handed back.
        self._iperf_process.terminate()
        try:
            self._iperf_process.wait(timeout=_LOCAL_STOP_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            self._iperf_process.kill()
            self._iperf_process.wait()
        self._iperf_process = None

        if self._last_opened_file:
            self._last_opened_file.close()
            self._last_opened_file = None

        return self._current_log_file

    def __del__(self):
        self.stop()


class IPerfServerOverSsh(IPerfServerBase):
    """Class that handles iperf3 operations on remote machines."""

    def __init__(self,
                 ssh_config,
                 port,
                 test_interface=None,
                 use_killall=False):
        """Creates a new IPerfServerOverSsh object and opens its ssh session.

        Args:
            ssh_config: The ssh settings config for the remote machine.
            port: The port number to open the iperf server on.
            test_interface: Optional name of the interface under test. If not
                given, it is looked up from the ssh hostname.
            use_killall: If true (bool or the string 'true', any case), the
                server is stopped with `killall iperf3` instead of by pid.
        """
        super().__init__(port)
        self.ssh_settings = settings.from_config(ssh_config)
        self._ssh_session = None
        self.start_ssh()

        self._iperf_pid = None
        self._current_tag = None
        self.hostname = self.ssh_settings.hostname
        self._use_killall = str(use_killall).lower() == 'true'
        try:
            # A test interface can only be found if an ip address is specified.
            # A fully qualified hostname will return None for the
            # test_interface.
            self.test_interface = self._get_test_interface_based_on_ip(
                test_interface)
        except Exception:
            # Best-effort lookup: the server remains usable without a known
            # test interface.
            logging.debug('Unable to determine the test interface for %s.',
                          self.hostname)
            self.test_interface = None

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_pid is not None

    def _get_remote_log_path(self):
        """Path of the iperf log file on the remote machine."""
        return '/tmp/iperf_server_port%s.log' % self.port

    def _get_test_interface_based_on_ip(self, test_interface):
        """Gets the test interface for a particular IP if the test interface
            passed in test_interface is None

        Args:
            test_interface: Either a interface name, ie eth0, or None

        Returns:
            The name of the test interface.
        """
        if test_interface:
            return test_interface
        return utils.get_interface_based_on_ip(self._ssh_session,
                                               self.hostname)

    def get_interface_ip_addresses(self, interface):
        """Gets all of the ip addresses, ipv4 and ipv6, associated with a
           particular interface name.

        Args:
            interface: The interface name on the device, ie eth0

        Returns:
            A list of dictionaries of the the various IP addresses:
                ipv4_private_local_addresses: Any 192.168, 172.16, or 10
                    addresses
                ipv4_public_addresses: Any IPv4 public addresses
                ipv6_link_local_addresses: Any fe80:: addresses
                ipv6_private_local_addresses: Any fd00:: addresses
                ipv6_public_addresses: Any publicly routable addresses
        """
        if not self._ssh_session:
            self.start_ssh()

        return utils.get_interface_ip_addresses(self._ssh_session, interface)

    def renew_test_interface_ip_address(self):
        """Renews the test interface's IP address.  Necessary for changing
           DHCP scopes during a test.
        """
        if not self._ssh_session:
            self.start_ssh()
        utils.renew_linux_ip_address(self._ssh_session, self.test_interface)

    def start(self, extra_args='', tag='', iperf_binary=None):
        """Starts iperf server on specified machine and port.

        Does nothing if the server is already running.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
            iperf_binary: Location of iperf3 binary. If none, it is assumed the
                the binary is in the path.
        """
        if self.started:
            return

        if not self._ssh_session:
            self.start_ssh()
        if not iperf_binary:
            logging.debug('No iperf3 binary specified.  '
                          'Assuming iperf3 is in the path.')
            iperf_binary = 'iperf3'
        else:
            logging.debug('Using iperf3 binary located at %s', iperf_binary)
        iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port)

        cmd = '{cmd} {extra_flags} > {log_file}'.format(
            cmd=iperf_command,
            extra_flags=extra_args,
            log_file=self._get_remote_log_path())

        job_result = self._ssh_session.run_async(cmd)
        # The stdout of the async launch is stored as the remote pid and is
        # what stop() later passes to `kill`.
        self._iperf_pid = job_result.stdout
        self._current_tag = tag

    def stop(self):
        """Stops the iperf server.

        Kills the remote process, copies the remote log into a local log file
        and removes the remote log.

        Returns:
            The name of the log file generated from the terminated session, or
            None if the server was not running.
        """
        if not self.started:
            return

        if self._use_killall:
            self._ssh_session.run('killall iperf3', ignore_status=True)
        else:
            self._ssh_session.run_async('kill -9 {}'.format(
                str(self._iperf_pid)))

        iperf_result = self._ssh_session.run('cat {}'.format(
            self._get_remote_log_path()))

        log_file = self._get_full_file_path(self._current_tag)
        with open(log_file, 'w') as f:
            f.write(iperf_result.stdout)

        self._ssh_session.run_async('rm {}'.format(
            self._get_remote_log_path()))
        self._iperf_pid = None
        return log_file

    def start_ssh(self):
        """Starts an ssh session to the iperf server."""
        if not self._ssh_session:
            self._ssh_session = connection.SshConnection(self.ssh_settings)

    def close_ssh(self):
        """Closes the ssh session to the iperf server, if one exists, preventing
        connection reset errors when rebooting server device.
        """
        if self.started:
            self.stop()
        if self._ssh_session:
            self._ssh_session.close()
            self._ssh_session = None


# TODO(markdr): Remove this after automagic controller creation has been
# removed.
class _AndroidDeviceBridge(object):
    """A helper class for connecting serial numbers to AndroidDevices."""

    _test_class = None

    @staticmethod
    @subscribe_static(TestClassBeginEvent)
    def on_test_begin(event):
        _AndroidDeviceBridge._test_class = event.test_class

    @staticmethod
    @subscribe_static(TestClassEndEvent)
    def on_test_end(_):
        _AndroidDeviceBridge._test_class = None

    @staticmethod
    def android_devices():
        """A dict of serial -> AndroidDevice, where AndroidDevice is a device
        found in the current TestClass's controllers.
        """
        if not _AndroidDeviceBridge._test_class:
            return {}
        return {
            device.serial: device
            for device in _AndroidDeviceBridge._test_class.android_devices
        }


event_bus.register_subscription(
    _AndroidDeviceBridge.on_test_begin.subscription)
event_bus.register_subscription(_AndroidDeviceBridge.on_test_end.subscription)


class IPerfServerOverAdb(IPerfServerBase):
    """Class that handles iperf3 operations over ADB devices."""

    def __init__(self, android_device_or_serial, port):
        """Creates a new IPerfServerOverAdb object.

        Args:
            android_device_or_serial: Either an AndroidDevice object, or the
                serial that corresponds to the AndroidDevice. Note that the
                serial must be present in an AndroidDevice entry in the ACTS
                config.
            port: The port number to open the iperf server on.
        """
        super().__init__(port)
        self._android_device_or_serial = android_device_or_serial

        self._iperf_process = None
        # On-device pid of the iperf3 process, as reported by pgrep in start().
        self._iperf_process_adb_pid = ''
        self._current_tag = ''

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_process is not None

    @property
    def _android_device(self):
        """The AndroidDevice object, resolved from its serial if necessary."""
        if isinstance(self._android_device_or_serial, AndroidDevice):
            return self._android_device_or_serial
        else:
            return _AndroidDeviceBridge.android_devices()[
                self._android_device_or_serial]

    def _get_device_log_path(self):
        """Path of the iperf log file on the Android device."""
        return '~/data/iperf_server_port%s.log' % self.port

    def start(self, extra_args='', tag='', iperf_binary=None):
        """Starts iperf server on an ADB device.

        Does nothing if the server is already running.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
            iperf_binary: Location of iperf3 binary. If none, it is assumed the
                the binary is in the path.
        """
        if self._iperf_process is not None:
            return

        if not iperf_binary:
            logging.debug('No iperf3 binary specified.  '
                          'Assuming iperf3 is in the path.')
            iperf_binary = 'iperf3'
        else:
            logging.debug('Using iperf3 binary located at %s', iperf_binary)
        iperf_command = '{} -s -J -p {}'.format(iperf_binary, self.port)

        self._iperf_process = self._android_device.adb.shell_nb(
            '{cmd} {extra_flags} > {log_file}'.format(
                cmd=iperf_command,
                extra_flags=extra_args,
                log_file=self._get_device_log_path()))

        # Poll until the newest iperf3 process shows up on the device.
        # NOTE(review): this loop has no upper bound; it never exits if iperf3
        # fails to start on the device.
        self._iperf_process_adb_pid = ''
        while not self._iperf_process_adb_pid:
            self._iperf_process_adb_pid = self._android_device.adb.shell(
                'pgrep iperf3 -n')

        self._current_tag = tag

    def stop(self):
        """Stops the iperf server.

        Kills the host-side and on-device processes, copies the device log
        into a local log file and removes the device log.

        Returns:
            The name of the log file generated from the terminated session, or
            None if the server was not running.
        """
        if self._iperf_process is None:
            return

        job.run('kill -9 {}'.format(self._iperf_process.pid))

        # TODO(markdr): update with definitive kill method
        # Compare against whole pgrep tokens so an unrelated iperf3 process
        # whose pid merely contains ours does not keep this loop alive.
        adb_pid = self._iperf_process_adb_pid.strip()
        while adb_pid in self._android_device.adb.shell(
                'pgrep iperf3').split():
            self._android_device.adb.shell('kill -9 {}'.format(adb_pid))

        iperf_result = self._android_device.adb.shell('cat {}'.format(
            self._get_device_log_path()))

        log_file = self._get_full_file_path(self._current_tag)
        with open(log_file, 'w') as f:
            f.write(iperf_result)

        self._android_device.adb.shell('rm {}'.format(
            self._get_device_log_path()))

        self._iperf_process = None
        return log_file