• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import logging
18import os
19import random
20import re
21import string
22import threading
23import time
24from queue import Empty
25from subprocess import call
26
27from acts import asserts
28from acts_contrib.test_utils.bt.bt_constants import adv_fail
29from acts_contrib.test_utils.bt.bt_constants import adv_succ
30from acts_contrib.test_utils.bt.bt_constants import batch_scan_not_supported_list
31from acts_contrib.test_utils.bt.bt_constants import batch_scan_result
32from acts_contrib.test_utils.bt.bt_constants import bits_per_samples
33from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_modes
34from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_tx_powers
35from acts_contrib.test_utils.bt.bt_constants import bluetooth_a2dp_codec_config_changed
36from acts_contrib.test_utils.bt.bt_constants import bluetooth_off
37from acts_contrib.test_utils.bt.bt_constants import bluetooth_on
38from acts_contrib.test_utils.bt.bt_constants import \
39    bluetooth_profile_connection_state_changed
40from acts_contrib.test_utils.bt.bt_constants import bluetooth_socket_conn_test_uuid
41from acts_contrib.test_utils.bt.bt_constants import bt_default_timeout
42from acts_contrib.test_utils.bt.bt_constants import bt_profile_constants
43from acts_contrib.test_utils.bt.bt_constants import bt_profile_states
44from acts_contrib.test_utils.bt.bt_constants import bt_rfcomm_uuids
45from acts_contrib.test_utils.bt.bt_constants import bt_scan_mode_types
46from acts_contrib.test_utils.bt.bt_constants import btsnoop_last_log_path_on_device
47from acts_contrib.test_utils.bt.bt_constants import btsnoop_log_path_on_device
48from acts_contrib.test_utils.bt.bt_constants import channel_modes
49from acts_contrib.test_utils.bt.bt_constants import codec_types
50from acts_contrib.test_utils.bt.bt_constants import default_bluetooth_socket_timeout_ms
51from acts_contrib.test_utils.bt.bt_constants import default_rfcomm_timeout_ms
52from acts_contrib.test_utils.bt.bt_constants import hid_id_keyboard
53from acts_contrib.test_utils.bt.bt_constants import pairing_variant_passkey_confirmation
54from acts_contrib.test_utils.bt.bt_constants import pan_connect_timeout
55from acts_contrib.test_utils.bt.bt_constants import sample_rates
56from acts_contrib.test_utils.bt.bt_constants import scan_result
57from acts_contrib.test_utils.bt.bt_constants import sig_uuid_constants
58from acts_contrib.test_utils.bt.bt_constants import small_timeout
59from acts_contrib.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb
60from acts_contrib.test_utils.tel.tel_test_utils import verify_http_connection
61from acts.utils import exe_cmd
62
63from acts import utils
64
# Module-level logging handle: calls such as log.info(...) and log.error(...)
# in this file go to the top-level functions of the `logging` module.
log = logging

# Maps a device model name to its maximum number of concurrent LE
# advertisements. Filled in by get_advanced_droid_list so that a model already
# present here is not probed again.
advertisements_to_devices = {}
68
69
class BtTestUtilsError(Exception):
    """Error raised by the Bluetooth test utility functions in this file."""
72
73
74def _add_android_device_to_dictionary(android_device, profile_list,
75                                      selector_dict):
76    """Adds the AndroidDevice and supported features to the selector dictionary
77
78    Args:
79        android_device: The Android device.
80        profile_list: The list of profiles the Android device supports.
81    """
82    for profile in profile_list:
83        if profile in selector_dict and android_device not in selector_dict[
84                profile]:
85            selector_dict[profile].append(android_device)
86        else:
87            selector_dict[profile] = [android_device]
88
89
def bluetooth_enabled_check(ad, timeout_sec=5):
    """Makes sure Bluetooth is on, turning it on first when it is off.

    Args:
        ad: The Android device to enable Bluetooth on.
        timeout_sec: number of seconds to keep polling the Bluetooth state
            before giving up.

    Returns:
        True if successful, false if unsuccessful.
    """
    droid = ad.droid
    if not droid.bluetoothCheckState():
        droid.bluetoothToggleState(True)
        try:
            ad.ed.pop_event(bluetooth_on, bt_default_timeout)
        except Empty:
            ad.log.info(
                "Failed to toggle Bluetooth on(no broadcast received).")
            # No broadcast arrived; ask for the actual state directly.
            if not droid.bluetoothCheckState():
                ad.log.error(".. actual state is OFF")
                return False
            ad.log.info(".. actual state is ON")
            return True

    # Poll once per second until the state reads on or the deadline passes.
    deadline = time.time() + timeout_sec
    while True:
        if droid.bluetoothCheckState() or time.time() >= deadline:
            break
        time.sleep(1)
    return droid.bluetoothCheckState()
120
121
def check_device_supported_profiles(droid):
    """Queries the readiness of each Bluetooth profile on a droid object.

    Args:
        droid: The droid object to query.

    Returns:
        A dictionary mapping each profile name ('hid', 'hsp', 'a2dp',
        'avrcp', 'a2dp_sink', 'hfp_client', 'pbap_client') to the value
        returned by the droid's matching bluetooth<Profile>IsReady call.
    """
    return {
        'hid': droid.bluetoothHidIsReady(),
        'hsp': droid.bluetoothHspIsReady(),
        'a2dp': droid.bluetoothA2dpIsReady(),
        'avrcp': droid.bluetoothAvrcpIsReady(),
        'a2dp_sink': droid.bluetoothA2dpSinkIsReady(),
        'hfp_client': droid.bluetoothHfpClientIsReady(),
        'pbap_client': droid.bluetoothPbapClientIsReady(),
    }
140
141
def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list,
                                     adv_android_device, adv_callback_list):
    """Stops the given LE scans and advertisements, best effort.

    If stopping a scan or an advertisement raises, the error is logged at
    debug level and Bluetooth is reset on the affected device instead.

    Args:
        scn_android_device: The Android device that is actively scanning.
        scn_callback_list: The scan callback id list that needs to be stopped.
        adv_android_device: The Android device that is actively advertising.
        adv_callback_list: The advertise callback id list that needs to be
            stopped.
    """
    scanner, _ = scn_android_device.droid, scn_android_device.ed
    advertiser = adv_android_device.droid

    try:
        for callback_id in scn_callback_list:
            scanner.bleStopBleScan(callback_id)
    except Exception as scan_err:
        scan_msg = "Failed to stop LE scan... reseting Bluetooth. Error {}"
        scn_android_device.log.debug(scan_msg.format(scan_err))
        reset_bluetooth([scn_android_device])

    try:
        for callback_id in adv_callback_list:
            advertiser.bleStopBleAdvertising(callback_id)
    except Exception as adv_err:
        adv_msg = (
            "Failed to stop LE advertisement... reseting Bluetooth. Error {}")
        adv_android_device.log.debug(adv_msg.format(adv_err))
        reset_bluetooth([adv_android_device])
171
172
def clear_bonded_devices(ad):
    """Unbonds every device currently bonded to the input Android device.

    Args:
        ad: the Android device whose bonded devices are removed.
    Returns:
        True if clearing bonded devices was successful, false if unsuccessful.
    """
    droid = ad.droid
    remaining = droid.bluetoothGetBondedDevices()
    while remaining:
        target_address = remaining[0]['address']
        if droid.bluetoothUnbond(target_address):
            log.info("Successfully unbonded {} from {}".format(
                target_address, ad.serial))
            #TODO: wait for BOND_STATE_CHANGED intent instead of waiting
            time.sleep(1)

            # A device first connected over LE is, once bonded, reachable
            # through both its LE address and its classic address. Unbonding
            # it removes both entries, and unbonding an already removed entry
            # makes bluetoothUnbond return false. The list is therefore
            # fetched again rather than walked from a stale copy.
            remaining = droid.bluetoothGetBondedDevices()
            continue
        log.error("Failed to unbond {} from {}".format(
            target_address, ad.serial))
        return False
    return True
200
201
def connect_phone_to_headset(android,
                             headset,
                             timeout=bt_default_timeout,
                             connection_check_period=10):
    """Pairs and connects an android phone with a bluetooth headset.

    The headset object must provide the method enter_pairing_mode and the
    attribute mac_address.

    Args:
        android: AndroidDevice object with SL4A installed.
        headset: Object with attribute mac_address and method
            enter_pairing_mode.
        timeout: Seconds to wait for devices to connect.
        connection_check_period: seconds to sleep after each bond/connect
            request before checking the A2DP connection again.
    Returns:
        connected (bool): True if devices are paired and connected by end of
        method. False otherwise.
    """
    mac = headset.mac_address
    droid = android.droid

    connected = is_a2dp_src_device_connected(android, mac)
    log.info('Devices connected before pair attempt: %s' % connected)
    if not connected:
        # Put the headset into pairing mode and start the pairing helper.
        headset.enter_pairing_mode()
        droid.bluetoothStartPairingHelper()

    started_at = time.time()
    # Nothing to do in the loop when the devices are already connected.
    while True:
        if connected or not (time.time() - started_at < timeout):
            break
        bonded_devices = droid.bluetoothGetBondedDevices()
        connected_devices = droid.bluetoothGetConnectedDevices()
        bonded_addresses = [entry["address"] for entry in bonded_devices]
        if headset.mac_address not in bonded_addresses:
            # Not bonded yet: use SL4A to pair and connect with headset.
            headset.enter_pairing_mode()
            droid.bluetoothDiscoverAndBond(mac)
        else:
            connected_addresses = [
                entry["address"] for entry in connected_devices
            ]
            if headset.mac_address not in connected_addresses:
                # Device is bonded but not connected
                droid.bluetoothConnectBonded(mac)
            else:
                # Headset is connected, but A2DP profile is not
                droid.bluetoothA2dpConnect(mac)
        log.info('Waiting for connection...')
        time.sleep(connection_check_period)
        # Check for connection.
        connected = is_a2dp_src_device_connected(android, mac)

    log.info('Devices connected after pair attempt: %s' % connected)
    return connected
