#!/usr/bin/env python3 # # Copyright 2018 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import itertools import pprint import time import acts.signals import acts.test_utils.wifi.wifi_test_utils as wutils from acts import asserts from acts.test_decorators import test_tracker_info from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest from acts.controllers import iperf_server as ipf import json import logging import math import os from acts import utils import csv import serial import sys WifiEnums = wutils.WifiEnums class WifiRvrTWTest(WifiBaseTest): """ Tests for wifi RVR performance Test Bed Requirement: * One Android device * Wi-Fi networks visible to the device """ TEST_TIMEOUT = 10 def __init__(self, controllers): self.attenuators = None WifiBaseTest.__init__(self, controllers) def setup_class(self): self.dut = self.android_devices[0] wutils.wifi_test_device_init(self.dut) req_params = [ "iot_networks","rvr_test_params"] opt_params = [ "angle_params","usb_port"] self.unpack_userparams(req_param_names=req_params, opt_param_names=opt_params) asserts.assert_true( len(self.iot_networks) > 0, "Need at least one iot network with psk.") wutils.wifi_toggle_state(self.dut, True) if "rvr_test_params" in self.user_params: self.iperf_server = self.iperf_servers[0] self.MaxdB= self.rvr_test_params ["rvr_atten_MaxDB"] self.MindB= self.rvr_test_params ["rvr_atten_MinDB"] self.stepdB= self.rvr_test_params ["rvr_atten_step"] if "angle_params" in self.user_params: self.angle = self.angle_params if "usb_port" in self.user_params: self.T1=self.readport(self.usb_port["turntable"]) self.ATT1=self.readport(self.usb_port["atten1"]) self.ATT2=self.readport(self.usb_port["atten2"]) self.ATT3=self.readport(self.usb_port["atten3"]) # create hashmap for testcase name and SSIDs self.iot_test_prefix = "test_iot_connection_to_" self.ssid_map = {} for network in self.iot_networks: SSID = network['SSID'].replace('-','_') self.ssid_map[SSID] = network # create folder for rvr test result self.log_path = os.path.join(logging.log_path, "rvr_results") utils.create_dir(self.log_path) Header=("test_SSID","Turn table (angle)","Attenuator(dBm)", "TX throughput (Mbps)","RX throughput (Mbps)", "RSSI","Link speed","Frequency") self.csv_write(Header) def setup_test(self): self.dut.droid.wakeLockAcquireBright() self.dut.droid.wakeUpNow() def teardown_test(self): self.dut.droid.wakeLockRelease() self.dut.droid.goToSleepNow() def teardown_class(self): if "rvr_test_params" in self.user_params: self.iperf_server.stop() def on_fail(self, test_name, begin_time): self.dut.take_bug_report(test_name, begin_time) self.dut.cat_adb_log(test_name, begin_time) """Helper Functions""" def csv_write(self,data): """Output .CSV file for test result. Args: data: Dict containing attenuation, throughput and other meta data. """ with open("{}/Result.csv".format(self.log_path), "a", newline="") as csv_file: csv_writer = csv.writer(csv_file,delimiter=',') csv_writer.writerow(data) csv_file.close() def readport(self,com): """Read com port for current test. Args: com: Attenuator or turn table com port """ port=serial.Serial(com,9600,timeout=1) time.sleep(1) return port def getdB(self,port): """Get attenuator dB for current test. Args: port: Attenuator com port """ port.write('V?;'.encode()) dB=port.readline().decode() dB=dB.strip(';') dB=dB[dB.find('V')+1:] return int(dB) def setdB(self,port,dB): """Setup attenuator dB for current test. Args: port: Attenuator com port dB: Attenuator setup dB """ if dB<0: dB=0 elif dB>101: dB=101 self.log.info("Set dB to "+str(dB)) InputdB=str('V')+str(dB)+str(';') port.write(InputdB.encode()) time.sleep(0.1) def set_Three_Att_dB(self,port1,port2,port3,dB): """Setup 3 attenuator dB for current test. Args: port1: Attenuator1 com port port1: Attenuator2 com port port1: Attenuator com port dB: Attenuator setup dB """ self.setdB(port1,dB) self.setdB(port2,dB) self.setdB(port3,dB) self.checkdB(port1,dB) self.checkdB(port2,dB) self.checkdB(port3,dB) def checkdB(self,port,dB): """Check attenuator dB for current test. Args: port: Attenuator com port dB: Attenuator setup dB """ retry=0 while self.getdB(port)!=dB and retry<10: retry=retry+1 self.log.info("Current dB = "+str(self.getdB(port))) self.log.info("Fail to set Attenuator to "+str(dB)+", " +str(retry)+" times try to reset") self.setdB(port,dB) if retry ==10: self.log.info("Retry Attenuator fail for 9 cycles, end test!") sys.exit() return 0 def getDG(self,port): """Get turn table angle for current test. Args: port: Turn table com port """ DG = "" port.write('DG?;'.encode()) time.sleep(0.1) data = port.readline().decode('utf-8') for i in range(len(data)): if (data[i].isdigit()) == True: DG = DG + data[i] if DG == "": return -1 return int(DG) def setDG(self,port,DG): """Setup turn table angle for current test. Args: port: Turn table com port DG: Turn table setup angle """ if DG>359: DG=359 elif DG<0: DG=0 self.log.info("Set angle to "+str(DG)) InputDG=str('DG')+str(DG)+str(';') port.write(InputDG.encode()) def checkDG(self,port,DG): """Check turn table angle for current test. Args: port: Turn table com port DG: Turn table setup angle """ retrytime = self.TEST_TIMEOUT retry = 0 while self.getDG(port)!=DG and retry