• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import itertools
18import pprint
19import time
20
21import acts.signals
22import acts.test_utils.wifi.wifi_test_utils as wutils
23
24from acts import asserts
25from acts.test_decorators import test_tracker_info
26from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
27from acts.controllers import iperf_server as ipf
28
29import json
30import logging
31import math
32import os
33from acts import utils
34import csv
35
36import serial
37import sys
38
39
40WifiEnums = wutils.WifiEnums
41
42
43class WifiRvrTWTest(WifiBaseTest):
44    """ Tests for wifi RVR performance
45
46        Test Bed Requirement:
47          * One Android device
48          * Wi-Fi networks visible to the device
49    """
50    TEST_TIMEOUT = 10
51
52    def __init__(self, controllers):
53        self.attenuators = None
54        WifiBaseTest.__init__(self, controllers)
55
56    def setup_class(self):
57        self.dut = self.android_devices[0]
58        wutils.wifi_test_device_init(self.dut)
59
60        req_params = [ "iot_networks","rvr_test_params"]
61        opt_params = [ "angle_params","usb_port"]
62        self.unpack_userparams(req_param_names=req_params,
63                               opt_param_names=opt_params)
64
65        asserts.assert_true(
66            len(self.iot_networks) > 0,
67            "Need at least one iot network with psk.")
68
69        wutils.wifi_toggle_state(self.dut, True)
70        if "rvr_test_params" in self.user_params:
71            self.iperf_server = self.iperf_servers[0]
72            self.MaxdB= self.rvr_test_params ["rvr_atten_MaxDB"]
73            self.MindB= self.rvr_test_params ["rvr_atten_MinDB"]
74            self.stepdB= self.rvr_test_params ["rvr_atten_step"]
75
76        if "angle_params" in self.user_params:
77            self.angle = self.angle_params
78
79        if "usb_port" in self.user_params:
80            self.T1=self.readport(self.usb_port["turntable"])
81            self.ATT1=self.readport(self.usb_port["atten1"])
82            self.ATT2=self.readport(self.usb_port["atten2"])
83            self.ATT3=self.readport(self.usb_port["atten3"])
84
85        # create hashmap for testcase name and SSIDs
86        self.iot_test_prefix = "test_iot_connection_to_"
87        self.ssid_map = {}
88        for network in self.iot_networks:
89            SSID = network['SSID'].replace('-','_')
90            self.ssid_map[SSID] = network
91
92        # create folder for rvr test result
93        self.log_path = os.path.join(logging.log_path, "rvr_results")
94        utils.create_dir(self.log_path)
95
96        Header=("test_SSID","Turn table (angle)","Attenuator(dBm)",
97                "TX throughput (Mbps)","RX throughput (Mbps)",
98                "RSSI","Link speed","Frequency")
99        self.csv_write(Header)
100
101    def setup_test(self):
102        self.dut.droid.wakeLockAcquireBright()
103        self.dut.droid.wakeUpNow()
104
105    def teardown_test(self):
106        self.dut.droid.wakeLockRelease()
107        self.dut.droid.goToSleepNow()
108
109    def teardown_class(self):
110        if "rvr_test_params" in self.user_params:
111            self.iperf_server.stop()
112
113    def on_fail(self, test_name, begin_time):
114        self.dut.take_bug_report(test_name, begin_time)
115        self.dut.cat_adb_log(test_name, begin_time)
116
117    """Helper Functions"""
118
119    def csv_write(self,data):
120        """Output .CSV file for test result.
121
122        Args:
123            data: Dict containing attenuation, throughput and other meta data.
124        """
125        with open("{}/Result.csv".format(self.log_path), "a", newline="") as csv_file:
126            csv_writer = csv.writer(csv_file,delimiter=',')
127            csv_writer.writerow(data)
128            csv_file.close()
129
130    def readport(self,com):
131        """Read com port for current test.
132
133        Args:
134            com: Attenuator or turn table com port
135        """
136        port=serial.Serial(com,9600,timeout=1)
137        time.sleep(1)
138        return port
139
140    def getdB(self,port):
141        """Get attenuator dB for current test.
142
143        Args:
144            port: Attenuator com port
145        """
146        port.write('V?;'.encode())
147        dB=port.readline().decode()
148        dB=dB.strip(';')
149        dB=dB[dB.find('V')+1:]
150        return int(dB)
151
152    def setdB(self,port,dB):
153        """Setup attenuator dB for current test.
154
155        Args:
156            port: Attenuator com port
157            dB: Attenuator setup dB
158        """
159        if dB<0:
160            dB=0
161        elif dB>101:
162            dB=101
163        self.log.info("Set dB to "+str(dB))
164        InputdB=str('V')+str(dB)+str(';')
165        port.write(InputdB.encode())
166        time.sleep(0.1)
167
168    def set_Three_Att_dB(self,port1,port2,port3,dB):
169        """Setup 3 attenuator dB for current test.
170
171        Args:
172            port1: Attenuator1 com port
173            port1: Attenuator2 com port
174            port1: Attenuator com port
175            dB: Attenuator setup dB
176        """
177        self.setdB(port1,dB)
178        self.setdB(port2,dB)
179        self.setdB(port3,dB)
180        self.checkdB(port1,dB)
181        self.checkdB(port2,dB)
182        self.checkdB(port3,dB)
183
184    def checkdB(self,port,dB):
185        """Check attenuator dB for current test.
186
187        Args:
188            port: Attenuator com port
189            dB: Attenuator setup dB
190        """
191        retry=0
192        while self.getdB(port)!=dB and retry<10:
193            retry=retry+1
194            self.log.info("Current dB = "+str(self.getdB(port)))
195            self.log.info("Fail to set Attenuator to "+str(dB)+", "
196                          +str(retry)+" times try to reset")
197            self.setdB(port,dB)
198        if retry ==10:
199            self.log.info("Retry Attenuator fail for 9 cycles, end test!")
200            sys.exit()
201        return 0
202
203    def getDG(self,port):
204        """Get turn table angle for current test.
205
206        Args:
207            port: Turn table com port
208        """
209        DG = ""
210        port.write('DG?;'.encode())
211        time.sleep(0.1)
212        data = port.readline().decode('utf-8')
213        for i in range(len(data)):
214            if (data[i].isdigit()) == True:
215                DG = DG + data[i]
216        if DG == "":
217            return -1
218        return int(DG)
219
220    def setDG(self,port,DG):
221        """Setup turn table angle for current test.
222
223        Args:
224            port: Turn table com port
225            DG: Turn table setup angle
226        """
227        if DG>359:
228            DG=359
229        elif DG<0:
230            DG=0
231        self.log.info("Set angle to "+str(DG))
232        InputDG=str('DG')+str(DG)+str(';')
233        port.write(InputDG.encode())
234
235    def checkDG(self,port,DG):
236        """Check turn table angle for current test.
237
238        Args:
239            port: Turn table com port
240            DG: Turn table setup angle
241        """
242        retrytime = self.TEST_TIMEOUT
243        retry = 0
244        while self.getDG(port)!=DG and retry<retrytime:
245            retry=retry+1
246            self.log.info('Current angle = '+str(self.getDG(port)))
247            self.log.info('Fail set angle to '+str(DG)+', '+str(retry)+' times try to reset')
248            self.setDG(port,DG)
249            time.sleep(10)
250        if retry == retrytime:
251            self.log.info('Retry turntable fail for '+str(retry)+' cycles, end test!')
252            sys.exit()
253        return 0
254
255    def getwifiinfo(self):
256        """Get WiFi RSSI/ link speed/ frequency for current test.
257
258        Returns:
259            [RSSI,LS,FR]: WiFi RSSI/ link speed/ frequency
260        """
261        def is_number(string):
262            for i in string:
263                if i.isdigit() == False:
264                    if (i=="-" or i=="."):
265                        continue
266                    return str(-1)
267            return string
268
269        try:
270            cmd = "adb shell iw wlan0 link"
271            wifiinfo = utils.subprocess.check_output(cmd,shell=True,
272                                                     timeout=self.TEST_TIMEOUT)
273            # Check RSSI
274            RSSI = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("signal:") +
275                                            7:wifiinfo.decode("utf-8").find("dBm") - 1]
276            RSSI = RSSI.strip(' ')
277            RSSI = is_number(RSSI)
278            # Check link speed
279            LS = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("bitrate:") +
280                                          8:wifiinfo.decode("utf-8").find("Bit/s") - 2]
281            LS = LS.strip(' ')
282            LS = is_number(LS)
283            # Check frequency
284            FR = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("freq:") +
285                                          6:wifiinfo.decode("utf-8").find("freq:") + 10]
286            FR = FR.strip(' ')
287            FR = is_number(FR)
288        except:
289            return -1, -1, -1
290        return [RSSI,LS,FR]
291
292    def post_process_results(self, rvr_result):
293        """Saves JSON formatted results.
294
295        Args:
296            rvr_result: Dict containing attenuation, throughput and other meta
297            data
298        """
299        # Save output as text file
300        data=(rvr_result["test_name"],rvr_result["test_angle"],rvr_result["test_dB"],
301              rvr_result["throughput_TX"][0],rvr_result["throughput_RX"][0],
302              rvr_result["test_RSSI"],rvr_result["test_LS"],rvr_result["test_FR"])
303        self.csv_write(data)
304
305        results_file_path = "{}/{}_angle{}_{}dB.json".format(self.log_path,
306                                                        self.ssid,
307                                                        self.angle[self.ag],self.DB)
308        with open(results_file_path, 'w') as results_file:
309            json.dump(rvr_result, results_file, indent=4)
310
311    def connect_to_wifi_network(self, network):
312        """Connection logic for psk wifi networks.
313
314        Args:
315            params: Dictionary with network info.
316        """
317        SSID = network[WifiEnums.SSID_KEY]
318        self.dut.ed.clear_all_events()
319        wutils.start_wifi_connection_scan(self.dut)
320        scan_results = self.dut.droid.wifiGetScanResults()
321        wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results)
322        wutils.wifi_connect(self.dut, network, num_of_tries=3)
323
324    def run_iperf_client(self, network):
325        """Run iperf TX throughput after connection.
326
327        Args:
328            params: Dictionary with network info.
329
330        Returns:
331            rvr_result: Dict containing rvr_results
332        """
333        rvr_result = []
334        self.iperf_server.start(tag="TX_server_{}_angle{}_{}dB".format(
335            self.ssid,self.angle[self.ag],self.DB))
336        wait_time = 5
337        SSID = network[WifiEnums.SSID_KEY]
338        self.log.info("Starting iperf traffic TX through {}".format(SSID))
339        time.sleep(wait_time)
340        port_arg = "-p {} -J {}".format(self.iperf_server.port,
341                                        self.rvr_test_params["iperf_port_arg"])
342        success, data = self.dut.run_iperf_client(
343            self.rvr_test_params["iperf_server_address"],
344            port_arg,
345            timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT)
346        # Parse and log result
347        client_output_path = os.path.join(
348            self.iperf_server.log_path, "IperfDUT,{},TX_client_{}_angle{}_{}dB".format(
349                self.iperf_server.port,self.ssid,self.angle[self.ag],self.DB))
350        with open(client_output_path, 'w') as out_file:
351            out_file.write("\n".join(data))
352        self.iperf_server.stop()
353
354        iperf_file = self.iperf_server.log_files[-1]
355        try:
356            iperf_result = ipf.IPerfResult(iperf_file)
357            curr_throughput = (math.fsum(iperf_result.instantaneous_rates[
358                self.rvr_test_params["iperf_ignored_interval"]:-1]) / len(
359                    iperf_result.instantaneous_rates[self.rvr_test_params[
360                        "iperf_ignored_interval"]:-1])) * 8 * (1.024**2)
361        except:
362            self.log.warning(
363                "ValueError: Cannot get iperf result. Setting to 0")
364            curr_throughput = 0
365        rvr_result.append(curr_throughput)
366        self.log.info("TX Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
367            self.DB, curr_throughput))
368
369        self.log.debug(pprint.pformat(data))
370        asserts.assert_true(success, "Error occurred in iPerf traffic.")
371        return rvr_result
372
373    def run_iperf_server(self, network):
374        """Run iperf RX throughput after connection.
375
376        Args:
377            params: Dictionary with network info.
378
379        Returns:
380            rvr_result: Dict containing rvr_results
381        """
382        rvr_result = []
383        self.iperf_server.start(tag="RX_client_{}_angle{}_{}dB".format(
384            self.ssid,self.angle[self.ag],self.DB))
385        wait_time = 5
386        SSID = network[WifiEnums.SSID_KEY]
387        self.log.info("Starting iperf traffic RX through {}".format(SSID))
388        time.sleep(wait_time)
389        port_arg = "-p {} -J -R {}".format(self.iperf_server.port,
390                                           self.rvr_test_params["iperf_port_arg"])
391        success, data = self.dut.run_iperf_client(
392            self.rvr_test_params["iperf_server_address"],
393            port_arg,
394            timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT)
395        # Parse and log result
396        client_output_path = os.path.join(
397        self.iperf_server.log_path, "IperfDUT,{},RX_server_{}_angle{}_{}dB".format(
398            self.iperf_server.port,self.ssid,self.angle[self.ag],self.DB))
399        with open(client_output_path, 'w') as out_file:
400            out_file.write("\n".join(data))
401        self.iperf_server.stop()
402
403        iperf_file = client_output_path
404        try:
405            iperf_result = ipf.IPerfResult(iperf_file)
406            curr_throughput = (math.fsum(iperf_result.instantaneous_rates[
407                self.rvr_test_params["iperf_ignored_interval"]:-1]) / len(
408                    iperf_result.instantaneous_rates[self.rvr_test_params[
409                        "iperf_ignored_interval"]:-1])) * 8 * (1.024**2)
410        except:
411            self.log.warning(
412                "ValueError: Cannot get iperf result. Setting to 0")
413            curr_throughput = 0
414        rvr_result.append(curr_throughput)
415        self.log.info("RX Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
416            self.DB, curr_throughput))
417
418        self.log.debug(pprint.pformat(data))
419        asserts.assert_true(success, "Error occurred in iPerf traffic.")
420        return rvr_result
421
422    def iperf_test_func(self,network):
423        """Main function to test iperf TX/RX.
424
425        Args:
426            params: Dictionary with network info
427        """
428        if "rvr_test_params" in self.user_params:
429            # Initialize
430            rvr_result = {}
431            # Run RvR and log result
432            wifiinfo = self.getwifiinfo()
433            rvr_result["throughput_TX"] = self.run_iperf_client(network)
434            rvr_result["throughput_RX"] = self.run_iperf_server(network)
435            rvr_result["test_name"] = self.ssid
436            rvr_result["test_angle"] = self.angle[self.ag]
437            rvr_result["test_dB"] = self.DB
438            rvr_result["test_RSSI"] = wifiinfo[0]
439            rvr_result["test_LS"] = wifiinfo[1]
440            rvr_result["test_FR"] = wifiinfo[2]
441            self.post_process_results(rvr_result)
442
443    def rvr_test(self,network):
444        """Test function to run RvR.
445
446        The function runs an RvR test in the current device/AP configuration.
447        Function is called from another wrapper function that sets up the
448        testbed for the RvR test
449
450        Args:
451            params: Dictionary with network info
452        """
453        wait_time = 5
454        utils.subprocess.check_output('adb root', shell=True, timeout=20)
455        self.ssid = network[WifiEnums.SSID_KEY]
456        self.log.info("Start rvr test")
457        for i in range(len(self.angle)):
458          self.setDG(self.T1,self.angle[i])
459          time.sleep(wait_time)
460          self.checkDG(self.T1,self.angle[i])
461          self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,0)
462          time.sleep(wait_time)
463          self.connect_to_wifi_network(network)
464          self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,self.MindB)
465          for j in range(self.MindB,self.MaxdB+self.stepdB,self.stepdB):
466            self.DB=j
467            self.ag=i
468            self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,self.DB)
469            self.iperf_test_func(network)
470          wutils.reset_wifi(self.dut)
471
472    """Tests"""
473
474    @test_tracker_info(uuid="93816af8-4c63-45f8-b296-cb49fae0b158")
475    def test_iot_connection_to_RVR_2G(self):
476        ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
477        self.rvr_test(self.ssid_map[ssid_key])
478
479    @test_tracker_info(uuid="e1a67e13-946f-4d91-aa73-3f945438a1ac")
480    def test_iot_connection_to_RVR_5G(self):
481        ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
482        self.rvr_test(self.ssid_map[ssid_key])