#!/usr/bin/env python3
#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the 'License');
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an 'AS IS' BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
import json
import logging
import math
import os
import re
import time

import acts.controllers.power_monitor as power_monitor_lib
import acts.controllers.monsoon as monsoon_controller
import acts.controllers.iperf_server as ipf
from acts import asserts
from acts import base_test
from acts import utils
from acts.metrics.loggers.blackbox import BlackboxMetricLogger
from acts_contrib.test_utils.power.loggers.power_metric_logger import PowerMetricLogger
from acts_contrib.test_utils.power import plot_utils

RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
IPERF_TIMEOUT = 180
THRESHOLD_TOLERANCE_DEFAULT = 0.2
GET_FROM_PHONE = 'get_from_dut'
GET_FROM_AP = 'get_from_ap'
PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2
MONSOON_MAX_CURRENT = 8.0
DEFAULT_MONSOON_FREQUENCY = 500
ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM='
MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM='
TEMP_FILE = '/sdcard/Download/tmp.log'


class ObjNew(object):
    """Simple attribute container built from arbitrary keyword arguments.

    Every keyword argument given to the constructor becomes an instance
    attribute, and ``name in obj`` reports whether that attribute exists.
    """
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def __contains__(self, item):
        """Check whether an attribute is present on the object.

        Args:
            item: name of the attribute to look for.
        Returns:
            True if the object has an attribute named item, False otherwise.
        """
        return hasattr(self, item)


