1#!/usr/bin/env python3 2# 3# Copyright (C) 2016 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17This test script exercises different GATT connection tests. 18 19Original location: 20 tools/test/connectivity/acts_tests/tests/google/ble/gatt/GattConnectTest.py 21""" 22 23import logging 24import time 25from queue import Empty 26 27from blueberry.tests.gd.cert.test_decorators import test_tracker_info 28from blueberry.tests.gd_sl4a.lib.bt_constants import scan_result 29from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test 30from blueberry.utils.ble_scan_adv_constants import BleAdvertiseSettingsMode 31from blueberry.utils.ble_scan_adv_constants import BleScanSettingsMatchNums 32from blueberry.utils.ble_scan_adv_constants import BleScanSettingsModes 33from blueberry.utils.bt_constants import BluetoothProfile 34from blueberry.utils.bt_gatt_constants import GattCallbackError 35from blueberry.utils.bt_gatt_constants import GattCallbackString 36from blueberry.utils.bt_gatt_constants import GattCharacteristic 37from blueberry.utils.bt_gatt_constants import GattConnectionState 38from blueberry.utils.bt_gatt_constants import GattMtuSize 39from blueberry.utils.bt_gatt_constants import GattPhyMask 40from blueberry.utils.bt_gatt_constants import GattServiceType 41from blueberry.utils.bt_gatt_constants import GattTransport 42from blueberry.utils.bt_gatt_utils import GattTestUtilsError 43from blueberry.utils.bt_gatt_utils import close_gatt_client 44from blueberry.utils.bt_gatt_utils import disconnect_gatt_connection 45from blueberry.utils.bt_gatt_utils import get_mac_address_of_generic_advertisement 46from blueberry.utils.bt_gatt_utils import log_gatt_server_uuids 47from blueberry.utils.bt_gatt_utils import orchestrate_gatt_connection 48from blueberry.utils.bt_gatt_utils import setup_gatt_connection 49from blueberry.utils.bt_gatt_utils import setup_multiple_services 50from blueberry.utils.bt_gatt_utils import wait_for_gatt_disconnect_event 51from blueberry.utils.bt_test_utils import clear_bonded_devices 52from blueberry.tests.gd.cert.truth import assertThat 53from mobly import asserts 54from mobly import test_runner 55 56PHYSICAL_DISCONNECT_TIMEOUT = 5 57 58 59class GattConnectTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass): 60 adv_instances = [] 61 bluetooth_gatt_list = [] 62 gatt_server_list = [] 63 default_timeout = 10 64 default_discovery_timeout = 3 65 66 def setup_class(self): 67 super().setup_class() 68 self.central = self.dut 69 self.peripheral = self.cert 70 71 def setup_test(self): 72 super().setup_test() 73 bluetooth_gatt_list = [] 74 self.gatt_server_list = [] 75 self.adv_instances = [] 76 # Ensure there is ample time for a physical disconnect in between 77 # testcases. 78 logging.info("Waiting for {} seconds for physical GATT disconnections".format(PHYSICAL_DISCONNECT_TIMEOUT)) 79 time.sleep(PHYSICAL_DISCONNECT_TIMEOUT) 80 81 def teardown_test(self): 82 for bluetooth_gatt in self.bluetooth_gatt_list: 83 self.central.sl4a.gattClientClose(bluetooth_gatt) 84 for gatt_server in self.gatt_server_list: 85 self.peripheral.sl4a.gattServerClose(gatt_server) 86 for adv in self.adv_instances: 87 self.peripheral.sl4a.bleStopBleAdvertising(adv) 88 super().teardown_test() 89 return True 90 91 def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback): 92 logging.info("Disconnecting from peripheral device.") 93 try: 94 disconnect_gatt_connection(self.central, bluetooth_gatt, gatt_callback) 95 logging.info("Disconnected GATT, closing GATT client.") 96 close_gatt_client(self.central, bluetooth_gatt) 97 logging.info("Closed GATT client, removing it from local tracker.") 98 if bluetooth_gatt in self.bluetooth_gatt_list: 99 self.bluetooth_gatt_list.remove(bluetooth_gatt) 100 except GattTestUtilsError as err: 101 logging.error(err) 102 return False 103 return True 104 105 def _find_service_added_event(self, gatt_server_cb, uuid): 106 expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_cb) 107 try: 108 event = self.peripheral.ed.pop_event(expected_event, self.default_timeout) 109 except Empty: 110 logging.error(GattCallbackError.SERV_ADDED_ERR.format(expected_event)) 111 return False 112 if event['data']['serviceUuid'].lower() != uuid.lower(): 113 logging.error("Uuid mismatch. Found: {}, Expected {}.".format(event['data']['serviceUuid'], uuid)) 114 return False 115 return True 116 117 def _verify_mtu_changed_on_client_and_server(self, expected_mtu, gatt_callback, gatt_server_callback): 118 expected_event = GattCallbackString.MTU_CHANGED.format(gatt_callback) 119 try: 120 mtu_event = self.central.ed.pop_event(expected_event, self.default_timeout) 121 mtu_size_found = mtu_event['data']['MTU'] 122 if mtu_size_found != expected_mtu: 123 logging.error("MTU size found: {}, expected: {}".format(mtu_size_found, expected_mtu)) 124 return False 125 except Empty: 126 logging.error(GattCallbackError.MTU_CHANGED_ERR.format(expected_event)) 127 return False 128 129 expected_event = GattCallbackString.MTU_SERV_CHANGED.format(gatt_server_callback) 130 try: 131 mtu_event = self.peripheral.ed.pop_event(expected_event, self.default_timeout) 132 mtu_size_found = mtu_event['data']['MTU'] 133 if mtu_size_found != expected_mtu: 134 logging.error("MTU size found: {}, expected: {}".format(mtu_size_found, expected_mtu)) 135 return False 136 except Empty: 137 logging.error(GattCallbackError.MTU_SERV_CHANGED_ERR.format(expected_event)) 138 return False 139 return True 140 141 @test_tracker_info(uuid='8a3530a3-c8bb-466b-9710-99e694c38618') 142 def test_gatt_connect(self): 143 """Test GATT connection over LE. 144 145 Test establishing a gatt connection between a GATT server and GATT 146 client. 147 148 Steps: 149 1. Start a generic advertisement. 150 2. Start a generic scanner. 151 3. Find the advertisement and extract the mac address. 152 4. Stop the first scanner. 153 5. Create a GATT connection between the scanner and advertiser. 154 6. Disconnect the GATT connection. 155 156 Expected Result: 157 Verify that a connection was established and then disconnected 158 successfully. 159 160 Returns: 161 Pass if True 162 Fail if False 163 164 TAGS: LE, Advertising, Filtering, Scanning, GATT 165 Priority: 0 166 """ 167 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 168 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 169 self.gatt_server_list.append(gatt_server) 170 try: 171 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 172 self.bluetooth_gatt_list.append(bluetooth_gatt) 173 except GattTestUtilsError as err: 174 logging.error(err) 175 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 176 return 177 self.adv_instances.append(adv_callback) 178 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 179 180 @test_tracker_info(uuid='a839b505-03ac-4783-be7e-1d43129a1948') 181 def test_gatt_connect_stop_advertising(self): 182 """Test GATT connection over LE then stop advertising 183 184 A test case that verifies the GATT connection doesn't 185 disconnect when LE advertisement is stopped. 186 187 Steps: 188 1. Start a generic advertisement. 189 2. Start a generic scanner. 190 3. Find the advertisement and extract the mac address. 191 4. Stop the first scanner. 192 5. Create a GATT connection between the scanner and advertiser. 193 6. Stop the advertiser. 194 7. Verify no connection state changed happened. 195 8. Disconnect the GATT connection. 196 197 Expected Result: 198 Verify that a connection was established and not disconnected 199 when advertisement stops. 200 201 Returns: 202 Pass if True 203 Fail if False 204 205 TAGS: LE, Advertising, Filtering, Scanning, GATT 206 Priority: 0 207 """ 208 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 209 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 210 self.gatt_server_list.append(gatt_server) 211 try: 212 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 213 self.bluetooth_gatt_list.append(bluetooth_gatt) 214 except GattTestUtilsError as err: 215 logging.error(err) 216 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 217 return 218 self.peripheral.sl4a.bleStopBleAdvertising(adv_callback) 219 try: 220 event = self.central.ed.pop_event( 221 GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback, self.default_timeout)) 222 logging.error("Connection event found when not expected: {}".format(event)) 223 asserts.fail("Connection event found when not expected: {}".format(event)) 224 return 225 except Empty: 226 logging.info("No connection state change as expected") 227 try: 228 self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback) 229 except Exception as err: 230 logging.info("Failed to orchestrate disconnect: {}".format(err)) 231 asserts.fail("Failed to orchestrate disconnect: {}".format(err)) 232 return 233 234 @test_tracker_info(uuid='b82f91a8-54bb-4779-a117-73dc7fdb28cc') 235 def test_gatt_connect_autoconnect(self): 236 """Test GATT connection over LE. 237 238 Test re-establishing a gatt connection using autoconnect 239 set to True in order to test connection allowlist. 240 241 Steps: 242 1. Start a generic advertisement. 243 2. Start a generic scanner. 244 3. Find the advertisement and extract the mac address. 245 4. Stop the first scanner. 246 5. Create a GATT connection between the scanner and advertiser. 247 6. Disconnect the GATT connection. 248 7. Create a GATT connection with autoconnect set to True 249 8. Disconnect the GATT connection. 250 251 Expected Result: 252 Verify that a connection was re-established and then disconnected 253 successfully. 254 255 Returns: 256 Pass if True 257 Fail if False 258 259 TAGS: LE, Advertising, Filtering, Scanning, GATT 260 Priority: 0 261 """ 262 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 263 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 264 self.peripheral.log.info("Opened GATT server on CERT, scanning it from DUT") 265 self.gatt_server_list.append(gatt_server) 266 autoconnect = False 267 mac_address, adv_callback, scan_callback = (get_mac_address_of_generic_advertisement( 268 self.central, self.peripheral)) 269 self.adv_instances.append(adv_callback) 270 self.central.log.info("Discovered BLE advertisement, connecting GATT with autoConnect={}".format(autoconnect)) 271 try: 272 bluetooth_gatt, gatt_callback = setup_gatt_connection(self.central, mac_address, autoconnect) 273 self.central.log.info("GATT connected, stopping BLE scanning") 274 self.central.sl4a.bleStopBleScan(scan_callback) 275 self.central.log.info("Stopped BLE scanning") 276 self.bluetooth_gatt_list.append(bluetooth_gatt) 277 except GattTestUtilsError as err: 278 logging.error(err) 279 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 280 return 281 self.central.log.info("Disconnecting GATT") 282 try: 283 disconnect_gatt_connection(self.central, bluetooth_gatt, gatt_callback) 284 self.central.log.info("GATT disconnected, closing GATT client") 285 close_gatt_client(self.central, bluetooth_gatt) 286 self.central.log.info("GATT client closed, removing it from in-memory tracker") 287 if bluetooth_gatt in self.bluetooth_gatt_list: 288 self.bluetooth_gatt_list.remove(bluetooth_gatt) 289 except GattTestUtilsError as err: 290 logging.error(err) 291 asserts.fail("Failed to disconnect GATT, error: {}".format(err)) 292 return 293 autoconnect = True 294 self.central.log.info("Connecting GATT with autoConnect={}".format(autoconnect)) 295 bluetooth_gatt = self.central.sl4a.gattClientConnectGatt( 296 gatt_callback, mac_address, autoconnect, GattTransport.TRANSPORT_AUTO, False, GattPhyMask.PHY_LE_1M_MASK) 297 self.central.log.info("Waiting for GATt to become connected") 298 self.bluetooth_gatt_list.append(bluetooth_gatt) 299 expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback) 300 try: 301 event = self.central.ed.pop_event(expected_event, self.default_timeout) 302 self.central.log.info("Received event={}".format(event)) 303 except Empty: 304 logging.error(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event)) 305 asserts.fail(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event)) 306 return 307 found_state = event['data']['State'] 308 expected_state = GattConnectionState.STATE_CONNECTED 309 assertThat(found_state).isEqualTo(expected_state) 310 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 311 312 @test_tracker_info(uuid='e506fa50-7cd9-4bd8-938a-6b85dcfe6bc6') 313 def test_gatt_connect_opportunistic(self): 314 """Test opportunistic GATT connection over LE. 315 316 Test establishing a gatt connection between a GATT server and GATT 317 client in opportunistic mode. 318 319 Steps: 320 1. Start a generic advertisement. 321 2. Start a generic scanner. 322 3. Find the advertisement and extract the mac address. 323 4. Stop the first scanner. 324 5. Create GATT connection 1 between the scanner and advertiser normally 325 6. Create GATT connection 2 between the scanner and advertiser using 326 opportunistic mode 327 7. Disconnect GATT connection 1 328 329 Expected Result: 330 Verify GATT connection 2 automatically disconnects when GATT connection 331 1 disconnect 332 333 Returns: 334 Pass if True 335 Fail if False 336 337 TAGS: LE, Advertising, Filtering, Scanning, GATT 338 Priority: 0 339 """ 340 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 341 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 342 self.gatt_server_list.append(gatt_server) 343 mac_address, adv_callback, scan_callback = (get_mac_address_of_generic_advertisement( 344 self.central, self.peripheral)) 345 # Make GATT connection 1 346 try: 347 bluetooth_gatt_1, gatt_callback_1 = setup_gatt_connection( 348 self.central, mac_address, False, transport=GattTransport.TRANSPORT_AUTO, opportunistic=False) 349 self.central.sl4a.bleStopBleScan(scan_callback) 350 self.adv_instances.append(adv_callback) 351 self.bluetooth_gatt_list.append(bluetooth_gatt_1) 352 except GattTestUtilsError as err: 353 logging.error(err) 354 asserts.fail("Failed to connect to GATT 1, error: {}".format(err)) 355 return 356 # Make GATT connection 2 357 try: 358 bluetooth_gatt_2, gatt_callback_2 = setup_gatt_connection( 359 self.central, mac_address, False, transport=GattTransport.TRANSPORT_AUTO, opportunistic=True) 360 self.bluetooth_gatt_list.append(bluetooth_gatt_2) 361 except GattTestUtilsError as err: 362 logging.error(err) 363 asserts.fail("Failed to connect to GATT 2, error: {}".format(err)) 364 return 365 # Disconnect GATT connection 1 366 try: 367 disconnect_gatt_connection(self.central, bluetooth_gatt_1, gatt_callback_1) 368 close_gatt_client(self.central, bluetooth_gatt_1) 369 if bluetooth_gatt_1 in self.bluetooth_gatt_list: 370 self.bluetooth_gatt_list.remove(bluetooth_gatt_1) 371 except GattTestUtilsError as err: 372 logging.error(err) 373 asserts.fail("Failed to disconnect GATT 1, error: {}".format(err)) 374 return 375 # Confirm that GATT connection 2 also disconnects 376 wait_for_gatt_disconnect_event(self.central, gatt_callback_2) 377 close_gatt_client(self.central, bluetooth_gatt_2) 378 if bluetooth_gatt_2 in self.bluetooth_gatt_list: 379 self.bluetooth_gatt_list.remove(bluetooth_gatt_2) 380 381 @test_tracker_info(uuid='1e01838e-c4de-4720-9adf-9e0419378226') 382 def test_gatt_request_min_mtu(self): 383 """Test GATT connection over LE and exercise MTU sizes. 384 385 Test establishing a gatt connection between a GATT server and GATT 386 client. Request an MTU size that matches the correct minimum size. 387 388 Steps: 389 1. Start a generic advertisement. 390 2. Start a generic scanner. 391 3. Find the advertisement and extract the mac address. 392 4. Stop the first scanner. 393 5. Create a GATT connection between the scanner and advertiser. 394 6. From the scanner (client) request MTU size change to the 395 minimum value. 396 7. Find the MTU changed event on the client. 397 8. Disconnect the GATT connection. 398 399 Expected Result: 400 Verify that a connection was established and the MTU value found 401 matches the expected MTU value. 402 403 Returns: 404 Pass if True 405 Fail if False 406 407 TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU 408 Priority: 0 409 """ 410 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 411 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 412 self.gatt_server_list.append(gatt_server) 413 try: 414 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 415 self.bluetooth_gatt_list.append(bluetooth_gatt) 416 except GattTestUtilsError as err: 417 logging.error(err) 418 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 419 return 420 self.adv_instances.append(adv_callback) 421 expected_mtu = GattMtuSize.MIN 422 self.central.sl4a.gattClientRequestMtu(bluetooth_gatt, expected_mtu) 423 assertThat(self._verify_mtu_changed_on_client_and_server(expected_mtu, gatt_callback, gatt_server_cb)).isTrue() 424 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 425 426 @test_tracker_info(uuid='c1fa3a2d-fb47-47db-bdd1-458928cd6a5f') 427 def test_gatt_request_max_mtu(self): 428 """Test GATT connection over LE and exercise MTU sizes. 429 430 Test establishing a gatt connection between a GATT server and GATT 431 client. Request an MTU size that matches the correct maximum size. 432 433 Steps: 434 1. Start a generic advertisement. 435 2. Start a generic scanner. 436 3. Find the advertisement and extract the mac address. 437 4. Stop the first scanner. 438 5. Create a GATT connection between the scanner and advertiser. 439 6. From the scanner (client) request MTU size change to the 440 maximum value. 441 7. Find the MTU changed event on the client. 442 8. Disconnect the GATT connection. 443 444 Expected Result: 445 Verify that a connection was established and the MTU value found 446 matches the expected MTU value. 447 448 Returns: 449 Pass if True 450 Fail if False 451 452 TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU 453 Priority: 0 454 """ 455 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 456 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 457 self.gatt_server_list.append(gatt_server) 458 try: 459 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 460 self.bluetooth_gatt_list.append(bluetooth_gatt) 461 except GattTestUtilsError as err: 462 logging.error(err) 463 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 464 return 465 self.adv_instances.append(adv_callback) 466 expected_mtu = GattMtuSize.MAX 467 self.central.sl4a.gattClientRequestMtu(bluetooth_gatt, expected_mtu) 468 assertThat(self._verify_mtu_changed_on_client_and_server(expected_mtu, gatt_callback, gatt_server_cb)).isTrue() 469 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 470 471 @test_tracker_info(uuid='4416d483-dec3-46cb-8038-4d82620f873a') 472 def test_gatt_request_out_of_bounds_mtu(self): 473 """Test GATT connection over LE and exercise an out of bound MTU size. 474 475 Test establishing a gatt connection between a GATT server and GATT 476 client. Request an MTU size that is the MIN value minus 1. 477 478 Steps: 479 1. Start a generic advertisement. 480 2. Start a generic scanner. 481 3. Find the advertisement and extract the mac address. 482 4. Stop the first scanner. 483 5. Create a GATT connection between the scanner and advertiser. 484 6. From the scanner (client) request MTU size change to the 485 minimum value minus one. 486 7. Find the MTU changed event on the client. 487 8. Disconnect the GATT connection. 488 489 Expected Result: 490 Verify that an MTU changed event was not discovered and that 491 it didn't cause an exception when requesting an out of bounds 492 MTU. 493 494 Returns: 495 Pass if True 496 Fail if False 497 498 TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU 499 Priority: 0 500 """ 501 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 502 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 503 self.gatt_server_list.append(gatt_server) 504 try: 505 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 506 self.bluetooth_gatt_list.append(bluetooth_gatt) 507 except GattTestUtilsError as err: 508 logging.error(err) 509 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 510 return 511 self.adv_instances.append(adv_callback) 512 unexpected_mtu = GattMtuSize.MIN - 1 513 self.central.sl4a.gattClientRequestMtu(bluetooth_gatt, unexpected_mtu) 514 assertThat(self._verify_mtu_changed_on_client_and_server(unexpected_mtu, gatt_callback, 515 gatt_server_cb)).isFalse() 516 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 517 518 @test_tracker_info(uuid='31ffb9ca-cc75-43fb-9802-c19f1c5856b6') 519 def test_gatt_connect_trigger_on_read_rssi(self): 520 """Test GATT connection over LE read RSSI. 521 522 Test establishing a gatt connection between a GATT server and GATT 523 client then read the RSSI. 524 525 Steps: 526 1. Start a generic advertisement. 527 2. Start a generic scanner. 528 3. Find the advertisement and extract the mac address. 529 4. Stop the first scanner. 530 5. Create a GATT connection between the scanner and advertiser. 531 6. From the scanner, request to read the RSSI of the advertiser. 532 7. Disconnect the GATT connection. 533 534 Expected Result: 535 Verify that a connection was established and then disconnected 536 successfully. Verify that the RSSI was ready correctly. 537 538 Returns: 539 Pass if True 540 Fail if False 541 542 TAGS: LE, Advertising, Filtering, Scanning, GATT, RSSI 543 Priority: 1 544 """ 545 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 546 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 547 self.gatt_server_list.append(gatt_server) 548 try: 549 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 550 self.bluetooth_gatt_list.append(bluetooth_gatt) 551 except GattTestUtilsError as err: 552 logging.error(err) 553 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 554 return 555 self.adv_instances.append(adv_callback) 556 expected_event = GattCallbackString.RD_REMOTE_RSSI.format(gatt_callback) 557 if self.central.sl4a.gattClientReadRSSI(bluetooth_gatt): 558 try: 559 self.central.ed.pop_event(expected_event, self.default_timeout) 560 except Empty: 561 logging.error(GattCallbackError.RD_REMOTE_RSSI_ERR.format(expected_event)) 562 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 563 564 @test_tracker_info(uuid='dee9ef28-b872-428a-821b-cc62f27ba936') 565 def test_gatt_connect_trigger_on_services_discovered(self): 566 """Test GATT connection and discover services of peripheral. 567 568 Test establishing a gatt connection between a GATT server and GATT 569 client the discover all services from the connected device. 570 571 Steps: 572 1. Start a generic advertisement. 573 2. Start a generic scanner. 574 3. Find the advertisement and extract the mac address. 575 4. Stop the first scanner. 576 5. Create a GATT connection between the scanner and advertiser. 577 6. From the scanner (central device), discover services. 578 7. Disconnect the GATT connection. 579 580 Expected Result: 581 Verify that a connection was established and then disconnected 582 successfully. Verify that the service were discovered. 583 584 Returns: 585 Pass if True 586 Fail if False 587 588 TAGS: LE, Advertising, Filtering, Scanning, GATT, Services 589 Priority: 1 590 """ 591 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 592 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 593 self.gatt_server_list.append(gatt_server) 594 try: 595 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 596 self.bluetooth_gatt_list.append(bluetooth_gatt) 597 except GattTestUtilsError as err: 598 logging.error(err) 599 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 600 return 601 self.adv_instances.append(adv_callback) 602 if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt): 603 expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback) 604 try: 605 event = self.central.ed.pop_event(expected_event, self.default_timeout) 606 except Empty: 607 logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 608 asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 609 return 610 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 611 612 @test_tracker_info(uuid='01883bdd-0cf8-48fb-bf15-467bbd4f065b') 613 def test_gatt_connect_trigger_on_services_discovered_iterate_attributes(self): 614 """Test GATT connection and iterate peripherals attributes. 615 616 Test establishing a gatt connection between a GATT server and GATT 617 client and iterate over all the characteristics and descriptors of the 618 discovered services. 619 620 Steps: 621 1. Start a generic advertisement. 622 2. Start a generic scanner. 623 3. Find the advertisement and extract the mac address. 624 4. Stop the first scanner. 625 5. Create a GATT connection between the scanner and advertiser. 626 6. From the scanner (central device), discover services. 627 7. Iterate over all the characteristics and descriptors of the 628 discovered features. 629 8. Disconnect the GATT connection. 630 631 Expected Result: 632 Verify that a connection was established and then disconnected 633 successfully. Verify that the services, characteristics, and descriptors 634 were discovered. 635 636 Returns: 637 Pass if True 638 Fail if False 639 640 TAGS: LE, Advertising, Filtering, Scanning, GATT, Services 641 Characteristics, Descriptors 642 Priority: 1 643 """ 644 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 645 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 646 self.gatt_server_list.append(gatt_server) 647 try: 648 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 649 self.bluetooth_gatt_list.append(bluetooth_gatt) 650 except GattTestUtilsError as err: 651 logging.error(err) 652 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 653 return 654 self.adv_instances.append(adv_callback) 655 if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt): 656 expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback) 657 try: 658 event = self.central.ed.pop_event(expected_event, self.default_timeout) 659 discovered_services_index = event['data']['ServicesIndex'] 660 except Empty: 661 logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 662 asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 663 return 664 log_gatt_server_uuids(self.central, discovered_services_index) 665 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 666 667 @test_tracker_info(uuid='d4277bee-da99-4f48-8a4d-f81b5389da18') 668 def test_gatt_connect_with_service_uuid_variations(self): 669 """Test GATT connection with multiple service uuids. 670 671 Test establishing a gatt connection between a GATT server and GATT 672 client with multiple service uuid variations. 673 674 Steps: 675 1. Start a generic advertisement. 676 2. Start a generic scanner. 677 3. Find the advertisement and extract the mac address. 678 4. Stop the first scanner. 679 5. Create a GATT connection between the scanner and advertiser. 680 6. From the scanner (central device), discover services. 681 7. Verify that all the service uuid variations are found. 682 8. Disconnect the GATT connection. 683 684 Expected Result: 685 Verify that a connection was established and then disconnected 686 successfully. Verify that the service uuid variations are found. 687 688 Returns: 689 Pass if True 690 Fail if False 691 692 TAGS: LE, Advertising, Filtering, Scanning, GATT, Services 693 Priority: 2 694 """ 695 try: 696 gatt_server_cb, gatt_server = setup_multiple_services(self.peripheral) 697 self.gatt_server_list.append(gatt_server) 698 except GattTestUtilsError as err: 699 logging.error(err) 700 asserts.fail("Failed to setup GATT service, error: {}".format(err)) 701 return 702 try: 703 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 704 self.bluetooth_gatt_list.append(bluetooth_gatt) 705 except GattTestUtilsError as err: 706 logging.error(err) 707 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 708 return 709 self.adv_instances.append(adv_callback) 710 if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt): 711 expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback) 712 try: 713 event = self.central.ed.pop_event(expected_event, self.default_timeout) 714 except Empty: 715 logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 716 asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 717 return 718 discovered_services_index = event['data']['ServicesIndex'] 719 log_gatt_server_uuids(self.central, discovered_services_index) 720 721 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 722 723 @test_tracker_info(uuid='7d3442c5-f71f-44ae-bd35-f2569f01b3b8') 724 def test_gatt_connect_in_quick_succession(self): 725 """Test GATT connections multiple times. 726 727 Test establishing a gatt connection between a GATT server and GATT 728 client with multiple iterations. 729 730 Steps: 731 1. Start a generic advertisement. 732 2. Start a generic scanner. 733 3. Find the advertisement and extract the mac address. 734 4. Stop the first scanner. 735 5. Create a GATT connection between the scanner and advertiser. 736 6. Disconnect the GATT connection. 737 7. Repeat steps 5 and 6 twenty times. 738 739 Expected Result: 740 Verify that a connection was established and then disconnected 741 successfully twenty times. 742 743 Returns: 744 Pass if True 745 Fail if False 746 747 TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress 748 Priority: 1 749 """ 750 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 751 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 752 self.gatt_server_list.append(gatt_server) 753 mac_address, adv_callback, scan_callback = get_mac_address_of_generic_advertisement( 754 self.central, self.peripheral) 755 autoconnect = False 756 for i in range(100): 757 logging.info("Starting connection iteration {}".format(i + 1)) 758 try: 759 bluetooth_gatt, gatt_callback = setup_gatt_connection(self.central, mac_address, autoconnect) 760 self.central.sl4a.bleStopBleScan(scan_callback) 761 except GattTestUtilsError as err: 762 logging.error(err) 763 asserts.fail("Failed to connect to GATT at iteration {}, error: {}".format(i + 1, err)) 764 return 765 test_result = self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback) 766 if not test_result: 767 logging.info("Failed to disconnect from peripheral device.") 768 asserts.fail("Failed to disconnect from peripheral device.") 769 return 770 self.adv_instances.append(adv_callback) 771 772 @test_tracker_info(uuid='148469d9-7ab0-4c08-b2e9-7e49e88da1fc') 773 def test_gatt_connect_on_path_attack(self): 774 """Test GATT connection with permission write encrypted with on-path attacker prevention 775 776 Test establishing a gatt connection between a GATT server and GATT 777 client while the GATT server's characteristic includes the property 778 write value and the permission write encrypted on-path attacker prevention 779 value. This will prompt LE pairing and then the devices will create a bond. 780 781 Steps: 782 1. Create a GATT server and server callback on the peripheral device. 783 2. Create a unique service and characteristic uuid on the peripheral. 784 3. Create a characteristic on the peripheral with these properties: 785 GattCharacteristic.PROPERTY_WRITE, 786 GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM 787 4. Create a GATT service on the peripheral. 788 5. Add the characteristic to the GATT service. 789 6. Create a GATT connection between your central and peripheral device. 790 7. From the central device, discover the peripheral's services. 791 8. Iterate the services found until you find the unique characteristic 792 created in step 3. 793 9. Once found, write a random but valid value to the characteristic. 794 10. Start pairing helpers on both devices immediately after attempting 795 to write to the characteristic. 796 11. Within 10 seconds of writing the characteristic, there should be 797 a prompt to bond the device from the peripheral. The helpers will 798 handle the UI interaction automatically. (see 799 BluetoothConnectionFacade.java bluetoothStartPairingHelper). 800 12. Verify that the two devices are bonded. 801 802 Expected Result: 803 Verify that a connection was established and the devices are bonded. 804 805 Returns: 806 Pass if True 807 Fail if False 808 809 TAGS: LE, Advertising, Filtering, Scanning, GATT, Characteristic, OnPathAttacker 810 Priority: 1 811 """ 812 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 813 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 814 self.gatt_server_list.append(gatt_server) 815 service_uuid = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B" 816 test_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630" 817 bonded = False 818 characteristic = self.peripheral.sl4a.gattServerCreateBluetoothGattCharacteristic( 819 test_uuid, GattCharacteristic.PROPERTY_WRITE, GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM) 820 gatt_service = self.peripheral.sl4a.gattServerCreateService(service_uuid, GattServiceType.SERVICE_TYPE_PRIMARY) 821 self.peripheral.sl4a.gattServerAddCharacteristicToService(gatt_service, characteristic) 822 self.peripheral.sl4a.gattServerAddService(gatt_server, gatt_service) 823 assertThat(self._find_service_added_event(gatt_server_cb, service_uuid)).isTrue() 824 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 825 self.bluetooth_gatt_list.append(bluetooth_gatt) 826 self.adv_instances.append(adv_callback) 827 if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt): 828 expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback) 829 try: 830 event = self.central.ed.pop_event(expected_event, self.default_timeout) 831 except Empty: 832 logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 833 asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event)) 834 return 835 discovered_services_index = event['data']['ServicesIndex'] 836 else: 837 logging.info("Failed to discover services.") 838 asserts.fail("Failed to discover services.") 839 return 840 test_value = [1, 2, 3, 4, 5, 6, 7] 841 services_count = self.central.sl4a.gattClientGetDiscoveredServicesCount(discovered_services_index) 842 for i in range(services_count): 843 characteristic_uuids = (self.central.sl4a.gattClientGetDiscoveredCharacteristicUuids( 844 discovered_services_index, i)) 845 for characteristic_uuid in characteristic_uuids: 846 if characteristic_uuid == test_uuid: 847 self.central.sl4a.bluetoothStartPairingHelper() 848 self.peripheral.sl4a.bluetoothStartPairingHelper() 849 self.central.sl4a.gattClientCharacteristicSetValue(bluetooth_gatt, discovered_services_index, i, 850 characteristic_uuid, test_value) 851 self.central.sl4a.gattClientWriteCharacteristic(bluetooth_gatt, discovered_services_index, i, 852 characteristic_uuid) 853 start_time = time.time() + self.default_timeout 854 target_name = self.peripheral.sl4a.bluetoothGetLocalName() 855 while time.time() < start_time and bonded == False: 856 bonded_devices = \ 857 self.central.sl4a.bluetoothGetBondedDevices() 858 for device in bonded_devices: 859 if ('name' in device.keys() and device['name'] == target_name): 860 bonded = True 861 break 862 bonded = False 863 target_name = self.central.sl4a.bluetoothGetLocalName() 864 while time.time() < start_time and bonded == False: 865 bonded_devices = \ 866 self.peripheral.sl4a.bluetoothGetBondedDevices() 867 for device in bonded_devices: 868 if ('name' in device.keys() and device['name'] == target_name): 869 bonded = True 870 break 871 872 # Dual mode devices will establish connection over the classic transport, 873 # in order to establish bond over both transports, and do SDP. Starting 874 # disconnection before all this is finished is not safe, might lead to 875 # race conditions, i.e. bond over classic tranport shows up after LE 876 # bond is already removed. 877 time.sleep(4) 878 879 for ad in [self.central, self.peripheral]: 880 assertThat(clear_bonded_devices(ad)).isTrue() 881 882 # Necessary sleep time for entries to update unbonded state 883 time.sleep(2) 884 885 for ad in [self.central, self.peripheral]: 886 bonded_devices = ad.sl4a.bluetoothGetBondedDevices() 887 if len(bonded_devices) > 0: 888 logging.error("Failed to unbond devices: {}".format(bonded_devices)) 889 asserts.fail("Failed to unbond devices: {}".format(bonded_devices)) 890 return 891 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 892 893 @test_tracker_info(uuid='cc3fc361-7bf1-4ee2-9e46-4a27c88ce6a8') 894 def test_gatt_connect_get_connected_devices(self): 895 """Test GATT connections show up in getConnectedDevices 896 897 Test establishing a gatt connection between a GATT server and GATT 898 client. Verify that active connections show up using 899 BluetoothManager.getConnectedDevices API. 900 901 Steps: 902 1. Start a generic advertisement. 903 2. Start a generic scanner. 904 3. Find the advertisement and extract the mac address. 905 4. Stop the first scanner. 906 5. Create a GATT connection between the scanner and advertiser. 907 7. Verify the GATT Client has an open connection to the GATT Server. 908 8. Verify the GATT Server has an open connection to the GATT Client. 909 9. Disconnect the GATT connection. 910 911 Expected Result: 912 Verify that a connection was established, connected devices are found 913 on both the central and peripheral devices, and then disconnected 914 successfully. 915 916 Returns: 917 Pass if True 918 Fail if False 919 920 TAGS: LE, Advertising, Scanning, GATT 921 Priority: 2 922 """ 923 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 924 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 925 self.gatt_server_list.append(gatt_server) 926 try: 927 bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral)) 928 self.bluetooth_gatt_list.append(bluetooth_gatt) 929 except GattTestUtilsError as err: 930 logging.error(err) 931 asserts.fail("Failed to connect to GATT, error: {}".format(err)) 932 return 933 conn_cen_devices = self.central.sl4a.bluetoothGetConnectedLeDevices(BluetoothProfile.GATT) 934 conn_per_devices = self.peripheral.sl4a.bluetoothGetConnectedLeDevices(BluetoothProfile.GATT_SERVER) 935 target_name = self.peripheral.sl4a.bluetoothGetLocalName() 936 error_message = ("Connected device {} not found in list of connected " "devices {}") 937 if not any(d['name'] == target_name for d in conn_cen_devices): 938 logging.error(error_message.format(target_name, conn_cen_devices)) 939 asserts.fail(error_message.format(target_name, conn_cen_devices)) 940 return 941 # For the GATT server only check the size of the list since 942 # it may or may not include the device name. 943 target_name = self.central.sl4a.bluetoothGetLocalName() 944 if not conn_per_devices: 945 logging.error(error_message.format(target_name, conn_per_devices)) 946 asserts.fail(error_message.format(target_name, conn_per_devices)) 947 return 948 self.adv_instances.append(adv_callback) 949 assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue() 950 951 @test_tracker_info(uuid='a0a37ca6-9fa8-4d35-9fdb-0e25b4b8a363') 952 def test_gatt_connect_second_adv_after_canceling_first_adv(self): 953 """Test GATT connection to peripherals second advertising address. 954 955 Test the ability of cancelling GATT connections and trying to reconnect 956 to the same device via a different address. 957 958 Steps: 959 1. A starts advertising 960 2. B starts scanning and finds A's mac address 961 3. Stop advertisement from step 1. Start a new advertisement on A and 962 find the new new mac address, B knows of both old and new address. 963 4. B1 sends connect request to old address of A 964 5. B1 cancel connect attempt after 10 seconds 965 6. B1 sends connect request to new address of A 966 7. Verify B1 establish connection to A in less than 10 seconds 967 968 Expected Result: 969 Verify that a connection was established only on the second 970 advertisement's mac address. 971 972 Returns: 973 Pass if True 974 Fail if False 975 976 TAGS: LE, Advertising, Scanning, GATT 977 Priority: 3 978 """ 979 autoconnect = False 980 transport = GattTransport.TRANSPORT_AUTO 981 opportunistic = False 982 # Setup a basic Gatt server on the peripheral 983 gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback() 984 gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb) 985 986 # Set advertisement settings to include local name in advertisement 987 # and set the advertising mode to low_latency. 988 self.peripheral.sl4a.bleSetAdvertiseSettingsIsConnectable(True) 989 self.peripheral.sl4a.bleSetAdvertiseDataIncludeDeviceName(True) 990 self.peripheral.sl4a.bleSetAdvertiseSettingsAdvertiseMode(BleAdvertiseSettingsMode.LOW_LATENCY) 991 992 # Setup necessary advertisement objects. 993 advertise_data = self.peripheral.sl4a.bleBuildAdvertiseData() 994 advertise_settings = self.peripheral.sl4a.bleBuildAdvertiseSettings() 995 advertise_callback = self.peripheral.sl4a.bleGenBleAdvertiseCallback() 996 997 # Step 1: Start advertisement 998 self.peripheral.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) 999 1000 # Setup scan settings for low_latency scanning and to include the local name 1001 # of the advertisement started in step 1. 1002 filter_list = self.central.sl4a.bleGenFilterList() 1003 self.central.sl4a.bleSetScanSettingsNumOfMatches(BleScanSettingsMatchNums.ONE) 1004 self.central.sl4a.bleSetScanFilterDeviceName(self.peripheral.sl4a.bluetoothGetLocalName()) 1005 self.central.sl4a.bleBuildScanFilter(filter_list) 1006 self.central.sl4a.bleSetScanSettingsScanMode(BleScanSettingsModes.LOW_LATENCY) 1007 1008 # Setup necessary scan objects. 1009 scan_settings = self.central.sl4a.bleBuildScanSetting() 1010 scan_callback = self.central.sl4a.bleGenScanCallback() 1011 1012 # Step 2: Start scanning on central Android device and find peripheral 1013 # address. 1014 self.central.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback) 1015 expected_event_name = scan_result.format(scan_callback) 1016 try: 1017 mac_address_pre_restart = self.central.ed.pop_event( 1018 expected_event_name, self.default_timeout)['data']['Result']['deviceInfo']['address'] 1019 logging.info("Peripheral advertisement found with mac address: {}".format(mac_address_pre_restart)) 1020 except Empty: 1021 logging.info("Peripheral advertisement not found") 1022 asserts.fail("Peripheral advertisement not found") 1023 return 1024 finally: 1025 self.peripheral.sl4a.bleStopBleAdvertising(advertise_callback) 1026 1027 # Step 3: Restart peripheral advertising such that a new mac address is 1028 # created. 1029 self.peripheral.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) 1030 1031 mac_address_post_restart = mac_address_pre_restart 1032 1033 while True: 1034 try: 1035 mac_address_post_restart = self.central.ed.pop_event( 1036 expected_event_name, self.default_timeout)['data']['Result']['deviceInfo']['address'] 1037 logging.info("Peripheral advertisement found with mac address: {}".format(mac_address_post_restart)) 1038 except Empty: 1039 logging.info("Peripheral advertisement not found") 1040 asserts.fail("Peripheral advertisement not found") 1041 return 1042 1043 if mac_address_pre_restart != mac_address_post_restart: 1044 break 1045 1046 1047if __name__ == '__main__': 1048 test_runner.main() 1049