# Lint as: python2, python3
# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Server side bluetooth GATT client helper class for testing"""

import base64
import json


class GATT_ClientFacade(object):
    """A wrapper for getting GATT application from GATT server"""

    def __init__(self, bluetooth_facade):
        """Initialize a GATT_ClientFacade

        @param bluetooth_facade: facade to communicate with adapter in DUT

        """
        self.bluetooth_facade = bluetooth_facade


    def browse(self, address):
        """Browse the application on GATT server

        Fetches the JSON-encoded attribute map for the given address through
        the facade, decodes it, and builds a GATT_Application from it.

        @param address: a string of MAC address of the GATT server device

        @return: GATT_Application object

        """
        attr_map_json = json.loads(
                self.bluetooth_facade.get_gatt_attributes_map(address))
        application = GATT_Application()
        application.browse(attr_map_json, self.bluetooth_facade)

        return application


class GATT_Application(object):
    """A GATT client application class"""

    def __init__(self):
        """Initialize a GATT Application"""
        # Maps service UUID -> GATT_Service.
        self.services = dict()


    def browse(self, attr_map_json, bluetooth_facade):
        """Browse the application on GATT server

        Walks the attribute map and creates one object per service,
        characteristic and descriptor. The properties of every attribute are
        read through the facade right after the attribute is created.

        @param attr_map_json: a json object returned by
                              bluetooth_device_xmlrpc_server. It is indexed as
                              ['services'][uuid]['path'],
                              ['services'][uuid]['characteristics'][uuid]
                              ['path'] and, below each characteristic,
                              ['descriptors'][uuid]['path'].

        @param bluetooth_facade: facade to communicate with adapter in DUT

        """
        # Distinct loop names per level so an inner loop can never clobber
        # the UUID of the attribute that encloses it.
        for serv_uuid, serv_json in attr_map_json['services'].items():
            service_obj = GATT_Service(serv_uuid, serv_json['path'],
                                       bluetooth_facade)
            service_obj.read_properties()
            self.add_service(service_obj)

            for chrc_uuid, chrc_json in serv_json['characteristics'].items():
                chrc_obj = GATT_Characteristic(chrc_uuid, chrc_json['path'],
                                               bluetooth_facade)
                chrc_obj.read_properties()
                service_obj.add_characteristic(chrc_obj)

                for desc_uuid, desc_json in chrc_json['descriptors'].items():
                    desc_obj = GATT_Descriptor(desc_uuid, desc_json['path'],
                                               bluetooth_facade)
                    desc_obj.read_properties()
                    chrc_obj.add_descriptor(desc_obj)


    def find_by_uuid(self, uuid):
        """Find attribute under this application by specifying UUID

        @param uuid: string of UUID

        @return: Attribute object if found,
                 none otherwise
        """
        for serv in self.services.values():
            found = serv.find_by_uuid(uuid)
            if found:
                return found
        return None


    def add_service(self, service):
        """Add a service into this application

        @param service: a service object; it is stored under its uuid

        """
        self.services[service.uuid] = service


    @staticmethod
    def diff(appl_a, appl_b):
        """Compare two Applications, and return their difference

        @param appl_a: the first application which is going to be compared

        @param appl_b: the second application which is going to be compared

        @return: a list of string, each describes one difference

        """
        result = []

        uuids_a = set(appl_a.services.keys())
        uuids_b = set(appl_b.services.keys())
        uuids = uuids_a.union(uuids_b)

        for uuid in uuids:
            serv_a = appl_a.services.get(uuid, None)
            serv_b = appl_b.services.get(uuid, None)

            if not serv_a or not serv_b:
                # Adjacent literals are concatenated verbatim, so the
                # separating space has to be part of one of them.
                result.append("Service %s is not included in both "
                              "Applications: %s vs %s" %
                              (uuid, bool(serv_a), bool(serv_b)))
            else:
                result.extend(GATT_Service.diff(serv_a, serv_b))
        return result


