• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#/usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import random
18import pprint
19import string
20from queue import Empty
21import threading
22import time
23from acts import utils
24
25from contextlib2 import suppress
26from subprocess import call
27
28from acts.logger import LoggerProxy
29from acts.test_utils.bt.BleEnum import AdvertiseSettingsAdvertiseMode
30from acts.test_utils.bt.BleEnum import ScanSettingsCallbackType
31from acts.test_utils.bt.BleEnum import ScanSettingsMatchMode
32from acts.test_utils.bt.BleEnum import ScanSettingsMatchNum
33from acts.test_utils.bt.BleEnum import ScanSettingsScanResultType
34from acts.test_utils.bt.BleEnum import ScanSettingsScanMode
35from acts.test_utils.bt.BleEnum import ScanSettingsReportDelaySeconds
36from acts.test_utils.bt.BleEnum import AdvertiseSettingsAdvertiseType
37from acts.test_utils.bt.BleEnum import AdvertiseSettingsAdvertiseTxPower
38from acts.test_utils.bt.BleEnum import ScanSettingsMatchNum
39from acts.test_utils.bt.BleEnum import ScanSettingsScanResultType
40from acts.test_utils.bt.BleEnum import ScanSettingsScanMode
41from acts.test_utils.bt.BtEnum import BluetoothScanModeType
42from acts.test_utils.bt.BtEnum import RfcommUuid
43from acts.utils import exe_cmd
44
# Default wait, in seconds, used throughout this module: passed as the timeout
# to event pops and added to time.time() to build polling deadlines.
default_timeout = 15
# bt discovery timeout
# NOTE(review): not referenced anywhere in this file; units presumably seconds.
default_discovery_timeout = 3
# Module-wide logger shared by the helper functions below.
log = LoggerProxy()

# Callback strings
# Event-name templates; "{}" is filled with the callback value returned by the
# droid (e.g. adv_succ.format(advertise_callback)) before waiting on the event.
scan_result = "BleScan{}onScanResults"
scan_failed = "BleScan{}onScanFailed"
batch_scan_result = "BleScan{}onBatchScanResult"
adv_fail = "BleAdvertise{}onFailure"
adv_succ = "BleAdvertise{}onSuccess"
# Event names awaited after toggling the Bluetooth adapter off / on.
bluetooth_off = "BluetoothStateChangedOff"
bluetooth_on = "BluetoothStateChangedOn"

# rfcomm test uuids
rfcomm_secure_uuid = "fa87c0d0-afac-11de-8a39-0800200c9a66"
rfcomm_insecure_uuid = "8ce255c0-200a-11e0-ac64-0800200c9a66"

# Cache mapping a device model string to its measured maximum number of
# concurrent advertisements (filled in by get_advanced_droid_list).
advertisements_to_devices = {}

# Device models that get_advanced_droid_list marks as not supporting batch scan.
batch_scan_not_supported_list = ["Nexus 4", "Nexus 5", "Nexus 7", ]
66
67
class BtTestUtilsError(Exception):
    """Raised when a Bluetooth test utility cannot complete its operation."""
70
71
def generate_ble_scan_objects(droid):
    """Create the objects needed to start a BLE scan.

    Args:
        droid: Object exposing bleGenFilterList, bleBuildScanSetting and
            bleGenScanCallback.

    Returns:
        Tuple of (filter_list, scan_settings, scan_callback), created on the
        droid in exactly that order.
    """
    filters = droid.bleGenFilterList()
    settings = droid.bleBuildScanSetting()
    return filters, settings, droid.bleGenScanCallback()
77
78
def generate_ble_advertise_objects(droid):
    """Create the objects needed to start a BLE advertisement.

    Args:
        droid: Object exposing bleGenBleAdvertiseCallback,
            bleBuildAdvertiseData and bleBuildAdvertiseSettings.

    Returns:
        Tuple of (advertise_callback, advertise_data, advertise_settings),
        created on the droid in exactly that order.
    """
    callback = droid.bleGenBleAdvertiseCallback()
    data = droid.bleBuildAdvertiseData()
    return callback, data, droid.bleBuildAdvertiseSettings()