253
def connect_pri_to_sec(pri_ad, sec_ad, profiles_set, attempts=2):
    """Connects pri droid to secondary droid, retrying on failure.

    Args:
        pri_ad: AndroidDroid initiating connection
        sec_ad: AndroidDroid accepting connection
        profiles_set: Set of profiles to be connected
        attempts: Number of attempts to try until failure.

    Returns:
        True as soon as one attempt connects, False when every attempt
        failed.
    """
    # The returned address is not used here; the call is kept as in the
    # original flow.
    sec_ad.droid.bluetoothGetLocalAddress()
    # Allows extra time for the SDP records to be updated.
    time.sleep(2)

    for attempt_index in range(attempts):
        log.info("connect_pri_to_sec curr attempt {} total {}".format(
            attempt_index, attempts))
        if _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
            return True

    log.error("connect_pri_to_sec failed to connect after {} attempts".format(
        attempts))
    return False
280
281
def _connect_pri_to_sec(pri_ad, sec_ad, profiles_set):
    """Makes one attempt to connect pri droid to secondary droid.

    The devices must already be bonded. After requesting the connection, the
    profile connection state is polled through the profile APIs for up to 10
    seconds; profiles still missing after that are awaited through profile
    connection state changed events.

    Args:
        pri_ad: AndroidDroid initiating connection.
        sec_ad: AndroidDroid accepting connection.
        profiles_set: Set of profiles to be connected.

    Returns:
        True if connection is successful, false if unsuccessful (unsupported
        profile, devices not bonded, or no connection event received in time).
    """
    # Check if we support all profiles.
    supported_profiles = bt_profile_constants.values()
    for profile in profiles_set:
        if profile not in supported_profiles:
            pri_ad.log.info("Profile {} is not supported list {}".format(
                profile, supported_profiles))
            return False

    # Look the secondary's address up once instead of issuing the same RPC
    # for every comparison below.
    sec_addr = sec_ad.droid.bluetoothGetLocalAddress()

    # First check that devices are bonded.
    paired = any(paired_device['address'] == sec_addr
                 for paired_device in pri_ad.droid.bluetoothGetBondedDevices())
    if not paired:
        pri_ad.log.error("Not paired to {}".format(sec_ad.serial))
        return False

    # Now try to connect them, the following call will try to initiate all
    # connections.
    pri_ad.droid.bluetoothConnectBonded(sec_addr)

    end_time = time.time() + 10
    profile_connected = set()
    pri_ad.log.info("Profiles to be connected {}".format(profiles_set))

    # Profile constant -> function reporting whether pri_ad is connected to
    # the given address on that profile.
    connection_checks = (
        (bt_profile_constants['headset_client'],
         is_hfp_client_device_connected),
        (bt_profile_constants['a2dp'], is_a2dp_src_device_connected),
        (bt_profile_constants['a2dp_sink'], is_a2dp_snk_device_connected),
        (bt_profile_constants['map_mce'], is_map_mce_device_connected),
        (bt_profile_constants['map'], is_map_mse_device_connected),
    )

    # First use APIs to check profile connection state
    while (time.time() < end_time
           and not profile_connected.issuperset(profiles_set)):
        for profile, is_connected in connection_checks:
            if (profile not in profile_connected and profile in profiles_set
                    and is_connected(pri_ad, sec_addr)):
                profile_connected.add(profile)
        time.sleep(0.1)

    # If APIs fail, try to find the connection broadcast receiver.
    while not profile_connected.issuperset(profiles_set):
        try:
            profile_event = pri_ad.ed.pop_event(
                bluetooth_profile_connection_state_changed,
                bt_default_timeout + 10)
            pri_ad.log.info("Got event {}".format(profile_event))
        except Exception:
            # Report the profiles that are still missing, which is what the
            # message announces.
            pri_ad.log.error("Did not get {} profiles left {}".format(
                bluetooth_profile_connection_state_changed,
                set(profiles_set) - profile_connected))
            return False

        profile = profile_event['data']['profile']
        state = profile_event['data']['state']
        device_addr = profile_event['data']['addr']
        if state == bt_profile_states['connected'] and \
                device_addr == sec_addr:
            profile_connected.add(profile)
        pri_ad.log.info(
            "Profiles connected until now {}".format(profile_connected))
    # Failure happens inside the while loop. If we came here then we already
    # connected.
    return True
369
370
def determine_max_advertisements(android_device):
    """Determines programatically how many advertisements the Android device
    supports.

    Advertisements are started one after another until the device reports
    ADVERTISE_FAILED_TOO_MANY_ADVERTISERS. Every advertisement started here is
    stopped again before returning, including when an exception is raised.

    Args:
        android_device: The Android device to determine max advertisements of.

    Returns:
        The maximum advertisement count, or -1 if Bluetooth could not be
        turned on.

    Raises:
        BtTestUtilsError: if starting an advertisement fails with an error
            other than ADVERTISE_FAILED_TOO_MANY_ADVERTISERS.
    """
    android_device.log.info(
        "Determining number of maximum concurrent advertisements...")
    droid = android_device.droid

    if not droid.bluetoothCheckState():
        droid.bluetoothToggleState(True)
        # Only wait for the "on" broadcast when a toggle was requested; with
        # Bluetooth already on, the wait would only run into its timeout.
        try:
            android_device.ed.pop_event(bluetooth_on, bt_default_timeout)
        except Exception:
            android_device.log.info(
                "Failed to toggle Bluetooth on(no broadcast received).")
            # Try one more time to poke at the actual state.
            if droid.bluetoothCheckState() is True:
                android_device.log.info(".. actual state is ON")
            else:
                android_device.log.error(
                    "Failed to turn Bluetooth on. Unable to determine max "
                    "advertisements.")
                return -1

    advertisement_count = 0
    advertise_callback_list = []
    advertise_data = droid.bleBuildAdvertiseData()
    advertise_settings = droid.bleBuildAdvertiseSettings()
    try:
        while True:
            advertise_callback = droid.bleGenBleAdvertiseCallback()
            advertise_callback_list.append(advertise_callback)

            droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                         advertise_settings)

            success_event = adv_succ.format(advertise_callback)
            failure_event = adv_fail.format(advertise_callback)
            regex = "(" + success_event + "|" + failure_event + ")"
            # wait for either success or failure event
            evt = android_device.ed.pop_events(regex, bt_default_timeout,
                                               small_timeout)
            if evt[0]["name"] == success_event:
                advertisement_count += 1
                android_device.log.info(
                    "Advertisement {} started.".format(advertisement_count))
                continue

            error = evt[0]["data"]["Error"]
            if error != "ADVERTISE_FAILED_TOO_MANY_ADVERTISERS":
                raise BtTestUtilsError(
                    "Expected ADVERTISE_FAILED_TOO_MANY_ADVERTISERS," +
                    " but received bad error code {}".format(error))
            android_device.log.info(
                "Advertisement failed to start. Reached max " +
                "advertisements at {}".format(advertisement_count))
            break
    finally:
        # Stop everything started above, even when leaving through an
        # exception, so no advertisement is left running on the device.
        try:
            for adv in advertise_callback_list:
                droid.bleStopBleAdvertising(adv)
        except Exception:
            android_device.log.error(
                "Failed to stop advertisingment, resetting Bluetooth.")
            reset_bluetooth([android_device])
    return advertisement_count
443
444
def disable_bluetooth(droid):
    """Turns Bluetooth off on the input Droid object when it is on.

    Args:
        droid: The droid object to disable Bluetooth on.

    Returns:
        True if successful, false if unsuccessful.
    """
    if droid.bluetoothCheckState() is not True:
        # Not reported as on, nothing to toggle.
        return True

    droid.bluetoothToggleState(False)
    if droid.bluetoothCheckState() is not True:
        return True

    log.error("Failed to toggle Bluetooth off.")
    return False
