1#!/usr/bin/env python3 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import logging 18import os 19import random 20import re 21import string 22import threading 23import time 24from queue import Empty 25from subprocess import call 26 27from acts import asserts 28from acts_contrib.test_utils.bt.bt_constants import adv_fail 29from acts_contrib.test_utils.bt.bt_constants import adv_succ 30from acts_contrib.test_utils.bt.bt_constants import batch_scan_not_supported_list 31from acts_contrib.test_utils.bt.bt_constants import batch_scan_result 32from acts_contrib.test_utils.bt.bt_constants import bits_per_samples 33from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_modes 34from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers 35from acts_contrib.test_utils.bt.bt_constants import bluetooth_a2dp_codec_config_changed 36from acts_contrib.test_utils.bt.bt_constants import bluetooth_off 37from acts_contrib.test_utils.bt.bt_constants import bluetooth_on 38from acts_contrib.test_utils.bt.bt_constants import \ 39 bluetooth_profile_connection_state_changed 40from acts_contrib.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid 41from acts_contrib.test_utils.bt.bt_constants import bt_default_timeout 42from acts_contrib.test_utils.bt.bt_constants import bt_profile_constants 43from acts_contrib.test_utils.bt.bt_constants import bt_profile_states 44from acts_contrib.test_utils.bt.bt_constants import bt_rfcomm_uuids 45from acts_contrib.test_utils.bt.bt_constants import bt_scan_mode_types 46from acts_contrib.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device 47from acts_contrib.test_utils.bt.bt_constants import btsnoop_log_path_on_device 48from acts_contrib.test_utils.bt.bt_constants import channel_modes 49from acts_contrib.test_utils.bt.bt_constants import codec_types 50from acts_contrib.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms 51from acts_contrib.test_utils.bt.bt_constants import default_rfcomm_timeout_ms 52from acts_contrib.test_utils.bt.bt_constants import hid_id_keyboard 53from acts_contrib.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation 54from acts_contrib.test_utils.bt.bt_constants import pan_connect_timeout 55from acts_contrib.test_utils.bt.bt_constants import sample_rates 56from acts_contrib.test_utils.bt.bt_constants import scan_result 57from acts_contrib.test_utils.bt.bt_constants import sig_uuid_constants 58from acts_contrib.test_utils.bt.bt_constants import small_timeout 59from acts_contrib.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb 60from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection 61from acts.utils import exe_cmd 62 63from acts import utils 64 65log = logging 66 67advertisements_to_devices = {} 68 69 70class BtTestUtilsError(Exception): 71 pass 72 73 74def _add_android_device_to_dictionary(android_device, profile_list, 75 selector_dict): 76 """Adds the AndroidDevice and supported features to the selector dictionary 77 78 Args: 79 android_device: The Android device. 80 profile_list: The list of profiles the Android device supports. 81 """ 82 for profile in profile_list: 83 if profile in selector_dict and android_device not in selector_dict[ 84 profile]: 85 selector_dict[profile].append(android_device) 86 else: 87 selector_dict[profile] = [android_device] 88 89 90def bluetooth_enabled_check(ad, timeout_sec=5): 91 """Checks if the Bluetooth state is enabled, if not it will attempt to 92 enable it. 93 94 Args: 95 ad: The Android device list to enable Bluetooth on. 96 timeout_sec: number of seconds to wait for toggle to take effect. 97 98 Returns: 99 True if successful, false if unsuccessful. 100 """ 101 if not ad.droid.bluetoothCheckState(): 102 ad.droid.bluetoothToggleState(True) 103 expected_bluetooth_on_event_name = bluetooth_on 104 try: 105 ad.ed.pop_event(expected_bluetooth_on_event_name, 106 bt_default_timeout) 107 except Empty: 108 ad.log.info( 109 "Failed to toggle Bluetooth on(no broadcast received).") 110 # Try one more time to poke at the actual state. 111 if ad.droid.bluetoothCheckState(): 112 ad.log.info(".. actual state is ON") 113 return True 114 ad.log.error(".. actual state is OFF") 115 return False 116 end_time = time.time() + timeout_sec 117 while not ad.droid.bluetoothCheckState() and time.time() < end_time: 118 time.sleep(1) 119 return ad.droid.bluetoothCheckState() 120 121 122def check_device_supported_profiles(droid): 123 """Checks for Android device supported profiles. 124 125 Args: 126 droid: The droid object to query. 127 128 Returns: 129 A dictionary of supported profiles. 130 """ 131 profile_dict = {} 132 profile_dict['hid'] = droid.bluetoothHidIsReady() 133 profile_dict['hsp'] = droid.bluetoothHspIsReady() 134 profile_dict['a2dp'] = droid.bluetoothA2dpIsReady() 135 profile_dict['avrcp'] = droid.bluetoothAvrcpIsReady() 136 profile_dict['a2dp_sink'] = droid.bluetoothA2dpSinkIsReady() 137 profile_dict['hfp_client'] = droid.bluetoothHfpClientIsReady() 138 profile_dict['pbap_client'] = droid.bluetoothPbapClientIsReady() 139 return profile_dict 140 141 142def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list, 143 adv_android_device, adv_callback_list): 144 """Try to gracefully stop all scanning and advertising instances. 145 146 Args: 147 scn_android_device: The Android device that is actively scanning. 148 scn_callback_list: The scan callback id list that needs to be stopped. 149 adv_android_device: The Android device that is actively advertising. 150 adv_callback_list: The advertise callback id list that needs to be 151 stopped. 152 """ 153 scan_droid, scan_ed = scn_android_device.droid, scn_android_device.ed 154 adv_droid = adv_android_device.droid 155 try: 156 for scan_callback in scn_callback_list: 157 scan_droid.bleStopBleScan(scan_callback) 158 except Exception as err: 159 scn_android_device.log.debug( 160 "Failed to stop LE scan... reseting Bluetooth. Error {}".format( 161 err)) 162 reset_bluetooth([scn_android_device]) 163 try: 164 for adv_callback in adv_callback_list: 165 adv_droid.bleStopBleAdvertising(adv_callback) 166 except Exception as err: 167 adv_android_device.log.debug( 168 "Failed to stop LE advertisement... reseting Bluetooth. Error {}". 169 format(err)) 170 reset_bluetooth([adv_android_device]) 171 172 173def clear_bonded_devices(ad): 174 """Clear bonded devices from the input Android device. 175 176 Args: 177 ad: the Android device performing the connection. 178 Returns: 179 True if clearing bonded devices was successful, false if unsuccessful. 180 """ 181 bonded_device_list = ad.droid.bluetoothGetBondedDevices() 182 while bonded_device_list: 183 device_address = bonded_device_list[0]['address'] 184 if not ad.droid.bluetoothUnbond(device_address): 185 log.error("Failed to unbond {} from {}".format( 186 device_address, ad.serial)) 187 return False 188 log.info("Successfully unbonded {} from {}".format( 189 device_address, ad.serial)) 190 #TODO: wait for BOND_STATE_CHANGED intent instead of waiting 191 time.sleep(1) 192 193 # If device was first connected using LE transport, after bonding it is 194 # accessible through it's LE address, and through it classic address. 195 # Unbonding it will unbond two devices representing different 196 # "addresses". Attempt to unbond such already unbonded devices will 197 # result in bluetoothUnbond returning false. 198 bonded_device_list = ad.droid.bluetoothGetBondedDevices() 199 return True 200 201 202def connect_phone_to_headset(android, 203 headset, 204 timeout=bt_default_timeout, 205 connection_check_period=10): 206 """Connects android phone to bluetooth headset. 207 Headset object must have methods power_on and enter_pairing_mode, 208 and attribute mac_address. 209 210 Args: 211 android: AndroidDevice object with SL4A installed. 212 headset: Object with attribute mac_address and methods power_on and 213 enter_pairing_mode. 214 timeout: Seconds to wait for devices to connect. 215 connection_check_period: how often to check for connection once the 216 SL4A connect RPC has been sent. 217 Returns: 218 connected (bool): True if devices are paired and connected by end of 219 method. False otherwise. 220 """ 221 headset_mac_address = headset.mac_address 222 connected = is_a2dp_src_device_connected(android, headset_mac_address) 223 log.info('Devices connected before pair attempt: %s' % connected) 224 if not connected: 225 # Turn on headset and initiate pairing mode. 226 headset.enter_pairing_mode() 227 android.droid.bluetoothStartPairingHelper() 228 start_time = time.time() 229 # If already connected, skip pair and connect attempt. 230 while not connected and (time.time() - start_time < timeout): 231 bonded_info = android.droid.bluetoothGetBondedDevices() 232 connected_info = android.droid.bluetoothGetConnectedDevices() 233 if headset.mac_address not in [ 234 info["address"] for info in bonded_info 235 ]: 236 # Use SL4A to pair and connect with headset. 237 headset.enter_pairing_mode() 238 android.droid.bluetoothDiscoverAndBond(headset_mac_address) 239 elif headset.mac_address not in [ 240 info["address"] for info in connected_info 241 ]: 242 #Device is bonded but not connected 243 android.droid.bluetoothConnectBonded(headset_mac_address) 244 else: 245 #Headset is connected, but A2DP profile is not 246 android.droid.bluetoothA2dpConnect(headset_mac_address) 247 log.info('Waiting for connection...') 248 time.sleep(connection_check_period) 249 # Check for connection. 250 connected = is_a2dp_src_device_connected(android, headset_mac_address) 251 log.info('Devices connected after pair attempt: %s' % connected) 252 return connected 253 254def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2): 255 """Connects pri droid to secondary droid. 256 257 Args: 258 pri_ad: AndroidDroid initiating connection 259 sec_ad: AndroidDroid accepting connection 260 profiles_set: Set of profiles to be connected 261 attempts: Number of attempts to try until failure. 262 263 Returns: 264 Pass if True 265 Fail if False 266 """ 267 device_addr = sec_ad.droid.bluetoothGetLocalAddress() 268 # Allows extra time for the SDP records to be updated. 269 time.sleep(2) 270 curr_attempts = 0 271 while curr_attempts < attempts: 272 log.info("connect_pri_to_sec curr attempt {} total {}".format( 273 curr_attempts, attempts)) 274 if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set): 275 return True 276 curr_attempts += 1 277 log.error("connect_pri_to_sec failed to connect after {} attempts".format( 278 attempts)) 279 return False 280 281 282def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set): 283 """Connects pri droid to secondary droid. 284 285 Args: 286 pri_ad: AndroidDroid initiating connection. 287 sec_ad: AndroidDroid accepting connection. 288 profiles_set: Set of profiles to be connected. 289 290 Returns: 291 True of connection is successful, false if unsuccessful. 292 """ 293 # Check if we support all profiles. 294 supported_profiles = bt_profile_constants.values() 295 for profile in profiles_set: 296 if profile not in supported_profiles: 297 pri_ad.log.info("Profile {} is not supported list {}".format( 298 profile, supported_profiles)) 299 return False 300 301 # First check that devices are bonded. 302 paired = False 303 for paired_device in pri_ad.droid.bluetoothGetBondedDevices(): 304 if paired_device['address'] == \ 305 sec_ad.droid.bluetoothGetLocalAddress(): 306 paired = True 307 break 308 309 if not paired: 310 pri_ad.log.error("Not paired to {}".format(sec_ad.serial)) 311 return False 312 313 # Now try to connect them, the following call will try to initiate all 314 # connections. 315 pri_ad.droid.bluetoothConnectBonded( 316 sec_ad.droid.bluetoothGetLocalAddress()) 317 318 end_time = time.time() + 10 319 profile_connected = set() 320 sec_addr = sec_ad.droid.bluetoothGetLocalAddress() 321 pri_ad.log.info("Profiles to be connected {}".format(profiles_set)) 322 # First use APIs to check profile connection state 323 while (time.time() < end_time 324 and not profile_connected.issuperset(profiles_set)): 325 if (bt_profile_constants['headset_client'] not in profile_connected 326 and bt_profile_constants['headset_client'] in profiles_set): 327 if is_hfp_client_device_connected(pri_ad, sec_addr): 328 profile_connected.add(bt_profile_constants['headset_client']) 329 if (bt_profile_constants['a2dp'] not in profile_connected 330 and bt_profile_constants['a2dp'] in profiles_set): 331 if is_a2dp_src_device_connected(pri_ad, sec_addr): 332 profile_connected.add(bt_profile_constants['a2dp']) 333 if (bt_profile_constants['a2dp_sink'] not in profile_connected 334 and bt_profile_constants['a2dp_sink'] in profiles_set): 335 if is_a2dp_snk_device_connected(pri_ad, sec_addr): 336 profile_connected.add(bt_profile_constants['a2dp_sink']) 337 if (bt_profile_constants['map_mce'] not in profile_connected 338 and bt_profile_constants['map_mce'] in profiles_set): 339 if is_map_mce_device_connected(pri_ad, sec_addr): 340 profile_connected.add(bt_profile_constants['map_mce']) 341 if (bt_profile_constants['map'] not in profile_connected 342 and bt_profile_constants['map'] in profiles_set): 343 if is_map_mse_device_connected(pri_ad, sec_addr): 344 profile_connected.add(bt_profile_constants['map']) 345 time.sleep(0.1) 346 # If APIs fail, try to find the connection broadcast receiver. 347 while not profile_connected.issuperset(profiles_set): 348 try: 349 profile_event = pri_ad.ed.pop_event( 350 bluetooth_profile_connection_state_changed, 351 bt_default_timeout + 10) 352 pri_ad.log.info("Got event {}".format(profile_event)) 353 except Exception: 354 pri_ad.log.error("Did not get {} profiles left {}".format( 355 bluetooth_profile_connection_state_changed, profile_connected)) 356 return False 357 358 profile = profile_event['data']['profile'] 359 state = profile_event['data']['state'] 360 device_addr = profile_event['data']['addr'] 361 if state == bt_profile_states['connected'] and \ 362 device_addr == sec_ad.droid.bluetoothGetLocalAddress(): 363 profile_connected.add(profile) 364 pri_ad.log.info( 365 "Profiles connected until now {}".format(profile_connected)) 366 # Failure happens inside the while loop. If we came here then we already 367 # connected. 368 return True 369 370 371def determine_max_advertisements(android_device): 372 """Determines programatically how many advertisements the Android device 373 supports. 374 375 Args: 376 android_device: The Android device to determine max advertisements of. 377 378 Returns: 379 The maximum advertisement count. 380 """ 381 android_device.log.info( 382 "Determining number of maximum concurrent advertisements...") 383 advertisement_count = 0 384 bt_enabled = False 385 expected_bluetooth_on_event_name = bluetooth_on 386 if not android_device.droid.bluetoothCheckState(): 387 android_device.droid.bluetoothToggleState(True) 388 try: 389 android_device.ed.pop_event(expected_bluetooth_on_event_name, 390 bt_default_timeout) 391 except Exception: 392 android_device.log.info( 393 "Failed to toggle Bluetooth on(no broadcast received).") 394 # Try one more time to poke at the actual state. 395 if android_device.droid.bluetoothCheckState() is True: 396 android_device.log.info(".. actual state is ON") 397 else: 398 android_device.log.error( 399 "Failed to turn Bluetooth on. Setting default advertisements to 1" 400 ) 401 advertisement_count = -1 402 return advertisement_count 403 advertise_callback_list = [] 404 advertise_data = android_device.droid.bleBuildAdvertiseData() 405 advertise_settings = android_device.droid.bleBuildAdvertiseSettings() 406 while (True): 407 advertise_callback = android_device.droid.bleGenBleAdvertiseCallback() 408 advertise_callback_list.append(advertise_callback) 409 410 android_device.droid.bleStartBleAdvertising(advertise_callback, 411 advertise_data, 412 advertise_settings) 413 414 regex = "(" + adv_succ.format( 415 advertise_callback) + "|" + adv_fail.format( 416 advertise_callback) + ")" 417 # wait for either success or failure event 418 evt = android_device.ed.pop_events(regex, bt_default_timeout, 419 small_timeout) 420 if evt[0]["name"] == adv_succ.format(advertise_callback): 421 advertisement_count += 1 422 android_device.log.info( 423 "Advertisement {} started.".format(advertisement_count)) 424 else: 425 error = evt[0]["data"]["Error"] 426 if error == "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS": 427 android_device.log.info( 428 "Advertisement failed to start. Reached max " + 429 "advertisements at {}".format(advertisement_count)) 430 break 431 else: 432 raise BtTestUtilsError( 433 "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," + 434 " but received bad error code {}".format(error)) 435 try: 436 for adv in advertise_callback_list: 437 android_device.droid.bleStopBleAdvertising(adv) 438 except Exception: 439 android_device.log.error( 440 "Failed to stop advertisingment, resetting Bluetooth.") 441 reset_bluetooth([android_device]) 442 return advertisement_count 443 444 445def disable_bluetooth(droid): 446 """Disable Bluetooth on input Droid object. 447 448 Args: 449 droid: The droid object to disable Bluetooth on. 450 451 Returns: 452 True if successful, false if unsuccessful. 453 """ 454 if droid.bluetoothCheckState() is True: 455 droid.bluetoothToggleState(False) 456 if droid.bluetoothCheckState() is True: 457 log.error("Failed to toggle Bluetooth off.") 458 return False 459 return True 460 461 462def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list): 463 """ 464 Disconnect primary from secondary on a specific set of profiles 465 Args: 466 pri_ad - Primary android_device initiating disconnection 467 sec_ad - Secondary android droid (sl4a interface to keep the 468 method signature the same connect_pri_to_sec above) 469 profiles_list - List of profiles we want to disconnect from 470 471 Returns: 472 True on Success 473 False on Failure 474 """ 475 # Sanity check to see if all the profiles in the given set is supported 476 supported_profiles = bt_profile_constants.values() 477 for profile in profiles_list: 478 if profile not in supported_profiles: 479 pri_ad.log.info("Profile {} is not in supported list {}".format( 480 profile, supported_profiles)) 481 return False 482 483 pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices()) 484 # Disconnecting on a already disconnected profile is a nop, 485 # so not checking for the connection state 486 try: 487 pri_ad.droid.bluetoothDisconnectConnectedProfile( 488 sec_ad.droid.bluetoothGetLocalAddress(), profiles_list) 489 except Exception as err: 490 pri_ad.log.error( 491 "Exception while trying to disconnect profile(s) {}: {}".format( 492 profiles_list, err)) 493 return False 494 495 profile_disconnected = set() 496 pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list)) 497 498 while not profile_disconnected.issuperset(profiles_list): 499 try: 500 profile_event = pri_ad.ed.pop_event( 501 bluetooth_profile_connection_state_changed, bt_default_timeout) 502 pri_ad.log.info("Got event {}".format(profile_event)) 503 except Exception as e: 504 pri_ad.log.error( 505 "Did not disconnect from Profiles. Reason {}".format(e)) 506 return False 507 508 profile = profile_event['data']['profile'] 509 state = profile_event['data']['state'] 510 device_addr = profile_event['data']['addr'] 511 512 if state == bt_profile_states['disconnected'] and \ 513 device_addr == sec_ad.droid.bluetoothGetLocalAddress(): 514 profile_disconnected.add(profile) 515 pri_ad.log.info( 516 "Profiles disconnected so far {}".format(profile_disconnected)) 517 518 return True 519 520 521def enable_bluetooth(droid, ed): 522 if droid.bluetoothCheckState() is True: 523 return True 524 525 droid.bluetoothToggleState(True) 526 expected_bluetooth_on_event_name = bluetooth_on 527 try: 528 ed.pop_event(expected_bluetooth_on_event_name, bt_default_timeout) 529 except Exception: 530 log.info("Failed to toggle Bluetooth on (no broadcast received)") 531 if droid.bluetoothCheckState() is True: 532 log.info(".. actual state is ON") 533 return True 534 log.info(".. actual state is OFF") 535 return False 536 537 return True 538 539 540def factory_reset_bluetooth(android_devices): 541 """Clears Bluetooth stack of input Android device list. 542 543 Args: 544 android_devices: The Android device list to reset Bluetooth 545 546 Returns: 547 True if successful, false if unsuccessful. 548 """ 549 for a in android_devices: 550 droid, ed = a.droid, a.ed 551 a.log.info("Reset state of bluetooth on device.") 552 if not bluetooth_enabled_check(a): 553 return False 554 # TODO: remove device unbond b/79418045 555 # Temporary solution to ensure all devices are unbonded 556 bonded_devices = droid.bluetoothGetBondedDevices() 557 for b in bonded_devices: 558 a.log.info("Removing bond for device {}".format(b['address'])) 559 droid.bluetoothUnbond(b['address']) 560 561 droid.bluetoothFactoryReset() 562 wait_for_bluetooth_manager_state(droid) 563 if not enable_bluetooth(droid, ed): 564 return False 565 return True 566 567 568def generate_ble_advertise_objects(droid): 569 """Generate generic LE advertise objects. 570 571 Args: 572 droid: The droid object to generate advertise LE objects from. 573 574 Returns: 575 advertise_callback: The generated advertise callback id. 576 advertise_data: The generated advertise data id. 577 advertise_settings: The generated advertise settings id. 578 """ 579 advertise_callback = droid.bleGenBleAdvertiseCallback() 580 advertise_data = droid.bleBuildAdvertiseData() 581 advertise_settings = droid.bleBuildAdvertiseSettings() 582 return advertise_callback, advertise_data, advertise_settings 583 584 585def generate_ble_scan_objects(droid): 586 """Generate generic LE scan objects. 587 588 Args: 589 droid: The droid object to generate LE scan objects from. 590 591 Returns: 592 filter_list: The generated scan filter list id. 593 scan_settings: The generated scan settings id. 594 scan_callback: The generated scan callback id. 595 """ 596 filter_list = droid.bleGenFilterList() 597 scan_settings = droid.bleBuildScanSetting() 598 scan_callback = droid.bleGenScanCallback() 599 return filter_list, scan_settings, scan_callback 600 601 602def generate_id_by_size(size, 603 chars=(string.ascii_lowercase + 604 string.ascii_uppercase + string.digits)): 605 """Generate random ascii characters of input size and input char types 606 607 Args: 608 size: Input size of string. 609 chars: (Optional) Chars to use in generating a random string. 610 611 Returns: 612 String of random input chars at the input size. 613 """ 614 return ''.join(random.choice(chars) for _ in range(size)) 615 616 617def get_advanced_droid_list(android_devices): 618 """Add max_advertisement and batch_scan_supported attributes to input 619 Android devices 620 621 This will programatically determine maximum LE advertisements of each 622 input Android device. 623 624 Args: 625 android_devices: The Android devices to setup. 626 627 Returns: 628 List of Android devices with new attribtues. 629 """ 630 droid_list = [] 631 for a in android_devices: 632 d, e = a.droid, a.ed 633 model = d.getBuildModel() 634 max_advertisements = 1 635 batch_scan_supported = True 636 if model in advertisements_to_devices.keys(): 637 max_advertisements = advertisements_to_devices[model] 638 else: 639 max_advertisements = determine_max_advertisements(a) 640 max_tries = 3 641 # Retry to calculate max advertisements 642 while max_advertisements == -1 and max_tries > 0: 643 a.log.info( 644 "Attempts left to determine max advertisements: {}".format( 645 max_tries)) 646 max_advertisements = determine_max_advertisements(a) 647 max_tries -= 1 648 advertisements_to_devices[model] = max_advertisements 649 if model in batch_scan_not_supported_list: 650 batch_scan_supported = False 651 role = { 652 'droid': d, 653 'ed': e, 654 'max_advertisements': max_advertisements, 655 'batch_scan_supported': batch_scan_supported 656 } 657 droid_list.append(role) 658 return droid_list 659 660 661def get_bluetooth_crash_count(android_device): 662 out = android_device.adb.shell("dumpsys bluetooth_manager") 663 return int(re.search("crashed(.*\d)", out).group(1)) 664 665 666def read_otp(ad): 667 """Reads and parses the OTP output to return TX power backoff 668 669 Reads the OTP registers from the phone, parses them to return a 670 dict of TX power backoffs for different power levels 671 672 Args: 673 ad : android device object 674 675 Returns : 676 otp_dict : power backoff dict 677 """ 678 679 ad.adb.shell('svc bluetooth disable') 680 time.sleep(2) 681 otp_output = ad.adb.shell('bluetooth_sar_test -r') 682 ad.adb.shell('svc bluetooth enable') 683 time.sleep(2) 684 otp_dict = { 685 "BR": { 686 "10": 0, 687 "9": 0, 688 "8": 0 689 }, 690 "EDR": { 691 "10": 0, 692 "9": 0, 693 "8": 0 694 }, 695 "BLE": { 696 "10": 0, 697 "9": 0, 698 "8": 0 699 } 700 } 701 702 otp_regex = '\s+\[\s+PL10:\s+(\d+)\s+PL9:\s+(\d+)*\s+PL8:\s+(\d+)\s+\]' 703 704 for key in otp_dict: 705 bank_list = re.findall("{}{}".format(key, otp_regex), otp_output) 706 for bank_tuple in bank_list: 707 if ('0', '0', '0') != bank_tuple: 708 [otp_dict[key]["10"], otp_dict[key]["9"], 709 otp_dict[key]["8"]] = bank_tuple 710 return otp_dict 711 712 713def get_bt_metric(ad_list, duration=1, tag="bt_metric", processed=True): 714 """ Function to get the bt metric from logcat. 715 716 Captures logcat for the specified duration and returns the bqr results. 717 Takes list of android objects as input. If a single android object is given, 718 converts it into a list. 719 720 Args: 721 ad_list: list of android_device objects 722 duration: time duration (seconds) for which the logcat is parsed. 723 tag: tag to be appended to the logcat dump. 724 processed: flag to process bqr output. 725 726 Returns: 727 metrics_dict: dict of metrics for each android device. 728 """ 729 730 # Defining bqr quantitites and their regex to extract 731 regex_dict = { 732 "vsp_txpl": "VSP_TxPL:\s(\S+)", 733 "pwlv": "PwLv:\s(\S+)", 734 "rssi": "RSSI:\s[-](\d+)" 735 } 736 metrics_dict = {"rssi": {}, "pwlv": {}, "vsp_txpl": {}} 737 738 # Converting a single android device object to list 739 if not isinstance(ad_list, list): 740 ad_list = [ad_list] 741 742 #Time sync with the test machine 743 for ad in ad_list: 744 ad.droid.setTime(int(round(time.time() * 1000))) 745 time.sleep(0.5) 746 747 begin_time = utils.get_current_epoch_time() 748 time.sleep(duration) 749 end_time = utils.get_current_epoch_time() 750 751 for ad in ad_list: 752 bt_rssi_log = ad.cat_adb_log(tag, begin_time, end_time) 753 bqr_tag = "Handle:" 754 755 # Extracting supporting bqr quantities 756 for metric, regex in regex_dict.items(): 757 bqr_metric = [] 758 file_bt_log = open(bt_rssi_log, "r") 759 for line in file_bt_log: 760 if bqr_tag in line: 761 if re.findall(regex, line): 762 m = re.findall(regex, line)[0].strip(",") 763 bqr_metric.append(m) 764 metrics_dict[metric][ad.serial] = bqr_metric 765 766 # Ensures back-compatibility for vsp_txpl enabled DUTs 767 if metrics_dict["vsp_txpl"][ad.serial]: 768 metrics_dict["pwlv"][ad.serial] = metrics_dict["vsp_txpl"][ 769 ad.serial] 770 771 # Formatting the raw data 772 metrics_dict["rssi"][ad.serial] = [ 773 (-1) * int(x) for x in metrics_dict["rssi"][ad.serial] 774 ] 775 metrics_dict["pwlv"][ad.serial] = [ 776 int(x, 16) for x in metrics_dict["pwlv"][ad.serial] 777 ] 778 779 # Processing formatted data if processing is required 780 if processed: 781 # Computes the average RSSI 782 metrics_dict["rssi"][ad.serial] = round( 783 sum(metrics_dict["rssi"][ad.serial]) / 784 len(metrics_dict["rssi"][ad.serial]), 2) 785 # Returns last noted value for power level 786 metrics_dict["pwlv"][ad.serial] = float( 787 sum(metrics_dict["pwlv"][ad.serial]) / 788 len(metrics_dict["pwlv"][ad.serial])) 789 790 return metrics_dict 791 792 793def get_bt_rssi(ad, duration=1, processed=True): 794 """Function to get average bt rssi from logcat. 795 796 This function returns the average RSSI for the given duration. RSSI values are 797 extracted from BQR. 798 799 Args: 800 ad: (list of) android_device object. 801 duration: time duration(seconds) for which logcat is parsed. 802 803 Returns: 804 avg_rssi: average RSSI on each android device for the given duration. 805 """ 806 function_tag = "get_bt_rssi" 807 bqr_results = get_bt_metric(ad, 808 duration, 809 tag=function_tag, 810 processed=processed) 811 return bqr_results["rssi"] 812 813 814def enable_bqr( 815 ad_list, 816 bqr_interval=10, 817 bqr_event_mask=15, 818): 819 """Sets up BQR reporting. 820 821 Sets up BQR to report BT metrics at the requested frequency and toggles 822 airplane mode for the bqr settings to take effect. 823 824 Args: 825 ad_list: an android_device or list of android devices. 826 """ 827 # Converting a single android device object to list 828 if not isinstance(ad_list, list): 829 ad_list = [ad_list] 830 831 for ad in ad_list: 832 #Setting BQR parameters 833 ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format( 834 bqr_event_mask)) 835 ad.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms {}".format( 836 bqr_interval)) 837 838 ## Toggle airplane mode 839 ad.droid.connectivityToggleAirplaneMode(True) 840 ad.droid.connectivityToggleAirplaneMode(False) 841 842 843def disable_bqr(ad_list): 844 """Disables BQR reporting. 845 846 Args: 847 ad_list: an android_device or list of android devices. 848 """ 849 # Converting a single android device object to list 850 if not isinstance(ad_list, list): 851 ad_list = [ad_list] 852 853 DISABLE_BQR_MASK = 0 854 855 for ad in ad_list: 856 #Disabling BQR 857 ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format( 858 DISABLE_BQR_MASK)) 859 860 ## Toggle airplane mode 861 ad.droid.connectivityToggleAirplaneMode(True) 862 ad.droid.connectivityToggleAirplaneMode(False) 863 864 865def get_device_selector_dictionary(android_device_list): 866 """Create a dictionary of Bluetooth features vs Android devices. 867 868 Args: 869 android_device_list: The list of Android devices. 870 Returns: 871 A dictionary of profiles/features to Android devices. 872 """ 873 selector_dict = {} 874 for ad in android_device_list: 875 uuids = ad.droid.bluetoothGetLocalUuids() 876 877 for profile, uuid_const in sig_uuid_constants.items(): 878 uuid_check = sig_uuid_constants['BASE_UUID'].format( 879 uuid_const).lower() 880 if uuids and uuid_check in uuids: 881 if profile in selector_dict: 882 selector_dict[profile].append(ad) 883 else: 884 selector_dict[profile] = [ad] 885 886 # Various services may not be active during BT startup. 887 # If the device can be identified through adb shell pm list features 888 # then try to add them to the appropriate profiles / features. 889 890 # Android TV. 891 if "feature:android.hardware.type.television" in ad.features: 892 ad.log.info("Android TV device found.") 893 supported_profiles = ['AudioSink'] 894 _add_android_device_to_dictionary(ad, supported_profiles, 895 selector_dict) 896 897 # Android Auto 898 elif "feature:android.hardware.type.automotive" in ad.features: 899 ad.log.info("Android Auto device found.") 900 # Add: AudioSink , A/V_RemoteControl, 901 supported_profiles = [ 902 'AudioSink', 'A/V_RemoteControl', 'Message Notification Server' 903 ] 904 _add_android_device_to_dictionary(ad, supported_profiles, 905 selector_dict) 906 # Android Wear 907 elif "feature:android.hardware.type.watch" in ad.features: 908 ad.log.info("Android Wear device found.") 909 supported_profiles = [] 910 _add_android_device_to_dictionary(ad, supported_profiles, 911 selector_dict) 912 # Android Phone 913 elif "feature:android.hardware.telephony" in ad.features: 914 ad.log.info("Android Phone device found.") 915 # Add: AudioSink 916 supported_profiles = [ 917 'AudioSource', 'A/V_RemoteControlTarget', 918 'Message Access Server' 919 ] 920 _add_android_device_to_dictionary(ad, supported_profiles, 921 selector_dict) 922 return selector_dict 923 924 925def get_mac_address_of_generic_advertisement(scan_ad, adv_ad): 926 """Start generic advertisement and get it's mac address by LE scanning. 927 928 Args: 929 scan_ad: The Android device to use as the scanner. 930 adv_ad: The Android device to use as the advertiser. 931 932 Returns: 933 mac_address: The mac address of the advertisement. 934 advertise_callback: The advertise callback id of the active 935 advertisement. 936 """ 937 adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) 938 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 939 ble_advertise_settings_modes['low_latency']) 940 adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True) 941 adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel( 942 ble_advertise_settings_tx_powers['high']) 943 advertise_callback, advertise_data, advertise_settings = ( 944 generate_ble_advertise_objects(adv_ad.droid)) 945 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 946 advertise_settings) 947 try: 948 adv_ad.ed.pop_event(adv_succ.format(advertise_callback), 949 bt_default_timeout) 950 except Empty as err: 951 raise BtTestUtilsError( 952 "Advertiser did not start successfully {}".format(err)) 953 filter_list = scan_ad.droid.bleGenFilterList() 954 scan_settings = scan_ad.droid.bleBuildScanSetting() 955 scan_callback = scan_ad.droid.bleGenScanCallback() 956 scan_ad.droid.bleSetScanFilterDeviceName( 957 adv_ad.droid.bluetoothGetLocalName()) 958 scan_ad.droid.bleBuildScanFilter(filter_list) 959 scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 960 try: 961 event = scan_ad.ed.pop_event( 962 "BleScan{}onScanResults".format(scan_callback), bt_default_timeout) 963 except Empty as err: 964 raise BtTestUtilsError( 965 "Scanner did not find advertisement {}".format(err)) 966 mac_address = event['data']['Result']['deviceInfo']['address'] 967 return mac_address, advertise_callback, scan_callback 968 969 970def hid_device_send_key_data_report(host_id, device_ad, key, interval=1): 971 """Send a HID report simulating a 1-second keyboard press from host_ad to 972 device_ad 973 974 Args: 975 host_id: the Bluetooth MAC address or name of the HID host 976 device_ad: HID device 977 key: the key we want to send 978 interval: the interval between key press and key release 979 """ 980 device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, 981 hid_keyboard_report(key)) 982 time.sleep(interval) 983 device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, 984 hid_keyboard_report("00")) 985 986 987def hid_keyboard_report(key, modifier="00"): 988 """Get the HID keyboard report for the given key 989 990 Args: 991 key: the key we want 992 modifier: HID keyboard modifier bytes 993 Returns: 994 The byte array for the HID report. 995 """ 996 return str( 997 bytearray.fromhex(" ".join( 998 [modifier, "00", key, "00", "00", "00", "00", "00"])), "utf-8") 999 1000 1001def is_a2dp_connected(sink, source): 1002 """ 1003 Convenience Function to see if the 2 devices are connected on 1004 A2dp. 1005 Args: 1006 sink: Audio Sink 1007 source: Audio Source 1008 Returns: 1009 True if Connected 1010 False if Not connected 1011 """ 1012 1013 devices = sink.droid.bluetoothA2dpSinkGetConnectedDevices() 1014 for device in devices: 1015 sink.log.info("A2dp Connected device {}".format(device["name"])) 1016 if (device["address"] == source.droid.bluetoothGetLocalAddress()): 1017 return True 1018 return False 1019 1020 1021def is_a2dp_snk_device_connected(ad, addr): 1022 """Determines if an AndroidDevice has A2DP snk connectivity to input address 1023 1024 Args: 1025 ad: the Android device 1026 addr: the address that's expected 1027 Returns: 1028 True if connection was successful, false if unsuccessful. 1029 """ 1030 devices = ad.droid.bluetoothA2dpSinkGetConnectedDevices() 1031 ad.log.info("Connected A2DP Sink devices: {}".format(devices)) 1032 if addr in {d['address'] for d in devices}: 1033 return True 1034 return False 1035 1036 1037def is_a2dp_src_device_connected(ad, addr): 1038 """Determines if an AndroidDevice has A2DP connectivity to input address 1039 1040 Args: 1041 ad: the Android device 1042 addr: the address that's expected 1043 Returns: 1044 True if connection was successful, false if unsuccessful. 1045 """ 1046 devices = ad.droid.bluetoothA2dpGetConnectedDevices() 1047 ad.log.info("Connected A2DP Source devices: {}".format(devices)) 1048 if addr in {d['address'] for d in devices}: 1049 return True 1050 return False 1051 1052 1053def is_hfp_client_device_connected(ad, addr): 1054 """Determines if an AndroidDevice has HFP connectivity to input address 1055 1056 Args: 1057 ad: the Android device 1058 addr: the address that's expected 1059 Returns: 1060 True if connection was successful, false if unsuccessful. 1061 """ 1062 devices = ad.droid.bluetoothHfpClientGetConnectedDevices() 1063 ad.log.info("Connected HFP Client devices: {}".format(devices)) 1064 if addr in {d['address'] for d in devices}: 1065 return True 1066 return False 1067 1068 1069def is_map_mce_device_connected(ad, addr): 1070 """Determines if an AndroidDevice has MAP MCE connectivity to input address 1071 1072 Args: 1073 ad: the Android device 1074 addr: the address that's expected 1075 Returns: 1076 True if connection was successful, false if unsuccessful. 1077 """ 1078 devices = ad.droid.bluetoothMapClientGetConnectedDevices() 1079 ad.log.info("Connected MAP MCE devices: {}".format(devices)) 1080 if addr in {d['address'] for d in devices}: 1081 return True 1082 return False 1083 1084 1085def is_map_mse_device_connected(ad, addr): 1086 """Determines if an AndroidDevice has MAP MSE connectivity to input address 1087 1088 Args: 1089 ad: the Android device 1090 addr: the address that's expected 1091 Returns: 1092 True if connection was successful, false if unsuccessful. 1093 """ 1094 devices = ad.droid.bluetoothMapGetConnectedDevices() 1095 ad.log.info("Connected MAP MSE devices: {}".format(devices)) 1096 if addr in {d['address'] for d in devices}: 1097 return True 1098 return False 1099 1100 1101def kill_bluetooth_process(ad): 1102 """Kill Bluetooth process on Android device. 1103 1104 Args: 1105 ad: Android device to kill BT process on. 1106 """ 1107 ad.log.info("Killing Bluetooth process.") 1108 pid = ad.adb.shell( 1109 "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii') 1110 call(["adb -s " + ad.serial + " shell kill " + pid], shell=True) 1111 1112 1113def log_energy_info(android_devices, state): 1114 """Logs energy info of input Android devices. 1115 1116 Args: 1117 android_devices: input Android device list to log energy info from. 1118 state: the input state to log. Usually 'Start' or 'Stop' for logging. 1119 1120 Returns: 1121 A logging string of the Bluetooth energy info reported. 1122 """ 1123 return_string = "{} Energy info collection:\n".format(state) 1124 # Bug: b/31966929 1125 return return_string 1126 1127 1128def orchestrate_and_verify_pan_connection(pan_dut, panu_dut): 1129 """Setups up a PAN conenction between two android devices. 1130 1131 Args: 1132 pan_dut: the Android device providing tethering services 1133 panu_dut: the Android device using the internet connection from the 1134 pan_dut 1135 Returns: 1136 True if PAN connection and verification is successful, 1137 false if unsuccessful. 1138 """ 1139 if not toggle_airplane_mode_by_adb(log, panu_dut, True): 1140 panu_dut.log.error("Failed to toggle airplane mode on") 1141 return False 1142 if not toggle_airplane_mode_by_adb(log, panu_dut, False): 1143 pan_dut.log.error("Failed to toggle airplane mode off") 1144 return False 1145 pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("") 1146 panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("") 1147 if not bluetooth_enabled_check(panu_dut): 1148 return False 1149 if not bluetooth_enabled_check(pan_dut): 1150 return False 1151 pan_dut.droid.bluetoothPanSetBluetoothTethering(True) 1152 if not (pair_pri_to_sec(pan_dut, panu_dut)): 1153 return False 1154 if not pan_dut.droid.bluetoothPanIsTetheringOn(): 1155 pan_dut.log.error("Failed to enable Bluetooth tethering.") 1156 return False 1157 # Magic sleep needed to give the stack time in between bonding and 1158 # connecting the PAN profile. 1159 time.sleep(pan_connect_timeout) 1160 panu_dut.droid.bluetoothConnectBonded( 1161 pan_dut.droid.bluetoothGetLocalAddress()) 1162 if not verify_http_connection(log, panu_dut): 1163 panu_dut.log.error("Can't verify http connection on PANU device.") 1164 if not verify_http_connection(log, pan_dut): 1165 pan_dut.log.info( 1166 "Can't verify http connection on PAN service device") 1167 return False 1168 return True 1169 1170 1171def orchestrate_bluetooth_socket_connection( 1172 client_ad, 1173 server_ad, 1174 accept_timeout_ms=default_bluetooth_socket_timeout_ms, 1175 uuid=None): 1176 """Sets up the Bluetooth Socket connection between two Android devices. 1177 1178 Args: 1179 client_ad: the Android device performing the connection. 1180 server_ad: the Android device accepting the connection. 1181 Returns: 1182 True if connection was successful, false if unsuccessful. 1183 """ 1184 server_ad.droid.bluetoothStartPairingHelper() 1185 client_ad.droid.bluetoothStartPairingHelper() 1186 1187 server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid( 1188 (bluetooth_socket_conn_test_uuid if uuid is None else uuid), 1189 accept_timeout_ms) 1190 client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid( 1191 server_ad.droid.bluetoothGetLocalAddress(), 1192 (bluetooth_socket_conn_test_uuid if uuid is None else uuid)) 1193 1194 end_time = time.time() + bt_default_timeout 1195 result = False 1196 test_result = True 1197 while time.time() < end_time: 1198 if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0: 1199 test_result = True 1200 client_ad.log.info("Bluetooth socket Client Connection Active") 1201 break 1202 else: 1203 test_result = False 1204 time.sleep(1) 1205 if not test_result: 1206 client_ad.log.error( 1207 "Failed to establish a Bluetooth socket connection") 1208 return False 1209 return True 1210 1211 1212def orchestrate_rfcomm_connection(client_ad, 1213 server_ad, 1214 accept_timeout_ms=default_rfcomm_timeout_ms, 1215 uuid=None): 1216 """Sets up the RFCOMM connection between two Android devices. 1217 1218 Args: 1219 client_ad: the Android device performing the connection. 1220 server_ad: the Android device accepting the connection. 1221 Returns: 1222 True if connection was successful, false if unsuccessful. 1223 """ 1224 result = orchestrate_bluetooth_socket_connection( 1225 client_ad, server_ad, accept_timeout_ms, 1226 (bt_rfcomm_uuids['default_uuid'] if uuid is None else uuid)) 1227 1228 return result 1229 1230 1231def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True): 1232 """Pairs pri droid to secondary droid. 1233 1234 Args: 1235 pri_ad: Android device initiating connection 1236 sec_ad: Android device accepting connection 1237 attempts: Number of attempts to try until failure. 1238 auto_confirm: Auto confirm passkey match for both devices 1239 1240 Returns: 1241 Pass if True 1242 Fail if False 1243 """ 1244 pri_ad.droid.bluetoothStartConnectionStateChangeMonitor( 1245 sec_ad.droid.bluetoothGetLocalAddress()) 1246 curr_attempts = 0 1247 while curr_attempts < attempts: 1248 if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm): 1249 return True 1250 # Wait 2 seconds before unbound 1251 time.sleep(2) 1252 if not clear_bonded_devices(pri_ad): 1253 log.error( 1254 "Failed to clear bond for primary device at attempt {}".format( 1255 str(curr_attempts))) 1256 return False 1257 if not clear_bonded_devices(sec_ad): 1258 log.error( 1259 "Failed to clear bond for secondary device at attempt {}". 1260 format(str(curr_attempts))) 1261 return False 1262 # Wait 2 seconds after unbound 1263 time.sleep(2) 1264 curr_attempts += 1 1265 log.error("pair_pri_to_sec failed to connect after {} attempts".format( 1266 str(attempts))) 1267 return False 1268 1269 1270def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm): 1271 # Enable discovery on sec_ad so that pri_ad can find it. 1272 # The timeout here is based on how much time it would take for two devices 1273 # to pair with each other once pri_ad starts seeing devices. 1274 pri_droid = pri_ad.droid 1275 sec_droid = sec_ad.droid 1276 pri_ad.ed.clear_all_events() 1277 sec_ad.ed.clear_all_events() 1278 log.info("Bonding device {} to {}".format( 1279 pri_droid.bluetoothGetLocalAddress(), 1280 sec_droid.bluetoothGetLocalAddress())) 1281 sec_droid.bluetoothMakeDiscoverable(bt_default_timeout) 1282 target_address = sec_droid.bluetoothGetLocalAddress() 1283 log.debug("Starting paring helper on each device") 1284 pri_droid.bluetoothStartPairingHelper(auto_confirm) 1285 sec_droid.bluetoothStartPairingHelper(auto_confirm) 1286 pri_ad.log.info("Primary device starting discovery and executing bond") 1287 result = pri_droid.bluetoothDiscoverAndBond(target_address) 1288 if not auto_confirm: 1289 if not _wait_for_passkey_match(pri_ad, sec_ad): 1290 return False 1291 # Loop until we have bonded successfully or timeout. 1292 end_time = time.time() + bt_default_timeout 1293 pri_ad.log.info("Verifying devices are bonded") 1294 while time.time() < end_time: 1295 bonded_devices = pri_droid.bluetoothGetBondedDevices() 1296 bonded = False 1297 for d in bonded_devices: 1298 if d['address'] == target_address: 1299 pri_ad.log.info("Successfully bonded to device") 1300 return True 1301 time.sleep(0.1) 1302 # Timed out trying to bond. 1303 pri_ad.log.info("Failed to bond devices.") 1304 return False 1305 1306 1307def reset_bluetooth(android_devices): 1308 """Resets Bluetooth state of input Android device list. 1309 1310 Args: 1311 android_devices: The Android device list to reset Bluetooth state on. 1312 1313 Returns: 1314 True if successful, false if unsuccessful. 1315 """ 1316 for a in android_devices: 1317 droid, ed = a.droid, a.ed 1318 a.log.info("Reset state of bluetooth on device.") 1319 if droid.bluetoothCheckState() is True: 1320 droid.bluetoothToggleState(False) 1321 expected_bluetooth_off_event_name = bluetooth_off 1322 try: 1323 ed.pop_event(expected_bluetooth_off_event_name, 1324 bt_default_timeout) 1325 except Exception: 1326 a.log.error("Failed to toggle Bluetooth off.") 1327 return False 1328 # temp sleep for b/17723234 1329 time.sleep(3) 1330 if not bluetooth_enabled_check(a): 1331 return False 1332 return True 1333 1334 1335def scan_and_verify_n_advertisements(scn_ad, max_advertisements): 1336 """Verify that input number of advertisements can be found from the scanning 1337 Android device. 1338 1339 Args: 1340 scn_ad: The Android device to start LE scanning on. 1341 max_advertisements: The number of advertisements the scanner expects to 1342 find. 1343 1344 Returns: 1345 True if successful, false if unsuccessful. 1346 """ 1347 test_result = False 1348 address_list = [] 1349 filter_list = scn_ad.droid.bleGenFilterList() 1350 scn_ad.droid.bleBuildScanFilter(filter_list) 1351 scan_settings = scn_ad.droid.bleBuildScanSetting() 1352 scan_callback = scn_ad.droid.bleGenScanCallback() 1353 scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 1354 start_time = time.time() 1355 while (start_time + bt_default_timeout) > time.time(): 1356 event = None 1357 try: 1358 event = scn_ad.ed.pop_event(scan_result.format(scan_callback), 1359 bt_default_timeout) 1360 except Empty as error: 1361 raise BtTestUtilsError( 1362 "Failed to find scan event: {}".format(error)) 1363 address = event['data']['Result']['deviceInfo']['address'] 1364 if address not in address_list: 1365 address_list.append(address) 1366 if len(address_list) == max_advertisements: 1367 test_result = True 1368 break 1369 scn_ad.droid.bleStopBleScan(scan_callback) 1370 return test_result 1371 1372 1373def set_bluetooth_codec(android_device, 1374 codec_type, 1375 sample_rate, 1376 bits_per_sample, 1377 channel_mode, 1378 codec_specific_1=0): 1379 """Sets the A2DP codec configuration on the AndroidDevice. 1380 1381 Args: 1382 android_device (acts.controllers.android_device.AndroidDevice): the 1383 android device for which to switch the codec. 1384 codec_type (str): the desired codec type. Must be a key in 1385 bt_constants.codec_types. 1386 sample_rate (str): the desired sample rate. Must be a key in 1387 bt_constants.sample_rates. 1388 bits_per_sample (str): the desired bits per sample. Must be a key in 1389 bt_constants.bits_per_samples. 1390 channel_mode (str): the desired channel mode. Must be a key in 1391 bt_constants.channel_modes. 1392 codec_specific_1 (int): the desired bit rate (quality) for LDAC codec. 1393 Returns: 1394 bool: True if the codec config was successfully changed to the desired 1395 values. Else False. 1396 """ 1397 message = ("Set Android Device A2DP Bluetooth codec configuration:\n" 1398 "\tCodec: {codec_type}\n" 1399 "\tSample Rate: {sample_rate}\n" 1400 "\tBits per Sample: {bits_per_sample}\n" 1401 "\tChannel Mode: {channel_mode}".format( 1402 codec_type=codec_type, 1403 sample_rate=sample_rate, 1404 bits_per_sample=bits_per_sample, 1405 channel_mode=channel_mode)) 1406 android_device.log.info(message) 1407 1408 # Send SL4A command 1409 droid, ed = android_device.droid, android_device.ed 1410 if not droid.bluetoothA2dpSetCodecConfigPreference( 1411 codec_types[codec_type], sample_rates[str(sample_rate)], 1412 bits_per_samples[str(bits_per_sample)], 1413 channel_modes[channel_mode], codec_specific_1): 1414 android_device.log.warning( 1415 "SL4A command returned False. Codec was not " 1416 "changed.") 1417 else: 1418 try: 1419 ed.pop_event(bluetooth_a2dp_codec_config_changed, 1420 bt_default_timeout) 1421 except Exception: 1422 android_device.log.warning("SL4A event not registered. Codec " 1423 "may not have been changed.") 1424 1425 # Validate codec value through ADB 1426 # TODO (aidanhb): validate codec more robustly using SL4A 1427 command = "dumpsys bluetooth_manager | grep -i 'current codec'" 1428 out = android_device.adb.shell(command) 1429 split_out = out.split(": ") 1430 if len(split_out) != 2: 1431 android_device.log.warning("Could not verify codec config change " 1432 "through ADB.") 1433 elif split_out[1].strip().upper() != codec_type: 1434 android_device.log.error("Codec config was not changed.\n" 1435 "\tExpected codec: {exp}\n" 1436 "\tActual codec: {act}".format( 1437 exp=codec_type, act=split_out[1].strip())) 1438 return False 1439 android_device.log.info("Bluetooth codec successfully changed.") 1440 return True 1441 1442 1443def set_bt_scan_mode(ad, scan_mode_value): 1444 """Set Android device's Bluetooth scan mode. 1445 1446 Args: 1447 ad: The Android device to set the scan mode on. 1448 scan_mode_value: The value to set the scan mode to. 1449 1450 Returns: 1451 True if successful, false if unsuccessful. 1452 """ 1453 droid, ed = ad.droid, ad.ed 1454 if scan_mode_value == bt_scan_mode_types['state_off']: 1455 disable_bluetooth(droid) 1456 scan_mode = droid.bluetoothGetScanMode() 1457 reset_bluetooth([ad]) 1458 if scan_mode != scan_mode_value: 1459 return False 1460 elif scan_mode_value == bt_scan_mode_types['none']: 1461 droid.bluetoothMakeUndiscoverable() 1462 scan_mode = droid.bluetoothGetScanMode() 1463 if scan_mode != scan_mode_value: 1464 return False 1465 elif scan_mode_value == bt_scan_mode_types['connectable']: 1466 droid.bluetoothMakeUndiscoverable() 1467 droid.bluetoothMakeConnectable() 1468 scan_mode = droid.bluetoothGetScanMode() 1469 if scan_mode != scan_mode_value: 1470 return False 1471 elif (scan_mode_value == bt_scan_mode_types['connectable_discoverable']): 1472 droid.bluetoothMakeDiscoverable() 1473 scan_mode = droid.bluetoothGetScanMode() 1474 if scan_mode != scan_mode_value: 1475 return False 1476 else: 1477 # invalid scan mode 1478 return False 1479 return True 1480 1481 1482def set_device_name(droid, name): 1483 """Set and check Bluetooth local name on input droid object. 1484 1485 Args: 1486 droid: Droid object to set local name on. 1487 name: the Bluetooth local name to set. 1488 1489 Returns: 1490 True if successful, false if unsuccessful. 1491 """ 1492 droid.bluetoothSetLocalName(name) 1493 time.sleep(2) 1494 droid_name = droid.bluetoothGetLocalName() 1495 if droid_name != name: 1496 return False 1497 return True 1498 1499 1500def set_profile_priority(host_ad, client_ad, profiles, priority): 1501 """Sets the priority of said profile(s) on host_ad for client_ad""" 1502 for profile in profiles: 1503 host_ad.log.info("Profile {} on {} for {} set to priority {}".format( 1504 profile, host_ad.droid.bluetoothGetLocalName(), 1505 client_ad.droid.bluetoothGetLocalAddress(), priority.value)) 1506 if bt_profile_constants['a2dp_sink'] == profile: 1507 host_ad.droid.bluetoothA2dpSinkSetPriority( 1508 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1509 elif bt_profile_constants['headset_client'] == profile: 1510 host_ad.droid.bluetoothHfpClientSetPriority( 1511 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1512 elif bt_profile_constants['pbap_client'] == profile: 1513 host_ad.droid.bluetoothPbapClientSetPriority( 1514 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1515 else: 1516 host_ad.log.error( 1517 "Profile {} not yet supported for priority settings".format( 1518 profile)) 1519 1520 1521def setup_multiple_devices_for_bt_test(android_devices): 1522 """A common setup routine for Bluetooth on input Android device list. 1523 1524 Things this function sets up: 1525 1. Resets Bluetooth 1526 2. Set Bluetooth local name to random string of size 4 1527 3. Disable BLE background scanning. 1528 4. Enable Bluetooth snoop logging. 1529 1530 Args: 1531 android_devices: Android device list to setup Bluetooth on. 1532 1533 Returns: 1534 True if successful, false if unsuccessful. 1535 """ 1536 log.info("Setting up Android Devices") 1537 # TODO: Temp fix for an selinux error. 1538 for ad in android_devices: 1539 ad.adb.shell("setenforce 0") 1540 threads = [] 1541 try: 1542 for a in android_devices: 1543 thread = threading.Thread(target=factory_reset_bluetooth, 1544 args=([[a]])) 1545 threads.append(thread) 1546 thread.start() 1547 for t in threads: 1548 t.join() 1549 1550 for a in android_devices: 1551 d = a.droid 1552 # TODO: Create specific RPC command to instantiate 1553 # BluetoothConnectionFacade. This is just a workaround. 1554 d.bluetoothStartConnectionStateChangeMonitor("") 1555 setup_result = d.bluetoothSetLocalName(generate_id_by_size(4)) 1556 if not setup_result: 1557 a.log.error("Failed to set device name.") 1558 return setup_result 1559 d.bluetoothDisableBLE() 1560 utils.set_location_service(a, True) 1561 bonded_devices = d.bluetoothGetBondedDevices() 1562 for b in bonded_devices: 1563 a.log.info("Removing bond for device {}".format(b['address'])) 1564 d.bluetoothUnbond(b['address']) 1565 for a in android_devices: 1566 a.adb.shell("setprop persist.bluetooth.btsnooplogmode full") 1567 getprop_result = a.adb.shell( 1568 "getprop persist.bluetooth.btsnooplogmode") == "full" 1569 if not getprop_result: 1570 a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.") 1571 except Exception as err: 1572 log.error("Something went wrong in multi device setup: {}".format(err)) 1573 return False 1574 return setup_result 1575 1576 1577def setup_n_advertisements(adv_ad, num_advertisements): 1578 """Setup input number of advertisements on input Android device. 1579 1580 Args: 1581 adv_ad: The Android device to start LE advertisements on. 1582 num_advertisements: The number of advertisements to start. 1583 1584 Returns: 1585 advertise_callback_list: List of advertisement callback ids. 1586 """ 1587 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 1588 ble_advertise_settings_modes['low_latency']) 1589 advertise_data = adv_ad.droid.bleBuildAdvertiseData() 1590 advertise_settings = adv_ad.droid.bleBuildAdvertiseSettings() 1591 advertise_callback_list = [] 1592 for i in range(num_advertisements): 1593 advertise_callback = adv_ad.droid.bleGenBleAdvertiseCallback() 1594 advertise_callback_list.append(advertise_callback) 1595 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 1596 advertise_settings) 1597 try: 1598 adv_ad.ed.pop_event(adv_succ.format(advertise_callback), 1599 bt_default_timeout) 1600 adv_ad.log.info("Advertisement {} started.".format(i + 1)) 1601 except Empty as error: 1602 adv_ad.log.error("Advertisement {} failed to start.".format(i + 1)) 1603 raise BtTestUtilsError( 1604 "Test failed with Empty error: {}".format(error)) 1605 return advertise_callback_list 1606 1607 1608def take_btsnoop_log(ad, testcase, testname): 1609 """Grabs the btsnoop_hci log on a device and stores it in the log directory 1610 of the test class. 1611 1612 If you want grab the btsnoop_hci log, call this function with android_device 1613 objects in on_fail. Bug report takes a relative long time to take, so use 1614 this cautiously. 1615 1616 Args: 1617 ad: The android_device instance to take bugreport on. 1618 testcase: Name of the test calss that triggered this snoop log. 1619 testname: Name of the test case that triggered this bug report. 1620 """ 1621 testname = "".join(x for x in testname if x.isalnum()) 1622 serial = ad.serial 1623 device_model = ad.droid.getBuildModel() 1624 device_model = device_model.replace(" ", "") 1625 out_name = ','.join((testname, device_model, serial)) 1626 snoop_path = os.path.join(ad.device_log_path, 'BluetoothSnoopLogs') 1627 os.makedirs(snoop_path, exist_ok=True) 1628 cmd = ''.join(("adb -s ", serial, " pull ", btsnoop_log_path_on_device, 1629 " ", snoop_path + '/' + out_name, ".btsnoop_hci.log")) 1630 exe_cmd(cmd) 1631 try: 1632 cmd = ''.join( 1633 ("adb -s ", serial, " pull ", btsnoop_last_log_path_on_device, " ", 1634 snoop_path + '/' + out_name, ".btsnoop_hci.log.last")) 1635 exe_cmd(cmd) 1636 except Exception as err: 1637 testcase.log.info( 1638 "File does not exist {}".format(btsnoop_last_log_path_on_device)) 1639 1640 1641def take_btsnoop_logs(android_devices, testcase, testname): 1642 """Pull btsnoop logs from an input list of android devices. 1643 1644 Args: 1645 android_devices: the list of Android devices to pull btsnoop logs from. 1646 testcase: Name of the test calss that triggered this snoop log. 1647 testname: Name of the test case that triggered this bug report. 1648 """ 1649 for a in android_devices: 1650 take_btsnoop_log(a, testcase, testname) 1651 1652 1653def teardown_n_advertisements(adv_ad, num_advertisements, 1654 advertise_callback_list): 1655 """Stop input number of advertisements on input Android device. 1656 1657 Args: 1658 adv_ad: The Android device to stop LE advertisements on. 1659 num_advertisements: The number of advertisements to stop. 1660 advertise_callback_list: The list of advertisement callbacks to stop. 1661 1662 Returns: 1663 True if successful, false if unsuccessful. 1664 """ 1665 for n in range(num_advertisements): 1666 adv_ad.droid.bleStopBleAdvertising(advertise_callback_list[n]) 1667 return True 1668 1669 1670def verify_server_and_client_connected(client_ad, server_ad, log=True): 1671 """Verify that input server and client Android devices are connected. 1672 1673 This code is under the assumption that there will only be 1674 a single connection. 1675 1676 Args: 1677 client_ad: the Android device to check number of active connections. 1678 server_ad: the Android device to check number of active connections. 1679 1680 Returns: 1681 True both server and client have at least 1 active connection, 1682 false if unsuccessful. 1683 """ 1684 test_result = True 1685 if len(server_ad.droid.bluetoothSocketConnActiveConnections()) == 0: 1686 if log: 1687 server_ad.log.error("No socket connections found on server.") 1688 test_result = False 1689 if len(client_ad.droid.bluetoothSocketConnActiveConnections()) == 0: 1690 if log: 1691 client_ad.log.error("No socket connections found on client.") 1692 test_result = False 1693 return test_result 1694 1695 1696def wait_for_bluetooth_manager_state(droid, 1697 state=None, 1698 timeout=10, 1699 threshold=5): 1700 """ Waits for BlueTooth normalized state or normalized explicit state 1701 args: 1702 droid: droid device object 1703 state: expected BlueTooth state 1704 timeout: max timeout threshold 1705 threshold: list len of bt state 1706 Returns: 1707 True if successful, false if unsuccessful. 1708 """ 1709 all_states = [] 1710 get_state = lambda: droid.bluetoothGetLeState() 1711 start_time = time.time() 1712 while time.time() < start_time + timeout: 1713 all_states.append(get_state()) 1714 if len(all_states) >= threshold: 1715 # for any normalized state 1716 if state is None: 1717 if len(set(all_states[-threshold:])) == 1: 1718 log.info("State normalized {}".format( 1719 set(all_states[-threshold:]))) 1720 return True 1721 else: 1722 # explicit check against normalized state 1723 if set([state]).issubset(all_states[-threshold:]): 1724 return True 1725 time.sleep(0.5) 1726 log.error( 1727 "Bluetooth state fails to normalize" if state is None else 1728 "Failed to match bluetooth state, current state {} expected state {}". 1729 format(get_state(), state)) 1730 return False 1731 1732 1733def _wait_for_passkey_match(pri_ad, sec_ad): 1734 pri_pin, sec_pin = -1, 1 1735 pri_variant, sec_variant = -1, 1 1736 pri_pairing_req, sec_pairing_req = None, None 1737 try: 1738 pri_pairing_req = pri_ad.ed.pop_event( 1739 event_name="BluetoothActionPairingRequest", 1740 timeout=bt_default_timeout) 1741 pri_variant = pri_pairing_req["data"]["PairingVariant"] 1742 pri_pin = pri_pairing_req["data"]["Pin"] 1743 pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format( 1744 pri_pin, pri_variant)) 1745 sec_pairing_req = sec_ad.ed.pop_event( 1746 event_name="BluetoothActionPairingRequest", 1747 timeout=bt_default_timeout) 1748 sec_variant = sec_pairing_req["data"]["PairingVariant"] 1749 sec_pin = sec_pairing_req["data"]["Pin"] 1750 sec_ad.log.info( 1751 "Secondary device received Pin: {}, Variant: {}".format( 1752 sec_pin, sec_variant)) 1753 except Empty as err: 1754 log.error("Wait for pin error: {}".format(err)) 1755 log.error("Pairing request state, Primary: {}, Secondary: {}".format( 1756 pri_pairing_req, sec_pairing_req)) 1757 return False 1758 if pri_variant == sec_variant == pairing_variant_passkey_confirmation: 1759 confirmation = pri_pin == sec_pin 1760 if confirmation: 1761 log.info("Pairing code matched, accepting connection") 1762 else: 1763 log.info("Pairing code mismatched, rejecting connection") 1764 pri_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm", 1765 str(confirmation)) 1766 sec_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm", 1767 str(confirmation)) 1768 if not confirmation: 1769 return False 1770 elif pri_variant != sec_variant: 1771 log.error("Pairing variant mismatched, abort connection") 1772 return False 1773 return True 1774 1775 1776def write_read_verify_data(client_ad, server_ad, msg, binary=False): 1777 """Verify that the client wrote data to the server Android device correctly. 1778 1779 Args: 1780 client_ad: the Android device to perform the write. 1781 server_ad: the Android device to read the data written. 1782 msg: the message to write. 1783 binary: if the msg arg is binary or not. 1784 1785 Returns: 1786 True if the data written matches the data read, false if not. 1787 """ 1788 client_ad.log.info("Write message.") 1789 try: 1790 if binary: 1791 client_ad.droid.bluetoothSocketConnWriteBinary(msg) 1792 else: 1793 client_ad.droid.bluetoothSocketConnWrite(msg) 1794 except Exception as err: 1795 client_ad.log.error("Failed to write data: {}".format(err)) 1796 return False 1797 server_ad.log.info("Read message.") 1798 try: 1799 if binary: 1800 read_msg = server_ad.droid.bluetoothSocketConnReadBinary().rstrip( 1801 "\r\n") 1802 else: 1803 read_msg = server_ad.droid.bluetoothSocketConnRead() 1804 except Exception as err: 1805 server_ad.log.error("Failed to read data: {}".format(err)) 1806 return False 1807 log.info("Verify message.") 1808 if msg != read_msg: 1809 log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg)) 1810 return False 1811 return True 1812 1813 1814class MediaControlOverSl4a(object): 1815 """Media control using sl4a facade for general purpose. 1816 1817 """ 1818 def __init__(self, android_device, music_file): 1819 """Initialize the media_control class. 1820 1821 Args: 1822 android_dut: android_device object 1823 music_file: location of the music file 1824 """ 1825 self.android_device = android_device 1826 self.music_file = music_file 1827 1828 def play(self): 1829 """Play media. 1830 1831 """ 1832 self.android_device.droid.mediaPlayOpen('file://%s' % self.music_file, 1833 'default', True) 1834 playing = self.android_device.droid.mediaIsPlaying() 1835 asserts.assert_true(playing, 1836 'Failed to play music %s' % self.music_file) 1837 1838 def pause(self): 1839 """Pause media. 1840 1841 """ 1842 self.android_device.droid.mediaPlayPause('default') 1843 paused = not self.android_device.droid.mediaIsPlaying() 1844 asserts.assert_true(paused, 1845 'Failed to pause music %s' % self.music_file) 1846 1847 def resume(self): 1848 """Resume media. 1849 1850 """ 1851 self.android_device.droid.mediaPlayStart('default') 1852 playing = self.android_device.droid.mediaIsPlaying() 1853 asserts.assert_true(playing, 1854 'Failed to play music %s' % self.music_file) 1855 1856 def stop(self): 1857 """Stop media. 1858 1859 """ 1860 self.android_device.droid.mediaPlayStop('default') 1861 stopped = not self.android_device.droid.mediaIsPlaying() 1862 asserts.assert_true(stopped, 1863 'Failed to stop music %s' % self.music_file) 1864