#!/usr/bin/env python3.5
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import time
import re
import statistics
from datetime import datetime
from acts import utils
from acts import signals
from acts.base_test import BaseTestClass
from acts_contrib.test_utils.gnss.testtracker_util import log_testtracker_uuid
from acts_contrib.test_utils.tel.tel_logging_utils import start_adb_tcpdump
from acts_contrib.test_utils.tel.tel_logging_utils import stop_adb_tcpdump
from acts_contrib.test_utils.tel.tel_logging_utils import get_tcpdump_log
from acts_contrib.test_utils.tel.tel_test_utils import toggle_airplane_mode
from acts_contrib.test_utils.gnss import gnss_test_utils as gutils

# Maps a location request type to the logcat keyword that marks one report
# of that type.
CONCURRENCY_TYPE = {
    "gnss": "GNSS location received",
    "gnss_meas": "GNSS measurement received",
    "ap_location": "reportLocation"
}

# gps.xml attribute lines added/removed per TTFF mode (see add_ttff_conf and
# delete_gps_conf).
# NOTE(review): the leading whitespace of these lines presumably has to match
# the formatting of gps.xml on the device - confirm before changing.
GPS_XML_CONFIG = {
    "CS": [
        ' IgnorePosition=\"true\"\n', ' IgnoreEph=\"true\"\n',
        ' IgnoreTime=\"true\"\n', ' AsstIgnoreLto=\"true\"\n',
        ' IgnoreJniTime=\"true\"\n'
    ],
    "WS": [
        ' IgnorePosition=\"true\"\n', ' AsstIgnoreLto=\"true\"\n',
        ' IgnoreJniTime=\"true\"\n'
    ],
    "HS": []
}

# gps.xml lines swapped by preset_mcu_test ([1] -> [0]) and restored by
# reset_mcu_test ([0] -> [1]).
ONCHIP_CONFIG = [
    ' EnableOnChipStopNotification=\"1\"\n',
    ' EnableOnChipStopNotification=\"2\"\n'
]


