1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import itertools 18import pprint 19import time 20 21import acts.signals 22import acts.test_utils.wifi.wifi_test_utils as wutils 23 24from acts import asserts 25from acts.test_decorators import test_tracker_info 26from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest 27from acts.controllers import iperf_server as ipf 28 29import json 30import logging 31import math 32import os 33from acts import utils 34import csv 35 36import serial 37import sys 38 39 40WifiEnums = wutils.WifiEnums 41 42 43class WifiRvrTWTest(WifiBaseTest): 44 """ Tests for wifi RVR performance 45 46 Test Bed Requirement: 47 * One Android device 48 * Wi-Fi networks visible to the device 49 """ 50 TEST_TIMEOUT = 10 51 52 def __init__(self, controllers): 53 self.attenuators = None 54 WifiBaseTest.__init__(self, controllers) 55 56 def setup_class(self): 57 self.dut = self.android_devices[0] 58 wutils.wifi_test_device_init(self.dut) 59 60 req_params = [ "iot_networks","rvr_test_params"] 61 opt_params = [ "angle_params","usb_port"] 62 self.unpack_userparams(req_param_names=req_params, 63 opt_param_names=opt_params) 64 65 asserts.assert_true( 66 len(self.iot_networks) > 0, 67 "Need at least one iot network with psk.") 68 69 wutils.wifi_toggle_state(self.dut, True) 70 if "rvr_test_params" in self.user_params: 71 self.iperf_server = self.iperf_servers[0] 72 self.MaxdB= self.rvr_test_params ["rvr_atten_MaxDB"] 73 self.MindB= self.rvr_test_params ["rvr_atten_MinDB"] 74 self.stepdB= self.rvr_test_params ["rvr_atten_step"] 75 76 if "angle_params" in self.user_params: 77 self.angle = self.angle_params 78 79 if "usb_port" in self.user_params: 80 self.T1=self.readport(self.usb_port["turntable"]) 81 self.ATT1=self.readport(self.usb_port["atten1"]) 82 self.ATT2=self.readport(self.usb_port["atten2"]) 83 self.ATT3=self.readport(self.usb_port["atten3"]) 84 85 # create hashmap for testcase name and SSIDs 86 self.iot_test_prefix = "test_iot_connection_to_" 87 self.ssid_map = {} 88 for network in self.iot_networks: 89 SSID = network['SSID'].replace('-','_') 90 self.ssid_map[SSID] = network 91 92 # create folder for rvr test result 93 self.log_path = os.path.join(logging.log_path, "rvr_results") 94 utils.create_dir(self.log_path) 95 96 Header=("test_SSID","Turn table (angle)","Attenuator(dBm)", 97 "TX throughput (Mbps)","RX throughput (Mbps)", 98 "RSSI","Link speed","Frequency") 99 self.csv_write(Header) 100 101 def setup_test(self): 102 self.dut.droid.wakeLockAcquireBright() 103 self.dut.droid.wakeUpNow() 104 105 def teardown_test(self): 106 self.dut.droid.wakeLockRelease() 107 self.dut.droid.goToSleepNow() 108 109 def teardown_class(self): 110 if "rvr_test_params" in self.user_params: 111 self.iperf_server.stop() 112 113 def on_fail(self, test_name, begin_time): 114 self.dut.take_bug_report(test_name, begin_time) 115 self.dut.cat_adb_log(test_name, begin_time) 116 117 """Helper Functions""" 118 119 def csv_write(self,data): 120 """Output .CSV file for test result. 121 122 Args: 123 data: Dict containing attenuation, throughput and other meta data. 124 """ 125 with open("{}/Result.csv".format(self.log_path), "a", newline="") as csv_file: 126 csv_writer = csv.writer(csv_file,delimiter=',') 127 csv_writer.writerow(data) 128 csv_file.close() 129 130 def readport(self,com): 131 """Read com port for current test. 132 133 Args: 134 com: Attenuator or turn table com port 135 """ 136 port=serial.Serial(com,9600,timeout=1) 137 time.sleep(1) 138 return port 139 140 def getdB(self,port): 141 """Get attenuator dB for current test. 142 143 Args: 144 port: Attenuator com port 145 """ 146 port.write('V?;'.encode()) 147 dB=port.readline().decode() 148 dB=dB.strip(';') 149 dB=dB[dB.find('V')+1:] 150 return int(dB) 151 152 def setdB(self,port,dB): 153 """Setup attenuator dB for current test. 154 155 Args: 156 port: Attenuator com port 157 dB: Attenuator setup dB 158 """ 159 if dB<0: 160 dB=0 161 elif dB>101: 162 dB=101 163 self.log.info("Set dB to "+str(dB)) 164 InputdB=str('V')+str(dB)+str(';') 165 port.write(InputdB.encode()) 166 time.sleep(0.1) 167 168 def set_Three_Att_dB(self,port1,port2,port3,dB): 169 """Setup 3 attenuator dB for current test. 170 171 Args: 172 port1: Attenuator1 com port 173 port1: Attenuator2 com port 174 port1: Attenuator com port 175 dB: Attenuator setup dB 176 """ 177 self.setdB(port1,dB) 178 self.setdB(port2,dB) 179 self.setdB(port3,dB) 180 self.checkdB(port1,dB) 181 self.checkdB(port2,dB) 182 self.checkdB(port3,dB) 183 184 def checkdB(self,port,dB): 185 """Check attenuator dB for current test. 186 187 Args: 188 port: Attenuator com port 189 dB: Attenuator setup dB 190 """ 191 retry=0 192 while self.getdB(port)!=dB and retry<10: 193 retry=retry+1 194 self.log.info("Current dB = "+str(self.getdB(port))) 195 self.log.info("Fail to set Attenuator to "+str(dB)+", " 196 +str(retry)+" times try to reset") 197 self.setdB(port,dB) 198 if retry ==10: 199 self.log.info("Retry Attenuator fail for 9 cycles, end test!") 200 sys.exit() 201 return 0 202 203 def getDG(self,port): 204 """Get turn table angle for current test. 205 206 Args: 207 port: Turn table com port 208 """ 209 DG = "" 210 port.write('DG?;'.encode()) 211 time.sleep(0.1) 212 data = port.readline().decode('utf-8') 213 for i in range(len(data)): 214 if (data[i].isdigit()) == True: 215 DG = DG + data[i] 216 if DG == "": 217 return -1 218 return int(DG) 219 220 def setDG(self,port,DG): 221 """Setup turn table angle for current test. 222 223 Args: 224 port: Turn table com port 225 DG: Turn table setup angle 226 """ 227 if DG>359: 228 DG=359 229 elif DG<0: 230 DG=0 231 self.log.info("Set angle to "+str(DG)) 232 InputDG=str('DG')+str(DG)+str(';') 233 port.write(InputDG.encode()) 234 235 def checkDG(self,port,DG): 236 """Check turn table angle for current test. 237 238 Args: 239 port: Turn table com port 240 DG: Turn table setup angle 241 """ 242 retrytime = self.TEST_TIMEOUT 243 retry = 0 244 while self.getDG(port)!=DG and retry<retrytime: 245 retry=retry+1 246 self.log.info('Current angle = '+str(self.getDG(port))) 247 self.log.info('Fail set angle to '+str(DG)+', '+str(retry)+' times try to reset') 248 self.setDG(port,DG) 249 time.sleep(10) 250 if retry == retrytime: 251 self.log.info('Retry turntable fail for '+str(retry)+' cycles, end test!') 252 sys.exit() 253 return 0 254 255 def getwifiinfo(self): 256 """Get WiFi RSSI/ link speed/ frequency for current test. 257 258 Returns: 259 [RSSI,LS,FR]: WiFi RSSI/ link speed/ frequency 260 """ 261 def is_number(string): 262 for i in string: 263 if i.isdigit() == False: 264 if (i=="-" or i=="."): 265 continue 266 return str(-1) 267 return string 268 269 try: 270 cmd = "adb shell iw wlan0 link" 271 wifiinfo = utils.subprocess.check_output(cmd,shell=True, 272 timeout=self.TEST_TIMEOUT) 273 # Check RSSI 274 RSSI = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("signal:") + 275 7:wifiinfo.decode("utf-8").find("dBm") - 1] 276 RSSI = RSSI.strip(' ') 277 RSSI = is_number(RSSI) 278 # Check link speed 279 LS = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("bitrate:") + 280 8:wifiinfo.decode("utf-8").find("Bit/s") - 2] 281 LS = LS.strip(' ') 282 LS = is_number(LS) 283 # Check frequency 284 FR = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("freq:") + 285 6:wifiinfo.decode("utf-8").find("freq:") + 10] 286 FR = FR.strip(' ') 287 FR = is_number(FR) 288 except: 289 return -1, -1, -1 290 return [RSSI,LS,FR] 291 292 def post_process_results(self, rvr_result): 293 """Saves JSON formatted results. 294 295 Args: 296 rvr_result: Dict containing attenuation, throughput and other meta 297 data 298 """ 299 # Save output as text file 300 data=(rvr_result["test_name"],rvr_result["test_angle"],rvr_result["test_dB"], 301 rvr_result["throughput_TX"][0],rvr_result["throughput_RX"][0], 302 rvr_result["test_RSSI"],rvr_result["test_LS"],rvr_result["test_FR"]) 303 self.csv_write(data) 304 305 results_file_path = "{}/{}_angle{}_{}dB.json".format(self.log_path, 306 self.ssid, 307 self.angle[self.ag],self.DB) 308 with open(results_file_path, 'w') as results_file: 309 json.dump(rvr_result, results_file, indent=4) 310 311 def connect_to_wifi_network(self, network): 312 """Connection logic for psk wifi networks. 313 314 Args: 315 params: Dictionary with network info. 316 """ 317 SSID = network[WifiEnums.SSID_KEY] 318 self.dut.ed.clear_all_events() 319 wutils.start_wifi_connection_scan(self.dut) 320 scan_results = self.dut.droid.wifiGetScanResults() 321 wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results) 322 wutils.wifi_connect(self.dut, network, num_of_tries=3) 323 324 def run_iperf_client(self, network): 325 """Run iperf TX throughput after connection. 326 327 Args: 328 params: Dictionary with network info. 329 330 Returns: 331 rvr_result: Dict containing rvr_results 332 """ 333 rvr_result = [] 334 self.iperf_server.start(tag="TX_server_{}_angle{}_{}dB".format( 335 self.ssid,self.angle[self.ag],self.DB)) 336 wait_time = 5 337 SSID = network[WifiEnums.SSID_KEY] 338 self.log.info("Starting iperf traffic TX through {}".format(SSID)) 339 time.sleep(wait_time) 340 port_arg = "-p {} -J {}".format(self.iperf_server.port, 341 self.rvr_test_params["iperf_port_arg"]) 342 success, data = self.dut.run_iperf_client( 343 self.rvr_test_params["iperf_server_address"], 344 port_arg, 345 timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT) 346 # Parse and log result 347 client_output_path = os.path.join( 348 self.iperf_server.log_path, "IperfDUT,{},TX_client_{}_angle{}_{}dB".format( 349 self.iperf_server.port,self.ssid,self.angle[self.ag],self.DB)) 350 with open(client_output_path, 'w') as out_file: 351 out_file.write("\n".join(data)) 352 self.iperf_server.stop() 353 354 iperf_file = self.iperf_server.log_files[-1] 355 try: 356 iperf_result = ipf.IPerfResult(iperf_file) 357 curr_throughput = (math.fsum(iperf_result.instantaneous_rates[ 358 self.rvr_test_params["iperf_ignored_interval"]:-1]) / len( 359 iperf_result.instantaneous_rates[self.rvr_test_params[ 360 "iperf_ignored_interval"]:-1])) * 8 * (1.024**2) 361 except: 362 self.log.warning( 363 "ValueError: Cannot get iperf result. Setting to 0") 364 curr_throughput = 0 365 rvr_result.append(curr_throughput) 366 self.log.info("TX Throughput at {0:.2f} dB is {1:.2f} Mbps".format( 367 self.DB, curr_throughput)) 368 369 self.log.debug(pprint.pformat(data)) 370 asserts.assert_true(success, "Error occurred in iPerf traffic.") 371 return rvr_result 372 373 def run_iperf_server(self, network): 374 """Run iperf RX throughput after connection. 375 376 Args: 377 params: Dictionary with network info. 378 379 Returns: 380 rvr_result: Dict containing rvr_results 381 """ 382 rvr_result = [] 383 self.iperf_server.start(tag="RX_client_{}_angle{}_{}dB".format( 384 self.ssid,self.angle[self.ag],self.DB)) 385 wait_time = 5 386 SSID = network[WifiEnums.SSID_KEY] 387 self.log.info("Starting iperf traffic RX through {}".format(SSID)) 388 time.sleep(wait_time) 389 port_arg = "-p {} -J -R {}".format(self.iperf_server.port, 390 self.rvr_test_params["iperf_port_arg"]) 391 success, data = self.dut.run_iperf_client( 392 self.rvr_test_params["iperf_server_address"], 393 port_arg, 394 timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT) 395 # Parse and log result 396 client_output_path = os.path.join( 397 self.iperf_server.log_path, "IperfDUT,{},RX_server_{}_angle{}_{}dB".format( 398 self.iperf_server.port,self.ssid,self.angle[self.ag],self.DB)) 399 with open(client_output_path, 'w') as out_file: 400 out_file.write("\n".join(data)) 401 self.iperf_server.stop() 402 403 iperf_file = client_output_path 404 try: 405 iperf_result = ipf.IPerfResult(iperf_file) 406 curr_throughput = (math.fsum(iperf_result.instantaneous_rates[ 407 self.rvr_test_params["iperf_ignored_interval"]:-1]) / len( 408 iperf_result.instantaneous_rates[self.rvr_test_params[ 409 "iperf_ignored_interval"]:-1])) * 8 * (1.024**2) 410 except: 411 self.log.warning( 412 "ValueError: Cannot get iperf result. Setting to 0") 413 curr_throughput = 0 414 rvr_result.append(curr_throughput) 415 self.log.info("RX Throughput at {0:.2f} dB is {1:.2f} Mbps".format( 416 self.DB, curr_throughput)) 417 418 self.log.debug(pprint.pformat(data)) 419 asserts.assert_true(success, "Error occurred in iPerf traffic.") 420 return rvr_result 421 422 def iperf_test_func(self,network): 423 """Main function to test iperf TX/RX. 424 425 Args: 426 params: Dictionary with network info 427 """ 428 if "rvr_test_params" in self.user_params: 429 # Initialize 430 rvr_result = {} 431 # Run RvR and log result 432 wifiinfo = self.getwifiinfo() 433 rvr_result["throughput_TX"] = self.run_iperf_client(network) 434 rvr_result["throughput_RX"] = self.run_iperf_server(network) 435 rvr_result["test_name"] = self.ssid 436 rvr_result["test_angle"] = self.angle[self.ag] 437 rvr_result["test_dB"] = self.DB 438 rvr_result["test_RSSI"] = wifiinfo[0] 439 rvr_result["test_LS"] = wifiinfo[1] 440 rvr_result["test_FR"] = wifiinfo[2] 441 self.post_process_results(rvr_result) 442 443 def rvr_test(self,network): 444 """Test function to run RvR. 445 446 The function runs an RvR test in the current device/AP configuration. 447 Function is called from another wrapper function that sets up the 448 testbed for the RvR test 449 450 Args: 451 params: Dictionary with network info 452 """ 453 wait_time = 5 454 utils.subprocess.check_output('adb root', shell=True, timeout=20) 455 self.ssid = network[WifiEnums.SSID_KEY] 456 self.log.info("Start rvr test") 457 for i in range(len(self.angle)): 458 self.setDG(self.T1,self.angle[i]) 459 time.sleep(wait_time) 460 self.checkDG(self.T1,self.angle[i]) 461 self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,0) 462 time.sleep(wait_time) 463 self.connect_to_wifi_network(network) 464 self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,self.MindB) 465 for j in range(self.MindB,self.MaxdB+self.stepdB,self.stepdB): 466 self.DB=j 467 self.ag=i 468 self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,self.DB) 469 self.iperf_test_func(network) 470 wutils.reset_wifi(self.dut) 471 472 """Tests""" 473 474 @test_tracker_info(uuid="93816af8-4c63-45f8-b296-cb49fae0b158") 475 def test_iot_connection_to_RVR_2G(self): 476 ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") 477 self.rvr_test(self.ssid_map[ssid_key]) 478 479 @test_tracker_info(uuid="e1a67e13-946f-4d91-aa73-3f945438a1ac") 480 def test_iot_connection_to_RVR_5G(self): 481 ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") 482 self.rvr_test(self.ssid_map[ssid_key])