460
461
def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list):
    """Disconnects the primary from the secondary on a set of profiles.

    Args:
        pri_ad: Primary android_device initiating disconnection.
        sec_ad: Secondary android_device; its local Bluetooth address
            identifies the peer to disconnect from (same signature as
            connect_pri_to_sec).
        profiles_list: List of profiles we want to disconnect from.

    Returns:
        True once a disconnected event carrying the secondary's address was
        received for every requested profile. False if a profile is not
        supported, the disconnect call raises, or waiting for an event fails.
    """
    # Sanity check: every requested profile has to be a known one.
    known_profiles = bt_profile_constants.values()
    for requested in profiles_list:
        if requested in known_profiles:
            continue
        pri_ad.log.info("Profile {} is not in supported list {}".format(
            requested, known_profiles))
        return False

    pri_ad.log.info(pri_ad.droid.bluetoothGetBondedDevices())
    # Disconnecting an already disconnected profile is a nop, so the current
    # connection state is not checked first.
    try:
        pri_ad.droid.bluetoothDisconnectConnectedProfile(
            sec_ad.droid.bluetoothGetLocalAddress(), profiles_list)
    except Exception as err:
        pri_ad.log.error(
            "Exception while trying to disconnect profile(s) {}: {}".format(
                profiles_list, err))
        return False

    disconnected = set()
    pri_ad.log.info("Disconnecting from profiles: {}".format(profiles_list))

    while True:
        if disconnected.issuperset(profiles_list):
            return True
        try:
            event = pri_ad.ed.pop_event(
                bluetooth_profile_connection_state_changed, bt_default_timeout)
            pri_ad.log.info("Got event {}".format(event))
        except Exception as e:
            pri_ad.log.error(
                "Did not disconnect from Profiles. Reason {}".format(e))
            return False

        payload = event['data']
        event_profile = payload['profile']
        event_state = payload['state']
        event_addr = payload['addr']

        if event_state == bt_profile_states['disconnected']:
            if event_addr == sec_ad.droid.bluetoothGetLocalAddress():
                disconnected.add(event_profile)
        pri_ad.log.info(
            "Profiles disconnected so far {}".format(disconnected))
519
520
def enable_bluetooth(droid, ed):
    """Turns Bluetooth on unless it is already reported as on.

    Args:
        droid: The droid object to enable Bluetooth on.
        ed: The event dispatcher whose pop_event is used to wait for the
            Bluetooth-on event.

    Returns:
        True if Bluetooth was already on, the on event was received, or the
        state reads on after the wait failed. False otherwise.
    """
    if droid.bluetoothCheckState() is not True:
        droid.bluetoothToggleState(True)
        try:
            ed.pop_event(bluetooth_on, bt_default_timeout)
        except Exception:
            log.info("Failed to toggle Bluetooth on (no broadcast received)")
            # No event arrived; fall back to the reported state.
            turned_on = droid.bluetoothCheckState() is True
            if turned_on:
                log.info(".. actual state is ON")
            else:
                log.info(".. actual state is OFF")
            return turned_on
    return True
538
539
def factory_reset_bluetooth(android_devices):
    """Factory resets the Bluetooth stack of each input Android device.

    Each device gets Bluetooth enabled, all bonds removed, a Bluetooth factory
    reset, and Bluetooth enabled again.

    Args:
        android_devices: The Android device list to reset Bluetooth

    Returns:
        True if successful, false if unsuccessful (Bluetooth could not be
        enabled on some device; remaining devices are then not processed).
    """
    for device in android_devices:
        droid, dispatcher = device.droid, device.ed
        device.log.info("Reset state of bluetooth on device.")
        if not bluetooth_enabled_check(device):
            return False

        # TODO: remove device unbond b/79418045
        # Temporary solution to ensure all devices are unbonded
        for bonded in droid.bluetoothGetBondedDevices():
            device.log.info("Removing bond for device {}".format(
                bonded['address']))
            droid.bluetoothUnbond(bonded['address'])

        droid.bluetoothFactoryReset()
        wait_for_bluetooth_manager_state(droid)
        if not enable_bluetooth(droid, dispatcher):
            return False
    return True
566
567
def generate_ble_advertise_objects(droid):
    """Creates one default set of LE advertise objects on a droid.

    Args:
        droid: The droid object to generate advertise LE objects from.

    Returns:
        A tuple (advertise_callback, advertise_data, advertise_settings):
        advertise_callback: The generated advertise callback id.
        advertise_data: The generated advertise data id.
        advertise_settings: The generated advertise settings id.
    """
    return (
        droid.bleGenBleAdvertiseCallback(),
        droid.bleBuildAdvertiseData(),
        droid.bleBuildAdvertiseSettings(),
    )
583
584
def generate_ble_scan_objects(droid):
    """Creates one default set of LE scan objects on a droid.

    Args:
        droid: The droid object to generate LE scan objects from.

    Returns:
        A tuple (filter_list, scan_settings, scan_callback):
        filter_list: The generated scan filter list id.
        scan_settings: The generated scan settings id.
        scan_callback: The generated scan callback id.
    """
    return (
        droid.bleGenFilterList(),
        droid.bleBuildScanSetting(),
        droid.bleGenScanCallback(),
    )
600
601
def generate_id_by_size(size,
                        chars=(string.ascii_lowercase +
                               string.ascii_uppercase + string.digits)):
    """Builds a random string of the requested length.

    Args:
        size: Input size of string.
        chars: (Optional) Chars to draw from; defaults to ASCII letters and
            digits.

    Returns:
        String of `size` characters, each picked with random.choice from
        chars.
    """
    picked = [random.choice(chars) for _ in range(size)]
    return ''.join(picked)
615
616
def get_advanced_droid_list(android_devices):
    """Builds a role description for each input Android device.

    The maximum number of LE advertisements is taken from
    advertisements_to_devices when the device model is already known there;
    otherwise it is determined programatically (with up to 3 retries while
    the result is -1) and stored for that model.

    Args:
        android_devices: The Android devices to setup.

    Returns:
        List of dictionaries, one per device, with the keys 'droid', 'ed',
        'max_advertisements' and 'batch_scan_supported'.
    """
    roles = []
    for device in android_devices:
        droid, dispatcher = device.droid, device.ed
        model = droid.getBuildModel()

        if model in advertisements_to_devices:
            max_advertisements = advertisements_to_devices[model]
        else:
            max_advertisements = determine_max_advertisements(device)
            # Retry to calculate max advertisements
            for tries_left in range(3, 0, -1):
                if max_advertisements != -1:
                    break
                device.log.info(
                    "Attempts left to determine max advertisements: {}".format(
                        tries_left))
                max_advertisements = determine_max_advertisements(device)
            advertisements_to_devices[model] = max_advertisements

        roles.append({
            'droid': droid,
            'ed': dispatcher,
            'max_advertisements': max_advertisements,
            'batch_scan_supported': model not in batch_scan_not_supported_list,
        })
    return roles
659
660
def get_bluetooth_crash_count(android_device):
    """Reads the Bluetooth crash count from the bluetooth_manager dumpsys.

    Args:
        android_device: The Android device to query over adb.

    Returns:
        The integer parsed from the text that follows the first "crashed" in
        the "dumpsys bluetooth_manager" output, up to the last digit on that
        line.

    Raises:
        AttributeError: if the output has no "crashed" followed by a digit.
        ValueError: if the captured text is not a valid integer.
    """
    out = android_device.adb.shell("dumpsys bluetooth_manager")
    # Raw string so that \d reaches the regex engine instead of being an
    # invalid string escape sequence.
    return int(re.search(r"crashed(.*\d)", out).group(1))
664
665
def read_otp(ad, settle_time_sec=2):
    """Reads and parses the OTP output to return TX power backoff

    Disables Bluetooth, reads the OTP registers from the phone with
    "bluetooth_sar_test -r", enables Bluetooth again (also when the read
    raises), and parses the output into a dict of TX power backoffs for
    different power levels.

    Args:
        ad : android device object
        settle_time_sec : seconds to sleep after the Bluetooth disable command
            and after the Bluetooth enable command.

    Returns :
        otp_dict : power backoff dict keyed by "BR", "EDR" and "BLE", each
            holding the keys "10", "9" and "8". Values read from the output
            are the matched digit strings; a technology whose banks are
            missing or all zero keeps the integer 0 defaults.
    """
    otp_dict = {
        tech: {
            "10": 0,
            "9": 0,
            "8": 0
        }
        for tech in ("BR", "EDR", "BLE")
    }

    ad.adb.shell('svc bluetooth disable')
    time.sleep(settle_time_sec)
    try:
        otp_output = ad.adb.shell('bluetooth_sar_test -r')
    finally:
        # Always turn Bluetooth back on, even when reading the OTP failed.
        ad.adb.shell('svc bluetooth enable')
        time.sleep(settle_time_sec)

    # Raw string: the backslashes belong to the regex, not to Python escapes.
    otp_regex = r'\s+\[\s+PL10:\s+(\d+)\s+PL9:\s+(\d+)*\s+PL8:\s+(\d+)\s+\]'

    for key in otp_dict:
        bank_list = re.findall("{}{}".format(key, otp_regex), otp_output)
        for bank_tuple in bank_list:
            # All-zero banks are skipped; the last non-zero bank wins.
            if ('0', '0', '0') != bank_tuple:
                [otp_dict[key]["10"], otp_dict[key]["9"],
                 otp_dict[key]["8"]] = bank_tuple
    return otp_dict
