1#!/usr/bin/env python3 2# 3# Copyright (C) 2018 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Test script for concurrent Gatt connections. 18Testbed assumes 6 Android devices. One will be the central and the rest 19peripherals. 20""" 21 22from queue import Empty 23import concurrent.futures 24import threading 25import time 26from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest 27from acts.test_utils.bt.bt_constants import ble_scan_settings_modes 28from acts.test_utils.bt.bt_constants import ble_advertise_settings_modes 29from acts.test_utils.bt.bt_constants import bt_profile_constants 30from acts.test_utils.bt.bt_constants import gatt_characteristic 31from acts.test_utils.bt.bt_constants import gatt_characteristic_value_format 32from acts.test_utils.bt.bt_constants import gatt_char_desc_uuids 33from acts.test_utils.bt.bt_constants import gatt_descriptor 34from acts.test_utils.bt.bt_constants import gatt_service_types 35from acts.test_utils.bt.bt_constants import scan_result 36from acts.test_utils.bt.bt_gatt_utils import run_continuous_write_descriptor 37from acts.test_utils.bt.bt_gatt_utils import setup_gatt_connection 38from acts.test_utils.bt.gatts_lib import GattServerLib 39from acts.test_decorators import test_tracker_info 40 41service_uuid = '0000a00a-0000-1000-8000-00805f9b34fb' 42characteristic_uuid = 'aa7edd5a-4d1d-4f0e-883a-d145616a1630' 43descriptor_uuid = "00000003-0000-1000-8000-00805f9b34fb" 44 45gatt_server_read_descriptor_sample = { 46 'services': [{ 47 'uuid': 48 service_uuid, 49 'type': 50 gatt_service_types['primary'], 51 'characteristics': [{ 52 'uuid': 53 characteristic_uuid, 54 'properties': 55 gatt_characteristic['property_write'], 56 'permissions': 57 gatt_characteristic['permission_write'], 58 'instance_id': 59 0x002a, 60 'value_type': 61 gatt_characteristic_value_format['string'], 62 'value': 63 'Test Database', 64 'descriptors': [{ 65 'uuid': descriptor_uuid, 66 'permissions': gatt_descriptor['permission_write'], 67 }] 68 }] 69 }] 70} 71 72 73class ConcurrentGattConnectTest(BluetoothBaseTest): 74 bt_default_timeout = 10 75 max_connections = 5 76 # List of tuples (android_device, advertise_callback) 77 advertise_callbacks = [] 78 # List of tuples (android_device, advertisement_name) 79 advertisement_names = [] 80 list_of_arguments_list = [] 81 82 def __init__(self, controllers): 83 BluetoothBaseTest.__init__(self, controllers) 84 self.pri_dut = self.android_devices[0] 85 86 def setup_class(self): 87 super(BluetoothBaseTest, self).setup_class() 88 89 # Create 5 advertisements from different android devices 90 for i in range(1, self.max_connections + 1): 91 # Set device name 92 ad = self.android_devices[i] 93 name = "test_adv_{}".format(i) 94 self.advertisement_names.append((ad, name)) 95 ad.droid.bluetoothSetLocalName(name) 96 97 # Setup and start advertisements 98 ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) 99 ad.droid.bleSetAdvertiseSettingsAdvertiseMode( 100 ble_advertise_settings_modes['low_latency']) 101 advertise_data = ad.droid.bleBuildAdvertiseData() 102 advertise_settings = ad.droid.bleBuildAdvertiseSettings() 103 advertise_callback = ad.droid.bleGenBleAdvertiseCallback() 104 ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, 105 advertise_settings) 106 self.advertise_callbacks.append((ad, advertise_callback)) 107 108 def obtain_address_list_from_scan(self): 109 """Returns the address list of all devices that match the scan filter. 110 111 Returns: 112 A list if all devices are found; None is any devices are not found. 113 """ 114 # From central device, scan for all appropriate addresses by name. 115 filter_list = self.pri_dut.droid.bleGenFilterList() 116 self.pri_dut.droid.bleSetScanSettingsScanMode( 117 ble_scan_settings_modes['low_latency']) 118 scan_settings = self.pri_dut.droid.bleBuildScanSetting() 119 scan_callback = self.pri_dut.droid.bleGenScanCallback() 120 for android_device, name in self.advertisement_names: 121 self.pri_dut.droid.bleSetScanFilterDeviceName(name) 122 self.pri_dut.droid.bleBuildScanFilter(filter_list) 123 self.pri_dut.droid.bleStartBleScan(filter_list, scan_settings, 124 scan_callback) 125 address_list = [] 126 devices_found = [] 127 # Set the scan time out to 20 sec to provide enough time to discover the 128 # devices in busy environment 129 scan_timeout = 20 130 end_time = time.time() + scan_timeout 131 while time.time() < end_time and len(address_list) < len( 132 self.advertisement_names): 133 try: 134 event = self.pri_dut.ed.pop_event( 135 "BleScan{}onScanResults".format(scan_callback), 136 self.bt_default_timeout) 137 138 adv_name = event['data']['Result']['deviceInfo']['name'] 139 mac_address = event['data']['Result']['deviceInfo']['address'] 140 # Look up the android device handle based on event name 141 device = [ 142 item for item in self.advertisement_names 143 if adv_name in item 144 ] 145 devices_found.append(device[0][0].serial) 146 if len(device) is not 0: 147 address_list_tuple = (device[0][0], mac_address) 148 else: 149 continue 150 result = [item for item in address_list if mac_address in item] 151 # if length of result is 0, it indicates that we have discovered 152 # new mac address. 153 if len(result) is 0: 154 self.log.info("Found new mac address: {}".format( 155 address_list_tuple[1])) 156 address_list.append(address_list_tuple) 157 except Empty as err: 158 self.log.error("Failed to find any scan results.") 159 return None 160 if len(address_list) < self.max_connections: 161 self.log.info("Only found these devices: {}".format(devices_found)) 162 self.log.error("Could not find all necessary advertisements.") 163 return None 164 return address_list 165 166 @BluetoothBaseTest.bt_test_wrap 167 @test_tracker_info(uuid='6638282c-69b5-4237-9f0d-18e131424a9f') 168 def test_concurrent_gatt_connections(self): 169 """Test max concurrent GATT connections 170 171 Connect to all peripherals. 172 173 Steps: 174 1. Scan 175 2. Save addresses 176 3. Connect all addresses of the peripherals 177 178 Expected Result: 179 All connections successful. 180 181 Returns: 182 Pass if True 183 Fail if False 184 185 TAGS: Bluetooth, GATT 186 Priority: 2 187 """ 188 189 address_list = self.obtain_address_list_from_scan() 190 if address_list is None: 191 return False 192 193 # Connect to all addresses 194 for address_tuple in address_list: 195 address = address_tuple[1] 196 try: 197 autoconnect = False 198 bluetooth_gatt, gatt_callback = setup_gatt_connection( 199 self.pri_dut, address, autoconnect) 200 self.log.info("Successfully connected to {}".format(address)) 201 except Exception as err: 202 self.log.error( 203 "Failed to establish connection to {}".format(address)) 204 return False 205 if (len( 206 self.pri_dut.droid.bluetoothGetConnectedLeDevices( 207 bt_profile_constants['gatt_server'])) != 208 self.max_connections): 209 self.log.error("Did not reach max connection count.") 210 return False 211 212 return True 213 214 @BluetoothBaseTest.bt_test_wrap 215 @test_tracker_info(uuid='660bf05e-a8e5-45f3-b42b-b66b4ac0d85f') 216 def test_data_transfer_to_concurrent_gatt_connections(self): 217 """Test writing GATT descriptors concurrently to many peripherals. 218 219 Connect to all peripherals and write gatt descriptors concurrently. 220 221 222 Steps: 223 1. Scan the addresses by names 224 2. Save mac addresses of the peripherals 225 3. Connect all addresses of the peripherals and write gatt descriptors 226 227 228 Expected Result: 229 All connections and data transfers are successful. 230 231 Returns: 232 Pass if True 233 Fail if False 234 235 TAGS: Bluetooth, GATT 236 Priority: 2 237 """ 238 239 address_list = self.obtain_address_list_from_scan() 240 if address_list is None: 241 return False 242 243 # Connect to all addresses 244 executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) 245 246 for address_tuple in address_list: 247 ad, address = address_tuple 248 249 gatts = GattServerLib(log=self.log, dut=ad) 250 gatt_server, gatt_server_callback = gatts.setup_gatts_db( 251 database=gatt_server_read_descriptor_sample) 252 253 try: 254 bluetooth_gatt, gatt_callback = setup_gatt_connection( 255 self.pri_dut, address, autoconnect=False) 256 self.log.info("Successfully connected to {}".format(address)) 257 258 except Exception as err: 259 self.log.error( 260 "Failed to establish connection to {}".format(address)) 261 return False 262 263 if self.pri_dut.droid.gattClientDiscoverServices(bluetooth_gatt): 264 event = self.pri_dut.ed.pop_event( 265 "GattConnect{}onServicesDiscovered".format(bluetooth_gatt), 266 self.bt_default_timeout) 267 discovered_services_index = event['data']['ServicesIndex'] 268 else: 269 self.log.info("Failed to discover services.") 270 return False 271 services_count = self.pri_dut.droid.gattClientGetDiscoveredServicesCount( 272 discovered_services_index) 273 274 arguments_list = [ 275 self.pri_dut.droid, self.pri_dut.ed, ad.droid, ad.ed, 276 gatt_server, gatt_server_callback, bluetooth_gatt, 277 services_count, discovered_services_index, 100 278 ] 279 self.list_of_arguments_list.append(arguments_list) 280 281 for arguments_list in self.list_of_arguments_list: 282 executor.submit(run_continuous_write_descriptor, *arguments_list) 283 284 executor.shutdown(wait=True) 285 286 if (len( 287 self.pri_dut.droid.bluetoothGetConnectedLeDevices( 288 bt_profile_constants['gatt_server'])) != 289 self.max_connections): 290 self.log.error("Failed to write concurrently.") 291 return False 292 293 return True 294