• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the 'License');
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an 'AS IS' BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import collections
18import importlib
19import ipaddress
20import logging
21import numpy
22import re
23import time
24from acts import asserts
25from acts import utils
26from acts.controllers.android_device import AndroidDevice
27from acts.controllers.utils_lib import ssh
28from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
29from acts_contrib.test_utils.wifi.wifi_performance_test_utils import ping_utils
30from acts_contrib.test_utils.wifi.wifi_performance_test_utils import qcom_utils
31from acts_contrib.test_utils.wifi.wifi_performance_test_utils import brcm_utils
32
33from concurrent.futures import ThreadPoolExecutor
34
# Sleep durations; SHORT_SLEEP is passed to time.sleep() in this module.
SHORT_SLEEP = 1
MED_SLEEP = 6
# 6 GHz channel labels: '6g1', '6g5', ..., '6g233' (every fourth number).
CHANNELS_6GHz = ['6g{}'.format(number) for number in range(1, 234, 4)]
# Channels grouped by band. 6 GHz entries are strings, all others are ints.
BAND_TO_CHANNEL_MAP = {
    '2.4GHz': list(range(1, 14)),
    'UNII-1': [36, 40, 44, 48],
    'UNII-2': [
        52, 56, 60, 64, 100, 104, 108, 112, 116, 120, 124, 128, 132, 140
    ],
    'UNII-3': [149, 153, 157, 161, 165],
    '6GHz': CHANNELS_6GHz,
}
# Reverse lookup from a channel to the name of the band that contains it.
CHANNEL_TO_BAND_MAP = dict(
    (channel, band)
    for band, channels in BAND_TO_CHANNEL_MAP.items()
    for channel in channels)
50
51
52# Decorators
def nonblocking(f):
    """Decorator that runs the decorated function on a background thread.

    Every call to the decorated function submits f to a fresh single-worker
    ThreadPoolExecutor and returns immediately, without waiting for f.

    Args:
        f: callable to run without blocking the caller
    Returns:
        wrap: function accepting the same arguments as f and returning a
        concurrent.futures.Future. Call .result() on the future to wait for
        the value returned by f (or to re-raise the exception f raised).
    """
    # Local import keeps this block independent of the file-level imports.
    import functools

    # functools.wraps preserves the name and docstring of f on the wrapper.
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        executor = ThreadPoolExecutor(max_workers=1)
        thread_future = executor.submit(f, *args, **kwargs)
        # Ensure resources are freed up when executor returns or raises
        executor.shutdown(wait=False)
        return thread_future

    return wrap
64
65
def detect_wifi_platform(dut):
    """Return the wifi platform of a DUT, caching the answer on the object.

    If dut already has a wifi_platform attribute it is returned unchanged.
    Otherwise the platform is 'qcom' when /vendor/firmware/wlan/ contains
    any files and 'brcm' when it does not, and the result is stored in
    dut.wifi_platform.

    Args:
        dut: device object providing get_file_names()
    Returns:
        wifi_platform: the cached or newly detected platform string
    """
    if not hasattr(dut, 'wifi_platform'):
        firmware_files = dut.get_file_names('/vendor/firmware/wlan/')
        dut.wifi_platform = 'qcom' if len(firmware_files) else 'brcm'
    return dut.wifi_platform
75
76
def detect_wifi_decorator(f):
    """Decorator dispatching a call to its platform-specific implementation.

    The wrapper locates the DUT among the call arguments, detects its wifi
    platform, imports the matching
    acts_contrib.test_utils.wifi.wifi_performance_test_utils.<platform>_utils
    module and calls the function in that module that has the same name as
    f. The body of f itself is never executed.

    Args:
        f: placeholder function whose name selects the implementation
    Returns:
        wrap: function forwarding its arguments to the platform function.
        If the platform module has no function of that name, the call is a
        no-op returning None.
    """
    # Local import keeps this block independent of the file-level imports.
    import functools

    @functools.wraps(f)
    def wrap(*args, **kwargs):
        if 'dut' in kwargs:
            dut = kwargs['dut']
        else:
            # First positional argument that is an AndroidDevice.
            dut = next(arg for arg in args if isinstance(arg, AndroidDevice))
        module_name = ('acts_contrib.test_utils.wifi.'
                       'wifi_performance_test_utils.{}_utils'.format(
                           detect_wifi_platform(dut)))
        dut_package = importlib.import_module(module_name)
        # The fallback must accept any arguments: it is invoked with the
        # caller's args, so a zero-argument lambda would raise TypeError.
        f_decorated = getattr(dut_package, f.__name__,
                              lambda *_args, **_kwargs: None)
        return f_decorated(*args, **kwargs)

    return wrap