84
85
def extract_string_from_byte_array(string_list):
    """Return string_list with its first and last elements removed.

    Args:
        string_list: Sliceable sequence (for example a string wrapped in a
            pair of bracket characters).

    Returns:
        The slice between the first and last element; empty when the input
        has fewer than three elements.
    """
    # A negative stop index is equivalent to len(string_list) - 1 here,
    # including for inputs of length 0 and 1.
    return string_list[1:-1]
93
94
def extract_uuidlist_from_record(uuid_string_list):
    """Split a wrapped, single-character-separated UUID string into a list.

    The first and last characters are skipped; the remainder is read as
    consecutive 36-character chunks, each followed by one separator character.

    Args:
        uuid_string_list: String holding the UUIDs.

    Returns:
        List of the extracted chunks, in order of appearance.
    """
    uuid_length = 36
    stride = uuid_length + 1  # one UUID plus the single separator after it
    stop = len(uuid_string_list) - 1
    return [
        uuid_string_list[offset:offset + uuid_length]
        for offset in range(1, stop, stride)
    ]
107
108
def build_advertise_settings(droid, mode, txpower, type):
    """Apply advertise parameters on the droid and build the settings object.

    Args:
        droid: Object exposing the bleSetAdvertiseSettings* setters and
            bleBuildAdvertiseSettings.
        mode: Value passed to bleSetAdvertiseSettingsAdvertiseMode.
        txpower: Value passed to bleSetAdvertiseSettingsTxPowerLevel.
        type: Value passed to bleSetAdvertiseSettingsIsConnectable.

    Returns:
        Whatever droid.bleBuildAdvertiseSettings() returns.
    """
    # Setters are looked up and invoked one at a time, in this order.
    setter_calls = (
        ("bleSetAdvertiseSettingsAdvertiseMode", mode),
        ("bleSetAdvertiseSettingsTxPowerLevel", txpower),
        ("bleSetAdvertiseSettingsIsConnectable", type),
    )
    for setter_name, value in setter_calls:
        getattr(droid, setter_name)(value)
    return droid.bleBuildAdvertiseSettings()
117
118
def setup_multiple_devices_for_bt_test(android_devices):
    """Prepare every device in android_devices for a Bluetooth test.

    For each device this sets SELinux to permissive, resets Bluetooth (one
    thread per device), assigns a random 4-character local name, disables
    BLE, removes all existing bonds and enables HCI snoop logging.

    Args:
        android_devices: List of android device objects exposing adb, droid
            and ed.

    Returns:
        The falsy result of the first step that fails, False when an
        exception is raised during setup, otherwise the (truthy) result of
        the last setup call. An empty device list yields True.
    """
    log.info("Setting up Android Devices")
    # TODO: Temp fix for an selinux error.
    for ad in android_devices:
        ad.adb.shell("setenforce 0")
    # Default so that an empty device list no longer hits an unbound
    # setup_result at the final return.
    setup_result = True
    try:
        # Reset Bluetooth on all devices in parallel. The boolean returned by
        # reset_bluetooth is not collected from the threads.
        threads = []
        for ad in android_devices:
            thread = threading.Thread(target=reset_bluetooth, args=([ad], ))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()

        for ad in android_devices:
            droid = ad.droid
            setup_result = droid.bluetoothSetLocalName(generate_id_by_size(4))
            if not setup_result:
                log.error("Failed to set device name.")
                return setup_result
            droid.bluetoothDisableBLE()
            for bonded_device in droid.bluetoothGetBondedDevices():
                droid.bluetoothUnbond(bonded_device['address'])
        for ad in android_devices:
            setup_result = ad.droid.bluetoothConfigHciSnoopLog(True)
            if not setup_result:
                log.error("Failed to enable Bluetooth Hci Snoop Logging.")
                return setup_result
    except Exception as err:
        log.error("Something went wrong in multi device setup: {}".format(err))
        return False
    return setup_result
