1#!/usr/bin/env python3 2# 3# Copyright (C) 2019 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16import json 17import random 18import sys 19import logging 20import re 21from acts.base_test import BaseTestClass 22from acts_contrib.test_utils.bt.BtInterferenceBaseTest import inject_static_wifi_interference 23from acts_contrib.test_utils.bt.BtInterferenceBaseTest import unpack_custom_file 24from acts_contrib.test_utils.power.PowerBaseTest import ObjNew 25from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wpeutils 26from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 27import time 28 29MAX_ATTENUATION = 95 30INIT_ATTEN = 0 31SCAN = 'wpa_cli scan' 32SCAN_RESULTS = 'wpa_cli scan_results' 33 34 35class InjectWifiInterferenceTest(BaseTestClass): 36 def __init__(self, configs): 37 super().__init__(configs) 38 req_params = ['custom_files', 'wifi_networks'] 39 self.unpack_userparams(req_params) 40 for file in self.custom_files: 41 if 'static_interference' in file: 42 self.static_wifi_interference = unpack_custom_file(file) 43 elif 'dynamic_interference' in file: 44 self.dynamic_wifi_interference = unpack_custom_file(file) 45 46 def setup_class(self): 47 48 self.dut = self.android_devices[0] 49 # Set attenuator to minimum attenuation 50 if hasattr(self, 'attenuators'): 51 self.attenuator = self.attenuators[0] 52 self.attenuator.set_atten(INIT_ATTEN) 53 self.wifi_int_pairs = [] 54 for i in range(len(self.attenuators) - 1): 55 tmp_dict = { 56 'attenuator': self.attenuators[i + 1], 57 'network': self.wifi_networks[i], 58 'channel': self.wifi_networks[i]['channel'] 59 } 60 tmp_obj = ObjNew(**tmp_dict) 61 self.wifi_int_pairs.append(tmp_obj) 62 ##Setup connection between WiFi APs and Phones and get DHCP address 63 # for the interface 64 for obj in self.wifi_int_pairs: 65 obj.attenuator.set_atten(INIT_ATTEN) 66 67 def setup_test(self): 68 self.log.info("Setup test initiated") 69 70 def teardown_class(self): 71 for obj in self.wifi_int_pairs: 72 obj.attenuator.set_atten(MAX_ATTENUATION) 73 74 def teardown_test(self): 75 for obj in self.wifi_int_pairs: 76 obj.attenuator.set_atten(MAX_ATTENUATION) 77 78 def test_inject_static_wifi_interference(self): 79 condition = True 80 while condition: 81 attenuation = [ 82 int(x) for x in input( 83 "Please enter 4 channel attenuation value followed by comma :\n" 84 ).split(',') 85 ] 86 self.set_atten_all_channel(attenuation) 87 # Read interference RSSI 88 self.interference_rssi = get_interference_rssi( 89 self.dut, self.wifi_int_pairs) 90 self.log.info('Under the WiFi interference condition: ' 91 'channel 1 RSSI: {} dBm, ' 92 'channel 6 RSSI: {} dBm' 93 'channel 11 RSSI: {} dBm'.format( 94 self.interference_rssi[0]['rssi'], 95 self.interference_rssi[1]['rssi'], 96 self.interference_rssi[2]['rssi'])) 97 condition = True 98 return True 99 100 def test_inject_dynamic_interface(self): 101 atten = int(input("Please enter the attenuation level for CHAN1 :")) 102 self.attenuator.set_atten(atten) 103 self.log.info("Attenuation for CHAN1 set to:{} dB".format(atten)) 104 interference_rssi = None 105 self.channel_change_interval = self.dynamic_wifi_interference[ 106 'channel_change_interval_second'] 107 self.wifi_int_levels = list( 108 self.dynamic_wifi_interference['interference_level'].keys()) 109 for wifi_level in self.wifi_int_levels: 110 interference_atten_level = self.dynamic_wifi_interference[ 111 'interference_level'][wifi_level] 112 all_pair = range(len(self.wifi_int_pairs)) 113 # Set initial WiFi interference at channel 1 114 logging.info('Start with interference at channel 1') 115 self.wifi_int_pairs[0].attenuator.set_atten( 116 interference_atten_level) 117 self.wifi_int_pairs[1].attenuator.set_atten(MAX_ATTENUATION) 118 self.wifi_int_pairs[2].attenuator.set_atten(MAX_ATTENUATION) 119 current_int_pair = [0] 120 inactive_int_pairs = [ 121 item for item in all_pair if item not in current_int_pair 122 ] 123 logging.info( 124 'Inject random changing channel (1,6,11) wifi interference' 125 'every {} second'.format(self.channel_change_interval)) 126 while True: 127 current_int_pair = [ 128 random.randint(inactive_int_pairs[0], 129 inactive_int_pairs[1]) 130 ] 131 inactive_int_pairs = [ 132 item for item in all_pair if item not in current_int_pair 133 ] 134 self.wifi_int_pairs[current_int_pair[0]].attenuator.set_atten( 135 interference_atten_level) 136 logging.info('Current interference at channel {}'.format( 137 self.wifi_int_pairs[current_int_pair[0]].channel)) 138 for i in inactive_int_pairs: 139 self.wifi_int_pairs[i].attenuator.set_atten( 140 MAX_ATTENUATION) 141 # Read interference RSSI 142 self.interference_rssi = get_interference_rssi( 143 self.dut, self.wifi_int_pairs) 144 self.log.info('Under the WiFi interference condition: ' 145 'channel 1 RSSI: {} dBm, ' 146 'channel 6 RSSI: {} dBm' 147 'channel 11 RSSI: {} dBm'.format( 148 self.interference_rssi[0]['rssi'], 149 self.interference_rssi[1]['rssi'], 150 self.interference_rssi[2]['rssi'])) 151 time.sleep(self.channel_change_interval) 152 return True 153 154 def set_atten_all_channel(self, attenuation): 155 self.attenuators[0].set_atten(attenuation[0]) 156 self.attenuators[1].set_atten(attenuation[1]) 157 self.attenuators[2].set_atten(attenuation[2]) 158 self.attenuators[3].set_atten(attenuation[3]) 159 self.log.info( 160 "Attenuation set to CHAN1:{},CHAN2:{},CHAN3:{},CHAN4:{}".format( 161 self.attenuators[0].get_atten(), 162 self.attenuators[1].get_atten(), 163 self.attenuators[2].get_atten(), 164 self.attenuators[3].get_atten())) 165 166 167def get_interference_rssi(dut, wifi_int_pairs): 168 """Function to read wifi interference RSSI level.""" 169 170 bssids = [] 171 interference_rssi = [] 172 wutils.wifi_toggle_state(dut, True) 173 for item in wifi_int_pairs: 174 ssid = item.network['SSID'] 175 bssid = item.network['bssid'] 176 bssids.append(bssid) 177 interference_rssi_dict = { 178 "ssid": ssid, 179 "bssid": bssid, 180 "chan": item.channel, 181 "rssi": 0 182 } 183 interference_rssi.append(interference_rssi_dict) 184 scaned_rssi = wpeutils.get_scan_rssi(dut, bssids, num_measurements=2) 185 for item in interference_rssi: 186 item['rssi'] = scaned_rssi[item['bssid']]['mean'] 187 logging.info('Interference RSSI at channel {} is {} dBm'.format( 188 item['chan'], item['rssi'])) 189 wutils.wifi_toggle_state(dut, False) 190 return interference_rssi 191