91
92
93# JSON serializer
def serialize_dict(input_dict):
    """Function to serialize dicts to enable JSON output.

    Args:
        input_dict: dict whose keys and values are to be serialized
    Returns:
        output_dict: OrderedDict with the same entry order as input_dict, in
        which every key and value has been passed through _serialize_value
    """
    output_dict = collections.OrderedDict()
    for key, value in input_dict.items():
        output_dict[_serialize_value(key)] = _serialize_value(value)
    return output_dict


def _serialize_value(value):
    """Function to recursively serialize dict entries to enable JSON output.

    Args:
        value: object to convert
    Returns:
        A JSON-friendly equivalent of value: tuples become their str()
        representation, numpy scalars become Python bool/int/float, lists
        and numpy arrays become lists of serialized items, dicts are
        serialized recursively, and plain float/int/bool/str are returned
        unchanged. Anything else becomes "Non-serializable object".
    """
    if isinstance(value, tuple):
        return str(value)
    # Use the abstract numpy scalar types so that every width (int32,
    # uint8, float32, ...) is converted, not only int64 and float64.
    if isinstance(value, numpy.bool_):
        return bool(value)
    if isinstance(value, numpy.integer):
        return int(value)
    if isinstance(value, numpy.floating):
        return float(value)
    if isinstance(value, (list, numpy.ndarray)):
        return [_serialize_value(x) for x in value]
    if isinstance(value, dict):
        return serialize_dict(value)
    if type(value) in (float, int, bool, str):
        return value
    return "Non-serializable object"
120
121
def extract_sub_dict(full_dict, fields):
    """Return an OrderedDict holding only the requested fields of a dict.

    Args:
        full_dict: dict to copy entries from
        fields: iterable of keys to copy, in the desired output order
    Returns:
        sub_dict: OrderedDict mapping each field to full_dict[field]. A
        field missing from full_dict raises KeyError.
    """
    sub_dict = collections.OrderedDict()
    for field in fields:
        sub_dict[field] = full_dict[field]
    return sub_dict
126
def write_antenna_tune_code(dut, tune_code, readback_check=True):
    """Force an antenna tune code on the DUT through modem AT commands.

    The function enables modem test mode (AT+LMODETEST followed by
    AT+LRFFINALSTART, tried up to five times), writes the tune code command
    and optionally reads every tune code register back with AT+MIPIREAD to
    confirm that it holds the expected value.

    Args:
        dut: device object whose adb.shell() is used to run modem_cmd
        tune_code: dict with key 'tune_code_cmd' (AT command to write) and,
            when readback_check is set, 'tune_code_registers' (dict mapping
            a register to its expected value string)
        readback_check: boolean controlling the register read-back check
    Returns:
        True if the tune code was written and, when requested, all register
        values matched.
    Raises:
        RuntimeError: if modem test mode could not be enabled, or if any
            register value read back does not match the expectation
    """
    test_mode_enabled = False
    for n_enable in range(5):
        logging.debug('{}-th Enabling modem test mode'.format(n_enable))
        try:
            at_lmodetest_output = dut.adb.shell("modem_cmd raw AT+LMODETEST")
            time.sleep(SHORT_SLEEP)
            logging.debug('Command AT+LMODETEST output: {}'.format(
                at_lmodetest_output))
        except Exception:
            logging.error(
                '{}-th Failed modem test mode AT+LMODETEST'.format(n_enable))
            continue
        try:
            at_lrffinalstart_output = dut.adb.shell(
                "modem_cmd raw AT+LRFFINALSTART")
            time.sleep(SHORT_SLEEP)
            logging.debug('Command AT+LRFFINALSTART output: {}'.format(
                at_lrffinalstart_output))
            if ("+LRFFINALSTART:0" in at_lrffinalstart_output
                    and "ERROR" not in at_lrffinalstart_output):
                test_mode_enabled = True
                logging.info('Enable modem test mode SUCCESSFUL')
                break
        except Exception:
            logging.error(
                '{}-th Failed modem test mode AT+LRFFINALSTART'.format(
                    n_enable))
            continue
    if not test_mode_enabled:
        raise RuntimeError("AT Command File Not set up")
    dut.adb.shell("modem_cmd raw " + tune_code['tune_code_cmd'])
    logging.debug('Write Tune Code: {}'.format("modem_cmd raw " +
                                               tune_code['tune_code_cmd']))
    if not readback_check:
        logging.info('Tune code validation skipped.')
        return True
    # Check tune code register values
    all_registers_correct = True
    for register, expected_value in tune_code['tune_code_registers'].items():
        try:
            at_tc_reg_output = dut.adb.shell(
                "modem_cmd raw AT+MIPIREAD={}".format(register))
            time.sleep(SHORT_SLEEP)
        except Exception:
            # A failed read counts as a mismatch. Never reuse the output of
            # a previous register, which could produce a false match.
            logging.warning(
                'Failed to read tune code register {}'.format(register))
            at_tc_reg_output = ''
        if "+MIPIREAD:" + expected_value.lower() in at_tc_reg_output:
            logging.info(
                'Forced tune code register {} value matches expectation: {}'.
                format(register, expected_value))
        else:
            logging.warning('Expected tune code register {} value: {}'.format(
                register, expected_value))
            logging.warning('tune code register value is set to {}'.format(
                at_tc_reg_output))
            all_registers_correct = False
    if not all_registers_correct:
        raise RuntimeError(
            "Enable modem test mode SUCCESSFUL, but register values NOT correct"
        )
    return True
