• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
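"""
Setup:
This test requires at least two Fuchsia devices. The first device is used as
the primary DUT (scanner/initiator) and the second as the secondary DUT
(advertiser/discoverable peer).
"""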
20
21from acts import signals
22from acts.base_test import BaseTestClass
23from acts.test_decorators import test_tracker_info
24from acts_contrib.test_utils.bt.bt_test_utils import generate_id_by_size
25from acts_contrib.test_utils.fuchsia.bt_test_utils import bredr_scan_for_device_by_name
26from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name
27from acts_contrib.test_utils.fuchsia.bt_test_utils import unbond_all_known_devices
28from acts_contrib.test_utils.fuchsia.bt_test_utils import verify_device_state_by_name
29import time
30
31
class BtFuchsiaEPTest(BaseTestClass):
    """Bluetooth end-to-end tests run between two Fuchsia devices.

    The first configured Fuchsia device is used as the primary DUT (it scans,
    connects and initiates pairing); the second is the secondary DUT (it
    advertises / is made discoverable).
    """

    # Interval handed to bleStartBleAdvertising.
    # NOTE(review): presumably milliseconds -- confirm against the facade.
    ble_advertise_interval = 50
    # Timeout handed to the LE and BR/EDR scan-by-name helpers.
    scan_timeout_seconds = 60
    # Not referenced by any test in this class.
    default_iterations = 1000
    # Name advertised by the secondary DUT; generated once, when the class
    # body is evaluated, and shared by every test in the class.
    adv_name = generate_id_by_size(10)
    # Advertising payload: only the name is populated.
    test_adv_data = {
        "name": adv_name,
        "appearance": None,
        "service_data": None,
        "tx_power_level": None,
        "service_uuids": None,
        "manufacturer_data": None,
        "uris": None,
    }
    test_connectable = True
    test_scan_response = None

    def setup_class(self):
        """Initialize Bluetooth on every Fuchsia device and select the DUTs."""
        super().setup_class()
        for fd in self.fuchsia_devices:
            fd.bts_lib.initBluetoothSys()
        self.pri_dut = self.fuchsia_devices[0]
        self.sec_dut = self.fuchsia_devices[1]

    def on_fail(self, test_name, begin_time):
        """Collect bug reports and reset Bluetooth state after a failed test.

        Args:
            test_name: Name of the failed test, forwarded to take_bug_report.
            begin_time: Start time of the failed test, forwarded to
                take_bug_report.
        """
        for fd in self.fuchsia_devices:
            fd.take_bug_report(test_name, begin_time)
        self._unbond_all_known_devices()
        # A failing test may have raised before stopping its advertisement.
        self.sec_dut.ble_lib.bleStopBleAdvertising()
        self._kill_media_services()

    def teardown_class(self):
        """Kill the A2DP/AVRCP services once all tests have run."""
        self._kill_media_services()

    def _kill_media_services(self):
        """Kill any BT services related to A2DP/AVRCP on all Fuchsia devices.
        """
        ssh_timeout = 30
        for fd in self.fuchsia_devices:
            # killall's exit status is deliberately ignored.
            for pattern in ("bt-a2dp*", "bt-avrcp*"):
                fd.send_command_ssh("killall {}".format(pattern),
                                    timeout=ssh_timeout,
                                    skip_status_code_check=True)

    def _unbond_all_known_devices(self):
        """For all Fuchsia devices, unbond any known pairings.
        """
        # NOTE(review): fixed delay before unbonding; presumably lets the
        # Bluetooth stack settle -- confirm it is still required.
        time.sleep(5)
        for fd in self.fuchsia_devices:
            unbond_all_known_devices(fd, self.log)

    def _require_device_state(self,
                              dut,
                              device_name,
                              state,
                              failure_format,
                              services=None):
        """Fail the test unless `dut` reports `device_name` in `state`.

        Args:
            dut: Fuchsia device passed to verify_device_state_by_name.
            device_name: Name of the peer device to look up.
            state: Expected state string, e.g. "CONNECTED" or "BONDED".
            failure_format: Message with one "{}" placeholder that is filled
                with `device_name` when the check fails.
            services: Forwarded unchanged to verify_device_state_by_name.

        Raises:
            signals.TestFailure: If verify_device_state_by_name returns a
                falsy value.
        """
        if not verify_device_state_by_name(dut, self.log, device_name, state,
                                           services):
            raise signals.TestFailure(failure_format.format(device_name))

    def test_ble_awareness(self):
        """Verify that Fuchsia devices can advertise and scan each other

        Verify a Fuchsia device that starts a BLE advertisement can be
        found by a Fuchsia BLE scanner.

        Steps:
        1. On the secondary Fuchsia device, start an advertisement
        2. On the primary Fuchsia device, scan for the advertisement by name
        3. Stop the advertisement

        Expected Result:
        Verify that the scanner finds the advertisement.

        Returns:
          signals.TestPass if no errors
          signals.TestFailure if there are any errors during the test.

        TAGS: BLE
        Priority: 0
        """

        self.sec_dut.ble_lib.bleStartBleAdvertising(
            self.test_adv_data, self.test_scan_response,
            self.ble_advertise_interval, self.test_connectable)

        device = le_scan_for_device_by_name(self.pri_dut, self.log,
                                            self.adv_name,
                                            self.scan_timeout_seconds)
        # Stop advertising before evaluating the result so the advertisement
        # is torn down on both the pass and the fail path.
        self.sec_dut.ble_lib.bleStopBleAdvertising()
        if device is None:
            raise signals.TestFailure("Scanner unable to find advertisement.")
        raise signals.TestPass("Success")

    def test_gatt_central_peripheral(self):
        """Verify that Fuchsia devices can perform GATT operations

        Verify that Fuchsia devices can perform GATT connections and
        interactions.

        Steps:
        1. On the secondary Fuchsia device, start an advertisement
        2. On the primary Fuchsia device, scan for the advertisement by name
        3. Perform GATT connection over LE
        4. Pair both devices.
        5. Perform GATT disconnection.

        GATT read/write operations are not exercised yet (see the TODO at the
        end of the test).

        Expected Result:
        Verify that there are no errors after each GATT connection.

        Returns:
          signals.TestPass if no errors
          signals.TestFailure if there are any errors during the test.

        TAGS: BLE
        Priority: 0
        """
        self._unbond_all_known_devices()

        source_device_name = generate_id_by_size(10)
        self.pri_dut.bts_lib.setName(source_device_name)
        # The primary DUT knows the secondary DUT by its advertised name.
        target_device_name = self.adv_name

        self.sec_dut.ble_lib.bleStartBleAdvertising(
            self.test_adv_data, self.test_scan_response,
            self.ble_advertise_interval, self.test_connectable)

        device = le_scan_for_device_by_name(self.pri_dut, self.log,
                                            target_device_name,
                                            self.scan_timeout_seconds)
        if device is None:
            raise signals.TestFailure("Scanner unable to find advertisement.")

        connect_result = self.pri_dut.gattc_lib.bleConnectToPeripheral(
            device["id"])
        if connect_result.get("error") is not None:
            raise signals.TestFailure("GATT Connection failed with: {}".format(
                connect_result.get("error")))

        connect_failure = "Failed to connect to device {}."
        self._require_device_state(self.pri_dut, target_device_name,
                                   "CONNECTED", connect_failure)
        self._require_device_state(self.sec_dut, source_device_name,
                                   "CONNECTED", connect_failure)

        security_level = "ENCRYPTED"
        non_bondable = False
        transport = 2  #LE
        self.pri_dut.bts_lib.pair(device["id"], security_level, non_bondable,
                                  transport)

        pair_failure = "Failed to pair device {}."
        self._require_device_state(self.pri_dut, target_device_name, "BONDED",
                                   pair_failure)
        self._require_device_state(self.sec_dut, source_device_name, "BONDED",
                                   pair_failure)

        disconnect_result = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
            device["id"])
        if disconnect_result.get("error") is not None:
            # Report the disconnect error itself, not the earlier connect
            # result (which is known to carry no error at this point).
            raise signals.TestFailure(
                "GATT Disconnection failed with: {}".format(
                    disconnect_result.get("error")))

        self.sec_dut.ble_lib.bleStopBleAdvertising()

        # TODO: Setup Proper GATT server and verify services published are found

        raise signals.TestPass("Success")

    def test_pairing_a2dp(self):
        """Verify that Fuchsia devices can pair to each other and establish
            an A2DP connection

            Steps:
            1. Clear out all bonded devices
            2. Stop any A2DP services running on the device
                Needed to take ownership of the services
            3. Init sink and source opposite devices
            4. Start pairing delegate for all Fuchsia devices
            5. Set sink device to be discoverable
            6. Discover sink device from source device
            7. Connect to sink device from source device
            8. Pair to sink device
            9. Validate paired devices and services present

            Expected Result:
            Verify devices are successfully paired and appropriate a2dp
            services are running.

            Returns:
            signals.TestPass if no errors
            signals.TestFailure if there are any errors during the test.

            TAGS: BREDR, A2DP
            Priority: 0
        """
        self._unbond_all_known_devices()
        self._kill_media_services()

        source_device_name = generate_id_by_size(10)
        target_device_name = generate_id_by_size(10)

        self.pri_dut.bts_lib.setName(source_device_name)
        self.sec_dut.bts_lib.setName(target_device_name)

        input_capabilities = "NONE"
        output_capabilities = "NONE"

        # Initialize a2dp on both devices.
        self.pri_dut.avdtp_lib.init()
        self.sec_dut.avdtp_lib.init()

        self.pri_dut.bts_lib.acceptPairing(input_capabilities,
                                           output_capabilities)

        self.sec_dut.bts_lib.acceptPairing(input_capabilities,
                                           output_capabilities)
        self.sec_dut.bts_lib.setDiscoverable(True)

        unique_mac_addr_id = bredr_scan_for_device_by_name(
            self.pri_dut, self.log, target_device_name,
            self.scan_timeout_seconds)

        if not unique_mac_addr_id:
            raise signals.TestFailure(
                "Failed to find device {}.".format(target_device_name))

        connect_result = self.pri_dut.bts_lib.connectDevice(unique_mac_addr_id)
        if connect_result.get("error") is not None:
            raise signals.TestFailure("Failed to connect with {}.".format(
                connect_result.get("error")))

        # We pair before checking the CONNECTED status because BR/EDR semantics
        # were recently changed such that if pairing is not confirmed, then bt
        # does not report connected = True.
        security_level = "NONE"
        # NOTE(review): the LE test passes a flag named `non_bondable` (False)
        # in this same positional slot of pair(); confirm that True here
        # really requests a bondable pairing.
        bondable = True
        transport = 1  #BREDR
        pair_result = self.pri_dut.bts_lib.pair(unique_mac_addr_id,
                                                security_level, bondable,
                                                transport)
        if pair_result.get("error") is not None:
            raise signals.TestFailure("Failed to pair with {}.".format(
                pair_result.get("error")))

        connect_failure = "Failed to connect to device {}."
        self._require_device_state(self.pri_dut, target_device_name,
                                   "CONNECTED", connect_failure)
        self._require_device_state(self.sec_dut, source_device_name,
                                   "CONNECTED", connect_failure)

        #TODO: Validation of services and paired devices (b/175641870)
        # A2DP sink: 0000110b-0000-1000-8000-00805f9b34fb
        # A2DP source: 0000110a-0000-1000-8000-00805f9b34fb
        #TODO: Make an easy function for checking/updating devices
        services = None
        pair_failure = "Failed to pair device {}."
        self._require_device_state(self.pri_dut, target_device_name, "BONDED",
                                   pair_failure, services)
        self._require_device_state(self.sec_dut, source_device_name, "BONDED",
                                   pair_failure, services)

        raise signals.TestPass("Success")