class GATT_Service(object):
    """GATT client service class"""

    # Names of the properties read by read_properties() and compared by diff().
    PROPERTIES = ['UUID', 'Primary', 'Device', 'Includes']


    def __init__(self, uuid, object_path, bluetooth_facade):
        """Initialize a GATT service object

        @param uuid: string of UUID

        @param object_path: object path of this service

        @param bluetooth_facade: facade to communicate with adapter in DUT

        """
        self.uuid = uuid
        self.object_path = object_path
        self.bluetooth_facade = bluetooth_facade
        # Maps property name -> value.
        self.properties = dict()
        # Maps characteristic UUID -> GATT_Characteristic.
        self.characteristics = dict()


    def add_characteristic(self, chrc_obj):
        """Add a characteristic attribute into service

        @param chrc_obj: a characteristic object

        """
        self.characteristics[chrc_obj.uuid] = chrc_obj


    def read_properties(self):
        """Read all properties in this service

        @return: the dict of properties, keyed by property name

        """
        for prop_name in self.PROPERTIES:
            self.properties[prop_name] = self.read_property(prop_name)
        return self.properties


    def read_property(self, property_name):
        """Read a property in this service

        @param property_name: string of the name of the property

        @return: the value of the property

        """
        return self.bluetooth_facade.get_gatt_service_property(
                self.object_path, property_name)


    def find_by_uuid(self, uuid):
        """Find attribute under this service by specifying UUID

        The service itself is returned when its own UUID matches.

        @param uuid: string of UUID

        @return: Attribute object if found,
                 none otherwise

        """
        if self.uuid == uuid:
            return self

        for chrc in self.characteristics.values():
            found = chrc.find_by_uuid(uuid)
            if found:
                return found
        return None


    @staticmethod
    def diff(serv_a, serv_b):
        """Compare two Services, and return their difference

        @param serv_a: the first service which is going to be compared

        @param serv_b: the second service which is going to be compared

        @return: a list of string, each describes one difference

        """
        result = []

        for prop_name in GATT_Service.PROPERTIES:
            if serv_a.properties[prop_name] != serv_b.properties[prop_name]:
                result.append("Service %s is different in %s: %s vs %s" %
                              (serv_a.uuid, prop_name,
                               serv_a.properties[prop_name],
                               serv_b.properties[prop_name]))

        uuids_a = set(serv_a.characteristics.keys())
        uuids_b = set(serv_b.characteristics.keys())
        uuids = uuids_a.union(uuids_b)

        for uuid in uuids:
            chrc_a = serv_a.characteristics.get(uuid, None)
            chrc_b = serv_b.characteristics.get(uuid, None)

            if not chrc_a or not chrc_b:
                result.append("Characteristic %s is not included in both "
                              "Services: %s vs %s" % (uuid, bool(chrc_a),
                                                      bool(chrc_b)))
            else:
                result.extend(GATT_Characteristic.diff(chrc_a, chrc_b))
        return result


