• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.5
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import time
18import re
19import statistics
20from datetime import datetime
21from acts import utils
22from acts import signals
23from acts.base_test import BaseTestClass
24from acts_contrib.test_utils.gnss.testtracker_util import log_testtracker_uuid
25from acts_contrib.test_utils.tel.tel_logging_utils import start_adb_tcpdump
26from acts_contrib.test_utils.tel.tel_logging_utils import stop_adb_tcpdump
27from acts_contrib.test_utils.tel.tel_logging_utils import get_tcpdump_log
28from acts_contrib.test_utils.tel.tel_test_utils import toggle_airplane_mode
29from acts_contrib.test_utils.gnss import gnss_test_utils as gutils
30
# Logcat keyword searched for each location request type.
CONCURRENCY_TYPE = {
    "gnss": "GNSS location received",
    "gnss_meas": "GNSS measurement received",
    "ap_location": "reportLocation",
}

# Lines added to / deleted from gps.xml for each MCU TTFF mode.
# NOTE(review): CS/WS/HS presumably mean cold/warm/hot start - confirm.
GPS_XML_CONFIG = {
    "CS": [
        '    IgnorePosition="true"\n',
        '    IgnoreEph="true"\n',
        '    IgnoreTime="true"\n',
        '    AsstIgnoreLto="true"\n',
        '    IgnoreJniTime="true"\n',
    ],
    "WS": [
        '    IgnorePosition="true"\n',
        '    AsstIgnoreLto="true"\n',
        '    IgnoreJniTime="true"\n',
    ],
    "HS": [],
}

# gps.xml lines for EnableOnChipStopNotification; the MCU tests switch the
# config from index 1 to index 0 on preset and back again on reset.
ONCHIP_CONFIG = [
    '    EnableOnChipStopNotification="1"\n',
    '    EnableOnChipStopNotification="2"\n',
]
54
55
56class GnssConcurrencyTest(BaseTestClass):
57    """ GNSS Concurrency TTFF Tests. """
58
59    def setup_class(self):
60        super().setup_class()
61        self.ad = self.android_devices[0]
62        req_params = [
63            "standalone_cs_criteria", "chre_tolerate_rate", "qdsp6m_path",
64            "outlier_criteria", "max_outliers", "pixel_lab_location",
65            "max_interval", "onchip_interval", "ttff_test_cycle"
66        ]
67        self.unpack_userparams(req_param_names=req_params)
68        gutils._init_device(self.ad)
69        self.ad.adb.shell("setprop persist.vendor.radio.adb_log_on 0")
70        self.ad.adb.shell("sync")
71
    def setup_test(self):
        """ Prepare the device before every test case.

        Logs the test start time and the testtracker uuid, calls the gutils
        helpers for clearing the GNSS/QXDM logs and starting the pixel logger,
        starts tcpdump, checks the location service, logs the baseband/GMS
        versions and finally loads the CHRE nanoapp.
        """
        gutils.log_current_epoch_time(self.ad, "test_start_time")
        log_testtracker_uuid(self.ad, self.current_test_name)
        gutils.clear_logd_gnss_qxdm_log(self.ad)
        gutils.start_pixel_logger(self.ad)
        start_adb_tcpdump(self.ad)
        # related properties
        gutils.check_location_service(self.ad)
        gutils.get_baseband_and_gms_version(self.ad)
        # May raise signals.TestError when the nanoapp cannot be loaded.
        self.load_chre_nanoapp()
82
    def teardown_test(self):
        """ Stop the pixel logger and tcpdump, then log the test end time. """
        gutils.stop_pixel_logger(self.ad)
        stop_adb_tcpdump(self.ad)
        gutils.log_current_epoch_time(self.ad, "test_end_time")
87
    def on_fail(self, test_name, begin_time):
        """ Collect debugging artifacts for a failed test.

        Args:
            test_name: name of the failed test.
            begin_time: begin time of the failed test.
        """
        self.ad.take_bug_report(test_name, begin_time)
        gutils.get_gnss_qxdm_log(self.ad, self.qdsp6m_path)
        get_tcpdump_log(self.ad, test_name, begin_time)
