"""BLE test utils for netsim."""

import logging
import time
from typing import Any

from mobly import asserts
from mobly import utils
from mobly.controllers import android_device
from mobly.snippet import callback_event


# Number of seconds for the target to stay BLE advertising.
ADVERTISING_TIME = 120
# Number of seconds to wait for the target to start BLE advertising.
ADVERTISING_START_TIME = 30
# The number of seconds to wait for receiving scan results.
SCAN_TIMEOUT = 20
# The number of seconds to wait for the connection to be established.
CONNECTION_TIMEOUT = 60
# The number of seconds to wait before canceling a connection.
CANCEL_CONNECTION_WAIT_TIME = 0.1
# UUID for test service.
TEST_BLE_SERVICE_UUID = '0000fe23-0000-1000-8000-00805f9b34fb'
# UUID for write characteristic.
TEST_WRITE_UUID = '0000e632-0000-1000-8000-00805f9b34fb'
# UUID for second write characteristic.
TEST_SECOND_WRITE_UUID = '0000e633-0000-1000-8000-00805f9b34fb'
# UUID for read characteristic.
TEST_READ_UUID = '0000e631-0000-1000-8000-00805f9b34fb'
# UUID for second read characteristic.
TEST_SECOND_READ_UUID = '0000e634-0000-1000-8000-00805f9b34fb'
# UUID for third read characteristic.
TEST_THIRD_READ_UUID = '0000e635-0000-1000-8000-00805f9b34fb'
# UUID for scan response.
TEST_SCAN_RESPONSE_UUID = '0000e639-0000-1000-8000-00805f9b34fb'
# Advertise settings in json format for BLE advertise.
# 'Timeout' is ADVERTISING_TIME converted from seconds to milliseconds.
ADVERTISE_SETTINGS = {
    'AdvertiseMode': 'ADVERTISE_MODE_LOW_LATENCY',
    'Timeout': ADVERTISING_TIME * 1000,
    'Connectable': True,
    'TxPowerLevel': 'ADVERTISE_TX_POWER_ULTRA_LOW',
}
# Random data representing the device, stored in the advertise data.
DATA = utils.rand_ascii_str(16)
# Random data for scan response.
SCAN_RESPONSE_DATA = utils.rand_ascii_str(16)
# Random data for read operation.
READ_DATA = utils.rand_ascii_str(8)
# Random data for second read operation.
SECOND_READ_DATA = utils.rand_ascii_str(8)
# Random data for third read operation.
THIRD_READ_DATA = utils.rand_ascii_str(8)
# Random data for write operation.
WRITE_DATA = utils.rand_ascii_str(8)
# Random data for second write operation.
SECOND_WRITE_DATA = utils.rand_ascii_str(8)
# Advertise data in json format for BLE advertise.
ADVERTISE_DATA = {
    'IncludeDeviceName': False,
    'ServiceData': [{'UUID': TEST_BLE_SERVICE_UUID, 'Data': DATA}],
}
# Advertise data in json format representing scan response for BLE advertise.
SCAN_RESPONSE = {
    'IncludeDeviceName': False,
    'ServiceData': [{
        'UUID': TEST_SCAN_RESPONSE_UUID,
        'Data': SCAN_RESPONSE_DATA,
    }],
}
# Scan filter in json format for BLE scan.
SCAN_FILTER = {'ServiceUuid': TEST_BLE_SERVICE_UUID}
# Scan settings in json format for BLE scan.
SCAN_SETTINGS = {'ScanMode': 'SCAN_MODE_LOW_LATENCY'}
# Characteristics for write in json format.
WRITE_CHARACTERISTIC = {
    'UUID': TEST_WRITE_UUID,
    'Property': 'PROPERTY_WRITE',
    'Permission': 'PERMISSION_WRITE',
}
SECOND_WRITE_CHARACTERISTIC = {
    'UUID': TEST_SECOND_WRITE_UUID,
    'Property': 'PROPERTY_WRITE',
    'Permission': 'PERMISSION_WRITE',
}
# Characteristics for read in json format.
READ_CHARACTERISTIC = {
    'UUID': TEST_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': READ_DATA,
}
SECOND_READ_CHARACTERISTIC = {
    'UUID': TEST_SECOND_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': SECOND_READ_DATA,
}
THIRD_READ_CHARACTERISTIC = {
    'UUID': TEST_THIRD_READ_UUID,
    'Property': 'PROPERTY_READ',
    'Permission': 'PERMISSION_READ',
    'Data': THIRD_READ_DATA,
}
# Service data in json format for BLE server.
SERVICE = {
    'UUID': TEST_BLE_SERVICE_UUID,
    'Type': 'SERVICE_TYPE_PRIMARY',
    'Characteristics': [
        WRITE_CHARACTERISTIC,
        SECOND_WRITE_CHARACTERISTIC,
        READ_CHARACTERISTIC,
        SECOND_READ_CHARACTERISTIC,
        THIRD_READ_CHARACTERISTIC,
    ],
}
# Macros for literal string.
UUID = 'UUID'
GATT_SUCCESS = 'GATT_SUCCESS'
STATE = 'newState'
STATUS = 'status'