176
177# Miscellaneous Wifi Utilities
def check_skip_conditions(testcase_params, dut, access_point,
                          ota_chamber=None):
    """Skips the current test when the setup cannot support it.

    asserts.skip is called when the DUT fails the health check, when the
    access point or OTA chamber does not support the requested channel, or
    when 'iw list' on the DUT shows no support for the requested 160 MHz
    bandwidth or 6 GHz channel. Wifi is turned on first if it is off.

    Args:
        testcase_params: dict of test parameters; 'channel' is required and
            'bandwidth' is optional
        dut: android device under test
        access_point: access point object providing band_lookup_by_channel
        ota_chamber: optional chamber object exposing SUPPORTED_BANDS
    """
    # Check battery level before test
    if not health_check(dut, 10):
        asserts.skip('DUT battery level too low.')
    requested_channel = testcase_params['channel']
    if not access_point.band_lookup_by_channel(requested_channel):
        asserts.skip('AP does not support requested channel.')
    if ota_chamber:
        requested_band = CHANNEL_TO_BAND_MAP[testcase_params['channel']]
        if requested_band not in ota_chamber.SUPPORTED_BANDS:
            asserts.skip('OTA chamber does not support requested channel.')
    # Check if 6GHz is supported by checking capabilities in the US.
    if not dut.droid.wifiCheckState():
        wutils.wifi_toggle_state(dut, True)
    iw_output = dut.adb.shell('iw list')
    dut_has_6ghz = '6135 MHz' in iw_output
    dut_has_160mhz = 'Supported Channel Width: 160 MHz' in iw_output
    wants_160mhz = testcase_params.get('bandwidth', 20) == 160
    if wants_160mhz and not dut_has_160mhz:
        asserts.skip('DUT does not support 160 MHz networks.')
    wants_6ghz = testcase_params.get('channel', 6) in CHANNELS_6GHz
    if wants_6ghz and not dut_has_6ghz:
        asserts.skip('DUT does not support 6 GHz band.')
200
201
def validate_network(dut, ssid):
    """Check that DUT has a valid internet connection through expected SSID

    Args:
        dut: android device of interest
        ssid: expected ssid
    Returns:
        True if the connection check succeeds and the DUT reports being
        connected to ssid; False otherwise, including when either query
        raises an exception.
    """
    try:
        connected = wutils.validate_connection(dut, wait_time=3) is not None
        current_network = dut.droid.wifiGetConnectionInfo()
    except Exception:
        # Any failure to query the device is treated as "not connected".
        return False
    return connected and current_network['SSID'] == ssid
219
220
def get_server_address(ssh_connection, dut_ip, subnet_mask):
    """Get server address on a specific subnet,

    This function retrieves the LAN or WAN IP of a remote machine used in
    testing. If subnet_mask is set to 'public' it returns a machines global ip,
    else it returns the ip belonging to the dut local network given the dut's
    ip and subnet mask.

    Args:
        ssh_connection: object representing server for which we want an ip
        dut_ip: string in ip address format, i.e., xxx.xxx.xxx.xxx
        subnet_mask: string representing subnet mask (public for global ip)
    Returns:
        The first matching IPv4 address found in the server's ifconfig
        output, as a string, or None (after logging an error) if no address
        matches.
    """
    ifconfig_out = ssh_connection.run('ifconfig').stdout
    # Raw string with escaped dots: only dotted-quad addresses are captured,
    # so no malformed token can reach ipaddress.ip_address().
    ip_list = re.findall(r'inet (?:addr:)?(\d+\.\d+\.\d+\.\d+)', ifconfig_out)
    ip_list = [ipaddress.ip_address(ip) for ip in ip_list]

    if subnet_mask == 'public':
        for ip in ip_list:
            # is_global is not used to allow for CGNAT ips in 100.x.y.z range
            if not ip.is_private:
                return str(ip)
    else:
        dut_network = ipaddress.ip_network('{}/{}'.format(dut_ip, subnet_mask),
                                           strict=False)
        for ip in ip_list:
            if ip in dut_network:
                return str(ip)
    logging.error('No IP address found in requested subnet')
    return None
