#!/usr/bin/env python3.4
#
#   Copyright 2017 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the 'License');
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an 'AS IS' BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import collections
import csv
import itertools
import json
import logging
import math
import os
import re
import scipy.stats
import time
from acts import asserts
from acts import context
from acts import base_test
from acts import utils
from acts.controllers.utils_lib import ssh
from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger
from acts_contrib.test_utils.wifi import ota_sniffer
from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils
from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
from functools import partial


class WifiTxPowerCheckTest(base_test.BaseTestClass):
    """Class for WiFi TX power (SAR) verification tests.

    The class sets up the AP in the desired configuration, connects the DUT
    using the requested country code, applies a SAR state, runs ping traffic
    and compares the TX power reported by `wl curpower` against the powers
    programmed in the SAR CSV / NVRAM and the regulatory limit.
    """

    TEST_TIMEOUT = 10
    RSSI_POLL_INTERVAL = 0.2
    SHORT_SLEEP = 1
    MED_SLEEP = 5
    MAX_CONSECUTIVE_ZEROS = 5
    DISCONNECTED_PING_RESULT = {
        'connected': 0,
        'rtt': [],
        'time_stamp': [],
        'ping_interarrivals': [],
        'packet_loss_percentage': 100
    }

    # Backoff (dB) subtracted from min(CSV power, regulatory power) to get the
    # power the DUT is expected to report, and the maximum tolerated error.
    TX_POWER_BACKOFF = 1.5
    MAX_TX_POWER_ERROR = 1

    BRCM_SAR_MAPPING = {
        0: 'disable',
        1: 'head',
        2: 'grip',
        16: 'bt',
        32: 'hotspot'
    }

    # Scenario names indexed by SAR state (0..12). State -1 maps to the same
    # name as state 0 (see _read_sar_config_info).
    GOOGLE_SAR_SCENARIOS = (
        'WIFI_POWER_SCENARIO_DISABLE',
        'WIFI_POWER_SCENARIO_ON_HEAD_CELL_OFF',
        'WIFI_POWER_SCENARIO_ON_HEAD_CELL_ON',
        'WIFI_POWER_SCENARIO_ON_BODY_CELL_OFF',
        'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON',
        'WIFI_POWER_SCENARIO_ON_BODY_BT',
        'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT',
        'WIFI_POWER_SCENARIO_ON_HEAD_HOTSPOT_MMW',
        'WIFI_POWER_SCENARIO_ON_BODY_CELL_ON_BT',
        'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT',
        'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT',
        'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_MMW',
        'WIFI_POWER_SCENARIO_ON_BODY_HOTSPOT_BT_MMW',
    )

    BAND_TO_CHANNEL_MAP = {
        ('2g', 1): [1, 6, 11],
        ('5g', 1): [36, 40, 44, 48],
        ('5g', 2): [52, 56, 60, 64],
        ('5g', 3): range(100, 148, 4),
        ('5g', 4): [149, 153, 157, 161],
        ('6g', 1): ['6g{}'.format(channel) for channel in range(1, 46, 4)],
        ('6g', 2): ['6g{}'.format(channel) for channel in range(49, 94, 4)],
        ('6g', 3): ['6g{}'.format(channel) for channel in range(97, 114, 4)],
        ('6g', 4): ['6g{}'.format(channel) for channel in range(117, 158, 4)],
        ('6g', 5): ['6g{}'.format(channel) for channel in range(161, 186, 4)],
        ('6g', 6): ['6g{}'.format(channel) for channel in range(189, 234, 4)]
    }

    # One row of a `wl curpower` limits table: rate group name, chain count
    # (only rows with chain count 2 are matched) and up to four power columns.
    _TX_POWER_REGEX = re.compile(
        r'(?P<mcs>\S+)\s+(?P<chain>[2])\s+(?P<power_1>[0-9.-]+)'
        r'\s*(?P<power_2>[0-9.-]*)\s*(?P<power_3>[0-9.-]*)'
        r'\s*(?P<power_4>[0-9.-]*)')

    def __init__(self, controllers):
        """Creates metric loggers and auto-generates the test cases."""
        base_test.BaseTestClass.__init__(self, controllers)
        self.testcase_metric_logger = (
            BlackboxMappedMetricLogger.for_test_case())
        self.testclass_metric_logger = (
            BlackboxMappedMetricLogger.for_test_class())
        self.publish_testcase_metrics = True
        self.tests = self.generate_test_cases(
            ap_power='standard',
            channels=[6, 36, 52, 100, 149, '6g37', '6g117', '6g213'],
            modes=['bw20', 'bw40', 'bw80', 'bw160'],
            test_types=[
                'test_tx_power',
            ],
            country_codes=['US', 'GB', 'JP'],
            sar_states=range(0, 13))

    def setup_class(self):
        """Unpacks user params, creates controllers and loads SAR tables."""
        self.dut = self.android_devices[-1]
        req_params = [
            'tx_power_test_params', 'testbed_params', 'main_network',
            'RetailAccessPoints', 'RemoteServer'
        ]
        opt_params = ['OTASniffer']
        self.unpack_userparams(req_params, opt_params)
        self.testclass_params = self.tx_power_test_params
        self.num_atten = self.attenuators[0].instrument.num_atten
        self.ping_server = ssh.connection.SshConnection(
            ssh.settings.from_config(self.RemoteServer[0]['ssh_config']))
        self.access_point = retail_ap.create(self.RetailAccessPoints)[0]
        if self.testbed_params['sniffer_enable']:
            if hasattr(self, 'OTASniffer'):
                try:
                    self.sniffer = ota_sniffer.create(self.OTASniffer)[0]
                except Exception:
                    self.log.warning(
                        'Could not start sniffer. Disabling sniffs.')
                    self.testbed_params['sniffer_enable'] = 0
            else:
                # Without a sniffer object, run_tx_power_test would fail on
                # self.sniffer, so sniffing is turned off up front.
                self.log.warning(
                    'No OTASniffer configured. Disabling sniffs.')
                self.testbed_params['sniffer_enable'] = 0
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))
        self.log_path = os.path.join(logging.log_path, 'results')
        os.makedirs(self.log_path, exist_ok=True)
        self.atten_dut_chain_map = {}
        self.testclass_results = []

        # Turn WiFi ON
        if self.testclass_params.get('airplane_mode', 1):
            self.log.info('Turning on airplane mode.')
            asserts.assert_true(utils.force_airplane_mode(self.dut, True),
                                'Can not turn on airplane mode.')
        wutils.wifi_toggle_state(self.dut, True)
        self.dut.droid.wifiEnableVerboseLogging(1)
        asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1,
                             "Failed to enable WiFi verbose logging.")

        # decode nvram
        self.nvram_sar_data = self.read_nvram_sar_data()
        self.csv_sar_data = self.read_sar_csv(self.testclass_params['sar_csv'])

    def teardown_class(self):
        """Tears down the AP, turns WiFi off and processes class results."""
        # Turn WiFi OFF and reset AP
        self.access_point.teardown()
        for dev in self.android_devices:
            wutils.wifi_toggle_state(dev, False)
            dev.go_to_sleep()
        self.process_testclass_results()

    def setup_test(self):
        """Clears the retry flag before each test."""
        self.retry_flag = False

    def teardown_test(self):
        """Clears the retry flag after each test."""
        self.retry_flag = False

    def on_retry(self):
        """Function to control test logic on retried tests.

        This function is automatically executed on tests that are being
        retried. In this case the function resets wifi, toggles it off and on
        and sets a retry_flag to enable further tweaking the test logic on
        second attempts.
        """
        self.retry_flag = True
        for dev in self.android_devices:
            wutils.reset_wifi(dev)
            wutils.toggle_wifi_off_and_on(dev)

    def read_sar_csv(self, sar_csv):
        """Reads SAR powers from CSV.

        This function reads SAR powers from a CSV and generates a dictionary
        with all programmed TX powers on a per band and regulatory domain
        basis.

        Args:
            sar_csv: path to SAR data file.
        Returns:
            sar_powers: dict keyed by scenario index. Each entry holds a
                'SAR Powers' dict keyed by (regulatory domain, mode, band),
                whose values map chain index to the list of sub-band powers.
        """
        sar_powers = {}
        # newline='' is what the csv module documentation recommends for
        # files handed to a reader.
        with open(sar_csv, mode='r', newline='') as f:
            for row in csv.DictReader(f):
                # Every non-empty column whose header mentions 'Sub-band' is
                # one sub-band power, in column order.
                sub_band_powers = [
                    float(val) for key, val in row.items()
                    if 'Sub-band' in key and val != ''
                ]
                sar_row_key = (row['Regulatory Domain'], row['Mode'],
                               row['Band'])
                scenario = sar_powers.setdefault(int(row['Scenario Index']),
                                                 {})
                scenario_powers = scenario.setdefault('SAR Powers', {})
                chain_powers = scenario_powers.setdefault(sar_row_key, {})
                chain_powers[int(row['Chain'])] = sub_band_powers
        return sar_powers

    def read_nvram_sar_data(self):
        """Reads SAR powers from NVRAM.

        This function reads SAR powers from the NVRAM found on the DUT and
        generates a dictionary with all programmed TX powers on a per band and
        regulatory domain basis. The NVRAM file is chosen based on the
        hardware revision, but if reading that file fails, the default NVRAM
        will be loaded. The choice of NVRAM is not guaranteed to be correct.

        Returns:
            nvram_sar_data: dict containing all SAR data
        """
        self._read_sar_config_info()
        try:
            hardware_version = self.dut.adb.shell(
                'getprop ro.boot.hardware.revision')
            nvram_path = '/vendor/firmware/bcmdhd.cal_{}'.format(
                hardware_version)
            nvram = self.dut.adb.shell('cat {}'.format(nvram_path))
        except Exception:
            self.log.warning(
                'Could not read revision-specific NVRAM. Using default.')
            nvram = self.dut.adb.shell('cat /vendor/firmware/bcmdhd.cal')
        current_context = context.get_current_context().get_full_output_path()
        file_path = os.path.join(current_context, 'nvram_file')
        with open(file_path, 'w') as file:
            file.write(nvram)
        nvram_sar_data = {}
        for line in nvram.splitlines():
            if 'dynsar' in line:
                sar_config, sar_powers = self._parse_nvram_sar_line(line)
                nvram_sar_data[sar_config] = sar_powers
        file_path = os.path.join(current_context, 'nvram_sar_data')
        with open(file_path, 'w') as file:
            json.dump(wputils.serialize_dict(nvram_sar_data), file, indent=4)

        return nvram_sar_data

    def _read_sar_config_info(self):
        """Function to read SAR scenario mapping.

        This function reads the sarconfig.info file which contains the mapping
        of SAR scenarios to NVRAM data tables, stores the result in
        self.sar_state_mapping and dumps it to the test output directory.
        """
        scenarios = [(-1, {'google_name': self.GOOGLE_SAR_SCENARIOS[0]})]
        scenarios.extend(
            (index, {'google_name': name})
            for index, name in enumerate(self.GOOGLE_SAR_SCENARIOS))
        self.sar_state_mapping = collections.OrderedDict(scenarios)

        sar_config_path = '/vendor/firmware/sarconfig.info'
        sar_config_lines = self.dut.adb.shell(
            'cat {}'.format(sar_config_path)).splitlines()
        for line in sar_config_lines:
            if not line.strip():
                # Blank lines carry no mapping; int('') would raise.
                continue
            sar_state = [int(x) for x in line.split(',')]
            self.sar_state_mapping[sar_state[0]]['brcm_index'] = (
                self.BRCM_SAR_MAPPING[sar_state[1]], bool(sar_state[2]))
        current_context = context.get_current_context().get_full_output_path()
        file_path = os.path.join(current_context, 'sarconfig')
        with open(file_path, 'w') as file:
            json.dump(wputils.serialize_dict(self.sar_state_mapping),
                      file,
                      indent=4)

    def _parse_nvram_sar_line(self, sar_line):
        """Helper function to decode SAR NVRAM data lines.

        Args:
            sar_line: single line of text from NVRAM file containing SAR data.
        Returns:
            sar_config: tuple (country, state, band, rsdb, airplane_mode)
                describing the config referenced in this line
            decoded_values: list of [power, power] pairs, one pair per
                comma-separated entry in the line
        """
        sar_config = collections.OrderedDict()
        # Country and state fall back to a default when no known tag appears.
        sar_config['country'] = next(
            (country for country in ['fcc', 'jp'] if country in sar_line),
            'row')
        sar_config['state'] = next(
            (state for state in ['grip', 'bt', 'hotspot']
             if state in sar_line), 'head')
        # A band tag is mandatory: StopIteration propagates if none is found.
        sar_config['band'] = next(band for band in ['2g', '5g', '6g']
                                  if band in sar_line)
        sar_config['rsdb'] = 'rsdb' if 'rsdb' in sar_line else 'mimo'
        sar_config['airplane_mode'] = '_2=' in sar_line

        sar_powers = sar_line.split('=')[1].split(',')
        # Each entry is sliced as [2:4] and [4:], parsed as hex, masked to the
        # low 7 bits and divided by 4.
        decoded_powers = [[(int(sar_power[2:4], 16) & 0x7f) / 4,
                           (int(sar_power[4:], 16) & 0x7f) / 4]
                          for sar_power in sar_powers]

        return tuple(sar_config.values()), decoded_powers

    @staticmethod
    def _reg_domain_from_country(country_code):
        """Maps a country code to the SAR regulatory domain name."""
        return {'US': 'fcc', 'JP': 'jp'}.get(country_code, 'row')

    def _lookup_sub_band(self, channel):
        """Returns (band, sub_band_idx) for a channel.

        Raises:
            ValueError: if the channel is not in BAND_TO_CHANNEL_MAP.
        """
        for (band, sub_band_idx), channels in self.BAND_TO_CHANNEL_MAP.items():
            if channel in channels:
                return band, sub_band_idx
        raise ValueError(
            'Channel {} not found in BAND_TO_CHANNEL_MAP.'.format(channel))

    def get_sar_power_from_nvram(self, testcase_params):
        """Function to get current expected SAR power from nvram.

        This function gets the expected SAR TX power from the DUT NVRAM data.
        The SAR power is looked up based on the current channel, SAR state and
        regulatory domain.

        Args:
            testcase_params: dict containing channel, sar state, country code
        Returns:
            sar_config: current expected sar config
            sar_powers: current expected sar powers
        Raises:
            ValueError: if the channel is not in BAND_TO_CHANNEL_MAP.
        """
        reg_domain = self._reg_domain_from_country(
            testcase_params['country_code'])
        current_band, sub_band_idx = self._lookup_sub_band(
            testcase_params['channel'])
        brcm_index = self.sar_state_mapping[
            testcase_params['sar_state']]['brcm_index']
        # Same field order as the key built by _parse_nvram_sar_line.
        sar_config = (reg_domain, brcm_index[0], current_band, 'mimo',
                      brcm_index[1])
        sar_powers = self.nvram_sar_data[sar_config][sub_band_idx - 1]
        return sar_config, sar_powers

    def get_sar_power_from_csv(self, testcase_params):
        """Function to get current expected SAR power from CSV.

        This function gets the expected SAR TX power from the SAR CSV data.
        The SAR power is looked up based on the current channel, SAR state and
        regulatory domain.

        Args:
            testcase_params: dict containing channel, sar state, country code
        Returns:
            sar_config: current expected sar config
            sar_powers: current expected sar powers for chains 0 and 1
        Raises:
            ValueError: if the channel is not in BAND_TO_CHANNEL_MAP.
        """
        reg_domain = self._reg_domain_from_country(
            testcase_params['country_code'])
        current_band, sub_band_idx = self._lookup_sub_band(
            testcase_params['channel'])
        sar_config = (reg_domain, 'mimo', current_band)
        chain_powers = self.csv_sar_data[
            testcase_params['sar_state']]['SAR Powers'][sar_config]
        sar_powers = [
            chain_powers[0][sub_band_idx - 1],
            chain_powers[1][sub_band_idx - 1]
        ]
        return sar_config, sar_powers

    def process_wl_curpower(self, wl_curpower_file, testcase_params):
        """Function to parse wl_curpower output.

        Args:
            wl_curpower_file: path to curpower output file.
            testcase_params: dict containing channel, sar state, country code
        Returns:
            wl_curpower_dict: dict formatted version of curpower data.
        """
        with open(wl_curpower_file, 'r') as file:
            wl_curpower_out = file.read()

        channel_regex = re.compile(r'Current Channel:\s+(?P<channel>[0-9]+)')
        bandwidth_regex = re.compile(
            r'Channel Width:\s+(?P<bandwidth>\S+)MHz\n')

        channel = int(channel_regex.search(wl_curpower_out).group('channel'))
        bandwidth = int(
            bandwidth_regex.search(wl_curpower_out).group('bandwidth'))

        regulatory_limits = self.generate_regulatory_table(
            wl_curpower_out, channel, bandwidth)
        board_limits = self.generate_board_limit_table(wl_curpower_out,
                                                       channel, bandwidth)
        wl_curpower_dict = {
            'channel': channel,
            'bandwidth': bandwidth,
            'country': testcase_params['country_code'],
            'regulatory_limits': regulatory_limits,
            'board_limits': board_limits
        }
        return wl_curpower_dict

    @staticmethod
    def _bandwidth_index(bw):
        """Returns the power column index for a bandwidth (20 -> 0, 40 -> 1).

        math.log2 is exact for powers of two, so truncation cannot land on
        the wrong column.
        """
        return int(math.log2(bw / 10)) - 1

    @staticmethod
    def _limit_to_float(limit):
        """Converts a limit table cell to float; '-' or empty cells become 0."""
        return float(limit) if limit not in ('-', '') else 0

    def generate_regulatory_table(self, wl_curpower_out, channel, bw):
        """Helper function to generate regulatory limit table from curpower.

        Args:
            wl_curpower_out: curpower output
            channel: current channel
            bw: current bandwidth
        Returns:
            regulatory_table: OrderedDict keyed by (mode, rate, streams) with
                the regulatory limit for the current bandwidth
        """
        ht_mode = 'HT{}'.format(bw)
        vht_mode = 'VHT{}'.format(bw)
        he_mode = 'HE{}'.format(bw)
        regulatory_group_map = {
            'DSSS': [('CCK', '{}Mbps'.format(mbps), 1)
                     for mbps in [1, 2, 5.5, 11]],
            'OFDM_CDD1': [('LEGACY', '{}Mbps'.format(mbps), 1)
                          for mbps in [6, 9, 12, 18, 24, 36, 48, 54]],
            'MCS0_7_CDD1': [(mode, rate, 1)
                            for (mode, rate) in itertools.product(
                                [ht_mode, vht_mode], range(0, 8))],
            'VHT8_9SS1_CDD1': [(vht_mode, 8, 1), (vht_mode, 9, 1)],
            'VHT10_11SS1_CDD1': [(vht_mode, 10, 1), (vht_mode, 11, 1)],
            # HT rates 8-15 stay as-is; for VHT the same group is stored as
            # rates 0-7 with two streams.
            'MCS8_15': [(mode, rate - 8 * ('VHT' in mode), 2)
                        for (mode, rate) in itertools.product(
                            [ht_mode, vht_mode], range(8, 16))],
            'VHT8_9SS2': [(vht_mode, 8, 2), (vht_mode, 9, 2)],
            'VHT10_11SS2': [(vht_mode, 10, 2), (vht_mode, 11, 2)],
            'HE_MCS0-11_CDD1': [(he_mode, rate, 1) for rate in range(0, 12)],
            'HE_MCS0_11SS2': [(he_mode, rate, 2) for rate in range(0, 12)],
        }

        regulatory_section_regex = re.compile(
            r'Regulatory Limits:(?P<regulatory_limits>[\S\s]+)Board Limits:')
        regulatory_section = regulatory_section_regex.search(
            wl_curpower_out).group('regulatory_limits')
        # Each findall entry is (mcs, chain, power_1..power_4).
        regulatory_dict = {
            entry[0]: entry[2:]
            for entry in self._TX_POWER_REGEX.findall(regulatory_section)
        }

        bw_index = self._bandwidth_index(bw)
        regulatory_table = collections.OrderedDict()
        for regulatory_group, rates in regulatory_group_map.items():
            # Groups missing from the curpower output default to 0.
            reg_power = regulatory_dict.get(regulatory_group,
                                            ['0', '0', '0', '0'])[bw_index]
            for rate in rates:
                regulatory_table[rate] = self._limit_to_float(reg_power)
        return regulatory_table

    def generate_board_limit_table(self, wl_curpower_out, channel, bw):
        """Helper function to generate board limit table from curpower.

        Args:
            wl_curpower_out: curpower output
            channel: current channel
            bw: current bandwidth
        Returns:
            board_limit_table: OrderedDict keyed by (mode, rate, streams) with
                the board limit for the current bandwidth. Rates are strings.
        """
        board_section_regex = re.compile(
            r'Board Limits:(?P<board_limits>[\S\s]+)Power Targets:')
        board_section = board_section_regex.search(wl_curpower_out).group(
            'board_limits')
        # Each findall entry is (mcs, chain, power_1..power_4).
        board_limits_dict = {
            entry[0]: entry[2:]
            for entry in self._TX_POWER_REGEX.findall(board_section)
        }

        ht_mode = 'HT{}'.format(bw)
        vht_mode = 'VHT{}'.format(bw)
        he_mode = 'HE{}'.format(bw)
        # (row name pattern, rate templates) pairs. The first matching pattern
        # wins, so the order matters. When the pattern has an 'mcs' group, it
        # is substituted into the rate template.
        mcs_regex_list = [
            (re.compile(r'DSSS'), [('CCK', '{}Mbps'.format(mbps), 1)
                                   for mbps in [1, 2, 5.5, 11]]),
            (re.compile(r'OFDM(?P<mcs>[0-9]+)_CDD1'),
             [('LEGACY', '{}Mbps', 1)]),
            (re.compile(r'MCS(?P<mcs>[0-7])_CDD1'),
             [(ht_mode, '{}', 1), (vht_mode, '{}', 1)]),
            (re.compile(r'VHT(?P<mcs>[8-9])SS1_CDD1'), [(vht_mode, '{}', 1)]),
            (re.compile(r'VHT10_11SS1_CDD1'),
             [(vht_mode, '10', 1), (vht_mode, '11', 1)]),
            (re.compile(r'MCS(?P<mcs>[0-9]{2})'), [(ht_mode, '{}', 2)]),
            (re.compile(r'VHT(?P<mcs>[0-9])SS2'), [(vht_mode, '{}', 2)]),
            (re.compile(r'VHT10_11SS2'),
             [(vht_mode, '10', 2), (vht_mode, '11', 2)]),
            (re.compile(r'HE_MCS(?P<mcs>[0-9]+)_CDD1'), [(he_mode, '{}', 1)]),
            (re.compile(r'HE_MCS(?P<mcs>[0-9]+)SS2'), [(he_mode, '{}', 2)]),
        ]

        bw_index = self._bandwidth_index(bw)
        board_limit_table = collections.OrderedDict()
        for mcs, board_limit in board_limits_dict.items():
            for mcs_regex, possible_mcs_list in mcs_regex_list:
                mcs_match = mcs_regex.match(mcs)
                if not mcs_match:
                    continue
                has_mcs_group = 'mcs' in mcs_regex.groupindex
                for mode, rate, streams in possible_mcs_list:
                    if has_mcs_group:
                        rate = rate.format(mcs_match.group('mcs'))
                    board_limit_table[(mode, rate, streams)] = (
                        self._limit_to_float(board_limit[bw_index]))
                break
        return board_limit_table

    def pass_fail_check(self, result):
        """Function to evaluate if current TX power matches CSV/NVRAM settings.

        This function assesses whether the current TX power reported by the
        DUT matches the powers programmed in NVRAM and CSV after applying the
        correct TX power backoff used to account for CLPC errors.

        Args:
            result: dict returned by run_tx_power_test
        """
        testcase_params = result['testcase_params']
        if isinstance(testcase_params['channel'],
                      str) and '6g' in testcase_params['channel']:
            mode = 'HE' + str(testcase_params['bandwidth'])
        else:
            mode = 'VHT' + str(testcase_params['bandwidth'])
        regulatory_power = result['wl_curpower']['regulatory_limits'][(mode, 0,
                                                                       2)]
        if testcase_params['sar_state'] == 0:
            # With SAR disabled, 30 is used as the SAR power for both chains
            # so that the regulatory limit decides the expected power.
            csv_powers = [30, 30]
            nvram_powers = [30, 30]
            sar_config = 'SAR DISABLED'
        else:
            sar_config, nvram_powers = self.get_sar_power_from_nvram(
                testcase_params)
            csv_config, csv_powers = self.get_sar_power_from_csv(
                testcase_params)
        self.log.info("SAR state: {} ({})".format(
            testcase_params['sar_state'],
            self.sar_state_mapping[testcase_params['sar_state']],
        ))
        self.log.info("Country Code: {}".format(
            testcase_params['country_code']))
        self.log.info('BRCM SAR Table: {}'.format(sar_config))
        # The expected power is derived from the CSV powers; NVRAM powers are
        # only reported in the result string.
        expected_power = [
            min([csv_powers[0], regulatory_power]) - self.TX_POWER_BACKOFF,
            min([csv_powers[1], regulatory_power]) - self.TX_POWER_BACKOFF
        ]
        power_str = "NVRAM Powers: {}, CSV Powers: {}, Reg Powers: {}, Expected Powers: {}, Reported Powers: {}".format(
            nvram_powers, csv_powers, [regulatory_power] * 2, expected_power,
            result['tx_powers'])
        max_error = max([
            abs(expected_power[idx] - result['tx_powers'][idx])
            for idx in [0, 1]
        ])
        if max_error > self.MAX_TX_POWER_ERROR:
            asserts.fail(power_str)
        else:
            asserts.explicit_pass(power_str)

    def process_testclass_results(self):
        """Placeholder for test class level result processing."""
        pass

    @staticmethod
    def _most_common_power(samples):
        """Returns (mode, count) of a non-empty list of power samples.

        Ties are resolved in favor of the smallest value, matching
        scipy.stats.mode, without depending on its version-specific return
        shape.
        """
        counts = collections.Counter(samples)
        max_count = max(counts.values())
        mode = min(power for power, count in counts.items()
                   if count == max_count)
        return mode, max_count

    def run_tx_power_test(self, testcase_params):
        """Main function to test tx power.

        The function sets up the AP & DUT in the correct channel and mode
        configuration, starts ping traffic and queries the current TX power.

        Args:
            testcase_params: dict containing all test parameters
        Returns:
            test_result: dict containing ping results and other meta data
        """
        # Prepare results dict
        llstats_obj = wputils.LinkLayerStats(
            self.dut, self.testclass_params.get('llstats_enabled', True))
        test_result = collections.OrderedDict()
        test_result['testcase_params'] = testcase_params.copy()
        test_result['test_name'] = self.current_test_name
        test_result['ap_config'] = self.access_point.ap_settings.copy()
        test_result['attenuation'] = testcase_params['atten_range']
        test_result['fixed_attenuation'] = self.testbed_params[
            'fixed_attenuation'][str(testcase_params['channel'])]
        test_result['rssi_results'] = []
        test_result['ping_results'] = []
        test_result['llstats'] = []
        # Setup sniffer
        if self.testbed_params['sniffer_enable']:
            self.sniffer.start_capture(
                testcase_params['test_network'],
                chan=testcase_params['channel'],
                bw=testcase_params['bandwidth'],
                duration=testcase_params['ping_duration'] *
                len(testcase_params['atten_range']) + self.TEST_TIMEOUT)
        # Run ping and sweep attenuation as needed
        self.log.info('Starting ping.')
        thread_future = wputils.get_ping_stats_nb(self.ping_server,
                                                  self.dut_ip, 10, 0.02, 64)

        channel = testcase_params['channel']
        for atten in testcase_params['atten_range']:
            for attenuator in self.attenuators:
                attenuator.set_atten(atten, strict=False, retry=True)
            # Set mcs. Integer channels up to 14 are 2.4 GHz, other integer
            # channels are 5 GHz, and string channels ('6g...') are 6 GHz.
            if isinstance(channel, int) and channel <= 14:
                self.dut.adb.shell('wl 2g_rate -v 0x2 -b {}'.format(
                    testcase_params['bandwidth']))
            elif isinstance(channel, int):
                self.dut.adb.shell('wl 5g_rate -v 0x2 -b {}'.format(
                    testcase_params['bandwidth']))
            else:
                self.dut.adb.shell('wl 6g_rate -e 0 -s 2 -b {}'.format(
                    testcase_params['bandwidth']))
            # Set sar state
            self.dut.adb.shell('halutil -sar enable {}'.format(
                testcase_params['sar_state']))
            # Refresh link layer stats
            llstats_obj.update_stats()
            # Check sar state
            self.log.info('Current Country: {}'.format(
                self.dut.adb.shell('wl country')))
            # Dump last est power multiple times
            chain_0_power = []
            chain_1_power = []
            for _ in range(30):
                last_est_out = self.dut.adb.shell(
                    "wl curpower | grep 'Last est. power'", ignore_status=True)
                if "Last est. power" in last_est_out:
                    # split() tolerates repeated whitespace between values.
                    per_chain_powers = [
                        float(power)
                        for power in last_est_out.split(':')[1].split()
                    ]
                    self.log.info(
                        'Current Tx Powers = {}'.format(per_chain_powers))
                    if per_chain_powers[0] > 0:
                        chain_0_power.append(per_chain_powers[0])
                    if per_chain_powers[1] > 0:
                        chain_1_power.append(per_chain_powers[1])
                time.sleep(0.25)
            # Check if empty
            if len(chain_0_power) == 0 or len(chain_1_power) == 0:
                test_result['tx_powers'] = [0, 0]
                tx_power_frequency = [100, 100]
            else:
                chain_0_mode, chain_0_count = self._most_common_power(
                    chain_0_power)
                chain_1_mode, chain_1_count = self._most_common_power(
                    chain_1_power)
                test_result['tx_powers'] = [chain_0_mode, chain_1_mode]
                # Each chain is normalized by its own number of samples.
                tx_power_frequency = [
                    100 * chain_0_count / len(chain_0_power),
                    100 * chain_1_count / len(chain_1_power)
                ]
            self.log.info(
                'Filtered Tx Powers = {}. Frequency = [{:.0f}%, {:.0f}%]'.
                format(test_result['tx_powers'], tx_power_frequency[0],
                       tx_power_frequency[1]))
            llstats_obj.update_stats()
            curr_llstats = llstats_obj.llstats_incremental.copy()
            test_result['llstats'].append(curr_llstats)
            # Dump the full wl curpower output once, retrying once on failure
            try:
                wl_curpower = self.dut.adb.shell('wl curpower')
            except Exception:
                time.sleep(0.25)
                wl_curpower = self.dut.adb.shell('wl curpower',
                                                 ignore_status=True)
            current_context = context.get_current_context(
            ).get_full_output_path()
            wl_curpower_path = os.path.join(current_context,
                                            'wl_curpower_output')
            with open(wl_curpower_path, 'w') as file:
                file.write(wl_curpower)
            wl_curpower_dict = self.process_wl_curpower(
                wl_curpower_path, testcase_params)
            wl_curpower_path = os.path.join(current_context,
                                            'wl_curpower_dict')
            with open(wl_curpower_path, 'w') as file:
                json.dump(wputils.serialize_dict(wl_curpower_dict),
                          file,
                          indent=4)
            test_result['wl_curpower'] = wl_curpower_dict
        thread_future.result()
        if self.testbed_params['sniffer_enable']:
            self.sniffer.stop_capture()
        return test_result

    def setup_ap(self, testcase_params):
        """Sets up the access point in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        channel = testcase_params['channel']
        band = self.access_point.band_lookup_by_channel(channel)
        if '6G' in band:
            # Drop only the leading '6g' prefix. str.strip('6g') would also
            # eat trailing '6'/'g' characters from the channel number.
            channel_number = channel[len('6g'):] if channel.startswith(
                '6g') else channel
            frequency = wutils.WifiEnums.channel_6G_to_freq[int(
                channel_number)]
        elif channel <= 14:
            frequency = wutils.WifiEnums.channel_2G_to_freq[channel]
        else:
            frequency = wutils.WifiEnums.channel_5G_to_freq[channel]
        if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
            self.access_point.set_region(self.testbed_params['DFS_region'])
        else:
            self.access_point.set_region(self.testbed_params['default_region'])
        self.access_point.set_channel(band, channel)
        self.access_point.set_bandwidth(band, testcase_params['mode'])
        if 'low' in testcase_params['ap_power']:
            self.log.info('Setting low AP power.')
            self.access_point.set_power(
                band, self.testclass_params['low_ap_tx_power'])
        self.log.info('Access Point Configuration: {}'.format(
            self.access_point.ap_settings))

    def setup_dut(self, testcase_params):
        """Sets up the DUT in the configuration required by the test.

        Args:
            testcase_params: dict containing AP and other test params
        """
        # Turn screen off to preserve battery
        if self.testbed_params.get('screen_on',
                                   False) or self.testclass_params.get(
                                       'screen_on', False):
            self.dut.droid.wakeLockAcquireDim()
        else:
            self.dut.go_to_sleep()
        if wputils.validate_network(self.dut,
                                    testcase_params['test_network']['SSID']):
            current_country = self.dut.adb.shell('wl country')
            self.log.info('Current country code: {}'.format(current_country))
            # Skip reconnecting when both network and country already match.
            if testcase_params['country_code'] in current_country:
                self.log.info('Already connected to desired network')
                self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses(
                    'wlan0')[0]
                return
        testcase_params['test_network']['channel'] = testcase_params['channel']
        wutils.wifi_toggle_state(self.dut, False)
        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
        wutils.wifi_toggle_state(self.dut, True)
        wutils.reset_wifi(self.dut)
        if self.testbed_params.get('txbf_off', False):
            wputils.disable_beamforming(self.dut)
        # The country code is applied again after the WiFi reset.
        wutils.set_wifi_country_code(self.dut, testcase_params['country_code'])
        wutils.wifi_connect(self.dut,
                            testcase_params['test_network'],
                            num_of_tries=1,
                            check_connectivity=True)
        self.dut_ip = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0]

    def setup_tx_power_test(self, testcase_params):
        """Function that gets devices ready for the test.

        Args:
            testcase_params: dict containing test-specific parameters
        """
        # Configure AP
        self.setup_ap(testcase_params)
        # Set attenuator to 0 dB
        for attenuator in self.attenuators:
            attenuator.set_atten(0, strict=False, retry=True)
        # Reset, configure, and connect DUT
        self.setup_dut(testcase_params)

    def check_skip_conditions(self, testcase_params):
        """Checks if test should be skipped."""
        # Check battery level before test
        if not wputils.health_check(self.dut, 10):
            asserts.skip('DUT battery level too low.')
        if (testcase_params['channel'] in wputils.CHANNELS_6GHz
                and not self.dut.droid.is6GhzBandSupported()):
            asserts.skip('DUT does not support 6 GHz band.')
        if not self.access_point.band_lookup_by_channel(
                testcase_params['channel']):
            asserts.skip('AP does not support requested channel.')

    def compile_test_params(self, testcase_params):
        """Function to compile all testcase parameters.

        Args:
            testcase_params: dict containing preliminary set of parameters
        Returns:
            testcase_params: the same dict, updated in place with the test
                network, ping settings and attenuation range
        """
        self.check_skip_conditions(testcase_params)

        band = self.access_point.band_lookup_by_channel(
            testcase_params['channel'])
        testcase_params['test_network'] = self.main_network[band]
        testcase_params['attenuated_chain'] = -1
        testcase_params.update(
            ping_interval=self.testclass_params['ping_interval'],
            ping_duration=self.testclass_params['ping_duration'],
            ping_size=self.testclass_params['ping_size'],
        )

        testcase_params['atten_range'] = [0]
        return testcase_params

    def _test_ping(self, testcase_params):
        """Function that gets called for each TX power test case.

        The function compiles the test parameters, sets up the AP and DUT,
        runs the TX power test and evaluates the result.

        Args:
            testcase_params: dict containing preliminary set of parameters
        """
        # Compile test parameters from config and test name
        testcase_params = self.compile_test_params(testcase_params)
        # Run TX power test
        self.setup_tx_power_test(testcase_params)
        result = self.run_tx_power_test(testcase_params)
        self.pass_fail_check(result)

    def generate_test_cases(self, ap_power, channels, modes, test_types,
                            country_codes, sar_states):
        """Function that auto-generates test cases for a test class.

        Args:
            ap_power: AP power setting stored in every test case
            channels: channels to test
            modes: bandwidth modes to test, e.g. 'bw20'
            test_types: test name prefixes
            country_codes: country codes to test
            sar_states: SAR states to test
        Returns:
            test_cases: list of generated test case names. Each name is also
                set as an attribute of this object bound to _test_ping.
        """
        test_cases = []
        # Channels that are valid for each bandwidth; other combinations are
        # skipped.
        allowed_configs = {
            20: [
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 36, 40, 44, 48, 52, 64, 100,
                116, 132, 140, 149, 153, 157, 161
            ],
            40: [36, 44, 100, 149, 157],
            80: [36, 100, 149],
            160: [36, '6g37', '6g117', '6g213']
        }

        for channel, mode, test_type, country_code, sar_state in itertools.product(
                channels, modes, test_types, country_codes, sar_states):
            bandwidth = int(''.join([x for x in mode if x.isdigit()]))
            if channel not in allowed_configs[bandwidth]:
                continue
            testcase_name = '{}_ch{}_{}_{}_sar_{}'.format(
                test_type, channel, mode, country_code, sar_state)
            testcase_params = collections.OrderedDict(
                test_type=test_type,
                ap_power=ap_power,
                channel=channel,
                mode=mode,
                bandwidth=bandwidth,
                country_code=country_code,
                sar_state=sar_state)
            setattr(self, testcase_name,
                    partial(self._test_ping, testcase_params))
            test_cases.append(testcase_name)
        return test_cases