• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import importlib
19import ipaddress
20import logging
21import numpy
22import re
23import time
24from acts import asserts
25from acts import utils
26from acts.controllers.android_device import AndroidDevice
27from acts.controllers.utils_lib import ssh
28from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
29from acts_contrib.test_utils.wifi.wifi_performance_test_utils import ping_utils
30from acts_contrib.test_utils.wifi.wifi_performance_test_utils import qcom_utils
31from acts_contrib.test_utils.wifi.wifi_performance_test_utils import brcm_utils
32
33from concurrent.futures import ThreadPoolExecutor
34
35SHORT_SLEEP = 1
36MED_SLEEP = 6
37CHANNELS_6GHz = ['6g{}'.format(4 * x + 1) for x in range(59)]
38BAND_TO_CHANNEL_MAP = {
39    '2.4GHz': list(range(1, 14)),
40    'UNII-1': [36, 40, 44, 48],
41    'UNII-2':
42    [52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 140],
43    'UNII-3': [149, 153, 157, 161, 165],
44    '6GHz': CHANNELS_6GHz
45}
46CHANNEL_TO_BAND_MAP = {
47    channel: band
48    for band, channels in BAND_TO_CHANNEL_MAP.items() for channel in channels
49}
50
51
52# Decorators
53def nonblocking(f):
54    """Creates a decorator transforming function calls to non-blocking"""
55    def wrap(*args, **kwargs):
56        executor = ThreadPoolExecutor(max_workers=1)
57        thread_future = executor.submit(f, *args, **kwargs)
58        # Ensure resources are freed up when executor ruturns or raises
59        executor.shutdown(wait=False)
60        return thread_future
61
62    return wrap
63
64
65def detect_wifi_platform(dut):
66    if hasattr(dut, 'wifi_platform'):
67        return dut.wifi_platform
68    qcom_check = len(dut.get_file_names('/vendor/firmware/wlan/qca_cld/'))
69    if qcom_check:
70        dut.wifi_platform = 'qcom'
71    else:
72        dut.wifi_platform = 'brcm'
73    return dut.wifi_platform
74
75
76def detect_wifi_decorator(f):
77    def wrap(*args, **kwargs):
78        if 'dut' in kwargs:
79            dut = kwargs['dut']
80        else:
81            dut = next(arg for arg in args if type(arg) == AndroidDevice)
82        dut_package = 'acts_contrib.test_utils.wifi.wifi_performance_test_utils.{}_utils'.format(
83            detect_wifi_platform(dut))
84        dut_package = importlib.import_module(dut_package)
85        f_decorated = getattr(dut_package, f.__name__, lambda: None)
86        return (f_decorated(*args, **kwargs))
87
88    return wrap
89
90
91# JSON serializer
92def serialize_dict(input_dict):
93    """Function to serialize dicts to enable JSON output"""
94    output_dict = collections.OrderedDict()
95    for key, value in input_dict.items():
96        output_dict[_serialize_value(key)] = _serialize_value(value)
97    return output_dict
98
99
100def _serialize_value(value):
101    """Function to recursively serialize dict entries to enable JSON output"""
102    if isinstance(value, tuple):
103        return str(value)
104    if isinstance(value, numpy.int64):
105        return int(value)
106    if isinstance(value, numpy.float64):
107        return float(value)
108    if isinstance(value, list):
109        return [_serialize_value(x) for x in value]
110    if isinstance(value, numpy.ndarray):
111        return [_serialize_value(x) for x in value]
112    elif isinstance(value, dict):
113        return serialize_dict(value)
114    elif type(value) in (float, int, bool, str):
115        return value
116    else:
117        return "Non-serializable object"
118
119
120def extract_sub_dict(full_dict, fields):
121    sub_dict = collections.OrderedDict(
122        (field, full_dict[field]) for field in fields)
123    return sub_dict
124
125
126# Miscellaneous Wifi Utilities
127def check_skip_conditions(testcase_params,
128                          dut,
129                          access_point,
130                          ota_chamber=None):
131    """Checks if test should be skipped."""
132    # Check battery level before test
133    if not health_check(dut, 10):
134        asserts.skip('DUT battery level too low.')
135    if not access_point.band_lookup_by_channel(testcase_params['channel']):
136        asserts.skip('AP does not support requested channel.')
137    if ota_chamber and CHANNEL_TO_BAND_MAP[
138            testcase_params['channel']] not in ota_chamber.SUPPORTED_BANDS:
139        asserts.skip('OTA chamber does not support requested channel.')
140    # Check if 6GHz is supported by checking capabilities in the US.
141    if not dut.droid.wifiCheckState():
142        wutils.wifi_toggle_state(dut, True)
143    iw_list = dut.adb.shell('iw list')
144    supports_6ghz = '6135 MHz' in iw_list
145    supports_160mhz = 'Supported Channel Width: 160 MHz' in iw_list
146    if testcase_params.get('bandwidth', 20) == 160 and not supports_160mhz:
147        asserts.skip('DUT does not support 160 MHz networks.')
148    if testcase_params.get('channel',
149                           6) in CHANNELS_6GHz and not supports_6ghz:
150        asserts.skip('DUT does not support 6 GHz band.')
151
152
153def validate_network(dut, ssid):
154    """Check that DUT has a valid internet connection through expected SSID
155
156    Args:
157        dut: android device of interest
158        ssid: expected ssid
159    """
160    try:
161        connected = wutils.validate_connection(dut, wait_time=3) is not None
162        current_network = dut.droid.wifiGetConnectionInfo()
163    except:
164        connected = False
165        current_network = None
166    if connected and current_network['SSID'] == ssid:
167        return True
168    else:
169        return False
170
171
172def get_server_address(ssh_connection, dut_ip, subnet_mask):
173    """Get server address on a specific subnet,
174
175    This function retrieves the LAN or WAN IP of a remote machine used in
176    testing. If subnet_mask is set to 'public' it returns a machines global ip,
177    else it returns the ip belonging to the dut local network given the dut's
178    ip and subnet mask.
179
180    Args:
181        ssh_connection: object representing server for which we want an ip
182        dut_ip: string in ip address format, i.e., xxx.xxx.xxx.xxx
183        subnet_mask: string representing subnet mask (public for global ip)
184    """
185    ifconfig_out = ssh_connection.run('ifconfig').stdout
186    ip_list = re.findall('inet (?:addr:)?(\d+.\d+.\d+.\d+)', ifconfig_out)
187    ip_list = [ipaddress.ip_address(ip) for ip in ip_list]
188
189    if subnet_mask == 'public':
190        for ip in ip_list:
191            # is_global is not used to allow for CGNAT ips in 100.x.y.z range
192            if not ip.is_private:
193                return str(ip)
194    else:
195        dut_network = ipaddress.ip_network('{}/{}'.format(dut_ip, subnet_mask),
196                                           strict=False)
197        for ip in ip_list:
198            if ip in dut_network:
199                return str(ip)
200    logging.error('No IP address found in requested subnet')
201
202
203# Ping utilities
204def get_ping_stats(src_device, dest_address, ping_duration, ping_interval,
205                   ping_size):
206    """Run ping to or from the DUT.
207
208    The function computes either pings the DUT or pings a remote ip from
209    DUT.
210
211    Args:
212        src_device: object representing device to ping from
213        dest_address: ip address to ping
214        ping_duration: timeout to set on the ping process (in seconds)
215        ping_interval: time between pings (in seconds)
216        ping_size: size of ping packet payload
217    Returns:
218        ping_result: dict containing ping results and other meta data
219    """
220    ping_count = int(ping_duration / ping_interval)
221    ping_deadline = int(ping_count * ping_interval) + 1
222    ping_cmd_linux = 'ping -c {} -w {} -i {} -s {} -D'.format(
223        ping_count,
224        ping_deadline,
225        ping_interval,
226        ping_size,
227    )
228
229    ping_cmd_macos = 'ping -c {} -t {} -i {} -s {}'.format(
230        ping_count,
231        ping_deadline,
232        ping_interval,
233        ping_size,
234    )
235
236    if isinstance(src_device, AndroidDevice):
237        ping_cmd = '{} {}'.format(ping_cmd_linux, dest_address)
238        ping_output = src_device.adb.shell(ping_cmd,
239                                           timeout=ping_deadline + SHORT_SLEEP,
240                                           ignore_status=True)
241    elif isinstance(src_device, ssh.connection.SshConnection):
242        platform = src_device.run('uname').stdout
243        if 'linux' in platform.lower():
244            ping_cmd = 'sudo {} {}'.format(ping_cmd_linux, dest_address)
245        elif 'darwin' in platform.lower():
246            ping_cmd = "sudo {} {}| while IFS= read -r line; do printf '[%s] %s\n' \"$(gdate '+%s.%N')\" \"$line\"; done".format(
247                ping_cmd_macos, dest_address)
248        ping_output = src_device.run(ping_cmd,
249                                     timeout=ping_deadline + SHORT_SLEEP,
250                                     ignore_status=True).stdout
251    else:
252        raise TypeError('Unable to ping using src_device of type %s.' %
253                        type(src_device))
254    return ping_utils.PingResult(ping_output.splitlines())
255
256
257@nonblocking
258def get_ping_stats_nb(src_device, dest_address, ping_duration, ping_interval,
259                      ping_size):
260    return get_ping_stats(src_device, dest_address, ping_duration,
261                          ping_interval, ping_size)
262
263
264# Iperf utilities
265@nonblocking
266def start_iperf_client_nb(iperf_client, iperf_server_address, iperf_args, tag,
267                          timeout):
268    return iperf_client.start(iperf_server_address, iperf_args, tag, timeout)
269
270
271def get_iperf_arg_string(duration,
272                         reverse_direction,
273                         interval=1,
274                         traffic_type='TCP',
275                         socket_size=None,
276                         num_processes=1,
277                         udp_throughput='1000M',
278                         ipv6=False):
279    """Function to format iperf client arguments.
280
281    This function takes in iperf client parameters and returns a properly
282    formatter iperf arg string to be used in throughput tests.
283
284    Args:
285        duration: iperf duration in seconds
286        reverse_direction: boolean controlling the -R flag for iperf clients
287        interval: iperf print interval
288        traffic_type: string specifying TCP or UDP traffic
289        socket_size: string specifying TCP window or socket buffer, e.g., 2M
290        num_processes: int specifying number of iperf processes
291        udp_throughput: string specifying TX throughput in UDP tests, e.g. 100M
292        ipv6: boolean controlling the use of IP V6
293    Returns:
294        iperf_args: string of formatted iperf args
295    """
296    iperf_args = '-i {} -t {} -J '.format(interval, duration)
297    if ipv6:
298        iperf_args = iperf_args + '-6 '
299    if traffic_type.upper() == 'UDP':
300        iperf_args = iperf_args + '-u -b {} -l 1470 -P {} '.format(
301            udp_throughput, num_processes)
302    elif traffic_type.upper() == 'TCP':
303        iperf_args = iperf_args + '-P {} '.format(num_processes)
304    if socket_size:
305        iperf_args = iperf_args + '-w {} '.format(socket_size)
306    if reverse_direction:
307        iperf_args = iperf_args + ' -R'
308    return iperf_args
309
310
311# Attenuator Utilities
312def atten_by_label(atten_list, path_label, atten_level):
313    """Attenuate signals according to their path label.
314
315    Args:
316        atten_list: list of attenuators to iterate over
317        path_label: path label on which to set desired attenuation
318        atten_level: attenuation desired on path
319    """
320    for atten in atten_list:
321        if path_label in atten.path:
322            atten.set_atten(atten_level, retry=True)
323
324
325def get_atten_for_target_rssi(target_rssi, attenuators, dut, ping_server):
326    """Function to estimate attenuation to hit a target RSSI.
327
328    This function estimates a constant attenuation setting on all atennuation
329    ports to hit a target RSSI. The estimate is not meant to be exact or
330    guaranteed.
331
332    Args:
333        target_rssi: rssi of interest
334        attenuators: list of attenuator ports
335        dut: android device object assumed connected to a wifi network.
336        ping_server: ssh connection object to ping server
337    Returns:
338        target_atten: attenuation setting to achieve target_rssi
339    """
340    logging.info('Searching attenuation for RSSI = {}dB'.format(target_rssi))
341    # Set attenuator to 0 dB
342    for atten in attenuators:
343        atten.set_atten(0, strict=False, retry=True)
344    # Start ping traffic
345    dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
346    # Measure starting RSSI
347    ping_future = get_ping_stats_nb(src_device=ping_server,
348                                    dest_address=dut_ip,
349                                    ping_duration=1.5,
350                                    ping_interval=0.02,
351                                    ping_size=64)
352    current_rssi = get_connected_rssi(dut,
353                                      num_measurements=4,
354                                      polling_frequency=0.25,
355                                      first_measurement_delay=0.5,
356                                      disconnect_warning=1,
357                                      ignore_samples=1)
358    current_rssi = current_rssi['signal_poll_rssi']['mean']
359    ping_future.result()
360    target_atten = 0
361    logging.debug('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format(
362        target_atten, current_rssi))
363    within_range = 0
364    for idx in range(20):
365        atten_delta = max(min(current_rssi - target_rssi, 20), -20)
366        target_atten = int((target_atten + atten_delta) * 4) / 4
367        if target_atten < 0:
368            return 0
369        if target_atten > attenuators[0].get_max_atten():
370            return attenuators[0].get_max_atten()
371        for atten in attenuators:
372            atten.set_atten(target_atten, strict=False, retry=True)
373        ping_future = get_ping_stats_nb(src_device=ping_server,
374                                        dest_address=dut_ip,
375                                        ping_duration=1.5,
376                                        ping_interval=0.02,
377                                        ping_size=64)
378        current_rssi = get_connected_rssi(dut,
379                                          num_measurements=4,
380                                          polling_frequency=0.25,
381                                          first_measurement_delay=0.5,
382                                          disconnect_warning=1,
383                                          ignore_samples=1)
384        current_rssi = current_rssi['signal_poll_rssi']['mean']
385        ping_future.result()
386        logging.info('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format(
387            target_atten, current_rssi))
388        if abs(current_rssi - target_rssi) < 1:
389            if within_range:
390                logging.info(
391                    'Reached RSSI: {0:.2f}. Target RSSI: {1:.2f}.'
392                    'Attenuation: {2:.2f}, Iterations = {3:.2f}'.format(
393                        current_rssi, target_rssi, target_atten, idx))
394                return target_atten
395            else:
396                within_range = True
397        else:
398            within_range = False
399    return target_atten
400
401
402def get_current_atten_dut_chain_map(attenuators,
403                                    dut,
404                                    ping_server,
405                                    ping_from_dut=False):
406    """Function to detect mapping between attenuator ports and DUT chains.
407
408    This function detects the mapping between attenuator ports and DUT chains
409    in cases where DUT chains are connected to only one attenuator port. The
410    function assumes the DUT is already connected to a wifi network. The
411    function starts by measuring per chain RSSI at 0 attenuation, then
412    attenuates one port at a time looking for the chain that reports a lower
413    RSSI.
414
415    Args:
416        attenuators: list of attenuator ports
417        dut: android device object assumed connected to a wifi network.
418        ping_server: ssh connection object to ping server
419        ping_from_dut: boolean controlling whether to ping from or to dut
420    Returns:
421        chain_map: list of dut chains, one entry per attenuator port
422    """
423    # Set attenuator to 0 dB
424    for atten in attenuators:
425        atten.set_atten(0, strict=False, retry=True)
426    # Start ping traffic
427    dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
428    if ping_from_dut:
429        ping_future = get_ping_stats_nb(dut, ping_server._settings.hostname,
430                                        11, 0.02, 64)
431    else:
432        ping_future = get_ping_stats_nb(ping_server, dut_ip, 11, 0.02, 64)
433    # Measure starting RSSI
434    base_rssi = get_connected_rssi(dut, 4, 0.25, 1)
435    chain0_base_rssi = base_rssi['chain_0_rssi']['mean']
436    chain1_base_rssi = base_rssi['chain_1_rssi']['mean']
437    if chain0_base_rssi < -70 or chain1_base_rssi < -70:
438        logging.warning('RSSI might be too low to get reliable chain map.')
439    # Compile chain map by attenuating one path at a time and seeing which
440    # chain's RSSI degrades
441    chain_map = []
442    for test_atten in attenuators:
443        # Set one attenuator to 30 dB down
444        test_atten.set_atten(30, strict=False, retry=True)
445        # Get new RSSI
446        test_rssi = get_connected_rssi(dut, 4, 0.25, 1)
447        # Assign attenuator to path that has lower RSSI
448        if chain0_base_rssi > -70 and chain0_base_rssi - test_rssi[
449                'chain_0_rssi']['mean'] > 10:
450            chain_map.append('DUT-Chain-0')
451        elif chain1_base_rssi > -70 and chain1_base_rssi - test_rssi[
452                'chain_1_rssi']['mean'] > 10:
453            chain_map.append('DUT-Chain-1')
454        else:
455            chain_map.append(None)
456        # Reset attenuator to 0
457        test_atten.set_atten(0, strict=False, retry=True)
458    ping_future.result()
459    logging.debug('Chain Map: {}'.format(chain_map))
460    return chain_map
461
462
463def get_full_rf_connection_map(attenuators,
464                               dut,
465                               ping_server,
466                               networks,
467                               ping_from_dut=False):
468    """Function to detect per-network connections between attenuator and DUT.
469
470    This function detects the mapping between attenuator ports and DUT chains
471    on all networks in its arguments. The function connects the DUT to each
472    network then calls get_current_atten_dut_chain_map to get the connection
473    map on the current network. The function outputs the results in two formats
474    to enable easy access when users are interested in indexing by network or
475    attenuator port.
476
477    Args:
478        attenuators: list of attenuator ports
479        dut: android device object assumed connected to a wifi network.
480        ping_server: ssh connection object to ping server
481        networks: dict of network IDs and configs
482    Returns:
483        rf_map_by_network: dict of RF connections indexed by network.
484        rf_map_by_atten: list of RF connections indexed by attenuator
485    """
486    for atten in attenuators:
487        atten.set_atten(0, strict=False, retry=True)
488
489    rf_map_by_network = collections.OrderedDict()
490    rf_map_by_atten = [[] for atten in attenuators]
491    for net_id, net_config in networks.items():
492        wutils.reset_wifi(dut)
493        wutils.wifi_connect(dut,
494                            net_config,
495                            num_of_tries=1,
496                            assert_on_fail=False,
497                            check_connectivity=False)
498        rf_map_by_network[net_id] = get_current_atten_dut_chain_map(
499            attenuators, dut, ping_server, ping_from_dut)
500        for idx, chain in enumerate(rf_map_by_network[net_id]):
501            if chain:
502                rf_map_by_atten[idx].append({
503                    'network': net_id,
504                    'dut_chain': chain
505                })
506    logging.debug('RF Map (by Network): {}'.format(rf_map_by_network))
507    logging.debug('RF Map (by Atten): {}'.format(rf_map_by_atten))
508
509    return rf_map_by_network, rf_map_by_atten
510
511
512# Generic device utils
513def get_dut_temperature(dut):
514    """Function to get dut temperature.
515
516    The function fetches and returns the reading from the temperature sensor
517    used for skin temperature and thermal throttling.
518
519    Args:
520        dut: AndroidDevice of interest
521    Returns:
522        temperature: device temperature. 0 if temperature could not be read
523    """
524    candidate_zones = [
525        '/sys/devices/virtual/thermal/tz-by-name/skin-therm/temp',
526        '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-monitor/temp',
527        '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-adc/temp',
528        '/sys/devices/virtual/thermal/tz-by-name/back_therm/temp',
529        '/dev/thermal/tz-by-name/quiet_therm/temp'
530    ]
531    for zone in candidate_zones:
532        try:
533            temperature = int(dut.adb.shell('cat {}'.format(zone)))
534            break
535        except:
536            temperature = 0
537    if temperature == 0:
538        logging.debug('Could not check DUT temperature.')
539    elif temperature > 100:
540        temperature = temperature / 1000
541    return temperature
542
543
544def wait_for_dut_cooldown(dut, target_temp=50, timeout=300):
545    """Function to wait for a DUT to cool down.
546
547    Args:
548        dut: AndroidDevice of interest
549        target_temp: target cooldown temperature
550        timeout: maxt time to wait for cooldown
551    """
552    start_time = time.time()
553    while time.time() - start_time < timeout:
554        temperature = get_dut_temperature(dut)
555        if temperature < target_temp:
556            break
557        time.sleep(SHORT_SLEEP)
558    elapsed_time = time.time() - start_time
559    logging.debug('DUT Final Temperature: {}C. Cooldown duration: {}'.format(
560        temperature, elapsed_time))
561
562
563def health_check(dut, batt_thresh=5, temp_threshold=53, cooldown=1):
564    """Function to check health status of a DUT.
565
566    The function checks both battery levels and temperature to avoid DUT
567    powering off during the test.
568
569    Args:
570        dut: AndroidDevice of interest
571        batt_thresh: battery level threshold
572        temp_threshold: temperature threshold
573        cooldown: flag to wait for DUT to cool down when overheating
574    Returns:
575        health_check: boolean confirming device is healthy
576    """
577    health_check = True
578    battery_level = utils.get_battery_level(dut)
579    if battery_level < batt_thresh:
580        logging.warning('Battery level low ({}%)'.format(battery_level))
581        health_check = False
582    else:
583        logging.debug('Battery level = {}%'.format(battery_level))
584
585    temperature = get_dut_temperature(dut)
586    if temperature > temp_threshold:
587        if cooldown:
588            logging.warning(
589                'Waiting for DUT to cooldown. ({} C)'.format(temperature))
590            wait_for_dut_cooldown(dut, target_temp=temp_threshold - 5)
591        else:
592            logging.warning('DUT Overheating ({} C)'.format(temperature))
593            health_check = False
594    else:
595        logging.debug('DUT Temperature = {} C'.format(temperature))
596    return health_check
597
598
599# Wifi Device Utils
600def empty_rssi_result():
601    return collections.OrderedDict([('data', []), ('mean', float('nan')),
602                                    ('stdev', float('nan'))])
603
604
605@nonblocking
606def get_connected_rssi_nb(dut,
607                          num_measurements=1,
608                          polling_frequency=SHORT_SLEEP,
609                          first_measurement_delay=0,
610                          disconnect_warning=True,
611                          ignore_samples=0,
612                          interface='wlan0'):
613    return get_connected_rssi(dut, num_measurements, polling_frequency,
614                              first_measurement_delay, disconnect_warning,
615                              ignore_samples, interface)
616
617
618@detect_wifi_decorator
619def get_connected_rssi(dut,
620                       num_measurements=1,
621                       polling_frequency=SHORT_SLEEP,
622                       first_measurement_delay=0,
623                       disconnect_warning=True,
624                       ignore_samples=0,
625                       interface='wlan0'):
626    """Gets all RSSI values reported for the connected access point/BSSID.
627
628    Args:
629        dut: android device object from which to get RSSI
630        num_measurements: number of scans done, and RSSIs collected
631        polling_frequency: time to wait between RSSI measurements
632        disconnect_warning: boolean controlling disconnection logging messages
633        ignore_samples: number of leading samples to ignore
634    Returns:
635        connected_rssi: dict containing the measurements results for
636        all reported RSSI values (signal_poll, per chain, etc.) and their
637        statistics
638    """
639    pass
640
641
642@nonblocking
643def get_scan_rssi_nb(dut, tracked_bssids, num_measurements=1):
644    return get_scan_rssi(dut, tracked_bssids, num_measurements)
645
646
647@detect_wifi_decorator
648def get_scan_rssi(dut, tracked_bssids, num_measurements=1):
649    """Gets scan RSSI for specified BSSIDs.
650
651    Args:
652        dut: android device object from which to get RSSI
653        tracked_bssids: array of BSSIDs to gather RSSI data for
654        num_measurements: number of scans done, and RSSIs collected
655    Returns:
656        scan_rssi: dict containing the measurement results as well as the
657        statistics of the scan RSSI for all BSSIDs in tracked_bssids
658    """
659    pass
660
661
662@detect_wifi_decorator
663def get_sw_signature(dut):
664    """Function that checks the signature for wifi firmware and config files.
665
666    Returns:
667        bdf_signature: signature consisting of last three digits of bdf cksums
668        fw_signature: floating point firmware version, i.e., major.minor
669    """
670    pass
671
672
673@detect_wifi_decorator
674def get_country_code(dut):
675    """Function that returns the current wifi country code."""
676    pass
677
678
679@detect_wifi_decorator
680def push_config(dut, config_file):
681    """Function to push Wifi BDF files
682
683    This function checks for existing wifi bdf files and over writes them all,
684    for simplicity, with the bdf file provided in the arguments. The dut is
685    rebooted for the bdf file to take effect
686
687    Args:
688        dut: dut to push bdf file to
689        config_file: path to bdf_file to push
690    """
691    pass
692
693
694@detect_wifi_decorator
695def start_wifi_logging(dut):
696    """Function to start collecting wifi-related logs"""
697    pass
698
699
700@detect_wifi_decorator
701def stop_wifi_logging(dut):
702    """Function to start collecting wifi-related logs"""
703    pass
704
705
706@detect_wifi_decorator
707def push_firmware(dut, firmware_files):
708    """Function to push Wifi firmware files
709
710    Args:
711        dut: dut to push bdf file to
712        firmware_files: path to wlanmdsp.mbn file
713        datamsc_file: path to Data.msc file
714    """
715    pass
716
717
718@detect_wifi_decorator
719def disable_beamforming(dut):
720    """Function to disable beamforming."""
721    pass
722
723
724@detect_wifi_decorator
725def set_nss_capability(dut, nss):
726    """Function to set number of spatial streams supported."""
727    pass
728
729
730@detect_wifi_decorator
731def set_chain_mask(dut, chain_mask):
732    """Function to set DUT chain mask.
733
734    Args:
735        dut: android device
736        chain_mask: desired chain mask in [0, 1, '2x2']
737    """
738    pass
739
740
741# Link layer stats utilities
742class LinkLayerStats():
743    def __new__(self, dut, llstats_enabled=True):
744        if detect_wifi_platform(dut) == 'qcom':
745            return qcom_utils.LinkLayerStats(dut, llstats_enabled)
746        else:
747            return brcm_utils.LinkLayerStats(dut, llstats_enabled)
748