152
153
def bluetooth_enabled_check(ad):
    """Make sure Bluetooth is on for the given device, turning it on if needed.

    Args:
        ad: Android device object exposing droid and ed.

    Returns:
        True when Bluetooth is already on, when the "on" event arrives after
        toggling, or when the state reads back as on despite a missing event;
        False otherwise.
    """
    if ad.droid.bluetoothCheckState():
        return True

    ad.droid.bluetoothToggleState(True)
    try:
        ad.ed.pop_event(bluetooth_on, default_timeout)
    except Empty:
        log.info("Failed to toggle Bluetooth on (no broadcast received).")
        # Try one more time to poke at the actual state.
        if not ad.droid.bluetoothCheckState():
            log.error(".. actual state is OFF")
            return False
        log.info(".. actual state is ON")
    return True
170
171
def reset_bluetooth(android_devices):
    """Turn Bluetooth off and back on for each device in the list.

    Args:
        android_devices: List of android device objects exposing droid and ed.

    Returns:
        True if every device was cycled successfully; False as soon as one
        device fails to report the off event or cannot be turned back on.
    """
    for device in android_devices:
        droid = device.droid
        dispatcher = device.ed
        serial = droid.getBuildSerial()
        log.info("Reset state of bluetooth on device: {}".format(serial))
        currently_on = droid.bluetoothCheckState() is True
        if currently_on:
            droid.bluetoothToggleState(False)
            try:
                dispatcher.pop_event(bluetooth_off, default_timeout)
            except Exception:
                log.error("Failed to toggle Bluetooth off.")
                return False
        # temp sleep for b/17723234
        time.sleep(3)
        if not bluetooth_enabled_check(device):
            return False
    return True
195
196
def determine_max_advertisements(android_device):
    """Count how many concurrent BLE advertisements the device can start.

    Bluetooth is turned on first if needed. Advertisements are then started
    one after another until one fails to report success within
    default_timeout; finally every advertisement that was started is stopped.

    Args:
        android_device: Android device object exposing droid and ed.

    Returns:
        The number of advertisements that started successfully, or -1 when
        Bluetooth could not be turned on (callers treat -1 as "retry").
    """
    log.info("Determining number of maximum concurrent advertisements...")
    droid, ed = android_device.droid, android_device.ed
    advertisement_count = 0
    if not droid.bluetoothCheckState():
        droid.bluetoothToggleState(True)
        # Only wait for the "on" broadcast when a toggle was actually
        # requested. The event name is the module-level bluetooth_on
        # constant; the name used here previously was undefined, so the wait
        # always fell through to the except branch.
        try:
            ed.pop_event(bluetooth_on, default_timeout)
        except Exception:
            log.info("Failed to toggle Bluetooth on (no broadcast received).")
            # Try one more time to poke at the actual state.
            if droid.bluetoothCheckState() is True:
                log.info(".. actual state is ON")
            else:
                log.error(
                    "Failed to turn Bluetooth on. Setting default advertisements to 1")
                # NOTE: despite the log text above, -1 is returned as a
                # sentinel.
                return -1
    advertise_callback_list = []
    advertise_data = droid.bleBuildAdvertiseData()
    advertise_settings = droid.bleBuildAdvertiseSettings()
    while True:
        advertise_callback = droid.bleGenBleAdvertiseCallback()
        advertise_callback_list.append(advertise_callback)

        droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                     advertise_settings)
        try:
            ed.pop_event(
                adv_succ.format(advertise_callback), default_timeout)
        except Exception:
            log.info(
                "Advertisement failed to start. Reached max advertisements at {}".format(
                    advertisement_count))
            break
        log.info("Advertisement {} started.".format(advertisement_count + 1))
        advertisement_count += 1
    # Best-effort cleanup: suppress per callback so one failing stop does not
    # leave the remaining advertisements running.
    for adv in advertise_callback_list:
        with suppress(Exception):
            droid.bleStopBleAdvertising(adv)
    return advertisement_count
