1#!/usr/bin/env python3 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import logging 18import os 19import random 20import re 21import string 22import threading 23import time 24from queue import Empty 25from subprocess import call 26 27from acts.test_utils.bt.bt_constants import adv_fail 28from acts.test_utils.bt.bt_constants import adv_succ 29from acts.test_utils.bt.bt_constants import batch_scan_not_supported_list 30from acts.test_utils.bt.bt_constants import bits_per_samples 31from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes 32from acts.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers 33from acts.test_utils.bt.bt_constants import bluetooth_a2dp_codec_config_changed 34from acts.test_utils.bt.bt_constants import bluetooth_off 35from acts.test_utils.bt.bt_constants import bluetooth_on 36from acts.test_utils.bt.bt_constants import \ 37 bluetooth_profile_connection_state_changed 38from acts.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid 39from acts.test_utils.bt.bt_constants import bt_default_timeout 40from acts.test_utils.bt.bt_constants import bt_profile_constants 41from acts.test_utils.bt.bt_constants import bt_profile_states 42from acts.test_utils.bt.bt_constants import bt_rfcomm_uuids 43from acts.test_utils.bt.bt_constants import bt_scan_mode_types 44from acts.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device 45from acts.test_utils.bt.bt_constants import btsnoop_log_path_on_device 46from acts.test_utils.bt.bt_constants import channel_modes 47from acts.test_utils.bt.bt_constants import codec_types 48from acts.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms 49from acts.test_utils.bt.bt_constants import default_rfcomm_timeout_ms 50from acts.test_utils.bt.bt_constants import hid_id_keyboard 51from acts.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation 52from acts.test_utils.bt.bt_constants import pan_connect_timeout 53from acts.test_utils.bt.bt_constants import sample_rates 54from acts.test_utils.bt.bt_constants import scan_result 55from acts.test_utils.bt.bt_constants import sig_uuid_constants 56from acts.test_utils.bt.bt_constants import small_timeout 57from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb 58from acts.test_utils.tel.tel_test_utils import verify_http_connection 59from acts.utils import exe_cmd 60 61from acts import context 62from acts import utils 63 64log = logging 65 66advertisements_to_devices = {} 67 68 69class BtTestUtilsError(Exception): 70 pass 71 72 73def _add_android_device_to_dictionary(android_device, profile_list, 74 selector_dict): 75 """Adds the AndroidDevice and supported features to the selector dictionary 76 77 Args: 78 android_device: The Android device. 79 profile_list: The list of profiles the Android device supports. 80 """ 81 for profile in profile_list: 82 if profile in selector_dict and android_device not in selector_dict[profile]: 83 selector_dict[profile].append(android_device) 84 else: 85 selector_dict[profile] = [android_device] 86 87 88def bluetooth_enabled_check(ad): 89 """Checks if the Bluetooth state is enabled, if not it will attempt to 90 enable it. 91 92 Args: 93 ad: The Android device list to enable Bluetooth on. 94 95 Returns: 96 True if successful, false if unsuccessful. 97 """ 98 if not ad.droid.bluetoothCheckState(): 99 ad.droid.bluetoothToggleState(True) 100 expected_bluetooth_on_event_name = bluetooth_on 101 try: 102 ad.ed.pop_event(expected_bluetooth_on_event_name, 103 bt_default_timeout) 104 except Empty: 105 ad.log.info( 106 "Failed to toggle Bluetooth on(no broadcast received).") 107 # Try one more time to poke at the actual state. 108 if ad.droid.bluetoothCheckState(): 109 ad.log.info(".. actual state is ON") 110 return True 111 ad.log.error(".. actual state is OFF") 112 return False 113 return True 114 115 116def check_device_supported_profiles(droid): 117 """Checks for Android device supported profiles. 118 119 Args: 120 droid: The droid object to query. 121 122 Returns: 123 A dictionary of supported profiles. 124 """ 125 profile_dict = {} 126 profile_dict['hid'] = droid.bluetoothHidIsReady() 127 profile_dict['hsp'] = droid.bluetoothHspIsReady() 128 profile_dict['a2dp'] = droid.bluetoothA2dpIsReady() 129 profile_dict['avrcp'] = droid.bluetoothAvrcpIsReady() 130 profile_dict['a2dp_sink'] = droid.bluetoothA2dpSinkIsReady() 131 profile_dict['hfp_client'] = droid.bluetoothHfpClientIsReady() 132 profile_dict['pbap_client'] = droid.bluetoothPbapClientIsReady() 133 return profile_dict 134 135 136def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list, 137 adv_android_device, adv_callback_list): 138 """Try to gracefully stop all scanning and advertising instances. 139 140 Args: 141 scn_android_device: The Android device that is actively scanning. 142 scn_callback_list: The scan callback id list that needs to be stopped. 143 adv_android_device: The Android device that is actively advertising. 144 adv_callback_list: The advertise callback id list that needs to be 145 stopped. 146 """ 147 scan_droid, scan_ed = scn_android_device.droid, scn_android_device.ed 148 adv_droid = adv_android_device.droid 149 try: 150 for scan_callback in scn_callback_list: 151 scan_droid.bleStopBleScan(scan_callback) 152 except Exception as err: 153 scn_android_device.log.debug( 154 "Failed to stop LE scan... reseting Bluetooth. Error {}".format( 155 err)) 156 reset_bluetooth([scn_android_device]) 157 try: 158 for adv_callback in adv_callback_list: 159 adv_droid.bleStopBleAdvertising(adv_callback) 160 except Exception as err: 161 adv_android_device.log.debug( 162 "Failed to stop LE advertisement... reseting Bluetooth. Error {}". 163 format(err)) 164 reset_bluetooth([adv_android_device]) 165 166 167def clear_bonded_devices(ad): 168 """Clear bonded devices from the input Android device. 169 170 Args: 171 ad: the Android device performing the connection. 172 Returns: 173 True if clearing bonded devices was successful, false if unsuccessful. 174 """ 175 bonded_device_list = ad.droid.bluetoothGetBondedDevices() 176 while bonded_device_list: 177 device_address = bonded_device_list[0]['address'] 178 if not ad.droid.bluetoothUnbond(device_address): 179 log.error("Failed to unbond {} from {}".format( 180 device_address, ad.serial)) 181 return False 182 log.info("Successfully unbonded {} from {}".format( 183 device_address, ad.serial)) 184 #TODO: wait for BOND_STATE_CHANGED intent instead of waiting 185 time.sleep(1) 186 187 # If device was first connected using LE transport, after bonding it is 188 # accessible through it's LE address, and through it classic address. 189 # Unbonding it will unbond two devices representing different 190 # "addresses". Attempt to unbond such already unbonded devices will 191 # result in bluetoothUnbond returning false. 192 bonded_device_list = ad.droid.bluetoothGetBondedDevices() 193 return True 194 195 196def connect_phone_to_headset(android, headset, timeout=bt_default_timeout, 197 connection_check_period=10): 198 """Connects android phone to bluetooth headset. 199 Headset object must have methods power_on and enter_pairing_mode, 200 and attribute mac_address. 201 202 Args: 203 android: AndroidDevice object with SL4A installed. 204 headset: Object with attribute mac_address and methods power_on and 205 enter_pairing_mode. 206 timeout: Seconds to wait for devices to connect. 207 connection_check_period: how often to check for connection once the 208 SL4A connect RPC has been sent. 209 Returns: 210 connected (bool): True if devices are paired and connected by end of 211 method. False otherwise. 212 """ 213 connected = is_a2dp_src_device_connected(android, headset.mac_address) 214 log.info('Devices connected before pair attempt: %s' % connected) 215 start_time = time.time() 216 # If already connected, skip pair and connect attempt. 217 while not connected and (time.time() - start_time < timeout): 218 bonded_info = android.droid.bluetoothA2dpGetConnectedDevices() 219 if headset.mac_address not in [info["address"] for info in bonded_info]: 220 # Turn on headset and initiate pairing mode. 221 headset.enter_pairing_mode() 222 # Use SL4A to pair and connect with headset. 223 android.droid.bluetoothDiscoverAndBond(headset.mac_address) 224 else: # Device is bonded but not connected 225 android.droid.bluetoothConnectBonded(headset.mac_address) 226 log.info('Waiting for connection...') 227 time.sleep(connection_check_period) 228 # Check for connection. 229 connected = is_a2dp_src_device_connected(android, headset.mac_address) 230 log.info('Devices connected after pair attempt: %s' % connected) 231 return connected 232 233 234def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2): 235 """Connects pri droid to secondary droid. 236 237 Args: 238 pri_ad: AndroidDroid initiating connection 239 sec_ad: AndroidDroid accepting connection 240 profiles_set: Set of profiles to be connected 241 attempts: Number of attempts to try until failure. 242 243 Returns: 244 Pass if True 245 Fail if False 246 """ 247 device_addr = sec_ad.droid.bluetoothGetLocalAddress() 248 # Allows extra time for the SDP records to be updated. 249 time.sleep(2) 250 curr_attempts = 0 251 while curr_attempts < attempts: 252 log.info("connect_pri_to_sec curr attempt {} total {}".format( 253 curr_attempts, attempts)) 254 if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set): 255 return True 256 curr_attempts += 1 257 log.error("connect_pri_to_sec failed to connect after {} attempts".format( 258 attempts)) 259 return False 260 261 262def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set): 263 """Connects pri droid to secondary droid. 264 265 Args: 266 pri_ad: AndroidDroid initiating connection. 267 sec_ad: AndroidDroid accepting connection. 268 profiles_set: Set of profiles to be connected. 269 270 Returns: 271 True of connection is successful, false if unsuccessful. 272 """ 273 # Check if we support all profiles. 274 supported_profiles = bt_profile_constants.values() 275 for profile in profiles_set: 276 if profile not in supported_profiles: 277 pri_ad.log.info("Profile {} is not supported list {}".format( 278 profile, supported_profiles)) 279 return False 280 281 # First check that devices are bonded. 282 paired = False 283 for paired_device in pri_ad.droid.bluetoothGetBondedDevices(): 284 if paired_device['address'] == \ 285 sec_ad.droid.bluetoothGetLocalAddress(): 286 paired = True 287 break 288 289 if not paired: 290 pri_ad.log.error("Not paired to {}".format(sec_ad.serial)) 291 return False 292 293 # Now try to connect them, the following call will try to initiate all 294 # connections. 295 pri_ad.droid.bluetoothConnectBonded( 296 sec_ad.droid.bluetoothGetLocalAddress()) 297 298 end_time = time.time() + 10 299 profile_connected = set() 300 sec_addr = sec_ad.droid.bluetoothGetLocalAddress() 301 pri_ad.log.info("Profiles to be connected {}".format(profiles_set)) 302 # First use APIs to check profile connection state 303 while (time.time() < end_time 304 and not profile_connected.issuperset(profiles_set)): 305 if (bt_profile_constants['headset_client'] not in profile_connected 306 and bt_profile_constants['headset_client'] in profiles_set): 307 if is_hfp_client_device_connected(pri_ad, sec_addr): 308 profile_connected.add(bt_profile_constants['headset_client']) 309 if (bt_profile_constants['a2dp'] not in profile_connected 310 and bt_profile_constants['a2dp'] in profiles_set): 311 if is_a2dp_src_device_connected(pri_ad, sec_addr): 312 profile_connected.add(bt_profile_constants['a2dp']) 313 if (bt_profile_constants['a2dp_sink'] not in profile_connected 314 and bt_profile_constants['a2dp_sink'] in profiles_set): 315 if is_a2dp_snk_device_connected(pri_ad, sec_addr): 316 profile_connected.add(bt_profile_constants['a2dp_sink']) 317 if (bt_profile_constants['map_mce'] not in profile_connected 318 and bt_profile_constants['map_mce'] in profiles_set): 319 if is_map_mce_device_connected(pri_ad, sec_addr): 320 profile_connected.add(bt_profile_constants['map_mce']) 321 if (bt_profile_constants['map'] not in profile_connected 322 and bt_profile_constants['map'] in profiles_set): 323 if is_map_mse_device_connected(pri_ad, sec_addr): 324 profile_connected.add(bt_profile_constants['map']) 325 time.sleep(0.1) 326 # If APIs fail, try to find the connection broadcast receiver. 327 while not profile_connected.issuperset(profiles_set): 328 try: 329 profile_event = pri_ad.ed.pop_event( 330 bluetooth_profile_connection_state_changed, 331 bt_default_timeout + 10) 332 pri_ad.log.info("Got event {}".format(profile_event)) 333 except Exception: 334 pri_ad.log.error("Did not get {} profiles left {}".format( 335 bluetooth_profile_connection_state_changed, profile_connected)) 336 return False 337 338 profile = profile_event['data']['profile'] 339 state = profile_event['data']['state'] 340 device_addr = profile_event['data']['addr'] 341 if state == bt_profile_states['connected'] and \ 342 device_addr == sec_ad.droid.bluetoothGetLocalAddress(): 343 profile_connected.add(profile) 344 pri_ad.log.info( 345 "Profiles connected until now {}".format(profile_connected)) 346 # Failure happens inside the while loop. If we came here then we already 347 # connected. 348 return True 349 350 351def determine_max_advertisements(android_device): 352 """Determines programatically how many advertisements the Android device 353 supports. 354 355 Args: 356 android_device: The Android device to determine max advertisements of. 357 358 Returns: 359 The maximum advertisement count. 360 """ 361 android_device.log.info( 362 "Determining number of maximum concurrent advertisements...") 363 advertisement_count = 0 364 bt_enabled = False 365 expected_bluetooth_on_event_name = bluetooth_on 366 if not android_device.droid.bluetoothCheckState(): 367 android_device.droid.bluetoothToggleState(True) 368 try: 369 android_device.ed.pop_event(expected_bluetooth_on_event_name, 370 bt_default_timeout) 371 except Exception: 372 android_device.log.info( 373 "Failed to toggle Bluetooth on(no broadcast received).") 374 # Try one more time to poke at the actual state. 375 if android_device.droid.bluetoothCheckState() is True: 376 android_device.log.info(".. actual state is ON") 377 else: 378 android_device.log.error( 379 "Failed to turn Bluetooth on. Setting default advertisements to 1" 380 ) 381 advertisement_count = -1 382 return advertisement_count 383 advertise_callback_list = [] 384 advertise_data = android_device.droid.bleBuildAdvertiseData() 385 advertise_settings = android_device.droid.bleBuildAdvertiseSettings() 386 while (True): 387 advertise_callback = android_device.droid.bleGenBleAdvertiseCallback() 388 advertise_callback_list.append(advertise_callback) 389 390 android_device.droid.bleStartBleAdvertising( 391 advertise_callback, advertise_data, advertise_settings) 392 393 regex = "(" + adv_succ.format( 394 advertise_callback) + "|" + adv_fail.format( 395 advertise_callback) + ")" 396 # wait for either success or failure event 397 evt = android_device.ed.pop_events(regex, bt_default_timeout, 398 small_timeout) 399 if evt[0]["name"] == adv_succ.format(advertise_callback): 400 advertisement_count += 1 401 android_device.log.info( 402 "Advertisement {} started.".format(advertisement_count)) 403 else: 404 error = evt[0]["data"]["Error"] 405 if error == "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS": 406 android_device.log.info( 407 "Advertisement failed to start. Reached max " + 408 "advertisements at {}".format(advertisement_count)) 409 break 410 else: 411 raise BtTestUtilsError( 412 "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," + 413 " but received bad error code {}".format(error)) 414 try: 415 for adv in advertise_callback_list: 416 android_device.droid.bleStopBleAdvertising(adv) 417 except Exception: 418 android_device.log.error( 419 "Failed to stop advertisingment, resetting Bluetooth.") 420 reset_bluetooth([android_device]) 421 return advertisement_count 422 423 424def disable_bluetooth(droid): 425 """Disable Bluetooth on input Droid object. 426 427 Args: 428 droid: The droid object to disable Bluetooth on. 429 430 Returns: 431 True if successful, false if unsuccessful. 432 """ 433 if droid.bluetoothCheckState() is True: 434 droid.bluetoothToggleState(False) 435 if droid.bluetoothCheckState() is True: 436 log.error("Failed to toggle Bluetooth off.") 437 return False 438 return True 439 440 441def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list): 442 """ 443 Disconnect primary from secondary on a specific set of profiles 444 Args: 445 pri_ad - Primary android_device initiating disconnection 446 sec_ad - Secondary android droid (sl4a interface to keep the 447 method signature the same connect_pri_to_sec above) 448 profiles_list - List of profiles we want to disconnect from 449 450 Returns: 451 True on Success 452 False on Failure 453 """ 454 # Sanity check to see if all the profiles in the given set is supported 455 supported_profiles = bt_profile_constants.values() 456 for profile in profiles_list: 457 if profile not in supported_profiles: 458 pri_ad.log.info("Profile {} is not in supported list {}".format( 459 profile, supported_profiles)) 460 return False 461 462 pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices()) 463 # Disconnecting on a already disconnected profile is a nop, 464 # so not checking for the connection state 465 try: 466 pri_ad.droid.bluetoothDisconnectConnectedProfile( 467 sec_ad.droid.bluetoothGetLocalAddress(), profiles_list) 468 except Exception as err: 469 pri_ad.log.error( 470 "Exception while trying to disconnect profile(s) {}: {}".format( 471 profiles_list, err)) 472 return False 473 474 profile_disconnected = set() 475 pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list)) 476 477 while not profile_disconnected.issuperset(profiles_list): 478 try: 479 profile_event = pri_ad.ed.pop_event( 480 bluetooth_profile_connection_state_changed, bt_default_timeout) 481 pri_ad.log.info("Got event {}".format(profile_event)) 482 except Exception as e: 483 pri_ad.log.error( 484 "Did not disconnect from Profiles. Reason {}".format(e)) 485 return False 486 487 profile = profile_event['data']['profile'] 488 state = profile_event['data']['state'] 489 device_addr = profile_event['data']['addr'] 490 491 if state == bt_profile_states['disconnected'] and \ 492 device_addr == sec_ad.droid.bluetoothGetLocalAddress(): 493 profile_disconnected.add(profile) 494 pri_ad.log.info( 495 "Profiles disconnected so far {}".format(profile_disconnected)) 496 497 return True 498 499 500def enable_bluetooth(droid, ed): 501 if droid.bluetoothCheckState() is True: 502 return True 503 504 droid.bluetoothToggleState(True) 505 expected_bluetooth_on_event_name = bluetooth_on 506 try: 507 ed.pop_event(expected_bluetooth_on_event_name, bt_default_timeout) 508 except Exception: 509 log.info("Failed to toggle Bluetooth on (no broadcast received)") 510 if droid.bluetoothCheckState() is True: 511 log.info(".. actual state is ON") 512 return True 513 log.info(".. actual state is OFF") 514 return False 515 516 return True 517 518 519def factory_reset_bluetooth(android_devices): 520 """Clears Bluetooth stack of input Android device list. 521 522 Args: 523 android_devices: The Android device list to reset Bluetooth 524 525 Returns: 526 True if successful, false if unsuccessful. 527 """ 528 for a in android_devices: 529 droid, ed = a.droid, a.ed 530 a.log.info("Reset state of bluetooth on device.") 531 if not bluetooth_enabled_check(a): 532 return False 533 # TODO: remove device unbond b/79418045 534 # Temporary solution to ensure all devices are unbonded 535 bonded_devices = droid.bluetoothGetBondedDevices() 536 for b in bonded_devices: 537 a.log.info("Removing bond for device {}".format(b['address'])) 538 droid.bluetoothUnbond(b['address']) 539 540 droid.bluetoothFactoryReset() 541 wait_for_bluetooth_manager_state(droid) 542 if not enable_bluetooth(droid, ed): 543 return False 544 return True 545 546 547def generate_ble_advertise_objects(droid): 548 """Generate generic LE advertise objects. 549 550 Args: 551 droid: The droid object to generate advertise LE objects from. 552 553 Returns: 554 advertise_callback: The generated advertise callback id. 555 advertise_data: The generated advertise data id. 556 advertise_settings: The generated advertise settings id. 557 """ 558 advertise_callback = droid.bleGenBleAdvertiseCallback() 559 advertise_data = droid.bleBuildAdvertiseData() 560 advertise_settings = droid.bleBuildAdvertiseSettings() 561 return advertise_callback, advertise_data, advertise_settings 562 563 564def generate_ble_scan_objects(droid): 565 """Generate generic LE scan objects. 566 567 Args: 568 droid: The droid object to generate LE scan objects from. 569 570 Returns: 571 filter_list: The generated scan filter list id. 572 scan_settings: The generated scan settings id. 573 scan_callback: The generated scan callback id. 574 """ 575 filter_list = droid.bleGenFilterList() 576 scan_settings = droid.bleBuildScanSetting() 577 scan_callback = droid.bleGenScanCallback() 578 return filter_list, scan_settings, scan_callback 579 580 581def generate_id_by_size( 582 size, 583 chars=( 584 string.ascii_lowercase + string.ascii_uppercase + string.digits)): 585 """Generate random ascii characters of input size and input char types 586 587 Args: 588 size: Input size of string. 589 chars: (Optional) Chars to use in generating a random string. 590 591 Returns: 592 String of random input chars at the input size. 593 """ 594 return ''.join(random.choice(chars) for _ in range(size)) 595 596 597def get_advanced_droid_list(android_devices): 598 """Add max_advertisement and batch_scan_supported attributes to input 599 Android devices 600 601 This will programatically determine maximum LE advertisements of each 602 input Android device. 603 604 Args: 605 android_devices: The Android devices to setup. 606 607 Returns: 608 List of Android devices with new attribtues. 609 """ 610 droid_list = [] 611 for a in android_devices: 612 d, e = a.droid, a.ed 613 model = d.getBuildModel() 614 max_advertisements = 1 615 batch_scan_supported = True 616 if model in advertisements_to_devices.keys(): 617 max_advertisements = advertisements_to_devices[model] 618 else: 619 max_advertisements = determine_max_advertisements(a) 620 max_tries = 3 621 # Retry to calculate max advertisements 622 while max_advertisements == -1 and max_tries > 0: 623 a.log.info( 624 "Attempts left to determine max advertisements: {}".format( 625 max_tries)) 626 max_advertisements = determine_max_advertisements(a) 627 max_tries -= 1 628 advertisements_to_devices[model] = max_advertisements 629 if model in batch_scan_not_supported_list: 630 batch_scan_supported = False 631 role = { 632 'droid': d, 633 'ed': e, 634 'max_advertisements': max_advertisements, 635 'batch_scan_supported': batch_scan_supported 636 } 637 droid_list.append(role) 638 return droid_list 639 640 641def get_bluetooth_crash_count(android_device): 642 out = android_device.adb.shell("dumpsys bluetooth_manager") 643 return int(re.search("crashed(.*\d)", out).group(1)) 644 645 646def get_device_selector_dictionary(android_device_list): 647 """Create a dictionary of Bluetooth features vs Android devices. 648 649 Args: 650 android_device_list: The list of Android devices. 651 Returns: 652 A dictionary of profiles/features to Android devices. 653 """ 654 selector_dict = {} 655 for ad in android_device_list: 656 uuids = ad.droid.bluetoothGetLocalUuids() 657 658 for profile, uuid_const in sig_uuid_constants.items(): 659 uuid_check = sig_uuid_constants['BASE_UUID'].format( 660 uuid_const).lower() 661 if uuids and uuid_check in uuids: 662 if profile in selector_dict: 663 selector_dict[profile].append(ad) 664 else: 665 selector_dict[profile] = [ad] 666 667 # Various services may not be active during BT startup. 668 # If the device can be identified through adb shell pm list features 669 # then try to add them to the appropriate profiles / features. 670 671 # Android TV. 672 if "feature:com.google.android.tv.installed" in ad.features: 673 ad.log.info("Android TV device found.") 674 supported_profiles = ['AudioSink'] 675 _add_android_device_to_dictionary(ad, supported_profiles, 676 selector_dict) 677 678 # Android Auto 679 elif "feature:android.hardware.type.automotive" in ad.features: 680 ad.log.info("Android Auto device found.") 681 # Add: AudioSink , A/V_RemoteControl, 682 supported_profiles = [ 683 'AudioSink', 'A/V_RemoteControl', 'Message Notification Server' 684 ] 685 _add_android_device_to_dictionary(ad, supported_profiles, 686 selector_dict) 687 # Android Wear 688 elif "feature:android.hardware.type.watch" in ad.features: 689 ad.log.info("Android Wear device found.") 690 supported_profiles = [] 691 _add_android_device_to_dictionary(ad, supported_profiles, 692 selector_dict) 693 # Android Phone 694 elif "feature:android.hardware.telephony" in ad.features: 695 ad.log.info("Android Phone device found.") 696 # Add: AudioSink 697 supported_profiles = [ 698 'AudioSource', 'A/V_RemoteControlTarget', 699 'Message Access Server' 700 ] 701 _add_android_device_to_dictionary(ad, supported_profiles, 702 selector_dict) 703 return selector_dict 704 705 706def get_mac_address_of_generic_advertisement(scan_ad, adv_ad): 707 """Start generic advertisement and get it's mac address by LE scanning. 708 709 Args: 710 scan_ad: The Android device to use as the scanner. 711 adv_ad: The Android device to use as the advertiser. 712 713 Returns: 714 mac_address: The mac address of the advertisement. 715 advertise_callback: The advertise callback id of the active 716 advertisement. 717 """ 718 adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) 719 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 720 ble_advertise_settings_modes['low_latency']) 721 adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True) 722 adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel( 723 ble_advertise_settings_tx_powers['high']) 724 advertise_callback, advertise_data, advertise_settings = ( 725 generate_ble_advertise_objects(adv_ad.droid)) 726 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 727 advertise_settings) 728 try: 729 adv_ad.ed.pop_event( 730 adv_succ.format(advertise_callback), bt_default_timeout) 731 except Empty as err: 732 raise BtTestUtilsError( 733 "Advertiser did not start successfully {}".format(err)) 734 filter_list = scan_ad.droid.bleGenFilterList() 735 scan_settings = scan_ad.droid.bleBuildScanSetting() 736 scan_callback = scan_ad.droid.bleGenScanCallback() 737 scan_ad.droid.bleSetScanFilterDeviceName( 738 adv_ad.droid.bluetoothGetLocalName()) 739 scan_ad.droid.bleBuildScanFilter(filter_list) 740 scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 741 try: 742 event = scan_ad.ed.pop_event( 743 "BleScan{}onScanResults".format(scan_callback), bt_default_timeout) 744 except Empty as err: 745 raise BtTestUtilsError( 746 "Scanner did not find advertisement {}".format(err)) 747 mac_address = event['data']['Result']['deviceInfo']['address'] 748 return mac_address, advertise_callback, scan_callback 749 750 751def hid_device_send_key_data_report(host_id, device_ad, key, interval=1): 752 """Send a HID report simulating a 1-second keyboard press from host_ad to 753 device_ad 754 755 Args: 756 host_id: the Bluetooth MAC address or name of the HID host 757 device_ad: HID device 758 key: the key we want to send 759 interval: the interval between key press and key release 760 """ 761 device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, 762 hid_keyboard_report(key)) 763 time.sleep(interval) 764 device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard, 765 hid_keyboard_report("00")) 766 767 768def hid_keyboard_report(key, modifier="00"): 769 """Get the HID keyboard report for the given key 770 771 Args: 772 key: the key we want 773 modifier: HID keyboard modifier bytes 774 Returns: 775 The byte array for the HID report. 776 """ 777 return str( 778 bytearray.fromhex(" ".join( 779 [modifier, "00", key, "00", "00", "00", "00", "00"])), "utf-8") 780 781 782def is_a2dp_connected(sink, source): 783 """ 784 Convenience Function to see if the 2 devices are connected on 785 A2dp. 786 Args: 787 sink: Audio Sink 788 source: Audio Source 789 Returns: 790 True if Connected 791 False if Not connected 792 """ 793 794 devices = sink.droid.bluetoothA2dpSinkGetConnectedDevices() 795 for device in devices: 796 sink.log.info("A2dp Connected device {}".format(device["name"])) 797 if (device["address"] == source.droid.bluetoothGetLocalAddress()): 798 return True 799 return False 800 801 802def is_a2dp_snk_device_connected(ad, addr): 803 """Determines if an AndroidDevice has A2DP snk connectivity to input address 804 805 Args: 806 ad: the Android device 807 addr: the address that's expected 808 Returns: 809 True if connection was successful, false if unsuccessful. 810 """ 811 devices = ad.droid.bluetoothA2dpSinkGetConnectedDevices() 812 ad.log.info("Connected A2DP Sink devices: {}".format(devices)) 813 if addr in {d['address'] for d in devices}: 814 return True 815 return False 816 817 818def is_a2dp_src_device_connected(ad, addr): 819 """Determines if an AndroidDevice has A2DP connectivity to input address 820 821 Args: 822 ad: the Android device 823 addr: the address that's expected 824 Returns: 825 True if connection was successful, false if unsuccessful. 826 """ 827 devices = ad.droid.bluetoothA2dpGetConnectedDevices() 828 ad.log.info("Connected A2DP Source devices: {}".format(devices)) 829 if addr in {d['address'] for d in devices}: 830 return True 831 return False 832 833 834def is_hfp_client_device_connected(ad, addr): 835 """Determines if an AndroidDevice has HFP connectivity to input address 836 837 Args: 838 ad: the Android device 839 addr: the address that's expected 840 Returns: 841 True if connection was successful, false if unsuccessful. 842 """ 843 devices = ad.droid.bluetoothHfpClientGetConnectedDevices() 844 ad.log.info("Connected HFP Client devices: {}".format(devices)) 845 if addr in {d['address'] for d in devices}: 846 return True 847 return False 848 849 850def is_map_mce_device_connected(ad, addr): 851 """Determines if an AndroidDevice has MAP MCE connectivity to input address 852 853 Args: 854 ad: the Android device 855 addr: the address that's expected 856 Returns: 857 True if connection was successful, false if unsuccessful. 858 """ 859 devices = ad.droid.bluetoothMapClientGetConnectedDevices() 860 ad.log.info("Connected MAP MCE devices: {}".format(devices)) 861 if addr in {d['address'] for d in devices}: 862 return True 863 return False 864 865 866def is_map_mse_device_connected(ad, addr): 867 """Determines if an AndroidDevice has MAP MSE connectivity to input address 868 869 Args: 870 ad: the Android device 871 addr: the address that's expected 872 Returns: 873 True if connection was successful, false if unsuccessful. 874 """ 875 devices = ad.droid.bluetoothMapGetConnectedDevices() 876 ad.log.info("Connected MAP MSE devices: {}".format(devices)) 877 if addr in {d['address'] for d in devices}: 878 return True 879 return False 880 881 882def kill_bluetooth_process(ad): 883 """Kill Bluetooth process on Android device. 884 885 Args: 886 ad: Android device to kill BT process on. 887 """ 888 ad.log.info("Killing Bluetooth process.") 889 pid = ad.adb.shell( 890 "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii') 891 call(["adb -s " + ad.serial + " shell kill " + pid], shell=True) 892 893 894def log_energy_info(android_devices, state): 895 """Logs energy info of input Android devices. 896 897 Args: 898 android_devices: input Android device list to log energy info from. 899 state: the input state to log. Usually 'Start' or 'Stop' for logging. 900 901 Returns: 902 A logging string of the Bluetooth energy info reported. 903 """ 904 return_string = "{} Energy info collection:\n".format(state) 905 # Bug: b/31966929 906 return return_string 907 908 909def orchestrate_and_verify_pan_connection(pan_dut, panu_dut): 910 """Setups up a PAN conenction between two android devices. 911 912 Args: 913 pan_dut: the Android device providing tethering services 914 panu_dut: the Android device using the internet connection from the 915 pan_dut 916 Returns: 917 True if PAN connection and verification is successful, 918 false if unsuccessful. 919 """ 920 if not toggle_airplane_mode_by_adb(log, panu_dut, True): 921 panu_dut.log.error("Failed to toggle airplane mode on") 922 return False 923 if not toggle_airplane_mode_by_adb(log, panu_dut, False): 924 pan_dut.log.error("Failed to toggle airplane mode off") 925 return False 926 pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("") 927 panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("") 928 if not bluetooth_enabled_check(panu_dut): 929 return False 930 if not bluetooth_enabled_check(pan_dut): 931 return False 932 pan_dut.droid.bluetoothPanSetBluetoothTethering(True) 933 if not (pair_pri_to_sec(pan_dut, panu_dut)): 934 return False 935 if not pan_dut.droid.bluetoothPanIsTetheringOn(): 936 pan_dut.log.error("Failed to enable Bluetooth tethering.") 937 return False 938 # Magic sleep needed to give the stack time in between bonding and 939 # connecting the PAN profile. 940 time.sleep(pan_connect_timeout) 941 panu_dut.droid.bluetoothConnectBonded( 942 pan_dut.droid.bluetoothGetLocalAddress()) 943 if not verify_http_connection(log, panu_dut): 944 panu_dut.log.error("Can't verify http connection on PANU device.") 945 if not verify_http_connection(log, pan_dut): 946 pan_dut.log.info( 947 "Can't verify http connection on PAN service device") 948 return False 949 return True 950 951 952def orchestrate_bluetooth_socket_connection( 953 client_ad, 954 server_ad, 955 accept_timeout_ms=default_bluetooth_socket_timeout_ms, 956 uuid=None): 957 """Sets up the Bluetooth Socket connection between two Android devices. 958 959 Args: 960 client_ad: the Android device performing the connection. 961 server_ad: the Android device accepting the connection. 962 Returns: 963 True if connection was successful, false if unsuccessful. 964 """ 965 server_ad.droid.bluetoothStartPairingHelper() 966 client_ad.droid.bluetoothStartPairingHelper() 967 968 server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid( 969 (bluetooth_socket_conn_test_uuid 970 if uuid is None else uuid), accept_timeout_ms) 971 client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid( 972 server_ad.droid.bluetoothGetLocalAddress(), 973 (bluetooth_socket_conn_test_uuid if uuid is None else uuid)) 974 975 end_time = time.time() + bt_default_timeout 976 result = False 977 test_result = True 978 while time.time() < end_time: 979 if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0: 980 test_result = True 981 client_ad.log.info("Bluetooth socket Client Connection Active") 982 break 983 else: 984 test_result = False 985 time.sleep(1) 986 if not test_result: 987 client_ad.log.error( 988 "Failed to establish a Bluetooth socket connection") 989 return False 990 return True 991 992 993def orchestrate_rfcomm_connection(client_ad, 994 server_ad, 995 accept_timeout_ms=default_rfcomm_timeout_ms, 996 uuid=None): 997 """Sets up the RFCOMM connection between two Android devices. 998 999 Args: 1000 client_ad: the Android device performing the connection. 1001 server_ad: the Android device accepting the connection. 1002 Returns: 1003 True if connection was successful, false if unsuccessful. 1004 """ 1005 result = orchestrate_bluetooth_socket_connection( 1006 client_ad, server_ad, accept_timeout_ms, 1007 (bt_rfcomm_uuids['default_uuid'] if uuid is None else uuid)) 1008 1009 return result 1010 1011 1012def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True): 1013 """Pairs pri droid to secondary droid. 1014 1015 Args: 1016 pri_ad: Android device initiating connection 1017 sec_ad: Android device accepting connection 1018 attempts: Number of attempts to try until failure. 1019 auto_confirm: Auto confirm passkey match for both devices 1020 1021 Returns: 1022 Pass if True 1023 Fail if False 1024 """ 1025 pri_ad.droid.bluetoothStartConnectionStateChangeMonitor( 1026 sec_ad.droid.bluetoothGetLocalAddress()) 1027 curr_attempts = 0 1028 while curr_attempts < attempts: 1029 if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm): 1030 return True 1031 # Wait 2 seconds before unbound 1032 time.sleep(2) 1033 if not clear_bonded_devices(pri_ad): 1034 log.error("Failed to clear bond for primary device at attempt {}" 1035 .format(str(curr_attempts))) 1036 return False 1037 if not clear_bonded_devices(sec_ad): 1038 log.error("Failed to clear bond for secondary device at attempt {}" 1039 .format(str(curr_attempts))) 1040 return False 1041 # Wait 2 seconds after unbound 1042 time.sleep(2) 1043 curr_attempts += 1 1044 log.error("pair_pri_to_sec failed to connect after {} attempts".format( 1045 str(attempts))) 1046 return False 1047 1048 1049def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm): 1050 # Enable discovery on sec_ad so that pri_ad can find it. 1051 # The timeout here is based on how much time it would take for two devices 1052 # to pair with each other once pri_ad starts seeing devices. 1053 pri_droid = pri_ad.droid 1054 sec_droid = sec_ad.droid 1055 pri_ad.ed.clear_all_events() 1056 sec_ad.ed.clear_all_events() 1057 log.info("Bonding device {} to {}".format( 1058 pri_droid.bluetoothGetLocalAddress(), 1059 sec_droid.bluetoothGetLocalAddress())) 1060 sec_droid.bluetoothMakeDiscoverable(bt_default_timeout) 1061 target_address = sec_droid.bluetoothGetLocalAddress() 1062 log.debug("Starting paring helper on each device") 1063 pri_droid.bluetoothStartPairingHelper(auto_confirm) 1064 sec_droid.bluetoothStartPairingHelper(auto_confirm) 1065 pri_ad.log.info("Primary device starting discovery and executing bond") 1066 result = pri_droid.bluetoothDiscoverAndBond(target_address) 1067 if not auto_confirm: 1068 if not _wait_for_passkey_match(pri_ad, sec_ad): 1069 return False 1070 # Loop until we have bonded successfully or timeout. 1071 end_time = time.time() + bt_default_timeout 1072 pri_ad.log.info("Verifying devices are bonded") 1073 while time.time() < end_time: 1074 bonded_devices = pri_droid.bluetoothGetBondedDevices() 1075 bonded = False 1076 for d in bonded_devices: 1077 if d['address'] == target_address: 1078 pri_ad.log.info("Successfully bonded to device") 1079 return True 1080 time.sleep(0.1) 1081 # Timed out trying to bond. 1082 pri_ad.log.info("Failed to bond devices.") 1083 return False 1084 1085 1086def reset_bluetooth(android_devices): 1087 """Resets Bluetooth state of input Android device list. 1088 1089 Args: 1090 android_devices: The Android device list to reset Bluetooth state on. 1091 1092 Returns: 1093 True if successful, false if unsuccessful. 1094 """ 1095 for a in android_devices: 1096 droid, ed = a.droid, a.ed 1097 a.log.info("Reset state of bluetooth on device.") 1098 if droid.bluetoothCheckState() is True: 1099 droid.bluetoothToggleState(False) 1100 expected_bluetooth_off_event_name = bluetooth_off 1101 try: 1102 ed.pop_event(expected_bluetooth_off_event_name, 1103 bt_default_timeout) 1104 except Exception: 1105 a.log.error("Failed to toggle Bluetooth off.") 1106 return False 1107 # temp sleep for b/17723234 1108 time.sleep(3) 1109 if not bluetooth_enabled_check(a): 1110 return False 1111 return True 1112 1113 1114def scan_and_verify_n_advertisements(scn_ad, max_advertisements): 1115 """Verify that input number of advertisements can be found from the scanning 1116 Android device. 1117 1118 Args: 1119 scn_ad: The Android device to start LE scanning on. 1120 max_advertisements: The number of advertisements the scanner expects to 1121 find. 1122 1123 Returns: 1124 True if successful, false if unsuccessful. 1125 """ 1126 test_result = False 1127 address_list = [] 1128 filter_list = scn_ad.droid.bleGenFilterList() 1129 scn_ad.droid.bleBuildScanFilter(filter_list) 1130 scan_settings = scn_ad.droid.bleBuildScanSetting() 1131 scan_callback = scn_ad.droid.bleGenScanCallback() 1132 scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 1133 start_time = time.time() 1134 while (start_time + bt_default_timeout) > time.time(): 1135 event = None 1136 try: 1137 event = scn_ad.ed.pop_event( 1138 scan_result.format(scan_callback), bt_default_timeout) 1139 except Empty as error: 1140 raise BtTestUtilsError( 1141 "Failed to find scan event: {}".format(error)) 1142 address = event['data']['Result']['deviceInfo']['address'] 1143 if address not in address_list: 1144 address_list.append(address) 1145 if len(address_list) == max_advertisements: 1146 test_result = True 1147 break 1148 scn_ad.droid.bleStopBleScan(scan_callback) 1149 return test_result 1150 1151 1152def set_bluetooth_codec( 1153 android_device, 1154 codec_type, 1155 sample_rate, 1156 bits_per_sample, 1157 channel_mode, 1158 codec_specific_1=0): 1159 """Sets the A2DP codec configuration on the AndroidDevice. 1160 1161 Args: 1162 android_device (acts.controllers.android_device.AndroidDevice): the 1163 android device for which to switch the codec. 1164 codec_type (str): the desired codec type. Must be a key in 1165 bt_constants.codec_types. 1166 sample_rate (str): the desired sample rate. Must be a key in 1167 bt_constants.sample_rates. 1168 bits_per_sample (str): the desired bits per sample. Must be a key in 1169 bt_constants.bits_per_samples. 1170 channel_mode (str): the desired channel mode. Must be a key in 1171 bt_constants.channel_modes. 1172 codec_specific_1 (int): the desired bit rate (quality) for LDAC codec. 1173 Returns: 1174 bool: True if the codec config was successfully changed to the desired 1175 values. Else False. 1176 """ 1177 message = ( 1178 "Set Android Device A2DP Bluetooth codec configuration:\n" 1179 "\tCodec: {codec_type}\n" 1180 "\tSample Rate: {sample_rate}\n" 1181 "\tBits per Sample: {bits_per_sample}\n" 1182 "\tChannel Mode: {channel_mode}".format( 1183 codec_type=codec_type, 1184 sample_rate=sample_rate, 1185 bits_per_sample=bits_per_sample, 1186 channel_mode=channel_mode 1187 ) 1188 ) 1189 android_device.log.info(message) 1190 1191 # Send SL4A command 1192 droid, ed = android_device.droid, android_device.ed 1193 if not droid.bluetoothA2dpSetCodecConfigPreference( 1194 codec_types[codec_type], 1195 sample_rates[str(sample_rate)], 1196 bits_per_samples[str(bits_per_sample)], 1197 channel_modes[channel_mode], 1198 codec_specific_1 1199 ): 1200 android_device.log.warning("SL4A command returned False. Codec was not " 1201 "changed.") 1202 else: 1203 try: 1204 ed.pop_event(bluetooth_a2dp_codec_config_changed, 1205 bt_default_timeout) 1206 except Exception: 1207 android_device.log.warning("SL4A event not registered. Codec " 1208 "may not have been changed.") 1209 1210 # Validate codec value through ADB 1211 # TODO (aidanhb): validate codec more robustly using SL4A 1212 command = "dumpsys bluetooth_manager | grep -i 'current codec'" 1213 out = android_device.adb.shell(command) 1214 split_out = out.split(": ") 1215 if len(split_out) != 2: 1216 android_device.log.warning("Could not verify codec config change " 1217 "through ADB.") 1218 elif split_out[1].strip().upper() != codec_type: 1219 android_device.log.error( 1220 "Codec config was not changed.\n" 1221 "\tExpected codec: {exp}\n" 1222 "\tActual codec: {act}".format( 1223 exp=codec_type, 1224 act=split_out[1].strip() 1225 ) 1226 ) 1227 return False 1228 android_device.log.info("Bluetooth codec successfully changed.") 1229 return True 1230 1231 1232def set_bt_scan_mode(ad, scan_mode_value): 1233 """Set Android device's Bluetooth scan mode. 1234 1235 Args: 1236 ad: The Android device to set the scan mode on. 1237 scan_mode_value: The value to set the scan mode to. 1238 1239 Returns: 1240 True if successful, false if unsuccessful. 1241 """ 1242 droid, ed = ad.droid, ad.ed 1243 if scan_mode_value == bt_scan_mode_types['state_off']: 1244 disable_bluetooth(droid) 1245 scan_mode = droid.bluetoothGetScanMode() 1246 reset_bluetooth([ad]) 1247 if scan_mode != scan_mode_value: 1248 return False 1249 elif scan_mode_value == bt_scan_mode_types['none']: 1250 droid.bluetoothMakeUndiscoverable() 1251 scan_mode = droid.bluetoothGetScanMode() 1252 if scan_mode != scan_mode_value: 1253 return False 1254 elif scan_mode_value == bt_scan_mode_types['connectable']: 1255 droid.bluetoothMakeUndiscoverable() 1256 droid.bluetoothMakeConnectable() 1257 scan_mode = droid.bluetoothGetScanMode() 1258 if scan_mode != scan_mode_value: 1259 return False 1260 elif (scan_mode_value == bt_scan_mode_types['connectable_discoverable']): 1261 droid.bluetoothMakeDiscoverable() 1262 scan_mode = droid.bluetoothGetScanMode() 1263 if scan_mode != scan_mode_value: 1264 return False 1265 else: 1266 # invalid scan mode 1267 return False 1268 return True 1269 1270 1271def set_device_name(droid, name): 1272 """Set and check Bluetooth local name on input droid object. 1273 1274 Args: 1275 droid: Droid object to set local name on. 1276 name: the Bluetooth local name to set. 1277 1278 Returns: 1279 True if successful, false if unsuccessful. 1280 """ 1281 droid.bluetoothSetLocalName(name) 1282 time.sleep(2) 1283 droid_name = droid.bluetoothGetLocalName() 1284 if droid_name != name: 1285 return False 1286 return True 1287 1288 1289def set_profile_priority(host_ad, client_ad, profiles, priority): 1290 """Sets the priority of said profile(s) on host_ad for client_ad""" 1291 for profile in profiles: 1292 host_ad.log.info("Profile {} on {} for {} set to priority {}".format( 1293 profile, host_ad.droid.bluetoothGetLocalName(), 1294 client_ad.droid.bluetoothGetLocalAddress(), priority.value)) 1295 if bt_profile_constants['a2dp_sink'] == profile: 1296 host_ad.droid.bluetoothA2dpSinkSetPriority( 1297 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1298 elif bt_profile_constants['headset_client'] == profile: 1299 host_ad.droid.bluetoothHfpClientSetPriority( 1300 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1301 elif bt_profile_constants['pbap_client'] == profile: 1302 host_ad.droid.bluetoothPbapClientSetPriority( 1303 client_ad.droid.bluetoothGetLocalAddress(), priority.value) 1304 else: 1305 host_ad.log.error( 1306 "Profile {} not yet supported for priority settings".format( 1307 profile)) 1308 1309 1310def setup_multiple_devices_for_bt_test(android_devices): 1311 """A common setup routine for Bluetooth on input Android device list. 1312 1313 Things this function sets up: 1314 1. Resets Bluetooth 1315 2. Set Bluetooth local name to random string of size 4 1316 3. Disable BLE background scanning. 1317 4. Enable Bluetooth snoop logging. 1318 1319 Args: 1320 android_devices: Android device list to setup Bluetooth on. 1321 1322 Returns: 1323 True if successful, false if unsuccessful. 1324 """ 1325 log.info("Setting up Android Devices") 1326 # TODO: Temp fix for an selinux error. 1327 for ad in android_devices: 1328 ad.adb.shell("setenforce 0") 1329 threads = [] 1330 try: 1331 for a in android_devices: 1332 thread = threading.Thread( 1333 target=factory_reset_bluetooth, args=([[a]])) 1334 threads.append(thread) 1335 thread.start() 1336 for t in threads: 1337 t.join() 1338 1339 for a in android_devices: 1340 d = a.droid 1341 # TODO: Create specific RPC command to instantiate 1342 # BluetoothConnectionFacade. This is just a workaround. 1343 d.bluetoothStartConnectionStateChangeMonitor("") 1344 setup_result = d.bluetoothSetLocalName(generate_id_by_size(4)) 1345 if not setup_result: 1346 a.log.error("Failed to set device name.") 1347 return setup_result 1348 d.bluetoothDisableBLE() 1349 utils.set_location_service(a, True) 1350 bonded_devices = d.bluetoothGetBondedDevices() 1351 for b in bonded_devices: 1352 a.log.info("Removing bond for device {}".format(b['address'])) 1353 d.bluetoothUnbond(b['address']) 1354 for a in android_devices: 1355 a.adb.shell("setprop persist.bluetooth.btsnoopenable true") 1356 getprop_result = bool( 1357 a.adb.shell("getprop persist.bluetooth.btsnoopenable")) 1358 if not getprop_result: 1359 a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.") 1360 except Exception as err: 1361 log.error("Something went wrong in multi device setup: {}".format(err)) 1362 return False 1363 return setup_result 1364 1365 1366def setup_n_advertisements(adv_ad, num_advertisements): 1367 """Setup input number of advertisements on input Android device. 1368 1369 Args: 1370 adv_ad: The Android device to start LE advertisements on. 1371 num_advertisements: The number of advertisements to start. 1372 1373 Returns: 1374 advertise_callback_list: List of advertisement callback ids. 1375 """ 1376 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 1377 ble_advertise_settings_modes['low_latency']) 1378 advertise_data = adv_ad.droid.bleBuildAdvertiseData() 1379 advertise_settings = adv_ad.droid.bleBuildAdvertiseSettings() 1380 advertise_callback_list = [] 1381 for i in range(num_advertisements): 1382 advertise_callback = adv_ad.droid.bleGenBleAdvertiseCallback() 1383 advertise_callback_list.append(advertise_callback) 1384 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 1385 advertise_settings) 1386 try: 1387 adv_ad.ed.pop_event( 1388 adv_succ.format(advertise_callback), bt_default_timeout) 1389 adv_ad.log.info("Advertisement {} started.".format(i + 1)) 1390 except Empty as error: 1391 adv_ad.log.error("Advertisement {} failed to start.".format(i + 1)) 1392 raise BtTestUtilsError( 1393 "Test failed with Empty error: {}".format(error)) 1394 return advertise_callback_list 1395 1396 1397def take_btsnoop_log(ad, testcase, testname): 1398 """Grabs the btsnoop_hci log on a device and stores it in the log directory 1399 of the test class. 1400 1401 If you want grab the btsnoop_hci log, call this function with android_device 1402 objects in on_fail. Bug report takes a relative long time to take, so use 1403 this cautiously. 1404 1405 Args: 1406 ad: The android_device instance to take bugreport on. 1407 testcase: Name of the test calss that triggered this snoop log. 1408 testname: Name of the test case that triggered this bug report. 1409 """ 1410 testname = "".join(x for x in testname if x.isalnum()) 1411 serial = ad.serial 1412 device_model = ad.droid.getBuildModel() 1413 device_model = device_model.replace(" ", "") 1414 out_name = ','.join((testname, device_model, serial)) 1415 snoop_path = os.path.join(ad.device_log_path, 'BluetoothSnoopLogs') 1416 utils.create_dir(snoop_path) 1417 cmd = ''.join(("adb -s ", serial, " pull ", btsnoop_log_path_on_device, 1418 " ", snoop_path + '/' + out_name, ".btsnoop_hci.log")) 1419 exe_cmd(cmd) 1420 try: 1421 cmd = ''.join( 1422 ("adb -s ", serial, " pull ", btsnoop_last_log_path_on_device, " ", 1423 snoop_path + '/' + out_name, ".btsnoop_hci.log.last")) 1424 exe_cmd(cmd) 1425 except Exception as err: 1426 testcase.log.info( 1427 "File does not exist {}".format(btsnoop_last_log_path_on_device)) 1428 1429 1430def take_btsnoop_logs(android_devices, testcase, testname): 1431 """Pull btsnoop logs from an input list of android devices. 1432 1433 Args: 1434 android_devices: the list of Android devices to pull btsnoop logs from. 1435 testcase: Name of the test calss that triggered this snoop log. 1436 testname: Name of the test case that triggered this bug report. 1437 """ 1438 for a in android_devices: 1439 take_btsnoop_log(a, testcase, testname) 1440 1441 1442def teardown_n_advertisements(adv_ad, num_advertisements, 1443 advertise_callback_list): 1444 """Stop input number of advertisements on input Android device. 1445 1446 Args: 1447 adv_ad: The Android device to stop LE advertisements on. 1448 num_advertisements: The number of advertisements to stop. 1449 advertise_callback_list: The list of advertisement callbacks to stop. 1450 1451 Returns: 1452 True if successful, false if unsuccessful. 1453 """ 1454 for n in range(num_advertisements): 1455 adv_ad.droid.bleStopBleAdvertising(advertise_callback_list[n]) 1456 return True 1457 1458 1459def verify_server_and_client_connected(client_ad, server_ad, log=True): 1460 """Verify that input server and client Android devices are connected. 1461 1462 This code is under the assumption that there will only be 1463 a single connection. 1464 1465 Args: 1466 client_ad: the Android device to check number of active connections. 1467 server_ad: the Android device to check number of active connections. 1468 1469 Returns: 1470 True both server and client have at least 1 active connection, 1471 false if unsuccessful. 1472 """ 1473 test_result = True 1474 if len(server_ad.droid.bluetoothSocketConnActiveConnections()) == 0: 1475 if log: 1476 server_ad.log.error("No socket connections found on server.") 1477 test_result = False 1478 if len(client_ad.droid.bluetoothSocketConnActiveConnections()) == 0: 1479 if log: 1480 client_ad.log.error("No socket connections found on client.") 1481 test_result = False 1482 return test_result 1483 1484 1485def wait_for_bluetooth_manager_state(droid, 1486 state=None, 1487 timeout=10, 1488 threshold=5): 1489 """ Waits for BlueTooth normalized state or normalized explicit state 1490 args: 1491 droid: droid device object 1492 state: expected BlueTooth state 1493 timeout: max timeout threshold 1494 threshold: list len of bt state 1495 Returns: 1496 True if successful, false if unsuccessful. 1497 """ 1498 all_states = [] 1499 get_state = lambda: droid.bluetoothGetLeState() 1500 start_time = time.time() 1501 while time.time() < start_time + timeout: 1502 all_states.append(get_state()) 1503 if len(all_states) >= threshold: 1504 # for any normalized state 1505 if state is None: 1506 if len(set(all_states[-threshold:])) == 1: 1507 log.info("State normalized {}".format( 1508 set(all_states[-threshold:]))) 1509 return True 1510 else: 1511 # explicit check against normalized state 1512 if set([state]).issubset(all_states[-threshold:]): 1513 return True 1514 time.sleep(0.5) 1515 log.error( 1516 "Bluetooth state fails to normalize" if state is None else 1517 "Failed to match bluetooth state, current state {} expected state {}". 1518 format(get_state(), state)) 1519 return False 1520 1521 1522def _wait_for_passkey_match(pri_ad, sec_ad): 1523 pri_pin, sec_pin = -1, 1 1524 pri_variant, sec_variant = -1, 1 1525 pri_pairing_req, sec_pairing_req = None, None 1526 try: 1527 pri_pairing_req = pri_ad.ed.pop_event( 1528 event_name="BluetoothActionPairingRequest", 1529 timeout=bt_default_timeout) 1530 pri_variant = pri_pairing_req["data"]["PairingVariant"] 1531 pri_pin = pri_pairing_req["data"]["Pin"] 1532 pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format( 1533 pri_pin, pri_variant)) 1534 sec_pairing_req = sec_ad.ed.pop_event( 1535 event_name="BluetoothActionPairingRequest", 1536 timeout=bt_default_timeout) 1537 sec_variant = sec_pairing_req["data"]["PairingVariant"] 1538 sec_pin = sec_pairing_req["data"]["Pin"] 1539 sec_ad.log.info("Secondary device received Pin: {}, Variant: {}" 1540 .format(sec_pin, sec_variant)) 1541 except Empty as err: 1542 log.error("Wait for pin error: {}".format(err)) 1543 log.error("Pairing request state, Primary: {}, Secondary: {}".format( 1544 pri_pairing_req, sec_pairing_req)) 1545 return False 1546 if pri_variant == sec_variant == pairing_variant_passkey_confirmation: 1547 confirmation = pri_pin == sec_pin 1548 if confirmation: 1549 log.info("Pairing code matched, accepting connection") 1550 else: 1551 log.info("Pairing code mismatched, rejecting connection") 1552 pri_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm", 1553 str(confirmation)) 1554 sec_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm", 1555 str(confirmation)) 1556 if not confirmation: 1557 return False 1558 elif pri_variant != sec_variant: 1559 log.error("Pairing variant mismatched, abort connection") 1560 return False 1561 return True 1562 1563 1564def write_read_verify_data(client_ad, server_ad, msg, binary=False): 1565 """Verify that the client wrote data to the server Android device correctly. 1566 1567 Args: 1568 client_ad: the Android device to perform the write. 1569 server_ad: the Android device to read the data written. 1570 msg: the message to write. 1571 binary: if the msg arg is binary or not. 1572 1573 Returns: 1574 True if the data written matches the data read, false if not. 1575 """ 1576 client_ad.log.info("Write message.") 1577 try: 1578 if binary: 1579 client_ad.droid.bluetoothSocketConnWriteBinary(msg) 1580 else: 1581 client_ad.droid.bluetoothSocketConnWrite(msg) 1582 except Exception as err: 1583 client_ad.log.error("Failed to write data: {}".format(err)) 1584 return False 1585 server_ad.log.info("Read message.") 1586 try: 1587 if binary: 1588 read_msg = server_ad.droid.bluetoothSocketConnReadBinary().rstrip( 1589 "\r\n") 1590 else: 1591 read_msg = server_ad.droid.bluetoothSocketConnRead() 1592 except Exception as err: 1593 server_ad.log.error("Failed to read data: {}".format(err)) 1594 return False 1595 log.info("Verify message.") 1596 if msg != read_msg: 1597 log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg)) 1598 return False 1599 return True 1600 1601