• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import json
18import os
19import time
20from collections import defaultdict
21
22from acts.metrics.loggers.blackbox import BlackboxMetricLogger
23from acts.test_utils.bt.bt_test_utils import disable_bluetooth
24from acts.test_utils.coex.CoexBaseTest import CoexBaseTest
25from acts.test_utils.coex.coex_test_utils import bokeh_chart_plot
26from acts.test_utils.coex.coex_test_utils import collect_bluetooth_manager_dumpsys_logs
27from acts.test_utils.coex.coex_test_utils import multithread_func
28from acts.test_utils.coex.coex_test_utils import wifi_connection_check
29from acts.test_utils.wifi.wifi_test_utils import wifi_connect
30from acts.test_utils.wifi.wifi_test_utils import wifi_test_device_init
31
32
def get_atten_range(start, stop, step):
    """Build the list of attenuation values to sweep through.

    The number of points is the rounded quotient of the span and the step,
    so the stop value itself is normally not part of the result.

    Args:
        start: First attenuation value.
        stop: Attenuation value at which the sweep ends (exclusive).
        step: Increment between two consecutive attenuation values.

    Returns:
        List of attenuation values, beginning at start and spaced by step.
    """
    span = stop - start
    point_count = int(round(span / float(step)))
    values = []
    for index in range(point_count):
        values.append(start + index * step)
    return values
47
48
49class CoexPerformanceBaseTest(CoexBaseTest):
50    """Base test class for performance tests.
51
52    Attributes:
53        rvr : Dict to save attenuation, throughput, fixed_attenuation.
54        a2dp_streaming : Used to denote a2dp test cases.
55    """
56
57    def __init__(self, controllers):
58        super().__init__(controllers)
59        self.a2dp_streaming = False
60        self.rvr = {}
61        self.bt_range_metric = BlackboxMetricLogger.for_test_case(
62            metric_name='bt_range')
63        self.wifi_max_atten_metric = BlackboxMetricLogger.for_test_case(
64            metric_name='wifi_max_atten')
65        self.wifi_min_atten_metric = BlackboxMetricLogger.for_test_case(
66            metric_name='wifi_min_atten')
67        self.wifi_range_metric = BlackboxMetricLogger.for_test_case(
68            metric_name='wifi_range_metric')
69
70    def setup_class(self):
71        req_params = ["test_params", "Attenuator"]
72        self.unpack_userparams(req_params)
73        if hasattr(self, "Attenuator"):
74            self.num_atten = self.attenuators[0].instrument.num_atten
75        else:
76            self.log.error("Attenuator should be connected to run tests.")
77            return False
78        for i in range(self.num_atten):
79            self.attenuators[i].set_atten(0)
80        super().setup_class()
81        if "performance_result_path" in self.user_params["test_params"]:
82            self.performance_files_list = [
83                os.path.join(self.test_params["performance_result_path"],
84                             files) for files in os.listdir(
85                                 self.test_params["performance_result_path"])
86            ]
87        self.bt_atten_range = get_atten_range(
88                            self.test_params["bt_atten_start"],
89                            self.test_params["bt_atten_stop"],
90                            self.test_params["bt_atten_step"])
91        self.wifi_atten_range = get_atten_range(
92                            self.test_params["attenuation_start"],
93                            self.test_params["attenuation_stop"],
94                            self.test_params["attenuation_step"])
95
96    def setup_test(self):
97        if "a2dp_streaming" in self.current_test_name:
98            self.a2dp_streaming = True
99        for i in range(self.num_atten):
100            self.attenuators[i].set_atten(0)
101        if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
102            wifi_connect(self.pri_ad, self.network, num_of_tries=5)
103        super().setup_test()
104
105    def teardown_test(self):
106        self.performance_baseline_check()
107        for i in range(self.num_atten):
108            self.attenuators[i].set_atten(0)
109            current_atten = int(self.attenuators[i].get_atten())
110            self.log.debug(
111                "Setting attenuation to zero : Current atten {} : {}".format(
112                    self.attenuators[i], current_atten))
113        self.a2dp_streaming = False
114        if not disable_bluetooth(self.pri_ad.droid):
115            self.log.info("Failed to disable bluetooth")
116            return False
117        self.destroy_android_and_relay_object()
118        self.rvr = {}
119
    def teardown_class(self):
        """Runs class-level cleanup by calling reset_wifi_and_store_results().

        That helper is not defined in this file; presumably it is inherited
        from CoexBaseTest -- TODO confirm.
        """
        self.reset_wifi_and_store_results()