class PowerBaseTest(base_test.BaseTestClass):
    """Base class for all wireless power related tests.

    """
    def __init__(self, controllers):
        """Initialize the metric loggers and the default test state.

        Args:
            controllers: forwarded unchanged to the parent class constructor.
        """
        super().__init__(controllers)
        self.power_result = BlackboxMetricLogger.for_test_case(
            metric_name='avg_power')
        self.start_meas_time = 0
        self.rockbottom_script = None
        self.img_name = ''
        self.dut = None
        self.power_logger = PowerMetricLogger.for_test_case()
        self.power_monitor = None

    @property
    def final_test(self):
        """True when the current test is the last one in the requested list."""
        return len(
            self.results.requested
        ) > 0 and self.current_test_name == self.results.requested[-1]

    @property
    def display_name_test_suite(self):
        """Display name of the test suite; defaults to the class name."""
        return getattr(self, '_display_name_test_suite',
                       self.__class__.__name__)

    @display_name_test_suite.setter
    def display_name_test_suite(self, name):
        self._display_name_test_suite = name

    @property
    def display_name_test_case(self):
        """Display name of the test case.

        Defaults to the test_name attribute, or None if that is not set.
        """
        default_test_name = getattr(self, 'test_name', None)
        return getattr(self, '_display_name_test_case', default_test_name)

    @display_name_test_case.setter
    def display_name_test_case(self, name):
        self._display_name_test_case = name

    def initialize_power_monitor(self):
        """Initializes the power monitor object.

        A 'bitses' controller takes precedence over a 'monsoons' controller.
        When both are present the monsoon controller is destroyed before the
        first bits controller is set up.

        Raises:
            RuntimeError: if there are no controllers available.
        """
        if hasattr(self, 'bitses'):
            if hasattr(self, 'monsoons'):
                self.log.info('Destroying monsoon controller.')
                monsoon_controller.destroy(self.monsoons)
                time.sleep(2)
            self.power_monitor = self.bitses[0]
            self.power_monitor.setup(registry=self.user_params)
        elif hasattr(self, 'monsoons'):
            self.power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade(
                self.monsoons[0])
            # Use the module-level limit rather than repeating the literal.
            self.monsoons[0].set_max_current(MONSOON_MAX_CURRENT)
            self.monsoons[0].set_voltage(self.mon_voltage)
        else:
            raise RuntimeError('No power monitors available.')

    def setup_class(self):
        """Unpack the user parameters and prepare the DUT and power monitor.

        Raises:
            RuntimeError: if no power monitor is available, or if the
                pass/fail threshold file, the rockbottom script or (when
                attenuators are present) the attenuator setting file cannot
                be found in custom_files.
        """
        super().setup_class()

        self.log = logging.getLogger()
        self.tests = self.get_existing_test_names()

        # Obtain test parameters from user_params
        TEST_PARAMS = self.TAG + '_params'
        self.test_params = self.user_params.get(TEST_PARAMS, {})
        if not self.test_params:
            self.log.warning(TEST_PARAMS + ' was not found in the user '
                             'parameters defined in the config file.')

        # Override user_param values with test parameters
        self.user_params.update(self.test_params)

        # Unpack user_params with default values. All the usages of user_params
        # as self attributes need to be included either as a required parameter
        # or as a parameter with a default value.
        req_params = ['custom_files', 'mon_duration']
        self.unpack_userparams(req_params,
                               mon_freq=DEFAULT_MONSOON_FREQUENCY,
                               mon_offset=0,
                               bug_report=False,
                               extra_wait=None,
                               iperf_duration=None,
                               pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT,
                               mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT,
                               ap_dtim_period=None)

        # Setup the must have controllers, phone and monsoon
        self.dut = self.android_devices[0]
        self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
        os.makedirs(self.mon_data_path, exist_ok=True)

        # Initialize the power monitor object that will be used to measure
        self.initialize_power_monitor()

        # Unpack the thresholds file or fail class setup if it can't be found
        for custom_file in self.custom_files:
            if 'pass_fail_threshold_' + self.dut.model in custom_file:
                self.threshold_file = custom_file
                break
        else:
            raise RuntimeError('Required test pass/fail threshold file is '
                               'missing')

        # Unpack the rockbottom script or fail class setup if it can't be found
        for custom_file in self.custom_files:
            if 'rockbottom_' + self.dut.model in custom_file:
                self.rockbottom_script = custom_file
                break
        else:
            raise RuntimeError('Required rockbottom script is missing.')

        # Unpack optional custom files. These attributes are deliberately left
        # unset when no matching file is listed.
        for custom_file in self.custom_files:
            if 'attenuator_setting' in custom_file:
                self.attenuation_file = custom_file
            elif 'network_config' in custom_file:
                self.network_file = custom_file

        if hasattr(self, 'attenuators'):
            # Fail with a clear message instead of an AttributeError when the
            # attenuator settings were not provided.
            if not hasattr(self, 'attenuation_file'):
                raise RuntimeError('Required attenuator setting file is '
                                   'missing.')
            self.num_atten = self.attenuators[0].instrument.num_atten
            self.atten_level = self.unpack_custom_file(self.attenuation_file)
        self.threshold = self.unpack_custom_file(self.threshold_file)
        self.mon_info = self.create_monsoon_info()

        # Sync device time, timezone and country code
        utils.require_sl4a((self.dut, ))
        utils.sync_device_time(self.dut)

        screen_on_img = self.user_params.get('screen_on_img', [])
        if screen_on_img:
            img_src = screen_on_img[0]
            img_dest = '/sdcard/Pictures/'
            success = self.dut.push_system_file(img_src, img_dest)
            if success:
                self.img_name = os.path.basename(img_src)

    def setup_test(self):
        """Set up test specific parameters or configs.

        """
        super().setup_test()

        # Reset result variables
        self.avg_current = 0
        self.samples = []
        self.power_result.metric_value = 0

        # Set the device into rockbottom state
        self.dut_rockbottom()

        # Wait for extra time if needed for the first test
        if self.extra_wait:
            self.more_wait_first_test()

    def teardown_test(self):
        """Tear down necessary objects after test case is finished.

        Reconnects USB, records the measurement results and build information
        in the power logger, optionally takes a bug report and sleeps for the
        configured cooldown unless this is the final test.
        """
        self.log.info('Tearing down the test case')
        # Same guard as teardown_class: the monitor may not have been created.
        if self.power_monitor:
            self.power_monitor.connect_usb()
        self.power_logger.set_avg_power(self.power_result.metric_value)
        self.power_logger.set_avg_current(self.avg_current)
        self.power_logger.set_voltage(self.mon_voltage)
        self.power_logger.set_testbed(self.testbed_name)

        # If a threshold was provided, log it in the power proto
        if self.threshold and self.test_name in self.threshold:
            avg_current_threshold = self.threshold[self.test_name]
            self.power_logger.set_avg_current_threshold(avg_current_threshold)

        build_id = self.dut.build_info.get('build_id', '')
        incr_build_id = self.dut.build_info.get('incremental_build_id', '')
        branch = self.user_params.get('branch', '')
        target = self.dut.device_info.get('flavor', '')

        self.power_logger.set_branch(branch)
        self.power_logger.set_build_id(build_id)
        self.power_logger.set_incremental_build_id(incr_build_id)
        self.power_logger.set_target(target)

        # Log the display name of the test suite and test case
        if self.display_name_test_suite:
            name = self.display_name_test_suite
            self.power_logger.set_test_suite_display_name(name)

        if self.display_name_test_case:
            name = self.display_name_test_case
            self.power_logger.set_test_case_display_name(name)

        # Take Bugreport
        if self.bug_report:
            begin_time = utils.get_current_epoch_time()
            self.dut.take_bug_report(self.test_name, begin_time)

        # Allow the device to cooldown before executing the next test
        cooldown = self.test_params.get('cooldown', None)
        if cooldown and not self.final_test:
            time.sleep(cooldown)

    def teardown_class(self):
        """Clean up the test class after tests finish running

        """
        self.log.info('Tearing down the test class')
        if self.power_monitor:
            self.power_monitor.connect_usb()

    def on_fail(self, test_name, begin_time):
        """Record a FAIL status in the power logger."""
        self.power_logger.set_pass_fail_status('FAIL')

    def on_pass(self, test_name, begin_time):
        """Record a PASS status in the power logger."""
        self.power_logger.set_pass_fail_status('PASS')

    def dut_rockbottom(self):
        """Set the dut to rockbottom state

        """
        # The rockbottom script might include a device reboot, so it is
        # necessary to stop SL4A during its execution.
        self.dut.stop_services()
        self.log.info('Executing rockbottom script for ' + self.dut.model)
        os.chmod(self.rockbottom_script, 0o777)
        # NOTE: the command goes through the shell and the script path, serial
        # and image name are interpolated unquoted.
        os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial,
                                    self.img_name))
        # Make sure the DUT is in root mode after coming back
        self.dut.root_adb()
        # Restart SL4A
        self.dut.start_services()

    def unpack_custom_file(self, file, test_specific=True):
        """Load a JSON custom file, optionally narrowed to this test class.

        Args:
            file: path of the JSON file to load.
            test_specific: if True, returns only the JSON element stored
                under the key self.TAG.
        Returns:
            The whole parsed JSON content when test_specific is False.
            Otherwise the element stored under self.TAG, or None if the
            file has no such key.
        """
        with open(file, 'r') as f:
            params = json.load(f)
        if not test_specific:
            return params
        try:
            return params[self.TAG]
        except KeyError:
            # A missing entry is not an error; callers check for None.
            return None

    def decode_test_configs(self, attrs, indices):
        """Decode the test config/params from test name.

        The current test name is split on '_' and the selected pieces are
        stored as attributes of self.test_configs.

        Args:
            attrs: a list of the attrs of the test config obj
            indices: a list of the location indices of keyword in the test
                name.
        """
        # Decode test parameters for the current test
        test_params = self.current_test_name.split('_')
        values = [test_params[x] for x in indices]
        config_dict = dict(zip(attrs, values))
        self.test_configs = ObjNew(**config_dict)

    def more_wait_first_test(self):
        """Set the measurement offset, extended by extra_wait for the first test."""
        # For the first test, increase the offset for longer wait time
        if self.current_test_name == self.tests[0]:
            self.mon_info.offset = self.mon_offset + self.extra_wait
        else:
            self.mon_info.offset = self.mon_offset

    def set_attenuation(self, atten_list):
        """Function to set the attenuator to desired attenuations.

        Args:
            atten_list: list containing the attenuation for each attenuator.
        Raises:
            Exception: if atten_list does not have num_atten elements.
        """
        if len(atten_list) != self.num_atten:
            raise Exception('List given does not have the correct length')
        for i, atten in enumerate(atten_list):
            self.attenuators[i].set_atten(atten)

    def measure_power_and_validate(self):
        """The actual test flow and result processing and validate.

        """
        self.collect_power_data()
        self.pass_fail_check(self.avg_current)

    def collect_power_data(self):
        """Measure power, plot and take log if needed.

        Stores the average current in self.avg_current and the product of
        that average and mon_voltage in self.power_result.metric_value.

        Returns:
            The samples returned by power_monitor_data_collect_save. If no
            samples were collected, the results are set to 0 and no plot is
            generated.
        """
        # Collecting current measurement data and plot
        samples = self.power_monitor_data_collect_save()

        current = [sample[1] for sample in samples]
        if not current:
            # Avoid dividing by zero. A zero average makes pass_fail_check
            # report the measurement as incomplete.
            self.log.error('No current samples were collected for the test '
                           "'{}'.".format(self.test_name))
            self.power_result.metric_value = 0
            self.avg_current = 0
            return samples

        # Samples are in Amperes; the average is kept in mA.
        average_current = sum(current) * 1000 / len(current)

        self.power_result.metric_value = (average_current * self.mon_voltage)
        self.avg_current = average_current

        plot_title = '{}_{}_{}'.format(self.test_name, self.dut.model,
                                       self.dut.build_info['build_id'])
        plot_utils.current_waveform_plot(samples, self.mon_voltage,
                                         self.mon_info.data_path, plot_title)

        return samples

    def pass_fail_check(self, average_current=None):
        """Check the test result and decide if it passed or failed.

        The threshold is provided in the config file. In this class, result is
        current in mA. If no threshold exists for the current test, an error
        is logged and the method returns without asserting anything.

        Args:
            average_current: the measured average current. A falsy value is
                treated as an incomplete measurement and fails the test.
        """

        if not self.threshold or self.test_name not in self.threshold:
            self.log.error("No threshold is provided for the test '{}' in "
                           "the configuration file.".format(self.test_name))
            return

        current_threshold = self.threshold[self.test_name]
        if average_current:
            asserts.assert_true(
                abs(average_current - current_threshold) / current_threshold <
                self.pass_fail_tolerance,
                'Measured average current in [{}]: {:.2f}mA, which is '
                'out of the acceptable range {:.2f}±{:.2f}mA'.format(
                    self.test_name, average_current, current_threshold,
                    self.pass_fail_tolerance * current_threshold))
            asserts.explicit_pass(
                'Measurement finished for [{}]: {:.2f}mA, which is '
                'within the acceptable range {:.2f}±{:.2f}'.format(
                    self.test_name, average_current, current_threshold,
                    self.pass_fail_tolerance * current_threshold))
        else:
            asserts.fail(
                'Something happened, measurement is not complete, test failed')

    def create_monsoon_info(self):
        """Creates the config object for monsoon

        Returns:
            mon_info: ObjNew with the attributes freq, duration, offset and
                data_path taken from the unpacked user parameters.
        """
        mon_info = ObjNew(freq=self.mon_freq,
                          duration=self.mon_duration,
                          offset=self.mon_offset,
                          data_path=self.mon_data_path)
        return mon_info

    def power_monitor_data_collect_save(self):
        """Current measurement and save the log file.

        Runs a measurement with the power monitor while USB is disconnected,
        writes the output under mon_info.data_path and reads the waveform
        back from that file.

        Returns:
            A list of tuples in which the first element is a timestamp and the
            second element is the sampled current in Amperes at that time.
        """

        tag = '{}_{}_{}'.format(self.test_name, self.dut.model,
                                self.dut.build_info['build_id'])

        data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag))

        # If the specified Monsoon data file already exists (e.g., multiple
        # measurements in a single test), write the results to a new file with
        # the postfix "_#".
        if os.path.exists(data_path):
            highest_value = 1
            # The tag is built from free-form strings, so it is escaped before
            # being used in the pattern. The pattern is compiled once.
            numbered_file = re.compile(re.escape(tag) + r'_(\d+)\.txt')
            for filename in os.listdir(os.path.dirname(data_path)):
                match = numbered_file.fullmatch(filename)
                if match:
                    highest_value = max(highest_value, int(match.group(1)))

            data_path = os.path.join(self.mon_info.data_path,
                                     '%s_%s.txt' % (tag, highest_value + 1))

        # Resets the battery status right before the test starts.
        self.dut.adb.shell(RESET_BATTERY_STATS)
        self.log.info('Starting power measurement. Duration: {}s. Offset: '
                      '{}s. Voltage: {} V.'.format(self.mon_info.duration,
                                                   self.mon_info.offset,
                                                   self.mon_voltage))

        # TODO(b/155426729): Create an accurate host-to-device time difference
        # measurement.
        device_time_cmd = 'echo $EPOCHREALTIME'
        device_time = self.dut.adb.shell(device_time_cmd)
        host_time = time.time()
        self.log.debug('device start time %s, host start time %s', device_time,
                       host_time)
        device_to_host_offset = float(device_time) - host_time

        # Start the power measurement using monsoon.
        self.dut.stop_services()
        time.sleep(1)
        self.power_monitor.disconnect_usb()
        measurement_args = dict(duration=self.mon_info.duration,
                                measure_after_seconds=self.mon_info.offset,
                                hz=self.mon_info.freq)
        self.power_monitor.measure(measurement_args=measurement_args,
                                   measurement_name=self.test_name,
                                   start_time=device_to_host_offset,
                                   monsoon_output_path=data_path)
        self.power_monitor.release_resources()
        self.power_monitor.connect_usb()
        self.dut.wait_for_boot_completion()
        time.sleep(10)
        self.dut.start_services()

        return self.power_monitor.get_waveform(file_path=data_path)

    def process_iperf_results(self):
        """Get the iperf results and process.

        Returns:
            throughput: the average throughput during tests, or 0 if the
                iperf result cannot be read or has no samples in the
                measurement window.
        """
        # Get IPERF results and add this to the plot title
        RESULTS_DESTINATION = os.path.join(
            self.iperf_server.log_path,
            'iperf_client_output_{}.log'.format(self.current_test_name))
        self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION)
        # Calculate the average throughput
        if self.use_client_output:
            iperf_file = RESULTS_DESTINATION
        else:
            iperf_file = self.iperf_server.log_files[-1]
        try:
            iperf_result = ipf.IPerfResult(iperf_file)
            rates = iperf_result.instantaneous_rates[self.start_meas_time:-1]
            if not rates:
                # An empty window would divide by zero below; route it to the
                # same fallback as an unreadable result.
                raise ValueError('No iperf samples in the measurement window')

            # Compute the throughput in Mbit/s
            throughput = math.fsum(rates) / len(rates) * 8 * (1.024**2)

            self.log.info('The average throughput is {}'.format(throughput))
        except ValueError:
            self.log.warning('Cannot get iperf result. Setting to 0')
            throughput = 0
        return throughput