• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#/usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17This is base class for tests that exercises different GATT procedures between two connected devices.
18Setup/Teardown methods take care of establishing connection, and doing GATT DB initialization/discovery.
19"""
20
21from queue import Empty
22
23from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
24from acts.test_utils.bt.GattEnum import GattCharacteristic
25from acts.test_utils.bt.GattEnum import GattDescriptor
26from acts.test_utils.bt.GattEnum import GattService
27from acts.test_utils.bt.GattEnum import GattEvent
28from acts.test_utils.bt.GattEnum import GattCbErr
29from acts.test_utils.bt.GattEnum import GattCbStrings
30from acts.test_utils.bt.GattEnum import MtuSize
31from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection
32from acts.test_utils.bt.bt_gatt_utils import orchestrate_gatt_connection
33from acts.test_utils.bt.bt_gatt_utils import setup_gatt_characteristics
34from acts.test_utils.bt.bt_gatt_utils import setup_gatt_descriptors
35
36
37class GattConnectedBaseTest(BluetoothBaseTest):
38    DEFAULT_TIMEOUT = 10
39
40    TEST_SERVICE_UUID = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B"
41    READABLE_CHAR_UUID = "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8"
42    READABLE_DESC_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
43    WRITABLE_CHAR_UUID = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
44    WRITABLE_DESC_UUID = "76d5ed92-ca81-4edb-bb6b-9f019665fb32"
45    NOTIFIABLE_CHAR_UUID = "b2c83efa-34ca-11e6-ac61-9e71128cae77"
46    # CCC == Client Characteristic Configuration
47    CCC_DESC_UUID = "00002902-0000-1000-8000-00805f9b34fb"
48
49    def __init__(self, controllers):
50        BluetoothBaseTest.__init__(self, controllers)
51        self.cen_ad = self.android_devices[0]
52        self.per_ad = self.android_devices[1]
53
54    def setup_test(self):
55        super(GattConnectedBaseTest, self).setup_test()
56
57        self.gatt_server_callback, self.gatt_server = self._setup_multiple_services(
58        )
59        if not self.gatt_server_callback or not self.gatt_server:
60            raise AssertionError('Service setup failed')
61
62        self.bluetooth_gatt, self.gatt_callback, self.adv_callback = (
63            orchestrate_gatt_connection(self.cen_ad, self.per_ad))
64        self.per_ad.droid.bleStopBleAdvertising(self.adv_callback)
65
66        self.mtu = MtuSize.MIN.value
67
68        if self.cen_ad.droid.gattClientDiscoverServices(self.bluetooth_gatt):
69            event = self._client_wait(GattEvent.GATT_SERV_DISC)
70            self.discovered_services_index = event['data']['ServicesIndex']
71        services_count = self.cen_ad.droid.gattClientGetDiscoveredServicesCount(
72            self.discovered_services_index)
73        self.test_service_index = None
74        for i in range(services_count):
75            disc_service_uuid = (
76                self.cen_ad.droid.gattClientGetDiscoveredServiceUuid(
77                    self.discovered_services_index, i).upper())
78            if disc_service_uuid == self.TEST_SERVICE_UUID:
79                self.test_service_index = i
80                break
81
82        if not self.test_service_index:
83            print("Service not found")
84            return False
85
86        connected_device_list = self.per_ad.droid.gattServerGetConnectedDevices(
87            self.gatt_server)
88        if len(connected_device_list) == 0:
89            self.log.info("No devices connected from peripheral.")
90            return False
91
92        return True
93
94    def teardown_test(self):
95        self.per_ad.droid.gattServerClearServices(self.gatt_server)
96        self.per_ad.droid.gattServerClose(self.gatt_server)
97
98        del self.gatt_server_callback
99        del self.gatt_server
100
101        self._orchestrate_gatt_disconnection(self.bluetooth_gatt,
102                                             self.gatt_callback)
103
104        return super(GattConnectedBaseTest, self).teardown_test()
105
106    def _server_wait(self, gatt_event):
107        return self._timed_pop(gatt_event, self.per_ad,
108                               self.gatt_server_callback)
109
110    def _client_wait(self, gatt_event):
111        return self._timed_pop(gatt_event, self.cen_ad, self.gatt_callback)
112
113    def _timed_pop(self, gatt_event, droid, gatt_callback):
114        expected_event = gatt_event.value["evt"].format(gatt_callback)
115        try:
116            return droid.ed.pop_event(expected_event, self.DEFAULT_TIMEOUT)
117        except Empty as emp:
118            raise AssertionError(gatt_event.value["err"].format(
119                expected_event))
120
121    def _setup_characteristics_and_descriptors(self, droid):
122        characteristic_input = [
123            {
124                'uuid': self.WRITABLE_CHAR_UUID,
125                'property': GattCharacteristic.PROPERTY_WRITE.value |
126                GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value,
127                'permission': GattCharacteristic.PERMISSION_WRITE.value
128            },
129            {
130                'uuid': self.READABLE_CHAR_UUID,
131                'property': GattCharacteristic.PROPERTY_READ.value,
132                'permission': GattCharacteristic.PERMISSION_READ.value
133            },
134            {
135                'uuid': self.NOTIFIABLE_CHAR_UUID,
136                'property': GattCharacteristic.PROPERTY_NOTIFY.value |
137                GattCharacteristic.PROPERTY_INDICATE.value,
138                'permission': GattCharacteristic.PERMISSION_READ.value
139            },
140        ]
141        descriptor_input = [
142            {
143                'uuid': self.WRITABLE_DESC_UUID,
144                'property': GattDescriptor.PERMISSION_READ.value |
145                GattCharacteristic.PERMISSION_WRITE.value,
146            },
147            {
148                'uuid': self.READABLE_DESC_UUID,
149                'property': GattDescriptor.PERMISSION_READ.value |
150                GattDescriptor.PERMISSION_WRITE.value,
151            },
152            {
153                'uuid': self.CCC_DESC_UUID,
154                'property': GattDescriptor.PERMISSION_READ.value |
155                GattDescriptor.PERMISSION_WRITE.value,
156            }
157        ]
158        characteristic_list = setup_gatt_characteristics(droid,
159                                                         characteristic_input)
160        self.notifiable_char_index = characteristic_list[2];
161        descriptor_list = setup_gatt_descriptors(droid, descriptor_input)
162        return characteristic_list, descriptor_list
163
164    def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback):
165        self.log.info("Disconnecting from peripheral device.")
166        test_result = disconnect_gatt_connection(self.cen_ad, bluetooth_gatt,
167                                                 gatt_callback)
168        self.cen_ad.droid.gattClientClose(bluetooth_gatt)
169        if not test_result:
170            self.log.info("Failed to disconnect from peripheral device.")
171            return False
172        return True
173
174    def _find_service_added_event(self, gatt_server_callback, uuid):
175        expected_event = GattCbStrings.SERV_ADDED.value.format(
176            gatt_server_callback)
177        try:
178            event = self.per_ad.ed.pop_event(expected_event,
179                                             self.DEFAULT_TIMEOUT)
180        except Empty:
181            self.log.error(GattCbErr.SERV_ADDED_ERR.value.format(
182                expected_event))
183            return False
184        if event['data']['serviceUuid'].lower() != uuid.lower():
185            self.log.error("Uuid mismatch. Found: {}, Expected {}.".format(
186                event['data']['serviceUuid'], uuid))
187            return False
188        return True
189
190    def _setup_multiple_services(self):
191        gatt_server_callback = (
192            self.per_ad.droid.gattServerCreateGattServerCallback())
193        gatt_server = self.per_ad.droid.gattServerOpenGattServer(
194            gatt_server_callback)
195        characteristic_list, descriptor_list = (
196            self._setup_characteristics_and_descriptors(self.per_ad.droid))
197        self.per_ad.droid.gattServerCharacteristicAddDescriptor(
198            characteristic_list[0], descriptor_list[0])
199        self.per_ad.droid.gattServerCharacteristicAddDescriptor(
200            characteristic_list[1], descriptor_list[1])
201        self.per_ad.droid.gattServerCharacteristicAddDescriptor(
202            characteristic_list[2], descriptor_list[2])
203        gatt_service3 = self.per_ad.droid.gattServerCreateService(
204            self.TEST_SERVICE_UUID, GattService.SERVICE_TYPE_PRIMARY.value)
205        for characteristic in characteristic_list:
206            self.per_ad.droid.gattServerAddCharacteristicToService(
207                gatt_service3, characteristic)
208        self.per_ad.droid.gattServerAddService(gatt_server, gatt_service3)
209        result = self._find_service_added_event(gatt_server_callback,
210                                                self.TEST_SERVICE_UUID)
211        if not result:
212            return False, False
213        return gatt_server_callback, gatt_server
214
215    def assertEqual(self, first, second, msg=None):
216        if not first == second:
217            if not msg:
218                raise AssertionError('%r != %r' % (first, second))
219            else:
220                raise AssertionError(msg + ' %r != %r' % (first, second))
221