122
123    def set_attenuation_and_run_iperf(self, called_func=None):
124        """Sets attenuation and runs iperf for Attenuation max value.
125
126        Args:
127            called_func : Function object to run.
128
129        Returns:
130            True if Pass
131            False if Fail
132        """
133        self.attenuators[self.num_atten - 1].set_atten(0)
134        self.rvr["bt_attenuation"] = []
135        self.rvr["test_name"] = self.current_test_name
136        self.rvr["bt_gap_analysis"] = {}
137        self.rvr["bt_range"] = []
138        status_flag = True
139        for bt_atten in self.bt_atten_range:
140            self.rvr[bt_atten] = {}
141            self.rvr[bt_atten]["fixed_attenuation"] = (
142                self.test_params["fixed_attenuation"][str(self.network["channel"])])
143            self.log.info("Setting bt attenuation = {}".format(bt_atten))
144            self.attenuators[self.num_atten - 1].set_atten(bt_atten)
145            for i in range(self.num_atten - 1):
146                self.attenuators[i].set_atten(0)
147            if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
148                wifi_test_device_init(self.pri_ad)
149                wifi_connect(self.pri_ad, self.network, num_of_tries=5)
150            (self.rvr[bt_atten]["throughput_received"],
151             self.rvr[bt_atten]["a2dp_packet_drop"],
152             status_flag) = self.rvr_throughput(bt_atten, called_func)
153            self.wifi_max_atten_metric.metric_value = max(self.rvr[bt_atten]
154                                                          ["attenuation"])
155            self.wifi_min_atten_metric.metric_value = min(self.rvr[bt_atten]
156                                                          ["attenuation"])
157            for i, atten in enumerate(self.rvr[bt_atten]["attenuation"]):
158                if self.rvr[bt_atten]["throughput_received"][i] < 1.0:
159                    self.wifi_range_metric = self.rvr[bt_atten]["attenuation"][i-1]
160                    break
161            else:
162                self.wifi_range_metric = max(self.rvr[bt_atten]["attenuation"])
163            if self.a2dp_streaming:
164                if not any(x > 0 for x in self.a2dp_dropped_list):
165                    self.rvr[bt_atten]["a2dp_packet_drop"] = []
166        if not self.rvr["bt_range"]:
167            self.rvr["bt_range"].append(0)
168        return status_flag
169
170    def rvr_throughput(self, bt_atten, called_func=None):
171        """Sets attenuation and runs the function passed.
172
173        Args:
174            bt_atten: Bluetooth attenuation.
175            called_func: Functions object to run parallely.
176
177        Returns:
178            Throughput, a2dp_drops and True/False.
179        """
180        self.iperf_received = []
181        self.iperf_variables.received = []
182        self.a2dp_dropped_list = []
183        self.rvr["bt_attenuation"].append(bt_atten)
184        self.rvr[bt_atten]["audio_artifacts"] = {}
185        self.rvr[bt_atten]["attenuation"] = []
186        self.rvr["bt_gap_analysis"][bt_atten] = {}
187        for atten in self.wifi_atten_range:
188            self.rvr[bt_atten]["attenuation"].append(
189                atten + self.rvr[bt_atten]["fixed_attenuation"])
190            self.log.info("Setting attenuation = {}".format(atten))
191            for i in range(self.num_atten - 1):
192                self.attenuators[i].set_atten(atten)
193            if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
194                return self.iperf_received, self.a2dp_dropped_list
195            time.sleep(5)  # Time for attenuation to set.
196            if called_func:
197                if not multithread_func(self.log, called_func):
198                    self.teardown_result()
199                    self.iperf_received.append(float(str(
200                        self.iperf_variables.received[-1]).strip("Mb/s")))
201                    return self.iperf_received, self.a2dp_dropped_list, False
202            else:
203                self.run_iperf_and_get_result()
204            if self.a2dp_streaming:
205                analysis_path = self.audio.audio_quality_analysis(self.log_path)
206                with open(analysis_path) as f:
207                    self.rvr[bt_atten]["audio_artifacts"][atten] = f.readline()
208                content = json.loads(self.rvr[bt_atten]["audio_artifacts"][atten])
209                self.rvr["bt_gap_analysis"][bt_atten][atten] = {}
210                for idx, data in enumerate(content["quality_result"]):
211                    if data['artifacts']['delay_during_playback']:
212                        self.rvr["bt_gap_analysis"][bt_atten][atten][idx] = (
213                                data['artifacts']['delay_during_playback'])
214                        self.rvr["bt_range"].append(bt_atten)
215                    else:
216                        self.rvr["bt_gap_analysis"][bt_atten][atten][idx] = 0
217                file_path = collect_bluetooth_manager_dumpsys_logs(
218                    self.pri_ad, self.current_test_name)
219                self.a2dp_dropped_list.append(
220                    self.a2dp_dumpsys.parse(file_path))
221            self.teardown_result()
222            self.iperf_received.append(
223                    float(str(self.iperf_variables.received[-1]).strip("Mb/s")))
224        for i in range(self.num_atten - 1):
225            self.attenuators[i].set_atten(0)
226        return self.iperf_received, self.a2dp_dropped_list, True
227
228    def performance_baseline_check(self):
229        """Checks for performance_result_path in config. If present, plots
230        comparision chart else plot chart for that particular test run.
231
232        Returns:
233            True if success, False otherwise.
234        """
235        if self.rvr:
236            with open(self.json_file, 'a') as results_file:
237                json.dump({str(k): v for k, v in self.rvr.items()},
238                          results_file, indent=4, sort_keys=True)
239            self.bt_range_metric.metric_value = self.rvr["bt_range"]
240            self.log.info("BT range where gap has occurred = %s" %
241                          self.rvr["bt_range"][0])
242            self.log.info("BT min range = %s" % min(self.rvr["bt_attenuation"]))
243            self.log.info("BT max range = %s" % max(self.rvr["bt_attenuation"]))
244            with open(self.json_file, 'a') as result_file:
245                json.dump({str(k): v for k, v in self.rvr.items()}, result_file,
246                          indent=4, sort_keys=True)
247            self.plot_graph_for_attenuation()
248            self.throughput_pass_fail_check()
249        else:
250            self.log.error("Throughput dict empty!")
251            return False
252        return True
253
254    def plot_graph_for_attenuation(self):
255        """Plots graph and add as JSON formatted results for attenuation with
256        respect to its iperf values. Compares rvr results with baseline
257        values by calculating throughput limits.
258        """
259        data_sets = defaultdict(dict)
260        test_name = self.current_test_name
261        x_label = 'WIFI Attenuation (dB)'
262        y_label = []
263        legends = defaultdict(list)
264        fig_property = {
265            "title": test_name,
266            "x_label": x_label,
267            "linewidth": 3,
268            "markersize": 10
269        }
270
271        for bt_atten in self.rvr["bt_attenuation"]:
272            y_label.insert(0, 'Throughput (Mbps)')
273            legends[bt_atten].insert(
274                0, str("BT Attenuation @ %sdB" % bt_atten))
275            data_sets[bt_atten]["attenuation"] = (
276                self.rvr[bt_atten]["attenuation"])
277            data_sets[bt_atten]["throughput_received"] = (
278                self.rvr[bt_atten]["throughput_received"])
279        shaded_region = None
280
281        if "performance_result_path" in self.user_params["test_params"]:
282            try:
283                attenuation_path = [
284                    file_name for file_name in self.performance_files_list
285                    if test_name in file_name
286                ]
287                attenuation_path = attenuation_path[0]
288                with open(attenuation_path, 'r') as throughput_file:
289                    throughput_results = json.load(throughput_file)
290                for bt_atten in self.bt_atten_range:
291                    throughput_received = []
292                    legends[bt_atten].insert(
293                        0, ('Performance Results @ {}dB'.format(bt_atten)))
294                    throughput_attenuation = [
295                        att +
296                        (throughput_results[str(bt_atten)]["fixed_attenuation"])
297                        for att in self.rvr[bt_atten]["attenuation"]
298                    ]
299                    for idx, _ in enumerate(throughput_attenuation):
300                        throughput_received.append(throughput_results[str(
301                            bt_atten)]["throughput_received"][idx])
302                    data_sets[bt_atten][
303                        "user_attenuation"] = throughput_attenuation
304                    data_sets[bt_atten]["user_throughput"] = throughput_received
305                throughput_limits = self.get_throughput_limits(attenuation_path)
306                shaded_region = defaultdict(dict)
307                for bt_atten in self.bt_atten_range:
308                    shaded_region[bt_atten] = {}
309                    shaded_region[bt_atten] = {
310                        "x_vector": throughput_limits[bt_atten]["attenuation"],
311                        "lower_limit":
312                        throughput_limits[bt_atten]["lower_limit"],
313                        "upper_limit":
314                        throughput_limits[bt_atten]["upper_limit"]
315                    }
316            except Exception as e:
317                shaded_region = None
318                self.log.warning("ValueError: Performance file not found")
319
320        if self.a2dp_streaming:
321            for bt_atten in self.bt_atten_range:
322                legends[bt_atten].insert(
323                    0, ('Packet drops(in %) @ {}dB'.format(bt_atten)))
324                data_sets[bt_atten]["a2dp_attenuation"] = (
325                    self.rvr[bt_atten]["attenuation"])
326                data_sets[bt_atten]["a2dp_packet_drops"] = (
327                    self.rvr[bt_atten]["a2dp_packet_drop"])
328            y_label.insert(0, "Packets Dropped")
329        fig_property["y_label"] = y_label
330        output_file_path = os.path.join(self.pri_ad.log_path, test_name,
331                                        "attenuation_plot.html")
332        bokeh_chart_plot(
333            list(self.rvr["bt_attenuation"]),
334            data_sets,
335            legends,
336            fig_property,
337            shaded_region=shaded_region,
338            output_file_path=output_file_path)
339
340    def total_attenuation(self, performance_dict):
341        """Calculates attenuation with adding fixed attenuation.
342
343        Args:
344            performance_dict: dict containing attenuation and fixed attenuation.
345
346        Returns:
347            Total attenuation is returned.
348        """
349        if "fixed_attenuation" in self.test_params:
350            total_atten = [
351                att + performance_dict["fixed_attenuation"]
352                for att in performance_dict["attenuation"]
353            ]
354            return total_atten
355
356    def throughput_pass_fail_check(self):
357        """Check the test result and decide if it passed or failed
358        by comparing with throughput limits.The pass/fail tolerances are
359        provided in the config file.
360
361        Returns:
362            True if successful, False otherwise.
363        """
364        test_name = self.current_test_name
365        try:
366            performance_path = [
367                file_name for file_name in self.performance_files_list
368                if test_name in file_name
369            ]
370            performance_path = performance_path[0]
371            throughput_limits = self.get_throughput_limits(performance_path)
372
373            failure_count = 0
374            for bt_atten in self.bt_atten_range:
375                for idx, current_throughput in enumerate(
376                        self.rvr[bt_atten]["throughput_received"]):
377                    current_att = self.rvr[bt_atten]["attenuation"][idx] + (
378                        self.rvr[bt_atten]["fixed_attenuation"])
379                    if (current_throughput <
380                            (throughput_limits[bt_atten]["lower_limit"][idx]) or
381                            current_throughput >
382                            (throughput_limits[bt_atten]["upper_limit"][idx])):
383                        failure_count = failure_count + 1
384                        self.log.info(
385                            "Throughput at {} dB attenuation is beyond limits. "
386                            "Throughput is {} Mbps. Expected within [{}, {}] Mbps.".
387                            format(
388                                current_att, current_throughput,
389                                throughput_limits[bt_atten]["lower_limit"][idx],
390                                throughput_limits[bt_atten]["upper_limit"][
391                                    idx]))
392                if failure_count >= self.test_params["failure_count_tolerance"]:
393                    self.log.error(
394                        "Test failed. Found {} points outside throughput limits.".
395                        format(failure_count))
396                    return False
397                self.log.info(
398                    "Test passed. Found {} points outside throughput limits.".
399                    format(failure_count))
400        except Exception as e:
401            self.log.warning("ValueError: Performance file not found cannot "
402                             "calculate throughput limits")
403
404    def get_throughput_limits(self, performance_path):
405        """Compute throughput limits for current test.
406
407        Checks the RvR test result and compares to a throughput limits for
408        the same configuration. The pass/fail tolerances are provided in the
409        config file.
410
411        Args:
412            performance_path: path to baseline file used to generate limits
413
414        Returns:
415            throughput_limits: dict containing attenuation and throughput
416            limit data
417        """
418        with open(performance_path, 'r') as performance_file:
419            performance_results = json.load(performance_file)
420        throughput_limits = defaultdict(dict)
421        for bt_atten in self.bt_atten_range:
422            performance_attenuation = (self.total_attenuation(
423                performance_results[str(bt_atten)]))
424            attenuation = []
425            lower_limit = []
426            upper_limit = []
427            for idx, current_throughput in enumerate(
428                    self.rvr[bt_atten]["throughput_received"]):
429                current_att = self.rvr[bt_atten]["attenuation"][idx] + (
430                    self.rvr[bt_atten]["fixed_attenuation"])
431                att_distances = [
432                    abs(current_att - performance_att)
433                    for performance_att in performance_attenuation
434                ]
435                sorted_distances = sorted(
436                    enumerate(att_distances), key=lambda x: x[1])
437                closest_indeces = [dist[0] for dist in sorted_distances[0:3]]
438                closest_throughputs = [
439                    performance_results[str(bt_atten)]["throughput_received"][
440                        index] for index in closest_indeces
441                ]
442                closest_throughputs.sort()
443                attenuation.append(current_att)
444                lower_limit.append(
445                    max(closest_throughputs[0] -
446                        max(self.test_params["abs_tolerance"],
447                            closest_throughputs[0] *
448                            self.test_params["pct_tolerance"] / 100), 0))
449                upper_limit.append(closest_throughputs[-1] + max(
450                    self.test_params["abs_tolerance"], closest_throughputs[-1] *
451                    self.test_params["pct_tolerance"] / 100))
452            throughput_limits[bt_atten]["attenuation"] = attenuation
453            throughput_limits[bt_atten]["lower_limit"] = lower_limit
454            throughput_limits[bt_atten]["upper_limit"] = upper_limit
455        return throughput_limits
456
457