• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5from autotest_lib.client.common_lib import error
6from autotest_lib.server.cros.bluetooth import bluetooth_test
7
class bluetooth_SDP_ServiceBrowse(bluetooth_test.BluetoothTest):
    """
    Verify that the IUT behaves correctly during the Service Browse procedure.

    The browse is performed twice, once with Service Search Attribute
    Requests and once with Service Search Requests followed by Service
    Attribute Requests; both runs must report the same set of services.
    """
    version = 1

    # Upper bounds handed to the tester's SDP requests.
    MAX_REC_CNT             = 100
    MAX_ATTR_BYTE_CNT       = 300
    # Class IDs and attribute IDs used while walking the browse hierarchy.
    PUBLIC_BROWSE_ROOT      = 0x1002
    SERVICE_CLASS_ID_LIST   = 0x0001
    BROWSE_GROUP_DESCRIPTOR = 0x1001
    GROUP_ID                = 0x0200


    def get_attribute_ssr_sar(self, class_id, attr_id, size):
        """Fetch an attribute with a Service Search Request followed by one
        Service Attribute Request per returned record handle.

        @param class_id: Class ID of service to check.
        @param attr_id: ID of attribute to check.
        @param size: Preferred size of UUID.

        @return list holding the attribute value of every matching record,
        or None if no record handle was returned or any reply was not an
        [attr_id, value] pair

        """
        record_handles = self.tester.service_search_request(
                [class_id], self.MAX_REC_CNT, size)
        if not isinstance(record_handles, list) or not record_handles:
            return None

        values = []
        for handle in record_handles:
            reply = self.tester.service_attribute_request(
                    handle, self.MAX_ATTR_BYTE_CNT, [attr_id])
            # Give up on the first malformed reply; no further handles are
            # queried after that.
            if not isinstance(reply, list) or len(reply) != 2:
                return None
            if not reply[0] == attr_id:
                return None
            values.append(reply[1])

        return values


    def get_attribute_ssar(self, class_id, attr_id, size):
        """Fetch an attribute with a single Service Search Attribute Request.

        @param class_id: Class ID of service to check.
        @param attr_id: ID of attribute to check.
        @param size: Preferred size of UUID.

        @return list holding the attribute value of every returned record
        (empty if the response list is empty), or None if the response is
        not a list or any element is not an [attr_id, value] pair

        """
        records = self.tester.service_search_attribute_request(
                [class_id], self.MAX_ATTR_BYTE_CNT, [attr_id], size)
        if not isinstance(records, list):
            return None

        well_formed = all(
                isinstance(record, list) and len(record) == 2 and
                record[0] == attr_id
                for record in records)
        if not well_formed:
            return None

        return [record[1] for record in records]


    def test_attribute(self, class_id, attr_id, get_attribute):
        """Query an attribute using 16-bit, 32-bit and 128-bit UUID sizes and
        check that all three queries agree.

        @param class_id: Class ID of service to check.
        @param attr_id: ID of attribute to check.
        @param get_attribute: Method to use to get an attribute value.

        @return the result of the 16-bit query if the 32-bit and 128-bit
        queries returned an equal result, None otherwise

        """
        reference = get_attribute(class_id, attr_id, 16)

        # Stop at the first mismatch; later sizes are then not queried.
        for uuid_size in (32, 128):
            if reference != get_attribute(class_id, attr_id, uuid_size):
                return None

        return reference


    def service_browse(self, get_attribute):
        """Execute a Service Browse procedure.

        @param get_attribute: Method to use to get an attribute value.

        @return sorted list of unique services on the DUT, or False if browse
        did not finish correctly

        """
        # Services sitting directly under the public browse root.
        top_level = self.test_attribute(self.PUBLIC_BROWSE_ROOT,
                                        self.SERVICE_CLASS_ID_LIST,
                                        get_attribute)
        if not top_level:
            return False

        # Additional browse groups advertised by the DUT.
        browse_groups = self.test_attribute(self.BROWSE_GROUP_DESCRIPTOR,
                                            self.GROUP_ID,
                                            get_attribute)
        if not browse_groups:
            return False

        # Collect the services of every browse group; a group without
        # services aborts the browse.
        collected = []
        for group in browse_groups:
            group_services = self.test_attribute(group,
                                                 self.SERVICE_CLASS_ID_LIST,
                                                 get_attribute)
            if not group_services:
                return False
            collected.extend(group_services)

        # Every root service has to show up among the collected services.
        if not all(service in collected for service in top_level):
            return False

        # Sort, then keep an entry only when it differs from the previously
        # kept one, which drops duplicates without requiring hashable values.
        unique_services = []
        for service in sorted(collected):
            if not unique_services or unique_services[-1] != service:
                unique_services.append(service)

        return unique_services


    def correct_request(self):
        """Run basic tests for Service Browse procedure.

        @return True if both browse methods succeed and report the same
        services, False otherwise

        """
        # Connect to the DUT via L2CAP using SDP socket.
        self.tester.connect(self.adapter['Address'])

        via_ssar = self.service_browse(self.get_attribute_ssar)
        if not via_ssar:
            return False

        via_ssr_sar = self.service_browse(self.get_attribute_ssr_sar)

        # Ensure that two different browse methods return the same results.
        return via_ssar == via_ssr_sar


    def run_once(self):
        """Prepare the DUT and the tester, then run the browse test, retrying
        up to five times before failing.

        @raises error.TestFail: if the DUT or the tester cannot be set up, or
        if every attempt fails

        """
        # Reset the adapter to the powered on, discoverable state.
        if not self.device.reset_on() or not self.device.set_discoverable(True):
            raise error.TestFail('DUT could not be reset to initial state')

        self.adapter = self.device.get_adapter_properties()

        # Setup the tester as a generic computer.
        if not self.tester.setup('computer'):
            raise error.TestFail('Tester could not be initialized')

        # Since radio is involved, this test is not 100% reliable; instead we
        # repeat a few times until it succeeds.
        max_attempts = 5
        failed_attempts = 0
        while not self.correct_request():
            failed_attempts += 1
            if failed_attempts == max_attempts:
                raise error.TestFail('Expected device was not found')

        # Record how many attempts this took, hopefully we'll one day figure out
        # a way to reduce this to zero and then the loop above can go away.
        self.write_perf_keyval({'failed_attempts': failed_attempts})
189