92
93    def is_brcm_test(self):
94        """ Check the test is for BRCM and skip if not. """
95        if gutils.check_chipset_vendor_by_qualcomm(self.ad):
96            raise signals.TestSkip("Not BRCM chipset. Skip the test.")
97
98    def load_chre_nanoapp(self):
99        """ Load CHRE nanoapp to target Android Device. """
100        for _ in range(0, 3):
101            try:
102                self.ad.log.info("Start to load the nanoapp")
103                cmd = "chre_power_test_client load"
104                if gutils.is_device_wearable(self.ad):
105                    extra_cmd = "tcm /vendor/etc/chre/power_test_tcm.so"
106                    cmd = " ".join([cmd, extra_cmd])
107                res = self.ad.adb.shell(cmd)
108                if "result 1" in res:
109                    self.ad.log.info("Nano app loaded successfully")
110                    break
111            except Exception as e:
112                self.ad.log.warning("Nano app loaded fail: %s" % e)
113                gutils.reboot(self.ad)
114        else:
115            raise signals.TestError("Failed to load CHRE nanoapp")
116
117    def enable_chre(self, interval_sec):
118        """ Enable or disable gnss concurrency via nanoapp.
119
120        Args:
121            interval_sec: an int for frequency, set 0 as disable.
122        """
123        if interval_sec == 0:
124            self.ad.log.info(f"Stop CHRE request")
125        else:
126            self.ad.log.info(
127                f"Initiate CHRE with {interval_sec} seconds interval")
128        interval_msec = interval_sec * 1000
129        cmd = "chre_power_test_client"
130        option = "enable %d" % interval_msec if interval_msec != 0 else "disable"
131
132        for type in CONCURRENCY_TYPE.keys():
133            if "ap" not in type:
134                self.ad.adb.shell(" ".join([cmd, type, option]))
135
136    def parse_concurrency_result(self,
137                                 begin_time,
138                                 request_type,
139                                 criteria,
140                                 exam_lower=True):
141        """ Parse the test result with given time and criteria.
142
143        Args:
144            begin_time: test begin time.
145            request_type: str for location request type.
146            criteria: dictionary for test criteria.
147            exam_lower: a boolean to identify the lower bond or not.
148        Return: List for the failure and outlier loops and results.
149        """
150        results = []
151        failures = []
152        outliers = []
153        upper_bound = criteria * (
154            1 + self.chre_tolerate_rate) + self.outlier_criteria
155        lower_bound = criteria * (
156            1 - self.chre_tolerate_rate) - self.outlier_criteria
157        search_results = self.ad.search_logcat(CONCURRENCY_TYPE[request_type],
158                                               begin_time)
159        if not search_results:
160            raise signals.TestFailure(f"No log entry found for keyword:"
161                                      f"{CONCURRENCY_TYPE[request_type]}")
162
163        for i in range(len(search_results) - 1):
164            target = search_results[i + 1]
165            timedelt = target["datetime_obj"] - search_results[i]["datetime_obj"]
166            timedelt_sec = timedelt.total_seconds()
167            results.append(timedelt_sec)
168            res_tag = ""
169            if timedelt_sec > upper_bound:
170                failures.append(timedelt_sec)
171                res_tag = "Failure"
172            elif timedelt_sec < lower_bound and exam_lower:
173                failures.append(timedelt_sec)
174                res_tag = "Failure"
175            elif timedelt_sec > criteria * (1 + self.chre_tolerate_rate):
176                outliers.append(timedelt_sec)
177                res_tag = "Outlier"
178            if res_tag:
179                self.ad.log.error(
180                    f"[{res_tag}][{target['time_stamp']}]:{timedelt_sec:.2f} sec"
181                )
182
183        res_summary = " ".join([str(res) for res in results[1:]])
184        self.ad.log.info(f"[{request_type}]Overall Result: {res_summary}")
185        log_prefix = f"TestResult {request_type}"
186        self.ad.log.info(f"{log_prefix}_samples {len(search_results)}")
187        self.ad.log.info(f"{log_prefix}_outliers {len(outliers)}")
188        self.ad.log.info(f"{log_prefix}_failures {len(failures)}")
189        self.ad.log.info(f"{log_prefix}_max_time {max(results):.2f}")
190
191        return outliers, failures, results
192
193    def run_gnss_concurrency_test(self, criteria, test_duration):
194        """ Execute GNSS concurrency test steps.
195
196        Args:
197            criteria: int for test criteria.
198            test_duration: int for test duration.
199        """
200        self.enable_chre(criteria["gnss"])
201        TTFF_criteria = criteria["ap_location"] + self.standalone_cs_criteria
202        gutils.process_gnss_by_gtw_gpstool(
203            self.ad, TTFF_criteria, freq=criteria["ap_location"])
204        self.ad.log.info("Tracking 10 sec to prevent flakiness.")
205        time.sleep(10)
206        begin_time = datetime.now()
207        self.ad.log.info(f"Test Start at {begin_time}")
208        time.sleep(test_duration)
209        self.enable_chre(0)
210        gutils.start_gnss_by_gtw_gpstool(self.ad, False)
211        self.validate_location_test_result(begin_time, criteria)
212
213    def run_chre_only_test(self, criteria, test_duration):
214        """ Execute CHRE only test steps.
215
216        Args:
217            criteria: int for test criteria.
218            test_duration: int for test duration.
219        """
220        begin_time = datetime.now()
221        self.ad.log.info(f"Test Start at {begin_time}")
222        self.enable_chre(criteria["gnss"])
223        time.sleep(test_duration)
224        self.enable_chre(0)
225        self.validate_location_test_result(begin_time, criteria)
226
227    def validate_location_test_result(self, begin_time, request):
228        """ Validate GNSS concurrency/CHRE test results.
229
230        Args:
231            begin_time: epoc of test begin time
232            request: int for test criteria.
233        """
234        results = {}
235        outliers = {}
236        failures = {}
237        failure_log = ""
238        for request_type, criteria in request.items():
239            criteria = criteria if criteria > 1 else 1
240            self.ad.log.info("Starting process %s result" % request_type)
241            outliers[request_type], failures[request_type], results[
242                request_type] = self.parse_concurrency_result(
243                    begin_time, request_type, criteria, exam_lower=False)
244            if not results[request_type]:
245                failure_log += "[%s] Fail to find location report.\n" % request_type
246            if len(failures[request_type]) > 0:
247                failure_log += "[%s] Test exceeds criteria(%.2f): %.2f\n" % (
248                    request_type, criteria, max(failures[request_type]))
249            if len(outliers[request_type]) > self.max_outliers:
250                failure_log += "[%s] Outliers excceds max amount: %d\n" % (
251                    request_type, len(outliers[request_type]))
252
253        if failure_log:
254            raise signals.TestFailure(failure_log)
255
256    def run_engine_switching_test(self, freq):
257        """ Conduct engine switching test with given frequency.
258
259        Args:
260            freq: a list identify source1/2 frequency [freq1, freq2]
261        """
262        request = {"ap_location": self.max_interval}
263        begin_time = datetime.now()
264        self.ad.droid.startLocating(freq[0] * 1000, 0)
265        time.sleep(10)
266        for i in range(5):
267            gutils.start_gnss_by_gtw_gpstool(self.ad, True, freq=freq[1])
268            time.sleep(10)
269            gutils.start_gnss_by_gtw_gpstool(self.ad, False)
270        self.ad.droid.stopLocating()
271        self.calculate_position_error(begin_time)
272        self.validate_location_test_result(begin_time, request)
273
274    def calculate_position_error(self, begin_time):
275        """ Calculate the position error for the logcat search results.
276
277        Args:
278            begin_time: test begin time
279        """
280        position_errors = []
281        search_results = self.ad.search_logcat("reportLocation", begin_time)
282        for result in search_results:
283            # search for location like 25.000717,121.455163
284            regex = r"(-?\d{1,5}\.\d{1,10}),\s*(-?\d{1,5}\.\d{1,10})"
285            result = re.search(regex, result["log_message"])
286            if not result:
287                raise ValueError("lat/lon does not found. "
288                                 f"original text: {result['log_message']}")
289            lat = float(result.group(1))
290            lon = float(result.group(2))
291            pe = gutils.calculate_position_error(lat, lon,
292                                                 self.pixel_lab_location)
293            position_errors.append(pe)
294        self.ad.log.info("TestResult max_position_error %.2f" %
295                         max(position_errors))
296
297    def get_chre_ttff(self, interval_sec, duration):
298        """ Get the TTFF for the first CHRE report.
299
300        Args:
301            interval_sec: test interval in seconds for CHRE.
302            duration: test duration.
303        """
304        begin_time = datetime.now()
305        self.ad.log.info(f"Test start at {begin_time}")
306        self.enable_chre(interval_sec)
307        time.sleep(duration)
308        self.enable_chre(0)
309        for type, pattern in CONCURRENCY_TYPE.items():
310            if type == "ap_location":
311                continue
312            search_results = self.ad.search_logcat(pattern, begin_time)
313            if not search_results:
314                raise signals.TestFailure(
315                    f"Unable to receive {type} report in {duration} seconds")
316            else:
317                ttff_stamp = search_results[0]["datetime_obj"]
318                self.ad.log.info(search_results[0]["time_stamp"])
319                ttff = (ttff_stamp - begin_time).total_seconds()
320                self.ad.log.info(f"CHRE {type} TTFF = {ttff}")
321
322    def add_ttff_conf(self, conf_type):
323        """ Add mcu ttff config to gps.xml
324
325        Args:
326            conf_type: a string identify the config type
327        """
328        search_line_tag = "<gll\n"
329        append_line_str = GPS_XML_CONFIG[conf_type]
330        gutils.bcm_gps_xml_update_option(self.ad, "add", search_line_tag,
331                                         append_line_str)
332
    def update_gps_conf(self, search_line, update_line):
        """ Update gps.xml content

        Thin wrapper around gutils.bcm_gps_xml_update_option with the
        "update" option.

        Args:
            search_line: target content
            update_line: update content
        """
        gutils.bcm_gps_xml_update_option(
            self.ad, "update", search_line, update_txt=update_line)
