#!/usr/bin/env python3 # # Copyright (C) 2018 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); you may not # use this file except in compliance with the License. You may obtain a copy of # the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. """ Test script for concurrent Gatt connections. Testbed assumes 6 Android devices. One will be the central and the rest peripherals. """ from queue import Empty import concurrent.futures import threading import time from acts_contrib.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest from acts_contrib.test_utils.bt.bt_constants import ble_scan_settings_modes from acts_contrib.test_utils.bt.bt_constants import ble_advertise_settings_modes from acts_contrib.test_utils.bt.bt_constants import bt_profile_constants from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic from acts_contrib.test_utils.bt.bt_constants import gatt_characteristic_value_format from acts_contrib.test_utils.bt.bt_constants import gatt_char_desc_uuids from acts_contrib.test_utils.bt.bt_constants import gatt_descriptor from acts_contrib.test_utils.bt.bt_constants import gatt_service_types from acts_contrib.test_utils.bt.bt_constants import scan_result from acts_contrib.test_utils.bt.bt_gatt_utils import run_continuous_write_descriptor from acts_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_connection from acts_contrib.test_utils.bt.gatts_lib import GattServerLib from acts.test_decorators import test_tracker_info service_uuid = '0000a00a-0000-1000-8000-00805f9b34fb' characteristic_uuid = 'aa7edd5a-4d1d-4f0e-883a-d145616a1630' descriptor_uuid = "00000003-0000-1000-8000-00805f9b34fb" gatt_server_read_descriptor_sample = { 'services': [{ 'uuid': service_uuid, 'type': gatt_service_types['primary'], 'characteristics': [{ 'uuid': characteristic_uuid, 'properties': gatt_characteristic['property_write'], 'permissions': gatt_characteristic['permission_write'], 'instance_id': 0x002a, 'value_type': gatt_characteristic_value_format['string'], 'value': 'Test Database', 'descriptors': [{ 'uuid': descriptor_uuid, 'permissions': gatt_descriptor['permission_write'], }] }] }] } class ConcurrentGattConnectTest(BluetoothBaseTest): bt_default_timeout = 10 max_connections = 5 # List of tuples (android_device, advertise_callback) advertise_callbacks = [] # List of tuples (android_device, advertisement_name) advertisement_names = [] list_of_arguments_list = [] def setup_class(self): super(BluetoothBaseTest, self).setup_class() self.pri_dut = self.android_devices[0] # Create 5 advertisements from different android devices for i in range(1, self.max_connections + 1): # Set device name ad = self.android_devices[i] name = "test_adv_{}".format(i) self.advertisement_names.append((ad, name)) ad.droid.bluetoothSetLocalName(name) # Setup and start advertisements ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) ad.droid.bleSetAdvertiseSettingsAdvertiseMode( ble_advertise_settings_modes['low_latency']) advertise_data = ad.droid.bleBuildAdvertiseData() advertise_settings = ad.droid.bleBuildAdvertiseSettings() advertise_callback = ad.droid.bleGenBleAdvertiseCallback() ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) self.advertise_callbacks.append((ad, advertise_callback)) def obtain_address_list_from_scan(self): """Returns the address list of all devices that match the scan filter. Returns: A list if all devices are found; None is any devices are not found. """ # From central device, scan for all appropriate addresses by name. filter_list = self.pri_dut.droid.bleGenFilterList() self.pri_dut.droid.bleSetScanSettingsScanMode( ble_scan_settings_modes['low_latency']) scan_settings = self.pri_dut.droid.bleBuildScanSetting() scan_callback = self.pri_dut.droid.bleGenScanCallback() for android_device, name in self.advertisement_names: self.pri_dut.droid.bleSetScanFilterDeviceName(name) self.pri_dut.droid.bleBuildScanFilter(filter_list) self.pri_dut.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) address_list = [] devices_found = [] # Set the scan time out to 20 sec to provide enough time to discover the # devices in busy environment scan_timeout = 20 end_time = time.time() + scan_timeout while time.time() < end_time and len(address_list) < len( self.advertisement_names): try: event = self.pri_dut.ed.pop_event( "BleScan{}onScanResults".format(scan_callback), self.bt_default_timeout) adv_name = event['data']['Result']['deviceInfo']['name'] mac_address = event['data']['Result']['deviceInfo']['address'] # Look up the android device handle based on event name device = [ item for item in self.advertisement_names if adv_name in item ] devices_found.append(device[0][0].serial) if len(device) is not 0: address_list_tuple = (device[0][0], mac_address) else: continue result = [item for item in address_list if mac_address in item] # if length of result is 0, it indicates that we have discovered # new mac address. if len(result) is 0: self.log.info("Found new mac address: {}".format( address_list_tuple[1])) address_list.append(address_list_tuple) except Empty as err: self.log.error("Failed to find any scan results.") return None if len(address_list) < self.max_connections: self.log.info("Only found these devices: {}".format(devices_found)) self.log.error("Could not find all necessary advertisements.") return None return address_list @BluetoothBaseTest.bt_test_wrap @test_tracker_info(uuid='6638282c-69b5-4237-9f0d-18e131424a9f') def test_concurrent_gatt_connections(self): """Test max concurrent GATT connections Connect to all peripherals. Steps: 1. Scan 2. Save addresses 3. Connect all addresses of the peripherals Expected Result: All connections successful. Returns: Pass if True Fail if False TAGS: Bluetooth, GATT Priority: 2 """ address_list = self.obtain_address_list_from_scan() if address_list is None: return False # Connect to all addresses for address_tuple in address_list: address = address_tuple[1] try: autoconnect = False bluetooth_gatt, gatt_callback = setup_gatt_connection( self.pri_dut, address, autoconnect) self.log.info("Successfully connected to {}".format(address)) except Exception as err: self.log.error( "Failed to establish connection to {}".format(address)) return False if (len( self.pri_dut.droid.bluetoothGetConnectedLeDevices( bt_profile_constants['gatt_server'])) != self.max_connections): self.log.error("Did not reach max connection count.") return False return True @BluetoothBaseTest.bt_test_wrap @test_tracker_info(uuid='660bf05e-a8e5-45f3-b42b-b66b4ac0d85f') def test_data_transfer_to_concurrent_gatt_connections(self): """Test writing GATT descriptors concurrently to many peripherals. Connect to all peripherals and write gatt descriptors concurrently. Steps: 1. Scan the addresses by names 2. Save mac addresses of the peripherals 3. Connect all addresses of the peripherals and write gatt descriptors Expected Result: All connections and data transfers are successful. Returns: Pass if True Fail if False TAGS: Bluetooth, GATT Priority: 2 """ address_list = self.obtain_address_list_from_scan() if address_list is None: return False # Connect to all addresses executor = concurrent.futures.ThreadPoolExecutor(max_workers=10) for address_tuple in address_list: ad, address = address_tuple gatts = GattServerLib(log=self.log, dut=ad) gatt_server, gatt_server_callback = gatts.setup_gatts_db( database=gatt_server_read_descriptor_sample) try: bluetooth_gatt, gatt_callback = setup_gatt_connection( self.pri_dut, address, autoconnect=False) self.log.info("Successfully connected to {}".format(address)) except Exception as err: self.log.error( "Failed to establish connection to {}".format(address)) return False if self.pri_dut.droid.gattClientDiscoverServices(bluetooth_gatt): event = self.pri_dut.ed.pop_event( "GattConnect{}onServicesDiscovered".format(bluetooth_gatt), self.bt_default_timeout) discovered_services_index = event['data']['ServicesIndex'] else: self.log.info("Failed to discover services.") return False services_count = self.pri_dut.droid.gattClientGetDiscoveredServicesCount( discovered_services_index) arguments_list = [ self.pri_dut.droid, self.pri_dut.ed, ad.droid, ad.ed, gatt_server, gatt_server_callback, bluetooth_gatt, services_count, discovered_services_index, 100 ] self.list_of_arguments_list.append(arguments_list) for arguments_list in self.list_of_arguments_list: executor.submit(run_continuous_write_descriptor, *arguments_list) executor.shutdown(wait=True) if (len( self.pri_dut.droid.bluetoothGetConnectedLeDevices( bt_profile_constants['gatt_server'])) != self.max_connections): self.log.error("Failed to write concurrently.") return False return True