• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3.4
#
#   Copyright 2017 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the 'License');
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an 'AS IS' BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
16
17import collections
18import csv
19import itertools
20import json
21import logging
22import math
23import os
24import re
25import scipy.stats
26import time
27from acts import asserts
28from acts import context
29from acts import base_test
30from acts import utils
31from acts.controllers.utils_lib import ssh
32from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
33from acts_contrib.test_utils.wifi import ota_sniffer
34from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
35from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
36from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
37from functools import partial
38
39
class WifiTxPowerCheckTest(base_test.BaseTestClass):
    """Class for WiFi TX power checks.

    This class sets the SAR state on the DUT, reads the TX powers reported
    by `wl curpower` and compares them against the regulatory, board and
    SAR (CSV/NVRAM) limits. For an example config file to run this test
    class see example_connectivity_performance_ap_sta.json.
    """

    # Added to the sniffer capture duration in run_tx_power_test.
    TEST_TIMEOUT = 10
    # NOTE(review): the next four constants are not referenced in the
    # visible part of this file; units are presumably seconds - confirm.
    RSSI_POLL_INTERVAL = 0.2
    SHORT_SLEEP = 1
    MED_SLEEP = 5
    MAX_CONSECUTIVE_ZEROS = 5
    # NOTE(review): presumably the ping result reported when the DUT is
    # disconnected; not referenced in the visible part of this file.
    DISCONNECTED_PING_RESULT = {
        'connected': 0,
        'rtt': [],
        'time_stamp': [],
        'ping_interarrivals': [],
        'packet_loss_percentage': 100
    }

    # Maps the second field of each sarconfig.info row to the SAR state name
    # that appears in the decoded NVRAM table keys.
    BRCM_SAR_MAPPING = {
        0: 'disable',
        1: 'head',
        2: 'grip',
        16: 'bt',
        32: 'hotspot'
    }

    # Maps (band, sub-band index) to the channels in that sub-band. The
    # sub-band index (minus one) selects the entry in the per sub-band SAR
    # power lists. 6GHz channels are strings prefixed with '6g'.
    BAND_TO_CHANNEL_MAP = {
        ('2g', 1): [1, 6, 11],
        ('5g', 1): [36, 40, 44, 48],
        ('5g', 2): [52, 56, 60, 64],
        ('5g', 3): range(100, 148, 4),
        ('5g', 4): [149, 153, 157, 161],
        ('6g', 1): ['6g{}'.format(channel) for channel in range(1, 46, 4)],
        ('6g', 2): ['6g{}'.format(channel) for channel in range(49, 94, 4)],
        ('6g', 3): ['6g{}'.format(channel) for channel in range(97, 114, 4)],
        ('6g', 4): ['6g{}'.format(channel) for channel in range(117, 158, 4)],
        ('6g', 5): ['6g{}'.format(channel) for channel in range(161, 186, 4)],
        ('6g', 6): ['6g{}'.format(channel) for channel in range(189, 234, 4)]
    }
