• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#/usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17from acts.logger import LoggerProxy
18
19from acts.test_utils.bt.bt_test_utils import BtTestUtilsError
20from acts.test_utils.bt.bt_test_utils import get_mac_address_of_generic_advertisement
21from acts.test_utils.bt.GattEnum import GattCbErr
22from acts.test_utils.bt.GattEnum import GattCbStrings
23from acts.test_utils.bt.GattEnum import GattConnectionState
24from acts.test_utils.bt.GattEnum import GattCharacteristic
25from acts.test_utils.bt.GattEnum import GattDescriptor
26from acts.test_utils.bt.GattEnum import GattService
27from acts.test_utils.bt.GattEnum import GattTransport
28from acts.test_utils.bt.GattEnum import GattConnectionPriority
29import pprint
30from queue import Empty
31from contextlib import suppress
32
33default_timeout = 10
34log = LoggerProxy()
35
36
37class GattTestUtilsError(Exception):
38    pass
39
40
41def setup_gatt_connection(cen_ad, mac_address, autoconnect,
42                          transport=GattTransport.TRANSPORT_AUTO):
43    test_result = True
44    gatt_callback = cen_ad.droid.gattCreateGattCallback()
45    log.info("Gatt Connect to mac address {}.".format(mac_address))
46    bluetooth_gatt = cen_ad.droid.gattClientConnectGatt(
47        gatt_callback, mac_address, autoconnect, transport)
48    expected_event = GattCbStrings.GATT_CONN_CHANGE.value.format(gatt_callback)
49    try:
50        event = cen_ad.ed.pop_event(expected_event, default_timeout)
51    except Empty:
52        log.error(GattCbErr.GATT_CONN_CHANGE_ERR.value.format(expected_event))
53        test_result = False
54        return test_result, bluetooth_gatt, gatt_callback
55    if event['data']['State'] != GattConnectionState.STATE_CONNECTED.value:
56        log.info("Could not establish a connection to peripheral. Event "
57                 "Details:".format(pprint.pformat(event)))
58        test_result = False
59    return test_result, bluetooth_gatt, gatt_callback
60
61
62def disconnect_gatt_connection(cen_ad, bluetooth_gatt, gatt_callback):
63    cen_ad.droid.gattClientDisconnect(bluetooth_gatt)
64    expected_event = GattCbStrings.GATT_CONN_CHANGE.value.format(gatt_callback)
65    try:
66        event = cen_ad.ed.pop_event(expected_event, default_timeout)
67    except Empty:
68        log.error(GattCbErr.GATT_CONN_CHANGE_ERR.value.format(expected_event))
69        return False
70    if event['data']['State'] != GattConnectionState.STATE_DISCONNECTED.value:
71        return False
72    return True
73
74
75def orchestrate_gatt_connection(cen_ad,
76                                per_ad,
77                                transport=GattTransport.TRANSPORT_LE,
78                                mac_address=None,
79                                autoconnect=False):
80    adv_callback = None
81    if mac_address is None:
82        if transport == GattTransport.TRANSPORT_LE:
83            try:
84                mac_address, adv_callback = (
85                    get_mac_address_of_generic_advertisement(cen_ad, per_ad))
86            except BtTestUtilsError as err:
87                raise GattTestUtilsError("Error in getting mac address: {}".format(err))
88        else:
89            mac_address = per_ad.droid.bluetoothGetLocalAddress()
90            adv_callback = None
91    test_result, bluetooth_gatt, gatt_callback = setup_gatt_connection(
92        cen_ad, mac_address, autoconnect, transport)
93    if not test_result:
94        raise GattTestUtilsError("Could not connect to peripheral.")
95    return bluetooth_gatt, gatt_callback, adv_callback
96
97
98def run_continuous_write_descriptor(cen_droid, cen_ed, per_droid, per_ed,
99                                    gatt_server, gatt_server_callback,
100                                    bluetooth_gatt, services_count,
101                                    discovered_services_index):
102    log.info("Starting continuous write")
103    bt_device_id = 0
104    status = 1
105    offset = 1
106    test_value = "1,2,3,4,5,6,7"
107    test_value_return = "1,2,3"
108    with suppress(Exception):
109        for x in range(100000):
110            for i in range(services_count):
111                characteristic_uuids = (
112                    cen_droid.gattClientGetDiscoveredCharacteristicUuids(
113                        discovered_services_index, i))
114                log.info(characteristic_uuids)
115                for characteristic in characteristic_uuids:
116                    descriptor_uuids = (
117                        cen_droid.gattClientGetDiscoveredDescriptorUuids(
118                            discovered_services_index, i, characteristic))
119                    log.info(descriptor_uuids)
120                    for descriptor in descriptor_uuids:
121                        log.info("descriptor to be written {}".format(
122                            descriptor))
123                        cen_droid.gattClientDescriptorSetValue(
124                            bluetooth_gatt, discovered_services_index, i,
125                            characteristic, descriptor, test_value)
126                        cen_droid.gattClientWriteDescriptor(
127                            bluetooth_gatt, discovered_services_index, i,
128                            characteristic, descriptor)
129                        expected_event = GattCbStrings.DESC_WRITE_REQ.value.format(
130                            gatt_server_callback)
131                        try:
132                            event = per_ed.pop_event(expected_event,
133                                                     default_timeout)
134                        except Empty:
135                            log.error(
136                                GattCbErr.DESC_WRITE_REQ_ERR.value.format(
137                                    expected_event))
138                            return False
139                        request_id = event['data']['requestId']
140                        found_value = event['data']['value']
141                        if found_value != test_value:
142                            log.error(
143                                "Values didn't match. Found: {}, Expected: "
144                                "{}".format(found_value, test_value))
145                        per_droid.gattServerSendResponse(
146                            gatt_server, bt_device_id, request_id, status,
147                            offset, test_value_return)
148                        expected_event = GattCbStrings.DESC_WRITE.value.format(
149                            bluetooth_gatt)
150                        try:
151                            cen_ed.pop_event(expected_event, default_timeout)
152                        except Empty:
153                            log.error(GattCbErr.DESC_WRITE_ERR.value.format(
154                                expected_event))
155                            return False
156
157
158def setup_characteristics_and_descriptors(droid):
159    characteristic_input = [
160        {
161            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
162            'property': GattCharacteristic.PROPERTY_WRITE.value
163            | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value,
164            'permission': GattCharacteristic.PROPERTY_WRITE.value
165        },
166        {
167            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
168            'property': GattCharacteristic.PROPERTY_NOTIFY.value
169            | GattCharacteristic.PROPERTY_READ.value,
170            'permission': GattCharacteristic.PERMISSION_READ.value
171        },
172        {
173            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
174            'property': GattCharacteristic.PROPERTY_NOTIFY.value
175            | GattCharacteristic.PROPERTY_READ.value,
176            'permission': GattCharacteristic.PERMISSION_READ.value
177        },
178    ]
179    descriptor_input = [
180        {
181            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
182            'property': GattDescriptor.PERMISSION_READ.value
183            | GattDescriptor.PERMISSION_WRITE.value,
184        }, {
185            'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
186            'property': GattDescriptor.PERMISSION_READ.value
187            | GattCharacteristic.PERMISSION_WRITE.value,
188        }
189    ]
190    characteristic_list = setup_gatt_characteristics(droid,
191                                                     characteristic_input)
192    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
193    return characteristic_list, descriptor_list
194
195
196def setup_multiple_services(per_ad):
197    per_droid, per_ed = per_ad.droid, per_ad.ed
198    gatt_server_callback = per_droid.gattServerCreateGattServerCallback()
199    gatt_server = per_droid.gattServerOpenGattServer(gatt_server_callback)
200    characteristic_list, descriptor_list = (
201        setup_characteristics_and_descriptors(per_droid))
202    per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[1],
203                                                    descriptor_list[0])
204    per_droid.gattServerCharacteristicAddDescriptor(characteristic_list[2],
205                                                    descriptor_list[1])
206    gattService = per_droid.gattServerCreateService(
207        "00000000-0000-1000-8000-00805f9b34fb",
208        GattService.SERVICE_TYPE_PRIMARY.value)
209    gattService2 = per_droid.gattServerCreateService(
210        "FFFFFFFF-0000-1000-8000-00805f9b34fb",
211        GattService.SERVICE_TYPE_PRIMARY.value)
212    gattService3 = per_droid.gattServerCreateService(
213        "3846D7A0-69C8-11E4-BA00-0002A5D5C51B",
214        GattService.SERVICE_TYPE_PRIMARY.value)
215    for characteristic in characteristic_list:
216        per_droid.gattServerAddCharacteristicToService(gattService,
217                                                       characteristic)
218    per_droid.gattServerAddService(gatt_server, gattService)
219    expected_event = GattCbStrings.SERV_ADDED.value.format(
220        gatt_server_callback)
221    try:
222        per_ed.pop_event(expected_event, default_timeout)
223    except Empty:
224        log.error(GattCbErr.SERV_ADDED_ERR.value.format(expected_event))
225        return False
226    for characteristic in characteristic_list:
227        per_droid.gattServerAddCharacteristicToService(gattService2,
228                                                       characteristic)
229    per_droid.gattServerAddService(gatt_server, gattService2)
230    try:
231        per_ed.pop_event(expected_event, default_timeout)
232    except Empty:
233        log.error(GattCbErr.SERV_ADDED_ERR.value.format(expected_event))
234        return False
235    for characteristic in characteristic_list:
236        per_droid.gattServerAddCharacteristicToService(gattService3,
237                                                       characteristic)
238    per_droid.gattServerAddService(gatt_server, gattService3)
239    try:
240        per_ed.pop_event(expected_event, default_timeout)
241    except Empty:
242        log.error(GattCbErr.SERV_ADDED_ERR.value.format(expected_event))
243        return False
244    return gatt_server_callback, gatt_server
245
246
247def setup_characteristics_and_descriptors(droid):
248    characteristic_input = [
249        {
250            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
251            'property': GattCharacteristic.PROPERTY_WRITE.value
252            | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value,
253            'permission': GattCharacteristic.PROPERTY_WRITE.value
254        },
255        {
256            'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8",
257            'property': GattCharacteristic.PROPERTY_NOTIFY.value
258            | GattCharacteristic.PROPERTY_READ.value,
259            'permission': GattCharacteristic.PERMISSION_READ.value
260        },
261        {
262            'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6",
263            'property': GattCharacteristic.PROPERTY_NOTIFY.value
264            | GattCharacteristic.PROPERTY_READ.value,
265            'permission': GattCharacteristic.PERMISSION_READ.value
266        },
267    ]
268    descriptor_input = [
269        {
270            'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630",
271            'property': GattDescriptor.PERMISSION_READ.value
272            | GattDescriptor.PERMISSION_WRITE.value,
273        }, {
274            'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32",
275            'property': GattDescriptor.PERMISSION_READ.value
276            | GattCharacteristic.PERMISSION_WRITE.value,
277        }
278    ]
279    characteristic_list = setup_gatt_characteristics(droid,
280                                                     characteristic_input)
281    descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
282    return characteristic_list, descriptor_list
283
284
285def setup_gatt_characteristics(droid, input):
286    characteristic_list = []
287    for item in input:
288        index = droid.gattServerCreateBluetoothGattCharacteristic(
289            item['uuid'], item['property'], item['permission'])
290        characteristic_list.append(index)
291    return characteristic_list
292
293
294def setup_gatt_descriptors(droid, input):
295    descriptor_list = []
296    for item in input:
297        index = droid.gattServerCreateBluetoothGattDescriptor(
298            item['uuid'],
299            item['property'], )
300        descriptor_list.append(index)
301    log.info("setup descriptor list: {}".format(descriptor_list))
302    return descriptor_list
303