• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import json
21import logging
22import math
23import os
24import re
25import scipy.stats
26import time
27from acts import asserts
28from acts import context
29from acts import base_test
30from acts import utils
31from acts.controllers.utils_lib import ssh
32from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
33from acts_contrib.test_utils.wifi import ota_sniffer
34from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
35from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
36from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
37from functools import partial
38
39
class WifiTxPowerCheckTest(base_test.BaseTestClass):
    """Class for WiFi TX power check tests.

    This class implements tests that compare the TX power reported by the
    DUT against the SAR powers programmed in the DUT NVRAM and listed in a
    SAR CSV file. The class sets up the AP in the desired configuration,
    configures and connects the phone to the AP, starts ping traffic and
    queries the current TX power. For an example config file to
    run this test class see example_connectivity_performance_ap_sta.json.
    """

    # Added to the sniffer capture duration in run_tx_power_test
    # (unit presumably seconds -- TODO confirm).
    TEST_TIMEOUT = 10
    # NOTE(review): the next five constants are not referenced in the visible
    # part of this file; they look inherited from the ping test class --
    # confirm before relying on or removing them.
    RSSI_POLL_INTERVAL = 0.2
    SHORT_SLEEP = 1
    MED_SLEEP = 5
    MAX_CONSECUTIVE_ZEROS = 5
    DISCONNECTED_PING_RESULT = {
        'connected': 0,
        'rtt': [],
        'time_stamp': [],
        'ping_interarrivals': [],
        'packet_loss_percentage': 100
    }

    # Maps the integer found in the second column of sarconfig.info to the
    # SAR state name used in the NVRAM table keys (see _read_sar_config_info).
    BRCM_SAR_MAPPING = {
        0: 'disable',
        1: 'head',
        2: 'grip',
        16: 'bt',
        32: 'hotspot'
    }

    # Maps (band, sub-band index) to the channels in that sub-band. The
    # sub-band index is 1-based; lookups use index - 1 to pick the entry in a
    # SAR power list. 6 GHz channels are strings carrying a '6g' prefix.
    BAND_TO_CHANNEL_MAP = {
        ('2g', 1): [1, 6, 11],
        ('5g', 1): [36, 40, 44, 48],
        ('5g', 2): [52, 56, 60, 64],
        ('5g', 3): range(100, 148, 4),
        ('5g', 4): [149, 153, 157, 161],
        ('6g', 1): ['6g{}'.format(channel) for channel in range(1, 46, 4)],
        ('6g', 2): ['6g{}'.format(channel) for channel in range(49, 94, 4)],
        ('6g', 3): ['6g{}'.format(channel) for channel in range(97, 114, 4)],
        ('6g', 4): ['6g{}'.format(channel) for channel in range(117, 158, 4)],
        ('6g', 5): ['6g{}'.format(channel) for channel in range(161, 186, 4)],
        ('6g', 6): ['6g{}'.format(channel) for channel in range(189, 234, 4)]
    }
