1# Copyright 2019 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Server side bluetooth GATT client helper class for testing""" 6 7import base64 8import json 9 10 11class GATT_ClientFacade(object): 12 """A wrapper for getting GATT application from GATT server""" 13 14 def __init__(self, bluetooth_facade): 15 """Initialize a GATT_ClientFacade 16 17 @param bluetooth_facade: facade to communicate with adapter in DUT 18 19 """ 20 self.bluetooth_facade = bluetooth_facade 21 22 23 def browse(self, address): 24 """Browse the application on GATT server 25 26 @param address: a string of MAC address of the GATT server device 27 28 @return: GATT_Application object 29 30 """ 31 attr_map_json = json.loads(self.bluetooth_facade.\ 32 get_gatt_attributes_map(address)) 33 application = GATT_Application() 34 application.browse(attr_map_json, self.bluetooth_facade) 35 36 return application 37 38 39class GATT_Application(object): 40 """A GATT client application class""" 41 42 def __init__(self): 43 """Initialize a GATT Application""" 44 self.services = dict() 45 46 47 def browse(self, attr_map_json, bluetooth_facade): 48 """Browse the application on GATT server 49 50 @param attr_map_json: a json object returned by 51 bluetooth_device_xmlrpc_server 52 53 @bluetooth_facade: facade to communicate with adapter in DUT 54 55 """ 56 servs_json = attr_map_json['services'] 57 for uuid in servs_json: 58 path = servs_json[uuid]['path'] 59 service_obj = GATT_Service(uuid, path, bluetooth_facade) 60 service_obj.read_properties() 61 self.add_service(service_obj) 62 63 chrcs_json = servs_json[uuid]['characteristics'] 64 for uuid in chrcs_json: 65 path = chrcs_json[uuid]['path'] 66 chrc_obj = GATT_Characteristic(uuid, path, bluetooth_facade) 67 chrc_obj.read_properties() 68 service_obj.add_characteristic(chrc_obj) 69 70 descs_json = chrcs_json[uuid]['descriptors'] 71 for uuid in descs_json: 72 path = descs_json[uuid]['path'] 73 desc_obj = GATT_Descriptor(uuid, path, bluetooth_facade) 74 desc_obj.read_properties() 75 chrc_obj.add_descriptor(desc_obj) 76 77 78 def find_by_uuid(self, uuid): 79 """Find attribute under this application by specifying UUID 80 81 @param uuid: string of UUID 82 83 @return: Attribute object if found, 84 none otherwise 85 """ 86 for serv_uuid, serv in self.services.items(): 87 found = serv.find_by_uuid(uuid) 88 if found: 89 return found 90 return None 91 92 93 def add_service(self, service): 94 """Add a service into this application""" 95 self.services[service.uuid] = service 96 97 98 @staticmethod 99 def diff(appl_a, appl_b): 100 """Compare two Applications, and return their difference 101 102 @param appl_a: the first application which is going to be compared 103 104 @param appl_b: the second application which is going to be compared 105 106 @return: a list of string, each describes one difference 107 108 """ 109 result = [] 110 111 uuids_a = set(appl_a.services.keys()) 112 uuids_b = set(appl_b.services.keys()) 113 uuids = uuids_a.union(uuids_b) 114 115 for uuid in uuids: 116 serv_a = appl_a.services.get(uuid, None) 117 serv_b = appl_b.services.get(uuid, None) 118 119 if not serv_a or not serv_b: 120 result.append("Service %s is not included in both Applications:" 121 "%s vs %s" % (uuid, bool(serv_a), bool(serv_b))) 122 else: 123 result.extend(GATT_Service.diff(serv_a, serv_b)) 124 return result 125 126 127class GATT_Service(object): 128 """GATT client service class""" 129 PROPERTIES = ['UUID', 'Primary', 'Device', 'Includes'] 130 131 132 def __init__(self, uuid, object_path, bluetooth_facade): 133 """Initialize a GATT service object 134 135 @param uuid: string of UUID 136 137 @param object_path: object path of this service 138 139 @param bluetooth_facade: facade to communicate with adapter in DUT 140 141 """ 142 self.uuid = uuid 143 self.object_path = object_path 144 self.bluetooth_facade = bluetooth_facade 145 self.properties = dict() 146 self.characteristics = dict() 147 148 149 def add_characteristic(self, chrc_obj): 150 """Add a characteristic attribute into service 151 152 @param chrc_obj: a characteristic object 153 154 """ 155 self.characteristics[chrc_obj.uuid] = chrc_obj 156 157 158 def read_properties(self): 159 """Read all properties in this service""" 160 for prop_name in self.PROPERTIES: 161 self.properties[prop_name] = self.read_property(prop_name) 162 return self.properties 163 164 165 def read_property(self, property_name): 166 """Read a property in this service 167 168 @param property_name: string of the name of the property 169 170 @return: the value of the property 171 172 """ 173 return self.bluetooth_facade.get_gatt_service_property( 174 self.object_path, property_name) 175 176 def find_by_uuid(self, uuid): 177 """Find attribute under this service by specifying UUID 178 179 @param uuid: string of UUID 180 181 @return: Attribute object if found, 182 none otherwise 183 184 """ 185 if self.uuid == uuid: 186 return self 187 188 for chrc_uuid, chrc in self.characteristics.items(): 189 found = chrc.find_by_uuid(uuid) 190 if found: 191 return found 192 return None 193 194 195 @staticmethod 196 def diff(serv_a, serv_b): 197 """Compare two Services, and return their difference 198 199 @param serv_a: the first service which is going to be compared 200 201 @param serv_b: the second service which is going to be compared 202 203 @return: a list of string, each describes one difference 204 205 """ 206 result = [] 207 208 for prop_name in GATT_Service.PROPERTIES: 209 if serv_a.properties[prop_name] != serv_b.properties[prop_name]: 210 result.append("Service %s is different in %s: %s vs %s" % 211 (serv_a.uuid, prop_name, 212 serv_a.properties[prop_name], 213 serv_b.properties[prop_name])) 214 215 uuids_a = set(serv_a.characteristics.keys()) 216 uuids_b = set(serv_b.characteristics.keys()) 217 uuids = uuids_a.union(uuids_b) 218 219 for uuid in uuids: 220 chrc_a = serv_a.characteristics.get(uuid, None) 221 chrc_b = serv_b.characteristics.get(uuid, None) 222 223 if not chrc_a or not chrc_b: 224 result.append("Characteristic %s is not included in both " 225 "Services: %s vs %s" % (uuid, bool(chrc_a), 226 bool(chrc_b))) 227 else: 228 result.extend(GATT_Characteristic.diff(chrc_a, chrc_b)) 229 return result 230 231 232class GATT_Characteristic(object): 233 """GATT client characteristic class""" 234 235 PROPERTIES = ['UUID', 'Service', 'Value', 'Notifying', 'Flags'] 236 237 238 def __init__(self, uuid, object_path, bluetooth_facade): 239 """Initialize a GATT characteristic object 240 241 @param uuid: string of UUID 242 243 @param object_path: object path of this characteristic 244 245 @param bluetooth_facade: facade to communicate with adapter in DUT 246 247 """ 248 self.uuid = uuid 249 self.object_path = object_path 250 self.bluetooth_facade = bluetooth_facade 251 self.properties = dict() 252 self.descriptors = dict() 253 254 255 def add_descriptor(self, desc_obj): 256 """Add a characteristic attribute into service 257 258 @param desc_obj: a descriptor object 259 260 """ 261 self.descriptors[desc_obj.uuid] = desc_obj 262 263 264 def read_properties(self): 265 """Read all properties in this characteristic""" 266 for prop_name in self.PROPERTIES: 267 self.properties[prop_name] = self.read_property(prop_name) 268 return self.properties 269 270 271 def read_property(self, property_name): 272 """Read a property in this characteristic 273 274 @param property_name: string of the name of the property 275 276 @return: the value of the property 277 278 """ 279 return self.bluetooth_facade.get_gatt_characteristic_property( 280 self.object_path, property_name) 281 282 283 def find_by_uuid(self, uuid): 284 """Find attribute under this characteristic by specifying UUID 285 286 @param uuid: string of UUID 287 288 @return: Attribute object if found, 289 none otherwise 290 291 """ 292 if self.uuid == uuid: 293 return self 294 295 for desc_uuid, desc in self.descriptors.items(): 296 if desc_uuid == uuid: 297 return desc 298 return None 299 300 301 def read_value(self): 302 """Perform ReadValue in DUT and store it in property 'Value' 303 304 @return: bytearray of the value 305 306 """ 307 value = self.bluetooth_facade.gatt_characteristic_read_value( 308 self.uuid, self.object_path) 309 self.properties['Value'] = bytearray(base64.standard_b64decode(value)) 310 return self.properties['Value'] 311 312 313 @staticmethod 314 def diff(chrc_a, chrc_b): 315 """Compare two Characteristics, and return their difference 316 317 @param serv_a: the first service which is going to be compared 318 319 @param serv_b: the second service which is going to be compared 320 321 @return: a list of string, each describes one difference 322 323 """ 324 result = [] 325 326 for prop_name in GATT_Characteristic.PROPERTIES: 327 if chrc_a.properties[prop_name] != chrc_b.properties[prop_name]: 328 result.append("Characteristic %s is different in %s: %s vs %s" 329 % (chrc_a.uuid, prop_name, 330 chrc_a.properties[prop_name], 331 chrc_b.properties[prop_name])) 332 333 uuids_a = set(chrc_a.descriptors.keys()) 334 uuids_b = set(chrc_b.descriptors.keys()) 335 uuids = uuids_a.union(uuids_b) 336 337 for uuid in uuids: 338 desc_a = chrc_a.descriptors.get(uuid, None) 339 desc_b = chrc_b.descriptors.get(uuid, None) 340 341 if not desc_a or not desc_b: 342 result.append("Descriptor %s is not included in both" 343 "Characteristic: %s vs %s" % (uuid, bool(desc_a), 344 bool(desc_b))) 345 else: 346 result.extend(GATT_Descriptor.diff(desc_a, desc_b)) 347 return result 348 349 350class GATT_Descriptor(object): 351 """GATT client descriptor class""" 352 353 PROPERTIES = ['UUID', 'Characteristic', 'Value', 'Flags'] 354 355 def __init__(self, uuid, object_path, bluetooth_facade): 356 """Initialize a GATT descriptor object 357 358 @param uuid: string of UUID 359 360 @param object_path: object path of this descriptor 361 362 @param bluetooth_facade: facade to communicate with adapter in DUT 363 364 """ 365 self.uuid = uuid 366 self.object_path = object_path 367 self.bluetooth_facade = bluetooth_facade 368 self.properties = dict() 369 370 371 def read_properties(self): 372 """Read all properties in this characteristic""" 373 for prop_name in self.PROPERTIES: 374 self.properties[prop_name] = self.read_property(prop_name) 375 return self.properties 376 377 378 def read_property(self, property_name): 379 """Read a property in this characteristic 380 381 @param property_name: string of the name of the property 382 383 @return: the value of the property 384 385 """ 386 return self.bluetooth_facade.get_gatt_descriptor_property( 387 self.object_path, property_name) 388 389 390 def read_value(self): 391 """Perform ReadValue in DUT and store it in property 'Value' 392 393 @return: bytearray of the value 394 395 """ 396 value = self.bluetooth_facade.gatt_descriptor_read_value( 397 self.uuid, self.object_path) 398 self.properties['Value'] = bytearray(base64.standard_b64decode(value)) 399 400 return self.properties['Value'] 401 402 403 @staticmethod 404 def diff(desc_a, desc_b): 405 """Compare two Descriptors, and return their difference 406 407 @param serv_a: the first service which is going to be compared 408 409 @param serv_b: the second service which is going to be compared 410 411 @return: a list of string, each describes one difference 412 413 """ 414 result = [] 415 416 for prop_name in desc_a.properties.keys(): 417 if desc_a.properties[prop_name] != desc_b.properties[prop_name]: 418 result.append("Descriptor %s is different in %s: %s vs %s" % 419 (desc_a.uuid, prop_name, 420 desc_a.properties[prop_name], 421 desc_b.properties[prop_name])) 422 423 return result 424 425 426def UUID_Short2Full(uuid): 427 """Transform 2 bytes uuid string to 16 bytes 428 429 @param uuid: 2 bytes shortened UUID string in hex 430 431 @return: full uuid string 432 """ 433 uuid_template = '0000%s-0000-1000-8000-00805f9b34fb' 434 return uuid_template % uuid 435 436 437class GATT_HIDApplication(GATT_Application): 438 """Default HID Application on Raspberry Pi GATT server 439 """ 440 441 BatteryServiceUUID = UUID_Short2Full('180f') 442 BatteryLevelUUID = UUID_Short2Full('2a19') 443 CliChrcConfigUUID = UUID_Short2Full('2902') 444 GenericAttributeProfileUUID = UUID_Short2Full('1801') 445 ServiceChangedUUID = UUID_Short2Full('2a05') 446 DeviceInfoUUID = UUID_Short2Full('180a') 447 ManufacturerNameStrUUID = UUID_Short2Full('2a29') 448 PnPIDUUID = UUID_Short2Full('2a50') 449 GenericAccessProfileUUID = UUID_Short2Full('1800') 450 DeviceNameUUID = UUID_Short2Full('2a00') 451 AppearanceUUID = UUID_Short2Full('2a01') 452 453 454 def __init__(self): 455 """ 456 """ 457 GATT_Application.__init__(self) 458 BatteryService = GATT_Service(self.BatteryServiceUUID, None, None) 459 BatteryService.properties = { 460 'UUID': BatteryService.uuid, 461 'Primary': True, 462 'Device': None, 463 'Includes': [] 464 } 465 self.add_service(BatteryService) 466 467 BatteryLevel = GATT_Characteristic(self.BatteryLevelUUID, None, None) 468 BatteryLevel.properties = { 469 'UUID': BatteryLevel.uuid, 470 'Service': None, 471 'Value': [], 472 'Notifying': False, 473 'Flags': ['read', 'notify'] 474 } 475 BatteryService.add_characteristic(BatteryLevel) 476 477 CliChrcConfig = GATT_Descriptor(self.CliChrcConfigUUID, None, None) 478 CliChrcConfig.properties = { 479 'UUID': CliChrcConfig.uuid, 480 'Characteristic': None, 481 'Value': [], 482 'Flags': None 483 } 484 485 BatteryLevel.add_descriptor(CliChrcConfig) 486 487 GenericAttributeProfile = GATT_Service(self.GenericAttributeProfileUUID, 488 None, None) 489 GenericAttributeProfile.properties = { 490 'UUID': GenericAttributeProfile.uuid, 491 'Primary': True, 492 'Device': None, 493 'Includes': [] 494 } 495 self.add_service(GenericAttributeProfile) 496 497 ServiceChanged = GATT_Characteristic(self.ServiceChangedUUID, None, 498 None) 499 ServiceChanged.properties = { 500 'UUID': ServiceChanged.uuid, 501 'Service': None, 502 'Value': [], 503 'Notifying': False, 504 'Flags': ['indicate'] 505 } 506 GenericAttributeProfile.add_characteristic(ServiceChanged) 507 508 CliChrcConfig = GATT_Descriptor(self.CliChrcConfigUUID, None, None) 509 CliChrcConfig.properties = { 510 'UUID': CliChrcConfig.uuid, 511 'Characteristic': None, 512 'Value': [], 513 'Flags': None 514 } 515 ServiceChanged.add_descriptor(CliChrcConfig) 516 517 DeviceInfo = GATT_Service(self.DeviceInfoUUID, None, None) 518 DeviceInfo.properties = { 519 'UUID': DeviceInfo.uuid, 520 'Primary': True, 521 'Device': None, 522 'Includes': [] 523 } 524 self.add_service(DeviceInfo) 525 526 ManufacturerNameStr = GATT_Characteristic(self.ManufacturerNameStrUUID, 527 None, None) 528 ManufacturerNameStr.properties = { 529 'UUID': ManufacturerNameStr.uuid, 530 'Service': None, 531 'Value': [], 532 'Notifying': None, 533 'Flags': ['read'] 534 } 535 DeviceInfo.add_characteristic(ManufacturerNameStr) 536 537 PnPID = GATT_Characteristic(self.PnPIDUUID, None, None) 538 PnPID.properties = { 539 'UUID': PnPID.uuid, 540 'Service': None, 541 'Value': [], 542 'Notifying': None, 543 'Flags': ['read'] 544 } 545 DeviceInfo.add_characteristic(PnPID) 546 547 GenericAccessProfile = GATT_Service(self.GenericAccessProfileUUID, 548 None, None) 549 GenericAccessProfile.properties = { 550 'UUID': GenericAccessProfile.uuid, 551 'Primary': True, 552 'Device': None, 553 'Includes': [] 554 } 555 self.add_service(GenericAccessProfile) 556 557 DeviceName = GATT_Characteristic(self.DeviceNameUUID, None, None) 558 DeviceName.properties = { 559 'UUID': DeviceName.uuid, 560 'Service': None, 561 'Value': [], 562 'Notifying': None, 563 'Flags': ['read'] 564 } 565 GenericAccessProfile.add_characteristic(DeviceName) 566 567 Appearance = GATT_Characteristic(self.AppearanceUUID, None, None) 568 Appearance.properties = { 569 'UUID': Appearance.uuid, 570 'Service': None, 571 'Value': [], 572 'Notifying': None, 573 'Flags': ['read'] 574 } 575 GenericAccessProfile.add_characteristic(Appearance) 576