1#!/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import json 18import os 19import time 20from collections import defaultdict 21 22from acts.metrics.loggers.blackbox import BlackboxMetricLogger 23from acts.test_utils.bt.bt_test_utils import disable_bluetooth 24from acts.test_utils.coex.CoexBaseTest import CoexBaseTest 25from acts.test_utils.coex.coex_test_utils import bokeh_chart_plot 26from acts.test_utils.coex.coex_test_utils import collect_bluetooth_manager_dumpsys_logs 27from acts.test_utils.coex.coex_test_utils import multithread_func 28from acts.test_utils.coex.coex_test_utils import wifi_connection_check 29from acts.test_utils.wifi.wifi_test_utils import wifi_connect 30from acts.test_utils.wifi.wifi_test_utils import wifi_test_device_init 31 32 33def get_atten_range(start, stop, step): 34 """Function to derive attenuation range for tests. 35 36 Args: 37 start: Start attenuation value. 38 stop: Stop attenuation value. 39 step: Step attenuation value. 40 41 Returns: 42 list of attenuation range. 43 """ 44 atten_step = int(round((stop - start) / float(step))) 45 atten_range = [start + x * step for x in range(0, atten_step)] 46 return atten_range 47 48 49class CoexPerformanceBaseTest(CoexBaseTest): 50 """Base test class for performance tests. 51 52 Attributes: 53 rvr : Dict to save attenuation, throughput, fixed_attenuation. 54 a2dp_streaming : Used to denote a2dp test cases. 55 """ 56 57 def __init__(self, controllers): 58 super().__init__(controllers) 59 self.a2dp_streaming = False 60 self.rvr = {} 61 self.bt_range_metric = BlackboxMetricLogger.for_test_case( 62 metric_name='bt_range') 63 self.wifi_max_atten_metric = BlackboxMetricLogger.for_test_case( 64 metric_name='wifi_max_atten') 65 self.wifi_min_atten_metric = BlackboxMetricLogger.for_test_case( 66 metric_name='wifi_min_atten') 67 self.wifi_range_metric = BlackboxMetricLogger.for_test_case( 68 metric_name='wifi_range_metric') 69 70 def setup_class(self): 71 req_params = ["test_params", "Attenuator"] 72 self.unpack_userparams(req_params) 73 if hasattr(self, "Attenuator"): 74 self.num_atten = self.attenuators[0].instrument.num_atten 75 else: 76 self.log.error("Attenuator should be connected to run tests.") 77 return False 78 for i in range(self.num_atten): 79 self.attenuators[i].set_atten(0) 80 super().setup_class() 81 if "performance_result_path" in self.user_params["test_params"]: 82 self.performance_files_list = [ 83 os.path.join(self.test_params["performance_result_path"], 84 files) for files in os.listdir( 85 self.test_params["performance_result_path"]) 86 ] 87 self.bt_atten_range = get_atten_range( 88 self.test_params["bt_atten_start"], 89 self.test_params["bt_atten_stop"], 90 self.test_params["bt_atten_step"]) 91 self.wifi_atten_range = get_atten_range( 92 self.test_params["attenuation_start"], 93 self.test_params["attenuation_stop"], 94 self.test_params["attenuation_step"]) 95 96 def setup_test(self): 97 if "a2dp_streaming" in self.current_test_name: 98 self.a2dp_streaming = True 99 for i in range(self.num_atten): 100 self.attenuators[i].set_atten(0) 101 if not wifi_connection_check(self.pri_ad, self.network["SSID"]): 102 wifi_connect(self.pri_ad, self.network, num_of_tries=5) 103 super().setup_test() 104 105 def teardown_test(self): 106 self.performance_baseline_check() 107 for i in range(self.num_atten): 108 self.attenuators[i].set_atten(0) 109 current_atten = int(self.attenuators[i].get_atten()) 110 self.log.debug( 111 "Setting attenuation to zero : Current atten {} : {}".format( 112 self.attenuators[i], current_atten)) 113 self.a2dp_streaming = False 114 if not disable_bluetooth(self.pri_ad.droid): 115 self.log.info("Failed to disable bluetooth") 116 return False 117 self.destroy_android_and_relay_object() 118 self.rvr = {} 119 120 def teardown_class(self): 121 self.reset_wifi_and_store_results() 122 123 def set_attenuation_and_run_iperf(self, called_func=None): 124 """Sets attenuation and runs iperf for Attenuation max value. 125 126 Args: 127 called_func : Function object to run. 128 129 Returns: 130 True if Pass 131 False if Fail 132 """ 133 self.attenuators[self.num_atten - 1].set_atten(0) 134 self.rvr["bt_attenuation"] = [] 135 self.rvr["test_name"] = self.current_test_name 136 self.rvr["bt_gap_analysis"] = {} 137 self.rvr["bt_range"] = [] 138 status_flag = True 139 for bt_atten in self.bt_atten_range: 140 self.rvr[bt_atten] = {} 141 self.rvr[bt_atten]["fixed_attenuation"] = ( 142 self.test_params["fixed_attenuation"][str(self.network["channel"])]) 143 self.log.info("Setting bt attenuation = {}".format(bt_atten)) 144 self.attenuators[self.num_atten - 1].set_atten(bt_atten) 145 for i in range(self.num_atten - 1): 146 self.attenuators[i].set_atten(0) 147 if not wifi_connection_check(self.pri_ad, self.network["SSID"]): 148 wifi_test_device_init(self.pri_ad) 149 wifi_connect(self.pri_ad, self.network, num_of_tries=5) 150 (self.rvr[bt_atten]["throughput_received"], 151 self.rvr[bt_atten]["a2dp_packet_drop"], 152 status_flag) = self.rvr_throughput(bt_atten, called_func) 153 self.wifi_max_atten_metric.metric_value = max(self.rvr[bt_atten] 154 ["attenuation"]) 155 self.wifi_min_atten_metric.metric_value = min(self.rvr[bt_atten] 156 ["attenuation"]) 157 for i, atten in enumerate(self.rvr[bt_atten]["attenuation"]): 158 if self.rvr[bt_atten]["throughput_received"][i] < 1.0: 159 self.wifi_range_metric = self.rvr[bt_atten]["attenuation"][i-1] 160 break 161 else: 162 self.wifi_range_metric = max(self.rvr[bt_atten]["attenuation"]) 163 if self.a2dp_streaming: 164 if not any(x > 0 for x in self.a2dp_dropped_list): 165 self.rvr[bt_atten]["a2dp_packet_drop"] = [] 166 if not self.rvr["bt_range"]: 167 self.rvr["bt_range"].append(0) 168 return status_flag 169 170 def rvr_throughput(self, bt_atten, called_func=None): 171 """Sets attenuation and runs the function passed. 172 173 Args: 174 bt_atten: Bluetooth attenuation. 175 called_func: Functions object to run parallely. 176 177 Returns: 178 Throughput, a2dp_drops and True/False. 179 """ 180 self.iperf_received = [] 181 self.iperf_variables.received = [] 182 self.a2dp_dropped_list = [] 183 self.rvr["bt_attenuation"].append(bt_atten) 184 self.rvr[bt_atten]["audio_artifacts"] = {} 185 self.rvr[bt_atten]["attenuation"] = [] 186 self.rvr["bt_gap_analysis"][bt_atten] = {} 187 for atten in self.wifi_atten_range: 188 self.rvr[bt_atten]["attenuation"].append( 189 atten + self.rvr[bt_atten]["fixed_attenuation"]) 190 self.log.info("Setting attenuation = {}".format(atten)) 191 for i in range(self.num_atten - 1): 192 self.attenuators[i].set_atten(atten) 193 if not wifi_connection_check(self.pri_ad, self.network["SSID"]): 194 return self.iperf_received, self.a2dp_dropped_list 195 time.sleep(5) # Time for attenuation to set. 196 if called_func: 197 if not multithread_func(self.log, called_func): 198 self.teardown_result() 199 self.iperf_received.append(float(str( 200 self.iperf_variables.received[-1]).strip("Mb/s"))) 201 return self.iperf_received, self.a2dp_dropped_list, False 202 else: 203 self.run_iperf_and_get_result() 204 if self.a2dp_streaming: 205 analysis_path = self.audio.audio_quality_analysis(self.log_path) 206 with open(analysis_path) as f: 207 self.rvr[bt_atten]["audio_artifacts"][atten] = f.readline() 208 content = json.loads(self.rvr[bt_atten]["audio_artifacts"][atten]) 209 self.rvr["bt_gap_analysis"][bt_atten][atten] = {} 210 for idx, data in enumerate(content["quality_result"]): 211 if data['artifacts']['delay_during_playback']: 212 self.rvr["bt_gap_analysis"][bt_atten][atten][idx] = ( 213 data['artifacts']['delay_during_playback']) 214 self.rvr["bt_range"].append(bt_atten) 215 else: 216 self.rvr["bt_gap_analysis"][bt_atten][atten][idx] = 0 217 file_path = collect_bluetooth_manager_dumpsys_logs( 218 self.pri_ad, self.current_test_name) 219 self.a2dp_dropped_list.append( 220 self.a2dp_dumpsys.parse(file_path)) 221 self.teardown_result() 222 self.iperf_received.append( 223 float(str(self.iperf_variables.received[-1]).strip("Mb/s"))) 224 for i in range(self.num_atten - 1): 225 self.attenuators[i].set_atten(0) 226 return self.iperf_received, self.a2dp_dropped_list, True 227 228 def performance_baseline_check(self): 229 """Checks for performance_result_path in config. If present, plots 230 comparision chart else plot chart for that particular test run. 231 232 Returns: 233 True if success, False otherwise. 234 """ 235 if self.rvr: 236 with open(self.json_file, 'a') as results_file: 237 json.dump({str(k): v for k, v in self.rvr.items()}, 238 results_file, indent=4, sort_keys=True) 239 self.bt_range_metric.metric_value = self.rvr["bt_range"] 240 self.log.info("BT range where gap has occurred = %s" % 241 self.rvr["bt_range"][0]) 242 self.log.info("BT min range = %s" % min(self.rvr["bt_attenuation"])) 243 self.log.info("BT max range = %s" % max(self.rvr["bt_attenuation"])) 244 with open(self.json_file, 'a') as result_file: 245 json.dump({str(k): v for k, v in self.rvr.items()}, result_file, 246 indent=4, sort_keys=True) 247 self.plot_graph_for_attenuation() 248 self.throughput_pass_fail_check() 249 else: 250 self.log.error("Throughput dict empty!") 251 return False 252 return True 253 254 def plot_graph_for_attenuation(self): 255 """Plots graph and add as JSON formatted results for attenuation with 256 respect to its iperf values. Compares rvr results with baseline 257 values by calculating throughput limits. 258 """ 259 data_sets = defaultdict(dict) 260 test_name = self.current_test_name 261 x_label = 'WIFI Attenuation (dB)' 262 y_label = [] 263 legends = defaultdict(list) 264 fig_property = { 265 "title": test_name, 266 "x_label": x_label, 267 "linewidth": 3, 268 "markersize": 10 269 } 270 271 for bt_atten in self.rvr["bt_attenuation"]: 272 y_label.insert(0, 'Throughput (Mbps)') 273 legends[bt_atten].insert( 274 0, str("BT Attenuation @ %sdB" % bt_atten)) 275 data_sets[bt_atten]["attenuation"] = ( 276 self.rvr[bt_atten]["attenuation"]) 277 data_sets[bt_atten]["throughput_received"] = ( 278 self.rvr[bt_atten]["throughput_received"]) 279 shaded_region = None 280 281 if "performance_result_path" in self.user_params["test_params"]: 282 try: 283 attenuation_path = [ 284 file_name for file_name in self.performance_files_list 285 if test_name in file_name 286 ] 287 attenuation_path = attenuation_path[0] 288 with open(attenuation_path, 'r') as throughput_file: 289 throughput_results = json.load(throughput_file) 290 for bt_atten in self.bt_atten_range: 291 throughput_received = [] 292 legends[bt_atten].insert( 293 0, ('Performance Results @ {}dB'.format(bt_atten))) 294 throughput_attenuation = [ 295 att + 296 (throughput_results[str(bt_atten)]["fixed_attenuation"]) 297 for att in self.rvr[bt_atten]["attenuation"] 298 ] 299 for idx, _ in enumerate(throughput_attenuation): 300 throughput_received.append(throughput_results[str( 301 bt_atten)]["throughput_received"][idx]) 302 data_sets[bt_atten][ 303 "user_attenuation"] = throughput_attenuation 304 data_sets[bt_atten]["user_throughput"] = throughput_received 305 throughput_limits = self.get_throughput_limits(attenuation_path) 306 shaded_region = defaultdict(dict) 307 for bt_atten in self.bt_atten_range: 308 shaded_region[bt_atten] = {} 309 shaded_region[bt_atten] = { 310 "x_vector": throughput_limits[bt_atten]["attenuation"], 311 "lower_limit": 312 throughput_limits[bt_atten]["lower_limit"], 313 "upper_limit": 314 throughput_limits[bt_atten]["upper_limit"] 315 } 316 except Exception as e: 317 shaded_region = None 318 self.log.warning("ValueError: Performance file not found") 319 320 if self.a2dp_streaming: 321 for bt_atten in self.bt_atten_range: 322 legends[bt_atten].insert( 323 0, ('Packet drops(in %) @ {}dB'.format(bt_atten))) 324 data_sets[bt_atten]["a2dp_attenuation"] = ( 325 self.rvr[bt_atten]["attenuation"]) 326 data_sets[bt_atten]["a2dp_packet_drops"] = ( 327 self.rvr[bt_atten]["a2dp_packet_drop"]) 328 y_label.insert(0, "Packets Dropped") 329 fig_property["y_label"] = y_label 330 output_file_path = os.path.join(self.pri_ad.log_path, test_name, 331 "attenuation_plot.html") 332 bokeh_chart_plot( 333 list(self.rvr["bt_attenuation"]), 334 data_sets, 335 legends, 336 fig_property, 337 shaded_region=shaded_region, 338 output_file_path=output_file_path) 339 340 def total_attenuation(self, performance_dict): 341 """Calculates attenuation with adding fixed attenuation. 342 343 Args: 344 performance_dict: dict containing attenuation and fixed attenuation. 345 346 Returns: 347 Total attenuation is returned. 348 """ 349 if "fixed_attenuation" in self.test_params: 350 total_atten = [ 351 att + performance_dict["fixed_attenuation"] 352 for att in performance_dict["attenuation"] 353 ] 354 return total_atten 355 356 def throughput_pass_fail_check(self): 357 """Check the test result and decide if it passed or failed 358 by comparing with throughput limits.The pass/fail tolerances are 359 provided in the config file. 360 361 Returns: 362 True if successful, False otherwise. 363 """ 364 test_name = self.current_test_name 365 try: 366 performance_path = [ 367 file_name for file_name in self.performance_files_list 368 if test_name in file_name 369 ] 370 performance_path = performance_path[0] 371 throughput_limits = self.get_throughput_limits(performance_path) 372 373 failure_count = 0 374 for bt_atten in self.bt_atten_range: 375 for idx, current_throughput in enumerate( 376 self.rvr[bt_atten]["throughput_received"]): 377 current_att = self.rvr[bt_atten]["attenuation"][idx] + ( 378 self.rvr[bt_atten]["fixed_attenuation"]) 379 if (current_throughput < 380 (throughput_limits[bt_atten]["lower_limit"][idx]) or 381 current_throughput > 382 (throughput_limits[bt_atten]["upper_limit"][idx])): 383 failure_count = failure_count + 1 384 self.log.info( 385 "Throughput at {} dB attenuation is beyond limits. " 386 "Throughput is {} Mbps. Expected within [{}, {}] Mbps.". 387 format( 388 current_att, current_throughput, 389 throughput_limits[bt_atten]["lower_limit"][idx], 390 throughput_limits[bt_atten]["upper_limit"][ 391 idx])) 392 if failure_count >= self.test_params["failure_count_tolerance"]: 393 self.log.error( 394 "Test failed. Found {} points outside throughput limits.". 395 format(failure_count)) 396 return False 397 self.log.info( 398 "Test passed. Found {} points outside throughput limits.". 399 format(failure_count)) 400 except Exception as e: 401 self.log.warning("ValueError: Performance file not found cannot " 402 "calculate throughput limits") 403 404 def get_throughput_limits(self, performance_path): 405 """Compute throughput limits for current test. 406 407 Checks the RvR test result and compares to a throughput limits for 408 the same configuration. The pass/fail tolerances are provided in the 409 config file. 410 411 Args: 412 performance_path: path to baseline file used to generate limits 413 414 Returns: 415 throughput_limits: dict containing attenuation and throughput 416 limit data 417 """ 418 with open(performance_path, 'r') as performance_file: 419 performance_results = json.load(performance_file) 420 throughput_limits = defaultdict(dict) 421 for bt_atten in self.bt_atten_range: 422 performance_attenuation = (self.total_attenuation( 423 performance_results[str(bt_atten)])) 424 attenuation = [] 425 lower_limit = [] 426 upper_limit = [] 427 for idx, current_throughput in enumerate( 428 self.rvr[bt_atten]["throughput_received"]): 429 current_att = self.rvr[bt_atten]["attenuation"][idx] + ( 430 self.rvr[bt_atten]["fixed_attenuation"]) 431 att_distances = [ 432 abs(current_att - performance_att) 433 for performance_att in performance_attenuation 434 ] 435 sorted_distances = sorted( 436 enumerate(att_distances), key=lambda x: x[1]) 437 closest_indeces = [dist[0] for dist in sorted_distances[0:3]] 438 closest_throughputs = [ 439 performance_results[str(bt_atten)]["throughput_received"][ 440 index] for index in closest_indeces 441 ] 442 closest_throughputs.sort() 443 attenuation.append(current_att) 444 lower_limit.append( 445 max(closest_throughputs[0] - 446 max(self.test_params["abs_tolerance"], 447 closest_throughputs[0] * 448 self.test_params["pct_tolerance"] / 100), 0)) 449 upper_limit.append(closest_throughputs[-1] + max( 450 self.test_params["abs_tolerance"], closest_throughputs[-1] * 451 self.test_params["pct_tolerance"] / 100)) 452 throughput_limits[bt_atten]["attenuation"] = attenuation 453 throughput_limits[bt_atten]["lower_limit"] = lower_limit 454 throughput_limits[bt_atten]["upper_limit"] = upper_limit 455 return throughput_limits 456 457