• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
3#   Copyright 2021 - The Android Open Source Project
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
9#       http://www.apache.org/licenses/LICENSE-2.0
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16'''GNSS Base Class for Blanking and Hot Start Sensitivity Search'''
18import os
19import re
20from time import sleep
21from collections import namedtuple
22from itertools import product
23from numpy import arange
24from pandas import DataFrame, merge
25from acts.signals import TestError
26from acts.signals import TestFailure
27from acts.logger import epoch_to_log_line_timestamp
28from acts.context import get_current_context
29from acts_contrib.test_utils.gnss import LabTtffTestBase as lttb
30from acts_contrib.test_utils.gnss.LabTtffTestBase import glob_re
31from acts_contrib.test_utils.gnss.gnss_test_utils import launch_eecoexer
32from acts_contrib.test_utils.gnss.gnss_test_utils import execute_eecoexer_function
33from acts_contrib.test_utils.gnss.gnss_test_utils import start_gnss_by_gtw_gpstool
34from acts_contrib.test_utils.gnss.gnss_test_utils import get_current_epoch_time
35from acts_contrib.test_utils.gnss.gnss_test_utils import check_current_focus_app
36from acts_contrib.test_utils.gnss.gnss_test_utils import process_ttff_by_gtw_gpstool
37from acts_contrib.test_utils.gnss.gnss_test_utils import check_ttff_data
38from acts_contrib.test_utils.gnss.gnss_test_utils import process_gnss_by_gtw_gpstool
39from acts_contrib.test_utils.gnss.dut_log_test_utils import get_gpstool_logs
40from acts_contrib.test_utils.gnss.gnss_testlog_utils import parse_gpstool_ttfflog_to_df
def range_wi_end(dut, start, stop, step):
    """
    Generate a list of data from start to stop with the step. The list includes start and stop value
    and also supports floating point.

    The values come from numpy.arange, which leaves out its end point, so the
    stop value is added afterwards. With a floating point step the last
    generated value can land a rounding error away from stop; in that case it
    is replaced by stop so the list ends exactly on the requested value.

    Args:
        dut: An AndroidDevice object. Only its logger is used.
        start: start value.
            Type, int or float.
        stop: stop value.
            Type, int or float.
        step: step value. Its sign has to point from start towards stop.
            Type, int or float.
    Returns:
        range_ls: the list of data. It is empty when step is 0 or when the
            sign of step points away from stop.
    """
    if step == 0:
        dut.log.warning('Step is 0. Return empty list')
        range_ls = []
    elif start == stop:
        range_ls = [stop]
    else:
        range_ls = list(arange(start, stop, step))
        if range_ls:
            last = range_ls[-1]
            if abs(last - stop) <= abs(step) * 1e-9:
                # Floating point rounding put the last value (almost) on stop:
                # snap it instead of appending a near-duplicate end point.
                range_ls[-1] = stop
            elif (step < 0 and last > stop) or (step > 0 and last < stop):
                range_ls.append(stop)
    dut.log.debug(f'The range list is: {range_ls}')
    return range_ls
def check_ttff_pe(dut, ttff_data, ttff_mode, pe_criteria):
    """Check the position error of every TTFF result against the criteria.

    Args:
        dut: An AndroidDevice object. Only its logger is used.
        ttff_data: TTFF data of secs, position error and signal strength.
            Each value has to provide a ttff_pe attribute.
        ttff_mode: TTFF Test mode for current test item.
        pe_criteria: Criteria for current test item.

    Returns:
        True if every position error is below pe_criteria, otherwise False.

    Raises:
        TestFailure: ttff_data holds no result at all.
    """
    loop_count = len(ttff_data.keys())
    dut.log.info(
        f'{loop_count} iterations of TTFF {ttff_mode} tests finished.')
    dut.log.info(f'{ttff_mode} PASS criteria is {pe_criteria} meters')
    dut.log.debug(f'{ttff_mode} TTFF data: {ttff_data}')

    if loop_count == 0:
        no_data_msg = "GTW_GPSTool didn't process TTFF properly."
        dut.log.error(no_data_msg)
        raise TestFailure(no_data_msg)

    # A result fails as soon as its position error reaches the criteria.
    within_criteria = True
    for loop in ttff_data.keys():
        if float(ttff_data[loop].ttff_pe) >= pe_criteria:
            within_criteria = False
            break

    if within_criteria:
        dut.log.info(
            f'All TTFF {ttff_mode} are within test criteria {pe_criteria} meters.'
        )
    else:
        dut.log.error(
            f'One or more TTFF {ttff_mode} are over test criteria {pe_criteria} meters'
        )
    return within_criteria