def IsRequiredScanResult(scan_result: callback_event.CallbackEvent) -> bool:
  """Checks whether a scan result event carries the test advertisement.

  Args:
    scan_result: An 'onScanResult' callback event.

  Returns:
    True if any service entry in the event's scan record has the test service
    UUID together with the advertised DATA payload, False otherwise.
  """
  services = scan_result.data['result']['ScanRecord']['Services']
  return any(
      service[UUID] == TEST_BLE_SERVICE_UUID and service['Data'] == DATA
      for service in services
  )


def _AssertTestCharacteristicsPresent(
    characteristics: list[dict[str, Any]],
) -> None:
  """Asserts that every characteristic UUID declared in SERVICE is present.

  Args:
    characteristics: Characteristic dicts (each holding a 'UUID' entry) as
      reported by a callback event.
  """
  reported_uuids = {characteristic[UUID] for characteristic in characteristics}
  for expected in SERVICE['Characteristics']:
    uuid = expected[UUID]
    asserts.assert_true(uuid in reported_uuids, 'Failed to find uuid %s.' % uuid)


def Discover(
    scanner: android_device.AndroidDevice,
    advertiser: android_device.AndroidDevice,
) -> dict[str, Any]:
  """Logic for BLE scan and advertising.

  Steps:
    1. Advertiser starts advertising and gets a startSuccess callback.
    2. Scanner starts scanning and finds advertiser from scan results.

  Verifies:
    Advertiser is discovered by the scanner: after a settle delay of
    SCAN_TIMEOUT seconds, a matching scan result must be available within
    SCAN_TIMEOUT seconds, and it must also carry the scan response data.

  Side effects:
    Sets `advertiser.advertise_callback`, `scanner.scan_callback` and
    `scanner.connect_to_address` for use by the other helpers in this module.

  Args:
    scanner: AndroidDevice. The device that starts BLE scan to find target.
    advertiser: AndroidDevice. The device that keeps advertising so other
      devices acknowledge it.

  Returns:
    dict. Scan results.

  Raises:
    TimeoutError: The expected event does not occur within the time limit.
  """
  # Retry initial command in case command is lost after triggering a reset
  max_attempts = 2
  for attempt_num in range(max_attempts):
    advertiser.advertise_callback = advertiser.mbs.bleStartAdvertising(
        ADVERTISE_SETTINGS, ADVERTISE_DATA, SCAN_RESPONSE
    )
    scanner.scan_callback = scanner.mbs.bleStartScan(
        [SCAN_FILTER], SCAN_SETTINGS
    )
    start_success_events = False
    # Poll once per second for the advertising start confirmation.
    for _ in range(ADVERTISING_START_TIME):
      if advertiser.advertise_callback.getAll('onStartFailure'):
        logging.warning(
            "'onStartFailure' event detected after bleStartAdvertising"
        )
      start_success_events = advertiser.advertise_callback.getAll(
          'onStartSuccess'
      )
      if start_success_events:
        break
      time.sleep(1)
    else:
      logging.error(
          'Timed out after %ss waiting for an "onStartSuccess" event ',
          ADVERTISING_START_TIME,
      )
    if start_success_events:
      # Advertising is confirmed; do not fall through to another attempt,
      # which would start a second advertisement/scan and overwrite the
      # callback handlers needed later to stop the first ones.
      break
    if attempt_num < max_attempts - 1:
      logging.warning(
          "'onStartSuccess' event was not received after "
          'bleStartAdvertising. Retrying... (%d)',
          attempt_num + 1,
      )
    else:
      raise TimeoutError(
          f'Timed out after {max_attempts} retries of '
          f'{ADVERTISING_START_TIME}s waiting for an '
          '"onStartSuccess" event '
      )

  advertiser.log.info('BLE advertising started')
  time.sleep(SCAN_TIMEOUT)
  scan_result = scanner.scan_callback.waitForEvent(
      'onScanResult', IsRequiredScanResult, SCAN_TIMEOUT
  )
  scan_success = False
  scan_response_found = False
  result = scan_result.data['result']
  scan_start_to_result_time_ms = scan_result.data['StartToResultTimeDeltaMs']
  for service in result['ScanRecord']['Services']:
    if service[UUID] == TEST_BLE_SERVICE_UUID and service['Data'] == DATA:
      # Remember the advertiser's address so Connect() can reach it.
      scanner.connect_to_address = result['Device']['Address']
      scan_success = True
    if (
        service[UUID] == TEST_SCAN_RESPONSE_UUID
        and service['Data'] == SCAN_RESPONSE_DATA
    ):
      scan_response_found = True
  asserts.assert_true(
      scan_success, 'Advertiser is not found inside %d seconds' % SCAN_TIMEOUT
  )
  asserts.assert_true(scan_response_found, 'Scan response is not found')
  logging.info('Discovery metrics: %d', scan_start_to_result_time_ms)
  return result