class GATT_Characteristic(object):
    """GATT client characteristic class"""

    # Names of the properties read by read_properties() and compared by diff().
    PROPERTIES = ['UUID', 'Service', 'Value', 'Notifying', 'Flags']


    def __init__(self, uuid, object_path, bluetooth_facade):
        """Initialize a GATT characteristic object

        @param uuid: string of UUID

        @param object_path: object path of this characteristic

        @param bluetooth_facade: facade to communicate with adapter in DUT

        """
        self.uuid = uuid
        self.object_path = object_path
        self.bluetooth_facade = bluetooth_facade
        # Maps property name -> value.
        self.properties = dict()
        # Maps descriptor UUID -> GATT_Descriptor.
        self.descriptors = dict()


    def add_descriptor(self, desc_obj):
        """Add a descriptor attribute into characteristic

        @param desc_obj: a descriptor object

        """
        self.descriptors[desc_obj.uuid] = desc_obj


    def read_properties(self):
        """Read all properties in this characteristic

        @return: the dict of properties, keyed by property name

        """
        for prop_name in self.PROPERTIES:
            self.properties[prop_name] = self.read_property(prop_name)
        return self.properties


    def read_property(self, property_name):
        """Read a property in this characteristic

        @param property_name: string of the name of the property

        @return: the value of the property

        """
        return self.bluetooth_facade.get_gatt_characteristic_property(
                self.object_path, property_name)


    def find_by_uuid(self, uuid):
        """Find attribute under this characteristic by specifying UUID

        The characteristic itself is returned when its own UUID matches.

        @param uuid: string of UUID

        @return: Attribute object if found,
                 none otherwise

        """
        if self.uuid == uuid:
            return self

        # Descriptors are keyed by their UUID, so a plain lookup suffices.
        return self.descriptors.get(uuid)


    def read_value(self):
        """Perform ReadValue in DUT and store it in property 'Value'

        The string returned by the facade is base64-decoded before it is
        stored.

        @return: bytearray of the value

        """
        value = self.bluetooth_facade.gatt_characteristic_read_value(
                self.uuid, self.object_path)
        self.properties['Value'] = bytearray(base64.standard_b64decode(value))
        return self.properties['Value']


    @staticmethod
    def diff(chrc_a, chrc_b):
        """Compare two Characteristics, and return their difference

        @param chrc_a: the first characteristic which is going to be compared

        @param chrc_b: the second characteristic which is going to be compared

        @return: a list of string, each describes one difference

        """
        result = []

        for prop_name in GATT_Characteristic.PROPERTIES:
            if chrc_a.properties[prop_name] != chrc_b.properties[prop_name]:
                result.append("Characteristic %s is different in %s: %s vs %s"
                              % (chrc_a.uuid, prop_name,
                                 chrc_a.properties[prop_name],
                                 chrc_b.properties[prop_name]))

        uuids_a = set(chrc_a.descriptors.keys())
        uuids_b = set(chrc_b.descriptors.keys())
        uuids = uuids_a.union(uuids_b)

        for uuid in uuids:
            desc_a = chrc_a.descriptors.get(uuid, None)
            desc_b = chrc_b.descriptors.get(uuid, None)

            if not desc_a or not desc_b:
                # Adjacent literals are concatenated verbatim, so the
                # separating space has to be part of one of them.
                result.append("Descriptor %s is not included in both "
                              "Characteristic: %s vs %s" % (uuid, bool(desc_a),
                                                            bool(desc_b)))
            else:
                result.extend(GATT_Descriptor.diff(desc_a, desc_b))
        return result


class GATT_Descriptor(object):
    """GATT client descriptor class"""

    # Names of the properties read by read_properties().
    PROPERTIES = ['UUID', 'Characteristic', 'Value', 'Flags']


    def __init__(self, uuid, object_path, bluetooth_facade):
        """Initialize a GATT descriptor object

        @param uuid: string of UUID

        @param object_path: object path of this descriptor

        @param bluetooth_facade: facade to communicate with adapter in DUT

        """
        self.uuid = uuid
        self.object_path = object_path
        self.bluetooth_facade = bluetooth_facade
        # Maps property name -> value.
        self.properties = dict()


    def read_properties(self):
        """Read all properties in this descriptor

        @return: the dict of properties, keyed by property name

        """
        for prop_name in self.PROPERTIES:
            self.properties[prop_name] = self.read_property(prop_name)
        return self.properties


    def read_property(self, property_name):
        """Read a property in this descriptor

        @param property_name: string of the name of the property

        @return: the value of the property

        """
        return self.bluetooth_facade.get_gatt_descriptor_property(
                self.object_path, property_name)


    def read_value(self):
        """Perform ReadValue in DUT and store it in property 'Value'

        The string returned by the facade is base64-decoded before it is
        stored.

        @return: bytearray of the value

        """
        value = self.bluetooth_facade.gatt_descriptor_read_value(
                self.uuid, self.object_path)
        self.properties['Value'] = bytearray(base64.standard_b64decode(value))

        return self.properties['Value']


    @staticmethod
    def diff(desc_a, desc_b):
        """Compare two Descriptors, and return their difference

        Only the properties present on desc_a are compared.

        @param desc_a: the first descriptor which is going to be compared

        @param desc_b: the second descriptor which is going to be compared

        @return: a list of string, each describes one difference

        """
        result = []

        for prop_name in desc_a.properties.keys():
            if desc_a.properties[prop_name] != desc_b.properties[prop_name]:
                result.append("Descriptor %s is different in %s: %s vs %s" %
                              (desc_a.uuid, prop_name,
                               desc_a.properties[prop_name],
                               desc_b.properties[prop_name]))

        return result


