#!/usr/bin/env python3
#
# Copyright 2016 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import math
import os
import threading

from acts import context
from acts import utils
from acts.controllers.android_device import AndroidDevice
from acts.controllers.utils_lib.ssh import connection
from acts.controllers.utils_lib.ssh import settings
from acts.event import event_bus
from acts.event.decorators import subscribe_static
from acts.event.event import TestClassBeginEvent
from acts.event.event import TestClassEndEvent
from acts.libs.proc import job

ACTS_CONTROLLER_CONFIG_NAME = 'IPerfServer'
ACTS_CONTROLLER_REFERENCE_NAME = 'iperf_servers'


def _bps_to_mbps(bits_per_second):
    """Converts a rate in bits per second to MB/s (1 MB = 1024 * 1024 bytes)."""
    return bits_per_second / 8 / 1024 / 1024


def create(configs):
    """Factory method for iperf servers.

    The function creates iperf servers based on at least one config.
    If a config only specifies a port number, a regular local IPerfServer
    object will be created. If a config contains ssh settings or an
    AndroidDevice, a remote iperf server object is created for that device.

    Args:
        configs: An iterable of config entries. Each entry is either a port
            number (int or digit-only str), a dict with the keys
            'AndroidDevice' and 'port', or a dict with the keys 'ssh_config'
            and 'port'.

    Returns:
        A list with one iperf server object per config entry.

    Raises:
        ValueError: If an entry matches none of the supported forms.
    """
    results = []
    for c in configs:
        # isinstance (rather than an exact type comparison) also accepts
        # subclasses such as OrderedDict.
        if isinstance(c, (str, int)) and str(c).isdigit():
            results.append(IPerfServer(int(c)))
        elif isinstance(c, dict) and 'AndroidDevice' in c and 'port' in c:
            results.append(IPerfServerOverAdb(c['AndroidDevice'], c['port']))
        elif isinstance(c, dict) and 'ssh_config' in c and 'port' in c:
            results.append(IPerfServerOverSsh(c['ssh_config'], c['port']))
        else:
            raise ValueError(
                'Config entry %s in %s is not a valid IPerfServer '
                'config.' % (repr(c), configs))
    return results


def destroy(iperf_server_list):
    """Stops every iperf server in the list.

    Cleanup is best effort: a failure to stop one server is logged and the
    remaining servers are still stopped.

    Args:
        iperf_server_list: The iperf server objects to stop.
    """
    for iperf_server in iperf_server_list:
        try:
            iperf_server.stop()
        except Exception:
            logging.exception('Unable to properly clean up %s.', iperf_server)