def StartScanning(
    scanner: android_device.AndroidDevice, scan_duration: int
) -> list[dict[str, Any]]:
  """Logic for BLE scanning for advertisers.

  Steps:
    1. Scanner starts scanning with retries
    2. Retrieves the ScanResult

  Verifies:
    Advertiser is discovered within timeout by scanner.

  Side effects:
    Sets `scanner.scan_callback` to the handler of the latest scan attempt.

  Args:
    scanner: AndroidDevice. The device that starts BLE scan to find advertisers.
    scan_duration: Number of seconds to wait between polls for scan results.
      Must be positive. Each attempt polls about SCAN_TIMEOUT / scan_duration
      times, and always at least once.

  Returns:
    List of dicts containing Scan results.

  Raises:
    ValueError: scan_duration is not a positive number.
    TimeoutError: The expected event does not occur within the time limit.
  """
  if scan_duration <= 0:
    raise ValueError(
        f'scan_duration must be a positive number of seconds, got '
        f'{scan_duration!r}'
    )
  # Always poll at least once, even when scan_duration exceeds SCAN_TIMEOUT;
  # otherwise an attempt would report a timeout without ever checking.
  polls_per_attempt = max(1, int(SCAN_TIMEOUT / scan_duration))
  # Retry initial command in case command is lost after triggering a reset
  max_attempts = 3
  scan_events = None
  for attempt_num in range(max_attempts):
    scanner.scan_callback = scanner.mbs.bleStartScan()
    scanner.log.info('BLE scanning started')
    if scanner.scan_callback.getAll('onScanFailed'):
      logging.warning("'onScanFailed' event detected after bleStartScan")
      continue
    for _ in range(polls_per_attempt):
      time.sleep(scan_duration)
      scan_events = scanner.scan_callback.getAll('onScanResult')
      if scan_events:
        break
    else:
      logging.error(
          'Timed out after %ss waiting for an "onScanResult" event ',
          SCAN_TIMEOUT,
      )
    if scan_events:
      break
    if attempt_num < max_attempts - 1:
      logging.warning(
          "'onScanResult' event was not received after "
          'bleStartScan. Retrying... (%d)',
          attempt_num + 1,
      )
    else:
      raise TimeoutError(
          f'Timed out after {max_attempts} retries of '
          f'{SCAN_TIMEOUT}s waiting for an '
          '"onScanResult" event '
      )

  # Reached without results only when the last attempt reported onScanFailed.
  asserts.assert_true(
      bool(scan_events),
      'Advertiser is not found inside %d seconds' % SCAN_TIMEOUT,
  )
  return [event.data['result'] for event in scan_events]


