1#!/usr/bin/env python3 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import logging 18import os 19import random 20import re 21import string 22import threading 23import time 24try: 25 import pandas as pd 26except ModuleNotFoundError: 27 pass 28from queue import Empty 29from subprocess import call 30from acts import asserts 31from acts_contrib.test_utils.bt.bt_constants import adv_fail 32from acts_contrib.test_utils.bt.bt_constants import adv_succ 33from acts_contrib.test_utils.bt.bt_constants import batch_scan_not_supported_list 34from acts_contrib.test_utils.bt.bt_constants import batch_scan_result 35from acts_contrib.test_utils.bt.bt_constants import bits_per_samples 36from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_modes 37from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers 38from acts_contrib.test_utils.bt.bt_constants import bluetooth_a2dp_codec_config_changed 39from acts_contrib.test_utils.bt.bt_constants import bluetooth_off 40from acts_contrib.test_utils.bt.bt_constants import bluetooth_on 41from acts_contrib.test_utils.bt.bt_constants import \ 42 bluetooth_profile_connection_state_changed 43from acts_contrib.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid 44from acts_contrib.test_utils.bt.bt_constants import bt_default_timeout 45from acts_contrib.test_utils.bt.bt_constants import bt_profile_constants 46from acts_contrib.test_utils.bt.bt_constants import bt_profile_states 47from acts_contrib.test_utils.bt.bt_constants import bt_rfcomm_uuids 48from acts_contrib.test_utils.bt.bt_constants import bt_scan_mode_types 49from acts_contrib.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device 50from acts_contrib.test_utils.bt.bt_constants import btsnoop_log_path_on_device 51from acts_contrib.test_utils.bt.bt_constants import channel_modes 52from acts_contrib.test_utils.bt.bt_constants import codec_types 53from acts_contrib.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms 54from acts_contrib.test_utils.bt.bt_constants import default_rfcomm_timeout_ms 55from acts_contrib.test_utils.bt.bt_constants import hid_id_keyboard 56from acts_contrib.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation 57from acts_contrib.test_utils.bt.bt_constants import pan_connect_timeout 58from acts_contrib.test_utils.bt.bt_constants import sample_rates 59from acts_contrib.test_utils.bt.bt_constants import scan_result 60from acts_contrib.test_utils.bt.bt_constants import sig_uuid_constants 61from acts_contrib.test_utils.bt.bt_constants import small_timeout 62from acts_contrib.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb 63from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection 64from acts.utils import exe_cmd 65 66from acts import utils 67 68log = logging 69 70advertisements_to_devices = {} 71 72 73class BtTestUtilsError(Exception): 74 pass 75 76 77def _add_android_device_to_dictionary(android_device, profile_list, 78 selector_dict): 79 """Adds the AndroidDevice and supported features to the selector dictionary 80 81 Args: 82 android_device: The Android device. 83 profile_list: The list of profiles the Android device supports. 84 """ 85 for profile in profile_list: 86 if profile in selector_dict and android_device not in selector_dict[ 87 profile]: 88 selector_dict[profile].append(android_device) 89 else: 90 selector_dict[profile] = [android_device] 91 92 93def bluetooth_enabled_check(ad, timeout_sec=5): 94 """Checks if the Bluetooth state is enabled, if not it will attempt to 95 enable it. 96 97 Args: 98 ad: The Android device list to enable Bluetooth on. 99 timeout_sec: number of seconds to wait for toggle to take effect. 100 101 Returns: 102 True if successful, false if unsuccessful. 103 """ 104 if not ad.droid.bluetoothCheckState(): 105 ad.droid.bluetoothToggleState(True) 106 expected_bluetooth_on_event_name = bluetooth_on 107 try: 108 ad.ed.pop_event(expected_bluetooth_on_event_name, 109 bt_default_timeout) 110 except Empty: 111 ad.log.info("Failed to toggle Bluetooth on(no broadcast received).") 112 # Try one more time to poke at the actual state. 113 if ad.droid.bluetoothCheckState(): 114 ad.log.info(".. actual state is ON") 115 return True 116 ad.log.error(".. actual state is OFF") 117 return False 118 end_time = time.time() + timeout_sec 119 while not ad.droid.bluetoothCheckState() and time.time() < end_time: 120 time.sleep(1) 121 return ad.droid.bluetoothCheckState() 122 123 124def check_device_supported_profiles(droid): 125 """Checks for Android device supported profiles. 126 127 Args: 128 droid: The droid object to query. 129 130 Returns: 131 A dictionary of supported profiles. 132 """ 133 profile_dict = {} 134 profile_dict['hid'] = droid.bluetoothHidIsReady() 135 profile_dict['hsp'] = droid.bluetoothHspIsReady() 136 profile_dict['a2dp'] = droid.bluetoothA2dpIsReady() 137 profile_dict['avrcp'] = droid.bluetoothAvrcpIsReady() 138 profile_dict['a2dp_sink'] = droid.bluetoothA2dpSinkIsReady() 139 profile_dict['hfp_client'] = droid.bluetoothHfpClientIsReady() 140 profile_dict['pbap_client'] = droid.bluetoothPbapClientIsReady() 141 return profile_dict 142 143 144def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list, 145 adv_android_device, adv_callback_list): 146 """Try to gracefully stop all scanning and advertising instances. 147 148 Args: 149 scn_android_device: The Android device that is actively scanning. 150 scn_callback_list: The scan callback id list that needs to be stopped. 151 adv_android_device: The Android device that is actively advertising. 152 adv_callback_list: The advertise callback id list that needs to be 153 stopped. 154 """ 155 scan_droid, scan_ed = scn_android_device.droid, scn_android_device.ed 156 adv_droid = adv_android_device.droid 157 try: 158 for scan_callback in scn_callback_list: 159 scan_droid.bleStopBleScan(scan_callback) 160 except Exception as err: 161 scn_android_device.log.debug( 162 "Failed to stop LE scan... reseting Bluetooth. Error {}".format( 163 err)) 164 reset_bluetooth([scn_android_device]) 165 try: 166 for adv_callback in adv_callback_list: 167 adv_droid.bleStopBleAdvertising(adv_callback) 168 except Exception as err: 169 adv_android_device.log.debug( 170 "Failed to stop LE advertisement... reseting Bluetooth. Error {}". 171 format(err)) 172 reset_bluetooth([adv_android_device]) 173 174 175def clear_bonded_devices(ad): 176 """Clear bonded devices from the input Android device. 177 178 Args: 179 ad: the Android device performing the connection. 180 Returns: 181 True if clearing bonded devices was successful, false if unsuccessful. 182 """ 183 bonded_device_list = ad.droid.bluetoothGetBondedDevices() 184 while bonded_device_list: 185 device_address = bonded_device_list[0]['address'] 186 if not ad.droid.bluetoothUnbond(device_address): 187 log.error("Failed to unbond {} from {}".format( 188 device_address, ad.serial)) 189 return False 190 log.info("Successfully unbonded {} from {}".format( 191 device_address, ad.serial)) 192 #TODO: wait for BOND_STATE_CHANGED intent instead of waiting 193 time.sleep(1) 194 195 # If device was first connected using LE transport, after bonding it is 196 # accessible through it's LE address, and through it classic address. 197 # Unbonding it will unbond two devices representing different 198 # "addresses". Attempt to unbond such already unbonded devices will 199 # result in bluetoothUnbond returning false. 200 bonded_device_list = ad.droid.bluetoothGetBondedDevices() 201 return True 202 203 204def connect_phone_to_headset(android, 205 headset, 206 timeout=bt_default_timeout, 207 connection_check_period=10): 208 """Connects android phone to bluetooth headset. 209 Headset object must have methods power_on and enter_pairing_mode, 210 and attribute mac_address. 211 212 Args: 213 android: AndroidDevice object with SL4A installed. 214 headset: Object with attribute mac_address and methods power_on and 215 enter_pairing_mode. 216 timeout: Seconds to wait for devices to connect. 217 connection_check_period: how often to check for connection once the 218 SL4A connect RPC has been sent. 219 Returns: 220 connected (bool): True if devices are paired and connected by end of 221 method. False otherwise. 222 """ 223 headset_mac_address = headset.mac_address 224 connected = android.droid.audioIsBluetoothA2dpOn() 225 log.info('Devices connected before pair attempt: %s' % connected) 226 if not connected: 227 # Turn on headset and initiate pairing mode. 228 headset.enter_pairing_mode() 229 android.droid.bluetoothStartPairingHelper() 230 start_time = time.time() 231 # If already connected, skip pair and connect attempt. 232 while not connected and (time.time() - start_time < timeout): 233 bonded_info = android.droid.bluetoothGetBondedDevices() 234 connected_info = android.droid.bluetoothGetConnectedDevices() 235 if headset.mac_address not in [info["address"] for info in bonded_info]: 236 # Use SL4A to pair and connect with headset. 237 headset.enter_pairing_mode() 238 android.droid.bluetoothDiscoverAndBond(headset_mac_address) 239 elif headset.mac_address not in [ 240 info["address"] for info in connected_info 241 ]: 242 #Device is bonded but not connected 243 android.droid.bluetoothConnectBonded(headset_mac_address) 244 else: 245 #Headset is connected, but A2DP profile is not 246 android.droid.bluetoothA2dpConnect(headset_mac_address) 247 log.info('Waiting for connection...') 248 time.sleep(connection_check_period) 249 # Check for connection. 250 connected = android.droid.audioIsBluetoothA2dpOn() 251 log.info('Devices connected after pair attempt: %s' % connected) 252 return connected 253 254 255def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2): 256 """Connects pri droid to secondary droid. 257 258 Args: 259 pri_ad: AndroidDroid initiating connection 260 sec_ad: AndroidDroid accepting connection 261 profiles_set: Set of profiles to be connected 262 attempts: Number of attempts to try until failure. 263 264 Returns: 265 Pass if True 266 Fail if False 267 """ 268 device_addr = sec_ad.droid.bluetoothGetLocalAddress() 269 # Allows extra time for the SDP records to be updated. 270 time.sleep(2) 271 curr_attempts = 0 272 while curr_attempts < attempts: 273 log.info("connect_pri_to_sec curr attempt {} total {}".format( 274 curr_attempts, attempts)) 275 if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set): 276 return True 277 curr_attempts += 1 278 log.error("connect_pri_to_sec failed to connect after {} attempts".format( 279 attempts)) 280 return False 281 282 283def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set): 284 """Connects pri droid to secondary droid. 285 286 Args: 287 pri_ad: AndroidDroid initiating connection. 288 sec_ad: AndroidDroid accepting connection. 289 profiles_set: Set of profiles to be connected. 290 291 Returns: 292 True of connection is successful, false if unsuccessful. 293 """ 294 # Check if we support all profiles. 295 supported_profiles = bt_profile_constants.values() 296 for profile in profiles_set: 297 if profile not in supported_profiles: 298 pri_ad.log.info("Profile {} is not supported list {}".format( 299 profile, supported_profiles)) 300 return False 301 302 # First check that devices are bonded. 303 paired = False 304 for paired_device in pri_ad.droid.bluetoothGetBondedDevices(): 305 if paired_device['address'] == \ 306 sec_ad.droid.bluetoothGetLocalAddress(): 307 paired = True 308 break 309 310 if not paired: 311 pri_ad.log.error("Not paired to {}".format(sec_ad.serial)) 312 return False 313 314 # Now try to connect them, the following call will try to initiate all 315 # connections. 316 pri_ad.droid.bluetoothConnectBonded(sec_ad.droid.bluetoothGetLocalAddress()) 317 318 end_time = time.time() + 10 319 profile_connected = set() 320 sec_addr = sec_ad.droid.bluetoothGetLocalAddress() 321 pri_ad.log.info("Profiles to be connected {}".format(profiles_set)) 322 # First use APIs to check profile connection state 323 while (time.time() < end_time and 324 not profile_connected.issuperset(profiles_set)): 325 if (bt_profile_constants['headset_client'] not in profile_connected and 326 bt_profile_constants['headset_client'] in profiles_set): 327 if is_hfp_client_device_connected(pri_ad, sec_addr): 328 profile_connected.add(bt_profile_constants['headset_client']) 329 if (bt_profile_constants['a2dp'] not in profile_connected and 330 bt_profile_constants['a2dp'] in profiles_set): 331 if is_a2dp_src_device_connected(pri_ad, sec_addr): 332 profile_connected.add(bt_profile_constants['a2dp']) 333 if (bt_profile_constants['a2dp_sink'] not in profile_connected and 334 bt_profile_constants['a2dp_sink'] in profiles_set): 335 if is_a2dp_snk_device_connected(pri_ad, sec_addr): 336 profile_connected.add(bt_profile_constants['a2dp_sink']) 337 if (bt_profile_constants['map_mce'] not in profile_connected and 338 bt_profile_constants['map_mce'] in profiles_set): 339 if is_map_mce_device_connected(pri_ad, sec_addr): 340 profile_connected.add(bt_profile_constants['map_mce']) 341 if (bt_profile_constants['map'] not in profile_connected and 342 bt_profile_constants['map'] in profiles_set): 343 if is_map_mse_device_connected(pri_ad, sec_addr): 344 profile_connected.add(bt_profile_constants['map']) 345 time.sleep(0.1) 346 # If APIs fail, try to find the connection broadcast receiver. 347 while not profile_connected.issuperset(profiles_set): 348 try: 349 profile_event = pri_ad.ed.pop_event( 350 bluetooth_profile_connection_state_changed, 351 bt_default_timeout + 10) 352 pri_ad.log.info("Got event {}".format(profile_event)) 353 except Exception: 354 pri_ad.log.error("Did not get {} profiles left {}".format( 355 bluetooth_profile_connection_state_changed, profile_connected)) 356 return False 357 358 profile = profile_event['data']['profile'] 359 state = profile_event['data']['state'] 360 device_addr = profile_event['data']['addr'] 361 if state == bt_profile_states['connected'] and \ 362 device_addr == sec_ad.droid.bluetoothGetLocalAddress(): 363 profile_connected.add(profile) 364 pri_ad.log.info( 365 "Profiles connected until now {}".format(profile_connected)) 366 # Failure happens inside the while loop. If we came here then we already 367 # connected. 368 return True 369 370 371def determine_max_advertisements(android_device): 372 """Determines programatically how many advertisements the Android device 373 supports. 374 375 Args: 376 android_device: The Android device to determine max advertisements of. 377 378 Returns: 379 The maximum advertisement count. 380 """ 381 android_device.log.info( 382 "Determining number of maximum concurrent advertisements...") 383 advertisement_count = 0 384 bt_enabled = False 385 expected_bluetooth_on_event_name = bluetooth_on 386 if not android_device.droid.bluetoothCheckState(): 387 android_device.droid.bluetoothToggleState(True) 388 try: 389 android_device.ed.pop_event(expected_bluetooth_on_event_name, 390 bt_default_timeout) 391 except Exception: 392 android_device.log.info( 393 "Failed to toggle Bluetooth on(no broadcast received).") 394 # Try one more time to poke at the actual state. 395 if android_device.droid.bluetoothCheckState() is True: 396 android_device.log.info(".. actual state is ON") 397 else: 398 android_device.log.error( 399 "Failed to turn Bluetooth on. Setting default advertisements to 1" 400 ) 401 advertisement_count = -1 402 return advertisement_count 403 advertise_callback_list = [] 404 advertise_data = android_device.droid.bleBuildAdvertiseData() 405 advertise_settings = android_device.droid.bleBuildAdvertiseSettings() 406 while (True): 407 advertise_callback = android_device.droid.bleGenBleAdvertiseCallback() 408 advertise_callback_list.append(advertise_callback) 409 410 android_device.droid.bleStartBleAdvertising(advertise_callback, 411 advertise_data, 412 advertise_settings) 413 414 regex = "(" + adv_succ.format( 415 advertise_callback) + "|" + adv_fail.format( 416 advertise_callback) + ")" 417 # wait for either success or failure event 418 evt = android_device.ed.pop_events(regex, bt_default_timeout, 419 small_timeout) 420 if evt[0]["name"] == adv_succ.format(advertise_callback): 421 advertisement_count += 1 422 android_device.log.info( 423 "Advertisement {} started.".format(advertisement_count)) 424 else: 425 error = evt[0]["data"]["Error"] 426 if error == "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS": 427 android_device.log.info( 428 "Advertisement failed to start. Reached max " + 429 "advertisements at {}".format(advertisement_count)) 430 break 431 else: 432 raise BtTestUtilsError( 433 "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," + 434 " but received bad error code {}".format(error)) 435 try: 436 for adv in advertise_callback_list: 437 android_device.droid.bleStopBleAdvertising(adv) 438 except Exception: 439 android_device.log.error( 440 "Failed to stop advertisingment, resetting Bluetooth.") 441 reset_bluetooth([android_device]) 442 return advertisement_count 443 444 445def disable_bluetooth(droid): 446 """Disable Bluetooth on input Droid object. 447 448 Args: 449 droid: The droid object to disable Bluetooth on. 450 451 Returns: 452 True if successful, false if unsuccessful. 453 """ 454 if droid.bluetoothCheckState() is True: 455 droid.bluetoothToggleState(False) 456 if droid.bluetoothCheckState() is True: 457 log.error("Failed to toggle Bluetooth off.") 458 return False 459 return True 460 461 462def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list): 463 """ 464 Disconnect primary from secondary on a specific set of profiles 465 Args: 466 pri_ad - Primary android_device initiating disconnection 467 sec_ad - Secondary android droid (sl4a interface to keep the 468 method signature the same connect_pri_to_sec above) 469 profiles_list - List of profiles we want to disconnect from 470 471 Returns: 472 True on Success 473 False on Failure 474 """ 475 # Sanity check to see if all the profiles in the given set is supported 476 supported_profiles = bt_profile_constants.values() 477 for profile in profiles_list: 478 if profile not in supported_profiles: 479 pri_ad.log.info("Profile {} is not in supported list {}".format( 480 profile, supported_profiles)) 481 return False 482 483 pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices()) 484 # Disconnecting on a already disconnected profile is a nop, 485 # so not checking for the connection state 486 try: 487 pri_ad.droid.bluetoothDisconnectConnectedProfile( 488 sec_ad.droid.bluetoothGetLocalAddress(), profiles_list) 489 except Exception as err: 490 pri_ad.log.error( 491 "Exception while trying to disconnect profile(s) {}: {}".format( 492 profiles_list, err)) 493 return False 494 495 profile_disconnected = set() 496 pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list)) 497 498 while not profile_disconnected.issuperset(profiles_list): 499 try: 500 profile_event = pri_ad.ed.pop_event( 501 bluetooth_profile_connection_state_changed, bt_default_timeout) 502 pri_ad.log.info("Got event {}".format(profile_event)) 503 except Exception as e: 504 pri_ad.log.error( 505 "Did not disconnect from Profiles. Reason {}".format(e)) 506 return False 507 508 profile = profile_event['data']['profile'] 509 state = profile_event['data']['state'] 510 device_addr = profile_event['data']['addr'] 511 512 if state == bt_profile_states['disconnected'] and \ 513 device_addr == sec_ad.droid.bluetoothGetLocalAddress(): 514 profile_disconnected.add(profile) 515 pri_ad.log.info( 516 "Profiles disconnected so far {}".format(profile_disconnected)) 517 518 return True 519 520 521def enable_bluetooth(droid, ed): 522 if droid.bluetoothCheckState() is True: 523 return True 524 525 droid.bluetoothToggleState(True) 526 expected_bluetooth_on_event_name = bluetooth_on 527 try: 528 ed.pop_event(expected_bluetooth_on_event_name, bt_default_timeout) 529 except Exception: 530 log.info("Failed to toggle Bluetooth on (no broadcast received)") 531 if droid.bluetoothCheckState() is True: 532 log.info(".. actual state is ON") 533 return True 534 log.info(".. actual state is OFF") 535 return False 536 537 return True 538 539 540def factory_reset_bluetooth(android_devices): 541 """Clears Bluetooth stack of input Android device list. 542 543 Args: 544 android_devices: The Android device list to reset Bluetooth 545 546 Returns: 547 True if successful, false if unsuccessful. 548 """ 549 for a in android_devices: 550 droid, ed = a.droid, a.ed 551 a.log.info("Reset state of bluetooth on device.") 552 if not bluetooth_enabled_check(a): 553 return False 554 # TODO: remove device unbond b/79418045 555 # Temporary solution to ensure all devices are unbonded 556 bonded_devices = droid.bluetoothGetBondedDevices() 557 for b in bonded_devices: 558 a.log.info("Removing bond for device {}".format(b['address'])) 559 droid.bluetoothUnbond(b['address']) 560 561 droid.bluetoothFactoryReset() 562 wait_for_bluetooth_manager_state(droid) 563 if not enable_bluetooth(droid, ed): 564 return False 565 return True 566 567 568def generate_ble_advertise_objects(droid): 569 """Generate generic LE advertise objects. 570 571 Args: 572 droid: The droid object to generate advertise LE objects from. 573 574 Returns: 575 advertise_callback: The generated advertise callback id. 576 advertise_data: The generated advertise data id. 577 advertise_settings: The generated advertise settings id. 578 """ 579 advertise_callback = droid.bleGenBleAdvertiseCallback() 580 advertise_data = droid.bleBuildAdvertiseData() 581 advertise_settings = droid.bleBuildAdvertiseSettings() 582 return advertise_callback, advertise_data, advertise_settings 583 584 585def generate_ble_scan_objects(droid): 586 """Generate generic LE scan objects. 587 588 Args: 589 droid: The droid object to generate LE scan objects from. 590 591 Returns: 592 filter_list: The generated scan filter list id. 593 scan_settings: The generated scan settings id. 594 scan_callback: The generated scan callback id. 595 """ 596 filter_list = droid.bleGenFilterList() 597 scan_settings = droid.bleBuildScanSetting() 598 scan_callback = droid.bleGenScanCallback() 599 return filter_list, scan_settings, scan_callback 600 601 602def generate_id_by_size(size, 603 chars=(string.ascii_lowercase + string.ascii_uppercase + 604 string.digits)): 605 """Generate random ascii characters of input size and input char types 606 607 Args: 608 size: Input size of string. 609 chars: (Optional) Chars to use in generating a random string. 610 611 Returns: 612 String of random input chars at the input size. 613 """ 614 return ''.join(random.choice(chars) for _ in range(size)) 615 616 617def get_advanced_droid_list(android_devices): 618 """Add max_advertisement and batch_scan_supported attributes to input 619 Android devices 620 621 This will programatically determine maximum LE advertisements of each 622 input Android device. 623 624 Args: 625 android_devices: The Android devices to setup. 626 627 Returns: 628 List of Android devices with new attribtues. 629 """ 630 droid_list = [] 631 for a in android_devices: 632 d, e = a.droid, a.ed 633 model = d.getBuildModel() 634 max_advertisements = 1 635 batch_scan_supported = True 636 if model in advertisements_to_devices.keys(): 637 max_advertisements = advertisements_to_devices[model] 638 else: 639 max_advertisements = determine_max_advertisements(a) 640 max_tries = 3 641 # Retry to calculate max advertisements 642 while max_advertisements == -1 and max_tries > 0: 643 a.log.info( 644 "Attempts left to determine max advertisements: {}".format( 645 max_tries)) 646 max_advertisements = determine_max_advertisements(a) 647 max_tries -= 1 648 advertisements_to_devices[model] = max_advertisements 649 if model in batch_scan_not_supported_list: 650 batch_scan_supported = False 651 role = { 652 'droid': d, 653 'ed': e, 654 'max_advertisements': max_advertisements, 655 'batch_scan_supported': batch_scan_supported 656 } 657 droid_list.append(role) 658 return droid_list 659 660 661def get_bluetooth_crash_count(android_device): 662 out = android_device.adb.shell("dumpsys bluetooth_manager") 663 return int(re.search("crashed(.*\d)", out).group(1)) 664 665 666def read_otp(ad): 667 """Reads and parses the OTP output to return TX power backoff 668 669 Reads the OTP registers from the phone, parses them to return a 670 dict of TX power backoffs for different power levels 671 672 Args: 673 ad : android device object 674 675 Returns : 676 otp_dict : power backoff dict 677 """ 678 679 ad.adb.shell('svc bluetooth disable') 680 time.sleep(2) 681 otp_output = ad.adb.shell('bluetooth_sar_test -r') 682 ad.adb.shell('svc bluetooth enable') 683 time.sleep(2) 684 otp_dict = { 685 "BR": { 686 "10": 0, 687 "9": 0, 688 "8": 0 689 }, 690 "EDR": { 691 "10": 0, 692 "9": 0, 693 "8": 0 694 }, 695 "BLE": { 696 "10": 0, 697 "9": 0, 698 "8": 0 699 } 700 } 701 702 otp_regex = '\s+\[\s+PL10:\s+(\d+)\s+PL9:\s+(\d+)*\s+PL8:\s+(\d+)\s+\]' 703 704 for key in otp_dict: 705 bank_list = re.findall("{}{}".format(key, otp_regex), otp_output) 706 for bank_tuple in bank_list: 707 if ('0', '0', '0') != bank_tuple: 708 [otp_dict[key]["10"], otp_dict[key]["9"], 709 otp_dict[key]["8"]] = bank_tuple 710 return otp_dict 711 712 713def get_bt_metric(ad_list, 714 duration=1, 715 bqr_tag='Monitoring , Handle:', 716 tag='', 717 log_path=False): 718 """ Function to get the bt metric from logcat. 719 720 Captures logcat for the specified duration and returns the bqr results. 721 Takes list of android objects as input. If a single android object is given, 722 converts it into a list. 723 724 Args: 725 ad_list: list of android_device objects 726 duration: time duration (seconds) for which the logcat is parsed 727 bqr_tag: tag of bt metrics 728 tag: tag to be appended to the metrics raw data 729 log_path: path of metrics raw data 730 731 Returns: 732 process_data: dict of process raw data for each android devices 733 """ 734 735 # Defining bqr quantites and their regex to extract 736 regex_dict = { 737 "pwlv": "PwLv:\s(\S+)", 738 "rssi": "RSSI:\s[-](\d+)", 739 "rssi_c0": "RSSI_C0:\s[-](\d+)", 740 "rssi_c1": "RSSI_C1:\s[-](\d+)", 741 "txpw_c0": "\sTxPw_C0:\s(-?\d+)", 742 "txpw_c1": "\sTxPw_C1:\s(-?\d+)", 743 "bftx": "BFTx:\s(\w+)", 744 "divtx": "DivTx:\s(\w+)" 745 } 746 metrics_dict = { 747 "rssi": {}, 748 "pwlv": {}, 749 "rssi_c0": {}, 750 "rssi_c1": {}, 751 "txpw_c0": {}, 752 "txpw_c1": {}, 753 "bftx": {}, 754 "divtx": {} 755 } 756 757 # Converting a single android device object to list 758 if not isinstance(ad_list, list): 759 ad_list = [ad_list] 760 761 #Time sync with the test machine 762 for ad in ad_list: 763 ad.droid.setTime(int(round(time.time() * 1000))) 764 time.sleep(0.5) 765 766 begin_time = utils.get_current_epoch_time() 767 time.sleep(duration) 768 end_time = utils.get_current_epoch_time() 769 770 for ad in ad_list: 771 bt_rssi_log = ad.cat_adb_log(tag + "_bt_metric", begin_time, end_time) 772 773 # Extracting supporting bqr quantities 774 for metric, regex in regex_dict.items(): 775 bqr_metric = [] 776 file_bt_log = open(bt_rssi_log, "r") 777 for line in file_bt_log: 778 if bqr_tag in line: 779 if re.findall(regex, line): 780 m = re.findall(regex, line)[0].strip(",") 781 bqr_metric.append(m) 782 metrics_dict[metric][ad.serial] = bqr_metric 783 file_bt_log.close() 784 785 # Formatting and saving the raw data 786 metrics_to_be_formatted = [{ 787 "name": "rssi", 788 "averagble": "y" 789 }, { 790 "name": "rssi_c0", 791 "averagble": "y" 792 }, { 793 "name": "rssi_c1", 794 "averagble": "y" 795 }, { 796 "name": "pwlv", 797 "averagble": "n" 798 }, { 799 "name": "txpw_c0", 800 "averagble": "n" 801 }, { 802 "name": "txpw_c1", 803 "averagble": "n" 804 }, { 805 "name": "bftx", 806 "averagble": "n" 807 }, { 808 "name": "divtx", 809 "averagble": "n" 810 }] 811 for metric in metrics_to_be_formatted: 812 if metric["averagble"] == "y": 813 metrics_dict[metric["name"]][ad.serial] = [ 814 (-1) * int(x) 815 for x in metrics_dict[metric["name"]][ad.serial] 816 ] 817 else: 818 metrics_dict[metric["name"]][ad.serial] = [ 819 int(x, 16) if '0x' in x else int(x, 10) 820 for x in metrics_dict[metric["name"]][ad.serial] 821 ] 822 # Saving metrics raw data for each attenuation 823 if log_path: 824 output_file_name = ad.serial + "_metrics_raw_data_" + tag + ".csv" 825 output_file = os.path.join(log_path, output_file_name) 826 os.makedirs(log_path, exist_ok=True) 827 df_save_metrics = {} 828 for item in metrics_dict.items(): 829 df_save_metrics[item[0]] = next(iter(item[1].items()))[1] 830 MetricsDict_df = pd.DataFrame({key:pd.Series(value) for key, value in df_save_metrics.items()}) 831 MetricsDict_df.to_csv(output_file) 832 # Defining the process_data_dict 833 process_data = { 834 "rssi": {}, 835 "pwlv": {}, 836 "rssi_c0": {}, 837 "rssi_c1": {}, 838 "txpw_c0": {}, 839 "txpw_c1": {}, 840 "bftx": {}, 841 "divtx": {} 842 } 843 844 # Computing and returning the raw data 845 for metric in metrics_to_be_formatted: 846 if metric["averagble"] == "y": 847 process_data[metric["name"]][ad.serial] = [ 848 x for x in metrics_dict[metric["name"]][ad.serial] 849 if x != 0 and x != -127 850 ] 851 852 try: 853 #DOING AVERAGE 854 process_data[metric["name"]][ad.serial] = round( 855 sum(metrics_dict[metric["name"]][ad.serial]) / 856 len(metrics_dict[metric["name"]][ad.serial]), 2) 857 except ZeroDivisionError: 858 #SETTING VALUE TO 'n/a' 859 process_data[metric["name"]][ad.serial] = "n/a" 860 else: 861 try: 862 #GETTING MOST_COMMON_VALUE 863 process_data[metric["name"]][ad.serial] = max( 864 metrics_dict[metric["name"]][ad.serial], 865 key=metrics_dict[metric["name"]][ad.serial].count) 866 except ValueError: 867 #SETTING VALUE TO 'n/a' 868 process_data[metric["name"]][ad.serial] = "n/a" 869 870 return process_data 871 872 873def get_bt_rssi(ad, duration=1, processed=True, tag='', log_path=False): 874 """Function to get average bt rssi from logcat. 875 876 This function returns the average RSSI for the given duration. RSSI values are 877 extracted from BQR. 878 879 Args: 880 ad: (list of) android_device object. 881 duration: time duration(seconds) for which logcat is parsed. 882 883 Returns: 884 avg_rssi: average RSSI on each android device for the given duration. 885 """ 886 bqr_results = get_bt_metric(ad, duration, tag=tag, log_path=log_path) 887 return bqr_results["rssi"] 888 889 890def enable_bqr( 891 ad_list, 892 bqr_interval=10, 893 bqr_event_mask=15, 894): 895 """Sets up BQR reporting. 896 897 Sets up BQR to report BT metrics at the requested frequency and toggles 898 airplane mode for the bqr settings to take effect. 899 900 Args: 901 ad_list: an android_device or list of android devices. 902 """ 903 # Converting a single android device object to list 904 if not isinstance(ad_list, list): 905 ad_list = [ad_list] 906 907 for ad in ad_list: 908 #Setting BQR parameters 909 ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format( 910 bqr_event_mask)) 911 ad.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms {}".format( 912 bqr_interval)) 913 914 ## Toggle airplane mode 915 ad.droid.connectivityToggleAirplaneMode(True) 916 ad.droid.connectivityToggleAirplaneMode(False) 917 918 919def disable_bqr(ad_list): 920 """Disables BQR reporting. 921 922 Args: 923 ad_list: an android_device or list of android devices. 924 """ 925 # Converting a single android device object to list 926 if not isinstance(ad_list, list): 927 ad_list = [ad_list] 928 929 DISABLE_BQR_MASK = 0 930 931 for ad in ad_list: 932 #Disabling BQR 933 ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format( 934 DISABLE_BQR_MASK)) 935 936 ## Toggle airplane mode 937 ad.droid.connectivityToggleAirplaneMode(True) 938 ad.droid.connectivityToggleAirplaneMode(False) 939 940 941def get_device_selector_dictionary(android_device_list): 942 """Create a dictionary of Bluetooth features vs Android devices. 943 944 Args: 945 android_device_list: The list of Android devices. 946 Returns: 947 A dictionary of profiles/features to Android devices. 948 """ 949 selector_dict = {} 950 for ad in android_device_list: 951 uuids = ad.droid.bluetoothGetLocalUuids() 952 953 for profile, uuid_const in sig_uuid_constants.items(): 954 uuid_check = sig_uuid_constants['BASE_UUID'].format( 955 uuid_const).lower() 956 if uuids and uuid_check in uuids: 957 if profile in selector_dict: 958 selector_dict[profile].append(ad) 959 else: 960 selector_dict[profile] = [ad] 961 962 # Various services may not be active during BT startup. 963 # If the device can be identified through adb shell pm list features 964 # then try to add them to the appropriate profiles / features. 965 966 # Android TV. 967 if "feature:android.hardware.type.television" in ad.features: 968 ad.log.info("Android TV device found.") 969 supported_profiles = ['AudioSink'] 970 _add_android_device_to_dictionary(ad, supported_profiles, 971 selector_dict) 972 973 # Android Auto 974 elif "feature:android.hardware.type.automotive" in ad.features: 975 ad.log.info("Android Auto device found.") 976 # Add: AudioSink , A/V_RemoteControl, 977 supported_profiles = [ 978 'AudioSink', 'A/V_RemoteControl', 'Message Notification Server' 979 ] 980 _add_android_device_to_dictionary(ad, supported_profiles, 981 selector_dict) 982 # Android Wear 983 elif "feature:android.hardware.type.watch" in ad.features: 984 ad.log.info("Android Wear device found.") 985 supported_profiles = [] 986 _add_android_device_to_dictionary(ad, supported_profiles, 987 selector_dict) 988 # Android Phone 989 elif "feature:android.hardware.telephony" in ad.features: 990 ad.log.info("Android Phone device found.") 991 # Add: AudioSink 992 supported_profiles = [ 993 'AudioSource', 'A/V_RemoteControlTarget', 994 'Message Access Server' 995 ] 996 _add_android_device_to_dictionary(ad, supported_profiles, 997 selector_dict) 998 return selector_dict 999 1000 1001def get_mac_address_of_generic_advertisement(scan_ad, adv_ad): 1002 """Start generic advertisement and get it's mac address by LE scanning. 1003 1004 Args: 1005 scan_ad: The Android device to use as the scanner. 1006 adv_ad: The Android device to use as the advertiser. 1007 1008 Returns: 1009 mac_address: The mac address of the advertisement. 1010 advertise_callback: The advertise callback id of the active 1011 advertisement. 1012 """ 1013 adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) 1014 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 1015 ble_advertise_settings_modes['low_latency']) 1016 adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True) 1017 adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel( 1018 ble_advertise_settings_tx_powers['high']) 1019 advertise_callback, advertise_data, advertise_settings = ( 1020 generate_ble_advertise_objects(adv_ad.droid)) 1021 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 1022 advertise_settings) 1023 try: 1024 adv_ad.ed.pop_event(adv_succ.format(advertise_callback), 1025 bt_default_timeout) 1026 except Empty as err: 1027 raise BtTestUtilsError( 1028 "Advertiser did not start successfully {}".format(err)) 1029 filter_list = scan_ad.droid.bleGenFilterList() 1030 scan_settings = scan_ad.droid.bleBuildScanSetting() 1031 scan_callback = scan_ad.droid.bleGenScanCallback() 1032 scan_ad.droid.bleSetScanFilterDeviceName( 1033 adv_ad.droid.bluetoothGetLocalName()) 1034 scan_ad.droid.bleBuildScanFilter(filter_list) 1035 scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 1036 try: 1037 event = scan_ad.ed.pop_event( 1038 "BleScan{}onScanResults".format(scan_callback), bt_default_timeout) 1039 except Empty as err: 1040 raise BtTestUtilsError( 1041 "Scanner did not find advertisement {}".format(err)) 1042 mac_address = event['data']['Result']['deviceInfo']['address'] 1043 return mac_address, advertise_callback, scan_callback 1044 1045 1046def hid_device_send_key_data_report(host_id, device_ad, key, interval=1): 1047 """Send a HID report simulating a 1-second keyboard press from host_ad to 1048 device_ad 1049 1050 Args: 1051 host_id: the Bluetooth MAC address or name of the HID host 1052 device_ad: HID device 1053 key: the key we want to send 1054 interval: the interval between key press and key release 1055 """ 1056 device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, 1057 hid_keyboard_report(key)) 1058 time.sleep(interval) 1059 device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, 1060 hid_keyboard_report("00")) 1061 1062 1063def hid_keyboard_report(key, modifier="00"): 1064 """Get the HID keyboard report for the given key 1065 1066 Args: 1067 key: the key we want 1068 modifier: HID keyboard modifier bytes 1069 Returns: 1070 The byte array for the HID report. 1071 """ 1072 return str( 1073 bytearray.fromhex(" ".join( 1074 [modifier, "00", key, "00", "00", "00", "00", "00"])), "utf-8") 1075 1076 1077def is_a2dp_connected(sink, source): 1078 """ 1079 Convenience Function to see if the 2 devices are connected on 1080 A2dp. 1081 Args: 1082 sink: Audio Sink 1083 source: Audio Source 1084 Returns: 1085 True if Connected 1086 False if Not connected 1087 """ 1088 1089 devices = sink.droid.bluetoothA2dpSinkGetConnectedDevices() 1090 for device in devices: 1091 sink.log.info("A2dp Connected device {}".format(device["name"])) 1092 if (device["address"] == source.droid.bluetoothGetLocalAddress()): 1093 return True 1094 return False 1095 1096 1097def is_a2dp_snk_device_connected(ad, addr): 1098 """Determines if an AndroidDevice has A2DP snk connectivity to input address 1099 1100 Args: 1101 ad: the Android device 1102 addr: the address that's expected 1103 Returns: 1104 True if connection was successful, false if unsuccessful. 1105 """ 1106 devices = ad.droid.bluetoothA2dpSinkGetConnectedDevices() 1107 ad.log.info("Connected A2DP Sink devices: {}".format(devices)) 1108 if addr in {d['address'] for d in devices}: 1109 return True 1110 return False 1111 1112 1113def is_a2dp_src_device_connected(ad, addr): 1114 """Determines if an AndroidDevice has A2DP connectivity to input address 1115 1116 Args: 1117 ad: the Android device 1118 addr: the address that's expected 1119 Returns: 1120 True if connection was successful, false if unsuccessful. 1121 """ 1122 devices = ad.droid.bluetoothA2dpGetConnectedDevices() 1123 ad.log.info("Connected A2DP Source devices: {}".format(devices)) 1124 if addr in {d['address'] for d in devices}: 1125 return True 1126 return False 1127 1128 1129def is_hfp_client_device_connected(ad, addr): 1130 """Determines if an AndroidDevice has HFP connectivity to input address 1131 1132 Args: 1133 ad: the Android device 1134 addr: the address that's expected 1135 Returns: 1136 True if connection was successful, false if unsuccessful. 1137 """ 1138 devices = ad.droid.bluetoothHfpClientGetConnectedDevices() 1139 ad.log.info("Connected HFP Client devices: {}".format(devices)) 1140 if addr in {d['address'] for d in devices}: 1141 return True 1142 return False 1143 1144 1145def is_map_mce_device_connected(ad, addr): 1146 """Determines if an AndroidDevice has MAP MCE connectivity to input address 1147 1148 Args: 1149 ad: the Android device 1150 addr: the address that's expected 1151 Returns: 1152 True if connection was successful, false if unsuccessful. 1153 """ 1154 devices = ad.droid.bluetoothMapClientGetConnectedDevices() 1155 ad.log.info("Connected MAP MCE devices: {}".format(devices)) 1156 if addr in {d['address'] for d in devices}: 1157 return True 1158 return False 1159 1160 1161def is_map_mse_device_connected(ad, addr): 1162 """Determines if an AndroidDevice has MAP MSE connectivity to input address 1163 1164 Args: 1165 ad: the Android device 1166 addr: the address that's expected 1167 Returns: 1168 True if connection was successful, false if unsuccessful. 1169 """ 1170 devices = ad.droid.bluetoothMapGetConnectedDevices() 1171 ad.log.info("Connected MAP MSE devices: {}".format(devices)) 1172 if addr in {d['address'] for d in devices}: 1173 return True 1174 return False 1175 1176 1177def kill_bluetooth_process(ad): 1178 """Kill Bluetooth process on Android device. 1179 1180 Args: 1181 ad: Android device to kill BT process on. 1182 """ 1183 ad.log.info("Killing Bluetooth process.") 1184 pid = ad.adb.shell( 1185 "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii') 1186 call(["adb -s " + ad.serial + " shell kill " + pid], shell=True) 1187 1188 1189def log_energy_info(android_devices, state): 1190 """Logs energy info of input Android devices. 1191 1192 Args: 1193 android_devices: input Android device list to log energy info from. 1194 state: the input state to log. Usually 'Start' or 'Stop' for logging. 1195 1196 Returns: 1197 A logging string of the Bluetooth energy info reported. 1198 """ 1199 return_string = "{} Energy info collection:\n".format(state) 1200 # Bug: b/31966929 1201 return return_string 1202 1203 1204def orchestrate_and_verify_pan_connection(pan_dut, panu_dut): 1205 """Setups up a PAN conenction between two android devices. 1206 1207 Args: 1208 pan_dut: the Android device providing tethering services 1209 panu_dut: the Android device using the internet connection from the 1210 pan_dut 1211 Returns: 1212 True if PAN connection and verification is successful, 1213 false if unsuccessful. 1214 """ 1215 if not toggle_airplane_mode_by_adb(log, panu_dut, True): 1216 panu_dut.log.error("Failed to toggle airplane mode on") 1217 return False 1218 if not toggle_airplane_mode_by_adb(log, panu_dut, False): 1219 pan_dut.log.error("Failed to toggle airplane mode off") 1220 return False 1221 pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("") 1222 panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("") 1223 if not bluetooth_enabled_check(panu_dut): 1224 return False 1225 if not bluetooth_enabled_check(pan_dut): 1226 return False 1227 pan_dut.droid.bluetoothPanSetBluetoothTethering(True) 1228 if not (pair_pri_to_sec(pan_dut, panu_dut)): 1229 return False 1230 if not pan_dut.droid.bluetoothPanIsTetheringOn(): 1231 pan_dut.log.error("Failed to enable Bluetooth tethering.") 1232 return False 1233 # Magic sleep needed to give the stack time in between bonding and 1234 # connecting the PAN profile. 1235 time.sleep(pan_connect_timeout) 1236 panu_dut.droid.bluetoothConnectBonded( 1237 pan_dut.droid.bluetoothGetLocalAddress()) 1238 if not verify_http_connection(log, panu_dut): 1239 panu_dut.log.error("Can't verify http connection on PANU device.") 1240 if not verify_http_connection(log, pan_dut): 1241 pan_dut.log.info( 1242 "Can't verify http connection on PAN service device") 1243 return False 1244 return True 1245 1246 1247def orchestrate_bluetooth_socket_connection( 1248 client_ad, 1249 server_ad, 1250 accept_timeout_ms=default_bluetooth_socket_timeout_ms, 1251 uuid=None): 1252 """Sets up the Bluetooth Socket connection between two Android devices. 1253 1254 Args: 1255 client_ad: the Android device performing the connection. 1256 server_ad: the Android device accepting the connection. 1257 Returns: 1258 True if connection was successful, false if unsuccessful. 1259 """ 1260 server_ad.droid.bluetoothStartPairingHelper() 1261 client_ad.droid.bluetoothStartPairingHelper() 1262 1263 server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid( 1264 (bluetooth_socket_conn_test_uuid if uuid is None else uuid), 1265 accept_timeout_ms) 1266 client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid( 1267 server_ad.droid.bluetoothGetLocalAddress(), 1268 (bluetooth_socket_conn_test_uuid if uuid is None else uuid)) 1269 1270 end_time = time.time() + bt_default_timeout 1271 result = False 1272 test_result = True 1273 while time.time() < end_time: 1274 if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0: 1275 test_result = True 1276 client_ad.log.info("Bluetooth socket Client Connection Active") 1277 break 1278 else: 1279 test_result = False 1280 time.sleep(1) 1281 if not test_result: 1282 client_ad.log.error("Failed to establish a Bluetooth socket connection") 1283 return False 1284 return True 1285 1286 1287def orchestrate_rfcomm_connection(client_ad, 1288 server_ad, 1289 accept_timeout_ms=default_rfcomm_timeout_ms, 1290 uuid=None): 1291 """Sets up the RFCOMM connection between two Android devices. 1292 1293 Args: 1294 client_ad: the Android device performing the connection. 1295 server_ad: the Android device accepting the connection. 1296 Returns: 1297 True if connection was successful, false if unsuccessful. 1298 """ 1299 result = orchestrate_bluetooth_socket_connection( 1300 client_ad, server_ad, accept_timeout_ms, 1301 (bt_rfcomm_uuids['default_uuid'] if uuid is None else uuid)) 1302 1303 return result 1304 1305 1306def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True): 1307 """Pairs pri droid to secondary droid. 1308 1309 Args: 1310 pri_ad: Android device initiating connection 1311 sec_ad: Android device accepting connection 1312 attempts: Number of attempts to try until failure. 1313 auto_confirm: Auto confirm passkey match for both devices 1314 1315 Returns: 1316 Pass if True 1317 Fail if False 1318 """ 1319 pri_ad.droid.bluetoothStartConnectionStateChangeMonitor( 1320 sec_ad.droid.bluetoothGetLocalAddress()) 1321 curr_attempts = 0 1322 while curr_attempts < attempts: 1323 if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm): 1324 return True 1325 # Wait 2 seconds before unbound 1326 time.sleep(2) 1327 if not clear_bonded_devices(pri_ad): 1328 log.error( 1329 "Failed to clear bond for primary device at attempt {}".format( 1330 str(curr_attempts))) 1331 return False 1332 if not clear_bonded_devices(sec_ad): 1333 log.error("Failed to clear bond for secondary device at attempt {}". 1334 format(str(curr_attempts))) 1335 return False 1336 # Wait 2 seconds after unbound 1337 time.sleep(2) 1338 curr_attempts += 1 1339 log.error("pair_pri_to_sec failed to connect after {} attempts".format( 1340 str(attempts))) 1341 return False 1342 1343 1344def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm): 1345 # Enable discovery on sec_ad so that pri_ad can find it. 1346 # The timeout here is based on how much time it would take for two devices 1347 # to pair with each other once pri_ad starts seeing devices. 1348 pri_droid = pri_ad.droid 1349 sec_droid = sec_ad.droid 1350 pri_ad.ed.clear_all_events() 1351 sec_ad.ed.clear_all_events() 1352 log.info("Bonding device {} to {}".format( 1353 pri_droid.bluetoothGetLocalAddress(), 1354 sec_droid.bluetoothGetLocalAddress())) 1355 sec_droid.bluetoothMakeDiscoverable(bt_default_timeout) 1356 target_address = sec_droid.bluetoothGetLocalAddress() 1357 log.debug("Starting paring helper on each device") 1358 pri_droid.bluetoothStartPairingHelper(auto_confirm) 1359 sec_droid.bluetoothStartPairingHelper(auto_confirm) 1360 pri_ad.log.info("Primary device starting discovery and executing bond") 1361 result = pri_droid.bluetoothDiscoverAndBond(target_address) 1362 if not auto_confirm: 1363 if not _wait_for_passkey_match(pri_ad, sec_ad): 1364 return False 1365 # Loop until we have bonded successfully or timeout. 1366 end_time = time.time() + bt_default_timeout 1367 pri_ad.log.info("Verifying devices are bonded") 1368 while time.time() < end_time: 1369 bonded_devices = pri_droid.bluetoothGetBondedDevices() 1370 bonded = False 1371 for d in bonded_devices: 1372 if d['address'] == target_address: 1373 pri_ad.log.info("Successfully bonded to device") 1374 return True 1375 time.sleep(0.1) 1376 # Timed out trying to bond. 1377 pri_ad.log.info("Failed to bond devices.") 1378 return False 1379 1380 1381def reset_bluetooth(android_devices): 1382 """Resets Bluetooth state of input Android device list. 1383 1384 Args: 1385 android_devices: The Android device list to reset Bluetooth state on. 1386 1387 Returns: 1388 True if successful, false if unsuccessful. 1389 """ 1390 for a in android_devices: 1391 droid, ed = a.droid, a.ed 1392 a.log.info("Reset state of bluetooth on device.") 1393 if droid.bluetoothCheckState() is True: 1394 droid.bluetoothToggleState(False) 1395 expected_bluetooth_off_event_name = bluetooth_off 1396 try: 1397 ed.pop_event(expected_bluetooth_off_event_name, 1398 bt_default_timeout) 1399 except Exception: 1400 a.log.error("Failed to toggle Bluetooth off.") 1401 return False 1402 # temp sleep for b/17723234 1403 time.sleep(3) 1404 if not bluetooth_enabled_check(a): 1405 return False 1406 return True 1407 1408 1409def scan_and_verify_n_advertisements(scn_ad, max_advertisements): 1410 """Verify that input number of advertisements can be found from the scanning 1411 Android device. 1412 1413 Args: 1414 scn_ad: The Android device to start LE scanning on. 1415 max_advertisements: The number of advertisements the scanner expects to 1416 find. 1417 1418 Returns: 1419 True if successful, false if unsuccessful. 1420 """ 1421 test_result = False 1422 address_list = [] 1423 filter_list = scn_ad.droid.bleGenFilterList() 1424 scn_ad.droid.bleBuildScanFilter(filter_list) 1425 scan_settings = scn_ad.droid.bleBuildScanSetting() 1426 scan_callback = scn_ad.droid.bleGenScanCallback() 1427 scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 1428 start_time = time.time() 1429 while (start_time + bt_default_timeout) > time.time(): 1430 event = None 1431 try: 1432 event = scn_ad.ed.pop_event(scan_result.format(scan_callback), 1433 bt_default_timeout) 1434 except Empty as error: 1435 raise BtTestUtilsError( 1436 "Failed to find scan event: {}".format(error)) 1437 address = event['data']['Result']['deviceInfo']['address'] 1438 if address not in address_list: 1439 address_list.append(address) 1440 if len(address_list) == max_advertisements: 1441 test_result = True 1442 break 1443 scn_ad.droid.bleStopBleScan(scan_callback) 1444 return test_result 1445 1446 1447def set_bluetooth_codec(android_device, 1448 codec_type, 1449 sample_rate, 1450 bits_per_sample, 1451 channel_mode, 1452 codec_specific_1=0): 1453 """Sets the A2DP codec configuration on the AndroidDevice. 1454 1455 Args: 1456 android_device (acts.controllers.android_device.AndroidDevice): the 1457 android device for which to switch the codec. 1458 codec_type (str): the desired codec type. Must be a key in 1459 bt_constants.codec_types. 1460 sample_rate (str): the desired sample rate. Must be a key in 1461 bt_constants.sample_rates. 1462 bits_per_sample (str): the desired bits per sample. Must be a key in 1463 bt_constants.bits_per_samples. 1464 channel_mode (str): the desired channel mode. Must be a key in 1465 bt_constants.channel_modes. 1466 codec_specific_1 (int): the desired bit rate (quality) for LDAC codec. 1467 Returns: 1468 bool: True if the codec config was successfully changed to the desired 1469 values. Else False. 1470 """ 1471 message = ("Set Android Device A2DP Bluetooth codec configuration:\n" 1472 "\tCodec: {codec_type}\n" 1473 "\tSample Rate: {sample_rate}\n" 1474 "\tBits per Sample: {bits_per_sample}\n" 1475 "\tChannel Mode: {channel_mode}".format( 1476 codec_type=codec_type, 1477 sample_rate=sample_rate, 1478 bits_per_sample=bits_per_sample, 1479 channel_mode=channel_mode)) 1480 android_device.log.info(message) 1481 1482 # Send SL4A command 1483 droid, ed = android_device.droid, android_device.ed 1484 if not droid.bluetoothA2dpSetCodecConfigPreference( 1485 codec_types[codec_type], sample_rates[str(sample_rate)], 1486 bits_per_samples[str(bits_per_sample)], channel_modes[channel_mode], 1487 codec_specific_1): 1488 android_device.log.warning("SL4A command returned False. Codec was not " 1489 "changed.") 1490 else: 1491 try: 1492 ed.pop_event(bluetooth_a2dp_codec_config_changed, 1493 bt_default_timeout) 1494 except Exception: 1495 android_device.log.warning("SL4A event not registered. Codec " 1496 "may not have been changed.") 1497 1498 # Validate codec value through ADB 1499 # TODO (aidanhb): validate codec more robustly using SL4A 1500 command = "dumpsys bluetooth_manager | grep -i 'current codec'" 1501 out = android_device.adb.shell(command) 1502 split_out = out.split(": ") 1503 if len(split_out) != 2: 1504 android_device.log.warning("Could not verify codec config change " 1505 "through ADB.") 1506 elif split_out[1].strip().upper() != codec_type: 1507 android_device.log.error("Codec config was not changed.\n" 1508 "\tExpected codec: {exp}\n" 1509 "\tActual codec: {act}".format( 1510 exp=codec_type, act=split_out[1].strip())) 1511 return False 1512 android_device.log.info("Bluetooth codec successfully changed.") 1513 return True 1514 1515 1516def set_bt_scan_mode(ad, scan_mode_value): 1517 """Set Android device's Bluetooth scan mode. 1518 1519 Args: 1520 ad: The Android device to set the scan mode on. 1521 scan_mode_value: The value to set the scan mode to. 1522 1523 Returns: 1524 True if successful, false if unsuccessful. 1525 """ 1526 droid, ed = ad.droid, ad.ed 1527 if scan_mode_value == bt_scan_mode_types['state_off']: 1528 disable_bluetooth(droid) 1529 scan_mode = droid.bluetoothGetScanMode() 1530 reset_bluetooth([ad]) 1531 if scan_mode != scan_mode_value: 1532 return False 1533 elif scan_mode_value == bt_scan_mode_types['none']: 1534 droid.bluetoothMakeUndiscoverable() 1535 scan_mode = droid.bluetoothGetScanMode() 1536 if scan_mode != scan_mode_value: 1537 return False 1538 elif scan_mode_value == bt_scan_mode_types['connectable']: 1539 droid.bluetoothMakeUndiscoverable() 1540 droid.bluetoothMakeConnectable() 1541 scan_mode = droid.bluetoothGetScanMode() 1542 if scan_mode != scan_mode_value: 1543 return False 1544 elif (scan_mode_value == bt_scan_mode_types['connectable_discoverable']): 1545 droid.bluetoothMakeDiscoverable() 1546 scan_mode = droid.bluetoothGetScanMode() 1547 if scan_mode != scan_mode_value: 1548 return False 1549 else: 1550 # invalid scan mode 1551 return False 1552 return True 1553 1554 1555def set_device_name(droid, name): 1556 """Set and check Bluetooth local name on input droid object. 1557 1558 Args: 1559 droid: Droid object to set local name on. 1560 name: the Bluetooth local name to set. 1561 1562 Returns: 1563 True if successful, false if unsuccessful. 1564 """ 1565 droid.bluetoothSetLocalName(name) 1566 time.sleep(2) 1567 droid_name = droid.bluetoothGetLocalName() 1568 if droid_name != name: 1569 return False 1570 return True 1571 1572 1573def set_profile_priority(host_ad, client_ad, profiles, priority): 1574 """Sets the priority of said profile(s) on host_ad for client_ad""" 1575 for profile in profiles: 1576 host_ad.log.info("Profile {} on {} for {} set to priority {}".format( 1577 profile, host_ad.droid.bluetoothGetLocalName(), 1578 client_ad.droid.bluetoothGetLocalAddress(), priority.value)) 1579 if bt_profile_constants['a2dp_sink'] == profile: 1580 host_ad.droid.bluetoothA2dpSinkSetPriority( 1581 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1582 elif bt_profile_constants['headset_client'] == profile: 1583 host_ad.droid.bluetoothHfpClientSetPriority( 1584 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1585 elif bt_profile_constants['pbap_client'] == profile: 1586 host_ad.droid.bluetoothPbapClientSetPriority( 1587 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1588 else: 1589 host_ad.log.error( 1590 "Profile {} not yet supported for priority settings".format( 1591 profile)) 1592 1593 1594def setup_multiple_devices_for_bt_test(android_devices): 1595 """A common setup routine for Bluetooth on input Android device list. 1596 1597 Things this function sets up: 1598 1. Resets Bluetooth 1599 2. Set Bluetooth local name to random string of size 4 1600 3. Disable BLE background scanning. 1601 4. Enable Bluetooth snoop logging. 1602 1603 Args: 1604 android_devices: Android device list to setup Bluetooth on. 1605 1606 Returns: 1607 True if successful, false if unsuccessful. 1608 """ 1609 log.info("Setting up Android Devices") 1610 # TODO: Temp fix for an selinux error. 1611 for ad in android_devices: 1612 ad.adb.shell("setenforce 0") 1613 threads = [] 1614 try: 1615 for a in android_devices: 1616 thread = threading.Thread(target=factory_reset_bluetooth, 1617 args=([[a]])) 1618 threads.append(thread) 1619 thread.start() 1620 for t in threads: 1621 t.join() 1622 1623 for a in android_devices: 1624 d = a.droid 1625 # TODO: Create specific RPC command to instantiate 1626 # BluetoothConnectionFacade. This is just a workaround. 1627 d.bluetoothStartConnectionStateChangeMonitor("") 1628 setup_result = d.bluetoothSetLocalName(generate_id_by_size(4)) 1629 if not setup_result: 1630 a.log.error("Failed to set device name.") 1631 return setup_result 1632 d.bluetoothDisableBLE() 1633 utils.set_location_service(a, True) 1634 bonded_devices = d.bluetoothGetBondedDevices() 1635 for b in bonded_devices: 1636 a.log.info("Removing bond for device {}".format(b['address'])) 1637 d.bluetoothUnbond(b['address']) 1638 for a in android_devices: 1639 a.adb.shell("setprop persist.bluetooth.btsnooplogmode full") 1640 getprop_result = a.adb.shell( 1641 "getprop persist.bluetooth.btsnooplogmode") == "full" 1642 if not getprop_result: 1643 a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.") 1644 except Exception as err: 1645 log.error("Something went wrong in multi device setup: {}".format(err)) 1646 return False 1647 return setup_result 1648 1649 1650def setup_n_advertisements(adv_ad, num_advertisements): 1651 """Setup input number of advertisements on input Android device. 1652 1653 Args: 1654 adv_ad: The Android device to start LE advertisements on. 1655 num_advertisements: The number of advertisements to start. 1656 1657 Returns: 1658 advertise_callback_list: List of advertisement callback ids. 1659 """ 1660 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 1661 ble_advertise_settings_modes['low_latency']) 1662 advertise_data = adv_ad.droid.bleBuildAdvertiseData() 1663 advertise_settings = adv_ad.droid.bleBuildAdvertiseSettings() 1664 advertise_callback_list = [] 1665 for i in range(num_advertisements): 1666 advertise_callback = adv_ad.droid.bleGenBleAdvertiseCallback() 1667 advertise_callback_list.append(advertise_callback) 1668 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 1669 advertise_settings) 1670 try: 1671 adv_ad.ed.pop_event(adv_succ.format(advertise_callback), 1672 bt_default_timeout) 1673 adv_ad.log.info("Advertisement {} started.".format(i + 1)) 1674 except Empty as error: 1675 adv_ad.log.error("Advertisement {} failed to start.".format(i + 1)) 1676 raise BtTestUtilsError( 1677 "Test failed with Empty error: {}".format(error)) 1678 return advertise_callback_list 1679 1680 1681def take_btsnoop_log(ad, testcase, testname): 1682 """Grabs the btsnoop_hci log on a device and stores it in the log directory 1683 of the test class. 1684 1685 If you want grab the btsnoop_hci log, call this function with android_device 1686 objects in on_fail. Bug report takes a relative long time to take, so use 1687 this cautiously. 1688 1689 Args: 1690 ad: The android_device instance to take bugreport on. 1691 testcase: Name of the test calss that triggered this snoop log. 1692 testname: Name of the test case that triggered this bug report. 1693 """ 1694 testname = "".join(x for x in testname if x.isalnum()) 1695 serial = ad.serial 1696 device_model = ad.droid.getBuildModel() 1697 device_model = device_model.replace(" ", "") 1698 out_name = ','.join((testname, device_model, serial)) 1699 snoop_path = os.path.join(ad.device_log_path, 'BluetoothSnoopLogs') 1700 os.makedirs(snoop_path, exist_ok=True) 1701 cmd = ''.join(("adb -s ", serial, " pull ", btsnoop_log_path_on_device, " ", 1702 snoop_path + '/' + out_name, ".btsnoop_hci.log")) 1703 exe_cmd(cmd) 1704 try: 1705 cmd = ''.join( 1706 ("adb -s ", serial, " pull ", btsnoop_last_log_path_on_device, " ", 1707 snoop_path + '/' + out_name, ".btsnoop_hci.log.last")) 1708 exe_cmd(cmd) 1709 except Exception as err: 1710 testcase.log.info( 1711 "File does not exist {}".format(btsnoop_last_log_path_on_device)) 1712 1713 1714def take_btsnoop_logs(android_devices, testcase, testname): 1715 """Pull btsnoop logs from an input list of android devices. 1716 1717 Args: 1718 android_devices: the list of Android devices to pull btsnoop logs from. 1719 testcase: Name of the test calss that triggered this snoop log. 1720 testname: Name of the test case that triggered this bug report. 1721 """ 1722 for a in android_devices: 1723 take_btsnoop_log(a, testcase, testname) 1724 1725 1726def teardown_n_advertisements(adv_ad, num_advertisements, 1727 advertise_callback_list): 1728 """Stop input number of advertisements on input Android device. 1729 1730 Args: 1731 adv_ad: The Android device to stop LE advertisements on. 1732 num_advertisements: The number of advertisements to stop. 1733 advertise_callback_list: The list of advertisement callbacks to stop. 1734 1735 Returns: 1736 True if successful, false if unsuccessful. 1737 """ 1738 for n in range(num_advertisements): 1739 adv_ad.droid.bleStopBleAdvertising(advertise_callback_list[n]) 1740 return True 1741 1742 1743def verify_server_and_client_connected(client_ad, server_ad, log=True): 1744 """Verify that input server and client Android devices are connected. 1745 1746 This code is under the assumption that there will only be 1747 a single connection. 1748 1749 Args: 1750 client_ad: the Android device to check number of active connections. 1751 server_ad: the Android device to check number of active connections. 1752 1753 Returns: 1754 True both server and client have at least 1 active connection, 1755 false if unsuccessful. 1756 """ 1757 test_result = True 1758 if len(server_ad.droid.bluetoothSocketConnActiveConnections()) == 0: 1759 if log: 1760 server_ad.log.error("No socket connections found on server.") 1761 test_result = False 1762 if len(client_ad.droid.bluetoothSocketConnActiveConnections()) == 0: 1763 if log: 1764 client_ad.log.error("No socket connections found on client.") 1765 test_result = False 1766 return test_result 1767 1768 1769def wait_for_bluetooth_manager_state(droid, 1770 state=None, 1771 timeout=10, 1772 threshold=5): 1773 """ Waits for BlueTooth normalized state or normalized explicit state 1774 args: 1775 droid: droid device object 1776 state: expected BlueTooth state 1777 timeout: max timeout threshold 1778 threshold: list len of bt state 1779 Returns: 1780 True if successful, false if unsuccessful. 1781 """ 1782 all_states = [] 1783 get_state = lambda: droid.bluetoothGetLeState() 1784 start_time = time.time() 1785 while time.time() < start_time + timeout: 1786 all_states.append(get_state()) 1787 if len(all_states) >= threshold: 1788 # for any normalized state 1789 if state is None: 1790 if len(set(all_states[-threshold:])) == 1: 1791 log.info("State normalized {}".format( 1792 set(all_states[-threshold:]))) 1793 return True 1794 else: 1795 # explicit check against normalized state 1796 if set([state]).issubset(all_states[-threshold:]): 1797 return True 1798 time.sleep(0.5) 1799 log.error( 1800 "Bluetooth state fails to normalize" if state is None else 1801 "Failed to match bluetooth state, current state {} expected state {}". 1802 format(get_state(), state)) 1803 return False 1804 1805 1806def _wait_for_passkey_match(pri_ad, sec_ad): 1807 pri_pin, sec_pin = -1, 1 1808 pri_variant, sec_variant = -1, 1 1809 pri_pairing_req, sec_pairing_req = None, None 1810 try: 1811 pri_pairing_req = pri_ad.ed.pop_event( 1812 event_name="BluetoothActionPairingRequest", 1813 timeout=bt_default_timeout) 1814 pri_variant = pri_pairing_req["data"]["PairingVariant"] 1815 pri_pin = pri_pairing_req["data"]["Pin"] 1816 pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format( 1817 pri_pin, pri_variant)) 1818 sec_pairing_req = sec_ad.ed.pop_event( 1819 event_name="BluetoothActionPairingRequest", 1820 timeout=bt_default_timeout) 1821 sec_variant = sec_pairing_req["data"]["PairingVariant"] 1822 sec_pin = sec_pairing_req["data"]["Pin"] 1823 sec_ad.log.info("Secondary device received Pin: {}, Variant: {}".format( 1824 sec_pin, sec_variant)) 1825 except Empty as err: 1826 log.error("Wait for pin error: {}".format(err)) 1827 log.error("Pairing request state, Primary: {}, Secondary: {}".format( 1828 pri_pairing_req, sec_pairing_req)) 1829 return False 1830 if pri_variant == sec_variant == pairing_variant_passkey_confirmation: 1831 confirmation = pri_pin == sec_pin 1832 if confirmation: 1833 log.info("Pairing code matched, accepting connection") 1834 else: 1835 log.info("Pairing code mismatched, rejecting connection") 1836 pri_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm", 1837 str(confirmation)) 1838 sec_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm", 1839 str(confirmation)) 1840 if not confirmation: 1841 return False 1842 elif pri_variant != sec_variant: 1843 log.error("Pairing variant mismatched, abort connection") 1844 return False 1845 return True 1846 1847 1848def write_read_verify_data(client_ad, server_ad, msg, binary=False): 1849 """Verify that the client wrote data to the server Android device correctly. 1850 1851 Args: 1852 client_ad: the Android device to perform the write. 1853 server_ad: the Android device to read the data written. 1854 msg: the message to write. 1855 binary: if the msg arg is binary or not. 1856 1857 Returns: 1858 True if the data written matches the data read, false if not. 1859 """ 1860 client_ad.log.info("Write message.") 1861 try: 1862 if binary: 1863 client_ad.droid.bluetoothSocketConnWriteBinary(msg) 1864 else: 1865 client_ad.droid.bluetoothSocketConnWrite(msg) 1866 except Exception as err: 1867 client_ad.log.error("Failed to write data: {}".format(err)) 1868 return False 1869 server_ad.log.info("Read message.") 1870 try: 1871 if binary: 1872 read_msg = server_ad.droid.bluetoothSocketConnReadBinary().rstrip( 1873 "\r\n") 1874 else: 1875 read_msg = server_ad.droid.bluetoothSocketConnRead() 1876 except Exception as err: 1877 server_ad.log.error("Failed to read data: {}".format(err)) 1878 return False 1879 log.info("Verify message.") 1880 if msg != read_msg: 1881 log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg)) 1882 return False 1883 return True 1884 1885 1886class MediaControlOverSl4a(object): 1887 """Media control using sl4a facade for general purpose. 1888 1889 """ 1890 1891 def __init__(self, android_device, music_file): 1892 """Initialize the media_control class. 1893 1894 Args: 1895 android_dut: android_device object 1896 music_file: location of the music file 1897 """ 1898 self.android_device = android_device 1899 self.music_file = music_file 1900 1901 def play(self): 1902 """Play media. 1903 1904 """ 1905 self.android_device.droid.mediaPlayOpen('file://%s' % self.music_file, 1906 'default', True) 1907 playing = self.android_device.droid.mediaIsPlaying() 1908 asserts.assert_true(playing, 1909 'Failed to play music %s' % self.music_file) 1910 1911 def pause(self): 1912 """Pause media. 1913 1914 """ 1915 self.android_device.droid.mediaPlayPause('default') 1916 paused = not self.android_device.droid.mediaIsPlaying() 1917 asserts.assert_true(paused, 1918 'Failed to pause music %s' % self.music_file) 1919 1920 def resume(self): 1921 """Resume media. 1922 1923 """ 1924 self.android_device.droid.mediaPlayStart('default') 1925 playing = self.android_device.droid.mediaIsPlaying() 1926 asserts.assert_true(playing, 1927 'Failed to play music %s' % self.music_file) 1928 1929 def stop(self): 1930 """Stop media. 1931 1932 """ 1933 self.android_device.droid.mediaPlayStop('default') 1934 stopped = not self.android_device.droid.mediaIsPlaying() 1935 asserts.assert_true(stopped, 1936 'Failed to stop music %s' % self.music_file) 1937