• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
16"""
Python script for wrappers to various libraries.

Class CmdInput inherits from the cmd library.

Functions that start with "do_" have a method
signature that doesn't match the actual command
line command and that is intended. This is so the
"help" command knows what to display (in this case
the documentation of the command itself).

For example:
Looking at the function "do_tool_set_target_device_name"
has the inputs self and line which is expected of this type
of method signature. When the "help" command is done on the
method name you get the function documentation as such:

(Cmd) help tool_set_target_device_name

        Description: Reset the target device name.
        Input(s):
            device_name: Required. The advertising name to connect to.
        Usage: tool_set_target_device_name new_target_device name
          Examples:
            tool_set_target_device_name le_watch

This is all to say this documentation pattern is expected.

44"""
45
46import acts.test_utils.bt.gatt_test_database as gatt_test_database
47
48import cmd
49import pprint
50import time
51"""Various Global Strings"""
# Template for reporting a command's result: command label, then result.
CMD_LOG = "CMD {} result: {}"
# Template for reporting a failed command: command label, then exception.
FAILURE = "CMD {} threw exception: {}"
# Device name placed in the advertising data by the generic advertise commands.
BASIC_ADV_NAME = "fs_test"
55
56
57class CmdInput(cmd.Cmd):
58    ble_advertise_interval = 1000
59    target_device_name = ""
60    le_ids = []
61    unique_mac_addr_id = None
62
63    def setup_vars(self, fuchsia_devices, target_device_name, log):
64        self.pri_dut = fuchsia_devices[0]
65        if len(fuchsia_devices) > 1:
66            self.sec_dut = fuchsia_devices[1]
67        self.target_device_name = target_device_name
68        self.log = log
69
70    def emptyline(self):
71        pass
72
73    def do_EOF(self, line):
74        "End Script"
75        return True
76
77    """ Useful Helper functions and cmd line tooling """
78
79    def _find_unique_id(self):
80        scan_time_ms = 100000
81        scan_filter = {"name_substring": self.target_device_name}
82        scan_count = 1
83        self.unique_mac_addr_id = None
84        self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
85        for i in range(100):
86            time.sleep(.5)
87            scan_res = self.pri_dut.gattc_lib.bleGetDiscoveredDevices()[
88                'result']
89            for device in scan_res:
90                name, did, connectable = device["name"], device["id"], device[
91                    "connectable"]
92                if (name):
93                    self.log.info(
94                        "Discovered device with name, id: {}, {}".format(
95                            name, did))
96                if (self.target_device_name in name):
97                    self.unique_mac_addr_id = did
98                    self.log.info(
99                        "Successfully found device: name, id: {}, {}".format(
100                            name, did))
101                    break
102            if self.unique_mac_addr_id:
103                break
104        self.pri_dut.gattc_lib.bleStopBleScan()
105
106    def do_tool_refesh_unique_id(self, line):
107        """
108        Description: Refresh command line tool mac unique id.
109        Usage:
110          Examples:
111            tool_refresh_unique_id
112        """
113        try:
114            self._find_unique_id()
115        except Exception as err:
116            self.log.error(
117                "Failed to scan or find scan result: {}".format(err))
118
119    def do_tool_set_target_device_name(self, line):
120        """
121        Description: Reset the target device name.
122        Input(s):
123            device_name: Required. The advertising name to connect to.
124        Usage: tool_set_target_device_name new_target_device name
125          Examples:
126            tool_set_target_device_name le_watch
127        """
128        self.log.info("Setting target_device_name to: {}".format(line))
129        self.target_device_name = line
130
131    """Begin BLE advertise wrappers"""
132
133    def do_ble_start_generic_connectable_advertisement(self, line):
134        """
135        Description: Start a connectable LE advertisement
136        Usage: ble_start_generic_connectable_advertisement
137        """
138        cmd = "Start a connectable LE advertisement"
139        try:
140            adv_data = {"name": BASIC_ADV_NAME}
141            self.pri_dut.ble_lib.bleStartBleAdvertising(
142                adv_data, self.ble_advertise_interval)
143        except Exception as err:
144            self.log.error(FAILURE.format(cmd, err))
145
146    def do_ble_start_generic_nonconnectable_advertisement(self, line):
147        """
148        Description: Start a non-connectable LE advertisement
149        Usage: ble_start_generic_nonconnectable_advertisement
150        """
151        cmd = "Start a nonconnectable LE advertisement"
152        try:
153            adv_data = {"name": BASIC_ADV_NAME}
154            self.pri_dut.ble_lib.bleStartBleAdvertising(
155                adv_data, self.ble_advertise_interval, False)
156        except Exception as err:
157            self.log.error(FAILURE.format(cmd, err))
158
159    def do_ble_stop_advertisement(self, line):
160        """
161        Description: Stop a BLE advertisement.
162        Usage: ble_stop_advertisement
163        """
164        cmd = "Stop a connectable LE advertisement"
165        try:
166            self.pri_dut.ble_lib.bleStopBleAdvertising()
167        except Exception as err:
168            self.log.error(FAILURE.format(cmd, err))
169
170    """End BLE advertise wrappers"""
171    """Begin GATT client wrappers"""
172
173    def complete_gattc_connect_by_id(self, text, line, begidx, endidx):
174        if not text:
175            completions = list(self.le_ids)[:]
176        else:
177            completions = [s for s in self.le_ids if s.startswith(text)]
178        return completions
179
180    def do_gattc_connect_by_id(self, line):
181        """
182        Description: Connect to a LE peripheral.
183        Input(s):
184            device_id: Required. The unique device ID from Fuchsia
185                discovered devices.
186        Usage:
187          Examples:
188            gattc_connect device_id
189        """
190        cmd = "Connect to a LE peripheral by input ID."
191        try:
192
193            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
194                line)
195            self.log.info("Connection status: {}".format(
196                pprint.pformat(connection_status)))
197        except Exception as err:
198            self.log.error(FAILURE.format(cmd, err))
199
200    def do_gattc_connect(self, line):
201        """
202        Description: Connect to a LE peripheral.
203        Optional input: device_name
204        Input(s):
205            device_name: Optional. The peripheral ID to connect to.
206        Usage:
207          Examples:
208            gattc_connect
209            gattc_connect eddystone_123
210        """
211        cmd = "Connect to a LE peripheral."
212        try:
213            if len(line) > 0:
214                self.target_device_name = line
215                self.unique_mac_addr_id = None
216            if not self.unique_mac_addr_id:
217                try:
218                    self._find_unique_id()
219                except Exception as err:
220                    self.log.info("Failed to scan or find device.")
221                    return
222            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
223                self.unique_mac_addr_id)
224            self.log.info("Connection status: {}".format(
225                pprint.pformat(connection_status)))
226        except Exception as err:
227            self.log.error(FAILURE.format(cmd, err))
228
229    def do_gattc_connect_disconnect_iterations(self, line):
230        """
231        Description: Connect then disconnect to a LE peripheral multiple times.
232        Input(s):
233            iterations: Required. The number of iterations to run.
234        Usage:
235          Examples:
236            gattc_connect_disconnect_iterations 10
237        """
238        cmd = "Connect to a LE peripheral."
239        try:
240            if not self.unique_mac_addr_id:
241                try:
242                    self._find_unique_id()
243                except Exception as err:
244                    self.log.info("Failed to scan or find device.")
245                    return
246            for i in range(int(line)):
247                self.log.info("Running iteration {}".format(i + 1))
248                connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
249                    self.unique_mac_addr_id)
250                self.log.info("Connection status: {}".format(
251                    pprint.pformat(connection_status)))
252                time.sleep(4)
253                disc_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
254                    self.unique_mac_addr_id)
255                self.log.info("Disconnect status: {}".format(disc_status))
256                time.sleep(3)
257        except Exception as err:
258            self.log.error(FAILURE.format(cmd, err))
259
260    def do_gattc_disconnect(self, line):
261        """
262        Description: Disconnect from LE peripheral.
263        Assumptions: Already connected to a peripheral.
264        Usage:
265          Examples:
266            gattc_disconnect
267        """
268        cmd = "Disconenct from LE peripheral."
269        try:
270            disconnect_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
271                self.unique_mac_addr_id)
272            self.log.info("Disconnect status: {}".format(disconnect_status))
273        except Exception as err:
274            self.log.error(FAILURE.format(cmd, err))
275
276    def do_gattc_list_services(self, line):
277        """
278        Description: List services from LE peripheral.
279        Assumptions: Already connected to a peripheral.
280        Usage:
281          Examples:
282            gattc_list_services
283        """
284        cmd = "List services from LE peripheral."
285        try:
286            services = self.pri_dut.gattc_lib.listServices(
287                self.unique_mac_addr_id)
288            self.log.info("Discovered Services: \n{}".format(
289                pprint.pformat(services)))
290        except Exception as err:
291            self.log.error(FAILURE.format(cmd, err))
292
293    def do_gattc_connect_to_service(self, line):
294        """
295        Description: Connect to Peripheral GATT server service.
296        Assumptions: Already connected to peripheral.
297        Input(s):
298            service_id: Required. The service id reference on the GATT server.
299        Usage:
300          Examples:
301            gattc_connect_to_service service_id
302        """
303        cmd = "GATT client connect to GATT server service."
304        try:
305            self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id,
306                                                    int(line))
307        except Exception as err:
308            self.log.error(FAILURE.format(cmd, err))
309
310    def do_gattc_discover_characteristics(self, line):
311        """
312        Description: Discover characteristics from a connected service.
313        Assumptions: Already connected to a GATT server service.
314        Usage:
315          Examples:
316            gattc_discover_characteristics
317        """
318        cmd = "Discover and list characteristics from a GATT server."
319        try:
320            chars = self.pri_dut.gattc_lib.discoverCharacteristics()
321            self.log.info("Discovered chars:\n{}".format(
322                pprint.pformat(chars)))
323        except Exception as err:
324            self.log.error(FAILURE.format(cmd, err))
325
326    def do_gattc_notify_all_chars(self, line):
327        """
328        Description: Enable all notifications on all Characteristics on
329            a GATT server.
330        Assumptions: Basic GATT connection made.
331        Usage:
332          Examples:
333            gattc_notify_all_chars
334        """
335        cmd = "Read all characteristics from the GATT service."
336        try:
337            services = self.pri_dut.gattc_lib.listServices(
338                self.unique_mac_addr_id)
339            for service in services['result']:
340                service_id = service['id']
341                service_uuid = service['uuid_type']
342                self.pri_dut.gattc_lib.connectToService(
343                    self.unique_mac_addr_id, service_id)
344                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
345                print("Reading chars in service uuid: {}".format(service_uuid))
346
347                for char in chars['result']:
348                    char_id = char['id']
349                    char_uuid = char['uuid_type']
350                    # quick char filter for apple-4 test... remove later
351                    print("found uuid {}".format(char_uuid))
352                    try:
353                        self.pri_dut.gattc_lib.enableNotifyCharacteristic(
354                            char_id)
355                    except Exception as err:
356                        print("error enabling notification")
357        except Exception as err:
358            self.log.error(FAILURE.format(cmd, err))
359
360    def do_gattc_read_all_chars(self, line):
361        """
362        Description: Read all Characteristic values from a GATT server across
363            all services.
364        Assumptions: Basic GATT connection made.
365        Usage:
366          Examples:
367            gattc_read_all_chars
368        """
369        cmd = "Read all characteristics from the GATT service."
370        try:
371            services = self.pri_dut.gattc_lib.listServices(
372                self.unique_mac_addr_id)
373            for service in services['result']:
374                service_id = service['id']
375                service_uuid = service['uuid_type']
376                self.pri_dut.gattc_lib.connectToService(
377                    self.unique_mac_addr_id, service_id)
378                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
379                print("Reading chars in service uuid: {}".format(service_uuid))
380
381                for char in chars['result']:
382                    char_id = char['id']
383                    char_uuid = char['uuid_type']
384                    try:
385                        read_val =  \
386                            self.pri_dut.gattc_lib.readCharacteristicById(
387                                char_id)
388                        print("  Characteristic uuid / Value: {} / {}".format(
389                            char_uuid, read_val['result']))
390                        str_value = ""
391                        for val in read_val['result']:
392                            str_value += chr(val)
393                        print("    str val: {}".format(str_value))
394                    except Exception as err:
395                        print(err)
396                        pass
397        except Exception as err:
398            self.log.error(FAILURE.format(cmd, err))
399
400    def do_gattc_read_all_desc(self, line):
401        """
402        Description: Read all Descriptors values from a GATT server across
403            all services.
404        Assumptions: Basic GATT connection made.
405        Usage:
406          Examples:
407            gattc_read_all_chars
408        """
409        cmd = "Read all descriptors from the GATT service."
410        try:
411            services = self.pri_dut.gattc_lib.listServices(
412                self.unique_mac_addr_id)
413            for service in services['result']:
414                service_id = service['id']
415                service_uuid = service['uuid_type']
416                self.pri_dut.gattc_lib.connectToService(
417                    self.unique_mac_addr_id, service_id)
418                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
419                print("Reading descs in service uuid: {}".format(service_uuid))
420
421                for char in chars['result']:
422                    char_id = char['id']
423                    char_uuid = char['uuid_type']
424                    descriptors = char['descriptors']
425                    print("  Reading descs in char uuid: {}".format(char_uuid))
426                    for desc in descriptors:
427                        desc_id = desc["id"]
428                        desc_uuid = desc["uuid_type"]
429                    try:
430                        read_val = self.pri_dut.gattc_lib.readDescriptorById(
431                            desc_id)
432                        print("    Descriptor uuid / Value: {} / {}".format(
433                            desc_uuid, read_val['result']))
434                    except Exception as err:
435                        pass
436        except Exception as err:
437            self.log.error(FAILURE.format(cmd, err))
438
439    def do_gattc_write_all_desc(self, line):
440        """
441        Description: Write a value to all Descriptors on the GATT server.
442        Assumptions: Basic GATT connection made.
443        Input(s):
444            offset: Required. The offset to start writing to.
445            size: Required. The size of bytes to write (value will be generated).
446                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
447        Usage:
448          Examples:
449            gattc_write_all_desc 0 100
450            gattc_write_all_desc 10 2
451        """
452        cmd = "Read all descriptors from the GATT service."
453        try:
454            args = line.split()
455            if len(args) != 2:
456                self.log.info("2 Arguments required: [Offset] [Size]")
457                return
458            offset = int(args[0])
459            size = args[1]
460            write_value = []
461            for i in range(int(size)):
462                write_value.append(i % 256)
463            services = self.pri_dut.gattc_lib.listServices(
464                self.unique_mac_addr_id)
465            for service in services['result']:
466                service_id = service['id']
467                service_uuid = service['uuid_type']
468                self.pri_dut.gattc_lib.connectToService(
469                    self.unique_mac_addr_id, service_id)
470                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
471                print("Writing descs in service uuid: {}".format(service_uuid))
472
473                for char in chars['result']:
474                    char_id = char['id']
475                    char_uuid = char['uuid_type']
476                    descriptors = char['descriptors']
477                    print("  Reading descs in char uuid: {}".format(char_uuid))
478                    for desc in descriptors:
479                        desc_id = desc["id"]
480                        desc_uuid = desc["uuid_type"]
481                    try:
482                        write_val = self.pri_dut.gattc_lib.writeDescriptorById(
483                            desc_id, offset, write_value)
484                        print("    Descriptor uuid / Result: {} / {}".format(
485                            desc_uuid, write_val['result']))
486                    except Exception as err:
487                        pass
488        except Exception as err:
489            self.log.error(FAILURE.format(cmd, err))
490
491    def do_gattc_read_all_long_desc(self, line):
492        """
493        Description: Read all long Characteristic Descriptors
494        Assumptions: Basic GATT connection made.
495        Input(s):
496            offset: Required. The offset to start reading from.
497            max_bytes: Required. The max size of bytes to return.
498        Usage:
499          Examples:
500            gattc_read_all_long_desc 0 100
501            gattc_read_all_long_desc 10 20
502        """
503        cmd = "Read all long descriptors from the GATT service."
504        try:
505            args = line.split()
506            if len(args) != 2:
507                self.log.info("2 Arguments required: [Offset] [Size]")
508                return
509            offset = int(args[0])
510            max_bytes = int(args[1])
511            services = self.pri_dut.ble_lib.bleListServices(
512                self.unique_mac_addr_id)
513            for service in services['result']:
514                service_id = service['id']
515                service_uuid = service['uuid_type']
516                self.pri_dut.gattc_lib.connectToService(
517                    self.unique_mac_addr_id, service_id)
518                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
519                print("Reading descs in service uuid: {}".format(service_uuid))
520
521                for char in chars['result']:
522                    char_id = char['id']
523                    char_uuid = char['uuid_type']
524                    descriptors = char['descriptors']
525                    print("  Reading descs in char uuid: {}".format(char_uuid))
526                    for desc in descriptors:
527                        desc_id = desc["id"]
528                        desc_uuid = desc["uuid_type"]
529                    try:
530                        read_val = self.pri_dut.gattc_lib.readLongDescriptorById(
531                            desc_id, offset, max_bytes)
532                        print("    Descriptor uuid / Result: {} / {}".format(
533                            desc_uuid, read_val['result']))
534                    except Exception as err:
535                        pass
536        except Exception as err:
537            self.log.error(FAILURE.format(cmd, err))
538
539    def do_gattc_read_all_long_char(self, line):
540        """
541        Description: Read all long Characteristic
542        Assumptions: Basic GATT connection made.
543        Input(s):
544            offset: Required. The offset to start reading from.
545            max_bytes: Required. The max size of bytes to return.
546        Usage:
547          Examples:
548            gattc_read_all_long_char 0 100
549            gattc_read_all_long_char 10 20
550        """
551        cmd = "Read all long Characteristics from the GATT service."
552        try:
553            args = line.split()
554            if len(args) != 2:
555                self.log.info("2 Arguments required: [Offset] [Size]")
556                return
557            offset = int(args[0])
558            max_bytes = int(args[1])
559            services = self.pri_dut.ble_lib.bleListServices(
560                self.unique_mac_addr_id)
561            for service in services['result']:
562                service_id = service['id']
563                service_uuid = service['uuid_type']
564                self.pri_dut.gattc_lib.connectToService(
565                    self.unique_mac_addr_id, service_id)
566                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
567                print("Reading chars in service uuid: {}".format(service_uuid))
568
569                for char in chars['result']:
570                    char_id = char['id']
571                    char_uuid = char['uuid_type']
572                    try:
573                        read_val = self.pri_dut.gattc_lib.readLongCharacteristicById(
574                            char_id, offset, max_bytes)
575                        print("    Char uuid / Result: {} / {}".format(
576                            char_uuid, read_val['result']))
577                    except Exception as err:
578                        pass
579        except Exception as err:
580            self.log.error(FAILURE.format(cmd, err))
581
582    def do_gattc_write_all_chars(self, line):
583        """
584        Description: Write all characteristic values from a GATT server across
585            all services.
586        Assumptions: Basic GATT connection made.
587        Input(s):
588            offset: Required. The offset to start writing on.
589            size: The write value size (value will be generated)
590                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
591        Usage:
592          Examples:
593            gattc_write_all_chars 0 10
594            gattc_write_all_chars 10 1
595        """
596        cmd = "Read all characteristics from the GATT service."
597        try:
598            args = line.split()
599            if len(args) != 2:
600                self.log.info("2 Arguments required: [Offset] [Size]")
601                return
602            offset = int(args[0])
603            size = int(args[1])
604            write_value = []
605            for i in range(size):
606                write_value.append(i % 256)
607            services = self.pri_dut.gattc_lib.listServices(
608                self.unique_mac_addr_id)
609            for service in services['result']:
610                service_id = service['id']
611                service_uuid = service['uuid_type']
612                self.pri_dut.gattc_lib.connectToService(
613                    self.unique_mac_addr_id, service_id)
614                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
615                print("Writing chars in service uuid: {}".format(service_uuid))
616
617                for char in chars['result']:
618                    char_id = char['id']
619                    char_uuid = char['uuid_type']
620                    try:
621                        write_result = self.pri_dut.gattc_lib.writeCharById(
622                            char_id, offset, write_value)
623                        print("  Characteristic uuid write result: {} / {}".
624                              format(char_uuid, write_result['result']))
625                    except Exception as err:
626                        print("error writing char {}".format(err))
627                        pass
628        except Exception as err:
629            self.log.error(FAILURE.format(cmd, err))
630
631    def do_gattc_write_all_chars_without_response(self, line):
632        """
633        Description: Write all characteristic values from a GATT server across
634            all services.
635        Assumptions: Basic GATT connection made.
636        Input(s):
637            size: The write value size (value will be generated).
638                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
639        Usage:
640          Examples:
641            gattc_write_all_chars_without_response 100
642        """
643        cmd = "Read all characteristics from the GATT service."
644        try:
645            args = line.split()
646            if len(args) != 1:
647                self.log.info("1 Arguments required: [Size]")
648                return
649            size = int(args[0])
650            write_value = []
651            for i in range(size):
652                write_value.append(i % 256)
653            services = self.pri_dut.gattc_lib.listServices(
654                self.unique_mac_addr_id)
655            for service in services['result']:
656                service_id = service['id']
657                service_uuid = service['uuid_type']
658                self.pri_dut.gattc_lib.connectToService(
659                    self.unique_mac_addr_id, service_id)
660                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
661                print("Reading chars in service uuid: {}".format(service_uuid))
662
663                for char in chars['result']:
664                    char_id = char['id']
665                    char_uuid = char['uuid_type']
666                    try:
667                        write_result = \
668                            self.pri_dut.gattc_lib.writeCharByIdWithoutResponse(
669                                char_id, write_value)
670                        print("  Characteristic uuid write result: {} / {}".
671                              format(char_uuid, write_result['result']))
672                    except Exception as err:
673                        pass
674        except Exception as err:
675            self.log.error(FAILURE.format(cmd, err))
676
677    def do_gattc_write_char_by_id(self, line):
678        """
679        Description: Write char by characteristic id reference.
680        Assumptions: Already connected to a GATT server service.
681        Input(s):
682            characteristic_id: The characteristic id reference on the GATT
683                service
684            offset: The offset value to use
685            size: Function will generate random bytes by input size.
686                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
687        Usage:
688          Examples:
689            gattc_write_char_by_id char_id 0 5
690            gattc_write_char_by_id char_id 20 1
691        """
692        cmd = "Write to GATT server characteristic ."
693        try:
694            args = line.split()
695            if len(args) != 3:
696                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
697                return
698            id = int(args[0])
699            offset = int(args[1])
700            size = int(args[2])
701            write_value = []
702            for i in range(size):
703                write_value.append(i % 256)
704            self.pri_dut.gattc_lib.writeCharById(id, offset, write_value)
705        except Exception as err:
706            self.log.error(FAILURE.format(cmd, err))
707
708    def do_gattc_write_char_by_id_without_response(self, line):
709        """
710        Description: Write char by characteristic id reference without response.
711        Assumptions: Already connected to a GATT server service.
712        Input(s):
713            characteristic_id: The characteristic id reference on the GATT
714                service
715            size: Function will generate random bytes by input size.
716                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
717        Usage:
718          Examples:
719            gattc_write_char_by_id_without_response char_id 5
720        """
721        cmd = "Write characteristic by id without response."
722        try:
723            args = line.split()
724            if len(args) != 2:
725                self.log.info("2 Arguments required: [Id] [Size]")
726                return
727            id = int(args[0])
728            size = args[1]
729            write_value = []
730            for i in range(int(size)):
731                write_value.append(i % 256)
732            self.pri_dut.gattc_lib.writeCharByIdWithoutResponse(
733                id, write_value)
734        except Exception as err:
735            self.log.error(FAILURE.format(cmd, err))
736
737    def do_gattc_enable_notify_char_by_id(self, line):
738        """
739        Description: Enable Characteristic notification on Characteristic ID.
740        Assumptions: Already connected to a GATT server service.
741        Input(s):
742            characteristic_id: The characteristic id reference on the GATT
743                service
744        Usage:
745          Examples:
746            gattc_enable_notify_char_by_id char_id
747        """
748        cmd = "Enable notifications by Characteristic id."
749        try:
750            id = int(line)
751            self.pri_dut.gattc_lib.enableNotifyCharacteristic(id)
752        except Exception as err:
753            self.log.error(FAILURE.format(cmd, err))
754
755    def do_gattc_disable_notify_char_by_id(self, line):
756        """
757        Description: Disable Characteristic notification on Characteristic ID.
758        Assumptions: Already connected to a GATT server service.
759        Input(s):
760            characteristic_id: The characteristic id reference on the GATT
761                service
762        Usage:
763          Examples:
764            gattc_disable_notify_char_by_id char_id
765        """
766        cmd = "Disable notify Characteristic by id."
767        try:
768            id = int(line)
769            self.pri_dut.gattc_lib.disableNotifyCharacteristic(id)
770        except Exception as err:
771            self.log.error(FAILURE.format(cmd, err))
772
773    def do_gattc_read_char_by_id(self, line):
774        """
775        Description: Read Characteristic by ID.
776        Assumptions: Already connected to a GATT server service.
777        Input(s):
778            characteristic_id: The characteristic id reference on the GATT
779                service
780        Usage:
781          Examples:
782            gattc_read_char_by_id char_id
783        """
784        cmd = "Read Characteristic value by ID."
785        try:
786            id = int(line)
787            read_val = self.pri_dut.gattc_lib.readCharacteristicById(id)
788            self.log.info("Characteristic Value with id {}: {}".format(
789                id, read_val['result']))
790            str_value = ""
791            for val in read_val['result']:
792                str_value += chr(val)
793            print("    str val: {}".format(str_value))
794        except Exception as err:
795            self.log.error(FAILURE.format(cmd, err))
796
797    def do_gattc_write_desc_by_id(self, line):
798        """
799        Description: Write Descriptor by characteristic id reference.
800        Assumptions: Already connected to a GATT server service.
801        Input(s):
802            descriptor_id: The Descriptor id reference on the GATT service
803            offset: The offset value to use
804            size: Function will generate random bytes by input size.
805                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
806        Usage:
807          Examples:
808            gattc_write_desc_by_id desc_id 0 5
809            gattc_write_desc_by_id desc_id 20 1
810        """
811        cmd = "Write Descriptor by id."
812        try:
813            args = line.split()
814            id = int(args[0])
815            offset = int(args[1])
816            size = args[2]
817            write_value = []
818            for i in range(int(size)):
819                write_value.append(i % 256)
820            write_result = self.pri_dut.gattc_lib.writeDescriptorById(
821                id, offset, write_value)
822            self.log.info("Descriptor Write result {}: {}".format(
823                id, write_result['result']))
824        except Exception as err:
825            self.log.error(FAILURE.format(cmd, err))
826
827    def do_gattc_read_desc_by_id(self, line):
828        """
829        Description: Read Descriptor by ID.
830        Assumptions: Already connected to a GATT server service.
831        Input(s):
832            descriptor_id: The Descriptor id reference on the GATT service
833        Usage:
834          Examples:
835            gattc_read_desc_by_id desc_id
836        """
837        cmd = "Read Descriptor by ID."
838        try:
839            id = int(line)
840            read_val = self.pri_dut.gattc_lib.readDescriptorById(id)
841            self.log.info("Descriptor Value with id {}: {}".format(
842                id, read_val['result']))
843        except Exception as err:
844            self.log.error(FAILURE.format(cmd, err))
845
846    def do_gattc_read_long_char_by_id(self, line):
847        """
848        Description: Read long Characteristic value by id.
849        Assumptions: Already connected to a GATT server service.
850        Input(s):
851            characteristic_id: The characteristic id reference on the GATT
852                service
853            offset: The offset value to use.
854            max_bytes: The max bytes size to return.
855        Usage:
856          Examples:
857            gattc_read_long_char_by_id char_id 0 10
858            gattc_read_long_char_by_id char_id 20 1
859        """
860        cmd = "Read long Characteristic value by id."
861        try:
862            args = line.split()
863            if len(args) != 3:
864                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
865                return
866            id = int(args[0])
867            offset = int(args[1])
868            max_bytes = int(args[2])
869            read_val = self.pri_dut.gattc_lib.readLongCharacteristicById(
870                id, offset, max_bytes)
871            self.log.info("Characteristic Value with id {}: {}".format(
872                id, read_val['result']))
873
874        except Exception as err:
875            self.log.error(FAILURE.format(cmd, err))
876
877    """End GATT client wrappers"""
878    """Begin LE scan wrappers"""
879
880    def _update_scan_results(self, scan_results):
881        self.le_ids = []
882        for scan in scan_results['result']:
883            self.le_ids.append(scan['id'])
884
885    def do_ble_start_scan(self, line):
886        """
887        Description: Perform a BLE scan.
888        Default filter name: ""
889        Optional input: filter_device_name
890        Usage:
891          Examples:
892            ble_start_scan
893            ble_start_scan eddystone
894        """
895        cmd = "Perform a BLE scan and list discovered devices."
896        try:
897            scan_filter = {"name_substring": ""}
898            if line:
899                scan_filter = {"name_substring": line}
900            self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
901        except Exception as err:
902            self.log.error(FAILURE.format(cmd, err))
903
904    def do_ble_stop_scan(self, line):
905        """
906        Description: Stops a BLE scan and returns discovered devices.
907        Usage:
908          Examples:
909            ble_stop_scan
910        """
911        cmd = "Stops a BLE scan and returns discovered devices."
912        try:
913            scan_results = self.pri_dut.gattc_lib.bleStopBleScan()
914            self._update_scan_results(scan_results)
915            self.log.info(pprint.pformat(scan_results))
916        except Exception as err:
917            self.log.error(FAILURE.format(cmd, err))
918
919    def do_ble_get_discovered_devices(self, line):
920        """
921        Description: Get discovered LE devices of an active scan.
922        Usage:
923          Examples:
924            ble_stop_scan
925        """
926        cmd = "Get discovered LE devices of an active scan."
927        try:
928            scan_results = self.pri_dut.gattc_lib.bleGetDiscoveredDevices()
929            self._update_scan_results(scan_results)
930            self.log.info(pprint.pformat(scan_results))
931        except Exception as err:
932            self.log.error(FAILURE.format(cmd, err))
933
934    """End LE scan wrappers"""
935    """Begin GATT Server wrappers"""
936
937    def complete_gatts_setup_database(self, text, line, begidx, endidx):
938        if not text:
939            completions = list(
940                gatt_test_database.GATT_SERVER_DB_MAPPING.keys())
941        else:
942            completions = [
943                s for s in gatt_test_database.GATT_SERVER_DB_MAPPING.keys()
944                if s.startswith(text)
945            ]
946        return completions
947
948    def do_gatts_setup_database(self, line):
949        """
950        Description: Setup a Gatt server database based on pre-defined inputs.
951            Supports Tab Autocomplete.
952        Input(s):
953            descriptor_db_name: The descriptor db name that matches one in
954                acts.test_utils.bt.gatt_test_database
955        Usage:
956          Examples:
957            gatts_setup_database LARGE_DB_1
958        """
959        cmd = "Setup GATT Server Database Based of pre-defined dictionaries"
960        try:
961            scan_results = self.pri_dut.gatts_lib.publishServer(
962                gatt_test_database.GATT_SERVER_DB_MAPPING.get(line))
963            print(scan_results)
964        except Exception as err:
965            self.log.error(FAILURE.format(cmd, err))
966
967    """End GATT Server wrappers"""
968