240
241
def get_advanced_droid_list(android_devices):
    """Describe each device with its droid, dispatcher and BLE capabilities.

    The maximum advertisement count is measured once per device model (with
    up to three retries while the measurement returns -1) and cached in the
    module-level advertisements_to_devices dict.

    Args:
        android_devices: List of android device objects exposing droid and ed.

    Returns:
        List of dicts with the keys 'droid', 'ed', 'max_advertisements' and
        'batch_scan_supported', one per device, in input order.
    """
    droid_list = []
    for device in android_devices:
        droid, dispatcher = device.droid, device.ed
        model = droid.getBuildModel()
        if model not in advertisements_to_devices:
            measured = determine_max_advertisements(device)
            attempts_left = 3
            #Retry to calculate max advertisements
            while measured == -1 and attempts_left > 0:
                log.info(
                    "Attempts left to determine max advertisements: {}".format(
                        attempts_left))
                measured = determine_max_advertisements(device)
                attempts_left -= 1
            advertisements_to_devices[model] = measured
        droid_list.append({
            'droid': droid,
            'ed': dispatcher,
            'max_advertisements': advertisements_to_devices[model],
            'batch_scan_supported': model not in batch_scan_not_supported_list
        })
    return droid_list
272
273
def generate_id_by_size(
        size,
        chars=(
            string.ascii_lowercase + string.ascii_uppercase + string.digits)):
    """Return a random string of the requested length.

    Args:
        size: Number of characters to generate.
        chars: Sequence of characters to draw from; defaults to ASCII letters
            and digits.

    Returns:
        String of `size` characters, each chosen with random.choice.
    """
    picked = [random.choice(chars) for _ in range(size)]
    return ''.join(picked)
279
280
def cleanup_scanners_and_advertisers(scn_android_device, scan_callback_list,
                                     adv_android_device, adv_callback_list):
    """Stop the given scans and advertisements, resetting Bluetooth on error.

    Args:
        scn_android_device: Device whose droid is running the scans.
        scan_callback_list: Scan callbacks to stop.
        adv_android_device: Device whose droid is running the advertisements.
        adv_callback_list: Advertise callbacks to stop.

    If stopping any scan (or advertisement) raises, the remaining ones on
    that device are skipped and Bluetooth is reset on that device instead.
    """
    scanner, _scanner_events = scn_android_device.droid, scn_android_device.ed
    advertiser = adv_android_device.droid

    try:
        for callback in scan_callback_list:
            scanner.bleStopBleScan(callback)
    except Exception as err:
        message = "Failed to stop LE scan... reseting Bluetooth. Error {}".format(
            err)
        log.debug(message)
        reset_bluetooth([scn_android_device])

    try:
        for callback in adv_callback_list:
            advertiser.bleStopBleAdvertising(callback)
    except Exception as err:
        message = "Failed to stop LE advertisement... reseting Bluetooth. Error {}".format(
            err)
        log.debug(message)
        reset_bluetooth([adv_android_device])
304
305
def get_mac_address_of_generic_advertisement(scan_ad, adv_ad):
    """Start an advertisement on adv_ad and read its address from scan_ad.

    The advertiser is configured to include its device name, with low-latency
    mode, connectable and high TX power. The scanner filters on the
    advertiser's local name and takes the address from the first scan result.

    Args:
        scan_ad: Android device object that performs the scan.
        adv_ad: Android device object that advertises.

    Returns:
        Tuple of (mac_address, advertise_callback). The advertisement is left
        running; the scan is stopped before returning.

    Raises:
        BtTestUtilsError: If the advertiser does not report success or the
            scanner sees no result within default_timeout.
    """
    adv_droid = adv_ad.droid
    scan_droid = scan_ad.droid

    # Advertiser side.
    adv_droid.bleSetAdvertiseDataIncludeDeviceName(True)
    adv_droid.bleSetAdvertiseSettingsAdvertiseMode(
        AdvertiseSettingsAdvertiseMode.ADVERTISE_MODE_LOW_LATENCY.value)
    adv_droid.bleSetAdvertiseSettingsIsConnectable(True)
    adv_droid.bleSetAdvertiseSettingsTxPowerLevel(
        AdvertiseSettingsAdvertiseTxPower.ADVERTISE_TX_POWER_HIGH.value)
    advertise_objects = generate_ble_advertise_objects(adv_droid)
    advertise_callback, advertise_data, advertise_settings = advertise_objects
    adv_droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                     advertise_settings)
    adv_success_event = "BleAdvertise{}onSuccess".format(advertise_callback)
    try:
        adv_ad.ed.pop_event(adv_success_event, default_timeout)
    except Empty as err:
        raise BtTestUtilsError(
            "Advertiser did not start successfully {}".format(err))

    # Scanner side.
    filter_list = scan_droid.bleGenFilterList()
    scan_settings = scan_droid.bleBuildScanSetting()
    scan_callback = scan_droid.bleGenScanCallback()
    advertiser_name = adv_droid.bluetoothGetLocalName()
    scan_droid.bleSetScanFilterDeviceName(advertiser_name)
    scan_droid.bleBuildScanFilter(filter_list)
    scan_droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
    scan_result_event = "BleScan{}onScanResults".format(scan_callback)
    try:
        event = scan_ad.ed.pop_event(scan_result_event, default_timeout)
    except Empty as err:
        raise BtTestUtilsError("Scanner did not find advertisement {}".format(
            err))
    device_info = event['data']['Result']['deviceInfo']
    mac_address = device_info['address']
    scan_droid.bleStopBleScan(scan_callback)
    return mac_address, advertise_callback
