• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""Stream music through connected device from phone across different
17attenuations."""
18from acts.signals import TestPass
19from acts_contrib.test_utils.bt.BtInterferenceBaseTest import BtInterferenceBaseTest
20from acts.metrics.loggers.blackbox import BlackboxMetricLogger
21from acts_contrib.test_utils.bt.BtInterferenceBaseTest import get_iperf_results
22from multiprocessing import Process, Queue
23
24DEFAULT_THDN_THRESHOLD = 0.9
25MAX_ATTENUATION = 95
26TIME_OVERHEAD = 2
27
28
29class BtInterferenceStaticTest(BtInterferenceBaseTest):
30    def __init__(self, configs):
31        super().__init__(configs)
32        self.bt_attenuation_range = range(self.attenuation_vector['start'],
33                                          self.attenuation_vector['stop'] + 1,
34                                          self.attenuation_vector['step'])
35
36        self.iperf_duration = self.audio_params[
37            'duration'] + TIME_OVERHEAD
38        for level in list(
39                self.static_wifi_interference['interference_level'].keys()):
40            for channels in self.static_wifi_interference['channels']:
41                self.generate_test_case(
42                    self.static_wifi_interference['interference_level'][level],
43                    channels)
44
45        test_metrics = [
46            'wifi_chan1_rssi', 'wifi_chan6_rssi', 'wifi_chan11_rssi',
47            'bt_range'
48        ]
49        for metric in test_metrics:
50            setattr(self, '{}_metric'.format(metric),
51                    BlackboxMetricLogger.for_test_case(metric_name=metric))
52
53    def generate_test_case(self, interference_level, channels):
54        """Function to generate test cases with different parameters.
55
56        Args:
57           interference_level: wifi interference signal level
58           channels: wifi interference channel or channel combination
59        """
60        def test_case_fn():
61            self.bt_range_with_static_wifi_interference(
62                interference_level, channels)
63
64        str_channel_test = ''
65        for i in channels:
66            str_channel_test = str_channel_test + str(i) + '_'
67        test_case_name = ('test_bt_range_with_static_interference_level_{}_'
68                          'channel_{}'.format(interference_level,
69                                              str_channel_test))
70        setattr(self, test_case_name, test_case_fn)
71
72    def inject_static_wifi_interference(self, interference_level, channels):
73        """Function to inject wifi interference to bt link and read rssi.
74
75        Interference of IPERF traffic is always running, by setting attenuation,
76        the gate is opened to release the interference to the setup.
77        Args:
78            interference_level: the signal strength of wifi interference, use
79                attenuation level to represent this
80            channels: wifi channels where interference will
81                be injected, list
82        """
83        all_pair = range(len(self.wifi_int_pairs))
84        interference_pair_indices = self.locate_interference_pair_by_channel(
85            channels)
86        inactive_interference_pairs_indices = [
87            item for item in all_pair if item not in interference_pair_indices
88        ]
89        self.log.info(
90            'WiFi interference at {} and inactive channels at {}'.format(
91                interference_pair_indices,
92                inactive_interference_pairs_indices))
93        for i in interference_pair_indices:
94            self.wifi_int_pairs[i].attenuator.set_atten(interference_level)
95            self.log.info('Set attenuation {} dB on attenuator {}'.format(
96                self.wifi_int_pairs[i].attenuator.get_atten(), i + 1))
97        for i in inactive_interference_pairs_indices:
98            self.wifi_int_pairs[i].attenuator.set_atten(MAX_ATTENUATION)
99            self.log.info('Set attenuation {} dB on attenuator {}'.format(
100                self.wifi_int_pairs[i].attenuator.get_atten(), i + 1))
101        #Read interference RSSI
102        self.get_interference_rssi()
103        self.wifi_chan1_rssi_metric.metric_value = self.interference_rssi[0][
104            'rssi']
105        self.wifi_chan6_rssi_metric.metric_value = self.interference_rssi[1][
106            'rssi']
107        self.wifi_chan11_rssi_metric.metric_value = self.interference_rssi[2][
108            'rssi']
109
110    def bt_range_with_static_wifi_interference(self, interference_level,
111                                               channels):
112        """Test function to measure bt range under interference.
113
114        Args:
115            interference_level: wifi interference level
116            channels: wifi interference channels
117        """
118        #setup wifi interference by setting the correct attenuator
119        self.inject_static_wifi_interference(interference_level, channels)
120        for atten in self.bt_attenuation_range:
121            # Set attenuation for BT link
122            self.attenuator.set_atten(atten)
123            [rssi_master, pwl_master, rssi_slave] = self._get_bt_link_metrics()
124            tag = 'attenuation_{}dB_'.format(atten)
125            self.log.info(
126                'BT attenuation set to {} dB and start A2DP streaming'.format(
127                    atten))
128            procs_iperf = []
129            for obj in self.wifi_int_pairs:
130                obj.iperf_server.start()
131                self.log.info('Started IPERF server at port {}'.format(
132                    obj.iperf_server.port))
133                iperf_args = '-i 1 -t {} -p {} -J -R'.format(
134                    self.iperf_duration, obj.iperf_server.port)
135                tag = 'chan_{}'.format(obj.channel)
136                proc_iperf = Process(target=obj.iperf_client.start,
137                                     args=(obj.server_address, iperf_args,
138                                           tag))
139                procs_iperf.append(proc_iperf)
140
141            #play a2dp streaming and run thdn analysis
142            queue = Queue()
143            proc_bt = Process(target=self.play_and_record_audio,
144                              args=(self.audio_params['duration'],queue))
145            for proc in procs_iperf:
146                proc.start()
147            proc_bt.start()
148            proc_bt.join()
149            for proc in procs_iperf:
150                proc.join()
151            for obj in self.wifi_int_pairs:
152                iperf_throughput = get_iperf_results(obj.iperf_server)
153                self.log.info(
154                    'Throughput for channel {} interference is {} Mbps'.format(
155                        obj.channel, iperf_throughput))
156                obj.iperf_server.stop()
157                self.log.info('Stopped IPERF server at port {}'.format(
158                        obj.iperf_server.port))
159            audio_captured = queue.get()
160            thdns = self.run_thdn_analysis(audio_captured, tag)
161            self.log.info('THDN results are {} at {} dB attenuation'
162                          .format(thdns, atten))
163            self.log.info('master rssi {} dBm, master tx power level {}, '
164                          'slave rssi {} dBm'
165                          .format(rssi_master, pwl_master, rssi_slave))
166            for thdn in thdns:
167                if thdn >= self.audio_params['thdn_threshold']:
168                    self.log.info('Under the WiFi interference condition: '
169                                  'channel 1 RSSI: {} dBm, '
170                                  'channel 6 RSSI: {} dBm'
171                                  'channel 11 RSSI: {} dBm'
172                                  .format(self.interference_rssi[0]['rssi'],
173                                          self.interference_rssi[1]['rssi'],
174                                          self.interference_rssi[2]['rssi']))
175                    raise TestPass(
176                        'Max range for this test is {}, with BT master RSSI at'
177                        ' {} dBm'.format(atten, rssi_master))
178