def StopDiscover(
    scanner: android_device.AndroidDevice,
    advertiser: android_device.AndroidDevice,
) -> None:
  """Logic for stopping BLE scan and advertising.

  Steps:
    1. Scanner stops scanning.
    2. Advertiser stops advertising.

  Args:
    scanner: AndroidDevice. The device that starts BLE scan to find target.
      Its `scan_callback` attribute must have been set by a previous scan.
    advertiser: AndroidDevice. The device that keeps advertising so other
      devices acknowledge it. Its `advertise_callback` attribute must have
      been set by a previous call to Discover().
  """
  StopScanning(scanner)
  advertiser.mbs.bleStopAdvertising(advertiser.advertise_callback.callback_id)
  advertiser.log.info('BLE advertising stopped')


def StopScanning(scanner: android_device.AndroidDevice) -> None:
  """Logic for stopping BLE scan.

  Steps:
    1. Scanner stops scanning.

  Args:
    scanner: AndroidDevice. The device that starts BLE scan to find target.
      Its `scan_callback` attribute must have been set by a previous scan.
  """
  scanner.mbs.bleStopScan(scanner.scan_callback.callback_id)
  scanner.log.info('BLE scanning stopped')


def Connect(
    client: android_device.AndroidDevice, server: android_device.AndroidDevice
) -> None:
  """Logic for creating a Gatt connection between a client and a server.

  Steps:
    1. Server starts and service added properly.
    2. Client connects to server via Gatt, connection completes with
    GATT_SUCCESS within TIMEOUT, onConnectionStateChange/STATE_CONNECTED is
    called EXACTLY once.

  Verifies:
    Both the client and the server consider themselves connected to each other.

  Side effects:
    Sets `server.server_callback` and `client.client_callback`.

  Args:
    client: AndroidDevice. The device that behaves as GATT client. Its
      `connect_to_address` attribute (set by Discover()) is the target.
    server: AndroidDevice. The device that behaves as GATT server.
  """
  server.server_callback = server.mbs.bleStartServer([SERVICE])
  start_server_result = server.server_callback.waitAndGet('onServiceAdded', 30)
  asserts.assert_equal(start_server_result.data[STATUS], GATT_SUCCESS)
  _AssertTestCharacteristicsPresent(
      start_server_result.data['Service']['Characteristics']
  )
  server.log.info('BLE server started')
  client.client_callback = client.mbs.bleConnectGatt(client.connect_to_address)
  start_client_result = client.client_callback.waitAndGet(
      'onConnectionStateChange', CONNECTION_TIMEOUT
  )
  # Any further queued state change means the callback fired more than once.
  extra_events = client.client_callback.getAll('onConnectionStateChange')
  # assert_false takes (expr, msg, extras); the message must be formatted
  # here, otherwise the '%s' placeholder is reported verbatim.
  asserts.assert_false(
      extra_events,
      'Got unexpected onConnectionStateChange events: %s' % extra_events,
  )
  asserts.assert_equal(start_client_result.data[STATUS], GATT_SUCCESS)
  asserts.assert_equal(start_client_result.data[STATE], 'STATE_CONNECTED')
  client.log.info('BLE client connected')
  # Verify that the server side also considers itself connected.
  # No explicit timeout is passed, so the callback handler's default applies.
  server_event = server.server_callback.waitAndGet('onConnectionStateChange')
  asserts.assert_equal(server_event.data[STATUS], GATT_SUCCESS)
  asserts.assert_equal(
      server_event.data[STATE],
      'STATE_CONNECTED',
      'The server side does not consider itself connected, error!',
  )
  logging.info('Gatt connection complete.')
  logging.info(
      'Connection metrics: %d', start_client_result.data['gattConnectionTimeMs']
  )


def Disconnect(
    client: android_device.AndroidDevice, server: android_device.AndroidDevice
) -> None:
  """Logic for stopping BLE client and server.

  Steps:
    1. Client calls disconnect, gets a callback with STATE_DISCONNECTED and
    GATT_SUCCESS.
    2. Server closes.

  Verifies: Client gets corresponding callback.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.
  """
  client.mbs.bleDisconnect()
  stop_client_result = client.client_callback.waitAndGet(
      'onConnectionStateChange', 30
  )
  asserts.assert_equal(stop_client_result.data[STATUS], GATT_SUCCESS)
  asserts.assert_equal(stop_client_result.data[STATE], 'STATE_DISCONNECTED')
  client.log.info('BLE client disconnected')
  server.mbs.bleStopServer()
  server.log.info('BLE server stopped')


