1#!/usr/bin/env python3 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17# TODO: In the future to decide whether to move it to a common directory rather 18# than the one specific to apollo. 19# TODO: The move is contingent on understanding the functions that should be 20# supported by the dut device (sec_device). 21 22"""A generic library with bluetooth related functions. The connection is assumed 23to be between and android phone with any dut (referred to as secondary device) 24device that supports the following calls: 25 sec_device.turn_on_bluetooth() 26 sec_device.is_bt_enabled(): 27 sec_device.bluetooth_address 28 sec_device.set_pairing_mode() 29 sec_device.factory_reset() 30 31""" 32import queue 33import time 34from logging import Logger 35 36from acts import asserts 37from acts.controllers.buds_lib import tako_trace_logger 38from acts.utils import TimeoutError 39from acts.utils import wait_until 40 41# Add connection profile for future devices in this dictionary 42WEARABLE_BT_PROTOCOLS = { 43 'rio': { 44 'Comp. App': 'FALSE', 45 'HFP (pri.)': 'FALSE', 46 'HFP (sec.)': 'FALSE', 47 'A2DP (pri.)': 'FALSE', 48 'A2DP (sec.)': 'FALSE', 49 }, 50 'apollo': { 51 'Comp': 'FALSE', 52 'HFP(pri.)': 'FALSE', 53 'HFP(sec.)': 'FALSE', 54 'A2DP(pri)': 'FALSE', 55 'A2DP(sec)': 'FALSE', 56 } 57} 58 59 60class BTUtilsError(Exception): 61 """Generic BTUtils error""" 62 63 64class BTUtils(object): 65 """A utility that provides access to bluetooth controls. 66 67 This class to be maintained as a generic class such that it is compatible 68 with any devices that pair with a phone. 69 """ 70 71 def __init__(self): 72 self.default_timeout = 60 73 self.logger = tako_trace_logger.TakoTraceLogger(Logger(__file__)) 74 75 def bt_pair_and_connect(self, pri_device, sec_device): 76 """Pair and connect a pri_device to a sec_device. 77 78 Args: 79 pri_device: an android device with sl4a installed. 80 sec_device: a wearable device. 81 82 Returns: 83 (Tuple)True if pair and connect successful. False Otherwise. 84 Time in ms to execute the flow. 85 """ 86 87 pair_time = self.bt_pair(pri_device, sec_device) 88 connect_result, connect_time = self.bt_connect(pri_device, sec_device) 89 return connect_result, pair_time + connect_time 90 91 def bt_pair(self, pri_device, sec_device): 92 """Pair a pri_device to a sec_device. 93 94 Args: 95 pri_device: an android device with sl4a installed. 96 sec_device: a wearable device. 97 98 Returns: 99 (Tuple)True if pair successful. False Otherwise. 100 Time in ms to execute the flow. 101 """ 102 start_time = time.time() 103 # Enable BT on the primary device if it's not currently ON. 104 if not pri_device.droid.bluetoothCheckState(): 105 pri_device.droid.bluetoothToggleState(True) 106 try: 107 pri_device.ed.pop_event(event_name='BluetoothStateChangedOn', 108 timeout=10) 109 except queue.Empty: 110 raise BTUtilsError( 111 'Failed to toggle Bluetooth on the primary device.') 112 sec_device.turn_on_bluetooth() 113 if not sec_device.is_bt_enabled(): 114 raise BTUtilsError('Could not turn on Bluetooth on secondary ' 115 'devices') 116 target_addr = sec_device.bluetooth_address 117 sec_device.set_pairing_mode() 118 119 pri_device.droid.bluetoothDiscoverAndBond(target_addr) 120 # Loop until we have bonded successfully or timeout. 121 self.logger.info('Verifying devices are bonded') 122 try: 123 wait_until(lambda: self.android_device_in_paired_state(pri_device, 124 target_addr), 125 self.default_timeout) 126 except TimeoutError as err: 127 raise BTUtilsError('bt_pair failed: {}'.format(err)) 128 end_time = time.time() 129 return end_time - start_time 130 131 def bt_connect(self, pri_device, sec_device): 132 """Connect a previously paired sec_device to a pri_device. 133 134 Args: 135 pri_device: an android device with sl4a installed. 136 sec_device: a wearable device. 137 138 Returns: 139 (Tuple)True if connect successful. False otherwise. 140 Time in ms to execute the flow. 141 """ 142 start_time = end_time = time.time() 143 target_addr = sec_device.bluetooth_address 144 # First check that devices are bonded. 145 paired = False 146 for paired_device in pri_device.droid.bluetoothGetBondedDevices(): 147 if paired_device['address'] == target_addr: 148 paired = True 149 break 150 if not paired: 151 self.logger.error('Not paired to %s', sec_device.device_name) 152 return False, 0 153 154 self.logger.info('Attempting to connect.') 155 pri_device.droid.bluetoothConnectBonded(target_addr) 156 157 self.logger.info('Verifying devices are connected') 158 wait_until( 159 lambda: self.android_device_in_connected_state(pri_device, 160 target_addr), 161 self.default_timeout) 162 end_time = time.time() 163 return True, end_time - start_time 164 165 def android_device_in_paired_state(self, device, mac_address): 166 """Check device in paired list.""" 167 bonded_devices = device.droid.bluetoothGetBondedDevices() 168 for d in bonded_devices: 169 if d['address'] == mac_address: 170 self.logger.info('Successfully bonded to device') 171 return True 172 return False 173 174 def android_device_in_connected_state(self, device, mac_address): 175 """Check device in connected list.""" 176 connected_devices = device.droid.bluetoothGetConnectedDevices() 177 for d in connected_devices: 178 if d['address'] == mac_address: 179 self.logger.info('Successfully connected to device') 180 return True 181 return False 182 183 def bt_unpair(self, pri_device, sec_device, factory_reset_dut=True): 184 """Unpairs two Android devices using bluetooth. 185 186 Args: 187 pri_device: an android device with sl4a installed. 188 sec_device: a wearable device. 189 190 Returns: 191 (Tuple)True: if the devices successfully unpaired. 192 Time in ms to execute the flow. 193 Raises: 194 Error: When devices fail to unpair. 195 """ 196 target_address = sec_device.bluetooth_address 197 if not self.android_device_in_paired_state(pri_device, target_address): 198 self.logger.debug('Already unpaired.') 199 return True, 0 200 self.logger.debug('Unpairing from %s' % target_address) 201 start_time = end_time = time.time() 202 asserts.assert_true( 203 pri_device.droid.bluetoothUnbond(target_address), 204 'Failed to request device unpairing.') 205 206 # Check that devices have unpaired successfully. 207 self.logger.debug('Verifying devices are unpaired') 208 209 # Loop until we have unbonded successfully or timeout. 210 wait_until( 211 lambda: self.android_device_in_paired_state(pri_device, 212 target_address), 213 self.default_timeout, 214 condition=False) 215 216 self.logger.info('Successfully unpaired from %s' % target_address) 217 if factory_reset_dut: 218 self.logger.info('Factory reset DUT') 219 sec_device.factory_reset() 220 end_time = time.time() 221 return True, end_time - start_time 222 223 def check_device_bt(self, device, **kwargs): 224 """Check the Bluetooth connection status from device. 225 226 Args: 227 device: a wearable device. 228 **kwargs: additional parameters 229 230 Returns: 231 True: if bt status check success, False otherwise. 232 """ 233 if device.dut_type in ['rio', 'apollo']: 234 profiles = kwargs.get('profiles') 235 return self.check_dut_status(device, profiles) 236 237 def check_dut_status(self, device, profiles=None): 238 """Check the Bluetooth connection status from rio/apollo device. 239 240 Args: 241 device: rio/apollo device 242 profiles: A dict of profiles, eg. {'HFP (pri.)': 'TRUE', 'Comp. App': 243 'TRUE', 'A2DP (pri.)': 'TRUE'} 244 245 Returns: 246 True: if bt status check success, False otherwise. 247 """ 248 expected = WEARABLE_BT_PROTOCOLS 249 self.logger.info(profiles) 250 for key in profiles: 251 expected[device.dut_type][key] = profiles[key] 252 try: 253 wait_until(lambda: self._compare_profile(device, 254 expected[device.dut_type]), 255 self.default_timeout) 256 except TimeoutError: 257 status = device.get_bt_status() 258 msg_fmt = self._get_formatted_output(expected[device.dut_type], 259 status) 260 self.logger.error(msg_fmt) 261 return False 262 return True 263 264 def _get_formatted_output(self, expected, actual): 265 """On BT status mismatch generate formatted output string. 266 267 Args: 268 expected: Expected BT status hash. 269 actual: Actual BT status hash from Rio. 270 271 Returns: 272 Formatted mismatch string. 273 274 Raises: 275 Error: When unexpcted parameter encounterd. 276 """ 277 msg = '' 278 mismatch_format = '{}: Expected {} Actual {}. ' 279 if actual is None: 280 raise BTUtilsError('None is not expected.') 281 for key in expected.keys(): 282 if expected[key] != actual[key]: 283 msg += mismatch_format.format(key, expected[key], actual[key]) 284 return msg 285 286 def _compare_profile(self, device, expected): 287 """Compare input expected profile with actual.""" 288 actual = device.get_bt_status() 289 if actual is None: 290 raise BTUtilsError('None is not expected.') 291 for key in expected.keys(): 292 if expected[key] != actual[key]: 293 return False 294 return True 295