"""BLE test utils for netsim.""" import logging import time from typing import Any from mobly import asserts from mobly import utils from mobly.controllers import android_device from mobly.snippet import callback_event # Number of seconds for the target to stay BLE advertising. ADVERTISING_TIME = 120 # Number of seconds for the target to start BLE advertising. ADVERTISING_START_TIME = 30 # The number of seconds to wait for receiving scan results. SCAN_TIMEOUT = 20 # The number of seconds to wair for connection established. CONNECTION_TIMEOUT = 60 # The number of seconds to wait before cancel connection. CANCEL_CONNECTION_WAIT_TIME = 0.1 # UUID for test service. TEST_BLE_SERVICE_UUID = '0000fe23-0000-1000-8000-00805f9b34fb' # UUID for write characteristic. TEST_WRITE_UUID = '0000e632-0000-1000-8000-00805f9b34fb' # UUID for second write characteristic. TEST_SECOND_WRITE_UUID = '0000e633-0000-1000-8000-00805f9b34fb' # UUID for read test. TEST_READ_UUID = '0000e631-0000-1000-8000-00805f9b34fb' # UUID for second read characteristic. TEST_SECOND_READ_UUID = '0000e634-0000-1000-8000-00805f9b34fb' # UUID for third read characteristic. TEST_THIRD_READ_UUID = '0000e635-0000-1000-8000-00805f9b34fb' # UUID for scan response. TEST_SCAN_RESPONSE_UUID = '0000e639-0000-1000-8000-00805f9b34fb' # Advertise settings in json format for Ble Advertise. ADVERTISE_SETTINGS = { 'AdvertiseMode': 'ADVERTISE_MODE_LOW_LATENCY', 'Timeout': ADVERTISING_TIME * 1000, 'Connectable': True, 'TxPowerLevel': 'ADVERTISE_TX_POWER_ULTRA_LOW', } # Ramdom data to represent device stored in advertise data. DATA = utils.rand_ascii_str(16) # Random data for scan response. SCAN_RESPONSE_DATA = utils.rand_ascii_str(16) # Random data for read operation. READ_DATA = utils.rand_ascii_str(8) # Random data for second read operation. SECOND_READ_DATA = utils.rand_ascii_str(8) # Random data for third read operation. THIRD_READ_DATA = utils.rand_ascii_str(8) # Random data for write operation. WRITE_DATA = utils.rand_ascii_str(8) # Random data for second write operation. SECOND_WRITE_DATA = utils.rand_ascii_str(8) # Advertise data in json format for BLE advertise. ADVERTISE_DATA = { 'IncludeDeviceName': False, 'ServiceData': [{'UUID': TEST_BLE_SERVICE_UUID, 'Data': DATA}], } # Advertise data in json format representing scan response for BLE advertise. SCAN_RESPONSE = { 'IncludeDeviceName': False, 'ServiceData': [{ 'UUID': TEST_SCAN_RESPONSE_UUID, 'Data': SCAN_RESPONSE_DATA, }], } # Scan filter in json format for BLE scan. SCAN_FILTER = {'ServiceUuid': TEST_BLE_SERVICE_UUID} # Scan settings in json format for BLE scan. SCAN_SETTINGS = {'ScanMode': 'SCAN_MODE_LOW_LATENCY'} # Characteristics for write in json format. WRITE_CHARACTERISTIC = { 'UUID': TEST_WRITE_UUID, 'Property': 'PROPERTY_WRITE', 'Permission': 'PERMISSION_WRITE', } SECOND_WRITE_CHARACTERISTIC = { 'UUID': TEST_SECOND_WRITE_UUID, 'Property': 'PROPERTY_WRITE', 'Permission': 'PERMISSION_WRITE', } # Characteristics for read in json format. READ_CHARACTERISTIC = { 'UUID': TEST_READ_UUID, 'Property': 'PROPERTY_READ', 'Permission': 'PERMISSION_READ', 'Data': READ_DATA, } SECOND_READ_CHARACTERISTIC = { 'UUID': TEST_SECOND_READ_UUID, 'Property': 'PROPERTY_READ', 'Permission': 'PERMISSION_READ', 'Data': SECOND_READ_DATA, } THIRD_READ_CHARACTERISTIC = { 'UUID': TEST_THIRD_READ_UUID, 'Property': 'PROPERTY_READ', 'Permission': 'PERMISSION_READ', 'Data': THIRD_READ_DATA, } # Service data in json format for Ble Server. SERVICE = { 'UUID': TEST_BLE_SERVICE_UUID, 'Type': 'SERVICE_TYPE_PRIMARY', 'Characteristics': [ WRITE_CHARACTERISTIC, SECOND_WRITE_CHARACTERISTIC, READ_CHARACTERISTIC, SECOND_READ_CHARACTERISTIC, THIRD_READ_CHARACTERISTIC, ], } # Macros for literal string. UUID = 'UUID' GATT_SUCCESS = 'GATT_SUCCESS' STATE = 'newState' STATUS = 'status' def IsRequiredScanResult(scan_result: callback_event.CallbackEvent) -> bool: result = scan_result.data['result'] for service in result['ScanRecord']['Services']: if service[UUID] == TEST_BLE_SERVICE_UUID and service['Data'] == DATA: return True return False def Discover( scanner: android_device.AndroidDevice, advertiser: android_device.AndroidDevice, ) -> dict[str, Any]: """Logic for BLE scan and advertising. Steps: 1. Advertiser starts advertising and gets a startSuccess callback. 2. Scanner starts scanning and finds advertiser from scan results. Verifies: Advertiser is discovered within 5s by scanner. Args: scanner: AndroidDevice. The device that starts BLE scan to find target. advertiser: AndroidDevice. The device that keeps advertising so other devices acknowledge it. Returns: dict. Scan results. Raises: TimeoutError: The expected event does not occur within the time limit. """ # Retry initial command in case command is lost after triggering a reset max_attempts = 2 for attempt_num in range(max_attempts): advertiser.advertise_callback = advertiser.mbs.bleStartAdvertising( ADVERTISE_SETTINGS, ADVERTISE_DATA, SCAN_RESPONSE ) scanner.scan_callback = scanner.mbs.bleStartScan( [SCAN_FILTER], SCAN_SETTINGS ) success = False for _ in range(ADVERTISING_START_TIME): failure = advertiser.advertise_callback.getAll('onStartFailure') if failure: logging.warning( "'onStartFailure' event detected after bleStartAdvertising" ) success = advertiser.advertise_callback.getAll('onStartSuccess') if success: break time.sleep(1) else: logging.error( 'Timed out after %ss waiting for an "onStartSuccess" event ', ADVERTISING_START_TIME, ) if not success: if attempt_num < max_attempts - 1: logging.warning( "'onStartSuccess' event was not received after " 'bleStartAdvertising. Retrying... (%d)', attempt_num + 1, ) else: raise TimeoutError( f'Timed out after {max_attempts} retries of ' f'{ADVERTISING_START_TIME}s waiting for an ' '"onStartSuccess" event ' ) advertiser.log.info('BLE advertising started') time.sleep(SCAN_TIMEOUT) scan_result = scanner.scan_callback.waitForEvent( 'onScanResult', IsRequiredScanResult, SCAN_TIMEOUT ) scan_success = False scan_response_found = False result = scan_result.data['result'] scan_start_to_result_time_ms = scan_result.data['StartToResultTimeDeltaMs'] for service in result['ScanRecord']['Services']: if service[UUID] == TEST_BLE_SERVICE_UUID and service['Data'] == DATA: scanner.connect_to_address = result['Device']['Address'] scan_success = True if ( service[UUID] == TEST_SCAN_RESPONSE_UUID and service['Data'] == SCAN_RESPONSE_DATA ): scan_response_found = True asserts.assert_true( scan_success, 'Advertiser is not found inside %d seconds' % SCAN_TIMEOUT ) asserts.assert_true(scan_response_found, 'Scan response is not found') logging.info('Discovery metrics: %d', scan_start_to_result_time_ms) return result def StartScanning( scanner: android_device.AndroidDevice, scan_duration: int ) -> list[dict[str, Any]]: """Logic for BLE scanning for advertisers. Steps: 1. Scanner starts scanning with retries 2. Retrieves the ScanResult Verifies: Advertiser is discovered within timeout by scanner. Args: scanner: AndroidDevice. The device that starts BLE scan to find advertisers. scan_duration: Number of seconds for each scan attempt Returns: List of dicts containing Scan results. Raises: TimeoutError: The expected event does not occur within the time limit. """ # Retry initial command in case command is lost after triggering a reset max_attempts = 3 scan_success = False result = [] scan_result = None for attempt_num in range(max_attempts): scanner.scan_callback = scanner.mbs.bleStartScan() scanner.log.info('BLE scanning started') failure = scanner.scan_callback.getAll('onScanFailed') if failure: logging.warning("'onScanFailed' event detected after bleStartScan") continue success = False for _ in range(int(SCAN_TIMEOUT / scan_duration)): time.sleep(scan_duration) scan_result = scanner.scan_callback.getAll('onScanResult') if scan_result: success = True break else: logging.error( 'Timed out after %ss waiting for an "onScanResult" event ', SCAN_TIMEOUT, ) if success: break if attempt_num < max_attempts - 1: logging.warning( "'onScanResult' event was not received after " 'bleStartScan. Retrying... (%d)', attempt_num + 1, ) else: raise TimeoutError( f'Timed out after {max_attempts} retries of ' f'{SCAN_TIMEOUT}s waiting for an ' '"onScanResult" event ' ) if scan_result: scan_success = True result = [result.data['result'] for result in scan_result] asserts.assert_true( scan_success, 'Advertiser is not found inside %d seconds' % SCAN_TIMEOUT ) return result def StopDiscover( scanner: android_device.AndroidDevice, advertiser: android_device.AndroidDevice, ) -> None: """Logic for stopping BLE scan and advertising. Steps: 1. Scanner stops scanning. 2. Advertiser stops advertising. Args: scanner: AndroidDevice. The device that starts BLE scan to find target. advertiser: AndroidDevice. The device that keeps advertising so other devices acknowledge it. """ scanner.mbs.bleStopScan(scanner.scan_callback.callback_id) scanner.log.info('BLE scanning stopped') advertiser.mbs.bleStopAdvertising(advertiser.advertise_callback.callback_id) advertiser.log.info('BLE advertising stopped') def StopScanning(scanner: android_device.AndroidDevice) -> None: """Logic for stopping BLE scan. Steps: 1. Scanner stops scanning. Args: scanner: AndroidDevice. The device that starts BLE scan to find target. """ scanner.mbs.bleStopScan(scanner.scan_callback.callback_id) scanner.log.info('BLE scanning stopped') def Connect( client: android_device.AndroidDevice, server: android_device.AndroidDevice ) -> None: """Logic for create a Gatt connection between a client and a server. Steps: 1. Server starts and service added properly. 2. Client connects to server via Gatt, connection completes with GATT_SUCCESS within TIMEOUT, onConnectionStateChange/STATE_CONNECTED is called EXACTLY once. Verifies: Both the client and the server consider themselves connected to each other. Args: client: AndroidDevice. The device that behaves as GATT client. server: AndroidDevice. The device that behaves as GATT server. """ server.server_callback = server.mbs.bleStartServer([SERVICE]) start_server_result = server.server_callback.waitAndGet('onServiceAdded', 30) asserts.assert_equal(start_server_result.data[STATUS], GATT_SUCCESS) uuids = [ characteristic[UUID] for characteristic in start_server_result.data['Service'][ 'Characteristics' ] ] for uuid in [ characteristic[UUID] for characteristic in SERVICE['Characteristics'] ]: asserts.assert_true(uuid in uuids, 'Failed to find uuid %s.' % uuid) server.log.info('BLE server started') client.client_callback = client.mbs.bleConnectGatt(client.connect_to_address) start_client_result = client.client_callback.waitAndGet( 'onConnectionStateChange', CONNECTION_TIMEOUT ) extra_events = client.client_callback.getAll('onConnectionStateChange') asserts.assert_false( extra_events, 'Got unexpected onConnectionStateChange events: %s', extra_events, ) asserts.assert_equal(start_client_result.data[STATUS], GATT_SUCCESS) asserts.assert_equal(start_client_result.data[STATE], 'STATE_CONNECTED') client.log.info('BLE client connected') # Verify that the server side also considers itself connected. server_event = server.server_callback.waitAndGet('onConnectionStateChange') asserts.assert_equal(server_event.data[STATUS], GATT_SUCCESS) asserts.assert_equal( server_event.data[STATE], 'STATE_CONNECTED', 'The server side does not consider itself connected, error!', ) logging.info('Gatt connection complete.') logging.info( 'Connection metrics: %d', start_client_result.data['gattConnectionTimeMs'] ) def Disconnect( client: android_device.AndroidDevice, server: android_device.AndroidDevice ) -> None: """Logic for stopping BLE client and server. Steps: 1. Client calls disconnect, gets a callback with STATE_DISCONNECTED and GATT_SUCCESS. 2. Server closes. Verifies: Client gets corresponding callback. Args: client: AndroidDevice. The device that behaves as GATT client. server: AndroidDevice. The device that behaves as GATT server. """ client.mbs.bleDisconnect() stop_client_result = client.client_callback.waitAndGet( 'onConnectionStateChange', 30 ) asserts.assert_equal(stop_client_result.data[STATUS], GATT_SUCCESS) asserts.assert_equal(stop_client_result.data[STATE], 'STATE_DISCONNECTED') client.log.info('BLE client disconnected') server.mbs.bleStopServer() server.log.info('BLE server stopped') def DiscoverServices(client: android_device.AndroidDevice) -> None: """Logic for BLE services discovery. Steps: 1. Client successfully completes service discovery & gets onServicesDiscovered callback within some TIMEOUT, onServicesDiscovered/ GATT_SUCCESS is called EXACTLY once. 2. Client discovers the readable and writable characteristics. Verifies: Client gets corresponding callback. Args: client: AndroidDevice. The device that behaves as GATT client. """ client.mbs.bleDiscoverServices() time.sleep(CONNECTION_TIMEOUT) discover_services_results = client.client_callback.getAll( 'onServiceDiscovered' ) asserts.assert_equal(len(discover_services_results), 1) service_discovered = False asserts.assert_equal(discover_services_results[0].data[STATUS], GATT_SUCCESS) for service in discover_services_results[0].data['Services']: if service['UUID'] == TEST_BLE_SERVICE_UUID: service_discovered = True uuids = [ characteristic[UUID] for characteristic in service['Characteristics'] ] for uuid in [ characteristic[UUID] for characteristic in SERVICE['Characteristics'] ]: asserts.assert_true(uuid in uuids, 'Failed to find uuid %s.' % uuid) asserts.assert_true( service_discovered, 'Failed to discover the customize service' ) client.log.info('BLE discover services finished') def ReadCharacteristic(client: android_device.AndroidDevice) -> None: """Logic for BLE characteristic retrieval. Steps: 1. Client reads a characteristic from server & gets true. 2. Server calls sendResponse & client gets onCharacteristicRead. Verifies: Client gets corresponding callback. Args: client: AndroidDevice. The device that behaves as GATT client. """ read_operation_result = client.mbs.bleReadOperation( TEST_BLE_SERVICE_UUID, TEST_READ_UUID ) asserts.assert_true( read_operation_result, 'BLE read operation failed to start' ) read_operation_result = client.client_callback.waitAndGet( 'onCharacteristicRead', 30 ) asserts.assert_equal(read_operation_result.data[STATUS], GATT_SUCCESS) asserts.assert_equal(read_operation_result.data['Data'], READ_DATA) client.log.info('Read operation finished') read_operation_result = client.mbs.bleReadOperation( TEST_BLE_SERVICE_UUID, TEST_SECOND_READ_UUID ) asserts.assert_true( read_operation_result, 'BLE read operation failed to start' ) read_operation_result = client.client_callback.waitAndGet( 'onCharacteristicRead', 30 ) asserts.assert_equal(read_operation_result.data[STATUS], GATT_SUCCESS) asserts.assert_equal(read_operation_result.data['Data'], SECOND_READ_DATA) client.log.info('Second read operation finished') read_operation_result = client.mbs.bleReadOperation( TEST_BLE_SERVICE_UUID, TEST_THIRD_READ_UUID ) asserts.assert_true( read_operation_result, 'BLE read operation failed to start' ) read_operation_result = client.client_callback.waitAndGet( 'onCharacteristicRead', 30 ) asserts.assert_equal(read_operation_result.data[STATUS], GATT_SUCCESS) asserts.assert_equal(read_operation_result.data['Data'], THIRD_READ_DATA) client.log.info('Third read operation finished') def WriteCharacteristic( client: android_device.AndroidDevice, server: android_device.AndroidDevice ) -> None: """Logic for BLE characteristic write. Steps: 1. Client writes a characteristic to server & gets true. 2. Server calls sendResponse & client gets onCharacteristicWrite. Verifies: Client gets corresponding callback. Args: client: AndroidDevice. The device that behaves as GATT client. server: AndroidDevice. The device that behaves as GATT server. """ write_operation_result = client.mbs.bleWriteOperation( TEST_BLE_SERVICE_UUID, TEST_WRITE_UUID, WRITE_DATA ) asserts.assert_true( write_operation_result, 'BLE write operation failed to start' ) server_write_operation_result = server.server_callback.waitAndGet( 'onCharacteristicWriteRequest', 30 ) asserts.assert_equal(server_write_operation_result.data['Data'], WRITE_DATA) client.client_callback.waitAndGet('onCharacteristicWrite', 30) client.log.info('Write operation finished') write_operation_result = client.mbs.bleWriteOperation( TEST_BLE_SERVICE_UUID, TEST_SECOND_WRITE_UUID, SECOND_WRITE_DATA ) asserts.assert_true( write_operation_result, 'BLE write operation failed to start' ) server_write_operation_result = server.server_callback.waitAndGet( 'onCharacteristicWriteRequest', 30 ) asserts.assert_equal( server_write_operation_result.data['Data'], SECOND_WRITE_DATA ) client.client_callback.waitAndGet('onCharacteristicWrite', 30) client.log.info('Second write operation finished')