1#!/usr/bin/env python3 2# 3# Copyright (C) 2019 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16"""Stream music through connected device from phone across different 17attenuations.""" 18from acts.signals import TestPass 19from acts_contrib.test_utils.bt.BtInterferenceBaseTest import BtInterferenceBaseTest 20from acts.metrics.loggers.blackbox import BlackboxMetricLogger 21from acts_contrib.test_utils.bt.BtInterferenceBaseTest import get_iperf_results 22from multiprocessing import Process, Queue 23 24DEFAULT_THDN_THRESHOLD = 0.9 25MAX_ATTENUATION = 95 26TIME_OVERHEAD = 2 27 28 29class BtInterferenceStaticTest(BtInterferenceBaseTest): 30 def __init__(self, configs): 31 super().__init__(configs) 32 self.bt_attenuation_range = range(self.attenuation_vector['start'], 33 self.attenuation_vector['stop'] + 1, 34 self.attenuation_vector['step']) 35 36 self.iperf_duration = self.audio_params[ 37 'duration'] + TIME_OVERHEAD 38 for level in list( 39 self.static_wifi_interference['interference_level'].keys()): 40 for channels in self.static_wifi_interference['channels']: 41 self.generate_test_case( 42 self.static_wifi_interference['interference_level'][level], 43 channels) 44 45 test_metrics = [ 46 'wifi_chan1_rssi', 'wifi_chan6_rssi', 'wifi_chan11_rssi', 47 'bt_range' 48 ] 49 for metric in test_metrics: 50 setattr(self, '{}_metric'.format(metric), 51 BlackboxMetricLogger.for_test_case(metric_name=metric)) 52 53 def generate_test_case(self, interference_level, channels): 54 """Function to generate test cases with different parameters. 55 56 Args: 57 interference_level: wifi interference signal level 58 channels: wifi interference channel or channel combination 59 """ 60 def test_case_fn(): 61 self.bt_range_with_static_wifi_interference( 62 interference_level, channels) 63 64 str_channel_test = '' 65 for i in channels: 66 str_channel_test = str_channel_test + str(i) + '_' 67 test_case_name = ('test_bt_range_with_static_interference_level_{}_' 68 'channel_{}'.format(interference_level, 69 str_channel_test)) 70 setattr(self, test_case_name, test_case_fn) 71 72 def inject_static_wifi_interference(self, interference_level, channels): 73 """Function to inject wifi interference to bt link and read rssi. 74 75 Interference of IPERF traffic is always running, by setting attenuation, 76 the gate is opened to release the interference to the setup. 77 Args: 78 interference_level: the signal strength of wifi interference, use 79 attenuation level to represent this 80 channels: wifi channels where interference will 81 be injected, list 82 """ 83 all_pair = range(len(self.wifi_int_pairs)) 84 interference_pair_indices = self.locate_interference_pair_by_channel( 85 channels) 86 inactive_interference_pairs_indices = [ 87 item for item in all_pair if item not in interference_pair_indices 88 ] 89 self.log.info( 90 'WiFi interference at {} and inactive channels at {}'.format( 91 interference_pair_indices, 92 inactive_interference_pairs_indices)) 93 for i in interference_pair_indices: 94 self.wifi_int_pairs[i].attenuator.set_atten(interference_level) 95 self.log.info('Set attenuation {} dB on attenuator {}'.format( 96 self.wifi_int_pairs[i].attenuator.get_atten(), i + 1)) 97 for i in inactive_interference_pairs_indices: 98 self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION) 99 self.log.info('Set attenuation {} dB on attenuator {}'.format( 100 self.wifi_int_pairs[i].attenuator.get_atten(), i + 1)) 101 #Read interference RSSI 102 self.get_interference_rssi() 103 self.wifi_chan1_rssi_metric.metric_value = self.interference_rssi[0][ 104 'rssi'] 105 self.wifi_chan6_rssi_metric.metric_value = self.interference_rssi[1][ 106 'rssi'] 107 self.wifi_chan11_rssi_metric.metric_value = self.interference_rssi[2][ 108 'rssi'] 109 110 def bt_range_with_static_wifi_interference(self, interference_level, 111 channels): 112 """Test function to measure bt range under interference. 113 114 Args: 115 interference_level: wifi interference level 116 channels: wifi interference channels 117 """ 118 #setup wifi interference by setting the correct attenuator 119 self.inject_static_wifi_interference(interference_level, channels) 120 for atten in self.bt_attenuation_range: 121 # Set attenuation for BT link 122 self.attenuator.set_atten(atten) 123 [rssi_master, pwl_master, rssi_slave] = self._get_bt_link_metrics() 124 tag = 'attenuation_{}dB_'.format(atten) 125 self.log.info( 126 'BT attenuation set to {} dB and start A2DP streaming'.format( 127 atten)) 128 procs_iperf = [] 129 for obj in self.wifi_int_pairs: 130 obj.iperf_server.start() 131 self.log.info('Started IPERF server at port {}'.format( 132 obj.iperf_server.port)) 133 iperf_args = '-i 1 -t {} -p {} -J -R'.format( 134 self.iperf_duration, obj.iperf_server.port) 135 tag = 'chan_{}'.format(obj.channel) 136 proc_iperf = Process(target=obj.iperf_client.start, 137 args=(obj.server_address, iperf_args, 138 tag)) 139 procs_iperf.append(proc_iperf) 140 141 #play a2dp streaming and run thdn analysis 142 queue = Queue() 143 proc_bt = Process(target=self.play_and_record_audio, 144 args=(self.audio_params['duration'],queue)) 145 for proc in procs_iperf: 146 proc.start() 147 proc_bt.start() 148 proc_bt.join() 149 for proc in procs_iperf: 150 proc.join() 151 for obj in self.wifi_int_pairs: 152 iperf_throughput = get_iperf_results(obj.iperf_server) 153 self.log.info( 154 'Throughput for channel {} interference is {} Mbps'.format( 155 obj.channel, iperf_throughput)) 156 obj.iperf_server.stop() 157 self.log.info('Stopped IPERF server at port {}'.format( 158 obj.iperf_server.port)) 159 audio_captured = queue.get() 160 thdns = self.run_thdn_analysis(audio_captured, tag) 161 self.log.info('THDN results are {} at {} dB attenuation' 162 .format(thdns, atten)) 163 self.log.info('master rssi {} dBm, master tx power level {}, ' 164 'slave rssi {} dBm' 165 .format(rssi_master, pwl_master, rssi_slave)) 166 for thdn in thdns: 167 if thdn >= self.audio_params['thdn_threshold']: 168 self.log.info('Under the WiFi interference condition: ' 169 'channel 1 RSSI: {} dBm, ' 170 'channel 6 RSSI: {} dBm' 171 'channel 11 RSSI: {} dBm' 172 .format(self.interference_rssi[0]['rssi'], 173 self.interference_rssi[1]['rssi'], 174 self.interference_rssi[2]['rssi'])) 175 raise TestPass( 176 'Max range for this test is {}, with BT master RSSI at' 177 ' {} dBm'.format(atten, rssi_master)) 178