1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import itertools 18import pprint 19import time 20 21import acts.signals 22import acts_contrib.test_utils.wifi.wifi_test_utils as wutils 23 24from acts import asserts 25from acts.test_decorators import test_tracker_info 26from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest 27from acts.controllers import iperf_server as ipf 28from acts.controllers import attenuator 29from acts.controllers.sl4a_lib import rpc_client 30 31import json 32import logging 33import math 34import os 35from acts import utils 36import csv 37 38import serial 39import sys 40import urllib.request 41 42from acts_contrib.test_utils.wifi import wifi_performance_test_utils_RSSI as wperfutils 43 44WifiEnums = wutils.WifiEnums 45 46 47class WifiRvrTwTest(WifiBaseTest): 48 """ Tests for wifi RVR performance. 49 50 Test Bed Requirement: 51 * One Android device 52 * Wi-Fi networks visible to the device 53 """ 54 TEST_TIMEOUT = 10 55 IPERF_SETUP_TIME = 5 56 TURN_TABLE_SETUP_TIME = 5 57 58 def __init__(self, controllers): 59 WifiBaseTest.__init__(self, controllers) 60 61 def setup_class(self): 62 self.dut = self.android_devices[0] 63 64 req_params = ["rvr_networks", "rvr_test_params", "attenuators"] 65 opt_params = ["angle_params", "usb_port"] 66 self.unpack_userparams( 67 req_param_names=req_params, opt_param_names=opt_params) 68 asserts.assert_true( 69 len(self.rvr_networks) > 0, "Need at least one network.") 70 71 if "rvr_test_params" in self.user_params: 72 self.iperf_server = self.iperf_servers[0] 73 self.maxdb = self.rvr_test_params["rvr_atten_maxdb"] 74 self.mindb = self.rvr_test_params["rvr_atten_mindb"] 75 self.stepdb = self.rvr_test_params["rvr_atten_step"] 76 self.country_code = self.rvr_test_params["country_code"] 77 if "angle_params" in self.user_params: 78 self.angle_list = self.angle_params 79 if "usb_port" in self.user_params: 80 self.turntable_port = self.read_comport(self.usb_port["turntable"]) 81 82 # Init DUT 83 wutils.wifi_test_device_init(self.dut, self.country_code) 84 self.dut.droid.bluetoothToggleState(False) 85 utils.set_location_service(self.dut, False) 86 wutils.wifi_toggle_state(self.dut, True) 87 utils.subprocess.check_output( 88 "adb root", shell=True, timeout=self.TEST_TIMEOUT) 89 utils.subprocess.check_output( 90 "adb shell settings put system screen_off_timeout 18000000", 91 shell=True, 92 timeout=self.TEST_TIMEOUT) 93 utils.subprocess.check_output( 94 "adb shell svc power stayon true", 95 shell=True, 96 timeout=self.TEST_TIMEOUT) 97 98 # create folder for rvr test result 99 self.log_path = os.path.join(logging.log_path, "rvr_results") 100 utils.create_dir(self.log_path) 101 102 Header = ("Test_date", "Project", "Device_SN", "ROM", "HW_Stage", 103 "test_SSID", "Frequency", "Turn_table_orientation", 104 "Attenuate_dB", "Signal_poll_avg_rssi", "Chain_0_rssi", 105 "Chain_1_rssi", "Link_speed", "TX_throughput_Mbps", 106 "RX_throughput_Mbps", "HE_Capable", "Country_code", "Channel", 107 "WiFi_chip", "Type", "Host_name", "AP_model", 108 "Incremental_build_id", "Build_type", "TCP_UDP_Protocol", 109 "Security_type", "Test_tool", "Airplane_mode_status", "BT_status", 110 "Bug_ID", "Comment") 111 self.csv_write(Header) 112 113 def setup_test(self): 114 self.dut.droid.wakeLockAcquireBright() 115 self.dut.droid.wakeUpNow() 116 rom_info = self.get_rominfo() 117 self.testdate = time.strftime("%Y-%m-%d", time.localtime()) 118 self.rom = rom_info[0] 119 self.build_id = rom_info[1] 120 self.build_type = rom_info[2] 121 self.project = rom_info[3] 122 self.ret_country_code = self.get_country_code() 123 self.ret_hw_stage = self.get_hw_stage() 124 self.ret_platform = wperfutils.detect_wifi_platform(self.dut) 125 126 def teardown_test(self): 127 self.dut.droid.wakeLockRelease() 128 self.dut.droid.goToSleepNow() 129 wutils.set_attns(self.attenuators, "default") 130 131 def teardown_class(self): 132 if "rvr_test_params" in self.user_params: 133 self.iperf_server.stop() 134 135 def on_fail(self, test_name, begin_time): 136 self.dut.take_bug_report(test_name, begin_time) 137 self.dut.cat_adb_log(test_name, begin_time) 138 139 """Helper Functions""" 140 141 def csv_write(self, data): 142 """Output .CSV file for test result. 143 144 Args: 145 data: Dict containing attenuation, throughput and other meta data. 146 """ 147 with open( 148 "{}/Result.csv".format(self.log_path), "a", newline="") as csv_file: 149 csv_writer = csv.writer(csv_file, delimiter=",") 150 csv_writer.writerow(data) 151 csv_file.close() 152 153 def set_atten(self, db): 154 """Setup attenuator dB for current test. 155 156 Args: 157 db: Attenuator setup dB. 158 """ 159 if db < 0: 160 db = 0 161 elif db > 95: 162 db = 95 163 self.log.info("[Attenuation] %s", "Set dB = " + str(db) + "dB") 164 for atten in self.attenuators: 165 atten.set_atten(db) 166 self.log.info("[Attenuation] %s", 167 "Current dB = " + str(atten.get_atten()) + "dB") 168 retry = 0 169 while atten.get_atten() != db and retry < 11: 170 retry = retry + 1 171 self.log.info( 172 "[Attenuation] %s", "Fail to set Attenuator to " + str(db) + ", " + 173 str(retry) + " times try to reset") 174 self.set_atten(db) 175 if retry == 11: 176 self.log.info("Attenuation] %s", 177 "Retry Attenuator fail for 10 cycles, end test!") 178 sys.exit() 179 180 def read_comport(self, com): 181 """Read com port for current test. 182 183 Args: 184 com: Serial port. 185 186 Returns: 187 port: Serial port with baud rate. 188 """ 189 port = serial.Serial(com, 9600, timeout=1) 190 time.sleep(1) 191 return port 192 193 def get_angle(self, port): 194 """Get turn table angle for current test. 195 196 Args: 197 port: Turn table com port. 198 199 Returns: 200 angle: Angle from turn table. 201 """ 202 angle = "" 203 port.write("DG?;".encode()) 204 time.sleep(0.1) 205 degree_data = port.readline().decode("utf-8") 206 for data in range(len(degree_data)): 207 if (degree_data[data].isdigit()) is True: 208 angle = angle + degree_data[data] 209 if angle == "": 210 return -1 211 return int(angle) 212 213 def set_angle(self, port, angle): 214 """Setup turn table angle for current test. 215 216 Args: 217 port: Turn table com port 218 angle: Turn table setup angle 219 """ 220 if angle > 359: 221 angle = 359 222 elif angle < 0: 223 angle = 0 224 self.log.info("Set angle to " + str(angle)) 225 input_angle = str("DG") + str(angle) + str(";") 226 port.write(input_angle.encode()) 227 time.sleep(self.TURN_TABLE_SETUP_TIME) 228 229 def check_angle(self, port, angle): 230 """Check turn table angle for current test. 231 232 Args: 233 port: Turn table com port 234 angle: Turn table setup angle 235 """ 236 retrytime = self.TEST_TIMEOUT 237 retry = 0 238 while self.get_angle(port) != angle and retry < retrytime: 239 retry = retry + 1 240 self.log.info("Turntable] %s", 241 "Current angle = " + str(self.get_angle(port))) 242 self.log.info( 243 "Turntable] %s", "Fail set angle to " + str(angle) + ", " + 244 str(retry) + " times try to reset") 245 self.set_angle(port, angle) 246 time.sleep(self.TURN_TABLE_SETUP_TIME) 247 if retry == retrytime: 248 self.log.info( 249 "Turntable] %s", 250 "Retry turntable fail for " + str(retry) + " cycles, end test!") 251 sys.exit() 252 253 def get_wifiinfo(self): 254 """Get WiFi RSSI/ link speed/ frequency for current test. 255 256 Returns: 257 [rssi,link_speed,frequency]: DUT WiFi RSSI,Link speed and Frequency. 258 """ 259 def is_number(string): 260 for i in string: 261 if i.isdigit() is False: 262 if (i == "-" or i == "."): 263 continue 264 return str(-1) 265 return string 266 267 try: 268 cmd = "adb shell iw wlan0 link" 269 wifiinfo = utils.subprocess.check_output( 270 cmd, shell=True, timeout=self.TEST_TIMEOUT) 271 272 # Check RSSI Enhance 273 rssi = self.get_rssi_func() 274 275 # Check link speed 276 link_speed = wifiinfo.decode( 277 "utf-8")[wifiinfo.decode("utf-8").find("bitrate:") + 278 8:wifiinfo.decode("utf-8").find("Bit/s") - 2] 279 link_speed = link_speed.strip(" ") 280 link_speed = is_number(link_speed) 281 # Check frequency 282 frequency = wifiinfo.decode( 283 "utf-8")[wifiinfo.decode("utf-8").find("freq:") + 284 6:wifiinfo.decode("utf-8").find("freq:") + 10] 285 frequency = frequency.strip(" ") 286 frequency = is_number(frequency) 287 except: 288 return -1, -1, -1 289 return [rssi, link_speed, frequency] 290 291 def get_rssi_func(self): 292 """Get RSSI from brcm/qcom wifi chip. 293 294 Returns: 295 current_rssi: DUT WiFi RSSI. 296 """ 297 if self.ret_platform == "brcm": 298 rssi_future = wperfutils.get_connected_rssi_brcm(self.dut) 299 signal_poll_avg_rssi_tmp = rssi_future.pop("signal_poll_avg_rssi").pop( 300 "mean") 301 chain_0_rssi_tmp = rssi_future.pop("chain_0_rssi").pop("mean") 302 chain_1_rssi_tmp = rssi_future.pop("chain_1_rssi").pop("mean") 303 current_rssi = { 304 "signal_poll_avg_rssi": signal_poll_avg_rssi_tmp, 305 "chain_0_rssi": chain_0_rssi_tmp, 306 "chain_1_rssi": chain_1_rssi_tmp 307 } 308 elif self.ret_platform == "qcom": 309 rssi_future = wperfutils.get_connected_rssi_qcom( 310 self.dut, interface="wlan0") 311 signal_poll_avg_rssi_tmp = rssi_future.pop("signal_poll_avg_rssi").pop( 312 "mean") 313 chain_0_rssi_tmp = rssi_future.pop("chain_0_rssi").pop("mean") 314 chain_1_rssi_tmp = rssi_future.pop("chain_1_rssi").pop("mean") 315 if math.isnan(signal_poll_avg_rssi_tmp): 316 signal_poll_avg_rssi_tmp = -1 317 if math.isnan(chain_0_rssi_tmp): 318 chain_0_rssi_tmp = -1 319 if math.isnan(chain_1_rssi_tmp): 320 chain_1_rssi_tmp = -1 321 322 if signal_poll_avg_rssi_tmp == -1 & chain_0_rssi_tmp == -1 & chain_1_rssi_tmp == -1: 323 current_rssi = -1 324 else: 325 current_rssi = { 326 "signal_poll_avg_rssi": signal_poll_avg_rssi_tmp, 327 "chain_0_rssi": chain_0_rssi_tmp, 328 "chain_1_rssi": chain_1_rssi_tmp 329 } 330 else: 331 current_rssi = { 332 "signal_poll_avg_rssi": float("nan"), 333 "chain_0_rssi": float("nan"), 334 "chain_1_rssi": float("nan") 335 } 336 return current_rssi 337 338 def get_rominfo(self): 339 """Get DUT ROM build info. 340 341 Returns: 342 rom, build_id, build_type, project: DUT Build info,Build ID, 343 Build type, and Project name 344 """ 345 rom = "NA" 346 build_id = "NA" 347 build_type = "NA" 348 project = "NA" 349 rominfo = self.dut.adb.shell("getprop ro.build.display.id").split() 350 351 if rominfo: 352 rom = rominfo[2] 353 build_id = rominfo[3] 354 project, build_type = rominfo[0].split("-") 355 356 return rom, build_id, build_type, project 357 358 def get_hw_stage(self): 359 """Get DUT HW stage. 360 361 Returns: 362 hw_stage: DUT HW stage e.g. EVT/DVT/PVT..etc. 363 """ 364 cmd = "adb shell getprop ro.boot.hardware.revision" 365 hw_stage_temp = utils.subprocess.check_output( 366 cmd, shell=True, timeout=self.TEST_TIMEOUT) 367 hw_stage = hw_stage_temp.decode("utf-8").split("\n")[0] 368 return hw_stage 369 370 def get_country_code(self): 371 """Get DUT country code. 372 373 Returns: 374 country_code: DUT country code e.g. US/JP/GE..etc. 375 """ 376 cmd = "adb shell cmd wifi get-country-code" 377 country_code_temp = utils.subprocess.check_output( 378 cmd, shell=True, timeout=self.TEST_TIMEOUT) 379 country_code = country_code_temp.decode("utf-8").split(" ")[4].split( 380 "\n")[0] 381 return country_code 382 383 def get_channel(self): 384 """Get DUT WiFi channel. 385 386 Returns: 387 country_code: DUT channel e.g. 6/36/37..etc. 388 """ 389 if self.ret_platform == "brcm": 390 cmd = 'adb shell wl assoc | grep "Primary channel:"' 391 channel_temp = utils.subprocess.check_output( 392 cmd, shell=True, timeout=self.TEST_TIMEOUT) 393 channel = channel_temp.decode("utf-8").split(": ")[1].split("\n")[0] 394 elif self.ret_platform == "qcom": 395 cmd = "adb shell iw wlan0 info | grep channel" 396 channel_temp = utils.subprocess.check_output( 397 cmd, shell=True, timeout=self.TEST_TIMEOUT) 398 channel = channel_temp.decode("utf-8").split(" ")[1].split("\n")[0] 399 return channel 400 401 def get_he_capable(self): 402 """Get DUT WiFi high efficiency capable status . 403 404 Returns: 405 he_capable: DUT high efficiency capable status. 406 """ 407 if self.ret_platform == "brcm": 408 cmd = 'adb shell wl assoc | grep "Chanspec:"' 409 he_temp = utils.subprocess.check_output( 410 cmd, shell=True, timeout=self.TEST_TIMEOUT) 411 he_capable = he_temp.decode("utf-8").split(": ")[1].split("\n")[0].split( 412 "MHz")[0].split(" ")[3] 413 elif self.ret_platform == "qcom": 414 cmd = "adb shell iw wlan0 info | grep channel" 415 he_temp = utils.subprocess.check_output( 416 cmd, shell=True, timeout=self.TEST_TIMEOUT) 417 he_capable = he_temp.decode("utf-8").split("width: ")[1].split(" ")[0] 418 return he_capable 419 420 def post_process_results(self, rvr_result): 421 """Saves JSON formatted results. 422 423 Args: 424 rvr_result: Dict containing attenuation, throughput and other meta data 425 Returns: 426 wifiinfo[0]: To check WiFi connection by RSSI value 427 """ 428 # Save output as text file 429 wifiinfo = self.get_wifiinfo() 430 if wifiinfo[0] != -1: 431 rvr_result["signal_poll_avg_rssi"] = wifiinfo[0]["signal_poll_avg_rssi"] 432 rvr_result["chain_0_rssi"] = wifiinfo[0]["chain_0_rssi"] 433 rvr_result["chain_1_rssi"] = wifiinfo[0]["chain_1_rssi"] 434 else: 435 rvr_result["signal_poll_avg_rssi"] = wifiinfo[0] 436 rvr_result["chain_0_rssi"] = wifiinfo[0] 437 rvr_result["chain_1_rssi"] = wifiinfo[0] 438 if rvr_result["signal_poll_avg_rssi"] == -1: 439 rvr_result["channel"] = "NA" 440 else: 441 rvr_result["channel"] = self.ret_channel 442 rvr_result["country_code"] = self.ret_country_code 443 rvr_result["hw_stage"] = self.ret_hw_stage 444 rvr_result["wifi_chip"] = self.ret_platform 445 rvr_result["test_ssid"] = self.ssid 446 rvr_result["test_angle"] = self.angle_list[self.angle] 447 rvr_result["test_dB"] = self.db 448 rvr_result["test_link_speed"] = wifiinfo[1] 449 rvr_result["test_frequency"] = wifiinfo[2] 450 451 data = ( 452 self.testdate, 453 self.project, 454 self.dut.serial, 455 self.rom, 456 rvr_result["hw_stage"], 457 rvr_result["test_ssid"], 458 rvr_result["test_frequency"], 459 rvr_result["test_angle"], 460 rvr_result["test_dB"], 461 rvr_result["signal_poll_avg_rssi"], 462 rvr_result["chain_0_rssi"], 463 rvr_result["chain_1_rssi"], 464 rvr_result["test_link_speed"], 465 rvr_result["throughput_TX"][0], 466 rvr_result["throughput_RX"][0], 467 "HE" + self.he_capable, 468 rvr_result["country_code"], 469 rvr_result["channel"], 470 rvr_result["wifi_chip"], 471 "OTA_RvR", 472 "OTA_Testbed2", 473 "RAXE500", 474 self.build_id, 475 self.build_type, 476 "TCP", 477 "WPA3", 478 "iperf3", 479 "OFF", 480 "OFF", 481 ) 482 self.csv_write(data) 483 484 results_file_path = "{}/{}_angle{}_{}dB.json".format( 485 self.log_path, self.ssid, self.angle_list[self.angle], self.db) 486 with open(results_file_path, "w") as results_file: 487 json.dump(rvr_result, results_file, indent=4) 488 return wifiinfo[0] 489 490 def connect_to_wifi_network(self, network): 491 """Connection logic for wifi networks. 492 493 Args: 494 params: Dictionary with network info. 495 """ 496 ssid = network[WifiEnums.SSID_KEY] 497 self.dut.ed.clear_all_events() 498 wutils.start_wifi_connection_scan(self.dut) 499 scan_results = self.dut.droid.wifiGetScanResults() 500 wutils.assert_network_in_list({WifiEnums.SSID_KEY: ssid}, scan_results) 501 wutils.wifi_connect(self.dut, network, num_of_tries=3) 502 503 def run_iperf_init(self, network): 504 self.iperf_server.start(tag="init") 505 self.log.info("[Iperf] %s", "Starting iperf traffic init.") 506 time.sleep(self.IPERF_SETUP_TIME) 507 try: 508 port_arg = "-p {} -J -R -t10".format(self.iperf_server.port) 509 self.dut.run_iperf_client( 510 self.rvr_test_params["iperf_server_address"], 511 port_arg, 512 timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT) 513 self.iperf_server.stop() 514 self.log.info("[Iperf] %s", "iperf traffic init Pass") 515 except: 516 self.log.warning("ValueError: iperf init ERROR.") 517 518 def run_iperf_client(self, network): 519 """Run iperf TX throughput after connection. 520 521 Args: 522 network: Dictionary with network info. 523 524 Returns: 525 rvr_result: Dict containing TX rvr_results. 526 """ 527 rvr_result = [] 528 try: 529 self.iperf_server.start(tag="TX_server_{}_angle{}_{}dB".format( 530 self.ssid, self.angle_list[self.angle], self.db)) 531 ssid = network[WifiEnums.SSID_KEY] 532 self.log.info("[Iperf] %s", 533 "Starting iperf traffic TX through {}".format(ssid)) 534 time.sleep(self.IPERF_SETUP_TIME) 535 port_arg = "-p {} -J {}".format(self.iperf_server.port, 536 self.rvr_test_params["iperf_port_arg"]) 537 success, data = self.dut.run_iperf_client( 538 self.rvr_test_params["iperf_server_address"], 539 port_arg, 540 timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT) 541 # Parse and log result 542 client_output_path = os.path.join( 543 self.iperf_server.log_path, 544 "IperfDUT,{},TX_client_{}_angle{}_{}dB".format( 545 self.iperf_server.port, self.ssid, self.angle_list[self.angle], 546 self.db)) 547 with open(client_output_path, "w") as out_file: 548 out_file.write("\n".join(data)) 549 self.iperf_server.stop() 550 551 iperf_file = self.iperf_server.log_files[-1] 552 iperf_result = ipf.IPerfResult(iperf_file) 553 curr_throughput = (math.fsum(iperf_result.instantaneous_rates[ 554 self.rvr_test_params["iperf_ignored_interval"]:-1]) / 555 len(iperf_result.instantaneous_rates[ 556 self.rvr_test_params["iperf_ignored_interval"]:-1]) 557 ) * 8 * (1.024**2) 558 rvr_result.append(curr_throughput) 559 self.log.info( 560 "[Iperf] %s", "TX Throughput at {0:.2f} dB is {1:.2f} Mbps".format( 561 self.db, curr_throughput)) 562 self.log.debug(pprint.pformat(data)) 563 asserts.assert_true(success, "Error occurred in iPerf traffic.") 564 return rvr_result 565 except: 566 rvr_result = ["NA"] 567 self.log.warning("ValueError: TX iperf ERROR.") 568 self.iperf_server.stop() 569 return rvr_result 570 571 def run_iperf_server(self, network): 572 """Run iperf RX throughput after connection. 573 574 Args: 575 network: Dictionary with network info. 576 577 Returns: 578 rvr_result: Dict containing RX rvr_results. 579 """ 580 581 rvr_result = [] 582 try: 583 self.iperf_server.start(tag="RX_client_{}_angle{}_{}dB".format( 584 self.ssid, self.angle_list[self.angle], self.db)) 585 ssid = network[WifiEnums.SSID_KEY] 586 self.log.info("[Iperf] %s", 587 "Starting iperf traffic RX through {}".format(ssid)) 588 time.sleep(self.IPERF_SETUP_TIME) 589 port_arg = "-p {} -J -R {}".format(self.iperf_server.port, 590 self.rvr_test_params["iperf_port_arg"]) 591 success, data = self.dut.run_iperf_client( 592 self.rvr_test_params["iperf_server_address"], 593 port_arg, 594 timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT) 595 # Parse and log result 596 client_output_path = os.path.join( 597 self.iperf_server.log_path, 598 "IperfDUT,{},RX_server_{}_angle{}_{}dB".format( 599 self.iperf_server.port, self.ssid, self.angle_list[self.angle], 600 self.db)) 601 with open(client_output_path, "w") as out_file: 602 out_file.write("\n".join(data)) 603 self.iperf_server.stop() 604 605 iperf_file = client_output_path 606 iperf_result = ipf.IPerfResult(iperf_file) 607 curr_throughput = (math.fsum(iperf_result.instantaneous_rates[ 608 self.rvr_test_params["iperf_ignored_interval"]:-1]) / 609 len(iperf_result.instantaneous_rates[ 610 self.rvr_test_params["iperf_ignored_interval"]:-1]) 611 ) * 8 * (1.024**2) 612 rvr_result.append(curr_throughput) 613 self.log.info( 614 "[Iperf] %s", "RX Throughput at {0:.2f} dB is {1:.2f} Mbps".format( 615 self.db, curr_throughput)) 616 617 self.log.debug(pprint.pformat(data)) 618 asserts.assert_true(success, "Error occurred in iPerf traffic.") 619 return rvr_result 620 except: 621 rvr_result = ["NA"] 622 self.log.warning("ValueError: RX iperf ERROR.") 623 self.iperf_server.stop() 624 return rvr_result 625 626 def iperf_test_func(self, network): 627 """Main function to test iperf TX/RX. 628 629 Args: 630 network: Dictionary with network info. 631 """ 632 # Initialize 633 rvr_result = {} 634 # Run RvR and log result 635 rvr_result["throughput_RX"] = self.run_iperf_server(network) 636 retry_time = 2 637 for retry in range(retry_time): 638 if rvr_result["throughput_RX"] == ["NA"]: 639 if not self.iperf_retry(): 640 time.sleep(self.IPERF_SETUP_TIME) 641 rvr_result["throughput_RX"] = self.run_iperf_server(network) 642 else: 643 break 644 else: 645 break 646 rvr_result["throughput_TX"] = self.run_iperf_client(network) 647 retry_time = 2 648 for retry in range(retry_time): 649 if rvr_result["throughput_TX"] == ["NA"]: 650 if not self.iperf_retry(): 651 time.sleep(self.IPERF_SETUP_TIME) 652 rvr_result["throughput_TX"] = self.run_iperf_client(network) 653 else: 654 break 655 else: 656 break 657 self.post_process_results(rvr_result) 658 self.rssi = wifiinfo[0] 659 return self.rssi 660 661 def iperf_retry(self): 662 """Check iperf TX/RX status and retry.""" 663 try: 664 cmd = "adb -s {} shell pidof iperf3| xargs adb shell kill -9".format( 665 self.dut.serial) 666 utils.subprocess.call(cmd, shell=True, timeout=self.TEST_TIMEOUT) 667 self.log.warning("ValueError: Killed DUT iperf process, keep test") 668 except: 669 self.log.info("[Iperf] %s", "No iperf DUT process found, keep test") 670 671 wifiinfo = self.get_wifiinfo() 672 print("--[iperf_retry]--", wifiinfo[0]) 673 self.log.info("[WiFiinfo] %s", "Current RSSI = " + str(wifiinfo[0]) + "dBm") 674 if wifiinfo[0] == -1: 675 self.log.warning("ValueError: Cannot get RSSI, stop throughput test") 676 return True 677 else: 678 return False 679 680 def rvr_test(self, network): 681 """Test function to run RvR. 682 683 The function runs an RvR test in the current device/AP configuration. 684 Function is called from another wrapper function that sets up the 685 testbed for the RvR test 686 687 Args: 688 params: Dictionary with network info 689 """ 690 self.ssid = network[WifiEnums.SSID_KEY] 691 self.log.info("Start rvr test") 692 693 for angle in range(len(self.angle_list)): 694 self.angle = angle 695 self.set_angle(self.turntable_port, self.angle_list[angle]) 696 self.check_angle(self.turntable_port, self.angle_list[angle]) 697 self.set_atten(0) 698 self.connect_to_wifi_network(network) 699 self.ret_channel = self.get_channel() 700 self.he_capable = self.get_he_capable() 701 self.run_iperf_init(network) 702 for db in range(self.mindb, self.maxdb + self.stepdb, self.stepdb): 703 self.db = db 704 self.set_atten(self.db) 705 self.iperf_test_func(network) 706 if self.rssi == -1: 707 self.log.warning("ValueError: Cannot get RSSI. Run next angle") 708 break 709 else: 710 continue 711 wutils.reset_wifi(self.dut) 712 713 """Tests""" 714 def test_rvr_2g(self): 715 network = self.rvr_networks[0] 716 self.rvr_test(network) 717 718 def test_rvr_5g(self): 719 network = self.rvr_networks[1] 720 self.rvr_test(network) 721 722 def test_rvr_6g(self): 723 network = self.rvr_networks[2] 724 self.rvr_test(network) 725