83
84    def __init__(self, controllers):
85        base_test.BaseTestClass.__init__(self, controllers)
86        self.testcase_metric_logger = (
87            BlackboxMappedMetricLogger.for_test_case())
88        self.testclass_metric_logger = (
89            BlackboxMappedMetricLogger.for_test_class())
90        self.publish_testcase_metrics = True
91        self.tests = self.generate_test_cases(
92            ap_power='standard',
93            channels=[6, 36, 52, 100, 149, '6g37', '6g117', '6g213'],
94            modes=['bw20', 'bw40', 'bw80', 'bw160'],
95            test_types=[
96                'test_tx_power',
97            ],
98            country_codes=['US', 'GB', 'JP', 'CA', 'AU'],
99            sar_states=range(-1, 13))
100
101    def setup_class(self):
102        self.dut = self.android_devices[-1]
103        req_params = [
104            'tx_power_test_params', 'testbed_params', 'main_network',
105            'RetailAccessPoints', 'RemoteServer'
106        ]
107        opt_params = ['OTASniffer']
108        self.unpack_userparams(req_params, opt_params)
109        self.testclass_params = self.tx_power_test_params
110        self.num_atten = self.attenuators[0].instrument.num_atten
111        self.ping_server = ssh.connection.SshConnection(
112            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
113        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
114        if hasattr(self,
115                   'OTASniffer') and self.testbed_params['sniffer_enable']:
116            try:
117                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
118            except:
119                self.log.warning('Could not start sniffer. Disabling sniffs.')
120                self.testbed_params['sniffer_enable'] = 0
121        self.log.info('Access Point Configuration: {}'.format(
122            self.access_point.ap_settings))
123        self.log_path = os.path.join(logging.log_path, 'results')
124        os.makedirs(self.log_path, exist_ok=True)
125        self.atten_dut_chain_map = {}
126        self.testclass_results = []
127
128        # Turn WiFi ON
129        if self.testclass_params.get('airplane_mode', 1):
130            self.log.info('Turning on airplane mode.')
131            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
132                                'Can not turn on airplane mode.')
133        wutils.wifi_toggle_state(self.dut, True)
134        self.dut.droid.wifiEnableVerboseLogging(1)
135        asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
136                             "Failed to enable WiFi verbose logging.")
137
138        # decode nvram
139        try:
140            self.nvram_sar_data = self.read_nvram_sar_data()
141        except:
142            self.nvram_sar_data = None
143        if 'sar_csv' in self.testclass_params:
144            self.csv_sar_data = self.read_sar_csv(self.testclass_params['sar_csv'])
145        else:
146            self.csv_sar_data = None
147
148        # Configure test retries
149        self.user_params['retry_tests'] = [self.__class__.__name__]
150
151    def teardown_class(self):
152        # Turn WiFi OFF and reset AP
153        self.access_point.teardown()
154        for dev in self.android_devices:
155            wutils.wifi_toggle_state(dev, False)
156            dev.go_to_sleep()
157        self.process_testclass_results()
158
159    def setup_test(self):
160        self.retry_flag = False
161
162    def teardown_test(self):
163        self.retry_flag = False
164
165    def on_retry(self):
166        """Function to control test logic on retried tests.
167
168        This function is automatically executed on tests that are being
169        retried. In this case the function resets wifi, toggles it off and on
170        and sets a retry_flag to enable further tweaking the test logic on
171        second attempts.
172        """
173        self.retry_flag = True
174        for dev in self.android_devices:
175            wutils.reset_wifi(dev)
176            wutils.toggle_wifi_off_and_on(dev)
177
178    def read_sar_csv(self, sar_csv):
179        """Reads SAR powers from CSV.
180
181        This function reads SAR powers from a CSV and generate a dictionary
182        with all programmed TX powers on a per band and regulatory domain
183        basis.
184
185        Args:
186            sar_csv: path to SAR data file.
187        Returns:
188            sar_powers: dict containing all SAR data
189        """
190
191        sar_powers = {}
192        sar_csv_data = []
193        with open(sar_csv, mode='r') as f:
194            reader = csv.DictReader(f)
195            for row in reader:
196                row['Sub-band Powers'] = [
197                    float(val) for key, val in row.items()
198                    if 'Sub-band' in key and val != ''
199                ]
200                sar_csv_data.append(row)
201
202        for row in sar_csv_data:
203            sar_powers.setdefault(int(row['Scenario Index']), {})
204            sar_powers[int(row['Scenario Index'])].setdefault('SAR Powers', {})
205            sar_row_key = (row['Regulatory Domain'], row['Mode'], row['Band'])
206            sar_powers[int(row['Scenario Index'])]['SAR Powers'].setdefault(
207                sar_row_key, {})
208            sar_powers[int(
209                row['Scenario Index'])]['SAR Powers'][sar_row_key][int(
210                    row['Chain'])] = row['Sub-band Powers']
211        return sar_powers
212
213    def read_nvram_sar_data(self):
214        """Reads SAR powers from NVRAM.
215
216        This function reads SAR powers from the NVRAM found on the DUT and
217        generates a dictionary with all programmed TX powers on a per band and
218        regulatory domain basis. NThe NVRAM file is chosen based on the build,
219        but if no NVRAM file is found matching the expected name, the default
220        NVRAM will be loaded. The choice of NVRAM is not guaranteed to be
221        correct.
222
223        Returns:
224            nvram_sar_data: dict containing all SAR data
225        """
226
227        self._read_sar_config_info()
228        try:
229            hardware_version = self.dut.adb.shell(
230                'getprop ro.boot.hardware.revision')
231            nvram_path = '/vendor/firmware/bcmdhd.cal_{}'.format(
232                hardware_version)
233            nvram = self.dut.adb.shell('cat {}'.format(nvram_path))
234        except:
235            nvram = self.dut.adb.shell('cat /vendor/firmware/bcmdhd.cal')
236        current_context = context.get_current_context().get_full_output_path()
237        file_path = os.path.join(current_context, 'nvram_file')
238        with open(file_path, 'w') as file:
239            file.write(nvram)
240        nvram_sar_data = {}
241        for line in nvram.splitlines():
242            if 'dynsar' in line:
243                sar_config, sar_powers = self._parse_nvram_sar_line(line)
244                nvram_sar_data[sar_config] = sar_powers
245        file_path = os.path.join(current_context, 'nvram_sar_data')
246        with open(file_path, 'w') as file:
247            json.dump(wputils.serialize_dict(nvram_sar_data), file, indent=4)
248
249        return nvram_sar_data
250
251    def _read_sar_config_info(self):
252        """Function to read SAR scenario mapping,
253
254        This function reads sar_config.info file which contains the mapping
255        of SAR scenarios to NVRAM data tables.
256        """
257
258        self.sar_state_mapping = collections.OrderedDict([(-2, {
259            "google_name":
260            'WIFI_POWER_SCENARIO_INVALID'
261        }), (-1, {
262            "google_name": 'WIFI_POWER_SCENARIO_DISABLE'
263        }), (0, {
264            "google_name": 'WIFI_POWER_SCENARIO_VOICE_CALL'
265        }), (1, {
266            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_OFF'
267        }), (2, {
268            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_CELL_ON'
269        }), (3, {
270            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_OFF'
271        }), (4, {
272            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON'
273        }), (5, {
274            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_BT'
275        }), (6, {
276            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT'
277        }), (7, {
278            "google_name": 'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT_MMW'
279        }), (8, {
280            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON_BT'
281        }), (9, {
282            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT'
283        }), (10, {
284            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT'
285        }), (11, {
286            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_MMW'
287        }), (12, {
288            "google_name": 'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT_MMW'
289        })])
290        sar_config_path = '/vendor/firmware/sarconfig.info'
291        sar_config = self.dut.adb.shell(
292            'cat {}'.format(sar_config_path)).splitlines()
293        sar_config = [line.split(',') for line in sar_config]
294        sar_config = [[int(x) for x in line] for line in sar_config]
295
296        for sar_state in sar_config:
297            self.sar_state_mapping[sar_state[0]]['brcm_index'] = (
298                self.BRCM_SAR_MAPPING[sar_state[1]], bool(sar_state[2]))
299        current_context = context.get_current_context().get_full_output_path()
300        file_path = os.path.join(current_context, 'sarconfig')
301        with open(file_path, 'w') as file:
302            json.dump(wputils.serialize_dict(self.sar_state_mapping),
303                      file,
304                      indent=4)
305
306    def _parse_nvram_sar_line(self, sar_line):
307        """Helper function to decode SAR NVRAM data lines.
308
309        Args:
310            sar_line: single line of text from NVRAM file containing SAR data.
311        Returns:
312            sar_config: sar config referenced in this line
313            decoded_values: tx powers configured in this line
314        """
315
316        sar_config = collections.OrderedDict()
317        list_of_countries = ['fcc', 'jp', 'ca']
318        try:
319            sar_config['country'] = next(country
320                                         for country in list_of_countries
321                                         if country in sar_line.split('=')[0])
322        except:
323            sar_config['country'] = 'row'
324
325        list_of_sar_states = ['grip', 'bt', 'hotspot']
326        try:
327            sar_config['state'] = next(state for state in list_of_sar_states
328                                       if state in sar_line.split('=')[0])
329        except:
330            sar_config['state'] = 'head'
331
332        list_of_bands = ['2g', '5g', '6g']
333        sar_config['band'] = next(band for band in list_of_bands
334                                  if band in sar_line.split('=')[0])
335
336        sar_config['rsdb'] = 'rsdb' if 'rsdb' in sar_line else 'mimo'
337        sar_config['airplane_mode'] = '_2=' in sar_line
338
339        sar_powers = sar_line.split('=')[1].split(',')
340        decoded_powers = []
341        for sar_power in sar_powers:
342            # Note that core 0 and 1 are flipped in the NVRAM entries
343            decoded_powers.append([
344                (int(sar_power[4:], 16) & int('7f', 16)) / 4,
345                (int(sar_power[2:4], 16) & int('7f', 16)) / 4
346            ])
347
348        return tuple(sar_config.values()), decoded_powers
349
350    def get_sar_power_from_nvram(self, testcase_params):
351        """Function to get current expected SAR power from nvram
352
353        This functions gets the expected SAR TX power from the DUT NVRAM data.
354        The SAR power is looked up based on the current channel and regulatory
355        domain,
356
357        Args:
358            testcase_params: dict containing channel, sar state, country code
359        Returns:
360            sar_config: current expected sar config
361            sar_powers: current expected sar powers
362        """
363
364        if testcase_params['country_code'] == 'US':
365            reg_domain = 'fcc'
366        elif testcase_params['country_code'] == 'JP':
367            reg_domain = 'jp'
368        elif testcase_params['country_code'] == 'CA':
369            reg_domain = 'ca'
370        else:
371            reg_domain = 'row'
372        for band, channels in self.BAND_TO_CHANNEL_MAP.items():
373            if testcase_params['channel'] in channels:
374                current_band = band[0]
375                sub_band_idx = band[1]
376                break
377        sar_config = (reg_domain, self.sar_state_mapping[
378            testcase_params['sar_state']]['brcm_index'][0], current_band,
379                      'mimo', self.sar_state_mapping[
380                          testcase_params['sar_state']]['brcm_index'][1])
381        if self.nvram_sar_data:
382            sar_powers = self.nvram_sar_data[sar_config][sub_band_idx - 1]
383        else:
384            sar_powers = [float('nan'), float('nan')]
385        return sar_config, sar_powers
386
387    def get_sar_power_from_csv(self, testcase_params):
388        """Function to get current expected SAR power from CSV.
389
390        This functions gets the expected SAR TX power from the DUT NVRAM data.
391        The SAR power is looked up based on the current channel and regulatory
392        domain,
393
394        Args:
395            testcase_params: dict containing channel, sar state, country code
396        Returns:
397            sar_config: current expected sar config
398            sar_powers: current expected sar powers
399        """
400
401        if testcase_params['country_code'] == 'US':
402            reg_domain = 'fcc'
403        elif testcase_params['country_code'] == 'JP':
404            reg_domain = 'jp'
405        elif testcase_params['country_code'] == 'CA':
406            reg_domain = 'ca'
407        else:
408            reg_domain = 'row'
409        for band, channels in self.BAND_TO_CHANNEL_MAP.items():
410            if testcase_params['channel'] in channels:
411                current_band = band[0]
412                sub_band_idx = band[1]
413                break
414        sar_config = (reg_domain, 'mimo', current_band)
415        if self.csv_sar_data:
416            sar_powers = [
417                self.csv_sar_data[testcase_params['sar_state']]['SAR Powers']
418                [sar_config][0][sub_band_idx - 1],
419                self.csv_sar_data[testcase_params['sar_state']]['SAR Powers']
420                [sar_config][1][sub_band_idx - 1]
421            ]
422        else:
423            sar_powers = [float('nan'), float('nan')]
424        return sar_config, sar_powers
425
426    def process_wl_curpower(self, wl_curpower_file, testcase_params):
427        """Function to parse wl_curpower output.
428
429        Args:
430            wl_curpower_file: path to curpower output file.
431            testcase_params: dict containing channel, sar state, country code
432        Returns:
433            wl_curpower_dict: dict formatted version of curpower data.
434        """
435
436        with open(wl_curpower_file, 'r') as file:
437            wl_curpower_out = file.read()
438
439        channel_regex = re.compile(r'Current Channel:\s+(?P<channel>[0-9]+)')
440        bandwidth_regex = re.compile(
441            r'Channel Width:\s+(?P<bandwidth>\S+)MHz\n')
442
443        channel = int(
444            re.search(channel_regex, wl_curpower_out).group('channel'))
445        bandwidth = int(
446            re.search(bandwidth_regex, wl_curpower_out).group('bandwidth'))
447
448        regulatory_limits = self.generate_regulatory_table(
449            wl_curpower_out, channel, bandwidth)
450        board_limits = self.generate_board_limit_table(wl_curpower_out,
451                                                       channel, bandwidth)
452        wl_curpower_dict = {
453            'channel': channel,
454            'bandwidth': bandwidth,
455            'country': testcase_params['country_code'],
456            'regulatory_limits': regulatory_limits,
457            'board_limits': board_limits
458        }
459        return wl_curpower_dict
460
461    def generate_regulatory_table(self, wl_curpower_out, channel, bw):
462        """"Helper function to generate regulatory limit table from curpower.
463
464        Args:
465            wl_curpower_out: curpower output
466            channel: current channel
467            bw: current bandwidth
468        Returns:
469            regulatory_table: dict with regulatory limits for current config
470        """
471
472        regulatory_group_map = {
473            'DSSS':
474            [('CCK', rate, 1)
475             for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]],
476            'OFDM_CDD1': [('LEGACY', rate, 1) for rate in [
477                '{}Mbps'.format(mbps)
478                for mbps in [6, 9, 12, 18, 24, 36, 48, 54]
479            ]],
480            'MCS0_7_CDD1':
481            [(mode, rate, 1)
482             for (mode,
483                  rate) in itertools.product(['HT' + str(bw), 'VHT' +
484                                              str(bw)], range(0, 8))],
485            'VHT8_9SS1_CDD1': [('VHT' + str(bw), 8, 1),
486                               ('VHT' + str(bw), 9, 1)],
487            'VHT10_11SS1_CDD1': [('VHT' + str(bw), 10, 1),
488                                 ('VHT' + str(bw), 11, 1)],
489            'MCS8_15':
490            [(mode, rate - 8 * ('VHT' in mode), 2)
491             for (mode,
492                  rate) in itertools.product(['HT' + str(bw), 'VHT' +
493                                              str(bw)], range(8, 16))],
494            'VHT8_9SS2': [('VHT' + str(bw), 8, 2), ('VHT' + str(bw), 9, 2)],
495            'VHT10_11SS2': [('VHT' + str(bw), 10, 2),
496                            ('VHT' + str(bw), 11, 2)],
497            'HE_MCS0-11_CDD1': [('HE' + str(bw), rate, 1)
498                                for rate in range(0, 12)],
499            'HE_MCS0_11SS2': [('HE' + str(bw), rate, 2)
500                              for rate in range(0, 12)],
501        }
502        tx_power_regex = re.compile(
503            '(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
504        )
505
506        regulatory_section_regex = re.compile(
507            r'Regulatory Limits:(?P<regulatory_limits>[\S\s]+)Board Limits:')
508        regulatory_list = re.search(regulatory_section_regex,
509                                    wl_curpower_out).group('regulatory_limits')
510        regulatory_list = re.findall(tx_power_regex, regulatory_list)
511        regulatory_dict = {entry[0]: entry[2:] for entry in regulatory_list}
512
513        bw_index = int(math.log(bw / 10, 2)) - 1
514        regulatory_table = collections.OrderedDict()
515        for regulatory_group, rates in regulatory_group_map.items():
516            for rate in rates:
517                reg_power = regulatory_dict.get(regulatory_group,
518                                                ['0', '0', '0', '0'])[bw_index]
519                regulatory_table[rate] = float(
520                    reg_power) if reg_power != '-' else 0
521        return regulatory_table
522
523    def generate_board_limit_table(self, wl_curpower_out, channel, bw):
524        """"Helper function to generate board limit table from curpower.
525
526        Args:
527            wl_curpower_out: curpower output
528            channel: current channel
529            bw: current bandwidth
530        Returns:
531            board_limit_table: dict with board limits for current config
532        """
533
534        tx_power_regex = re.compile(
535            '(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
536        )
537
538        board_section_regex = re.compile(
539            r'Board Limits:(?P<board_limits>[\S\s]+)Power Targets:')
540        board_limits_list = re.search(board_section_regex,
541                                      wl_curpower_out).group('board_limits')
542        board_limits_list = re.findall(tx_power_regex, board_limits_list)
543        board_limits_dict = {
544            entry[0]: entry[2:]
545            for entry in board_limits_list
546        }
547
548        mcs_regex_list = [[
549            re.compile('DSSS'),
550            [('CCK', rate, 1)
551             for rate in ['{}Mbps'.format(mbps) for mbps in [1, 2, 5.5, 11]]]
552        ], [re.compile('OFDM(?P<mcs>[0-9]+)_CDD1'), [('LEGACY', '{}Mbps', 1)]],
553                          [
554                              re.compile('MCS(?P<mcs>[0-7])_CDD1'),
555                              [('HT{}'.format(bw), '{}', 1),
556                               ('VHT{}'.format(bw), '{}', 1)]
557                          ],
558                          [
559                              re.compile('VHT(?P<mcs>[8-9])SS1_CDD1'),
560                              [('VHT{}'.format(bw), '{}', 1)]
561                          ],
562                          [
563                              re.compile('VHT10_11SS1_CDD1'),
564                              [('VHT{}'.format(bw), '10', 1),
565                               ('VHT{}'.format(bw), '11', 1)]
566                          ],
567                          [
568                              re.compile('MCS(?P<mcs>[0-9]{2})'),
569                              [('HT{}'.format(bw), '{}', 2)]
570                          ],
571                          [
572                              re.compile('VHT(?P<mcs>[0-9])SS2'),
573                              [('VHT{}'.format(bw), '{}', 2)]
574                          ],
575                          [
576                              re.compile('VHT10_11SS2'),
577                              [('VHT{}'.format(bw), '10', 2),
578                               ('VHT{}'.format(bw), '11', 2)]
579                          ],
580                          [
581                              re.compile('HE_MCS(?P<mcs>[0-9]+)_CDD1'),
582                              [('HE{}'.format(bw), '{}', 1)]
583                          ],
584                          [
585                              re.compile('HE_MCS(?P<mcs>[0-9]+)SS2'),
586                              [('HE{}'.format(bw), '{}', 2)]
587                          ]]
588
589        bw_index = int(math.log(bw / 10, 2)) - 1
590        board_limit_table = collections.OrderedDict()
591        for mcs, board_limit in board_limits_dict.items():
592            for mcs_regex_tuple in mcs_regex_list:
593                mcs_match = re.match(mcs_regex_tuple[0], mcs)
594                if mcs_match:
595                    for possible_mcs in mcs_regex_tuple[1]:
596                        try:
597                            curr_mcs = (possible_mcs[0],
598                                        possible_mcs[1].format(
599                                            mcs_match.group('mcs')),
600                                        possible_mcs[2])
601                        except:
602                            curr_mcs = (possible_mcs[0], possible_mcs[1],
603                                        possible_mcs[2])
604                        board_limit_table[curr_mcs] = float(
605                            board_limit[bw_index]
606                        ) if board_limit[bw_index] != '-' else 0
607                    break
608        return board_limit_table
609
610    def pass_fail_check(self, result):
611        """Function to evaluate if current TX powqe matches CSV/NVRAM settings.
612
613        This function assesses whether the current TX power reported by the
614        DUT matches the powers programmed in NVRAM and CSV after applying the
615        correct TX power backoff used to account for CLPC errors.
616        """
617
618        if isinstance(result['testcase_params']['channel'],
619                      str) and '6g' in result['testcase_params']['channel']:
620            mode = 'HE' + str(result['testcase_params']['bandwidth'])
621        else:
622            mode = 'HE' + str(result['testcase_params']['bandwidth'])
623        regulatory_power = result['wl_curpower']['regulatory_limits'][(mode, 0,
624                                                                       2)]
625        board_power = result['wl_curpower']['board_limits'][(mode, str(0), 2)]
626        # try:
627        sar_config, nvram_powers = self.get_sar_power_from_nvram(
628            result['testcase_params'])
629        # except:
630        #     nvram_powers = [99, 99]
631        #     sar_config = 'SAR DISABLED'
632        try:
633            csv_config, csv_powers = self.get_sar_power_from_csv(
634                result['testcase_params'])
635        except:
636            #get from wl_curpower
637            csv_powers = [99, 99]
638        self.log.info("SAR state: {} ({})".format(
639            result['testcase_params']['sar_state'],
640            self.sar_state_mapping[result['testcase_params']['sar_state']],
641        ))
642        self.log.info("Country Code: {}".format(
643            result['testcase_params']['country_code']))
644        self.log.info('BRCM SAR Table: {}'.format(sar_config))
645        expected_power = [
646            min([csv_powers[0], regulatory_power, board_power]) - 1.5,
647            min([csv_powers[1], regulatory_power, board_power]) - 1.5
648        ]
649        power_str = "NVRAM Powers: {}, CSV Powers: {}, Reg Powers: {}, Board Power: {}, Expected Powers: {}, Reported Powers: {}".format(
650            nvram_powers, csv_powers, [regulatory_power] * 2,
651            [board_power] * 2, expected_power, result['tx_powers'])
652        max_error = max([
653            abs(expected_power[idx] - result['tx_powers'][idx])
654            for idx in [0, 1]
655        ])
656        if max_error > 1:
657            asserts.fail(power_str)
658        else:
659            asserts.explicit_pass(power_str)
660
661    def process_testclass_results(self):
662        pass
663
664    def run_tx_power_test(self, testcase_params):
665        """Main function to test tx power.
666
667        The function sets up the AP & DUT in the correct channel and mode
668        configuration, starts ping traffic and queries the current TX power.
669
670        Args:
671            testcase_params: dict containing all test parameters
672        Returns:
673            test_result: dict containing ping results and other meta data
674        """
675        # Prepare results dict
676        llstats_obj = wputils.LinkLayerStats(
677            self.dut, self.testclass_params.get('llstats_enabled', True))
678        test_result = collections.OrderedDict()
679        test_result['testcase_params'] = testcase_params.copy()
680        test_result['test_name'] = self.current_test_name
681        test_result['ap_config'] = self.access_point.ap_settings.copy()
682        test_result['attenuation'] = testcase_params['atten_range']
683        test_result['fixed_attenuation'] = self.testbed_params[
684            'fixed_attenuation'][str(testcase_params['channel'])]
685        test_result['rssi_results'] = []
686        test_result['ping_results'] = []
687        test_result['llstats'] = []
688        # Setup sniffer
689        if self.testbed_params['sniffer_enable']:
690            self.sniffer.start_capture(
691                testcase_params['test_network'],
692                chan=testcase_params['channel'],
693                bw=testcase_params['bandwidth'],
694                duration=testcase_params['ping_duration'] *
695                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
696        # Set sar state
697        if testcase_params['sar_state'] == -1:
698            self.dut.adb.shell('halutil -sar disable')
699        else:
700            self.dut.adb.shell('halutil -sar enable {}'.format(
701                testcase_params['sar_state']))
702        # Run ping and sweep attenuation as needed
703        self.log.info('Starting ping.')
704        thread_future = wputils.get_ping_stats_nb(self.ping_server,
705                                                  self.dut_ip, 10, 0.02, 64)
706
707        for atten in testcase_params['atten_range']:
708            for attenuator in self.attenuators:
709                attenuator.set_atten(atten, strict=False, retry=True)
710            # Set mcs
711            if isinstance(testcase_params['channel'],
712                          int) and testcase_params['channel'] < 13:
713                self.dut.adb.shell('wl 2g_rate -e 0 -s 2 -b {}'.format(
714                    testcase_params['bandwidth']))
715            elif isinstance(testcase_params['channel'],
716                            int) and testcase_params['channel'] > 13:
717                self.dut.adb.shell('wl 5g_rate -e 0 -s 2 -b {}'.format(
718                    testcase_params['bandwidth']))
719            else:
720                self.dut.adb.shell('wl 6g_rate -e 0 -s 2 -b {}'.format(
721                    testcase_params['bandwidth']))
722            # Refresh link layer stats
723            llstats_obj.update_stats()
724            # Check sar state
725            self.log.info('Current Country: {}'.format(
726                self.dut.adb.shell('wl country')))
727            # Dump last est power multiple times
728            chain_0_power = []
729            chain_1_power = []
730            for idx in range(30):
731                last_est_out = self.dut.adb.shell(
732                    "wl curpower | grep 'Last est. power'", ignore_status=True)
733                if "Last est. power" in last_est_out:
734                    try:
735                        per_chain_powers = last_est_out.split(
736                            ':')[1].strip().split('  ')
737                        per_chain_powers = [
738                            float(power) for power in per_chain_powers
739                        ]
740                    except:
741                        per_chain_powers = [0, 0]
742                        self.log.warning(
743                            'Could not parse output: {}'.format(last_est_out))
744                    self.log.info(
745                        'Current Tx Powers = {}'.format(per_chain_powers))
746                    if per_chain_powers[0] > 0:
747                        chain_0_power.append(per_chain_powers[0])
748                    if per_chain_powers[1] > 0:
749                        chain_1_power.append(per_chain_powers[1])
750                time.sleep(0.25)
751            # Check if empty
752            if len(chain_0_power) == 0 or len(chain_1_power) == 0:
753                test_result['tx_powers'] = [0, 0]
754                tx_power_frequency = [100, 100]
755            else:
756                test_result['tx_powers'] = [
757                    scipy.stats.mode(chain_0_power).mode[0],
758                    scipy.stats.mode(chain_1_power).mode[0]
759                ]
760                tx_power_frequency = [
761                    100 * scipy.stats.mode(chain_0_power).count[0] /
762                    len(chain_0_power),
763                    100 * scipy.stats.mode(chain_1_power).count[0] /
764                    len(chain_0_power)
765                ]
766            self.log.info(
767                'Filtered Tx Powers = {}. Frequency = [{:.0f}%, {:.0f}%]'.
768                format(test_result['tx_powers'], tx_power_frequency[0],
769                       tx_power_frequency[1]))
770            llstats_obj.update_stats()
771            curr_llstats = llstats_obj.llstats_incremental.copy()
772            test_result['llstats'].append(curr_llstats)
773            # DUMP wl curpower one
774            try:
775                wl_curpower = self.dut.adb.shell('wl curpower')
776            except:
777                time.sleep(0.25)
778                wl_curpower = self.dut.adb.shell('wl curpower',
779                                                 ignore_status=True)
780            current_context = context.get_current_context(
781            ).get_full_output_path()
782            wl_curpower_path = os.path.join(current_context,
783                                            'wl_curpower_output')
784            with open(wl_curpower_path, 'w') as file:
785                file.write(wl_curpower)
786            wl_curpower_dict = self.process_wl_curpower(
787                wl_curpower_path, testcase_params)
788            wl_curpower_path = os.path.join(current_context,
789                                            'wl_curpower_dict')
790            with open(wl_curpower_path, 'w') as file:
791                json.dump(wputils.serialize_dict(wl_curpower_dict),
792                          file,
793                          indent=4)
794            test_result['wl_curpower'] = wl_curpower_dict
795        thread_future.result()
796        if self.testbed_params['sniffer_enable']:
797            self.sniffer.stop_capture()
798        return test_result
799
800    def setup_ap(self, testcase_params):
801        """Sets up the access point in the configuration required by the test.
802
803        Args:
804            testcase_params: dict containing AP and other test params
805        """
806        band = self.access_point.band_lookup_by_channel(
807            testcase_params['channel'])
808        if '6G' in band:
809            frequency = wutils.WifiEnums.channel_6G_to_freq[int(
810                testcase_params['channel'].strip('6g'))]
811        else:
812            if testcase_params['channel'] < 13:
813                frequency = wutils.WifiEnums.channel_2G_to_freq[
814                    testcase_params['channel']]
815            else:
816                frequency = wutils.WifiEnums.channel_5G_to_freq[
817                    testcase_params['channel']]
818        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
819            self.access_point.set_region(self.testbed_params['DFS_region'])
820        else:
821            self.access_point.set_region(self.testbed_params['default_region'])
822        self.access_point.set_channel_and_bandwidth(band,
823                                                    testcase_params['channel'],
824                                                    testcase_params['mode'])
825        #self.access_point.set_channel(band, testcase_params['channel'])
826        #self.access_point.set_bandwidth(band, testcase_params['mode'])
827        if 'low' in testcase_params['ap_power']:
828            self.log.info('Setting low AP power.')
829            self.access_point.set_power(
830                band, self.testclass_params['low_ap_tx_power'])
831        self.log.info('Access Point Configuration: {}'.format(
832            self.access_point.ap_settings))
833
834    def setup_dut(self, testcase_params):
835        """Sets up the DUT in the configuration required by the test.
836
837        Args:
838            testcase_params: dict containing AP and other test params
839        """
840        # Turn screen off to preserve battery
841        if self.testbed_params.get('screen_on',
842                                   False) or self.testclass_params.get(
843                                       'screen_on', False):
844            self.dut.droid.wakeLockAcquireDim()
845        else:
846            self.dut.go_to_sleep()
847        if wputils.validate_network(self.dut,
848                                    testcase_params['test_network']['SSID']):
849            current_country = self.dut.adb.shell('wl country')
850            self.log.info('Current country code: {}'.format(current_country))
851            if testcase_params['country_code'] in current_country:
852                self.log.info('Already connected to desired network')
853                self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses(
854                    'wlan0')[0]
855                return
856        testcase_params['test_network']['channel'] = testcase_params['channel']
857        wutils.wifi_toggle_state(self.dut, False)
858        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
859        wutils.wifi_toggle_state(self.dut, True)
860        wutils.reset_wifi(self.dut)
861        if self.testbed_params.get('txbf_off', False):
862            wputils.disable_beamforming(self.dut)
863        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
864        current_country = self.dut.adb.shell('wl country')
865        self.log.info('Current country code: {}'.format(current_country))
866        if testcase_params['country_code'] not in current_country:
867            asserts.fail('Country code not correct.')
868        chan_list = self.dut.adb.shell('wl chan_info_list')
869        if str(testcase_params['channel']) not in chan_list:
870            asserts.skip('Channel {} not supported in {}'.format(
871                testcase_params['channel'], testcase_params['country_code']))
872        wutils.wifi_connect(self.dut,
873                            testcase_params['test_network'],
874                            num_of_tries=5,
875                            check_connectivity=True)
876        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
877
878    def setup_tx_power_test(self, testcase_params):
879        """Function that gets devices ready for the test.
880
881        Args:
882            testcase_params: dict containing test-specific parameters
883        """
884        # Configure AP
885        self.setup_ap(testcase_params)
886        # Set attenuator to 0 dB
887        for attenuator in self.attenuators:
888            attenuator.set_atten(0, strict=False, retry=True)
889        # Reset, configure, and connect DUT
890        self.setup_dut(testcase_params)
891
892    def check_skip_conditions(self, testcase_params):
893        """Checks if test should be skipped."""
894        # Check battery level before test
895        if not wputils.health_check(self.dut, 10):
896            asserts.skip('DUT battery level too low.')
897        if testcase_params[
898                'channel'] in wputils.CHANNELS_6GHz and not self.dut.droid.is6GhzBandSupported(
899                ):
900            asserts.skip('DUT does not support 6 GHz band.')
901        if not self.access_point.band_lookup_by_channel(
902                testcase_params['channel']):
903            asserts.skip('AP does not support requested channel.')
904
905    def compile_test_params(self, testcase_params):
906        """Function to compile all testcase parameters."""
907
908        self.check_skip_conditions(testcase_params)
909
910        band = self.access_point.band_lookup_by_channel(
911            testcase_params['channel'])
912        testcase_params['test_network'] = self.main_network[band]
913        testcase_params['attenuated_chain'] = -1
914        testcase_params.update(
915            ping_interval=self.testclass_params['ping_interval'],
916            ping_duration=self.testclass_params['ping_duration'],
917            ping_size=self.testclass_params['ping_size'],
918        )
919
920        testcase_params['atten_range'] = [0]
921        return testcase_params
922
923    def _test_ping(self, testcase_params):
924        """ Function that gets called for each range test case
925
926        The function gets called in each range test case. It customizes the
927        range test based on the test name of the test that called it
928
929        Args:
930            testcase_params: dict containing preliminary set of parameters
931        """
932        # Compile test parameters from config and test name
933        testcase_params = self.compile_test_params(testcase_params)
934        # Run ping test
935        self.setup_tx_power_test(testcase_params)
936        result = self.run_tx_power_test(testcase_params)
937        self.pass_fail_check(result)
938
939    def generate_test_cases(self, ap_power, channels, modes, test_types,
940                            country_codes, sar_states):
941        """Function that auto-generates test cases for a test class."""
942        test_cases = []
943        allowed_configs = {
944            20: [
945                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 64, 100,
946                116, 132, 140, 149, 153, 157, 161
947            ],
948            40: [36, 44, 100, 149, 157],
949            80: [36, 100, 149],
950            160: [36, '6g37', '6g117', '6g213']
951        }
952
953        for channel, mode, test_type, country_code, sar_state in itertools.product(
954                channels, modes, test_types, country_codes, sar_states):
955            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
956            if channel not in allowed_configs[bandwidth]:
957                continue
958            testcase_name = '{}_ch{}_{}_{}_sar_{}'.format(
959                test_type, channel, mode, country_code, sar_state)
960            testcase_params = collections.OrderedDict(
961                test_type=test_type,
962                ap_power=ap_power,
963                channel=channel,
964                mode=mode,
965                bandwidth=bandwidth,
966                country_code=country_code,
967                sar_state=sar_state)
968            setattr(self, testcase_name,
969                    partial(self._test_ping, testcase_params))
970            test_cases.append(testcase_name)
971        return test_cases
972
973
class WifiTxPowerCheck_BasicSAR_Test(WifiTxPowerCheckTest):
    """Tx power check over a fixed set of channels and SAR states.

    The test cases are generated at construction time through
    generate_test_cases from the channels, modes, country codes and SAR
    states listed in __init__.
    """

    def __init__(self, controllers):
        base_test.BaseTestClass.__init__(self, controllers)
        # Metric loggers for per-test-case and per-test-class reporting.
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True
        # Parameter sweep handed to the test case generator.
        sweep = dict(
            ap_power='standard',
            channels=[6, 36, 52, 100, 149, '6g37'],
            modes=['bw20', 'bw80', 'bw160'],
            test_types=['test_tx_power'],
            country_codes=['US', 'GB', 'JP', 'CA'],
            sar_states=[-1, 0, 1, 2, 3, 4],
        )
        self.tests = self.generate_test_cases(**sweep)
992