# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.bluetooth import bluetooth_test


class bluetooth_SDP_ServiceBrowse(bluetooth_test.BluetoothTest):
    """
    Verify that the IUT behaves correctly during Service Browse procedure.
    """
    version = 1

    # Upper bounds passed to the tester's SDP requests.
    MAX_REC_CNT = 100
    MAX_ATTR_BYTE_CNT = 300

    # Class IDs and attribute IDs used while browsing (values as named in
    # the Bluetooth SDP assigned numbers).
    PUBLIC_BROWSE_ROOT = 0x1002
    SERVICE_CLASS_ID_LIST = 0x0001
    BROWSE_GROUP_DESCRIPTOR = 0x1001
    GROUP_ID = 0x0200


    @staticmethod
    def _is_attribute_pair(response, attr_id):
        """Check that a response is a two-element [attr_id, value] list.

        @param response: Object returned by the tester for one record.
        @param attr_id: ID of the attribute that was requested.

        @return True if response is a list of length 2 whose first element
                equals attr_id, False otherwise

        """
        return (isinstance(response, list) and len(response) == 2 and
                response[0] == attr_id)


    def get_attribute_ssr_sar(self, class_id, attr_id, size):
        """Get service attributes using Service Search Request and Service
        Attribute Request.

        @param class_id: Class ID of service to check.
        @param attr_id: ID of attribute to check.
        @param size: Preferred size of UUID.

        @return list with one attribute value per record handle found, or
                None if no handle was found or any response was malformed

        """
        handles = self.tester.service_search_request(
                [class_id], self.MAX_REC_CNT, size)
        if not isinstance(handles, list) or not handles:
            return None

        values = []
        for record_handle in handles:
            response = self.tester.service_attribute_request(
                    record_handle, self.MAX_ATTR_BYTE_CNT, [attr_id])
            if not self._is_attribute_pair(response, attr_id):
                return None
            values.append(response[1])

        return values


    def get_attribute_ssar(self, class_id, attr_id, size):
        """Get service attributes using Service Search Attribute Request.

        @param class_id: Class ID of service to check.
        @param attr_id: ID of attribute to check.
        @param size: Preferred size of UUID.

        @return list (possibly empty) with one attribute value per element
                of the response, or None if the response or any of its
                elements was malformed

        """
        response = self.tester.service_search_attribute_request(
                [class_id], self.MAX_ATTR_BYTE_CNT, [attr_id], size)
        if not isinstance(response, list):
            return None

        values = []
        for elem in response:
            if not self._is_attribute_pair(elem, attr_id):
                return None
            values.append(elem[1])

        return values


    def test_attribute(self, class_id, attr_id, get_attribute):
        """Test service attributes using 16-bit, 32-bit and 128-bit
        size of UUID.

        @param class_id: Class ID of service to check.
        @param attr_id: ID of attribute to check.
        @param get_attribute: Method to use to get an attribute value.

        @return the result of get_attribute for the 16-bit size if the
                32-bit and 128-bit requests returned an equal result,
                None otherwise

        """
        result_16 = get_attribute(class_id, attr_id, 16)

        # Stop at the first size whose result differs from the 16-bit one.
        for size in (32, 128):
            if get_attribute(class_id, attr_id, size) != result_16:
                return None

        return result_16


    def service_browse(self, get_attribute):
        """Execute a Service Browse procedure.

        @param get_attribute: Method to use to get an attribute value.

        @return sorted list of unique services on the DUT, or False if browse
                did not finish correctly

        """
        # Find services on top of hierarchy.
        root_services = self.test_attribute(self.PUBLIC_BROWSE_ROOT,
                                            self.SERVICE_CLASS_ID_LIST,
                                            get_attribute)
        if not root_services:
            return False

        # Find additional browse groups.
        group_ids = self.test_attribute(self.BROWSE_GROUP_DESCRIPTOR,
                                        self.GROUP_ID,
                                        get_attribute)
        if not group_ids:
            return False

        # Find services from all browse groups.
        all_services = []
        for group_id in group_ids:
            services = self.test_attribute(group_id,
                                           self.SERVICE_CLASS_ID_LIST,
                                           get_attribute)
            if not services:
                return False
            all_services.extend(services)

        # Ensure that root services are among all services.
        for service in root_services:
            if service not in all_services:
                return False

        # Sort all services and drop adjacent duplicates. Comparison is by
        # equality only, so this also works if the values are unhashable.
        unique_services = []
        for service in sorted(all_services):
            if not unique_services or unique_services[-1] != service:
                unique_services.append(service)

        return unique_services


    def correct_request(self):
        """Run basic tests for Service Browse procedure.

        @return True if both browse methods succeed and return the same
                services, False otherwise

        """
        # Connect to the DUT via L2CAP using SDP socket.
        self.tester.connect(self.adapter['Address'])

        browse_ssar = self.service_browse(self.get_attribute_ssar)
        if not browse_ssar:
            return False

        browse_ssr_sar = self.service_browse(self.get_attribute_ssr_sar)

        # Ensure that two different browse methods return the same results.
        return browse_ssar == browse_ssr_sar


    def run_once(self):
        """Reset the DUT, set up the tester and run the browse test.

        The browse test is attempted up to five times; the number of failed
        attempts before the first success is recorded as a perf keyval.

        @raises error.TestFail: if the DUT or the tester cannot be set up,
                or if every attempt fails.

        """
        # Reset the adapter to the powered on, discoverable state.
        if not (self.device.reset_on() and
                self.device.set_discoverable(True)):
            raise error.TestFail('DUT could not be reset to initial state')

        self.adapter = self.device.get_adapter_properties()

        # Setup the tester as a generic computer.
        if not self.tester.setup('computer'):
            raise error.TestFail('Tester could not be initialized')

        # Since radio is involved, this test is not 100% reliable; instead we
        # repeat a few times until it succeeds.
        for failed_attempts in range(0, 5):
            if self.correct_request():
                break
        else:
            # Reached only when no attempt succeeded (loop never broke).
            raise error.TestFail(
                    'Service Browse procedure did not complete correctly')

        # Record how many attempts this took, hopefully we'll one day figure out
        # a way to reduce this to zero and then the loop above can go away.
        self.write_perf_keyval({'failed_attempts': failed_attempts})