340
341
def get_device_local_info(droid):
    """Collect the local Bluetooth name and UUIDs of a device.

    Args:
        droid: Object exposing bluetoothGetLocalName and
            bluetoothGetLocalUuids.

    Returns:
        Dict with the keys 'name' and 'uuids'.
    """
    return {
        'name': droid.bluetoothGetLocalName(),
        'uuids': droid.bluetoothGetLocalUuids(),
    }
347
348
def enable_bluetooth(droid, ed):
    """Turn Bluetooth on if it is currently reported as off.

    Args:
        droid: Object exposing bluetoothCheckState and bluetoothToggleState.
        ed: Unused; kept for interface compatibility.

    Returns:
        False only if the state still reads exactly False right after the
        toggle request; True otherwise.
    """
    if droid.bluetoothCheckState() is not False:
        return True
    droid.bluetoothToggleState(True)
    return droid.bluetoothCheckState() is not False
355
356
def disable_bluetooth(droid, ed):
    """Turn Bluetooth off if it is currently reported as on.

    Args:
        droid: Object exposing bluetoothCheckState and bluetoothToggleState.
        ed: Unused; kept for interface compatibility.

    Returns:
        False only if the state still reads exactly True right after the
        toggle request; True otherwise.
    """
    if droid.bluetoothCheckState() is not True:
        return True
    droid.bluetoothToggleState(False)
    return droid.bluetoothCheckState() is not True
363
364
def set_bt_scan_mode(ad, scan_mode_value):
    """Put the device into the requested Bluetooth scan mode and verify it.

    Args:
        ad: Android device object exposing droid and ed.
        scan_mode_value: One of the BluetoothScanModeType values.

    Returns:
        True if the scan mode read back from the device equals
        scan_mode_value; False on mismatch or for an unrecognised value.
    """
    droid, ed = ad.droid, ad.ed
    if scan_mode_value == BluetoothScanModeType.STATE_OFF.value:
        disable_bluetooth(droid, ed)
        scan_mode = droid.bluetoothGetScanMode()
        # The mode is sampled while Bluetooth is off; the adapter is reset
        # before the comparison below.
        reset_bluetooth([ad])
    elif scan_mode_value == BluetoothScanModeType.SCAN_MODE_NONE.value:
        droid.bluetoothMakeUndiscoverable()
        scan_mode = droid.bluetoothGetScanMode()
    elif scan_mode_value == BluetoothScanModeType.SCAN_MODE_CONNECTABLE.value:
        droid.bluetoothMakeUndiscoverable()
        droid.bluetoothMakeConnectable()
        scan_mode = droid.bluetoothGetScanMode()
    elif (scan_mode_value ==
          BluetoothScanModeType.SCAN_MODE_CONNECTABLE_DISCOVERABLE.value):
        droid.bluetoothMakeDiscoverable()
        scan_mode = droid.bluetoothGetScanMode()
    else:
        # invalid scan mode
        return False
    return scan_mode == scan_mode_value
