• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""This is the PTS base class that is inherited from all PTS
17Tests.
18"""
19
20import ctypes
21import random
22import re
23import time
24import traceback
25
26from ctypes import *
27from datetime import datetime
28
29from acts import signals
30from acts.base_test import BaseTestClass
31from acts.controllers.bluetooth_pts_device import VERDICT_STRINGS
32from acts.controllers.fuchsia_device import FuchsiaDevice
33from acts.signals import TestSignal
34from acts_contrib.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device
35from acts_contrib.test_utils.bt.bt_constants import gatt_transport
36from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name
37
38
39class PtsBaseClass(BaseTestClass):
40    """ Class for representing common functionality across all PTS tests.
41
42    This includes the ability to rerun tests due to PTS instability,
43    common PTS action mappings, and setup/teardown related devices.
44
45    """
46    scan_timeout_seconds = 10
47    peer_identifier = None
48
49    def setup_class(self):
50        super().setup_class()
51        if 'dut' in self.user_params:
52            if self.user_params['dut'] == 'fuchsia_devices':
53                self.dut = create_bluetooth_device(self.fuchsia_devices[0])
54            elif self.user_params['dut'] == 'android_devices':
55                self.dut = create_bluetooth_device(self.android_devices[0])
56            else:
57                raise ValueError('Invalid DUT specified in config. (%s)' %
58                                 self.user_params['dut'])
59        else:
60            # Default is an fuchsia device
61            self.dut = create_bluetooth_device(self.fuchsia_devices[0])
62
63        self.characteristic_read_not_permitted_uuid = self.user_params.get(
64            "characteristic_read_not_permitted_uuid")
65        self.characteristic_read_not_permitted_handle = self.user_params.get(
66            "characteristic_read_not_permitted_handle")
67        self.characteristic_read_invalid_handle = self.user_params.get(
68            "characteristic_read_invalid_handle")
69        self.characteristic_attribute_not_found_uuid = self.user_params.get(
70            "characteristic_attribute_not_found_uuid")
71        self.write_characteristic_not_permitted_handle = self.user_params.get(
72            "write_characteristic_not_permitted_handle")
73
74        self.pts = self.bluetooth_pts_device[0]
75        # MMI functions commented out until implemented. Added for tracking
76        # purposes.
77        self.pts_action_mapping = {
78            "A2DP": {
79                1: self.a2dp_mmi_iut_connectable,
80                1002: self.a2dp_mmi_iut_accept_connect,
81                1020: self.a2dp_mmi_initiate_open_stream,
82            },
83            "GATT": {
84                1: self.mmi_make_iut_connectable,
85                2: self.mmi_iut_initiate_connection,
86                3: self.mmi_iut_initiate_disconnection,
87                # 4: self.mmi_iut_no_security,
88                # 5: self.mmi_iut_initiate_br_connection,
89                10: self.mmi_discover_primary_service,
90                # 11: self.mmi_confirm_no_primary_service_small,
91                # 12: self.mmi_iut_mtu_exchange,
92                # 13: self.mmi_discover_all_service_record,
93                # 14: self.mmi_iut_discover_gatt_service_record,
94                15: self.mmi_iut_find_included_services,
95                # 16: self.mmi_confirm_no_characteristic_uuid_small,
96                17: self.mmi_confirm_primary_service,
97                # 18: self.mmi_send_primary_service_uuid,
98                # 19: self.mmi_confirm_primary_service_uuid,
99                # 22: self.confirm_primary_service_1801,
100                24: self.mmi_confirm_include_service,
101                26: self.mmi_confirm_characteristic_service,
102                # 27: self.perform_read_all_characteristics,
103                29: self.
104                mmi_discover_service_uuid_range,  # AKA: discover service by uuid
105                # 31: self.perform_read_all_descriptors,
106                48: self.mmi_iut_send_read_characteristic_handle,
107                58: self.mmi_iut_send_read_descriptor_handle,
108                70: self.mmi_send_write_command,
109                74: self.mmi_send_write_request,
110                76: self.mmi_send_prepare_write,
111                77: self.mmi_iut_send_prepare_write_greater_offset,
112                80: self.mmi_iut_send_prepare_write_greater,
113                110: self.mmi_iut_enter_handle_read_not_permitted,
114                111: self.mmi_iut_enter_uuid_read_not_permitted,
115                118: self.mmi_iut_enter_handle_invalid,
116                119: self.mmi_iut_enter_uuid_attribute_not_found,
117                120: self.mmi_iut_enter_handle_write_not_permitted,
118                2000: self.mmi_verify_secure_id,  # Enter pairing pin from DUT.
119            },
120            "SDP": {
121                # TODO: Implement MMIs as necessary
122            }
123        }
124        self.pts.bind_to(self.process_next_action)
125
126    def teardown_class(self):
127        self.pts.clean_up()
128
129    def setup_test(self):
130        # Always start the test with RESULT_INCOMP
131        self.pts.pts_test_result = VERDICT_STRINGS['RESULT_INCOMP']
132
133    def teardown_test(self):
134        return True
135
136    @staticmethod
137    def pts_test_wrap(fn):
138        def _safe_wrap_test_case(self, *args, **kwargs):
139            test_id = "{}:{}:{}".format(self.__class__.__name__, fn.__name__,
140                                        time.time())
141            log_string = "[Test ID] {}".format(test_id)
142            self.log.info(log_string)
143            try:
144                self.dut.log_info("Started " + log_string)
145                result = fn(self, *args, **kwargs)
146                self.dut.log_info("Finished " + log_string)
147                rerun_count = self.user_params.get("pts_auto_rerun_count", 0)
148                for i in range(int(rerun_count)):
149                    if result is not True:
150                        self.teardown_test()
151                        log_string = "[Rerun Test ID] {}. Run #{} run failed... Retrying".format(
152                            test_id, i + 1)
153                        self.log.info(log_string)
154                        self.setup_test()
155                        self.dut.log_info("Rerun Started " + log_string)
156                        result = fn(self, *args, **kwargs)
157                    else:
158                        return result
159                return result
160            except TestSignal:
161                raise
162            except Exception as e:
163                self.log.error(traceback.format_exc())
164                self.log.error(str(e))
165                raise
166            return fn(self, *args, **kwargs)
167
168        return _safe_wrap_test_case
169
170    def process_next_action(self, action):
171        func = self.pts_action_mapping.get(
172            self.pts.pts_profile_mmi_request).get(action, "Nothing")
173        if func != 'Nothing':
174            func()
175
176    ### BEGIN A2DP MMI Actions ###
177
178    def a2dp_mmi_iut_connectable(self):
179        self.dut.start_profile_a2dp_sink()
180        self.dut.set_discoverable(True)
181
182    def a2dp_mmi_iut_accept_connect(self):
183        self.dut.start_profile_a2dp_sink()
184        self.dut.set_discoverable(True)
185
186    def a2dp_mmi_initiate_open_stream(self):
187        self.dut.a2dp_initiate_open_stream()
188
189    ### END A2DP MMI Actions ###
190
191    ### BEGIN GATT MMI Actions ###
192
193    def create_write_value_by_size(self, size):
194        write_value = []
195        for i in range(size):
196            write_value.append(i % 256)
197        return write_value
198
199    def mmi_send_write_command(self):
200        description_to_parse = self.pts.current_implicit_send_description
201        raw_handle = re.search('handle = \'(.*)\'O with', description_to_parse)
202        handle = int(raw_handle.group(1), 16)
203        raw_size = re.search('with <= \'(.*)\' byte', description_to_parse)
204        size = int(raw_size.group(1))
205        self.dut.gatt_client_write_characteristic_without_response_by_handle(
206            self.peer_identifier, handle,
207            self.create_write_value_by_size(size))
208
209    def mmi_send_write_request(self):
210        description_to_parse = self.pts.current_implicit_send_description
211        raw_handle = re.search('handle = \'(.*)\'O with', description_to_parse)
212        handle = int(raw_handle.group(1), 16)
213        raw_size = re.search('with <= \'(.*)\' byte', description_to_parse)
214        size = int(raw_size.group(1))
215        offset = 0
216        self.dut.gatt_client_write_characteristic_by_handle(
217            self.peer_identifier, handle, offset,
218            self.create_write_value_by_size(size))
219
220    def mmi_send_prepare_write(self):
221        description_to_parse = self.pts.current_implicit_send_description
222        raw_handle = re.search('handle = \'(.*)\'O <=', description_to_parse)
223        handle = int(raw_handle.group(1), 16)
224        raw_size = re.search('<= \'(.*)\' byte', description_to_parse)
225        size = int(math.floor(int(raw_size.group(1)) / 2))
226        offset = int(size / 2)
227        self.dut.gatt_client_write_characteristic_by_handle(
228            self.peer_identifier, handle, offset,
229            self.create_write_value_by_size(size))
230
231    def mmi_iut_send_prepare_write_greater_offset(self):
232        description_to_parse = self.pts.current_implicit_send_description
233        raw_handle = re.search('handle = \'(.*)\'O and', description_to_parse)
234        handle = int(raw_handle.group(1), 16)
235        raw_offset = re.search('greater than \'(.*)\' byte',
236                               description_to_parse)
237        offset = int(raw_offset.group(1))
238        size = 1
239        self.dut.gatt_client_write_characteristic_by_handle(
240            self.peer_identifier, handle, offset,
241            self.create_write_value_by_size(size))
242
243    def mmi_iut_send_prepare_write_greater(self):
244        description_to_parse = self.pts.current_implicit_send_description
245        raw_handle = re.search('handle = \'(.*)\'O with', description_to_parse)
246        handle = int(raw_handle.group(1), 16)
247        raw_size = re.search('greater than \'(.*)\' byte',
248                             description_to_parse)
249        size = int(raw_size.group(1))
250        offset = 0
251        self.dut.gatt_client_write_characteristic_by_handle(
252            self.peer_identifier, handle, offset,
253            self.create_write_value_by_size(size))
254
255    def mmi_make_iut_connectable(self):
256        adv_data = {
257            "name": fuchsia_name,
258            "appearance": None,
259            "service_data": None,
260            "tx_power_level": None,
261            "service_uuids": None,
262            "manufacturer_data": None,
263            "uris": None,
264        }
265        scan_response = None
266        connectable = True
267        interval = 1000
268
269        self.dut.start_le_advertisement(adv_data, scan_response, interval,
270                                        connectable)
271
272    def mmi_iut_enter_uuid_read_not_permitted(self):
273        self.pts.extra_answers.append(
274            self.characteristic_read_not_permitted_uuid)
275
276    def mmi_iut_enter_handle_read_not_permitted(self):
277        self.pts.extra_answers.append(
278            self.characteristic_read_not_permitted_handle)
279
280    def mmi_iut_enter_handle_invalid(self):
281        self.pts.extra_answers.append(self.characteristic_read_invalid_handle)
282
283    def mmi_iut_enter_uuid_attribute_not_found(self):
284        self.pts.extra_answers.append(
285            self.characteristic_attribute_not_found_uuid)
286
287    def mmi_iut_enter_handle_write_not_permitted(self):
288        self.pts.extra_answers.append(
289            self.write_characteristic_not_permitted_handle)
290
291    def mmi_verify_secure_id(self):
292        self.pts.extra_answers.append(self.dut.get_pairing_pin())
293
294    def mmi_discover_service_uuid_range(self, uuid):
295        self.dut.gatt_client_mmi_discover_service_uuid_range(
296            self.peer_identifier, uuid)
297
298    def mmi_iut_initiate_connection(self):
299        autoconnect = False
300        transport = gatt_transport['le']
301        adv_name = "PTS"
302        self.peer_identifier = self.dut.le_scan_with_name_filter(
303            "PTS", self.scan_timeout_seconds)
304        if self.peer_identifier is None:
305            raise signals.TestFailure("Scanner unable to find advertisement.")
306        tries = 3
307        for _ in range(tries):
308            if self.dut.gatt_connect(self.peer_identifier, transport,
309                                     autoconnect):
310                return
311
312        raise signals.TestFailure("Unable to connect to peripheral.")
313
314    def mmi_iut_initiate_disconnection(self):
315        if not self.dut.gatt_disconnect(self.peer_identifier):
316            raise signals.TestFailure("Failed to disconnect from peer.")
317
318    def mmi_discover_primary_service(self):
319        self.dut.gatt_refresh()
320
321    def mmi_iut_find_included_services(self):
322        self.dut.gatt_refresh()
323
324        test_result = self.pts.execute_test(test_name)
325        return test_result
326
327    def mmi_confirm_primary_service(self):
328        # TODO: Write verifier that 1800 and 1801 exists. For now just pass.
329        return True
330
331    def mmi_confirm_characteristic_service(self):
332        # TODO: Write verifier that no services exist. For now just pass.
333        return True
334
335    def mmi_confirm_include_service(self, uuid_description):
336        # TODO: Write verifier that input services exist. For now just pass.
337        # Note: List comes in the form of a long string to parse:
338        # Attribute Handle = '0002'O Included Service Attribute handle = '0080'O,End Group Handle = '0085'O,Service UUID = 'A00B'O
339        # \n
340        # Attribute Handle = '0021'O Included Service Attribute handle = '0001'O,End Group Handle = '0006'O,Service UUID = 'A00D'O
341        # \n ...
342        return True
343
344    def mmi_iut_send_read_characteristic_handle(self):
345        description_to_parse = self.pts.current_implicit_send_description
346        raw_handle = re.search('handle = \'(.*)\'O to', description_to_parse)
347        handle = int(raw_handle.group(1), 16)
348        self.dut.gatt_client_read_characteristic_by_handle(
349            self.peer_identifier, handle)
350
351    def mmi_iut_send_read_descriptor_handle(self):
352        description_to_parse = self.pts.current_implicit_send_description
353        raw_handle = re.search('handle = \'(.*)\'O to', description_to_parse)
354        handle = int(raw_handle.group(1), 16)
355        self.dut.gatt_client_descriptor_read_by_handle(self.peer_identifier,
356                                                       handle)
357
358    ### END GATT MMI Actions ###
359