• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2018 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Test script for concurrent Gatt connections.
18Testbed assumes 6 Android devices. One will be the central and the rest
19peripherals.
20"""
21
22from queue import Empty
23import concurrent.futures
24import threading
25import time
26from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest
27from acts.test_utils.bt.bt_constants import ble_scan_settings_modes
28from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes
29from acts.test_utils.bt.bt_constants import bt_profile_constants
30from acts.test_utils.bt.bt_constants import gatt_characteristic
31from acts.test_utils.bt.bt_constants import gatt_characteristic_value_format
32from acts.test_utils.bt.bt_constants import gatt_char_desc_uuids
33from acts.test_utils.bt.bt_constants import gatt_descriptor
34from acts.test_utils.bt.bt_constants import gatt_service_types
35from acts.test_utils.bt.bt_constants import scan_result
36from acts.test_utils.bt.bt_gatt_utils import run_continuous_write_descriptor
37from acts.test_utils.bt.bt_gatt_utils import setup_gatt_connection
38from acts.test_utils.bt.gatts_lib import GattServerLib
39from acts.test_decorators import test_tracker_info
40
# UUIDs used by the sample GATT database below.
service_uuid = '0000a00a-0000-1000-8000-00805f9b34fb'
characteristic_uuid = 'aa7edd5a-4d1d-4f0e-883a-d145616a1630'
descriptor_uuid = "00000003-0000-1000-8000-00805f9b34fb"

# Sample GATT database: one primary service holding a single characteristic
# with a single descriptor. Despite the "read" in the name, every
# property/permission configured here is a *write* one.
gatt_server_read_descriptor_sample = {
    'services': [
        {
            'uuid': service_uuid,
            'type': gatt_service_types['primary'],
            'characteristics': [
                {
                    'uuid': characteristic_uuid,
                    'properties': gatt_characteristic['property_write'],
                    'permissions': gatt_characteristic['permission_write'],
                    'instance_id': 0x002a,
                    'value_type': gatt_characteristic_value_format['string'],
                    'value': 'Test Database',
                    'descriptors': [
                        {
                            'uuid': descriptor_uuid,
                            'permissions':
                            gatt_descriptor['permission_write'],
                        },
                    ],
                },
            ],
        },
    ],
}
71
72
class ConcurrentGattConnectTest(BluetoothBaseTest):
    """Concurrent GATT connection tests: one central, several peripherals.

    ``android_devices[0]`` is used as the central (``pri_dut``). Devices
    ``android_devices[1..max_connections]`` are given unique local names and
    start advertising in ``setup_class`` so the central can find them by name.
    """
    # Timeout passed to every event-dispatcher ``pop_event`` call.
    bt_default_timeout = 10
    # Number of peripherals the central is expected to connect to.
    max_connections = 5
    # List of tuples (android_device, advertise_callback)
    advertise_callbacks = []
    # List of tuples (android_device, advertisement_name)
    advertisement_names = []
    # One positional-argument list per peripheral for
    # run_continuous_write_descriptor.
    list_of_arguments_list = []

    def __init__(self, controllers):
        BluetoothBaseTest.__init__(self, controllers)
        self.pri_dut = self.android_devices[0]

    def setup_class(self):
        """Name every peripheral and start a BLE advertisement on each."""
        # NOTE(review): super(BluetoothBaseTest, self) resolves to the class
        # *after* BluetoothBaseTest in the MRO, so BluetoothBaseTest's own
        # setup_class is skipped. Left unchanged -- confirm this is intended.
        super(BluetoothBaseTest, self).setup_class()

        # Rebind to fresh per-instance lists so the shared class-level lists
        # are never mutated and a repeated setup_class cannot pile up
        # duplicate entries.
        self.advertisement_names = []
        self.advertise_callbacks = []

        # Create one advertisement per peripheral android device.
        for i in range(1, self.max_connections + 1):
            # Set device name
            ad = self.android_devices[i]
            name = "test_adv_{}".format(i)
            self.advertisement_names.append((ad, name))
            ad.droid.bluetoothSetLocalName(name)

            # Setup and start advertisements
            ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
            ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
                ble_advertise_settings_modes['low_latency'])
            advertise_data = ad.droid.bleBuildAdvertiseData()
            advertise_settings = ad.droid.bleBuildAdvertiseSettings()
            advertise_callback = ad.droid.bleGenBleAdvertiseCallback()
            ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
                                            advertise_settings)
            self.advertise_callbacks.append((ad, advertise_callback))

    def _connected_le_device_count(self):
        """Returns how many LE devices the central lists as connected.

        This is the length of what bluetoothGetConnectedLeDevices returns
        for the 'gatt_server' profile constant.
        """
        connected = self.pri_dut.droid.bluetoothGetConnectedLeDevices(
            bt_profile_constants['gatt_server'])
        return len(connected)

    def obtain_address_list_from_scan(self):
        """Returns the address list of all devices that match the scan filter.

        The central scans with one device-name filter per entry in
        ``advertisement_names`` and collects the first-seen address of every
        scan result whose name belongs to one of the peripherals. The scan is
        stopped before returning, whatever the outcome.

        Returns:
          A list of (android_device, mac_address) tuples if all devices are
          found; None if any devices are not found.
        """
        droid = self.pri_dut.droid
        # From central device, scan for all appropriate addresses by name.
        filter_list = droid.bleGenFilterList()
        droid.bleSetScanSettingsScanMode(
            ble_scan_settings_modes['low_latency'])
        scan_settings = droid.bleBuildScanSetting()
        scan_callback = droid.bleGenScanCallback()
        for _, name in self.advertisement_names:
            droid.bleSetScanFilterDeviceName(name)
            droid.bleBuildScanFilter(filter_list)
        droid.bleStartBleScan(filter_list, scan_settings, scan_callback)

        address_list = []
        devices_found = []
        # Set the scan time out to 20 sec to provide enough time to discover the
        # devices in busy environment
        scan_timeout = 20
        scan_event_name = "BleScan{}onScanResults".format(scan_callback)
        try:
            end_time = time.time() + scan_timeout
            while (time.time() < end_time
                   and len(address_list) < len(self.advertisement_names)):
                try:
                    event = self.pri_dut.ed.pop_event(scan_event_name,
                                                      self.bt_default_timeout)
                except Empty:
                    self.log.error("Failed to find any scan results.")
                    return None

                device_info = event['data']['Result']['deviceInfo']
                adv_name = device_info['name']
                mac_address = device_info['address']

                # Look up the android device handle based on the advertised
                # name. Results that match none of our advertisers are
                # skipped *before* anything is indexed.
                matches = [
                    ad for ad, name in self.advertisement_names
                    if name == adv_name
                ]
                if not matches:
                    continue
                device = matches[0]
                devices_found.append(device.serial)

                # Only record an address the first time it is seen.
                if any(mac_address == known_address
                       for _, known_address in address_list):
                    continue
                self.log.info(
                    "Found new mac address: {}".format(mac_address))
                address_list.append((device, mac_address))
        finally:
            # Do not leave the scan running once the addresses are collected
            # (or once the attempt has failed).
            droid.bleStopBleScan(scan_callback)

        if len(address_list) < self.max_connections:
            self.log.info("Only found these devices: {}".format(devices_found))
            self.log.error("Could not find all necessary advertisements.")
            return None
        return address_list

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='6638282c-69b5-4237-9f0d-18e131424a9f')
    def test_concurrent_gatt_connections(self):
        """Test max concurrent GATT connections

        Connect to all peripherals.

        Steps:
        1. Scan
        2. Save addresses
        3. Connect all addresses of the peripherals

        Expected Result:
        All connections successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Bluetooth, GATT
        Priority: 2
        """

        address_list = self.obtain_address_list_from_scan()
        if address_list is None:
            return False

        # Connect to all addresses
        autoconnect = False
        for _, address in address_list:
            try:
                setup_gatt_connection(self.pri_dut, address, autoconnect)
            except Exception:
                self.log.error(
                    "Failed to establish connection to {}".format(address))
                return False
            self.log.info("Successfully connected to {}".format(address))

        if self._connected_le_device_count() != self.max_connections:
            self.log.error("Did not reach max connection count.")
            return False

        return True

    @BluetoothBaseTest.bt_test_wrap
    @test_tracker_info(uuid='660bf05e-a8e5-45f3-b42b-b66b4ac0d85f')
    def test_data_transfer_to_concurrent_gatt_connections(self):
        """Test writing GATT descriptors concurrently to many peripherals.

        Connect to all peripherals and write gatt descriptors concurrently.


        Steps:
        1. Scan the addresses by names
        2. Save mac addresses of the peripherals
        3. Connect all addresses of the peripherals and write gatt descriptors


        Expected Result:
        All connections and data transfers are successful.

        Returns:
          Pass if True
          Fail if False

        TAGS: Bluetooth, GATT
        Priority: 2
        """

        address_list = self.obtain_address_list_from_scan()
        if address_list is None:
            return False

        # Start from an empty list on every run so argument lists left over
        # from a previous (or aborted) run are never resubmitted.
        self.list_of_arguments_list = []

        # Connect to all addresses and prepare one worker call per peripheral.
        for ad, address in address_list:
            gatts = GattServerLib(log=self.log, dut=ad)
            gatt_server, gatt_server_callback = gatts.setup_gatts_db(
                database=gatt_server_read_descriptor_sample)

            try:
                bluetooth_gatt, _ = setup_gatt_connection(
                    self.pri_dut, address, autoconnect=False)
            except Exception:
                self.log.error(
                    "Failed to establish connection to {}".format(address))
                return False
            self.log.info("Successfully connected to {}".format(address))

            if not self.pri_dut.droid.gattClientDiscoverServices(
                    bluetooth_gatt):
                self.log.info("Failed to discover services.")
                return False
            event = self.pri_dut.ed.pop_event(
                "GattConnect{}onServicesDiscovered".format(bluetooth_gatt),
                self.bt_default_timeout)
            discovered_services_index = event['data']['ServicesIndex']
            services_count = (
                self.pri_dut.droid.gattClientGetDiscoveredServicesCount(
                    discovered_services_index))

            # The trailing 100 is presumably the number of write iterations
            # -- confirm against run_continuous_write_descriptor.
            self.list_of_arguments_list.append([
                self.pri_dut.droid, self.pri_dut.ed, ad.droid, ad.ed,
                gatt_server, gatt_server_callback, bluetooth_gatt,
                services_count, discovered_services_index, 100
            ])

        # Run all writers in parallel. Leaving the ``with`` block waits for
        # every worker to finish and always releases the pool.
        with concurrent.futures.ThreadPoolExecutor(
                max_workers=10) as executor:
            futures = [
                executor.submit(run_continuous_write_descriptor,
                                *arguments_list)
                for arguments_list in self.list_of_arguments_list
            ]

        # A worker that raised must fail the test instead of being silently
        # dropped together with its future.
        workers_ok = True
        for future in futures:
            worker_error = future.exception()
            if worker_error is not None:
                self.log.error(
                    "Concurrent descriptor write raised: {!r}".format(
                        worker_error))
                workers_ok = False

        if self._connected_le_device_count() != self.max_connections:
            self.log.error("Failed to write concurrently.")
            return False

        return workers_ok
294