711
712
def get_bt_metric(ad_list, duration=1, tag="bt_metric", processed=True):
    """ Function to get the bt metric from logcat.

    Captures logcat for the specified duration and returns the bqr results.
    Takes list of android objects as input. If a single android object is given,
    converts it into a list.

    Args:
        ad_list: list of android_device objects
        duration: time duration (seconds) for which the logcat is parsed.
        tag: tag to be appended to the logcat dump.
        processed: flag to process bqr output.

    Returns:
        metrics_dict: dict keyed by "rssi", "pwlv" and "vsp_txpl", each
            mapping a device serial to its values. "vsp_txpl" holds the raw
            matched strings. "rssi" (negated integers) and "pwlv" (integers
            parsed as hex) are lists when processed is False and their
            averages when processed is True.

    Raises:
        ZeroDivisionError: if processed is True and no rssi or pwlv value was
            found in the captured log of a device.
    """

    # Defining bqr quantitites and their regex to extract
    regex_dict = {
        "vsp_txpl": r"VSP_TxPL:\s(\S+)",
        "pwlv": r"PwLv:\s(\S+)",
        "rssi": r"RSSI:\s[-](\d+)"
    }
    metrics_dict = {"rssi": {}, "pwlv": {}, "vsp_txpl": {}}

    # Converting a single android device object to list
    if not isinstance(ad_list, list):
        ad_list = [ad_list]

    #Time sync with the test machine
    for ad in ad_list:
        ad.droid.setTime(int(round(time.time() * 1000)))
        time.sleep(0.5)

    begin_time = utils.get_current_epoch_time()
    time.sleep(duration)
    end_time = utils.get_current_epoch_time()

    bqr_tag = "Handle:"
    for ad in ad_list:
        bt_rssi_log = ad.cat_adb_log(tag, begin_time, end_time)

        # Extracting supporting bqr quantities in a single pass over the log;
        # the with-block closes the file once it has been read.
        samples = {metric: [] for metric in regex_dict}
        with open(bt_rssi_log, "r") as file_bt_log:
            for line in file_bt_log:
                if bqr_tag not in line:
                    continue
                for metric, regex in regex_dict.items():
                    match = re.search(regex, line)
                    if match:
                        samples[metric].append(match.group(1).strip(","))
        for metric, values in samples.items():
            metrics_dict[metric][ad.serial] = values

        # Ensures back-compatibility for vsp_txpl enabled DUTs
        if metrics_dict["vsp_txpl"][ad.serial]:
            metrics_dict["pwlv"][ad.serial] = metrics_dict["vsp_txpl"][
                ad.serial]

        # Formatting the raw data
        metrics_dict["rssi"][ad.serial] = [
            (-1) * int(x) for x in metrics_dict["rssi"][ad.serial]
        ]
        metrics_dict["pwlv"][ad.serial] = [
            int(x, 16) for x in metrics_dict["pwlv"][ad.serial]
        ]

        # Processing formatted data if processing is required
        if processed:
            # Computes the average RSSI
            metrics_dict["rssi"][ad.serial] = round(
                sum(metrics_dict["rssi"][ad.serial]) /
                len(metrics_dict["rssi"][ad.serial]), 2)
            # Computes the average of the noted power levels
            metrics_dict["pwlv"][ad.serial] = float(
                sum(metrics_dict["pwlv"][ad.serial]) /
                len(metrics_dict["pwlv"][ad.serial]))

    return metrics_dict
791
792
def get_bt_rssi(ad, duration=1, processed=True):
    """Collects BT RSSI readings from logcat through get_bt_metric.

    Thin wrapper around get_bt_metric: the logcat dump is tagged with
    "get_bt_rssi" and only the "rssi" entry of the returned metrics is kept.

    Args:
        ad: (list of) android_device object.
        duration: time duration (seconds) for which logcat is parsed.
        processed: forwarded unchanged as get_bt_metric's processed flag.

    Returns:
        The "rssi" entry of the metrics dict returned by get_bt_metric.
    """
    return get_bt_metric(ad,
                         duration,
                         tag="get_bt_rssi",
                         processed=processed)["rssi"]
812
813
def enable_bqr(
    ad_list,
    bqr_interval=10,
    bqr_event_mask=15,
):
    """Turns on BQR reporting on one or more devices.

    Writes the BQR event mask and minimum interval system properties on each
    device, then switches airplane mode on and back off so that the new BQR
    settings take effect.

    Args:
        ad_list: an android_device or list of android devices.
        bqr_interval: value written to persist.bluetooth.bqr.min_interval_ms.
        bqr_event_mask: value written to persist.bluetooth.bqr.event_mask.
    """
    # Accept a bare device as well as a list of devices.
    devices = ad_list if isinstance(ad_list, list) else [ad_list]

    setprop_commands = (
        "setprop persist.bluetooth.bqr.event_mask {}".format(bqr_event_mask),
        "setprop persist.bluetooth.bqr.min_interval_ms {}".format(
            bqr_interval),
    )

    for device in devices:
        for command in setprop_commands:
            device.adb.shell(command)

        # Airplane mode on, then off again.
        for airplane_mode_on in (True, False):
            device.droid.connectivityToggleAirplaneMode(airplane_mode_on)
841
842
def disable_bqr(ad_list):
    """Turns off BQR reporting on one or more devices.

    Writes an event mask of 0 to persist.bluetooth.bqr.event_mask on each
    device, then switches airplane mode on and back off.

    Args:
        ad_list: an android_device or list of android devices.
    """
    # Accept a bare device as well as a list of devices.
    devices = ad_list if isinstance(ad_list, list) else [ad_list]

    disabled_event_mask = 0
    disable_command = "setprop persist.bluetooth.bqr.event_mask {}".format(
        disabled_event_mask)

    for device in devices:
        device.adb.shell(disable_command)

        # Airplane mode on, then off again.
        for airplane_mode_on in (True, False):
            device.droid.connectivityToggleAirplaneMode(airplane_mode_on)
863
864
def get_device_selector_dictionary(android_device_list):
    """Groups Android devices by the Bluetooth profiles/features they offer.

    A device is listed under every profile in sig_uuid_constants whose full
    UUID appears in the device's local UUID list. In addition, the first
    matching device-type feature (TV, Auto, Wear, phone) registers the device
    under a fixed set of profiles through _add_android_device_to_dictionary.

    Args:
        android_device_list: The list of Android devices.
    Returns:
        A dictionary of profiles/features to Android devices.
    """
    selector = {}
    for device in android_device_list:
        local_uuids = device.droid.bluetoothGetLocalUuids()

        for profile_name, short_uuid in sig_uuid_constants.items():
            full_uuid = sig_uuid_constants['BASE_UUID'].format(
                short_uuid).lower()
            if local_uuids and full_uuid in local_uuids:
                selector.setdefault(profile_name, []).append(device)

        # Various services may not be active during BT startup.
        # If the device can be identified through adb shell pm list features
        # then try to add them to the appropriate profiles / features.
        if "feature:android.hardware.type.television" in device.features:
            # Android TV.
            found_message = "Android TV device found."
            type_profiles = ['AudioSink']
        elif "feature:android.hardware.type.automotive" in device.features:
            # Android Auto.
            found_message = "Android Auto device found."
            type_profiles = [
                'AudioSink', 'A/V_RemoteControl', 'Message Notification Server'
            ]
        elif "feature:android.hardware.type.watch" in device.features:
            # Android Wear: no extra profiles, but the helper is still called.
            found_message = "Android Wear device found."
            type_profiles = []
        elif "feature:android.hardware.telephony" in device.features:
            # Android Phone.
            found_message = "Android Phone device found."
            type_profiles = [
                'AudioSource', 'A/V_RemoteControlTarget',
                'Message Access Server'
            ]
        else:
            # Unrecognised device type: rely on the UUID matches only.
            continue

        device.log.info(found_message)
        _add_android_device_to_dictionary(device, type_profiles, selector)
    return selector
923
924
def get_mac_address_of_generic_advertisement(scan_ad, adv_ad):
    """Starts a generic advertisement and finds its mac address by LE scan.

    The advertiser is configured as connectable, low latency, high tx power
    and with its device name included. The scanner then filters on that
    device name and takes the address from the first scan result.

    Args:
        scan_ad: The Android device to use as the scanner.
        adv_ad: The Android device to use as the advertiser.

    Returns:
        mac_address: The mac address of the advertisement.
        advertise_callback: The advertise callback id of the active
            advertisement.
        scan_callback: The scan callback id of the scan that was started.

    Raises:
        BtTestUtilsError: if the advertisement start event or a scan result
            does not arrive within bt_default_timeout.
    """
    advertiser = adv_ad.droid
    advertiser.bleSetAdvertiseDataIncludeDeviceName(True)
    advertiser.bleSetAdvertiseSettingsAdvertiseMode(
        ble_advertise_settings_modes['low_latency'])
    advertiser.bleSetAdvertiseSettingsIsConnectable(True)
    advertiser.bleSetAdvertiseSettingsTxPowerLevel(
        ble_advertise_settings_tx_powers['high'])
    advertise_objects = generate_ble_advertise_objects(advertiser)
    advertise_callback, advertise_data, advertise_settings = advertise_objects
    advertiser.bleStartBleAdvertising(advertise_callback, advertise_data,
                                      advertise_settings)
    advertise_started_event = adv_succ.format(advertise_callback)
    try:
        adv_ad.ed.pop_event(advertise_started_event, bt_default_timeout)
    except Empty as err:
        raise BtTestUtilsError(
            "Advertiser did not start successfully {}".format(err))

    scanner = scan_ad.droid
    filter_list = scanner.bleGenFilterList()
    scan_settings = scanner.bleBuildScanSetting()
    scan_callback = scanner.bleGenScanCallback()
    # Only report results whose device name matches the advertiser.
    scanner.bleSetScanFilterDeviceName(advertiser.bluetoothGetLocalName())
    scanner.bleBuildScanFilter(filter_list)
    scanner.bleStartBleScan(filter_list, scan_settings, scan_callback)
    try:
        scan_event = scan_ad.ed.pop_event(
            "BleScan{}onScanResults".format(scan_callback), bt_default_timeout)
    except Empty as err:
        raise BtTestUtilsError(
            "Scanner did not find advertisement {}".format(err))
    return (scan_event['data']['Result']['deviceInfo']['address'],
            advertise_callback, scan_callback)
