• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16"""
17Python script for wrappers to various libraries.
18"""
19from acts.test_utils.bt.bt_constants import bt_scan_mode_types
20from acts.test_utils.bt.bt_constants import gatt_server_responses
21import acts.test_utils.bt.gatt_test_database as gatt_test_database
22from acts.test_utils.bt.bt_carkit_lib import E2eBtCarkitLib
23
24import threading
25import time
26import cmd
27"""Various Global Strings"""
28CMD_LOG = "CMD {} result: {}"
29FAILURE = "CMD {} threw exception: {}"
30
31
32class CmdInput(cmd.Cmd):
33    android_devices = []
34    """Simple command processor for Bluetooth PTS Testing"""
35
36    def connect_hsp_helper(self, ad):
37        """A helper function for making HSP connections"""
38        end_time = time.time() + 20
39        connected_hsp_devices = len(ad.droid.bluetoothHspGetConnectedDevices())
40        while connected_hsp_devices != 1 and time.time() < end_time:
41            try:
42                ad.droid.bluetoothHspConnect(self.mac_addr)
43                time.sleep(3)
44                if len(ad.droid.bluetoothHspGetConnectedDevices() == 1):
45                    break
46            except Exception:
47                self.log.debug("Failed to connect hsp trying again...")
48            try:
49                ad.droid.bluetoothConnectBonded(self.mac_addr)
50            except Exception:
51                self.log.info("Failed to connect to bonded device...")
52            connected_hsp_devices = len(
53                ad.droid.bluetoothHspGetConnectedDevices())
54        if connected_hsp_devices != 1:
55            self.log.error("Failed to reconnect to HSP service...")
56            return False
57        self.log.info("Connected to HSP service...")
58        return True
59
60    def setup_vars(self, android_devices, mac_addr, log):
61        self.pri_dut = android_devices[0]
62        if len(android_devices) > 1:
63            self.sec_dut = android_devices[1]
64        if len(android_devices) > 2:
65            self.ter_dut = android_devices[2]
66        if len(android_devices) > 3:
67            self.qua_dut = android_devices[3]
68        self.mac_addr = mac_addr
69        self.log = log
70        self.pri_dut.bta.set_target_mac_addr(mac_addr)
71        self.pri_dut.gattc.set_target_mac_addr(mac_addr)
72        self.pri_dut.rfcomm.set_target_mac_addr(mac_addr)
73        self.bt_carkit_lib = E2eBtCarkitLib(self.log, mac_addr)
74
75    def emptyline(self):
76        pass
77
78    def do_EOF(self, line):
79        "End Script"
80        return True
81
82    def do_reset_mac_address(self, line):
83        """Reset target mac address for libraries based on the current connected device"""
84        try:
85            device = self.pri_dut.droid.bluetoothGetConnectedDevices()[0]
86            #self.setup_vars(self.android_devices, device['address'], self.log)
87            self.log.info("New device is {}".format(device))
88        except Exception as err:
89            self.log.info("Failed to setup new vars with {}".format(err))
90
91    """Begin GATT Client wrappers"""
92
93    def do_gattc_socket_conn_begin_connect_thread_psm(self, line):
94        cmd = ""
95        try:
96            self.pri_dut.gattc.socket_conn_begin_connect_thread_psm(line)
97        except Exception as err:
98            self.log.info(FAILURE.format(cmd, err))
99
100    def do_gattc_socket_conn_begin_accept_thread_psm(self, line):
101        cmd = ""
102        try:
103            self.pri_dut.gattc.socket_conn_begin_accept_thread_psm(line)
104        except Exception as err:
105            self.log.info(FAILURE.format(cmd, err))
106
107    def do_gattc_request_le_connection_parameters(self, line):
108        cmd = ""
109        try:
110            self.pri_dut.gattc.request_le_connection_parameters()
111        except Exception as err:
112            self.log.info(FAILURE.format(cmd, err))
113
114    def do_gattc_connect_over_le(self, line):
115        """Perform GATT connection over LE"""
116        cmd = "Gatt connect over LE"
117        try:
118            autoconnect = False
119            if line:
120                autoconnect = bool(line)
121            self.pri_dut.gattc.connect_over_le(autoconnect)
122        except Exception as err:
123            self.log.info(FAILURE.format(cmd, err))
124
125    def do_gattc_connect_over_bredr(self, line):
126        """Perform GATT connection over BREDR"""
127        cmd = "Gatt connect over BR/EDR"
128        try:
129            self.pri_dut.gattc.connect_over_bredr()
130        except Exception as err:
131            self.log.info(FAILURE.format(cmd, err))
132
133    def do_gattc_disconnect(self, line):
134        """Perform GATT disconnect"""
135        cmd = "Gatt Disconnect"
136        try:
137            self.pri_dut.gattc.disconnect()
138        except Exception as err:
139            self.log.info(FAILURE.format(cmd, err))
140
141    def do_gattc_read_char_by_uuid(self, line):
142        """GATT client read Characteristic by UUID."""
143        cmd = "GATT client read Characteristic by UUID."
144        try:
145            self.pri_dut.gattc.read_char_by_uuid(line)
146        except Exception as err:
147            self.log.info(FAILURE.format(cmd, err))
148
149    def do_gattc_request_mtu(self, line):
150        """Request MTU Change of input value"""
151        cmd = "Request MTU Value"
152        try:
153            self.pri_dut.gattc.request_mtu(line)
154        except Exception as err:
155            self.log.info(FAILURE.format(cmd, err))
156
157    def do_gattc_list_all_uuids(self, line):
158        """From the GATT Client, discover services and list all services,
159        chars and descriptors
160        """
161        cmd = "Discovery Services and list all UUIDS"
162        try:
163            self.pri_dut.gattc.list_all_uuids()
164        except Exception as err:
165            self.log.info(FAILURE.format(cmd, err))
166
167    def do_gattc_discover_services(self, line):
168        """GATT Client discover services of GATT Server"""
169        cmd = "Discovery Services of GATT Server"
170        try:
171            self.pri_dut.gattc.discover_services()
172        except Exception as err:
173            self.log.info(FAILURE.format(cmd, err))
174
175    def do_gattc_refresh(self, line):
176        """Perform Gatt Client Refresh"""
177        cmd = "GATT Client Refresh"
178        try:
179            self.pri_dut.gattc.refresh()
180        except Exception as err:
181            self.log.info(FAILURE.format(cmd, err))
182
183    def do_gattc_read_char_by_instance_id(self, line):
184        """From the GATT Client, discover services and list all services,
185        chars and descriptors
186        """
187        cmd = "GATT Client Read By Instance ID"
188        try:
189            self.pri_dut.gattc.read_char_by_instance_id(line)
190        except Exception as err:
191            self.log.info(FAILURE.format(cmd, err))
192
193    def do_gattc_write_char_by_instance_id(self, line):
194        """GATT Client Write to Characteristic by instance ID"""
195        cmd = "GATT Client write to Characteristic by instance ID"
196        try:
197            self.pri_dut.gattc.write_char_by_instance_id(line)
198        except Exception as err:
199            self.log.info(FAILURE.format(cmd, err))
200
201    def do_gattc_write_char_by_instance_id_value(self, line):
202        """GATT Client Write to Characteristic by instance ID"""
203        cmd = "GATT Client write to Characteristic by instance ID"
204        try:
205            self.pri_dut.gattc.write_char_by_instance_id_value(line)
206        except Exception as err:
207            self.log.info(FAILURE.format(cmd, err))
208
209    def do_gattc_mod_write_char_by_instance_id(self, line):
210        """GATT Client Write to Char that doesn't have write permission"""
211        cmd = "GATT Client Write to Char that doesn't have write permission"
212        try:
213            self.pri_dut.gattc.mod_write_char_by_instance_id(line)
214        except Exception as err:
215            self.log.info(FAILURE.format(cmd, err))
216
217    def do_gattc_write_invalid_char_by_instance_id(self, line):
218        """GATT Client Write to Char that doesn't exists"""
219        try:
220            self.pri_dut.gattc.write_invalid_char_by_instance_id(line)
221        except Exception as err:
222            self.log.info(FAILURE.format(cmd, err))
223
224    def do_gattc_mod_read_char_by_instance_id(self, line):
225        """GATT Client Read Char that doesn't have write permission"""
226        cmd = "GATT Client Read Char that doesn't have write permission"
227        try:
228            self.pri_dut.gattc.mod_read_char_by_instance_id(line)
229        except Exception as err:
230            self.log.info(FAILURE.format(cmd, err))
231
232    def do_gattc_read_invalid_char_by_instance_id(self, line):
233        """GATT Client Read Char that doesn't exists"""
234        cmd = "GATT Client Read Char that doesn't exists"
235        try:
236            self.pri_dut.gattc.read_invalid_char_by_instance_id(line)
237        except Exception as err:
238            self.log.info(FAILURE.format(cmd, err))
239
240    def do_gattc_mod_write_desc_by_instance_id(self, line):
241        """GATT Client Write to Desc that doesn't have write permission"""
242        cmd = "GATT Client Write to Desc that doesn't have write permission"
243        try:
244            self.pri_dut.gattc.mod_write_desc_by_instance_id(line)
245        except Exception as err:
246            self.log.info(FAILURE.format(cmd, err))
247
248    def do_gattc_write_invalid_desc_by_instance_id(self, line):
249        """GATT Client Write to Desc that doesn't exists"""
250        cmd = "GATT Client Write to Desc that doesn't exists"
251        try:
252            self.pri_dut.gattc.write_invalid_desc_by_instance_id(line)
253        except Exception as err:
254            self.log.info(FAILURE.format(cmd, err))
255
256    def do_gattc_mod_read_desc_by_instance_id(self, line):
257        """GATT Client Read Desc that doesn't have write permission"""
258        cmd = "GATT Client Read Desc that doesn't have write permission"
259        try:
260            self.pri_dut.gattc.mod_read_desc_by_instance_id(line)
261        except Exception as err:
262            self.log.info(FAILURE.format(cmd, err))
263
264    def do_gattc_read_invalid_desc_by_instance_id(self, line):
265        """GATT Client Read Desc that doesn't exists"""
266        cmd = "GATT Client Read Desc that doesn't exists"
267        try:
268            self.pri_dut.gattc.read_invalid_desc_by_instance_id(line)
269        except Exception as err:
270            self.log.info(FAILURE.format(cmd, err))
271
272    def do_gattc_mod_read_char_by_uuid_and_instance_id(self, line):
273        """GATT Client Read Char that doesn't have write permission"""
274        cmd = "GATT Client Read Char that doesn't have write permission"
275        try:
276            self.pri_dut.gattc.mod_read_char_by_uuid_and_instance_id(line)
277        except Exception as err:
278            self.log.info(FAILURE.format(cmd, err))
279
280    def do_gattc_read_invalid_char_by_uuid(self, line):
281        """GATT Client Read Char that doesn't exist"""
282        cmd = "GATT Client Read Char that doesn't exist"
283        try:
284            self.pri_dut.gattc.read_invalid_char_by_uuid(line)
285        except Exception as err:
286            self.log.info(FAILURE.format(cmd, err))
287
288    def do_gattc_write_desc_by_instance_id(self, line):
289        """GATT Client Write to Descriptor by instance ID"""
290        cmd = "GATT Client Write to Descriptor by instance ID"
291        try:
292            self.pri_dut.gattc.write_desc_by_instance_id(line)
293        except Exception as err:
294            self.log.info(FAILURE.format(cmd, err))
295
296    def do_gattc_enable_notification_desc_by_instance_id(self, line):
297        """GATT Client Enable Notification on Descriptor by instance ID"""
298        cmd = "GATT Client Enable Notification on Descriptor by instance ID"
299        try:
300            self.pri_dut.gattc.enable_notification_desc_by_instance_id(line)
301        except Exception as err:
302            self.log.info(FAILURE.format(cmd, err))
303
304    def do_gattc_enable_indication_desc_by_instance_id(self, line):
305        """GATT Client Enable Indication on Descriptor by instance ID"""
306        cmd = "GATT Client Enable Indication on Descriptor by instance ID"
307        try:
308            self.pri_dut.gattc.enable_indication_desc_by_instance_id(line)
309        except Exception as err:
310            self.log.info(FAILURE.format(cmd, err))
311
312    def do_gattc_char_enable_all_notifications(self, line):
313        """GATT Client enable all notifications"""
314        cmd = "GATT Client enable all notifications"
315        try:
316            self.pri_dut.gattc.char_enable_all_notifications()
317        except Exception as err:
318            self.log.info(FAILURE.format(cmd, err))
319
320    def do_gattc_read_char_by_invalid_instance_id(self, line):
321        """GATT Client read char by non-existant instance id"""
322        cmd = "GATT Client read char by non-existant instance id"
323        try:
324            self.pri_dut.gattc.read_char_by_invalid_instance_id(line)
325        except Exception as err:
326            self.log.info(FAILURE.format(cmd, err))
327
328    def do_gattc_begin_reliable_write(self, line):
329        """Begin a reliable write on the Bluetooth Gatt Client"""
330        cmd = "GATT Client Begin Reliable Write"
331        try:
332            self.pri_dut.gattc.begin_reliable_write()
333        except Exception as err:
334            self.log.info(FAILURE.format(cmd, err))
335
336    def do_gattc_abort_reliable_write(self, line):
337        """Abort a reliable write on the Bluetooth Gatt Client"""
338        cmd = "GATT Client Abort Reliable Write"
339        try:
340            self.pri_dut.gattc.abort_reliable_write()
341        except Exception as err:
342            self.log.info(FAILURE.format(cmd, err))
343
344    def do_gattc_execute_reliable_write(self, line):
345        """Execute a reliable write on the Bluetooth Gatt Client"""
346        cmd = "GATT Client Execute Reliable Write"
347        try:
348            self.pri_dut.gattc.execute_reliable_write()
349        except Exception as err:
350            self.log.info(FAILURE.format(cmd, err))
351
352    def do_gattc_read_all_char(self, line):
353        """GATT Client read all Characteristic values"""
354        cmd = "GATT Client read all Characteristic values"
355        try:
356            self.pri_dut.gattc.read_all_char()
357        except Exception as err:
358            self.log.info(FAILURE.format(cmd, err))
359
360    def do_gattc_write_all_char(self, line):
361        """Write to every Characteristic on the GATT server"""
362        cmd = "GATT Client Write All Characteristics"
363        try:
364            self.pri_dut.gattc.write_all_char(line)
365        except Exception as err:
366            self.log.info(FAILURE.format(cmd, err))
367
368    def do_gattc_write_all_desc(self, line):
369        """ Write to every Descriptor on the GATT server """
370        cmd = "GATT Client Write All Descriptors"
371        try:
372            self.pri_dut.gattc.write_all_desc(line)
373        except Exception as err:
374            self.log.info(FAILURE.format(cmd, err))
375
376    def do_gattc_write_desc_notification_by_instance_id(self, line):
377        """Write 0x00 or 0x02 to notification descriptor [instance_id]"""
378        cmd = "Write 0x00 0x02 to notification descriptor"
379        try:
380            self.pri_dut.gattc.write_desc_notification_by_instance_id(line)
381        except Exception as err:
382            self.log.info(FAILURE.format(cmd, err))
383
384    def do_gattc_discover_service_by_uuid(self, line):
385        """Discover service by uuid"""
386        cmd = "Discover service by uuid"
387        try:
388            self.pri_dut.gattc.discover_service_by_uuid(line)
389        except Exception as err:
390            self.log.info(FAILURE.format(cmd, err))
391
392    def do_gattc_read_all_desc(self, line):
393        """Read all Descriptor values"""
394        cmd = "Read all Descriptor values"
395        try:
396            self.pri_dut.gattc.read_all_desc()
397        except Exception as err:
398            self.log.info(FAILURE.format(cmd, err))
399
400    """End GATT Client wrappers"""
401    """Begin GATT Server wrappers"""
402
403    def do_gatts_close_bluetooth_gatt_servers(self, line):
404        """Close Bluetooth Gatt Servers"""
405        cmd = "Close Bluetooth Gatt Servers"
406        try:
407            self.pri_dut.gatts.close_bluetooth_gatt_servers()
408        except Exception as err:
409            self.log.info(FAILURE.format(cmd, err))
410
411    def complete_gatts_setup_database(self, text, line, begidx, endidx):
412        if not text:
413            completions = list(
414                gatt_test_database.GATT_SERVER_DB_MAPPING.keys())[:]
415        else:
416            completions = [
417                s for s in gatt_test_database.GATT_SERVER_DB_MAPPING.keys()
418                if s.startswith(text)
419            ]
420        return completions
421
422    def complete_gatts_send_response(self, text, line, begidx, endidx):
423        """GATT Server database name completion"""
424        if not text:
425            completions = list(gatt_server_responses.keys())[:]
426        else:
427            completions = [
428                s for s in gatt_server_responses.keys() if s.startswith(text)
429            ]
430        return completions
431
432    def complete_gatts_send_continuous_response(self, text, line, begidx,
433                                                endidx):
434        """GATT Server database name completion"""
435        if not text:
436            completions = list(gatt_server_responses.keys())[:]
437        else:
438            completions = [
439                s for s in gatt_server_responses.keys() if s.startswith(text)
440            ]
441        return completions
442
443    def complete_gatts_send_continuous_response_data(self, text, line, begidx,
444                                                     endidx):
445        """GATT Server database name completion"""
446        if not text:
447            completions = list(gatt_server_responses.keys())[:]
448        else:
449            completions = [
450                s for s in gatt_server_responses.keys() if s.startswith(text)
451            ]
452        return completions
453
454    def do_gatts_list_all_uuids(self, line):
455        """From the GATT Client, discover services and list all services,
456        chars and descriptors
457        """
458        cmd = "Discovery Services and list all UUIDS"
459        try:
460            self.pri_dut.gatts.list_all_uuids()
461        except Exception as err:
462            self.log.info(FAILURE.format(cmd, err))
463
464    def do_gatts_send_response(self, line):
465        """Send a response to the GATT Client"""
466        cmd = "GATT server send response"
467        try:
468            self.pri_dut.gatts.send_response(line)
469        except Exception as err:
470            self.log.info(FAILURE.format(cmd, err))
471
472    def do_gatts_notify_characteristic_changed(self, line):
473        """Notify char changed by instance id [instance_id] [true|false]"""
474        cmd = "Notify characteristic changed by instance id"
475        try:
476            info = line.split()
477            instance_id = info[0]
478            confirm_str = info[1]
479            confirm = False
480            if confirm_str.lower() == 'true':
481                confirm = True
482            self.pri_dut.gatts.notify_characteristic_changed(
483                instance_id, confirm)
484        except Exception as err:
485            self.log.info(FAILURE.format(cmd, err))
486
487    def do_gatts_setup_database(self, line):
488        cmd = "Setup GATT Server database: {}".format(line)
489        try:
490            self.pri_dut.gatts.setup_gatts_db(
491                gatt_test_database.GATT_SERVER_DB_MAPPING.get(line))
492        except Exception as err:
493            self.log.info(FAILURE.format(cmd, err))
494
495    def do_gatts_characteristic_set_value_by_instance_id(self, line):
496        """Set Characteristic value by instance id"""
497        cmd = "Change value of a characteristic by instance id"
498        try:
499            info = line.split()
500            instance_id = info[0]
501            size = int(info[1])
502            value = []
503            for i in range(size):
504                value.append(i % 256)
505            self.pri_dut.gatts.characteristic_set_value_by_instance_id(
506                instance_id, value)
507        except Exception as err:
508            self.log.info(FAILURE.format(cmd, err))
509
510    def do_notify_characteristic_changed(self, line):
511        """Notify characteristic changed [instance_id] [confirm]"""
512        cmd = "Notify characteristic changed"
513        try:
514            info = line.split()
515            instance_id = info[0]
516            confirm = bool(info[1])
517            self.pri_dut.gatts.notify_characteristic_changed(
518                instance_id, confirm)
519        except Exception as err:
520            self.log.info(FAILURE.format(cmd, err))
521
522    def do_gatts_open(self, line):
523        """Open a GATT Server instance"""
524        cmd = "Open an empty GATT Server"
525        try:
526            self.pri_dut.gatts.open()
527        except Exception as err:
528            self.log.info(FAILURE.format(cmd, err))
529
530    def do_gatts_clear_services(self, line):
531        """Clear BluetoothGattServices from BluetoothGattServer"""
532        cmd = "Clear BluetoothGattServices from BluetoothGattServer"
533        try:
534            self.pri_dut.gatts.gatt_server_clear_services()
535        except Exception as err:
536            self.log.info(FAILURE.format(cmd, err))
537
538    def do_gatts_send_continuous_response(self, line):
539        """Send continous response with random data"""
540        cmd = "Send continous response with random data"
541        try:
542            self.pri_dut.gatts.send_continuous_response(line)
543        except Exception as err:
544            self.log.info(FAILURE.format(cmd, err))
545
546    def do_gatts_send_continuous_response_data(self, line):
547        """Send continous response including requested data"""
548        cmd = "Send continous response including requested data"
549        try:
550            self.pri_dut.gatts.send_continuous_response_data(line)
551        except Exception as err:
552            self.log.info(FAILURE.format(cmd, err))
553
554    """End GATT Server wrappers"""
555    """Begin Ble wrappers"""
556
557    def complete_ble_adv_data_include_local_name(self, text, line, begidx,
558                                                 endidx):
559        options = ['true', 'false']
560        if not text:
561            completions = list(options)[:]
562        else:
563            completions = [s for s in options if s.startswith(text)]
564        return completions
565
566    def complete_ble_adv_data_include_tx_power_level(self, text, line, begidx,
567                                                     endidx):
568        options = ['true', 'false']
569        if not text:
570            completions = list(options)[:]
571        else:
572            completions = [s for s in options if s.startswith(text)]
573        return completions
574
575    def complete_ble_stop_advertisement(self, text, line, begidx, endidx):
576        str_adv_list = list(map(str, self.pri_dut.ble.ADVERTISEMENT_LIST))
577        if not text:
578            completions = str_adv_list[:]
579        else:
580            completions = [s for s in str_adv_list if s.startswith(text)]
581        return completions
582
583    def do_ble_start_generic_connectable_advertisement(self, line):
584        """Start a connectable LE advertisement"""
585        cmd = "Start a connectable LE advertisement"
586        try:
587            self.pri_dut.ble.start_generic_connectable_advertisement(line)
588        except Exception as err:
589            self.log.info(FAILURE.format(cmd, err))
590
591    def _start_max_advertisements(self, ad):
592        self.pri_dut.ble.start_max_advertisements(ad)
593
594    def do_ble_start_generic_connectable_beacon_swarm(self, line):
595        """Start a connectable LE advertisement"""
596        cmd = "Start as many advertisements as possible on all devices"
597        try:
598            threads = []
599            for ad in self.android_devices:
600                thread = threading.Thread(
601                    target=self._start_max_advertisements, args=([ad]))
602                threads.append(thread)
603                thread.start()
604            for t in threads:
605                t.join()
606        except Exception as err:
607            self.log.info(FAILURE.format(cmd, err))
608
609    def do_ble_start_connectable_advertisement_set(self, line):
610        """Start a connectable advertisement set"""
611        try:
612            self.pri_dut.ble.start_connectable_advertisement_set(line)
613        except Exception as err:
614            self.log.error("Failed to start advertisement: {}".format(err))
615
616    def do_ble_stop_all_advertisement_set(self, line):
617        """Stop all advertisement sets"""
618        try:
619            self.pri_dut.ble.stop_all_advertisement_set(line)
620        except Exception as err:
621            self.log.error("Failed to stop advertisement: {}".format(err))
622
623    def do_ble_adv_add_service_uuid_list(self, line):
624        """Add service UUID to the LE advertisement inputs:
625         [uuid1 uuid2 ... uuidN]"""
626        cmd = "Add a valid service UUID to the advertisement."
627        try:
628            self.pri_dut.ble.adv_add_service_uuid_list(line)
629        except Exception as err:
630            self.log.info(FAILURE.format(cmd, err))
631
632    def do_ble_adv_data_include_local_name(self, line):
633        """Include local name in the advertisement. inputs: [true|false]"""
634        cmd = "Include local name in the advertisement."
635        try:
636            self.pri_dut.ble.adv_data_include_local_name(line)
637        except Exception as err:
638            self.log.info(FAILURE.format(cmd, err))
639
640    def do_ble_adv_data_include_tx_power_level(self, line):
641        """Include tx power level in the advertisement. inputs: [true|false]"""
642        cmd = "Include local name in the advertisement."
643        try:
644            self.pri_dut.ble.adv_data_include_tx_power_level(line)
645        except Exception as err:
646            self.log.info(FAILURE.format(cmd, err))
647
648    def do_ble_adv_data_add_manufacturer_data(self, line):
649        """Include manufacturer id and data to the advertisment:
650        [id data1 data2 ... dataN]"""
651        cmd = "Include manufacturer id and data to the advertisment."
652        try:
653            self.pri_dut.ble.adv_data_add_manufacturer_data(line)
654        except Exception as err:
655            self.log.info(FAILURE.format(cmd, err))
656
657    def do_ble_start_generic_nonconnectable_advertisement(self, line):
658        """Start a nonconnectable LE advertisement"""
659        cmd = "Start a nonconnectable LE advertisement"
660        try:
661            self.pri_dut.ble.start_generic_nonconnectable_advertisement(line)
662        except Exception as err:
663            self.log.info(FAILURE.format(cmd, err))
664
665    def do_ble_list_active_advertisement_ids(self, line):
666        """List all active BLE advertisements"""
667        self.log.info("IDs: {}".format(self.pri_dut.ble.advertisement_list))
668
669    def do_ble_stop_all_advertisements(self, line):
670        """Stop all LE advertisements"""
671        cmd = "Stop all LE advertisements"
672        try:
673            self.pri_dut.ble.stop_all_advertisements(line)
674        except Exception as err:
675            self.log.info(FAILURE.format(cmd, err))
676
677    def do_ble_stop_advertisement(self, line):
678        """Stop an LE advertisement"""
679        cmd = "Stop a connectable LE advertisement"
680        try:
681            self.pri_dut.ble.ble_stop_advertisement(line)
682        except Exception as err:
683            self.log.info(FAILURE.format(cmd, err))
684
685    """End Ble wrappers"""
686    """Begin Bta wrappers"""
687
688    def complete_bta_start_pairing_helper(self, text, line, begidx, endidx):
689        options = ['true', 'false']
690        if not text:
691            completions = list(options)[:]
692        else:
693            completions = [s for s in options if s.startswith(text)]
694        return completions
695
696    def complete_bta_set_scan_mode(self, text, line, begidx, endidx):
697        completions = list(bt_scan_mode_types.keys())[:]
698        if not text:
699            completions = completions[:]
700        else:
701            completions = [s for s in completions if s.startswith(text)]
702        return completions
703
704    def do_bta_set_scan_mode(self, line):
705        """Set the Scan mode of the Bluetooth Adapter"""
706        cmd = "Set the Scan mode of the Bluetooth Adapter"
707        try:
708            self.pri_dut.bta.set_scan_mode(line)
709        except Exception as err:
710            self.log.info(FAILURE.format(cmd, err))
711
712    def do_bta_set_device_name(self, line):
713        """Set Bluetooth Adapter Name"""
714        cmd = "Set Bluetooth Adapter Name"
715        try:
716            self.pri_dut.bta.set_device_name(line)
717        except Exception as err:
718            self.log.info(FAILURE.format(cmd, err))
719
720    def do_bta_enable(self, line):
721        """Enable Bluetooth Adapter"""
722        cmd = "Enable Bluetooth Adapter"
723        try:
724            self.pri_dut.bta.enable()
725        except Exception as err:
726            self.log.info(FAILURE.format(cmd, err))
727
728    def do_bta_disable(self, line):
729        """Disable Bluetooth Adapter"""
730        cmd = "Disable Bluetooth Adapter"
731        try:
732            self.pri_dut.bta.disable()
733        except Exception as err:
734            self.log.info(FAILURE.format(cmd, err))
735
736    def do_bta_init_bond(self, line):
737        """Initiate bond to PTS device"""
738        cmd = "Initiate Bond"
739        try:
740            self.pri_dut.bta.init_bond()
741        except Exception as err:
742            self.log.info(FAILURE.format(cmd, err))
743
744    def do_bta_start_discovery(self, line):
745        """Start BR/EDR Discovery"""
746        cmd = "Start BR/EDR Discovery"
747        try:
748            self.pri_dut.bta.start_discovery()
749        except Exception as err:
750            self.log.info(FAILURE.format(cmd, err))
751
752    def do_bta_stop_discovery(self, line):
753        """Stop BR/EDR Discovery"""
754        cmd = "Stop BR/EDR Discovery"
755        try:
756            self.pri_dut.bta.stop_discovery()
757        except Exception as err:
758            self.log.info(FAILURE.format(cmd, err))
759
760    def do_bta_get_discovered_devices(self, line):
761        """Get Discovered Br/EDR Devices"""
762        cmd = "Get Discovered Br/EDR Devices\n"
763        try:
764            self.pri_dut.bta.get_discovered_devices()
765        except Exception as err:
766            self.log.info(FAILURE.format(cmd, err))
767
768    def do_bta_bond(self, line):
769        """Bond to PTS device"""
770        cmd = "Bond to the PTS dongle directly"
771        try:
772            self.pri_dut.bta.bond()
773        except Exception as err:
774            self.log.info(FAILURE.format(cmd, err))
775
776    def do_bta_disconnect(self, line):
777        """BTA disconnect"""
778        cmd = "BTA disconnect"
779        try:
780            self.pri_dut.bta.disconnect()
781        except Exception as err:
782            self.log.info(FAILURE.format(cmd, err))
783
784    def do_bta_unbond(self, line):
785        """Unbond from PTS device"""
786        cmd = "Unbond from the PTS dongle"
787        try:
788            self.pri_dut.bta.unbond()
789        except Exception as err:
790            self.log.info(FAILURE.format(cmd, err))
791
792    def do_bta_start_pairing_helper(self, line):
793        """Start or stop Bluetooth Pairing Helper"""
794        cmd = "Start or stop BT Pairing helper"
795        try:
796            self.pri_dut.bta.start_pairing_helper(line)
797        except Exception as err:
798            self.log.info(FAILURE.format(cmd, err))
799
800    def do_bta_push_pairing_pin(self, line):
801        """Push pairing pin to the Android Device"""
802        cmd = "Push the pin to the Android Device"
803        try:
804            self.pri_dut.bta.push_pairing_pin(line)
805        except Exception as err:
806            self.log.info(FAILURE.format(cmd, err))
807
808    def do_bta_get_pairing_pin(self, line):
809        """Get pairing PIN"""
810        cmd = "Get Pin Info"
811        try:
812            self.pri_dut.bta.get_pairing_pin()
813        except Exception as err:
814            self.log.info(FAILURE.format(cmd, err))
815
816    def do_bta_fetch_uuids_with_sdp(self, line):
817        """BTA fetch UUIDS with SDP"""
818        cmd = "Fetch UUIDS with SDP"
819        try:
820            self.pri_dut.bta.fetch_uuids_with_sdp()
821        except Exception as err:
822            self.log.info(FAILURE.format(cmd, err))
823
824    def do_bta_connect_profiles(self, line):
825        """Connect available profiles"""
826        cmd = "Connect all profiles possible"
827        try:
828            self.pri_dut.bta.connect_profiles()
829        except Exception as err:
830            self.log.info(FAILURE.format(cmd, err))
831
832    def do_bta_tts_speak(self, line):
833        cmd = "Open audio channel by speaking characters"
834        try:
835            self.pri_dut.bta.tts_speak()
836        except Exception as err:
837            self.log.info(FAILURE.format(cmd, err))
838
839    """End Bta wrappers"""
840    """Begin Rfcomm wrappers"""
841
842    def do_rfcomm_connect(self, line):
843        """Perform an RFCOMM connect"""
844        try:
845            self.pri_dut.rfcomm.connect(line)
846        except Exception as err:
847            self.log.info(FAILURE.format(cmd, err))
848
849    def do_rfcomm_open_rfcomm_socket(self, line):
850        """Open rfcomm socket"""
851        cmd = "Open RFCOMM socket"
852        try:
853            self.pri_dut.rfcomm.open_rfcomm_socket()
854        except Exception as err:
855            self.log.info(FAILURE.format(cmd, err))
856
857    def do_rfcomm_open_l2cap_socket(self, line):
858        """Open L2CAP socket"""
859        cmd = "Open L2CAP socket"
860        try:
861            self.pri_dut.rfcomm.open_l2cap_socket()
862        except Exception as err:
863            self.log.info(FAILURE.format(cmd, err))
864
865    def do_rfcomm_write(self, line):
866        cmd = "Write String data over an RFCOMM connection"
867        try:
868            self.pri_dut.rfcomm.write(line)
869        except Exception as err:
870            self.log.info(FAILURE.format(cmd, err))
871
872    def do_rfcomm_write_binary(self, line):
873        cmd = "Write String data over an RFCOMM connection"
874        try:
875            self.pri_dut.rfcomm.write_binary(line)
876        except Exception as err:
877            self.log.info(FAILURE.format(cmd, err))
878
879    def do_rfcomm_end_connect(self, line):
880        cmd = "End RFCOMM connection"
881        try:
882            self.pri_dut.rfcomm.end_connect()
883        except Exception as err:
884            self.log.info(FAILURE.format(cmd, err))
885
886    def do_rfcomm_accept(self, line):
887        cmd = "Accept RFCOMM connection"
888        try:
889            self.pri_dut.rfcomm.accept(line)
890        except Exception as err:
891            self.log.info(FAILURE.format(cmd, err))
892
893    def do_rfcomm_stop(self, line):
894        cmd = "STOP RFCOMM Connection"
895        try:
896            self.pri_dut.rfcomm.stop()
897        except Exception as err:
898            self.log.info(FAILURE.format(cmd, err))
899
900    def do_rfcomm_open_l2cap_socket(self, line):
901        """Open L2CAP socket"""
902        cmd = "Open L2CAP socket"
903        try:
904            self.pri_dut.rfcomm.open_l2cap_socket()
905        except Exception as err:
906            self.log.info(FAILURE.format(cmd, err))
907
908    """End Rfcomm wrappers"""
909    """Begin Config wrappers"""
910
911    def do_config_reset(self, line):
912        """Reset Bluetooth Config file"""
913        cmd = "Reset Bluetooth Config file"
914        try:
915            self.pri_dut.config.reset()
916        except Exception as err:
917            self.log.info(FAILURE.format(cmd, err))
918
919    def do_config_set_nonbond(self, line):
920        """Set NonBondable Mode"""
921        cmd = "Set NonBondable Mode"
922        try:
923            self.pri_dut.config.set_nonbond()
924        except Exception as err:
925            self.log.info(FAILURE.format(cmd, err))
926
927    def do_config_set_disable_mitm(self, line):
928        """Set Disable MITM"""
929        cmd = "Set Disable MITM"
930        try:
931            self.pri_dut.config.set_disable_mitm()
932        except Exception as err:
933            self.log.info(FAILURE.format(cmd, err))
934
935    """End Config wrappers"""
936    """Begin HFP/HSP wrapper"""
937
938    def do_bta_hfp_start_voice_recognition(self, line):
939        self.pri_dut.droid.bluetoothHspStartVoiceRecognition(self.mac_addr)
940
941    def do_bta_hfp_stop_voice_recognition(self, line):
942        self.pri_dut.droid.bluetoothHspStopVoiceRecognition(self.mac_addr)
943
944    def do_test_clcc_response(self, line):
945        # Experimental
946        clcc_index = 1
947        clcc_direction = 1
948        clcc_status = 4
949        clcc_mode = 0
950        clcc_mpty = False
951        clcc_number = "18888888888"
952        clcc_type = 0
953        self.pri_dut.droid.bluetoothHspClccResponse(
954            clcc_index, clcc_direction, clcc_status, clcc_mode, clcc_mpty,
955            clcc_number, clcc_type)
956
957    def do_bta_hsp_force_sco_audio_on(self, line):
958        """HFP/HSP Force SCO Audio ON"""
959        cmd = "HFP/HSP Force SCO Audio ON"
960        try:
961            if not self.pri_dut.droid.bluetoothHspForceScoAudio(True):
962                self.log.info(
963                    FAILURE.format(cmd,
964                                   "bluetoothHspForceScoAudio returned false"))
965        except Exception as err:
966            self.log.info(FAILURE.format(cmd, err))
967
968    def do_bta_hsp_force_sco_audio_off(self, line):
969        """HFP/HSP Force SCO Audio OFF"""
970        cmd = "HFP/HSP Force SCO Audio OFF"
971        try:
972            if not self.pri_dut.droid.bluetoothHspForceScoAudio(False):
973                self.log.info(
974                    FAILURE.format(cmd,
975                                   "bluetoothHspForceScoAudio returned false"))
976        except Exception as err:
977            self.log.info(FAILURE.format(cmd, err))
978
979    def do_bta_hsp_connect_audio(self, line):
980        """HFP/HSP connect audio"""
981        cmd = "HFP/HSP connect audio"
982        try:
983            if not self.pri_dut.droid.bluetoothHspConnectAudio(self.mac_addr):
984                self.log.info(
985                    FAILURE.format(
986                        cmd, "bluetoothHspConnectAudio returned false for " +
987                        self.mac_addr))
988        except Exception as err:
989            self.log.info(FAILURE.format(cmd, err))
990
991    def do_bta_hsp_disconnect_audio(self, line):
992        """HFP/HSP disconnect audio"""
993        cmd = "HFP/HSP disconnect audio"
994        try:
995            if not self.pri_dut.droid.bluetoothHspDisconnectAudio(
996                    self.mac_addr):
997                self.log.info(
998                    FAILURE.format(
999                        cmd,
1000                        "bluetoothHspDisconnectAudio returned false for " +
1001                        self.mac_addr))
1002        except Exception as err:
1003            self.log.info(FAILURE.format(cmd, err))
1004
1005    def do_bta_hsp_connect_slc(self, line):
1006        """HFP/HSP connect SLC with additional tries and help"""
1007        cmd = "Connect to hsp with some help"
1008        try:
1009            if not self.connect_hsp_helper(self.pri_dut):
1010                self.log.error("Failed to connect to HSP")
1011        except Exception as err:
1012            self.log.info(FAILURE.format(cmd, err))
1013
1014    def do_bta_hsp_disconnect_slc(self, line):
1015        """HFP/HSP disconnect SLC"""
1016        cmd = "HFP/HSP disconnect SLC"
1017        try:
1018            if not self.pri_dut.droid.bluetoothHspDisconnect(self.mac_addr):
1019                self.log.info(
1020                    FAILURE.format(
1021                        cmd, "bluetoothHspDisconnect returned false for " +
1022                        self.mac_addr))
1023        except Exception as err:
1024            self.log.info(FAILURE.format(cmd, err))
1025
1026    """End HFP/HSP wrapper"""
1027    """Begin HID wrappers"""
1028
1029    def do_hid_get_report(self, line):
1030        """Get HID Report"""
1031        cmd = "Get HID Report"
1032        try:
1033            self.pri_dut.droid.bluetoothHidGetReport(self.mac_addr, 1, 1, 1024)
1034        except Exception as err:
1035            self.log.info(FAILURE.format(cmd, err))
1036
1037    def do_hid_set_report(self, line):
1038        """Get HID Report"""
1039        cmd = "Get HID Report"
1040        try:
1041            self.pri_dut.droid.bluetoothHidSetReport(self.mac_addr, 1, "Test")
1042        except Exception as err:
1043            self.log.info(FAILURE.format(cmd, err))
1044
1045    def do_hid_virtual_unplug(self, line):
1046        """Get HID Report"""
1047        cmd = "Get HID Report"
1048        try:
1049            self.pri_dut.droid.bluetoothHidVirtualUnplug(self.mac_addr)
1050        except Exception as err:
1051            self.log.info(FAILURE.format(cmd, err))
1052
1053    def do_hid_send_report(self, line):
1054        """Get HID Report"""
1055        cmd = "Get HID Report"
1056        try:
1057            self.pri_dut.droid.bluetoothHidSendData(self.mac_addr, "42")
1058        except Exception as err:
1059            self.log.info(FAILURE.format(cmd, err))
1060
1061    def do_hid_set_protocol_mode(self, line):
1062        """HID set protocol mode (0 == report, 1 == boot, 225 == unsupported)"""
1063        cmd = "Set protocol mode (0 == report, 1 == boot, 225 == unsupported)"
1064        try:
1065            self.pri_dut.droid.bluetoothHidSetProtocolMode(self.mac_addr, int(line))
1066        except Exception as err:
1067            self.log.info(FAILURE.format(cmd, err))
1068
1069    """End HID wrappers"""
1070    """Begin carkit test wrappers"""
1071
1072    def do_test_suite_generic_bt_tests(self, line):
1073        """Run generic Bluetooth connection test suite"""
1074        generic_bt_tests = [
1075            tuple((self.bt_carkit_lib.disconnect_reconnect_multiple_iterations,
1076                   [self.pri_dut])),
1077            tuple((self.bt_carkit_lib.disconnect_a2dp_only_then_reconnect,
1078                   [self.pri_dut])),
1079            tuple((self.bt_carkit_lib.disconnect_hsp_only_then_reconnect,
1080                   [self.pri_dut])),
1081            tuple((
1082                self.bt_carkit_lib.disconnect_both_hsp_and_a2dp_then_reconnect,
1083                [self.pri_dut])),
1084        ]
1085        try:
1086            for func, param in generic_bt_tests:
1087                try:
1088                    func(param)
1089                except Exception:
1090                    self.log.info("Test {} failed.".format(func))
1091        except Exception as err:
1092            self.log.info(FAILURE.format(cmd, err))
1093
1094    def do_e2e_connect_hsp_helper(self, line):
1095        """Connect to HSP/HFP with additional tries and help"""
1096        cmd = "Connect to hsp with some help"
1097        try:
1098            self.bt_carkit_lib.connect_hsp_helper(self.pri_dut)
1099        except Exception as err:
1100            self.log.info(FAILURE.format(cmd, err))
1101
1102    def do_e2e_hfp_setup_multi_incomming_calls(self, line):
1103        """Setup two incomming calls"""
1104        cmd = "Setup multiple incomming calls"
1105        try:
1106            self.bt_carkit_lib.setup_multi_call(self.sec_dut, self.ter_dut,
1107                                                self.pri_dut)
1108        except Exception as err:
1109            self.log.info(FAILURE.format(cmd, err))
1110
1111    def do_e2e_disconnect_reconnect_multiple_iterations(self, line):
1112        """Quick disconnect/reconnect stress test"""
1113        try:
1114            self.bt_carkit_lib.disconnect_reconnect_multiple_iterations(
1115                self.pri_dut)
1116        except Exception as err:
1117            self.log.info(FAILURE.format(cmd, err))
1118
1119    def do_e2e_disconnect_a2dp_only_then_reconnect(self, line):
1120        """Test disconnect-reconnect a2dp only scenario from phone."""
1121        cmd = "Test disconnect-reconnect a2dp only scenario from phone."
1122        try:
1123            self.bt_carkit_lib.disconnect_a2dp_only_then_reconnect(
1124                self.pri_dut)
1125        except Exception as err:
1126            self.log.info(FAILURE.format(cmd, err))
1127
1128    def do_e2e_disconnect_hsp_only_then_reconnect(self, line):
1129        """Test disconnect-reconnect hsp only scenario from phone."""
1130        cmd = "Test disconnect-reconnect hsp only scenario from phone."
1131        try:
1132            self.bt_carkit_lib.disconnect_hsp_only_then_reconnect(self.pri_dut)
1133        except Exception as err:
1134            self.log.info(FAILURE.format(cmd, err))
1135
1136    def do_e2e_disconnect_both_hsp_and_a2dp_then_reconnect(self, line):
1137        """Test disconnect-reconnect hsp and a2dp scenario from phone."""
1138        cmd = "Test disconnect-reconnect hsp and a2dp scenario from phone."
1139        try:
1140            self.bt_carkit_lib.disconnect_both_hsp_and_a2dp_then_reconnect(
1141                self.pri_dut)
1142        except Exception as err:
1143            self.log.info(FAILURE.format(cmd, err))
1144
1145    def do_e2e_outgoing_call_private_number(self, line):
1146        """Test outgoing call scenario from phone to private number"""
1147        cmd = "Test outgoing call scenario from phone to private number"
1148        try:
1149            self.bt_carkit_lib.outgoing_call_private_number(
1150                self.pri_dut, self.ter_dut)
1151        except Exception as err:
1152            self.log.info(FAILURE.format(cmd, err))
1153
1154    def do_e2e_outgoing_call_a2dp_play_before_and_after(self, line):
1155        """Test outgoing call scenario while playing music. Music should resume
1156        after call."""
1157        cmd = "Test outgoing call scenario while playing music. Music should " \
1158              "resume after call."
1159        try:
1160            self.bt_carkit_lib.outgoing_call_a2dp_play_before_and_after(
1161                self.pri_dut, self.sec_dut)
1162        except Exception as err:
1163            self.log.info(FAILURE.format(cmd, err))
1164
1165    def do_e2e_outgoing_call_unknown_contact(self, line):
1166        """Test outgoing call scenario from phone to unknow contact"""
1167        cmd = "Test outgoing call scenario from phone to unknow contact"
1168        try:
1169            self.bt_carkit_lib.outgoing_call_unknown_contact(
1170                self.pri_dut, self.ter_dut)
1171        except Exception as err:
1172            self.log.info(FAILURE.format(cmd, err))
1173
1174    def do_e2e_incomming_call_private_number(self, line):
1175        """Test incomming call scenario to phone from unknown contact"""
1176        cmd = "Test incomming call scenario to phone from unknown contact"
1177        try:
1178            self.bt_carkit_lib.incomming_call_private_number(
1179                self.pri_dut, self.ter_dut)
1180        except Exception as err:
1181            self.log.info(FAILURE.format(cmd, err))
1182
1183    def do_e2e_outgoing_call_multiple_iterations(self, line):
1184        """Test outgoing call quickly 3 times"""
1185        cmd = "Test outgoing call quickly 3 times"
1186        try:
1187            self.bt_carkit_lib.outgoing_call_multiple_iterations(
1188                self.pri_dut, self.sec_dut)
1189        except Exception as err:
1190            self.log.info(FAILURE.format(cmd, err))
1191
1192    def do_e2e_outgoing_call_hsp_disabled_then_enabled_during_call(self, line):
1193        """Test outgoing call hsp disabled then enable during call."""
1194        cmd = "Test outgoing call hsp disabled then enable during call."
1195        try:
1196            self.bt_carkit_lib.outgoing_call_hsp_disabled_then_enabled_during_call(
1197                self.pri_dut, self.sec_dut)
1198        except Exception as err:
1199            self.log.info(FAILURE.format(cmd, err))
1200
1201    def do_e2e_call_audio_routes(self, line):
1202        """Test various audio routes scenario from phone."""
1203        cmd = "Test various audio routes scenario from phone."
1204        try:
1205            self.bt_carkit_lib.call_audio_routes(self.pri_dut, self.sec_dut)
1206        except Exception as err:
1207            self.log.info(FAILURE.format(cmd, err))
1208
1209    def do_e2e_sms_receive_different_sizes(self, line):
1210        """Test recieve sms of different sizes."""
1211        cmd = "Test recieve sms of different sizes."
1212        try:
1213            self.bt_carkit_lib.sms_receive_different_sizes(
1214                self.pri_dut, self.sec_dut)
1215        except Exception as err:
1216            self.log.info(FAILURE.format(cmd, err))
1217
1218    def do_e2e_sms_receive_multiple(self, line):
1219        """Test recieve sms of different sizes."""
1220        cmd = "Test recieve sms of different sizes."
1221        try:
1222            self.bt_carkit_lib.sms_receive_multiple(self.pri_dut, self.sec_dut)
1223        except Exception as err:
1224            self.log.info(FAILURE.format(cmd, err))
1225
1226    def do_e2e_sms_send_outgoing_texts(self, line):
1227        """Test send sms of different sizes."""
1228        cmd = "Test send sms of different sizes."
1229        try:
1230            self.bt_carkit_lib.sms_send_outgoing_texts(self.pri_dut,
1231                                                       self.sec_dut)
1232        except Exception as err:
1233            self.log.info(FAILURE.format(cmd, err))
1234
1235    def do_e2e_sms_during_incomming_call(self, line):
1236        """Test incomming call scenario to phone from unknown contact"""
1237        cmd = "Test incomming call scenario to phone from unknown contact"
1238        try:
1239            self.bt_carkit_lib.sms_during_incomming_call(
1240                self.pri_dut, self.sec_dut)
1241        except Exception as err:
1242            self.log.info(FAILURE.format(cmd, err))
1243
1244    def do_e2e_multi_incomming_call(self, line):
1245        """Test 2 incomming calls scenario to phone."""
1246        cmd = "Test 2 incomming calls scenario to phone."
1247        try:
1248            self.bt_carkit_lib.multi_incomming_call(self.pri_dut, self.sec_dut,
1249                                                    self.ter_dut)
1250        except Exception as err:
1251            self.log.info(FAILURE.format(cmd, err))
1252
1253    def do_e2e_multi_call_audio_routing(self, line):
1254        """Test 2 incomming calls scenario to phone, then test audio routing."""
1255        cmd = "Test 2 incomming calls scenario to phone, then test audio" \
1256            "routing."
1257        try:
1258            self.bt_carkit_lib.multi_call_audio_routing(
1259                self.pri_dut, self.sec_dut, self.ter_dut)
1260        except Exception as err:
1261            self.log.info(FAILURE.format(cmd, err))
1262
1263    def do_e2e_multi_call_swap_multiple_times(self, line):
1264        """Test 2 incomming calls scenario to phone, then swap the calls
1265        multiple times"""
1266        cmd = "Test 2 incomming calls scenario to phone, then swap the calls" \
1267            "multiple times"
1268        try:
1269            self.bt_carkit_lib.multi_call_swap_multiple_times(
1270                self.pri_dut, self.sec_dut, self.ter_dut)
1271        except Exception as err:
1272            self.log.info(FAILURE.format(cmd, err))
1273
1274    def do_e2e_multi_call_join_conference_call(self, line):
1275        """Test 2 incomming calls scenario to phone then join the calls."""
1276        cmd = "Test 2 incomming calls scenario to phone then join the calls."
1277        try:
1278            self.bt_carkit_lib.multi_call_join_conference_call(
1279                self.pri_dut, self.sec_dut, self.ter_dut)
1280        except Exception as err:
1281            self.log.info(FAILURE.format(cmd, err))
1282
1283    def do_e2e_multi_call_join_conference_call_hangup_conf_call(self, line):
1284        """Test 2 incomming calls scenario to phone then join the calls,
1285        then terminate the call from the primary dut."""
1286        cmd = "Test 2 incomming calls scenario to phone then join the calls, " \
1287            "then terminate the call from the primary dut."
1288        try:
1289            self.bt_carkit_lib.multi_call_join_conference_call_hangup_conf_call(
1290                self.pri_dut, self.sec_dut, self.ter_dut)
1291        except Exception as err:
1292            self.log.info(FAILURE.format(cmd, err))
1293
1294    def do_e2e_outgoing_multi_call_join_conference_call(self, line):
1295        """Test 2 outgoing calls scenario from phone then join the calls."""
1296        cmd = "Test 2 outgoing calls scenario from phone then join the calls."
1297        try:
1298            self.bt_carkit_lib.outgoing_multi_call_join_conference_call(
1299                self.pri_dut, self.sec_dut, self.ter_dut)
1300        except Exception as err:
1301            self.log.info(FAILURE.format(cmd, err))
1302
1303    def do_e2e_multi_call_join_conference_call_audio_routes(self, line):
1304        """Test 2 incomming calls scenario to phone then join the calls,
1305        then test different audio routes."""
1306        cmd = "Test 2 incomming calls scenario to phone then join the calls, " \
1307            "then test different audio routes."
1308        try:
1309            self.bt_carkit_lib.multi_call_join_conference_call_audio_routes(
1310                self.pri_dut, self.sec_dut, self.ter_dut)
1311        except Exception as err:
1312            self.log.info(FAILURE.format(cmd, err))
1313
1314    def do_e2e_avrcp_play_pause(self, line):
1315        """Test avrcp play/pause commands multiple times from phone"""
1316        cmd = "Test avrcp play/pause commands multiple times from phone"
1317        try:
1318            self.bt_carkit_lib.avrcp_play_pause(self.pri_dut)
1319        except Exception as err:
1320            self.log.info(FAILURE.format(cmd, err))
1321
1322    def do_e2e_avrcp_next_previous_song(self, line):
1323        """Test AVRCP go to the next song then the previous song."""
1324        cmd = "Test AVRCP go to the next song then the previous song."
1325        try:
1326            self.bt_carkit_lib.avrcp_next_previous_song(self.pri_dut)
1327        except Exception as err:
1328            self.log.info(FAILURE.format(cmd, err))
1329
1330    def do_e2e_avrcp_next_previous(self, line):
1331        """Test AVRCP go to the next song then the press previous after a few
1332        seconds."""
1333        cmd = "Test AVRCP go to the next song then the press previous after " \
1334            "a few seconds."
1335        try:
1336            self.bt_carkit_lib.avrcp_next_previous(self.pri_dut)
1337        except Exception as err:
1338            self.log.info(FAILURE.format(cmd, err))
1339
1340    def do_e2e_avrcp_next_repetative(self, line):
1341        """Test AVRCP go to the next 10 times"""
1342        cmd = "Test AVRCP go to the next 10 times"
1343        try:
1344            self.bt_carkit_lib.avrcp_next_repetative(self.pri_dut)
1345        except Exception as err:
1346            self.log.info(FAILURE.format(cmd, err))
1347
1348    def _process_question(self, question, expected_response):
1349        while True:
1350            try:
1351                result = input(question.format(level)).lower()
1352            except Exception as err:
1353                print(err)
1354
1355    def do_e2e_cycle_battery_level(self, line):
1356        """Cycle battery level through different values and verify result on carkit"""
1357        cmd = "Test that verifies battery level indicator changes with the " \
1358            "phone. Phone current level."
1359        try:
1360            self.bt_carkit_lib.cycle_battery_level(self.pri_dut)
1361        except Exception as err:
1362            self.log.info(FAILURE.format(cmd, err))
1363
1364    def do_e2e_cycle_absolute_volume_control(self, line):
1365        """Cycle media volume level through different values and verify result on carkit"""
1366        cmd = "Test aboslute volume on carkit by changed volume levels from phone."
1367        try:
1368            self.bt_carkit_lib.cycle_absolute_volume_control(self.pri_dut)
1369        except Exception as err:
1370            self.log.info(FAILURE.format(cmd, err))
1371
1372    def do_e2e_test_voice_recognition_from_phone(self, line):
1373        """Test Voice Recognition from phone."""
1374        cmd = "Test voice recognition from phone."
1375        try:
1376            self.bt_carkit_lib.test_voice_recognition_from_phone(self.pri_dut)
1377        except Exception as err:
1378            self.log.info(FAILURE.format(cmd, err))
1379
1380    def do_e2e_test_audio_and_voice_recognition_from_phone(self, line):
1381        """Test Voice Recognition from phone and confirm music audio continues."""
1382        cmd = "Test Voice Recognition from phone and confirm music audio continues."
1383        try:
1384            self.bt_carkit_lib.test_audio_and_voice_recognition_from_phone(
1385                self.pri_dut)
1386        except Exception as err:
1387            self.log.info(FAILURE.format(cmd, err))
1388
1389    """End carkit test wrappers"""
1390    """Begin adb shell test wrappers"""
1391
1392    def do_set_battery_level(self, line):
1393        """Set battery level based on input"""
1394        cmd = "Set battery level based on input"
1395        try:
1396            self.pri_dut.shell.set_battery_level(int(line))
1397        except Exception as err:
1398            self.log.info(FAILURE.format(cmd, err))
1399
1400    """End adb shell test wrappers"""
1401