• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Setup:
18This test only requires two fuchsia devices.
19"""
20
21from acts import signals
22from acts.base_test import BaseTestClass
23from acts.test_decorators import test_tracker_info
24from acts_contrib.test_utils.bt.bt_test_utils import generate_id_by_size
25from acts_contrib.test_utils.fuchsia.bt_test_utils import bredr_scan_for_device_by_name
26from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name
27from acts_contrib.test_utils.fuchsia.bt_test_utils import unbond_all_known_devices
28from acts_contrib.test_utils.fuchsia.bt_test_utils import verify_device_state_by_name
29import time
30
31
class BtFuchsiaEPTest(BaseTestClass):
    """Bluetooth tests that run between two Fuchsia devices.

    ``pri_dut`` (``fuchsia_devices[0]``) is the side that scans, connects and
    initiates pairing. ``sec_dut`` (``fuchsia_devices[1]``) is the side that
    advertises over LE or is made discoverable over BR/EDR.
    """

    # Passed as the interval argument of bleStartBleAdvertising.
    # NOTE(review): the unit is not visible in this file -- presumably
    # milliseconds; confirm against the ble_lib facade.
    ble_advertise_interval = 50
    # Timeout handed to the LE and BR/EDR scan helpers.
    scan_timeout_seconds = 60
    # Not referenced anywhere inside this class.
    default_iterations = 1000
    # Generated once when the class body is evaluated, so every LE test in
    # this class advertises and scans for the same name.
    adv_name = generate_id_by_size(10)
    test_adv_data = {
        "name": adv_name,
        "appearance": None,
        "service_data": None,
        "tx_power_level": None,
        "service_uuids": None,
        "manufacturer_data": None,
        "uris": None,
    }
    test_connectable = True
    test_scan_response = None

    def setup_class(self):
        """Initialise Bluetooth on every device and pick the two DUTs."""
        super().setup_class()
        for fd in self.fuchsia_devices:
            fd.bts_lib.initBluetoothSys()
        self.pri_dut = self.fuchsia_devices[0]
        self.sec_dut = self.fuchsia_devices[1]

    def on_fail(self, test_name, begin_time):
        """Collect bug reports and reset Bluetooth state after a failure.

        Args:
            test_name: Name of the failed test, forwarded to take_bug_report.
            begin_time: Start time of the failed test, forwarded to
                take_bug_report.
        """
        for fd in self.fuchsia_devices:
            fd.take_bug_report(test_name, begin_time)
        self._unbond_all_known_devices()
        # A failed LE test may have left the advertisement running.
        self.sec_dut.ble_lib.bleStopBleAdvertising()
        self._kill_media_services()

    def teardown_class(self):
        """Stop the A2DP/AVRCP services started by the tests."""
        self._kill_media_services()

    def _kill_media_services(self):
        """Kill any BT services related to A2DP/AVRCP on all Fuchsia devices.
        """
        ssh_timeout = 30
        for fd in self.fuchsia_devices:
            # The status code is ignored: killall is issued whether or not a
            # matching process is currently running.
            for pattern in ("bt-a2dp*", "bt-avrcp*"):
                fd.send_command_ssh("killall {}".format(pattern),
                                    timeout=ssh_timeout,
                                    skip_status_code_check=True)

    def _unbond_all_known_devices(self):
        """For all Fuchsia devices, unbond any known pairings.

        Sleeps for five seconds before unbonding.
        """
        time.sleep(5)
        for fd in self.fuchsia_devices:
            unbond_all_known_devices(fd, self.log)

    def test_ble_awareness(self):
        """Verify that Fuchsia devices can advertise and scan each other

        Verify a Fuchsia device that starts a BLE advertisement can be
        found by a Fuchsia BLE scanner.

        Steps:
        1. On one Fuchsia device set an advertisement
        2. On the other Fuchsia device, scan for the advertisement by name

        Expected Result:
        Verify that the scanner finds the advertisement by name.

        Returns:
          signals.TestPass if no errors
          signals.TestFailure if there are any errors during the test.

        TAGS: BLE
        Priority: 0
        """

        self.sec_dut.ble_lib.bleStartBleAdvertising(
            self.test_adv_data, self.test_scan_response,
            self.ble_advertise_interval, self.test_connectable)

        device = le_scan_for_device_by_name(self.pri_dut, self.log,
                                            self.adv_name,
                                            self.scan_timeout_seconds)
        # Stop advertising before evaluating the result so the advertisement
        # is torn down on both the pass and the fail path.
        self.sec_dut.ble_lib.bleStopBleAdvertising()
        if device is None:
            raise signals.TestFailure("Scanner unable to find advertisement.")
        raise signals.TestPass("Success")

    def test_gatt_central_peripheral(self):
        """Verify that Fuchsia devices can perform GATT operations

        Verify Fuchsia devices can perform GATT connections and interactions.

        Steps:
        1. On one Fuchsia device set an advertisement
        2. On the other Fuchsia device, scan for the advertisement by name
        3. Perform GATT connection over LE
        4. Pair both devices.
        5. Perform GATT read/write operations (not implemented yet, see the
           TODO at the end of the test).
        6. Perform GATT disconnection.

        Expected Result:
        Verify that there are no errors after each GATT connection.

        Returns:
          signals.TestPass if no errors
          signals.TestFailure if there are any errors during the test.

        TAGS: BLE
        Priority: 0
        """
        self._unbond_all_known_devices()

        source_device_name = generate_id_by_size(10)
        self.pri_dut.bts_lib.setName(source_device_name)

        self.sec_dut.ble_lib.bleStartBleAdvertising(
            self.test_adv_data, self.test_scan_response,
            self.ble_advertise_interval, self.test_connectable)

        device = le_scan_for_device_by_name(self.pri_dut, self.log,
                                            self.adv_name,
                                            self.scan_timeout_seconds)
        if device is None:
            raise signals.TestFailure("Scanner unable to find advertisement.")

        connect_result = self.pri_dut.gattc_lib.bleConnectToPeripheral(
            device["id"])
        if connect_result.get("error") is not None:
            raise signals.TestFailure("GATT Connection failed with: {}".format(
                connect_result.get("error")))

        # The peer seen from pri_dut is identified by the advertised name;
        # this test never defines a separate target device name.
        if not verify_device_state_by_name(self.pri_dut, self.log,
                                           self.adv_name, "CONNECTED", None):
            raise signals.TestFailure(
                "Failed to connect to device {}.".format(self.adv_name))

        if not verify_device_state_by_name(
                self.sec_dut, self.log, source_device_name, "CONNECTED", None):
            raise signals.TestFailure(
                "Failed to connect to device {}.".format(source_device_name))

        security_level = "ENCRYPTED"
        # NOTE(review): test_pairing_a2dp passes a variable named `bondable`
        # (True) in this same argument position -- confirm which meaning
        # bts_lib.pair expects.
        non_bondable = False
        transport = 2  # LE
        # The return value is not inspected here; a failed pairing is
        # detected by the BONDED state checks below.
        self.pri_dut.bts_lib.pair(device["id"], security_level, non_bondable,
                                  transport)

        services = None
        if not verify_device_state_by_name(self.pri_dut, self.log,
                                           self.adv_name, "BONDED", services):
            raise signals.TestFailure(
                "Failed to pair device {}.".format(self.adv_name))

        if not verify_device_state_by_name(self.sec_dut, self.log,
                                           source_device_name, "BONDED",
                                           services):
            raise signals.TestFailure(
                "Failed to pair device {}.".format(source_device_name))

        disconnect_result = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
            device["id"])
        if disconnect_result.get("error") is not None:
            # Report the disconnect error itself, not the (already checked)
            # connect result.
            raise signals.TestFailure(
                "GATT Disconnection failed with: {}".format(
                    disconnect_result.get("error")))

        self.sec_dut.ble_lib.bleStopBleAdvertising()

        # TODO: Setup Proper GATT server and verify services published are found

        raise signals.TestPass("Success")

    def test_pairing_a2dp(self):
        """Verify that Fuchsia devices can pair to each other and establish
        an A2DP connection

        Steps:
        1. Clear out all bonded devices
        2. Stop any A2DP services running on the device
            Needed to take ownership of the services
        3. Init sink and source opposite devices
        4. Start pairing delegate for all Fuchsia devices
        5. Set sink device to be discoverable
        6. Discover sink device from source device
        7. Connect to sink device from source device
        8. Pair to sink device
        9. Validate paired devices and services present

        Expected Result:
        Verify devices are successfully paired and appropriate a2dp
        services are running.

        Returns:
          signals.TestPass if no errors
          signals.TestFailure if there are any errors during the test.

        TAGS: BREDR, A2DP
        Priority: 0
        """
        self._unbond_all_known_devices()
        self._kill_media_services()

        source_device_name = generate_id_by_size(10)
        target_device_name = generate_id_by_size(10)

        self.pri_dut.bts_lib.setName(source_device_name)
        self.sec_dut.bts_lib.setName(target_device_name)

        input_capabilities = "NONE"
        output_capabilities = "NONE"
        self.pri_dut.avdtp_lib.init()
        self.pri_dut.control_daemon("bt-avrcp.cmx", "start")
        self.sec_dut.avdtp_lib.init(initiator_delay=2000)
        self.sec_dut.control_daemon("bt-avrcp-target.cmx", "start")
        self.pri_dut.bts_lib.acceptPairing(input_capabilities,
                                           output_capabilities)

        self.sec_dut.bts_lib.acceptPairing(input_capabilities,
                                           output_capabilities)
        self.sec_dut.bts_lib.setDiscoverable(True)

        unique_mac_addr_id = bredr_scan_for_device_by_name(
            self.pri_dut, self.log, target_device_name,
            self.scan_timeout_seconds)

        if not unique_mac_addr_id:
            raise signals.TestFailure(
                "Failed to find device {}.".format(target_device_name))

        connect_result = self.pri_dut.bts_lib.connectDevice(unique_mac_addr_id)
        if connect_result.get("error") is not None:
            raise signals.TestFailure("Failed to connect with {}.".format(
                connect_result.get("error")))

        if not verify_device_state_by_name(
                self.pri_dut, self.log, target_device_name, "CONNECTED", None):
            raise signals.TestFailure(
                "Failed to connect to device {}.".format(target_device_name))

        if not verify_device_state_by_name(
                self.sec_dut, self.log, source_device_name, "CONNECTED", None):
            raise signals.TestFailure(
                "Failed to connect to device {}.".format(source_device_name))

        security_level = "NONE"
        # NOTE(review): test_gatt_central_peripheral names this same argument
        # position `non_bondable` -- confirm which meaning bts_lib.pair
        # expects.
        bondable = True
        transport = 1  # BREDR
        pair_result = self.pri_dut.bts_lib.pair(unique_mac_addr_id,
                                                security_level, bondable,
                                                transport)
        if pair_result.get("error") is not None:
            raise signals.TestFailure("Failed to pair with {}.".format(
                pair_result.get("error")))

        #TODO: Validation of services and paired devices (b/175641870)
        # A2DP sink: 0000110b-0000-1000-8000-00805f9b34fb
        # A2DP source: 0000110a-0000-1000-8000-00805f9b34fb
        #TODO: Make an easy function for checking/updating devices
        services = None
        if not verify_device_state_by_name(self.pri_dut, self.log,
                                           target_device_name, "BONDED",
                                           services):
            raise signals.TestFailure(
                "Failed to pair device {}.".format(target_device_name))

        if not verify_device_state_by_name(self.sec_dut, self.log,
                                           source_device_name, "BONDED",
                                           services):
            raise signals.TestFailure(
                "Failed to pair device {}.".format(source_device_name))

        raise signals.TestPass("Success")
306