#!/usr/bin/env python3.4
#
#   Copyright 2017 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the 'License');
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an 'AS IS' BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import collections
import csv
import itertools
import json
import logging
import math
import os
import re
import scipy.stats
import time
from acts import asserts
from acts import context
from acts import base_test
from acts import utils
from acts.controllers.utils_lib import ssh
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts_contrib.test_utils.wifi import ota_sniffer
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from functools import partial


class WifiTxPowerCheckTest(base_test.BaseTestClass):
    """Class for WiFi TX power (SAR) verification tests.

    The class sets up the AP in the desired configuration, configures and
    connects the phone to the AP, applies a SAR scenario on the DUT, starts
    ping traffic and compares the TX power reported by `wl curpower` against
    the limits found in the SAR CSV, the regulatory table and the board limit
    table.
    """

    TEST_TIMEOUT = 10
    RSSI_POLL_INTERVAL = 0.2
    SHORT_SLEEP = 1
    MED_SLEEP = 5
    MAX_CONSECUTIVE_ZEROS = 5
    DISCONNECTED_PING_RESULT = {
        'connected': 0,
        'rtt': [],
        'time_stamp': [],
        'ping_interarrivals': [],
        'packet_loss_percentage': 100
    }

    # Maps the integer SAR index used in sarconfig.info to the scenario name
    # used in the NVRAM dynsar entries.
    BRCM_SAR_MAPPING = {
        0: 'disable',
        1: 'head',
        2: 'grip',
        16: 'bt',
        32: 'hotspot'
    }

    # Maps (band, 1-based sub-band index) to the channels in that sub-band.
    BAND_TO_CHANNEL_MAP = {
        ('2g', 1): [1, 6, 11],
        ('5g', 1): [36, 40, 44, 48],
        ('5g', 2): [52, 56, 60, 64],
        ('5g', 3): range(100, 148, 4),
        ('5g', 4): [149, 153, 157, 161],
        ('6g', 1): ['6g{}'.format(channel) for channel in range(1, 46, 4)],
        ('6g', 2): ['6g{}'.format(channel) for channel in range(49, 94, 4)],
        ('6g', 3): ['6g{}'.format(channel) for channel in range(97, 114, 4)],
        ('6g', 4): ['6g{}'.format(channel) for channel in range(117, 158, 4)],
        ('6g', 5): ['6g{}'.format(channel) for channel in range(161, 186, 4)],
        ('6g', 6): ['6g{}'.format(channel) for channel in range(189, 234, 4)]
    }

    # Maps the country code under test to the NVRAM/CSV regulatory domain.
    # Any country that is not listed falls back to 'row' (rest of world).
    COUNTRY_TO_REG_DOMAIN = {'US': 'fcc', 'JP': 'jp', 'CA': 'ca'}

    # Highest channel number treated as a 2.4 GHz channel.
    MAX_2G_CHANNEL = 14

    # Matches one row of a `wl curpower` limit table: rate group name, chain
    # count (only 2-chain rows are captured) and up to four power columns.
    _TX_POWER_REGEX = re.compile(
        r'(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)\s*(?P<power_4>[0-9.-]*)'
    )

    def __init__(self, controllers):
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 52, 100, 149, '6g37', '6g117', '6g213'],
            modes=['bw20', 'bw40', 'bw80', 'bw160'],
            test_types=[
                'test_tx_power',
            ],
            country_codes=['US', 'GB', 'JP', 'CA', 'AU'],
            sar_states=range(-1, 13))

    def setup_class(self):
        """Unpacks user params, creates controllers and loads SAR tables."""
        self.dut = self.android_devices[-1]
        req_params = [
            'tx_power_test_params', 'testbed_params', 'main_network',
            'RetailAccessPoints', 'RemoteServer'
        ]
        opt_params = ['OTASniffer']
        self.unpack_userparams(req_params, opt_params)
        self.testclass_params = self.tx_power_test_params
        self.num_atten = self.attenuators[0].instrument.num_atten
        self.ping_server = ssh.connection.SshConnection(
            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
        if hasattr(self,
                   'OTASniffer') and self.testbed_params['sniffer_enable']:
            try:
                self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
            except Exception:
                # Sniffing is optional; keep running without it.
                self.log.warning('Could not start sniffer. Disabling sniffs.')
                self.testbed_params['sniffer_enable'] = 0
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))
        self.log_path = os.path.join(logging.log_path, 'results')
        os.makedirs(self.log_path, exist_ok=True)
        self.atten_dut_chain_map = {}
        self.testclass_results = []

        # Turn WiFi ON
        if self.testclass_params.get('airplane_mode', 1):
            self.log.info('Turning on airplane mode.')
            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
                                'Can not turn on airplane mode.')
        wutils.wifi_toggle_state(self.dut, True)
        self.dut.droid.wifiEnableVerboseLogging(1)
        asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
                             "Failed to enable WiFi verbose logging.")

        # Decode NVRAM. This is best effort: the NVRAM powers are only
        # reported, so a failure here must not abort the test class.
        try:
            self.nvram_sar_data = self.read_nvram_sar_data()
        except Exception as e:
            self.log.warning('Could not read NVRAM SAR data: {}'.format(e))
            self.nvram_sar_data = None
        if 'sar_csv' in self.testclass_params:
            self.csv_sar_data = self.read_sar_csv(
                self.testclass_params['sar_csv'])
        else:
            self.csv_sar_data = None

        # Configure test retries
        self.user_params['retry_tests'] = [self.__class__.__name__]

    def teardown_class(self):
        """Resets the AP, turns WiFi off on all devices, processes results."""
        # Turn WiFi OFF and reset AP
        self.access_point.teardown()
        for dev in self.android_devices:
            wutils.wifi_toggle_state(dev, False)
            dev.go_to_sleep()
        self.process_testclass_results()

    def setup_test(self):
        self.retry_flag = False

    def teardown_test(self):
        self.retry_flag = False

    def on_retry(self):
        """Function to control test logic on retried tests.

        This function is automatically executed on tests that are being
        retried. In this case the function resets wifi, toggles it off and on
        and sets a retry_flag to enable further tweaking the test logic on
        second attempts.
        """
        self.retry_flag = True
        for dev in self.android_devices:
            wutils.reset_wifi(dev)
            wutils.toggle_wifi_off_and_on(dev)

    @staticmethod
    def _power_to_float(power):
        """Converts one `wl curpower` table cell to a float.

        Args:
            power: string captured from a limit table column.
        Returns:
            The power as a float; 0 for unsupported ('-') or missing ('')
            columns.
        """
        return float(power) if power not in ('-', '') else 0

    @staticmethod
    def _most_common_power(powers):
        """Returns the most frequent value in powers and its count.

        Ties are resolved in favor of the smallest value. This is implemented
        with the standard library so that the result does not depend on the
        return shape of scipy.stats.mode, which differs between SciPy
        versions.

        Args:
            powers: non-empty list of numbers.
        Returns:
            Tuple of (most frequent value, number of occurrences).
        """
        counts = collections.Counter(powers)
        top_count = max(counts.values())
        mode = min(value for value, count in counts.items()
                   if count == top_count)
        return mode, top_count

    def _get_reg_domain(self, country_code):
        """Returns the regulatory domain name used for a country code."""
        return self.COUNTRY_TO_REG_DOMAIN.get(country_code, 'row')

    def _lookup_band(self, channel):
        """Returns the band and sub-band index of a channel.

        Args:
            channel: channel under test, as listed in BAND_TO_CHANNEL_MAP.
        Returns:
            Tuple of (band name, 1-based sub-band index).
        Raises:
            ValueError: if the channel is not in BAND_TO_CHANNEL_MAP.
        """
        for (band, sub_band_idx), channels in self.BAND_TO_CHANNEL_MAP.items():
            if channel in channels:
                return band, sub_band_idx
        raise ValueError(
            'Channel {} not found in BAND_TO_CHANNEL_MAP.'.format(channel))

    def read_sar_csv(self, sar_csv):
        """Reads SAR powers from CSV.

        This function reads SAR powers from a CSV and generates a dictionary
        with all programmed TX powers on a per band and regulatory domain
        basis.

        Args:
            sar_csv: path to SAR data file.
        Returns:
            sar_powers: dict containing all SAR data, keyed by scenario
            index, then 'SAR Powers', then (regulatory domain, mode, band),
            then chain. Values are lists of sub-band powers.
        """
        sar_powers = {}
        with open(sar_csv, mode='r') as f:
            for row in csv.DictReader(f):
                # Short rows yield None values and surplus columns are stored
                # under a None key; neither carries a sub-band power.
                sub_band_powers = [
                    float(val) for key, val in row.items()
                    if key and 'Sub-band' in key and val not in ('', None)
                ]
                scenario = sar_powers.setdefault(int(row['Scenario Index']),
                                                 {})
                sar_table = scenario.setdefault('SAR Powers', {})
                sar_row_key = (row['Regulatory Domain'], row['Mode'],
                               row['Band'])
                sar_table.setdefault(sar_row_key, {})[int(
                    row['Chain'])] = sub_band_powers
        return sar_powers

    def read_nvram_sar_data(self):
        """Reads SAR powers from NVRAM.

        This function reads SAR powers from the NVRAM found on the DUT and
        generates a dictionary with all programmed TX powers on a per band and
        regulatory domain basis. The NVRAM file is chosen based on the
        hardware revision, but if no NVRAM file is found matching the expected
        name, the default NVRAM will be loaded. The choice of NVRAM is not
        guaranteed to be correct.

        Returns:
            nvram_sar_data: dict containing all SAR data
        """
        self._read_sar_config_info()
        try:
            hardware_version = self.dut.adb.shell(
                'getprop ro.boot.hardware.revision')
            nvram_path = '/vendor/firmware/bcmdhd.cal_{}'.format(
                hardware_version)
            nvram = self.dut.adb.shell('cat {}'.format(nvram_path))
        except Exception:
            # Revision specific file not available, use the default NVRAM.
            nvram = self.dut.adb.shell('cat /vendor/firmware/bcmdhd.cal')
        current_context = context.get_current_context().get_full_output_path()
        with open(os.path.join(current_context, 'nvram_file'), 'w') as f:
            f.write(nvram)
        nvram_sar_data = {}
        for line in nvram.splitlines():
            if 'dynsar' in line:
                sar_config, sar_powers = self._parse_nvram_sar_line(line)
                nvram_sar_data[sar_config] = sar_powers
        with open(os.path.join(current_context, 'nvram_sar_data'), 'w') as f:
            json.dump(wputils.serialize_dict(nvram_sar_data), f, indent=4)

        return nvram_sar_data

    def _read_sar_config_info(self):
        """Function to read SAR scenario mapping.

        This function reads the sarconfig.info file which contains the mapping
        of SAR scenarios to NVRAM data tables. The result is stored in
        self.sar_state_mapping and dumped to the test output directory.
        """
        # Scenario names in index order; the first entry has index -2.
        google_names = [
            'WIFI_POWER_SCENARIO_INVALID',
            'WIFI_POWER_SCENARIO_DISABLE',
            'WIFI_POWER_SCENARIO_VOICE_CALL',
            'WIFI_POWER_SCENARIO_ON_HEAD_CELL_OFF',
            'WIFI_POWER_SCENARIO_ON_HEAD_CELL_ON',
            'WIFI_POWER_SCENARIO_ON_BODY_CELL_OFF',
            'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON',
            'WIFI_POWER_SCENARIO_ON_BODY_BT',
            'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT',
            'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT_MMW',
            'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON_BT',
            'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT',
            'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT',
            'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_MMW',
            'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT_MMW',
        ]
        self.sar_state_mapping = collections.OrderedDict(
            (index, {
                "google_name": name
            }) for index, name in enumerate(google_names, start=-2))
        sar_config_path = '/vendor/firmware/sarconfig.info'
        sar_config_lines = self.dut.adb.shell(
            'cat {}'.format(sar_config_path)).splitlines()
        # Each line is a comma separated list of integers; blank lines carry
        # no mapping and are skipped.
        sar_config = [[int(x) for x in line.split(',')]
                      for line in sar_config_lines if line.strip()]

        for sar_state in sar_config:
            self.sar_state_mapping[sar_state[0]]['brcm_index'] = (
                self.BRCM_SAR_MAPPING[sar_state[1]], bool(sar_state[2]))
        current_context = context.get_current_context().get_full_output_path()
        with open(os.path.join(current_context, 'sarconfig'), 'w') as f:
            json.dump(wputils.serialize_dict(self.sar_state_mapping),
                      f,
                      indent=4)

    def _parse_nvram_sar_line(self, sar_line):
        """Helper function to decode SAR NVRAM data lines.

        Args:
            sar_line: single line of text from NVRAM file containing SAR data.
        Returns:
            sar_config: tuple of (country, state, band, rsdb, airplane_mode)
            referenced in this line
            decoded_values: list of [power, power] pairs configured in this
            line, one pair per comma separated entry
        """
        sar_key = sar_line.split('=')[0]
        sar_config = collections.OrderedDict()
        # Entries that name no country apply to the rest of the world.
        sar_config['country'] = next(
            (country for country in ['fcc', 'jp', 'ca']
             if country in sar_key), 'row')
        # Entries that name no SAR state are head entries.
        sar_config['state'] = next(
            (state for state in ['grip', 'bt', 'hotspot']
             if state in sar_key), 'head')
        sar_config['band'] = next(band for band in ['2g', '5g', '6g']
                                  if band in sar_key)
        sar_config['rsdb'] = 'rsdb' if 'rsdb' in sar_line else 'mimo'
        sar_config['airplane_mode'] = '_2=' in sar_line

        decoded_powers = []
        for sar_power in sar_line.split('=')[1].split(','):
            # Each entry holds two hex bytes after a two character prefix.
            # The lower 7 bits of each byte are the power in quarter units.
            # Note that core 0 and 1 are flipped in the NVRAM entries
            decoded_powers.append([
                (int(sar_power[4:], 16) & 0x7f) / 4,
                (int(sar_power[2:4], 16) & 0x7f) / 4
            ])

        return tuple(sar_config.values()), decoded_powers

    def get_sar_power_from_nvram(self, testcase_params):
        """Function to get current expected SAR power from nvram

        This function gets the expected SAR TX power from the DUT NVRAM data.
        The SAR power is looked up based on the current channel, SAR state and
        regulatory domain.

        Args:
            testcase_params: dict containing channel, sar state, country code
        Returns:
            sar_config: current expected sar config, or None if the SAR
            scenario mapping could not be read from the DUT
            sar_powers: current expected sar powers; NaN for both chains if
            no NVRAM data is available
        Raises:
            ValueError: if the channel is not in BAND_TO_CHANNEL_MAP.
        """
        reg_domain = self._get_reg_domain(testcase_params['country_code'])
        current_band, sub_band_idx = self._lookup_band(
            testcase_params['channel'])
        # 'brcm_index' is only present if sarconfig.info was read from the
        # DUT, so look it up defensively.
        brcm_index = self.sar_state_mapping[testcase_params['sar_state']].get(
            'brcm_index')
        if brcm_index is None:
            return None, [float('nan'), float('nan')]
        sar_config = (reg_domain, brcm_index[0], current_band, 'mimo',
                      brcm_index[1])
        if self.nvram_sar_data:
            sar_powers = self.nvram_sar_data[sar_config][sub_band_idx - 1]
        else:
            sar_powers = [float('nan'), float('nan')]
        return sar_config, sar_powers

    def get_sar_power_from_csv(self, testcase_params):
        """Function to get current expected SAR power from CSV.

        This function gets the expected SAR TX power from the SAR CSV data.
        The SAR power is looked up based on the current channel, SAR state and
        regulatory domain.

        Args:
            testcase_params: dict containing channel, sar state, country code
        Returns:
            sar_config: current expected sar config
            sar_powers: current expected sar powers; NaN for both chains if
            no CSV data is available
        Raises:
            ValueError: if the channel is not in BAND_TO_CHANNEL_MAP.
        """
        reg_domain = self._get_reg_domain(testcase_params['country_code'])
        current_band, sub_band_idx = self._lookup_band(
            testcase_params['channel'])
        sar_config = (reg_domain, 'mimo', current_band)
        if self.csv_sar_data:
            chain_powers = self.csv_sar_data[
                testcase_params['sar_state']]['SAR Powers'][sar_config]
            sar_powers = [
                chain_powers[0][sub_band_idx - 1],
                chain_powers[1][sub_band_idx - 1]
            ]
        else:
            sar_powers = [float('nan'), float('nan')]
        return sar_config, sar_powers

    def process_wl_curpower(self, wl_curpower_file, testcase_params):
        """Function to parse wl_curpower output.

        Args:
            wl_curpower_file: path to curpower output file.
            testcase_params: dict containing channel, sar state, country code
        Returns:
            wl_curpower_dict: dict formatted version of curpower data.
        """
        with open(wl_curpower_file, 'r') as f:
            wl_curpower_out = f.read()

        channel_regex = re.compile(r'Current Channel:\s+(?P<channel>[0-9]+)')
        bandwidth_regex = re.compile(
            r'Channel Width:\s+(?P<bandwidth>\S+)MHz\n')

        channel = int(
            re.search(channel_regex, wl_curpower_out).group('channel'))
        bandwidth = int(
            re.search(bandwidth_regex, wl_curpower_out).group('bandwidth'))

        regulatory_limits = self.generate_regulatory_table(
            wl_curpower_out, channel, bandwidth)
        board_limits = self.generate_board_limit_table(wl_curpower_out,
                                                       channel, bandwidth)
        wl_curpower_dict = {
            'channel': channel,
            'bandwidth': bandwidth,
            'country': testcase_params['country_code'],
            'regulatory_limits': regulatory_limits,
            'board_limits': board_limits
        }
        return wl_curpower_dict

    def generate_regulatory_table(self, wl_curpower_out, channel, bw):
        """Helper function to generate regulatory limit table from curpower.

        Args:
            wl_curpower_out: curpower output
            channel: current channel
            bw: current bandwidth
        Returns:
            regulatory_table: dict with regulatory limits for current config,
            keyed by (mode, rate, number of streams)
        """
        ht_mode = 'HT' + str(bw)
        vht_mode = 'VHT' + str(bw)
        he_mode = 'HE' + str(bw)
        # Maps each rate group reported by curpower to the rates it covers.
        regulatory_group_map = {
            'DSSS': [('CCK', '{}Mbps'.format(mbps), 1)
                     for mbps in [1, 2, 5.5, 11]],
            'OFDM_CDD1': [('LEGACY', '{}Mbps'.format(mbps), 1)
                          for mbps in [6, 9, 12, 18, 24, 36, 48, 54]],
            'MCS0_7_CDD1': [(mode, rate, 1)
                            for (mode, rate) in itertools.product(
                                [ht_mode, vht_mode], range(0, 8))],
            'VHT8_9SS1_CDD1': [(vht_mode, 8, 1), (vht_mode, 9, 1)],
            'VHT10_11SS1_CDD1': [(vht_mode, 10, 1), (vht_mode, 11, 1)],
            # HT numbers two stream rates 8-15, VHT numbers them 0-7.
            'MCS8_15': [(mode, rate - 8 * ('VHT' in mode), 2)
                        for (mode, rate) in itertools.product(
                            [ht_mode, vht_mode], range(8, 16))],
            'VHT8_9SS2': [(vht_mode, 8, 2), (vht_mode, 9, 2)],
            'VHT10_11SS2': [(vht_mode, 10, 2), (vht_mode, 11, 2)],
            'HE_MCS0-11_CDD1': [(he_mode, rate, 1) for rate in range(0, 12)],
            'HE_MCS0_11SS2': [(he_mode, rate, 2) for rate in range(0, 12)],
        }

        regulatory_section_regex = re.compile(
            r'Regulatory Limits:(?P<regulatory_limits>[\S\s]+)Board Limits:')
        regulatory_section = re.search(
            regulatory_section_regex,
            wl_curpower_out).group('regulatory_limits')
        regulatory_dict = {
            entry[0]: entry[2:]
            for entry in re.findall(self._TX_POWER_REGEX, regulatory_section)
        }

        # Power columns are ordered 20, 40, 80, 160 MHz.
        bw_index = int(math.log(bw / 10, 2)) - 1
        regulatory_table = collections.OrderedDict()
        for regulatory_group, rates in regulatory_group_map.items():
            reg_power = regulatory_dict.get(regulatory_group,
                                            ['0', '0', '0', '0'])[bw_index]
            for rate in rates:
                regulatory_table[rate] = self._power_to_float(reg_power)
        return regulatory_table

    def generate_board_limit_table(self, wl_curpower_out, channel, bw):
        """Helper function to generate board limit table from curpower.

        Args:
            wl_curpower_out: curpower output
            channel: current channel
            bw: current bandwidth
        Returns:
            board_limit_table: dict with board limits for current config,
            keyed by (mode, rate string, number of streams)
        """
        board_section_regex = re.compile(
            r'Board Limits:(?P<board_limits>[\S\s]+)Power Targets:')
        board_section = re.search(board_section_regex,
                                  wl_curpower_out).group('board_limits')
        board_limits_dict = {
            entry[0]: entry[2:]
            for entry in re.findall(self._TX_POWER_REGEX, board_section)
        }

        ht_mode = 'HT{}'.format(bw)
        vht_mode = 'VHT{}'.format(bw)
        he_mode = 'HE{}'.format(bw)
        # Each entry pairs a regex for the rate name reported by curpower with
        # the (mode, rate template, streams) tuples it maps to. A '{}' in the
        # template is filled with the 'mcs' group captured by the regex.
        mcs_regex_list = [
            [
                re.compile('DSSS'),
                [('CCK', '{}Mbps'.format(mbps), 1)
                 for mbps in [1, 2, 5.5, 11]]
            ],
            [
                re.compile('OFDM(?P<mcs>[0-9]+)_CDD1'),
                [('LEGACY', '{}Mbps', 1)]
            ],
            [
                re.compile('MCS(?P<mcs>[0-7])_CDD1'),
                [(ht_mode, '{}', 1), (vht_mode, '{}', 1)]
            ],
            [re.compile('VHT(?P<mcs>[8-9])SS1_CDD1'), [(vht_mode, '{}', 1)]],
            [
                re.compile('VHT10_11SS1_CDD1'),
                [(vht_mode, '10', 1), (vht_mode, '11', 1)]
            ],
            [re.compile('MCS(?P<mcs>[0-9]{2})'), [(ht_mode, '{}', 2)]],
            [re.compile('VHT(?P<mcs>[0-9])SS2'), [(vht_mode, '{}', 2)]],
            [
                re.compile('VHT10_11SS2'),
                [(vht_mode, '10', 2), (vht_mode, '11', 2)]
            ],
            [re.compile('HE_MCS(?P<mcs>[0-9]+)_CDD1'), [(he_mode, '{}', 1)]],
            [re.compile('HE_MCS(?P<mcs>[0-9]+)SS2'), [(he_mode, '{}', 2)]],
        ]

        # Power columns are ordered 20, 40, 80, 160 MHz.
        bw_index = int(math.log(bw / 10, 2)) - 1
        board_limit_table = collections.OrderedDict()
        for mcs, board_limit in board_limits_dict.items():
            for mcs_regex, possible_mcs_list in mcs_regex_list:
                mcs_match = re.match(mcs_regex, mcs)
                if not mcs_match:
                    continue
                captured = mcs_match.groupdict()
                for mode, rate_template, streams in possible_mcs_list:
                    if 'mcs' in captured:
                        rate = rate_template.format(captured['mcs'])
                    else:
                        # Pattern names fixed rates, use the template as is.
                        rate = rate_template
                    board_limit_table[(mode, rate,
                                       streams)] = self._power_to_float(
                                           board_limit[bw_index])
                # Only the first matching pattern applies to a rate name.
                break
        return board_limit_table

    def pass_fail_check(self, result):
        """Function to evaluate if current TX power matches CSV settings.

        This function assesses whether the current TX power reported by the
        DUT matches the lowest of the CSV SAR power, the regulatory limit and
        the board limit, after applying the TX power backoff used to account
        for CLPC errors. NVRAM powers are reported for reference only.

        Args:
            result: dict returned by run_tx_power_test
        """
        testcase_params = result['testcase_params']
        # The test forces HE two stream MCS0 on every band.
        mode = 'HE' + str(testcase_params['bandwidth'])
        # Note that the regulatory table is keyed by integer rates while the
        # board limit table is keyed by rate strings.
        regulatory_power = result['wl_curpower']['regulatory_limits'][(mode, 0,
                                                                       2)]
        board_power = result['wl_curpower']['board_limits'][(mode, str(0), 2)]
        sar_config, nvram_powers = self.get_sar_power_from_nvram(
            testcase_params)
        try:
            csv_config, csv_powers = self.get_sar_power_from_csv(
                testcase_params)
        except Exception as e:
            # No usable CSV entry: use a power that is high enough to let the
            # regulatory and board limits from wl curpower decide.
            self.log.warning(
                'Could not get SAR power from CSV: {}'.format(e))
            csv_powers = [99, 99]
        self.log.info("SAR state: {} ({})".format(
            testcase_params['sar_state'],
            self.sar_state_mapping[testcase_params['sar_state']],
        ))
        self.log.info("Country Code: {}".format(
            testcase_params['country_code']))
        self.log.info('BRCM SAR Table: {}'.format(sar_config))
        # NOTE(review): when no CSV is configured csv_powers is NaN, which
        # makes expected_power NaN and the check below pass. Confirm that
        # this is the intended behavior.
        expected_power = [
            min([csv_powers[0], regulatory_power, board_power]) - 1.5,
            min([csv_powers[1], regulatory_power, board_power]) - 1.5
        ]
        power_str = "NVRAM Powers: {}, CSV Powers: {}, Reg Powers: {}, Board Power: {}, Expected Powers: {}, Reported Powers: {}".format(
            nvram_powers, csv_powers, [regulatory_power] * 2,
            [board_power] * 2, expected_power, result['tx_powers'])
        max_error = max([
            abs(expected_power[idx] - result['tx_powers'][idx])
            for idx in [0, 1]
        ])
        if max_error > 1:
            asserts.fail(power_str)
        else:
            asserts.explicit_pass(power_str)

    def process_testclass_results(self):
        pass

    def _poll_last_est_power(self, num_polls=30, poll_interval=0.25):
        """Polls the last estimated TX power of both chains.

        Args:
            num_polls: number of times wl curpower is queried
            poll_interval: time to sleep between queries, in seconds
        Returns:
            Tuple with the list of positive powers read on chain 0 and the
            list of positive powers read on chain 1.
        """
        chain_0_power = []
        chain_1_power = []
        for _ in range(num_polls):
            last_est_out = self.dut.adb.shell(
                "wl curpower | grep 'Last est. power'", ignore_status=True)
            if "Last est. power" in last_est_out:
                try:
                    # Split on any whitespace so that the number of spaces
                    # between the two powers does not matter.
                    per_chain_powers = [
                        float(power)
                        for power in last_est_out.split(':')[1].split()
                    ]
                    if len(per_chain_powers) < 2:
                        raise ValueError('Expected two per-chain powers.')
                except (IndexError, ValueError):
                    per_chain_powers = [0, 0]
                    self.log.warning(
                        'Could not parse output: {}'.format(last_est_out))
                self.log.info(
                    'Current Tx Powers = {}'.format(per_chain_powers))
                if per_chain_powers[0] > 0:
                    chain_0_power.append(per_chain_powers[0])
                if per_chain_powers[1] > 0:
                    chain_1_power.append(per_chain_powers[1])
            time.sleep(poll_interval)
        return chain_0_power, chain_1_power

    def _dump_wl_curpower(self, testcase_params):
        """Saves the wl curpower output and returns its parsed version.

        Args:
            testcase_params: dict containing all test parameters
        Returns:
            wl_curpower_dict: dict formatted version of curpower data.
        """
        try:
            wl_curpower = self.dut.adb.shell('wl curpower')
        except Exception:
            # The command fails intermittently; retry once and accept
            # whatever it returns.
            time.sleep(0.25)
            wl_curpower = self.dut.adb.shell('wl curpower',
                                             ignore_status=True)
        current_context = context.get_current_context().get_full_output_path()
        wl_curpower_path = os.path.join(current_context, 'wl_curpower_output')
        with open(wl_curpower_path, 'w') as f:
            f.write(wl_curpower)
        wl_curpower_dict = self.process_wl_curpower(wl_curpower_path,
                                                    testcase_params)
        wl_curpower_path = os.path.join(current_context, 'wl_curpower_dict')
        with open(wl_curpower_path, 'w') as f:
            json.dump(wputils.serialize_dict(wl_curpower_dict), f, indent=4)
        return wl_curpower_dict

    def run_tx_power_test(self, testcase_params):
        """Main function to test tx power.

        The function sets up the AP & DUT in the correct channel and mode
        configuration, starts ping traffic and queries the current TX power.

        Args:
            testcase_params: dict containing all test parameters
        Returns:
            test_result: dict containing TX powers, curpower tables and other
            meta data
        """
        # Prepare results dict
        llstats_obj = wputils.LinkLayerStats(
            self.dut, self.testclass_params.get('llstats_enabled', True))
        test_result = collections.OrderedDict()
        test_result['testcase_params'] = testcase_params.copy()
        test_result['test_name'] = self.current_test_name
        test_result['ap_config'] = self.access_point.ap_settings.copy()
        test_result['attenuation'] = testcase_params['atten_range']
        test_result['fixed_attenuation'] = self.testbed_params[
            'fixed_attenuation'][str(testcase_params['channel'])]
        test_result['rssi_results'] = []
        test_result['ping_results'] = []
        test_result['llstats'] = []
        # Setup sniffer
        if self.testbed_params['sniffer_enable']:
            self.sniffer.start_capture(
                testcase_params['test_network'],
                chan=testcase_params['channel'],
                bw=testcase_params['bandwidth'],
                duration=testcase_params['ping_duration'] *
                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
        # Set sar state
        if testcase_params['sar_state'] == -1:
            self.dut.adb.shell('halutil -sar disable')
        else:
            self.dut.adb.shell('halutil -sar enable {}'.format(
                testcase_params['sar_state']))
        # Run ping and sweep attenuation as needed
        self.log.info('Starting ping.')
        thread_future = wputils.get_ping_stats_nb(self.ping_server,
                                                  self.dut_ip, 10, 0.02, 64)

        channel = testcase_params['channel']
        for atten in testcase_params['atten_range']:
            for attenuator in self.attenuators:
                attenuator.set_atten(atten, strict=False, retry=True)
            # Set mcs. 6 GHz channels are strings, all others are integers.
            if isinstance(channel, int) and channel <= self.MAX_2G_CHANNEL:
                rate_cmd = 'wl 2g_rate -e 0 -s 2 -b {}'
            elif isinstance(channel, int):
                rate_cmd = 'wl 5g_rate -e 0 -s 2 -b {}'
            else:
                rate_cmd = 'wl 6g_rate -e 0 -s 2 -b {}'
            self.dut.adb.shell(rate_cmd.format(testcase_params['bandwidth']))
            # Refresh link layer stats
            llstats_obj.update_stats()
            # Check sar state
            self.log.info('Current Country: {}'.format(
                self.dut.adb.shell('wl country')))
            # Dump last est power multiple times
            chain_0_power, chain_1_power = self._poll_last_est_power()
            # Check if empty
            if len(chain_0_power) == 0 or len(chain_1_power) == 0:
                test_result['tx_powers'] = [0, 0]
                tx_power_frequency = [100, 100]
            else:
                # Report the most frequent reading of each chain to filter
                # out transient values.
                chain_0_mode, chain_0_count = self._most_common_power(
                    chain_0_power)
                chain_1_mode, chain_1_count = self._most_common_power(
                    chain_1_power)
                test_result['tx_powers'] = [chain_0_mode, chain_1_mode]
                tx_power_frequency = [
                    100 * chain_0_count / len(chain_0_power),
                    100 * chain_1_count / len(chain_1_power)
                ]
            self.log.info(
                'Filtered Tx Powers = {}. Frequency = [{:.0f}%, {:.0f}%]'.
                format(test_result['tx_powers'], tx_power_frequency[0],
                       tx_power_frequency[1]))
            llstats_obj.update_stats()
            curr_llstats = llstats_obj.llstats_incremental.copy()
            test_result['llstats'].append(curr_llstats)
            # Dump wl curpower once
            test_result['wl_curpower'] = self._dump_wl_curpower(
                testcase_params)
        thread_future.result()
        if self.testbed_params['sniffer_enable']:
            self.sniffer.stop_capture()
        return test_result

    def setup_ap(self, testcase_params):
        """Sets up the access point in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        channel = testcase_params['channel']
        band = self.access_point.band_lookup_by_channel(channel)
        if '6G' in band:
            # Remove the '6g' prefix only. str.strip would also remove
            # leading sixes of the channel number, e.g. in '6g61'.
            channel_number = str(channel)
            if channel_number.startswith('6g'):
                channel_number = channel_number[len('6g'):]
            frequency = wutils.WifiEnums.channel_6G_to_freq[int(
                channel_number)]
        elif channel <= self.MAX_2G_CHANNEL:
            frequency = wutils.WifiEnums.channel_2G_to_freq[channel]
        else:
            frequency = wutils.WifiEnums.channel_5G_to_freq[channel]
        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
            self.access_point.set_region(self.testbed_params['DFS_region'])
        else:
            self.access_point.set_region(self.testbed_params['default_region'])
        self.access_point.set_channel_and_bandwidth(band, channel,
                                                    testcase_params['mode'])
        if 'low' in testcase_params['ap_power']:
            self.log.info('Setting low AP power.')
            self.access_point.set_power(
                band, self.testclass_params['low_ap_tx_power'])
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Keep the screen dimmed if requested, otherwise turn it off to
        # preserve battery
        if self.testbed_params.get('screen_on',
                                   False) or self.testclass_params.get(
                                       'screen_on', False):
            self.dut.droid.wakeLockAcquireDim()
        else:
            self.dut.go_to_sleep()
        if wputils.validate_network(self.dut,
                                    testcase_params['test_network']['SSID']):
            current_country = self.dut.adb.shell('wl country')
            self.log.info('Current country code: {}'.format(current_country))
            if testcase_params['country_code'] in current_country:
                self.log.info('Already connected to desired network')
                self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses(
                    'wlan0')[0]
                return
        testcase_params['test_network']['channel'] = testcase_params['channel']
        wutils.wifi_toggle_state(self.dut, False)
        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
        wutils.wifi_toggle_state(self.dut, True)
        wutils.reset_wifi(self.dut)
        if self.testbed_params.get('txbf_off', False):
            wputils.disable_beamforming(self.dut)
        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
        current_country = self.dut.adb.shell('wl country')
        self.log.info('Current country code: {}'.format(current_country))
        if testcase_params['country_code'] not in current_country:
            asserts.fail('Country code not correct.')
        chan_list = self.dut.adb.shell('wl chan_info_list')
        # NOTE(review): this is a substring check on the raw command output,
        # so short channel numbers can match unrelated text. Confirm the
        # output format before tightening it.
        if str(testcase_params['channel']) not in chan_list:
            asserts.skip('Channel {} not supported in {}'.format(
                testcase_params['channel'], testcase_params['country_code']))
        wutils.wifi_connect(self.dut,
                            testcase_params['test_network'],
                            num_of_tries=5,
                            check_connectivity=True)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]

    def setup_tx_power_test(self, testcase_params):
        """Function that gets devices ready for the test.

        Args:
            testcase_params: dict containing test-specific parameters
        """
        # Configure AP
        self.setup_ap(testcase_params)
        # Set attenuator to 0 dB
        for attenuator in self.attenuators:
            attenuator.set_atten(0, strict=False, retry=True)
        # Reset, configure, and connect DUT
        self.setup_dut(testcase_params)

    def check_skip_conditions(self, testcase_params):
        """Checks if test should be skipped."""
        # Check battery level before test
        if not wputils.health_check(self.dut, 10):
            asserts.skip('DUT battery level too low.')
        if (testcase_params['channel'] in wputils.CHANNELS_6GHz
                and not self.dut.droid.is6GhzBandSupported()):
            asserts.skip('DUT does not support 6 GHz band.')
        if not self.access_point.band_lookup_by_channel(
                testcase_params['channel']):
            asserts.skip('AP does not support requested channel.')

    def compile_test_params(self, testcase_params):
        """Function to compile all testcase parameters."""

        self.check_skip_conditions(testcase_params)

        band = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        testcase_params['test_network'] = self.main_network[band]
        testcase_params['attenuated_chain'] = -1
        testcase_params.update(
            ping_interval=self.testclass_params['ping_interval'],
            ping_duration=self.testclass_params['ping_duration'],
            ping_size=self.testclass_params['ping_size'],
        )

        testcase_params['atten_range'] = [0]
        return testcase_params

    def _test_ping(self, testcase_params):
        """Function that gets called for each TX power test case.

        The function compiles the test parameters, sets up the AP and DUT,
        runs the TX power test and evaluates the result.

        Args:
            testcase_params: dict containing preliminary set of parameters
        """
        # Compile test parameters from config and test name
        testcase_params = self.compile_test_params(testcase_params)
        # Run TX power test
        self.setup_tx_power_test(testcase_params)
        result = self.run_tx_power_test(testcase_params)
        self.pass_fail_check(result)

    def generate_test_cases(self, ap_power, channels, modes, test_types,
                            country_codes, sar_states):
        """Function that auto-generates test cases for a test class.

        Args:
            ap_power: AP power setting stored in each test case
            channels: list of channels to test
            modes: list of bandwidth modes, e.g. 'bw20'
            test_types: list of test name prefixes
            country_codes: list of country codes to test
            sar_states: iterable of SAR states to test
        Returns:
            test_cases: list with the names of the generated test cases
        """
        test_cases = []
        # Channels on which each bandwidth is tested.
        allowed_configs = {
            20: [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 64, 100,
                116, 132, 140, 149, 153, 157, 161
            ],
            40: [36, 44, 100, 149, 157],
            80: [36, 100, 149],
            160: [36, '6g37', '6g117', '6g213']
        }

        for channel, mode, test_type, country_code, sar_state in itertools.product(
                channels, modes, test_types, country_codes, sar_states):
            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
            if channel not in allowed_configs[bandwidth]:
                continue
            testcase_name = '{}_ch{}_{}_{}_sar_{}'.format(
                test_type, channel, mode, country_code, sar_state)
            testcase_params = collections.OrderedDict(
                test_type=test_type,
                ap_power=ap_power,
                channel=channel,
                mode=mode,
                bandwidth=bandwidth,
                country_code=country_code,
                sar_state=sar_state)
            setattr(self, testcase_name,
                    partial(self._test_ping, testcase_params))
            test_cases.append(testcase_name)
        return test_cases


class WifiTxPowerCheck_BasicSAR_Test(WifiTxPowerCheckTest):
    """Reduced TX power test class covering the basic SAR states."""

    def __init__(self, controllers):
        # The base test class is initialized directly so that only the test
        # cases listed below are generated.
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 52, 100, 149, '6g37'],
            modes=['bw20', 'bw80', 'bw160'],
            test_types=[
                'test_tx_power',
            ],
            country_codes=['US', 'GB', 'JP', 'CA'],
            sar_states=[-1, 0, 1, 2, 3, 4])