1#/usr/bin/env python3.4 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16 17from acts.logger import LoggerProxy 18 19from acts.test_utils.bt.bt_test_utils import BtTestUtilsError 20from acts.test_utils.bt.bt_test_utils import get_mac_address_of_generic_advertisement 21from acts.test_utils.bt.GattEnum import GattCbErr 22from acts.test_utils.bt.GattEnum import GattCbStrings 23from acts.test_utils.bt.GattEnum import GattConnectionState 24from acts.test_utils.bt.GattEnum import GattCharacteristic 25from acts.test_utils.bt.GattEnum import GattDescriptor 26from acts.test_utils.bt.GattEnum import GattService 27from acts.test_utils.bt.GattEnum import GattTransport 28from acts.test_utils.bt.GattEnum import GattConnectionPriority 29import pprint 30from queue import Empty 31from contextlib import suppress 32 33default_timeout = 10 34log = LoggerProxy() 35 36 37class GattTestUtilsError(Exception): 38 pass 39 40 41def setup_gatt_connection(cen_ad, mac_address, autoconnect, 42 transport=GattTransport.TRANSPORT_AUTO): 43 test_result = True 44 gatt_callback = cen_ad.droid.gattCreateGattCallback() 45 log.info("Gatt Connect to mac address {}.".format(mac_address)) 46 bluetooth_gatt = cen_ad.droid.gattClientConnectGatt( 47 gatt_callback, mac_address, autoconnect, transport) 48 expected_event = GattCbStrings.GATT_CONN_CHANGE.value.format(gatt_callback) 49 try: 50 event = cen_ad.ed.pop_event(expected_event, default_timeout) 51 except Empty: 52 log.error(GattCbErr.GATT_CONN_CHANGE_ERR.value.format(expected_event)) 53 test_result = False 54 return test_result, bluetooth_gatt, gatt_callback 55 if event['data']['State'] != GattConnectionState.STATE_CONNECTED.value: 56 log.info("Could not establish a connection to peripheral. Event " 57 "Details:".format(pprint.pformat(event))) 58 test_result = False 59 return test_result, bluetooth_gatt, gatt_callback 60 61 62def disconnect_gatt_connection(cen_ad, bluetooth_gatt, gatt_callback): 63 cen_ad.droid.gattClientDisconnect(bluetooth_gatt) 64 expected_event = GattCbStrings.GATT_CONN_CHANGE.value.format(gatt_callback) 65 try: 66 event = cen_ad.ed.pop_event(expected_event, default_timeout) 67 except Empty: 68 log.error(GattCbErr.GATT_CONN_CHANGE_ERR.value.format(expected_event)) 69 return False 70 if event['data']['State'] != GattConnectionState.STATE_DISCONNECTED.value: 71 return False 72 return True 73 74 75def orchestrate_gatt_connection(cen_ad, 76 per_ad, 77 transport=GattTransport.TRANSPORT_LE, 78 mac_address=None, 79 autoconnect=False): 80 adv_callback = None 81 if mac_address is None: 82 if transport == GattTransport.TRANSPORT_LE: 83 try: 84 mac_address, adv_callback = ( 85 get_mac_address_of_generic_advertisement(cen_ad, per_ad)) 86 except BtTestUtilsError as err: 87 raise GattTestUtilsError("Error in getting mac address: {}".format(err)) 88 else: 89 mac_address = per_ad.droid.bluetoothGetLocalAddress() 90 adv_callback = None 91 test_result, bluetooth_gatt, gatt_callback = setup_gatt_connection( 92 cen_ad, mac_address, autoconnect, transport) 93 if not test_result: 94 raise GattTestUtilsError("Could not connect to peripheral.") 95 return bluetooth_gatt, gatt_callback, adv_callback 96 97 98def run_continuous_write_descriptor(cen_droid, cen_ed, per_droid, per_ed, 99 gatt_server, gatt_server_callback, 100 bluetooth_gatt, services_count, 101 discovered_services_index): 102 log.info("Starting continuous write") 103 bt_device_id = 0 104 status = 1 105 offset = 1 106 test_value = "1,2,3,4,5,6,7" 107 test_value_return = "1,2,3" 108 with suppress(Exception): 109 for x in range(100000): 110 for i in range(services_count): 111 characteristic_uuids = ( 112 cen_droid.gattClientGetDiscoveredCharacteristicUuids( 113 discovered_services_index, i)) 114 log.info(characteristic_uuids) 115 for characteristic in characteristic_uuids: 116 descriptor_uuids = ( 117 cen_droid.gattClientGetDiscoveredDescriptorUuids( 118 discovered_services_index, i, characteristic)) 119 log.info(descriptor_uuids) 120 for descriptor in descriptor_uuids: 121 log.info("descriptor to be written {}".format( 122 descriptor)) 123 cen_droid.gattClientDescriptorSetValue( 124 bluetooth_gatt, discovered_services_index, i, 125 characteristic, descriptor, test_value) 126 cen_droid.gattClientWriteDescriptor( 127 bluetooth_gatt, discovered_services_index, i, 128 characteristic, descriptor) 129 expected_event = GattCbStrings.DESC_WRITE_REQ.value.format( 130 gatt_server_callback) 131 try: 132 event = per_ed.pop_event(expected_event, 133 default_timeout) 134 except Empty: 135 log.error( 136 GattCbErr.DESC_WRITE_REQ_ERR.value.format( 137 expected_event)) 138 return False 139 request_id = event['data']['requestId'] 140 found_value = event['data']['value'] 141 if found_value != test_value: 142 log.error( 143 "Values didn't match. Found: {}, Expected: " 144 "{}".format(found_value, test_value)) 145 per_droid.gattServerSendResponse( 146 gatt_server, bt_device_id, request_id, status, 147 offset, test_value_return) 148 expected_event = GattCbStrings.DESC_WRITE.value.format( 149 bluetooth_gatt) 150 try: 151 cen_ed.pop_event(expected_event, default_timeout) 152 except Empty: 153 log.error(GattCbErr.DESC_WRITE_ERR.value.format( 154 expected_event)) 155 return False 156 157 158def setup_characteristics_and_descriptors(droid): 159 characteristic_input = [ 160 { 161 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 162 'property': GattCharacteristic.PROPERTY_WRITE.value 163 | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value, 164 'permission': GattCharacteristic.PROPERTY_WRITE.value 165 }, 166 { 167 'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", 168 'property': GattCharacteristic.PROPERTY_NOTIFY.value 169 | GattCharacteristic.PROPERTY_READ.value, 170 'permission': GattCharacteristic.PERMISSION_READ.value 171 }, 172 { 173 'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", 174 'property': GattCharacteristic.PROPERTY_NOTIFY.value 175 | GattCharacteristic.PROPERTY_READ.value, 176 'permission': GattCharacteristic.PERMISSION_READ.value 177 }, 178 ] 179 descriptor_input = [ 180 { 181 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 182 'property': GattDescriptor.PERMISSION_READ.value 183 | GattDescriptor.PERMISSION_WRITE.value, 184 }, { 185 'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32", 186 'property': GattDescriptor.PERMISSION_READ.value 187 | GattCharacteristic.PERMISSION_WRITE.value, 188 } 189 ] 190 characteristic_list = setup_gatt_characteristics(droid, 191 characteristic_input) 192 descriptor_list = setup_gatt_descriptors(droid, descriptor_input) 193 return characteristic_list, descriptor_list 194 195 196def setup_multiple_services(per_ad): 197 per_droid, per_ed = per_ad.droid, per_ad.ed 198 gatt_server_callback = per_droid.gattServerCreateGattServerCallback() 199 gatt_server = per_droid.gattServerOpenGattServer(gatt_server_callback) 200 characteristic_list, descriptor_list = ( 201 setup_characteristics_and_descriptors(per_droid)) 202 per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[1], 203 descriptor_list[0]) 204 per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[2], 205 descriptor_list[1]) 206 gattService = per_droid.gattServerCreateService( 207 "00000000-0000-1000-8000-00805f9b34fb", 208 GattService.SERVICE_TYPE_PRIMARY.value) 209 gattService2 = per_droid.gattServerCreateService( 210 "FFFFFFFF-0000-1000-8000-00805f9b34fb", 211 GattService.SERVICE_TYPE_PRIMARY.value) 212 gattService3 = per_droid.gattServerCreateService( 213 "3846D7A0-69C8-11E4-BA00-0002A5D5C51B", 214 GattService.SERVICE_TYPE_PRIMARY.value) 215 for characteristic in characteristic_list: 216 per_droid.gattServerAddCharacteristicToService(gattService, 217 characteristic) 218 per_droid.gattServerAddService(gatt_server, gattService) 219 expected_event = GattCbStrings.SERV_ADDED.value.format( 220 gatt_server_callback) 221 try: 222 per_ed.pop_event(expected_event, default_timeout) 223 except Empty: 224 log.error(GattCbErr.SERV_ADDED_ERR.value.format(expected_event)) 225 return False 226 for characteristic in characteristic_list: 227 per_droid.gattServerAddCharacteristicToService(gattService2, 228 characteristic) 229 per_droid.gattServerAddService(gatt_server, gattService2) 230 try: 231 per_ed.pop_event(expected_event, default_timeout) 232 except Empty: 233 log.error(GattCbErr.SERV_ADDED_ERR.value.format(expected_event)) 234 return False 235 for characteristic in characteristic_list: 236 per_droid.gattServerAddCharacteristicToService(gattService3, 237 characteristic) 238 per_droid.gattServerAddService(gatt_server, gattService3) 239 try: 240 per_ed.pop_event(expected_event, default_timeout) 241 except Empty: 242 log.error(GattCbErr.SERV_ADDED_ERR.value.format(expected_event)) 243 return False 244 return gatt_server_callback, gatt_server 245 246 247def setup_characteristics_and_descriptors(droid): 248 characteristic_input = [ 249 { 250 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 251 'property': GattCharacteristic.PROPERTY_WRITE.value 252 | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value, 253 'permission': GattCharacteristic.PROPERTY_WRITE.value 254 }, 255 { 256 'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", 257 'property': GattCharacteristic.PROPERTY_NOTIFY.value 258 | GattCharacteristic.PROPERTY_READ.value, 259 'permission': GattCharacteristic.PERMISSION_READ.value 260 }, 261 { 262 'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", 263 'property': GattCharacteristic.PROPERTY_NOTIFY.value 264 | GattCharacteristic.PROPERTY_READ.value, 265 'permission': GattCharacteristic.PERMISSION_READ.value 266 }, 267 ] 268 descriptor_input = [ 269 { 270 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 271 'property': GattDescriptor.PERMISSION_READ.value 272 | GattDescriptor.PERMISSION_WRITE.value, 273 }, { 274 'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32", 275 'property': GattDescriptor.PERMISSION_READ.value 276 | GattCharacteristic.PERMISSION_WRITE.value, 277 } 278 ] 279 characteristic_list = setup_gatt_characteristics(droid, 280 characteristic_input) 281 descriptor_list = setup_gatt_descriptors(droid, descriptor_input) 282 return characteristic_list, descriptor_list 283 284 285def setup_gatt_characteristics(droid, input): 286 characteristic_list = [] 287 for item in input: 288 index = droid.gattServerCreateBluetoothGattCharacteristic( 289 item['uuid'], item['property'], item['permission']) 290 characteristic_list.append(index) 291 return characteristic_list 292 293 294def setup_gatt_descriptors(droid, input): 295 descriptor_list = [] 296 for item in input: 297 index = droid.gattServerCreateBluetoothGattDescriptor( 298 item['uuid'], 299 item['property'], ) 300 descriptor_list.append(index) 301 log.info("setup descriptor list: {}".format(descriptor_list)) 302 return descriptor_list 303