class GnssBlankingBase(lttb.LabTtffTestBase):
    """ LAB GNSS Cellular Coex Tx Power Sweep TTFF/FFPE Tests"""
    # User parameter key of the GNSS power sweep settings; setup_class
    # unpacks it as a required parameter.
    GNSS_PWR_SWEEP = 'gnss_pwr_sweep'
    # User parameter key of the cellular power sweep settings; setup_class
    # reads it as an optional parameter.
    CELL_PWR_SWEEP = 'cell_pwr_sweep'
115    def __init__(self, controllers):
116        """ Initializes class attributes. """
117        super().__init__(controllers)
118        self.eecoex_func = ''
119        self.start_pwr = 10
120        self.stop_pwr = 24
121        self.offset = 1
122        self.result_cell_pwr = 10
123        self.gsm_sweep_params = None
124        self.lte_tdd_pc3_sweep_params = None
125        self.lte_tdd_pc2_sweep_params = None
126        self.coex_stop_cmd = None
127        self.scen_sweep = False
128        self.gnss_pwr_sweep_init_ls = []
129        self.gnss_pwr_sweep_fine_sweep_ls = []
131    def setup_class(self):
132        super().setup_class()
134        # Required parameters
135        req_params = [self.GNSS_PWR_SWEEP]
136        self.unpack_userparams(req_param_names=req_params)
137        self.unpack_gnss_pwr_sweep()
139        # Optional parameters
140        cell_sweep_params = self.user_params.get(self.CELL_PWR_SWEEP, [])
142        if cell_sweep_params:
143            self.gsm_sweep_params = cell_sweep_params.get("GSM", [10, 33, 1])
144            self.lte_tdd_pc3_sweep_params = cell_sweep_params.get(
145                "LTE_TDD_PC3", [10, 24, 1])
146            self.lte_tdd_pc2_sweep_params = cell_sweep_params.get(
147                "LTE_TDD_PC2", [10, 26, 1])
149    def setup_test(self):
150        super().setup_test()
151        launch_eecoexer(self.dut)
153        # Set DUT temperature the limit to 60 degree
154        self.dut.adb.shell(
155            'setprop persist.com.google.eecoexer.cellular.temperature_limit 60')
157        # Get current context full path to create the log folder.
158        cur_test_item_dir = get_current_context().get_full_output_path()
159        self.gnss_log_path = os.path.join(self.log_path, cur_test_item_dir)
160        os.makedirs(self.gnss_log_path, exist_ok=True)
162        ## Start GNSS chip log
163        self.start_dut_gnss_log()
165    def teardown_test(self):
166        super().teardown_test()
167        # Set gnss_vendor_log_path based on GNSS solution vendor.
168        gnss_vendor_log_path = os.path.join(self.gnss_log_path,
169                                            self.diag_option)
170        os.makedirs(gnss_vendor_log_path, exist_ok=True)
172        # Stop GNSS chip log and pull the logs to local file system
173        self.stop_and_pull_dut_gnss_log(gnss_vendor_log_path)
175        # Stop cellular Tx and close GPStool and EEcoexer APPs.
176        self.stop_coex_tx()
177        self.log.debug('Close GPStool APP')
178        self.dut.force_stop_apk("com.android.gpstool")
179        self.log.debug('Close EEcoexer APP')
180        self.dut.force_stop_apk("com.google.eecoexer")
182    def derive_sweep_list(self, data):
183        """
184        Derive sweep list from config
185        Args:
186            data: GNSS simulator scenario power setting.
187                type, dictionary.
188        """
189        match_tag = r'(?P<sat>[a-z]+)_(?P<band>[a-z]+\d\S*)'
190        sweep_all_ls = []
191        set_all_ls = []
192        regex_match = re.compile(match_tag)
193        method = data.get('method')
194        for key, value in data.items():
195            result = regex_match.search(key)
196            if result:
197                set_all_ls.append(result.groupdict())
198                sweep_all_ls.append(
199                    range_wi_end(self.dut, value[0], value[1], value[2]))
200        if method == 'product':
201            swp_result_ls = list(product(*sweep_all_ls))
202        else:
203            swp_result_ls = list(zip(*sweep_all_ls))
205        self.log.debug(f'set_all_ls: {set_all_ls}')
206        self.log.debug(f'swp_result_ls: {swp_result_ls}')
207        return set_all_ls, swp_result_ls
209    def unpack_gnss_pwr_sweep(self):
210        """ Unpack gnss_pwr_sweep and construct sweep parameters
211        """
213        for key, value in self.gnss_pwr_sweep.items():
214            if key == 'init':
215                self.gnss_pwr_sweep_init_ls = []
216                self.log.info(f'Sweep: {value}')
217                result = self.derive_sweep_list(value)
218                self.gnss_pwr_sweep_init_ls.append(result)
219            elif key == 'fine_sweep':
220                self.gnss_pwr_sweep_fine_sweep_ls = []
221                self.log.info(f'Sweep: {value}')
222                result = self.derive_sweep_list(value)
223                self.gnss_pwr_sweep_fine_sweep_ls.append(result)
224            else:
225                self.log.error(f'{key} is a unsupported key in gnss_pwr_sweep.')
227    def stop_coex_tx(self):
228        """
229        Stop EEcoexer Tx power.
230        """
231        # Stop cellular Tx by EEcoexer.
232        if self.coex_stop_cmd:
233            self.log.info(f'Stop EEcoexer Test Command: {self.coex_stop_cmd}')
234            execute_eecoexer_function(self.dut, self.coex_stop_cmd)
236    def analysis_ttff_ffpe(self, ttff_data, json_tag=''):
237        """
238        Pull logs and parsing logs into json file.
239        Args:
240            ttff_data: ttff_data from test results.
241                Type, list.
242            json_tag: tag for parsed json file name.
243                Type, str.
244        """
245        # Create log directory.
246        gps_log_path = os.path.join(self.gnss_log_path,
247                                    'Cell_Pwr_Sweep_Results')
249        # Pull logs of GTW GPStool.
250        get_gpstool_logs(self.dut, gps_log_path, False)
252        # Parsing the log of GTW GPStool into pandas dataframe.
253        target_dir = os.path.join(gps_log_path, 'GPSLogs', 'files')
254        gps_api_log_ls = glob_re(self.dut, target_dir, r'GNSS_\d+')
255        latest_gps_api_log = max(gps_api_log_ls, key=os.path.getctime)
256        self.log.info(f'Get latest GPStool log is: {latest_gps_api_log}')
257        df_ttff_ffpe = DataFrame(
258            parse_gpstool_ttfflog_to_df(latest_gps_api_log))
259        # Add test case, TTFF and FFPE data into the dataframe.
260        ttff_dict = {}
261        for i in ttff_data:
262            data = ttff_data[i]._asdict()
263            ttff_dict[i] = dict(data)
265        ttff_data_df = DataFrame(ttff_dict).transpose()
266        ttff_data_df = ttff_data_df[[
267            'ttff_loop', 'ttff_sec', 'ttff_pe', 'ttff_haccu'
268        ]]
269        try:
270            df_ttff_ffpe = merge(df_ttff_ffpe,
271                                 ttff_data_df,
272                                 left_on='loop',
273                                 right_on='ttff_loop')
274        except:  # pylint: disable=bare-except
275            self.log.warning("Can't merge ttff_data and df.")
276        df_ttff_ffpe['test_case'] = json_tag
278        json_file = f'gps_log_{json_tag}.json'
279        ttff_data_json_file = f'gps_log_{json_tag}_ttff_data.json'
280        json_path = os.path.join(gps_log_path, json_file)
281        ttff_data_json_path = os.path.join(gps_log_path, ttff_data_json_file)
282        # Save dataframe into json file.
283        df_ttff_ffpe.to_json(json_path, orient='table', index=False)
284        ttff_data_df.to_json(ttff_data_json_path, orient='table', index=False)
286    def hot_start_ttff_ffpe_process(self, iteration, wait):
287        """
288        Function to run hot start TTFF/FFPE by GTW GPSTool
289        Args:
290            iteration: TTFF/FFPE test iteration.
291                type, integer.
292            wait: wait time before the hot start TTFF/FFPE test.
293                type, integer.
294        """
295        # Start GTW GPStool.
296        self.dut.log.info("Restart GTW GPSTool")
297        start_gnss_by_gtw_gpstool(self.dut, state=True)
298        if wait > 0:
299            self.log.info(
300                f'Wait for {wait} seconds before TTFF to acquire data.')
301            sleep(wait)
302        # Get current time and convert to human readable format
303        begin_time = get_current_epoch_time()
304        log_begin_time = epoch_to_log_line_timestamp(begin_time)
305        self.dut.log.debug(f'Start time is {log_begin_time}')
307        # Run hot start TTFF
308        for i in range(3):
309            self.log.info(f'Start hot start attempt {i + 1}')
310            self.dut.adb.shell(
311                f'am broadcast -a com.android.gpstool.ttff_action '
312                f'--es ttff hs --es cycle {iteration} --ez raninterval False')
313            sleep(1)
314            if self.dut.search_logcat(
315                    "act=com.android.gpstool.start_test_action", begin_time):
316                self.dut.log.info("Send TTFF start_test_action successfully.")
317                break
318        else:
319            check_current_focus_app(self.dut)
320            raise TestError("Fail to send TTFF start_test_action.")
321        return begin_time
323    def gnss_hot_start_ttff_ffpe_test(self,
324                                      iteration,
325                                      sweep_enable=False,
326                                      json_tag='',
327                                      wait=0):
328        """
329        GNSS hot start ttff ffpe tset
331        Args:
332            iteration: hot start TTFF test iteration.
333                    Type, int.
334                    Default, 1.
335            sweep_enable: Indicator for the function to check if it is run by cell_power_sweep()
336                    Type, bool.
337                    Default, False.
338            json_tag: if the function is run by cell_power_sweep(), the function would use
339                    this as a part of file name to save TTFF and FFPE results into json file.
340                    Type, str.
341                    Default, ''.
342            wait: wait time before ttff test.
343                    Type, int.
344                    Default, 0.
345        Raise:
346            TestError: fail to send TTFF start_test_action.
347        """
348        test_type = namedtuple('Type', ['command', 'criteria'])
349        test_type_ttff = test_type('Hot Start', self.hs_criteria)
350        test_type_pe = test_type('Hot Start', self.hs_ttff_pecriteria)
352        # Verify hot start TTFF results
353        begin_time = self.hot_start_ttff_ffpe_process(iteration, wait)
354        try:
355            ttff_data = process_ttff_by_gtw_gpstool(self.dut, begin_time,
356                                                    self.simulator_location)
357        except:  # pylint: disable=bare-except
358            self.log.warning('Fail to acquire TTFF data. Retry again.')
359            begin_time = self.hot_start_ttff_ffpe_process(iteration, wait)
360            ttff_data = process_ttff_by_gtw_gpstool(self.dut, begin_time,
361                                                    self.simulator_location)
363        # Stop GTW GPSTool
364        self.dut.log.info("Stop GTW GPSTool")
365        start_gnss_by_gtw_gpstool(self.dut, state=False)
367        if sweep_enable:
368            self.analysis_ttff_ffpe(ttff_data, json_tag)
370        result_ttff = check_ttff_data(self.dut,
371                                      ttff_data,
372                                      ttff_mode=test_type_ttff.command,
373                                      criteria=test_type_ttff.criteria)
374        result_pe = check_ttff_pe(self.dut,
375                                  ttff_data,
376                                  ttff_mode=test_type_pe.command,
377                                  pe_criteria=test_type_pe.criteria)
378        if not result_ttff or not result_pe:
379            self.dut.log.warning('%s TTFF fails to reach '
380                                 'designated criteria' % test_type_ttff.command)
381            self.dut.log.info("Stop GTW GPSTool")
382            return False
384        return True
386    def hot_start_gnss_power_sweep(self,
387                                   sweep_ls,
388                                   wait=0,
389                                   iteration=1,
390                                   sweep_enable=False,
391                                   title=''):
392        """
393        GNSS simulator power sweep of hot start test.
395        Args:
396            sweep_ls: list of sweep parameters.
397                    Type, tuple.
398            wait: Wait time before the power sweep.
399                    Type, int.
400                    Default, 0.
401            iteration: The iteration times of hot start test.
402                    Type, int.
403                    Default, 1.
404            sweep_enable: Indicator for power sweep.
405                          It will be True only in GNSS sensitivity search case.
406                    Type, bool.
407                    Defaule, False.
408            title: the target log folder title for GNSS sensitivity search test items.
409                    Type, str.
410                    Default, ''.
411        Return:
412            Bool, gnss_pwr_params.
413        """
415        # Calculate loop range list from gnss_simulator_power_level and sa_sensitivity
417        self.log.debug(
418            f'Start the GNSS simulator power sweep. The sweep tuple is [{sweep_ls}]'
419        )
421        if sweep_enable:
422            self.start_gnss_and_wait(wait)
423        else:
424            self.dut.log.info('Wait %d seconds to start TTFF HS' % wait)
425            sleep(wait)
427        # Sweep GNSS simulator power level in range_ls.
428        # Do hot start for every power level.
429        # Check the TTFF result if it can pass the criteria.
430        gnss_pwr_params = None
431        previous_pwr_lvl = None
432        current_pwr_lvl = None
433        return_pwr_lvl = {}
434        for j, gnss_pwr_params in enumerate(sweep_ls[1]):
435            json_tag = f'{title}_'
436            current_pwr_lvl = gnss_pwr_params
437            if j == 0:
438                previous_pwr_lvl = current_pwr_lvl
439            for i, pwr in enumerate(gnss_pwr_params):
440                sat_sys = sweep_ls[0][i].get('sat').upper()
441                band = sweep_ls[0][i].get('band').upper()
442                # Set GNSS Simulator power level
443                self.gnss_simulator.ping_inst()
444                self.gnss_simulator.set_scenario_power(power_level=pwr,
445                                                       sat_system=sat_sys,
446                                                       freq_band=band)
447                self.log.info(f'Set {sat_sys} {band} with power {pwr}')
448                json_tag = json_tag + f'{sat_sys}_{band}_{pwr}'
449            # Wait 30 seconds if major power sweep level is changed.
450            wait = 0
451            if j > 0:
452                if current_pwr_lvl[0] != previous_pwr_lvl[0]:
453                    wait = 30
454            # GNSS hot start test
455            if not self.gnss_hot_start_ttff_ffpe_test(iteration, sweep_enable,
456                                                      json_tag, wait):
457                result = False
458                break
459            previous_pwr_lvl = current_pwr_lvl
460        result = True
461        for i, pwr in enumerate(previous_pwr_lvl):
462            key = f'{sweep_ls[0][i].get("sat").upper()}_{sweep_ls[0][i].get("band").upper()}'
463            return_pwr_lvl.setdefault(key, pwr)
464        return result, return_pwr_lvl
466    def gnss_init_power_setting(self, first_wait=180):
467        """
468        GNSS initial power level setting.
469        Args:
470            first_wait: wait time after the cold start.
471                        Type, int.
472                        Default, 180.
473        Returns:
474            True if the process is done successully and hot start results pass criteria.
475        Raise:
476            TestFailure: fail TTFF test criteria.
477        """
479        # Start and set GNSS simulator
480        self.start_set_gnss_power()
482        # Start 1st time cold start to obtain ephemeris
483        process_gnss_by_gtw_gpstool(self.dut, self.test_types['cs'].criteria)
485        # Read initial power sweep settings
486        if self.gnss_pwr_sweep_init_ls:
487            for sweep_ls in self.gnss_pwr_sweep_init_ls:
488                ret, gnss_pwr_lvl = self.hot_start_gnss_power_sweep(
489                    sweep_ls, first_wait)
490        else:
491            self.log.warning('Skip initial power sweep.')
492            ret = False
493            gnss_pwr_lvl = None
495        return ret, gnss_pwr_lvl
497    def cell_power_sweep(self):
498        """
499        Linear search cellular power level. Doing GNSS hot start with cellular coexistence
500        and checking if hot start can pass hot start criteria or not.
502        Returns: final power level of cellular power
503        """
504        # Get parameters from user params.
505        ttft_iteration = self.user_params.get('ttff_iteration', 25)
506        wait_before_test = self.user_params.get('wait_before_test', 60)
507        wait_between_pwr = self.user_params.get('wait_between_pwr', 60)
508        power_th = self.start_pwr
510        # Generate the power sweep list.
511        power_search_ls = range_wi_end(self.dut, self.start_pwr, self.stop_pwr,
512                                       self.offset)
514        # Create gnss log folders for init and cellular sweep
515        gnss_init_log_dir = os.path.join(self.gnss_log_path, 'GNSS_init')
517        # Pull all exist GPStool logs into GNSS_init folder
518        get_gpstool_logs(self.dut, gnss_init_log_dir, False)
520        if power_search_ls:
521            # Run the cellular and GNSS coexistence test item.
522            for i, pwr_lvl in enumerate(power_search_ls):
523                self.log.info(f'Cellular power sweep loop: {i}')
524                self.log.info(f'Cellular target power: {pwr_lvl}')
526                # Enable GNSS to receive satellites' signals for "wait_between_pwr" seconds.
527                # Wait more time before 1st power level
528                if i == 0:
529                    wait = wait_before_test
530                else:
531                    wait = wait_between_pwr
532                self.start_gnss_and_wait(wait)
534                # Set cellular Tx power level.
535                eecoex_cmd = self.eecoex_func.format(pwr_lvl)
536                eecoex_cmd_file_str = eecoex_cmd.replace(',', '_')
537                execute_eecoexer_function(self.dut, eecoex_cmd)
539                # Get the last power level that can pass hots start ttff/ffpe spec.
540                if self.gnss_hot_start_ttff_ffpe_test(ttft_iteration, True,
541                                                      eecoex_cmd_file_str):
542                    if i + 1 == len(power_search_ls):
543                        power_th = pwr_lvl
544                else:
545                    if i == 0:
546                        power_th = self.start_pwr
547                    else:
548                        power_th = power_search_ls[i - 1]
550                # Stop cellular Tx after a test cycle.
551                self.stop_coex_tx()
553        else:
554            # Run the stand alone test item.
555            self.start_gnss_and_wait(wait_between_pwr)
557            eecoex_cmd_file_str = 'no_cellular_coex'
558            self.gnss_hot_start_ttff_ffpe_test(ttft_iteration, True,
559                                               eecoex_cmd_file_str)
561        self.log.info(f'The GNSS WWAN coex celluar Tx power is {power_th}')
563        return power_th