1#/usr/bin/env python3.4 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17This is base class for tests that exercises different GATT procedures between two connected devices. 18Setup/Teardown methods take care of establishing connection, and doing GATT DB initialization/discovery. 19""" 20 21from queue import Empty 22 23from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 24from acts.test_utils.bt.GattEnum import GattCharacteristic 25from acts.test_utils.bt.GattEnum import GattDescriptor 26from acts.test_utils.bt.GattEnum import GattService 27from acts.test_utils.bt.GattEnum import GattEvent 28from acts.test_utils.bt.GattEnum import GattCbErr 29from acts.test_utils.bt.GattEnum import GattCbStrings 30from acts.test_utils.bt.GattEnum import MtuSize 31from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection 32from acts.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection 33from acts.test_utils.bt.bt_gatt_utils import setup_gatt_characteristics 34from acts.test_utils.bt.bt_gatt_utils import setup_gatt_descriptors 35 36 37class GattConnectedBaseTest(BluetoothBaseTest): 38 DEFAULT_TIMEOUT = 10 39 40 TEST_SERVICE_UUID = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B" 41 READABLE_CHAR_UUID = "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8" 42 READABLE_DESC_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" 43 WRITABLE_CHAR_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" 44 WRITABLE_DESC_UUID = "76d5ed92-ca81-4edb-bb6b-9f019665fb32" 45 NOTIFIABLE_CHAR_UUID = "b2c83efa-34ca-11e6-ac61-9e71128cae77" 46 # CCC == Client Characteristic Configuration 47 CCC_DESC_UUID = "00002902-0000-1000-8000-00805f9b34fb" 48 49 def __init__(self, controllers): 50 BluetoothBaseTest.__init__(self, controllers) 51 self.cen_ad = self.android_devices[0] 52 self.per_ad = self.android_devices[1] 53 54 def setup_test(self): 55 super(GattConnectedBaseTest, self).setup_test() 56 57 self.gatt_server_callback, self.gatt_server = self._setup_multiple_services( 58 ) 59 if not self.gatt_server_callback or not self.gatt_server: 60 raise AssertionError('Service setup failed') 61 62 self.bluetooth_gatt, self.gatt_callback, self.adv_callback = ( 63 orchestrate_gatt_connection(self.cen_ad, self.per_ad)) 64 self.per_ad.droid.bleStopBleAdvertising(self.adv_callback) 65 66 self.mtu = MtuSize.MIN.value 67 68 if self.cen_ad.droid.gattClientDiscoverServices(self.bluetooth_gatt): 69 event = self._client_wait(GattEvent.GATT_SERV_DISC) 70 self.discovered_services_index = event['data']['ServicesIndex'] 71 services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount( 72 self.discovered_services_index) 73 self.test_service_index = None 74 for i in range(services_count): 75 disc_service_uuid = ( 76 self.cen_ad.droid.gattClientGetDiscoveredServiceUuid( 77 self.discovered_services_index, i).upper()) 78 if disc_service_uuid == self.TEST_SERVICE_UUID: 79 self.test_service_index = i 80 break 81 82 if not self.test_service_index: 83 print("Service not found") 84 return False 85 86 connected_device_list = self.per_ad.droid.gattServerGetConnectedDevices( 87 self.gatt_server) 88 if len(connected_device_list) == 0: 89 self.log.info("No devices connected from peripheral.") 90 return False 91 92 return True 93 94 def teardown_test(self): 95 self.per_ad.droid.gattServerClearServices(self.gatt_server) 96 self.per_ad.droid.gattServerClose(self.gatt_server) 97 98 del self.gatt_server_callback 99 del self.gatt_server 100 101 self._orchestrate_gatt_disconnection(self.bluetooth_gatt, 102 self.gatt_callback) 103 104 return super(GattConnectedBaseTest, self).teardown_test() 105 106 def _server_wait(self, gatt_event): 107 return self._timed_pop(gatt_event, self.per_ad, 108 self.gatt_server_callback) 109 110 def _client_wait(self, gatt_event): 111 return self._timed_pop(gatt_event, self.cen_ad, self.gatt_callback) 112 113 def _timed_pop(self, gatt_event, droid, gatt_callback): 114 expected_event = gatt_event.value["evt"].format(gatt_callback) 115 try: 116 return droid.ed.pop_event(expected_event, self.DEFAULT_TIMEOUT) 117 except Empty as emp: 118 raise AssertionError(gatt_event.value["err"].format( 119 expected_event)) 120 121 def _setup_characteristics_and_descriptors(self, droid): 122 characteristic_input = [ 123 { 124 'uuid': self.WRITABLE_CHAR_UUID, 125 'property': GattCharacteristic.PROPERTY_WRITE.value | 126 GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value, 127 'permission': GattCharacteristic.PERMISSION_WRITE.value 128 }, 129 { 130 'uuid': self.READABLE_CHAR_UUID, 131 'property': GattCharacteristic.PROPERTY_READ.value, 132 'permission': GattCharacteristic.PERMISSION_READ.value 133 }, 134 { 135 'uuid': self.NOTIFIABLE_CHAR_UUID, 136 'property': GattCharacteristic.PROPERTY_NOTIFY.value | 137 GattCharacteristic.PROPERTY_INDICATE.value, 138 'permission': GattCharacteristic.PERMISSION_READ.value 139 }, 140 ] 141 descriptor_input = [ 142 { 143 'uuid': self.WRITABLE_DESC_UUID, 144 'property': GattDescriptor.PERMISSION_READ.value | 145 GattCharacteristic.PERMISSION_WRITE.value, 146 }, 147 { 148 'uuid': self.READABLE_DESC_UUID, 149 'property': GattDescriptor.PERMISSION_READ.value | 150 GattDescriptor.PERMISSION_WRITE.value, 151 }, 152 { 153 'uuid': self.CCC_DESC_UUID, 154 'property': GattDescriptor.PERMISSION_READ.value | 155 GattDescriptor.PERMISSION_WRITE.value, 156 } 157 ] 158 characteristic_list = setup_gatt_characteristics(droid, 159 characteristic_input) 160 self.notifiable_char_index = characteristic_list[2]; 161 descriptor_list = setup_gatt_descriptors(droid, descriptor_input) 162 return characteristic_list, descriptor_list 163 164 def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback): 165 self.log.info("Disconnecting from peripheral device.") 166 test_result = disconnect_gatt_connection(self.cen_ad, bluetooth_gatt, 167 gatt_callback) 168 self.cen_ad.droid.gattClientClose(bluetooth_gatt) 169 if not test_result: 170 self.log.info("Failed to disconnect from peripheral device.") 171 return False 172 return True 173 174 def _find_service_added_event(self, gatt_server_callback, uuid): 175 expected_event = GattCbStrings.SERV_ADDED.value.format( 176 gatt_server_callback) 177 try: 178 event = self.per_ad.ed.pop_event(expected_event, 179 self.DEFAULT_TIMEOUT) 180 except Empty: 181 self.log.error(GattCbErr.SERV_ADDED_ERR.value.format( 182 expected_event)) 183 return False 184 if event['data']['serviceUuid'].lower() != uuid.lower(): 185 self.log.error("Uuid mismatch. Found: {}, Expected {}.".format( 186 event['data']['serviceUuid'], uuid)) 187 return False 188 return True 189 190 def _setup_multiple_services(self): 191 gatt_server_callback = ( 192 self.per_ad.droid.gattServerCreateGattServerCallback()) 193 gatt_server = self.per_ad.droid.gattServerOpenGattServer( 194 gatt_server_callback) 195 characteristic_list, descriptor_list = ( 196 self._setup_characteristics_and_descriptors(self.per_ad.droid)) 197 self.per_ad.droid.gattServerCharacteristicAddDescriptor( 198 characteristic_list[0], descriptor_list[0]) 199 self.per_ad.droid.gattServerCharacteristicAddDescriptor( 200 characteristic_list[1], descriptor_list[1]) 201 self.per_ad.droid.gattServerCharacteristicAddDescriptor( 202 characteristic_list[2], descriptor_list[2]) 203 gatt_service3 = self.per_ad.droid.gattServerCreateService( 204 self.TEST_SERVICE_UUID, GattService.SERVICE_TYPE_PRIMARY.value) 205 for characteristic in characteristic_list: 206 self.per_ad.droid.gattServerAddCharacteristicToService( 207 gatt_service3, characteristic) 208 self.per_ad.droid.gattServerAddService(gatt_server, gatt_service3) 209 result = self._find_service_added_event(gatt_server_callback, 210 self.TEST_SERVICE_UUID) 211 if not result: 212 return False, False 213 return gatt_server_callback, gatt_server 214 215 def assertEqual(self, first, second, msg=None): 216 if not first == second: 217 if not msg: 218 raise AssertionError('%r != %r' % (first, second)) 219 else: 220 raise AssertionError(msg + ' %r != %r' % (first, second)) 221