1#!/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Test suite to perform Stress test on A2DP with Wlan. 18 19Test Setup: 20 21One Android device. 22One A2DP Headset connected to Relay. 23""" 24import time 25 26from acts.test_utils.bt import BtEnum 27from acts.test_utils.bt.bt_test_utils import clear_bonded_devices 28from acts.test_utils.coex.audio_test_utils import SshAudioCapture 29from acts.test_utils.coex.CoexBaseTest import CoexBaseTest 30from acts.test_utils.coex.coex_test_utils import connect_dev_to_headset 31from acts.test_utils.coex.coex_test_utils import disconnect_headset_from_dev 32from acts.test_utils.coex.coex_test_utils import multithread_func 33from acts.test_utils.coex.coex_test_utils import music_play_and_check 34from acts.test_utils.coex.coex_test_utils import pair_and_connect_headset 35from acts.test_utils.coex.coex_test_utils import push_music_to_android_device 36 37 38class CoexA2dpStressTest(CoexBaseTest): 39 40 def __init__(self, controllers): 41 super().__init__(controllers) 42 43 def setup_class(self): 44 super().setup_class() 45 req_params = ["iterations", "audio_params", "headset_mac_address"] 46 self.unpack_userparams(req_params) 47 if hasattr(self, "audio_params"): 48 if self.audio_params["music_file"]: 49 self.music_file_to_play = push_music_to_android_device( 50 self.pri_ad, self.audio_params) 51 if not self.music_file_to_play: 52 self.log.error("Music file push failed.") 53 return False 54 else: 55 self.log.warning("No Music files pushed to play.") 56 57 def setup_test(self): 58 super().setup_test() 59 if "a2dp_streaming" in self.current_test_name: 60 self.audio = SshAudioCapture(self.audio_params, self.log_path) 61 if hasattr(self, "RelayDevice"): 62 self.audio_receiver.pairing_mode() 63 time.sleep(5) #Wait time until headset goes into pairing mode. 64 if not pair_and_connect_headset( 65 self.pri_ad, self.headset_mac_address, 66 set([BtEnum.BluetoothProfile.A2DP.value])): 67 self.log.error("Failed to pair and connect to headset") 68 return False 69 70 def teardown_test(self): 71 clear_bonded_devices(self.pri_ad) 72 if hasattr(self, "RelayDevice"): 73 self.audio_receiver.clean_up() 74 if "a2dp_streaming" in self.current_test_name: 75 analysis_path = self.audio.audio_quality_analysis(self.log_path) 76 with open(analysis_path) as f: 77 self.result["audio_artifacts"] = f.readline() 78 self.audio.terminate_and_store_audio_results() 79 super().teardown_test() 80 81 def connect_disconnect_headset(self): 82 """Initiates connection to paired headset and disconnects headset. 83 84 Returns: 85 True if successful False otherwise. 86 """ 87 for i in range(self.iterations): 88 self.log.info("Headset connect/disconnect iteration={}".format(i)) 89 self.pri_ad.droid.bluetoothConnectBonded( 90 self.headset_mac_address) 91 time.sleep(2) #Wait time until device gets connected. 92 self.pri_ad.droid.bluetoothDisconnectConnected( 93 self.headset_mac_address) 94 return True 95 96 def connect_disconnect_a2dp_headset(self): 97 """Connect and disconnect a2dp profile on headset for multiple 98 iterations. 99 100 Steps: 101 1.Connect a2dp profile on headset. 102 2.Disconnect a2dp profile on headset. 103 3.Repeat step 1 and 2 for N iterations. 104 105 Returns: 106 True if successful, False otherwise. 107 """ 108 for i in range(self.iterations): 109 if not connect_dev_to_headset(self.pri_ad, 110 self.headset_mac_address, 111 {BtEnum.BluetoothProfile.A2DP.value}): 112 self.log.error("Failure to connect A2dp headset.") 113 return False 114 115 if not disconnect_headset_from_dev( 116 self.pri_ad, self.headset_mac_address, 117 [BtEnum.BluetoothProfile.A2DP.value]): 118 self.log.error("Could not disconnect {}".format( 119 self.headset_mac_address)) 120 return False 121 return True 122 123 def connect_disconnect_headset_with_iperf(self): 124 """Wrapper function to start iperf traffic and connect/disconnect 125 to headset for N iterations. 126 """ 127 self.run_iperf_and_get_result() 128 if not self.connect_disconnect_headset(): 129 return False 130 return self.teardown_result() 131 132 def connect_disconnect_a2dp_headset_with_iperf(self): 133 """Wrapper function to start iperf traffic and connect/disconnect 134 to a2dp headset for N iterations. 135 """ 136 self.run_iperf_and_get_result() 137 if not self.connect_disconnect_a2dp_headset(): 138 return False 139 return self.teardown_result() 140 141 def music_streaming_with_iperf(self): 142 """Wrapper function to start iperf traffic and music streaming.""" 143 tasks = [(self.audio.capture_audio, ()), 144 (music_play_and_check, 145 (self.pri_ad, self.headset_mac_address, 146 self.music_file_to_play, 147 self.audio_params["music_play_time"])), 148 (self.run_iperf_and_get_result, ())] 149 if not multithread_func(self.log, tasks): 150 return False 151 return self.teardown_result() 152 153 def test_stress_connect_disconnect_headset_with_tcp_ul(self): 154 """Stress test for connect/disconnect headset. 155 156 This test is to start TCP-uplink traffic between host machine and 157 android device and test the integrity of connection and disconnection 158 to headset. 159 160 Steps: 161 1. Run TCP-uplink traffic. 162 2. Connect and disconnect headset. 163 3. Repeat step 2 for N iterations. 164 165 Returns: 166 True if successful, False otherwise. 167 168 Test Id: Bt_CoEx_Stress_013 169 """ 170 if not self.connect_disconnect_headset_with_iperf(): 171 return False 172 return True 173 174 def test_stress_connect_disconnect_headset_with_tcp_dl(self): 175 """Stress test for connect/disconnect headset. 176 177 This test is to start TCP-downlink traffic between host machine and 178 android device and test the integrity of connection and disconnection 179 to headset. 180 181 Steps: 182 1. Run TCP-downlink traffic. 183 2. Connect and disconnect headset. 184 3. Repeat step 2 for N iterations. 185 186 Returns: 187 True if successful, False otherwise. 188 189 Test Id: Bt_CoEx_Stress_014 190 """ 191 if not self.connect_disconnect_headset_with_iperf(): 192 return False 193 return True 194 195 def test_stress_connect_disconnect_headset_with_udp_ul(self): 196 """Stress test for connect/disconnect headset. 197 198 This test is to start UDP-uplink traffic between host machine and 199 android device and test the integrity of connection and disconnection 200 to headset. 201 202 Steps: 203 1. Run UDP-uplink traffic. 204 2. Connect and disconnect headset. 205 3. Repeat step 2 for N iterations. 206 207 Returns: 208 True if successful, False otherwise. 209 210 Test Id: Bt_CoEx_Stress_015 211 """ 212 if not self.connect_disconnect_headset_with_iperf(): 213 return False 214 return True 215 216 def test_stress_connect_disconnect_headset_with_udp_dl(self): 217 """Stress test for connect/disconnect headset. 218 219 This test is to start UDP-downlink traffic between host machine and 220 android device and test the integrity of connection and disconnection 221 to headset. 222 223 Steps: 224 1. Run UDP-downlink traffic. 225 2. Connect and disconnect headset. 226 3. Repeat step 2 for N iterations. 227 228 Returns: 229 True if successful, False otherwise. 230 231 Test Id: Bt_CoEx_Stress_016 232 """ 233 if not self.connect_disconnect_headset_with_iperf(): 234 return False 235 return True 236 237 def test_stress_a2dp_streaming_long_duration_with_tcp_ul(self): 238 """Stress test to stream music to headset continuously for 12 hours. 239 240 This test is to start TCP-uplink traffic between host machine and 241 android device and test the integrity of audio streaming for 12 hours. 242 243 Steps: 244 1. Start TCP uplink traffic. 245 2. Start music streaming to headset. 246 247 Returns: 248 True if successful, False otherwise. 249 250 Test Id: Bt_CoEx_Stress_017 251 """ 252 if not self.music_streaming_with_iperf(): 253 return False 254 return True 255 256 def test_stress_a2dp_streaming_long_duration_with_tcp_dl(self): 257 """Stress test to stream music to headset continuously for 12 hours. 258 259 This test is to start TCP-downlink traffic between host machine and 260 android device and test the integrity of audio streaming for 12 hours. 261 262 Steps: 263 1. Start TCP downlink traffic. 264 2. Start music streaming to headset. 265 266 Returns: 267 True if successful, False otherwise. 268 269 Test Id: Bt_CoEx_Stress_018 270 """ 271 if not self.music_streaming_with_iperf(): 272 return False 273 return True 274 275 def test_stress_a2dp_streaming_long_duration_with_udp_ul(self): 276 """Stress test to stream music to headset continuously for 12 hours. 277 278 This test is to start UDP-uplink traffic between host machine and 279 android device and test the integrity of audio streaming for 12 hours. 280 281 Steps: 282 1. Start UDP uplink traffic. 283 2. Start music streaming to headset. 284 285 Returns: 286 True if successful, False otherwise. 287 288 Test Id: Bt_CoEx_Stress_019 289 """ 290 if not self.music_streaming_with_iperf(): 291 return False 292 return True 293 294 def test_stress_a2dp_streaming_long_duration_with_udp_dl(self): 295 """Stress test to stream music to headset continuously for 12 hours. 296 297 This test is to start UDP-downlink traffic between host machine and 298 android device and test the integrity of audio streaming for 12 hours. 299 300 Steps: 301 1. Start UDP downlink traffic. 302 2. Start music streaming to headset. 303 304 Returns: 305 True if successful, False otherwise. 306 307 Test Id: Bt_CoEx_Stress_020 308 """ 309 if not self.music_streaming_with_iperf(): 310 return False 311 return True 312 313 def test_stress_connect_disconnect_a2dp_profile_with_tcp_ul(self): 314 """Stress test for connect/disconnect a2dp headset. 315 316 This test is to start TCP-uplink traffic between host machine and 317 android device and test the integrity of connection and disconnection 318 to headset with a2dp profile. 319 320 Steps: 321 1. Run TCP-uplink traffic. 322 2. Connect and disconnect headset with a2dp profile. 323 3. Repeat step 2 for N iterations. 324 325 Returns: 326 True if successful, False otherwise. 327 328 Test Id: Bt_CoEx_Stress_029 329 """ 330 if not self.connect_disconnect_a2dp_headset_with_iperf(): 331 return False 332 return True 333 334 def test_stress_connect_disconnect_a2dp_profile_with_tcp_dl(self): 335 """Stress test for connect/disconnect a2dp headset. 336 337 This test is to start TCP-downlink traffic between host machine and 338 android device and test the integrity of connection and disconnection 339 to headset with a2dp profile. 340 341 Steps: 342 1. Run TCP-downlink traffic. 343 2. Connect and disconnect headset with a2dp profile. 344 3. Repeat step 2 for N iterations. 345 346 Returns: 347 True if successful, False otherwise. 348 349 Test Id: Bt_CoEx_Stress_030 350 """ 351 if not self.connect_disconnect_a2dp_headset_with_iperf(): 352 return False 353 return True 354 355 def test_stress_connect_disconnect_a2dp_profile_with_udp_ul(self): 356 """Stress test for connect/disconnect a2dp headset. 357 358 This test is to start UDP-uplink traffic between host machine and 359 android device and test the integrity of connection and disconnection 360 to headset with a2dp profile. 361 362 Steps: 363 1. Run UDP-uplink traffic. 364 2. Connect and disconnect headset with a2dp profile. 365 3. Repeat step 2 for N iterations. 366 367 Returns: 368 True if successful, False otherwise. 369 370 Test Id: Bt_CoEx_Stress_031 371 """ 372 if not self.connect_disconnect_a2dp_headset_with_iperf(): 373 return False 374 return True 375 376 def test_stress_connect_disconnect_a2dp_profile_with_udp_dl(self): 377 """Stress test for connect/disconnect a2dp headset. 378 379 This test is to start UDP-downlink traffic between host machine and 380 android device and test the integrity of connection and disconnection 381 to headset with a2dp profile. 382 383 Steps: 384 1. Run UDP-downlink traffic. 385 2. Connect and disconnect headset with a2dp profile. 386 3. Repeat step 2 for N iterations. 387 388 Returns: 389 True if successful, False otherwise. 390 391 Test Id: Bt_CoEx_Stress_032 392 """ 393 if not self.connect_disconnect_a2dp_headset_with_iperf(): 394 return False 395 return True 396 397 def test_stress_connect_disconnect_headset_with_tcp_bidirectional(self): 398 """Stress test for connect/disconnect headset. 399 400 This test starts TCP-bidirectional traffic between host machine and 401 android device and test the integrity of connection and disconnection 402 to headset. 403 404 Steps: 405 1. Run TCP-bidirectional traffic. 406 2. Connect and disconnect headset. 407 3. Repeat step 2 for N iterations. 408 409 Returns: 410 True if successful, False otherwise. 411 412 Test Id: Bt_CoEx_Stress_057 413 """ 414 if not self.connect_disconnect_headset_with_iperf(): 415 return False 416 return True 417 418 def test_stress_connect_disconnect_headset_with_udp_bidirectional(self): 419 """Stress test for connect/disconnect headset. 420 421 This test starts UDP-bidirectional traffic between host machin and 422 android device and test the integrity of connection and disconnection 423 to headset. 424 425 Steps: 426 1. Run UDP-bidirectional traffic. 427 2. Connect and disconnect headset. 428 3. Repeat step 2 for N iterations. 429 430 Returns: 431 True if successful, False otherwise. 432 433 Test Id: Bt_CoEx_Stress_058 434 """ 435 if not self.connect_disconnect_headset_with_iperf(): 436 return False 437 return True 438 439 def test_stress_a2dp_streaming_long_duration_with_tcp_bidirectional(self): 440 """Stress test to stream music to headset continuously for 12 hours. 441 442 This test starts TCP-bidirectional traffic between host machin and 443 android device and test the integrity of audio streaming for 12 hours. 444 445 Steps: 446 1. Start TCP bidirectional traffic. 447 2. Start music streaming to headset. 448 449 Returns: 450 True if successful, False otherwise. 451 452 Test Id: Bt_CoEx_Stress_065 453 """ 454 if not self.music_streaming_with_iperf(): 455 return False 456 return True 457 458 def test_stress_a2dp_streaming_long_duration_with_udp_bidirectional(self): 459 """Stress test to stream music to headset continuously for 12 hours. 460 461 This test starts UDP-bidirectional traffic between host machin and 462 android device and test the integrity of audio streaming for 12 hours. 463 464 Steps: 465 1. Start UDP bidirectional traffic. 466 2. Start music streaming to headset. 467 468 Returns: 469 True if successful, False otherwise. 470 471 Test Id: Bt_CoEx_Stress_066 472 """ 473 if not self.music_streaming_with_iperf(): 474 return False 475 return True 476