250
251
252# Ping utilities
def get_ping_stats(src_device, dest_address, ping_duration, ping_interval,
                   ping_size):
    """Run ping to or from the DUT.

    The function runs ping on src_device, which is either the DUT (ping is
    run through adb) or a remote machine (ping is run over ssh), and parses
    the output.

    Args:
        src_device: object representing device to ping from
        dest_address: ip address to ping
        ping_duration: timeout to set on the ping process (in seconds)
        ping_interval: time between pings (in seconds)
        ping_size: size of ping packet payload
    Returns:
        ping_result: dict containing ping results and other meta data
    Raises:
        TypeError: if src_device is neither an AndroidDevice nor an
            SshConnection
        RuntimeError: if the ssh host reports an operating system other
            than Linux or Darwin
    """
    ping_count = int(ping_duration / ping_interval)
    ping_deadline = int(ping_count * ping_interval) + 1
    ping_cmd_linux = 'ping -c {} -w {} -i {} -s {} -D'.format(
        ping_count,
        ping_deadline,
        ping_interval,
        ping_size,
    )

    ping_cmd_macos = 'ping -c {} -t {} -i {} -s {}'.format(
        ping_count,
        ping_deadline,
        ping_interval,
        ping_size,
    )

    if isinstance(src_device, AndroidDevice):
        ping_cmd = '{} {}'.format(ping_cmd_linux, dest_address)
        ping_output = src_device.adb.shell(ping_cmd,
                                           timeout=ping_deadline + SHORT_SLEEP,
                                           ignore_status=True)
    elif isinstance(src_device, ssh.connection.SshConnection):
        platform = src_device.run('uname').stdout
        if 'linux' in platform.lower():
            ping_cmd = 'sudo {} {}'.format(ping_cmd_linux, dest_address)
        elif 'darwin' in platform.lower():
            # The macOS command omits -D; each output line is prefixed with
            # a gdate timestamp in the shell pipeline instead.
            ping_cmd = "sudo {} {}| while IFS= read -r line; do printf '[%s] %s\n' \"$(gdate '+%s.%N')\" \"$line\"; done".format(
                ping_cmd_macos, dest_address)
        else:
            # Fail with a clear message instead of an unbound ping_cmd.
            raise RuntimeError(
                'Unable to ping from unsupported platform: {}'.format(
                    platform.strip()))
        ping_output = src_device.run(ping_cmd,
                                     timeout=ping_deadline + SHORT_SLEEP,
                                     ignore_status=True).stdout
    else:
        raise TypeError('Unable to ping using src_device of type %s.' %
                        type(src_device))
    return ping_utils.PingResult(ping_output.splitlines())
304
305
@nonblocking
def get_ping_stats_nb(src_device, dest_address, ping_duration, ping_interval,
                      ping_size):
    """Non-blocking version of get_ping_stats.

    Takes the same arguments as get_ping_stats. Because of the nonblocking
    decorator, the call returns a future whose result() is the value
    returned by get_ping_stats.
    """
    return get_ping_stats(src_device=src_device,
                          dest_address=dest_address,
                          ping_duration=ping_duration,
                          ping_interval=ping_interval,
                          ping_size=ping_size)
311
312
313# Iperf utilities
@nonblocking
def start_iperf_client_nb(iperf_client, iperf_server_address, iperf_args, tag,
                          timeout):
    """Starts an iperf client without blocking the caller.

    The arguments are forwarded unchanged to iperf_client.start(). Because
    of the nonblocking decorator, the call returns a future whose result()
    is the value returned by iperf_client.start().

    Args:
        iperf_client: object providing start()
        iperf_server_address: first argument forwarded to start()
        iperf_args: second argument forwarded to start()
        tag: third argument forwarded to start()
        timeout: fourth argument forwarded to start()
    """
    return iperf_client.start(iperf_server_address, iperf_args, tag, timeout)
318
319
def get_iperf_arg_string(duration,
                         reverse_direction,
                         interval=1,
                         traffic_type='TCP',
                         socket_size=None,
                         num_processes=1,
                         udp_throughput='1000M',
                         ipv6=False,
                         udp_length=1448):
    """Function to format iperf client arguments.

    This function takes in iperf client parameters and returns a properly
    formatted iperf arg string to be used in throughput tests.

    Args:
        duration: iperf duration in seconds
        reverse_direction: boolean controlling the -R flag for iperf clients
        interval: iperf print interval
        traffic_type: string specifying TCP or UDP traffic (case-insensitive)
        socket_size: string specifying TCP window or socket buffer, e.g., 2M
        num_processes: int specifying number of iperf processes
        udp_throughput: string specifying TX throughput in UDP tests, e.g. 100M
        ipv6: boolean controlling the use of IP V6
        udp_length: value passed with the -l flag in UDP tests
    Returns:
        iperf_args: string of formatted iperf args
    """
    arg_parts = ['-i {} -t {} -J '.format(interval, duration)]
    if ipv6:
        arg_parts.append('-6 ')
    protocol = traffic_type.upper()
    if protocol == 'UDP':
        arg_parts.append('-u -b {} -l {} -P {} '.format(
            udp_throughput, udp_length, num_processes))
    elif protocol == 'TCP':
        arg_parts.append('-P {} '.format(num_processes))
    if socket_size:
        arg_parts.append('-w {} '.format(socket_size))
    if reverse_direction:
        arg_parts.append(' -R')
    return ''.join(arg_parts)
