#!/usr/bin/env python3
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Original file:
    tools/test/connectivity/acts_tests/acts_contrib/test_utils/bt/bt_gatt_utils.py
"""

import logging
import pprint
from queue import Empty

from blueberry.utils.bt_gatt_constants import GattCallbackError
from blueberry.utils.bt_gatt_constants import GattCallbackString
from blueberry.utils.bt_gatt_constants import GattCharacteristic
from blueberry.utils.bt_gatt_constants import GattConnectionState
from blueberry.utils.bt_gatt_constants import GattDescriptor
from blueberry.utils.bt_gatt_constants import GattPhyMask
from blueberry.utils.bt_gatt_constants import GattServiceType
from blueberry.utils.bt_gatt_constants import GattTransport
from blueberry.utils.bt_test_utils import BtTestUtilsError
from blueberry.utils.bt_test_utils import get_mac_address_of_generic_advertisement
from mobly.controllers.android_device import AndroidDevice
from mobly.controllers.android_device_lib.event_dispatcher import EventDispatcher
from mobly.controllers.android_device_lib.sl4a_client import Sl4aClient

# Timeout passed to every pop_event() call that does not take an explicit one.
default_timeout = 10
# The module logs through the root `logging` module under the name `log`.
log = logging


class GattTestUtilsError(Exception):
    """Raised when a GATT helper in this module cannot complete its step."""


def setup_gatt_connection(central: AndroidDevice,
                          mac_address,
                          autoconnect,
                          transport=GattTransport.TRANSPORT_AUTO,
                          opportunistic=False,
                          timeout_seconds=default_timeout):
    """Start a GATT client connection and wait for the connected event.

    Args:
        central: device whose sl4a client initiates the connection.
        mac_address: address handed to gattClientConnectGatt.
        autoconnect: autoconnect flag handed to gattClientConnectGatt.
        transport: transport handed to gattClientConnectGatt.
        opportunistic: opportunistic flag handed to gattClientConnectGatt.
        timeout_seconds: how long to wait for the connection-change event.

    Returns:
        Tuple (bluetooth_gatt, gatt_callback).

    Raises:
        GattTestUtilsError: no connection-change event arrived in time, or the
            event's state was not STATE_CONNECTED. The GATT client is closed
            (best effort) before raising.
    """
    gatt_callback = central.sl4a.gattCreateGattCallback()
    log.info("Gatt Connect to mac address {}.".format(mac_address))
    bluetooth_gatt = central.sl4a.gattClientConnectGatt(gatt_callback, mac_address, autoconnect, transport,
                                                        opportunistic, GattPhyMask.PHY_LE_1M_MASK)
    expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
    try:
        event = central.ed.pop_event(expected_event, timeout_seconds)
    except Empty as err:
        close_gatt_client(central, bluetooth_gatt)
        raise GattTestUtilsError("Could not establish a connection to "
                                 "peripheral. Expected event: {}".format(expected_event)) from err
    log.info("Got connection event {}".format(event))
    if event['data']['State'] != GattConnectionState.STATE_CONNECTED:
        close_gatt_client(central, bluetooth_gatt)
        raise GattTestUtilsError("Could not establish a connection to "
                                 "peripheral. Event Details: {}".format(pprint.pformat(event)))
    return bluetooth_gatt, gatt_callback


def wait_for_gatt_connection(central: AndroidDevice, gatt_callback, bluetooth_gatt, timeout):
    """Wait for a connection-change event reporting STATE_CONNECTED.

    Args:
        central: device whose event dispatcher is polled.
        gatt_callback: callback id used to build the expected event name.
        bluetooth_gatt: GATT client that is closed if the wait fails.
        timeout: how long to wait for the event.

    Raises:
        GattTestUtilsError: no event arrived in time, or the event's state was
            not STATE_CONNECTED. The GATT client is closed (best effort) first.
    """
    expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
    try:
        event = central.ed.pop_event(expected_event, timeout=timeout)
    except Empty as err:
        close_gatt_client(central, bluetooth_gatt)
        raise GattTestUtilsError("Could not establish a connection to "
                                 "peripheral. Expected event: {}".format(expected_event)) from err
    if event['data']['State'] != GattConnectionState.STATE_CONNECTED:
        # close_gatt_client already swallows and logs close failures, so a
        # single call is enough here.
        close_gatt_client(central, bluetooth_gatt)
        raise GattTestUtilsError("Could not establish a connection to "
                                 "peripheral. Event Details: {}".format(pprint.pformat(event)))


def close_gatt_client(central: AndroidDevice, bluetooth_gatt):
    """Close the GATT client, logging (not raising) if the close call fails."""
    try:
        central.sl4a.gattClientClose(bluetooth_gatt)
    except Exception:
        # Deliberately best effort: callers use this on failure paths where
        # the original error matters more than a failed close.
        log.debug("Failed to close gatt client.")


def disconnect_gatt_connection(central: AndroidDevice, bluetooth_gatt, gatt_callback):
    """Request a GATT disconnect and wait for the disconnected event.

    Raises:
        GattTestUtilsError: propagated from wait_for_gatt_disconnect_event.
    """
    central.sl4a.gattClientDisconnect(bluetooth_gatt)
    wait_for_gatt_disconnect_event(central, gatt_callback)


def wait_for_gatt_disconnect_event(central: AndroidDevice, gatt_callback):
    """Wait for a connection-change event reporting STATE_DISCONNECTED.

    Raises:
        GattTestUtilsError: no event arrived within default_timeout, or the
            event's state was not STATE_DISCONNECTED.
    """
    expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
    try:
        event = central.ed.pop_event(expected_event, default_timeout)
    except Empty as err:
        raise GattTestUtilsError(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event)) from err
    found_state = event['data']['State']
    expected_state = GattConnectionState.STATE_DISCONNECTED
    if found_state != expected_state:
        # Report the expected *state*; the event name is not what mismatched.
        raise GattTestUtilsError("GATT connection state change expected {}, found {}".format(
            expected_state, found_state))


def orchestrate_gatt_connection(central: AndroidDevice,
                                peripheral: AndroidDevice,
                                transport=GattTransport.TRANSPORT_LE,
                                mac_address=None,
                                autoconnect=False,
                                opportunistic=False):
    """Resolve the peripheral's address if needed, then connect over GATT.

    When mac_address is None it is obtained from
    get_mac_address_of_generic_advertisement for TRANSPORT_LE, otherwise from
    the peripheral's bluetoothGetLocalAddress.

    Returns:
        Tuple (bluetooth_gatt, gatt_callback, adv_callback). adv_callback is
        None unless the address was obtained through the advertisement helper.

    Raises:
        GattTestUtilsError: the advertisement helper raised BtTestUtilsError,
            or setup_gatt_connection failed.
    """
    adv_callback = None
    if mac_address is None:
        if transport == GattTransport.TRANSPORT_LE:
            try:
                mac_address, adv_callback, _scan_callback = get_mac_address_of_generic_advertisement(
                    central, peripheral)
            except BtTestUtilsError as err:
                raise GattTestUtilsError("Error in getting mac address: {}".format(err)) from err
        else:
            mac_address = peripheral.sl4a.bluetoothGetLocalAddress()
    bluetooth_gatt, gatt_callback = setup_gatt_connection(central, mac_address, autoconnect, transport, opportunistic)
    return bluetooth_gatt, gatt_callback, adv_callback


def run_continuous_write_descriptor(cen_droid: Sl4aClient,
                                    cen_ed: EventDispatcher,
                                    per_droid: Sl4aClient,
                                    per_ed: EventDispatcher,
                                    gatt_server,
                                    gatt_server_callback,
                                    bluetooth_gatt,
                                    services_count,
                                    discovered_services_index,
                                    number_of_iterations=100000):
    """Repeatedly write a fixed value to every discovered descriptor.

    For each iteration, every descriptor of every characteristic of every
    service index in range(services_count) is written from the central side;
    the peripheral side waits for the write request, answers it, and the
    central side waits for the write-complete event.

    Returns:
        False as soon as the peripheral does not receive a descriptor write
        request within default_timeout. Otherwise falls off the end (None)
        after number_of_iterations iterations.

    Note:
        Any other exception raised inside an iteration, including the
        "Thread ended prematurely." one raised when the central write event is
        missing, is caught by the per-iteration handler, logged, and the loop
        moves on to the next iteration.
    """
    log.info("Starting continuous write")
    bt_device_id = 0
    status = 1
    offset = 1
    test_value = [1, 2, 3, 4, 5, 6, 7]
    test_value_return = [1, 2, 3]
    for _ in range(number_of_iterations):
        try:
            for i in range(services_count):
                characteristic_uuids = cen_droid.gattClientGetDiscoveredCharacteristicUuids(
                    discovered_services_index, i)
                log.info(characteristic_uuids)
                for characteristic in characteristic_uuids:
                    descriptor_uuids = cen_droid.gattClientGetDiscoveredDescriptorUuids(
                        discovered_services_index, i, characteristic)
                    log.info(descriptor_uuids)
                    for descriptor in descriptor_uuids:
                        cen_droid.gattClientDescriptorSetValue(bluetooth_gatt, discovered_services_index, i,
                                                               characteristic, descriptor, test_value)
                        cen_droid.gattClientWriteDescriptor(bluetooth_gatt, discovered_services_index, i,
                                                            characteristic, descriptor)
                        expected_event = GattCallbackString.DESC_WRITE_REQ.format(gatt_server_callback)
                        try:
                            event = per_ed.pop_event(expected_event, default_timeout)
                        except Empty:
                            log.error(GattCallbackError.DESC_WRITE_REQ_ERR.format(expected_event))
                            return False
                        request_id = event['data']['requestId']
                        found_value = event['data']['value']
                        if found_value != test_value:
                            # Mismatch is only logged; the response is still sent.
                            log.error("Values didn't match. Found: {}, Expected: "
                                      "{}".format(found_value, test_value))
                        per_droid.gattServerSendResponse(gatt_server, bt_device_id, request_id, status, offset,
                                                         test_value_return)
                        expected_event = GattCallbackString.DESC_WRITE.format(bluetooth_gatt)
                        try:
                            cen_ed.pop_event(expected_event, default_timeout)
                        except Empty:
                            log.error(GattCallbackError.DESC_WRITE_ERR.format(expected_event))
                            # Caught by the enclosing handler below, which logs
                            # it and continues with the next iteration.
                            raise Exception("Thread ended prematurely.")
        except Exception as err:
            log.error("Continuing but found exception: {}".format(err))


def setup_characteristics_and_descriptors_read_write(droid: Sl4aClient):
    """Create three characteristics and two descriptors on the GATT server.

    Returns:
        Tuple (characteristic_list, descriptor_list) of the values returned by
        the sl4a create calls, in definition order.
    """
    characteristic_input = [
        {
            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
            'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
            'permission': GattCharacteristic.PERMISSION_WRITE
        },
        {
            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
            'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_READ,
            'permission': GattCharacteristic.PERMISSION_READ
        },
        {
            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
            'permission': GattCharacteristic.PERMISSION_READ
        },
    ]
    descriptor_input = [{
        'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
        'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
    }, {
        'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
        'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
    }]
    characteristic_list = setup_gatt_characteristics(droid, characteristic_input)
    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
    return characteristic_list, descriptor_list


def setup_multiple_services(peripheral: AndroidDevice):
    """Open a GATT server and register three primary services on it.

    Each service receives the same characteristics created by
    setup_characteristics_and_descriptors_read_write. After each service is
    added, the service-added event is awaited before moving on.

    Returns:
        Tuple (gatt_server_callback, gatt_server).

    Raises:
        GattTestUtilsError: a service-added event did not arrive within
            default_timeout. The GATT server is closed before raising.
    """
    per_droid, per_ed = peripheral.sl4a, peripheral.sl4a.ed
    gatt_server_callback = per_droid.gattServerCreateGattServerCallback()
    gatt_server = per_droid.gattServerOpenGattServer(gatt_server_callback)
    characteristic_list, descriptor_list = setup_characteristics_and_descriptors_read_write(per_droid)
    per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[1], descriptor_list[0])
    per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[2], descriptor_list[1])

    service_uuids = (
        "00000000-0000-1000-8000-00805f9b34fb",
        "FFFFFFFF-0000-1000-8000-00805f9b34fb",
        "3846D7A0-69C8-11E4-BA00-0002A5D5C51B",
    )
    # All services are created up front, then populated and added one by one.
    gatt_services = [
        per_droid.gattServerCreateService(uuid, GattServiceType.SERVICE_TYPE_PRIMARY) for uuid in service_uuids
    ]
    expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_callback)
    for gatt_service in gatt_services:
        for characteristic in characteristic_list:
            per_droid.gattServerAddCharacteristicToService(gatt_service, characteristic)
        per_droid.gattServerAddService(gatt_server, gatt_service)
        try:
            per_ed.pop_event(expected_event, default_timeout)
        except Empty as err:
            per_droid.gattServerClose(gatt_server)
            raise GattTestUtilsError(GattCallbackError.SERV_ADDED_ERR.format(expected_event)) from err
    return gatt_server_callback, gatt_server


def setup_characteristics_and_descriptors_notify_read(droid: Sl4aClient):
    """Create three characteristics (two notify/read) and two descriptors.

    Returns:
        Tuple (characteristic_list, descriptor_list) of the values returned by
        the sl4a create calls, in definition order.
    """
    characteristic_input = [
        {
            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
            'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
            # NOTE(review): a PROPERTY_* constant is passed as the permission
            # here, whereas the read/write variant above uses PERMISSION_WRITE.
            # Looks unintended, but left as-is -- confirm before changing.
            'permission': GattCharacteristic.PROPERTY_WRITE
        },
        {
            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
            'permission': GattCharacteristic.PERMISSION_READ
        },
        {
            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
            'permission': GattCharacteristic.PERMISSION_READ
        },
    ]
    descriptor_input = [{
        'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
        'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
    }, {
        'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
        'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
    }]
    characteristic_list = setup_gatt_characteristics(droid, characteristic_input)
    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
    return characteristic_list, descriptor_list


def setup_gatt_characteristics(droid: Sl4aClient, input):
    """Create one GATT characteristic per entry of `input`.

    Args:
        droid: sl4a client used to create the characteristics.
        input: iterable of dicts with 'uuid', 'property' and 'permission'
            keys. (The name shadows the builtin but is kept because it is
            part of the function's signature.)

    Returns:
        List of the values returned by the create call, in input order.
    """
    return [
        droid.gattServerCreateBluetoothGattCharacteristic(item['uuid'], item['property'], item['permission'])
        for item in input
    ]


def setup_gatt_descriptors(droid: Sl4aClient, input):
    """Create one GATT descriptor per entry of `input`.

    Args:
        droid: sl4a client used to create the descriptors.
        input: iterable of dicts with 'uuid' and 'property' keys. (The name
            shadows the builtin but is kept because it is part of the
            function's signature.)

    Returns:
        List of the values returned by the create call, in input order.
    """
    descriptor_list = [
        droid.gattServerCreateBluetoothGattDescriptor(
            item['uuid'],
            item['property'],
        ) for item in input
    ]
    log.info("setup descriptor list: {}".format(descriptor_list))
    return descriptor_list


def setup_gatt_mtu(central: AndroidDevice, bluetooth_gatt, gatt_callback, mtu):
    """utility function to set mtu for GATT connection.

    Steps:
    1. Request mtu change.
    2. Check if the mtu is changed to the new value

    Args:
        central: test device for client to scan.
        bluetooth_gatt: GATT object
        gatt_callback: callback id used to build the expected MTU-changed
            event name.
        mtu: new mtu value to be set

    Returns:
        If success, return True.
        if fail, return False
    """
    central.sl4a.gattClientRequestMtu(bluetooth_gatt, mtu)
    expected_event = GattCallbackString.MTU_CHANGED.format(gatt_callback)
    try:
        mtu_event = central.ed.pop_event(expected_event, default_timeout)
    except Empty:
        log.error(GattCallbackError.MTU_CHANGED_ERR.format(expected_event))
        return False
    mtu_size_found = mtu_event['data']['MTU']
    if mtu_size_found != mtu:
        log.error("MTU size found: {}, expected: {}".format(mtu_size_found, mtu))
        return False
    return True


def log_gatt_server_uuids(central: AndroidDevice, discovered_services_index, bluetooth_gatt=None):
    """Log the UUIDs of every discovered service, characteristic, descriptor.

    Args:
        central: device whose sl4a client is queried.
        discovered_services_index: index handed to the discovery getters.
        bluetooth_gatt: when truthy, instance ids are also fetched and logged
            (in hex) next to each characteristic and descriptor UUID.
    """
    services_count = central.sl4a.gattClientGetDiscoveredServicesCount(discovered_services_index)
    for i in range(services_count):
        service = central.sl4a.gattClientGetDiscoveredServiceUuid(discovered_services_index, i)
        log.info("Discovered service uuid {}".format(service))
        characteristic_uuids = central.sl4a.gattClientGetDiscoveredCharacteristicUuids(discovered_services_index, i)
        for j, characteristic_uuid in enumerate(characteristic_uuids):
            descriptor_uuids = central.sl4a.gattClientGetDiscoveredDescriptorUuidsByIndex(
                discovered_services_index, i, j)
            if bluetooth_gatt:
                char_inst_id = central.sl4a.gattClientGetCharacteristicInstanceId(bluetooth_gatt,
                                                                                  discovered_services_index, i, j)
                log.info("Discovered characteristic handle uuid: {} {}".format(hex(char_inst_id), characteristic_uuid))
                for k, descriptor_uuid in enumerate(descriptor_uuids):
                    desc_inst_id = central.sl4a.gattClientGetDescriptorInstanceId(bluetooth_gatt,
                                                                                  discovered_services_index, i, j, k)
                    log.info("Discovered descriptor handle uuid: {} {}".format(hex(desc_inst_id), descriptor_uuid))
            else:
                log.info("Discovered characteristic uuid: {}".format(characteristic_uuid))
                for descriptor_uuid in descriptor_uuids:
                    log.info("Discovered descriptor uuid {}".format(descriptor_uuid))