class IPerfResult(object):
    """Parsed contents of a JSON formatted iperf server log."""

    def __init__(self, result_path):
        """Loads iperf result from file.

        Loads iperf result from JSON formatted server log. File can be accessed
        before or after server is stopped. Note that only the first JSON object
        will be loaded and this function is not intended to be used with files
        containing multiple iperf client runs.

        Args:
            result_path: Path of the iperf log file to load.

        Raises:
            ValueError: If the file cannot be parsed as JSON, even after
                skipping its first line.
        """
        with open(result_path, 'r') as f:
            iperf_output = f.readlines()

        try:
            first_object = iperf_output
            if '}\n' in first_object:
                # A line consisting solely of '}' closes the first top-level
                # JSON object; drop everything after it.
                first_object = first_object[:first_object.index('}\n') + 1]
            iperf_string = ''.join(first_object)
            # 'nan' is not valid JSON; substitute 0 so the log still parses.
            iperf_string = iperf_string.replace('nan', '0')
            self.result = json.loads(iperf_string)
        except ValueError:
            # Possibly a result from interrupted iperf run, skip first line
            # and try again.
            self.result = json.loads(''.join(iperf_output[1:]))

    def _has_data(self):
        """Checks if the iperf result has valid throughput data.

        Returns:
            True if the result contains throughput data. False otherwise.
        """
        if 'end' not in self.result:
            return False
        end = self.result['end']
        return 'sum_received' in end or 'sum' in end

    def get_json(self):
        """Returns the raw json output from iPerf."""
        return self.result

    @property
    def error(self):
        """The 'error' entry of the result, or None if there is none."""
        return self.result.get('error', None)

    @property
    def avg_rate(self):
        """Average UDP rate in MB/s over the entire run.

        This is the average UDP rate observed at the terminal the iperf result
        is pulled from. According to iperf3 documentation this is calculated
        based on bytes sent and thus is not a good representation of the
        quality of the link. If the result is not from a success run, this
        property is None.
        """
        if not self._has_data() or 'sum' not in self.result['end']:
            return None
        return _bps_to_mbps(self.result['end']['sum']['bits_per_second'])

    @property
    def avg_receive_rate(self):
        """Average receiving rate in MB/s over the entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data() or 'sum_received' not in self.result['end']:
            return None
        return _bps_to_mbps(
            self.result['end']['sum_received']['bits_per_second'])

    @property
    def avg_send_rate(self):
        """Average sending rate in MB/s over the entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data() or 'sum_sent' not in self.result['end']:
            return None
        return _bps_to_mbps(self.result['end']['sum_sent']['bits_per_second'])

    @property
    def instantaneous_rates(self):
        """Instantaneous received rate in MB/s over entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        if not self._has_data():
            return None
        return [
            _bps_to_mbps(interval['sum']['bits_per_second'])
            for interval in self.result['intervals']
        ]

    @property
    def std_deviation(self):
        """Standard deviation of rates in MB/s over entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, this property is None.
        """
        return self.get_std_deviation(0)

    def get_std_deviation(self, iperf_ignored_interval):
        """Standard deviation of rates in MB/s over entire run.

        This data may not exist if iperf was interrupted. If the result is not
        from a success run, the return value is None. A configurable number of
        beginning (and the single last) intervals are ignored in the
        calculation as they are inaccurate (e.g. the last is from a very small
        interval).

        Args:
            iperf_ignored_interval: number of leading iperf intervals to
                ignore when calculating the standard deviation.

        Returns:
            The sample standard deviation, or None if the result has no data
            or fewer than two intervals remain after trimming.
        """
        if not self._has_data():
            return None
        rates = self.instantaneous_rates[iperf_ignored_interval:-1]
        # The sample standard deviation divides by (n - 1), so it is
        # undefined for fewer than two samples.
        if len(rates) < 2:
            return None
        avg_rate = math.fsum(rates) / len(rates)
        sqd_deviations = [(rate - avg_rate)**2 for rate in rates]
        return math.sqrt(math.fsum(sqd_deviations) / (len(sqd_deviations) - 1))


class IPerfServerBase(object):
    """Common interface and log file bookkeeping for iperf servers."""

    # Keeps track of the number of IPerfServer logs to prevent file name
    # collisions.
    __log_file_counter = 0

    # Guards reads and increments of __log_file_counter.
    __log_file_lock = threading.Lock()

    def __init__(self, port):
        """Stores the port and initializes the list of generated log files.

        Args:
            port: The port number for the iperf server.
        """
        self._port = port
        # TODO(markdr): Remove this after migration to the new iperf APIs.
        self.log_files = []

    @property
    def port(self):
        """The server port. Must be overridden by subclasses."""
        raise NotImplementedError('port must be specified.')

    @property
    def started(self):
        """Whether the server is started. Must be overridden by subclasses."""
        raise NotImplementedError('started must be specified.')

    def start(self, extra_args='', tag=''):
        """Starts an iperf3 server.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
        """
        raise NotImplementedError('start() must be specified.')

    def stop(self):
        """Stops the iperf server.

        Returns:
            The name of the log file generated from the terminated session.
        """
        raise NotImplementedError('stop() must be specified.')

    def _get_full_file_path(self, tag=None):
        """Returns the full file path for the IPerfServer log file.

        The returned path is also appended to self.log_files.

        Note: If the directory for the file path does not exist, it will be
        created.

        Args:
            tag: The tag passed in to the server run.
        """
        out_dir = self.log_path

        with IPerfServerBase.__log_file_lock:
            # Empty or missing tags are left out of the file name.
            tags = [tag, IPerfServerBase.__log_file_counter]
            out_file_name = 'IPerfServer,%s.log' % (','.join(
                [str(x) for x in tags if x != '' and x is not None]))
            IPerfServerBase.__log_file_counter += 1

        file_path = os.path.join(out_dir, out_file_name)
        self.log_files.append(file_path)
        return file_path

    @property
    def log_path(self):
        """Output directory for this server's logs; created on access."""
        current_context = context.get_current_context()
        full_out_dir = os.path.join(current_context.get_full_output_path(),
                                    'IPerfServer%s' % self.port)

        # Ensure the directory exists.
        utils.create_dir(full_out_dir)

        return full_out_dir