359
360
361# Attenuator Utilities
def atten_by_label(atten_list, path_label, atten_level):
    """Set attenuation on every attenuator whose path contains a label.

    Args:
        atten_list: list of attenuators to iterate over
        path_label: path label on which to set desired attenuation
        atten_level: attenuation desired on path
    """
    matching_attens = (atten for atten in atten_list
                       if path_label in atten.path)
    for atten in matching_attens:
        atten.set_atten(atten_level, retry=True)
373
374
def get_atten_for_target_rssi(target_rssi, attenuators, dut, ping_server):
    """Function to estimate attenuation to hit a target RSSI.

    This function estimates a constant attenuation setting on all attenuation
    ports to hit a target RSSI. The estimate is not meant to be exact or
    guaranteed. The search runs for at most 20 iterations, moves the
    attenuation by at most 20 dB per iteration, and stops early once two
    consecutive measurements are within 1 dB of the target.

    Args:
        target_rssi: rssi of interest
        attenuators: list of attenuator ports
        dut: android device object assumed connected to a wifi network.
        ping_server: ssh connection object to ping server
    Returns:
        target_atten: attenuation setting to achieve target_rssi. 0 is
        returned if the search goes below 0, and the maximum attenuation of
        the first attenuator if the search goes above it.
    """

    def measure_mean_rssi():
        """Returns the mean signal poll RSSI measured while ping runs."""
        ping_future = get_ping_stats_nb(src_device=ping_server,
                                        dest_address=dut_ip,
                                        ping_duration=1.5,
                                        ping_interval=0.02,
                                        ping_size=64)
        rssi_result = get_connected_rssi(dut,
                                         num_measurements=4,
                                         polling_frequency=0.25,
                                         first_measurement_delay=0.5,
                                         disconnect_warning=1,
                                         ignore_samples=1)
        mean_rssi = rssi_result['signal_poll_rssi']['mean']
        # Wait for the ping run to finish before returning.
        ping_future.result()
        return mean_rssi

    logging.info('Searching attenuation for RSSI = {}dB'.format(target_rssi))
    # Set attenuator to 0 dB
    for atten in attenuators:
        atten.set_atten(0, strict=False, retry=True)
    dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
    # Measure starting RSSI
    current_rssi = measure_mean_rssi()
    target_atten = 0
    logging.debug('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format(
        target_atten, current_rssi))
    within_range = False
    for idx in range(20):
        # Limit each step to +/-20 dB and round down to a 0.25 dB grid.
        atten_delta = max(min(current_rssi - target_rssi, 20), -20)
        target_atten = int((target_atten + atten_delta) * 4) / 4
        if target_atten < 0:
            return 0
        if target_atten > attenuators[0].get_max_atten():
            return attenuators[0].get_max_atten()
        for atten in attenuators:
            atten.set_atten(target_atten, strict=False, retry=True)
        current_rssi = measure_mean_rssi()
        logging.info('RSSI @ {0:.2f}dB attenuation = {1:.2f}'.format(
            target_atten, current_rssi))
        close_enough = abs(current_rssi - target_rssi) < 1
        if close_enough and within_range:
            logging.info(
                'Reached RSSI: {0:.2f}. Target RSSI: {1:.2f}.'
                'Attenuation: {2:.2f}, Iterations = {3:.2f}'.format(
                    current_rssi, target_rssi, target_atten, idx))
            return target_atten
        # Remember whether this measurement was on target for the next pass.
        within_range = close_enough
    return target_atten
