1#!/usr/bin/env python3.4 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import collections 18import importlib 19import ipaddress 20import logging 21import numpy 22import re 23import time 24from acts import asserts 25from acts import utils 26from acts.controllers.android_device import AndroidDevice 27from acts.controllers.utils_lib import ssh 28from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 29from acts_contrib.test_utils.wifi.wifi_performance_test_utils import ping_utils 30from acts_contrib.test_utils.wifi.wifi_performance_test_utils import qcom_utils 31from acts_contrib.test_utils.wifi.wifi_performance_test_utils import brcm_utils 32 33from concurrent.futures import ThreadPoolExecutor 34 35SHORT_SLEEP = 1 36MED_SLEEP = 6 37CHANNELS_6GHz = ['6g{}'.format(4 * x + 1) for x in range(59)] 38BAND_TO_CHANNEL_MAP = { 39 '2.4GHz': list(range(1, 14)), 40 'UNII-1': [36, 40, 44, 48], 41 'UNII-2': 42 [52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 140], 43 'UNII-3': [149, 153, 157, 161, 165], 44 '6GHz': CHANNELS_6GHz 45} 46CHANNEL_TO_BAND_MAP = { 47 channel: band 48 for band, channels in BAND_TO_CHANNEL_MAP.items() for channel in channels 49} 50 51 52# Decorators 53def nonblocking(f): 54 """Creates a decorator transforming function calls to non-blocking""" 55 def wrap(*args, **kwargs): 56 executor = ThreadPoolExecutor(max_workers=1) 57 thread_future = executor.submit(f, *args, **kwargs) 58 # Ensure resources are freed up when executor ruturns or raises 59 executor.shutdown(wait=False) 60 return thread_future 61 62 return wrap 63 64 65def detect_wifi_platform(dut): 66 if hasattr(dut, 'wifi_platform'): 67 return dut.wifi_platform 68 qcom_check = len(dut.get_file_names('/vendor/firmware/wlan/qca_cld/')) 69 if qcom_check: 70 dut.wifi_platform = 'qcom' 71 else: 72 dut.wifi_platform = 'brcm' 73 return dut.wifi_platform 74 75 76def detect_wifi_decorator(f): 77 def wrap(*args, **kwargs): 78 if 'dut' in kwargs: 79 dut = kwargs['dut'] 80 else: 81 dut = next(arg for arg in args if type(arg) == AndroidDevice) 82 dut_package = 'acts_contrib.test_utils.wifi.wifi_performance_test_utils.{}_utils'.format( 83 detect_wifi_platform(dut)) 84 dut_package = importlib.import_module(dut_package) 85 f_decorated = getattr(dut_package, f.__name__, lambda: None) 86 return (f_decorated(*args, **kwargs)) 87 88 return wrap 89 90 91# JSON serializer 92def serialize_dict(input_dict): 93 """Function to serialize dicts to enable JSON output""" 94 output_dict = collections.OrderedDict() 95 for key, value in input_dict.items(): 96 output_dict[_serialize_value(key)] = _serialize_value(value) 97 return output_dict 98 99 100def _serialize_value(value): 101 """Function to recursively serialize dict entries to enable JSON output""" 102 if isinstance(value, tuple): 103 return str(value) 104 if isinstance(value, numpy.int64): 105 return int(value) 106 if isinstance(value, numpy.float64): 107 return float(value) 108 if isinstance(value, list): 109 return [_serialize_value(x) for x in value] 110 if isinstance(value, numpy.ndarray): 111 return [_serialize_value(x) for x in value] 112 elif isinstance(value, dict): 113 return serialize_dict(value) 114 elif type(value) in (float, int, bool, str): 115 return value 116 else: 117 return "Non-serializable object" 118 119 120def extract_sub_dict(full_dict, fields): 121 sub_dict = collections.OrderedDict( 122 (field, full_dict[field]) for field in fields) 123 return sub_dict 124 125 126# Miscellaneous Wifi Utilities 127def check_skip_conditions(testcase_params, 128 dut, 129 access_point, 130 ota_chamber=None): 131 """Checks if test should be skipped.""" 132 # Check battery level before test 133 if not health_check(dut, 10): 134 asserts.skip('DUT battery level too low.') 135 if not access_point.band_lookup_by_channel(testcase_params['channel']): 136 asserts.skip('AP does not support requested channel.') 137 if ota_chamber and CHANNEL_TO_BAND_MAP[ 138 testcase_params['channel']] not in ota_chamber.SUPPORTED_BANDS: 139 asserts.skip('OTA chamber does not support requested channel.') 140 # Check if 6GHz is supported by checking capabilities in the US. 141 if not dut.droid.wifiCheckState(): 142 wutils.wifi_toggle_state(dut, True) 143 iw_list = dut.adb.shell('iw list') 144 supports_6ghz = '6135 MHz' in iw_list 145 supports_160mhz = 'Supported Channel Width: 160 MHz' in iw_list 146 if testcase_params.get('bandwidth', 20) == 160 and not supports_160mhz: 147 asserts.skip('DUT does not support 160 MHz networks.') 148 if testcase_params.get('channel', 149 6) in CHANNELS_6GHz and not supports_6ghz: 150 asserts.skip('DUT does not support 6 GHz band.') 151 152 153def validate_network(dut, ssid): 154 """Check that DUT has a valid internet connection through expected SSID 155 156 Args: 157 dut: android device of interest 158 ssid: expected ssid 159 """ 160 try: 161 connected = wutils.validate_connection(dut, wait_time=3) is not None 162 current_network = dut.droid.wifiGetConnectionInfo() 163 except: 164 connected = False 165 current_network = None 166 if connected and current_network['SSID'] == ssid: 167 return True 168 else: 169 return False 170 171 172def get_server_address(ssh_connection, dut_ip, subnet_mask): 173 """Get server address on a specific subnet, 174 175 This function retrieves the LAN or WAN IP of a remote machine used in 176 testing. If subnet_mask is set to 'public' it returns a machines global ip, 177 else it returns the ip belonging to the dut local network given the dut's 178 ip and subnet mask. 179 180 Args: 181 ssh_connection: object representing server for which we want an ip 182 dut_ip: string in ip address format, i.e., xxx.xxx.xxx.xxx 183 subnet_mask: string representing subnet mask (public for global ip) 184 """ 185 ifconfig_out = ssh_connection.run('ifconfig').stdout 186 ip_list = re.findall('inet (?:addr:)?(\d+.\d+.\d+.\d+)', ifconfig_out) 187 ip_list = [ipaddress.ip_address(ip) for ip in ip_list] 188 189 if subnet_mask == 'public': 190 for ip in ip_list: 191 # is_global is not used to allow for CGNAT ips in 100.x.y.z range 192 if not ip.is_private: 193 return str(ip) 194 else: 195 dut_network = ipaddress.ip_network('{}/{}'.format(dut_ip, subnet_mask), 196 strict=False) 197 for ip in ip_list: 198 if ip in dut_network: 199 return str(ip) 200 logging.error('No IP address found in requested subnet') 201 202 203# Ping utilities 204def get_ping_stats(src_device, dest_address, ping_duration, ping_interval, 205 ping_size): 206 """Run ping to or from the DUT. 207 208 The function computes either pings the DUT or pings a remote ip from 209 DUT. 210 211 Args: 212 src_device: object representing device to ping from 213 dest_address: ip address to ping 214 ping_duration: timeout to set on the ping process (in seconds) 215 ping_interval: time between pings (in seconds) 216 ping_size: size of ping packet payload 217 Returns: 218 ping_result: dict containing ping results and other meta data 219 """ 220 ping_count = int(ping_duration / ping_interval) 221 ping_deadline = int(ping_count * ping_interval) + 1 222 ping_cmd_linux = 'ping -c {} -w {} -i {} -s {} -D'.format( 223 ping_count, 224 ping_deadline, 225 ping_interval, 226 ping_size, 227 ) 228 229 ping_cmd_macos = 'ping -c {} -t {} -i {} -s {}'.format( 230 ping_count, 231 ping_deadline, 232 ping_interval, 233 ping_size, 234 ) 235 236 if isinstance(src_device, AndroidDevice): 237 ping_cmd = '{} {}'.format(ping_cmd_linux, dest_address) 238 ping_output = src_device.adb.shell(ping_cmd, 239 timeout=ping_deadline + SHORT_SLEEP, 240 ignore_status=True) 241 elif isinstance(src_device, ssh.connection.SshConnection): 242 platform = src_device.run('uname').stdout 243 if 'linux' in platform.lower(): 244 ping_cmd = 'sudo {} {}'.format(ping_cmd_linux, dest_address) 245 elif 'darwin' in platform.lower(): 246 ping_cmd = "sudo {} {}| while IFS= read -r line; do printf '[%s] %s\n' \"$(gdate '+%s.%N')\" \"$line\"; done".format( 247 ping_cmd_macos, dest_address) 248 ping_output = src_device.run(ping_cmd, 249 timeout=ping_deadline + SHORT_SLEEP, 250 ignore_status=True).stdout 251 else: 252 raise TypeError('Unable to ping using src_device of type %s.' % 253 type(src_device)) 254 return ping_utils.PingResult(ping_output.splitlines()) 255 256 257@nonblocking 258def get_ping_stats_nb(src_device, dest_address, ping_duration, ping_interval, 259 ping_size): 260 return get_ping_stats(src_device, dest_address, ping_duration, 261 ping_interval, ping_size) 262 263 264# Iperf utilities 265@nonblocking 266def start_iperf_client_nb(iperf_client, iperf_server_address, iperf_args, tag, 267 timeout): 268 return iperf_client.start(iperf_server_address, iperf_args, tag, timeout) 269 270 271def get_iperf_arg_string(duration, 272 reverse_direction, 273 interval=1, 274 traffic_type='TCP', 275 socket_size=None, 276 num_processes=1, 277 udp_throughput='1000M', 278 ipv6=False): 279 """Function to format iperf client arguments. 280 281 This function takes in iperf client parameters and returns a properly 282 formatter iperf arg string to be used in throughput tests. 283 284 Args: 285 duration: iperf duration in seconds 286 reverse_direction: boolean controlling the -R flag for iperf clients 287 interval: iperf print interval 288 traffic_type: string specifying TCP or UDP traffic 289 socket_size: string specifying TCP window or socket buffer, e.g., 2M 290 num_processes: int specifying number of iperf processes 291 udp_throughput: string specifying TX throughput in UDP tests, e.g. 100M 292 ipv6: boolean controlling the use of IP V6 293 Returns: 294 iperf_args: string of formatted iperf args 295 """ 296 iperf_args = '-i {} -t {} -J '.format(interval, duration) 297 if ipv6: 298 iperf_args = iperf_args + '-6 ' 299 if traffic_type.upper() == 'UDP': 300 iperf_args = iperf_args + '-u -b {} -l 1470 -P {} '.format( 301 udp_throughput, num_processes) 302 elif traffic_type.upper() == 'TCP': 303 iperf_args = iperf_args + '-P {} '.format(num_processes) 304 if socket_size: 305 iperf_args = iperf_args + '-w {} '.format(socket_size) 306 if reverse_direction: 307 iperf_args = iperf_args + ' -R' 308 return iperf_args 309 310 311# Attenuator Utilities 312def atten_by_label(atten_list, path_label, atten_level): 313 """Attenuate signals according to their path label. 314 315 Args: 316 atten_list: list of attenuators to iterate over 317 path_label: path label on which to set desired attenuation 318 atten_level: attenuation desired on path 319 """ 320 for atten in atten_list: 321 if path_label in atten.path: 322 atten.set_atten(atten_level, retry=True) 323 324 325def get_atten_for_target_rssi(target_rssi, attenuators, dut, ping_server): 326 """Function to estimate attenuation to hit a target RSSI. 327 328 This function estimates a constant attenuation setting on all atennuation 329 ports to hit a target RSSI. The estimate is not meant to be exact or 330 guaranteed. 331 332 Args: 333 target_rssi: rssi of interest 334 attenuators: list of attenuator ports 335 dut: android device object assumed connected to a wifi network. 336 ping_server: ssh connection object to ping server 337 Returns: 338 target_atten: attenuation setting to achieve target_rssi 339 """ 340 logging.info('Searching attenuation for RSSI = {}dB'.format(target_rssi)) 341 # Set attenuator to 0 dB 342 for atten in attenuators: 343 atten.set_atten(0, strict=False, retry=True) 344 # Start ping traffic 345 dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0] 346 # Measure starting RSSI 347 ping_future = get_ping_stats_nb(src_device=ping_server, 348 dest_address=dut_ip, 349 ping_duration=1.5, 350 ping_interval=0.02, 351 ping_size=64) 352 current_rssi = get_connected_rssi(dut, 353 num_measurements=4, 354 polling_frequency=0.25, 355 first_measurement_delay=0.5, 356 disconnect_warning=1, 357 ignore_samples=1) 358 current_rssi = current_rssi['signal_poll_rssi']['mean'] 359 ping_future.result() 360 target_atten = 0 361 logging.debug('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format( 362 target_atten, current_rssi)) 363 within_range = 0 364 for idx in range(20): 365 atten_delta = max(min(current_rssi - target_rssi, 20), -20) 366 target_atten = int((target_atten + atten_delta) * 4) / 4 367 if target_atten < 0: 368 return 0 369 if target_atten > attenuators[0].get_max_atten(): 370 return attenuators[0].get_max_atten() 371 for atten in attenuators: 372 atten.set_atten(target_atten, strict=False, retry=True) 373 ping_future = get_ping_stats_nb(src_device=ping_server, 374 dest_address=dut_ip, 375 ping_duration=1.5, 376 ping_interval=0.02, 377 ping_size=64) 378 current_rssi = get_connected_rssi(dut, 379 num_measurements=4, 380 polling_frequency=0.25, 381 first_measurement_delay=0.5, 382 disconnect_warning=1, 383 ignore_samples=1) 384 current_rssi = current_rssi['signal_poll_rssi']['mean'] 385 ping_future.result() 386 logging.info('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format( 387 target_atten, current_rssi)) 388 if abs(current_rssi - target_rssi) < 1: 389 if within_range: 390 logging.info( 391 'Reached RSSI: {0:.2f}. Target RSSI: {1:.2f}.' 392 'Attenuation: {2:.2f}, Iterations = {3:.2f}'.format( 393 current_rssi, target_rssi, target_atten, idx)) 394 return target_atten 395 else: 396 within_range = True 397 else: 398 within_range = False 399 return target_atten 400 401 402def get_current_atten_dut_chain_map(attenuators, 403 dut, 404 ping_server, 405 ping_from_dut=False): 406 """Function to detect mapping between attenuator ports and DUT chains. 407 408 This function detects the mapping between attenuator ports and DUT chains 409 in cases where DUT chains are connected to only one attenuator port. The 410 function assumes the DUT is already connected to a wifi network. The 411 function starts by measuring per chain RSSI at 0 attenuation, then 412 attenuates one port at a time looking for the chain that reports a lower 413 RSSI. 414 415 Args: 416 attenuators: list of attenuator ports 417 dut: android device object assumed connected to a wifi network. 418 ping_server: ssh connection object to ping server 419 ping_from_dut: boolean controlling whether to ping from or to dut 420 Returns: 421 chain_map: list of dut chains, one entry per attenuator port 422 """ 423 # Set attenuator to 0 dB 424 for atten in attenuators: 425 atten.set_atten(0, strict=False, retry=True) 426 # Start ping traffic 427 dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0] 428 if ping_from_dut: 429 ping_future = get_ping_stats_nb(dut, ping_server._settings.hostname, 430 11, 0.02, 64) 431 else: 432 ping_future = get_ping_stats_nb(ping_server, dut_ip, 11, 0.02, 64) 433 # Measure starting RSSI 434 base_rssi = get_connected_rssi(dut, 4, 0.25, 1) 435 chain0_base_rssi = base_rssi['chain_0_rssi']['mean'] 436 chain1_base_rssi = base_rssi['chain_1_rssi']['mean'] 437 if chain0_base_rssi < -70 or chain1_base_rssi < -70: 438 logging.warning('RSSI might be too low to get reliable chain map.') 439 # Compile chain map by attenuating one path at a time and seeing which 440 # chain's RSSI degrades 441 chain_map = [] 442 for test_atten in attenuators: 443 # Set one attenuator to 30 dB down 444 test_atten.set_atten(30, strict=False, retry=True) 445 # Get new RSSI 446 test_rssi = get_connected_rssi(dut, 4, 0.25, 1) 447 # Assign attenuator to path that has lower RSSI 448 if chain0_base_rssi > -70 and chain0_base_rssi - test_rssi[ 449 'chain_0_rssi']['mean'] > 10: 450 chain_map.append('DUT-Chain-0') 451 elif chain1_base_rssi > -70 and chain1_base_rssi - test_rssi[ 452 'chain_1_rssi']['mean'] > 10: 453 chain_map.append('DUT-Chain-1') 454 else: 455 chain_map.append(None) 456 # Reset attenuator to 0 457 test_atten.set_atten(0, strict=False, retry=True) 458 ping_future.result() 459 logging.debug('Chain Map: {}'.format(chain_map)) 460 return chain_map 461 462 463def get_full_rf_connection_map(attenuators, 464 dut, 465 ping_server, 466 networks, 467 ping_from_dut=False): 468 """Function to detect per-network connections between attenuator and DUT. 469 470 This function detects the mapping between attenuator ports and DUT chains 471 on all networks in its arguments. The function connects the DUT to each 472 network then calls get_current_atten_dut_chain_map to get the connection 473 map on the current network. The function outputs the results in two formats 474 to enable easy access when users are interested in indexing by network or 475 attenuator port. 476 477 Args: 478 attenuators: list of attenuator ports 479 dut: android device object assumed connected to a wifi network. 480 ping_server: ssh connection object to ping server 481 networks: dict of network IDs and configs 482 Returns: 483 rf_map_by_network: dict of RF connections indexed by network. 484 rf_map_by_atten: list of RF connections indexed by attenuator 485 """ 486 for atten in attenuators: 487 atten.set_atten(0, strict=False, retry=True) 488 489 rf_map_by_network = collections.OrderedDict() 490 rf_map_by_atten = [[] for atten in attenuators] 491 for net_id, net_config in networks.items(): 492 wutils.reset_wifi(dut) 493 wutils.wifi_connect(dut, 494 net_config, 495 num_of_tries=1, 496 assert_on_fail=False, 497 check_connectivity=False) 498 rf_map_by_network[net_id] = get_current_atten_dut_chain_map( 499 attenuators, dut, ping_server, ping_from_dut) 500 for idx, chain in enumerate(rf_map_by_network[net_id]): 501 if chain: 502 rf_map_by_atten[idx].append({ 503 'network': net_id, 504 'dut_chain': chain 505 }) 506 logging.debug('RF Map (by Network): {}'.format(rf_map_by_network)) 507 logging.debug('RF Map (by Atten): {}'.format(rf_map_by_atten)) 508 509 return rf_map_by_network, rf_map_by_atten 510 511 512# Generic device utils 513def get_dut_temperature(dut): 514 """Function to get dut temperature. 515 516 The function fetches and returns the reading from the temperature sensor 517 used for skin temperature and thermal throttling. 518 519 Args: 520 dut: AndroidDevice of interest 521 Returns: 522 temperature: device temperature. 0 if temperature could not be read 523 """ 524 candidate_zones = [ 525 '/sys/devices/virtual/thermal/tz-by-name/skin-therm/temp', 526 '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-monitor/temp', 527 '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-adc/temp', 528 '/sys/devices/virtual/thermal/tz-by-name/back_therm/temp', 529 '/dev/thermal/tz-by-name/quiet_therm/temp' 530 ] 531 for zone in candidate_zones: 532 try: 533 temperature = int(dut.adb.shell('cat {}'.format(zone))) 534 break 535 except: 536 temperature = 0 537 if temperature == 0: 538 logging.debug('Could not check DUT temperature.') 539 elif temperature > 100: 540 temperature = temperature / 1000 541 return temperature 542 543 544def wait_for_dut_cooldown(dut, target_temp=50, timeout=300): 545 """Function to wait for a DUT to cool down. 546 547 Args: 548 dut: AndroidDevice of interest 549 target_temp: target cooldown temperature 550 timeout: maxt time to wait for cooldown 551 """ 552 start_time = time.time() 553 while time.time() - start_time < timeout: 554 temperature = get_dut_temperature(dut) 555 if temperature < target_temp: 556 break 557 time.sleep(SHORT_SLEEP) 558 elapsed_time = time.time() - start_time 559 logging.debug('DUT Final Temperature: {}C. Cooldown duration: {}'.format( 560 temperature, elapsed_time)) 561 562 563def health_check(dut, batt_thresh=5, temp_threshold=53, cooldown=1): 564 """Function to check health status of a DUT. 565 566 The function checks both battery levels and temperature to avoid DUT 567 powering off during the test. 568 569 Args: 570 dut: AndroidDevice of interest 571 batt_thresh: battery level threshold 572 temp_threshold: temperature threshold 573 cooldown: flag to wait for DUT to cool down when overheating 574 Returns: 575 health_check: boolean confirming device is healthy 576 """ 577 health_check = True 578 battery_level = utils.get_battery_level(dut) 579 if battery_level < batt_thresh: 580 logging.warning('Battery level low ({}%)'.format(battery_level)) 581 health_check = False 582 else: 583 logging.debug('Battery level = {}%'.format(battery_level)) 584 585 temperature = get_dut_temperature(dut) 586 if temperature > temp_threshold: 587 if cooldown: 588 logging.warning( 589 'Waiting for DUT to cooldown. ({} C)'.format(temperature)) 590 wait_for_dut_cooldown(dut, target_temp=temp_threshold - 5) 591 else: 592 logging.warning('DUT Overheating ({} C)'.format(temperature)) 593 health_check = False 594 else: 595 logging.debug('DUT Temperature = {} C'.format(temperature)) 596 return health_check 597 598 599# Wifi Device Utils 600def empty_rssi_result(): 601 return collections.OrderedDict([('data', []), ('mean', float('nan')), 602 ('stdev', float('nan'))]) 603 604 605@nonblocking 606def get_connected_rssi_nb(dut, 607 num_measurements=1, 608 polling_frequency=SHORT_SLEEP, 609 first_measurement_delay=0, 610 disconnect_warning=True, 611 ignore_samples=0, 612 interface='wlan0'): 613 return get_connected_rssi(dut, num_measurements, polling_frequency, 614 first_measurement_delay, disconnect_warning, 615 ignore_samples, interface) 616 617 618@detect_wifi_decorator 619def get_connected_rssi(dut, 620 num_measurements=1, 621 polling_frequency=SHORT_SLEEP, 622 first_measurement_delay=0, 623 disconnect_warning=True, 624 ignore_samples=0, 625 interface='wlan0'): 626 """Gets all RSSI values reported for the connected access point/BSSID. 627 628 Args: 629 dut: android device object from which to get RSSI 630 num_measurements: number of scans done, and RSSIs collected 631 polling_frequency: time to wait between RSSI measurements 632 disconnect_warning: boolean controlling disconnection logging messages 633 ignore_samples: number of leading samples to ignore 634 Returns: 635 connected_rssi: dict containing the measurements results for 636 all reported RSSI values (signal_poll, per chain, etc.) and their 637 statistics 638 """ 639 pass 640 641 642@nonblocking 643def get_scan_rssi_nb(dut, tracked_bssids, num_measurements=1): 644 return get_scan_rssi(dut, tracked_bssids, num_measurements) 645 646 647@detect_wifi_decorator 648def get_scan_rssi(dut, tracked_bssids, num_measurements=1): 649 """Gets scan RSSI for specified BSSIDs. 650 651 Args: 652 dut: android device object from which to get RSSI 653 tracked_bssids: array of BSSIDs to gather RSSI data for 654 num_measurements: number of scans done, and RSSIs collected 655 Returns: 656 scan_rssi: dict containing the measurement results as well as the 657 statistics of the scan RSSI for all BSSIDs in tracked_bssids 658 """ 659 pass 660 661 662@detect_wifi_decorator 663def get_sw_signature(dut): 664 """Function that checks the signature for wifi firmware and config files. 665 666 Returns: 667 bdf_signature: signature consisting of last three digits of bdf cksums 668 fw_signature: floating point firmware version, i.e., major.minor 669 """ 670 pass 671 672 673@detect_wifi_decorator 674def get_country_code(dut): 675 """Function that returns the current wifi country code.""" 676 pass 677 678 679@detect_wifi_decorator 680def push_config(dut, config_file): 681 """Function to push Wifi BDF files 682 683 This function checks for existing wifi bdf files and over writes them all, 684 for simplicity, with the bdf file provided in the arguments. The dut is 685 rebooted for the bdf file to take effect 686 687 Args: 688 dut: dut to push bdf file to 689 config_file: path to bdf_file to push 690 """ 691 pass 692 693 694@detect_wifi_decorator 695def start_wifi_logging(dut): 696 """Function to start collecting wifi-related logs""" 697 pass 698 699 700@detect_wifi_decorator 701def stop_wifi_logging(dut): 702 """Function to start collecting wifi-related logs""" 703 pass 704 705 706@detect_wifi_decorator 707def push_firmware(dut, firmware_files): 708 """Function to push Wifi firmware files 709 710 Args: 711 dut: dut to push bdf file to 712 firmware_files: path to wlanmdsp.mbn file 713 datamsc_file: path to Data.msc file 714 """ 715 pass 716 717 718@detect_wifi_decorator 719def disable_beamforming(dut): 720 """Function to disable beamforming.""" 721 pass 722 723 724@detect_wifi_decorator 725def set_nss_capability(dut, nss): 726 """Function to set number of spatial streams supported.""" 727 pass 728 729 730@detect_wifi_decorator 731def set_chain_mask(dut, chain_mask): 732 """Function to set DUT chain mask. 733 734 Args: 735 dut: android device 736 chain_mask: desired chain mask in [0, 1, '2x2'] 737 """ 738 pass 739 740 741# Link layer stats utilities 742class LinkLayerStats(): 743 def __new__(self, dut, llstats_enabled=True): 744 if detect_wifi_platform(dut) == 'qcom': 745 return qcom_utils.LinkLayerStats(dut, llstats_enabled) 746 else: 747 return brcm_utils.LinkLayerStats(dut, llstats_enabled) 748