1#/usr/bin/env python3.4 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import random 18import pprint 19import string 20from queue import Empty 21import threading 22import time 23from acts import utils 24 25from contextlib2 import suppress 26from subprocess import call 27 28from acts.logger import LoggerProxy 29from acts.test_utils.bt.BleEnum import AdvertiseSettingsAdvertiseMode 30from acts.test_utils.bt.BleEnum import ScanSettingsCallbackType 31from acts.test_utils.bt.BleEnum import ScanSettingsMatchMode 32from acts.test_utils.bt.BleEnum import ScanSettingsMatchNum 33from acts.test_utils.bt.BleEnum import ScanSettingsScanResultType 34from acts.test_utils.bt.BleEnum import ScanSettingsScanMode 35from acts.test_utils.bt.BleEnum import ScanSettingsReportDelaySeconds 36from acts.test_utils.bt.BleEnum import AdvertiseSettingsAdvertiseType 37from acts.test_utils.bt.BleEnum import AdvertiseSettingsAdvertiseTxPower 38from acts.test_utils.bt.BleEnum import ScanSettingsMatchNum 39from acts.test_utils.bt.BleEnum import ScanSettingsScanResultType 40from acts.test_utils.bt.BleEnum import ScanSettingsScanMode 41from acts.test_utils.bt.BtEnum import BluetoothScanModeType 42from acts.test_utils.bt.BtEnum import RfcommUuid 43from acts.utils import exe_cmd 44 45default_timeout = 15 46# bt discovery timeout 47default_discovery_timeout = 3 48log = LoggerProxy() 49 50# Callback strings 51scan_result = "BleScan{}onScanResults" 52scan_failed = "BleScan{}onScanFailed" 53batch_scan_result = "BleScan{}onBatchScanResult" 54adv_fail = "BleAdvertise{}onFailure" 55adv_succ = "BleAdvertise{}onSuccess" 56bluetooth_off = "BluetoothStateChangedOff" 57bluetooth_on = "BluetoothStateChangedOn" 58 59# rfcomm test uuids 60rfcomm_secure_uuid = "fa87c0d0-afac-11de-8a39-0800200c9a66" 61rfcomm_insecure_uuid = "8ce255c0-200a-11e0-ac64-0800200c9a66" 62 63advertisements_to_devices = {} 64 65batch_scan_not_supported_list = ["Nexus 4", "Nexus 5", "Nexus 7", ] 66 67 68class BtTestUtilsError(Exception): 69 pass 70 71 72def generate_ble_scan_objects(droid): 73 filter_list = droid.bleGenFilterList() 74 scan_settings = droid.bleBuildScanSetting() 75 scan_callback = droid.bleGenScanCallback() 76 return filter_list, scan_settings, scan_callback 77 78 79def generate_ble_advertise_objects(droid): 80 advertise_callback = droid.bleGenBleAdvertiseCallback() 81 advertise_data = droid.bleBuildAdvertiseData() 82 advertise_settings = droid.bleBuildAdvertiseSettings() 83 return advertise_callback, advertise_data, advertise_settings 84 85 86def extract_string_from_byte_array(string_list): 87 """Extract the string from array of string list 88 """ 89 start = 1 90 end = len(string_list) - 1 91 extract_string = string_list[start:end] 92 return extract_string 93 94 95def extract_uuidlist_from_record(uuid_string_list): 96 """Extract uuid from Service UUID List 97 """ 98 start = 1 99 end = len(uuid_string_list) - 1 100 uuid_length = 36 101 uuidlist = [] 102 while start < end: 103 uuid = uuid_string_list[start:(start + uuid_length)] 104 start += uuid_length + 1 105 uuidlist.append(uuid) 106 return uuidlist 107 108 109def build_advertise_settings(droid, mode, txpower, type): 110 """Build Advertise Settings 111 """ 112 droid.bleSetAdvertiseSettingsAdvertiseMode(mode) 113 droid.bleSetAdvertiseSettingsTxPowerLevel(txpower) 114 droid.bleSetAdvertiseSettingsIsConnectable(type) 115 settings = droid.bleBuildAdvertiseSettings() 116 return settings 117 118 119def setup_multiple_devices_for_bt_test(android_devices): 120 log.info("Setting up Android Devices") 121 # TODO: Temp fix for an selinux error. 122 for ad in android_devices: 123 ad.adb.shell("setenforce 0") 124 threads = [] 125 try: 126 for a in android_devices: 127 thread = threading.Thread(target=reset_bluetooth, args=([[a]])) 128 threads.append(thread) 129 thread.start() 130 for t in threads: 131 t.join() 132 133 for a in android_devices: 134 d = a.droid 135 setup_result = d.bluetoothSetLocalName(generate_id_by_size(4)) 136 if not setup_result: 137 log.error("Failed to set device name.") 138 return setup_result 139 d.bluetoothDisableBLE() 140 bonded_devices = d.bluetoothGetBondedDevices() 141 for b in bonded_devices: 142 d.bluetoothUnbond(b['address']) 143 for a in android_devices: 144 setup_result = a.droid.bluetoothConfigHciSnoopLog(True) 145 if not setup_result: 146 log.error("Failed to enable Bluetooth Hci Snoop Logging.") 147 return setup_result 148 except Exception as err: 149 log.error("Something went wrong in multi device setup: {}".format(err)) 150 return False 151 return setup_result 152 153 154def bluetooth_enabled_check(ad): 155 if not ad.droid.bluetoothCheckState(): 156 ad.droid.bluetoothToggleState(True) 157 expected_bluetooth_on_event_name = bluetooth_on 158 try: 159 ad.ed.pop_event(expected_bluetooth_on_event_name, 160 default_timeout) 161 except Empty: 162 log.info("Failed to toggle Bluetooth on (no broadcast received).") 163 # Try one more time to poke at the actual state. 164 if ad.droid.bluetoothCheckState(): 165 log.info(".. actual state is ON") 166 return True 167 log.error(".. actual state is OFF") 168 return False 169 return True 170 171 172def reset_bluetooth(android_devices): 173 """Resets bluetooth on the list of android devices passed into the function. 174 :param android_devices: list of android devices 175 :return: bool 176 """ 177 for a in android_devices: 178 droid, ed = a.droid, a.ed 179 log.info("Reset state of bluetooth on device: {}".format( 180 droid.getBuildSerial())) 181 if droid.bluetoothCheckState() is True: 182 droid.bluetoothToggleState(False) 183 expected_bluetooth_off_event_name = bluetooth_off 184 try: 185 ed.pop_event(expected_bluetooth_off_event_name, 186 default_timeout) 187 except Exception: 188 log.error("Failed to toggle Bluetooth off.") 189 return False 190 # temp sleep for b/17723234 191 time.sleep(3) 192 if not bluetooth_enabled_check(a): 193 return False 194 return True 195 196 197def determine_max_advertisements(android_device): 198 log.info("Determining number of maximum concurrent advertisements...") 199 advertisement_count = 0 200 bt_enabled = False 201 if not android_device.droid.bluetoothCheckState(): 202 android_device.droid.bluetoothToggleState(True) 203 try: 204 android_device.ed.pop_event(expected_bluetooth_on_event_name, 205 default_timeout) 206 except Exception: 207 log.info("Failed to toggle Bluetooth on (no broadcast received).") 208 # Try one more time to poke at the actual state. 209 if android_device.droid.bluetoothCheckState() is True: 210 log.info(".. actual state is ON") 211 else: 212 log.error( 213 "Failed to turn Bluetooth on. Setting default advertisements to 1") 214 advertisement_count = -1 215 return advertisement_count 216 advertise_callback_list = [] 217 advertise_data = android_device.droid.bleBuildAdvertiseData() 218 advertise_settings = android_device.droid.bleBuildAdvertiseSettings() 219 while (True): 220 advertise_callback = android_device.droid.bleGenBleAdvertiseCallback() 221 advertise_callback_list.append(advertise_callback) 222 223 android_device.droid.bleStartBleAdvertising( 224 advertise_callback, advertise_data, advertise_settings) 225 try: 226 android_device.ed.pop_event( 227 adv_succ.format(advertise_callback), default_timeout) 228 log.info("Advertisement {} started.".format(advertisement_count + 229 1)) 230 advertisement_count += 1 231 except Exception as err: 232 log.info( 233 "Advertisement failed to start. Reached max advertisements at {}".format( 234 advertisement_count)) 235 break 236 with suppress(Exception): 237 for adv in advertise_callback_list: 238 android_device.droid.bleStopBleAdvertising(adv) 239 return advertisement_count 240 241 242def get_advanced_droid_list(android_devices): 243 droid_list = [] 244 for a in android_devices: 245 d, e = a.droid, a.ed 246 model = d.getBuildModel() 247 max_advertisements = 1 248 batch_scan_supported = True 249 if model in advertisements_to_devices.keys(): 250 max_advertisements = advertisements_to_devices[model] 251 else: 252 max_advertisements = determine_max_advertisements(a) 253 max_tries = 3 254 #Retry to calculate max advertisements 255 while max_advertisements == -1 and max_tries > 0: 256 log.info( 257 "Attempts left to determine max advertisements: {}".format( 258 max_tries)) 259 max_advertisements = determine_max_advertisements(a) 260 max_tries -= 1 261 advertisements_to_devices[model] = max_advertisements 262 if model in batch_scan_not_supported_list: 263 batch_scan_supported = False 264 role = { 265 'droid': d, 266 'ed': e, 267 'max_advertisements': max_advertisements, 268 'batch_scan_supported': batch_scan_supported 269 } 270 droid_list.append(role) 271 return droid_list 272 273 274def generate_id_by_size( 275 size, 276 chars=( 277 string.ascii_lowercase + string.ascii_uppercase + string.digits)): 278 return ''.join(random.choice(chars) for _ in range(size)) 279 280 281def cleanup_scanners_and_advertisers(scn_android_device, scan_callback_list, 282 adv_android_device, adv_callback_list): 283 """ 284 Try to gracefully stop all scanning and advertising instances. 285 """ 286 scan_droid, scan_ed = scn_android_device.droid, scn_android_device.ed 287 adv_droid = adv_android_device.droid 288 try: 289 for scan_callback in scan_callback_list: 290 scan_droid.bleStopBleScan(scan_callback) 291 except Exception as err: 292 log.debug( 293 "Failed to stop LE scan... reseting Bluetooth. Error {}".format( 294 err)) 295 reset_bluetooth([scn_android_device]) 296 try: 297 for adv_callback in adv_callback_list: 298 adv_droid.bleStopBleAdvertising(adv_callback) 299 except Exception as err: 300 log.debug( 301 "Failed to stop LE advertisement... reseting Bluetooth. Error {}".format( 302 err)) 303 reset_bluetooth([adv_android_device]) 304 305 306def get_mac_address_of_generic_advertisement(scan_ad, adv_ad): 307 adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) 308 adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 309 AdvertiseSettingsAdvertiseMode.ADVERTISE_MODE_LOW_LATENCY.value) 310 adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True) 311 adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel( 312 AdvertiseSettingsAdvertiseTxPower.ADVERTISE_TX_POWER_HIGH.value) 313 advertise_callback, advertise_data, advertise_settings = ( 314 generate_ble_advertise_objects(adv_ad.droid)) 315 adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 316 advertise_settings) 317 try: 318 adv_ad.ed.pop_event( 319 "BleAdvertise{}onSuccess".format(advertise_callback), 320 default_timeout) 321 except Empty as err: 322 raise BtTestUtilsError( 323 "Advertiser did not start successfully {}".format(err)) 324 filter_list = scan_ad.droid.bleGenFilterList() 325 scan_settings = scan_ad.droid.bleBuildScanSetting() 326 scan_callback = scan_ad.droid.bleGenScanCallback() 327 scan_ad.droid.bleSetScanFilterDeviceName( 328 adv_ad.droid.bluetoothGetLocalName()) 329 scan_ad.droid.bleBuildScanFilter(filter_list) 330 scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) 331 try: 332 event = scan_ad.ed.pop_event( 333 "BleScan{}onScanResults".format(scan_callback), default_timeout) 334 except Empty as err: 335 raise BtTestUtilsError("Scanner did not find advertisement {}".format( 336 err)) 337 mac_address = event['data']['Result']['deviceInfo']['address'] 338 scan_ad.droid.bleStopBleScan(scan_callback) 339 return mac_address, advertise_callback 340 341 342def get_device_local_info(droid): 343 local_info_dict = {} 344 local_info_dict['name'] = droid.bluetoothGetLocalName() 345 local_info_dict['uuids'] = droid.bluetoothGetLocalUuids() 346 return local_info_dict 347 348 349def enable_bluetooth(droid, ed): 350 if droid.bluetoothCheckState() is False: 351 droid.bluetoothToggleState(True) 352 if droid.bluetoothCheckState() is False: 353 return False 354 return True 355 356 357def disable_bluetooth(droid, ed): 358 if droid.bluetoothCheckState() is True: 359 droid.bluetoothToggleState(False) 360 if droid.bluetoothCheckState() is True: 361 return False 362 return True 363 364 365def set_bt_scan_mode(ad, scan_mode_value): 366 droid, ed = ad.droid, ad.ed 367 if scan_mode_value == BluetoothScanModeType.STATE_OFF.value: 368 disable_bluetooth(droid, ed) 369 scan_mode = droid.bluetoothGetScanMode() 370 reset_bluetooth([ad]) 371 if scan_mode != scan_mode_value: 372 return False 373 elif scan_mode_value == BluetoothScanModeType.SCAN_MODE_NONE.value: 374 droid.bluetoothMakeUndiscoverable() 375 scan_mode = droid.bluetoothGetScanMode() 376 if scan_mode != scan_mode_value: 377 return False 378 elif scan_mode_value == BluetoothScanModeType.SCAN_MODE_CONNECTABLE.value: 379 droid.bluetoothMakeUndiscoverable() 380 droid.bluetoothMakeConnectable() 381 scan_mode = droid.bluetoothGetScanMode() 382 if scan_mode != scan_mode_value: 383 return False 384 elif (scan_mode_value == 385 BluetoothScanModeType.SCAN_MODE_CONNECTABLE_DISCOVERABLE.value): 386 droid.bluetoothMakeDiscoverable() 387 scan_mode = droid.bluetoothGetScanMode() 388 if scan_mode != scan_mode_value: 389 return False 390 else: 391 # invalid scan mode 392 return False 393 return True 394 395 396def set_device_name(droid, name): 397 droid.bluetoothSetLocalName(name) 398 time.sleep(2) 399 droid_name = droid.bluetoothGetLocalName() 400 if droid_name != name: 401 return False 402 return True 403 404 405def check_device_supported_profiles(droid): 406 profile_dict = {} 407 profile_dict['hid'] = droid.bluetoothHidIsReady() 408 profile_dict['hsp'] = droid.bluetoothHspIsReady() 409 profile_dict['a2dp'] = droid.bluetoothA2dpIsReady() 410 profile_dict['avrcp'] = droid.bluetoothAvrcpIsReady() 411 return profile_dict 412 413 414def log_energy_info(droids, state): 415 return_string = "{} Energy info collection:\n".format(state) 416 for d in droids: 417 with suppress(Exception): 418 if (d.getBuildModel() != "Nexus 5" or 419 d.getBuildModel() != "Nexus 4"): 420 421 description = ("Device: {}\tEnergyStatus: {}\n".format( 422 d.getBuildSerial(), 423 d.bluetoothGetControllerActivityEnergyInfo(1))) 424 return_string = return_string + description 425 return return_string 426 427 428def pair_pri_to_sec(pri_droid, sec_droid): 429 # Enable discovery on sec_droid so that pri_droid can find it. 430 # The timeout here is based on how much time it would take for two devices 431 # to pair with each other once pri_droid starts seeing devices. 432 log.info( 433 "Bonding device {} to {}".format(pri_droid.bluetoothGetLocalAddress(), 434 sec_droid.bluetoothGetLocalAddress())) 435 sec_droid.bluetoothMakeDiscoverable(default_timeout) 436 target_address = sec_droid.bluetoothGetLocalAddress() 437 log.debug("Starting paring helper on each device") 438 pri_droid.bluetoothStartPairingHelper() 439 sec_droid.bluetoothStartPairingHelper() 440 log.info("Primary device starting discovery and executing bond") 441 result = pri_droid.bluetoothDiscoverAndBond(target_address) 442 # Loop until we have bonded successfully or timeout. 443 end_time = time.time() + default_timeout 444 log.info("Verifying devices are bonded") 445 while time.time() < end_time: 446 bonded_devices = pri_droid.bluetoothGetBondedDevices() 447 bonded = False 448 for d in bonded_devices: 449 if d['address'] == target_address: 450 log.info("Successfully bonded to device") 451 return True 452 time.sleep(1) 453 # Timed out trying to bond. 454 log.debug("Failed to bond devices.") 455 return False 456 457 458def connect_pri_to_sec(log, pri_droid, sec_droid, profiles_set): 459 """Connects pri droid to secondary droid. 460 461 Args: 462 pri_droid: Droid initiating connection 463 sec_droid: Droid accepting connection 464 profiles_set: Set of profiles to be connected 465 466 Returns: 467 Pass if True 468 Fail if False 469 """ 470 # Check if we support all profiles. 471 supported_profiles = [i.value for i in BluetoothProfile] 472 for profile in profiles_set: 473 if profile not in supported_profiles: 474 log.info("Profile {} is not supported list {}".format( 475 profile, supported_profiles)) 476 return False 477 478 # First check that devices are bonded. 479 paired = False 480 for paired_device in pri_droid.droid.bluetoothGetBondedDevices(): 481 if paired_device['address'] == \ 482 sec_droid.bluetoothGetLocalAddress(): 483 paired = True 484 break 485 486 if not paired: 487 log.info("{} not paired to {}".format(pri_droid.droid.getBuildSerial(), 488 sec_droid.getBuildSerial())) 489 return False 490 491 # Now try to connect them, the following call will try to initiate all 492 # connections. 493 pri_droid.droid.bluetoothConnectBonded(sec_droid.bluetoothGetLocalAddress( 494 )) 495 496 profile_connected = set() 497 log.info("Profiles to be connected {}".format(profiles_set)) 498 while not profile_connected.issuperset(profiles_set): 499 try: 500 profile_event = pri_droid.ed.pop_event( 501 bluetooth_profile_connection_state_changed, default_timeout) 502 log.info("Got event {}".format(profile_event)) 503 except Exception: 504 log.error("Did not get {} profiles left {}".format( 505 bluetooth_profile_connection_state_changed, profile_connected)) 506 return False 507 508 profile = profile_event['data']['profile'] 509 state = profile_event['data']['state'] 510 device_addr = profile_event['data']['addr'] 511 512 if state == BluetoothProfileState.STATE_CONNECTED.value and \ 513 device_addr == sec_droid.bluetoothGetLocalAddress(): 514 profile_connected.add(profile) 515 log.info("Profiles connected until now {}".format(profile_connected)) 516 # Failure happens inside the while loop. If we came here then we already 517 # connected. 518 return True 519 520 521def take_btsnoop_logs(android_devices, testcase, testname): 522 for a in android_devices: 523 take_btsnoop_log(a, testcase, testname) 524 525 526def take_btsnoop_log(ad, testcase, test_name): 527 """Grabs the btsnoop_hci log on a device and stores it in the log directory 528 of the test class. 529 530 If you want grab the btsnoop_hci log, call this function with android_device 531 objects in on_fail. Bug report takes a relative long time to take, so use 532 this cautiously. 533 534 Params: 535 test_name: Name of the test case that triggered this bug report. 536 android_device: The android_device instance to take bugreport on. 537 """ 538 test_name = "".join(x for x in test_name if x.isalnum()) 539 with suppress(Exception): 540 serial = ad.droid.getBuildSerial() 541 device_model = ad.droid.getBuildModel() 542 device_model = device_model.replace(" ", "") 543 out_name = ','.join((test_name, device_model, serial)) 544 snoop_path = ad.log_path + "/BluetoothSnoopLogs" 545 utils.create_dir(snoop_path) 546 cmd = ''.join(("adb -s ", serial, " pull /sdcard/btsnoop_hci.log ", 547 snoop_path + '/' + out_name, ".btsnoop_hci.log")) 548 testcase.log.info("Test failed, grabbing the bt_snoop logs on {} {}." 549 .format(device_model, serial)) 550 exe_cmd(cmd) 551 552 553def kill_bluetooth_process(ad): 554 log.info("Killing Bluetooth process.") 555 pid = ad.adb.shell( 556 "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii') 557 call(["adb -s " + ad.serial + " shell kill " + pid], shell=True) 558 559 560def rfcomm_connect(ad, device_address): 561 rf_client_ad = ad 562 log.debug("Performing RFCOMM connection to {}".format(device_address)) 563 try: 564 ad.droid.bluetoothRfcommConnect(device_address) 565 except Exception as err: 566 log.error("Failed to connect: {}".format(err)) 567 ad.droid.bluetoothRfcommCloseSocket() 568 return 569 finally: 570 return 571 return 572 573 574def rfcomm_accept(ad): 575 rf_server_ad = ad 576 log.debug("Performing RFCOMM accept") 577 try: 578 ad.droid.bluetoothRfcommAccept(RfcommUuid.DEFAULT_UUID.value, 579 default_timeout) 580 except Exception as err: 581 log.error("Failed to accept: {}".format(err)) 582 ad.droid.bluetoothRfcommCloseSocket() 583 return 584 finally: 585 return 586 return 587 588 589def write_read_verify_data(client_ad, server_ad, msg, binary=False): 590 log.info("Write message.") 591 try: 592 if binary: 593 client_ad.droid.bluetoothRfcommWriteBinary(msg) 594 else: 595 client_ad.droid.bluetoothRfcommWrite(msg) 596 except Exception as err: 597 log.error("Failed to write data: {}".format(err)) 598 return False 599 log.info("Read message.") 600 try: 601 if binary: 602 read_msg = server_ad.droid.bluetoothRfcommReadBinary().rstrip( 603 "\r\n") 604 else: 605 read_msg = server_ad.droid.bluetoothRfcommRead() 606 except Exception as err: 607 log.error("Failed to read data: {}".format(err)) 608 return False 609 log.info("Verify message.") 610 if msg != read_msg: 611 log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg)) 612 return False 613 return True 614