450
451
def get_current_atten_dut_chain_map(attenuators,
                                    dut,
                                    ping_server,
                                    ping_from_dut=False):
    """Function to detect mapping between attenuator ports and DUT chains.

    This function detects the mapping between attenuator ports and DUT chains
    in cases where DUT chains are connected to only one attenuator port. The
    function assumes the DUT is already connected to a wifi network. It
    measures per chain RSSI at 0 attenuation, then sets one port at a time
    to 30 dB and looks for the chain whose RSSI drops by more than 10 dB.

    Args:
        attenuators: list of attenuator ports
        dut: android device object assumed connected to a wifi network.
        ping_server: ssh connection object to ping server
        ping_from_dut: boolean controlling whether to ping from or to dut
    Returns:
        chain_map: list of dut chains, one entry per attenuator port. An
        entry is 'DUT-Chain-0', 'DUT-Chain-1', or None when no chain was
        detected for the port.
    """
    # Set attenuator to 0 dB
    for port in attenuators:
        port.set_atten(0, strict=False, retry=True)
    # Start ping traffic
    dut_ip = dut.droid.connectivityGetIPv4Addresses('wlan0')[0]
    if ping_from_dut:
        ping_source, ping_target = dut, ping_server._settings.hostname
    else:
        ping_source, ping_target = ping_server, dut_ip
    ping_future = get_ping_stats_nb(ping_source, ping_target, 11, 0.02, 64)
    # Measure starting RSSI
    baseline = get_connected_rssi(dut, 4, 0.25, 1)
    chain0_baseline = baseline['chain_0_rssi']['mean']
    chain1_baseline = baseline['chain_1_rssi']['mean']
    if chain0_baseline < -70 or chain1_baseline < -70:
        logging.warning('RSSI might be too low to get reliable chain map.')
    # Compile chain map by attenuating one path at a time and seeing which
    # chain's RSSI degrades
    chain_map = []
    for port in attenuators:
        # Set one attenuator to 30 dB down
        port.set_atten(30, strict=False, retry=True)
        # Get new RSSI
        attenuated = get_connected_rssi(dut, 4, 0.25, 1)
        # Assign attenuator to path that has lower RSSI
        detected_chain = None
        if chain0_baseline > -70 and chain0_baseline - attenuated[
                'chain_0_rssi']['mean'] > 10:
            detected_chain = 'DUT-Chain-0'
        elif chain1_baseline > -70 and chain1_baseline - attenuated[
                'chain_1_rssi']['mean'] > 10:
            detected_chain = 'DUT-Chain-1'
        chain_map.append(detected_chain)
        # Reset attenuator to 0
        port.set_atten(0, strict=False, retry=True)
    ping_future.result()
    logging.debug('Chain Map: {}'.format(chain_map))
    return chain_map
511
512
def get_full_rf_connection_map(attenuators,
                               dut,
                               ping_server,
                               networks,
                               ping_from_dut=False):
    """Function to detect per-network connections between attenuator and DUT.

    This function detects the mapping between attenuator ports and DUT chains
    on all networks in its arguments. The function connects the DUT to each
    network then calls get_current_atten_dut_chain_map to get the connection
    map on the current network. The function outputs the results in two formats
    to enable easy access when users are interested in indexing by network or
    attenuator port.

    Args:
        attenuators: list of attenuator ports
        dut: android device object assumed connected to a wifi network.
        ping_server: ssh connection object to ping server
        networks: dict of network IDs and configs
        ping_from_dut: boolean controlling whether to ping from or to dut
    Returns:
        rf_map_by_network: dict of RF connections indexed by network.
        rf_map_by_atten: list of RF connections indexed by attenuator
    """
    for port in attenuators:
        port.set_atten(0, strict=False, retry=True)

    rf_map_by_network = collections.OrderedDict()
    rf_map_by_atten = [[] for _ in attenuators]
    for net_id, net_config in networks.items():
        wutils.reset_wifi(dut)
        wutils.wifi_connect(dut,
                            net_config,
                            num_of_tries=1,
                            assert_on_fail=False,
                            check_connectivity=False)
        chain_map = get_current_atten_dut_chain_map(attenuators, dut,
                                                    ping_server,
                                                    ping_from_dut)
        rf_map_by_network[net_id] = chain_map
        for idx, chain in enumerate(chain_map):
            # Ports with no detected chain are not recorded.
            if not chain:
                continue
            rf_map_by_atten[idx].append({
                'network': net_id,
                'dut_chain': chain
            })
    logging.debug('RF Map (by Network): {}'.format(rf_map_by_network))
    logging.debug('RF Map (by Atten): {}'.format(rf_map_by_atten))

    return rf_map_by_network, rf_map_by_atten
560
561
562# Generic device utils
def get_dut_temperature(dut):
    """Function to get dut temperature.

    The function fetches and returns the reading from the temperature sensor
    used for skin temperature and thermal throttling. Candidate thermal zone
    files are tried in order and the first one that yields an integer is
    used.

    Args:
        dut: AndroidDevice of interest
    Returns:
        temperature: device temperature. 0 if temperature could not be read
    """
    candidate_zones = [
        '/sys/devices/virtual/thermal/tz-by-name/skin-therm/temp',
        '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-monitor/temp',
        '/sys/devices/virtual/thermal/tz-by-name/sdm-therm-adc/temp',
        '/sys/devices/virtual/thermal/tz-by-name/back_therm/temp',
        '/dev/thermal/tz-by-name/quiet_therm/temp'
    ]
    temperature = 0
    for zone in candidate_zones:
        try:
            temperature = int(dut.adb.shell('cat {}'.format(zone)))
        except Exception:
            # Zone missing or unreadable on this device; try the next one.
            continue
        else:
            break
    if temperature == 0:
        logging.debug('Could not check DUT temperature.')
    elif temperature > 100:
        # Readings above 100 are scaled down by 1000 (presumably reported in
        # millidegrees).
        temperature = temperature / 1000
    return temperature
