#!/usr/bin/env python3.4 # # Copyright 2020 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import collections import itertools import logging import os from acts import asserts from acts import base_test from acts import utils from acts.controllers import iperf_server as ipf from acts.controllers import iperf_client as ipc from acts.metrics.loggers.blackbox import BlackboxMappedMetricLogger from acts.test_decorators import test_tracker_info from acts_contrib.test_utils.wifi import ota_sniffer from acts_contrib.test_utils.wifi import wifi_retail_ap as retail_ap from acts_contrib.test_utils.wifi import wifi_test_utils as wutils from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils from functools import partial from WifiRvrTest import WifiRvrTest AccessPointTuple = collections.namedtuple(('AccessPointTuple'), ['ap_settings']) class WifiTdlsRvrTest(WifiRvrTest): def __init__(self, controllers): base_test.BaseTestClass.__init__(self, controllers) self.testcase_metric_logger = ( BlackboxMappedMetricLogger.for_test_case()) self.testclass_metric_logger = ( BlackboxMappedMetricLogger.for_test_class()) self.publish_testcase_metrics = True def setup_class(self): """Initializes common test hardware and parameters. This function initializes hardwares and compiles parameters that are common to all tests in this class. """ req_params = [ 'tdls_rvr_test_params', 'testbed_params', 'RetailAccessPoints' ] opt_params = ['ap_networks', 'OTASniffer'] self.unpack_userparams(req_params, opt_params) self.access_point = retail_ap.create(self.RetailAccessPoints)[0] self.testclass_params = self.tdls_rvr_test_params self.num_atten = self.attenuators[0].instrument.num_atten self.iperf_server = ipf.create([{ 'AndroidDevice': self.android_devices[0].serial, 'port': '5201' }])[0] self.iperf_client = ipc.create([{ 'AndroidDevice': self.android_devices[1].serial, 'port': '5201' }])[0] self.log_path = os.path.join(logging.log_path, 'results') if hasattr(self, 'OTASniffer') and self.testbed_params['sniffer_enable']: self.sniffer = ota_sniffer.create(self.OTASniffer)[0] os.makedirs(self.log_path, exist_ok=True) if not hasattr(self, 'golden_files_list'): if 'golden_results_path' in self.testbed_params: self.golden_files_list = [ os.path.join(self.testbed_params['golden_results_path'], file) for file in os.listdir(self.testbed_params['golden_results_path']) ] else: self.log.warning('No golden files found.') self.golden_files_list = [] self.testclass_results = [] # Turn WiFi ON if self.testclass_params.get('airplane_mode', 1): self.log.info('Turning on airplane mode.') for ad in self.android_devices: asserts.assert_true(utils.force_airplane_mode(ad, True), "Can not turn on airplane mode.") for ad in self.android_devices: wutils.wifi_toggle_state(ad, True) def teardown_class(self): # Turn WiFi OFF for dev in self.android_devices: wutils.wifi_toggle_state(dev, False) self.process_testclass_results() # Teardown AP and release its lockfile self.access_point.teardown() def setup_test(self): for ad in self.android_devices: wputils.start_wifi_logging(ad) def teardown_test(self): self.iperf_server.stop() for ad in self.android_devices: wutils.reset_wifi(ad) wputils.stop_wifi_logging(ad) def on_exception(self, test_name, begin_time): for ad in self.android_devices: ad.take_bug_report(test_name, begin_time) ad.cat_adb_log(test_name, begin_time) wutils.get_ssrdumps(ad) def compute_test_metrics(self, rvr_result): #Set test metrics rvr_result['metrics'] = {} rvr_result['metrics']['peak_tput'] = max( rvr_result['throughput_receive']) if self.publish_testcase_metrics: self.testcase_metric_logger.add_metric( 'peak_tput', rvr_result['metrics']['peak_tput']) test_mode = rvr_result['testcase_params']['mode'] tput_below_limit = [ tput < self.testclass_params['tput_metric_targets'][test_mode]['high'] for tput in rvr_result['throughput_receive'] ] rvr_result['metrics']['high_tput_range'] = -1 for idx in range(len(tput_below_limit)): if all(tput_below_limit[idx:]): if idx == 0: #Throughput was never above limit rvr_result['metrics']['high_tput_range'] = -1 else: rvr_result['metrics']['high_tput_range'] = rvr_result[ 'total_attenuation'][max(idx, 1) - 1] break if self.publish_testcase_metrics: self.testcase_metric_logger.add_metric( 'high_tput_range', rvr_result['metrics']['high_tput_range']) tput_below_limit = [ tput < self.testclass_params['tput_metric_targets'][test_mode]['low'] for tput in rvr_result['throughput_receive'] ] for idx in range(len(tput_below_limit)): if all(tput_below_limit[idx:]): rvr_result['metrics']['low_tput_range'] = rvr_result[ 'total_attenuation'][max(idx, 1) - 1] break else: rvr_result['metrics']['low_tput_range'] = -1 if self.publish_testcase_metrics: self.testcase_metric_logger.add_metric( 'low_tput_range', rvr_result['metrics']['low_tput_range']) def setup_aps(self, testcase_params): self.log.info('Setting AP to channel {} {}'.format( testcase_params['channel'], testcase_params['bandwidth'])) self.access_point.set_channel(testcase_params['interface_id'], testcase_params['channel']) self.access_point.set_bandwidth(testcase_params['interface_id'], testcase_params['bandwidth']) def setup_duts(self, testcase_params): # Check battery level before test for ad in self.android_devices: if not wputils.health_check(ad, 20): asserts.skip('Overheating or Battery low. Skipping test.') ad.go_to_sleep() wutils.reset_wifi(ad) # Turn screen off to preserve battery for ad in self.android_devices: wutils.wifi_connect( ad, self.ap_networks[0][testcase_params['interface_id']], num_of_tries=5, check_connectivity=True) def setup_tdls_connection(self, testcase_params): tdls_config = {} for idx, ad in enumerate(self.android_devices): tdls_config[idx] = { 'ip_address': ad.droid.connectivityGetIPv4Addresses('wlan0')[0], 'mac_address': ad.droid.wifiGetConnectionInfo()['mac_address'], 'tdls_supported': ad.droid.wifiIsTdlsSupported(), 'off_channel_supported': ad.droid.wifiIsOffChannelTdlsSupported() } self.android_devices[0].droid.wifiSetTdlsEnabledWithMacAddress( tdls_config[1]['mac_address'], True) testcase_params['iperf_server_address'] = tdls_config[0]['ip_address'] testcase_params['tdls_config'] = tdls_config testcase_params['channel'] = testcase_params['channel'] testcase_params['mode'] = testcase_params['bandwidth'] testcase_params['test_network'] = self.ap_networks[0][ testcase_params['interface_id']] def setup_tdls_rvr_test(self, testcase_params): # Setup the aps self.setup_aps(testcase_params) # Setup the duts self.setup_duts(testcase_params) # Set attenuator to 0 dB for attenuator in self.attenuators: attenuator.set_atten(0, strict=False) # Setup the aware connection self.setup_tdls_connection(testcase_params) # Set DUT to monitor RSSI and LLStats on self.monitored_dut = self.android_devices[1] def compile_test_params(self, testcase_params): """Function that completes all test params based on the test name. Args: testcase_params: dict containing test-specific parameters """ for ad in self.android_devices: wputils.check_skip_conditions(testcase_params, ad, self.access_point) # Compile RvR parameters num_atten_steps = int((self.testclass_params['atten_stop'] - self.testclass_params['atten_start']) / self.testclass_params['atten_step']) testcase_params['atten_range'] = [ self.testclass_params['atten_start'] + x * self.testclass_params['atten_step'] for x in range(0, num_atten_steps) ] # Compile iperf arguments if testcase_params['traffic_type'] == 'TCP': testcase_params['iperf_socket_size'] = self.testclass_params.get( 'tcp_socket_size', None) testcase_params['iperf_processes'] = self.testclass_params.get( 'tcp_processes', 1) elif testcase_params['traffic_type'] == 'UDP': testcase_params['iperf_socket_size'] = self.testclass_params.get( 'udp_socket_size', None) testcase_params['iperf_processes'] = self.testclass_params.get( 'udp_processes', 1) testcase_params['iperf_args'] = wputils.get_iperf_arg_string( duration=self.testclass_params['iperf_duration'], reverse_direction=(testcase_params['traffic_direction'] == 'DL'), socket_size=testcase_params['iperf_socket_size'], num_processes=testcase_params['iperf_processes'], traffic_type=testcase_params['traffic_type'], ipv6=False) testcase_params['use_client_output'] = ( testcase_params['traffic_direction'] == 'DL') # Compile AP and infrastructure connection parameters testcase_params['interface_id'] = '2G' if testcase_params[ 'channel'] < 13 else '5G_1' return testcase_params def _test_tdls_rvr(self, testcase_params): """ Function that gets called for each test case Args: testcase_params: dict containing test-specific parameters """ # Compile test parameters from config and test name testcase_params = self.compile_test_params(testcase_params) # Prepare devices and run test self.setup_tdls_rvr_test(testcase_params) rvr_result = self.run_rvr_test(testcase_params) # Post-process results self.testclass_results.append(rvr_result) self.process_test_results(rvr_result) self.pass_fail_check(rvr_result) def generate_test_cases(self, ap_config_list, traffic_type, traffic_directions): """Function that auto-generates test cases for a test class.""" test_cases = [] for ap_config, traffic_direction in itertools.product( ap_config_list, traffic_directions): test_name = 'test_tdls_rvr_{}_{}_ch{}_{}'.format( traffic_type, traffic_direction, ap_config[0], ap_config[1]) test_params = collections.OrderedDict( traffic_type=traffic_type, traffic_direction=traffic_direction, channel=ap_config[0], bandwidth=ap_config[1]) test_class = self.__class__.__name__ if "uuid_list" in self.user_params: test_tracker_uuid = self.user_params["uuid_list"][ test_class][test_name] test_case = test_tracker_info(uuid=test_tracker_uuid)( lambda: self._test_tdls_rvr(test_params)) else: test_case = partial(self._test_tdls_rvr,test_params) setattr(self, test_name, test_case) test_cases.append(test_name) return test_cases class WifiTdlsRvr_FCC_TCP_Test(WifiTdlsRvrTest): def __init__(self, controllers): super().__init__(controllers) ap_config_list = [[6, 'bw20'], [36, 'bw20'], [36, 'bw40'], [36, 'bw80'], [149, 'bw20'], [149, 'bw40'], [149, 'bw80']] self.country_code = 'US' self.tests = self.generate_test_cases(ap_config_list=ap_config_list, traffic_type='TCP', traffic_directions=['DL', 'UL']) class WifiTdlsRvr_FCC_UDP_Test(WifiTdlsRvrTest): def __init__(self, controllers): super().__init__(controllers) ap_config_list = [[6, 'bw20'], [36, 'bw20'], [36, 'bw40'], [36, 'bw80'], [149, 'bw20'], [149, 'bw40'], [149, 'bw80']] self.country_code = 'US' self.tests = self.generate_test_cases(ap_config_list=ap_config_list, traffic_type='UDP', traffic_directions=['DL', 'UL']) class WifiTdlsRvr_ETSI_TCP_Test(WifiTdlsRvrTest): def __init__(self, controllers): super().__init__(controllers) ap_config_list = [[6, 'bw20'], [36, 'bw20'], [36, 'bw40'], [36, 'bw80'], [149, 'bw20'], [149, 'bw40'], [149, 'bw80']] self.country_code = 'GB' self.tests = self.generate_test_cases(ap_config_list=ap_config_list, traffic_type='TCP', traffic_directions=['DL', 'UL']) class WifiTdlsRvr_ETSI_UDP_Test(WifiTdlsRvrTest): def __init__(self, controllers): super().__init__(controllers) ap_config_list = [[6, 'bw20'], [36, 'bw20'], [36, 'bw40'], [36, 'bw80'], [149, 'bw20'], [149, 'bw40'], [149, 'bw80']] self.country_code = 'GB' self.tests = self.generate_test_cases(ap_config_list=ap_config_list, traffic_type='UDP', traffic_directions=['DL', 'UL'])