1#!/usr/bin/env python3 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17import logging 18 19from acts_contrib.test_utils.bt.bt_test_utils import BtTestUtilsError 20from acts_contrib.test_utils.bt.bt_test_utils import get_mac_address_of_generic_advertisement 21from acts_contrib.test_utils.bt.bt_constants import gatt_cb_err 22from acts_contrib.test_utils.bt.bt_constants import gatt_cb_strings 23from acts_contrib.test_utils.bt.bt_constants import gatt_connection_state 24from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic 25from acts_contrib.test_utils.bt.bt_constants import gatt_descriptor 26from acts_contrib.test_utils.bt.bt_constants import gatt_phy_mask 27from acts_contrib.test_utils.bt.bt_constants import gatt_service_types 28from acts_contrib.test_utils.bt.bt_constants import gatt_transport 29import pprint 30from queue import Empty 31 32default_timeout = 10 33log = logging 34 35 36class GattTestUtilsError(Exception): 37 pass 38 39 40def setup_gatt_connection(cen_ad, 41 mac_address, 42 autoconnect, 43 transport=gatt_transport['auto'], 44 opportunistic=False): 45 gatt_callback = cen_ad.droid.gattCreateGattCallback() 46 log.info("Gatt Connect to mac address {}.".format(mac_address)) 47 bluetooth_gatt = cen_ad.droid.gattClientConnectGatt( 48 gatt_callback, mac_address, autoconnect, transport, opportunistic, 49 gatt_phy_mask['1m_mask']) 50 expected_event = gatt_cb_strings['gatt_conn_change'].format(gatt_callback) 51 try: 52 event = cen_ad.ed.pop_event(expected_event, default_timeout) 53 except Empty: 54 close_gatt_client(cen_ad, bluetooth_gatt) 55 raise GattTestUtilsError( 56 "Could not establish a connection to " 57 "peripheral. Expected event: {}".format(expected_event)) 58 if event['data']['State'] != gatt_connection_state['connected']: 59 close_gatt_client(cen_ad, bluetooth_gatt) 60 try: 61 cen_ad.droid.gattClientClose(bluetooth_gatt) 62 except Exception: 63 self.log.debug("Failed to close gatt client.") 64 raise GattTestUtilsError("Could not establish a connection to " 65 "peripheral. Event Details: {}".format( 66 pprint.pformat(event))) 67 return bluetooth_gatt, gatt_callback 68 69 70def close_gatt_client(cen_ad, bluetooth_gatt): 71 try: 72 cen_ad.droid.gattClientClose(bluetooth_gatt) 73 except Exception: 74 log.debug("Failed to close gatt client.") 75 76 77def disconnect_gatt_connection(cen_ad, bluetooth_gatt, gatt_callback): 78 cen_ad.droid.gattClientDisconnect(bluetooth_gatt) 79 wait_for_gatt_disconnect_event(cen_ad, gatt_callback) 80 return 81 82 83def wait_for_gatt_disconnect_event(cen_ad, gatt_callback): 84 expected_event = gatt_cb_strings['gatt_conn_change'].format(gatt_callback) 85 try: 86 event = cen_ad.ed.pop_event(expected_event, default_timeout) 87 except Empty: 88 raise GattTestUtilsError( 89 gatt_cb_err['gatt_conn_change_err'].format(expected_event)) 90 found_state = event['data']['State'] 91 expected_state = gatt_connection_state['disconnected'] 92 if found_state != expected_state: 93 raise GattTestUtilsError( 94 "GATT connection state change expected {}, found {}".format( 95 expected_event, found_state)) 96 return 97 98 99def orchestrate_gatt_connection(cen_ad, 100 per_ad, 101 transport=gatt_transport['le'], 102 mac_address=None, 103 autoconnect=False, 104 opportunistic=False): 105 adv_callback = None 106 if mac_address is None: 107 if transport == gatt_transport['le']: 108 try: 109 mac_address, adv_callback, scan_callback = ( 110 get_mac_address_of_generic_advertisement(cen_ad, per_ad)) 111 except BtTestUtilsError as err: 112 raise GattTestUtilsError( 113 "Error in getting mac address: {}".format(err)) 114 else: 115 mac_address = per_ad.droid.bluetoothGetLocalAddress() 116 adv_callback = None 117 bluetooth_gatt, gatt_callback = setup_gatt_connection( 118 cen_ad, mac_address, autoconnect, transport, opportunistic) 119 return bluetooth_gatt, gatt_callback, adv_callback 120 121 122def run_continuous_write_descriptor(cen_droid, 123 cen_ed, 124 per_droid, 125 per_ed, 126 gatt_server, 127 gatt_server_callback, 128 bluetooth_gatt, 129 services_count, 130 discovered_services_index, 131 number_of_iterations=100000): 132 log.info("Starting continuous write") 133 bt_device_id = 0 134 status = 1 135 offset = 1 136 test_value = [1, 2, 3, 4, 5, 6, 7] 137 test_value_return = [1, 2, 3] 138 for _ in range(number_of_iterations): 139 try: 140 for i in range(services_count): 141 characteristic_uuids = ( 142 cen_droid.gattClientGetDiscoveredCharacteristicUuids( 143 discovered_services_index, i)) 144 log.info(characteristic_uuids) 145 for characteristic in characteristic_uuids: 146 descriptor_uuids = ( 147 cen_droid.gattClientGetDiscoveredDescriptorUuids( 148 discovered_services_index, i, characteristic)) 149 log.info(descriptor_uuids) 150 for descriptor in descriptor_uuids: 151 cen_droid.gattClientDescriptorSetValue( 152 bluetooth_gatt, discovered_services_index, i, 153 characteristic, descriptor, test_value) 154 cen_droid.gattClientWriteDescriptor( 155 bluetooth_gatt, discovered_services_index, i, 156 characteristic, descriptor) 157 expected_event = gatt_cb_strings[ 158 'desc_write_req'].format(gatt_server_callback) 159 try: 160 event = per_ed.pop_event(expected_event, 161 default_timeout) 162 except Empty: 163 log.error(gatt_cb_err['desc_write_req_err'].format( 164 expected_event)) 165 return False 166 request_id = event['data']['requestId'] 167 found_value = event['data']['value'] 168 if found_value != test_value: 169 log.error( 170 "Values didn't match. Found: {}, Expected: " 171 "{}".format(found_value, test_value)) 172 per_droid.gattServerSendResponse( 173 gatt_server, bt_device_id, request_id, status, 174 offset, test_value_return) 175 expected_event = gatt_cb_strings['desc_write'].format( 176 bluetooth_gatt) 177 try: 178 cen_ed.pop_event(expected_event, default_timeout) 179 except Empty: 180 log.error(gatt_cb_strings['desc_write_err'].format( 181 expected_event)) 182 raise Exception("Thread ended prematurely.") 183 except Exception as err: 184 log.error("Continuing but found exception: {}".format(err)) 185 186 187def setup_characteristics_and_descriptors(droid): 188 characteristic_input = [ 189 { 190 'uuid': 191 "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 192 'property': 193 gatt_characteristic['property_write'] 194 | gatt_characteristic['property_write_no_response'], 195 'permission': 196 gatt_characteristic['permission_write'] 197 }, 198 { 199 'uuid': 200 "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", 201 'property': 202 gatt_characteristic['property_notify'] 203 | gatt_characteristic['property_read'], 204 'permission': 205 gatt_characteristic['permission_read'] 206 }, 207 { 208 'uuid': 209 "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", 210 'property': 211 gatt_characteristic['property_notify'] 212 | gatt_characteristic['property_read'], 213 'permission': 214 gatt_characteristic['permission_read'] 215 }, 216 ] 217 descriptor_input = [{ 218 'uuid': 219 "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 220 'property': 221 gatt_descriptor['permission_read'] 222 | gatt_descriptor['permission_write'], 223 }, { 224 'uuid': 225 "76d5ed92-ca81-4edb-bb6b-9f019665fb32", 226 'property': 227 gatt_descriptor['permission_read'] 228 | gatt_characteristic['permission_write'], 229 }] 230 characteristic_list = setup_gatt_characteristics(droid, 231 characteristic_input) 232 descriptor_list = setup_gatt_descriptors(droid, descriptor_input) 233 return characteristic_list, descriptor_list 234 235 236def setup_multiple_services(per_ad): 237 per_droid, per_ed = per_ad.droid, per_ad.ed 238 gatt_server_callback = per_droid.gattServerCreateGattServerCallback() 239 gatt_server = per_droid.gattServerOpenGattServer(gatt_server_callback) 240 characteristic_list, descriptor_list = ( 241 setup_characteristics_and_descriptors(per_droid)) 242 per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[1], 243 descriptor_list[0]) 244 per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[2], 245 descriptor_list[1]) 246 gattService = per_droid.gattServerCreateService( 247 "00000000-0000-1000-8000-00805f9b34fb", gatt_service_types['primary']) 248 gattService2 = per_droid.gattServerCreateService( 249 "FFFFFFFF-0000-1000-8000-00805f9b34fb", gatt_service_types['primary']) 250 gattService3 = per_droid.gattServerCreateService( 251 "3846D7A0-69C8-11E4-BA00-0002A5D5C51B", gatt_service_types['primary']) 252 for characteristic in characteristic_list: 253 per_droid.gattServerAddCharacteristicToService(gattService, 254 characteristic) 255 per_droid.gattServerAddService(gatt_server, gattService) 256 expected_event = gatt_cb_strings['serv_added'].format(gatt_server_callback) 257 try: 258 per_ed.pop_event(expected_event, default_timeout) 259 except Empty: 260 per_ad.droid.gattServerClose(gatt_server) 261 raise GattTestUtilsError( 262 gatt_cb_strings['serv_added_err'].format(expected_event)) 263 for characteristic in characteristic_list: 264 per_droid.gattServerAddCharacteristicToService(gattService2, 265 characteristic) 266 per_droid.gattServerAddService(gatt_server, gattService2) 267 try: 268 per_ed.pop_event(expected_event, default_timeout) 269 except Empty: 270 per_ad.droid.gattServerClose(gatt_server) 271 raise GattTestUtilsError( 272 gatt_cb_strings['serv_added_err'].format(expected_event)) 273 for characteristic in characteristic_list: 274 per_droid.gattServerAddCharacteristicToService(gattService3, 275 characteristic) 276 per_droid.gattServerAddService(gatt_server, gattService3) 277 try: 278 per_ed.pop_event(expected_event, default_timeout) 279 except Empty: 280 per_ad.droid.gattServerClose(gatt_server) 281 raise GattTestUtilsError( 282 gatt_cb_strings['serv_added_err'].format(expected_event)) 283 return gatt_server_callback, gatt_server 284 285 286def setup_characteristics_and_descriptors(droid): 287 characteristic_input = [ 288 { 289 'uuid': 290 "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 291 'property': 292 gatt_characteristic['property_write'] 293 | gatt_characteristic['property_write_no_response'], 294 'permission': 295 gatt_characteristic['property_write'] 296 }, 297 { 298 'uuid': 299 "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", 300 'property': 301 gatt_characteristic['property_notify'] 302 | gatt_characteristic['property_read'], 303 'permission': 304 gatt_characteristic['permission_read'] 305 }, 306 { 307 'uuid': 308 "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", 309 'property': 310 gatt_characteristic['property_notify'] 311 | gatt_characteristic['property_read'], 312 'permission': 313 gatt_characteristic['permission_read'] 314 }, 315 ] 316 descriptor_input = [{ 317 'uuid': 318 "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 319 'property': 320 gatt_descriptor['permission_read'] 321 | gatt_descriptor['permission_write'], 322 }, { 323 'uuid': 324 "76d5ed92-ca81-4edb-bb6b-9f019665fb32", 325 'property': 326 gatt_descriptor['permission_read'] 327 | gatt_characteristic['permission_write'], 328 }] 329 characteristic_list = setup_gatt_characteristics(droid, 330 characteristic_input) 331 descriptor_list = setup_gatt_descriptors(droid, descriptor_input) 332 return characteristic_list, descriptor_list 333 334 335def setup_gatt_characteristics(droid, input): 336 characteristic_list = [] 337 for item in input: 338 index = droid.gattServerCreateBluetoothGattCharacteristic( 339 item['uuid'], item['property'], item['permission']) 340 characteristic_list.append(index) 341 return characteristic_list 342 343 344def setup_gatt_descriptors(droid, input): 345 descriptor_list = [] 346 for item in input: 347 index = droid.gattServerCreateBluetoothGattDescriptor( 348 item['uuid'], 349 item['property'], 350 ) 351 descriptor_list.append(index) 352 log.info("setup descriptor list: {}".format(descriptor_list)) 353 return descriptor_list 354 355 356def setup_gatt_mtu(cen_ad, bluetooth_gatt, gatt_callback, mtu): 357 """utility function to set mtu for GATT connection. 358 359 Steps: 360 1. Request mtu change. 361 2. Check if the mtu is changed to the new value 362 363 Args: 364 cen_ad: test device for client to scan. 365 bluetooth_gatt: GATT object 366 mtu: new mtu value to be set 367 368 Returns: 369 If success, return True. 370 if fail, return False 371 """ 372 cen_ad.droid.gattClientRequestMtu(bluetooth_gatt, mtu) 373 expected_event = gatt_cb_strings['mtu_changed'].format(gatt_callback) 374 try: 375 mtu_event = cen_ad.ed.pop_event(expected_event, default_timeout) 376 mtu_size_found = mtu_event['data']['MTU'] 377 if mtu_size_found != mtu: 378 log.error("MTU size found: {}, expected: {}".format( 379 mtu_size_found, mtu)) 380 return False 381 except Empty: 382 log.error(gatt_cb_err['mtu_changed_err'].format(expected_event)) 383 return False 384 return True 385 386 387def log_gatt_server_uuids(cen_ad, 388 discovered_services_index, 389 bluetooth_gatt=None): 390 services_count = cen_ad.droid.gattClientGetDiscoveredServicesCount( 391 discovered_services_index) 392 for i in range(services_count): 393 service = cen_ad.droid.gattClientGetDiscoveredServiceUuid( 394 discovered_services_index, i) 395 log.info("Discovered service uuid {}".format(service)) 396 characteristic_uuids = ( 397 cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids( 398 discovered_services_index, i)) 399 for j in range(len(characteristic_uuids)): 400 descriptor_uuids = ( 401 cen_ad.droid.gattClientGetDiscoveredDescriptorUuidsByIndex( 402 discovered_services_index, i, j)) 403 if bluetooth_gatt: 404 char_inst_id = cen_ad.droid.gattClientGetCharacteristicInstanceId( 405 bluetooth_gatt, discovered_services_index, i, j) 406 log.info("Discovered characteristic handle uuid: {} {}".format( 407 hex(char_inst_id), characteristic_uuids[j])) 408 for k in range(len(descriptor_uuids)): 409 desc_inst_id = cen_ad.droid.gattClientGetDescriptorInstanceId( 410 bluetooth_gatt, discovered_services_index, i, j, k) 411 log.info("Discovered descriptor handle uuid: {} {}".format( 412 hex(desc_inst_id), descriptor_uuids[k])) 413 else: 414 log.info("Discovered characteristic uuid: {}".format( 415 characteristic_uuids[j])) 416 for k in range(len(descriptor_uuids)): 417 log.info("Discovered descriptor uuid {}".format( 418 descriptor_uuids[k])) 419