968
969
def hid_device_send_key_data_report(host_id, device_ad, key, interval=1):
    """Simulates a keyboard key press followed by a release over HID.

    Sends the keyboard report for the key from device_ad to the host, waits
    for the given interval, then sends the report for key "00".

    Args:
        host_id: the Bluetooth MAC address or name of the HID host
        device_ad: HID device
        key: the key we want to send
        interval: the interval (seconds) between key press and key release
    """
    key_down_report = hid_keyboard_report(key)
    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
                                                 key_down_report)
    time.sleep(interval)
    key_up_report = hid_keyboard_report("00")
    device_ad.droid.bluetoothHidDeviceSendReport(host_id, hid_id_keyboard,
                                                 key_up_report)
985
986
def hid_keyboard_report(key, modifier="00"):
    """Builds the HID keyboard report for the given key.

    The report is the 8 bytes modifier, 00, key, 00, 00, 00, 00, 00.

    Args:
        key: the key we want, as a two-digit hex string
        modifier: HID keyboard modifier byte, as a two-digit hex string
    Returns:
        The report bytes decoded as a UTF-8 string.
    """
    report_fields = (modifier, "00", key, "00", "00", "00", "00", "00")
    return bytes.fromhex(" ".join(report_fields)).decode("utf-8")
999
1000
def is_a2dp_connected(sink, source):
    """
    Checks whether the source is among the sink's connected A2DP devices.

    Each connected device is logged on the sink until a match is found.
    Args:
        sink:       Audio Sink
        source:     Audio Source
    Returns:
        True if Connected
        False if Not connected
    """
    for peer in sink.droid.bluetoothA2dpSinkGetConnectedDevices():
        sink.log.info("A2dp Connected device {}".format(peer["name"]))
        if peer["address"] == source.droid.bluetoothGetLocalAddress():
            return True
    return False
1019
1020
def is_a2dp_snk_device_connected(ad, addr):
    """Checks whether addr is among the device's connected A2DP Sink devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is connected, False otherwise.
    """
    connected = ad.droid.bluetoothA2dpSinkGetConnectedDevices()
    ad.log.info("Connected A2DP Sink devices: {}".format(connected))
    connected_addresses = {entry['address'] for entry in connected}
    return addr in connected_addresses
1035
1036
def is_a2dp_src_device_connected(ad, addr):
    """Checks whether addr is among the device's connected A2DP devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is connected, False otherwise.
    """
    connected = ad.droid.bluetoothA2dpGetConnectedDevices()
    ad.log.info("Connected A2DP Source devices: {}".format(connected))
    connected_addresses = {entry['address'] for entry in connected}
    return addr in connected_addresses
1051
1052
def is_hfp_client_device_connected(ad, addr):
    """Checks whether addr is among the device's connected HFP Client devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is connected, False otherwise.
    """
    connected = ad.droid.bluetoothHfpClientGetConnectedDevices()
    ad.log.info("Connected HFP Client devices: {}".format(connected))
    connected_addresses = {entry['address'] for entry in connected}
    return addr in connected_addresses
1067
1068
def is_map_mce_device_connected(ad, addr):
    """Checks whether addr is among the device's connected MAP MCE devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is connected, False otherwise.
    """
    connected = ad.droid.bluetoothMapClientGetConnectedDevices()
    ad.log.info("Connected MAP MCE devices: {}".format(connected))
    connected_addresses = {entry['address'] for entry in connected}
    return addr in connected_addresses
1083
1084
def is_map_mse_device_connected(ad, addr):
    """Checks whether addr is among the device's connected MAP MSE devices.

    Args:
        ad: the Android device
        addr: the address that's expected
    Returns:
        True if addr is connected, False otherwise.
    """
    connected = ad.droid.bluetoothMapGetConnectedDevices()
    ad.log.info("Connected MAP MSE devices: {}".format(connected))
    connected_addresses = {entry['address'] for entry in connected}
    return addr in connected_addresses
1099
1100
def kill_bluetooth_process(ad):
    """Kill the Bluetooth process(es) on an Android device.

    Looks up the PID(s) of com.android.bluetooth with `ps` on the device and
    issues a host-side `adb -s <serial> shell kill` for them. If no PID is
    found, a warning is logged and nothing is killed.

    Args:
        ad: Android device to kill BT process on.
    """
    ad.log.info("Killing Bluetooth process.")
    ps_output = ad.adb.shell(
        "ps | grep com.android.bluetooth | awk '{print $2}'")
    # The rest of this file treats adb.shell output as str; decode only when
    # bytes are actually returned so both cases work.
    if isinstance(ps_output, bytes):
        ps_output = ps_output.decode('ascii')
    # One PID per line; split() also drops surrounding whitespace/newlines.
    pids = ps_output.split()
    if not pids:
        ad.log.warning("No Bluetooth process found to kill.")
        return
    # Pass an argument list (no host shell) so that multi-line PID output is
    # never interpreted as extra host shell commands.
    call(["adb", "-s", ad.serial, "shell", "kill"] + pids)
1111
1112
def log_energy_info(android_devices, state):
    """Builds the header line for an energy info log.

    Only the header is produced; android_devices is currently not read
    (see b/31966929).

    Args:
        android_devices: input Android device list to log energy info from.
        state: the input state to log. Usually 'Start' or 'Stop' for logging.

    Returns:
        A logging string of the Bluetooth energy info reported.
    """
    # Bug: b/31966929
    return "{} Energy info collection:\n".format(state)
1126
1127
def orchestrate_and_verify_pan_connection(pan_dut, panu_dut):
    """Sets up a PAN connection between two android devices.

    Cycles airplane mode on the PANU device, enables Bluetooth tethering on
    the PAN device, pairs the two, connects the bonded devices and finally
    verifies an http connection on the PANU device.

    Args:
        pan_dut: the Android device providing tethering services
        panu_dut: the Android device using the internet connection from the
            pan_dut
    Returns:
        True if PAN connection and verification is successful,
        false if unsuccessful.
    """
    if not toggle_airplane_mode_by_adb(log, panu_dut, True):
        panu_dut.log.error("Failed to toggle airplane mode on")
        return False
    if not toggle_airplane_mode_by_adb(log, panu_dut, False):
        # Log on the device that was actually toggled (the PANU device).
        panu_dut.log.error("Failed to toggle airplane mode off")
        return False
    pan_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
    panu_dut.droid.bluetoothStartConnectionStateChangeMonitor("")
    if not bluetooth_enabled_check(panu_dut):
        return False
    if not bluetooth_enabled_check(pan_dut):
        return False
    pan_dut.droid.bluetoothPanSetBluetoothTethering(True)
    if not pair_pri_to_sec(pan_dut, panu_dut):
        return False
    if not pan_dut.droid.bluetoothPanIsTetheringOn():
        pan_dut.log.error("Failed to enable Bluetooth tethering.")
        return False
    # Magic sleep needed to give the stack time in between bonding and
    # connecting the PAN profile.
    time.sleep(pan_connect_timeout)
    panu_dut.droid.bluetoothConnectBonded(
        pan_dut.droid.bluetoothGetLocalAddress())
    if not verify_http_connection(log, panu_dut):
        panu_dut.log.error("Can't verify http connection on PANU device.")
        # Diagnostic only: also report whether the PAN device itself has a
        # working connection. The result does not change the return value.
        if not verify_http_connection(log, pan_dut):
            pan_dut.log.info(
                "Can't verify http connection on PAN service device")
        return False
    return True