class IPerfServer(IPerfServerBase):
    """Class that handles iperf server commands on localhost."""

    def __init__(self, port):
        """Creates a new local IPerfServer object.

        Args:
            port: The port number to open the iperf server on.
        """
        super().__init__(port)
        self._iperf_command = 'iperf3 -s -J -p {}'.format(self.port)
        self._current_log_file = None
        self._iperf_process = None

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_process is not None

    def start(self, extra_args='', tag=''):
        """Starts iperf server on local machine.

        Does nothing if the server is already started.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
        """
        if self._iperf_process is not None:
            return

        self._current_log_file = self._get_full_file_path(tag)

        cmd = '{cmd} {extra_flags} > {log_file}'.format(
            cmd=self._iperf_command,
            extra_flags=extra_args,
            log_file=self._current_log_file)

        self._iperf_process = utils.start_standing_subprocess(cmd)

    def stop(self):
        """Stops the iperf server.

        Returns:
            The name of the log file generated from the terminated session,
            or None if the server was not started.
        """
        if self._iperf_process is None:
            return

        utils.stop_standing_subprocess(self._iperf_process)

        self._iperf_process = None
        return self._current_log_file


class IPerfServerOverSsh(IPerfServerBase):
    """Class that handles iperf3 operations on remote machines."""

    def __init__(self, ssh_config, port):
        """Creates a new IPerfServerOverSsh object.

        Args:
            ssh_config: The ssh configuration passed to settings.from_config
                to build the connection to the remote machine.
            port: The port number to open the iperf server on.
        """
        super().__init__(port)
        ssh_settings = settings.from_config(ssh_config)
        self._ssh_session = connection.SshConnection(ssh_settings)

        self._iperf_command = 'iperf3 -s -J -p {}'.format(self.port)
        self._iperf_pid = None
        self._current_tag = None

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_pid is not None

    def _get_remote_log_path(self):
        """Returns the path of the iperf log file on the remote machine."""
        return 'iperf_server_port%s.log' % self.port

    def start(self, extra_args='', tag=''):
        """Starts iperf server on specified machine and port.

        Does nothing if the server is already started.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
        """
        if self.started:
            return

        cmd = '{cmd} {extra_flags} > {log_file}'.format(
            cmd=self._iperf_command,
            extra_flags=extra_args,
            log_file=self._get_remote_log_path())

        job_result = self._ssh_session.run_async(cmd)
        # The stdout of the async job is stored as the PID to kill in stop().
        self._iperf_pid = job_result.stdout
        self._current_tag = tag

    def stop(self):
        """Stops the iperf server.

        Kills the remote process, copies the remote log into a local log
        file, and removes the remote log.

        Returns:
            The name of the log file generated from the terminated session,
            or None if the server was not started.
        """
        if not self.started:
            return

        self._ssh_session.run_async('kill -9 {}'.format(str(self._iperf_pid)))
        iperf_result = self._ssh_session.run('cat {}'.format(
            self._get_remote_log_path()))

        log_file = self._get_full_file_path(self._current_tag)
        with open(log_file, 'w') as f:
            f.write(iperf_result.stdout)

        self._ssh_session.run_async('rm {}'.format(
            self._get_remote_log_path()))
        self._iperf_pid = None
        return log_file


# TODO(markdr): Remove this after automagic controller creation has been
# removed.
class _AndroidDeviceBridge(object):
    """A helper class for connecting serial numbers to AndroidDevices."""

    # A dict of serial -> AndroidDevice, where AndroidDevice is a device found
    # in the current TestClass's controllers.
    android_devices = {}

    @staticmethod
    @subscribe_static(TestClassBeginEvent)
    def on_test_begin(event):
        """Records the test class's android_devices, keyed by serial."""
        for device in getattr(event.test_class, 'android_devices', []):
            _AndroidDeviceBridge.android_devices[device.serial] = device

    @staticmethod
    @subscribe_static(TestClassEndEvent)
    def on_test_end(_):
        """Forgets all recorded devices."""
        _AndroidDeviceBridge.android_devices = {}


