1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16import json 17import logging 18import math 19import os 20import re 21import time 22 23import acts.controllers.power_monitor as power_monitor_lib 24import acts.controllers.monsoon as monsoon_controller 25import acts.controllers.iperf_server as ipf 26from acts import asserts 27from acts import base_test 28from acts import utils 29from acts.metrics.loggers.blackbox import BlackboxMetricLogger 30from acts.controllers.adb_lib.error import AdbError 31from acts_contrib.test_utils.power.loggers.power_metric_logger import PowerMetricLogger 32from acts_contrib.test_utils.power import plot_utils 33 34RESET_BATTERY_STATS = 'dumpsys batterystats --reset' 35IPERF_TIMEOUT = 180 36THRESHOLD_TOLERANCE_DEFAULT = 0.2 37GET_FROM_PHONE = 'get_from_dut' 38GET_FROM_AP = 'get_from_ap' 39GET_PROPERTY_HARDWARE_PLATFORM = 'getprop ro.boot.hardware.platform' 40POWER_STATS_DUMPSYS_CMD = 'dumpsys android.hardware.power.stats.IPowerStats/default delta' 41PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2 42MONSOON_MAX_CURRENT = 8.0 43DEFAULT_MONSOON_FREQUENCY = 500 44ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM=' 45MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM=' 46TEMP_FILE = '/sdcard/Download/tmp.log' 47 48 49class ObjNew(object): 50 """Create a random obj with unknown attributes and value. 51 52 """ 53 def __init__(self, **kwargs): 54 self.__dict__.update(kwargs) 55 56 def __contains__(self, item): 57 """Function to check if one attribute is contained in the object. 58 59 Args: 60 item: the item to check 61 Return: 62 True/False 63 """ 64 return hasattr(self, item) 65 66 67class PowerBaseTest(base_test.BaseTestClass): 68 """Base class for all wireless power related tests. 69 70 """ 71 def __init__(self, controllers): 72 73 super().__init__(controllers) 74 self.power_result = BlackboxMetricLogger.for_test_case( 75 metric_name='avg_power') 76 self.start_meas_time = 0 77 self.rockbottom_script = None 78 self.img_name = '' 79 self.dut = None 80 self.power_logger = PowerMetricLogger.for_test_case() 81 self.power_monitor = None 82 self.odpm_folder = None 83 84 @property 85 def final_test(self): 86 return len( 87 self.results.requested 88 ) > 0 and self.current_test_name == self.results.requested[-1] 89 90 @property 91 def display_name_test_suite(self): 92 return getattr(self, '_display_name_test_suite', 93 self.__class__.__name__) 94 95 @display_name_test_suite.setter 96 def display_name_test_suite(self, name): 97 self._display_name_test_suite = name 98 99 @property 100 def display_name_test_case(self): 101 default_test_name = getattr(self, 'test_name', None) 102 return getattr(self, '_display_name_test_case', default_test_name) 103 104 @display_name_test_case.setter 105 def display_name_test_case(self, name): 106 self._display_name_test_case = name 107 108 def initialize_power_monitor(self): 109 """ Initializes the power monitor object. 110 111 Raises an exception if there are no controllers available. 112 """ 113 if hasattr(self, 'bitses'): 114 if hasattr(self, 'monsoons'): 115 self.log.info('Destroying monsoon controller.') 116 monsoon_controller.destroy(self.monsoons) 117 time.sleep(2) 118 self.power_monitor = self.bitses[0] 119 self.power_monitor.setup(registry=self.user_params) 120 elif hasattr(self, 'monsoons'): 121 self.power_monitor = power_monitor_lib.PowerMonitorMonsoonFacade( 122 self.monsoons[0]) 123 self.monsoons[0].set_max_current(8.0) 124 self.monsoons[0].set_voltage(self.mon_voltage) 125 else: 126 raise RuntimeError('No power monitors available.') 127 128 def setup_class(self): 129 130 super().setup_class() 131 132 self.log = logging.getLogger() 133 self.tests = self.get_existing_test_names() 134 135 # Obtain test parameters from user_params 136 TEST_PARAMS = self.TAG + '_params' 137 self.test_params = self.user_params.get(TEST_PARAMS, {}) 138 if not self.test_params: 139 self.log.warning(TEST_PARAMS + ' was not found in the user ' 140 'parameters defined in the config file.') 141 142 # Override user_param values with test parameters 143 self.user_params.update(self.test_params) 144 145 # Unpack user_params with default values. All the usages of user_params 146 # as self attributes need to be included either as a required parameter 147 # or as a parameter with a default value. 148 req_params = ['custom_files', 'mon_duration'] 149 self.unpack_userparams(req_params, 150 mon_freq=DEFAULT_MONSOON_FREQUENCY, 151 mon_offset=0, 152 bug_report=False, 153 extra_wait=None, 154 iperf_duration=None, 155 pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT, 156 mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT, 157 ap_dtim_period=None, 158 bits_root_rail_csv_export=False) 159 160 # Setup the must have controllers, phone and monsoon 161 self.dut = self.android_devices[0] 162 self.mon_data_path = os.path.join(self.log_path, 'Monsoon') 163 os.makedirs(self.mon_data_path, exist_ok=True) 164 165 # Make odpm path for P21 or later 166 platform = self.dut.adb.shell(GET_PROPERTY_HARDWARE_PLATFORM) 167 self.log.info('The hardware platform is {}'.format(platform)) 168 if platform.startswith('gs'): 169 self.odpm_folder = os.path.join(self.log_path, 'odpm') 170 os.makedirs(self.odpm_folder, exist_ok=True) 171 self.log.info('For P21 or later, create odpm folder {}'.format( 172 self.odpm_folder)) 173 174 # Initialize the power monitor object that will be used to measure 175 self.initialize_power_monitor() 176 177 # Unpack the thresholds file or fail class setup if it can't be found 178 for file in self.custom_files: 179 if 'pass_fail_threshold_' + self.dut.model in file: 180 self.threshold_file = file 181 break 182 else: 183 raise RuntimeError('Required test pass/fail threshold file is ' 184 'missing') 185 186 # Unpack the rockbottom script or fail class setup if it can't be found 187 for file in self.custom_files: 188 if 'rockbottom_' + self.dut.model in file: 189 self.rockbottom_script = file 190 break 191 else: 192 raise RuntimeError('Required rockbottom script is missing.') 193 194 # Unpack optional custom files 195 for file in self.custom_files: 196 if 'attenuator_setting' in file: 197 self.attenuation_file = file 198 elif 'network_config' in file: 199 self.network_file = file 200 201 if hasattr(self, 'attenuators'): 202 self.num_atten = self.attenuators[0].instrument.num_atten 203 self.atten_level = self.unpack_custom_file(self.attenuation_file) 204 self.threshold = self.unpack_custom_file(self.threshold_file) 205 self.mon_info = self.create_monsoon_info() 206 207 # Sync device time, timezone and country code 208 utils.require_sl4a((self.dut, )) 209 utils.sync_device_time(self.dut) 210 211 screen_on_img = self.user_params.get('screen_on_img', []) 212 if screen_on_img: 213 img_src = screen_on_img[0] 214 img_dest = '/sdcard/Pictures/' 215 success = self.dut.push_system_file(img_src, img_dest) 216 if success: 217 self.img_name = os.path.basename(img_src) 218 219 def setup_test(self): 220 """Set up test specific parameters or configs. 221 222 """ 223 super().setup_test() 224 225 # Reset result variables 226 self.avg_current = 0 227 self.samples = [] 228 self.power_result.metric_value = 0 229 230 # Set the device into rockbottom state 231 self.dut_rockbottom() 232 233 # Wait for extra time if needed for the first test 234 if self.extra_wait: 235 self.more_wait_first_test() 236 237 def teardown_test(self): 238 """Tear down necessary objects after test case is finished. 239 240 """ 241 self.log.info('Tearing down the test case') 242 self.power_monitor.connect_usb() 243 self.power_logger.set_avg_power(self.power_result.metric_value) 244 self.power_logger.set_avg_current(self.avg_current) 245 self.power_logger.set_voltage(self.mon_voltage) 246 self.power_logger.set_testbed(self.testbed_name) 247 248 # If a threshold was provided, log it in the power proto 249 if self.threshold and self.test_name in self.threshold: 250 avg_current_threshold = self.threshold[self.test_name] 251 self.power_logger.set_avg_current_threshold(avg_current_threshold) 252 253 build_id = self.dut.build_info.get('build_id', '') 254 incr_build_id = self.dut.build_info.get('incremental_build_id', '') 255 branch = self.user_params.get('branch', '') 256 target = self.dut.device_info.get('flavor', '') 257 258 self.power_logger.set_branch(branch) 259 self.power_logger.set_build_id(build_id) 260 self.power_logger.set_incremental_build_id(incr_build_id) 261 self.power_logger.set_target(target) 262 263 # Log the display name of the test suite and test case 264 if self.display_name_test_suite: 265 name = self.display_name_test_suite 266 self.power_logger.set_test_suite_display_name(name) 267 268 if self.display_name_test_case: 269 name = self.display_name_test_case 270 self.power_logger.set_test_case_display_name(name) 271 272 # Take Bugreport 273 if self.bug_report: 274 begin_time = utils.get_current_epoch_time() 275 self.dut.take_bug_report(self.test_name, begin_time) 276 277 # Allow the device to cooldown before executing the next test 278 cooldown = self.test_params.get('cooldown', None) 279 if cooldown and not self.final_test: 280 time.sleep(cooldown) 281 282 def teardown_class(self): 283 """Clean up the test class after tests finish running 284 285 """ 286 self.log.info('Tearing down the test class') 287 if self.power_monitor: 288 self.power_monitor.connect_usb() 289 290 def on_fail(self, test_name, begin_time): 291 self.power_logger.set_pass_fail_status('FAIL') 292 293 def on_pass(self, test_name, begin_time): 294 self.power_logger.set_pass_fail_status('PASS') 295 296 def dut_save_odpm(self, tag): 297 """Dumpsys ODPM data and save it to self.odpm_folder. 298 299 Args: 300 tag: the moment of save ODPM data 301 """ 302 odpm_file_name = '{}.{}.dumpsys_odpm_{}.txt'.format( 303 self.__class__.__name__, 304 self.current_test_name, 305 tag) 306 odpm_file_path = os.path.join(self.odpm_folder, odpm_file_name) 307 308 try: 309 stats = self.dut.adb.shell(POWER_STATS_DUMPSYS_CMD) 310 with open(odpm_file_path, 'w') as f: 311 f.write(stats) 312 except AdbError as e: 313 self.log.warning('Odpm data with tag {} did not save due to adb ' 314 'error {}'.format(e)) 315 316 def dut_rockbottom(self): 317 """Set the dut to rockbottom state 318 319 """ 320 # The rockbottom script might include a device reboot, so it is 321 # necessary to stop SL4A during its execution. 322 self.dut.stop_services() 323 self.log.info('Executing rockbottom script for ' + self.dut.model) 324 os.chmod(self.rockbottom_script, 0o777) 325 os.system('{} {} {}'.format(self.rockbottom_script, self.dut.serial, 326 self.img_name)) 327 # Make sure the DUT is in root mode after coming back 328 self.dut.root_adb() 329 # Restart SL4A 330 self.dut.start_services() 331 332 def unpack_custom_file(self, file, test_specific=True): 333 """Unpack the pass_fail_thresholds from a common file. 334 335 Args: 336 file: the common file containing pass fail threshold. 337 test_specific: if True, returns the JSON element within the file 338 that starts with the test class name. 339 """ 340 with open(file, 'r') as f: 341 params = json.load(f) 342 if test_specific: 343 try: 344 return params[self.TAG] 345 except KeyError: 346 pass 347 else: 348 return params 349 350 def decode_test_configs(self, attrs, indices): 351 """Decode the test config/params from test name. 352 353 Remove redundant function calls when tests are similar. 354 Args: 355 attrs: a list of the attrs of the test config obj 356 indices: a list of the location indices of keyword in the test name. 357 """ 358 # Decode test parameters for the current test 359 test_params = self.current_test_name.split('_') 360 values = [test_params[x] for x in indices] 361 config_dict = dict(zip(attrs, values)) 362 self.test_configs = ObjNew(**config_dict) 363 364 def more_wait_first_test(self): 365 # For the first test, increase the offset for longer wait time 366 if self.current_test_name == self.tests[0]: 367 self.mon_info.offset = self.mon_offset + self.extra_wait 368 else: 369 self.mon_info.offset = self.mon_offset 370 371 def set_attenuation(self, atten_list): 372 """Function to set the attenuator to desired attenuations. 373 374 Args: 375 atten_list: list containing the attenuation for each attenuator. 376 """ 377 if len(atten_list) != self.num_atten: 378 raise Exception('List given does not have the correct length') 379 for i in range(self.num_atten): 380 self.attenuators[i].set_atten(atten_list[i]) 381 382 def measure_power_and_validate(self): 383 """The actual test flow and result processing and validate. 384 385 """ 386 self.collect_power_data() 387 self.pass_fail_check(self.avg_current) 388 389 def collect_power_data(self): 390 """Measure power, plot and take log if needed. 391 392 Returns: 393 A MonsoonResult object. 394 """ 395 # Collecting current measurement data and plot 396 samples = self.power_monitor_data_collect_save() 397 398 current = [sample[1] for sample in samples] 399 average_current = sum(current) * 1000 / len(current) 400 401 self.power_result.metric_value = (average_current * self.mon_voltage) 402 self.avg_current = average_current 403 404 plot_title = '{}_{}_{}'.format(self.test_name, self.dut.model, 405 self.dut.build_info['build_id']) 406 plot_utils.current_waveform_plot(samples, self.mon_voltage, 407 self.mon_info.data_path, plot_title) 408 409 return samples 410 411 def pass_fail_check(self, average_current=None): 412 """Check the test result and decide if it passed or failed. 413 414 The threshold is provided in the config file. In this class, result is 415 current in mA. 416 """ 417 418 if not self.threshold or self.test_name not in self.threshold: 419 self.log.error("No threshold is provided for the test '{}' in " 420 "the configuration file.".format(self.test_name)) 421 return 422 423 current_threshold = self.threshold[self.test_name] 424 if average_current: 425 asserts.assert_true( 426 abs(average_current - current_threshold) / current_threshold < 427 self.pass_fail_tolerance, 428 'Measured average current in [{}]: {:.2f}mA, which is ' 429 'out of the acceptable range {:.2f}±{:.2f}mA'.format( 430 self.test_name, average_current, current_threshold, 431 self.pass_fail_tolerance * current_threshold)) 432 asserts.explicit_pass( 433 'Measurement finished for [{}]: {:.2f}mA, which is ' 434 'within the acceptable range {:.2f}±{:.2f}'.format( 435 self.test_name, average_current, current_threshold, 436 self.pass_fail_tolerance * current_threshold)) 437 else: 438 asserts.fail( 439 'Something happened, measurement is not complete, test failed') 440 441 def create_monsoon_info(self): 442 """Creates the config dictionary for monsoon 443 444 Returns: 445 mon_info: Dictionary with the monsoon packet config 446 """ 447 mon_info = ObjNew(freq=self.mon_freq, 448 duration=self.mon_duration, 449 offset=self.mon_offset, 450 data_path=self.mon_data_path) 451 return mon_info 452 453 def power_monitor_data_collect_save(self): 454 """Current measurement and save the log file. 455 456 Collect current data using Monsoon box and return the path of the 457 log file. Take bug report if requested. 458 459 Returns: 460 A list of tuples in which the first element is a timestamp and the 461 second element is the sampled current in Amperes at that time. 462 """ 463 464 tag = '{}_{}_{}'.format(self.test_name, self.dut.model, 465 self.dut.build_info['build_id']) 466 467 data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag)) 468 469 # If the specified Monsoon data file already exists (e.g., multiple 470 # measurements in a single test), write the results to a new file with 471 # the postfix "_#". 472 if os.path.exists(data_path): 473 highest_value = 1 474 for filename in os.listdir(os.path.dirname(data_path)): 475 match = re.match(r'{}_(\d+).txt'.format(tag), filename) 476 if match: 477 highest_value = max(highest_value, int(match.group(1))) 478 479 data_path = os.path.join(self.mon_info.data_path, 480 '%s_%s.txt' % (tag, highest_value + 1)) 481 482 # Resets the battery status right before the test starts. 483 self.dut.adb.shell(RESET_BATTERY_STATS) 484 self.log.info('Starting power measurement. Duration: {}s. Offset: ' 485 '{}s. Voltage: {} V.'.format(self.mon_info.duration, 486 self.mon_info.offset, 487 self.mon_voltage)) 488 489 # TODO(b/155426729): Create an accurate host-to-device time difference 490 # measurement. 491 device_time_cmd = 'echo $EPOCHREALTIME' 492 device_time = self.dut.adb.shell(device_time_cmd) 493 host_time = time.time() 494 self.log.debug('device start time %s, host start time %s', device_time, 495 host_time) 496 device_to_host_offset = float(device_time) - host_time 497 498 # Start the power measurement using monsoon. 499 self.dut.stop_services() 500 time.sleep(1) 501 502 # P21 or later device, save the odpm data before power measurement 503 if self.odpm_folder: 504 self.dut_save_odpm('before') 505 506 self.power_monitor.disconnect_usb() 507 measurement_args = dict(duration=self.mon_info.duration, 508 measure_after_seconds=self.mon_info.offset, 509 hz=self.mon_info.freq) 510 self.power_monitor.measure(measurement_args=measurement_args, 511 measurement_name=self.test_name, 512 start_time=device_to_host_offset, 513 monsoon_output_path=data_path) 514 self.power_monitor.release_resources() 515 self.collect_raw_data_samples() 516 self.power_monitor.connect_usb() 517 self.dut.wait_for_boot_completion() 518 time.sleep(10) 519 520 # For P21 or later device, save the odpm data after power measurement 521 if self.odpm_folder: 522 self.dut_save_odpm('after') 523 524 self.dut.start_services() 525 526 return self.power_monitor.get_waveform(file_path=data_path) 527 528 def process_iperf_results(self): 529 """Get the iperf results and process. 530 531 Returns: 532 throughput: the average throughput during tests. 533 """ 534 # Get IPERF results and add this to the plot title 535 RESULTS_DESTINATION = os.path.join( 536 self.iperf_server.log_path, 537 'iperf_client_output_{}.log'.format(self.current_test_name)) 538 self.dut.pull_files(TEMP_FILE, RESULTS_DESTINATION) 539 # Calculate the average throughput 540 if self.use_client_output: 541 iperf_file = RESULTS_DESTINATION 542 else: 543 iperf_file = self.iperf_server.log_files[-1] 544 try: 545 iperf_result = ipf.IPerfResult(iperf_file) 546 547 # Compute the throughput in Mbit/s 548 throughput = (math.fsum( 549 iperf_result.instantaneous_rates[self.start_meas_time:-1] 550 ) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1]) 551 ) * 8 * (1.024**2) 552 553 self.log.info('The average throughput is {}'.format(throughput)) 554 except ValueError: 555 self.log.warning('Cannot get iperf result. Setting to 0') 556 throughput = 0 557 return throughput 558 559 def collect_raw_data_samples(self): 560 if hasattr(self, 'bitses') and self.bits_root_rail_csv_export: 561 path = os.path.join(os.path.dirname(self.mon_info.data_path), 562 'Kibble') 563 self.power_monitor.get_bits_root_rail_csv_export(path, self.test_name) 564