394
395
def set_device_name(droid, name):
    """Set the local Bluetooth name and confirm that it took effect.

    Args:
        droid: Object exposing bluetoothSetLocalName and
            bluetoothGetLocalName.
        name: Name to assign.

    Returns:
        True if the name read back two seconds later equals `name`,
        False otherwise.
    """
    droid.bluetoothSetLocalName(name)
    # Pause before reading the name back.
    time.sleep(2)
    return droid.bluetoothGetLocalName() == name
403
404
def check_device_supported_profiles(droid):
    """Report the readiness of the HID, HSP, A2DP and AVRCP profiles.

    Args:
        droid: Object exposing the bluetooth*IsReady calls.

    Returns:
        Dict with the keys 'hid', 'hsp', 'a2dp' and 'avrcp', each holding the
        result of the matching readiness call.
    """
    return {
        'hid': droid.bluetoothHidIsReady(),
        'hsp': droid.bluetoothHspIsReady(),
        'a2dp': droid.bluetoothA2dpIsReady(),
        'avrcp': droid.bluetoothAvrcpIsReady(),
    }
412
413
def log_energy_info(droids, state):
    """Build a report of the controller energy info for each droid.

    Devices whose build model is "Nexus 5" or "Nexus 4" are skipped. Any
    exception raised while querying a device is suppressed, so that device
    is simply left out of the report.

    Args:
        droids: Iterable of objects exposing getBuildModel, getBuildSerial
            and bluetoothGetControllerActivityEnergyInfo.
        state: Label placed at the start of the report header.

    Returns:
        A string made of a header line followed by one line per reported
        device.
    """
    # The previous test, `model != "Nexus 5" or model != "Nexus 4"`, was true
    # for every model, so the exclusion never took effect.
    excluded_models = ("Nexus 5", "Nexus 4")
    report = ["{} Energy info collection:\n".format(state)]
    for d in droids:
        with suppress(Exception):
            if d.getBuildModel() not in excluded_models:
                report.append("Device: {}\tEnergyStatus: {}\n".format(
                    d.getBuildSerial(),
                    d.bluetoothGetControllerActivityEnergyInfo(1)))
    return "".join(report)
426
427
def pair_pri_to_sec(pri_droid, sec_droid):
    """Bond the primary droid to the secondary droid.

    The secondary is made discoverable, the pairing helper is started on
    both sides, and the primary discovers and bonds to the secondary's
    address. The primary's bonded-device list is then polled once per second
    for up to default_timeout seconds.

    Args:
        pri_droid: Droid that initiates discovery and bonding.
        sec_droid: Droid that is made discoverable and bonded to.

    Returns:
        True once the secondary's address shows up among the primary's
        bonded devices, False if that does not happen before the deadline.
    """
    pri_address = pri_droid.bluetoothGetLocalAddress()
    sec_address = sec_droid.bluetoothGetLocalAddress()
    log.info("Bonding device {} to {}".format(pri_address, sec_address))
    # Enable discovery on sec_droid so that pri_droid can find it.
    # The timeout here is based on how much time it would take for two devices
    # to pair with each other once pri_droid starts seeing devices.
    sec_droid.bluetoothMakeDiscoverable(default_timeout)
    target_address = sec_droid.bluetoothGetLocalAddress()
    log.debug("Starting paring helper on each device")
    for droid in (pri_droid, sec_droid):
        droid.bluetoothStartPairingHelper()
    log.info("Primary device starting discovery and executing bond")
    pri_droid.bluetoothDiscoverAndBond(target_address)
    # Loop until we have bonded successfully or timeout.
    deadline = time.time() + default_timeout
    log.info("Verifying devices are bonded")
    while time.time() < deadline:
        bonded_devices = pri_droid.bluetoothGetBondedDevices()
        if any(dev['address'] == target_address for dev in bonded_devices):
            log.info("Successfully bonded to device")
            return True
        time.sleep(1)
    # Timed out trying to bond.
    log.debug("Failed to bond devices.")
    return False