event_bus.register_subscription(
    _AndroidDeviceBridge.on_test_begin.subscription)
event_bus.register_subscription(_AndroidDeviceBridge.on_test_end.subscription)


class IPerfServerOverAdb(IPerfServerBase):
    """Class that handles iperf3 operations over ADB devices."""

    def __init__(self, android_device_or_serial, port):
        """Creates a new IPerfServerOverAdb object.

        Args:
            android_device_or_serial: Either an AndroidDevice object, or the
                serial that corresponds to the AndroidDevice. Note that the
                serial must be present in an AndroidDevice entry in the ACTS
                config.
            port: The port number to open the iperf server on.
        """
        super().__init__(port)
        self._android_device_or_serial = android_device_or_serial

        self._iperf_command = 'iperf3 -s -J -p {}'.format(self.port)
        self._iperf_process = None
        # PID of the iperf3 process on the device; filled in by start().
        self._iperf_process_adb_pid = ''
        self._current_tag = ''

    @property
    def port(self):
        return self._port

    @property
    def started(self):
        return self._iperf_process is not None

    @property
    def _android_device(self):
        """The AndroidDevice to use, resolved from a serial if needed.

        Raises:
            KeyError: If a serial was given and no device with that serial
                is currently recorded in _AndroidDeviceBridge.
        """
        if isinstance(self._android_device_or_serial, AndroidDevice):
            return self._android_device_or_serial
        else:
            return _AndroidDeviceBridge.android_devices[
                self._android_device_or_serial]

    def _get_device_log_path(self):
        """Returns the path of the iperf log file on the device."""
        return '~/data/iperf_server_port%s.log' % self.port

    def start(self, extra_args='', tag=''):
        """Starts iperf server on an ADB device.

        Does nothing if the server is already started.

        Args:
            extra_args: A string representing extra arguments to start iperf
                server with.
            tag: Appended to log file name to identify logs from different
                iperf runs.
        """
        if self._iperf_process is not None:
            return

        self._iperf_process = self._android_device.adb.shell_nb(
            '{cmd} {extra_flags} > {log_file}'.format(
                cmd=self._iperf_command,
                extra_flags=extra_args,
                log_file=self._get_device_log_path()))

        # Poll until pgrep reports the PID of the newest iperf3 process.
        # NOTE(review): this loop has no timeout or delay, so it spins
        # forever if iperf3 never shows up on the device.
        self._iperf_process_adb_pid = ''
        while len(self._iperf_process_adb_pid) == 0:
            self._iperf_process_adb_pid = self._android_device.adb.shell(
                'pgrep iperf3 -n')

        self._current_tag = tag

    def stop(self):
        """Stops the iperf server.

        Kills the local adb process and the iperf3 process on the device,
        copies the device log into a local log file, and removes the device
        log.

        Returns:
            The name of the log file generated from the terminated session,
            or None if the server was not started.
        """
        if self._iperf_process is None:
            return

        job.run('kill -9 {}'.format(self._iperf_process.pid))

        # Compare against whole PIDs. A substring search would treat PID
        # "123" as still alive whenever e.g. "1234" is running, and this
        # loop would then never terminate.
        adb_pid = self._iperf_process_adb_pid.strip()

        #TODO(markdr): update with definitive kill method
        while True:
            iperf_process_list = self._android_device.adb.shell('pgrep iperf3')
            if adb_pid not in iperf_process_list.split():
                break
            self._android_device.adb.shell("kill -9 {}".format(adb_pid))

        iperf_result = self._android_device.adb.shell('cat {}'.format(
            self._get_device_log_path()))

        log_file = self._get_full_file_path(self._current_tag)
        with open(log_file, 'w') as f:
            f.write(iperf_result)

        self._android_device.adb.shell('rm {}'.format(
            self._get_device_log_path()))

        self._iperf_process = None
        return log_file