def DiscoverServices(client: android_device.AndroidDevice) -> None:
  """Logic for BLE services discovery.

  Steps:
    1. Client successfully completes service discovery & gets
    onServicesDiscovered callback within some TIMEOUT, onServicesDiscovered/
    GATT_SUCCESS is called EXACTLY once.
    2. Client discovers the readable and writable characteristics.

  Verifies:
    Client gets corresponding callback.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
  """
  client.mbs.bleDiscoverServices()
  # NOTE(review): the fixed wait appears deliberate -- it lets any duplicate
  # callbacks arrive so the exactly-once check below is meaningful. Confirm
  # before replacing it with an event wait.
  time.sleep(CONNECTION_TIMEOUT)
  discover_services_results = client.client_callback.getAll(
      'onServiceDiscovered'
  )
  asserts.assert_equal(len(discover_services_results), 1)
  discovery_event = discover_services_results[0]
  asserts.assert_equal(discovery_event.data[STATUS], GATT_SUCCESS)
  service_discovered = False
  for service in discovery_event.data['Services']:
    if service[UUID] == TEST_BLE_SERVICE_UUID:
      service_discovered = True
      _AssertTestCharacteristicsPresent(service['Characteristics'])
  asserts.assert_true(
      service_discovered, 'Failed to discover the customize service'
  )
  client.log.info('BLE discover services finished')


def ReadCharacteristic(client: android_device.AndroidDevice) -> None:
  """Logic for BLE characteristic retrieval.

  Steps (repeated for each of the three read characteristics, in order):
    1. Client reads a characteristic from server & gets true.
    2. Server calls sendResponse & client gets onCharacteristicRead.

  Verifies:
    Client gets corresponding callback with GATT_SUCCESS and the data that
    was configured for that characteristic.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
  """
  read_cases = (
      (TEST_READ_UUID, READ_DATA, 'Read operation finished'),
      (
          TEST_SECOND_READ_UUID,
          SECOND_READ_DATA,
          'Second read operation finished',
      ),
      (
          TEST_THIRD_READ_UUID,
          THIRD_READ_DATA,
          'Third read operation finished',
      ),
  )
  for characteristic_uuid, expected_data, finished_message in read_cases:
    read_started = client.mbs.bleReadOperation(
        TEST_BLE_SERVICE_UUID, characteristic_uuid
    )
    asserts.assert_true(read_started, 'BLE read operation failed to start')
    read_event = client.client_callback.waitAndGet('onCharacteristicRead', 30)
    asserts.assert_equal(read_event.data[STATUS], GATT_SUCCESS)
    asserts.assert_equal(read_event.data['Data'], expected_data)
    client.log.info(finished_message)


def WriteCharacteristic(
    client: android_device.AndroidDevice, server: android_device.AndroidDevice
) -> None:
  """Logic for BLE characteristic write.

  Steps (repeated for each of the two write characteristics, in order):
    1. Client writes a characteristic to server & gets true.
    2. Server calls sendResponse & client gets onCharacteristicWrite.

  Verifies:
    Server receives the written data and client gets corresponding callback.

  Args:
    client: AndroidDevice. The device that behaves as GATT client.
    server: AndroidDevice. The device that behaves as GATT server.
  """
  write_cases = (
      (TEST_WRITE_UUID, WRITE_DATA, 'Write operation finished'),
      (
          TEST_SECOND_WRITE_UUID,
          SECOND_WRITE_DATA,
          'Second write operation finished',
      ),
  )
  for characteristic_uuid, payload, finished_message in write_cases:
    write_started = client.mbs.bleWriteOperation(
        TEST_BLE_SERVICE_UUID, characteristic_uuid, payload
    )
    asserts.assert_true(write_started, 'BLE write operation failed to start')
    server_write_event = server.server_callback.waitAndGet(
        'onCharacteristicWriteRequest', 30
    )
    asserts.assert_equal(server_write_event.data['Data'], payload)
    client.client_callback.waitAndGet('onCharacteristicWrite', 30)
    client.log.info(finished_message)