1169
1170
def orchestrate_bluetooth_socket_connection(
        client_ad,
        server_ad,
        accept_timeout_ms=default_bluetooth_socket_timeout_ms,
        uuid=None):
    """Sets up the Bluetooth Socket connection between two Android devices.

    Starts the pairing helper on both devices, starts an accept thread on the
    server and a connect thread on the client, then polls the client once per
    second for an active socket connection for up to bt_default_timeout
    seconds.

    Args:
        client_ad: the Android device performing the connection.
        server_ad: the Android device accepting the connection.
        accept_timeout_ms: timeout (ms) passed to the server's accept thread.
        uuid: UUID to accept/connect on. bluetooth_socket_conn_test_uuid is
            used when None.
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    socket_uuid = bluetooth_socket_conn_test_uuid if uuid is None else uuid

    server_ad.droid.bluetoothStartPairingHelper()
    client_ad.droid.bluetoothStartPairingHelper()

    server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid(
        socket_uuid, accept_timeout_ms)
    client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid(
        server_ad.droid.bluetoothGetLocalAddress(), socket_uuid)

    # Success is only reported once an active connection has actually been
    # observed; running out of time (or never polling) is a failure.
    end_time = time.time() + bt_default_timeout
    while time.time() < end_time:
        if client_ad.droid.bluetoothSocketConnActiveConnections():
            client_ad.log.info("Bluetooth socket Client Connection Active")
            return True
        time.sleep(1)
    client_ad.log.error("Failed to establish a Bluetooth socket connection")
    return False
1210
1211
def orchestrate_rfcomm_connection(client_ad,
                                  server_ad,
                                  accept_timeout_ms=default_rfcomm_timeout_ms,
                                  uuid=None):
    """Sets up the RFCOMM connection between two Android devices.

    Delegates to orchestrate_bluetooth_socket_connection, falling back to
    bt_rfcomm_uuids['default_uuid'] when no uuid is given.

    Args:
        client_ad: the Android device performing the connection.
        server_ad: the Android device accepting the connection.
        accept_timeout_ms: accept timeout (ms) forwarded to the socket setup.
        uuid: UUID to connect on, or None for the default RFCOMM UUID.
    Returns:
        True if connection was successful, false if unsuccessful.
    """
    rfcomm_uuid = uuid
    if rfcomm_uuid is None:
        rfcomm_uuid = bt_rfcomm_uuids['default_uuid']
    return orchestrate_bluetooth_socket_connection(client_ad, server_ad,
                                                   accept_timeout_ms,
                                                   rfcomm_uuid)
1229
1230
def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True):
    """Pairs the primary device to the secondary device, with retries.

    After every failed pairing attempt the bonds on both devices are cleared
    before trying again. A failure to clear bonds aborts immediately.

    Args:
        pri_ad: Android device initiating connection
        sec_ad: Android device accepting connection
        attempts: Number of attempts to try until failure.
        auto_confirm: Auto confirm passkey match for both devices

    Returns:
        True if the devices paired, False otherwise.
    """
    pri_ad.droid.bluetoothStartConnectionStateChangeMonitor(
        sec_ad.droid.bluetoothGetLocalAddress())
    attempt = 0
    while attempt < attempts:
        paired = _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm)
        if paired:
            return True

        # Pause for 2 seconds before removing the bonds.
        time.sleep(2)
        primary_cleared = clear_bonded_devices(pri_ad)
        if not primary_cleared:
            log.error(
                "Failed to clear bond for primary device at attempt {}".format(
                    attempt))
            return False
        secondary_cleared = clear_bonded_devices(sec_ad)
        if not secondary_cleared:
            log.error(
                "Failed to clear bond for secondary device at attempt {}".
                format(attempt))
            return False
        # Pause for 2 seconds after removing the bonds.
        time.sleep(2)
        attempt += 1

    log.error("pair_pri_to_sec failed to connect after {} attempts".format(
        attempts))
    return False
1268
1269
def _pair_pri_to_sec(pri_ad, sec_ad, auto_confirm):
    """Runs a single discover-and-bond attempt from pri_ad to sec_ad.

    Args:
        pri_ad: Android device initiating the bond.
        sec_ad: Android device made discoverable and bonded to.
        auto_confirm: passed to the pairing helper on both devices; when
            False, the passkey match is awaited before checking the bond.

    Returns:
        True if sec_ad's address shows up in pri_ad's bonded devices within
        bt_default_timeout seconds, False otherwise.
    """
    initiator, acceptor = pri_ad.droid, sec_ad.droid
    pri_ad.ed.clear_all_events()
    sec_ad.ed.clear_all_events()
    log.info("Bonding device {} to {}".format(
        initiator.bluetoothGetLocalAddress(),
        acceptor.bluetoothGetLocalAddress()))
    # Make sec_ad discoverable so that pri_ad can find it. The timeout is
    # based on how long two devices would take to pair once pri_ad starts
    # seeing devices.
    acceptor.bluetoothMakeDiscoverable(bt_default_timeout)
    target_address = acceptor.bluetoothGetLocalAddress()
    log.debug("Starting paring helper on each device")
    for helper_droid in (initiator, acceptor):
        helper_droid.bluetoothStartPairingHelper(auto_confirm)
    pri_ad.log.info("Primary device starting discovery and executing bond")
    initiator.bluetoothDiscoverAndBond(target_address)
    if not auto_confirm and not _wait_for_passkey_match(pri_ad, sec_ad):
        return False

    # Poll the bonded device list until the target shows up or time runs out.
    deadline = time.time() + bt_default_timeout
    pri_ad.log.info("Verifying devices are bonded")
    while time.time() < deadline:
        bonded_devices = initiator.bluetoothGetBondedDevices()
        if any(entry['address'] == target_address
               for entry in bonded_devices):
            pri_ad.log.info("Successfully bonded to device")
            return True
        time.sleep(0.1)
    # Timed out trying to bond.
    pri_ad.log.info("Failed to bond devices.")
    return False
1305
1306
def reset_bluetooth(android_devices):
    """Resets Bluetooth state of input Android device list.

    For each device: if Bluetooth reports on, it is toggled off and the
    bluetooth_off event is awaited; then, after a short sleep,
    bluetooth_enabled_check is run on the device.

    Args:
        android_devices: The Android device list to reset Bluetooth state on.

    Returns:
        True if successful, false if unsuccessful.
    """
    for device in android_devices:
        sl4a, events = device.droid, device.ed
        device.log.info("Reset state of bluetooth on device.")
        bluetooth_is_on = sl4a.bluetoothCheckState() is True
        if bluetooth_is_on:
            sl4a.bluetoothToggleState(False)
            try:
                events.pop_event(bluetooth_off, bt_default_timeout)
            except Exception:
                device.log.error("Failed to toggle Bluetooth off.")
                return False
        # temp sleep for b/17723234
        time.sleep(3)
        if not bluetooth_enabled_check(device):
            return False
    return True
1333
1334
def scan_and_verify_n_advertisements(scn_ad, max_advertisements):
    """Verify that input number of advertisements can be found from the scanning
    Android device.

    Starts an unfiltered LE scan and collects distinct advertiser addresses
    from the scan results until max_advertisements have been seen or
    bt_default_timeout seconds have passed. The scan is always stopped before
    returning or raising.

    Args:
        scn_ad: The Android device to start LE scanning on.
        max_advertisements: The number of advertisements the scanner expects to
        find.

    Returns:
        True if successful, false if unsuccessful.

    Raises:
        BtTestUtilsError: if no scan event arrives within bt_default_timeout
            while waiting for the next result.
    """
    found_addresses = set()
    filter_list = scn_ad.droid.bleGenFilterList()
    scn_ad.droid.bleBuildScanFilter(filter_list)
    scan_settings = scn_ad.droid.bleBuildScanSetting()
    scan_callback = scn_ad.droid.bleGenScanCallback()
    scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback)
    deadline = time.time() + bt_default_timeout
    try:
        while time.time() < deadline:
            try:
                event = scn_ad.ed.pop_event(scan_result.format(scan_callback),
                                            bt_default_timeout)
            except Empty as error:
                raise BtTestUtilsError(
                    "Failed to find scan event: {}".format(error)) from error
            found_addresses.add(event['data']['Result']['deviceInfo']['address'])
            if len(found_addresses) == max_advertisements:
                return True
        return False
    finally:
        # Stop the scan on every exit path, including the error raised above,
        # so a failed verification does not leave the scan running.
        scn_ad.droid.bleStopBleScan(scan_callback)
1371
1372
def set_bluetooth_codec(android_device,
                        codec_type,
                        sample_rate,
                        bits_per_sample,
                        channel_mode,
                        codec_specific_1=0):
    """Requests an A2DP codec configuration and checks it through ADB.

    The request is sent over SL4A; a rejected request or a missing
    codec-config-changed event only produces a warning. The current codec is
    then read from `dumpsys bluetooth_manager` and compared to codec_type.

    Args:
        android_device (acts.controllers.android_device.AndroidDevice): the
            android device for which to switch the codec.
        codec_type (str): the desired codec type. Must be a key in
            bt_constants.codec_types.
        sample_rate (str): the desired sample rate. Must be a key in
            bt_constants.sample_rates.
        bits_per_sample (str): the desired bits per sample. Must be a key in
            bt_constants.bits_per_samples.
        channel_mode (str): the desired channel mode. Must be a key in
            bt_constants.channel_modes.
        codec_specific_1 (int): the desired bit rate (quality) for LDAC codec.
    Returns:
        bool: False if ADB reports a codec other than codec_type. True
            otherwise, including when the ADB output could not be parsed.
    """
    android_device.log.info(
        "Set Android Device A2DP Bluetooth codec configuration:\n"
        "\tCodec: {codec_type}\n"
        "\tSample Rate: {sample_rate}\n"
        "\tBits per Sample: {bits_per_sample}\n"
        "\tChannel Mode: {channel_mode}".format(
            codec_type=codec_type,
            sample_rate=sample_rate,
            bits_per_sample=bits_per_sample,
            channel_mode=channel_mode))

    # Send SL4A command
    sl4a, events = android_device.droid, android_device.ed
    request_accepted = sl4a.bluetoothA2dpSetCodecConfigPreference(
        codec_types[codec_type], sample_rates[str(sample_rate)],
        bits_per_samples[str(bits_per_sample)], channel_modes[channel_mode],
        codec_specific_1)
    if request_accepted:
        try:
            events.pop_event(bluetooth_a2dp_codec_config_changed,
                             bt_default_timeout)
        except Exception:
            android_device.log.warning("SL4A event not registered. Codec "
                                       "may not have been changed.")
    else:
        android_device.log.warning(
            "SL4A command returned False. Codec was not "
            "changed.")

    # Validate codec value through ADB
    # TODO (aidanhb): validate codec more robustly using SL4A
    dumpsys_fields = android_device.adb.shell(
        "dumpsys bluetooth_manager | grep -i 'current codec'").split(": ")
    if len(dumpsys_fields) != 2:
        android_device.log.warning("Could not verify codec config change "
                                   "through ADB.")
    else:
        reported_codec = dumpsys_fields[1].strip()
        if reported_codec.upper() != codec_type:
            android_device.log.error("Codec config was not changed.\n"
                                     "\tExpected codec: {exp}\n"
                                     "\tActual codec: {act}".format(
                                         exp=codec_type, act=reported_codec))
            return False
    android_device.log.info("Bluetooth codec successfully changed.")
    return True
1441
1442
def set_bt_scan_mode(ad, scan_mode_value):
    """Set Android device's Bluetooth scan mode and confirm it took effect.

    For the 'state_off' mode, Bluetooth is disabled, the scan mode is read,
    and Bluetooth is then reset before the comparison is made.

    Args:
        ad: The Android device to set the scan mode on.
        scan_mode_value: The value to set the scan mode to.

    Returns:
        True if the scan mode read back equals scan_mode_value, False if it
        differs or scan_mode_value is not a known scan mode.
    """
    droid = ad.droid
    if scan_mode_value == bt_scan_mode_types['state_off']:
        disable_bluetooth(droid)
        observed_mode = droid.bluetoothGetScanMode()
        # The mode is read while Bluetooth is off; the reset comes afterwards.
        reset_bluetooth([ad])
    elif scan_mode_value == bt_scan_mode_types['none']:
        droid.bluetoothMakeUndiscoverable()
        observed_mode = droid.bluetoothGetScanMode()
    elif scan_mode_value == bt_scan_mode_types['connectable']:
        droid.bluetoothMakeUndiscoverable()
        droid.bluetoothMakeConnectable()
        observed_mode = droid.bluetoothGetScanMode()
    elif scan_mode_value == bt_scan_mode_types['connectable_discoverable']:
        droid.bluetoothMakeDiscoverable()
        observed_mode = droid.bluetoothGetScanMode()
    else:
        # invalid scan mode
        return False
    return observed_mode == scan_mode_value
1480
1481
def set_device_name(droid, name):
    """Set the Bluetooth local name on the droid object and read it back.

    Args:
        droid: Droid object to set local name on.
        name: the Bluetooth local name to set.

    Returns:
        True if the name read back after 2 seconds equals name, False
        otherwise.
    """
    droid.bluetoothSetLocalName(name)
    # Wait before reading the name back.
    time.sleep(2)
    return droid.bluetoothGetLocalName() == name
1498
1499
def set_profile_priority(host_ad, client_ad, profiles, priority):
    """Sets the priority of said profile(s) on host_ad for client_ad.

    Supported profiles are a2dp_sink, headset_client and pbap_client (as
    defined in bt_profile_constants); any other profile is logged as an error
    and skipped.

    Args:
        host_ad: Android device on which the priority is set.
        client_ad: Android device whose local address the priority applies to.
        profiles: iterable of profile constants to update.
        priority: priority to apply; its .value attribute is what is sent.
    """
    for profile in profiles:
        host_ad.log.info("Profile {} on {} for {} set to priority {}".format(
            profile, host_ad.droid.bluetoothGetLocalName(),
            client_ad.droid.bluetoothGetLocalAddress(), priority.value))
        if bt_profile_constants['a2dp_sink'] == profile:
            apply_priority = host_ad.droid.bluetoothA2dpSinkSetPriority
        elif bt_profile_constants['headset_client'] == profile:
            apply_priority = host_ad.droid.bluetoothHfpClientSetPriority
        elif bt_profile_constants['pbap_client'] == profile:
            apply_priority = host_ad.droid.bluetoothPbapClientSetPriority
        else:
            host_ad.log.error(
                "Profile {} not yet supported for priority settings".format(
                    profile))
            continue
        apply_priority(client_ad.droid.bluetoothGetLocalAddress(),
                       priority.value)
1519
1520
def setup_multiple_devices_for_bt_test(android_devices):
    """A common setup routine for Bluetooth on input Android device list.

    Things this function does:
    1. Runs `setenforce 0` on every device.
    2. Runs factory_reset_bluetooth for every device, one thread per device.
    3. Sets the Bluetooth local name to a random id of size 4.
    4. Calls bluetoothDisableBLE and turns the location service on.
    5. Removes all existing bonds.
    6. Sets the Bluetooth snoop log mode property to "full".

    Args:
        android_devices: Android device list to setup Bluetooth on.

    Returns:
        A truthy value if successful (True for an empty device list), a falsy
        value if unsuccessful.
    """
    log.info("Setting up Android Devices")
    # TODO: Temp fix for an selinux error.
    for ad in android_devices:
        ad.adb.shell("setenforce 0")
    # Start from success so that an empty device list returns True instead of
    # reaching the final return with an unbound local.
    setup_result = True
    threads = []
    try:
        for a in android_devices:
            thread = threading.Thread(target=factory_reset_bluetooth,
                                      args=([a], ))
            threads.append(thread)
            thread.start()
        for t in threads:
            t.join()

        for a in android_devices:
            d = a.droid
            # TODO: Create specific RPC command to instantiate
            # BluetoothConnectionFacade. This is just a workaround.
            d.bluetoothStartConnectionStateChangeMonitor("")
            setup_result = d.bluetoothSetLocalName(generate_id_by_size(4))
            if not setup_result:
                a.log.error("Failed to set device name.")
                return setup_result
            d.bluetoothDisableBLE()
            utils.set_location_service(a, True)
            bonded_devices = d.bluetoothGetBondedDevices()
            for b in bonded_devices:
                a.log.info("Removing bond for device {}".format(b['address']))
                d.bluetoothUnbond(b['address'])
        for a in android_devices:
            a.adb.shell("setprop persist.bluetooth.btsnooplogmode full")
            getprop_result = a.adb.shell(
                "getprop persist.bluetooth.btsnooplogmode") == "full"
            # Snoop logging is best effort: a failure is only warned about.
            if not getprop_result:
                a.log.warning("Failed to enable Bluetooth Hci Snoop Logging.")
    except Exception as err:
        log.error("Something went wrong in multi device setup: {}".format(err))
        return False
    return setup_result
1575
1576
def setup_n_advertisements(adv_ad, num_advertisements):
    """Start the requested number of LE advertisements on one device.

    All advertisements share a single advertise-data object and a single
    settings object, built after the advertise mode has been set to
    'low_latency'. After every start request the matching adv_succ event is
    popped with bt_default_timeout.

    Args:
        adv_ad: The Android device to start LE advertisements on.
        num_advertisements: The number of advertisements to start.

    Returns:
        List of advertisement callback ids, in the order they were started.

    Raises:
        BtTestUtilsError: if popping the success event of an advertisement
            raises Empty.
    """
    droid = adv_ad.droid
    droid.bleSetAdvertiseSettingsAdvertiseMode(
        ble_advertise_settings_modes['low_latency'])
    data = droid.bleBuildAdvertiseData()
    settings = droid.bleBuildAdvertiseSettings()
    callbacks = []
    # ordinal is the 1-based advertisement number used in log messages.
    for ordinal in range(1, num_advertisements + 1):
        callback = droid.bleGenBleAdvertiseCallback()
        callbacks.append(callback)
        droid.bleStartBleAdvertising(callback, data, settings)
        try:
            adv_ad.ed.pop_event(adv_succ.format(callback), bt_default_timeout)
        except Empty as error:
            adv_ad.log.error(
                "Advertisement {} failed to start.".format(ordinal))
            raise BtTestUtilsError(
                "Test failed with Empty error: {}".format(error))
        else:
            adv_ad.log.info("Advertisement {} started.".format(ordinal))
    return callbacks
1606
1607
def take_btsnoop_log(ad, testcase, testname):
    """Pull the btsnoop_hci logs from a device into its local log directory.

    Two files are pulled with "adb pull" into the 'BluetoothSnoopLogs'
    folder under ad.device_log_path: the current snoop log and the ".last"
    snoop log. Both local file names start with
    "<testname>,<device model>,<serial>", where the test name is reduced to
    its alphanumeric characters and spaces are removed from the model.

    A failure while pulling the ".last" log is caught and reported through
    testcase.log; a failure while pulling the current log is not caught.

    Args:
        ad: The android_device instance to pull the snoop logs from.
        testcase: The test class object that triggered this snoop log; its
            `log` attribute is used for reporting.
        testname: Name of the test case that triggered this snoop log.
    """
    testname = "".join([ch for ch in testname if ch.isalnum()])
    serial = ad.serial
    model = ad.droid.getBuildModel().replace(" ", "")
    out_name = ','.join((testname, model, serial))

    snoop_path = os.path.join(ad.device_log_path, 'BluetoothSnoopLogs')
    os.makedirs(snoop_path, exist_ok=True)
    # Common local destination prefix shared by both pulled files.
    local_prefix = snoop_path + '/' + out_name

    exe_cmd(" ".join(("adb -s", serial, "pull", btsnoop_log_path_on_device,
                      local_prefix + ".btsnoop_hci.log")))
    try:
        exe_cmd(" ".join(
            ("adb -s", serial, "pull", btsnoop_last_log_path_on_device,
             local_prefix + ".btsnoop_hci.log.last")))
    except Exception:
        # The ".last" log is optional; report and carry on.
        testcase.log.info(
            "File does not exist {}".format(btsnoop_last_log_path_on_device))
1639
1640
def take_btsnoop_logs(android_devices, testcase, testname):
    """Call take_btsnoop_log once for every device in the input list.

    Args:
        android_devices: the list of Android devices to pull btsnoop logs from.
        testcase: The test class object that triggered this snoop log.
        testname: Name of the test case that triggered this snoop log.
    """
    for device in android_devices:
        take_btsnoop_log(device, testcase, testname)
1651
1652
def teardown_n_advertisements(adv_ad, num_advertisements,
                              advertise_callback_list):
    """Stop the first num_advertisements advertisements on a device.

    Args:
        adv_ad: The Android device to stop LE advertisements on.
        num_advertisements: The number of advertisements to stop; callbacks
            are taken from the front of advertise_callback_list by index.
        advertise_callback_list: The list of advertisement callbacks to stop.

    Returns:
        Always True once every requested stop call has been issued.
    """
    index = 0
    while index < num_advertisements:
        adv_ad.droid.bleStopBleAdvertising(advertise_callback_list[index])
        index += 1
    return True
1668
1669
def verify_server_and_client_connected(client_ad, server_ad, log=True):
    """Check that both devices report at least one active socket connection.

    The server is queried first, then the client; both are always queried.

    Args:
        client_ad: the Android device to check number of active connections.
        server_ad: the Android device to check number of active connections.
        log: when truthy, log an error on each device that reports no
            active socket connections.

    Returns:
        True if both server and client have at least 1 active connection,
        False otherwise.
    """
    server_connections = server_ad.droid.bluetoothSocketConnActiveConnections()
    server_connected = len(server_connections) != 0
    if log and not server_connected:
        server_ad.log.error("No socket connections found on server.")

    client_connections = client_ad.droid.bluetoothSocketConnActiveConnections()
    client_connected = len(client_connections) != 0
    if log and not client_connected:
        client_ad.log.error("No socket connections found on client.")

    return server_connected and client_connected
1694
1695
def wait_for_bluetooth_manager_state(droid,
                                     state=None,
                                     timeout=10,
                                     threshold=5):
    """Poll the Bluetooth LE state until it settles or matches a target.

    The state is sampled with droid.bluetoothGetLeState(), with a 0.5 second
    sleep between samples. Once at least `threshold` samples have been
    collected, the most recent `threshold` samples are examined:
      * state is None: succeed when all of them are identical.
      * otherwise: succeed when `state` appears among them.

    Args:
        droid: droid device object
        state: expected Bluetooth state, or None to accept any stable state
        timeout: maximum time in seconds to keep polling
        threshold: number of most recent samples to examine

    Returns:
        True if successful, False if the timeout elapsed first.
    """
    samples = []
    deadline = time.time() + timeout
    while time.time() < deadline:
        samples.append(droid.bluetoothGetLeState())
        if len(samples) >= threshold:
            window = samples[-threshold:]
            if state is None:
                # for any normalized state
                distinct = set(window)
                if len(distinct) == 1:
                    log.info("State normalized {}".format(distinct))
                    return True
            elif {state}.issubset(window):
                # explicit check against normalized state
                return True
        time.sleep(0.5)

    if state is None:
        failure = "Bluetooth state fails to normalize"
    else:
        failure = ("Failed to match bluetooth state, current state {} "
                   "expected state {}").format(droid.bluetoothGetLeState(),
                                               state)
    log.error(failure)
    return False
1731
1732
def _wait_for_passkey_match(pri_ad, sec_ad):
    """Pop the pairing request on both devices and confirm or reject it.

    A "BluetoothActionPairingRequest" event is popped first from the primary
    and then from the secondary device, each with bt_default_timeout. When
    both pairing variants equal pairing_variant_passkey_confirmation, the
    two pins are compared and the result is posted to both devices as a
    "BluetoothActionPairingRequestUserConfirm" event.

    Args:
        pri_ad: the primary Android device.
        sec_ad: the secondary Android device.

    Returns:
        False if a pairing request event is not received (Empty), if the
        pairing variants differ, or if the pins differ under passkey
        confirmation. True otherwise.
    """
    # Kept outside the try so the failure log can show which request arrived.
    pri_pairing_req = None
    sec_pairing_req = None
    try:
        pri_pairing_req = pri_ad.ed.pop_event(
            event_name="BluetoothActionPairingRequest",
            timeout=bt_default_timeout)
        pri_data = pri_pairing_req["data"]
        pri_variant = pri_data["PairingVariant"]
        pri_pin = pri_data["Pin"]
        pri_ad.log.info("Primary device received Pin: {}, Variant: {}".format(
            pri_pin, pri_variant))

        sec_pairing_req = sec_ad.ed.pop_event(
            event_name="BluetoothActionPairingRequest",
            timeout=bt_default_timeout)
        sec_data = sec_pairing_req["data"]
        sec_variant = sec_data["PairingVariant"]
        sec_pin = sec_data["Pin"]
        sec_ad.log.info(
            "Secondary device received Pin: {}, Variant: {}".format(
                sec_pin, sec_variant))
    except Empty as err:
        log.error("Wait for pin error: {}".format(err))
        log.error("Pairing request state, Primary: {}, Secondary: {}".format(
            pri_pairing_req, sec_pairing_req))
        return False

    if pri_variant != sec_variant:
        log.error("Pairing variant mismatched, abort connection")
        return False

    if pri_variant == pairing_variant_passkey_confirmation:
        pins_match = pri_pin == sec_pin
        log.info("Pairing code matched, accepting connection" if pins_match
                 else "Pairing code mismatched, rejecting connection")
        # Both sides receive the same verdict, primary first.
        for device in (pri_ad, sec_ad):
            device.droid.eventPost("BluetoothActionPairingRequestUserConfirm",
                                   str(pins_match))
        if not pins_match:
            return False
    return True
1774
1775
def write_read_verify_data(client_ad, server_ad, msg, binary=False):
    """Write a message from the client and check the server reads it back.

    Args:
        client_ad: the Android device to perform the write.
        server_ad: the Android device to read the data written.
        msg: the message to write.
        binary: when truthy, use the binary write/read calls; trailing
            "\\r" and "\\n" characters are stripped from the binary read
            result before comparing.

    Returns:
        True if the data written matches the data read. False if they
        differ, or if the write or the read raises an exception.
    """
    client_ad.log.info("Write message.")
    try:
        send = (client_ad.droid.bluetoothSocketConnWriteBinary
                if binary else client_ad.droid.bluetoothSocketConnWrite)
        send(msg)
    except Exception as err:
        client_ad.log.error("Failed to write data: {}".format(err))
        return False

    server_ad.log.info("Read message.")
    try:
        if binary:
            raw = server_ad.droid.bluetoothSocketConnReadBinary()
            read_msg = raw.rstrip("\r\n")
        else:
            read_msg = server_ad.droid.bluetoothSocketConnRead()
    except Exception as err:
        server_ad.log.error("Failed to read data: {}".format(err))
        return False

    log.info("Verify message.")
    if msg == read_msg:
        return True
    log.error("Mismatch! Read: {}, Expected: {}".format(read_msg, msg))
    return False
1812
1813
class MediaControlOverSl4a(object):
    """Drive media playback on an Android device through its sl4a droid.

    Every control method issues the media command on the 'default' player
    tag and then asserts on droid.mediaIsPlaying().
    """
    def __init__(self, android_device, music_file):
        """Store the device and the music file to control.

        Args:
            android_device: android_device object
            music_file: location of the music file
        """
        self.android_device = android_device
        self.music_file = music_file

    def play(self):
        """Open the music file for playback and assert it is playing."""
        droid = self.android_device.droid
        droid.mediaPlayOpen('file://%s' % self.music_file, 'default', True)
        is_playing = droid.mediaIsPlaying()
        asserts.assert_true(is_playing,
                            'Failed to play music %s' % self.music_file)

    def pause(self):
        """Pause playback and assert that media is no longer playing."""
        droid = self.android_device.droid
        droid.mediaPlayPause('default')
        is_playing = droid.mediaIsPlaying()
        asserts.assert_true(not is_playing,
                            'Failed to pause music %s' % self.music_file)

    def resume(self):
        """Start playback again and assert that media is playing."""
        droid = self.android_device.droid
        droid.mediaPlayStart('default')
        is_playing = droid.mediaIsPlaying()
        asserts.assert_true(is_playing,
                            'Failed to play music %s' % self.music_file)

    def stop(self):
        """Stop playback and assert that media is no longer playing."""
        droid = self.android_device.droid
        droid.mediaPlayStop('default')
        is_playing = droid.mediaIsPlaying()
        asserts.assert_true(not is_playing,
                            'Failed to stop music %s' % self.music_file)
1864