• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16import json
17import random
18import sys
19import logging
20import re
21from acts.base_test import BaseTestClass
22from acts_contrib.test_utils.bt.BtInterferenceBaseTest import inject_static_wifi_interference
23from acts_contrib.test_utils.bt.BtInterferenceBaseTest import unpack_custom_file
24from acts_contrib.test_utils.power.PowerBaseTest import ObjNew
25from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wpeutils
26from acts_contrib.test_utils.wifi import wifi_test_utils as wutils
27import time
28
# Attenuation written to every interference attenuator on teardown and to the
# inactive ones while the dynamic test hops channels (presumably in dB, as the
# CHAN1 log message in this file reports attenuation in dB).
MAX_ATTENUATION = 95
# Attenuation applied to the attenuators during setup_class.
INIT_ATTEN = 0
# wpa_cli command strings; not referenced by the code visible in this file.
SCAN = 'wpa_cli scan'
SCAN_RESULTS = 'wpa_cli scan_results'
33
34
class InjectWifiInterferenceTest(BaseTestClass):
    """Interactive test class that injects WiFi interference via attenuators.

    The first attenuator (``self.attenuators[0]``) is kept as
    ``self.attenuator``; every following attenuator is paired with the
    ``wifi_networks`` entry at the same position and stored in
    ``self.wifi_int_pairs``. Both tests prompt the operator through
    ``input()``, so they are meant to be run by hand.
    """

    # Number of attenuator channels driven by set_atten_all_channel.
    _NUM_ATTEN_CHANNELS = 4

    def __init__(self, configs):
        """Unpack the required user params and load the interference files.

        Args:
            configs: configuration object forwarded to BaseTestClass.
        """
        super().__init__(configs)
        req_params = ['custom_files', 'wifi_networks']
        self.unpack_userparams(req_params)
        # The custom file is selected by a substring of its path/name.
        for custom_file in self.custom_files:
            if 'static_interference' in custom_file:
                self.static_wifi_interference = unpack_custom_file(
                    custom_file)
            elif 'dynamic_interference' in custom_file:
                self.dynamic_wifi_interference = unpack_custom_file(
                    custom_file)

    def setup_class(self):
        """Pick the DUT, build the interference pairs and open all paths.

        Raises:
            IndexError: if there are fewer ``wifi_networks`` entries than
                interference attenuators.
        """
        self.dut = self.android_devices[0]
        # Tolerate a config without attenuators for the whole method, not
        # only for the first attenuator.
        attenuators = getattr(self, 'attenuators', [])
        # Set attenuator to minimum attenuation
        if attenuators:
            self.attenuator = attenuators[0]
            self.attenuator.set_atten(INIT_ATTEN)
        self.wifi_int_pairs = []
        # Attenuator i + 1 is paired with wifi_networks[i].
        for index, attenuator in enumerate(attenuators[1:]):
            network = self.wifi_networks[index]
            self.wifi_int_pairs.append(
                ObjNew(attenuator=attenuator,
                       network=network,
                       channel=network['channel']))
        # Start with every interference path at the initial attenuation.
        for pair in self.wifi_int_pairs:
            pair.attenuator.set_atten(INIT_ATTEN)

    def setup_test(self):
        """Log that the test setup has started."""
        self.log.info("Setup test initiated")

    def teardown_class(self):
        """Set every interference attenuator to MAX_ATTENUATION."""
        self._mute_all_interference()

    def teardown_test(self):
        """Set every interference attenuator to MAX_ATTENUATION."""
        self._mute_all_interference()

    def _mute_all_interference(self):
        """Set all interference attenuators to MAX_ATTENUATION.

        Safe to call even when setup_class failed before the pairs were
        created, so teardown does not raise a second, unrelated error.
        """
        for pair in getattr(self, 'wifi_int_pairs', []):
            pair.attenuator.set_atten(MAX_ATTENUATION)

    def _log_interference_rssi(self):
        """Log the RSSI stored in ``self.interference_rssi`` per channel."""
        readings = ', '.join(
            'channel {} RSSI: {} dBm'.format(entry['chan'], entry['rssi'])
            for entry in self.interference_rssi)
        self.log.info(
            'Under the WiFi interference condition: {}'.format(readings))

    def test_inject_static_wifi_interference(self):
        """Repeatedly apply operator-entered attenuation and report RSSI.

        Prompts for comma separated attenuation values, applies the first
        four to the attenuators, then measures and logs the interference
        RSSI. Invalid input is reported and the prompt is repeated. An
        empty line ends the test.

        Returns:
            True once the operator ends the loop with an empty line.
        """
        while True:
            raw_values = input(
                "Please enter 4 channel attenuation value followed by comma :\n"
            ).strip()
            if not raw_values:
                break
            try:
                attenuation = [int(x) for x in raw_values.split(',')]
            except ValueError:
                self.log.error(
                    'Invalid attenuation input {!r}: expected comma '
                    'separated integers'.format(raw_values))
                continue
            if len(attenuation) < self._NUM_ATTEN_CHANNELS:
                self.log.error('Expected {} attenuation values, got {}'.format(
                    self._NUM_ATTEN_CHANNELS, len(attenuation)))
                continue
            self.set_atten_all_channel(attenuation)
            # Read interference RSSI
            self.interference_rssi = get_interference_rssi(
                self.dut, self.wifi_int_pairs)
            self._log_interference_rssi()
        return True

    def test_inject_dynamic_interface(self):
        """Hop the WiFi interference randomly between the channel pairs.

        Prompts for the attenuation of the first attenuator, then enables
        one interference pair at a time at the configured level while all
        other pairs are set to MAX_ATTENUATION. After every hop the RSSI is
        measured and logged, followed by a sleep of
        ``channel_change_interval_second`` seconds.

        The hopping loop has no exit condition; it runs until the test is
        interrupted.
        """
        atten = int(input("Please enter the attenuation level for CHAN1 :"))
        self.attenuator.set_atten(atten)
        self.log.info("Attenuation for CHAN1 set to:{} dB".format(atten))
        self.channel_change_interval = self.dynamic_wifi_interference[
            'channel_change_interval_second']
        self.wifi_int_levels = list(
            self.dynamic_wifi_interference['interference_level'].keys())
        all_pairs = range(len(self.wifi_int_pairs))
        # NOTE(review): the inner loop never terminates, so only the first
        # interference level is ever exercised - confirm whether each level
        # was meant to run for a bounded time.
        for wifi_level in self.wifi_int_levels:
            interference_atten_level = self.dynamic_wifi_interference[
                'interference_level'][wifi_level]
            # Set initial WiFi interference at channel 1
            self.log.info('Start with interference at channel 1')
            current_pair = 0
            for index in all_pairs:
                level = (interference_atten_level
                         if index == current_pair else MAX_ATTENUATION)
                self.wifi_int_pairs[index].attenuator.set_atten(level)
            self.log.info(
                'Inject random changing channel (1,6,11) wifi interference '
                'every {} second'.format(self.channel_change_interval))
            while True:
                # Choose among the inactive pairs only, so every hop really
                # changes the channel.
                inactive_pairs = [
                    index for index in all_pairs if index != current_pair
                ]
                current_pair = random.choice(inactive_pairs)
                active = self.wifi_int_pairs[current_pair]
                active.attenuator.set_atten(interference_atten_level)
                self.log.info('Current interference at channel {}'.format(
                    active.channel))
                for index in all_pairs:
                    if index != current_pair:
                        self.wifi_int_pairs[index].attenuator.set_atten(
                            MAX_ATTENUATION)
                # Read interference RSSI
                self.interference_rssi = get_interference_rssi(
                    self.dut, self.wifi_int_pairs)
                self._log_interference_rssi()
                time.sleep(self.channel_change_interval)

    def set_atten_all_channel(self, attenuation):
        """Apply one attenuation value to each of the first four attenuators.

        Args:
            attenuation: sequence with at least four attenuation values;
                entry ``i`` is written to ``self.attenuators[i]``.

        Raises:
            IndexError: if fewer than four values or attenuators exist.
        """
        channels = range(self._NUM_ATTEN_CHANNELS)
        for index in channels:
            self.attenuators[index].set_atten(attenuation[index])
        # Read the values back so the log shows what was actually applied.
        self.log.info(
            "Attenuation set to CHAN1:{},CHAN2:{},CHAN3:{},CHAN4:{}".format(
                *[self.attenuators[index].get_atten() for index in channels]))
