1#!/usr/bin/env python3 2# 3# Copyright 2021 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16'''GNSS Base Class for Blanking and Hot Start Sensitivity Search''' 17 18import os 19import re 20from time import sleep 21from collections import namedtuple 22from itertools import product 23from numpy import arange 24from pandas import DataFrame, merge 25from acts.signals import TestError 26from acts.signals import TestFailure 27from acts.logger import epoch_to_log_line_timestamp 28from acts.context import get_current_context 29from acts_contrib.test_utils.gnss import LabTtffTestBase as lttb 30from acts_contrib.test_utils.gnss.LabTtffTestBase import glob_re 31from acts_contrib.test_utils.gnss.gnss_test_utils import launch_eecoexer 32from acts_contrib.test_utils.gnss.gnss_test_utils import execute_eecoexer_function 33from acts_contrib.test_utils.gnss.gnss_test_utils import start_gnss_by_gtw_gpstool 34from acts_contrib.test_utils.gnss.gnss_test_utils import get_current_epoch_time 35from acts_contrib.test_utils.gnss.gnss_test_utils import check_current_focus_app 36from acts_contrib.test_utils.gnss.gnss_test_utils import process_ttff_by_gtw_gpstool 37from acts_contrib.test_utils.gnss.gnss_test_utils import check_ttff_data 38from acts_contrib.test_utils.gnss.gnss_test_utils import process_gnss_by_gtw_gpstool 39from acts_contrib.test_utils.gnss.dut_log_test_utils import get_gpstool_logs 40from acts_contrib.test_utils.gnss.gnss_testlog_utils import parse_gpstool_ttfflog_to_df 41 42 43def range_wi_end(dut, start, stop, step): 44 """ 45 Generate a list of data from start to stop with the step. The list includes start and stop value 46 and also supports floating point. 47 Args: 48 dut: An AndroidDevice object. 49 start: start value. 50 Type, int or float. 51 stop: stop value. 52 Type, int or float. 53 step: step value. 54 Type, int or float. 55 Returns: 56 range_ls: the list of data. 57 """ 58 if step == 0: 59 dut.log.warn('Step is 0. Return empty list') 60 range_ls = [] 61 else: 62 if start == stop: 63 range_ls = [stop] 64 else: 65 range_ls = list(arange(start, stop, step)) 66 if len(range_ls) > 0: 67 if (step < 0 and range_ls[-1] > stop) or (step > 0 and 68 range_ls[-1] < stop): 69 range_ls.append(stop) 70 dut.log.debug(f'The range list is: {range_ls}') 71 return range_ls 72 73 74def check_ttff_pe(dut, ttff_data, ttff_mode, pe_criteria): 75 """Verify all TTFF results from ttff_data. 76 77 Args: 78 dut: An AndroidDevice object. 79 ttff_data: TTFF data of secs, position error and signal strength. 80 ttff_mode: TTFF Test mode for current test item. 81 pe_criteria: Criteria for current test item. 82 83 """ 84 ret = True 85 no_iteration = len(ttff_data.keys()) 86 dut.log.info( 87 f'{no_iteration} iterations of TTFF {ttff_mode} tests finished.') 88 dut.log.info(f'{ttff_mode} PASS criteria is {pe_criteria} meters') 89 dut.log.debug(f'{ttff_mode} TTFF data: {ttff_data}') 90 91 if len(ttff_data.keys()) == 0: 92 dut.log.error("GTW_GPSTool didn't process TTFF properly.") 93 raise TestFailure("GTW_GPSTool didn't process TTFF properly.") 94 95 if any( 96 float(ttff_data[key].ttff_pe) >= pe_criteria 97 for key in ttff_data.keys()): 98 dut.log.error( 99 f'One or more TTFF {ttff_mode} are over test criteria {pe_criteria} meters' 100 ) 101 ret = False 102 else: 103 dut.log.info( 104 f'All TTFF {ttff_mode} are within test criteria {pe_criteria} meters.' 105 ) 106 ret = True 107 return ret 108 109 110class GnssBlankingBase(lttb.LabTtffTestBase): 111 """ LAB GNSS Cellular Coex Tx Power Sweep TTFF/FFPE Tests""" 112 GNSS_PWR_SWEEP = 'gnss_pwr_sweep' 113 CELL_PWR_SWEEP = 'cell_pwr_sweep' 114 115 def __init__(self, controllers): 116 """ Initializes class attributes. """ 117 super().__init__(controllers) 118 self.eecoex_func = '' 119 self.start_pwr = 10 120 self.stop_pwr = 24 121 self.offset = 1 122 self.result_cell_pwr = 10 123 self.gsm_sweep_params = None 124 self.lte_tdd_pc3_sweep_params = None 125 self.lte_tdd_pc2_sweep_params = None 126 self.coex_stop_cmd = None 127 self.scen_sweep = False 128 self.gnss_pwr_sweep_init_ls = [] 129 self.gnss_pwr_sweep_fine_sweep_ls = [] 130 131 def setup_class(self): 132 super().setup_class() 133 134 # Required parameters 135 req_params = [self.GNSS_PWR_SWEEP] 136 self.unpack_userparams(req_param_names=req_params) 137 self.unpack_gnss_pwr_sweep() 138 139 # Optional parameters 140 cell_sweep_params = self.user_params.get(self.CELL_PWR_SWEEP, []) 141 142 if cell_sweep_params: 143 self.gsm_sweep_params = cell_sweep_params.get("GSM", [10, 33, 1]) 144 self.lte_tdd_pc3_sweep_params = cell_sweep_params.get( 145 "LTE_TDD_PC3", [10, 24, 1]) 146 self.lte_tdd_pc2_sweep_params = cell_sweep_params.get( 147 "LTE_TDD_PC2", [10, 26, 1]) 148 149 def setup_test(self): 150 super().setup_test() 151 launch_eecoexer(self.dut) 152 153 # Set DUT temperature the limit to 60 degree 154 self.dut.adb.shell( 155 'setprop persist.com.google.eecoexer.cellular.temperature_limit 60') 156 157 # Get current context full path to create the log folder. 158 cur_test_item_dir = get_current_context().get_full_output_path() 159 self.gnss_log_path = os.path.join(self.log_path, cur_test_item_dir) 160 os.makedirs(self.gnss_log_path, exist_ok=True) 161 162 ## Start GNSS chip log 163 self.start_dut_gnss_log() 164 165 def teardown_test(self): 166 super().teardown_test() 167 # Set gnss_vendor_log_path based on GNSS solution vendor. 168 gnss_vendor_log_path = os.path.join(self.gnss_log_path, 169 self.diag_option) 170 os.makedirs(gnss_vendor_log_path, exist_ok=True) 171 172 # Stop GNSS chip log and pull the logs to local file system 173 self.stop_and_pull_dut_gnss_log(gnss_vendor_log_path) 174 175 # Stop cellular Tx and close GPStool and EEcoexer APPs. 176 self.stop_coex_tx() 177 self.log.debug('Close GPStool APP') 178 self.dut.force_stop_apk("com.android.gpstool") 179 self.log.debug('Close EEcoexer APP') 180 self.dut.force_stop_apk("com.google.eecoexer") 181 182 def derive_sweep_list(self, data): 183 """ 184 Derive sweep list from config 185 Args: 186 data: GNSS simulator scenario power setting. 187 type, dictionary. 188 """ 189 match_tag = r'(?P<sat>[a-z]+)_(?P<band>[a-z]+\d\S*)' 190 sweep_all_ls = [] 191 set_all_ls = [] 192 regex_match = re.compile(match_tag) 193 method = data.get('method') 194 for key, value in data.items(): 195 result = regex_match.search(key) 196 if result: 197 set_all_ls.append(result.groupdict()) 198 sweep_all_ls.append( 199 range_wi_end(self.dut, value[0], value[1], value[2])) 200 if method == 'product': 201 swp_result_ls = list(product(*sweep_all_ls)) 202 else: 203 swp_result_ls = list(zip(*sweep_all_ls)) 204 205 self.log.debug(f'set_all_ls: {set_all_ls}') 206 self.log.debug(f'swp_result_ls: {swp_result_ls}') 207 return set_all_ls, swp_result_ls 208 209 def unpack_gnss_pwr_sweep(self): 210 """ Unpack gnss_pwr_sweep and construct sweep parameters 211 """ 212 213 for key, value in self.gnss_pwr_sweep.items(): 214 if key == 'init': 215 self.gnss_pwr_sweep_init_ls = [] 216 self.log.info(f'Sweep: {value}') 217 result = self.derive_sweep_list(value) 218 self.gnss_pwr_sweep_init_ls.append(result) 219 elif key == 'fine_sweep': 220 self.gnss_pwr_sweep_fine_sweep_ls = [] 221 self.log.info(f'Sweep: {value}') 222 result = self.derive_sweep_list(value) 223 self.gnss_pwr_sweep_fine_sweep_ls.append(result) 224 else: 225 self.log.error(f'{key} is a unsupported key in gnss_pwr_sweep.') 226 227 def stop_coex_tx(self): 228 """ 229 Stop EEcoexer Tx power. 230 """ 231 # Stop cellular Tx by EEcoexer. 232 if self.coex_stop_cmd: 233 self.log.info(f'Stop EEcoexer Test Command: {self.coex_stop_cmd}') 234 execute_eecoexer_function(self.dut, self.coex_stop_cmd) 235 236 def analysis_ttff_ffpe(self, ttff_data, json_tag=''): 237 """ 238 Pull logs and parsing logs into json file. 239 Args: 240 ttff_data: ttff_data from test results. 241 Type, list. 242 json_tag: tag for parsed json file name. 243 Type, str. 244 """ 245 # Create log directory. 246 gps_log_path = os.path.join(self.gnss_log_path, 247 'Cell_Pwr_Sweep_Results') 248 249 # Pull logs of GTW GPStool. 250 get_gpstool_logs(self.dut, gps_log_path, False) 251 252 # Parsing the log of GTW GPStool into pandas dataframe. 253 target_dir = os.path.join(gps_log_path, 'GPSLogs', 'files') 254 gps_api_log_ls = glob_re(self.dut, target_dir, r'GNSS_\d+') 255 latest_gps_api_log = max(gps_api_log_ls, key=os.path.getctime) 256 self.log.info(f'Get latest GPStool log is: {latest_gps_api_log}') 257 df_ttff_ffpe = DataFrame( 258 parse_gpstool_ttfflog_to_df(latest_gps_api_log)) 259 # Add test case, TTFF and FFPE data into the dataframe. 260 ttff_dict = {} 261 for i in ttff_data: 262 data = ttff_data[i]._asdict() 263 ttff_dict[i] = dict(data) 264 265 ttff_data_df = DataFrame(ttff_dict).transpose() 266 ttff_data_df = ttff_data_df[[ 267 'ttff_loop', 'ttff_sec', 'ttff_pe', 'ttff_haccu' 268 ]] 269 try: 270 df_ttff_ffpe = merge(df_ttff_ffpe, 271 ttff_data_df, 272 left_on='loop', 273 right_on='ttff_loop') 274 except: # pylint: disable=bare-except 275 self.log.warning("Can't merge ttff_data and df.") 276 df_ttff_ffpe['test_case'] = json_tag 277 278 json_file = f'gps_log_{json_tag}.json' 279 ttff_data_json_file = f'gps_log_{json_tag}_ttff_data.json' 280 json_path = os.path.join(gps_log_path, json_file) 281 ttff_data_json_path = os.path.join(gps_log_path, ttff_data_json_file) 282 # Save dataframe into json file. 283 df_ttff_ffpe.to_json(json_path, orient='table', index=False) 284 ttff_data_df.to_json(ttff_data_json_path, orient='table', index=False) 285 286 def hot_start_ttff_ffpe_process(self, iteration, wait): 287 """ 288 Function to run hot start TTFF/FFPE by GTW GPSTool 289 Args: 290 iteration: TTFF/FFPE test iteration. 291 type, integer. 292 wait: wait time before the hot start TTFF/FFPE test. 293 type, integer. 294 """ 295 # Start GTW GPStool. 296 self.dut.log.info("Restart GTW GPSTool") 297 start_gnss_by_gtw_gpstool(self.dut, state=True) 298 if wait > 0: 299 self.log.info( 300 f'Wait for {wait} seconds before TTFF to acquire data.') 301 sleep(wait) 302 # Get current time and convert to human readable format 303 begin_time = get_current_epoch_time() 304 log_begin_time = epoch_to_log_line_timestamp(begin_time) 305 self.dut.log.debug(f'Start time is {log_begin_time}') 306 307 # Run hot start TTFF 308 for i in range(3): 309 self.log.info(f'Start hot start attempt {i + 1}') 310 self.dut.adb.shell( 311 f'am broadcast -a com.android.gpstool.ttff_action ' 312 f'--es ttff hs --es cycle {iteration} --ez raninterval False') 313 sleep(1) 314 if self.dut.search_logcat( 315 "act=com.android.gpstool.start_test_action", begin_time): 316 self.dut.log.info("Send TTFF start_test_action successfully.") 317 break 318 else: 319 check_current_focus_app(self.dut) 320 raise TestError("Fail to send TTFF start_test_action.") 321 return begin_time 322 323 def gnss_hot_start_ttff_ffpe_test(self, 324 iteration, 325 sweep_enable=False, 326 json_tag='', 327 wait=0): 328 """ 329 GNSS hot start ttff ffpe tset 330 331 Args: 332 iteration: hot start TTFF test iteration. 333 Type, int. 334 Default, 1. 335 sweep_enable: Indicator for the function to check if it is run by cell_power_sweep() 336 Type, bool. 337 Default, False. 338 json_tag: if the function is run by cell_power_sweep(), the function would use 339 this as a part of file name to save TTFF and FFPE results into json file. 340 Type, str. 341 Default, ''. 342 wait: wait time before ttff test. 343 Type, int. 344 Default, 0. 345 Raise: 346 TestError: fail to send TTFF start_test_action. 347 """ 348 test_type = namedtuple('Type', ['command', 'criteria']) 349 test_type_ttff = test_type('Hot Start', self.hs_criteria) 350 test_type_pe = test_type('Hot Start', self.hs_ttff_pecriteria) 351 352 # Verify hot start TTFF results 353 begin_time = self.hot_start_ttff_ffpe_process(iteration, wait) 354 try: 355 ttff_data = process_ttff_by_gtw_gpstool(self.dut, begin_time, 356 self.simulator_location) 357 except: # pylint: disable=bare-except 358 self.log.warning('Fail to acquire TTFF data. Retry again.') 359 begin_time = self.hot_start_ttff_ffpe_process(iteration, wait) 360 ttff_data = process_ttff_by_gtw_gpstool(self.dut, begin_time, 361 self.simulator_location) 362 363 # Stop GTW GPSTool 364 self.dut.log.info("Stop GTW GPSTool") 365 start_gnss_by_gtw_gpstool(self.dut, state=False) 366 367 if sweep_enable: 368 self.analysis_ttff_ffpe(ttff_data, json_tag) 369 370 result_ttff = check_ttff_data(self.dut, 371 ttff_data, 372 ttff_mode=test_type_ttff.command, 373 criteria=test_type_ttff.criteria) 374 result_pe = check_ttff_pe(self.dut, 375 ttff_data, 376 ttff_mode=test_type_pe.command, 377 pe_criteria=test_type_pe.criteria) 378 if not result_ttff or not result_pe: 379 self.dut.log.warning('%s TTFF fails to reach ' 380 'designated criteria' % test_type_ttff.command) 381 self.dut.log.info("Stop GTW GPSTool") 382 return False 383 384 return True 385 386 def hot_start_gnss_power_sweep(self, 387 sweep_ls, 388 wait=0, 389 iteration=1, 390 sweep_enable=False, 391 title=''): 392 """ 393 GNSS simulator power sweep of hot start test. 394 395 Args: 396 sweep_ls: list of sweep parameters. 397 Type, tuple. 398 wait: Wait time before the power sweep. 399 Type, int. 400 Default, 0. 401 iteration: The iteration times of hot start test. 402 Type, int. 403 Default, 1. 404 sweep_enable: Indicator for power sweep. 405 It will be True only in GNSS sensitivity search case. 406 Type, bool. 407 Defaule, False. 408 title: the target log folder title for GNSS sensitivity search test items. 409 Type, str. 410 Default, ''. 411 Return: 412 Bool, gnss_pwr_params. 413 """ 414 415 # Calculate loop range list from gnss_simulator_power_level and sa_sensitivity 416 417 self.log.debug( 418 f'Start the GNSS simulator power sweep. The sweep tuple is [{sweep_ls}]' 419 ) 420 421 if sweep_enable: 422 self.start_gnss_and_wait(wait) 423 else: 424 self.dut.log.info('Wait %d seconds to start TTFF HS' % wait) 425 sleep(wait) 426 427 # Sweep GNSS simulator power level in range_ls. 428 # Do hot start for every power level. 429 # Check the TTFF result if it can pass the criteria. 430 gnss_pwr_params = None 431 previous_pwr_lvl = None 432 current_pwr_lvl = None 433 return_pwr_lvl = {} 434 for j, gnss_pwr_params in enumerate(sweep_ls[1]): 435 json_tag = f'{title}_' 436 current_pwr_lvl = gnss_pwr_params 437 if j == 0: 438 previous_pwr_lvl = current_pwr_lvl 439 for i, pwr in enumerate(gnss_pwr_params): 440 sat_sys = sweep_ls[0][i].get('sat').upper() 441 band = sweep_ls[0][i].get('band').upper() 442 # Set GNSS Simulator power level 443 self.gnss_simulator.ping_inst() 444 self.gnss_simulator.set_scenario_power(power_level=pwr, 445 sat_system=sat_sys, 446 freq_band=band) 447 self.log.info(f'Set {sat_sys} {band} with power {pwr}') 448 json_tag = json_tag + f'{sat_sys}_{band}_{pwr}' 449 # Wait 30 seconds if major power sweep level is changed. 450 wait = 0 451 if j > 0: 452 if current_pwr_lvl[0] != previous_pwr_lvl[0]: 453 wait = 30 454 # GNSS hot start test 455 if not self.gnss_hot_start_ttff_ffpe_test(iteration, sweep_enable, 456 json_tag, wait): 457 result = False 458 break 459 previous_pwr_lvl = current_pwr_lvl 460 result = True 461 for i, pwr in enumerate(previous_pwr_lvl): 462 key = f'{sweep_ls[0][i].get("sat").upper()}_{sweep_ls[0][i].get("band").upper()}' 463 return_pwr_lvl.setdefault(key, pwr) 464 return result, return_pwr_lvl 465 466 def gnss_init_power_setting(self, first_wait=180): 467 """ 468 GNSS initial power level setting. 469 Args: 470 first_wait: wait time after the cold start. 471 Type, int. 472 Default, 180. 473 Returns: 474 True if the process is done successully and hot start results pass criteria. 475 Raise: 476 TestFailure: fail TTFF test criteria. 477 """ 478 479 # Start and set GNSS simulator 480 self.start_set_gnss_power() 481 482 # Start 1st time cold start to obtain ephemeris 483 process_gnss_by_gtw_gpstool(self.dut, self.test_types['cs'].criteria) 484 485 # Read initial power sweep settings 486 if self.gnss_pwr_sweep_init_ls: 487 for sweep_ls in self.gnss_pwr_sweep_init_ls: 488 ret, gnss_pwr_lvl = self.hot_start_gnss_power_sweep( 489 sweep_ls, first_wait) 490 else: 491 self.log.warning('Skip initial power sweep.') 492 ret = False 493 gnss_pwr_lvl = None 494 495 return ret, gnss_pwr_lvl 496 497 def cell_power_sweep(self): 498 """ 499 Linear search cellular power level. Doing GNSS hot start with cellular coexistence 500 and checking if hot start can pass hot start criteria or not. 501 502 Returns: final power level of cellular power 503 """ 504 # Get parameters from user params. 505 ttft_iteration = self.user_params.get('ttff_iteration', 25) 506 wait_before_test = self.user_params.get('wait_before_test', 60) 507 wait_between_pwr = self.user_params.get('wait_between_pwr', 60) 508 power_th = self.start_pwr 509 510 # Generate the power sweep list. 511 power_search_ls = range_wi_end(self.dut, self.start_pwr, self.stop_pwr, 512 self.offset) 513 514 # Create gnss log folders for init and cellular sweep 515 gnss_init_log_dir = os.path.join(self.gnss_log_path, 'GNSS_init') 516 517 # Pull all exist GPStool logs into GNSS_init folder 518 get_gpstool_logs(self.dut, gnss_init_log_dir, False) 519 520 if power_search_ls: 521 # Run the cellular and GNSS coexistence test item. 522 for i, pwr_lvl in enumerate(power_search_ls): 523 self.log.info(f'Cellular power sweep loop: {i}') 524 self.log.info(f'Cellular target power: {pwr_lvl}') 525 526 # Enable GNSS to receive satellites' signals for "wait_between_pwr" seconds. 527 # Wait more time before 1st power level 528 if i == 0: 529 wait = wait_before_test 530 else: 531 wait = wait_between_pwr 532 self.start_gnss_and_wait(wait) 533 534 # Set cellular Tx power level. 535 eecoex_cmd = self.eecoex_func.format(pwr_lvl) 536 eecoex_cmd_file_str = eecoex_cmd.replace(',', '_') 537 execute_eecoexer_function(self.dut, eecoex_cmd) 538 539 # Get the last power level that can pass hots start ttff/ffpe spec. 540 if self.gnss_hot_start_ttff_ffpe_test(ttft_iteration, True, 541 eecoex_cmd_file_str): 542 if i + 1 == len(power_search_ls): 543 power_th = pwr_lvl 544 else: 545 if i == 0: 546 power_th = self.start_pwr 547 else: 548 power_th = power_search_ls[i - 1] 549 550 # Stop cellular Tx after a test cycle. 551 self.stop_coex_tx() 552 553 else: 554 # Run the stand alone test item. 555 self.start_gnss_and_wait(wait_between_pwr) 556 557 eecoex_cmd_file_str = 'no_cellular_coex' 558 self.gnss_hot_start_ttff_ffpe_test(ttft_iteration, True, 559 eecoex_cmd_file_str) 560 561 self.log.info(f'The GNSS WWAN coex celluar Tx power is {power_th}') 562 563 return power_th 564