342
343    def delete_gps_conf(self, conf_type):
344        """ Delete gps.xml content
345
346        Args:
347            conf_type: a string identify the config type
348        """
349        search_line_tag = GPS_XML_CONFIG[conf_type]
350        gutils.bcm_gps_xml_update_option(
351            self.ad, "delete", delete_txt=search_line_tag)
352
    def preset_mcu_test(self, mode):
        """ Preset the mcu test with config and device state.

        Adds the ttff config for the mode, pushes the LHD overlay, turns
        airplane mode on, switches the on-chip config from ONCHIP_CONFIG[1]
        to ONCHIP_CONFIG[0], clears the aiding data, reboots the device and
        reloads the CHRE nanoapp.

        Args:
            mode: a string identify the test type (GPS_XML_CONFIG key)
        """
        self.add_ttff_conf(mode)
        gutils.push_lhd_overlay(self.ad)
        toggle_airplane_mode(self.ad.log, self.ad, new_state=True)
        self.update_gps_conf(ONCHIP_CONFIG[1], ONCHIP_CONFIG[0])
        gutils.clear_aiding_data_by_gtw_gpstool(self.ad)
        # NOTE(review): the device object is passed as the first positional
        # argument of its own reboot(); load_chre_nanoapp uses
        # gutils.reboot(self.ad) instead - confirm which call is intended.
        self.ad.reboot(self.ad)
        self.load_chre_nanoapp()
