• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Original file:
18    tools/test/connectivity/acts_tests/acts_contrib/test_utils/bt/bt_gatt_utils.py
19"""
20
21import logging
22import pprint
23from queue import Empty
24
25from blueberry.utils.bt_gatt_constants import GattCallbackError
26from blueberry.utils.bt_gatt_constants import GattCallbackString
27from blueberry.utils.bt_gatt_constants import GattCharacteristic
28from blueberry.utils.bt_gatt_constants import GattConnectionState
29from blueberry.utils.bt_gatt_constants import GattDescriptor
30from blueberry.utils.bt_gatt_constants import GattPhyMask
31from blueberry.utils.bt_gatt_constants import GattServiceType
32from blueberry.utils.bt_gatt_constants import GattTransport
33from blueberry.utils.bt_test_utils import BtTestUtilsError
34from blueberry.utils.bt_test_utils import get_mac_address_of_generic_advertisement
35from mobly.controllers.android_device import AndroidDevice
36from mobly.controllers.android_device_lib.event_dispatcher import EventDispatcher
37from mobly.controllers.android_device_lib.sl4a_client import Sl4aClient
38
39default_timeout = 10
40log = logging
41
42
43class GattTestUtilsError(Exception):
44    pass
45
46
47def setup_gatt_connection(central: AndroidDevice,
48                          mac_address,
49                          autoconnect,
50                          transport=GattTransport.TRANSPORT_AUTO,
51                          opportunistic=False,
52                          timeout_seconds=default_timeout):
53    gatt_callback = central.sl4a.gattCreateGattCallback()
54    log.info("Gatt Connect to mac address {}.".format(mac_address))
55    bluetooth_gatt = central.sl4a.gattClientConnectGatt(gatt_callback, mac_address, autoconnect, transport,
56                                                        opportunistic, GattPhyMask.PHY_LE_1M_MASK)
57    expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
58    try:
59        event = central.ed.pop_event(expected_event, timeout_seconds)
60    except Empty:
61        close_gatt_client(central, bluetooth_gatt)
62        raise GattTestUtilsError("Could not establish a connection to "
63                                 "peripheral. Expected event: {}".format(expected_event))
64    logging.info("Got connection event {}".format(event))
65    if event['data']['State'] != GattConnectionState.STATE_CONNECTED:
66        close_gatt_client(central, bluetooth_gatt)
67        raise GattTestUtilsError("Could not establish a connection to "
68                                 "peripheral. Event Details: {}".format(pprint.pformat(event)))
69    return bluetooth_gatt, gatt_callback
70
71
72def wait_for_gatt_connection(central: AndroidDevice, gatt_callback, bluetooth_gatt, timeout):
73    expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
74    try:
75        event = central.ed.pop_event(expected_event, timeout=timeout)
76    except Empty:
77        close_gatt_client(central, bluetooth_gatt)
78        raise GattTestUtilsError("Could not establish a connection to "
79                                 "peripheral. Expected event: {}".format(expected_event))
80    if event['data']['State'] != GattConnectionState.STATE_CONNECTED:
81        close_gatt_client(central, bluetooth_gatt)
82        try:
83            central.sl4a.gattClientClose(bluetooth_gatt)
84        except Exception:
85            logging.debug("Failed to close gatt client.")
86        raise GattTestUtilsError("Could not establish a connection to "
87                                 "peripheral. Event Details: {}".format(pprint.pformat(event)))
88
89
90def close_gatt_client(central: AndroidDevice, bluetooth_gatt):
91    try:
92        central.sl4a.gattClientClose(bluetooth_gatt)
93    except Exception:
94        log.debug("Failed to close gatt client.")
95
96
97def disconnect_gatt_connection(central: AndroidDevice, bluetooth_gatt, gatt_callback):
98    central.sl4a.gattClientDisconnect(bluetooth_gatt)
99    wait_for_gatt_disconnect_event(central, gatt_callback)
100    return
101
102
103def wait_for_gatt_disconnect_event(central: AndroidDevice, gatt_callback):
104    expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
105    try:
106        event = central.ed.pop_event(expected_event, default_timeout)
107    except Empty:
108        raise GattTestUtilsError(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event))
109    found_state = event['data']['State']
110    expected_state = GattConnectionState.STATE_DISCONNECTED
111    if found_state != expected_state:
112        raise GattTestUtilsError("GATT connection state change expected {}, found {}".format(
113            expected_event, found_state))
114    return
115
116
117def orchestrate_gatt_connection(central: AndroidDevice,
118                                peripheral: AndroidDevice,
119                                transport=GattTransport.TRANSPORT_LE,
120                                mac_address=None,
121                                autoconnect=False,
122                                opportunistic=False):
123    adv_callback = None
124    if mac_address is None:
125        if transport == GattTransport.TRANSPORT_LE:
126            try:
127                mac_address, adv_callback, scan_callback = (get_mac_address_of_generic_advertisement(
128                    central, peripheral))
129            except BtTestUtilsError as err:
130                raise GattTestUtilsError("Error in getting mac address: {}".format(err))
131        else:
132            mac_address = peripheral.sl4a.bluetoothGetLocalAddress()
133            adv_callback = None
134    bluetooth_gatt, gatt_callback = setup_gatt_connection(central, mac_address, autoconnect, transport, opportunistic)
135    return bluetooth_gatt, gatt_callback, adv_callback
136
137
138def run_continuous_write_descriptor(cen_droid: Sl4aClient,
139                                    cen_ed: EventDispatcher,
140                                    per_droid: Sl4aClient,
141                                    per_ed: EventDispatcher,
142                                    gatt_server,
143                                    gatt_server_callback,
144                                    bluetooth_gatt,
145                                    services_count,
146                                    discovered_services_index,
147                                    number_of_iterations=100000):
148    log.info("Starting continuous write")
149    bt_device_id = 0
150    status = 1
151    offset = 1
152    test_value = [1, 2, 3, 4, 5, 6, 7]
153    test_value_return = [1, 2, 3]
154    for _ in range(number_of_iterations):
155        try:
156            for i in range(services_count):
157                characteristic_uuids = (cen_droid.gattClientGetDiscoveredCharacteristicUuids(
158                    discovered_services_index, i))
159                log.info(characteristic_uuids)
160                for characteristic in characteristic_uuids:
161                    descriptor_uuids = (cen_droid.gattClientGetDiscoveredDescriptorUuids(
162                        discovered_services_index, i, characteristic))
163                    log.info(descriptor_uuids)
164                    for descriptor in descriptor_uuids:
165                        cen_droid.gattClientDescriptorSetValue(bluetooth_gatt, discovered_services_index, i,
166                                                               characteristic, descriptor, test_value)
167                        cen_droid.gattClientWriteDescriptor(bluetooth_gatt, discovered_services_index, i,
168                                                            characteristic, descriptor)
169                        expected_event = \
170                            GattCallbackString.DESC_WRITE_REQ.format(
171                                gatt_server_callback)
172                        try:
173                            event = per_ed.pop_event(expected_event, default_timeout)
174                        except Empty:
175                            log.error(GattCallbackError.DESC_WRITE_REQ_ERR.format(expected_event))
176                            return False
177                        request_id = event['data']['requestId']
178                        found_value = event['data']['value']
179                        if found_value != test_value:
180                            log.error("Values didn't match. Found: {}, Expected: " "{}".format(found_value, test_value))
181                        per_droid.gattServerSendResponse(gatt_server, bt_device_id, request_id, status, offset,
182                                                         test_value_return)
183                        expected_event = GattCallbackString.DESC_WRITE.format(bluetooth_gatt)
184                        try:
185                            cen_ed.pop_event(expected_event, default_timeout)
186                        except Empty:
187                            log.error(GattCallbackError.DESC_WRITE_ERR.format(expected_event))
188                            raise Exception("Thread ended prematurely.")
189        except Exception as err:
190            log.error("Continuing but found exception: {}".format(err))
191
192
193def setup_characteristics_and_descriptors_read_write(droid: Sl4aClient):
194    characteristic_input = [
195        {
196            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
197            'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
198            'permission': GattCharacteristic.PERMISSION_WRITE
199        },
200        {
201            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
202            'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_READ,
203            'permission': GattCharacteristic.PERMISSION_READ
204        },
205        {
206            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
207            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
208            'permission': GattCharacteristic.PERMISSION_READ
209        },
210    ]
211    descriptor_input = [{
212        'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
213        'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
214    }, {
215        'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
216        'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
217    }]
218    characteristic_list = setup_gatt_characteristics(droid, characteristic_input)
219    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
220    return characteristic_list, descriptor_list
221
222
223def setup_multiple_services(peripheral: AndroidDevice):
224    per_droid, per_ed = peripheral.sl4a, peripheral.sl4a.ed
225    gatt_server_callback = per_droid.gattServerCreateGattServerCallback()
226    gatt_server = per_droid.gattServerOpenGattServer(gatt_server_callback)
227    characteristic_list, descriptor_list = (setup_characteristics_and_descriptors_read_write(per_droid))
228    per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[1], descriptor_list[0])
229    per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[2], descriptor_list[1])
230    gattService = per_droid.gattServerCreateService("00000000-0000-1000-8000-00805f9b34fb",
231                                                    GattServiceType.SERVICE_TYPE_PRIMARY)
232    gattService2 = per_droid.gattServerCreateService("FFFFFFFF-0000-1000-8000-00805f9b34fb",
233                                                     GattServiceType.SERVICE_TYPE_PRIMARY)
234    gattService3 = per_droid.gattServerCreateService("3846D7A0-69C8-11E4-BA00-0002A5D5C51B",
235                                                     GattServiceType.SERVICE_TYPE_PRIMARY)
236    for characteristic in characteristic_list:
237        per_droid.gattServerAddCharacteristicToService(gattService, characteristic)
238    per_droid.gattServerAddService(gatt_server, gattService)
239    expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_callback)
240    try:
241        per_ed.pop_event(expected_event, default_timeout)
242    except Empty:
243        peripheral.sl4a.gattServerClose(gatt_server)
244        raise GattTestUtilsError(GattCallbackError.SERV_ADDED_ERR.format(expected_event))
245    for characteristic in characteristic_list:
246        per_droid.gattServerAddCharacteristicToService(gattService2, characteristic)
247    per_droid.gattServerAddService(gatt_server, gattService2)
248    try:
249        per_ed.pop_event(expected_event, default_timeout)
250    except Empty:
251        peripheral.sl4a.gattServerClose(gatt_server)
252        raise GattTestUtilsError(GattCallbackError.SERV_ADDED_ERR.format(expected_event))
253    for characteristic in characteristic_list:
254        per_droid.gattServerAddCharacteristicToService(gattService3, characteristic)
255    per_droid.gattServerAddService(gatt_server, gattService3)
256    try:
257        per_ed.pop_event(expected_event, default_timeout)
258    except Empty:
259        peripheral.sl4a.gattServerClose(gatt_server)
260        raise GattTestUtilsError(GattCallbackError.SERV_ADDED_ERR.format(expected_event))
261    return gatt_server_callback, gatt_server
262
263
264def setup_characteristics_and_descriptors_notify_read(droid: Sl4aClient):
265    characteristic_input = [
266        {
267            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
268            'property': GattCharacteristic.PROPERTY_WRITE | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE,
269            'permission': GattCharacteristic.PROPERTY_WRITE
270        },
271        {
272            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
273            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
274            'permission': GattCharacteristic.PERMISSION_READ
275        },
276        {
277            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
278            'property': GattCharacteristic.PROPERTY_NOTIFY | GattCharacteristic.PROPERTY_READ,
279            'permission': GattCharacteristic.PERMISSION_READ
280        },
281    ]
282    descriptor_input = [{
283        'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
284        'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
285    }, {
286        'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
287        'property': GattDescriptor.PERMISSION_READ[0] | GattDescriptor.PERMISSION_WRITE[0],
288    }]
289    characteristic_list = setup_gatt_characteristics(droid, characteristic_input)
290    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
291    return characteristic_list, descriptor_list
292
293
294def setup_gatt_characteristics(droid: Sl4aClient, input):
295    characteristic_list = []
296    for item in input:
297        index = droid.gattServerCreateBluetoothGattCharacteristic(item['uuid'], item['property'], item['permission'])
298        characteristic_list.append(index)
299    return characteristic_list
300
301
302def setup_gatt_descriptors(droid: Sl4aClient, input):
303    descriptor_list = []
304    for item in input:
305        index = droid.gattServerCreateBluetoothGattDescriptor(
306            item['uuid'],
307            item['property'],
308        )
309        descriptor_list.append(index)
310    log.info("setup descriptor list: {}".format(descriptor_list))
311    return descriptor_list
312
313
314def setup_gatt_mtu(central: AndroidDevice, bluetooth_gatt, gatt_callback, mtu):
315    """utility function to set mtu for GATT connection.
316
317    Steps:
318    1. Request mtu change.
319    2. Check if the mtu is changed to the new value
320
321    Args:
322        central: test device for client to scan.
323        bluetooth_gatt: GATT object
324        mtu: new mtu value to be set
325
326    Returns:
327        If success, return True.
328        if fail, return False
329    """
330    central.sl4a.gattClientRequestMtu(bluetooth_gatt, mtu)
331    expected_event = GattCallbackString.MTU_CHANGED.format(gatt_callback)
332    try:
333        mtu_event = central.ed.pop_event(expected_event, default_timeout)
334        mtu_size_found = mtu_event['data']['MTU']
335        if mtu_size_found != mtu:
336            log.error("MTU size found: {}, expected: {}".format(mtu_size_found, mtu))
337            return False
338    except Empty:
339        log.error(GattCallbackError.MTU_CHANGED_ERR.format(expected_event))
340        return False
341    return True
342
343
344def log_gatt_server_uuids(central: AndroidDevice, discovered_services_index, bluetooth_gatt=None):
345    services_count = central.sl4a.gattClientGetDiscoveredServicesCount(discovered_services_index)
346    for i in range(services_count):
347        service = central.sl4a.gattClientGetDiscoveredServiceUuid(discovered_services_index, i)
348        log.info("Discovered service uuid {}".format(service))
349        characteristic_uuids = (central.sl4a.gattClientGetDiscoveredCharacteristicUuids(discovered_services_index, i))
350        for j in range(len(characteristic_uuids)):
351            descriptor_uuids = (central.sl4a.gattClientGetDiscoveredDescriptorUuidsByIndex(
352                discovered_services_index, i, j))
353            if bluetooth_gatt:
354                char_inst_id = central.sl4a.gattClientGetCharacteristicInstanceId(bluetooth_gatt,
355                                                                                  discovered_services_index, i, j)
356                log.info("Discovered characteristic handle uuid: {} {}".format(
357                    hex(char_inst_id), characteristic_uuids[j]))
358                for k in range(len(descriptor_uuids)):
359                    desc_inst_id = central.sl4a.gattClientGetDescriptorInstanceId(bluetooth_gatt,
360                                                                                  discovered_services_index, i, j, k)
361                    log.info("Discovered descriptor handle uuid: {} {}".format(hex(desc_inst_id), descriptor_uuids[k]))
362            else:
363                log.info("Discovered characteristic uuid: {}".format(characteristic_uuids[j]))
364                for k in range(len(descriptor_uuids)):
365                    log.info("Discovered descriptor uuid {}".format(descriptor_uuids[k]))
366