• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
16
17import time
18import os
19
20from acts.keys import Config
21from acts.utils import rand_ascii_str
22from acts.test_utils.bt.bt_constants import gatt_cb_strings
23from acts.test_utils.bt.bt_constants import gatt_characteristic
24from acts.test_utils.bt.bt_constants import gatt_characteristic_value_format
25from acts.test_utils.bt.bt_constants import gatt_cb_err
26from acts.test_utils.bt.bt_constants import gatt_transport
27from acts.test_utils.bt.bt_constants import gatt_event
28from acts.test_utils.bt.bt_constants import gatt_server_responses
29from acts.test_utils.bt.bt_constants import gatt_service_types
30from acts.test_utils.bt.bt_constants import small_timeout
31from acts.test_utils.bt.gatt_test_database import STRING_512BYTES
32
33from acts.utils import exe_cmd
34from math import ceil
35
36
class GattServerLib():
    """Build and drive a Bluetooth GATT server through ``dut.droid`` calls.

    The instance keeps the handles returned by the device-side API (server,
    server callback, services, characteristics, descriptors) as well as the
    prepared writes that are queued until an execute-write request arrives.
    """

    # Class-level defaults are kept for backward compatibility. The mutable
    # containers below are rebound per instance in __init__ so that two
    # GattServerLib objects never share (and corrupt) each other's state.
    characteristic_list = []
    default_timeout = 10
    descriptor_list = []
    dut = None
    gatt_server = None
    gatt_server_callback = None
    gatt_server_list = []
    log = None
    service_list = []
    write_mapping = {}

    def __init__(self, log, dut):
        """Store the logger and device and start with empty bookkeeping.

        Args:
            log: logger used for this helper's own messages.
            dut: device object exposing ``droid``, ``ed`` and ``log``.
        """
        self.dut = dut
        self.log = log
        self.characteristic_list = []
        self.descriptor_list = []
        self.gatt_server_list = []
        self.service_list = []
        # Maps instance id -> value accumulated from prepared writes.
        self.write_mapping = {}

    def _event_name(self, key):
        """Return the ``gatt_event`` name for ``key`` bound to our callback."""
        return gatt_event[key]['evt'].format(self.gatt_server_callback)

    def _event_regex(self, *keys):
        """Return a regex alternation matching the events named by ``keys``."""
        return "({})".format("|".join(self._event_name(key) for key in keys))

    @staticmethod
    def _flag_is_true(event_data, key):
        """Return True only if ``key`` is present in the data and equals True."""
        # Equality (not truthiness) is kept on purpose to preserve the
        # original matching rules for the event payload.
        return key in event_data and event_data[key] == True  # noqa: E712

    def _read_value(self, instance_id):
        """Read the stored value for ``instance_id``; None when the read fails.

        The failure is logged rather than raised so that a response can still
        be sent for the request being processed.
        """
        try:
            return self.dut.droid.gattServerGetReadValueByInstanceId(
                int(instance_id))
        except Exception as err:
            self.log.error(err)
            return None

    def list_all_uuids(self):
        """Log the UUIDs of the GATT server's services and characteristics.

        Services are queried from the device for the current server;
        characteristics are taken from ``characteristic_list`` and logged
        together with their instance id in hexadecimal.
        """
        self.log.info("Service List:")
        for service in self.dut.droid.gattGetServiceUuidList(self.gatt_server):
            self.dut.log.info("GATT Server service uuid: {}".format(service))
        self.log.info("Characteristics List:")
        for characteristic in self.characteristic_list:
            instance_id = self.dut.droid.gattServerGetCharacteristicInstanceId(
                characteristic)
            uuid = self.dut.droid.gattServerGetCharacteristicUuid(
                characteristic)
            self.dut.log.info(
                "GATT Server characteristic handle uuid: {} {}".format(
                    hex(instance_id), uuid))
        # TODO: add getting instance ids and uuids from each descriptor.

    def open(self):
        """Open an empty GATT Server instance and remember its handles."""
        self.gatt_server_callback = (
            self.dut.droid.gattServerCreateGattServerCallback())
        self.gatt_server = self.dut.droid.gattServerOpenGattServer(
            self.gatt_server_callback)
        self.gatt_server_list.append(self.gatt_server)

    def clear_services(self):
        """Clear BluetoothGattServices from BluetoothGattServer"""
        self.dut.droid.gattServerClearServices(self.gatt_server)

    def close_bluetooth_gatt_servers(self):
        """Close every opened GATT server and reset the tracked handles.

        Closing is best effort: a failure is logged and the remaining servers
        are still closed.
        """
        for btgs in self.gatt_server_list:
            try:
                self.dut.droid.gattServerClose(btgs)
            except Exception as err:
                self.log.error(
                    "Failed to close Bluetooth GATT Servers: {}".format(err))
        self.characteristic_list = []
        self.descriptor_list = []
        self.gatt_server_list = []
        self.service_list = []

    def characteristic_set_value_by_instance_id(self, instance_id, value):
        """Set a characteristic value by instance id.

        Args:
            instance_id: instance id as a hexadecimal string (parsed base 16).
            value: value forwarded unchanged to the device API.
        """
        self.dut.droid.gattServerCharacteristicSetValueByInstanceId(
            int(instance_id, 16), value)

    def notify_characteristic_changed(self, instance_id, confirm):
        """Notify that a characteristic changed.

        Args:
            instance_id: instance id as a hexadecimal string (parsed base 16).
            confirm: forwarded unchanged to the device API.
        """
        # NOTE(review): the literal 0 is presumably the index of the connected
        # Bluetooth device - confirm against the device-side API.
        self.dut.droid.gattServerNotifyCharacteristicChangedByInstanceId(
            self.gatt_server, 0, int(instance_id, 16), confirm)

    def send_response(self, user_input):
        """Send a single response to each pending GATT Client request.

        Args:
            user_input: ``"<response name>"`` or ``"<response name> <mtu>"``.
                The response name is looked up in ``gatt_server_responses``;
                an empty value means status 0. The MTU defaults to 23.
        """
        args = user_input.split() if user_input else []
        mtu = 23
        if len(args) == 2:
            user_input = args[0]
            mtu = int(args[1])
        execute_write = self._event_name('exec_write')
        write_events = (self._event_name('desc_write_req'),
                        self._event_name('char_write'),
                        self._event_name('char_write_req'))
        regex = self._event_regex('desc_read_req', 'desc_write_req',
                                  'char_read_req', 'char_write', 'exec_write',
                                  'char_write_req')
        events = self.dut.ed.pop_events(regex, 5, small_timeout)
        status = 0
        if user_input:
            status = gatt_server_responses.get(user_input)
        for event in events:
            self.log.debug("Found event: {}.".format(event))
            event_data = event['data']
            request_id = event_data['requestId']
            if event['name'] == execute_write:
                if self._flag_is_true(event_data, 'execute'):
                    for key in self.write_mapping:
                        value = self.write_mapping[key]
                        self.log.info("Writing key, value: {}, {}".format(
                            key, value))
                        self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                            key, value)
                else:
                    self.log.info("Execute result is false")
                # Executed or cancelled, the queued writes are done with.
                self.write_mapping = {}
                self.dut.droid.gattServerSendResponse(
                    self.gatt_server, 0, request_id, status, 0, [])
                continue
            offset = event_data['offset']
            instance_id = event_data['instanceId']
            if event['name'] in write_events:
                if self._flag_is_true(event_data, 'preparedWrite'):
                    # Queue the fragment; it is only applied on execute-write.
                    value = event_data['value']
                    if instance_id in self.write_mapping:
                        self.write_mapping[instance_id] = (
                            self.write_mapping[instance_id] + value)
                        self.log.info(
                            "New Prepared Write Value for {}: {}".format(
                                instance_id, self.write_mapping[instance_id]))
                    else:
                        self.log.info("write mapping key, value {}, {}".format(
                            instance_id, value))
                        self.write_mapping[instance_id] = value
                        self.log.info("current value {}, {}".format(
                            instance_id, value))
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, 0, value)
                    continue
                self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                    instance_id, event_data['value'])

            # A failed read yields None, which falls back to the placeholder
            # instead of reusing a stale value from the previous event.
            data = self._read_value(instance_id)
            if not data:
                data = [1]
            self.log.info(
                "GATT Server Send Response [request_id, status, offset, data]"
                " [{}, {}, {}, {}]".format(request_id, status, offset, data))
            data = data[offset:offset + mtu - 1]
            self.dut.droid.gattServerSendResponse(
                self.gatt_server, 0, request_id, status, offset, data)

    def _setup_service(self, serv):
        """Create a service from ``serv`` ('uuid', 'type', optional 'handles').

        Returns:
            The service handle returned by the device API.
        """
        service = self.dut.droid.gattServerCreateService(
            serv['uuid'], serv['type'])
        if 'handles' in serv:
            self.dut.droid.gattServerServiceSetHandlesToReserve(
                service, serv['handles'])
        return service

    def _setup_characteristic(self, char):
        """Create a characteristic from its dictionary description.

        Required keys: 'uuid', 'properties', 'permissions'. Optional keys:
        'instance_id' (set and then verified, a mismatch is logged) and
        'value_type' with 'value' (plus 'offset' for the integer formats).

        Returns:
            The characteristic handle returned by the device API.
        """
        droid = self.dut.droid
        characteristic = droid.gattServerCreateBluetoothGattCharacteristic(
            char['uuid'], char['properties'], char['permissions'])
        if 'instance_id' in char:
            droid.gattServerCharacteristicSetInstanceId(
                characteristic, char['instance_id'])
            set_id = droid.gattServerCharacteristicGetInstanceId(
                characteristic)
            if set_id != char['instance_id']:
                self.log.error(
                    "Instance ID did not match up. Found {} Expected {}".
                    format(set_id, char['instance_id']))
        if 'value_type' in char:
            value_type = char['value_type']
            value = char['value']
            if value_type == gatt_characteristic_value_format['string']:
                self.log.info("Set String value result: {}".format(
                    droid.gattServerCharacteristicSetStringValue(
                        characteristic, value)))
            elif value_type == gatt_characteristic_value_format['byte']:
                self.log.info("Set Byte Array value result: {}".format(
                    droid.gattServerCharacteristicSetByteValue(
                        characteristic, value)))
            else:
                # Every other format is handled by the integer setter.
                self.log.info("Set Int value result: {}".format(
                    droid.gattServerCharacteristicSetIntValue(
                        characteristic, value, value_type, char['offset'])))
        return characteristic

    def _setup_descriptor(self, desc):
        """Create a descriptor from ``desc`` and track it in descriptor_list.

        Required keys: 'uuid', 'permissions'. Optional: 'value',
        'instance_id'.

        Returns:
            The descriptor handle returned by the device API.
        """
        descriptor = self.dut.droid.gattServerCreateBluetoothGattDescriptor(
            desc['uuid'], desc['permissions'])
        if 'value' in desc:
            self.dut.droid.gattServerDescriptorSetByteValue(
                descriptor, desc['value'])
        if 'instance_id' in desc:
            self.dut.droid.gattServerDescriptorSetInstanceId(
                descriptor, desc['instance_id'])
        self.descriptor_list.append(descriptor)
        return descriptor

    def setup_gatts_db(self, database):
        """Open a GATT server and populate it from ``database``.

        Args:
            database: dict with a 'services' list; each service may contain
                'characteristics', each of which may contain 'descriptors'.

        Returns:
            Tuple of (gatt_server, gatt_server_callback).
        """
        self.open()
        droid = self.dut.droid
        for serv in database['services']:
            service = self._setup_service(serv)
            self.service_list.append(service)
            for char in serv.get('characteristics', []):
                characteristic = self._setup_characteristic(char)
                for desc in char.get('descriptors', []):
                    descriptor = self._setup_descriptor(desc)
                    droid.gattServerCharacteristicAddDescriptor(
                        characteristic, descriptor)
                self.characteristic_list.append(characteristic)
                droid.gattServerAddCharacteristicToService(
                    service, characteristic)
            droid.gattServerAddService(self.gatt_server, service)
            # Wait for the service-added callback before adding the next one.
            expected_event = gatt_cb_strings['serv_added'].format(
                self.gatt_server_callback)
            self.dut.ed.pop_event(expected_event, self.default_timeout)
        return self.gatt_server, self.gatt_server_callback

    def send_continuous_response(self, user_input):
        """Answer incoming requests for up to 180 seconds with canned data.

        Each event is answered with the next 22-byte slice of a 512-byte
        pattern (0..255 repeated); once the pattern is exhausted the payload
        is ``[1]``.

        Args:
            user_input: unused; kept for interface compatibility.
        """
        regex = self._event_regex('desc_read_req', 'desc_write_req',
                                  'char_read_req', 'char_write', 'exec_write')
        # NOTE(review): the response offset is always 0 even though the
        # payload advances through the pattern - confirm this is intended.
        offset = 0
        status = 0
        mtu = 23
        char_value = [index % 256 for index in range(512)]
        end_time = time.time() + 180
        packet_index = 0
        while time.time() < end_time:
            events = self.dut.ed.pop_events(regex, 10, small_timeout)
            for event in events:
                start_offset = packet_index * (mtu - 1)
                packet_index += 1
                self.log.debug("Found event: {}.".format(event))
                request_id = event['data']['requestId']
                data = char_value[start_offset:start_offset + mtu - 1]
                if not data:
                    data = [1]
                self.log.debug(
                    "GATT Server Send Response [request_id, status, offset, "
                    "data] [{}, {}, {}, {}]".format(request_id, status, offset,
                                                    data))
                self.dut.droid.gattServerSendResponse(
                    self.gatt_server, 0, request_id, status, offset, data)

    def send_continuous_response_data(self, user_input):
        """Answer incoming requests for up to 180 seconds with stored values.

        Writes update the stored value (prepared writes are queued until an
        execute-write request), and every non-execute request is answered
        with up to 17 bytes of the stored value starting at the requested
        offset, or ``[1]`` when no value is available.

        Args:
            user_input: unused; kept for interface compatibility.
        """
        desc_write = self._event_name('desc_write_req')
        char_write = self._event_name('char_write')
        execute_write = self._event_name('exec_write')
        regex = self._event_regex('desc_read_req', 'desc_write_req',
                                  'char_read_req', 'char_write', 'exec_write')
        status = 0
        end_time = time.time() + 180
        while time.time() < end_time:
            events = self.dut.ed.pop_events(regex, 10, small_timeout)
            for event in events:
                self.log.info(event)
                event_data = event['data']
                request_id = event_data['requestId']
                if event['name'] == execute_write:
                    if self._flag_is_true(event_data, 'execute'):
                        for key in self.write_mapping:
                            value = self.write_mapping[key]
                            self.log.debug("Writing key, value: {}, {}".format(
                                key, value))
                            self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                                key, value)
                    # A cancelled execute must discard the queued writes too,
                    # otherwise a later execute would apply stale data.
                    self.write_mapping = {}
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, 0, [1])
                    continue
                offset = event_data['offset']
                instance_id = event_data['instanceId']
                if event['name'] in (desc_write, char_write):
                    if self._flag_is_true(event_data, 'preparedWrite'):
                        value = event_data['value']
                        if instance_id in self.write_mapping:
                            self.write_mapping[instance_id] = (
                                self.write_mapping[instance_id] + value)
                        else:
                            self.write_mapping[instance_id] = value
                    else:
                        self.dut.droid.gattServerSetByteArrayValueByInstanceId(
                            instance_id, event_data['value'])
                # A failed read yields None and is answered with [1].
                data = self._read_value(instance_id)
                if not data:
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, offset, [1])
                else:
                    self.dut.droid.gattServerSendResponse(
                        self.gatt_server, 0, request_id, status, offset,
                        data[offset:offset + 17])