• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2019 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Python script for wrappers to various libraries.
18
19Class CommandInput inherits from the cmd library.
20
21Functions that start with "do_" have a method
22signature that doesn't match the actual command
23line command and that is intended. This is so the
24"help" command knows what to display (in this case
25the documentation of the command itself).
26
27For example:
28Looking at the function "do_tool_set_target_device_name"
29has the inputs self and line which is expected of this type
30of method signature. When the "help" command is done on the
31method name you get the function documentation as such:
32
33(Cmd) help tool_set_target_device_name
34
35        Description: Reset the target device name.
36        Input(s):
37            device_name: Required. The advertising name to connect to.
38        Usage: tool_set_target_device_name new_target_device name
39          Examples:
40            tool_set_target_device_name le_watch
41
42This is all to say this documentation pattern is expected.
43
44"""
45
46from acts_contrib.test_utils.audio_analysis_lib.check_quality import quality_analysis
47from acts_contrib.test_utils.bt.bt_constants import audio_bits_per_sample_32
48from acts_contrib.test_utils.bt.bt_constants import audio_sample_rate_48000
49from acts_contrib.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device
50from acts_contrib.test_utils.bt.bt_constants import bt_attribute_values
51from acts_contrib.test_utils.bt.bt_constants import sig_appearance_constants
52from acts_contrib.test_utils.bt.bt_constants import sig_uuid_constants
53from acts_contrib.test_utils.fuchsia.sdp_records import sdp_pts_record_list
54
55import acts_contrib.test_utils.bt.gatt_test_database as gatt_test_database
56
57import cmd
58import pprint
59import time
60"""Various Global Strings"""
# NOTE(review): presumably the Bluetooth SIG base UUID; the value is looked up
# in the imported sig_uuid_constants table.
BASE_UUID = sig_uuid_constants['BASE_UUID']
# Two-placeholder log template: a command description, then its result.
CMD_LOG = "CMD {} result: {}"
# Two-placeholder log template: a command description, then the exception
# raised. The do_* handlers format it inside their except blocks.
FAILURE = "CMD {} threw exception: {}"
# Same value as the CommandInput.ble_adv_name default.
BASIC_ADV_NAME = "fs_test"
65
66
class CommandInput(cmd.Cmd):
    """Interactive cmd.Cmd shell wrapping Bluetooth calls on a test device.

    The attributes below are class-level defaults. Scalars are rebound on the
    instance by the do_* handlers; the list-valued ones are shared by every
    instance until a method rebinds them.
    """
    # --- BLE advertisement settings read by start_advertisement() ---
    # Passed as the interval argument to bleStartBleAdvertising (units are not
    # shown in this file).
    ble_adv_interval = 1000
    ble_adv_appearance = None
    # When True, start_advertisement() sends a placeholder tx_power_level of 1.
    ble_adv_data_include_tx_power_level = False
    # When False, start_advertisement() sends a name of None.
    ble_adv_include_name = True
    # When True, the advertising data is also sent as the scan response.
    ble_adv_include_scan_response = False
    ble_adv_name = "fs_test"
    ble_adv_data_manufacturer_data = None
    ble_adv_data_service_data = None
    ble_adv_data_service_uuid_list = None
    ble_adv_data_uris = None

    # --- Results gathered by _find_unique_id_over_bt_control() ---
    bt_control_ids = []
    bt_control_names = []
    bt_control_devices = []
    # Seconds slept between polls of the scan/discovery results.
    bt_scan_poll_timer = 0.5
    # Substring matched against discovered device names.
    target_device_name = ""
    # Candidate ids offered by complete_gattc_connect_by_id().
    le_ids = []
    # Id of the peer the gattc_* commands operate on; None until found or set.
    unique_mac_addr_id = None
86
87    def setup_vars(self, dut, target_device_name, log):
88        self.pri_dut = dut
89        # Note: test_dut is the start of a slow conversion from a Fuchsia specific
90        # Tool to an abstract_device tool. Only commands that use test_dut will work
91        # Otherwise this tool is primarially targeted at Fuchsia devices.
92        self.test_dut = create_bluetooth_device(self.pri_dut)
93        self.test_dut.initialize_bluetooth_controller()
94        self.target_device_name = target_device_name
95        self.log = log
96
97    def emptyline(self):
98        pass
99
100    def do_EOF(self, line):
101        "End Script"
102        return True
103
104    """ Useful Helper functions and cmd line tooling """
105
106    def str_to_bool(self, s):
107        if s.lower() == 'true':
108            return True
109        elif s.lower() == 'false':
110            return False
111
112    def _find_unique_id_over_le(self):
113        scan_filter = {"name_substring": self.target_device_name}
114        self.unique_mac_addr_id = None
115        self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
116        tries = 10
117        for i in range(tries):
118            time.sleep(self.bt_scan_poll_timer)
119            scan_res = self.pri_dut.gattc_lib.bleGetDiscoveredDevices(
120            )['result']
121            for device in scan_res:
122                name, did, connectable = device["name"], device["id"], device[
123                    "connectable"]
124                if (self.target_device_name in name):
125                    self.unique_mac_addr_id = did
126                    self.log.info(
127                        "Successfully found device: name, id: {}, {}".format(
128                            name, did))
129                    break
130            if self.unique_mac_addr_id:
131                break
132        self.pri_dut.gattc_lib.bleStopBleScan()
133
134    def _find_unique_id_over_bt_control(self):
135        self.unique_mac_addr_id = None
136        self.bt_control_devices = []
137        self.pri_dut.bts_lib.requestDiscovery(True)
138        tries = 10
139        for i in range(tries):
140            if self.unique_mac_addr_id:
141                break
142            time.sleep(self.bt_scan_poll_timer)
143            device_list = self.pri_dut.bts_lib.getKnownRemoteDevices(
144            )['result']
145            for id_dict in device_list:
146                device = device_list[id_dict]
147                self.bt_control_devices.append(device)
148                name = None
149                if device['name'] is not None:
150                    name = device['name']
151                did, address = device['id'], device['address']
152
153                self.bt_control_ids.append(did)
154                if name is not None:
155                    self.bt_control_names.append(name)
156                    if self.target_device_name in name:
157                        self.unique_mac_addr_id = did
158                        self.log.info(
159                            "Successfully found device: name, id, address: {}, {}, {}"
160                            .format(name, did, address))
161                        break
162        self.pri_dut.bts_lib.requestDiscovery(False)
163
164    def do_tool_take_bt_snoop_log(self, custom_name):
165        """
166        Description: Takes the bt snoop log from the Fuchsia device.
167        Logs will show up in your config files' logpath directory.
168
169        Input(s):
170            custom_name: Optional. Override the default pcap file name.
171
172        Usage: tool_set_target_device_name new_target_device name
173          Examples:
174            tool_take_bt_snoop_log connection_error
175            tool_take_bt_snoop_log
176        """
177        self.pri_dut.take_bt_snoop_log(custom_name)
178
179    def do_tool_refresh_unique_id(self, line):
180        """
181        Description: Refresh command line tool mac unique id.
182        Usage:
183          Examples:
184            tool_refresh_unique_id
185        """
186        try:
187            self._find_unique_id_over_le()
188        except Exception as err:
189            self.log.error(
190                "Failed to scan or find scan result: {}".format(err))
191
192    def do_tool_refresh_unique_id_using_bt_control(self, line):
193        """
194        Description: Refresh command line tool mac unique id.
195        Usage:
196          Examples:
197            tool_refresh_unique_id_using_bt_control
198        """
199        try:
200            self._find_unique_id_over_bt_control()
201        except Exception as err:
202            self.log.error(
203                "Failed to scan or find scan result: {}".format(err))
204
205    def do_tool_set_target_device_name(self, line):
206        """
207        Description: Reset the target device name.
208        Input(s):
209            device_name: Required. The advertising name to connect to.
210        Usage: tool_set_target_device_name new_target_device name
211          Examples:
212            tool_set_target_device_name le_watch
213        """
214        self.log.info("Setting target_device_name to: {}".format(line))
215        self.target_device_name = line
216
217    def do_tool_set_unique_mac_addr_id(self, line):
218        """
219        Description: Sets the unique mac address id (Specific to Fuchsia)
220        Input(s):
221            device_id: Required. The id to set the unique mac address id to
222        Usage: tool_set_unique_mac_addr_id device_id
223          Examples:
224            tool_set_unique_mac_addr_id 7fb2cae53aad9e0d
225        """
226        self.unique_mac_addr_id = line
227
228    """Begin BLE advertise wrappers"""
229
230    def complete_ble_adv_data_include_name(self, text, line, begidx, endidx):
231        roles = ["true", "false"]
232        if not text:
233            completions = roles
234        else:
235            completions = [s for s in roles if s.startswith(text)]
236        return completions
237
238    def do_ble_adv_data_include_name(self, line):
239        cmd = "Include name in the advertisement."
240        try:
241            self.ble_adv_include_name = self.str_to_bool(line)
242        except Exception as err:
243            self.log.error(FAILURE.format(cmd, err))
244
245    def do_ble_adv_data_set_name(self, line):
246        cmd = "Set the name to be included in the advertisement."
247        try:
248            self.ble_adv_name = line
249        except Exception as err:
250            self.log.error(FAILURE.format(cmd, err))
251
252    def complete_ble_adv_data_set_appearance(self, text, line, begidx, endidx):
253        if not text:
254            completions = list(sig_appearance_constants.keys())
255        else:
256            completions = [
257                s for s in sig_appearance_constants.keys()
258                if s.startswith(text)
259            ]
260        return completions
261
262    def do_ble_adv_data_set_appearance(self, line):
263        cmd = "Set the appearance to known SIG values."
264        try:
265            self.ble_adv_appearance = line
266        except Exception as err:
267            self.log.error(FAILURE.format(cmd, err))
268
269    def complete_ble_adv_data_include_tx_power_level(self, text, line, begidx,
270                                                     endidx):
271        options = ['true', 'false']
272        if not text:
273            completions = list(options)[:]
274        else:
275            completions = [s for s in options if s.startswith(text)]
276        return completions
277
278    def do_ble_adv_data_include_tx_power_level(self, line):
279        """Include the tx_power_level in the advertising data.
280        Description: Adds tx_power_level to the advertisement data to the BLE
281            advertisement.
282        Input(s):
283            value: Required. True or False
284        Usage: ble_adv_data_include_tx_power_level bool_value
285          Examples:
286            ble_adv_data_include_tx_power_level true
287            ble_adv_data_include_tx_power_level false
288        """
289        cmd = "Include tx_power_level in advertisement."
290        try:
291            self.ble_adv_data_include_tx_power_level = self.str_to_bool(line)
292        except Exception as err:
293            self.log.info(FAILURE.format(cmd, err))
294
295    def complete_ble_adv_include_scan_response(self, text, line, begidx,
296                                               endidx):
297        options = ['true', 'false']
298        if not text:
299            completions = list(options)[:]
300        else:
301            completions = [s for s in options if s.startswith(text)]
302        return completions
303
304    def do_ble_adv_include_scan_response(self, line):
305        """Include scan response in advertisement. inputs: [true|false]
306            Note: Currently just sets the scan response data to the
307                Advertisement data.
308        """
309        cmd = "Include tx_power_level in advertisement."
310        try:
311            self.ble_adv_include_scan_response = self.str_to_bool(line)
312        except Exception as err:
313            self.log.info(FAILURE.format(cmd, err))
314
315    def do_ble_adv_data_add_manufacturer_data(self, line):
316        """Include manufacturer id and data to the advertisment
317        Description: Adds manufacturer data to the BLE advertisement.
318        Input(s):
319            id: Required. The int representing the manufacturer id.
320            data: Required. The string representing the data.
321        Usage: ble_adv_data_add_manufacturer_data id data
322          Examples:
323            ble_adv_data_add_manufacturer_data 1 test
324        """
325        cmd = "Include manufacturer id and data to the advertisment."
326        try:
327
328            info = line.split()
329            if self.ble_adv_data_manufacturer_data is None:
330                self.ble_adv_data_manufacturer_data = []
331            self.ble_adv_data_manufacturer_data.append({
332                "id": int(info[0]),
333                "data": info[1]
334            })
335        except Exception as err:
336            self.log.info(FAILURE.format(cmd, err))
337
338    def do_ble_adv_data_add_service_data(self, line):
339        """Include service data to the advertisment
340        Description: Adds service data to the BLE advertisement.
341        Input(s):
342            uuid: Required. The string representing the uuid.
343            data: Required. The string representing the data.
344        Usage: ble_adv_data_add_service_data uuid data
345          Examples:
346            ble_adv_data_add_service_data 00001801-0000-1000-8000-00805f9b34fb test
347        """
348        cmd = "Include manufacturer id and data to the advertisment."
349        try:
350            info = line.split()
351            if self.ble_adv_data_service_data is None:
352                self.ble_adv_data_service_data = []
353            self.ble_adv_data_service_data.append({
354                "uuid": info[0],
355                "data": info[1]
356            })
357        except Exception as err:
358            self.log.info(FAILURE.format(cmd, err))
359
360    def do_ble_adv_add_service_uuid_list(self, line):
361        """Include a list of service uuids to the advertisment:
362        Description: Adds service uuid list to the BLE advertisement.
363        Input(s):
364            uuid: Required. A list of N string UUIDs to add.
365        Usage: ble_adv_add_service_uuid_list uuid0 uuid1 ... uuidN
366          Examples:
367            ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb
368            ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb 00001802-0000-1000-8000-00805f9b34fb
369        """
370        cmd = "Include service uuid list to the advertisment data."
371        try:
372            self.ble_adv_data_service_uuid_list = line
373        except Exception as err:
374            self.log.info(FAILURE.format(cmd, err))
375
376    def do_ble_adv_data_set_uris(self, uris):
377        """Set the URIs of the LE advertisement data:
378        Description: Adds list of String UIRs
379          See (RFC 3986 1.1.2 https://tools.ietf.org/html/rfc3986)
380          Valid URI examples:
381            ftp://ftp.is.co.za/rfc/rfc1808.txt
382            http://www.ietf.org/rfc/rfc2396.txt
383            ldap://[2001:db8::7]/c=GB?objectClass?one
384            mailto:John.Doe@example.com
385            news:comp.infosystems.www.servers.unix
386            tel:+1-816-555-1212
387            telnet://192.0.2.16:80/
388            urn:oasis:names:specification:docbook:dtd:xml:4.1.2
389        Input(s):
390            uris: Required. A list of URIs to add.
391        Usage: ble_adv_data_set_uris uri0 uri1 ... uriN
392          Examples:
393            ble_adv_data_set_uris telnet://192.0.2.16:80/
394            ble_adv_data_set_uris tel:+1-816-555-1212
395        """
396        cmd = "Set the appearance to known SIG values."
397        try:
398            self.ble_adv_data_uris = uris.split()
399        except Exception as err:
400            self.log.error(FAILURE.format(cmd, err))
401
402    def start_advertisement(self, connectable):
403        """ Handle setting advertising data and the advertisement
404            Note: After advertisement is successful, clears values set for
405                * Manufacturer data
406                * Appearance information
407                * Scan Response
408                * Service UUIDs
409                * URI list
410            Args:
411                connectable: Bool of whether to start a connectable
412                    advertisement or not.
413        """
414        adv_data_name = self.ble_adv_name
415        if not self.ble_adv_include_name:
416            adv_data_name = None
417
418        manufacturer_data = self.ble_adv_data_manufacturer_data
419
420        tx_power_level = None
421        if self.ble_adv_data_include_tx_power_level:
422            tx_power_level = 1  # Not yet implemented so set to 1
423
424        scan_response = self.ble_adv_include_scan_response
425
426        adv_data = {
427            "name": adv_data_name,
428            "appearance": self.ble_adv_appearance,
429            "service_data": self.ble_adv_data_service_data,
430            "tx_power_level": tx_power_level,
431            "service_uuids": self.ble_adv_data_service_uuid_list,
432            "manufacturer_data": manufacturer_data,
433            "uris": self.ble_adv_data_uris,
434        }
435
436        if not self.ble_adv_include_scan_response:
437            scan_response = None
438        else:
439            scan_response = adv_data
440
441        result = self.pri_dut.ble_lib.bleStartBleAdvertising(
442            adv_data, scan_response, self.ble_adv_interval, connectable)
443        self.log.info("Result of starting advertisement: {}".format(result))
444        self.ble_adv_data_manufacturer_data = None
445        self.ble_adv_appearance = None
446        self.ble_adv_include_scan_response = False
447        self.ble_adv_data_service_uuid_list = None
448        self.ble_adv_data_uris = None
449        self.ble_adv_data_service_data = None
450
451    def do_ble_start_generic_connectable_advertisement(self, line):
452        """
453        Description: Start a connectable LE advertisement
454
455        Usage: ble_start_generic_connectable_advertisement
456        """
457        cmd = "Start a connectable LE advertisement"
458        try:
459            connectable = True
460            self.start_advertisement(connectable)
461        except Exception as err:
462            self.log.error(FAILURE.format(cmd, err))
463
464    def do_ble_start_generic_nonconnectable_advertisement(self, line):
465        """
466        Description: Start a non-connectable LE advertisement
467
468        Usage: ble_start_generic_nonconnectable_advertisement
469        """
470        cmd = "Start a nonconnectable LE advertisement"
471        try:
472            connectable = False
473            self.start_advertisement(connectable)
474        except Exception as err:
475            self.log.error(FAILURE.format(cmd, err))
476
477    def do_ble_stop_advertisement(self, line):
478        """
479        Description: Stop a BLE advertisement.
480        Usage: ble_stop_advertisement
481        """
482        cmd = "Stop a connectable LE advertisement"
483        try:
484            self.pri_dut.ble_lib.bleStopBleAdvertising()
485        except Exception as err:
486            self.log.error(FAILURE.format(cmd, err))
487
488    """End BLE advertise wrappers"""
489    """Begin GATT client wrappers"""
490
491    def complete_gattc_connect_by_id(self, text, line, begidx, endidx):
492        if not text:
493            completions = list(self.le_ids)[:]
494        else:
495            completions = [s for s in self.le_ids if s.startswith(text)]
496        return completions
497
498    def do_gattc_connect_by_id(self, line):
499        """
500        Description: Connect to a LE peripheral.
501        Input(s):
502            device_id: Required. The unique device ID from Fuchsia
503                discovered devices.
504        Usage:
505          Examples:
506            gattc_connect device_id
507        """
508        cmd = "Connect to a LE peripheral by input ID."
509        try:
510
511            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
512                line)
513            self.log.info("Connection status: {}".format(
514                pprint.pformat(connection_status)))
515        except Exception as err:
516            self.log.error(FAILURE.format(cmd, err))
517
518    def do_gattc_connect(self, line):
519        """
520        Description: Connect to a LE peripheral.
521        Optional input: device_name
522        Input(s):
523            device_name: Optional. The peripheral ID to connect to.
524        Usage:
525          Examples:
526            gattc_connect
527            gattc_connect eddystone_123
528        """
529        cmd = "Connect to a LE peripheral."
530        try:
531            if len(line) > 0:
532                self.target_device_name = line
533                self.unique_mac_addr_id = None
534            if not self.unique_mac_addr_id:
535                try:
536                    self._find_unique_id()
537                except Exception as err:
538                    self.log.info("Failed to scan or find device.")
539                    return
540            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
541                self.unique_mac_addr_id)
542            self.log.info("Connection status: {}".format(
543                pprint.pformat(connection_status)))
544        except Exception as err:
545            self.log.error(FAILURE.format(cmd, err))
546
547    def do_gattc_connect_disconnect_iterations(self, line):
548        """
549        Description: Connect then disconnect to a LE peripheral multiple times.
550        Input(s):
551            iterations: Required. The number of iterations to run.
552        Usage:
553          Examples:
554            gattc_connect_disconnect_iterations 10
555        """
556        cmd = "Connect to a LE peripheral."
557        try:
558            if not self.unique_mac_addr_id:
559                try:
560                    self._find_unique_id()
561                except Exception as err:
562                    self.log.info("Failed to scan or find device.")
563                    return
564            for i in range(int(line)):
565                self.log.info("Running iteration {}".format(i + 1))
566                connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
567                    self.unique_mac_addr_id)
568                self.log.info("Connection status: {}".format(
569                    pprint.pformat(connection_status)))
570                time.sleep(4)
571                disc_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
572                    self.unique_mac_addr_id)
573                self.log.info("Disconnect status: {}".format(disc_status))
574                time.sleep(3)
575        except Exception as err:
576            self.log.error(FAILURE.format(cmd, err))
577
578    def do_gattc_disconnect(self, line):
579        """
580        Description: Disconnect from LE peripheral.
581        Assumptions: Already connected to a peripheral.
582        Usage:
583          Examples:
584            gattc_disconnect
585        """
586        cmd = "Disconenct from LE peripheral."
587        try:
588            disconnect_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
589                self.unique_mac_addr_id)
590            self.log.info("Disconnect status: {}".format(disconnect_status))
591        except Exception as err:
592            self.log.error(FAILURE.format(cmd, err))
593
594    def do_gattc_list_services(self, discover_chars):
595        """
596        Description: List services from LE peripheral.
597        Assumptions: Already connected to a peripheral.
598        Input(s):
599            discover_chars: Optional. An optional input to discover all
600                characteristics on the service.
601        Usage:
602          Examples:
603            gattc_list_services
604            gattc_list_services true
605        """
606        cmd = "List services from LE peripheral."
607        try:
608
609            services = self.pri_dut.gattc_lib.listServices(
610                self.unique_mac_addr_id)
611            self.log.info("Discovered Services: \n{}".format(
612                pprint.pformat(services)))
613            discover_characteristics = self.str_to_bool(discover_chars)
614            if discover_chars:
615                for service in services.get('result'):
616                    self.pri_dut.gattc_lib.connectToService(
617                        self.unique_mac_addr_id, service.get('id'))
618                    chars = self.pri_dut.gattc_lib.discoverCharacteristics()
619                    self.log.info("Discovered chars:\n{}".format(
620                        pprint.pformat(chars)))
621
622        except Exception as err:
623            self.log.error(FAILURE.format(cmd, err))
624
625    def do_gattc_connect_to_service(self, line):
626        """
627        Description: Connect to Peripheral GATT server service.
628        Assumptions: Already connected to peripheral.
629        Input(s):
630            service_id: Required. The service id reference on the GATT server.
631        Usage:
632          Examples:
633            gattc_connect_to_service service_id
634        """
635        cmd = "GATT client connect to GATT server service."
636        try:
637            self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id,
638                                                    int(line))
639        except Exception as err:
640            self.log.error(FAILURE.format(cmd, err))
641
642    def do_gattc_discover_characteristics(self, line):
643        """
644        Description: Discover characteristics from a connected service.
645        Assumptions: Already connected to a GATT server service.
646        Usage:
647          Examples:
648            gattc_discover_characteristics
649        """
650        cmd = "Discover and list characteristics from a GATT server."
651        try:
652            chars = self.pri_dut.gattc_lib.discoverCharacteristics()
653            self.log.info("Discovered chars:\n{}".format(
654                pprint.pformat(chars)))
655        except Exception as err:
656            self.log.error(FAILURE.format(cmd, err))
657
658    def do_gattc_notify_all_chars(self, line):
659        """
660        Description: Enable all notifications on all Characteristics on
661            a GATT server.
662        Assumptions: Basic GATT connection made.
663        Usage:
664          Examples:
665            gattc_notify_all_chars
666        """
667        cmd = "Read all characteristics from the GATT service."
668        try:
669            services = self.pri_dut.gattc_lib.listServices(
670                self.unique_mac_addr_id)
671            for service in services['result']:
672                service_id = service['id']
673                service_uuid = service['uuid_type']
674                self.pri_dut.gattc_lib.connectToService(
675                    self.unique_mac_addr_id, service_id)
676                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
677                print("Reading chars in service uuid: {}".format(service_uuid))
678
679                for char in chars['result']:
680                    char_id = char['id']
681                    char_uuid = char['uuid_type']
682                    # quick char filter for apple-4 test... remove later
683                    print("found uuid {}".format(char_uuid))
684                    try:
685                        self.pri_dut.gattc_lib.enableNotifyCharacteristic(
686                            char_id)
687                    except Exception as err:
688                        print("error enabling notification")
689        except Exception as err:
690            self.log.error(FAILURE.format(cmd, err))
691
692    def do_gattc_read_all_chars(self, line):
693        """
694        Description: Read all Characteristic values from a GATT server across
695            all services.
696        Assumptions: Basic GATT connection made.
697        Usage:
698          Examples:
699            gattc_read_all_chars
700        """
701        cmd = "Read all characteristics from the GATT service."
702        try:
703            services = self.pri_dut.gattc_lib.listServices(
704                self.unique_mac_addr_id)
705            for service in services['result']:
706                service_id = service['id']
707                service_uuid = service['uuid_type']
708                self.pri_dut.gattc_lib.connectToService(
709                    self.unique_mac_addr_id, service_id)
710                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
711                print("Reading chars in service uuid: {}".format(service_uuid))
712
713                for char in chars['result']:
714                    char_id = char['id']
715                    char_uuid = char['uuid_type']
716                    try:
717                        read_val =  \
718                            self.pri_dut.gattc_lib.readCharacteristicById(
719                                char_id)
720                        print("  Characteristic uuid / Value: {} / {}".format(
721                            char_uuid, read_val['result']))
722                        str_value = ""
723                        for val in read_val['result']:
724                            str_value += chr(val)
725                        print("    str val: {}".format(str_value))
726                    except Exception as err:
727                        print(err)
728                        pass
729        except Exception as err:
730            self.log.error(FAILURE.format(cmd, err))
731
732    def do_gattc_read_all_desc(self, line):
733        """
734        Description: Read all Descriptors values from a GATT server across
735            all services.
736        Assumptions: Basic GATT connection made.
737        Usage:
738          Examples:
739            gattc_read_all_chars
740        """
741        cmd = "Read all descriptors from the GATT service."
742        try:
743            services = self.pri_dut.gattc_lib.listServices(
744                self.unique_mac_addr_id)
745            for service in services['result']:
746                service_id = service['id']
747                service_uuid = service['uuid_type']
748                self.pri_dut.gattc_lib.connectToService(
749                    self.unique_mac_addr_id, service_id)
750                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
751                print("Reading descs in service uuid: {}".format(service_uuid))
752
753                for char in chars['result']:
754                    char_id = char['id']
755                    char_uuid = char['uuid_type']
756                    descriptors = char['descriptors']
757                    print("  Reading descs in char uuid: {}".format(char_uuid))
758                    for desc in descriptors:
759                        desc_id = desc["id"]
760                        desc_uuid = desc["uuid_type"]
761                    try:
762                        read_val = self.pri_dut.gattc_lib.readDescriptorById(
763                            desc_id)
764                        print("    Descriptor uuid / Value: {} / {}".format(
765                            desc_uuid, read_val['result']))
766                    except Exception as err:
767                        pass
768        except Exception as err:
769            self.log.error(FAILURE.format(cmd, err))
770
771    def do_gattc_write_all_desc(self, line):
772        """
773        Description: Write a value to all Descriptors on the GATT server.
774        Assumptions: Basic GATT connection made.
775        Input(s):
776            offset: Required. The offset to start writing to.
777            size: Required. The size of bytes to write (value will be generated).
778                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
779        Usage:
780          Examples:
781            gattc_write_all_desc 0 100
782            gattc_write_all_desc 10 2
783        """
784        cmd = "Read all descriptors from the GATT service."
785        try:
786            args = line.split()
787            if len(args) != 2:
788                self.log.info("2 Arguments required: [Offset] [Size]")
789                return
790            offset = int(args[0])
791            size = args[1]
792            write_value = []
793            for i in range(int(size)):
794                write_value.append(i % 256)
795            services = self.pri_dut.gattc_lib.listServices(
796                self.unique_mac_addr_id)
797            for service in services['result']:
798                service_id = service['id']
799                service_uuid = service['uuid_type']
800                self.pri_dut.gattc_lib.connectToService(
801                    self.unique_mac_addr_id, service_id)
802                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
803                print("Writing descs in service uuid: {}".format(service_uuid))
804
805                for char in chars['result']:
806                    char_id = char['id']
807                    char_uuid = char['uuid_type']
808                    descriptors = char['descriptors']
809                    print("  Reading descs in char uuid: {}".format(char_uuid))
810                    for desc in descriptors:
811                        desc_id = desc["id"]
812                        desc_uuid = desc["uuid_type"]
813                    try:
814                        write_val = self.pri_dut.gattc_lib.writeDescriptorById(
815                            desc_id, offset, write_value)
816                        print("    Descriptor uuid / Result: {} / {}".format(
817                            desc_uuid, write_val['result']))
818                    except Exception as err:
819                        pass
820        except Exception as err:
821            self.log.error(FAILURE.format(cmd, err))
822
823    def do_gattc_read_all_long_desc(self, line):
824        """
825        Description: Read all long Characteristic Descriptors
826        Assumptions: Basic GATT connection made.
827        Input(s):
828            offset: Required. The offset to start reading from.
829            max_bytes: Required. The max size of bytes to return.
830        Usage:
831          Examples:
832            gattc_read_all_long_desc 0 100
833            gattc_read_all_long_desc 10 20
834        """
835        cmd = "Read all long descriptors from the GATT service."
836        try:
837            args = line.split()
838            if len(args) != 2:
839                self.log.info("2 Arguments required: [Offset] [Size]")
840                return
841            offset = int(args[0])
842            max_bytes = int(args[1])
843            services = self.pri_dut.ble_lib.bleListServices(
844                self.unique_mac_addr_id)
845            for service in services['result']:
846                service_id = service['id']
847                service_uuid = service['uuid_type']
848                self.pri_dut.gattc_lib.connectToService(
849                    self.unique_mac_addr_id, service_id)
850                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
851                print("Reading descs in service uuid: {}".format(service_uuid))
852
853                for char in chars['result']:
854                    char_id = char['id']
855                    char_uuid = char['uuid_type']
856                    descriptors = char['descriptors']
857                    print("  Reading descs in char uuid: {}".format(char_uuid))
858                    for desc in descriptors:
859                        desc_id = desc["id"]
860                        desc_uuid = desc["uuid_type"]
861                    try:
862                        read_val = self.pri_dut.gattc_lib.readLongDescriptorById(
863                            desc_id, offset, max_bytes)
864                        print("    Descriptor uuid / Result: {} / {}".format(
865                            desc_uuid, read_val['result']))
866                    except Exception as err:
867                        pass
868        except Exception as err:
869            self.log.error(FAILURE.format(cmd, err))
870
871    def do_gattc_read_all_long_char(self, line):
872        """
873        Description: Read all long Characteristic
874        Assumptions: Basic GATT connection made.
875        Input(s):
876            offset: Required. The offset to start reading from.
877            max_bytes: Required. The max size of bytes to return.
878        Usage:
879          Examples:
880            gattc_read_all_long_char 0 100
881            gattc_read_all_long_char 10 20
882        """
883        cmd = "Read all long Characteristics from the GATT service."
884        try:
885            args = line.split()
886            if len(args) != 2:
887                self.log.info("2 Arguments required: [Offset] [Size]")
888                return
889            offset = int(args[0])
890            max_bytes = int(args[1])
891            services = self.pri_dut.ble_lib.bleListServices(
892                self.unique_mac_addr_id)
893            for service in services['result']:
894                service_id = service['id']
895                service_uuid = service['uuid_type']
896                self.pri_dut.gattc_lib.connectToService(
897                    self.unique_mac_addr_id, service_id)
898                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
899                print("Reading chars in service uuid: {}".format(service_uuid))
900
901                for char in chars['result']:
902                    char_id = char['id']
903                    char_uuid = char['uuid_type']
904                    try:
905                        read_val = self.pri_dut.gattc_lib.readLongCharacteristicById(
906                            char_id, offset, max_bytes)
907                        print("    Char uuid / Result: {} / {}".format(
908                            char_uuid, read_val['result']))
909                    except Exception as err:
910                        pass
911        except Exception as err:
912            self.log.error(FAILURE.format(cmd, err))
913
914    def do_gattc_write_all_chars(self, line):
915        """
916        Description: Write all characteristic values from a GATT server across
917            all services.
918        Assumptions: Basic GATT connection made.
919        Input(s):
920            offset: Required. The offset to start writing on.
921            size: The write value size (value will be generated)
922                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
923        Usage:
924          Examples:
925            gattc_write_all_chars 0 10
926            gattc_write_all_chars 10 1
927        """
928        cmd = "Read all characteristics from the GATT service."
929        try:
930            args = line.split()
931            if len(args) != 2:
932                self.log.info("2 Arguments required: [Offset] [Size]")
933                return
934            offset = int(args[0])
935            size = int(args[1])
936            write_value = []
937            for i in range(size):
938                write_value.append(i % 256)
939            services = self.pri_dut.gattc_lib.listServices(
940                self.unique_mac_addr_id)
941            for service in services['result']:
942                service_id = service['id']
943                service_uuid = service['uuid_type']
944                self.pri_dut.gattc_lib.connectToService(
945                    self.unique_mac_addr_id, service_id)
946                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
947                print("Writing chars in service uuid: {}".format(service_uuid))
948
949                for char in chars['result']:
950                    char_id = char['id']
951                    char_uuid = char['uuid_type']
952                    try:
953                        write_result = self.pri_dut.gattc_lib.writeCharById(
954                            char_id, offset, write_value)
955                        print("  Characteristic uuid write result: {} / {}".
956                              format(char_uuid, write_result['result']))
957                    except Exception as err:
958                        print("error writing char {}".format(err))
959                        pass
960        except Exception as err:
961            self.log.error(FAILURE.format(cmd, err))
962
963    def do_gattc_write_all_chars_without_response(self, line):
964        """
965        Description: Write all characteristic values from a GATT server across
966            all services.
967        Assumptions: Basic GATT connection made.
968        Input(s):
969            size: The write value size (value will be generated).
970                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
971        Usage:
972          Examples:
973            gattc_write_all_chars_without_response 100
974        """
975        cmd = "Read all characteristics from the GATT service."
976        try:
977            args = line.split()
978            if len(args) != 1:
979                self.log.info("1 Arguments required: [Size]")
980                return
981            size = int(args[0])
982            write_value = []
983            for i in range(size):
984                write_value.append(i % 256)
985            services = self.pri_dut.gattc_lib.listServices(
986                self.unique_mac_addr_id)
987            for service in services['result']:
988                service_id = service['id']
989                service_uuid = service['uuid_type']
990                self.pri_dut.gattc_lib.connectToService(
991                    self.unique_mac_addr_id, service_id)
992                chars = self.pri_dut.gattc_lib.discoverCharacteristics()
993                print("Reading chars in service uuid: {}".format(service_uuid))
994
995                for char in chars['result']:
996                    char_id = char['id']
997                    char_uuid = char['uuid_type']
998                    try:
999                        write_result = \
1000                            self.pri_dut.gattc_lib.writeCharByIdWithoutResponse(
1001                                char_id, write_value)
1002                        print("  Characteristic uuid write result: {} / {}".
1003                              format(char_uuid, write_result['result']))
1004                    except Exception as err:
1005                        pass
1006        except Exception as err:
1007            self.log.error(FAILURE.format(cmd, err))
1008
1009    def do_gattc_write_char_by_id(self, line):
1010        """
1011        Description: Write char by characteristic id reference.
1012        Assumptions: Already connected to a GATT server service.
1013        Input(s):
1014            characteristic_id: The characteristic id reference on the GATT
1015                service
1016            offset: The offset value to use
1017            size: Function will generate random bytes by input size.
1018                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1019        Usage:
1020          Examples:
1021            gattc_write_char_by_id char_id 0 5
1022            gattc_write_char_by_id char_id 20 1
1023        """
1024        cmd = "Write to GATT server characteristic ."
1025        try:
1026            args = line.split()
1027            if len(args) != 3:
1028                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1029                return
1030            id = int(args[0], 16)
1031            offset = int(args[1])
1032            size = int(args[2])
1033            write_value = []
1034            for i in range(size):
1035                write_value.append(i % 256)
1036            self.test_dut.gatt_client_write_characteristic_by_handle(
1037                self.unique_mac_addr_id, id, offset, write_value)
1038        except Exception as err:
1039            self.log.error(FAILURE.format(cmd, err))
1040
1041    def do_gattc_write_long_char_by_id(self, line):
1042        """
1043        Description: Write long char by characteristic id reference.
1044        Assumptions: Already connected to a GATT server service.
1045        Input(s):
1046            characteristic_id: The characteristic id reference on the GATT
1047                service
1048            offset: The offset value to use
1049            size: Function will generate random bytes by input size.
1050                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1051            reliable_mode: Optional: Reliable writes represented as bool
1052        Usage:
1053          Examples:
1054            gattc_write_long_char_by_id char_id 0 5
1055            gattc_write_long_char_by_id char_id 20 1
1056            gattc_write_long_char_by_id char_id 20 1 true
1057            gattc_write_long_char_by_id char_id 20 1 false
1058        """
1059        cmd = "Long Write to GATT server characteristic ."
1060        try:
1061            args = line.split()
1062            if len(args) < 3:
1063                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1064                return
1065            id = int(args[0], 16)
1066            offset = int(args[1])
1067            size = int(args[2])
1068            reliable_mode = False
1069            if len(args) > 3:
1070                reliable_mode = self.str_to_bool(args[3])
1071            write_value = []
1072            for i in range(size):
1073                write_value.append(i % 256)
1074            self.test_dut.gatt_client_write_long_characteristic_by_handle(
1075                self.unique_mac_addr_id, id, offset, write_value,
1076                reliable_mode)
1077        except Exception as err:
1078            self.log.error(FAILURE.format(cmd, err))
1079
1080    def do_gattc_write_long_desc_by_id(self, line):
1081        """
1082        Description: Write long char by descrioptor id reference.
1083        Assumptions: Already connected to a GATT server service.
1084        Input(s):
1085            characteristic_id: The characteristic id reference on the GATT
1086                service
1087            offset: The offset value to use
1088            size: Function will generate random bytes by input size.
1089                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1090        Usage:
1091          Examples:
1092            gattc_write_long_desc_by_id char_id 0 5
1093            gattc_write_long_desc_by_id char_id 20 1
1094        """
1095        cmd = "Long Write to GATT server descriptor ."
1096        try:
1097            args = line.split()
1098            if len(args) != 3:
1099                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1100                return
1101            id = int(args[0], 16)
1102            offset = int(args[1])
1103            size = int(args[2])
1104            write_value = []
1105            for i in range(size):
1106                write_value.append(i % 256)
1107            self.test_dut.gatt_client_write_long_descriptor_by_handle(
1108                self.unique_mac_addr_id, id, offset, write_value)
1109        except Exception as err:
1110            self.log.error(FAILURE.format(cmd, err))
1111
1112    def do_gattc_write_char_by_id_without_response(self, line):
1113        """
1114        Description: Write char by characteristic id reference without response.
1115        Assumptions: Already connected to a GATT server service.
1116        Input(s):
1117            characteristic_id: The characteristic id reference on the GATT
1118                service
1119            size: Function will generate random bytes by input size.
1120                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1121        Usage:
1122          Examples:
1123            gattc_write_char_by_id_without_response char_id 5
1124        """
1125        cmd = "Write characteristic by id without response."
1126        try:
1127            args = line.split()
1128            if len(args) != 2:
1129                self.log.info("2 Arguments required: [Id] [Size]")
1130                return
1131            id = int(args[0], 16)
1132            size = args[1]
1133            write_value = []
1134            for i in range(int(size)):
1135                write_value.append(i % 256)
1136            self.test_dut.gatt_client_write_characteristic_without_response_by_handle(
1137                self.unique_mac_addr_id, id, write_value)
1138        except Exception as err:
1139            self.log.error(FAILURE.format(cmd, err))
1140
1141    def do_gattc_enable_notify_char_by_id(self, line):
1142        """
1143        Description: Enable Characteristic notification on Characteristic ID.
1144        Assumptions: Already connected to a GATT server service.
1145        Input(s):
1146            characteristic_id: The characteristic id reference on the GATT
1147                service
1148        Usage:
1149          Examples:
1150            gattc_enable_notify_char_by_id char_id
1151        """
1152        cmd = "Enable notifications by Characteristic id."
1153        try:
1154            id = int(line, 16)
1155            self.test_dut.gatt_client_enable_notifiy_characteristic_by_handle(
1156                self.unique_mac_addr_id, id)
1157        except Exception as err:
1158            self.log.error(FAILURE.format(cmd, err))
1159
1160    def do_gattc_disable_notify_char_by_id(self, line):
1161        """
1162        Description: Disable Characteristic notification on Characteristic ID.
1163        Assumptions: Already connected to a GATT server service.
1164        Input(s):
1165            characteristic_id: The characteristic id reference on the GATT
1166                service
1167        Usage:
1168          Examples:
1169            gattc_disable_notify_char_by_id char_id
1170        """
1171        cmd = "Disable notify Characteristic by id."
1172        try:
1173            id = int(line, 16)
1174            self.test_dut.gatt_client_disable_notifiy_characteristic_by_handle(
1175                self.unique_mac_addr_id, id)
1176        except Exception as err:
1177            self.log.error(FAILURE.format(cmd, err))
1178
1179    def do_gattc_read_char_by_id(self, line):
1180        """
1181        Description: Read Characteristic by ID.
1182        Assumptions: Already connected to a GATT server service.
1183        Input(s):
1184            characteristic_id: The characteristic id reference on the GATT
1185                service
1186        Usage:
1187          Examples:
1188            gattc_read_char_by_id char_id
1189        """
1190        cmd = "Read Characteristic value by ID."
1191        try:
1192            id = int(line, 16)
1193            read_val = self.test_dut.gatt_client_read_characteristic_by_handle(
1194                self.unique_mac_addr_id, id)
1195            self.log.info("Characteristic Value with id {}: {}".format(
1196                id, read_val))
1197        except Exception as err:
1198            self.log.error(FAILURE.format(cmd, err))
1199
1200    def do_gattc_read_char_by_uuid(self, characteristic_uuid):
1201        """
1202        Description: Read Characteristic by UUID (read by type).
1203        Assumptions: Already connected to a GATT server service.
1204        Input(s):
1205            characteristic_uuid: The characteristic id reference on the GATT
1206                service
1207        Usage:
1208          Examples:
1209            gattc_read_char_by_id char_id
1210        """
1211        cmd = "Read Characteristic value by ID."
1212        try:
1213            short_uuid_len = 4
1214            if len(characteristic_uuid) == short_uuid_len:
1215                characteristic_uuid = BASE_UUID.format(characteristic_uuid)
1216
1217            read_val = self.test_dut.gatt_client_read_characteristic_by_uuid(
1218                self.unique_mac_addr_id, characteristic_uuid)
1219            self.log.info("Characteristic Value with id {}: {}".format(
1220                id, read_val))
1221        except Exception as err:
1222            self.log.error(FAILURE.format(cmd, err))
1223
1224    def do_gattc_write_desc_by_id(self, line):
1225        """
1226        Description: Write Descriptor by characteristic id reference.
1227        Assumptions: Already connected to a GATT server service.
1228        Input(s):
1229            descriptor_id: The Descriptor id reference on the GATT service
1230            offset: The offset value to use
1231            size: Function will generate random bytes by input size.
1232                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
1233        Usage:
1234          Examples:
1235            gattc_write_desc_by_id desc_id 0 5
1236            gattc_write_desc_by_id desc_id 20 1
1237        """
1238        cmd = "Write Descriptor by id."
1239        try:
1240            args = line.split()
1241            id = int(args[0], 16)
1242            offset = int(args[1])
1243            size = args[2]
1244            write_value = []
1245            for i in range(int(size)):
1246                write_value.append(i % 256)
1247            write_result = self.test_dut.gatt_client_write_descriptor_by_handle(
1248                self.unique_mac_addr_id, id, offset, write_value)
1249            self.log.info("Descriptor Write result {}: {}".format(
1250                id, write_result))
1251        except Exception as err:
1252            self.log.error(FAILURE.format(cmd, err))
1253
1254    def do_gattc_read_desc_by_id(self, line):
1255        """
1256        Description: Read Descriptor by ID.
1257        Assumptions: Already connected to a GATT server service.
1258        Input(s):
1259            descriptor_id: The Descriptor id reference on the GATT service
1260        Usage:
1261          Examples:
1262            gattc_read_desc_by_id desc_id
1263        """
1264        cmd = "Read Descriptor by ID."
1265        try:
1266            id = int(line, 16)
1267            read_val = self.test_dut.gatt_client_read_descriptor_by_handle(
1268                self.unique_mac_addr_id, id)
1269            self.log.info("Descriptor Value with id {}: {}".format(
1270                id, read_val))
1271        except Exception as err:
1272            self.log.error(FAILURE.format(cmd, err))
1273
1274    def do_gattc_read_long_char_by_id(self, line):
1275        """
1276        Description: Read long Characteristic value by id.
1277        Assumptions: Already connected to a GATT server service.
1278        Input(s):
1279            characteristic_id: The characteristic id reference on the GATT
1280                service
1281            offset: The offset value to use.
1282            max_bytes: The max bytes size to return.
1283        Usage:
1284          Examples:
1285            gattc_read_long_char_by_id char_id 0 10
1286            gattc_read_long_char_by_id char_id 20 1
1287        """
1288        cmd = "Read long Characteristic value by id."
1289        try:
1290            args = line.split()
1291            if len(args) != 3:
1292                self.log.info("3 Arguments required: [Id] [Offset] [Size]")
1293                return
1294            id = int(args[0], 16)
1295            offset = int(args[1])
1296            max_bytes = int(args[2])
1297            read_val = self.test_dut.gatt_client_read_long_characteristic_by_handle(
1298                self.unique_mac_addr_id, id, offset, max_bytes)
1299            self.log.info("Characteristic Value with id {}: {}".format(
1300                id, read_val['result']))
1301
1302        except Exception as err:
1303            self.log.error(FAILURE.format(cmd, err))
1304
1305    """End GATT client wrappers"""
1306    """Begin LE scan wrappers"""
1307
1308    def _update_scan_results(self, scan_results):
1309        self.le_ids = []
1310        for scan in scan_results['result']:
1311            self.le_ids.append(scan['id'])
1312
1313    def do_ble_start_scan(self, line):
1314        """
1315        Description: Perform a BLE scan.
1316        Default filter name: ""
1317        Optional input: filter_device_name
1318        Usage:
1319          Examples:
1320            ble_start_scan
1321            ble_start_scan eddystone
1322        """
1323        cmd = "Perform a BLE scan and list discovered devices."
1324        try:
1325            scan_filter = {"name_substring": ""}
1326            if line:
1327                scan_filter = {"name_substring": line}
1328            self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
1329        except Exception as err:
1330            self.log.error(FAILURE.format(cmd, err))
1331
1332    def do_ble_stop_scan(self, line):
1333        """
1334        Description: Stops a BLE scan and returns discovered devices.
1335        Usage:
1336          Examples:
1337            ble_stop_scan
1338        """
1339        cmd = "Stops a BLE scan and returns discovered devices."
1340        try:
1341            scan_results = self.pri_dut.gattc_lib.bleStopBleScan()
1342            self._update_scan_results(scan_results)
1343            self.log.info(pprint.pformat(scan_results))
1344        except Exception as err:
1345            self.log.error(FAILURE.format(cmd, err))
1346
1347    def do_ble_get_discovered_devices(self, line):
1348        """
1349        Description: Get discovered LE devices of an active scan.
1350        Usage:
1351          Examples:
1352            ble_stop_scan
1353        """
1354        cmd = "Get discovered LE devices of an active scan."
1355        try:
1356            scan_results = self.pri_dut.gattc_lib.bleGetDiscoveredDevices()
1357            self._update_scan_results(scan_results)
1358            self.log.info(pprint.pformat(scan_results))
1359        except Exception as err:
1360            self.log.error(FAILURE.format(cmd, err))
1361
1362    """End LE scan wrappers"""
1363    """Begin GATT Server wrappers"""
1364
1365    def do_gatts_close(self, line):
1366        """
1367        Description: Close active GATT server.
1368
1369        Usage:
1370          Examples:
1371            gatts_close
1372        """
1373        cmd = "Close active GATT server."
1374        try:
1375            result = self.pri_dut.gatts_lib.closeServer()
1376            self.log.info(result)
1377        except Exception as err:
1378            self.log.error(FAILURE.format(cmd, err))
1379
1380    def complete_gatts_setup_database(self, text, line, begidx, endidx):
1381        if not text:
1382            completions = list(
1383                gatt_test_database.GATT_SERVER_DB_MAPPING.keys())
1384        else:
1385            completions = [
1386                s for s in gatt_test_database.GATT_SERVER_DB_MAPPING.keys()
1387                if s.startswith(text)
1388            ]
1389        return completions
1390
1391    def do_gatts_setup_database(self, line):
1392        """
1393        Description: Setup a Gatt server database based on pre-defined inputs.
1394            Supports Tab Autocomplete.
1395        Input(s):
1396            descriptor_db_name: The descriptor db name that matches one in
1397                acts_contrib.test_utils.bt.gatt_test_database
1398        Usage:
1399          Examples:
1400            gatts_setup_database LARGE_DB_1
1401        """
1402        cmd = "Setup GATT Server Database Based of pre-defined dictionaries"
1403        try:
1404            scan_results = self.pri_dut.gatts_lib.publishServer(
1405                gatt_test_database.GATT_SERVER_DB_MAPPING.get(line))
1406            self.log.info(scan_results)
1407        except Exception as err:
1408            self.log.error(FAILURE.format(cmd, err))
1409
1410    """End GATT Server wrappers"""
1411    """Begin Bluetooth Controller wrappers"""
1412
1413    def complete_btc_pair(self, text, line, begidx, endidx):
1414        """ Provides auto-complete for btc_pair cmd.
1415
1416        See Cmd module for full description.
1417        """
1418        arg_completion = len(line.split(" ")) - 1
1419        pairing_security_level_options = ['ENCRYPTED', 'AUTHENTICATED', 'NONE']
1420        bondable_options = ['BONDABLE', 'NON_BONDABLE', 'NONE']
1421        transport_options = ['BREDR', 'LE']
1422        if arg_completion == 1:
1423            if not text:
1424                completions = pairing_security_level_options
1425            else:
1426                completions = [
1427                    s for s in pairing_security_level_options
1428                    if s.startswith(text)
1429                ]
1430            return completions
1431        if arg_completion == 2:
1432            if not text:
1433                completions = bondable_options
1434            else:
1435                completions = [
1436                    s for s in bondable_options if s.startswith(text)
1437                ]
1438            return completions
1439        if arg_completion == 3:
1440            if not text:
1441                completions = transport_options
1442            else:
1443                completions = [
1444                    s for s in transport_options if s.startswith(text)
1445                ]
1446            return completions
1447
1448    def do_btc_pair(self, line):
1449        """
1450        Description: Sends an outgoing pairing request.
1451
1452        Input(s):
1453            pairing security level: ENCRYPTED, AUTHENTICATED, or NONE
1454            bondable: BONDABLE, NON_BONDABLE, or NONE
1455            transport: BREDR or LE
1456
1457        Usage:
1458          Examples:
1459            btc_pair NONE NONE BREDR
1460            btc_pair ENCRYPTED NONE LE
1461            btc_pair AUTHENTICATED NONE LE
1462            btc_pair NONE NON_BONDABLE BREDR
1463        """
1464        cmd = "Send an outgoing pairing request."
1465        pairing_security_level_mapping = {
1466            "ENCRYPTED": 1,
1467            "AUTHENTICATED": 2,
1468            "NONE": None,
1469        }
1470
1471        bondable_mapping = {
1472            "BONDABLE": True,
1473            "NON_BONDABLE": False,
1474            "NONE": None,
1475        }
1476
1477        transport_mapping = {
1478            "BREDR": 1,
1479            "LE": 2,
1480        }
1481
1482        try:
1483            options = line.split(" ")
1484            result = self.test_dut.init_pair(
1485                self.unique_mac_addr_id,
1486                pairing_security_level_mapping.get(options[0]),
1487                bondable_mapping.get(options[1]),
1488                transport_mapping.get(options[2]),
1489            )
1490            self.log.info(result)
1491        except Exception as err:
1492            self.log.error(FAILURE.format(cmd, err))
1493
1494    def complete_btc_accept_pairing(self, text, line, begidx, endidx):
1495        """ Provides auto-complete for btc_set_io_capabilities cmd.
1496
1497        See Cmd module for full description.
1498        """
1499        arg_completion = len(line.split(" ")) - 1
1500        input_options = ['NONE', 'CONFIRMATION', 'KEYBOARD']
1501        output_options = ['NONE', 'DISPLAY']
1502        if arg_completion == 1:
1503            if not text:
1504                completions = input_options
1505            else:
1506                completions = [s for s in input_options if s.startswith(text)]
1507            return completions
1508        if arg_completion == 2:
1509            if not text:
1510                completions = output_options
1511            else:
1512                completions = [s for s in output_options if s.startswith(text)]
1513            return completions
1514
1515    def do_btc_accept_pairing(self, line):
1516        """
1517        Description: Accept all incoming pairing requests.
1518
1519        Input(s):
1520            input: String - The input I/O capabilities to use
1521                Available Values:
1522                NONE - Input capability type None
1523                CONFIRMATION - Input capability type confirmation
1524                KEYBOARD - Input capability type Keyboard
1525            output: String - The output I/O Capabilities to use
1526                Available Values:
1527                NONE - Output capability type None
1528                DISPLAY - output capability type Display
1529
1530        Usage:
1531          Examples:
1532            btc_accept_pairing
1533            btc_accept_pairing NONE DISPLAY
1534            btc_accept_pairing NONE NONE
1535            btc_accept_pairing KEYBOARD DISPLAY
1536        """
1537        cmd = "Accept incoming pairing requests"
1538        try:
1539            input_capabilities = "NONE"
1540            output_capabilities = "NONE"
1541            options = line.split(" ")
1542            if len(options) > 1:
1543                input_capabilities = options[0]
1544                output_capabilities = options[1]
1545            result = self.pri_dut.bts_lib.acceptPairing(
1546                input_capabilities, output_capabilities)
1547            self.log.info(result)
1548        except Exception as err:
1549            self.log.error(FAILURE.format(cmd, err))
1550
1551    def do_btc_forget_device(self, line):
1552        """
1553        Description: Forget pairing of the current device under test.
1554            Current device under test is the device found by
1555            tool_refresh_unique_id from custom user param. This function
1556            will also perform a clean disconnect if actively connected.
1557
1558        Usage:
1559          Examples:
1560            btc_forget_device
1561        """
1562        cmd = "For pairing of the current device under test."
1563        try:
1564            self.log.info("Forgetting device id: {}".format(
1565                self.unique_mac_addr_id))
1566            result = self.pri_dut.bts_lib.forgetDevice(self.unique_mac_addr_id)
1567            self.log.info(result)
1568        except Exception as err:
1569            self.log.error(FAILURE.format(cmd, err))
1570
1571    def do_btc_set_discoverable(self, discoverable):
1572        """
1573        Description: Change Bluetooth Controller discoverablility.
1574        Input(s):
1575            discoverable: true to set discoverable
1576                          false to set non-discoverable
1577        Usage:
1578          Examples:
1579            btc_set_discoverable true
1580            btc_set_discoverable false
1581        """
1582        cmd = "Change Bluetooth Controller discoverablility."
1583        try:
1584            result = self.test_dut.set_discoverable(
1585                self.str_to_bool(discoverable))
1586            self.log.info(result)
1587        except Exception as err:
1588            self.log.error(FAILURE.format(cmd, err))
1589
1590    def do_btc_set_name(self, name):
1591        """
1592        Description: Change Bluetooth Controller local name.
1593        Input(s):
1594            name: The name to set the Bluetooth Controller name to.
1595
1596        Usage:
1597          Examples:
1598            btc_set_name fs_test
1599        """
1600        cmd = "Change Bluetooth Controller local name."
1601        try:
1602            result = self.test_dut.set_bluetooth_local_name(name)
1603            self.log.info(result)
1604        except Exception as err:
1605            self.log.error(FAILURE.format(cmd, err))
1606
1607    def do_btc_request_discovery(self, discover):
1608        """
1609        Description: Change whether the Bluetooth Controller is in active.
1610            discovery or not.
1611        Input(s):
1612            discover: true to start discovery
1613                      false to end discovery
1614        Usage:
1615          Examples:
1616            btc_request_discovery true
1617            btc_request_discovery false
1618        """
1619        cmd = "Change whether the Bluetooth Controller is in active."
1620        try:
1621            result = self.pri_dut.bts_lib.requestDiscovery(
1622                self.str_to_bool(discover))
1623            self.log.info(result)
1624        except Exception as err:
1625            self.log.error(FAILURE.format(cmd, err))
1626
1627    def do_btc_get_known_remote_devices(self, line):
1628        """
1629        Description: Get a list of known devices.
1630
1631        Usage:
1632          Examples:
1633            btc_get_known_remote_devices
1634        """
1635        cmd = "Get a list of known devices."
1636        self.bt_control_devices = []
1637        try:
1638            device_list = self.pri_dut.bts_lib.getKnownRemoteDevices(
1639            )['result']
1640            for id_dict in device_list:
1641                device = device_list[id_dict]
1642                self.bt_control_devices.append(device)
1643                self.log.info("Device found {}".format(device))
1644
1645        except Exception as err:
1646            self.log.error(FAILURE.format(cmd, err))
1647
1648    def do_btc_forget_all_known_devices(self, line):
1649        """
1650        Description: Forget all known devices.
1651
1652        Usage:
1653          Examples:
1654            btc_forget_all_known_devices
1655        """
1656        cmd = "Forget all known devices."
1657        try:
1658            device_list = self.pri_dut.bts_lib.getKnownRemoteDevices(
1659            )['result']
1660            for device in device_list:
1661                d = device_list[device]
1662                if d['bonded'] or d['connected']:
1663                    self.log.info("Unbonding deivce: {}".format(d))
1664                    self.log.info(
1665                        self.pri_dut.bts_lib.forgetDevice(d['id'])['result'])
1666        except Exception as err:
1667            self.log.error(FAILURE.format(cmd, err))
1668
1669    def do_btc_connect_device(self, line):
1670        """
1671        Description: Connect to device under test.
1672            Device under test is specified by either user params
1673            or
1674                tool_set_target_device_name <name>
1675                do_tool_refresh_unique_id_using_bt_control
1676
1677        Usage:
1678          Examples:
1679            btc_connect_device
1680        """
1681        cmd = "Connect to device under test."
1682        try:
1683            result = self.pri_dut.bts_lib.connectDevice(
1684                self.unique_mac_addr_id)
1685            self.log.info(result)
1686        except Exception as err:
1687            self.log.error(FAILURE.format(cmd, err))
1688
1689    def complete_btc_connect_device_by_id(self, text, line, begidx, endidx):
1690        if not text:
1691            completions = list(self.bt_control_ids)[:]
1692        else:
1693            completions = [
1694                s for s in self.bt_control_ids if s.startswith(text)
1695            ]
1696        return completions
1697
1698    def do_btc_connect_device_by_id(self, device_id):
1699        """
1700        Description: Connect to device id based on pre-defined inputs.
1701            Supports Tab Autocomplete.
1702        Input(s):
1703            device_id: The device id to connect to.
1704
1705        Usage:
1706          Examples:
1707            btc_connect_device_by_id <device_id>
1708        """
1709        cmd = "Connect to device id based on pre-defined inputs."
1710        try:
1711            result = self.pri_dut.bts_lib.connectDevice(device_id)
1712            self.log.info(result)
1713        except Exception as err:
1714            self.log.error(FAILURE.format(cmd, err))
1715
1716    def complete_btc_connect_device_by_name(self, text, line, begidx, endidx):
1717        if not text:
1718            completions = list(self.bt_control_names)[:]
1719        else:
1720            completions = [
1721                s for s in self.bt_control_names if s.startswith(text)
1722            ]
1723        return completions
1724
1725    def do_btc_connect_device_by_name(self, device_name):
1726        """
1727        Description: Connect to device id based on pre-defined inputs.
1728            Supports Tab Autocomplete.
1729        Input(s):
1730            device_id: The device id to connect to.
1731
1732        Usage:
1733          Examples:
1734            btc_connect_device_by_name <device_id>
1735        """
1736        cmd = "Connect to device name based on pre-defined inputs."
1737        try:
1738            for device in self.bt_control_devices:
1739                if device_name is device['name']:
1740
1741                    result = self.pri_dut.bts_lib.connectDevice(device['id'])
1742                    self.log.info(result)
1743        except Exception as err:
1744            self.log.error(FAILURE.format(cmd, err))
1745
1746    def do_btc_disconnect_device(self, line):
1747        """
1748        Description: Disconnect to device under test.
1749            Device under test is specified by either user params
1750            or
1751                tool_set_target_device_name <name>
1752                do_tool_refresh_unique_id_using_bt_control
1753
1754        Usage:
1755          Examples:
1756            btc_disconnect_device
1757        """
1758        cmd = "Disconnect to device under test."
1759        try:
1760            result = self.pri_dut.bts_lib.disconnectDevice(
1761                self.unique_mac_addr_id)
1762            self.log.info(result)
1763        except Exception as err:
1764            self.log.error(FAILURE.format(cmd, err))
1765
1766    def do_btc_init_bluetooth_control(self, line):
1767        """
1768        Description: Initialize the Bluetooth Controller.
1769
1770        Usage:
1771          Examples:
1772            btc_init_bluetooth_control
1773        """
1774        cmd = "Initialize the Bluetooth Controller."
1775        try:
1776            result = self.test_dut.initialize_bluetooth_controller()
1777            self.log.info(result)
1778        except Exception as err:
1779            self.log.error(FAILURE.format(cmd, err))
1780
1781    def do_btc_get_local_address(self, line):
1782        """
1783        Description: Get the local BR/EDR address of the Bluetooth Controller.
1784
1785        Usage:
1786          Examples:
1787            btc_get_local_address
1788        """
1789        cmd = "Get the local BR/EDR address of the Bluetooth Controller."
1790        try:
1791            result = self.test_dut.get_local_bluetooth_address()
1792            self.log.info(result)
1793        except Exception as err:
1794            self.log.error(FAILURE.format(cmd, err))
1795
1796    def do_btc_input_pairing_pin(self, line):
1797        """
1798        Description: Sends a pairing pin to SL4F's Bluetooth Control's
1799        Pairing Delegate.
1800
1801        Usage:
1802          Examples:
1803            btc_input_pairing_pin 123456
1804        """
1805        cmd = "Input pairing pin to the Fuchsia device."
1806        try:
1807            result = self.pri_dut.bts_lib.inputPairingPin(line)['result']
1808            self.log.info(result)
1809        except Exception as err:
1810            self.log.error(FAILURE.format(cmd, err))
1811
1812    def do_btc_get_pairing_pin(self, line):
1813        """
1814        Description: Gets the pairing pin from SL4F's Bluetooth Control's
1815        Pairing Delegate.
1816
1817        Usage:
1818          Examples:
1819            btc_get_pairing_pin
1820        """
1821        cmd = "Get the pairing pin from the Fuchsia device."
1822        try:
1823            result = self.pri_dut.bts_lib.getPairingPin()['result']
1824            self.log.info(result)
1825        except Exception as err:
1826            self.log.error(FAILURE.format(cmd, err))
1827
1828    """End Bluetooth Control wrappers"""
1829    """Begin Profile Server wrappers"""
1830
1831    def do_sdp_pts_example(self, num_of_records):
1832        """
1833        Description: An example of how to setup a generic SDP record
1834            and SDP search capabilities. This example will pass a few
1835            SDP tests.
1836
1837        Input(s):
1838            num_of_records: The number of records to add.
1839
1840        Usage:
1841          Examples:
1842            sdp_pts_example 1
1843            sdp pts_example 10
1844        """
1845        cmd = "Setup SDP for PTS testing."
1846
1847        attributes = [
1848            bt_attribute_values['ATTR_PROTOCOL_DESCRIPTOR_LIST'],
1849            bt_attribute_values['ATTR_SERVICE_CLASS_ID_LIST'],
1850            bt_attribute_values['ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST'],
1851            bt_attribute_values['ATTR_A2DP_SUPPORTED_FEATURES'],
1852        ]
1853
1854        try:
1855            self.pri_dut.sdp_lib.addSearch(
1856                attributes, int(sig_uuid_constants['AudioSource'], 16))
1857            self.pri_dut.sdp_lib.addSearch(
1858                attributes, int(sig_uuid_constants['A/V_RemoteControl'], 16))
1859            self.pri_dut.sdp_lib.addSearch(attributes,
1860                                           int(sig_uuid_constants['PANU'], 16))
1861            self.pri_dut.sdp_lib.addSearch(
1862                attributes, int(sig_uuid_constants['SerialPort'], 16))
1863            self.pri_dut.sdp_lib.addSearch(
1864                attributes, int(sig_uuid_constants['DialupNetworking'], 16))
1865            self.pri_dut.sdp_lib.addSearch(
1866                attributes, int(sig_uuid_constants['OBEXObjectPush'], 16))
1867            self.pri_dut.sdp_lib.addSearch(
1868                attributes, int(sig_uuid_constants['OBEXFileTransfer'], 16))
1869            self.pri_dut.sdp_lib.addSearch(
1870                attributes, int(sig_uuid_constants['Headset'], 16))
1871            self.pri_dut.sdp_lib.addSearch(
1872                attributes, int(sig_uuid_constants['HandsfreeAudioGateway'],
1873                                16))
1874            self.pri_dut.sdp_lib.addSearch(
1875                attributes, int(sig_uuid_constants['Handsfree'], 16))
1876            self.pri_dut.sdp_lib.addSearch(
1877                attributes, int(sig_uuid_constants['SIM_Access'], 16))
1878            for i in range(int(num_of_records)):
1879                result = self.pri_dut.sdp_lib.addService(
1880                    sdp_pts_record_list[i])
1881                self.log.info(result)
1882        except Exception as err:
1883            self.log.error(FAILURE.format(cmd, err))
1884
1885    def do_sdp_cleanup(self, line):
1886        """
1887        Description: Cleanup any existing SDP records
1888
1889        Usage:
1890          Examples:
1891            sdp_cleanup
1892        """
1893        cmd = "Cleanup SDP objects."
1894        try:
1895            result = self.pri_dut.sdp_lib.cleanUp()
1896            self.log.info(result)
1897        except Exception as err:
1898            self.log.error(FAILURE.format(cmd, err))
1899
1900    def do_sdp_init(self, line):
1901        """
1902        Description: Init the profile proxy for setting up SDP records
1903
1904        Usage:
1905          Examples:
1906            sdp_init
1907        """
1908        cmd = "Initialize profile proxy objects for adding SDP records"
1909        try:
1910            result = self.pri_dut.sdp_lib.init()
1911            self.log.info(result)
1912        except Exception as err:
1913            self.log.error(FAILURE.format(cmd, err))
1914
1915    def do_sdp_connect_l2cap(self, line):
1916        """
1917        Description: Send an l2cap connection request over an input psm value.
1918
1919        Note: Must be already connected to a peer.
1920
1921        Input(s):
1922            psm: The int hex value to connect over. Available PSMs:
1923                SDP 0x0001  See Bluetooth Service Discovery Protocol (SDP)
1924                RFCOMM  0x0003  See RFCOMM with TS 07.10
1925                TCS-BIN 0x0005  See Bluetooth Telephony Control Specification /
1926                    TCS Binary
1927                TCS-BIN-CORDLESS    0x0007  See Bluetooth Telephony Control
1928                    Specification / TCS Binary
1929                BNEP    0x000F  See Bluetooth Network Encapsulation Protocol
1930                HID_Control 0x0011  See Human Interface Device
1931                HID_Interrupt   0x0013  See Human Interface Device
1932                UPnP    0x0015  See [ESDP]
1933                AVCTP   0x0017  See Audio/Video Control Transport Protocol
1934                AVDTP   0x0019  See Audio/Video Distribution Transport Protocol
1935                AVCTP_Browsing  0x001B  See Audio/Video Remote Control Profile
1936                UDI_C-Plane 0x001D  See the Unrestricted Digital Information
1937                    Profile [UDI]
1938                ATT 0x001F  See Bluetooth Core Specification​
1939                ​3DSP   0x0021​ ​​See 3D Synchronization Profile.
1940                ​LE_PSM_IPSP    ​0x0023 ​See Internet Protocol Support Profile
1941                    (IPSP)
1942                OTS 0x0025  See Object Transfer Service (OTS)
1943                EATT    0x0027  See Bluetooth Core Specification
1944            mode: String - The channel mode to connect to. Available values:
1945                Basic mode: BASIC
1946                Enhanced Retransmission mode: ERTM
1947
1948        Usage:
1949          Examples:
1950            sdp_connect_l2cap 0001 BASIC
1951            sdp_connect_l2cap 0019 ERTM
1952        """
1953        cmd = "Connect l2cap"
1954        try:
1955            info = line.split()
1956            result = self.pri_dut.sdp_lib.connectL2cap(self.unique_mac_addr_id,
1957                                                       int(info[0], 16),
1958                                                       info[1])
1959            self.log.info(result)
1960        except Exception as err:
1961            self.log.error(FAILURE.format(cmd, err))
1962
1963    """End Profile Server wrappers"""
1964    """Begin AVDTP wrappers"""
1965
1966    def do_avdtp_init(self, initiator_delay):
1967        """
1968        Description: Init the A2DP component start and AVDTP service to
1969            initiate.
1970
1971        Input(s):
1972            initiator_delay: [Optional] The stream initiator delay to set in
1973            milliseconds.
1974
1975        Usage:
1976          Examples:
1977            avdtp_init 0
1978            avdtp_init 2000
1979            avdtp_init
1980        """
1981        cmd = "Initialize AVDTP proxy"
1982        try:
1983            if not initiator_delay:
1984                initiator_delay = None
1985            result = self.pri_dut.avdtp_lib.init(initiator_delay)
1986            self.log.info(result)
1987        except Exception as err:
1988            self.log.error(FAILURE.format(cmd, err))
1989
1990    def do_avdtp_kill_a2dp(self, line):
1991        """
1992        Description: Quickly kill any A2DP components.
1993
1994        Usage:
1995          Examples:
1996            avdtp_kill_a2dp
1997        """
1998        cmd = "Kill A2DP service"
1999        try:
2000            result = self.pri_dut.control_daemon("bt-a2dp.cmx", "stop")
2001            self.log.info(result)
2002        except Exception as err:
2003            self.log.error(FAILURE.format(cmd, err))
2004
2005    def do_avdtp_get_connected_peers(self, line):
2006        """
2007        Description: Get the connected peers for the AVDTP service
2008
2009        Usage:
2010          Examples:
2011            avdtp_get_connected_peers
2012        """
2013        cmd = "AVDTP get connected peers"
2014        try:
2015            result = self.pri_dut.avdtp_lib.getConnectedPeers()
2016            self.log.info(result)
2017        except Exception as err:
2018            self.log.error(FAILURE.format(cmd, err))
2019
2020    def do_avdtp_set_configuration(self, peer_id):
2021        """
2022        Description: Send AVDTP command to connected peer: set configuration
2023
2024        Input(s):
2025            peer_id: The specified peer_id.
2026
2027        Usage:
2028          Examples:
2029            avdtp_set_configuration <peer_id>
2030        """
2031        cmd = "Send AVDTP set configuration to connected peer"
2032        try:
2033            result = self.pri_dut.avdtp_lib.setConfiguration(int(peer_id))
2034            self.log.info(result)
2035        except Exception as err:
2036            self.log.error(FAILURE.format(cmd, err))
2037
2038    def do_avdtp_get_configuration(self, peer_id):
2039        """
2040        Description: Send AVDTP command to connected peer: get configuration
2041
2042        Input(s):
2043            peer_id: The specified peer_id.
2044
2045        Usage:
2046          Examples:
2047            avdtp_get_configuration <peer_id>
2048        """
2049        cmd = "Send AVDTP get configuration to connected peer"
2050        try:
2051            result = self.pri_dut.avdtp_lib.getConfiguration(int(peer_id))
2052            self.log.info(result)
2053        except Exception as err:
2054            self.log.error(FAILURE.format(cmd, err))
2055
2056    def do_avdtp_get_capabilities(self, peer_id):
2057        """
2058        Description: Send AVDTP command to connected peer: get capabilities
2059
2060        Input(s):
2061            peer_id: The specified peer_id.
2062
2063        Usage:
2064          Examples:
2065            avdtp_get_capabilities <peer_id>
2066        """
2067        cmd = "Send AVDTP get capabilities to connected peer"
2068        try:
2069            result = self.pri_dut.avdtp_lib.getCapabilities(int(peer_id))
2070            self.log.info(result)
2071        except Exception as err:
2072            self.log.error(FAILURE.format(cmd, err))
2073
2074    def do_avdtp_get_all_capabilities(self, peer_id):
2075        """
2076        Description: Send AVDTP command to connected peer: get all capabilities
2077
2078        Input(s):
2079            peer_id: The specified peer_id.
2080
2081        Usage:
2082          Examples:
2083            avdtp_get_all_capabilities <peer_id>
2084        """
2085        cmd = "Send AVDTP get all capabilities to connected peer"
2086        try:
2087            result = self.pri_dut.avdtp_lib.getAllCapabilities(int(peer_id))
2088            self.log.info(result)
2089        except Exception as err:
2090            self.log.error(FAILURE.format(cmd, err))
2091
2092    def do_avdtp_reconfigure_stream(self, peer_id):
2093        """
2094        Description: Send AVDTP command to connected peer: reconfigure stream
2095
2096        Input(s):
2097            peer_id: The specified peer_id.
2098
2099        Usage:
2100          Examples:
2101            avdtp_reconfigure_stream <peer_id>
2102        """
2103        cmd = "Send AVDTP reconfigure stream to connected peer"
2104        try:
2105            result = self.pri_dut.avdtp_lib.reconfigureStream(int(peer_id))
2106            self.log.info(result)
2107        except Exception as err:
2108            self.log.error(FAILURE.format(cmd, err))
2109
2110    def do_avdtp_suspend_stream(self, peer_id):
2111        """
2112        Description: Send AVDTP command to connected peer: suspend stream
2113
2114        Input(s):
2115            peer_id: The specified peer_id.
2116
2117        Usage:
2118          Examples:
2119            avdtp_suspend_stream <peer_id>
2120        """
2121        cmd = "Send AVDTP suspend stream to connected peer"
2122        try:
2123            result = self.pri_dut.avdtp_lib.suspendStream(int(peer_id))
2124            self.log.info(result)
2125        except Exception as err:
2126            self.log.error(FAILURE.format(cmd, err))
2127
2128    def do_avdtp_suspend_reconfigure(self, peer_id):
2129        """
2130        Description: Send AVDTP command to connected peer: suspend reconfigure
2131
2132        Input(s):
2133            peer_id: The specified peer_id.
2134
2135        Usage:
2136          Examples:
2137            avdtp_suspend_reconfigure <peer_id>
2138        """
2139        cmd = "Send AVDTP suspend reconfigure to connected peer"
2140        try:
2141            result = self.pri_dut.avdtp_lib.suspendAndReconfigure(int(peer_id))
2142            self.log.info(result)
2143        except Exception as err:
2144            self.log.error(FAILURE.format(cmd, err))
2145
2146    def do_avdtp_release_stream(self, peer_id):
2147        """
2148        Description: Send AVDTP command to connected peer: release stream
2149
2150        Input(s):
2151            peer_id: The specified peer_id.
2152
2153        Usage:
2154          Examples:
2155            avdtp_release_stream <peer_id>
2156        """
2157        cmd = "Send AVDTP release stream to connected peer"
2158        try:
2159            result = self.pri_dut.avdtp_lib.releaseStream(int(peer_id))
2160            self.log.info(result)
2161        except Exception as err:
2162            self.log.error(FAILURE.format(cmd, err))
2163
2164    def do_avdtp_establish_stream(self, peer_id):
2165        """
2166        Description: Send AVDTP command to connected peer: establish stream
2167
2168        Input(s):
2169            peer_id: The specified peer_id.
2170
2171        Usage:
2172          Examples:
2173            avdtp_establish_stream <peer_id>
2174        """
2175        cmd = "Send AVDTP establish stream to connected peer"
2176        try:
2177            result = self.pri_dut.avdtp_lib.establishStream(int(peer_id))
2178            self.log.info(result)
2179        except Exception as err:
2180            self.log.error(FAILURE.format(cmd, err))
2181
2182    def do_avdtp_start_stream(self, peer_id):
2183        """
2184        Description: Send AVDTP command to connected peer: start stream
2185
2186        Input(s):
2187            peer_id: The specified peer_id.
2188
2189        Usage:
2190          Examples:
2191            avdtp_start_stream <peer_id>
2192        """
2193        cmd = "Send AVDTP start stream to connected peer"
2194        try:
2195            result = self.pri_dut.avdtp_lib.startStream(int(peer_id))
2196            self.log.info(result)
2197        except Exception as err:
2198            self.log.error(FAILURE.format(cmd, err))
2199
2200    def do_avdtp_abort_stream(self, peer_id):
2201        """
2202        Description: Send AVDTP command to connected peer: abort stream
2203
2204        Input(s):
2205            peer_id: The specified peer_id.
2206
2207        Usage:
2208          Examples:
2209            avdtp_abort_stream <peer_id>
2210        """
2211        cmd = "Send AVDTP abort stream to connected peer"
2212        try:
2213            result = self.pri_dut.avdtp_lib.abortStream(int(peer_id))
2214            self.log.info(result)
2215        except Exception as err:
2216            self.log.error(FAILURE.format(cmd, err))
2217
2218    def do_avdtp_remove_service(self, line):
2219        """
2220        Description: Removes the AVDTP service in use.
2221
2222        Usage:
2223          Examples:
2224            avdtp_establish_stream <peer_id>
2225        """
2226        cmd = "Remove AVDTP service"
2227        try:
2228            result = self.pri_dut.avdtp_lib.removeService()
2229            self.log.info(result)
2230        except Exception as err:
2231            self.log.error(FAILURE.format(cmd, err))
2232
2233    """End AVDTP wrappers"""
2234    """Begin Audio wrappers"""
2235
2236    def do_audio_start_output_save(self, line):
2237        """
2238        Description: Start audio output save
2239
2240        Usage:
2241          Examples:
2242            audio_start_output_save
2243        """
2244        cmd = "Start audio capture"
2245        try:
2246            result = self.pri_dut.audio_lib.startOutputSave()
2247            self.log.info(result)
2248        except Exception as err:
2249            self.log.error(FAILURE.format(cmd, err))
2250
2251    def do_audio_stop_output_save(self, line):
2252        """
2253        Description: Stop audio output save
2254
2255        Usage:
2256          Examples:
2257            audio_stop_output_save
2258        """
2259        cmd = "Stop audio capture"
2260        try:
2261            result = self.pri_dut.audio_lib.stopOutputSave()
2262            self.log.info(result)
2263        except Exception as err:
2264            self.log.error(FAILURE.format(cmd, err))
2265
2266    def do_audio_get_output_audio(self, line):
2267        """
2268        Description: Get the audio output saved to a local file
2269
2270        Usage:
2271          Examples:
2272            audio_get_output_audio
2273        """
2274        cmd = "Get audio capture"
2275        try:
2276            save_path = "{}/{}".format(self.pri_dut.log_path, "audio.raw")
2277            result = self.pri_dut.audio_lib.getOutputAudio(save_path)
2278        except Exception as err:
2279            self.log.error(FAILURE.format(cmd, err))
2280
2281    def do_audio_5_min_test(self, line):
2282        """
2283        Description: Capture and anlyize sine audio waves played from a Bluetooth A2DP
2284        Source device.
2285
2286        Pre steps:
2287        1. Pair A2DP source device
2288        2. Prepare generated SOX file over preferred codec on source device.
2289            Quick way to generate necessary audio files:
2290            sudo apt-get install sox
2291            sox -b 16 -r 48000 -c 2 -n audio_file_2k1k_5_min.wav synth 300 sine 2000 sine 3000
2292
2293        Usage:
2294          Examples:
2295            audio_5_min_test
2296        """
2297        cmd = "5 min audio capture test"
2298        input("Press Enter once Source device is streaming audio file")
2299        try:
2300            result = self.pri_dut.audio_lib.startOutputSave()
2301            self.log.info(result)
2302            for i in range(5):
2303                print("Minutes left: {}".format(10 - i))
2304                time.sleep(60)
2305            result = self.pri_dut.audio_lib.stopOutputSave()
2306            log_time = int(time.time())
2307            save_path = "{}/{}".format(self.pri_dut.log_path,
2308                                       "{}_audio.raw".format(log_time))
2309            analysis_path = "{}/{}".format(
2310                self.pri_dut.log_path,
2311                "{}_audio_analysis.txt".format(log_time))
2312            result = self.pri_dut.audio_lib.getOutputAudio(save_path)
2313
2314            channels = 1
2315            try:
2316                quality_analysis(filename=save_path,
2317                                 output_file=analysis_path,
2318                                 bit_width=audio_bits_per_sample_32,
2319                                 rate=audio_sample_rate_48000,
2320                                 channel=channels,
2321                                 spectral_only=False)
2322
2323            except Exception as err:
2324                self.log.error("Failed to analyze raw audio: {}".format(err))
2325                return False
2326
2327            self.log.info("Analysis output here: {}".format(analysis_path))
2328            self.log.info("Analysis Results: {}".format(
2329                open(analysis_path).readlines()))
2330        except Exception as err:
2331            self.log.error(FAILURE.format(cmd, err))
2332
2333    """End Audio wrappers"""
2334    """Begin HFP wrappers"""
2335
2336    def do_hfp_init(self, line):
2337        """
2338        Description: Init the HFP component initiate.
2339
2340        Usage:
2341          Examples:
2342            hfp_init
2343        """
2344        cmd = "Initialize HFP proxy"
2345        try:
2346            result = self.pri_dut.hfp_lib.init()
2347            self.log.info(result)
2348        except Exception as err:
2349            self.log.error(FAILURE.format(cmd, err))
2350
2351    def do_hfp_remove_service(self, line):
2352        """
2353        Description: Removes the HFP service in use.
2354
2355        Usage:
2356          Examples:
2357            hfp_remove_service
2358        """
2359        cmd = "Remove HFP service"
2360        try:
2361            result = self.pri_dut.hfp_lib.removeService()
2362            self.log.info(result)
2363        except Exception as err:
2364            self.log.error(FAILURE.format(cmd, err))
2365
2366    def do_hfp_list_peers(self, line):
2367        """
2368        Description: List all HFP Hands-Free peers connected to the DUT.
2369
2370        Input(s):
2371
2372        Usage:
2373          Examples:
2374            hfp_list_peers
2375        """
2376        cmd = "Lists connected peers"
2377        try:
2378            result = self.pri_dut.hfp_lib.listPeers()
2379            self.log.info(pprint.pformat(result))
2380        except Exception as err:
2381            self.log.error(FAILURE.format(cmd, err))
2382
2383    def do_hfp_set_active_peer(self, line):
2384        """
2385        Description: Set the active HFP Hands-Free peer for the DUT.
2386
2387        Input(s):
2388            peer_id: The id of the peer to be set active.
2389
2390        Usage:
2391          Examples:
2392            hfp_set_active_peer <peer_id>
2393        """
2394        cmd = "Set the active peer"
2395        try:
2396            peer_id = int(line.strip())
2397            result = self.pri_dut.hfp_lib.setActivePeer(peer_id)
2398            self.log.info(result)
2399        except Exception as err:
2400            self.log.error(FAILURE.format(cmd, err))
2401
2402    def do_hfp_list_calls(self, line):
2403        """
2404        Description: List all calls known to the sl4f component on the DUT.
2405
2406        Input(s):
2407
2408        Usage:
2409          Examples:
2410            hfp_list_calls
2411        """
2412        cmd = "Lists all calls"
2413        try:
2414            result = self.pri_dut.hfp_lib.listCalls()
2415            self.log.info(pprint.pformat(result))
2416        except Exception as err:
2417            self.log.error(FAILURE.format(cmd, err))
2418
2419    def do_hfp_new_call(self, line):
2420        """
2421        Description: Simulate a call on the call manager
2422
2423        Input(s):
2424            remote: The number of the remote party on the simulated call
2425            state: The state of the call. Must be one of "ringing", "waiting",
2426                   "dialing", "alerting", "active", "held".
2427            direction: The direction of the call. Must be one of "incoming", "outgoing".
2428
2429        Usage:
2430          Examples:
2431            hfp_new_call <remote> <state> <direction>
2432            hfp_new_call 14085555555 active incoming
2433            hfp_new_call 14085555555 held outgoing
2434            hfp_new_call 14085555555 ringing incoming
2435            hfp_new_call 14085555555 waiting incoming
2436            hfp_new_call 14085555555 alerting outgoing
2437            hfp_new_call 14085555555 dialing outgoing
2438        """
2439        cmd = "Simulates a call"
2440        try:
2441            info = line.strip().split()
2442            if len(info) != 3:
2443                raise ValueError(
2444                    "Exactly three command line arguments required: <remote> <state> <direction>"
2445                )
2446            remote, state, direction = info[0], info[1], info[2]
2447            result = self.pri_dut.hfp_lib.newCall(remote, state, direction)
2448            self.log.info(result)
2449        except Exception as err:
2450            self.log.error(FAILURE.format(cmd, err))
2451
2452    def do_hfp_incoming_call(self, line):
2453        """
2454        Description: Simulate an incoming call on the call manager
2455
2456        Input(s):
2457            remote: The number of the remote party on the incoming call
2458
2459        Usage:
2460          Examples:
2461            hfp_incoming_call <remote>
2462            hfp_incoming_call 14085555555
2463        """
2464        cmd = "Simulates an incoming call"
2465        try:
2466            remote = line.strip()
2467            result = self.pri_dut.hfp_lib.initiateIncomingCall(remote)
2468            self.log.info(result)
2469        except Exception as err:
2470            self.log.error(FAILURE.format(cmd, err))
2471
2472    def do_hfp_waiting_call(self, line):
2473        """
2474        Description: Simulate an incoming call on the call manager when there is
2475        an onging active call already.
2476
2477        Input(s):
2478            remote: The number of the remote party on the incoming call
2479
2480        Usage:
2481          Examples:
2482            hfp_waiting_call <remote>
2483            hfp_waiting_call 14085555555
2484        """
2485        cmd = "Simulates an incoming call"
2486        try:
2487            remote = line.strip()
2488            result = self.pri_dut.hfp_lib.initiateIncomingWaitingCall(remote)
2489            self.log.info(result)
2490        except Exception as err:
2491            self.log.error(FAILURE.format(cmd, err))
2492
2493    def do_hfp_outgoing_call(self, line):
2494        """
2495        Description: Simulate an outgoing call on the call manager
2496
2497        Input(s):
2498            remote: The number of the remote party on the outgoing call
2499
2500        Usage:
2501          Examples:
2502            hfp_outgoing_call <remote>
2503        """
2504        cmd = "Simulates an outgoing call"
2505        try:
2506            remote = line.strip()
2507            result = self.pri_dut.hfp_lib.initiateOutgoingCall(remote)
2508            self.log.info(result)
2509        except Exception as err:
2510            self.log.error(FAILURE.format(cmd, err))
2511
2512    def do_hfp_set_call_active(self, line):
2513        """
2514        Description: Set the specified call to the "OngoingActive" state.
2515
2516        Input(s):
2517            call_id: The unique id of the call.
2518
2519        Usage:
2520          Examples:
2521            hfp_outgoing_call <call_id>
2522        """
2523        cmd = "Set the specified call to active"
2524        try:
2525            call_id = int(line.strip())
2526            result = self.pri_dut.hfp_lib.setCallActive(call_id)
2527            self.log.info(result)
2528        except Exception as err:
2529            self.log.error(FAILURE.format(cmd, err))
2530
2531    def do_hfp_set_call_held(self, line):
2532        """
2533        Description: Set the specified call to the "OngoingHeld" state.
2534
2535        Input(s):
2536            call_id: The unique id of the call.
2537
2538        Usage:
2539          Examples:
2540            hfp_outgoing_call <call_id>
2541        """
2542        cmd = "Set the specified call to held"
2543        try:
2544            call_id = int(line.strip())
2545            result = self.pri_dut.hfp_lib.setCallHeld(call_id)
2546            self.log.info(result)
2547        except Exception as err:
2548            self.log.error(FAILURE.format(cmd, err))
2549
2550    def do_hfp_set_call_terminated(self, line):
2551        """
2552        Description: Set the specified call to the "Terminated" state.
2553
2554        Input(s):
2555            call_id: The unique id of the call.
2556
2557        Usage:
2558          Examples:
2559            hfp_outgoing_call <call_id>
2560        """
2561        cmd = "Set the specified call to terminated"
2562        try:
2563            call_id = int(line.strip())
2564            result = self.pri_dut.hfp_lib.setCallTerminated(call_id)
2565            self.log.info(result)
2566        except Exception as err:
2567            self.log.error(FAILURE.format(cmd, err))
2568
2569    def do_hfp_set_call_transferred_to_ag(self, line):
2570        """
2571        Description: Set the specified call to the "TransferredToAg" state.
2572
2573        Input(s):
2574            call_id: The unique id of the call.
2575
2576        Usage:
2577          Examples:
2578            hfp_outgoing_call <call_id>
2579        """
2580        cmd = "Set the specified call to TransferredToAg"
2581        try:
2582            call_id = int(line.strip())
2583            result = self.pri_dut.hfp_lib.setCallTransferredToAg(call_id)
2584            self.log.info(result)
2585        except Exception as err:
2586            self.log.error(FAILURE.format(cmd, err))
2587
2588    def do_hfp_set_speaker_gain(self, line):
2589        """
2590        Description: Set the active peer's speaker gain.
2591
2592        Input(s):
2593            value: The gain value to set. Must be between 0-15 inclusive.
2594
2595        Usage:
2596          Examples:
2597            hfp_set_speaker_gain <value>
2598        """
2599        cmd = "Set the active peer's speaker gain"
2600        try:
2601            value = int(line.strip())
2602            result = self.pri_dut.hfp_lib.setSpeakerGain(value)
2603            self.log.info(result)
2604        except Exception as err:
2605            self.log.error(FAILURE.format(cmd, err))
2606
2607    def do_hfp_set_microphone_gain(self, line):
2608        """
2609        Description: Set the active peer's microphone gain.
2610
2611        Input(s):
2612            value: The gain value to set. Must be between 0-15 inclusive.
2613
2614        Usage:
2615          Examples:
2616            hfp_set_microphone_gain <value>
2617        """
2618        cmd = "Set the active peer's microphone gain"
2619        try:
2620            value = int(line.strip())
2621            result = self.pri_dut.hfp_lib.setMicrophoneGain(value)
2622            self.log.info(result)
2623        except Exception as err:
2624            self.log.error(FAILURE.format(cmd, err))
2625
2626    def do_hfp_set_service_available(self, line):
2627        """
2628        Description: Sets the simulated network service status reported by the call manager.
2629
2630        Input(s):
2631            value: "true" to set the network connection to available.
2632
2633        Usage:
2634          Examples:
2635            hfp_set_service_available <value>
2636            hfp_set_service_available true
2637            hfp_set_service_available false
2638        """
2639        cmd = "Sets the simulated network service status reported by the call manager"
2640        try:
2641            value = line.strip() == "true"
2642            result = self.pri_dut.hfp_lib.setServiceAvailable(value)
2643            self.log.info(result)
2644        except Exception as err:
2645            self.log.error(FAILURE.format(cmd, err))
2646
2647    def do_hfp_set_roaming(self, line):
2648        """
2649        Description: Sets the simulated roaming status reported by the call manager.
2650
2651        Input(s):
2652            value: "true" to set the network connection to roaming.
2653
2654        Usage:
2655          Examples:
2656            hfp_set_roaming <value>
2657            hfp_set_roaming true
2658            hfp_set_roaming false
2659        """
2660        cmd = "Sets the simulated roaming status reported by the call manager"
2661        try:
2662            value = line.strip() == "true"
2663            result = self.pri_dut.hfp_lib.setRoaming(value)
2664            self.log.info(result)
2665        except Exception as err:
2666            self.log.error(FAILURE.format(cmd, err))
2667
2668    def do_hfp_set_signal_strength(self, line):
2669        """
2670        Description: Sets the simulated signal strength reported by the call manager.
2671
2672        Input(s):
2673            value: The signal strength value to set. Must be between 0-5 inclusive.
2674
2675        Usage:
2676          Examples:
2677            hfp_set_signal_strength <value>
2678            hfp_set_signal_strength 0
2679            hfp_set_signal_strength 3
2680            hfp_set_signal_strength 5
2681        """
2682        cmd = "Sets the simulated signal strength reported by the call manager"
2683        try:
2684            value = int(line.strip())
2685            result = self.pri_dut.hfp_lib.setSignalStrength(value)
2686            self.log.info(result)
2687        except Exception as err:
2688            self.log.error(FAILURE.format(cmd, err))
2689
2690    def do_hfp_set_subscriber_number(self, line):
2691        """
2692        Description: Sets the subscriber number reported by the call manager.
2693
2694        Input(s):
2695            value: The subscriber number to set. Maximum length 128 characters.
2696
2697        Usage:
2698          Examples:
2699            hfp_set_subscriber_number <value>
2700            hfp_set_subscriber_number 14085555555
2701        """
2702        cmd = "Sets the subscriber number reported by the call manager"
2703        try:
2704            value = line.strip()
2705            result = self.pri_dut.hfp_lib.setSubscriberNumber(value)
2706            self.log.info(result)
2707        except Exception as err:
2708            self.log.error(FAILURE.format(cmd, err))
2709
2710    def do_hfp_set_operator(self, line):
2711        """
2712        Description: Sets the operator value reported by the call manager.
2713
2714        Input(s):
2715            value: The operator value to set. Maximum length 16 characters.
2716
2717        Usage:
2718          Examples:
2719            hfp_set_operator <value>
2720            hfp_set_operator GoogleFi
2721        """
2722        cmd = "Sets the operator value reported by the call manager"
2723        try:
2724            value = line.strip()
2725            result = self.pri_dut.hfp_lib.setOperator(value)
2726            self.log.info(result)
2727        except Exception as err:
2728            self.log.error(FAILURE.format(cmd, err))
2729
2730    def do_hfp_set_nrec_support(self, line):
2731        """
2732        Description: Sets the noise reduction/echo cancelation support reported by the call manager.
2733
2734        Input(s):
2735            value: The nrec support bool.
2736
2737        Usage:
2738          Examples:
2739            hfp_set_nrec_support <value>
2740            hfp_set_nrec_support true
2741            hfp_set_nrec_support false
2742        """
2743        cmd = "Sets the noise reduction/echo cancelation support reported by the call manager"
2744        try:
2745            value = line.strip() == "true"
2746            result = self.pri_dut.hfp_lib.setNrecSupport(value)
2747            self.log.info(result)
2748        except Exception as err:
2749            self.log.error(FAILURE.format(cmd, err))
2750
2751    def do_hfp_set_battery_level(self, line):
2752        """
2753        Description: Sets the battery level reported by the call manager.
2754
2755        Input(s):
2756            value: The integer battery level value. Must be 0-5 inclusive.
2757
2758        Usage:
2759          Examples:
2760            hfp_set_battery_level <value>
2761            hfp_set_battery_level 0
2762            hfp_set_battery_level 3
2763        """
2764        cmd = "Set the battery level reported by the call manager"
2765        try:
2766            value = int(line.strip())
2767            result = self.pri_dut.hfp_lib.setBatteryLevel(value)
2768            self.log.info(result)
2769        except Exception as err:
2770            self.log.error(FAILURE.format(cmd, err))
2771
2772    def do_hfp_set_last_dialed(self, line):
2773        """
2774        Description: Sets the last dialed number in the call manager.
2775
2776        Input(s):
2777            number: The number of the remote party.
2778
2779        Usage:
2780          Examples:
2781            hfp_set_last_dialed <number>
2782            hfp_set_last_dialed 14085555555
2783        """
2784        cmd = "Sets the last dialed number in the call manager."
2785        try:
2786            number = line.strip()
2787            result = self.pri_dut.hfp_lib.setLastDialed(number)
2788            self.log.info(result)
2789        except Exception as err:
2790            self.log.error(FAILURE.format(cmd, err))
2791
2792    def do_hfp_clear_last_dialed(self, line):
2793        """
2794        Description: Clears the last dialed number in the call manager.
2795
2796        Usage:
2797          Examples:
2798            hfp_clear_last_dialed
2799        """
2800        cmd = "Clears the last dialed number in the call manager."
2801        try:
2802            result = self.pri_dut.hfp_lib.clearLastDialed()
2803            self.log.info(result)
2804        except Exception as err:
2805            self.log.error(FAILURE.format(cmd, err))
2806
2807    def do_hfp_set_memory_location(self, line):
2808        """
2809        Description: Sets a memory location to point to a remote number.
2810
2811        Input(s):
2812            location: The memory location at which to store the number.
2813            number: The number of the remote party to be stored.
2814
2815        Usage:
2816          Examples:
2817            hfp_set_memory_location <location> <number>
2818            hfp_set_memory_location 0 14085555555
2819        """
2820        cmd = "Sets a memory location to point to a remote number."
2821        try:
2822            info = line.strip().split()
2823            if len(info) != 2:
2824                raise ValueError(
2825                    "Exactly two command line arguments required: <location> <number>"
2826                )
2827            location, number = info[0], info[1]
2828            result = self.pri_dut.hfp_lib.setMemoryLocation(location, number)
2829            self.log.info(result)
2830        except Exception as err:
2831            self.log.error(FAILURE.format(cmd, err))
2832
2833    def do_hfp_clear_memory_location(self, line):
2834        """
2835        Description: Sets a memory location to point to a remote number.
2836
2837        Input(s):
2838            localtion: The memory location to clear.
2839
2840        Usage:
2841          Examples:
2842            hfp_clear_memory_location <location>
2843            hfp_clear_memory_location 0
2844        """
2845        cmd = "Sets a memory location to point to a remote number."
2846        try:
2847            location = line.strip()
2848            result = self.pri_dut.hfp_lib.clearMemoryLocation(location)
2849            self.log.info(result)
2850        except Exception as err:
2851            self.log.error(FAILURE.format(cmd, err))
2852
2853    def do_hfp_set_dial_result(self, line):
2854        """
2855        Description: Sets the status result to be returned when the number is dialed.
2856
2857        Input(s):
2858            number: The number of the remote party.
2859            status: The status to be returned when an outgoing call is initiated to the number.
2860
2861        Usage:
2862          Examples:
2863            hfp_set_battery_level <value>
2864        """
2865        cmd = "Sets the status result to be returned when the number is dialed."
2866        try:
2867            info = line.strip().split()
2868            if len(info) != 2:
2869                raise ValueError(
2870                    "Exactly two command line arguments required: <number> <status>"
2871                )
2872            number, status = info[0], int(info[1])
2873            result = self.pri_dut.hfp_lib.setDialResult(number, status)
2874            self.log.info(pprint.pformat(result))
2875        except Exception as err:
2876            self.log.error(FAILURE.format(cmd, err))
2877
2878    def do_hfp_get_state(self, line):
2879        """
2880        Description: Get the call manager's complete state
2881
2882        Usage:
2883          Examples:
2884            hfp_get_state
2885        """
2886        cmd = "Get the call manager's state"
2887        try:
2888            result = self.pri_dut.hfp_lib.getState()
2889            self.log.info(pprint.pformat(result))
2890        except Exception as err:
2891            self.log.error(FAILURE.format(cmd, err))
2892
2893    def do_hfp_set_connection_behavior(self, line):
2894        """
2895        Description: Set the Service Level Connection (SLC) behavior when a new peer connects.
2896
2897        Input(s):
2898            autoconnect: Enable/Disable autoconnection of SLC.
2899
2900        Usage:
2901          Examples:
2902            hfp_set_connection_behavior <autoconnect>
2903            hfp_set_connection_behavior true
2904            hfp_set_connection_behavior false
2905        """
2906        cmd = "Set the Service Level Connection (SLC) behavior"
2907        try:
2908            autoconnect = line.strip().lower() == "true"
2909            result = self.pri_dut.hfp_lib.setConnectionBehavior(autoconnect)
2910            self.log.info(result)
2911        except Exception as err:
2912            self.log.error(FAILURE.format(cmd, err))
2913
2914    """End HFP wrappers"""
2915    """Begin RFCOMM wrappers"""
2916
2917    def do_rfcomm_init(self, line):
2918        """
2919        Description: Initialize the RFCOMM component services.
2920
2921        Usage:
2922          Examples:
2923            rfcomm_init
2924        """
2925        cmd = "Initialize RFCOMM proxy"
2926        try:
2927            result = self.pri_dut.rfcomm_lib.init()
2928            self.log.info(result)
2929        except Exception as err:
2930            self.log.error(FAILURE.format(cmd, err))
2931
2932    def do_rfcomm_remove_service(self, line):
2933        """
2934        Description: Removes the RFCOMM service in use.
2935
2936        Usage:
2937          Examples:
2938            rfcomm_remove_service
2939        """
2940        cmd = "Remove RFCOMM service"
2941        try:
2942            result = self.pri_dut.rfcomm_lib.removeService()
2943            self.log.info(result)
2944        except Exception as err:
2945            self.log.error(FAILURE.format(cmd, err))
2946
2947    def do_rfcomm_disconnect_session(self, line):
2948        """
2949        Description: Closes the RFCOMM Session.
2950
2951        Usage:
2952          Examples:
2953            rfcomm_disconnect_session
2954            rfcomm_disconnect_session
2955        """
2956        cmd = "Disconnect the RFCOMM Session"
2957        try:
2958            result = self.pri_dut.rfcomm_lib.disconnectSession(
2959                self.unique_mac_addr_id)
2960            self.log.info(result)
2961        except Exception as err:
2962            self.log.error(FAILURE.format(cmd, err))
2963
2964    def do_rfcomm_connect_rfcomm_channel(self, line):
2965        """
2966        Description: Make an outgoing RFCOMM connection.
2967
2968        Usage:
2969          Examples:
2970            rfcomm_connect_rfcomm_channel <server_channel_number>
2971            rfcomm_connect_rfcomm_channel 2
2972        """
2973        cmd = "Make an outgoing RFCOMM connection"
2974        try:
2975            server_channel_number = int(line.strip())
2976            result = self.pri_dut.rfcomm_lib.connectRfcommChannel(
2977                self.unique_mac_addr_id, server_channel_number)
2978            self.log.info(result)
2979        except Exception as err:
2980            self.log.error(FAILURE.format(cmd, err))
2981
2982    def do_rfcomm_disconnect_rfcomm_channel(self, line):
2983        """
2984        Description: Close the RFCOMM connection with the peer
2985
2986        Usage:
2987          Examples:
2988            rfcomm_disconnect_rfcomm_channel <server_channel_number>
2989            rfcomm_disconnect_rfcomm_channel 2
2990        """
2991        cmd = "Close the RFCOMM channel"
2992        try:
2993            server_channel_number = int(line.strip())
2994            result = self.pri_dut.rfcomm_lib.disconnectRfcommChannel(
2995                self.unique_mac_addr_id, server_channel_number)
2996            self.log.info(result)
2997        except Exception as err:
2998            self.log.error(FAILURE.format(cmd, err))
2999
3000    def do_rfcomm_send_remote_line_status(self, line):
3001        """
3002        Description: Send a remote line status for the RFCOMM channel.
3003
3004        Usage:
3005          Examples:
3006            rfcomm_send_remote_line_status <server_channel_number>
3007            rfcomm_send_remote_line_status 2
3008        """
3009        cmd = "Send a remote line status update for the RFCOMM channel"
3010        try:
3011            server_channel_number = int(line.strip())
3012            result = self.pri_dut.rfcomm_lib.sendRemoteLineStatus(
3013                self.unique_mac_addr_id, server_channel_number)
3014            self.log.info(result)
3015        except Exception as err:
3016            self.log.error(FAILURE.format(cmd, err))
3017
3018    def do_rfcomm_write_rfcomm(self, line):
3019        """
3020        Description: Send data over the RFCOMM channel.
3021
3022        Usage:
3023          Examples:
3024            rfcomm_write_rfcomm <server_channel_number> <data>
3025            rfcomm_write_rfcomm 2 foobar
3026        """
3027        cmd = "Send data using the RFCOMM channel"
3028        try:
3029            info = line.strip().split()
3030            if len(info) != 2:
3031                raise ValueError(
3032                    "Exactly two command line arguments required: <server_channel_number> <data>"
3033                )
3034            server_channel_number = int(info[0])
3035            data = info[1]
3036            result = self.pri_dut.rfcomm_lib.writeRfcomm(
3037                self.unique_mac_addr_id, server_channel_number, data)
3038            self.log.info(result)
3039        except Exception as err:
3040            self.log.error(FAILURE.format(cmd, err))
3041
3042    """End RFCOMM wrappers"""
3043