#!/usr/bin/env python3 # # Copyright (C) 2019 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. import json import random import sys import logging import re from acts.base_test import BaseTestClass from acts_contrib.test_utils.bt.BtInterferenceBaseTest import inject_static_wifi_interference from acts_contrib.test_utils.bt.BtInterferenceBaseTest import unpack_custom_file from acts_contrib.test_utils.power.PowerBaseTest import ObjNew from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wpeutils from acts_contrib.test_utils.wifi import wifi_test_utils as wutils import time MAX_ATTENUATION = 95 INIT_ATTEN = 0 SCAN = 'wpa_cli scan' SCAN_RESULTS = 'wpa_cli scan_results' class InjectWifiInterferenceTest(BaseTestClass): def __init__(self, configs): super().__init__(configs) req_params = ['custom_files', 'wifi_networks'] self.unpack_userparams(req_params) for file in self.custom_files: if 'static_interference' in file: self.static_wifi_interference = unpack_custom_file(file) elif 'dynamic_interference' in file: self.dynamic_wifi_interference = unpack_custom_file(file) def setup_class(self): self.dut = self.android_devices[0] # Set attenuator to minimum attenuation if hasattr(self, 'attenuators'): self.attenuator = self.attenuators[0] self.attenuator.set_atten(INIT_ATTEN) self.wifi_int_pairs = [] for i in range(len(self.attenuators) - 1): tmp_dict = { 'attenuator': self.attenuators[i + 1], 'network': self.wifi_networks[i], 'channel': self.wifi_networks[i]['channel'] } tmp_obj = ObjNew(**tmp_dict) self.wifi_int_pairs.append(tmp_obj) ##Setup connection between WiFi APs and Phones and get DHCP address # for the interface for obj in self.wifi_int_pairs: obj.attenuator.set_atten(INIT_ATTEN) def setup_test(self): self.log.info("Setup test initiated") def teardown_class(self): for obj in self.wifi_int_pairs: obj.attenuator.set_atten(MAX_ATTENUATION) def teardown_test(self): for obj in self.wifi_int_pairs: obj.attenuator.set_atten(MAX_ATTENUATION) def test_inject_static_wifi_interference(self): condition = True while condition: attenuation = [ int(x) for x in input( "Please enter 4 channel attenuation value followed by comma :\n" ).split(',') ] self.set_atten_all_channel(attenuation) # Read interference RSSI self.interference_rssi = get_interference_rssi( self.dut, self.wifi_int_pairs) self.log.info('Under the WiFi interference condition: ' 'channel 1 RSSI: {} dBm, ' 'channel 6 RSSI: {} dBm' 'channel 11 RSSI: {} dBm'.format( self.interference_rssi[0]['rssi'], self.interference_rssi[1]['rssi'], self.interference_rssi[2]['rssi'])) condition = True return True def test_inject_dynamic_interface(self): atten = int(input("Please enter the attenuation level for CHAN1 :")) self.attenuator.set_atten(atten) self.log.info("Attenuation for CHAN1 set to:{} dB".format(atten)) interference_rssi = None self.channel_change_interval = self.dynamic_wifi_interference[ 'channel_change_interval_second'] self.wifi_int_levels = list( self.dynamic_wifi_interference['interference_level'].keys()) for wifi_level in self.wifi_int_levels: interference_atten_level = self.dynamic_wifi_interference[ 'interference_level'][wifi_level] all_pair = range(len(self.wifi_int_pairs)) # Set initial WiFi interference at channel 1 logging.info('Start with interference at channel 1') self.wifi_int_pairs[0].attenuator.set_atten( interference_atten_level) self.wifi_int_pairs[1].attenuator.set_atten(MAX_ATTENUATION) self.wifi_int_pairs[2].attenuator.set_atten(MAX_ATTENUATION) current_int_pair = [0] inactive_int_pairs = [ item for item in all_pair if item not in current_int_pair ] logging.info( 'Inject random changing channel (1,6,11) wifi interference' 'every {} second'.format(self.channel_change_interval)) while True: current_int_pair = [ random.randint(inactive_int_pairs[0], inactive_int_pairs[1]) ] inactive_int_pairs = [ item for item in all_pair if item not in current_int_pair ] self.wifi_int_pairs[current_int_pair[0]].attenuator.set_atten( interference_atten_level) logging.info('Current interference at channel {}'.format( self.wifi_int_pairs[current_int_pair[0]].channel)) for i in inactive_int_pairs: self.wifi_int_pairs[i].attenuator.set_atten( MAX_ATTENUATION) # Read interference RSSI self.interference_rssi = get_interference_rssi( self.dut, self.wifi_int_pairs) self.log.info('Under the WiFi interference condition: ' 'channel 1 RSSI: {} dBm, ' 'channel 6 RSSI: {} dBm' 'channel 11 RSSI: {} dBm'.format( self.interference_rssi[0]['rssi'], self.interference_rssi[1]['rssi'], self.interference_rssi[2]['rssi'])) time.sleep(self.channel_change_interval) return True def set_atten_all_channel(self, attenuation): self.attenuators[0].set_atten(attenuation[0]) self.attenuators[1].set_atten(attenuation[1]) self.attenuators[2].set_atten(attenuation[2]) self.attenuators[3].set_atten(attenuation[3]) self.log.info( "Attenuation set to CHAN1:{},CHAN2:{},CHAN3:{},CHAN4:{}".format( self.attenuators[0].get_atten(), self.attenuators[1].get_atten(), self.attenuators[2].get_atten(), self.attenuators[3].get_atten())) def get_interference_rssi(dut, wifi_int_pairs): """Function to read wifi interference RSSI level.""" bssids = [] interference_rssi = [] wutils.wifi_toggle_state(dut, True) for item in wifi_int_pairs: ssid = item.network['SSID'] bssid = item.network['bssid'] bssids.append(bssid) interference_rssi_dict = { "ssid": ssid, "bssid": bssid, "chan": item.channel, "rssi": 0 } interference_rssi.append(interference_rssi_dict) scaned_rssi = wpeutils.get_scan_rssi(dut, bssids, num_measurements=2) for item in interference_rssi: item['rssi'] = scaned_rssi[item['bssid']]['mean'] logging.info('Interference RSSI at channel {} is {} dBm'.format( item['chan'], item['rssi'])) wutils.wifi_toggle_state(dut, False) return interference_rssi