165
166
def get_interference_rssi(dut, wifi_int_pairs):
    """Function to read wifi interference RSSI level.

    Turns WiFi on for ``dut``, scans for the BSSID of every interference
    pair, stores the mean scanned RSSI per pair and turns WiFi off again.
    WiFi is turned off even when the scan or the result lookup fails.

    Args:
        dut: device handle passed to ``wifi_toggle_state`` and
            ``get_scan_rssi``.
        wifi_int_pairs: iterable of objects exposing ``network`` (a mapping
            with 'SSID' and 'bssid' keys) and ``channel``.

    Returns:
        A list with one dict per pair, in input order, holding the keys
        "ssid", "bssid", "chan" and "rssi".
    """
    interference_rssi = [{
        "ssid": pair.network['SSID'],
        "bssid": pair.network['bssid'],
        "chan": pair.channel,
        "rssi": 0
    } for pair in wifi_int_pairs]
    bssids = [entry['bssid'] for entry in interference_rssi]

    wutils.wifi_toggle_state(dut, True)
    try:
        scanned_rssi = wpeutils.get_scan_rssi(dut, bssids, num_measurements=2)
        for entry in interference_rssi:
            entry['rssi'] = scanned_rssi[entry['bssid']]['mean']
            logging.info('Interference RSSI at channel {} is {} dBm'.format(
                entry['chan'], entry['rssi']))
    finally:
        # Never leave the DUT WiFi radio on after a failed measurement.
        wutils.wifi_toggle_state(dut, False)
    return interference_rssi
191