def UUID_Short2Full(uuid):
    """Expand a shortened UUID string into a full UUID string

    The given hex string is substituted into the template
    '0000%s-0000-1000-8000-00805f9b34fb'.

    @param uuid: 2 bytes shortened UUID string in hex

    @return: full uuid string
    """
    uuid_template = '0000%s-0000-1000-8000-00805f9b34fb'
    return uuid_template % uuid


class GATT_HIDApplication(GATT_Application):
    """Default HID Application on Raspberry Pi GATT server

    Every attribute is created with neither an object path nor a facade, so
    the resulting tree carries only the hard-coded property values below.
    """

    BatteryServiceUUID = UUID_Short2Full('180f')
    BatteryLevelUUID = UUID_Short2Full('2a19')
    CliChrcConfigUUID = UUID_Short2Full('2902')
    GenericAttributeProfileUUID = UUID_Short2Full('1801')
    ServiceChangedUUID = UUID_Short2Full('2a05')
    DeviceInfoUUID = UUID_Short2Full('180a')
    ManufacturerNameStrUUID = UUID_Short2Full('2a29')
    PnPIDUUID = UUID_Short2Full('2a50')
    GenericAccessProfileUUID = UUID_Short2Full('1800')
    DeviceNameUUID = UUID_Short2Full('2a00')
    AppearanceUUID = UUID_Short2Full('2a01')


    def __init__(self):
        """Build the hard-coded attribute tree of the HID application

        Four services are added: battery, generic attribute profile, device
        information and generic access profile, each with its
        characteristics (and descriptors where applicable).
        """
        GATT_Application.__init__(self)

        battery_service = self._make_service(self.BatteryServiceUUID)
        self.add_service(battery_service)
        battery_level = self._make_characteristic(
                self.BatteryLevelUUID, False, ['read', 'notify'])
        battery_service.add_characteristic(battery_level)
        battery_level.add_descriptor(
                self._make_descriptor(self.CliChrcConfigUUID))

        generic_attribute_profile = self._make_service(
                self.GenericAttributeProfileUUID)
        self.add_service(generic_attribute_profile)
        service_changed = self._make_characteristic(
                self.ServiceChangedUUID, False, ['indicate'])
        generic_attribute_profile.add_characteristic(service_changed)
        service_changed.add_descriptor(
                self._make_descriptor(self.CliChrcConfigUUID))

        device_info = self._make_service(self.DeviceInfoUUID)
        self.add_service(device_info)
        device_info.add_characteristic(self._make_characteristic(
                self.ManufacturerNameStrUUID, None, ['read']))
        device_info.add_characteristic(self._make_characteristic(
                self.PnPIDUUID, None, ['read']))

        generic_access_profile = self._make_service(
                self.GenericAccessProfileUUID)
        self.add_service(generic_access_profile)
        generic_access_profile.add_characteristic(self._make_characteristic(
                self.DeviceNameUUID, None, ['read']))
        generic_access_profile.add_characteristic(self._make_characteristic(
                self.AppearanceUUID, None, ['read']))


    @staticmethod
    def _make_service(uuid):
        """Create a primary service with no path, facade, device or includes

        @param uuid: string of UUID

        @return: GATT_Service object with its properties filled in

        """
        service = GATT_Service(uuid, None, None)
        service.properties = {
                'UUID': service.uuid,
                'Primary': True,
                'Device': None,
                'Includes': []
        }
        return service


    @staticmethod
    def _make_characteristic(uuid, notifying, flags):
        """Create a characteristic with no path, facade or service and an
        empty value

        @param uuid: string of UUID

        @param notifying: value stored in property 'Notifying'

        @param flags: list of strings stored in property 'Flags'

        @return: GATT_Characteristic object with its properties filled in

        """
        characteristic = GATT_Characteristic(uuid, None, None)
        characteristic.properties = {
                'UUID': characteristic.uuid,
                'Service': None,
                'Value': [],
                'Notifying': notifying,
                'Flags': flags
        }
        return characteristic


    @staticmethod
    def _make_descriptor(uuid):
        """Create a descriptor with no path, facade, characteristic or flags
        and an empty value

        @param uuid: string of UUID

        @return: GATT_Descriptor object with its properties filled in

        """
        descriptor = GATT_Descriptor(uuid, None, None)
        descriptor.properties = {
                'UUID': descriptor.uuid,
                'Characteristic': None,
                'Value': [],
                'Flags': None
        }
        return descriptor