#!/usr/bin/env python3 # # Copyright (C) 2016 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. """ Original file: tools/test/connectivity/acts_tests/acts_contrib/test_utils/bt/bt_gatt_utils.py """ import logging import pprint from queue import Empty from blueberry.utils.bt_gatt_constants import GattCallbackError from blueberry.utils.bt_gatt_constants import GattCallbackString from blueberry.utils.bt_gatt_constants import GattCharacteristic from blueberry.utils.bt_gatt_constants import GattConnectionState from blueberry.utils.bt_gatt_constants import GattDescriptor from blueberry.utils.bt_gatt_constants import GattPhyMask from blueberry.utils.bt_gatt_constants import GattServiceType from blueberry.utils.bt_gatt_constants import GattTransport from blueberry.utils.bt_test_utils import BtTestUtilsError from blueberry.utils.bt_test_utils import get_mac_address_of_generic_advertisement from mobly.controllers.android_device import AndroidDevice from mobly.controllers.android_device_lib.event_dispatcher import EventDispatcher from mobly.controllers.android_device_lib.sl4a_client import Sl4aClient default_timeout = 10 log = logging class GattTestUtilsError(Exception): pass def setup_gatt_connection(central: AndroidDevice, mac_address, autoconnect, transport=GattTransport.TRANSPORT_AUTO, opportunistic=False, timeout_seconds=default_timeout): gatt_callback = central.sl4a.gattCreateGattCallback() log.info("Gatt Connect to mac address {}.".format(mac_address)) bluetooth_gatt = central.sl4a.gattClientConnectGatt(gatt_callback, mac_address, autoconnect, transport, opportunistic, GattPhyMask.PHY_LE_1M_MASK) expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback) try: event = central.ed.pop_event(expected_event, timeout_seconds) except Empty: close_gatt_client(central, bluetooth_gatt) raise GattTestUtilsError("Could not establish a connection to " "peripheral. Expected event: {}".format(expected_event)) logging.info("Got connection event {}".format(event)) if event['data']['State'] != GattConnectionState.STATE_CONNECTED: close_gatt_client(central, bluetooth_gatt) raise GattTestUtilsError("Could not establish a connection to " "peripheral. Event Details: {}".format(pprint.pformat(event))) return bluetooth_gatt, gatt_callback def wait_for_gatt_connection(central: AndroidDevice, gatt_callback, bluetooth_gatt, timeout): expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback) try: event = central.ed.pop_event(expected_event, timeout=timeout) except Empty: close_gatt_client(central, bluetooth_gatt) raise GattTestUtilsError("Could not establish a connection to " "peripheral. Expected event: {}".format(expected_event)) if event['data']['State'] != GattConnectionState.STATE_CONNECTED: close_gatt_client(central, bluetooth_gatt) try: central.sl4a.gattClientClose(bluetooth_gatt) except Exception: logging.debug("Failed to close gatt client.") raise GattTestUtilsError("Could not establish a connection to " "peripheral. Event Details: {}".format(pprint.pformat(event))) def close_gatt_client(central: AndroidDevice, bluetooth_gatt): try: central.sl4a.gattClientClose(bluetooth_gatt) except Exception: log.debug("Failed to close gatt client.") def disconnect_gatt_connection(central: AndroidDevice, bluetooth_gatt, gatt_callback): central.sl4a.gattClientDisconnect(bluetooth_gatt) wait_for_gatt_disconnect_event(central, gatt_callback) return def wait_for_gatt_disconnect_event(central: AndroidDevice, gatt_callback): expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback) try: event = central.ed.pop_event(expected_event, default_timeout) except Empty: raise GattTestUtilsError(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event)) found_state = event['data']['State'] expected_state = GattConnectionState.STATE_DISCONNECTED if found_state != expected_state: raise GattTestUtilsError("GATT connection state change expected {}, found {}".format( expected_event, found_state)) return def orchestrate_gatt_connection(central: AndroidDevice, peripheral: AndroidDevice, transport=GattTransport.TRANSPORT_LE, mac_address=None, autoconnect=False, opportunistic=False): adv_callback = None if mac_address is None: if transport == GattTransport.TRANSPORT_LE: try: mac_address, adv_callback, scan_callback = (get_mac_address_of_generic_advertisement( central, peripheral)) except BtTestUtilsError as err: raise GattTestUtilsError("Error in getting mac address: {}".format(err)) else: mac_address = peripheral.sl4a.bluetoothGetLocalAddress() adv_callback = None bluetooth_gatt, gatt_callback = setup_gatt_connection(central, mac_address, autoconnect, transport, opportunistic) return bluetooth_gatt, gatt_callback, adv_callback def run_continuous_write_descriptor(cen_droid: Sl4aClient, cen_ed: EventDispatcher, per_droid: Sl4aClient, per_ed: EventDispatcher, gatt_server, gatt_server_callback, bluetooth_gatt, services_count, discovered_services_index, number_of_iterations=100000): log.info("Starting continuous write") bt_device_id = 0 status = 1 offset = 1 test_value = [1, 2, 3, 4, 5, 6, 7] test_value_return = [1, 2, 3] for _ in range(number_of_iterations): try: for i in range(services_count): characteristic_uuids = (cen_droid.gattClientGetDiscoveredCharacteristicUuids( discovered_services_index, i)) log.info(characteristic_uuids) for characteristic in characteristic_uuids: descriptor_uuids = (cen_droid.gattClientGetDiscoveredDescriptorUuids( discovered_services_index, i, characteristic)) log.info(descriptor_uuids) for descriptor in descriptor_uuids: cen_droid.gattClientDescriptorSetValue(bluetooth_gatt, discovered_services_index, i, characteristic, descriptor, test_value) cen_droid.gattClientWriteDescriptor(bluetooth_gatt, discovered_services_index, i, characteristic, descriptor) expected_event = \ GattCallbackString.DESC_WRITE_REQ.format( gatt_server_callback) try: event = per_ed.pop_event(expected_event, default_timeout) except Empty: log.error(GattCallbackError.DESC_WRITE_REQ_ERR.format(expected_event)) return False request_id = event['data']['requestId'] found_value = event['data']['value'] if found_value != test_value: log.error("Values didn't match. Found: {}, Expected: " "{}".format(found_value, test_value)) per_droid.gattServerSendResponse(gatt_server, bt_device_id, request_id, status, offset, test_value_return) expected_event = GattCallbackString.DESC_WRITE.format(bluetooth_gatt) try: cen_ed.pop_event(expected_event, default_timeout) except Empty: log.error(GattCallbackError.DESC_WRITE_ERR.format(expected_event)) raise Exception("Thread ended prematurely.") except Exception as err: log.error("Continuing but found exception: {}".format(err)) def setup_characteristics_and_descriptors_read_write(droid: Sl4aClient): characteristic_input = [ { 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE, 'permission': GattCharacteristic.PERMISSION_WRITE }, { 'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", 'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_READ, 'permission': GattCharacteristic.PERMISSION_READ }, { 'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", 'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ, 'permission': GattCharacteristic.PERMISSION_READ }, ] descriptor_input = [{ 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0], }, { 'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32", 'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0], }] characteristic_list = setup_gatt_characteristics(droid, characteristic_input) descriptor_list = setup_gatt_descriptors(droid, descriptor_input) return characteristic_list, descriptor_list def setup_multiple_services(peripheral: AndroidDevice): per_droid, per_ed = peripheral.sl4a, peripheral.sl4a.ed gatt_server_callback = per_droid.gattServerCreateGattServerCallback() gatt_server = per_droid.gattServerOpenGattServer(gatt_server_callback) characteristic_list, descriptor_list = (setup_characteristics_and_descriptors_read_write(per_droid)) per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[1], descriptor_list[0]) per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[2], descriptor_list[1]) gattService = per_droid.gattServerCreateService("00000000-0000-1000-8000-00805f9b34fb", GattServiceType.SERVICE_TYPE_PRIMARY) gattService2 = per_droid.gattServerCreateService("FFFFFFFF-0000-1000-8000-00805f9b34fb", GattServiceType.SERVICE_TYPE_PRIMARY) gattService3 = per_droid.gattServerCreateService("3846D7A0-69C8-11E4-BA00-0002A5D5C51B", GattServiceType.SERVICE_TYPE_PRIMARY) for characteristic in characteristic_list: per_droid.gattServerAddCharacteristicToService(gattService, characteristic) per_droid.gattServerAddService(gatt_server, gattService) expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_callback) try: per_ed.pop_event(expected_event, default_timeout) except Empty: peripheral.sl4a.gattServerClose(gatt_server) raise GattTestUtilsError(GattCallbackError.SERV_ADDED_ERR.format(expected_event)) for characteristic in characteristic_list: per_droid.gattServerAddCharacteristicToService(gattService2, characteristic) per_droid.gattServerAddService(gatt_server, gattService2) try: per_ed.pop_event(expected_event, default_timeout) except Empty: peripheral.sl4a.gattServerClose(gatt_server) raise GattTestUtilsError(GattCallbackError.SERV_ADDED_ERR.format(expected_event)) for characteristic in characteristic_list: per_droid.gattServerAddCharacteristicToService(gattService3, characteristic) per_droid.gattServerAddService(gatt_server, gattService3) try: per_ed.pop_event(expected_event, default_timeout) except Empty: peripheral.sl4a.gattServerClose(gatt_server) raise GattTestUtilsError(GattCallbackError.SERV_ADDED_ERR.format(expected_event)) return gatt_server_callback, gatt_server def setup_characteristics_and_descriptors_notify_read(droid: Sl4aClient): characteristic_input = [ { 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE, 'permission': GattCharacteristic.PROPERTY_WRITE }, { 'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", 'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ, 'permission': GattCharacteristic.PERMISSION_READ }, { 'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", 'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ, 'permission': GattCharacteristic.PERMISSION_READ }, ] descriptor_input = [{ 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", 'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0], }, { 'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32", 'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0], }] characteristic_list = setup_gatt_characteristics(droid, characteristic_input) descriptor_list = setup_gatt_descriptors(droid, descriptor_input) return characteristic_list, descriptor_list def setup_gatt_characteristics(droid: Sl4aClient, input): characteristic_list = [] for item in input: index = droid.gattServerCreateBluetoothGattCharacteristic(item['uuid'], item['property'], item['permission']) characteristic_list.append(index) return characteristic_list def setup_gatt_descriptors(droid: Sl4aClient, input): descriptor_list = [] for item in input: index = droid.gattServerCreateBluetoothGattDescriptor( item['uuid'], item['property'], ) descriptor_list.append(index) log.info("setup descriptor list: {}".format(descriptor_list)) return descriptor_list def setup_gatt_mtu(central: AndroidDevice, bluetooth_gatt, gatt_callback, mtu): """utility function to set mtu for GATT connection. Steps: 1. Request mtu change. 2. Check if the mtu is changed to the new value Args: central: test device for client to scan. bluetooth_gatt: GATT object mtu: new mtu value to be set Returns: If success, return True. if fail, return False """ central.sl4a.gattClientRequestMtu(bluetooth_gatt, mtu) expected_event = GattCallbackString.MTU_CHANGED.format(gatt_callback) try: mtu_event = central.ed.pop_event(expected_event, default_timeout) mtu_size_found = mtu_event['data']['MTU'] if mtu_size_found != mtu: log.error("MTU size found: {}, expected: {}".format(mtu_size_found, mtu)) return False except Empty: log.error(GattCallbackError.MTU_CHANGED_ERR.format(expected_event)) return False return True def log_gatt_server_uuids(central: AndroidDevice, discovered_services_index, bluetooth_gatt=None): services_count = central.sl4a.gattClientGetDiscoveredServicesCount(discovered_services_index) for i in range(services_count): service = central.sl4a.gattClientGetDiscoveredServiceUuid(discovered_services_index, i) log.info("Discovered service uuid {}".format(service)) characteristic_uuids = (central.sl4a.gattClientGetDiscoveredCharacteristicUuids(discovered_services_index, i)) for j in range(len(characteristic_uuids)): descriptor_uuids = (central.sl4a.gattClientGetDiscoveredDescriptorUuidsByIndex( discovered_services_index, i, j)) if bluetooth_gatt: char_inst_id = central.sl4a.gattClientGetCharacteristicInstanceId(bluetooth_gatt, discovered_services_index, i, j) log.info("Discovered characteristic handle uuid: {} {}".format( hex(char_inst_id), characteristic_uuids[j])) for k in range(len(descriptor_uuids)): desc_inst_id = central.sl4a.gattClientGetDescriptorInstanceId(bluetooth_gatt, discovered_services_index, i, j, k) log.info("Discovered descriptor handle uuid: {} {}".format(hex(desc_inst_id), descriptor_uuids[k])) else: log.info("Discovered characteristic uuid: {}".format(characteristic_uuids[j])) for k in range(len(descriptor_uuids)): log.info("Discovered descriptor uuid {}".format(descriptor_uuids[k]))