592
593
def wait_for_dut_cooldown(dut, target_temp=50, timeout=300):
    """Function to wait for a DUT to cool down.

    The temperature is polled every SHORT_SLEEP seconds until it drops below
    target_temp or the timeout expires. The final temperature and the time
    spent waiting are logged.

    Args:
        dut: AndroidDevice of interest
        target_temp: target cooldown temperature
        timeout: max time to wait for cooldown
    """
    start_time = time.time()
    temperature = None
    while time.time() - start_time < timeout:
        temperature = get_dut_temperature(dut)
        if temperature < target_temp:
            break
        time.sleep(SHORT_SLEEP)
    if temperature is None:
        # The loop never ran (timeout <= 0). Take a single reading so the
        # log below has a value instead of failing on an unbound name.
        temperature = get_dut_temperature(dut)
    elapsed_time = time.time() - start_time
    logging.debug('DUT Final Temperature: {}C. Cooldown duration: {}'.format(
        temperature, elapsed_time))
611
612
def health_check(dut, batt_thresh=5, temp_threshold=53, cooldown=1):
    """Function to check health status of a DUT.

    The function checks both battery levels and temperature to avoid DUT
    powering off during the test. When the DUT is above temp_threshold and
    cooldown is set, the function waits for the DUT to cool down to 5
    degrees below the threshold instead of reporting it as unhealthy.

    Args:
        dut: AndroidDevice of interest
        batt_thresh: battery level threshold
        temp_threshold: temperature threshold
        cooldown: flag to wait for DUT to cool down when overheating
    Returns:
        health_check: boolean confirming device is healthy
    """
    is_healthy = True

    battery_level = utils.get_battery_level(dut)
    if battery_level < batt_thresh:
        logging.warning('Battery level low ({}%)'.format(battery_level))
        is_healthy = False
    else:
        logging.debug('Battery level = {}%'.format(battery_level))

    temperature = get_dut_temperature(dut)
    overheating = temperature > temp_threshold
    if overheating and cooldown:
        logging.warning(
            'Waiting for DUT to cooldown. ({} C)'.format(temperature))
        wait_for_dut_cooldown(dut, target_temp=temp_threshold - 5)
    elif overheating:
        logging.warning('DUT Overheating ({} C)'.format(temperature))
        is_healthy = False
    else:
        logging.debug('DUT Temperature = {} C'.format(temperature))
    return is_healthy
647
648
649# Wifi Device Utils
def empty_rssi_result():
    """Return a fresh, empty RSSI result.

    Returns:
        OrderedDict with keys 'data' (new empty list), 'mean' (NaN) and
        'stdev' (NaN), in that order
    """
    rssi_result = collections.OrderedDict()
    rssi_result['data'] = []
    rssi_result['mean'] = float('nan')
    rssi_result['stdev'] = float('nan')
    return rssi_result
653
654# Phone fold status
def check_fold_status(dut):
    """Return the fold status of a phone.

    The status is read with the 'sensor_test sample -s 65547.0 -n1' shell
    command (2 second timeout).

    Args:
        dut: device object whose adb.shell() is used to sample the sensor
    Returns:
        'folded' if the output contains 'Data: 1.000000', 'unfolded' if it
        contains 'Data: 0.000000', and 'NA' if the command fails or the
        output contains neither.
    """
    try:
        fold_status_str = dut.adb.shell('sensor_test sample -s 65547.0 -n1',
                                        timeout=2)
    except Exception:
        # Sensor query failed or is unavailable; status is unknown.
        return 'NA'
    if 'Data: 1.000000' in fold_status_str:
        return 'folded'
    if 'Data: 0.000000' in fold_status_str:
        return 'unfolded'
    return 'NA'
668
@nonblocking
def get_connected_rssi_nb(dut,
                          num_measurements=1,
                          polling_frequency=SHORT_SLEEP,
                          first_measurement_delay=0,
                          disconnect_warning=True,
                          ignore_samples=0,
                          interface='wlan0'):
    """Non-blocking version of get_connected_rssi.

    Takes the same arguments as get_connected_rssi and forwards them
    positionally. Because of the nonblocking decorator, the call returns a
    future whose result() is the value returned by get_connected_rssi.
    """
    return get_connected_rssi(dut, num_measurements, polling_frequency,
                              first_measurement_delay, disconnect_warning,
                              ignore_samples, interface)