class GnssConcurrencyTest(BaseTestClass):
    """ GNSS Concurrency TTFF Tests.
    """

    def setup_class(self):
        """ Unpack the required user params and initialize the device. """
        super().setup_class()
        self.ad = self.android_devices[0]
        req_params = [
            "standalone_cs_criteria", "chre_tolerate_rate", "qdsp6m_path",
            "outlier_criteria", "max_outliers", "pixel_lab_location",
            "max_interval", "onchip_interval", "ttff_test_cycle"
        ]
        self.unpack_userparams(req_param_names=req_params)
        gutils._init_device(self.ad)
        self.ad.adb.shell("setprop persist.vendor.radio.adb_log_on 0")
        self.ad.adb.shell("sync")

    def setup_test(self):
        """ Start logging and load the CHRE nanoapp before each test. """
        gutils.log_current_epoch_time(self.ad, "test_start_time")
        log_testtracker_uuid(self.ad, self.current_test_name)
        gutils.clear_logd_gnss_qxdm_log(self.ad)
        gutils.start_pixel_logger(self.ad)
        start_adb_tcpdump(self.ad)
        # related properties
        gutils.check_location_service(self.ad)
        gutils.get_baseband_and_gms_version(self.ad)
        self.load_chre_nanoapp()

    def teardown_test(self):
        """ Stop the loggers started in setup_test. """
        gutils.stop_pixel_logger(self.ad)
        stop_adb_tcpdump(self.ad)
        gutils.log_current_epoch_time(self.ad, "test_end_time")

    def on_fail(self, test_name, begin_time):
        """ Collect bug report, QXDM log and tcpdump log of a failed test. """
        self.ad.take_bug_report(test_name, begin_time)
        gutils.get_gnss_qxdm_log(self.ad, self.qdsp6m_path)
        get_tcpdump_log(self.ad, test_name, begin_time)

    def is_brcm_test(self):
        """ Check the test is for BRCM and skip if not.

        Raises:
            signals.TestSkip: if the device reports a Qualcomm chipset.
        """
        if gutils.check_chipset_vendor_by_qualcomm(self.ad):
            raise signals.TestSkip("Not BRCM chipset. Skip the test.")

    def load_chre_nanoapp(self):
        """ Load CHRE nanoapp to target Android Device.

        Makes up to three attempts. An attempt that raises is followed by a
        device reboot; an attempt whose output lacks "result 1" is simply
        retried.

        Raises:
            signals.TestError: if no attempt reports "result 1".
        """
        for _ in range(0, 3):
            try:
                self.ad.log.info("Start to load the nanoapp")
                cmd = "chre_power_test_client load"
                if gutils.is_device_wearable(self.ad):
                    extra_cmd = "tcm /vendor/etc/chre/power_test_tcm.so"
                    cmd = " ".join([cmd, extra_cmd])
                res = self.ad.adb.shell(cmd)
                if "result 1" in res:
                    self.ad.log.info("Nano app loaded successfully")
                    break
            except Exception as e:
                # Best effort: any failure here is retried after a reboot.
                self.ad.log.warning("Nano app loaded fail: %s" % e)
                gutils.reboot(self.ad)
        else:
            raise signals.TestError("Failed to load CHRE nanoapp")

    def enable_chre(self, interval_sec):
        """ Enable or disable gnss concurrency via nanoapp.

        Sends one chre_power_test_client command per CONCURRENCY_TYPE key
        that does not contain "ap".

        Args:
            interval_sec: request interval in seconds (int or float), set 0
                as disable.
        """
        if interval_sec == 0:
            self.ad.log.info("Stop CHRE request")
        else:
            self.ad.log.info(
                f"Initiate CHRE with {interval_sec} seconds interval")
        interval_msec = interval_sec * 1000
        cmd = "chre_power_test_client"
        option = "enable %d" % interval_msec if interval_msec != 0 else "disable"

        for request_type in CONCURRENCY_TYPE:
            if "ap" not in request_type:
                self.ad.adb.shell(" ".join([cmd, request_type, option]))

    def parse_concurrency_result(self,
                                 begin_time,
                                 request_type,
                                 criteria,
                                 exam_lower=True):
        """ Parse the test result with given time and criteria.

        The interval between each pair of consecutive log entries is
        classified as a failure (beyond the tolerated interval plus
        outlier_criteria, or below the lower bound when exam_lower is set),
        an outlier (beyond the tolerated interval only) or a pass.

        Args:
            begin_time: test begin time.
            request_type: str for location request type, a key of
                CONCURRENCY_TYPE.
            criteria: expected report interval in seconds.
            exam_lower: a boolean to identify whether the lower bound is
                examined or not.

        Returns:
            A tuple (outliers, failures, results); each is a list of
            intervals in seconds.

        Raises:
            signals.TestFailure: if no log entry is found for request_type.
        """
        results = []
        failures = []
        outliers = []
        tolerated_interval = criteria * (1 + self.chre_tolerate_rate)
        upper_bound = tolerated_interval + self.outlier_criteria
        lower_bound = criteria * (
            1 - self.chre_tolerate_rate) - self.outlier_criteria
        search_results = self.ad.search_logcat(CONCURRENCY_TYPE[request_type],
                                               begin_time)
        if not search_results:
            raise signals.TestFailure(f"No log entry found for keyword:"
                                      f"{CONCURRENCY_TYPE[request_type]}")

        for previous, target in zip(search_results, search_results[1:]):
            timedelt = target["datetime_obj"] - previous["datetime_obj"]
            timedelt_sec = timedelt.total_seconds()
            results.append(timedelt_sec)
            res_tag = ""
            if timedelt_sec > upper_bound:
                failures.append(timedelt_sec)
                res_tag = "Failure"
            elif timedelt_sec < lower_bound and exam_lower:
                failures.append(timedelt_sec)
                res_tag = "Failure"
            elif timedelt_sec > tolerated_interval:
                outliers.append(timedelt_sec)
                res_tag = "Outlier"
            if res_tag:
                self.ad.log.error(
                    f"[{res_tag}][{target['time_stamp']}]:{timedelt_sec:.2f} sec"
                )

        # NOTE(review): the summary omits the first interval (results[1:]);
        # kept as in the original - confirm this is intended.
        res_summary = " ".join([str(res) for res in results[1:]])
        self.ad.log.info(f"[{request_type}]Overall Result: {res_summary}")
        log_prefix = f"TestResult {request_type}"
        self.ad.log.info(f"{log_prefix}_samples {len(search_results)}")
        self.ad.log.info(f"{log_prefix}_outliers {len(outliers)}")
        self.ad.log.info(f"{log_prefix}_failures {len(failures)}")
        # A single log entry yields no interval; max() of an empty list would
        # raise and hide the caller's own handling of empty results.
        if results:
            self.ad.log.info(f"{log_prefix}_max_time {max(results):.2f}")

        return outliers, failures, results

    def run_gnss_concurrency_test(self, criteria, test_duration):
        """ Execute GNSS concurrency test steps.

        Args:
            criteria: dict mapping request type ("ap_location", "gnss",
                "gnss_meas") to its interval in seconds.
            test_duration: int for test duration in seconds.
        """
        self.enable_chre(criteria["gnss"])
        ttff_criteria = criteria["ap_location"] + self.standalone_cs_criteria
        gutils.process_gnss_by_gtw_gpstool(
            self.ad, ttff_criteria, freq=criteria["ap_location"])
        self.ad.log.info("Tracking 10 sec to prevent flakiness.")
        time.sleep(10)
        begin_time = datetime.now()
        self.ad.log.info(f"Test Start at {begin_time}")
        time.sleep(test_duration)
        self.enable_chre(0)
        gutils.start_gnss_by_gtw_gpstool(self.ad, False)
        self.validate_location_test_result(begin_time, criteria)

    def run_chre_only_test(self, criteria, test_duration):
        """ Execute CHRE only test steps.

        Args:
            criteria: dict mapping request type ("gnss", "gnss_meas") to its
                interval in seconds.
            test_duration: int for test duration in seconds.
        """
        begin_time = datetime.now()
        self.ad.log.info(f"Test Start at {begin_time}")
        self.enable_chre(criteria["gnss"])
        time.sleep(test_duration)
        self.enable_chre(0)
        self.validate_location_test_result(begin_time, criteria)

    def validate_location_test_result(self, begin_time, request):
        """ Validate GNSS concurrency/CHRE test results.

        Args:
            begin_time: datetime of test begin time.
            request: dict mapping request type to its interval in seconds;
                intervals of 1 or less are examined as 1 second.

        Raises:
            signals.TestFailure: if any request type has no interval, has a
                failure, or has more outliers than max_outliers.
        """
        results = {}
        outliers = {}
        failures = {}
        failure_log = ""
        for request_type, criteria in request.items():
            criteria = criteria if criteria > 1 else 1
            self.ad.log.info("Starting process %s result" % request_type)
            (outliers[request_type], failures[request_type],
             results[request_type]) = self.parse_concurrency_result(
                 begin_time, request_type, criteria, exam_lower=False)
            if not results[request_type]:
                failure_log += "[%s] Fail to find location report.\n" % request_type
            if len(failures[request_type]) > 0:
                failure_log += "[%s] Test exceeds criteria(%.2f): %.2f\n" % (
                    request_type, criteria, max(failures[request_type]))
            if len(outliers[request_type]) > self.max_outliers:
                failure_log += "[%s] Outliers excceds max amount: %d\n" % (
                    request_type, len(outliers[request_type]))

        if failure_log:
            raise signals.TestFailure(failure_log)

    def run_engine_switching_test(self, freq):
        """ Conduct engine switching test with given frequency.

        Args:
            freq: a list identify source1/2 frequency [freq1, freq2]
        """
        request = {"ap_location": self.max_interval}
        begin_time = datetime.now()
        self.ad.droid.startLocating(freq[0] * 1000, 0)
        time.sleep(10)
        # Start and stop the second source five times on top of the first.
        for _ in range(5):
            gutils.start_gnss_by_gtw_gpstool(self.ad, True, freq=freq[1])
            time.sleep(10)
            gutils.start_gnss_by_gtw_gpstool(self.ad, False)
        self.ad.droid.stopLocating()
        self.calculate_position_error(begin_time)
        self.validate_location_test_result(begin_time, request)

    def calculate_position_error(self, begin_time):
        """ Calculate the position error for the logcat search results.

        Logs the maximum position error against pixel_lab_location.

        Args:
            begin_time: test begin time

        Raises:
            ValueError: if no "reportLocation" entry is found, or an entry
                carries no lat/lon pair.
        """
        # search for location like 25.000717,121.455163
        regex = r"(-?\d{1,5}\.\d{1,10}),\s*(-?\d{1,5}\.\d{1,10})"
        position_errors = []
        search_results = self.ad.search_logcat("reportLocation", begin_time)
        if not search_results:
            raise ValueError(
                f"No reportLocation log entry found since {begin_time}")
        for result in search_results:
            # Keep the match in its own name so the log entry is still
            # available for the error message.
            match = re.search(regex, result["log_message"])
            if not match:
                raise ValueError("lat/lon does not found. "
                                 f"original text: {result['log_message']}")
            lat = float(match.group(1))
            lon = float(match.group(2))
            pe = gutils.calculate_position_error(lat, lon,
                                                 self.pixel_lab_location)
            position_errors.append(pe)
        self.ad.log.info("TestResult max_position_error %.2f" %
                         max(position_errors))

    def get_chre_ttff(self, interval_sec, duration):
        """ Get the TTFF for the first CHRE report.

        Args:
            interval_sec: test interval in seconds for CHRE.
            duration: test duration.

        Raises:
            signals.TestFailure: if a non-AP request type has no report in
                the logcat after begin time.
        """
        begin_time = datetime.now()
        self.ad.log.info(f"Test start at {begin_time}")
        self.enable_chre(interval_sec)
        time.sleep(duration)
        self.enable_chre(0)
        for request_type, pattern in CONCURRENCY_TYPE.items():
            if request_type == "ap_location":
                continue
            search_results = self.ad.search_logcat(pattern, begin_time)
            if not search_results:
                raise signals.TestFailure(
                    f"Unable to receive {request_type} report in {duration} seconds")
            ttff_stamp = search_results[0]["datetime_obj"]
            self.ad.log.info(search_results[0]["time_stamp"])
            ttff = (ttff_stamp - begin_time).total_seconds()
            self.ad.log.info(f"CHRE {request_type} TTFF = {ttff}")

    def add_ttff_conf(self, conf_type):
        """ Add mcu ttff config to gps.xml

        Args:
            conf_type: a string identify the config type, a key of
                GPS_XML_CONFIG
        """
        search_line_tag = "<gll\n"
        append_line_str = GPS_XML_CONFIG[conf_type]
        gutils.bcm_gps_xml_update_option(self.ad, "add", search_line_tag,
                                         append_line_str)

    def update_gps_conf(self, search_line, update_line):
        """ Update gps.xml content

        Args:
            search_line: target content
            update_line: update content
        """
        gutils.bcm_gps_xml_update_option(
            self.ad, "update", search_line, update_txt=update_line)

    def delete_gps_conf(self, conf_type):
        """ Delete gps.xml content

        Args:
            conf_type: a string identify the config type, a key of
                GPS_XML_CONFIG
        """
        search_line_tag = GPS_XML_CONFIG[conf_type]
        gutils.bcm_gps_xml_update_option(
            self.ad, "delete", delete_txt=search_line_tag)

    def preset_mcu_test(self, mode):
        """ Preseting mcu test with config and device state

        Args:
            mode: a string identify the test type
        """
        self.add_ttff_conf(mode)
        gutils.push_lhd_overlay(self.ad)
        toggle_airplane_mode(self.ad.log, self.ad, new_state=True)
        self.update_gps_conf(ONCHIP_CONFIG[1], ONCHIP_CONFIG[0])
        gutils.clear_aiding_data_by_gtw_gpstool(self.ad)
        # NOTE(review): the device object is passed as reboot()'s first
        # argument; load_chre_nanoapp uses gutils.reboot(self.ad) instead -
        # confirm which call is intended.
        self.ad.reboot(self.ad)
        self.load_chre_nanoapp()

    def reset_mcu_test(self, mode):
        """ Resetting mcu test with config and device state

        Args:
            mode: a string identify the test type
        """
        self.delete_gps_conf(mode)
        self.update_gps_conf(ONCHIP_CONFIG[0], ONCHIP_CONFIG[1])

    def get_mcu_ttff(self):
        """ Get mcu ttff seconds

        Enables the CHRE gnss tcm request, polls the logcat for the fix
        report up to six times at 10 second spacing, then disables the
        request again.

        Returns:
            ttff: a float identify ttff seconds, or 60 when no fix report is
                found within the polling window.

        Raises:
            signals.TestError: if the fix report carries no parsable ttff.
        """
        search_res = ""
        search_pattern = "$PGLOR,0,FIX"
        ttff_regex = r"FIX,(.*)\*"
        cmd_base = "chre_power_test_client gnss tcm"
        cmd_start = " ".join([cmd_base, "enable 1000"])
        cmd_stop = " ".join([cmd_base, "disable"])
        begin_time = datetime.now()

        self.ad.log.info("Send CHRE enable to DUT")
        self.ad.adb.shell(cmd_start)
        try:
            for _ in range(6):
                search_res = self.ad.search_logcat(search_pattern, begin_time)
                if search_res:
                    break
                time.sleep(10)
        finally:
            # Always stop the request, even if the logcat search raises.
            self.ad.adb.shell(cmd_stop)

        if not search_res:
            self.ad.log.error("Unable to get mcu ttff in 60 seconds")
            return 60

        log_message = search_res[0]["log_message"]
        res = re.search(ttff_regex, log_message)
        if not res:
            raise signals.TestError(
                f"Unable to parse mcu ttff from: {log_message}")
        ttff = res.group(1)
        self.ad.log.info(f"TTFF = {ttff}")
        return float(ttff)

    def run_mcu_ttff_loops(self, mode, loops):
        """ Run mcu ttff with given mode and loops

        Args:
            mode: a string identify mode cs/ws/hs.
            loops: a int to identify the number of loops

        Raises:
            ValueError: if loops is less than 1.
        """
        if loops < 1:
            raise ValueError(f"loops must be at least 1, got {loops}")
        ttff_res = []
        for i in range(loops):
            ttff = self.get_mcu_ttff()
            self.ad.log.info(f"{mode} TTFF LOOP{i+1} = {ttff}")
            ttff_res.append(ttff)
            time.sleep(10)
        self.ad.log.info(f"TestResult {mode}_MAX_TTFF {max(ttff_res)}")
        self.ad.log.info(
            f"TestResult {mode}_AVG_TTFF {statistics.mean(ttff_res)}")

    def _run_mcu_ttff_test(self, mode):
        """ Run the mcu ttff loops for mode and always restore gps.xml. """
        self.preset_mcu_test(mode)
        try:
            self.run_mcu_ttff_loops(mode, self.ttff_test_cycle)
        finally:
            # Restore gps.xml even when a loop fails, so the preset config
            # does not stay on the device for later tests.
            self.reset_mcu_test(mode)

    # Concurrency Test Cases
    def test_gnss_concurrency_location_1_chre_1(self):
        test_duration = 15
        criteria = {"ap_location": 1, "gnss": 1, "gnss_meas": 1}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_1_chre_8(self):
        test_duration = 30
        criteria = {"ap_location": 1, "gnss": 8, "gnss_meas": 8}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_15_chre_8(self):
        test_duration = 60
        criteria = {"ap_location": 15, "gnss": 8, "gnss_meas": 8}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_61_chre_1(self):
        test_duration = 120
        criteria = {"ap_location": 61, "gnss": 1, "gnss_meas": 1}
        self.run_gnss_concurrency_test(criteria, test_duration)

    def test_gnss_concurrency_location_61_chre_10(self):
        test_duration = 120
        criteria = {"ap_location": 61, "gnss": 10, "gnss_meas": 10}
        self.run_gnss_concurrency_test(criteria, test_duration)

    # CHRE Only Test Cases
    def test_gnss_chre_1(self):
        test_duration = 15
        criteria = {"gnss": 1, "gnss_meas": 1}
        self.run_chre_only_test(criteria, test_duration)

    def test_gnss_chre_8(self):
        test_duration = 30
        criteria = {"gnss": 8, "gnss_meas": 8}
        self.run_chre_only_test(criteria, test_duration)

    # Interval tests
    def test_variable_interval_via_chre(self):
        test_duration = 10
        intervals = [0.1, 0.5, 1.5]
        for interval in intervals:
            self.get_chre_ttff(interval, test_duration)

    def test_variable_interval_via_framework(self):
        test_duration = 10
        intervals = [0, 0.5, 1.5]
        for interval in intervals:
            begin_time = datetime.now()
            self.ad.droid.startLocating(interval * 1000, 0)
            time.sleep(test_duration)
            self.ad.droid.stopLocating()
            criteria = interval if interval > 1 else 1
            # NOTE(review): the returned outliers/failures are not checked
            # here; only a missing log entry fails the test - confirm.
            self.parse_concurrency_result(begin_time, "ap_location", criteria)

    # Engine switching test
    def test_gps_engine_switching_host_to_onchip(self):
        self.is_brcm_test()
        freq = [1, self.onchip_interval]
        self.run_engine_switching_test(freq)

    def test_gps_engine_switching_onchip_to_host(self):
        self.is_brcm_test()
        freq = [self.onchip_interval, 1]
        self.run_engine_switching_test(freq)

    def test_mcu_cs_ttff(self):
        self._run_mcu_ttff_test("CS")

    def test_mcu_ws_ttff(self):
        self._run_mcu_ttff_test("WS")

    def test_mcu_hs_ttff(self):
        self._run_mcu_ttff_test("HS")