83
84    def __init__(self, controllers):
85        base_test.BaseTestClass.__init__(self, controllers)
86        self.testcase_metric_logger = (
87            BlackboxMappedMetricLogger.for_test_case())
88        self.testclass_metric_logger = (
89            BlackboxMappedMetricLogger.for_test_class())
90        self.publish_testcase_metrics = True
91        self.tests = self.generate_test_cases(
92            ap_power='standard',
93            channels=[6, 36, 52, 100, 149, '6g37', '6g117', '6g213'],
94            modes=['bw20', 'bw40', 'bw80', 'bw160'],
95            test_types=[
96                'test_tx_power',
97            ],
98            country_codes=['US', 'GB', 'JP'],
99            sar_states=range(0, 13))
100
101    def setup_class(self):
102        self.dut = self.android_devices[-1]
103        req_params = [
104            'tx_power_test_params', 'testbed_params', 'main_network',
105            'RetailAccessPoints', 'RemoteServer'
106        ]
107        opt_params = ['OTASniffer']
108        self.unpack_userparams(req_params, opt_params)
109        self.testclass_params = self.tx_power_test_params
110        self.num_atten = self.attenuators[0].instrument.num_atten
111        self.ping_server = ssh.connection.SshConnection(
112            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
113        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
114        if hasattr(self,
115                   'OTASniffer') and self.testbed_params['sniffer_enable']:
116            try:
117                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
118            except:
119                self.log.warning('Could not start sniffer. Disabling sniffs.')
120                self.testbed_params['sniffer_enable'] = 0
121        self.log.info('Access Point Configuration: {}'.format(
122            self.access_point.ap_settings))
123        self.log_path = os.path.join(logging.log_path, 'results')
124        os.makedirs(self.log_path, exist_ok=True)
125        self.atten_dut_chain_map = {}
126        self.testclass_results = []
127
128        # Turn WiFi ON
129        if self.testclass_params.get('airplane_mode', 1):
130            self.log.info('Turning on airplane mode.')
131            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
132                                'Can not turn on airplane mode.')
133        wutils.wifi_toggle_state(self.dut, True)
134        self.dut.droid.wifiEnableVerboseLogging(1)
135        asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
136                             "Failed to enable WiFi verbose logging.")
137
138        # decode nvram
139        self.nvram_sar_data = self.read_nvram_sar_data()
140        self.csv_sar_data = self.read_sar_csv(self.testclass_params['sar_csv'])
141
142    def teardown_class(self):
143        # Turn WiFi OFF and reset AP
144        self.access_point.teardown()
145        for dev in self.android_devices:
146            wutils.wifi_toggle_state(dev, False)
147            dev.go_to_sleep()
148        self.process_testclass_results()
149
150    def setup_test(self):
151        self.retry_flag = False
152
153    def teardown_test(self):
154        self.retry_flag = False
155
156    def on_retry(self):
157        """Function to control test logic on retried tests.
158
159        This function is automatically executed on tests that are being
160        retried. In this case the function resets wifi, toggles it off and on
161        and sets a retry_flag to enable further tweaking the test logic on
162        second attempts.
163        """
164        self.retry_flag = True
165        for dev in self.android_devices:
166            wutils.reset_wifi(dev)
167            wutils.toggle_wifi_off_and_on(dev)
168
169    def read_sar_csv(self, sar_csv):
170        """Reads SAR powers from CSV.
171
172        This function reads SAR powers from a CSV and generate a dictionary
173        with all programmed TX powers on a per band and regulatory domain
174        basis.
175
176        Args:
177            sar_csv: path to SAR data file.
178        Returns:
179            sar_powers: dict containing all SAR data
180        """
181
182        sar_powers = {}
183        sar_csv_data = []
184        with open(sar_csv, mode='r') as f:
185            reader = csv.DictReader(f)
186            for row in reader:
187                row['Sub-band Powers'] = [
188                    float(val) for key, val in row.items()
189                    if 'Sub-band' in key and val != ''
190                ]
191                sar_csv_data.append(row)
192
193        for row in sar_csv_data:
194            sar_powers.setdefault(int(row['Scenario Index']), {})
195            sar_powers[int(row['Scenario Index'])].setdefault('SAR Powers', {})
196            sar_row_key = (row['Regulatory Domain'], row['Mode'], row['Band'])
197            sar_powers[int(row['Scenario Index'])]['SAR Powers'].setdefault(
198                sar_row_key, {})
199            sar_powers[int(
200                row['Scenario Index'])]['SAR Powers'][sar_row_key][int(
201                    row['Chain'])] = row['Sub-band Powers']
202        return sar_powers
203
204    def read_nvram_sar_data(self):
205        """Reads SAR powers from NVRAM.
206
207        This function reads SAR powers from the NVRAM found on the DUT and
208        generates a dictionary with all programmed TX powers on a per band and
209        regulatory domain basis. NThe NVRAM file is chosen based on the build,
210        but if no NVRAM file is found matching the expected name, the default
211        NVRAM will be loaded. The choice of NVRAM is not guaranteed to be
212        correct.
213
214        Returns:
215            nvram_sar_data: dict containing all SAR data
216        """
217
218        self._read_sar_config_info()
219        try:
220            hardware_version = self.dut.adb.shell(
221                'getprop ro.boot.hardware.revision')
222            nvram_path = '/vendor/firmware/bcmdhd.cal_{}'.format(
223                hardware_version)
224            nvram = self.dut.adb.shell('cat {}'.format(nvram_path))
225        except:
226            nvram = self.dut.adb.shell('cat /vendor/firmware/bcmdhd.cal')
227        current_context = context.get_current_context().get_full_output_path()
228        file_path = os.path.join(current_context, 'nvram_file')
229        with open(file_path, 'w') as file:
230            file.write(nvram)
231        nvram_sar_data = {}
232        for line in nvram.splitlines():
233            if 'dynsar' in line:
234                sar_config, sar_powers = self._parse_nvram_sar_line(line)
235                nvram_sar_data[sar_config] = sar_powers
236        file_path = os.path.join(current_context, 'nvram_sar_data')
237        with open(file_path, 'w') as file:
238            json.dump(wputils.serialize_dict(nvram_sar_data), file, indent=4)
239
240        return nvram_sar_data
241
242    def _read_sar_config_info(self):
243        """Function to read SAR scenario mapping,
244
245        This function reads sar_config.info file which contains the mapping
246        of SAR scenarios to NVRAM data tables.
247        """
248
249        self.sar_state_mapping = collections.OrderedDict([(-1, {
250            "google_name":
251            'WIFI_POWER_SCENARIO_DISABLE'
252        }), (0, {
253            "google_name": 'WIFI_POWER_SCENARIO_DISABLE'
254        }), (1, {
255            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_OFF'
256        }), (2, {
257            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_ON'
258        }), (3, {
259            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_OFF'
260        }), (4, {
261            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON'
262        }), (5, {
263            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_BT'
264        }), (6, {
265            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT'
266        }), (7, {
267            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT_MMW'
268        }), (8, {
269            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON_BT'
270        }), (9, {
271            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT'
272        }), (10, {
273            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT'
274        }), (11, {
275            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_MMW'
276        }), (12, {
277            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT_MMW'
278        })])
279        sar_config_path = '/vendor/firmware/sarconfig.info'
280        sar_config = self.dut.adb.shell(
281            'cat {}'.format(sar_config_path)).splitlines()
282        sar_config = [line.split(',') for line in sar_config]
283        sar_config = [[int(x) for x in line] for line in sar_config]
284
285        for sar_state in sar_config:
286            self.sar_state_mapping[sar_state[0]]['brcm_index'] = (
287                self.BRCM_SAR_MAPPING[sar_state[1]], bool(sar_state[2]))
288        current_context = context.get_current_context().get_full_output_path()
289        file_path = os.path.join(current_context, 'sarconfig')
290        with open(file_path, 'w') as file:
291            json.dump(wputils.serialize_dict(self.sar_state_mapping),
292                      file,
293                      indent=4)
294
295    def _parse_nvram_sar_line(self, sar_line):
296        """Helper function to decode SAR NVRAM data lines.
297
298        Args:
299            sar_line: single line of text from NVRAM file containing SAR data.
300        Returns:
301            sar_config: sar config referenced in this line
302            decoded_values: tx powers configured in this line
303        """
304
305        sar_config = collections.OrderedDict()
306        list_of_countries = ['fcc', 'jp']
307        try:
308            sar_config['country'] = next(country
309                                         for country in list_of_countries
310                                         if country in sar_line)
311        except:
312            sar_config['country'] = 'row'
313
314        list_of_sar_states = ['grip', 'bt', 'hotspot']
315        try:
316            sar_config['state'] = next(state for state in list_of_sar_states
317                                       if state in sar_line)
318        except:
319            sar_config['state'] = 'head'
320
321        list_of_bands = ['2g', '5g', '6g']
322        sar_config['band'] = next(band for band in list_of_bands
323                                  if band in sar_line)
324
325        sar_config['rsdb'] = 'rsdb' if 'rsdb' in sar_line else 'mimo'
326        sar_config['airplane_mode'] = '_2=' in sar_line
327
328        sar_powers = sar_line.split('=')[1].split(',')
329        decoded_powers = []
330        for sar_power in sar_powers:
331            decoded_powers.append([
332                (int(sar_power[2:4], 16) & int('7f', 16)) / 4,
333                (int(sar_power[4:], 16) & int('7f', 16)) / 4
334            ])
335
336        return tuple(sar_config.values()), decoded_powers
337
338    def get_sar_power_from_nvram(self, testcase_params):
339        """Function to get current expected SAR power from nvram
340
341        This functions gets the expected SAR TX power from the DUT NVRAM data.
342        The SAR power is looked up based on the current channel and regulatory
343        domain,
344
345        Args:
346            testcase_params: dict containing channel, sar state, country code
347        Returns:
348            sar_config: current expected sar config
349            sar_powers: current expected sar powers
350        """
351
352        if testcase_params['country_code'] == 'US':
353            reg_domain = 'fcc'
354        elif testcase_params['country_code'] == 'JP':
355            reg_domain = 'jp'
356        else:
357            reg_domain = 'row'
358        for band, channels in self.BAND_TO_CHANNEL_MAP.items():
359            if testcase_params['channel'] in channels:
360                current_band = band[0]
361                sub_band_idx = band[1]
362                break
363        sar_config = (reg_domain, self.sar_state_mapping[
364            testcase_params['sar_state']]['brcm_index'][0], current_band,
365                      'mimo', self.sar_state_mapping[
366                          testcase_params['sar_state']]['brcm_index'][1])
367        sar_powers = self.nvram_sar_data[sar_config][sub_band_idx - 1]
368        return sar_config, sar_powers
369
370    def get_sar_power_from_csv(self, testcase_params):
371        """Function to get current expected SAR power from CSV.
372
373        This functions gets the expected SAR TX power from the DUT NVRAM data.
374        The SAR power is looked up based on the current channel and regulatory
375        domain,
376
377        Args:
378            testcase_params: dict containing channel, sar state, country code
379        Returns:
380            sar_config: current expected sar config
381            sar_powers: current expected sar powers
382        """
383
384        if testcase_params['country_code'] == 'US':
385            reg_domain = 'fcc'
386        elif testcase_params['country_code'] == 'JP':
387            reg_domain = 'jp'
388        else:
389            reg_domain = 'row'
390        for band, channels in self.BAND_TO_CHANNEL_MAP.items():
391            if testcase_params['channel'] in channels:
392                current_band = band[0]
393                sub_band_idx = band[1]
394                break
395        sar_config = (reg_domain, 'mimo', current_band)
396        sar_powers = [
397            self.csv_sar_data[testcase_params['sar_state']]['SAR Powers']
398            [sar_config][0][sub_band_idx - 1],
399            self.csv_sar_data[testcase_params['sar_state']]['SAR Powers']
400            [sar_config][1][sub_band_idx - 1]
401        ]
402        return sar_config, sar_powers
403
404    def process_wl_curpower(self, wl_curpower_file, testcase_params):
405        """Function to parse wl_curpower output.
406
407        Args:
408            wl_curpower_file: path to curpower output file.
409            testcase_params: dict containing channel, sar state, country code
410        Returns:
411            wl_curpower_dict: dict formatted version of curpower data.
412        """
413
414        with open(wl_curpower_file, 'r') as file:
415            wl_curpower_out = file.read()
416
417        channel_regex = re.compile(r'Current Channel:\s+(?P<channel>[0-9]+)')
418        bandwidth_regex = re.compile(
419            r'Channel Width:\s+(?P<bandwidth>\S+)MHz\n')
420
421        channel = int(
422            re.search(channel_regex, wl_curpower_out).group('channel'))
423        bandwidth = int(
424            re.search(bandwidth_regex, wl_curpower_out).group('bandwidth'))
425
426        regulatory_limits = self.generate_regulatory_table(
427            wl_curpower_out, channel, bandwidth)
428        board_limits = self.generate_board_limit_table(wl_curpower_out,
429                                                       channel, bandwidth)
430        wl_curpower_dict = {
431            'channel': channel,
432            'bandwidth': bandwidth,
433            'country': testcase_params['country_code'],
434            'regulatory_limits': regulatory_limits,
435            'board_limits': board_limits
436        }
437        return wl_curpower_dict
438
439    def generate_regulatory_table(self, wl_curpower_out, channel, bw):
440        """"Helper function to generate regulatory limit table from curpower.
441
442        Args:
443            wl_curpower_out: curpower output
444            channel: current channel
445            bw: current bandwidth
446        Returns:
447            regulatory_table: dict with regulatory limits for current config
448        """
449
450        regulatory_group_map = {
451            'DSSS':
452            [('CCK', rate, 1)
453             for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]],
454            'OFDM_CDD1': [('LEGACY', rate, 1) for rate in [
455                '{}Mbps'.format(mbps)
456                for mbps in [6, 9, 12, 18, 24, 36, 48, 54]
457            ]],
458            'MCS0_7_CDD1':
459            [(mode, rate, 1)
460             for (mode,
461                  rate) in itertools.product(['HT' + str(bw), 'VHT' +
462                                              str(bw)], range(0, 8))],
463            'VHT8_9SS1_CDD1': [('VHT' + str(bw), 8, 1),
464                               ('VHT' + str(bw), 9, 1)],
465            'VHT10_11SS1_CDD1': [('VHT' + str(bw), 10, 1),
466                                 ('VHT' + str(bw), 11, 1)],
467            'MCS8_15':
468            [(mode, rate - 8 * ('VHT' in mode), 2)
469             for (mode,
470                  rate) in itertools.product(['HT' + str(bw), 'VHT' +
471                                              str(bw)], range(8, 16))],
472            'VHT8_9SS2': [('VHT' + str(bw), 8, 2), ('VHT' + str(bw), 9, 2)],
473            'VHT10_11SS2': [('VHT' + str(bw), 10, 2),
474                            ('VHT' + str(bw), 11, 2)],
475            'HE_MCS0-11_CDD1': [('HE' + str(bw), rate, 1)
476                                for rate in range(0, 12)],
477            'HE_MCS0_11SS2': [('HE' + str(bw), rate, 2)
478                              for rate in range(0, 12)],
479        }
480        tx_power_regex = re.compile(
481            '(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
482        )
483
484        regulatory_section_regex = re.compile(
485            r'Regulatory Limits:(?P<regulatory_limits>[\S\s]+)Board Limits:')
486        regulatory_list = re.search(regulatory_section_regex,
487                                    wl_curpower_out).group('regulatory_limits')
488        regulatory_list = re.findall(tx_power_regex, regulatory_list)
489        regulatory_dict = {entry[0]: entry[2:] for entry in regulatory_list}
490
491        bw_index = int(math.log(bw / 10, 2)) - 1
492        regulatory_table = collections.OrderedDict()
493        for regulatory_group, rates in regulatory_group_map.items():
494            for rate in rates:
495                reg_power = regulatory_dict.get(regulatory_group,
496                                                ['0', '0', '0', '0'])[bw_index]
497                regulatory_table[rate] = float(
498                    reg_power) if reg_power != '-' else 0
499        return regulatory_table
500
501    def generate_board_limit_table(self, wl_curpower_out, channel, bw):
502        """"Helper function to generate board limit table from curpower.
503
504        Args:
505            wl_curpower_out: curpower output
506            channel: current channel
507            bw: current bandwidth
508        Returns:
509            board_limit_table: dict with board limits for current config
510        """
511
512        tx_power_regex = re.compile(
513            '(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
514        )
515
516        board_section_regex = re.compile(
517            r'Board Limits:(?P<board_limits>[\S\s]+)Power Targets:')
518        board_limits_list = re.search(board_section_regex,
519                                      wl_curpower_out).group('board_limits')
520        board_limits_list = re.findall(tx_power_regex, board_limits_list)
521        board_limits_dict = {
522            entry[0]: entry[2:]
523            for entry in board_limits_list
524        }
525
526        mcs_regex_list = [[
527            re.compile('DSSS'),
528            [('CCK', rate, 1)
529             for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]]
530        ], [re.compile('OFDM(?P<mcs>[0-9]+)_CDD1'), [('LEGACY', '{}Mbps', 1)]],
531                          [
532                              re.compile('MCS(?P<mcs>[0-7])_CDD1'),
533                              [('HT{}'.format(bw), '{}', 1),
534                               ('VHT{}'.format(bw), '{}', 1)]
535                          ],
536                          [
537                              re.compile('VHT(?P<mcs>[8-9])SS1_CDD1'),
538                              [('VHT{}'.format(bw), '{}', 1)]
539                          ],
540                          [
541                              re.compile('VHT10_11SS1_CDD1'),
542                              [('VHT{}'.format(bw), '10', 1),
543                               ('VHT{}'.format(bw), '11', 1)]
544                          ],
545                          [
546                              re.compile('MCS(?P<mcs>[0-9]{2})'),
547                              [('HT{}'.format(bw), '{}', 2)]
548                          ],
549                          [
550                              re.compile('VHT(?P<mcs>[0-9])SS2'),
551                              [('VHT{}'.format(bw), '{}', 2)]
552                          ],
553                          [
554                              re.compile('VHT10_11SS2'),
555                              [('VHT{}'.format(bw), '10', 2),
556                               ('VHT{}'.format(bw), '11', 2)]
557                          ],
558                          [
559                              re.compile('HE_MCS(?P<mcs>[0-9]+)_CDD1'),
560                              [('HE{}'.format(bw), '{}', 1)]
561                          ],
562                          [
563                              re.compile('HE_MCS(?P<mcs>[0-9]+)SS2'),
564                              [('HE{}'.format(bw), '{}', 2)]
565                          ]]
566
567        bw_index = int(math.log(bw / 10, 2)) - 1
568        board_limit_table = collections.OrderedDict()
569        for mcs, board_limit in board_limits_dict.items():
570            for mcs_regex_tuple in mcs_regex_list:
571                mcs_match = re.match(mcs_regex_tuple[0], mcs)
572                if mcs_match:
573                    for possible_mcs in mcs_regex_tuple[1]:
574                        try:
575                            curr_mcs = (possible_mcs[0],
576                                        possible_mcs[1].format(
577                                            mcs_match.group('mcs')),
578                                        possible_mcs[2])
579                        except:
580                            curr_mcs = (possible_mcs[0], possible_mcs[1],
581                                        possible_mcs[2])
582                        board_limit_table[curr_mcs] = float(
583                            board_limit[bw_index]
584                        ) if board_limit[bw_index] != '-' else 0
585                    break
586        return board_limit_table
587
588    def pass_fail_check(self, result):
589        """Function to evaluate if current TX powqe matches CSV/NVRAM settings.
590
591        This function assesses whether the current TX power reported by the
592        DUT matches the powers programmed in NVRAM and CSV after applying the
593        correct TX power backoff used to account for CLPC errors.
594        """
595
596        if isinstance(result['testcase_params']['channel'],
597                      str) and '6g' in result['testcase_params']['channel']:
598            mode = 'HE' + str(result['testcase_params']['bandwidth'])
599        else:
600            mode = 'VHT' + str(result['testcase_params']['bandwidth'])
601        regulatory_power = result['wl_curpower']['regulatory_limits'][(mode, 0,
602                                                                       2)]
603        if result['testcase_params']['sar_state'] == 0:
604            #get from wl_curpower
605            csv_powers = [30, 30]
606            nvram_powers = [30, 30]
607            sar_config = 'SAR DISABLED'
608        else:
609            sar_config, nvram_powers = self.get_sar_power_from_nvram(
610                result['testcase_params'])
611            csv_config, csv_powers = self.get_sar_power_from_csv(
612                result['testcase_params'])
613        self.log.info("SAR state: {} ({})".format(
614            result['testcase_params']['sar_state'],
615            self.sar_state_mapping[result['testcase_params']['sar_state']],
616        ))
617        self.log.info("Country Code: {}".format(
618            result['testcase_params']['country_code']))
619        self.log.info('BRCM SAR Table: {}'.format(sar_config))
620        expected_power = [
621            min([csv_powers[0], regulatory_power]) - 1.5,
622            min([csv_powers[1], regulatory_power]) - 1.5
623        ]
624        power_str = "NVRAM Powers: {}, CSV Powers: {}, Reg Powers: {}, Expected Powers: {}, Reported Powers: {}".format(
625            nvram_powers, csv_powers, [regulatory_power] * 2, expected_power,
626            result['tx_powers'])
627        max_error = max([
628            abs(expected_power[idx] - result['tx_powers'][idx])
629            for idx in [0, 1]
630        ])
631        if max_error > 1:
632            asserts.fail(power_str)
633        else:
634            asserts.explicit_pass(power_str)
635
636    def process_testclass_results(self):
637        pass
638
639    def run_tx_power_test(self, testcase_params):
640        """Main function to test tx power.
641
642        The function sets up the AP & DUT in the correct channel and mode
643        configuration, starts ping traffic and queries the current TX power.
644
645        Args:
646            testcase_params: dict containing all test parameters
647        Returns:
648            test_result: dict containing ping results and other meta data
649        """
650        # Prepare results dict
651        llstats_obj = wputils.LinkLayerStats(
652            self.dut, self.testclass_params.get('llstats_enabled', True))
653        test_result = collections.OrderedDict()
654        test_result['testcase_params'] = testcase_params.copy()
655        test_result['test_name'] = self.current_test_name
656        test_result['ap_config'] = self.access_point.ap_settings.copy()
657        test_result['attenuation'] = testcase_params['atten_range']
658        test_result['fixed_attenuation'] = self.testbed_params[
659            'fixed_attenuation'][str(testcase_params['channel'])]
660        test_result['rssi_results'] = []
661        test_result['ping_results'] = []
662        test_result['llstats'] = []
663        # Setup sniffer
664        if self.testbed_params['sniffer_enable']:
665            self.sniffer.start_capture(
666                testcase_params['test_network'],
667                chan=testcase_params['channel'],
668                bw=testcase_params['bandwidth'],
669                duration=testcase_params['ping_duration'] *
670                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
671        # Run ping and sweep attenuation as needed
672        self.log.info('Starting ping.')
673        thread_future = wputils.get_ping_stats_nb(self.ping_server,
674                                                  self.dut_ip, 10, 0.02, 64)
675
676        for atten in testcase_params['atten_range']:
677            for attenuator in self.attenuators:
678                attenuator.set_atten(atten, strict=False, retry=True)
679            # Set mcs
680            if isinstance(testcase_params['channel'],
681                          int) and testcase_params['channel'] < 13:
682                self.dut.adb.shell('wl 2g_rate -v 0x2 -b {}'.format(
683                    testcase_params['bandwidth']))
684            elif isinstance(testcase_params['channel'],
685                            int) and testcase_params['channel'] > 13:
686                self.dut.adb.shell('wl 5g_rate -v 0x2 -b {}'.format(
687                    testcase_params['bandwidth']))
688            else:
689                self.dut.adb.shell('wl 6g_rate -e 0 -s 2 -b {}'.format(
690                    testcase_params['bandwidth']))
691            # Set sar state
692            self.dut.adb.shell('halutil -sar enable {}'.format(
693                testcase_params['sar_state']))
694            # Refresh link layer stats
695            llstats_obj.update_stats()
696            # Check sar state
697            self.log.info('Current Country: {}'.format(
698                self.dut.adb.shell('wl country')))
699            # Dump last est power multiple times
700            chain_0_power = []
701            chain_1_power = []
702            for idx in range(30):
703                last_est_out = self.dut.adb.shell(
704                    "wl curpower | grep 'Last est. power'", ignore_status=True)
705                if "Last est. power" in last_est_out:
706                    per_chain_powers = last_est_out.split(
707                        ':')[1].strip().split('  ')
708                    per_chain_powers = [
709                        float(power) for power in per_chain_powers
710                    ]
711                    self.log.info(
712                        'Current Tx Powers = {}'.format(per_chain_powers))
713                    if per_chain_powers[0] > 0:
714                        chain_0_power.append(per_chain_powers[0])
715                    if per_chain_powers[1] > 0:
716                        chain_1_power.append(per_chain_powers[1])
717                time.sleep(0.25)
718            # Check if empty
719            if len(chain_0_power) == 0 or len(chain_1_power) == 0:
720                test_result['tx_powers'] = [0, 0]
721                tx_power_frequency = [100, 100]
722            else:
723                test_result['tx_powers'] = [
724                    scipy.stats.mode(chain_0_power).mode[0],
725                    scipy.stats.mode(chain_1_power).mode[0]
726                ]
727                tx_power_frequency = [
728                    100 * scipy.stats.mode(chain_0_power).count[0] /
729                    len(chain_0_power),
730                    100 * scipy.stats.mode(chain_1_power).count[0] /
731                    len(chain_0_power)
732                ]
733            self.log.info(
734                'Filtered Tx Powers = {}. Frequency = [{:.0f}%, {:.0f}%]'.
735                format(test_result['tx_powers'], tx_power_frequency[0],
736                       tx_power_frequency[1]))
737            llstats_obj.update_stats()
738            curr_llstats = llstats_obj.llstats_incremental.copy()
739            test_result['llstats'].append(curr_llstats)
740            # DUMP wl curpower one
741            try:
742                wl_curpower = self.dut.adb.shell('wl curpower')
743            except:
744                time.sleep(0.25)
745                wl_curpower = self.dut.adb.shell('wl curpower',
746                                                 ignore_status=True)
747            current_context = context.get_current_context(
748            ).get_full_output_path()
749            wl_curpower_path = os.path.join(current_context,
750                                            'wl_curpower_output')
751            with open(wl_curpower_path, 'w') as file:
752                file.write(wl_curpower)
753            wl_curpower_dict = self.process_wl_curpower(
754                wl_curpower_path, testcase_params)
755            wl_curpower_path = os.path.join(current_context,
756                                            'wl_curpower_dict')
757            with open(wl_curpower_path, 'w') as file:
758                json.dump(wputils.serialize_dict(wl_curpower_dict),
759                          file,
760                          indent=4)
761            test_result['wl_curpower'] = wl_curpower_dict
762        thread_future.result()
763        if self.testbed_params['sniffer_enable']:
764            self.sniffer.stop_capture()
765        return test_result
766
767    def setup_ap(self, testcase_params):
768        """Sets up the access point in the configuration required by the test.
769
770        Args:
771            testcase_params: dict containing AP and other test params
772        """
773        band = self.access_point.band_lookup_by_channel(
774            testcase_params['channel'])
775        if '6G' in band:
776            frequency = wutils.WifiEnums.channel_6G_to_freq[int(
777                testcase_params['channel'].strip('6g'))]
778        else:
779            if testcase_params['channel'] < 13:
780                frequency = wutils.WifiEnums.channel_2G_to_freq[
781                    testcase_params['channel']]
782            else:
783                frequency = wutils.WifiEnums.channel_5G_to_freq[
784                    testcase_params['channel']]
785        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
786            self.access_point.set_region(self.testbed_params['DFS_region'])
787        else:
788            self.access_point.set_region(self.testbed_params['default_region'])
789        self.access_point.set_channel(band, testcase_params['channel'])
790        self.access_point.set_bandwidth(band, testcase_params['mode'])
791        if 'low' in testcase_params['ap_power']:
792            self.log.info('Setting low AP power.')
793            self.access_point.set_power(
794                band, self.testclass_params['low_ap_tx_power'])
795        self.log.info('Access Point Configuration: {}'.format(
796            self.access_point.ap_settings))
797
798    def setup_dut(self, testcase_params):
799        """Sets up the DUT in the configuration required by the test.
800
801        Args:
802            testcase_params: dict containing AP and other test params
803        """
804        # Turn screen off to preserve battery
805        if self.testbed_params.get('screen_on',
806                                   False) or self.testclass_params.get(
807                                       'screen_on', False):
808            self.dut.droid.wakeLockAcquireDim()
809        else:
810            self.dut.go_to_sleep()
811        if wputils.validate_network(self.dut,
812                                    testcase_params['test_network']['SSID']):
813            current_country = self.dut.adb.shell('wl country')
814            self.log.info('Current country code: {}'.format(current_country))
815            if testcase_params['country_code'] in current_country:
816                self.log.info('Already connected to desired network')
817                self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses(
818                    'wlan0')[0]
819                return
820        testcase_params['test_network']['channel'] = testcase_params['channel']
821        wutils.wifi_toggle_state(self.dut, False)
822        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
823        wutils.wifi_toggle_state(self.dut, True)
824        wutils.reset_wifi(self.dut)
825        if self.testbed_params.get('txbf_off', False):
826            wputils.disable_beamforming(self.dut)
827        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
828        wutils.wifi_connect(self.dut,
829                            testcase_params['test_network'],
830                            num_of_tries=1,
831                            check_connectivity=True)
832        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
833
834    def setup_tx_power_test(self, testcase_params):
835        """Function that gets devices ready for the test.
836
837        Args:
838            testcase_params: dict containing test-specific parameters
839        """
840        # Configure AP
841        self.setup_ap(testcase_params)
842        # Set attenuator to 0 dB
843        for attenuator in self.attenuators:
844            attenuator.set_atten(0, strict=False, retry=True)
845        # Reset, configure, and connect DUT
846        self.setup_dut(testcase_params)
847
848    def check_skip_conditions(self, testcase_params):
849        """Checks if test should be skipped."""
850        # Check battery level before test
851        if not wputils.health_check(self.dut, 10):
852            asserts.skip('DUT battery level too low.')
853        if testcase_params[
854                'channel'] in wputils.CHANNELS_6GHz and not self.dut.droid.is6GhzBandSupported(
855                ):
856            asserts.skip('DUT does not support 6 GHz band.')
857        if not self.access_point.band_lookup_by_channel(
858                testcase_params['channel']):
859            asserts.skip('AP does not support requested channel.')
860
861    def compile_test_params(self, testcase_params):
862        """Function to compile all testcase parameters."""
863
864        self.check_skip_conditions(testcase_params)
865
866        band = self.access_point.band_lookup_by_channel(
867            testcase_params['channel'])
868        testcase_params['test_network'] = self.main_network[band]
869        testcase_params['attenuated_chain'] = -1
870        testcase_params.update(
871            ping_interval=self.testclass_params['ping_interval'],
872            ping_duration=self.testclass_params['ping_duration'],
873            ping_size=self.testclass_params['ping_size'],
874        )
875
876        testcase_params['atten_range'] = [0]
877        return testcase_params
878
879    def _test_ping(self, testcase_params):
880        """ Function that gets called for each range test case
881
882        The function gets called in each range test case. It customizes the
883        range test based on the test name of the test that called it
884
885        Args:
886            testcase_params: dict containing preliminary set of parameters
887        """
888        # Compile test parameters from config and test name
889        testcase_params = self.compile_test_params(testcase_params)
890        # Run ping test
891        self.setup_tx_power_test(testcase_params)
892        result = self.run_tx_power_test(testcase_params)
893        self.pass_fail_check(result)
894
895    def generate_test_cases(self, ap_power, channels, modes, test_types,
896                            country_codes, sar_states):
897        """Function that auto-generates test cases for a test class."""
898        test_cases = []
899        allowed_configs = {
900            20: [
901                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 64, 100,
902                116, 132, 140, 149, 153, 157, 161
903            ],
904            40: [36, 44, 100, 149, 157],
905            80: [36, 100, 149],
906            160: [36, '6g37', '6g117', '6g213']
907        }
908
909        for channel, mode, test_type, country_code, sar_state in itertools.product(
910                channels, modes, test_types, country_codes, sar_states):
911            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
912            if channel not in allowed_configs[bandwidth]:
913                continue
914            testcase_name = '{}_ch{}_{}_{}_sar_{}'.format(
915                test_type, channel, mode, country_code, sar_state)
916            testcase_params = collections.OrderedDict(
917                test_type=test_type,
918                ap_power=ap_power,
919                channel=channel,
920                mode=mode,
921                bandwidth=bandwidth,
922                country_code=country_code,
923                sar_state=sar_state)
924            setattr(self, testcase_name,
925                    partial(self._test_ping, testcase_params))
926            test_cases.append(testcase_name)
927        return test_cases
928