• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import logging
18import os
19import random
20import re
21import string
22import threading
23import time
24try:
25    import pandas as pd
26except ModuleNotFoundError:
27    pass
28from queue import Empty
29from subprocess import call
30from acts import asserts
31from acts_contrib.test_utils.bt.bt_constants import adv_fail
32from acts_contrib.test_utils.bt.bt_constants import adv_succ
33from acts_contrib.test_utils.bt.bt_constants import batch_scan_not_supported_list
34from acts_contrib.test_utils.bt.bt_constants import batch_scan_result
35from acts_contrib.test_utils.bt.bt_constants import bits_per_samples
36from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_modes
37from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers
38from acts_contrib.test_utils.bt.bt_constants import bluetooth_a2dp_codec_config_changed
39from acts_contrib.test_utils.bt.bt_constants import bluetooth_off
40from acts_contrib.test_utils.bt.bt_constants import bluetooth_on
41from acts_contrib.test_utils.bt.bt_constants import \
42    bluetooth_profile_connection_state_changed
43from acts_contrib.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid
44from acts_contrib.test_utils.bt.bt_constants import bt_default_timeout
45from acts_contrib.test_utils.bt.bt_constants import bt_profile_constants
46from acts_contrib.test_utils.bt.bt_constants import bt_profile_states
47from acts_contrib.test_utils.bt.bt_constants import bt_rfcomm_uuids
48from acts_contrib.test_utils.bt.bt_constants import bt_scan_mode_types
49from acts_contrib.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device
50from acts_contrib.test_utils.bt.bt_constants import btsnoop_log_path_on_device
51from acts_contrib.test_utils.bt.bt_constants import channel_modes
52from acts_contrib.test_utils.bt.bt_constants import codec_types
53from acts_contrib.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms
54from acts_contrib.test_utils.bt.bt_constants import default_rfcomm_timeout_ms
55from acts_contrib.test_utils.bt.bt_constants import hid_id_keyboard
56from acts_contrib.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation
57from acts_contrib.test_utils.bt.bt_constants import pan_connect_timeout
58from acts_contrib.test_utils.bt.bt_constants import sample_rates
59from acts_contrib.test_utils.bt.bt_constants import scan_result
60from acts_contrib.test_utils.bt.bt_constants import sig_uuid_constants
61from acts_contrib.test_utils.bt.bt_constants import small_timeout
62from acts_contrib.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb
63from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection
64from acts.utils import exe_cmd
65
66from acts import utils
67
68log = logging
69
70advertisements_to_devices = {}
71
72
73class BtTestUtilsError(Exception):
74    pass
75
76
77def _add_android_device_to_dictionary(android_device, profile_list,
78                                      selector_dict):
79    """Adds the AndroidDevice and supported features to the selector dictionary
80
81    Args:
82        android_device: The Android device.
83        profile_list: The list of profiles the Android device supports.
84    """
85    for profile in profile_list:
86        if profile in selector_dict and android_device not in selector_dict[
87                profile]:
88            selector_dict[profile].append(android_device)
89        else:
90            selector_dict[profile] = [android_device]
91
92
93def bluetooth_enabled_check(ad, timeout_sec=5):
94    """Checks if the Bluetooth state is enabled, if not it will attempt to
95    enable it.
96
97    Args:
98        ad: The Android device list to enable Bluetooth on.
99        timeout_sec: number of seconds to wait for toggle to take effect.
100
101    Returns:
102        True if successful, false if unsuccessful.
103    """
104    if not ad.droid.bluetoothCheckState():
105        ad.droid.bluetoothToggleState(True)
106        expected_bluetooth_on_event_name = bluetooth_on
107        try:
108            ad.ed.pop_event(expected_bluetooth_on_event_name,
109                            bt_default_timeout)
110        except Empty:
111            ad.log.info("Failed to toggle Bluetooth on(no broadcast received).")
112            # Try one more time to poke at the actual state.
113            if ad.droid.bluetoothCheckState():
114                ad.log.info(".. actual state is ON")
115                return True
116            ad.log.error(".. actual state is OFF")
117            return False
118    end_time = time.time() + timeout_sec
119    while not ad.droid.bluetoothCheckState() and time.time() < end_time:
120        time.sleep(1)
121    return ad.droid.bluetoothCheckState()
122
123
124def check_device_supported_profiles(droid):
125    """Checks for Android device supported profiles.
126
127    Args:
128        droid: The droid object to query.
129
130    Returns:
131        A dictionary of supported profiles.
132    """
133    profile_dict = {}
134    profile_dict['hid'] = droid.bluetoothHidIsReady()
135    profile_dict['hsp'] = droid.bluetoothHspIsReady()
136    profile_dict['a2dp'] = droid.bluetoothA2dpIsReady()
137    profile_dict['avrcp'] = droid.bluetoothAvrcpIsReady()
138    profile_dict['a2dp_sink'] = droid.bluetoothA2dpSinkIsReady()
139    profile_dict['hfp_client'] = droid.bluetoothHfpClientIsReady()
140    profile_dict['pbap_client'] = droid.bluetoothPbapClientIsReady()
141    return profile_dict
142
143
144def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list,
145                                     adv_android_device, adv_callback_list):
146    """Try to gracefully stop all scanning and advertising instances.
147
148    Args:
149        scn_android_device: The Android device that is actively scanning.
150        scn_callback_list: The scan callback id list that needs to be stopped.
151        adv_android_device: The Android device that is actively advertising.
152        adv_callback_list: The advertise callback id list that needs to be
153            stopped.
154    """
155    scan_droid, scan_ed = scn_android_device.droid, scn_android_device.ed
156    adv_droid = adv_android_device.droid
157    try:
158        for scan_callback in scn_callback_list:
159            scan_droid.bleStopBleScan(scan_callback)
160    except Exception as err:
161        scn_android_device.log.debug(
162            "Failed to stop LE scan... reseting Bluetooth. Error {}".format(
163                err))
164        reset_bluetooth([scn_android_device])
165    try:
166        for adv_callback in adv_callback_list:
167            adv_droid.bleStopBleAdvertising(adv_callback)
168    except Exception as err:
169        adv_android_device.log.debug(
170            "Failed to stop LE advertisement... reseting Bluetooth. Error {}".
171            format(err))
172        reset_bluetooth([adv_android_device])
173
174
175def clear_bonded_devices(ad):
176    """Clear bonded devices from the input Android device.
177
178    Args:
179        ad: the Android device performing the connection.
180    Returns:
181        True if clearing bonded devices was successful, false if unsuccessful.
182    """
183    bonded_device_list = ad.droid.bluetoothGetBondedDevices()
184    while bonded_device_list:
185        device_address = bonded_device_list[0]['address']
186        if not ad.droid.bluetoothUnbond(device_address):
187            log.error("Failed to unbond {} from {}".format(
188                device_address, ad.serial))
189            return False
190        log.info("Successfully unbonded {} from {}".format(
191            device_address, ad.serial))
192        #TODO: wait for BOND_STATE_CHANGED intent instead of waiting
193        time.sleep(1)
194
195        # If device was first connected using LE transport, after bonding it is
196        # accessible through it's LE address, and through it classic address.
197        # Unbonding it will unbond two devices representing different
198        # "addresses". Attempt to unbond such already unbonded devices will
199        # result in bluetoothUnbond returning false.
200        bonded_device_list = ad.droid.bluetoothGetBondedDevices()
201    return True
202
203
204def connect_phone_to_headset(android,
205                             headset,
206                             timeout=bt_default_timeout,
207                             connection_check_period=10):
208    """Connects android phone to bluetooth headset.
209    Headset object must have methods power_on and enter_pairing_mode,
210    and attribute mac_address.
211
212    Args:
213        android: AndroidDevice object with SL4A installed.
214        headset: Object with attribute mac_address and methods power_on and
215            enter_pairing_mode.
216        timeout: Seconds to wait for devices to connect.
217        connection_check_period: how often to check for connection once the
218            SL4A connect RPC has been sent.
219    Returns:
220        connected (bool): True if devices are paired and connected by end of
221        method. False otherwise.
222    """
223    headset_mac_address = headset.mac_address
224    connected = android.droid.audioIsBluetoothA2dpOn()
225    log.info('Devices connected before pair attempt: %s' % connected)
226    if not connected:
227        # Turn on headset and initiate pairing mode.
228        headset.enter_pairing_mode()
229        android.droid.bluetoothStartPairingHelper()
230    start_time = time.time()
231    # If already connected, skip pair and connect attempt.
232    while not connected and (time.time() - start_time < timeout):
233        bonded_info = android.droid.bluetoothGetBondedDevices()
234        connected_info = android.droid.bluetoothGetConnectedDevices()
235        if headset.mac_address not in [info["address"] for info in bonded_info]:
236            # Use SL4A to pair and connect with headset.
237            headset.enter_pairing_mode()
238            android.droid.bluetoothDiscoverAndBond(headset_mac_address)
239        elif headset.mac_address not in [
240                info["address"] for info in connected_info
241        ]:
242            #Device is bonded but not connected
243            android.droid.bluetoothConnectBonded(headset_mac_address)
244        else:
245            #Headset is connected, but A2DP profile is not
246            android.droid.bluetoothA2dpConnect(headset_mac_address)
247        log.info('Waiting for connection...')
248        time.sleep(connection_check_period)
249        # Check for connection.
250        connected = android.droid.audioIsBluetoothA2dpOn()
251    log.info('Devices connected after pair attempt: %s' % connected)
252    return connected
253
254
255def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2):
256    """Connects pri droid to secondary droid.
257
258    Args:
259        pri_ad: AndroidDroid initiating connection
260        sec_ad: AndroidDroid accepting connection
261        profiles_set: Set of profiles to be connected
262        attempts: Number of attempts to try until failure.
263
264    Returns:
265        Pass if True
266        Fail if False
267    """
268    device_addr = sec_ad.droid.bluetoothGetLocalAddress()
269    # Allows extra time for the SDP records to be updated.
270    time.sleep(2)
271    curr_attempts = 0
272    while curr_attempts < attempts:
273        log.info("connect_pri_to_sec curr attempt {} total {}".format(
274            curr_attempts, attempts))
275        if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
276            return True
277        curr_attempts += 1
278    log.error("connect_pri_to_sec failed to connect after {} attempts".format(
279        attempts))
280    return False
281
282
283def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
284    """Connects pri droid to secondary droid.
285
286    Args:
287        pri_ad: AndroidDroid initiating connection.
288        sec_ad: AndroidDroid accepting connection.
289        profiles_set: Set of profiles to be connected.
290
291    Returns:
292        True of connection is successful, false if unsuccessful.
293    """
294    # Check if we support all profiles.
295    supported_profiles = bt_profile_constants.values()
296    for profile in profiles_set:
297        if profile not in supported_profiles:
298            pri_ad.log.info("Profile {} is not supported list {}".format(
299                profile, supported_profiles))
300            return False
301
302    # First check that devices are bonded.
303    paired = False
304    for paired_device in pri_ad.droid.bluetoothGetBondedDevices():
305        if paired_device['address'] == \
306                sec_ad.droid.bluetoothGetLocalAddress():
307            paired = True
308            break
309
310    if not paired:
311        pri_ad.log.error("Not paired to {}".format(sec_ad.serial))
312        return False
313
314    # Now try to connect them, the following call will try to initiate all
315    # connections.
316    pri_ad.droid.bluetoothConnectBonded(sec_ad.droid.bluetoothGetLocalAddress())
317
318    end_time = time.time() + 10
319    profile_connected = set()
320    sec_addr = sec_ad.droid.bluetoothGetLocalAddress()
321    pri_ad.log.info("Profiles to be connected {}".format(profiles_set))
322    # First use APIs to check profile connection state
323    while (time.time() < end_time and
324           not profile_connected.issuperset(profiles_set)):
325        if (bt_profile_constants['headset_client'] not in profile_connected and
326                bt_profile_constants['headset_client'] in profiles_set):
327            if is_hfp_client_device_connected(pri_ad, sec_addr):
328                profile_connected.add(bt_profile_constants['headset_client'])
329        if (bt_profile_constants['a2dp'] not in profile_connected and
330                bt_profile_constants['a2dp'] in profiles_set):
331            if is_a2dp_src_device_connected(pri_ad, sec_addr):
332                profile_connected.add(bt_profile_constants['a2dp'])
333        if (bt_profile_constants['a2dp_sink'] not in profile_connected and
334                bt_profile_constants['a2dp_sink'] in profiles_set):
335            if is_a2dp_snk_device_connected(pri_ad, sec_addr):
336                profile_connected.add(bt_profile_constants['a2dp_sink'])
337        if (bt_profile_constants['map_mce'] not in profile_connected and
338                bt_profile_constants['map_mce'] in profiles_set):
339            if is_map_mce_device_connected(pri_ad, sec_addr):
340                profile_connected.add(bt_profile_constants['map_mce'])
341        if (bt_profile_constants['map'] not in profile_connected and
342                bt_profile_constants['map'] in profiles_set):
343            if is_map_mse_device_connected(pri_ad, sec_addr):
344                profile_connected.add(bt_profile_constants['map'])
345        time.sleep(0.1)
346    # If APIs fail, try to find the connection broadcast receiver.
347    while not profile_connected.issuperset(profiles_set):
348        try:
349            profile_event = pri_ad.ed.pop_event(
350                bluetooth_profile_connection_state_changed,
351                bt_default_timeout + 10)
352            pri_ad.log.info("Got event {}".format(profile_event))
353        except Exception:
354            pri_ad.log.error("Did not get {} profiles left {}".format(
355                bluetooth_profile_connection_state_changed, profile_connected))
356            return False
357
358        profile = profile_event['data']['profile']
359        state = profile_event['data']['state']
360        device_addr = profile_event['data']['addr']
361        if state == bt_profile_states['connected'] and \
362                device_addr == sec_ad.droid.bluetoothGetLocalAddress():
363            profile_connected.add(profile)
364        pri_ad.log.info(
365            "Profiles connected until now {}".format(profile_connected))
366    # Failure happens inside the while loop. If we came here then we already
367    # connected.
368    return True
369
370
371def determine_max_advertisements(android_device):
372    """Determines programatically how many advertisements the Android device
373    supports.
374
375    Args:
376        android_device: The Android device to determine max advertisements of.
377
378    Returns:
379        The maximum advertisement count.
380    """
381    android_device.log.info(
382        "Determining number of maximum concurrent advertisements...")
383    advertisement_count = 0
384    bt_enabled = False
385    expected_bluetooth_on_event_name = bluetooth_on
386    if not android_device.droid.bluetoothCheckState():
387        android_device.droid.bluetoothToggleState(True)
388    try:
389        android_device.ed.pop_event(expected_bluetooth_on_event_name,
390                                    bt_default_timeout)
391    except Exception:
392        android_device.log.info(
393            "Failed to toggle Bluetooth on(no broadcast received).")
394        # Try one more time to poke at the actual state.
395        if android_device.droid.bluetoothCheckState() is True:
396            android_device.log.info(".. actual state is ON")
397        else:
398            android_device.log.error(
399                "Failed to turn Bluetooth on. Setting default advertisements to 1"
400            )
401            advertisement_count = -1
402            return advertisement_count
403    advertise_callback_list = []
404    advertise_data = android_device.droid.bleBuildAdvertiseData()
405    advertise_settings = android_device.droid.bleBuildAdvertiseSettings()
406    while (True):
407        advertise_callback = android_device.droid.bleGenBleAdvertiseCallback()
408        advertise_callback_list.append(advertise_callback)
409
410        android_device.droid.bleStartBleAdvertising(advertise_callback,
411                                                    advertise_data,
412                                                    advertise_settings)
413
414        regex = "(" + adv_succ.format(
415            advertise_callback) + "|" + adv_fail.format(
416                advertise_callback) + ")"
417        # wait for either success or failure event
418        evt = android_device.ed.pop_events(regex, bt_default_timeout,
419                                           small_timeout)
420        if evt[0]["name"] == adv_succ.format(advertise_callback):
421            advertisement_count += 1
422            android_device.log.info(
423                "Advertisement {} started.".format(advertisement_count))
424        else:
425            error = evt[0]["data"]["Error"]
426            if error == "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS":
427                android_device.log.info(
428                    "Advertisement failed to start. Reached max " +
429                    "advertisements at {}".format(advertisement_count))
430                break
431            else:
432                raise BtTestUtilsError(
433                    "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," +
434                    " but received bad error code {}".format(error))
435    try:
436        for adv in advertise_callback_list:
437            android_device.droid.bleStopBleAdvertising(adv)
438    except Exception:
439        android_device.log.error(
440            "Failed to stop advertisingment, resetting Bluetooth.")
441        reset_bluetooth([android_device])
442    return advertisement_count
443
444
445def disable_bluetooth(droid):
446    """Disable Bluetooth on input Droid object.
447
448    Args:
449        droid: The droid object to disable Bluetooth on.
450
451    Returns:
452        True if successful, false if unsuccessful.
453    """
454    if droid.bluetoothCheckState() is True:
455        droid.bluetoothToggleState(False)
456        if droid.bluetoothCheckState() is True:
457            log.error("Failed to toggle Bluetooth off.")
458            return False
459    return True
460
461
462def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list):
463    """
464    Disconnect primary from secondary on a specific set of profiles
465    Args:
466        pri_ad - Primary android_device initiating disconnection
467        sec_ad - Secondary android droid (sl4a interface to keep the
468          method signature the same connect_pri_to_sec above)
469        profiles_list - List of profiles we want to disconnect from
470
471    Returns:
472        True on Success
473        False on Failure
474    """
475    # Sanity check to see if all the profiles in the given set is supported
476    supported_profiles = bt_profile_constants.values()
477    for profile in profiles_list:
478        if profile not in supported_profiles:
479            pri_ad.log.info("Profile {} is not in supported list {}".format(
480                profile, supported_profiles))
481            return False
482
483    pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices())
484    # Disconnecting on a already disconnected profile is a nop,
485    # so not checking for the connection state
486    try:
487        pri_ad.droid.bluetoothDisconnectConnectedProfile(
488            sec_ad.droid.bluetoothGetLocalAddress(), profiles_list)
489    except Exception as err:
490        pri_ad.log.error(
491            "Exception while trying to disconnect profile(s) {}: {}".format(
492                profiles_list, err))
493        return False
494
495    profile_disconnected = set()
496    pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list))
497
498    while not profile_disconnected.issuperset(profiles_list):
499        try:
500            profile_event = pri_ad.ed.pop_event(
501                bluetooth_profile_connection_state_changed, bt_default_timeout)
502            pri_ad.log.info("Got event {}".format(profile_event))
503        except Exception as e:
504            pri_ad.log.error(
505                "Did not disconnect from Profiles. Reason {}".format(e))
506            return False
507
508        profile = profile_event['data']['profile']
509        state = profile_event['data']['state']
510        device_addr = profile_event['data']['addr']
511
512        if state == bt_profile_states['disconnected'] and \
513                device_addr == sec_ad.droid.bluetoothGetLocalAddress():
514            profile_disconnected.add(profile)
515        pri_ad.log.info(
516            "Profiles disconnected so far {}".format(profile_disconnected))
517
518    return True
519
520
521def enable_bluetooth(droid, ed):
522    if droid.bluetoothCheckState() is True:
523        return True
524
525    droid.bluetoothToggleState(True)
526    expected_bluetooth_on_event_name = bluetooth_on
527    try:
528        ed.pop_event(expected_bluetooth_on_event_name, bt_default_timeout)
529    except Exception:
530        log.info("Failed to toggle Bluetooth on (no broadcast received)")
531        if droid.bluetoothCheckState() is True:
532            log.info(".. actual state is ON")
533            return True
534        log.info(".. actual state is OFF")
535        return False
536
537    return True
538
539
540def factory_reset_bluetooth(android_devices):
541    """Clears Bluetooth stack of input Android device list.
542
543        Args:
544            android_devices: The Android device list to reset Bluetooth
545
546        Returns:
547            True if successful, false if unsuccessful.
548        """
549    for a in android_devices:
550        droid, ed = a.droid, a.ed
551        a.log.info("Reset state of bluetooth on device.")
552        if not bluetooth_enabled_check(a):
553            return False
554        # TODO: remove device unbond b/79418045
555        # Temporary solution to ensure all devices are unbonded
556        bonded_devices = droid.bluetoothGetBondedDevices()
557        for b in bonded_devices:
558            a.log.info("Removing bond for device {}".format(b['address']))
559            droid.bluetoothUnbond(b['address'])
560
561        droid.bluetoothFactoryReset()
562        wait_for_bluetooth_manager_state(droid)
563        if not enable_bluetooth(droid, ed):
564            return False
565    return True
566
567
568def generate_ble_advertise_objects(droid):
569    """Generate generic LE advertise objects.
570
571    Args:
572        droid: The droid object to generate advertise LE objects from.
573
574    Returns:
575        advertise_callback: The generated advertise callback id.
576        advertise_data: The generated advertise data id.
577        advertise_settings: The generated advertise settings id.
578    """
579    advertise_callback = droid.bleGenBleAdvertiseCallback()
580    advertise_data = droid.bleBuildAdvertiseData()
581    advertise_settings = droid.bleBuildAdvertiseSettings()
582    return advertise_callback, advertise_data, advertise_settings
583
584
585def generate_ble_scan_objects(droid):
586    """Generate generic LE scan objects.
587
588    Args:
589        droid: The droid object to generate LE scan objects from.
590
591    Returns:
592        filter_list: The generated scan filter list id.
593        scan_settings: The generated scan settings id.
594        scan_callback: The generated scan callback id.
595    """
596    filter_list = droid.bleGenFilterList()
597    scan_settings = droid.bleBuildScanSetting()
598    scan_callback = droid.bleGenScanCallback()
599    return filter_list, scan_settings, scan_callback
600
601
602def generate_id_by_size(size,
603                        chars=(string.ascii_lowercase + string.ascii_uppercase +
604                               string.digits)):
605    """Generate random ascii characters of input size and input char types
606
607    Args:
608        size: Input size of string.
609        chars: (Optional) Chars to use in generating a random string.
610
611    Returns:
612        String of random input chars at the input size.
613    """
614    return ''.join(random.choice(chars) for _ in range(size))
615
616
617def get_advanced_droid_list(android_devices):
618    """Add max_advertisement and batch_scan_supported attributes to input
619    Android devices
620
621    This will programatically determine maximum LE advertisements of each
622    input Android device.
623
624    Args:
625        android_devices: The Android devices to setup.
626
627    Returns:
628        List of Android devices with new attribtues.
629    """
630    droid_list = []
631    for a in android_devices:
632        d, e = a.droid, a.ed
633        model = d.getBuildModel()
634        max_advertisements = 1
635        batch_scan_supported = True
636        if model in advertisements_to_devices.keys():
637            max_advertisements = advertisements_to_devices[model]
638        else:
639            max_advertisements = determine_max_advertisements(a)
640            max_tries = 3
641            # Retry to calculate max advertisements
642            while max_advertisements == -1 and max_tries > 0:
643                a.log.info(
644                    "Attempts left to determine max advertisements: {}".format(
645                        max_tries))
646                max_advertisements = determine_max_advertisements(a)
647                max_tries -= 1
648            advertisements_to_devices[model] = max_advertisements
649        if model in batch_scan_not_supported_list:
650            batch_scan_supported = False
651        role = {
652            'droid': d,
653            'ed': e,
654            'max_advertisements': max_advertisements,
655            'batch_scan_supported': batch_scan_supported
656        }
657        droid_list.append(role)
658    return droid_list
659
660
661def get_bluetooth_crash_count(android_device):
662    out = android_device.adb.shell("dumpsys bluetooth_manager")
663    return int(re.search("crashed(.*\d)", out).group(1))
664
665
666def read_otp(ad):
667    """Reads and parses the OTP output to return TX power backoff
668
669    Reads the OTP registers from the phone, parses them to return a
670    dict of TX power backoffs for different power levels
671
672    Args:
673        ad : android device object
674
675    Returns :
676        otp_dict : power backoff dict
677    """
678
679    ad.adb.shell('svc bluetooth disable')
680    time.sleep(2)
681    otp_output = ad.adb.shell('bluetooth_sar_test -r')
682    ad.adb.shell('svc bluetooth enable')
683    time.sleep(2)
684    otp_dict = {
685        "BR": {
686            "10": 0,
687            "9": 0,
688            "8": 0
689        },
690        "EDR": {
691            "10": 0,
692            "9": 0,
693            "8": 0
694        },
695        "BLE": {
696            "10": 0,
697            "9": 0,
698            "8": 0
699        }
700    }
701
702    otp_regex = '\s+\[\s+PL10:\s+(\d+)\s+PL9:\s+(\d+)*\s+PL8:\s+(\d+)\s+\]'
703
704    for key in otp_dict:
705        bank_list = re.findall("{}{}".format(key, otp_regex), otp_output)
706        for bank_tuple in bank_list:
707            if ('0', '0', '0') != bank_tuple:
708                [otp_dict[key]["10"], otp_dict[key]["9"],
709                 otp_dict[key]["8"]] = bank_tuple
710    return otp_dict
711
712
713def get_bt_metric(ad_list,
714                  duration=1,
715                  bqr_tag='Monitoring , Handle:',
716                  tag='',
717                  log_path=False):
718    """ Function to get the bt metric from logcat.
719
720    Captures logcat for the specified duration and returns the bqr results.
721    Takes list of android objects as input. If a single android object is given,
722    converts it into a list.
723
724    Args:
725        ad_list: list of android_device objects
726        duration: time duration (seconds) for which the logcat is parsed
727        bqr_tag: tag of bt metrics
728        tag: tag to be appended to the metrics raw data
729        log_path: path of metrics raw data
730
731    Returns:
732        process_data: dict of process raw data for each android devices
733    """
734
735    # Defining bqr quantites and their regex to extract
736    regex_dict = {
737        "pwlv": "PwLv:\s(\S+)",
738        "rssi": "RSSI:\s[-](\d+)",
739        "rssi_c0": "RSSI_C0:\s[-](\d+)",
740        "rssi_c1": "RSSI_C1:\s[-](\d+)",
741        "txpw_c0": "\sTxPw_C0:\s(-?\d+)",
742        "txpw_c1": "\sTxPw_C1:\s(-?\d+)",
743        "bftx": "BFTx:\s(\w+)",
744        "divtx": "DivTx:\s(\w+)"
745    }
746    metrics_dict = {
747        "rssi": {},
748        "pwlv": {},
749        "rssi_c0": {},
750        "rssi_c1": {},
751        "txpw_c0": {},
752        "txpw_c1": {},
753        "bftx": {},
754        "divtx": {}
755    }
756
757    # Converting a single android device object to list
758    if not isinstance(ad_list, list):
759        ad_list = [ad_list]
760
761    #Time sync with the test machine
762    for ad in ad_list:
763        ad.droid.setTime(int(round(time.time() * 1000)))
764        time.sleep(0.5)
765
766    begin_time = utils.get_current_epoch_time()
767    time.sleep(duration)
768    end_time = utils.get_current_epoch_time()
769
770    for ad in ad_list:
771        bt_rssi_log = ad.cat_adb_log(tag + "_bt_metric", begin_time, end_time)
772
773        # Extracting supporting bqr quantities
774        for metric, regex in regex_dict.items():
775            bqr_metric = []
776            file_bt_log = open(bt_rssi_log, "r")
777            for line in file_bt_log:
778                if bqr_tag in line:
779                    if re.findall(regex, line):
780                        m = re.findall(regex, line)[0].strip(",")
781                        bqr_metric.append(m)
782            metrics_dict[metric][ad.serial] = bqr_metric
783            file_bt_log.close()
784
785        # Formatting and saving the raw data
786        metrics_to_be_formatted = [{
787            "name": "rssi",
788            "averagble": "y"
789        }, {
790            "name": "rssi_c0",
791            "averagble": "y"
792        }, {
793            "name": "rssi_c1",
794            "averagble": "y"
795        }, {
796            "name": "pwlv",
797            "averagble": "n"
798        }, {
799            "name": "txpw_c0",
800            "averagble": "n"
801        }, {
802            "name": "txpw_c1",
803            "averagble": "n"
804        }, {
805            "name": "bftx",
806            "averagble": "n"
807        }, {
808            "name": "divtx",
809            "averagble": "n"
810        }]
811        for metric in metrics_to_be_formatted:
812            if metric["averagble"] == "y":
813                metrics_dict[metric["name"]][ad.serial] = [
814                    (-1) * int(x)
815                    for x in metrics_dict[metric["name"]][ad.serial]
816                ]
817            else:
818                metrics_dict[metric["name"]][ad.serial] = [
819                    int(x, 16) if '0x' in x else int(x, 10)
820                    for x in metrics_dict[metric["name"]][ad.serial]
821                ]
822        # Saving metrics raw data for each attenuation
823        if log_path:
824            output_file_name = ad.serial + "_metrics_raw_data_" + tag + ".csv"
825            output_file = os.path.join(log_path, output_file_name)
826            os.makedirs(log_path, exist_ok=True)
827            df_save_metrics = {}
828            for item in metrics_dict.items():
829                df_save_metrics[item[0]] = next(iter(item[1].items()))[1]
830            MetricsDict_df = pd.DataFrame({key:pd.Series(value) for key, value in df_save_metrics.items()})
831            MetricsDict_df.to_csv(output_file)
832        # Defining the process_data_dict
833        process_data = {
834            "rssi": {},
835            "pwlv": {},
836            "rssi_c0": {},
837            "rssi_c1": {},
838            "txpw_c0": {},
839            "txpw_c1": {},
840            "bftx": {},
841            "divtx": {}
842        }
843
844        # Computing and returning the raw data
845        for metric in metrics_to_be_formatted:
846            if metric["averagble"] == "y":
847                process_data[metric["name"]][ad.serial] = [
848                    x for x in metrics_dict[metric["name"]][ad.serial]
849                    if x != 0 and x != -127
850                ]
851
852                try:
853                    #DOING AVERAGE
854                    process_data[metric["name"]][ad.serial] = round(
855                        sum(metrics_dict[metric["name"]][ad.serial]) /
856                        len(metrics_dict[metric["name"]][ad.serial]), 2)
857                except ZeroDivisionError:
858                    #SETTING VALUE TO 'n/a'
859                    process_data[metric["name"]][ad.serial] = "n/a"
860            else:
861                try:
862                    #GETTING MOST_COMMON_VALUE
863                    process_data[metric["name"]][ad.serial] = max(
864                        metrics_dict[metric["name"]][ad.serial],
865                        key=metrics_dict[metric["name"]][ad.serial].count)
866                except ValueError:
867                    #SETTING VALUE TO 'n/a'
868                    process_data[metric["name"]][ad.serial] = "n/a"
869
870    return process_data
871
872
873def get_bt_rssi(ad, duration=1, processed=True, tag='', log_path=False):
874    """Function to get average bt rssi from logcat.
875
876    This function returns the average RSSI for the given duration. RSSI values are
877    extracted from BQR.
878
879    Args:
880        ad: (list of) android_device object.
881        duration: time duration(seconds) for which logcat is parsed.
882
883    Returns:
884        avg_rssi: average RSSI on each android device for the given duration.
885    """
886    bqr_results = get_bt_metric(ad, duration, tag=tag, log_path=log_path)
887    return bqr_results["rssi"]
888
889
890def enable_bqr(
891    ad_list,
892    bqr_interval=10,
893    bqr_event_mask=15,
894):
895    """Sets up BQR reporting.
896
897       Sets up BQR to report BT metrics at the requested frequency and toggles
898       airplane mode for the bqr settings to take effect.
899
900    Args:
901        ad_list: an android_device or list of android devices.
902    """
903    # Converting a single android device object to list
904    if not isinstance(ad_list, list):
905        ad_list = [ad_list]
906
907    for ad in ad_list:
908        #Setting BQR parameters
909        ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format(
910            bqr_event_mask))
911        ad.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms {}".format(
912            bqr_interval))
913
914        ## Toggle airplane mode
915        ad.droid.connectivityToggleAirplaneMode(True)
916        ad.droid.connectivityToggleAirplaneMode(False)
917
918
919def disable_bqr(ad_list):
920    """Disables BQR reporting.
921
922    Args:
923        ad_list: an android_device or list of android devices.
924    """
925    # Converting a single android device object to list
926    if not isinstance(ad_list, list):
927        ad_list = [ad_list]
928
929    DISABLE_BQR_MASK = 0
930
931    for ad in ad_list:
932        #Disabling BQR
933        ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format(
934            DISABLE_BQR_MASK))
935
936        ## Toggle airplane mode
937        ad.droid.connectivityToggleAirplaneMode(True)
938        ad.droid.connectivityToggleAirplaneMode(False)
939
940
941def get_device_selector_dictionary(android_device_list):
942    """Create a dictionary of Bluetooth features vs Android devices.
943
944    Args:
945        android_device_list: The list of Android devices.
946    Returns:
947        A dictionary of profiles/features to Android devices.
948    """
949    selector_dict = {}
950    for ad in android_device_list:
951        uuids = ad.droid.bluetoothGetLocalUuids()
952
953        for profile, uuid_const in sig_uuid_constants.items():
954            uuid_check = sig_uuid_constants['BASE_UUID'].format(
955                uuid_const).lower()
956            if uuids and uuid_check in uuids:
957                if profile in selector_dict:
958                    selector_dict[profile].append(ad)
959                else:
960                    selector_dict[profile] = [ad]
961
962        # Various services may not be active during BT startup.
963        # If the device can be identified through adb shell pm list features
964        # then try to add them to the appropriate profiles / features.
965
966        # Android TV.
967        if "feature:android.hardware.type.television" in ad.features:
968            ad.log.info("Android TV device found.")
969            supported_profiles = ['AudioSink']
970            _add_android_device_to_dictionary(ad, supported_profiles,
971                                              selector_dict)
972
973        # Android Auto
974        elif "feature:android.hardware.type.automotive" in ad.features:
975            ad.log.info("Android Auto device found.")
976            # Add: AudioSink , A/V_RemoteControl,
977            supported_profiles = [
978                'AudioSink', 'A/V_RemoteControl', 'Message Notification Server'
979            ]
980            _add_android_device_to_dictionary(ad, supported_profiles,
981                                              selector_dict)
982        # Android Wear
983        elif "feature:android.hardware.type.watch" in ad.features:
984            ad.log.info("Android Wear device found.")
985            supported_profiles = []
986            _add_android_device_to_dictionary(ad, supported_profiles,
987                                              selector_dict)
988        # Android Phone
989        elif "feature:android.hardware.telephony" in ad.features:
990            ad.log.info("Android Phone device found.")
991            # Add: AudioSink
992            supported_profiles = [
993                'AudioSource', 'A/V_RemoteControlTarget',
994                'Message Access Server'
995            ]
996            _add_android_device_to_dictionary(ad, supported_profiles,
997                                              selector_dict)
998    return selector_dict
999
1000
1001def get_mac_address_of_generic_advertisement(scan_ad, adv_ad):
1002    """Start generic advertisement and get it's mac address by LE scanning.
1003
1004    Args:
1005        scan_ad: The Android device to use as the scanner.
1006        adv_ad: The Android device to use as the advertiser.
1007
1008    Returns:
1009        mac_address: The mac address of the advertisement.
1010        advertise_callback: The advertise callback id of the active
1011            advertisement.
1012    """
1013    adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
1014    adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
1015        ble_advertise_settings_modes['low_latency'])
1016    adv_ad.droid.bleSetAdvertiseSettingsIsConnectable(True)
1017    adv_ad.droid.bleSetAdvertiseSettingsTxPowerLevel(
1018        ble_advertise_settings_tx_powers['high'])
1019    advertise_callback, advertise_data, advertise_settings = (
1020        generate_ble_advertise_objects(adv_ad.droid))
1021    adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
1022                                        advertise_settings)
1023    try:
1024        adv_ad.ed.pop_event(adv_succ.format(advertise_callback),
1025                            bt_default_timeout)
1026    except Empty as err:
1027        raise BtTestUtilsError(
1028            "Advertiser did not start successfully {}".format(err))
1029    filter_list = scan_ad.droid.bleGenFilterList()
1030    scan_settings = scan_ad.droid.bleBuildScanSetting()
1031    scan_callback = scan_ad.droid.bleGenScanCallback()
1032    scan_ad.droid.bleSetScanFilterDeviceName(
1033        adv_ad.droid.bluetoothGetLocalName())
1034    scan_ad.droid.bleBuildScanFilter(filter_list)
1035    scan_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
1036    try:
1037        event = scan_ad.ed.pop_event(
1038            "BleScan{}onScanResults".format(scan_callback), bt_default_timeout)
1039    except Empty as err:
1040        raise BtTestUtilsError(
1041            "Scanner did not find advertisement {}".format(err))
1042    mac_address = event['data']['Result']['deviceInfo']['address']
1043    return mac_address, advertise_callback, scan_callback
1044
1045
1046def hid_device_send_key_data_report(host_id, device_ad, key, interval=1):
1047    """Send a HID report simulating a 1-second keyboard press from host_ad to
1048    device_ad
1049
1050    Args:
1051        host_id: the Bluetooth MAC address or name of the HID host
1052        device_ad: HID device
1053        key: the key we want to send
1054        interval: the interval between key press and key release
1055    """
1056    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
1057                                                 hid_keyboard_report(key))
1058    time.sleep(interval)
1059    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
1060                                                 hid_keyboard_report("00"))
1061
1062
1063def hid_keyboard_report(key, modifier="00"):
1064    """Get the HID keyboard report for the given key
1065
1066    Args:
1067        key: the key we want
1068        modifier: HID keyboard modifier bytes
1069    Returns:
1070        The byte array for the HID report.
1071    """
1072    return str(
1073        bytearray.fromhex(" ".join(
1074            [modifier, "00", key, "00", "00", "00", "00", "00"])), "utf-8")
1075
1076
1077def is_a2dp_connected(sink, source):
1078    """
1079    Convenience Function to see if the 2 devices are connected on
1080    A2dp.
1081    Args:
1082        sink:       Audio Sink
1083        source:     Audio Source
1084    Returns:
1085        True if Connected
1086        False if Not connected
1087    """
1088
1089    devices = sink.droid.bluetoothA2dpSinkGetConnectedDevices()
1090    for device in devices:
1091        sink.log.info("A2dp Connected device {}".format(device["name"]))
1092        if (device["address"] == source.droid.bluetoothGetLocalAddress()):
1093            return True
1094    return False
1095
1096
1097def is_a2dp_snk_device_connected(ad, addr):
1098    """Determines if an AndroidDevice has A2DP snk connectivity to input address
1099
1100    Args:
1101        ad: the Android device
1102        addr: the address that's expected
1103    Returns:
1104        True if connection was successful, false if unsuccessful.
1105    """
1106    devices = ad.droid.bluetoothA2dpSinkGetConnectedDevices()
1107    ad.log.info("Connected A2DP Sink devices: {}".format(devices))
1108    if addr in {d['address'] for d in devices}:
1109        return True
1110    return False
1111
1112
1113def is_a2dp_src_device_connected(ad, addr):
1114    """Determines if an AndroidDevice has A2DP connectivity to input address
1115
1116    Args:
1117        ad: the Android device
1118        addr: the address that's expected
1119    Returns:
1120        True if connection was successful, false if unsuccessful.
1121    """
1122    devices = ad.droid.bluetoothA2dpGetConnectedDevices()
1123    ad.log.info("Connected A2DP Source devices: {}".format(devices))
1124    if addr in {d['address'] for d in devices}:
1125        return True
1126    return False
1127
1128
1129def is_hfp_client_device_connected(ad, addr):
1130    """Determines if an AndroidDevice has HFP connectivity to input address
1131
1132    Args:
1133        ad: the Android device
1134        addr: the address that's expected
1135    Returns:
1136        True if connection was successful, false if unsuccessful.
1137    """
1138    devices = ad.droid.bluetoothHfpClientGetConnectedDevices()
1139    ad.log.info("Connected HFP Client devices: {}".format(devices))
1140    if addr in {d['address'] for d in devices}:
1141        return True
1142    return False
1143
1144
1145def is_map_mce_device_connected(ad, addr):
1146    """Determines if an AndroidDevice has MAP MCE connectivity to input address
1147
1148    Args:
1149        ad: the Android device
1150        addr: the address that's expected
1151    Returns:
1152        True if connection was successful, false if unsuccessful.
1153    """
1154    devices = ad.droid.bluetoothMapClientGetConnectedDevices()
1155    ad.log.info("Connected MAP MCE devices: {}".format(devices))
1156    if addr in {d['address'] for d in devices}:
1157        return True
1158    return False
1159
1160
1161def is_map_mse_device_connected(ad, addr):
1162    """Determines if an AndroidDevice has MAP MSE connectivity to input address
1163
1164    Args:
1165        ad: the Android device
1166        addr: the address that's expected
1167    Returns:
1168        True if connection was successful, false if unsuccessful.
1169    """
1170    devices = ad.droid.bluetoothMapGetConnectedDevices()
1171    ad.log.info("Connected MAP MSE devices: {}".format(devices))
1172    if addr in {d['address'] for d in devices}:
1173        return True
1174    return False
1175
1176
1177def kill_bluetooth_process(ad):
1178    """Kill Bluetooth process on Android device.
1179
1180    Args:
1181        ad: Android device to kill BT process on.
1182    """
1183    ad.log.info("Killing Bluetooth process.")
1184    pid = ad.adb.shell(
1185        "ps | grep com.android.bluetooth | awk '{print $2}'").decode('ascii')
1186    call(["adb -s " + ad.serial + " shell kill " + pid], shell=True)
1187
1188
1189def log_energy_info(android_devices, state):
1190    """Logs energy info of input Android devices.
1191
1192    Args:
1193        android_devices: input Android device list to log energy info from.
1194        state: the input state to log. Usually 'Start' or 'Stop' for logging.
1195
1196    Returns:
1197        A logging string of the Bluetooth energy info reported.
1198    """
1199    return_string = "{} Energy info collection:\n".format(state)
1200    # Bug: b/31966929
1201    return return_string
1202
1203
1204def orchestrate_and_verify_pan_connection(pan_dut, panu_dut):
1205    """Setups up a PAN conenction between two android devices.
1206
1207    Args:
1208        pan_dut: the Android device providing tethering services
1209        panu_dut: the Android device using the internet connection from the
1210            pan_dut
1211    Returns:
1212        True if PAN connection and verification is successful,
1213        false if unsuccessful.
1214    """
1215    if not toggle_airplane_mode_by_adb(log, panu_dut, True):
1216        panu_dut.log.error("Failed to toggle airplane mode on")
1217        return False
1218    if not toggle_airplane_mode_by_adb(log, panu_dut, False):
1219        pan_dut.log.error("Failed to toggle airplane mode off")
1220        return False
1221    pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
1222    panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
1223    if not bluetooth_enabled_check(panu_dut):
1224        return False
1225    if not bluetooth_enabled_check(pan_dut):
1226        return False
1227    pan_dut.droid.bluetoothPanSetBluetoothTethering(True)
1228    if not (pair_pri_to_sec(pan_dut, panu_dut)):
1229        return False
1230    if not pan_dut.droid.bluetoothPanIsTetheringOn():
1231        pan_dut.log.error("Failed to enable Bluetooth tethering.")
1232        return False
1233    # Magic sleep needed to give the stack time in between bonding and
1234    # connecting the PAN profile.
1235    time.sleep(pan_connect_timeout)
1236    panu_dut.droid.bluetoothConnectBonded(
1237        pan_dut.droid.bluetoothGetLocalAddress())
1238    if not verify_http_connection(log, panu_dut):
1239        panu_dut.log.error("Can't verify http connection on PANU device.")
1240        if not verify_http_connection(log, pan_dut):
1241            pan_dut.log.info(
1242                "Can't verify http connection on PAN service device")
1243        return False
1244    return True
1245
1246
1247def orchestrate_bluetooth_socket_connection(
1248        client_ad,
1249        server_ad,
1250        accept_timeout_ms=default_bluetooth_socket_timeout_ms,
1251        uuid=None):
1252    """Sets up the Bluetooth Socket connection between two Android devices.
1253
1254    Args:
1255        client_ad: the Android device performing the connection.
1256        server_ad: the Android device accepting the connection.
1257    Returns:
1258        True if connection was successful, false if unsuccessful.
1259    """
1260    server_ad.droid.bluetoothStartPairingHelper()
1261    client_ad.droid.bluetoothStartPairingHelper()
1262
1263    server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid(
1264        (bluetooth_socket_conn_test_uuid if uuid is None else uuid),
1265        accept_timeout_ms)
1266    client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid(
1267        server_ad.droid.bluetoothGetLocalAddress(),
1268        (bluetooth_socket_conn_test_uuid if uuid is None else uuid))
1269
1270    end_time = time.time() + bt_default_timeout
1271    result = False
1272    test_result = True
1273    while time.time() < end_time:
1274        if len(client_ad.droid.bluetoothSocketConnActiveConnections()) > 0:
1275            test_result = True
1276            client_ad.log.info("Bluetooth socket Client Connection Active")
1277            break
1278        else:
1279            test_result = False
1280        time.sleep(1)
1281    if not test_result:
1282        client_ad.log.error("Failed to establish a Bluetooth socket connection")
1283        return False
1284    return True
1285
1286
1287def orchestrate_rfcomm_connection(client_ad,
1288                                  server_ad,
1289                                  accept_timeout_ms=default_rfcomm_timeout_ms,
1290                                  uuid=None):
1291    """Sets up the RFCOMM connection between two Android devices.
1292
1293    Args:
1294        client_ad: the Android device performing the connection.
1295        server_ad: the Android device accepting the connection.
1296    Returns:
1297        True if connection was successful, false if unsuccessful.
1298    """
1299    result = orchestrate_bluetooth_socket_connection(
1300        client_ad, server_ad, accept_timeout_ms,
1301        (bt_rfcomm_uuids['default_uuid'] if uuid is None else uuid))
1302
1303    return result
1304
1305
1306def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True):
1307    """Pairs pri droid to secondary droid.
1308
1309    Args:
1310        pri_ad: Android device initiating connection
1311        sec_ad: Android device accepting connection
1312        attempts: Number of attempts to try until failure.
1313        auto_confirm: Auto confirm passkey match for both devices
1314
1315    Returns:
1316        Pass if True
1317        Fail if False
1318    """
1319    pri_ad.droid.bluetoothStartConnectionStateChangeMonitor(
1320        sec_ad.droid.bluetoothGetLocalAddress())
1321    curr_attempts = 0
1322    while curr_attempts < attempts:
1323        if _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
1324            return True
1325        # Wait 2 seconds before unbound
1326        time.sleep(2)
1327        if not clear_bonded_devices(pri_ad):
1328            log.error(
1329                "Failed to clear bond for primary device at attempt {}".format(
1330                    str(curr_attempts)))
1331            return False
1332        if not clear_bonded_devices(sec_ad):
1333            log.error("Failed to clear bond for secondary device at attempt {}".
1334                      format(str(curr_attempts)))
1335            return False
1336        # Wait 2 seconds after unbound
1337        time.sleep(2)
1338        curr_attempts += 1
1339    log.error("pair_pri_to_sec failed to connect after {} attempts".format(
1340        str(attempts)))
1341    return False
1342
1343
1344def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
1345    # Enable discovery on sec_ad so that pri_ad can find it.
1346    # The timeout here is based on how much time it would take for two devices
1347    # to pair with each other once pri_ad starts seeing devices.
1348    pri_droid = pri_ad.droid
1349    sec_droid = sec_ad.droid
1350    pri_ad.ed.clear_all_events()
1351    sec_ad.ed.clear_all_events()
1352    log.info("Bonding device {} to {}".format(
1353        pri_droid.bluetoothGetLocalAddress(),
1354        sec_droid.bluetoothGetLocalAddress()))
1355    sec_droid.bluetoothMakeDiscoverable(bt_default_timeout)
1356    target_address = sec_droid.bluetoothGetLocalAddress()
1357    log.debug("Starting paring helper on each device")
1358    pri_droid.bluetoothStartPairingHelper(auto_confirm)
1359    sec_droid.bluetoothStartPairingHelper(auto_confirm)
1360    pri_ad.log.info("Primary device starting discovery and executing bond")
1361    result = pri_droid.bluetoothDiscoverAndBond(target_address)
1362    if not auto_confirm:
1363        if not _wait_for_passkey_match(pri_ad, sec_ad):
1364            return False
1365    # Loop until we have bonded successfully or timeout.
1366    end_time = time.time() + bt_default_timeout
1367    pri_ad.log.info("Verifying devices are bonded")
1368    while time.time() < end_time:
1369        bonded_devices = pri_droid.bluetoothGetBondedDevices()
1370        bonded = False
1371        for d in bonded_devices:
1372            if d['address'] == target_address:
1373                pri_ad.log.info("Successfully bonded to device")
1374                return True
1375        time.sleep(0.1)
1376    # Timed out trying to bond.
1377    pri_ad.log.info("Failed to bond devices.")
1378    return False
1379
1380
1381def reset_bluetooth(android_devices):
1382    """Resets Bluetooth state of input Android device list.
1383
1384    Args:
1385        android_devices: The Android device list to reset Bluetooth state on.
1386
1387    Returns:
1388        True if successful, false if unsuccessful.
1389    """
1390    for a in android_devices:
1391        droid, ed = a.droid, a.ed
1392        a.log.info("Reset state of bluetooth on device.")
1393        if droid.bluetoothCheckState() is True:
1394            droid.bluetoothToggleState(False)
1395            expected_bluetooth_off_event_name = bluetooth_off
1396            try:
1397                ed.pop_event(expected_bluetooth_off_event_name,
1398                             bt_default_timeout)
1399            except Exception:
1400                a.log.error("Failed to toggle Bluetooth off.")
1401                return False
1402        # temp sleep for b/17723234
1403        time.sleep(3)
1404        if not bluetooth_enabled_check(a):
1405            return False
1406    return True
1407
1408
1409def scan_and_verify_n_advertisements(scn_ad, max_advertisements):
1410    """Verify that input number of advertisements can be found from the scanning
1411    Android device.
1412
1413    Args:
1414        scn_ad: The Android device to start LE scanning on.
1415        max_advertisements: The number of advertisements the scanner expects to
1416        find.
1417
1418    Returns:
1419        True if successful, false if unsuccessful.
1420    """
1421    test_result = False
1422    address_list = []
1423    filter_list = scn_ad.droid.bleGenFilterList()
1424    scn_ad.droid.bleBuildScanFilter(filter_list)
1425    scan_settings = scn_ad.droid.bleBuildScanSetting()
1426    scan_callback = scn_ad.droid.bleGenScanCallback()
1427    scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
1428    start_time = time.time()
1429    while (start_time + bt_default_timeout) > time.time():
1430        event = None
1431        try:
1432            event = scn_ad.ed.pop_event(scan_result.format(scan_callback),
1433                                        bt_default_timeout)
1434        except Empty as error:
1435            raise BtTestUtilsError(
1436                "Failed to find scan event: {}".format(error))
1437        address = event['data']['Result']['deviceInfo']['address']
1438        if address not in address_list:
1439            address_list.append(address)
1440        if len(address_list) == max_advertisements:
1441            test_result = True
1442            break
1443    scn_ad.droid.bleStopBleScan(scan_callback)
1444    return test_result
1445
1446
1447def set_bluetooth_codec(android_device,
1448                        codec_type,
1449                        sample_rate,
1450                        bits_per_sample,
1451                        channel_mode,
1452                        codec_specific_1=0):
1453    """Sets the A2DP codec configuration on the AndroidDevice.
1454
1455    Args:
1456        android_device (acts.controllers.android_device.AndroidDevice): the
1457            android device for which to switch the codec.
1458        codec_type (str): the desired codec type. Must be a key in
1459            bt_constants.codec_types.
1460        sample_rate (str): the desired sample rate. Must be a key in
1461            bt_constants.sample_rates.
1462        bits_per_sample (str): the desired bits per sample. Must be a key in
1463            bt_constants.bits_per_samples.
1464        channel_mode (str): the desired channel mode. Must be a key in
1465            bt_constants.channel_modes.
1466        codec_specific_1 (int): the desired bit rate (quality) for LDAC codec.
1467    Returns:
1468        bool: True if the codec config was successfully changed to the desired
1469            values. Else False.
1470    """
1471    message = ("Set Android Device A2DP Bluetooth codec configuration:\n"
1472               "\tCodec: {codec_type}\n"
1473               "\tSample Rate: {sample_rate}\n"
1474               "\tBits per Sample: {bits_per_sample}\n"
1475               "\tChannel Mode: {channel_mode}".format(
1476                   codec_type=codec_type,
1477                   sample_rate=sample_rate,
1478                   bits_per_sample=bits_per_sample,
1479                   channel_mode=channel_mode))
1480    android_device.log.info(message)
1481
1482    # Send SL4A command
1483    droid, ed = android_device.droid, android_device.ed
1484    if not droid.bluetoothA2dpSetCodecConfigPreference(
1485            codec_types[codec_type], sample_rates[str(sample_rate)],
1486            bits_per_samples[str(bits_per_sample)], channel_modes[channel_mode],
1487            codec_specific_1):
1488        android_device.log.warning("SL4A command returned False. Codec was not "
1489                                   "changed.")
1490    else:
1491        try:
1492            ed.pop_event(bluetooth_a2dp_codec_config_changed,
1493                         bt_default_timeout)
1494        except Exception:
1495            android_device.log.warning("SL4A event not registered. Codec "
1496                                       "may not have been changed.")
1497
1498    # Validate codec value through ADB
1499    # TODO (aidanhb): validate codec more robustly using SL4A
1500    command = "dumpsys bluetooth_manager | grep -i 'current codec'"
1501    out = android_device.adb.shell(command)
1502    split_out = out.split(": ")
1503    if len(split_out) != 2:
1504        android_device.log.warning("Could not verify codec config change "
1505                                   "through ADB.")
1506    elif split_out[1].strip().upper() != codec_type:
1507        android_device.log.error("Codec config was not changed.\n"
1508                                 "\tExpected codec: {exp}\n"
1509                                 "\tActual codec: {act}".format(
1510                                     exp=codec_type, act=split_out[1].strip()))
1511        return False
1512    android_device.log.info("Bluetooth codec successfully changed.")
1513    return True
1514
1515
1516def set_bt_scan_mode(ad, scan_mode_value):
1517    """Set Android device's Bluetooth scan mode.
1518
1519    Args:
1520        ad: The Android device to set the scan mode on.
1521        scan_mode_value: The value to set the scan mode to.
1522
1523    Returns:
1524        True if successful, false if unsuccessful.
1525    """
1526    droid, ed = ad.droid, ad.ed
1527    if scan_mode_value == bt_scan_mode_types['state_off']:
1528        disable_bluetooth(droid)
1529        scan_mode = droid.bluetoothGetScanMode()
1530        reset_bluetooth([ad])
1531        if scan_mode != scan_mode_value:
1532            return False
1533    elif scan_mode_value == bt_scan_mode_types['none']:
1534        droid.bluetoothMakeUndiscoverable()
1535        scan_mode = droid.bluetoothGetScanMode()
1536        if scan_mode != scan_mode_value:
1537            return False
1538    elif scan_mode_value == bt_scan_mode_types['connectable']:
1539        droid.bluetoothMakeUndiscoverable()
1540        droid.bluetoothMakeConnectable()
1541        scan_mode = droid.bluetoothGetScanMode()
1542        if scan_mode != scan_mode_value:
1543            return False
1544    elif (scan_mode_value == bt_scan_mode_types['connectable_discoverable']):
1545        droid.bluetoothMakeDiscoverable()
1546        scan_mode = droid.bluetoothGetScanMode()
1547        if scan_mode != scan_mode_value:
1548            return False
1549    else:
1550        # invalid scan mode
1551        return False
1552    return True
1553
1554
1555def set_device_name(droid, name):
1556    """Set and check Bluetooth local name on input droid object.
1557
1558    Args:
1559        droid: Droid object to set local name on.
1560        name: the Bluetooth local name to set.
1561
1562    Returns:
1563        True if successful, false if unsuccessful.
1564    """
1565    droid.bluetoothSetLocalName(name)
1566    time.sleep(2)
1567    droid_name = droid.bluetoothGetLocalName()
1568    if droid_name != name:
1569        return False
1570    return True
1571
1572
1573def set_profile_priority(host_ad, client_ad, profiles, priority):
1574    """Sets the priority of said profile(s) on host_ad for client_ad"""
1575    for profile in profiles:
1576        host_ad.log.info("Profile {} on {} for {} set to priority {}".format(
1577            profile, host_ad.droid.bluetoothGetLocalName(),
1578            client_ad.droid.bluetoothGetLocalAddress(), priority.value))
1579        if bt_profile_constants['a2dp_sink'] == profile:
1580            host_ad.droid.bluetoothA2dpSinkSetPriority(
1581                client_ad.droid.bluetoothGetLocalAddress(), priority.value)
1582        elif bt_profile_constants['headset_client'] == profile:
1583            host_ad.droid.bluetoothHfpClientSetPriority(
1584                client_ad.droid.bluetoothGetLocalAddress(), priority.value)
1585        elif bt_profile_constants['pbap_client'] == profile:
1586            host_ad.droid.bluetoothPbapClientSetPriority(
1587                client_ad.droid.bluetoothGetLocalAddress(), priority.value)
1588        else:
1589            host_ad.log.error(
1590                "Profile {} not yet supported for priority settings".format(
1591                    profile))
1592
1593
1594def setup_multiple_devices_for_bt_test(android_devices):
1595    """A common setup routine for Bluetooth on input Android device list.
1596
1597    Things this function sets up:
1598    1. Resets Bluetooth
1599    2. Set Bluetooth local name to random string of size 4
1600    3. Disable BLE background scanning.
1601    4. Enable Bluetooth snoop logging.
1602
1603    Args:
1604        android_devices: Android device list to setup Bluetooth on.
1605
1606    Returns:
1607        True if successful, false if unsuccessful.
1608    """
1609    log.info("Setting up Android Devices")
1610    # TODO: Temp fix for an selinux error.
1611    for ad in android_devices:
1612        ad.adb.shell("setenforce 0")
1613    threads = []
1614    try:
1615        for a in android_devices:
1616            thread = threading.Thread(target=factory_reset_bluetooth,
1617                                      args=([[a]]))
1618            threads.append(thread)
1619            thread.start()
1620        for t in threads:
1621            t.join()
1622
1623        for a in android_devices:
1624            d = a.droid
1625            # TODO: Create specific RPC command to instantiate
1626            # BluetoothConnectionFacade. This is just a workaround.
1627            d.bluetoothStartConnectionStateChangeMonitor("")
1628            setup_result = d.bluetoothSetLocalName(generate_id_by_size(4))
1629            if not setup_result:
1630                a.log.error("Failed to set device name.")
1631                return setup_result
1632            d.bluetoothDisableBLE()
1633            utils.set_location_service(a, True)
1634            bonded_devices = d.bluetoothGetBondedDevices()
1635            for b in bonded_devices:
1636                a.log.info("Removing bond for device {}".format(b['address']))
1637                d.bluetoothUnbond(b['address'])
1638        for a in android_devices:
1639            a.adb.shell("setprop persist.bluetooth.btsnooplogmode full")
1640            getprop_result = a.adb.shell(
1641                "getprop persist.bluetooth.btsnooplogmode") == "full"
1642            if not getprop_result:
1643                a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.")
1644    except Exception as err:
1645        log.error("Something went wrong in multi device setup: {}".format(err))
1646        return False
1647    return setup_result
1648
1649
1650def setup_n_advertisements(adv_ad, num_advertisements):
1651    """Setup input number of advertisements on input Android device.
1652
1653    Args:
1654        adv_ad: The Android device to start LE advertisements on.
1655        num_advertisements: The number of advertisements to start.
1656
1657    Returns:
1658        advertise_callback_list: List of advertisement callback ids.
1659    """
1660    adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
1661        ble_advertise_settings_modes['low_latency'])
1662    advertise_data = adv_ad.droid.bleBuildAdvertiseData()
1663    advertise_settings = adv_ad.droid.bleBuildAdvertiseSettings()
1664    advertise_callback_list = []
1665    for i in range(num_advertisements):
1666        advertise_callback = adv_ad.droid.bleGenBleAdvertiseCallback()
1667        advertise_callback_list.append(advertise_callback)
1668        adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
1669                                            advertise_settings)
1670        try:
1671            adv_ad.ed.pop_event(adv_succ.format(advertise_callback),
1672                                bt_default_timeout)
1673            adv_ad.log.info("Advertisement {} started.".format(i + 1))
1674        except Empty as error:
1675            adv_ad.log.error("Advertisement {} failed to start.".format(i + 1))
1676            raise BtTestUtilsError(
1677                "Test failed with Empty error: {}".format(error))
1678    return advertise_callback_list
1679
1680
1681def take_btsnoop_log(ad, testcase, testname):
1682    """Grabs the btsnoop_hci log on a device and stores it in the log directory
1683    of the test class.
1684
1685    If you want grab the btsnoop_hci log, call this function with android_device
1686    objects in on_fail. Bug report takes a relative long time to take, so use
1687    this cautiously.
1688
1689    Args:
1690        ad: The android_device instance to take bugreport on.
1691        testcase: Name of the test calss that triggered this snoop log.
1692        testname: Name of the test case that triggered this bug report.
1693    """
1694    testname = "".join(x for x in testname if x.isalnum())
1695    serial = ad.serial
1696    device_model = ad.droid.getBuildModel()
1697    device_model = device_model.replace(" ", "")
1698    out_name = ','.join((testname, device_model, serial))
1699    snoop_path = os.path.join(ad.device_log_path, 'BluetoothSnoopLogs')
1700    os.makedirs(snoop_path, exist_ok=True)
1701    cmd = ''.join(("adb -s ", serial, " pull ", btsnoop_log_path_on_device, " ",
1702                   snoop_path + '/' + out_name, ".btsnoop_hci.log"))
1703    exe_cmd(cmd)
1704    try:
1705        cmd = ''.join(
1706            ("adb -s ", serial, " pull ", btsnoop_last_log_path_on_device, " ",
1707             snoop_path + '/' + out_name, ".btsnoop_hci.log.last"))
1708        exe_cmd(cmd)
1709    except Exception as err:
1710        testcase.log.info(
1711            "File does not exist {}".format(btsnoop_last_log_path_on_device))
1712
1713
1714def take_btsnoop_logs(android_devices, testcase, testname):
1715    """Pull btsnoop logs from an input list of android devices.
1716
1717    Args:
1718        android_devices: the list of Android devices to pull btsnoop logs from.
1719        testcase: Name of the test calss that triggered this snoop log.
1720        testname: Name of the test case that triggered this bug report.
1721    """
1722    for a in android_devices:
1723        take_btsnoop_log(a, testcase, testname)
1724
1725
1726def teardown_n_advertisements(adv_ad, num_advertisements,
1727                              advertise_callback_list):
1728    """Stop input number of advertisements on input Android device.
1729
1730    Args:
1731        adv_ad: The Android device to stop LE advertisements on.
1732        num_advertisements: The number of advertisements to stop.
1733        advertise_callback_list: The list of advertisement callbacks to stop.
1734
1735    Returns:
1736        True if successful, false if unsuccessful.
1737    """
1738    for n in range(num_advertisements):
1739        adv_ad.droid.bleStopBleAdvertising(advertise_callback_list[n])
1740    return True
1741
1742
1743def verify_server_and_client_connected(client_ad, server_ad, log=True):
1744    """Verify that input server and client Android devices are connected.
1745
1746    This code is under the assumption that there will only be
1747    a single connection.
1748
1749    Args:
1750        client_ad: the Android device to check number of active connections.
1751        server_ad: the Android device to check number of active connections.
1752
1753    Returns:
1754        True both server and client have at least 1 active connection,
1755        false if unsuccessful.
1756    """
1757    test_result = True
1758    if len(server_ad.droid.bluetoothSocketConnActiveConnections()) == 0:
1759        if log:
1760            server_ad.log.error("No socket connections found on server.")
1761        test_result = False
1762    if len(client_ad.droid.bluetoothSocketConnActiveConnections()) == 0:
1763        if log:
1764            client_ad.log.error("No socket connections found on client.")
1765        test_result = False
1766    return test_result
1767
1768
1769def wait_for_bluetooth_manager_state(droid,
1770                                     state=None,
1771                                     timeout=10,
1772                                     threshold=5):
1773    """ Waits for BlueTooth normalized state or normalized explicit state
1774    args:
1775        droid: droid device object
1776        state: expected BlueTooth state
1777        timeout: max timeout threshold
1778        threshold: list len of bt state
1779    Returns:
1780        True if successful, false if unsuccessful.
1781    """
1782    all_states = []
1783    get_state = lambda: droid.bluetoothGetLeState()
1784    start_time = time.time()
1785    while time.time() < start_time + timeout:
1786        all_states.append(get_state())
1787        if len(all_states) >= threshold:
1788            # for any normalized state
1789            if state is None:
1790                if len(set(all_states[-threshold:])) == 1:
1791                    log.info("State normalized {}".format(
1792                        set(all_states[-threshold:])))
1793                    return True
1794            else:
1795                # explicit check against normalized state
1796                if set([state]).issubset(all_states[-threshold:]):
1797                    return True
1798        time.sleep(0.5)
1799    log.error(
1800        "Bluetooth state fails to normalize" if state is None else
1801        "Failed to match bluetooth state, current state {} expected state {}".
1802        format(get_state(), state))
1803    return False
1804
1805
1806def _wait_for_passkey_match(pri_ad, sec_ad):
1807    pri_pin, sec_pin = -1, 1
1808    pri_variant, sec_variant = -1, 1
1809    pri_pairing_req, sec_pairing_req = None, None
1810    try:
1811        pri_pairing_req = pri_ad.ed.pop_event(
1812            event_name="BluetoothActionPairingRequest",
1813            timeout=bt_default_timeout)
1814        pri_variant = pri_pairing_req["data"]["PairingVariant"]
1815        pri_pin = pri_pairing_req["data"]["Pin"]
1816        pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format(
1817            pri_pin, pri_variant))
1818        sec_pairing_req = sec_ad.ed.pop_event(
1819            event_name="BluetoothActionPairingRequest",
1820            timeout=bt_default_timeout)
1821        sec_variant = sec_pairing_req["data"]["PairingVariant"]
1822        sec_pin = sec_pairing_req["data"]["Pin"]
1823        sec_ad.log.info("Secondary device received Pin: {}, Variant: {}".format(
1824            sec_pin, sec_variant))
1825    except Empty as err:
1826        log.error("Wait for pin error: {}".format(err))
1827        log.error("Pairing request state, Primary: {}, Secondary: {}".format(
1828            pri_pairing_req, sec_pairing_req))
1829        return False
1830    if pri_variant == sec_variant == pairing_variant_passkey_confirmation:
1831        confirmation = pri_pin == sec_pin
1832        if confirmation:
1833            log.info("Pairing code matched, accepting connection")
1834        else:
1835            log.info("Pairing code mismatched, rejecting connection")
1836        pri_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
1837                               str(confirmation))
1838        sec_ad.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
1839                               str(confirmation))
1840        if not confirmation:
1841            return False
1842    elif pri_variant != sec_variant:
1843        log.error("Pairing variant mismatched, abort connection")
1844        return False
1845    return True
1846
1847
1848def write_read_verify_data(client_ad, server_ad, msg, binary=False):
1849    """Verify that the client wrote data to the server Android device correctly.
1850
1851    Args:
1852        client_ad: the Android device to perform the write.
1853        server_ad: the Android device to read the data written.
1854        msg: the message to write.
1855        binary: if the msg arg is binary or not.
1856
1857    Returns:
1858        True if the data written matches the data read, false if not.
1859    """
1860    client_ad.log.info("Write message.")
1861    try:
1862        if binary:
1863            client_ad.droid.bluetoothSocketConnWriteBinary(msg)
1864        else:
1865            client_ad.droid.bluetoothSocketConnWrite(msg)
1866    except Exception as err:
1867        client_ad.log.error("Failed to write data: {}".format(err))
1868        return False
1869    server_ad.log.info("Read message.")
1870    try:
1871        if binary:
1872            read_msg = server_ad.droid.bluetoothSocketConnReadBinary().rstrip(
1873                "\r\n")
1874        else:
1875            read_msg = server_ad.droid.bluetoothSocketConnRead()
1876    except Exception as err:
1877        server_ad.log.error("Failed to read data: {}".format(err))
1878        return False
1879    log.info("Verify message.")
1880    if msg != read_msg:
1881        log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg))
1882        return False
1883    return True
1884
1885
1886class MediaControlOverSl4a(object):
1887    """Media control using sl4a facade for general purpose.
1888
1889    """
1890
1891    def __init__(self, android_device, music_file):
1892        """Initialize the media_control class.
1893
1894        Args:
1895            android_dut: android_device object
1896            music_file: location of the music file
1897        """
1898        self.android_device = android_device
1899        self.music_file = music_file
1900
1901    def play(self):
1902        """Play media.
1903
1904        """
1905        self.android_device.droid.mediaPlayOpen('file://%s' % self.music_file,
1906                                                'default', True)
1907        playing = self.android_device.droid.mediaIsPlaying()
1908        asserts.assert_true(playing,
1909                            'Failed to play music %s' % self.music_file)
1910
1911    def pause(self):
1912        """Pause media.
1913
1914        """
1915        self.android_device.droid.mediaPlayPause('default')
1916        paused = not self.android_device.droid.mediaIsPlaying()
1917        asserts.assert_true(paused,
1918                            'Failed to pause music %s' % self.music_file)
1919
1920    def resume(self):
1921        """Resume media.
1922
1923        """
1924        self.android_device.droid.mediaPlayStart('default')
1925        playing = self.android_device.droid.mediaIsPlaying()
1926        asserts.assert_true(playing,
1927                            'Failed to play music %s' % self.music_file)
1928
1929    def stop(self):
1930        """Stop media.
1931
1932        """
1933        self.android_device.droid.mediaPlayStop('default')
1934        stopped = not self.android_device.droid.mediaIsPlaying()
1935        asserts.assert_true(stopped,
1936                            'Failed to stop music %s' % self.music_file)
1937