366
    def reset_mcu_test(self, mode):
        """ Reset the gps.xml changes made by preset_mcu_test.

        Deletes the ttff config lines of the mode and switches the on-chip
        config back from ONCHIP_CONFIG[0] to ONCHIP_CONFIG[1].

        Args:
            mode: a string identify the test type (GPS_XML_CONFIG key)
        """
        self.delete_gps_conf(mode)
        self.update_gps_conf(ONCHIP_CONFIG[0], ONCHIP_CONFIG[1])
375
376    def get_mcu_ttff(self):
377        """ Get mcu ttff seconds
378
379        Return:
380            ttff: a float identify ttff seconds
381        """
382        search_res = ""
383        search_pattern = "$PGLOR,0,FIX"
384        ttff_regex = r"FIX,(.*)\*"
385        cmd_base = "chre_power_test_client gnss tcm"
386        cmd_start = " ".join([cmd_base, "enable 1000"])
387        cmd_stop = " ".join([cmd_base, "disable"])
388        begin_time = datetime.now()
389
390        self.ad.log.info("Send CHRE enable to DUT")
391        self.ad.adb.shell(cmd_start)
392        for i in range(6):
393            search_res = self.ad.search_logcat(search_pattern, begin_time)
394            if search_res:
395                break
396            time.sleep(10)
397        else:
398            self.ad.adb.shell(cmd_stop)
399            self.ad.log.error("Unable to get mcu ttff in 60 seconds")
400            return 60
401        self.ad.adb.shell(cmd_stop)
402
403        res = re.search(ttff_regex, search_res[0]["log_message"])
404        ttff = res.group(1)
405        self.ad.log.info(f"TTFF = {ttff}")
406        return float(ttff)
407
408    def run_mcu_ttff_loops(self, mode, loops):
409        """ Run mcu ttff with given mode and loops
410
411        Args:
412            mode: a string identify mode cs/ws/hs.
413            loops: a int to identify the number of loops
414        """
415        ttff_res = []
416        for i in range(10):
417            ttff = self.get_mcu_ttff()
418            self.ad.log.info(f"{mode} TTFF LOOP{i+1} = {ttff}")
419            ttff_res.append(ttff)
420            time.sleep(10)
421        self.ad.log.info(f"TestResult {mode}_MAX_TTFF {max(ttff_res)}")
422        self.ad.log.info(
423            f"TestResult {mode}_AVG_TTFF {statistics.mean(ttff_res)}")
424
425    # Concurrency Test Cases
426    def test_gnss_concurrency_location_1_chre_1(self):
427        test_duration = 15
428        criteria = {"ap_location": 1, "gnss": 1, "gnss_meas": 1}
429        self.run_gnss_concurrency_test(criteria, test_duration)
430
431    def test_gnss_concurrency_location_1_chre_8(self):
432        test_duration = 30
433        criteria = {"ap_location": 1, "gnss": 8, "gnss_meas": 8}
434        self.run_gnss_concurrency_test(criteria, test_duration)
435
436    def test_gnss_concurrency_location_15_chre_8(self):
437        test_duration = 60
438        criteria = {"ap_location": 15, "gnss": 8, "gnss_meas": 8}
439        self.run_gnss_concurrency_test(criteria, test_duration)
440
441    def test_gnss_concurrency_location_61_chre_1(self):
442        test_duration = 120
443        criteria = {"ap_location": 61, "gnss": 1, "gnss_meas": 1}
444        self.run_gnss_concurrency_test(criteria, test_duration)
445
446    def test_gnss_concurrency_location_61_chre_10(self):
447        test_duration = 120
448        criteria = {"ap_location": 61, "gnss": 10, "gnss_meas": 10}
449        self.run_gnss_concurrency_test(criteria, test_duration)
450
451    # CHRE Only Test Cases
452    def test_gnss_chre_1(self):
453        test_duration = 15
454        criteria = {"gnss": 1, "gnss_meas": 1}
455        self.run_chre_only_test(criteria, test_duration)
456
457    def test_gnss_chre_8(self):
458        test_duration = 30
459        criteria = {"gnss": 8, "gnss_meas": 8}
460        self.run_chre_only_test(criteria, test_duration)
461
462    # Interval tests
463    def test_variable_interval_via_chre(self):
464        test_duration = 10
465        intervals = [0.1, 0.5, 1.5]
466        for interval in intervals:
467            self.get_chre_ttff(interval, test_duration)
468
469    def test_variable_interval_via_framework(self):
470        test_duration = 10
471        intervals = [0, 0.5, 1.5]
472        for interval in intervals:
473            begin_time = datetime.now()
474            self.ad.droid.startLocating(interval * 1000, 0)
475            time.sleep(test_duration)
476            self.ad.droid.stopLocating()
477            criteria = interval if interval > 1 else 1
478            self.parse_concurrency_result(begin_time, "ap_location", criteria)
479
480    # Engine switching test
481    def test_gps_engine_switching_host_to_onchip(self):
482        self.is_brcm_test()
483        freq = [1, self.onchip_interval]
484        self.run_engine_switching_test(freq)
485
486    def test_gps_engine_switching_onchip_to_host(self):
487        self.is_brcm_test()
488        freq = [self.onchip_interval, 1]
489        self.run_engine_switching_test(freq)
490
491    def test_mcu_cs_ttff(self):
492        mode = "CS"
493        self.preset_mcu_test(mode)
494        self.run_mcu_ttff_loops(mode, self.ttff_test_cycle)
495        self.reset_mcu_test(mode)
496
497    def test_mcu_ws_ttff(self):
498        mode = "WS"
499        self.preset_mcu_test(mode)
500        self.run_mcu_ttff_loops(mode, self.ttff_test_cycle)
501        self.reset_mcu_test(mode)
502
503    def test_mcu_hs_ttff(self):
504        mode = "HS"
505        self.preset_mcu_test(mode)
506        self.run_mcu_ttff_loops(mode, self.ttff_test_cycle)
507        self.reset_mcu_test(mode)
508