• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17This test script exercises different GATT connection tests.
18
19Original location:
20  tools/test/connectivity/acts_tests/tests/google/ble/gatt/GattConnectTest.py
21"""
22
23import logging
24import time
25from queue import Empty
26
27from blueberry.tests.gd.cert.test_decorators import test_tracker_info
28from blueberry.tests.gd_sl4a.lib.bt_constants import scan_result
29from blueberry.tests.sl4a_sl4a.lib import sl4a_sl4a_base_test
30from blueberry.utils.ble_scan_adv_constants import BleAdvertiseSettingsMode
31from blueberry.utils.ble_scan_adv_constants import BleScanSettingsMatchNums
32from blueberry.utils.ble_scan_adv_constants import BleScanSettingsModes
33from blueberry.utils.bt_constants import BluetoothProfile
34from blueberry.utils.bt_gatt_constants import GattCallbackError
35from blueberry.utils.bt_gatt_constants import GattCallbackString
36from blueberry.utils.bt_gatt_constants import GattCharacteristic
37from blueberry.utils.bt_gatt_constants import GattConnectionState
38from blueberry.utils.bt_gatt_constants import GattMtuSize
39from blueberry.utils.bt_gatt_constants import GattPhyMask
40from blueberry.utils.bt_gatt_constants import GattServiceType
41from blueberry.utils.bt_gatt_constants import GattTransport
42from blueberry.utils.bt_gatt_utils import GattTestUtilsError
43from blueberry.utils.bt_gatt_utils import close_gatt_client
44from blueberry.utils.bt_gatt_utils import disconnect_gatt_connection
45from blueberry.utils.bt_gatt_utils import get_mac_address_of_generic_advertisement
46from blueberry.utils.bt_gatt_utils import log_gatt_server_uuids
47from blueberry.utils.bt_gatt_utils import orchestrate_gatt_connection
48from blueberry.utils.bt_gatt_utils import setup_gatt_connection
49from blueberry.utils.bt_gatt_utils import setup_multiple_services
50from blueberry.utils.bt_gatt_utils import wait_for_gatt_disconnect_event
51from blueberry.utils.bt_test_utils import clear_bonded_devices
52from blueberry.tests.gd.cert.truth import assertThat
53from mobly import asserts
54from mobly import test_runner
55
56PHYSICAL_DISCONNECT_TIMEOUT = 5
57
58
59class GattConnectTest(sl4a_sl4a_base_test.Sl4aSl4aBaseTestClass):
60    adv_instances = []
61    bluetooth_gatt_list = []
62    gatt_server_list = []
63    default_timeout = 10
64    default_discovery_timeout = 3
65
66    def setup_class(self):
67        super().setup_class()
68        self.central = self.dut
69        self.peripheral = self.cert
70
71    def setup_test(self):
72        super().setup_test()
73        bluetooth_gatt_list = []
74        self.gatt_server_list = []
75        self.adv_instances = []
76        # Ensure there is ample time for a physical disconnect in between
77        # testcases.
78        logging.info("Waiting for {} seconds for physical GATT disconnections".format(PHYSICAL_DISCONNECT_TIMEOUT))
79        time.sleep(PHYSICAL_DISCONNECT_TIMEOUT)
80
81    def teardown_test(self):
82        for bluetooth_gatt in self.bluetooth_gatt_list:
83            self.central.sl4a.gattClientClose(bluetooth_gatt)
84        for gatt_server in self.gatt_server_list:
85            self.peripheral.sl4a.gattServerClose(gatt_server)
86        for adv in self.adv_instances:
87            self.peripheral.sl4a.bleStopBleAdvertising(adv)
88        super().teardown_test()
89        return True
90
91    def _orchestrate_gatt_disconnection(self, bluetooth_gatt, gatt_callback):
92        logging.info("Disconnecting from peripheral device.")
93        try:
94            disconnect_gatt_connection(self.central, bluetooth_gatt, gatt_callback)
95            logging.info("Disconnected GATT, closing GATT client.")
96            close_gatt_client(self.central, bluetooth_gatt)
97            logging.info("Closed GATT client, removing it from local tracker.")
98            if bluetooth_gatt in self.bluetooth_gatt_list:
99                self.bluetooth_gatt_list.remove(bluetooth_gatt)
100        except GattTestUtilsError as err:
101            logging.error(err)
102            return False
103        return True
104
105    def _find_service_added_event(self, gatt_server_cb, uuid):
106        expected_event = GattCallbackString.SERV_ADDED.format(gatt_server_cb)
107        try:
108            event = self.peripheral.ed.pop_event(expected_event, self.default_timeout)
109        except Empty:
110            logging.error(GattCallbackError.SERV_ADDED_ERR.format(expected_event))
111            return False
112        if event['data']['serviceUuid'].lower() != uuid.lower():
113            logging.error("Uuid mismatch. Found: {}, Expected {}.".format(event['data']['serviceUuid'], uuid))
114            return False
115        return True
116
117    def _verify_mtu_changed_on_client_and_server(self, expected_mtu, gatt_callback, gatt_server_callback):
118        expected_event = GattCallbackString.MTU_CHANGED.format(gatt_callback)
119        try:
120            mtu_event = self.central.ed.pop_event(expected_event, self.default_timeout)
121            mtu_size_found = mtu_event['data']['MTU']
122            if mtu_size_found != expected_mtu:
123                logging.error("MTU size found: {}, expected: {}".format(mtu_size_found, expected_mtu))
124                return False
125        except Empty:
126            logging.error(GattCallbackError.MTU_CHANGED_ERR.format(expected_event))
127            return False
128
129        expected_event = GattCallbackString.MTU_SERV_CHANGED.format(gatt_server_callback)
130        try:
131            mtu_event = self.peripheral.ed.pop_event(expected_event, self.default_timeout)
132            mtu_size_found = mtu_event['data']['MTU']
133            if mtu_size_found != expected_mtu:
134                logging.error("MTU size found: {}, expected: {}".format(mtu_size_found, expected_mtu))
135                return False
136        except Empty:
137            logging.error(GattCallbackError.MTU_SERV_CHANGED_ERR.format(expected_event))
138            return False
139        return True
140
141    @test_tracker_info(uuid='8a3530a3-c8bb-466b-9710-99e694c38618')
142    def test_gatt_connect(self):
143        """Test GATT connection over LE.
144
145          Test establishing a gatt connection between a GATT server and GATT
146          client.
147
148          Steps:
149            1. Start a generic advertisement.
150            2. Start a generic scanner.
151            3. Find the advertisement and extract the mac address.
152            4. Stop the first scanner.
153            5. Create a GATT connection between the scanner and advertiser.
154            6. Disconnect the GATT connection.
155
156          Expected Result:
157            Verify that a connection was established and then disconnected
158            successfully.
159
160          Returns:
161            Pass if True
162            Fail if False
163
164          TAGS: LE, Advertising, Filtering, Scanning, GATT
165          Priority: 0
166          """
167        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
168        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
169        self.gatt_server_list.append(gatt_server)
170        try:
171            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
172            self.bluetooth_gatt_list.append(bluetooth_gatt)
173        except GattTestUtilsError as err:
174            logging.error(err)
175            asserts.fail("Failed to connect to GATT, error: {}".format(err))
176            return
177        self.adv_instances.append(adv_callback)
178        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
179
180    @test_tracker_info(uuid='a839b505-03ac-4783-be7e-1d43129a1948')
181    def test_gatt_connect_stop_advertising(self):
182        """Test GATT connection over LE then stop advertising
183
184          A test case that verifies the GATT connection doesn't
185          disconnect when LE advertisement is stopped.
186
187          Steps:
188            1. Start a generic advertisement.
189            2. Start a generic scanner.
190            3. Find the advertisement and extract the mac address.
191            4. Stop the first scanner.
192            5. Create a GATT connection between the scanner and advertiser.
193            6. Stop the advertiser.
194            7. Verify no connection state changed happened.
195            8. Disconnect the GATT connection.
196
197          Expected Result:
198            Verify that a connection was established and not disconnected
199            when advertisement stops.
200
201          Returns:
202            Pass if True
203            Fail if False
204
205          TAGS: LE, Advertising, Filtering, Scanning, GATT
206          Priority: 0
207          """
208        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
209        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
210        self.gatt_server_list.append(gatt_server)
211        try:
212            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
213            self.bluetooth_gatt_list.append(bluetooth_gatt)
214        except GattTestUtilsError as err:
215            logging.error(err)
216            asserts.fail("Failed to connect to GATT, error: {}".format(err))
217            return
218        self.peripheral.sl4a.bleStopBleAdvertising(adv_callback)
219        try:
220            event = self.central.ed.pop_event(
221                GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback, self.default_timeout))
222            logging.error("Connection event found when not expected: {}".format(event))
223            asserts.fail("Connection event found when not expected: {}".format(event))
224            return
225        except Empty:
226            logging.info("No connection state change as expected")
227        try:
228            self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)
229        except Exception as err:
230            logging.info("Failed to orchestrate disconnect: {}".format(err))
231            asserts.fail("Failed to orchestrate disconnect: {}".format(err))
232            return
233
234    @test_tracker_info(uuid='b82f91a8-54bb-4779-a117-73dc7fdb28cc')
235    def test_gatt_connect_autoconnect(self):
236        """Test GATT connection over LE.
237
238          Test re-establishing a gatt connection using autoconnect
239          set to True in order to test connection allowlist.
240
241          Steps:
242            1. Start a generic advertisement.
243            2. Start a generic scanner.
244            3. Find the advertisement and extract the mac address.
245            4. Stop the first scanner.
246            5. Create a GATT connection between the scanner and advertiser.
247            6. Disconnect the GATT connection.
248            7. Create a GATT connection with autoconnect set to True
249            8. Disconnect the GATT connection.
250
251          Expected Result:
252            Verify that a connection was re-established and then disconnected
253            successfully.
254
255          Returns:
256            Pass if True
257            Fail if False
258
259          TAGS: LE, Advertising, Filtering, Scanning, GATT
260          Priority: 0
261          """
262        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
263        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
264        self.peripheral.log.info("Opened GATT server on CERT, scanning it from DUT")
265        self.gatt_server_list.append(gatt_server)
266        autoconnect = False
267        mac_address, adv_callback, scan_callback = (get_mac_address_of_generic_advertisement(
268            self.central, self.peripheral))
269        self.adv_instances.append(adv_callback)
270        self.central.log.info("Discovered BLE advertisement, connecting GATT with autoConnect={}".format(autoconnect))
271        try:
272            bluetooth_gatt, gatt_callback = setup_gatt_connection(self.central, mac_address, autoconnect)
273            self.central.log.info("GATT connected, stopping BLE scanning")
274            self.central.sl4a.bleStopBleScan(scan_callback)
275            self.central.log.info("Stopped BLE scanning")
276            self.bluetooth_gatt_list.append(bluetooth_gatt)
277        except GattTestUtilsError as err:
278            logging.error(err)
279            asserts.fail("Failed to connect to GATT, error: {}".format(err))
280            return
281        self.central.log.info("Disconnecting GATT")
282        try:
283            disconnect_gatt_connection(self.central, bluetooth_gatt, gatt_callback)
284            self.central.log.info("GATT disconnected, closing GATT client")
285            close_gatt_client(self.central, bluetooth_gatt)
286            self.central.log.info("GATT client closed, removing it from in-memory tracker")
287            if bluetooth_gatt in self.bluetooth_gatt_list:
288                self.bluetooth_gatt_list.remove(bluetooth_gatt)
289        except GattTestUtilsError as err:
290            logging.error(err)
291            asserts.fail("Failed to disconnect GATT, error: {}".format(err))
292            return
293        autoconnect = True
294        self.central.log.info("Connecting GATT with autoConnect={}".format(autoconnect))
295        bluetooth_gatt = self.central.sl4a.gattClientConnectGatt(
296            gatt_callback, mac_address, autoconnect, GattTransport.TRANSPORT_AUTO, False, GattPhyMask.PHY_LE_1M_MASK)
297        self.central.log.info("Waiting for GATt to become connected")
298        self.bluetooth_gatt_list.append(bluetooth_gatt)
299        expected_event = GattCallbackString.GATT_CONN_CHANGE.format(gatt_callback)
300        try:
301            event = self.central.ed.pop_event(expected_event, self.default_timeout)
302            self.central.log.info("Received event={}".format(event))
303        except Empty:
304            logging.error(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event))
305            asserts.fail(GattCallbackError.GATT_CONN_CHANGE_ERR.format(expected_event))
306            return
307        found_state = event['data']['State']
308        expected_state = GattConnectionState.STATE_CONNECTED
309        assertThat(found_state).isEqualTo(expected_state)
310        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
311
312    @test_tracker_info(uuid='e506fa50-7cd9-4bd8-938a-6b85dcfe6bc6')
313    def test_gatt_connect_opportunistic(self):
314        """Test opportunistic GATT connection over LE.
315
316          Test establishing a gatt connection between a GATT server and GATT
317          client in opportunistic mode.
318
319          Steps:
320            1. Start a generic advertisement.
321            2. Start a generic scanner.
322            3. Find the advertisement and extract the mac address.
323            4. Stop the first scanner.
324            5. Create GATT connection 1 between the scanner and advertiser normally
325            6. Create GATT connection 2 between the scanner and advertiser using
326               opportunistic mode
327            7. Disconnect GATT connection 1
328
329          Expected Result:
330            Verify GATT connection 2 automatically disconnects when GATT connection
331            1 disconnect
332
333          Returns:
334            Pass if True
335            Fail if False
336
337          TAGS: LE, Advertising, Filtering, Scanning, GATT
338          Priority: 0
339          """
340        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
341        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
342        self.gatt_server_list.append(gatt_server)
343        mac_address, adv_callback, scan_callback = (get_mac_address_of_generic_advertisement(
344            self.central, self.peripheral))
345        # Make GATT connection 1
346        try:
347            bluetooth_gatt_1, gatt_callback_1 = setup_gatt_connection(
348                self.central, mac_address, False, transport=GattTransport.TRANSPORT_AUTO, opportunistic=False)
349            self.central.sl4a.bleStopBleScan(scan_callback)
350            self.adv_instances.append(adv_callback)
351            self.bluetooth_gatt_list.append(bluetooth_gatt_1)
352        except GattTestUtilsError as err:
353            logging.error(err)
354            asserts.fail("Failed to connect to GATT 1, error: {}".format(err))
355            return
356        # Make GATT connection 2
357        try:
358            bluetooth_gatt_2, gatt_callback_2 = setup_gatt_connection(
359                self.central, mac_address, False, transport=GattTransport.TRANSPORT_AUTO, opportunistic=True)
360            self.bluetooth_gatt_list.append(bluetooth_gatt_2)
361        except GattTestUtilsError as err:
362            logging.error(err)
363            asserts.fail("Failed to connect to GATT 2, error: {}".format(err))
364            return
365        # Disconnect GATT connection 1
366        try:
367            disconnect_gatt_connection(self.central, bluetooth_gatt_1, gatt_callback_1)
368            close_gatt_client(self.central, bluetooth_gatt_1)
369            if bluetooth_gatt_1 in self.bluetooth_gatt_list:
370                self.bluetooth_gatt_list.remove(bluetooth_gatt_1)
371        except GattTestUtilsError as err:
372            logging.error(err)
373            asserts.fail("Failed to disconnect GATT 1, error: {}".format(err))
374            return
375        # Confirm that GATT connection 2 also disconnects
376        wait_for_gatt_disconnect_event(self.central, gatt_callback_2)
377        close_gatt_client(self.central, bluetooth_gatt_2)
378        if bluetooth_gatt_2 in self.bluetooth_gatt_list:
379            self.bluetooth_gatt_list.remove(bluetooth_gatt_2)
380
381    @test_tracker_info(uuid='1e01838e-c4de-4720-9adf-9e0419378226')
382    def test_gatt_request_min_mtu(self):
383        """Test GATT connection over LE and exercise MTU sizes.
384
385          Test establishing a gatt connection between a GATT server and GATT
386          client. Request an MTU size that matches the correct minimum size.
387
388          Steps:
389            1. Start a generic advertisement.
390            2. Start a generic scanner.
391            3. Find the advertisement and extract the mac address.
392            4. Stop the first scanner.
393            5. Create a GATT connection between the scanner and advertiser.
394            6. From the scanner (client) request MTU size change to the
395            minimum value.
396            7. Find the MTU changed event on the client.
397            8. Disconnect the GATT connection.
398
399          Expected Result:
400            Verify that a connection was established and the MTU value found
401            matches the expected MTU value.
402
403          Returns:
404            Pass if True
405            Fail if False
406
407          TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU
408          Priority: 0
409          """
410        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
411        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
412        self.gatt_server_list.append(gatt_server)
413        try:
414            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
415            self.bluetooth_gatt_list.append(bluetooth_gatt)
416        except GattTestUtilsError as err:
417            logging.error(err)
418            asserts.fail("Failed to connect to GATT, error: {}".format(err))
419            return
420        self.adv_instances.append(adv_callback)
421        expected_mtu = GattMtuSize.MIN
422        self.central.sl4a.gattClientRequestMtu(bluetooth_gatt, expected_mtu)
423        assertThat(self._verify_mtu_changed_on_client_and_server(expected_mtu, gatt_callback, gatt_server_cb)).isTrue()
424        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
425
426    @test_tracker_info(uuid='c1fa3a2d-fb47-47db-bdd1-458928cd6a5f')
427    def test_gatt_request_max_mtu(self):
428        """Test GATT connection over LE and exercise MTU sizes.
429
430          Test establishing a gatt connection between a GATT server and GATT
431          client. Request an MTU size that matches the correct maximum size.
432
433          Steps:
434            1. Start a generic advertisement.
435            2. Start a generic scanner.
436            3. Find the advertisement and extract the mac address.
437            4. Stop the first scanner.
438            5. Create a GATT connection between the scanner and advertiser.
439            6. From the scanner (client) request MTU size change to the
440            maximum value.
441            7. Find the MTU changed event on the client.
442            8. Disconnect the GATT connection.
443
444          Expected Result:
445            Verify that a connection was established and the MTU value found
446            matches the expected MTU value.
447
448          Returns:
449            Pass if True
450            Fail if False
451
452          TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU
453          Priority: 0
454          """
455        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
456        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
457        self.gatt_server_list.append(gatt_server)
458        try:
459            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
460            self.bluetooth_gatt_list.append(bluetooth_gatt)
461        except GattTestUtilsError as err:
462            logging.error(err)
463            asserts.fail("Failed to connect to GATT, error: {}".format(err))
464            return
465        self.adv_instances.append(adv_callback)
466        expected_mtu = GattMtuSize.MAX
467        self.central.sl4a.gattClientRequestMtu(bluetooth_gatt, expected_mtu)
468        assertThat(self._verify_mtu_changed_on_client_and_server(expected_mtu, gatt_callback, gatt_server_cb)).isTrue()
469        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
470
471    @test_tracker_info(uuid='4416d483-dec3-46cb-8038-4d82620f873a')
472    def test_gatt_request_out_of_bounds_mtu(self):
473        """Test GATT connection over LE and exercise an out of bound MTU size.
474
475          Test establishing a gatt connection between a GATT server and GATT
476          client. Request an MTU size that is the MIN value minus 1.
477
478          Steps:
479            1. Start a generic advertisement.
480            2. Start a generic scanner.
481            3. Find the advertisement and extract the mac address.
482            4. Stop the first scanner.
483            5. Create a GATT connection between the scanner and advertiser.
484            6. From the scanner (client) request MTU size change to the
485            minimum value minus one.
486            7. Find the MTU changed event on the client.
487            8. Disconnect the GATT connection.
488
489          Expected Result:
490            Verify that an MTU changed event was not discovered and that
491            it didn't cause an exception when requesting an out of bounds
492            MTU.
493
494          Returns:
495            Pass if True
496            Fail if False
497
498          TAGS: LE, Advertising, Filtering, Scanning, GATT, MTU
499          Priority: 0
500          """
501        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
502        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
503        self.gatt_server_list.append(gatt_server)
504        try:
505            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
506            self.bluetooth_gatt_list.append(bluetooth_gatt)
507        except GattTestUtilsError as err:
508            logging.error(err)
509            asserts.fail("Failed to connect to GATT, error: {}".format(err))
510            return
511        self.adv_instances.append(adv_callback)
512        unexpected_mtu = GattMtuSize.MIN - 1
513        self.central.sl4a.gattClientRequestMtu(bluetooth_gatt, unexpected_mtu)
514        assertThat(self._verify_mtu_changed_on_client_and_server(unexpected_mtu, gatt_callback,
515                                                                 gatt_server_cb)).isFalse()
516        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
517
518    @test_tracker_info(uuid='31ffb9ca-cc75-43fb-9802-c19f1c5856b6')
519    def test_gatt_connect_trigger_on_read_rssi(self):
520        """Test GATT connection over LE read RSSI.
521
522        Test establishing a gatt connection between a GATT server and GATT
523        client then read the RSSI.
524
525        Steps:
526          1. Start a generic advertisement.
527          2. Start a generic scanner.
528          3. Find the advertisement and extract the mac address.
529          4. Stop the first scanner.
530          5. Create a GATT connection between the scanner and advertiser.
531          6. From the scanner, request to read the RSSI of the advertiser.
532          7. Disconnect the GATT connection.
533
534        Expected Result:
535          Verify that a connection was established and then disconnected
536          successfully. Verify that the RSSI was ready correctly.
537
538        Returns:
539          Pass if True
540          Fail if False
541
542        TAGS: LE, Advertising, Filtering, Scanning, GATT, RSSI
543        Priority: 1
544        """
545        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
546        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
547        self.gatt_server_list.append(gatt_server)
548        try:
549            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
550            self.bluetooth_gatt_list.append(bluetooth_gatt)
551        except GattTestUtilsError as err:
552            logging.error(err)
553            asserts.fail("Failed to connect to GATT, error: {}".format(err))
554            return
555        self.adv_instances.append(adv_callback)
556        expected_event = GattCallbackString.RD_REMOTE_RSSI.format(gatt_callback)
557        if self.central.sl4a.gattClientReadRSSI(bluetooth_gatt):
558            try:
559                self.central.ed.pop_event(expected_event, self.default_timeout)
560            except Empty:
561                logging.error(GattCallbackError.RD_REMOTE_RSSI_ERR.format(expected_event))
562        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
563
564    @test_tracker_info(uuid='dee9ef28-b872-428a-821b-cc62f27ba936')
565    def test_gatt_connect_trigger_on_services_discovered(self):
566        """Test GATT connection and discover services of peripheral.
567
568          Test establishing a gatt connection between a GATT server and GATT
569          client the discover all services from the connected device.
570
571          Steps:
572            1. Start a generic advertisement.
573            2. Start a generic scanner.
574            3. Find the advertisement and extract the mac address.
575            4. Stop the first scanner.
576            5. Create a GATT connection between the scanner and advertiser.
577            6. From the scanner (central device), discover services.
578            7. Disconnect the GATT connection.
579
580          Expected Result:
581            Verify that a connection was established and then disconnected
582            successfully. Verify that the service were discovered.
583
584          Returns:
585            Pass if True
586            Fail if False
587
588          TAGS: LE, Advertising, Filtering, Scanning, GATT, Services
589          Priority: 1
590          """
591        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
592        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
593        self.gatt_server_list.append(gatt_server)
594        try:
595            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
596            self.bluetooth_gatt_list.append(bluetooth_gatt)
597        except GattTestUtilsError as err:
598            logging.error(err)
599            asserts.fail("Failed to connect to GATT, error: {}".format(err))
600            return
601        self.adv_instances.append(adv_callback)
602        if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt):
603            expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback)
604            try:
605                event = self.central.ed.pop_event(expected_event, self.default_timeout)
606            except Empty:
607                logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
608                asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
609                return
610        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
611
612    @test_tracker_info(uuid='01883bdd-0cf8-48fb-bf15-467bbd4f065b')
613    def test_gatt_connect_trigger_on_services_discovered_iterate_attributes(self):
614        """Test GATT connection and iterate peripherals attributes.
615
616        Test establishing a gatt connection between a GATT server and GATT
617        client and iterate over all the characteristics and descriptors of the
618        discovered services.
619
620        Steps:
621          1. Start a generic advertisement.
622          2. Start a generic scanner.
623          3. Find the advertisement and extract the mac address.
624          4. Stop the first scanner.
625          5. Create a GATT connection between the scanner and advertiser.
626          6. From the scanner (central device), discover services.
627          7. Iterate over all the characteristics and descriptors of the
628          discovered features.
629          8. Disconnect the GATT connection.
630
631        Expected Result:
632          Verify that a connection was established and then disconnected
633          successfully. Verify that the services, characteristics, and descriptors
634          were discovered.
635
636        Returns:
637          Pass if True
638          Fail if False
639
640        TAGS: LE, Advertising, Filtering, Scanning, GATT, Services
641        Characteristics, Descriptors
642        Priority: 1
643        """
644        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
645        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
646        self.gatt_server_list.append(gatt_server)
647        try:
648            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
649            self.bluetooth_gatt_list.append(bluetooth_gatt)
650        except GattTestUtilsError as err:
651            logging.error(err)
652            asserts.fail("Failed to connect to GATT, error: {}".format(err))
653            return
654        self.adv_instances.append(adv_callback)
655        if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt):
656            expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback)
657            try:
658                event = self.central.ed.pop_event(expected_event, self.default_timeout)
659                discovered_services_index = event['data']['ServicesIndex']
660            except Empty:
661                logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
662                asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
663                return
664            log_gatt_server_uuids(self.central, discovered_services_index)
665        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
666
667    @test_tracker_info(uuid='d4277bee-da99-4f48-8a4d-f81b5389da18')
668    def test_gatt_connect_with_service_uuid_variations(self):
669        """Test GATT connection with multiple service uuids.
670
671        Test establishing a gatt connection between a GATT server and GATT
672        client with multiple service uuid variations.
673
674        Steps:
675          1. Start a generic advertisement.
676          2. Start a generic scanner.
677          3. Find the advertisement and extract the mac address.
678          4. Stop the first scanner.
679          5. Create a GATT connection between the scanner and advertiser.
680          6. From the scanner (central device), discover services.
681          7. Verify that all the service uuid variations are found.
682          8. Disconnect the GATT connection.
683
684        Expected Result:
685          Verify that a connection was established and then disconnected
686          successfully. Verify that the service uuid variations are found.
687
688        Returns:
689          Pass if True
690          Fail if False
691
692        TAGS: LE, Advertising, Filtering, Scanning, GATT, Services
693        Priority: 2
694        """
695        try:
696            gatt_server_cb, gatt_server = setup_multiple_services(self.peripheral)
697            self.gatt_server_list.append(gatt_server)
698        except GattTestUtilsError as err:
699            logging.error(err)
700            asserts.fail("Failed to setup GATT service, error: {}".format(err))
701            return
702        try:
703            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
704            self.bluetooth_gatt_list.append(bluetooth_gatt)
705        except GattTestUtilsError as err:
706            logging.error(err)
707            asserts.fail("Failed to connect to GATT, error: {}".format(err))
708            return
709        self.adv_instances.append(adv_callback)
710        if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt):
711            expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback)
712            try:
713                event = self.central.ed.pop_event(expected_event, self.default_timeout)
714            except Empty:
715                logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
716                asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
717                return
718            discovered_services_index = event['data']['ServicesIndex']
719            log_gatt_server_uuids(self.central, discovered_services_index)
720
721        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
722
723    @test_tracker_info(uuid='7d3442c5-f71f-44ae-bd35-f2569f01b3b8')
724    def test_gatt_connect_in_quick_succession(self):
725        """Test GATT connections multiple times.
726
727        Test establishing a gatt connection between a GATT server and GATT
728        client with multiple iterations.
729
730        Steps:
731          1. Start a generic advertisement.
732          2. Start a generic scanner.
733          3. Find the advertisement and extract the mac address.
734          4. Stop the first scanner.
735          5. Create a GATT connection between the scanner and advertiser.
736          6. Disconnect the GATT connection.
737          7. Repeat steps 5 and 6 twenty times.
738
739        Expected Result:
740          Verify that a connection was established and then disconnected
741          successfully twenty times.
742
743        Returns:
744          Pass if True
745          Fail if False
746
747        TAGS: LE, Advertising, Filtering, Scanning, GATT, Stress
748        Priority: 1
749        """
750        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
751        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
752        self.gatt_server_list.append(gatt_server)
753        mac_address, adv_callback, scan_callback = get_mac_address_of_generic_advertisement(
754            self.central, self.peripheral)
755        autoconnect = False
756        for i in range(100):
757            logging.info("Starting connection iteration {}".format(i + 1))
758            try:
759                bluetooth_gatt, gatt_callback = setup_gatt_connection(self.central, mac_address, autoconnect)
760                self.central.sl4a.bleStopBleScan(scan_callback)
761            except GattTestUtilsError as err:
762                logging.error(err)
763                asserts.fail("Failed to connect to GATT at iteration {}, error: {}".format(i + 1, err))
764                return
765            test_result = self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)
766            if not test_result:
767                logging.info("Failed to disconnect from peripheral device.")
768                asserts.fail("Failed to disconnect from peripheral device.")
769                return
770        self.adv_instances.append(adv_callback)
771
772    @test_tracker_info(uuid='148469d9-7ab0-4c08-b2e9-7e49e88da1fc')
773    def test_gatt_connect_on_path_attack(self):
774        """Test GATT connection with permission write encrypted with on-path attacker prevention
775
776        Test establishing a gatt connection between a GATT server and GATT
777        client while the GATT server's characteristic includes the property
778        write value and the permission write encrypted on-path attacker prevention
779        value. This will prompt LE pairing and then the devices will create a bond.
780
781        Steps:
782          1. Create a GATT server and server callback on the peripheral device.
783          2. Create a unique service and characteristic uuid on the peripheral.
784          3. Create a characteristic on the peripheral with these properties:
785              GattCharacteristic.PROPERTY_WRITE,
786              GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM
787          4. Create a GATT service on the peripheral.
788          5. Add the characteristic to the GATT service.
789          6. Create a GATT connection between your central and peripheral device.
790          7. From the central device, discover the peripheral's services.
791          8. Iterate the services found until you find the unique characteristic
792              created in step 3.
793          9. Once found, write a random but valid value to the characteristic.
794          10. Start pairing helpers on both devices immediately after attempting
795              to write to the characteristic.
796          11. Within 10 seconds of writing the characteristic, there should be
797              a prompt to bond the device from the peripheral. The helpers will
798              handle the UI interaction automatically. (see
799              BluetoothConnectionFacade.java bluetoothStartPairingHelper).
800          12. Verify that the two devices are bonded.
801
802        Expected Result:
803          Verify that a connection was established and the devices are bonded.
804
805        Returns:
806          Pass if True
807          Fail if False
808
809        TAGS: LE, Advertising, Filtering, Scanning, GATT, Characteristic, OnPathAttacker
810        Priority: 1
811        """
812        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
813        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
814        self.gatt_server_list.append(gatt_server)
815        service_uuid = "3846D7A0-69C8-11E4-BA00-0002A5D5C51B"
816        test_uuid = "aa7edd5a-4d1d-4f0e-883a-d145616a1630"
817        bonded = False
818        characteristic = self.peripheral.sl4a.gattServerCreateBluetoothGattCharacteristic(
819            test_uuid, GattCharacteristic.PROPERTY_WRITE, GattCharacteristic.PERMISSION_WRITE_ENCRYPTED_MITM)
820        gatt_service = self.peripheral.sl4a.gattServerCreateService(service_uuid, GattServiceType.SERVICE_TYPE_PRIMARY)
821        self.peripheral.sl4a.gattServerAddCharacteristicToService(gatt_service, characteristic)
822        self.peripheral.sl4a.gattServerAddService(gatt_server, gatt_service)
823        assertThat(self._find_service_added_event(gatt_server_cb, service_uuid)).isTrue()
824        bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
825        self.bluetooth_gatt_list.append(bluetooth_gatt)
826        self.adv_instances.append(adv_callback)
827        if self.central.sl4a.gattClientDiscoverServices(bluetooth_gatt):
828            expected_event = GattCallbackString.GATT_SERV_DISC.format(gatt_callback)
829            try:
830                event = self.central.ed.pop_event(expected_event, self.default_timeout)
831            except Empty:
832                logging.error(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
833                asserts.fail(GattCallbackError.GATT_SERV_DISC_ERR.format(expected_event))
834                return
835            discovered_services_index = event['data']['ServicesIndex']
836        else:
837            logging.info("Failed to discover services.")
838            asserts.fail("Failed to discover services.")
839            return
840        test_value = [1, 2, 3, 4, 5, 6, 7]
841        services_count = self.central.sl4a.gattClientGetDiscoveredServicesCount(discovered_services_index)
842        for i in range(services_count):
843            characteristic_uuids = (self.central.sl4a.gattClientGetDiscoveredCharacteristicUuids(
844                discovered_services_index, i))
845            for characteristic_uuid in characteristic_uuids:
846                if characteristic_uuid == test_uuid:
847                    self.central.sl4a.bluetoothStartPairingHelper()
848                    self.peripheral.sl4a.bluetoothStartPairingHelper()
849                    self.central.sl4a.gattClientCharacteristicSetValue(bluetooth_gatt, discovered_services_index, i,
850                                                                       characteristic_uuid, test_value)
851                    self.central.sl4a.gattClientWriteCharacteristic(bluetooth_gatt, discovered_services_index, i,
852                                                                    characteristic_uuid)
853                    start_time = time.time() + self.default_timeout
854                    target_name = self.peripheral.sl4a.bluetoothGetLocalName()
855                    while time.time() < start_time and bonded == False:
856                        bonded_devices = \
857                            self.central.sl4a.bluetoothGetBondedDevices()
858                        for device in bonded_devices:
859                            if ('name' in device.keys() and device['name'] == target_name):
860                                bonded = True
861                                break
862                    bonded = False
863                    target_name = self.central.sl4a.bluetoothGetLocalName()
864                    while time.time() < start_time and bonded == False:
865                        bonded_devices = \
866                            self.peripheral.sl4a.bluetoothGetBondedDevices()
867                        for device in bonded_devices:
868                            if ('name' in device.keys() and device['name'] == target_name):
869                                bonded = True
870                                break
871
872        # Dual mode devices will establish connection over the classic transport,
873        # in order to establish bond over both transports, and do SDP. Starting
874        # disconnection before all this is finished is not safe, might lead to
875        # race conditions, i.e. bond over classic tranport shows up after LE
876        # bond is already removed.
877        time.sleep(4)
878
879        for ad in [self.central, self.peripheral]:
880            assertThat(clear_bonded_devices(ad)).isTrue()
881
882        # Necessary sleep time for entries to update unbonded state
883        time.sleep(2)
884
885        for ad in [self.central, self.peripheral]:
886            bonded_devices = ad.sl4a.bluetoothGetBondedDevices()
887            if len(bonded_devices) > 0:
888                logging.error("Failed to unbond devices: {}".format(bonded_devices))
889                asserts.fail("Failed to unbond devices: {}".format(bonded_devices))
890                return
891        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
892
893    @test_tracker_info(uuid='cc3fc361-7bf1-4ee2-9e46-4a27c88ce6a8')
894    def test_gatt_connect_get_connected_devices(self):
895        """Test GATT connections show up in getConnectedDevices
896
897        Test establishing a gatt connection between a GATT server and GATT
898        client. Verify that active connections show up using
899        BluetoothManager.getConnectedDevices API.
900
901        Steps:
902          1. Start a generic advertisement.
903          2. Start a generic scanner.
904          3. Find the advertisement and extract the mac address.
905          4. Stop the first scanner.
906          5. Create a GATT connection between the scanner and advertiser.
907          7. Verify the GATT Client has an open connection to the GATT Server.
908          8. Verify the GATT Server has an open connection to the GATT Client.
909          9. Disconnect the GATT connection.
910
911        Expected Result:
912          Verify that a connection was established, connected devices are found
913          on both the central and peripheral devices, and then disconnected
914          successfully.
915
916        Returns:
917          Pass if True
918          Fail if False
919
920        TAGS: LE, Advertising, Scanning, GATT
921        Priority: 2
922        """
923        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
924        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
925        self.gatt_server_list.append(gatt_server)
926        try:
927            bluetooth_gatt, gatt_callback, adv_callback = (orchestrate_gatt_connection(self.central, self.peripheral))
928            self.bluetooth_gatt_list.append(bluetooth_gatt)
929        except GattTestUtilsError as err:
930            logging.error(err)
931            asserts.fail("Failed to connect to GATT, error: {}".format(err))
932            return
933        conn_cen_devices = self.central.sl4a.bluetoothGetConnectedLeDevices(BluetoothProfile.GATT)
934        conn_per_devices = self.peripheral.sl4a.bluetoothGetConnectedLeDevices(BluetoothProfile.GATT_SERVER)
935        target_name = self.peripheral.sl4a.bluetoothGetLocalName()
936        error_message = ("Connected device {} not found in list of connected " "devices {}")
937        if not any(d['name'] == target_name for d in conn_cen_devices):
938            logging.error(error_message.format(target_name, conn_cen_devices))
939            asserts.fail(error_message.format(target_name, conn_cen_devices))
940            return
941        # For the GATT server only check the size of the list since
942        # it may or may not include the device name.
943        target_name = self.central.sl4a.bluetoothGetLocalName()
944        if not conn_per_devices:
945            logging.error(error_message.format(target_name, conn_per_devices))
946            asserts.fail(error_message.format(target_name, conn_per_devices))
947            return
948        self.adv_instances.append(adv_callback)
949        assertThat(self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback)).isTrue()
950
951    @test_tracker_info(uuid='a0a37ca6-9fa8-4d35-9fdb-0e25b4b8a363')
952    def test_gatt_connect_second_adv_after_canceling_first_adv(self):
953        """Test GATT connection to peripherals second advertising address.
954
955        Test the ability of cancelling GATT connections and trying to reconnect
956        to the same device via a different address.
957
958        Steps:
959          1. A starts advertising
960          2. B starts scanning and finds A's mac address
961          3. Stop advertisement from step 1. Start a new advertisement on A and
962            find the new new mac address, B knows of both old and new address.
963          4. B1 sends connect request to old address of A
964          5. B1 cancel connect attempt after 10 seconds
965          6. B1 sends connect request to new address of A
966          7. Verify B1 establish connection to A in less than 10 seconds
967
968        Expected Result:
969          Verify that a connection was established only on the second
970          advertisement's mac address.
971
972        Returns:
973          Pass if True
974          Fail if False
975
976        TAGS: LE, Advertising, Scanning, GATT
977        Priority: 3
978        """
979        autoconnect = False
980        transport = GattTransport.TRANSPORT_AUTO
981        opportunistic = False
982        # Setup a basic Gatt server on the peripheral
983        gatt_server_cb = self.peripheral.sl4a.gattServerCreateGattServerCallback()
984        gatt_server = self.peripheral.sl4a.gattServerOpenGattServer(gatt_server_cb)
985
986        # Set advertisement settings to include local name in advertisement
987        # and set the advertising mode to low_latency.
988        self.peripheral.sl4a.bleSetAdvertiseSettingsIsConnectable(True)
989        self.peripheral.sl4a.bleSetAdvertiseDataIncludeDeviceName(True)
990        self.peripheral.sl4a.bleSetAdvertiseSettingsAdvertiseMode(BleAdvertiseSettingsMode.LOW_LATENCY)
991
992        # Setup necessary advertisement objects.
993        advertise_data = self.peripheral.sl4a.bleBuildAdvertiseData()
994        advertise_settings = self.peripheral.sl4a.bleBuildAdvertiseSettings()
995        advertise_callback = self.peripheral.sl4a.bleGenBleAdvertiseCallback()
996
997        # Step 1: Start advertisement
998        self.peripheral.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings)
999
1000        # Setup scan settings for low_latency scanning and to include the local name
1001        # of the advertisement started in step 1.
1002        filter_list = self.central.sl4a.bleGenFilterList()
1003        self.central.sl4a.bleSetScanSettingsNumOfMatches(BleScanSettingsMatchNums.ONE)
1004        self.central.sl4a.bleSetScanFilterDeviceName(self.peripheral.sl4a.bluetoothGetLocalName())
1005        self.central.sl4a.bleBuildScanFilter(filter_list)
1006        self.central.sl4a.bleSetScanSettingsScanMode(BleScanSettingsModes.LOW_LATENCY)
1007
1008        # Setup necessary scan objects.
1009        scan_settings = self.central.sl4a.bleBuildScanSetting()
1010        scan_callback = self.central.sl4a.bleGenScanCallback()
1011
1012        # Step 2: Start scanning on central Android device and find peripheral
1013        # address.
1014        self.central.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
1015        expected_event_name = scan_result.format(scan_callback)
1016        try:
1017            mac_address_pre_restart = self.central.ed.pop_event(
1018                expected_event_name, self.default_timeout)['data']['Result']['deviceInfo']['address']
1019            logging.info("Peripheral advertisement found with mac address: {}".format(mac_address_pre_restart))
1020        except Empty:
1021            logging.info("Peripheral advertisement not found")
1022            asserts.fail("Peripheral advertisement not found")
1023            return
1024        finally:
1025            self.peripheral.sl4a.bleStopBleAdvertising(advertise_callback)
1026
1027        # Step 3: Restart peripheral advertising such that a new mac address is
1028        # created.
1029        self.peripheral.sl4a.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings)
1030
1031        mac_address_post_restart = mac_address_pre_restart
1032
1033        while True:
1034            try:
1035                mac_address_post_restart = self.central.ed.pop_event(
1036                    expected_event_name, self.default_timeout)['data']['Result']['deviceInfo']['address']
1037                logging.info("Peripheral advertisement found with mac address: {}".format(mac_address_post_restart))
1038            except Empty:
1039                logging.info("Peripheral advertisement not found")
1040                asserts.fail("Peripheral advertisement not found")
1041                return
1042
1043            if mac_address_pre_restart != mac_address_post_restart:
1044                break
1045
1046
# Script entry point: hand control to the test runner when executed directly.
if __name__ == "__main__":
    test_runner.main()
1049