• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import json
18import os
19import threading
20import time
21
22from acts.base_test import BaseTestClass
23from acts.controllers import android_device
24from acts.controllers import relay_device_controller
25from acts.libs.proc.process import Process
26from acts.test_utils.bt.bt_test_utils import disable_bluetooth
27from acts.test_utils.bt.bt_test_utils import enable_bluetooth
28from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
29from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
30from acts.test_utils.coex.coex_test_utils import A2dpDumpsysParser
31from acts.test_utils.coex.coex_test_utils import (
32    collect_bluetooth_manager_dumpsys_logs)
33from acts.test_utils.coex.coex_test_utils import configure_and_start_ap
34from acts.test_utils.coex.coex_test_utils import check_wifi_status
35from acts.test_utils.coex.coex_test_utils import iperf_result
36from acts.test_utils.coex.coex_test_utils import get_phone_ip
37from acts.test_utils.coex.coex_test_utils import parse_fping_results
38from acts.test_utils.coex.coex_test_utils import wifi_connection_check
39from acts.test_utils.wifi import wifi_retail_ap as retail_ap
40from acts.test_utils.wifi.wifi_test_utils import reset_wifi
41from acts.test_utils.wifi.wifi_test_utils import wifi_connect
42from acts.test_utils.wifi.wifi_test_utils import wifi_test_device_init
43from acts.test_utils.wifi.wifi_test_utils import wifi_toggle_state
44from acts.utils import create_dir
45
46AVRCP_WAIT_TIME = 3
47
48
49class CoexBaseTest(BaseTestClass):
50
51    def __init__(self, controllers):
52        super().__init__(controllers)
53        self.pri_ad = self.android_devices[0]
54        if len(self.android_devices) == 2:
55            self.sec_ad = self.android_devices[1]
56        elif len(self.android_devices) == 3:
57            self.third_ad = self.android_devices[2]
58
59    class IperfVariables:
60
61        def __init__(self, current_test_name):
62            self.iperf_started = False
63            self.bidirectional_client_path = None
64            self.bidirectional_server_path = None
65            self.iperf_server_path = None
66            self.iperf_client_path = None
67            self.received = []
68            self.protocol = current_test_name.split("_")[-2]
69            self.stream = current_test_name.split("_")[-1]
70            self.is_bidirectional = False
71            if self.stream == "bidirectional":
72                self.is_bidirectional = True
73
74    def setup_class(self):
75        self.counter = 0
76        self.thread_list = []
77        if not setup_multiple_devices_for_bt_test(self.android_devices):
78            self.log.error("Failed to setup devices for bluetooth test")
79            return False
80        req_params = ["network", "iperf"]
81        opt_params = [
82            "AccessPoint", "RetailAccessPoints", "RelayDevice", "IPerfClient",
83            "required_devices"
84        ]
85        self.unpack_userparams(req_params, opt_params)
86        if hasattr(self, "RelayDevice"):
87            self.audio_receiver = self.relay_devices[0]
88            self.audio_receiver.power_on()
89            self.headset_mac_address = self.audio_receiver.mac_address
90        else:
91            self.log.warning("Missing Relay config file.")
92        if hasattr(self, "AccessPoint"):
93            self.ap = self.access_points[0]
94            configure_and_start_ap(self.ap, self.network)
95        elif hasattr(self, "RetailAccessPoints"):
96            self.retail_access_points = retail_ap.create(
97                self.RetailAccessPoints)
98            self.retail_access_point = self.retail_access_points[0]
99            band = self.retail_access_point.band_lookup_by_channel(
100                self.network["channel"])
101            self.retail_access_point.set_channel(band, self.network["channel"])
102        else:
103            self.log.warning("config file have no access point information")
104        if hasattr(self, "IPerfClient"):
105            self.log.info("Iperfclient is given in config file")
106            self.iperf_client = self.iperf_clients[0]
107        else:
108            self.log.warning("Iperfclient is not given in config file")
109        wifi_test_device_init(self.pri_ad)
110        wifi_connect(self.pri_ad, self.network)
111
112    def setup_test(self):
113        self.tag = 0
114        self.result = {}
115        self.dev_list = {}
116        self.iperf_variables = self.IperfVariables(self.current_test_name)
117        self.a2dp_dumpsys = A2dpDumpsysParser()
118        self.log_path = os.path.join(self.pri_ad.log_path,
119                self.current_test_name)
120        create_dir(self.log_path)
121        self.json_file = os.path.join(self.log_path, "test_results.json")
122        for a in self.android_devices:
123            a.ed.clear_all_events()
124        if not wifi_connection_check(self.pri_ad, self.network["SSID"]):
125            self.log.error("Wifi connection does not exist")
126            return False
127        if not enable_bluetooth(self.pri_ad.droid, self.pri_ad.ed):
128            self.log.error("Failed to enable bluetooth")
129            return False
130        if hasattr(self, "required_devices"):
131            if ("discovery" in self.current_test_name or
132                    "ble" in self.current_test_name):
133                self.create_android_relay_object()
134        else:
135            self.log.warning("required_devices is not given in config file")
136
137    def teardown_test(self):
138        self.parsing_results()
139        self.teardown_result()
140        with open(self.json_file, 'a') as results_file:
141            json.dump(self.result, results_file, indent=4, sort_keys=True)
142        if not disable_bluetooth(self.pri_ad.droid):
143            self.log.info("Failed to disable bluetooth")
144            return False
145        self.destroy_android_and_relay_object()
146
147    def teardown_class(self):
148        if hasattr(self, "AccessPoint"):
149            self.ap.close()
150        self.reset_wifi_and_store_results()
151
152    def reset_wifi_and_store_results(self):
153        """Resets wifi and store test results."""
154        reset_wifi(self.pri_ad)
155        wifi_toggle_state(self.pri_ad, False)
156
157    def create_android_relay_object(self):
158        """Creates android device object and relay device object if required
159        devices has android device and relay device."""
160        if "AndroidDevice" in self.required_devices:
161            self.inquiry_devices = android_device.create(
162                self.required_devices["AndroidDevice"])
163            self.dev_list["AndroidDevice"] = self.inquiry_devices
164        if "RelayDevice" in self.required_devices:
165            self.relay = relay_device_controller.create(
166                self.required_devices["RelayDevice"])
167            self.dev_list["RelayDevice"] = self.relay
168
169    def destroy_android_and_relay_object(self):
170        """Destroys android device object and relay device object if required
171        devices has android device and relay device."""
172        if hasattr(self, "required_devices"):
173            if ("discovery" in self.current_test_name or
174                    "ble" in self.current_test_name):
175                if hasattr(self, "inquiry_devices"):
176                    for device in range(len(self.inquiry_devices)):
177                        inquiry_device = self.inquiry_devices[device]
178                        if not disable_bluetooth(inquiry_device.droid):
179                            self.log.info("Failed to disable bluetooth")
180                    android_device.destroy(self.inquiry_devices)
181                if hasattr(self, "relay"):
182                    relay_device_controller.destroy(self.relay)
183
184    def parsing_results(self):
185        """Result parser for fping results and a2dp packet drops."""
186        if "fping" in self.current_test_name:
187            output_path = "{}{}{}".format(self.pri_ad.log_path, "/Fping/",
188                                          "fping_%s.txt" % self.counter)
189            self.result["fping_loss%"] = parse_fping_results(
190                self.fping_params["fping_drop_tolerance"], output_path)
191            self.counter = +1
192        if "a2dp_streaming" in self.current_test_name:
193            file_path = collect_bluetooth_manager_dumpsys_logs(
194                self.pri_ad, self.current_test_name)
195            self.result["a2dp_packet_drop"] = (
196                self.a2dp_dumpsys.parse(file_path))
197            if self.result["a2dp_packet_drop"] == 0:
198                self.result["a2dp_packet_drop"] = None
199
200    def start_iperf_server_on_shell(self, server_port):
201        """Starts iperf server on android device with specified.
202
203        Args:
204            server_port: Port in which server should be started.
205        """
206        log_path = os.path.join(self.pri_ad.log_path, self.current_test_name,
207                                "iPerf{}".format(server_port))
208        self.iperf_server = "iperf3 -s -p {} -J".format(server_port)
209        create_dir(log_path)
210        out_file_name = "{}/IPerfServer,{},{}.log".format(
211            log_path,
212            server_port,
213            self.tag,
214        )
215
216        cmd = "adb -s {} shell {}".format(self.pri_ad.serial, self.iperf_server)
217
218        def appender_iperf_logs(line):
219            with open(out_file_name, 'a') as f:
220                f.writelines(line)
221                f.writelines("\n")
222
223        process = Process(
224            cmd.split()).set_on_output_callback(appender_iperf_logs)
225        process.start()
226
227        self.iperf_process.extend(
228            (self.pri_ad.adb.shell("pgrep iperf3").split('\n')))
229        self.iperf_variables.iperf_started = True
230        self.log.info("iperf_process list with pid={}".format(
231            self.iperf_process))
232        return out_file_name
233
234    def stop_iperf_server_on_shell(self):
235        """Stops all the instances of iperf server on shell."""
236        self.log.info("Stopping iperf server")
237        for process in self.iperf_process:
238            self.pri_ad.adb.shell("kill -9 {}".format(process))
239
240    def run_iperf_and_get_result(self):
241        """Frames iperf command based on test and starts iperf client on
242        host machine.
243        """
244        self.iperf_process = []
245        self.flag_list = []
246        if self.iperf_variables.is_bidirectional:
247            self.iperf_variables.bidirectional_server_path = (
248                self.start_iperf_server_on_shell(self.iperf["port_2"]))
249        self.iperf_variables.iperf_server_path = self.start_iperf_server_on_shell(
250            self.iperf["port_1"])
251        if self.iperf_variables.protocol == "tcp":
252            self.iperf_args = "-t {} -p {} {} -J".format(
253                self.iperf["duration"], self.iperf["port_1"],
254                self.iperf["tcp_window_size"])
255        else:
256            self.iperf_args = "-t {} -p {} -u {} -J".format(
257                self.iperf["duration"], self.iperf["port_1"],
258                self.iperf["udp_bandwidth"])
259
260        if self.iperf_variables.stream == "ul":
261            self.iperf_args += " -R"
262
263        if self.iperf_variables.protocol == "tcp" and (
264                self.iperf_variables.is_bidirectional):
265            self.bidirectional_args = "-t {} -p {} {} -R -J".format(
266                self.iperf["duration"], self.iperf["port_2"],
267                self.iperf["tcp_window_size"])
268        elif self.iperf_variables.protocol == "udp" and (
269                self.iperf_variables.is_bidirectional):
270            self.bidirectional_args = ("-t {} -p {} -u {} -R -J".format(
271                self.iperf["duration"], self.iperf["port_2"],
272                self.iperf["udp_bandwidth"]))
273        if self.iperf_variables.is_bidirectional:
274            args = [
275                lambda: self.run_iperf(self.iperf_args),
276                lambda: self.run_iperf(self.bidirectional_args,
277                                       bidirectional=True)
278            ]
279            self.run_thread(args)
280        else:
281            args = [lambda: self.run_iperf(self.iperf_args)]
282            self.run_thread(args)
283        return True
284
285    def run_iperf(self, iperf_args, bidirectional=False):
286        """Gets android device ip and start iperf client from host machine to
287        that ip and parses the iperf result.
288
289        Args:
290            iperf_args: Iperf parameters to run traffic.
291            bidirectional: True if testcase has bidirectional traffic.
292        """
293        ip = get_phone_ip(self.pri_ad)
294
295        args = [
296            lambda: check_wifi_status(self.pri_ad, self.network,
297                                      self.iperf["ssh_config"])
298        ]
299        self.run_thread(args)
300        if bidirectional:
301            self.tag = self.tag + 1
302            self.iperf_variables.bidirectional_client_path = (
303                    self.iperf_client.start(ip,
304                                            self.bidirectional_args,
305                                            self.tag))
306        else:
307            self.tag = self.tag + 1
308            self.iperf_variables.iperf_client_path = (
309                    self.iperf_client.start(ip,
310                                            iperf_args,
311                                            self.tag))
312
313        return True
314
315    def result_parser(self, iperf_args, bidirectional=False):
316        """Parses iperf result.
317
318        Args:
319            iperf_args: Iperf parameters.
320            bidirectional: True if testcase has bidirectional traffic.
321        """
322        if self.iperf_variables.protocol == "udp":
323            if "-R" in iperf_args:
324                if bidirectional:
325                    received = iperf_result(
326                        self.log, self.iperf_variables.protocol,
327                        self.iperf_variables.bidirectional_client_path)
328                else:
329                    received = iperf_result(
330                        self.log, self.iperf_variables.protocol,
331                        self.iperf_variables.iperf_client_path)
332            else:
333                received = iperf_result(self.log,
334                                        self.iperf_variables.protocol,
335                                        self.iperf_variables.iperf_server_path)
336        else:
337            if bidirectional:
338                received = iperf_result(
339                    self.log, self.iperf_variables.protocol,
340                    self.iperf_variables.bidirectional_client_path)
341            else:
342                received = iperf_result(self.log,
343                                        self.iperf_variables.protocol,
344                                        self.iperf_variables.iperf_client_path)
345
346        if not received:
347            self.log.error("Iperf failed/stopped")
348            self.flag_list.append(False)
349            self.iperf_variables.received.append(0)
350        else:
351            self.iperf_variables.received.append(
352                str(round(received, 2)) + "Mb/s")
353            self.log.info("Throughput: {} Mb/s".format(received))
354            self.flag_list.append(True)
355        self.result["throughput"] = self.iperf_variables.received
356
357    def on_fail(self, test_name, begin_time):
358        """A function that is executed upon a test case failure.
359
360        Args:
361            test_name: Name of the test that triggered this function.
362            begin_time: Logline format timestamp taken when the test started.
363        """
364        self.log.info("Test {} failed, Fetching Btsnoop logs and bugreport".
365                      format(test_name))
366        take_btsnoop_logs(self.android_devices, self, test_name)
367        self._take_bug_report(test_name, begin_time)
368
369    def run_thread(self, kwargs):
370        """Convenience function to start thread.
371
372        Args:
373            kwargs: Function object to start in thread.
374        """
375        for function in kwargs:
376            self.thread = threading.Thread(target=function)
377            self.thread_list.append(self.thread)
378            self.thread.start()
379
380    def teardown_result(self):
381        """Convenience function to join thread and fetch iperf result
382        if iperf is started."""
383        if self.iperf_variables.iperf_started:
384            self.teardown_thread()
385            self.result_parser(self.iperf_args)
386            if self.iperf_variables.is_bidirectional:
387                self.result_parser(self.bidirectional_args,
388                                   bidirectional=True)
389            if False in self.flag_list:
390                return False
391        return True
392
393    def teardown_thread(self):
394        """Convenience function to join thread."""
395        for thread_id in self.thread_list:
396            if thread_id.is_alive():
397                thread_id.join()
398        self.stop_iperf_server_on_shell()
399
400    def get_call_volume(self):
401        """Function to get call volume when bluetooth headset connected.
402
403        Returns:
404            Call volume.
405        """
406        return self.pri_ad.adb.shell(
407            "settings list system|grep volume_bluetooth_sco_bt_sco_hs")
408
409    def change_volume(self):
410        """Changes volume with HFP call.
411
412        Returns: True if successful, otherwise False.
413        """
414        if "Volume_up" and "Volume_down" in (
415                self.relay_devices[0].relays.keys()):
416            current_volume = self.get_call_volume()
417            self.audio_receiver.press_volume_down()
418            time.sleep(AVRCP_WAIT_TIME)  # wait till volume_changes
419            if current_volume == self.get_call_volume():
420                self.log.error("Decrease volume failed")
421                return False
422            time.sleep(AVRCP_WAIT_TIME)
423            current_volume = self.get_call_volume()
424            self.audio_receiver.press_volume_up()
425            time.sleep(AVRCP_WAIT_TIME)  # wait till volume_changes
426            if current_volume == self.get_call_volume():
427                self.log.error("Increase volume failed")
428                return False
429        else:
430            self.log.warning("No volume control pins specfied in relay config.")
431        return True
432