456
457
def connect_pri_to_sec(log, pri_droid, sec_droid, profiles_set):
    """Connects pri droid to secondary droid.

    Validates the requested profiles, checks that the two sides are already
    bonded, asks the primary to connect, then waits for a connection state
    event for every requested profile.

    NOTE(review): BluetoothProfile, BluetoothProfileState and
    bluetooth_profile_connection_state_changed are used below but are not
    defined or imported in the visible part of this file -- confirm they are
    provided elsewhere, otherwise this function raises NameError.

    Args:
        log: Logger used for all messages here (shadows the module-level log).
        pri_droid: Droid initiating connection. It is accessed through its
            `.droid` and `.ed` attributes, so a device-like object is
            expected here rather than a bare droid.
        sec_droid: Droid accepting connection (its methods are called
            directly).
        profiles_set: Set of profiles to be connected

    Returns:
        Pass if True
        Fail if False
    """
    # Check if we support all profiles.
    supported_profiles = [i.value for i in BluetoothProfile]
    for profile in profiles_set:
        if profile not in supported_profiles:
            log.info("Profile {} is not supported list {}".format(
                profile, supported_profiles))
            return False

    # First check that devices are bonded.
    paired = False
    for paired_device in pri_droid.droid.bluetoothGetBondedDevices():
        if paired_device['address'] == \
            sec_droid.bluetoothGetLocalAddress():
            paired = True
            break

    if not paired:
        log.info("{} not paired to {}".format(pri_droid.droid.getBuildSerial(),
                                              sec_droid.getBuildSerial()))
        return False

    # Now try to connect them, the following call will try to initiate all
    # connections.
    pri_droid.droid.bluetoothConnectBonded(sec_droid.bluetoothGetLocalAddress(
    ))

    # Profiles for which a "connected" event from the secondary's address has
    # been seen so far.
    profile_connected = set()
    log.info("Profiles to be connected {}".format(profiles_set))
    while not profile_connected.issuperset(profiles_set):
        try:
            profile_event = pri_droid.ed.pop_event(
                bluetooth_profile_connection_state_changed, default_timeout)
            log.info("Got event {}".format(profile_event))
        except Exception:
            # NOTE(review): the message says "profiles left" but prints the
            # set of profiles connected so far, not the remaining ones.
            log.error("Did not get {} profiles left {}".format(
                bluetooth_profile_connection_state_changed, profile_connected))
            return False

        profile = profile_event['data']['profile']
        state = profile_event['data']['state']
        device_addr = profile_event['data']['addr']

        # Only count events that report the connected state for the
        # secondary's address.
        if state == BluetoothProfileState.STATE_CONNECTED.value and \
            device_addr == sec_droid.bluetoothGetLocalAddress():
            profile_connected.add(profile)
        log.info("Profiles connected until now {}".format(profile_connected))
    # Failure happens inside the while loop. If we came here then we already
    # connected.
    return True
519
520
def take_btsnoop_logs(android_devices, testcase, testname):
    """Pull the btsnoop log from every device in the list.

    Args:
        android_devices: Devices to collect the log from.
        testcase: Test class instance, forwarded to take_btsnoop_log.
        testname: Test name, forwarded to take_btsnoop_log.
    """
    for device in android_devices:
        take_btsnoop_log(device, testcase, testname)
524
525
def take_btsnoop_log(ad, testcase, test_name):
    """Pull the btsnoop_hci log from a device into the device's log directory.

    The file is pulled with adb from /sdcard/btsnoop_hci.log into
    <ad.log_path>/BluetoothSnoopLogs/, named after the sanitized test name,
    the device model and the serial. Any exception raised while doing so is
    suppressed.

    Params:
      ad: The android device object to pull the log from.
      testcase: Test class instance; its log is used for the info message.
      test_name: Name of the test case that triggered the collection; all
        non-alphanumeric characters are dropped.
    """
    alnum_chars = [ch for ch in test_name if ch.isalnum()]
    test_name = "".join(alnum_chars)
    with suppress(Exception):
        droid = ad.droid
        serial = droid.getBuildSerial()
        device_model = droid.getBuildModel().replace(" ", "")
        out_name = ','.join([test_name, device_model, serial])
        snoop_path = ad.log_path + "/BluetoothSnoopLogs"
        utils.create_dir(snoop_path)
        destination = snoop_path + '/' + out_name + ".btsnoop_hci.log"
        cmd = ("adb -s " + serial + " pull /sdcard/btsnoop_hci.log " +
               destination)
        testcase.log.info("Test failed, grabbing the bt_snoop logs on {} {}."
                          .format(device_model, serial))
        exe_cmd(cmd)
