• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import logging
18
19from acts_contrib.test_utils.bt.bt_test_utils import BtTestUtilsError
20from acts_contrib.test_utils.bt.bt_test_utils import get_mac_address_of_generic_advertisement
21from acts_contrib.test_utils.bt.bt_constants import gatt_cb_err
22from acts_contrib.test_utils.bt.bt_constants import gatt_cb_strings
23from acts_contrib.test_utils.bt.bt_constants import gatt_connection_state
24from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic
25from acts_contrib.test_utils.bt.bt_constants import gatt_descriptor
26from acts_contrib.test_utils.bt.bt_constants import gatt_phy_mask
27from acts_contrib.test_utils.bt.bt_constants import gatt_service_types
28from acts_contrib.test_utils.bt.bt_constants import gatt_transport
29import pprint
30from queue import Empty
31
32default_timeout = 10
33log = logging
34
35
36class GattTestUtilsError(Exception):
37    pass
38
39
40def setup_gatt_connection(cen_ad,
41                          mac_address,
42                          autoconnect,
43                          transport=gatt_transport['auto'],
44                          opportunistic=False):
45    gatt_callback = cen_ad.droid.gattCreateGattCallback()
46    log.info("Gatt Connect to mac address {}.".format(mac_address))
47    bluetooth_gatt = cen_ad.droid.gattClientConnectGatt(
48        gatt_callback, mac_address, autoconnect, transport, opportunistic,
49        gatt_phy_mask['1m_mask'])
50    expected_event = gatt_cb_strings['gatt_conn_change'].format(gatt_callback)
51    try:
52        event = cen_ad.ed.pop_event(expected_event, default_timeout)
53    except Empty:
54        close_gatt_client(cen_ad, bluetooth_gatt)
55        raise GattTestUtilsError(
56            "Could not establish a connection to "
57            "peripheral. Expected event: {}".format(expected_event))
58    if event['data']['State'] != gatt_connection_state['connected']:
59        close_gatt_client(cen_ad, bluetooth_gatt)
60        try:
61            cen_ad.droid.gattClientClose(bluetooth_gatt)
62        except Exception:
63            self.log.debug("Failed to close gatt client.")
64        raise GattTestUtilsError("Could not establish a connection to "
65                                 "peripheral. Event Details: {}".format(
66                                     pprint.pformat(event)))
67    return bluetooth_gatt, gatt_callback
68
69
70def close_gatt_client(cen_ad, bluetooth_gatt):
71    try:
72        cen_ad.droid.gattClientClose(bluetooth_gatt)
73    except Exception:
74        log.debug("Failed to close gatt client.")
75
76
77def disconnect_gatt_connection(cen_ad, bluetooth_gatt, gatt_callback):
78    cen_ad.droid.gattClientDisconnect(bluetooth_gatt)
79    wait_for_gatt_disconnect_event(cen_ad, gatt_callback)
80    return
81
82
83def wait_for_gatt_disconnect_event(cen_ad, gatt_callback):
84    expected_event = gatt_cb_strings['gatt_conn_change'].format(gatt_callback)
85    try:
86        event = cen_ad.ed.pop_event(expected_event, default_timeout)
87    except Empty:
88        raise GattTestUtilsError(
89            gatt_cb_err['gatt_conn_change_err'].format(expected_event))
90    found_state = event['data']['State']
91    expected_state = gatt_connection_state['disconnected']
92    if found_state != expected_state:
93        raise GattTestUtilsError(
94            "GATT connection state change expected {}, found {}".format(
95                expected_event, found_state))
96    return
97
98
99def orchestrate_gatt_connection(cen_ad,
100                                per_ad,
101                                transport=gatt_transport['le'],
102                                mac_address=None,
103                                autoconnect=False,
104                                opportunistic=False):
105    adv_callback = None
106    if mac_address is None:
107        if transport == gatt_transport['le']:
108            try:
109                mac_address, adv_callback, scan_callback = (
110                    get_mac_address_of_generic_advertisement(cen_ad, per_ad))
111            except BtTestUtilsError as err:
112                raise GattTestUtilsError(
113                    "Error in getting mac address: {}".format(err))
114        else:
115            mac_address = per_ad.droid.bluetoothGetLocalAddress()
116            adv_callback = None
117    bluetooth_gatt, gatt_callback = setup_gatt_connection(
118        cen_ad, mac_address, autoconnect, transport, opportunistic)
119    return bluetooth_gatt, gatt_callback, adv_callback
120
121
122def run_continuous_write_descriptor(cen_droid,
123                                    cen_ed,
124                                    per_droid,
125                                    per_ed,
126                                    gatt_server,
127                                    gatt_server_callback,
128                                    bluetooth_gatt,
129                                    services_count,
130                                    discovered_services_index,
131                                    number_of_iterations=100000):
132    log.info("Starting continuous write")
133    bt_device_id = 0
134    status = 1
135    offset = 1
136    test_value = [1, 2, 3, 4, 5, 6, 7]
137    test_value_return = [1, 2, 3]
138    for _ in range(number_of_iterations):
139        try:
140            for i in range(services_count):
141                characteristic_uuids = (
142                    cen_droid.gattClientGetDiscoveredCharacteristicUuids(
143                        discovered_services_index, i))
144                log.info(characteristic_uuids)
145                for characteristic in characteristic_uuids:
146                    descriptor_uuids = (
147                        cen_droid.gattClientGetDiscoveredDescriptorUuids(
148                            discovered_services_index, i, characteristic))
149                    log.info(descriptor_uuids)
150                    for descriptor in descriptor_uuids:
151                        cen_droid.gattClientDescriptorSetValue(
152                            bluetooth_gatt, discovered_services_index, i,
153                            characteristic, descriptor, test_value)
154                        cen_droid.gattClientWriteDescriptor(
155                            bluetooth_gatt, discovered_services_index, i,
156                            characteristic, descriptor)
157                        expected_event = gatt_cb_strings[
158                            'desc_write_req'].format(gatt_server_callback)
159                        try:
160                            event = per_ed.pop_event(expected_event,
161                                                     default_timeout)
162                        except Empty:
163                            log.error(gatt_cb_err['desc_write_req_err'].format(
164                                expected_event))
165                            return False
166                        request_id = event['data']['requestId']
167                        found_value = event['data']['value']
168                        if found_value != test_value:
169                            log.error(
170                                "Values didn't match. Found: {}, Expected: "
171                                "{}".format(found_value, test_value))
172                        per_droid.gattServerSendResponse(
173                            gatt_server, bt_device_id, request_id, status,
174                            offset, test_value_return)
175                        expected_event = gatt_cb_strings['desc_write'].format(
176                            bluetooth_gatt)
177                        try:
178                            cen_ed.pop_event(expected_event, default_timeout)
179                        except Empty:
180                            log.error(gatt_cb_strings['desc_write_err'].format(
181                                expected_event))
182                            raise Exception("Thread ended prematurely.")
183        except Exception as err:
184            log.error("Continuing but found exception: {}".format(err))
185
186
187def setup_characteristics_and_descriptors(droid):
188    characteristic_input = [
189        {
190            'uuid':
191            "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
192            'property':
193            gatt_characteristic['property_write']
194            | gatt_characteristic['property_write_no_response'],
195            'permission':
196            gatt_characteristic['permission_write']
197        },
198        {
199            'uuid':
200            "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
201            'property':
202            gatt_characteristic['property_notify']
203            | gatt_characteristic['property_read'],
204            'permission':
205            gatt_characteristic['permission_read']
206        },
207        {
208            'uuid':
209            "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
210            'property':
211            gatt_characteristic['property_notify']
212            | gatt_characteristic['property_read'],
213            'permission':
214            gatt_characteristic['permission_read']
215        },
216    ]
217    descriptor_input = [{
218        'uuid':
219        "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
220        'property':
221        gatt_descriptor['permission_read']
222        | gatt_descriptor['permission_write'],
223    }, {
224        'uuid':
225        "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
226        'property':
227        gatt_descriptor['permission_read']
228        | gatt_characteristic['permission_write'],
229    }]
230    characteristic_list = setup_gatt_characteristics(droid,
231                                                     characteristic_input)
232    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
233    return characteristic_list, descriptor_list
234
235
236def setup_multiple_services(per_ad):
237    per_droid, per_ed = per_ad.droid, per_ad.ed
238    gatt_server_callback = per_droid.gattServerCreateGattServerCallback()
239    gatt_server = per_droid.gattServerOpenGattServer(gatt_server_callback)
240    characteristic_list, descriptor_list = (
241        setup_characteristics_and_descriptors(per_droid))
242    per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[1],
243                                                    descriptor_list[0])
244    per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[2],
245                                                    descriptor_list[1])
246    gattService = per_droid.gattServerCreateService(
247        "00000000-0000-1000-8000-00805f9b34fb", gatt_service_types['primary'])
248    gattService2 = per_droid.gattServerCreateService(
249        "FFFFFFFF-0000-1000-8000-00805f9b34fb", gatt_service_types['primary'])
250    gattService3 = per_droid.gattServerCreateService(
251        "3846D7A0-69C8-11E4-BA00-0002A5D5C51B", gatt_service_types['primary'])
252    for characteristic in characteristic_list:
253        per_droid.gattServerAddCharacteristicToService(gattService,
254                                                       characteristic)
255    per_droid.gattServerAddService(gatt_server, gattService)
256    expected_event = gatt_cb_strings['serv_added'].format(gatt_server_callback)
257    try:
258        per_ed.pop_event(expected_event, default_timeout)
259    except Empty:
260        per_ad.droid.gattServerClose(gatt_server)
261        raise GattTestUtilsError(
262            gatt_cb_strings['serv_added_err'].format(expected_event))
263    for characteristic in characteristic_list:
264        per_droid.gattServerAddCharacteristicToService(gattService2,
265                                                       characteristic)
266    per_droid.gattServerAddService(gatt_server, gattService2)
267    try:
268        per_ed.pop_event(expected_event, default_timeout)
269    except Empty:
270        per_ad.droid.gattServerClose(gatt_server)
271        raise GattTestUtilsError(
272            gatt_cb_strings['serv_added_err'].format(expected_event))
273    for characteristic in characteristic_list:
274        per_droid.gattServerAddCharacteristicToService(gattService3,
275                                                       characteristic)
276    per_droid.gattServerAddService(gatt_server, gattService3)
277    try:
278        per_ed.pop_event(expected_event, default_timeout)
279    except Empty:
280        per_ad.droid.gattServerClose(gatt_server)
281        raise GattTestUtilsError(
282            gatt_cb_strings['serv_added_err'].format(expected_event))
283    return gatt_server_callback, gatt_server
284
285
286def setup_characteristics_and_descriptors(droid):
287    characteristic_input = [
288        {
289            'uuid':
290            "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
291            'property':
292            gatt_characteristic['property_write']
293            | gatt_characteristic['property_write_no_response'],
294            'permission':
295            gatt_characteristic['property_write']
296        },
297        {
298            'uuid':
299            "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
300            'property':
301            gatt_characteristic['property_notify']
302            | gatt_characteristic['property_read'],
303            'permission':
304            gatt_characteristic['permission_read']
305        },
306        {
307            'uuid':
308            "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
309            'property':
310            gatt_characteristic['property_notify']
311            | gatt_characteristic['property_read'],
312            'permission':
313            gatt_characteristic['permission_read']
314        },
315    ]
316    descriptor_input = [{
317        'uuid':
318        "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
319        'property':
320        gatt_descriptor['permission_read']
321        | gatt_descriptor['permission_write'],
322    }, {
323        'uuid':
324        "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
325        'property':
326        gatt_descriptor['permission_read']
327        | gatt_characteristic['permission_write'],
328    }]
329    characteristic_list = setup_gatt_characteristics(droid,
330                                                     characteristic_input)
331    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
332    return characteristic_list, descriptor_list
333
334
335def setup_gatt_characteristics(droid, input):
336    characteristic_list = []
337    for item in input:
338        index = droid.gattServerCreateBluetoothGattCharacteristic(
339            item['uuid'], item['property'], item['permission'])
340        characteristic_list.append(index)
341    return characteristic_list
342
343
344def setup_gatt_descriptors(droid, input):
345    descriptor_list = []
346    for item in input:
347        index = droid.gattServerCreateBluetoothGattDescriptor(
348            item['uuid'],
349            item['property'],
350        )
351        descriptor_list.append(index)
352    log.info("setup descriptor list: {}".format(descriptor_list))
353    return descriptor_list
354
355
356def setup_gatt_mtu(cen_ad, bluetooth_gatt, gatt_callback, mtu):
357    """utility function to set mtu for GATT connection.
358
359    Steps:
360    1. Request mtu change.
361    2. Check if the mtu is changed to the new value
362
363    Args:
364        cen_ad: test device for client to scan.
365        bluetooth_gatt: GATT object
366        mtu: new mtu value to be set
367
368    Returns:
369        If success, return True.
370        if fail, return False
371    """
372    cen_ad.droid.gattClientRequestMtu(bluetooth_gatt, mtu)
373    expected_event = gatt_cb_strings['mtu_changed'].format(gatt_callback)
374    try:
375        mtu_event = cen_ad.ed.pop_event(expected_event, default_timeout)
376        mtu_size_found = mtu_event['data']['MTU']
377        if mtu_size_found != mtu:
378            log.error("MTU size found: {}, expected: {}".format(
379                mtu_size_found, mtu))
380            return False
381    except Empty:
382        log.error(gatt_cb_err['mtu_changed_err'].format(expected_event))
383        return False
384    return True
385
386
387def log_gatt_server_uuids(cen_ad,
388                          discovered_services_index,
389                          bluetooth_gatt=None):
390    services_count = cen_ad.droid.gattClientGetDiscoveredServicesCount(
391        discovered_services_index)
392    for i in range(services_count):
393        service = cen_ad.droid.gattClientGetDiscoveredServiceUuid(
394            discovered_services_index, i)
395        log.info("Discovered service uuid {}".format(service))
396        characteristic_uuids = (
397            cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids(
398                discovered_services_index, i))
399        for j in range(len(characteristic_uuids)):
400            descriptor_uuids = (
401                cen_ad.droid.gattClientGetDiscoveredDescriptorUuidsByIndex(
402                    discovered_services_index, i, j))
403            if bluetooth_gatt:
404                char_inst_id = cen_ad.droid.gattClientGetCharacteristicInstanceId(
405                    bluetooth_gatt, discovered_services_index, i, j)
406                log.info("Discovered characteristic handle uuid: {} {}".format(
407                    hex(char_inst_id), characteristic_uuids[j]))
408                for k in range(len(descriptor_uuids)):
409                    desc_inst_id = cen_ad.droid.gattClientGetDescriptorInstanceId(
410                        bluetooth_gatt, discovered_services_index, i, j, k)
411                    log.info("Discovered descriptor handle uuid: {} {}".format(
412                        hex(desc_inst_id), descriptor_uuids[k]))
413            else:
414                log.info("Discovered characteristic uuid: {}".format(
415                    characteristic_uuids[j]))
416                for k in range(len(descriptor_uuids)):
417                    log.info("Discovered descriptor uuid {}".format(
418                        descriptor_uuids[k]))
419