1#!/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Test script for concurrent Gatt connections. 18Testbed assumes 6 Android devices. One will be the central and the rest 19peripherals. 20""" 21 22from queue import Empty 23import concurrent.futures 24import time 25from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 26from acts_contrib.test_utils.bt.bt_constants import ble_scan_settings_modes 27from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_modes 28from acts_contrib.test_utils.bt.bt_constants import bt_profile_constants 29from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic 30from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic_value_format 31from acts_contrib.test_utils.bt.bt_constants import gatt_char_desc_uuids 32from acts_contrib.test_utils.bt.bt_constants import gatt_descriptor 33from acts_contrib.test_utils.bt.bt_constants import gatt_service_types 34from acts_contrib.test_utils.bt.bt_constants import scan_result 35from acts_contrib.test_utils.bt.bt_gatt_utils import run_continuous_write_descriptor 36from acts_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_connection 37from acts_contrib.test_utils.bt.gatts_lib import GattServerLib 38from acts.test_decorators import test_tracker_info 39 40service_uuid = '0000a00a-0000-1000-8000-00805f9b34fb' 41characteristic_uuid = 'aa7edd5a-4d1d-4f0e-883a-d145616a1630' 42descriptor_uuid = "00000003-0000-1000-8000-00805f9b34fb" 43 44gatt_server_read_descriptor_sample = { 45 'services': [{ 46 'uuid': 47 service_uuid, 48 'type': 49 gatt_service_types['primary'], 50 'characteristics': [{ 51 'uuid': 52 characteristic_uuid, 53 'properties': 54 gatt_characteristic['property_write'], 55 'permissions': 56 gatt_characteristic['permission_write'], 57 'instance_id': 58 0x002a, 59 'value_type': 60 gatt_characteristic_value_format['string'], 61 'value': 62 'Test Database', 63 'descriptors': [{ 64 'uuid': descriptor_uuid, 65 'permissions': gatt_descriptor['permission_write'], 66 }] 67 }] 68 }] 69} 70 71 72class ConcurrentGattConnectTest(BluetoothBaseTest): 73 bt_default_timeout = 10 74 max_connections = 5 75 # List of tuples (android_device, advertise_callback) 76 advertise_callbacks = [] 77 # List of tuples (android_device, advertisement_name) 78 advertisement_names = [] 79 list_of_arguments_list = [] 80 81 def setup_class(self): 82 super(BluetoothBaseTest, self).setup_class() 83 self.pri_dut = self.android_devices[0] 84 85 # Create 5 advertisements from different android devices 86 for i in range(1, self.max_connections + 1): 87 # Set device name 88 ad = self.android_devices[i] 89 name = "test_adv_{}".format(i) 90 self.advertisement_names.append((ad, name)) 91 ad.droid.bluetoothSetLocalName(name) 92 93 # Setup and start advertisements 94 ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) 95 ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 96 ble_advertise_settings_modes['low_latency']) 97 advertise_data = ad.droid.bleBuildAdvertiseData() 98 advertise_settings = ad.droid.bleBuildAdvertiseSettings() 99 advertise_callback = ad.droid.bleGenBleAdvertiseCallback() 100 ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 101 advertise_settings) 102 self.advertise_callbacks.append((ad, advertise_callback)) 103 104 def obtain_address_list_from_scan(self): 105 """Returns the address list of all devices that match the scan filter. 106 107 Returns: 108 A list if all devices are found; None is any devices are not found. 109 """ 110 # From central device, scan for all appropriate addresses by name. 111 filter_list = self.pri_dut.droid.bleGenFilterList() 112 self.pri_dut.droid.bleSetScanSettingsScanMode( 113 ble_scan_settings_modes['low_latency']) 114 scan_settings = self.pri_dut.droid.bleBuildScanSetting() 115 scan_callback = self.pri_dut.droid.bleGenScanCallback() 116 for android_device, name in self.advertisement_names: 117 self.pri_dut.droid.bleSetScanFilterDeviceName(name) 118 self.pri_dut.droid.bleBuildScanFilter(filter_list) 119 self.pri_dut.droid.bleStartBleScan(filter_list, scan_settings, 120 scan_callback) 121 address_list = [] 122 devices_found = [] 123 # Set the scan time out to 20 sec to provide enough time to discover the 124 # devices in busy environment 125 scan_timeout = 20 126 end_time = time.time() + scan_timeout 127 while time.time() < end_time and len(address_list) < len( 128 self.advertisement_names): 129 try: 130 event = self.pri_dut.ed.pop_event( 131 "BleScan{}onScanResults".format(scan_callback), 132 self.bt_default_timeout) 133 134 adv_name = event['data']['Result']['deviceInfo']['name'] 135 mac_address = event['data']['Result']['deviceInfo']['address'] 136 # Look up the android device handle based on event name 137 device = [ 138 item for item in self.advertisement_names 139 if adv_name in item 140 ] 141 devices_found.append(device[0][0].serial) 142 if len(device) != 0: 143 address_list_tuple = (device[0][0], mac_address) 144 else: 145 continue 146 result = [item for item in address_list if mac_address in item] 147 # if length of result is 0, it indicates that we have discovered 148 # new mac address. 149 if len(result) == 0: 150 self.log.info("Found new mac address: {}".format( 151 address_list_tuple[1])) 152 address_list.append(address_list_tuple) 153 except Empty as err: 154 self.log.error("Failed to find any scan results.") 155 return None 156 if len(address_list) < self.max_connections: 157 self.log.info("Only found these devices: {}".format(devices_found)) 158 self.log.error("Could not find all necessary advertisements.") 159 return None 160 return address_list 161 162 @BluetoothBaseTest.bt_test_wrap 163 @test_tracker_info(uuid='6638282c-69b5-4237-9f0d-18e131424a9f') 164 def test_concurrent_gatt_connections(self): 165 """Test max concurrent GATT connections 166 167 Connect to all peripherals. 168 169 Steps: 170 1. Scan 171 2. Save addresses 172 3. Connect all addresses of the peripherals 173 174 Expected Result: 175 All connections successful. 176 177 Returns: 178 Pass if True 179 Fail if False 180 181 TAGS: Bluetooth, GATT 182 Priority: 2 183 """ 184 185 address_list = self.obtain_address_list_from_scan() 186 if address_list is None: 187 return False 188 189 # Connect to all addresses 190 for address_tuple in address_list: 191 address = address_tuple[1] 192 try: 193 autoconnect = False 194 bluetooth_gatt, gatt_callback = setup_gatt_connection( 195 self.pri_dut, address, autoconnect) 196 self.log.info("Successfully connected to {}".format(address)) 197 except Exception as err: 198 self.log.error( 199 "Failed to establish connection to {}".format(address)) 200 return False 201 if (len( 202 self.pri_dut.droid.bluetoothGetConnectedLeDevices( 203 bt_profile_constants['gatt_server'])) != 204 self.max_connections): 205 self.log.error("Did not reach max connection count.") 206 return False 207 208 return True 209 210 @BluetoothBaseTest.bt_test_wrap 211 @test_tracker_info(uuid='660bf05e-a8e5-45f3-b42b-b66b4ac0d85f') 212 def test_data_transfer_to_concurrent_gatt_connections(self): 213 """Test writing GATT descriptors concurrently to many peripherals. 214 215 Connect to all peripherals and write gatt descriptors concurrently. 216 217 218 Steps: 219 1. Scan the addresses by names 220 2. Save mac addresses of the peripherals 221 3. Connect all addresses of the peripherals and write gatt descriptors 222 223 224 Expected Result: 225 All connections and data transfers are successful. 226 227 Returns: 228 Pass if True 229 Fail if False 230 231 TAGS: Bluetooth, GATT 232 Priority: 2 233 """ 234 235 address_list = self.obtain_address_list_from_scan() 236 if address_list is None: 237 return False 238 239 # Connect to all addresses 240 executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) 241 242 for address_tuple in address_list: 243 ad, address = address_tuple 244 245 gatts = GattServerLib(log=self.log, dut=ad) 246 gatt_server, gatt_server_callback = gatts.setup_gatts_db( 247 database=gatt_server_read_descriptor_sample) 248 249 try: 250 bluetooth_gatt, gatt_callback = setup_gatt_connection( 251 self.pri_dut, address, autoconnect=False) 252 self.log.info("Successfully connected to {}".format(address)) 253 254 except Exception as err: 255 self.log.error( 256 "Failed to establish connection to {}".format(address)) 257 return False 258 259 if self.pri_dut.droid.gattClientDiscoverServices(bluetooth_gatt): 260 event = self.pri_dut.ed.pop_event( 261 "GattConnect{}onServicesDiscovered".format(bluetooth_gatt), 262 self.bt_default_timeout) 263 discovered_services_index = event['data']['ServicesIndex'] 264 else: 265 self.log.info("Failed to discover services.") 266 return False 267 services_count = self.pri_dut.droid.gattClientGetDiscoveredServicesCount( 268 discovered_services_index) 269 270 arguments_list = [ 271 self.pri_dut.droid, self.pri_dut.ed, ad.droid, ad.ed, 272 gatt_server, gatt_server_callback, bluetooth_gatt, 273 services_count, discovered_services_index, 100 274 ] 275 self.list_of_arguments_list.append(arguments_list) 276 277 for arguments_list in self.list_of_arguments_list: 278 executor.submit(run_continuous_write_descriptor, *arguments_list) 279 280 executor.shutdown(wait=True) 281 282 if (len( 283 self.pri_dut.droid.bluetoothGetConnectedLeDevices( 284 bt_profile_constants['gatt_server'])) != 285 self.max_connections): 286 self.log.error("Failed to write concurrently.") 287 return False 288 289 return True 290