1#!/usr/bin/env python3.4 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import collections 18import importlib 19import ipaddress 20import logging 21import numpy 22import re 23import time 24from acts import asserts 25from acts import utils 26from acts.controllers.android_device import AndroidDevice 27from acts.controllers.utils_lib import ssh 28from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 29from acts_contrib.test_utils.wifi.wifi_performance_test_utils import ping_utils 30from acts_contrib.test_utils.wifi.wifi_performance_test_utils import qcom_utils 31from acts_contrib.test_utils.wifi.wifi_performance_test_utils import brcm_utils 32 33from concurrent.futures import ThreadPoolExecutor 34 35SHORT_SLEEP = 1 36MED_SLEEP = 6 37CHANNELS_6GHz = ['6g{}'.format(4 * x + 1) for x in range(59)] 38BAND_TO_CHANNEL_MAP = { 39 '2.4GHz': list(range(1, 14)), 40 'UNII-1': [36, 40, 44, 48], 41 'UNII-2': 42 [52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 140], 43 'UNII-3': [149, 153, 157, 161, 165], 44 '6GHz': CHANNELS_6GHz 45} 46CHANNEL_TO_BAND_MAP = { 47 channel: band 48 for band, channels in BAND_TO_CHANNEL_MAP.items() for channel in channels 49} 50 51 52# Decorators 53def nonblocking(f): 54 """Creates a decorator transforming function calls to non-blocking""" 55 56 def wrap(*args, **kwargs): 57 executor = ThreadPoolExecutor(max_workers=1) 58 thread_future = executor.submit(f, *args, **kwargs) 59 # Ensure resources are freed up when executor ruturns or raises 60 executor.shutdown(wait=False) 61 return thread_future 62 63 return wrap 64 65 66def detect_wifi_platform(dut): 67 if hasattr(dut, 'wifi_platform'): 68 return dut.wifi_platform 69 qcom_check = len(dut.get_file_names('/vendor/firmware/wlan/')) 70 if qcom_check: 71 dut.wifi_platform = 'qcom' 72 else: 73 dut.wifi_platform = 'brcm' 74 return dut.wifi_platform 75 76 77def detect_wifi_decorator(f): 78 79 def wrap(*args, **kwargs): 80 if 'dut' in kwargs: 81 dut = kwargs['dut'] 82 else: 83 dut = next(arg for arg in args if type(arg) == AndroidDevice) 84 dut_package = 'acts_contrib.test_utils.wifi.wifi_performance_test_utils.{}_utils'.format( 85 detect_wifi_platform(dut)) 86 dut_package = importlib.import_module(dut_package) 87 f_decorated = getattr(dut_package, f.__name__, lambda: None) 88 return (f_decorated(*args, **kwargs)) 89 90 return wrap 91 92 93# JSON serializer 94def serialize_dict(input_dict): 95 """Function to serialize dicts to enable JSON output""" 96 output_dict = collections.OrderedDict() 97 for key, value in input_dict.items(): 98 output_dict[_serialize_value(key)] = _serialize_value(value) 99 return output_dict 100 101 102def _serialize_value(value): 103 """Function to recursively serialize dict entries to enable JSON output""" 104 if isinstance(value, tuple): 105 return str(value) 106 if isinstance(value, numpy.int64): 107 return int(value) 108 if isinstance(value, numpy.float64): 109 return float(value) 110 if isinstance(value, list): 111 return [_serialize_value(x) for x in value] 112 if isinstance(value, numpy.ndarray): 113 return [_serialize_value(x) for x in value] 114 elif isinstance(value, dict): 115 return serialize_dict(value) 116 elif type(value) in (float, int, bool, str): 117 return value 118 else: 119 return "Non-serializable object" 120 121 122def extract_sub_dict(full_dict, fields): 123 sub_dict = collections.OrderedDict( 124 (field, full_dict[field]) for field in fields) 125 return sub_dict 126 127def write_antenna_tune_code(dut, tune_code, readback_check = True): 128 flag_tune_code_forcing_on = 0 129 for n_enable in range(5): 130 logging.debug('{}-th Enabling modem test mode'.format(n_enable)) 131 try: 132 at_lmodetest_output = dut.adb.shell("modem_cmd raw AT+LMODETEST") 133 time.sleep(SHORT_SLEEP) 134 logging.debug('Command AT+LMODETEST output: {}'.format(at_lmodetest_output)) 135 except: 136 logging.error('{}-th Failed modem test mode AT+LMODETEST'.format(n_enable)) 137 continue 138 try: 139 at_lrffinalstart_output = dut.adb.shell("modem_cmd raw AT+LRFFINALSTART") 140 time.sleep(SHORT_SLEEP) 141 logging.debug('Command AT+LRFFINALSTART output: {}'.format(at_lrffinalstart_output)) 142 if "+LRFFINALSTART:0" in at_lrffinalstart_output and not "ERROR" in at_lrffinalstart_output: 143 flag_tune_code_forcing_on = 1 144 logging.info('Enable modem test mode SUCCESSFUL') 145 break 146 except: 147 logging.error('{}-th Failed modem test mode AT+LRFFINALSTART'.format(n_enable)) 148 continue 149 if not flag_tune_code_forcing_on: 150 raise RuntimeError("AT Command File Not set up") 151 at_cmd_output = dut.adb.shell("modem_cmd raw " + tune_code['tune_code_cmd']) 152 logging.debug('Write Tune Code: {}'.format("modem_cmd raw " + tune_code['tune_code_cmd'])) 153 flag_tc_reg_correct = True 154 # Check tune code register values 155 if readback_check: 156 # Check tune code register values 157 for tune_code_register_key in tune_code['tune_code_registers'].keys(): 158 try: 159 at_tc_reg_output = dut.adb.shell("modem_cmd raw AT+MIPIREAD={}".format(tune_code_register_key)) 160 time.sleep(SHORT_SLEEP) 161 except: 162 pass 163 if "+MIPIREAD:"+tune_code['tune_code_registers'][tune_code_register_key].lower() in at_tc_reg_output: 164 logging.info('Forced tune code register {} value matches expectation: {}'.format(tune_code_register_key, tune_code['tune_code_registers'][tune_code_register_key])) 165 else: 166 logging.warning('Expected tune code register {} value: {}'.format(tune_code_register_key, tune_code['tune_code_registers'][tune_code_register_key])) 167 logging.warning('tune code register value is set to {}'.format(at_tc_reg_output)) 168 flag_tc_reg_correct = False 169 if flag_tc_reg_correct: 170 return True 171 else: 172 raise RuntimeError("Enable modem test mode SUCCESSFUL, but register values NOT correct") 173 else: 174 logging.info('Tune code validation skipped.') 175 return True 176 177# Miscellaneous Wifi Utilities 178def check_skip_conditions(testcase_params, dut, access_point, 179 ota_chamber=None): 180 """Checks if test should be skipped.""" 181 # Check battery level before test 182 if not health_check(dut, 10): 183 asserts.skip('DUT battery level too low.') 184 if not access_point.band_lookup_by_channel(testcase_params['channel']): 185 asserts.skip('AP does not support requested channel.') 186 if ota_chamber and CHANNEL_TO_BAND_MAP[ 187 testcase_params['channel']] not in ota_chamber.SUPPORTED_BANDS: 188 asserts.skip('OTA chamber does not support requested channel.') 189 # Check if 6GHz is supported by checking capabilities in the US. 190 if not dut.droid.wifiCheckState(): 191 wutils.wifi_toggle_state(dut, True) 192 iw_list = dut.adb.shell('iw list') 193 supports_6ghz = '6135 MHz' in iw_list 194 supports_160mhz = 'Supported Channel Width: 160 MHz' in iw_list 195 if testcase_params.get('bandwidth', 20) == 160 and not supports_160mhz: 196 asserts.skip('DUT does not support 160 MHz networks.') 197 if testcase_params.get('channel', 198 6) in CHANNELS_6GHz and not supports_6ghz: 199 asserts.skip('DUT does not support 6 GHz band.') 200 201 202def validate_network(dut, ssid): 203 """Check that DUT has a valid internet connection through expected SSID 204 205 Args: 206 dut: android device of interest 207 ssid: expected ssid 208 """ 209 try: 210 connected = wutils.validate_connection(dut, wait_time=3) is not None 211 current_network = dut.droid.wifiGetConnectionInfo() 212 except: 213 connected = False 214 current_network = None 215 if connected and current_network['SSID'] == ssid: 216 return True 217 else: 218 return False 219 220 221def get_server_address(ssh_connection, dut_ip, subnet_mask): 222 """Get server address on a specific subnet, 223 224 This function retrieves the LAN or WAN IP of a remote machine used in 225 testing. If subnet_mask is set to 'public' it returns a machines global ip, 226 else it returns the ip belonging to the dut local network given the dut's 227 ip and subnet mask. 228 229 Args: 230 ssh_connection: object representing server for which we want an ip 231 dut_ip: string in ip address format, i.e., xxx.xxx.xxx.xxx 232 subnet_mask: string representing subnet mask (public for global ip) 233 """ 234 ifconfig_out = ssh_connection.run('ifconfig').stdout 235 ip_list = re.findall('inet (?:addr:)?(\d+.\d+.\d+.\d+)', ifconfig_out) 236 ip_list = [ipaddress.ip_address(ip) for ip in ip_list] 237 238 if subnet_mask == 'public': 239 for ip in ip_list: 240 # is_global is not used to allow for CGNAT ips in 100.x.y.z range 241 if not ip.is_private: 242 return str(ip) 243 else: 244 dut_network = ipaddress.ip_network('{}/{}'.format(dut_ip, subnet_mask), 245 strict=False) 246 for ip in ip_list: 247 if ip in dut_network: 248 return str(ip) 249 logging.error('No IP address found in requested subnet') 250 251 252# Ping utilities 253def get_ping_stats(src_device, dest_address, ping_duration, ping_interval, 254 ping_size): 255 """Run ping to or from the DUT. 256 257 The function computes either pings the DUT or pings a remote ip from 258 DUT. 259 260 Args: 261 src_device: object representing device to ping from 262 dest_address: ip address to ping 263 ping_duration: timeout to set on the ping process (in seconds) 264 ping_interval: time between pings (in seconds) 265 ping_size: size of ping packet payload 266 Returns: 267 ping_result: dict containing ping results and other meta data 268 """ 269 ping_count = int(ping_duration / ping_interval) 270 ping_deadline = int(ping_count * ping_interval) + 1 271 ping_cmd_linux = 'ping -c {} -w {} -i {} -s {} -D'.format( 272 ping_count, 273 ping_deadline, 274 ping_interval, 275 ping_size, 276 ) 277 278 ping_cmd_macos = 'ping -c {} -t {} -i {} -s {}'.format( 279 ping_count, 280 ping_deadline, 281 ping_interval, 282 ping_size, 283 ) 284 285 if isinstance(src_device, AndroidDevice): 286 ping_cmd = '{} {}'.format(ping_cmd_linux, dest_address) 287 ping_output = src_device.adb.shell(ping_cmd, 288 timeout=ping_deadline + SHORT_SLEEP, 289 ignore_status=True) 290 elif isinstance(src_device, ssh.connection.SshConnection): 291 platform = src_device.run('uname').stdout 292 if 'linux' in platform.lower(): 293 ping_cmd = 'sudo {} {}'.format(ping_cmd_linux, dest_address) 294 elif 'darwin' in platform.lower(): 295 ping_cmd = "sudo {} {}| while IFS= read -r line; do printf '[%s] %s\n' \"$(gdate '+%s.%N')\" \"$line\"; done".format( 296 ping_cmd_macos, dest_address) 297 ping_output = src_device.run(ping_cmd, 298 timeout=ping_deadline + SHORT_SLEEP, 299 ignore_status=True).stdout 300 else: 301 raise TypeError('Unable to ping using src_device of type %s.' % 302 type(src_device)) 303 return ping_utils.PingResult(ping_output.splitlines()) 304 305 306@nonblocking 307def get_ping_stats_nb(src_device, dest_address, ping_duration, ping_interval, 308 ping_size): 309 return get_ping_stats(src_device, dest_address, ping_duration, 310 ping_interval, ping_size) 311 312 313# Iperf utilities 314@nonblocking 315def start_iperf_client_nb(iperf_client, iperf_server_address, iperf_args, tag, 316 timeout): 317 return iperf_client.start(iperf_server_address, iperf_args, tag, timeout) 318 319 320def get_iperf_arg_string(duration, 321 reverse_direction, 322 interval=1, 323 traffic_type='TCP', 324 socket_size=None, 325 num_processes=1, 326 udp_throughput='1000M', 327 ipv6=False, 328 udp_length=1448): 329 """Function to format iperf client arguments. 330 331 This function takes in iperf client parameters and returns a properly 332 formatter iperf arg string to be used in throughput tests. 333 334 Args: 335 duration: iperf duration in seconds 336 reverse_direction: boolean controlling the -R flag for iperf clients 337 interval: iperf print interval 338 traffic_type: string specifying TCP or UDP traffic 339 socket_size: string specifying TCP window or socket buffer, e.g., 2M 340 num_processes: int specifying number of iperf processes 341 udp_throughput: string specifying TX throughput in UDP tests, e.g. 100M 342 ipv6: boolean controlling the use of IP V6 343 Returns: 344 iperf_args: string of formatted iperf args 345 """ 346 iperf_args = '-i {} -t {} -J '.format(interval, duration) 347 if ipv6: 348 iperf_args = iperf_args + '-6 ' 349 if traffic_type.upper() == 'UDP': 350 iperf_args = iperf_args + '-u -b {} -l {} -P {} '.format( 351 udp_throughput, udp_length, num_processes) 352 elif traffic_type.upper() == 'TCP': 353 iperf_args = iperf_args + '-P {} '.format(num_processes) 354 if socket_size: 355 iperf_args = iperf_args + '-w {} '.format(socket_size) 356 if reverse_direction: 357 iperf_args = iperf_args + ' -R' 358 return iperf_args 359 360 361# Attenuator Utilities 362def atten_by_label(atten_list, path_label, atten_level): 363 """Attenuate signals according to their path label. 364 365 Args: 366 atten_list: list of attenuators to iterate over 367 path_label: path label on which to set desired attenuation 368 atten_level: attenuation desired on path 369 """ 370 for atten in atten_list: 371 if path_label in atten.path: 372 atten.set_atten(atten_level, retry=True) 373 374 375def get_atten_for_target_rssi(target_rssi, attenuators, dut, ping_server): 376 """Function to estimate attenuation to hit a target RSSI. 377 378 This function estimates a constant attenuation setting on all atennuation 379 ports to hit a target RSSI. The estimate is not meant to be exact or 380 guaranteed. 381 382 Args: 383 target_rssi: rssi of interest 384 attenuators: list of attenuator ports 385 dut: android device object assumed connected to a wifi network. 386 ping_server: ssh connection object to ping server 387 Returns: 388 target_atten: attenuation setting to achieve target_rssi 389 """ 390 logging.info('Searching attenuation for RSSI = {}dB'.format(target_rssi)) 391 # Set attenuator to 0 dB 392 for atten in attenuators: 393 atten.set_atten(0, strict=False, retry=True) 394 # Start ping traffic 395 dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0] 396 # Measure starting RSSI 397 ping_future = get_ping_stats_nb(src_device=ping_server, 398 dest_address=dut_ip, 399 ping_duration=1.5, 400 ping_interval=0.02, 401 ping_size=64) 402 current_rssi = get_connected_rssi(dut, 403 num_measurements=4, 404 polling_frequency=0.25, 405 first_measurement_delay=0.5, 406 disconnect_warning=1, 407 ignore_samples=1) 408 current_rssi = current_rssi['signal_poll_rssi']['mean'] 409 ping_future.result() 410 target_atten = 0 411 logging.debug('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format( 412 target_atten, current_rssi)) 413 within_range = 0 414 for idx in range(20): 415 atten_delta = max(min(current_rssi - target_rssi, 20), -20) 416 target_atten = int((target_atten + atten_delta) * 4) / 4 417 if target_atten < 0: 418 return 0 419 if target_atten > attenuators[0].get_max_atten(): 420 return attenuators[0].get_max_atten() 421 for atten in attenuators: 422 atten.set_atten(target_atten, strict=False, retry=True) 423 ping_future = get_ping_stats_nb(src_device=ping_server, 424 dest_address=dut_ip, 425 ping_duration=1.5, 426 ping_interval=0.02, 427 ping_size=64) 428 current_rssi = get_connected_rssi(dut, 429 num_measurements=4, 430 polling_frequency=0.25, 431 first_measurement_delay=0.5, 432 disconnect_warning=1, 433 ignore_samples=1) 434 current_rssi = current_rssi['signal_poll_rssi']['mean'] 435 ping_future.result() 436 logging.info('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format( 437 target_atten, current_rssi)) 438 if abs(current_rssi - target_rssi) < 1: 439 if within_range: 440 logging.info( 441 'Reached RSSI: {0:.2f}. Target RSSI: {1:.2f}.' 442 'Attenuation: {2:.2f}, Iterations = {3:.2f}'.format( 443 current_rssi, target_rssi, target_atten, idx)) 444 return target_atten 445 else: 446 within_range = True 447 else: 448 within_range = False 449 return target_atten 450 451 452def get_current_atten_dut_chain_map(attenuators, 453 dut, 454 ping_server, 455 ping_from_dut=False): 456 """Function to detect mapping between attenuator ports and DUT chains. 457 458 This function detects the mapping between attenuator ports and DUT chains 459 in cases where DUT chains are connected to only one attenuator port. The 460 function assumes the DUT is already connected to a wifi network. The 461 function starts by measuring per chain RSSI at 0 attenuation, then 462 attenuates one port at a time looking for the chain that reports a lower 463 RSSI. 464 465 Args: 466 attenuators: list of attenuator ports 467 dut: android device object assumed connected to a wifi network. 468 ping_server: ssh connection object to ping server 469 ping_from_dut: boolean controlling whether to ping from or to dut 470 Returns: 471 chain_map: list of dut chains, one entry per attenuator port 472 """ 473 # Set attenuator to 0 dB 474 for atten in attenuators: 475 atten.set_atten(0, strict=False, retry=True) 476 # Start ping traffic 477 dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0] 478 if ping_from_dut: 479 ping_future = get_ping_stats_nb(dut, ping_server._settings.hostname, 480 11, 0.02, 64) 481 else: 482 ping_future = get_ping_stats_nb(ping_server, dut_ip, 11, 0.02, 64) 483 # Measure starting RSSI 484 base_rssi = get_connected_rssi(dut, 4, 0.25, 1) 485 chain0_base_rssi = base_rssi['chain_0_rssi']['mean'] 486 chain1_base_rssi = base_rssi['chain_1_rssi']['mean'] 487 if chain0_base_rssi < -70 or chain1_base_rssi < -70: 488 logging.warning('RSSI might be too low to get reliable chain map.') 489 # Compile chain map by attenuating one path at a time and seeing which 490 # chain's RSSI degrades 491 chain_map = [] 492 for test_atten in attenuators: 493 # Set one attenuator to 30 dB down 494 test_atten.set_atten(30, strict=False, retry=True) 495 # Get new RSSI 496 test_rssi = get_connected_rssi(dut, 4, 0.25, 1) 497 # Assign attenuator to path that has lower RSSI 498 if chain0_base_rssi > -70 and chain0_base_rssi - test_rssi[ 499 'chain_0_rssi']['mean'] > 10: 500 chain_map.append('DUT-Chain-0') 501 elif chain1_base_rssi > -70 and chain1_base_rssi - test_rssi[ 502 'chain_1_rssi']['mean'] > 10: 503 chain_map.append('DUT-Chain-1') 504 else: 505 chain_map.append(None) 506 # Reset attenuator to 0 507 test_atten.set_atten(0, strict=False, retry=True) 508 ping_future.result() 509 logging.debug('Chain Map: {}'.format(chain_map)) 510 return chain_map 511 512 513def get_full_rf_connection_map(attenuators, 514 dut, 515 ping_server, 516 networks, 517 ping_from_dut=False): 518 """Function to detect per-network connections between attenuator and DUT. 519 520 This function detects the mapping between attenuator ports and DUT chains 521 on all networks in its arguments. The function connects the DUT to each 522 network then calls get_current_atten_dut_chain_map to get the connection 523 map on the current network. The function outputs the results in two formats 524 to enable easy access when users are interested in indexing by network or 525 attenuator port. 526 527 Args: 528 attenuators: list of attenuator ports 529 dut: android device object assumed connected to a wifi network. 530 ping_server: ssh connection object to ping server 531 networks: dict of network IDs and configs 532 Returns: 533 rf_map_by_network: dict of RF connections indexed by network. 534 rf_map_by_atten: list of RF connections indexed by attenuator 535 """ 536 for atten in attenuators: 537 atten.set_atten(0, strict=False, retry=True) 538 539 rf_map_by_network = collections.OrderedDict() 540 rf_map_by_atten = [[] for atten in attenuators] 541 for net_id, net_config in networks.items(): 542 wutils.reset_wifi(dut) 543 wutils.wifi_connect(dut, 544 net_config, 545 num_of_tries=1, 546 assert_on_fail=False, 547 check_connectivity=False) 548 rf_map_by_network[net_id] = get_current_atten_dut_chain_map( 549 attenuators, dut, ping_server, ping_from_dut) 550 for idx, chain in enumerate(rf_map_by_network[net_id]): 551 if chain: 552 rf_map_by_atten[idx].append({ 553 'network': net_id, 554 'dut_chain': chain 555 }) 556 logging.debug('RF Map (by Network): {}'.format(rf_map_by_network)) 557 logging.debug('RF Map (by Atten): {}'.format(rf_map_by_atten)) 558 559 return rf_map_by_network, rf_map_by_atten 560 561 562# Generic device utils 563def get_dut_temperature(dut): 564 """Function to get dut temperature. 565 566 The function fetches and returns the reading from the temperature sensor 567 used for skin temperature and thermal throttling. 568 569 Args: 570 dut: AndroidDevice of interest 571 Returns: 572 temperature: device temperature. 0 if temperature could not be read 573 """ 574 candidate_zones = [ 575 '/sys/devices/virtual/thermal/tz-by-name/skin-therm/temp', 576 '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-monitor/temp', 577 '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-adc/temp', 578 '/sys/devices/virtual/thermal/tz-by-name/back_therm/temp', 579 '/dev/thermal/tz-by-name/quiet_therm/temp' 580 ] 581 for zone in candidate_zones: 582 try: 583 temperature = int(dut.adb.shell('cat {}'.format(zone))) 584 break 585 except: 586 temperature = 0 587 if temperature == 0: 588 logging.debug('Could not check DUT temperature.') 589 elif temperature > 100: 590 temperature = temperature / 1000 591 return temperature 592 593 594def wait_for_dut_cooldown(dut, target_temp=50, timeout=300): 595 """Function to wait for a DUT to cool down. 596 597 Args: 598 dut: AndroidDevice of interest 599 target_temp: target cooldown temperature 600 timeout: maxt time to wait for cooldown 601 """ 602 start_time = time.time() 603 while time.time() - start_time < timeout: 604 temperature = get_dut_temperature(dut) 605 if temperature < target_temp: 606 break 607 time.sleep(SHORT_SLEEP) 608 elapsed_time = time.time() - start_time 609 logging.debug('DUT Final Temperature: {}C. Cooldown duration: {}'.format( 610 temperature, elapsed_time)) 611 612 613def health_check(dut, batt_thresh=5, temp_threshold=53, cooldown=1): 614 """Function to check health status of a DUT. 615 616 The function checks both battery levels and temperature to avoid DUT 617 powering off during the test. 618 619 Args: 620 dut: AndroidDevice of interest 621 batt_thresh: battery level threshold 622 temp_threshold: temperature threshold 623 cooldown: flag to wait for DUT to cool down when overheating 624 Returns: 625 health_check: boolean confirming device is healthy 626 """ 627 health_check = True 628 battery_level = utils.get_battery_level(dut) 629 if battery_level < batt_thresh: 630 logging.warning('Battery level low ({}%)'.format(battery_level)) 631 health_check = False 632 else: 633 logging.debug('Battery level = {}%'.format(battery_level)) 634 635 temperature = get_dut_temperature(dut) 636 if temperature > temp_threshold: 637 if cooldown: 638 logging.warning( 639 'Waiting for DUT to cooldown. ({} C)'.format(temperature)) 640 wait_for_dut_cooldown(dut, target_temp=temp_threshold - 5) 641 else: 642 logging.warning('DUT Overheating ({} C)'.format(temperature)) 643 health_check = False 644 else: 645 logging.debug('DUT Temperature = {} C'.format(temperature)) 646 return health_check 647 648 649# Wifi Device Utils 650def empty_rssi_result(): 651 return collections.OrderedDict([('data', []), ('mean', float('nan')), 652 ('stdev', float('nan'))]) 653 654# Phone fold status 655def check_fold_status(dut): 656 fold_status = 'NA' 657 try: 658 fold_status_str = dut.adb.shell('sensor_test sample -s 65547.0 -n1',timeout=2) 659 except: 660 return fold_status 661 if 'Data: 1.000000' in fold_status_str: 662 fold_status = 'folded' 663 elif 'Data: 0.000000' in fold_status_str: 664 fold_status = 'unfolded' 665 else: 666 fold_status = 'NA' 667 return fold_status 668 669@nonblocking 670def get_connected_rssi_nb(dut, 671 num_measurements=1, 672 polling_frequency=SHORT_SLEEP, 673 first_measurement_delay=0, 674 disconnect_warning=True, 675 ignore_samples=0, 676 interface='wlan0'): 677 return get_connected_rssi(dut, num_measurements, polling_frequency, 678 first_measurement_delay, disconnect_warning, 679 ignore_samples, interface) 680 681 682@detect_wifi_decorator 683def get_connected_rssi(dut, 684 num_measurements=1, 685 polling_frequency=SHORT_SLEEP, 686 first_measurement_delay=0, 687 disconnect_warning=True, 688 ignore_samples=0, 689 interface='wlan0'): 690 """Gets all RSSI values reported for the connected access point/BSSID. 691 692 Args: 693 dut: android device object from which to get RSSI 694 num_measurements: number of scans done, and RSSIs collected 695 polling_frequency: time to wait between RSSI measurements 696 disconnect_warning: boolean controlling disconnection logging messages 697 ignore_samples: number of leading samples to ignore 698 Returns: 699 connected_rssi: dict containing the measurements results for 700 all reported RSSI values (signal_poll, per chain, etc.) and their 701 statistics 702 """ 703 pass 704 705 706@nonblocking 707def get_scan_rssi_nb(dut, tracked_bssids, num_measurements=1): 708 return get_scan_rssi(dut, tracked_bssids, num_measurements) 709 710 711@detect_wifi_decorator 712def get_scan_rssi(dut, tracked_bssids, num_measurements=1): 713 """Gets scan RSSI for specified BSSIDs. 714 715 Args: 716 dut: android device object from which to get RSSI 717 tracked_bssids: array of BSSIDs to gather RSSI data for 718 num_measurements: number of scans done, and RSSIs collected 719 Returns: 720 scan_rssi: dict containing the measurement results as well as the 721 statistics of the scan RSSI for all BSSIDs in tracked_bssids 722 """ 723 pass 724 725 726@detect_wifi_decorator 727def get_sw_signature(dut): 728 """Function that checks the signature for wifi firmware and config files. 729 730 Returns: 731 bdf_signature: signature consisting of last three digits of bdf cksums 732 fw_signature: floating point firmware version, i.e., major.minor 733 """ 734 pass 735 736 737@detect_wifi_decorator 738def get_country_code(dut): 739 """Function that returns the current wifi country code.""" 740 pass 741 742 743@detect_wifi_decorator 744def push_config(dut, config_file): 745 """Function to push Wifi BDF files 746 747 This function checks for existing wifi bdf files and over writes them all, 748 for simplicity, with the bdf file provided in the arguments. The dut is 749 rebooted for the bdf file to take effect 750 751 Args: 752 dut: dut to push bdf file to 753 config_file: path to bdf_file to push 754 """ 755 pass 756 757 758@detect_wifi_decorator 759def start_wifi_logging(dut): 760 """Function to start collecting wifi-related logs""" 761 pass 762 763 764@detect_wifi_decorator 765def stop_wifi_logging(dut): 766 """Function to start collecting wifi-related logs""" 767 pass 768 769 770@detect_wifi_decorator 771def push_firmware(dut, firmware_files): 772 """Function to push Wifi firmware files 773 774 Args: 775 dut: dut to push bdf file to 776 firmware_files: path to wlanmdsp.mbn file 777 datamsc_file: path to Data.msc file 778 """ 779 pass 780 781 782@detect_wifi_decorator 783def disable_beamforming(dut): 784 """Function to disable beamforming.""" 785 pass 786 787 788@detect_wifi_decorator 789def set_nss_capability(dut, nss): 790 """Function to set number of spatial streams supported.""" 791 pass 792 793 794@detect_wifi_decorator 795def set_chain_mask(dut, chain_mask): 796 """Function to set DUT chain mask. 797 798 Args: 799 dut: android device 800 chain_mask: desired chain mask in [0, 1, '2x2'] 801 """ 802 pass 803 804 805# Link layer stats utilities 806class LinkLayerStats(): 807 808 def __new__(self, dut, llstats_enabled=True): 809 if detect_wifi_platform(dut) == 'qcom': 810 return qcom_utils.LinkLayerStats(dut, llstats_enabled) 811 else: 812 return brcm_utils.LinkLayerStats(dut, llstats_enabled) 813