• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Test script for concurrent Gatt connections.
18Testbed assumes 6 Android devices. One will be the central and the rest
19peripherals.
20"""
21
22from queue import Empty
23import concurrent.futures
24import time
25from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
26from acts_contrib.test_utils.bt.bt_constants import ble_scan_settings_modes
27from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_modes
28from acts_contrib.test_utils.bt.bt_constants import bt_profile_constants
29from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic
30from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic_value_format
31from acts_contrib.test_utils.bt.bt_constants import gatt_char_desc_uuids
32from acts_contrib.test_utils.bt.bt_constants import gatt_descriptor
33from acts_contrib.test_utils.bt.bt_constants import gatt_service_types
34from acts_contrib.test_utils.bt.bt_constants import scan_result
35from acts_contrib.test_utils.bt.bt_gatt_utils import run_continuous_write_descriptor
36from acts_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_connection
37from acts_contrib.test_utils.bt.gatts_lib import GattServerLib
38from acts.test_decorators import test_tracker_info
39
# UUIDs of the GATT service, characteristic and descriptor that make up the
# sample GATT server database defined in this module.
service_uuid = "0000a00a-0000-1000-8000-00805f9b34fb"
characteristic_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
descriptor_uuid = "00000003-0000-1000-8000-00805f9b34fb"
43
# GATT server database installed on each peripheral: one primary service
# holding a single characteristic, which in turn holds a single descriptor.
# Despite the "read" in the name, every property/permission used here is a
# write one.
gatt_server_read_descriptor_sample = {
    'services': [
        {
            'uuid': service_uuid,
            'type': gatt_service_types['primary'],
            'characteristics': [
                {
                    'uuid': characteristic_uuid,
                    'properties': gatt_characteristic['property_write'],
                    'permissions': gatt_characteristic['permission_write'],
                    'instance_id': 0x002a,
                    'value_type': gatt_characteristic_value_format['string'],
                    'value': 'Test Database',
                    'descriptors': [
                        {
                            'uuid': descriptor_uuid,
                            'permissions': gatt_descriptor['permission_write'],
                        },
                    ],
                },
            ],
        },
    ],
}
70
71
class ConcurrentGattConnectTest(BluetoothBaseTest):
    """Concurrent GATT connection tests with one central and many peripherals.

    ``android_devices[0]`` is used as the central. Devices
    ``1..max_connections`` are renamed, start BLE advertising in
    ``setup_class`` and act as peripherals.
    """
    # Timeout handed to every event-dispatcher ``pop_event`` call.
    bt_default_timeout = 10
    # Number of peripherals the central is expected to connect to.
    max_connections = 5
    # List of tuples (android_device, advertise_callback)
    advertise_callbacks = []
    # List of tuples (android_device, advertisement_name)
    advertisement_names = []
    # Positional-argument lists passed to run_continuous_write_descriptor.
    list_of_arguments_list = []

    def setup_class(self):
        """Pick the central and start one advertisement per peripheral."""
        # NOTE(review): super(BluetoothBaseTest, self) starts the MRO lookup
        # *after* BluetoothBaseTest, so BluetoothBaseTest.setup_class itself
        # is not run. Left unchanged; confirm this is intentional.
        super(BluetoothBaseTest, self).setup_class()
        self.pri_dut = self.android_devices[0]

        # Rebind the bookkeeping lists on the instance so that running
        # setup_class again does not keep appending to the mutable
        # class-level lists shared by every instance.
        self.advertise_callbacks = []
        self.advertisement_names = []
        self.list_of_arguments_list = []

        # Create one advertisement per peripheral android device
        for i in range(1, self.max_connections + 1):
            # Set device name
            ad = self.android_devices[i]
            name = "test_adv_{}".format(i)
            self.advertisement_names.append((ad, name))
            ad.droid.bluetoothSetLocalName(name)

            # Setup and start advertisements
            ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
            ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
                ble_advertise_settings_modes['low_latency'])
            advertise_data = ad.droid.bleBuildAdvertiseData()
            advertise_settings = ad.droid.bleBuildAdvertiseSettings()
            advertise_callback = ad.droid.bleGenBleAdvertiseCallback()
            ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                            advertise_settings)
            self.advertise_callbacks.append((ad, advertise_callback))

    def obtain_address_list_from_scan(self):
        """Returns the address list of all devices that match the scan filter.

        The central scans with one device-name filter per advertisement name
        and records the MAC address of every matching scan result until all
        advertisers are seen or the scan window expires. The scan is stopped
        before returning.

        Returns:
          A list of (android_device, mac_address) tuples if all devices are
          found; None if any device is not found.
        """
        droid = self.pri_dut.droid
        # From central device, scan for all appropriate addresses by name.
        filter_list = droid.bleGenFilterList()
        droid.bleSetScanSettingsScanMode(
            ble_scan_settings_modes['low_latency'])
        scan_settings = droid.bleBuildScanSetting()
        scan_callback = droid.bleGenScanCallback()
        for _, name in self.advertisement_names:
            droid.bleSetScanFilterDeviceName(name)
            droid.bleBuildScanFilter(filter_list)
        droid.bleStartBleScan(filter_list, scan_settings, scan_callback)

        address_list = []
        seen_addresses = set()
        # Serials of the matched devices, kept only for the failure log.
        devices_found = []
        scan_event_name = "BleScan{}onScanResults".format(scan_callback)
        # Set the scan time out to 20 sec to provide enough time to discover the
        # devices in busy environment
        scan_timeout = 20
        end_time = time.time() + scan_timeout
        try:
            while (time.time() < end_time
                   and len(address_list) < len(self.advertisement_names)):
                try:
                    event = self.pri_dut.ed.pop_event(scan_event_name,
                                                      self.bt_default_timeout)
                except Empty:
                    self.log.error("Failed to find any scan results.")
                    return None

                device_info = event['data']['Result']['deviceInfo']
                adv_name = device_info['name']
                mac_address = device_info['address']
                # Look up the android device handle based on event name
                device = next(
                    (ad for ad, name in self.advertisement_names
                     if name == adv_name), None)
                if device is None:
                    # Result does not belong to one of our advertisers. Skip
                    # it before touching the device handle; the lookup used
                    # to be indexed first and raised IndexError here.
                    continue
                if device.serial not in devices_found:
                    devices_found.append(device.serial)
                # Only record a mac address the first time it is seen.
                if mac_address not in seen_addresses:
                    self.log.info(
                        "Found new mac address: {}".format(mac_address))
                    seen_addresses.add(mac_address)
                    address_list.append((device, mac_address))
        finally:
            # Always release the scan started above, whichever path returns.
            droid.bleStopBleScan(scan_callback)

        if len(address_list) < self.max_connections:
            self.log.info("Only found these devices: {}".format(devices_found))
            self.log.error("Could not find all necessary advertisements.")
            return None
        return address_list

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='6638282c-69b5-4237-9f0d-18e131424a9f')
    def test_concurrent_gatt_connections(self):
        """Test max concurrent GATT connections

        Connect to all peripherals.

        Steps:
        1. Scan
        2. Save addresses
        3. Connect all addresses of the peripherals

        Expected Result:
        All connections successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Bluetooth, GATT
        Priority: 2
        """

        address_list = self.obtain_address_list_from_scan()
        if address_list is None:
            return False

        # Connect to all addresses
        for _, address in address_list:
            try:
                setup_gatt_connection(self.pri_dut, address, False)
            except Exception as err:
                # Keep the cause in the log instead of dropping it.
                self.log.error(
                    "Failed to establish connection to {}: {}".format(
                        address, err))
                return False
            self.log.info("Successfully connected to {}".format(address))

        connected_devices = self.pri_dut.droid.bluetoothGetConnectedLeDevices(
            bt_profile_constants['gatt_server'])
        if len(connected_devices) != self.max_connections:
            self.log.error("Did not reach max connection count.")
            return False

        return True

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='660bf05e-a8e5-45f3-b42b-b66b4ac0d85f')
    def test_data_transfer_to_concurrent_gatt_connections(self):
        """Test writing GATT descriptors concurrently to many peripherals.

        Connect to all peripherals and write gatt descriptors concurrently.


        Steps:
        1. Scan the addresses by names
        2. Save mac addresses of the peripherals
        3. Connect all addresses of the peripherals and write gatt descriptors


        Expected Result:
        All connections and data transfers are successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Bluetooth, GATT
        Priority: 2
        """

        address_list = self.obtain_address_list_from_scan()
        if address_list is None:
            return False

        # Start from an empty list so that a re-run of this test does not
        # resubmit argument lists (and stale GATT handles) from a prior run.
        self.list_of_arguments_list = []

        # Connect to all addresses
        for ad, address in address_list:
            gatts = GattServerLib(log=self.log, dut=ad)
            gatt_server, gatt_server_callback = gatts.setup_gatts_db(
                database=gatt_server_read_descriptor_sample)

            try:
                bluetooth_gatt, _ = setup_gatt_connection(
                    self.pri_dut, address, autoconnect=False)
            except Exception as err:
                self.log.error(
                    "Failed to establish connection to {}: {}".format(
                        address, err))
                return False
            self.log.info("Successfully connected to {}".format(address))

            if not self.pri_dut.droid.gattClientDiscoverServices(
                    bluetooth_gatt):
                self.log.info("Failed to discover services.")
                return False
            event = self.pri_dut.ed.pop_event(
                "GattConnect{}onServicesDiscovered".format(bluetooth_gatt),
                self.bt_default_timeout)
            discovered_services_index = event['data']['ServicesIndex']
            services_count = (
                self.pri_dut.droid.gattClientGetDiscoveredServicesCount(
                    discovered_services_index))

            # Positional arguments for run_continuous_write_descriptor; the
            # trailing 100 is passed through unchanged from the original
            # (presumably the iteration count -- verify against the helper).
            self.list_of_arguments_list.append([
                self.pri_dut.droid, self.pri_dut.ed, ad.droid, ad.ed,
                gatt_server, gatt_server_callback, bluetooth_gatt,
                services_count, discovered_services_index, 100
            ])

        # The executor is created only once every connection is up, so the
        # early returns above cannot leave it un-shut-down. Leaving the
        # ``with`` block waits for all submitted writers to finish.
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=10) as executor:
            futures = [
                executor.submit(run_continuous_write_descriptor,
                                *arguments_list)
                for arguments_list in self.list_of_arguments_list
            ]

        # An exception raised inside a worker is stored on its future and
        # would otherwise be lost; surface it and fail the test.
        writes_ok = True
        for future in futures:
            err = future.exception()
            if err is not None:
                self.log.error(
                    "Concurrent descriptor write raised: {}".format(err))
                writes_ok = False
        if not writes_ok:
            return False

        connected_devices = self.pri_dut.droid.bluetoothGetConnectedLeDevices(
            bt_profile_constants['gatt_server'])
        if len(connected_devices) != self.max_connections:
            self.log.error("Failed to write concurrently.")
            return False

        return True
290