680
681
@detect_wifi_decorator
def get_connected_rssi(dut,
                       num_measurements=1,
                       polling_frequency=SHORT_SLEEP,
                       first_measurement_delay=0,
                       disconnect_warning=True,
                       ignore_samples=0,
                       interface='wlan0'):
    """Gets all RSSI values reported for the connected access point/BSSID.

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: android device object from which to get RSSI
        num_measurements: number of scans done, and RSSIs collected
        polling_frequency: time to wait between RSSI measurements
        first_measurement_delay: presumably the delay before the first
            measurement; confirm against the platform implementations
        disconnect_warning: boolean controlling disconnection logging messages
        ignore_samples: number of leading samples to ignore
        interface: presumably the wifi interface to query; confirm against
            the platform implementations
    Returns:
        connected_rssi: dict containing the measurements results for
        all reported RSSI values (signal_poll, per chain, etc.) and their
        statistics
    """
    pass
704
705
@nonblocking
def get_scan_rssi_nb(dut, tracked_bssids, num_measurements=1):
    """Non-blocking version of get_scan_rssi.

    Takes the same arguments as get_scan_rssi. Because of the nonblocking
    decorator, the call returns a future whose result() is the value
    returned by get_scan_rssi.
    """
    return get_scan_rssi(dut, tracked_bssids, num_measurements)
709
710
@detect_wifi_decorator
def get_scan_rssi(dut, tracked_bssids, num_measurements=1):
    """Gets scan RSSI for specified BSSIDs.

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: android device object from which to get RSSI
        tracked_bssids: array of BSSIDs to gather RSSI data for
        num_measurements: number of scans done, and RSSIs collected
    Returns:
        scan_rssi: dict containing the measurement results as well as the
        statistics of the scan RSSI for all BSSIDs in tracked_bssids
    """
    pass
724
725
@detect_wifi_decorator
def get_sw_signature(dut):
    """Function that checks the signature for wifi firmware and config files.

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: android device object to check
    Returns:
        bdf_signature: signature consisting of last three digits of bdf cksums
        fw_signature: floating point firmware version, i.e., major.minor
    """
    pass
735
736
@detect_wifi_decorator
def get_country_code(dut):
    """Function that returns the current wifi country code.

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: android device object to query
    """
    pass
741
742
@detect_wifi_decorator
def push_config(dut, config_file):
    """Function to push Wifi BDF files

    This function checks for existing wifi bdf files and overwrites them all,
    for simplicity, with the bdf file provided in the arguments. The dut is
    rebooted for the bdf file to take effect.

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: dut to push bdf file to
        config_file: path to bdf_file to push
    """
    pass
756
757
@detect_wifi_decorator
def start_wifi_logging(dut):
    """Function to start collecting wifi-related logs

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: android device object to start logging on
    """
    pass
762
763
@detect_wifi_decorator
def stop_wifi_logging(dut):
    """Function to stop collecting wifi-related logs

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: android device object to stop logging on
    """
    pass
768
769
@detect_wifi_decorator
def push_firmware(dut, firmware_files):
    """Function to push Wifi firmware files

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: dut to push firmware files to
        firmware_files: firmware file path(s) to push. NOTE(review): the
            previous docstring described this as the path to the
            wlanmdsp.mbn file and also listed a datamsc_file (Data.msc)
            argument that is not in the signature; confirm the expected
            format against the platform implementations.
    """
    pass
780
781
@detect_wifi_decorator
def disable_beamforming(dut):
    """Function to disable beamforming.

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: android device object to configure
    """
    pass
786
787
@detect_wifi_decorator
def set_nss_capability(dut, nss):
    """Function to set number of spatial streams supported.

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: android device object to configure
        nss: number of spatial streams to set
    """
    pass
792
793
@detect_wifi_decorator
def set_chain_mask(dut, chain_mask):
    """Function to set DUT chain mask.

    The body of this function is a placeholder: detect_wifi_decorator
    forwards the call to the function of the same name in the utils module
    of the DUT's wifi platform.

    Args:
        dut: android device
        chain_mask: desired chain mask in [0, 1, '2x2']
    """
    pass
803
804
805# Link layer stats utilities
class LinkLayerStats:
    """Factory returning the platform-specific LinkLayerStats object.

    Instantiating this class never yields an instance of it: __new__
    returns qcom_utils.LinkLayerStats for 'qcom' DUTs and
    brcm_utils.LinkLayerStats for all other DUTs.
    """

    def __new__(cls, dut, llstats_enabled=True):
        """Builds the LinkLayerStats object matching the DUT wifi platform.

        Args:
            dut: android device whose wifi platform selects the class
            llstats_enabled: forwarded to the platform-specific constructor
        """
        is_qcom = detect_wifi_platform(dut) == 'qcom'
        platform_utils = qcom_utils if is_qcom else brcm_utils
        return platform_utils.LinkLayerStats(dut, llstats_enabled)
813