• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2019 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Server side bluetooth GATT client helper class for testing"""
7
8import base64
9import json
10
11
12class GATT_ClientFacade(object):
13    """A wrapper for getting GATT application from GATT server"""
14
15    def __init__(self, bluetooth_facade):
16        """Initialize a GATT_ClientFacade
17
18        @param bluetooth_facade: facade to communicate with adapter in DUT
19
20        """
21        self.bluetooth_facade = bluetooth_facade
22
23
24    def browse(self, address):
25        """Browse the application on GATT server
26
27        @param address: a string of MAC address of the GATT server device
28
29        @return: GATT_Application object
30
31        """
32        attr_map_json = json.loads(self.bluetooth_facade.\
33                              get_gatt_attributes_map(address))
34        application = GATT_Application()
35        application.browse(attr_map_json, self.bluetooth_facade)
36
37        return application
38
39
40class GATT_Application(object):
41    """A GATT client application class"""
42
43    def __init__(self):
44        """Initialize a GATT Application"""
45        self.services = dict()
46
47
48    def browse(self, attr_map_json, bluetooth_facade):
49        """Browse the application on GATT server
50
51        @param attr_map_json: a json object returned by
52                              bluetooth_device_xmlrpc_server
53
54        @bluetooth_facade: facade to communicate with adapter in DUT
55
56        """
57        servs_json = attr_map_json['services']
58        for uuid in servs_json:
59            path = servs_json[uuid]['path']
60            service_obj = GATT_Service(uuid, path, bluetooth_facade)
61            service_obj.read_properties()
62            self.add_service(service_obj)
63
64            chrcs_json = servs_json[uuid]['characteristics']
65            for uuid in chrcs_json:
66                path = chrcs_json[uuid]['path']
67                chrc_obj = GATT_Characteristic(uuid, path, bluetooth_facade)
68                chrc_obj.read_properties()
69                service_obj.add_characteristic(chrc_obj)
70
71                descs_json = chrcs_json[uuid]['descriptors']
72                for uuid in descs_json:
73                    path = descs_json[uuid]['path']
74                    desc_obj = GATT_Descriptor(uuid, path, bluetooth_facade)
75                    desc_obj.read_properties()
76                    chrc_obj.add_descriptor(desc_obj)
77
78
79    def find_by_uuid(self, uuid):
80        """Find attribute under this application by specifying UUID
81
82        @param uuid: string of UUID
83
84        @return: Attribute object if found,
85                 none otherwise
86        """
87        for serv_uuid, serv in self.services.items():
88            found = serv.find_by_uuid(uuid)
89            if found:
90                return found
91        return None
92
93
94    def add_service(self, service):
95        """Add a service into this application"""
96        self.services[service.uuid] = service
97
98
99    @staticmethod
100    def diff(appl_a, appl_b):
101        """Compare two Applications, and return their difference
102
103        @param appl_a: the first application which is going to be compared
104
105        @param appl_b: the second application which is going to be compared
106
107        @return: a list of string, each describes one difference
108
109        """
110        result = []
111
112        uuids_a = set(appl_a.services.keys())
113        uuids_b = set(appl_b.services.keys())
114        uuids = uuids_a.union(uuids_b)
115
116        for uuid in uuids:
117            serv_a = appl_a.services.get(uuid, None)
118            serv_b = appl_b.services.get(uuid, None)
119
120            if not serv_a or not serv_b:
121                result.append("Service %s is not included in both Applications:"
122                              "%s vs %s" % (uuid, bool(serv_a), bool(serv_b)))
123            else:
124                result.extend(GATT_Service.diff(serv_a, serv_b))
125        return result
126
127
128class GATT_Service(object):
129    """GATT client service class"""
130    PROPERTIES = ['UUID', 'Primary', 'Device', 'Includes']
131
132
133    def __init__(self, uuid, object_path, bluetooth_facade):
134        """Initialize a GATT service object
135
136        @param uuid: string of UUID
137
138        @param object_path: object path of this service
139
140        @param bluetooth_facade: facade to communicate with adapter in DUT
141
142        """
143        self.uuid = uuid
144        self.object_path = object_path
145        self.bluetooth_facade = bluetooth_facade
146        self.properties = dict()
147        self.characteristics = dict()
148
149
150    def add_characteristic(self, chrc_obj):
151        """Add a characteristic attribute into service
152
153        @param chrc_obj: a characteristic object
154
155        """
156        self.characteristics[chrc_obj.uuid] = chrc_obj
157
158
159    def read_properties(self):
160        """Read all properties in this service"""
161        for prop_name in self.PROPERTIES:
162            self.properties[prop_name] = self.read_property(prop_name)
163        return self.properties
164
165
166    def read_property(self, property_name):
167        """Read a property in this service
168
169        @param property_name: string of the name of the property
170
171        @return: the value of the property
172
173        """
174        return self.bluetooth_facade.get_gatt_service_property(
175                                        self.object_path, property_name)
176
177    def find_by_uuid(self, uuid):
178        """Find attribute under this service by specifying UUID
179
180        @param uuid: string of UUID
181
182        @return: Attribute object if found,
183                 none otherwise
184
185        """
186        if self.uuid == uuid:
187            return self
188
189        for chrc_uuid, chrc in self.characteristics.items():
190            found = chrc.find_by_uuid(uuid)
191            if found:
192                return found
193        return None
194
195
196    @staticmethod
197    def diff(serv_a, serv_b):
198        """Compare two Services, and return their difference
199
200        @param serv_a: the first service which is going to be compared
201
202        @param serv_b: the second service which is going to be compared
203
204        @return: a list of string, each describes one difference
205
206        """
207        result = []
208
209        for prop_name in GATT_Service.PROPERTIES:
210            if serv_a.properties[prop_name] != serv_b.properties[prop_name]:
211                result.append("Service %s is different in %s: %s vs %s" %
212                              (serv_a.uuid, prop_name,
213                              serv_a.properties[prop_name],
214                              serv_b.properties[prop_name]))
215
216        uuids_a = set(serv_a.characteristics.keys())
217        uuids_b = set(serv_b.characteristics.keys())
218        uuids = uuids_a.union(uuids_b)
219
220        for uuid in uuids:
221            chrc_a = serv_a.characteristics.get(uuid, None)
222            chrc_b = serv_b.characteristics.get(uuid, None)
223
224            if not chrc_a or not chrc_b:
225                result.append("Characteristic %s is not included in both "
226                              "Services: %s vs %s" % (uuid, bool(chrc_a),
227                                                    bool(chrc_b)))
228            else:
229                result.extend(GATT_Characteristic.diff(chrc_a, chrc_b))
230        return result
231
232
233class GATT_Characteristic(object):
234    """GATT client characteristic class"""
235
236    PROPERTIES = ['UUID', 'Service', 'Value', 'Notifying', 'Flags']
237
238
239    def __init__(self, uuid, object_path, bluetooth_facade):
240        """Initialize a GATT characteristic object
241
242        @param uuid: string of UUID
243
244        @param object_path: object path of this characteristic
245
246        @param bluetooth_facade: facade to communicate with adapter in DUT
247
248        """
249        self.uuid = uuid
250        self.object_path = object_path
251        self.bluetooth_facade = bluetooth_facade
252        self.properties = dict()
253        self.descriptors = dict()
254
255
256    def add_descriptor(self, desc_obj):
257        """Add a characteristic attribute into service
258
259        @param desc_obj: a descriptor object
260
261        """
262        self.descriptors[desc_obj.uuid] = desc_obj
263
264
265    def read_properties(self):
266        """Read all properties in this characteristic"""
267        for prop_name in self.PROPERTIES:
268            self.properties[prop_name] = self.read_property(prop_name)
269        return self.properties
270
271
272    def read_property(self, property_name):
273        """Read a property in this characteristic
274
275        @param property_name: string of the name of the property
276
277        @return: the value of the property
278
279        """
280        return self.bluetooth_facade.get_gatt_characteristic_property(
281                                        self.object_path, property_name)
282
283
284    def find_by_uuid(self, uuid):
285        """Find attribute under this characteristic by specifying UUID
286
287        @param uuid: string of UUID
288
289        @return: Attribute object if found,
290                 none otherwise
291
292        """
293        if self.uuid == uuid:
294            return self
295
296        for desc_uuid, desc in self.descriptors.items():
297            if desc_uuid == uuid:
298                return desc
299        return None
300
301
302    def read_value(self):
303        """Perform ReadValue in DUT and store it in property 'Value'
304
305        @return: bytearray of the value
306
307        """
308        value = self.bluetooth_facade.gatt_characteristic_read_value(
309                                                self.uuid, self.object_path)
310        self.properties['Value'] = bytearray(base64.standard_b64decode(value))
311        return self.properties['Value']
312
313
314    @staticmethod
315    def diff(chrc_a, chrc_b):
316        """Compare two Characteristics, and return their difference
317
318        @param serv_a: the first service which is going to be compared
319
320        @param serv_b: the second service which is going to be compared
321
322        @return: a list of string, each describes one difference
323
324        """
325        result = []
326
327        for prop_name in GATT_Characteristic.PROPERTIES:
328            if chrc_a.properties[prop_name] != chrc_b.properties[prop_name]:
329                result.append("Characteristic %s is different in %s: %s vs %s"
330                              % (chrc_a.uuid, prop_name,
331                              chrc_a.properties[prop_name],
332                              chrc_b.properties[prop_name]))
333
334        uuids_a = set(chrc_a.descriptors.keys())
335        uuids_b = set(chrc_b.descriptors.keys())
336        uuids = uuids_a.union(uuids_b)
337
338        for uuid in uuids:
339            desc_a = chrc_a.descriptors.get(uuid, None)
340            desc_b = chrc_b.descriptors.get(uuid, None)
341
342            if not desc_a or not desc_b:
343                result.append("Descriptor %s is not included in both"
344                              "Characteristic: %s vs %s" % (uuid, bool(desc_a),
345                                                          bool(desc_b)))
346            else:
347                result.extend(GATT_Descriptor.diff(desc_a, desc_b))
348        return result
349
350
351class GATT_Descriptor(object):
352    """GATT client descriptor class"""
353
354    PROPERTIES = ['UUID', 'Characteristic', 'Value', 'Flags']
355
356    def __init__(self, uuid, object_path, bluetooth_facade):
357        """Initialize a GATT descriptor object
358
359        @param uuid: string of UUID
360
361        @param object_path: object path of this descriptor
362
363        @param bluetooth_facade: facade to communicate with adapter in DUT
364
365        """
366        self.uuid = uuid
367        self.object_path = object_path
368        self.bluetooth_facade = bluetooth_facade
369        self.properties = dict()
370
371
372    def read_properties(self):
373        """Read all properties in this characteristic"""
374        for prop_name in self.PROPERTIES:
375            self.properties[prop_name] = self.read_property(prop_name)
376        return self.properties
377
378
379    def read_property(self, property_name):
380        """Read a property in this characteristic
381
382        @param property_name: string of the name of the property
383
384        @return: the value of the property
385
386        """
387        return self.bluetooth_facade.get_gatt_descriptor_property(
388                                        self.object_path, property_name)
389
390
391    def read_value(self):
392        """Perform ReadValue in DUT and store it in property 'Value'
393
394        @return: bytearray of the value
395
396        """
397        value = self.bluetooth_facade.gatt_descriptor_read_value(
398                                                self.uuid, self.object_path)
399        self.properties['Value'] = bytearray(base64.standard_b64decode(value))
400
401        return self.properties['Value']
402
403
404    @staticmethod
405    def diff(desc_a, desc_b):
406        """Compare two Descriptors, and return their difference
407
408        @param serv_a: the first service which is going to be compared
409
410        @param serv_b: the second service which is going to be compared
411
412        @return: a list of string, each describes one difference
413
414        """
415        result = []
416
417        for prop_name in desc_a.properties.keys():
418            if desc_a.properties[prop_name] != desc_b.properties[prop_name]:
419                result.append("Descriptor %s is different in %s: %s vs %s" %
420                              (desc_a.uuid, prop_name,
421                              desc_a.properties[prop_name],
422                              desc_b.properties[prop_name]))
423
424        return result
425
426
427def UUID_Short2Full(uuid):
428    """Transform 2 bytes uuid string to 16 bytes
429
430    @param uuid: 2 bytes shortened UUID string in hex
431
432    @return: full uuid string
433    """
434    uuid_template = '0000%s-0000-1000-8000-00805f9b34fb'
435    return uuid_template % uuid
436
437
438class GATT_HIDApplication(GATT_Application):
439    """Default HID Application on Raspberry Pi GATT server
440    """
441
442    BatteryServiceUUID = UUID_Short2Full('180f')
443    BatteryLevelUUID = UUID_Short2Full('2a19')
444    CliChrcConfigUUID = UUID_Short2Full('2902')
445    GenericAttributeProfileUUID = UUID_Short2Full('1801')
446    ServiceChangedUUID = UUID_Short2Full('2a05')
447    DeviceInfoUUID = UUID_Short2Full('180a')
448    ManufacturerNameStrUUID = UUID_Short2Full('2a29')
449    PnPIDUUID = UUID_Short2Full('2a50')
450    GenericAccessProfileUUID = UUID_Short2Full('1800')
451    DeviceNameUUID = UUID_Short2Full('2a00')
452    AppearanceUUID = UUID_Short2Full('2a01')
453
454
455    def __init__(self):
456        """
457        """
458        GATT_Application.__init__(self)
459        BatteryService = GATT_Service(self.BatteryServiceUUID, None, None)
460        BatteryService.properties = {
461                'UUID': BatteryService.uuid,
462                'Primary': True,
463                'Device': None,
464                'Includes': []
465        }
466        self.add_service(BatteryService)
467
468        BatteryLevel = GATT_Characteristic(self.BatteryLevelUUID, None, None)
469        BatteryLevel.properties = {
470                'UUID': BatteryLevel.uuid,
471                'Service': None,
472                'Value': [],
473                'Notifying': False,
474                'Flags': ['read', 'notify']
475        }
476        BatteryService.add_characteristic(BatteryLevel)
477
478        CliChrcConfig = GATT_Descriptor(self.CliChrcConfigUUID, None, None)
479        CliChrcConfig.properties = {
480                'UUID': CliChrcConfig.uuid,
481                'Characteristic': None,
482                'Value': [],
483                'Flags': None
484        }
485
486        BatteryLevel.add_descriptor(CliChrcConfig)
487
488        GenericAttributeProfile = GATT_Service(self.GenericAttributeProfileUUID,
489                                               None, None)
490        GenericAttributeProfile.properties = {
491                'UUID': GenericAttributeProfile.uuid,
492                'Primary': True,
493                'Device': None,
494                'Includes': []
495        }
496        self.add_service(GenericAttributeProfile)
497
498        ServiceChanged = GATT_Characteristic(self.ServiceChangedUUID, None,
499                                             None)
500        ServiceChanged.properties = {
501                'UUID': ServiceChanged.uuid,
502                'Service': None,
503                'Value': [],
504                'Notifying': False,
505                'Flags': ['indicate']
506        }
507        GenericAttributeProfile.add_characteristic(ServiceChanged)
508
509        CliChrcConfig = GATT_Descriptor(self.CliChrcConfigUUID, None, None)
510        CliChrcConfig.properties = {
511                'UUID': CliChrcConfig.uuid,
512                'Characteristic': None,
513                'Value': [],
514                'Flags': None
515        }
516        ServiceChanged.add_descriptor(CliChrcConfig)
517
518        DeviceInfo = GATT_Service(self.DeviceInfoUUID, None, None)
519        DeviceInfo.properties = {
520                'UUID': DeviceInfo.uuid,
521                'Primary': True,
522                'Device': None,
523                'Includes': []
524        }
525        self.add_service(DeviceInfo)
526
527        ManufacturerNameStr = GATT_Characteristic(self.ManufacturerNameStrUUID,
528                                                  None, None)
529        ManufacturerNameStr.properties = {
530                'UUID': ManufacturerNameStr.uuid,
531                'Service': None,
532                'Value': [],
533                'Notifying': None,
534                'Flags': ['read']
535        }
536        DeviceInfo.add_characteristic(ManufacturerNameStr)
537
538        PnPID = GATT_Characteristic(self.PnPIDUUID, None, None)
539        PnPID.properties = {
540                'UUID': PnPID.uuid,
541                'Service': None,
542                'Value': [],
543                'Notifying': None,
544                'Flags': ['read']
545        }
546        DeviceInfo.add_characteristic(PnPID)
547
548        GenericAccessProfile = GATT_Service(self.GenericAccessProfileUUID,
549                                            None, None)
550        GenericAccessProfile.properties = {
551                'UUID': GenericAccessProfile.uuid,
552                'Primary': True,
553                'Device': None,
554                'Includes': []
555        }
556        self.add_service(GenericAccessProfile)
557
558        DeviceName = GATT_Characteristic(self.DeviceNameUUID, None, None)
559        DeviceName.properties = {
560                'UUID': DeviceName.uuid,
561                'Service': None,
562                'Value': [],
563                'Notifying': None,
564                'Flags': ['read']
565        }
566        GenericAccessProfile.add_characteristic(DeviceName)
567
568        Appearance = GATT_Characteristic(self.AppearanceUUID, None, None)
569        Appearance.properties = {
570                'UUID': Appearance.uuid,
571                'Service': None,
572                'Value': [],
573                'Notifying': None,
574                'Flags': ['read']
575        }
576        GenericAccessProfile.add_characteristic(Appearance)
577