551
552
def kill_bluetooth_process(ad):
    """Kill the com.android.bluetooth process(es) on the device.

    Args:
        ad: Android device object exposing adb.shell and serial.

    The PIDs are read from `ps` on the device. Each PID is passed to adb as a
    separate argument without going through a host shell. Previously the raw
    multi-line `ps` output was spliced into a shell command string, so a
    second PID line was run as a host command, and an empty result ran `kill`
    with no argument.
    """
    log.info("Killing Bluetooth process.")
    ps_output = ad.adb.shell(
        "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii')
    # split() drops trailing newlines and copes with several matching lines.
    pids = ps_output.split()
    if not pids:
        log.info("No Bluetooth process found to kill.")
        return
    call(["adb", "-s", ad.serial, "shell", "kill"] + pids)
558
559
def rfcomm_connect(ad, device_address):
    """Open an RFCOMM connection from the device to device_address.

    Errors raised by the connect call are logged and the socket is closed;
    they are not propagated to the caller.

    Args:
        ad: Android device object exposing droid.
        device_address: Address of the remote device to connect to.

    The former `finally: return` discarded every in-flight exception,
    including KeyboardInterrupt and SystemExit. Those now propagate, while
    ordinary connect/close errors are still logged and swallowed.
    """
    log.debug("Performing RFCOMM connection to {}".format(device_address))
    try:
        ad.droid.bluetoothRfcommConnect(device_address)
    except Exception as err:
        log.error("Failed to connect: {}".format(err))
        # Closing is best-effort; a failure here must not escape either.
        try:
            ad.droid.bluetoothRfcommCloseSocket()
        except Exception as close_err:
            log.error("Failed to close RFCOMM socket: {}".format(close_err))
572
573
def rfcomm_accept(ad):
    """Accept an incoming RFCOMM connection on the device.

    Waits on RfcommUuid.DEFAULT_UUID with default_timeout. Errors raised by
    the accept call are logged and the socket is closed; they are not
    propagated to the caller.

    Args:
        ad: Android device object exposing droid.

    The former `finally: return` discarded every in-flight exception,
    including KeyboardInterrupt and SystemExit. Those now propagate, while
    ordinary accept/close errors are still logged and swallowed.
    """
    log.debug("Performing RFCOMM accept")
    try:
        ad.droid.bluetoothRfcommAccept(RfcommUuid.DEFAULT_UUID.value,
                                       default_timeout)
    except Exception as err:
        log.error("Failed to accept: {}".format(err))
        # Closing is best-effort; a failure here must not escape either.
        try:
            ad.droid.bluetoothRfcommCloseSocket()
        except Exception as close_err:
            log.error("Failed to close RFCOMM socket: {}".format(close_err))
587
588
def write_read_verify_data(client_ad, server_ad, msg, binary=False):
    """Send msg over RFCOMM from the client and check what the server reads.

    Args:
        client_ad: Android device object that writes the message.
        server_ad: Android device object that reads the message.
        msg: Message to send.
        binary: When True, use the binary write/read calls; trailing "\\r"
            and "\\n" characters are stripped from the binary read result.

    Returns:
        True if the message read equals msg; False on a write error, a read
        error or a mismatch.
    """
    log.info("Write message.")
    try:
        client_droid = client_ad.droid
        send = (client_droid.bluetoothRfcommWriteBinary
                if binary else client_droid.bluetoothRfcommWrite)
        send(msg)
    except Exception as err:
        log.error("Failed to write data: {}".format(err))
        return False

    log.info("Read message.")
    try:
        server_droid = server_ad.droid
        if binary:
            raw_msg = server_droid.bluetoothRfcommReadBinary()
            read_msg = raw_msg.rstrip("\r\n")
        else:
            read_msg = server_droid.bluetoothRfcommRead()
    except Exception as err:
        log.error("Failed to read data: {}".format(err))
        return False

    log.info("Verify message.")
    if msg != read_msg:
        log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg))
        return False
    return True
614