1#!/usr/bin/env python3.4 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import collections 18import importlib 19import ipaddress 20import logging 21import numpy 22import re 23import time 24from acts import asserts 25from acts import utils 26from acts.controllers.android_device import AndroidDevice 27from acts.controllers.utils_lib import ssh 28from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 29from acts_contrib.test_utils.wifi.wifi_performance_test_utils import ping_utils 30from acts_contrib.test_utils.wifi.wifi_performance_test_utils import qcom_utils 31from acts_contrib.test_utils.wifi.wifi_performance_test_utils import brcm_utils 32 33from concurrent.futures import ThreadPoolExecutor 34 35SHORT_SLEEP = 1 36MED_SLEEP = 6 37CHANNELS_6GHz = ['6g{}'.format(4 * x + 1) for x in range(59)] 38BAND_TO_CHANNEL_MAP = { 39 '2.4GHz': list(range(1, 14)), 40 'UNII-1': [36, 40, 44, 48], 41 'UNII-2': 42 [52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 140], 43 'UNII-3': [149, 153, 157, 161, 165], 44 '6GHz': CHANNELS_6GHz 45} 46CHANNEL_TO_BAND_MAP = { 47 channel: band 48 for band, channels in BAND_TO_CHANNEL_MAP.items() for channel in channels 49} 50 51 52# Decorators 53def nonblocking(f): 54 """Creates a decorator transforming function calls to non-blocking""" 55 56 def wrap(*args, **kwargs): 57 executor = ThreadPoolExecutor(max_workers=1) 58 thread_future = executor.submit(f, *args, **kwargs) 59 # Ensure resources are freed up when executor ruturns or raises 60 executor.shutdown(wait=False) 61 return thread_future 62 63 return wrap 64 65 66def detect_wifi_platform(dut): 67 if hasattr(dut, 'wifi_platform'): 68 return dut.wifi_platform 69 qcom_check = len(dut.get_file_names('/vendor/firmware/wlan/')) 70 if qcom_check: 71 dut.wifi_platform = 'qcom' 72 else: 73 dut.wifi_platform = 'brcm' 74 return dut.wifi_platform 75 76 77def detect_wifi_decorator(f): 78 79 def wrap(*args, **kwargs): 80 if 'dut' in kwargs: 81 dut = kwargs['dut'] 82 else: 83 dut = next(arg for arg in args if type(arg) == AndroidDevice) 84 dut_package = 'acts_contrib.test_utils.wifi.wifi_performance_test_utils.{}_utils'.format( 85 detect_wifi_platform(dut)) 86 dut_package = importlib.import_module(dut_package) 87 f_decorated = getattr(dut_package, f.__name__, lambda: None) 88 return (f_decorated(*args, **kwargs)) 89 90 return wrap 91 92 93# JSON serializer 94def serialize_dict(input_dict): 95 """Function to serialize dicts to enable JSON output""" 96 output_dict = collections.OrderedDict() 97 for key, value in input_dict.items(): 98 output_dict[_serialize_value(key)] = _serialize_value(value) 99 return output_dict 100 101 102def _serialize_value(value): 103 """Function to recursively serialize dict entries to enable JSON output""" 104 if isinstance(value, tuple): 105 return str(value) 106 if isinstance(value, numpy.int64): 107 return int(value) 108 if isinstance(value, numpy.float64): 109 return float(value) 110 if isinstance(value, list): 111 return [_serialize_value(x) for x in value] 112 if isinstance(value, numpy.ndarray): 113 return [_serialize_value(x) for x in value] 114 elif isinstance(value, dict): 115 return serialize_dict(value) 116 elif type(value) in (float, int, bool, str): 117 return value 118 else: 119 return "Non-serializable object" 120 121 122def extract_sub_dict(full_dict, fields): 123 sub_dict = collections.OrderedDict( 124 (field, full_dict[field]) for field in fields) 125 return sub_dict 126 127 128# Miscellaneous Wifi Utilities 129def check_skip_conditions(testcase_params, dut, access_point, 130 ota_chamber=None): 131 """Checks if test should be skipped.""" 132 # Check battery level before test 133 if not health_check(dut, 10): 134 asserts.skip('DUT battery level too low.') 135 if not access_point.band_lookup_by_channel(testcase_params['channel']): 136 asserts.skip('AP does not support requested channel.') 137 if ota_chamber and CHANNEL_TO_BAND_MAP[ 138 testcase_params['channel']] not in ota_chamber.SUPPORTED_BANDS: 139 asserts.skip('OTA chamber does not support requested channel.') 140 # Check if 6GHz is supported by checking capabilities in the US. 141 if not dut.droid.wifiCheckState(): 142 wutils.wifi_toggle_state(dut, True) 143 iw_list = dut.adb.shell('iw list') 144 supports_6ghz = '6135 MHz' in iw_list 145 supports_160mhz = 'Supported Channel Width: 160 MHz' in iw_list 146 if testcase_params.get('bandwidth', 20) == 160 and not supports_160mhz: 147 asserts.skip('DUT does not support 160 MHz networks.') 148 if testcase_params.get('channel', 149 6) in CHANNELS_6GHz and not supports_6ghz: 150 asserts.skip('DUT does not support 6 GHz band.') 151 152 153def validate_network(dut, ssid): 154 """Check that DUT has a valid internet connection through expected SSID 155 156 Args: 157 dut: android device of interest 158 ssid: expected ssid 159 """ 160 try: 161 connected = wutils.validate_connection(dut, wait_time=3) is not None 162 current_network = dut.droid.wifiGetConnectionInfo() 163 except: 164 connected = False 165 current_network = None 166 if connected and current_network['SSID'] == ssid: 167 return True 168 else: 169 return False 170 171 172def get_server_address(ssh_connection, dut_ip, subnet_mask): 173 """Get server address on a specific subnet, 174 175 This function retrieves the LAN or WAN IP of a remote machine used in 176 testing. If subnet_mask is set to 'public' it returns a machines global ip, 177 else it returns the ip belonging to the dut local network given the dut's 178 ip and subnet mask. 179 180 Args: 181 ssh_connection: object representing server for which we want an ip 182 dut_ip: string in ip address format, i.e., xxx.xxx.xxx.xxx 183 subnet_mask: string representing subnet mask (public for global ip) 184 """ 185 ifconfig_out = ssh_connection.run('ifconfig').stdout 186 ip_list = re.findall('inet (?:addr:)?(\d+.\d+.\d+.\d+)', ifconfig_out) 187 ip_list = [ipaddress.ip_address(ip) for ip in ip_list] 188 189 if subnet_mask == 'public': 190 for ip in ip_list: 191 # is_global is not used to allow for CGNAT ips in 100.x.y.z range 192 if not ip.is_private: 193 return str(ip) 194 else: 195 dut_network = ipaddress.ip_network('{}/{}'.format(dut_ip, subnet_mask), 196 strict=False) 197 for ip in ip_list: 198 if ip in dut_network: 199 return str(ip) 200 logging.error('No IP address found in requested subnet') 201 202 203# Ping utilities 204def get_ping_stats(src_device, dest_address, ping_duration, ping_interval, 205 ping_size): 206 """Run ping to or from the DUT. 207 208 The function computes either pings the DUT or pings a remote ip from 209 DUT. 210 211 Args: 212 src_device: object representing device to ping from 213 dest_address: ip address to ping 214 ping_duration: timeout to set on the ping process (in seconds) 215 ping_interval: time between pings (in seconds) 216 ping_size: size of ping packet payload 217 Returns: 218 ping_result: dict containing ping results and other meta data 219 """ 220 ping_count = int(ping_duration / ping_interval) 221 ping_deadline = int(ping_count * ping_interval) + 1 222 ping_cmd_linux = 'ping -c {} -w {} -i {} -s {} -D'.format( 223 ping_count, 224 ping_deadline, 225 ping_interval, 226 ping_size, 227 ) 228 229 ping_cmd_macos = 'ping -c {} -t {} -i {} -s {}'.format( 230 ping_count, 231 ping_deadline, 232 ping_interval, 233 ping_size, 234 ) 235 236 if isinstance(src_device, AndroidDevice): 237 ping_cmd = '{} {}'.format(ping_cmd_linux, dest_address) 238 ping_output = src_device.adb.shell(ping_cmd, 239 timeout=ping_deadline + SHORT_SLEEP, 240 ignore_status=True) 241 elif isinstance(src_device, ssh.connection.SshConnection): 242 platform = src_device.run('uname').stdout 243 if 'linux' in platform.lower(): 244 ping_cmd = 'sudo {} {}'.format(ping_cmd_linux, dest_address) 245 elif 'darwin' in platform.lower(): 246 ping_cmd = "sudo {} {}| while IFS= read -r line; do printf '[%s] %s\n' \"$(gdate '+%s.%N')\" \"$line\"; done".format( 247 ping_cmd_macos, dest_address) 248 ping_output = src_device.run(ping_cmd, 249 timeout=ping_deadline + SHORT_SLEEP, 250 ignore_status=True).stdout 251 else: 252 raise TypeError('Unable to ping using src_device of type %s.' % 253 type(src_device)) 254 return ping_utils.PingResult(ping_output.splitlines()) 255 256 257@nonblocking 258def get_ping_stats_nb(src_device, dest_address, ping_duration, ping_interval, 259 ping_size): 260 return get_ping_stats(src_device, dest_address, ping_duration, 261 ping_interval, ping_size) 262 263 264# Iperf utilities 265@nonblocking 266def start_iperf_client_nb(iperf_client, iperf_server_address, iperf_args, tag, 267 timeout): 268 return iperf_client.start(iperf_server_address, iperf_args, tag, timeout) 269 270 271def get_iperf_arg_string(duration, 272 reverse_direction, 273 interval=1, 274 traffic_type='TCP', 275 socket_size=None, 276 num_processes=1, 277 udp_throughput='1000M', 278 ipv6=False, 279 udp_length=1470): 280 """Function to format iperf client arguments. 281 282 This function takes in iperf client parameters and returns a properly 283 formatter iperf arg string to be used in throughput tests. 284 285 Args: 286 duration: iperf duration in seconds 287 reverse_direction: boolean controlling the -R flag for iperf clients 288 interval: iperf print interval 289 traffic_type: string specifying TCP or UDP traffic 290 socket_size: string specifying TCP window or socket buffer, e.g., 2M 291 num_processes: int specifying number of iperf processes 292 udp_throughput: string specifying TX throughput in UDP tests, e.g. 100M 293 ipv6: boolean controlling the use of IP V6 294 Returns: 295 iperf_args: string of formatted iperf args 296 """ 297 iperf_args = '-i {} -t {} -J '.format(interval, duration) 298 if ipv6: 299 iperf_args = iperf_args + '-6 ' 300 if traffic_type.upper() == 'UDP': 301 iperf_args = iperf_args + '-u -b {} -l {} -P {} '.format( 302 udp_throughput, udp_length, num_processes) 303 elif traffic_type.upper() == 'TCP': 304 iperf_args = iperf_args + '-P {} '.format(num_processes) 305 if socket_size: 306 iperf_args = iperf_args + '-w {} '.format(socket_size) 307 if reverse_direction: 308 iperf_args = iperf_args + ' -R' 309 return iperf_args 310 311 312# Attenuator Utilities 313def atten_by_label(atten_list, path_label, atten_level): 314 """Attenuate signals according to their path label. 315 316 Args: 317 atten_list: list of attenuators to iterate over 318 path_label: path label on which to set desired attenuation 319 atten_level: attenuation desired on path 320 """ 321 for atten in atten_list: 322 if path_label in atten.path: 323 atten.set_atten(atten_level, retry=True) 324 325 326def get_atten_for_target_rssi(target_rssi, attenuators, dut, ping_server): 327 """Function to estimate attenuation to hit a target RSSI. 328 329 This function estimates a constant attenuation setting on all atennuation 330 ports to hit a target RSSI. The estimate is not meant to be exact or 331 guaranteed. 332 333 Args: 334 target_rssi: rssi of interest 335 attenuators: list of attenuator ports 336 dut: android device object assumed connected to a wifi network. 337 ping_server: ssh connection object to ping server 338 Returns: 339 target_atten: attenuation setting to achieve target_rssi 340 """ 341 logging.info('Searching attenuation for RSSI = {}dB'.format(target_rssi)) 342 # Set attenuator to 0 dB 343 for atten in attenuators: 344 atten.set_atten(0, strict=False, retry=True) 345 # Start ping traffic 346 dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0] 347 # Measure starting RSSI 348 ping_future = get_ping_stats_nb(src_device=ping_server, 349 dest_address=dut_ip, 350 ping_duration=1.5, 351 ping_interval=0.02, 352 ping_size=64) 353 current_rssi = get_connected_rssi(dut, 354 num_measurements=4, 355 polling_frequency=0.25, 356 first_measurement_delay=0.5, 357 disconnect_warning=1, 358 ignore_samples=1) 359 current_rssi = current_rssi['signal_poll_rssi']['mean'] 360 ping_future.result() 361 target_atten = 0 362 logging.debug('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format( 363 target_atten, current_rssi)) 364 within_range = 0 365 for idx in range(20): 366 atten_delta = max(min(current_rssi - target_rssi, 20), -20) 367 target_atten = int((target_atten + atten_delta) * 4) / 4 368 if target_atten < 0: 369 return 0 370 if target_atten > attenuators[0].get_max_atten(): 371 return attenuators[0].get_max_atten() 372 for atten in attenuators: 373 atten.set_atten(target_atten, strict=False, retry=True) 374 ping_future = get_ping_stats_nb(src_device=ping_server, 375 dest_address=dut_ip, 376 ping_duration=1.5, 377 ping_interval=0.02, 378 ping_size=64) 379 current_rssi = get_connected_rssi(dut, 380 num_measurements=4, 381 polling_frequency=0.25, 382 first_measurement_delay=0.5, 383 disconnect_warning=1, 384 ignore_samples=1) 385 current_rssi = current_rssi['signal_poll_rssi']['mean'] 386 ping_future.result() 387 logging.info('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format( 388 target_atten, current_rssi)) 389 if abs(current_rssi - target_rssi) < 1: 390 if within_range: 391 logging.info( 392 'Reached RSSI: {0:.2f}. Target RSSI: {1:.2f}.' 393 'Attenuation: {2:.2f}, Iterations = {3:.2f}'.format( 394 current_rssi, target_rssi, target_atten, idx)) 395 return target_atten 396 else: 397 within_range = True 398 else: 399 within_range = False 400 return target_atten 401 402 403def get_current_atten_dut_chain_map(attenuators, 404 dut, 405 ping_server, 406 ping_from_dut=False): 407 """Function to detect mapping between attenuator ports and DUT chains. 408 409 This function detects the mapping between attenuator ports and DUT chains 410 in cases where DUT chains are connected to only one attenuator port. The 411 function assumes the DUT is already connected to a wifi network. The 412 function starts by measuring per chain RSSI at 0 attenuation, then 413 attenuates one port at a time looking for the chain that reports a lower 414 RSSI. 415 416 Args: 417 attenuators: list of attenuator ports 418 dut: android device object assumed connected to a wifi network. 419 ping_server: ssh connection object to ping server 420 ping_from_dut: boolean controlling whether to ping from or to dut 421 Returns: 422 chain_map: list of dut chains, one entry per attenuator port 423 """ 424 # Set attenuator to 0 dB 425 for atten in attenuators: 426 atten.set_atten(0, strict=False, retry=True) 427 # Start ping traffic 428 dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0] 429 if ping_from_dut: 430 ping_future = get_ping_stats_nb(dut, ping_server._settings.hostname, 431 11, 0.02, 64) 432 else: 433 ping_future = get_ping_stats_nb(ping_server, dut_ip, 11, 0.02, 64) 434 # Measure starting RSSI 435 base_rssi = get_connected_rssi(dut, 4, 0.25, 1) 436 chain0_base_rssi = base_rssi['chain_0_rssi']['mean'] 437 chain1_base_rssi = base_rssi['chain_1_rssi']['mean'] 438 if chain0_base_rssi < -70 or chain1_base_rssi < -70: 439 logging.warning('RSSI might be too low to get reliable chain map.') 440 # Compile chain map by attenuating one path at a time and seeing which 441 # chain's RSSI degrades 442 chain_map = [] 443 for test_atten in attenuators: 444 # Set one attenuator to 30 dB down 445 test_atten.set_atten(30, strict=False, retry=True) 446 # Get new RSSI 447 test_rssi = get_connected_rssi(dut, 4, 0.25, 1) 448 # Assign attenuator to path that has lower RSSI 449 if chain0_base_rssi > -70 and chain0_base_rssi - test_rssi[ 450 'chain_0_rssi']['mean'] > 10: 451 chain_map.append('DUT-Chain-0') 452 elif chain1_base_rssi > -70 and chain1_base_rssi - test_rssi[ 453 'chain_1_rssi']['mean'] > 10: 454 chain_map.append('DUT-Chain-1') 455 else: 456 chain_map.append(None) 457 # Reset attenuator to 0 458 test_atten.set_atten(0, strict=False, retry=True) 459 ping_future.result() 460 logging.debug('Chain Map: {}'.format(chain_map)) 461 return chain_map 462 463 464def get_full_rf_connection_map(attenuators, 465 dut, 466 ping_server, 467 networks, 468 ping_from_dut=False): 469 """Function to detect per-network connections between attenuator and DUT. 470 471 This function detects the mapping between attenuator ports and DUT chains 472 on all networks in its arguments. The function connects the DUT to each 473 network then calls get_current_atten_dut_chain_map to get the connection 474 map on the current network. The function outputs the results in two formats 475 to enable easy access when users are interested in indexing by network or 476 attenuator port. 477 478 Args: 479 attenuators: list of attenuator ports 480 dut: android device object assumed connected to a wifi network. 481 ping_server: ssh connection object to ping server 482 networks: dict of network IDs and configs 483 Returns: 484 rf_map_by_network: dict of RF connections indexed by network. 485 rf_map_by_atten: list of RF connections indexed by attenuator 486 """ 487 for atten in attenuators: 488 atten.set_atten(0, strict=False, retry=True) 489 490 rf_map_by_network = collections.OrderedDict() 491 rf_map_by_atten = [[] for atten in attenuators] 492 for net_id, net_config in networks.items(): 493 wutils.reset_wifi(dut) 494 wutils.wifi_connect(dut, 495 net_config, 496 num_of_tries=1, 497 assert_on_fail=False, 498 check_connectivity=False) 499 rf_map_by_network[net_id] = get_current_atten_dut_chain_map( 500 attenuators, dut, ping_server, ping_from_dut) 501 for idx, chain in enumerate(rf_map_by_network[net_id]): 502 if chain: 503 rf_map_by_atten[idx].append({ 504 'network': net_id, 505 'dut_chain': chain 506 }) 507 logging.debug('RF Map (by Network): {}'.format(rf_map_by_network)) 508 logging.debug('RF Map (by Atten): {}'.format(rf_map_by_atten)) 509 510 return rf_map_by_network, rf_map_by_atten 511 512 513# Generic device utils 514def get_dut_temperature(dut): 515 """Function to get dut temperature. 516 517 The function fetches and returns the reading from the temperature sensor 518 used for skin temperature and thermal throttling. 519 520 Args: 521 dut: AndroidDevice of interest 522 Returns: 523 temperature: device temperature. 0 if temperature could not be read 524 """ 525 candidate_zones = [ 526 '/sys/devices/virtual/thermal/tz-by-name/skin-therm/temp', 527 '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-monitor/temp', 528 '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-adc/temp', 529 '/sys/devices/virtual/thermal/tz-by-name/back_therm/temp', 530 '/dev/thermal/tz-by-name/quiet_therm/temp' 531 ] 532 for zone in candidate_zones: 533 try: 534 temperature = int(dut.adb.shell('cat {}'.format(zone))) 535 break 536 except: 537 temperature = 0 538 if temperature == 0: 539 logging.debug('Could not check DUT temperature.') 540 elif temperature > 100: 541 temperature = temperature / 1000 542 return temperature 543 544 545def wait_for_dut_cooldown(dut, target_temp=50, timeout=300): 546 """Function to wait for a DUT to cool down. 547 548 Args: 549 dut: AndroidDevice of interest 550 target_temp: target cooldown temperature 551 timeout: maxt time to wait for cooldown 552 """ 553 start_time = time.time() 554 while time.time() - start_time < timeout: 555 temperature = get_dut_temperature(dut) 556 if temperature < target_temp: 557 break 558 time.sleep(SHORT_SLEEP) 559 elapsed_time = time.time() - start_time 560 logging.debug('DUT Final Temperature: {}C. Cooldown duration: {}'.format( 561 temperature, elapsed_time)) 562 563 564def health_check(dut, batt_thresh=5, temp_threshold=53, cooldown=1): 565 """Function to check health status of a DUT. 566 567 The function checks both battery levels and temperature to avoid DUT 568 powering off during the test. 569 570 Args: 571 dut: AndroidDevice of interest 572 batt_thresh: battery level threshold 573 temp_threshold: temperature threshold 574 cooldown: flag to wait for DUT to cool down when overheating 575 Returns: 576 health_check: boolean confirming device is healthy 577 """ 578 health_check = True 579 battery_level = utils.get_battery_level(dut) 580 if battery_level < batt_thresh: 581 logging.warning('Battery level low ({}%)'.format(battery_level)) 582 health_check = False 583 else: 584 logging.debug('Battery level = {}%'.format(battery_level)) 585 586 temperature = get_dut_temperature(dut) 587 if temperature > temp_threshold: 588 if cooldown: 589 logging.warning( 590 'Waiting for DUT to cooldown. ({} C)'.format(temperature)) 591 wait_for_dut_cooldown(dut, target_temp=temp_threshold - 5) 592 else: 593 logging.warning('DUT Overheating ({} C)'.format(temperature)) 594 health_check = False 595 else: 596 logging.debug('DUT Temperature = {} C'.format(temperature)) 597 return health_check 598 599 600# Wifi Device Utils 601def empty_rssi_result(): 602 return collections.OrderedDict([('data', []), ('mean', float('nan')), 603 ('stdev', float('nan'))]) 604 605 606@nonblocking 607def get_connected_rssi_nb(dut, 608 num_measurements=1, 609 polling_frequency=SHORT_SLEEP, 610 first_measurement_delay=0, 611 disconnect_warning=True, 612 ignore_samples=0, 613 interface='wlan0'): 614 return get_connected_rssi(dut, num_measurements, polling_frequency, 615 first_measurement_delay, disconnect_warning, 616 ignore_samples, interface) 617 618 619@detect_wifi_decorator 620def get_connected_rssi(dut, 621 num_measurements=1, 622 polling_frequency=SHORT_SLEEP, 623 first_measurement_delay=0, 624 disconnect_warning=True, 625 ignore_samples=0, 626 interface='wlan0'): 627 """Gets all RSSI values reported for the connected access point/BSSID. 628 629 Args: 630 dut: android device object from which to get RSSI 631 num_measurements: number of scans done, and RSSIs collected 632 polling_frequency: time to wait between RSSI measurements 633 disconnect_warning: boolean controlling disconnection logging messages 634 ignore_samples: number of leading samples to ignore 635 Returns: 636 connected_rssi: dict containing the measurements results for 637 all reported RSSI values (signal_poll, per chain, etc.) and their 638 statistics 639 """ 640 641 642@nonblocking 643def get_scan_rssi_nb(dut, tracked_bssids, num_measurements=1): 644 return get_scan_rssi(dut, tracked_bssids, num_measurements) 645 646 647@detect_wifi_decorator 648def get_scan_rssi(dut, tracked_bssids, num_measurements=1): 649 """Gets scan RSSI for specified BSSIDs. 650 651 Args: 652 dut: android device object from which to get RSSI 653 tracked_bssids: array of BSSIDs to gather RSSI data for 654 num_measurements: number of scans done, and RSSIs collected 655 Returns: 656 scan_rssi: dict containing the measurement results as well as the 657 statistics of the scan RSSI for all BSSIDs in tracked_bssids 658 """ 659 660 661@detect_wifi_decorator 662def get_sw_signature(dut): 663 """Function that checks the signature for wifi firmware and config files. 664 665 Returns: 666 bdf_signature: signature consisting of last three digits of bdf cksums 667 fw_signature: floating point firmware version, i.e., major.minor 668 """ 669 670 671@detect_wifi_decorator 672def get_country_code(dut): 673 """Function that returns the current wifi country code.""" 674 675 676@detect_wifi_decorator 677def push_config(dut, config_file): 678 """Function to push Wifi BDF files 679 680 This function checks for existing wifi bdf files and over writes them all, 681 for simplicity, with the bdf file provided in the arguments. The dut is 682 rebooted for the bdf file to take effect 683 684 Args: 685 dut: dut to push bdf file to 686 config_file: path to bdf_file to push 687 """ 688 689 690@detect_wifi_decorator 691def start_wifi_logging(dut): 692 """Function to start collecting wifi-related logs""" 693 694 695@detect_wifi_decorator 696def stop_wifi_logging(dut): 697 """Function to start collecting wifi-related logs""" 698 699 700@detect_wifi_decorator 701def push_firmware(dut, firmware_files): 702 """Function to push Wifi firmware files 703 704 Args: 705 dut: dut to push bdf file to 706 firmware_files: path to wlanmdsp.mbn file 707 datamsc_file: path to Data.msc file 708 """ 709 710 711@detect_wifi_decorator 712def disable_beamforming(dut): 713 """Function to disable beamforming.""" 714 715 716@detect_wifi_decorator 717def set_nss_capability(dut, nss): 718 """Function to set number of spatial streams supported.""" 719 720 721@detect_wifi_decorator 722def set_chain_mask(dut, chain_mask): 723 """Function to set DUT chain mask. 724 725 Args: 726 dut: android device 727 chain_mask: desired chain mask in [0, 1, '2x2'] 728 """ 729 730 731# Link layer stats utilities 732class LinkLayerStats(): 733 734 def __new__(self, dut, llstats_enabled=True): 735 if detect_wifi_platform(dut) == 'qcom': 736 return qcom_utils.LinkLayerStats(dut, llstats_enabled) 737 else: 738 return brcm_utils.LinkLayerStats(dut, llstats_enabled) 739