1#!/usr/bin/env python3 2# 3# Copyright (C) 2019 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Python script for wrappers to various libraries. 18 19Class CmdInput inherts from the cmd library. 20 21Functions that start with "do_" have a method 22signature that doesn't match the actual command 23line command and that is intended. This is so the 24"help" command knows what to display (in this case 25the documentation of the command itself). 26 27For example: 28Looking at the function "do_tool_set_target_device_name" 29has the inputs self and line which is expected of this type 30of method signature. When the "help" command is done on the 31method name you get the function documentation as such: 32 33(Cmd) help tool_set_target_device_name 34 35 Description: Reset the target device name. 36 Input(s): 37 device_name: Required. The advertising name to connect to. 38 Usage: tool_set_target_device_name new_target_device name 39 Examples: 40 tool_set_target_device_name le_watch 41 42This is all to say this documentation pattern is expected. 43 44""" 45 46from acts_contrib.test_utils.audio_analysis_lib.check_quality import quality_analysis 47from acts_contrib.test_utils.bt.bt_constants import audio_bits_per_sample_32 48from acts_contrib.test_utils.bt.bt_constants import audio_sample_rate_48000 49from acts_contrib.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device 50from acts_contrib.test_utils.bt.bt_constants import bt_attribute_values 51from acts_contrib.test_utils.bt.bt_constants import sig_appearance_constants 52from acts_contrib.test_utils.bt.bt_constants import sig_uuid_constants 53from acts_contrib.test_utils.fuchsia.sdp_records import sdp_pts_record_list 54 55import acts_contrib.test_utils.bt.gatt_test_database as gatt_test_database 56 57import cmd 58import pprint 59import time 60"""Various Global Strings""" 61BASE_UUID = sig_uuid_constants['BASE_UUID'] 62CMD_LOG = "CMD {} result: {}" 63FAILURE = "CMD {} threw exception: {}" 64BASIC_ADV_NAME = "fs_test" 65 66 67class CommandInput(cmd.Cmd): 68 ble_adv_interval = 1000 69 ble_adv_appearance = None 70 ble_adv_data_include_tx_power_level = False 71 ble_adv_include_name = True 72 ble_adv_include_scan_response = False 73 ble_adv_name = "fs_test" 74 ble_adv_data_manufacturer_data = None 75 ble_adv_data_service_data = None 76 ble_adv_data_service_uuid_list = None 77 ble_adv_data_uris = None 78 79 bt_control_ids = [] 80 bt_control_names = [] 81 bt_control_devices = [] 82 bt_scan_poll_timer = 0.5 83 target_device_name = "" 84 le_ids = [] 85 unique_mac_addr_id = None 86 87 def setup_vars(self, dut, target_device_name, log): 88 self.pri_dut = dut 89 # Note: test_dut is the start of a slow conversion from a Fuchsia specific 90 # Tool to an abstract_device tool. Only commands that use test_dut will work 91 # Otherwise this tool is primarially targeted at Fuchsia devices. 92 self.test_dut = create_bluetooth_device(self.pri_dut) 93 self.test_dut.initialize_bluetooth_controller() 94 self.target_device_name = target_device_name 95 self.log = log 96 97 def emptyline(self): 98 pass 99 100 def do_EOF(self, line): 101 "End Script" 102 return True 103 104 """ Useful Helper functions and cmd line tooling """ 105 106 def str_to_bool(self, s): 107 if s.lower() == 'true': 108 return True 109 elif s.lower() == 'false': 110 return False 111 112 def _find_unique_id_over_le(self): 113 scan_filter = {"name_substring": self.target_device_name} 114 self.unique_mac_addr_id = None 115 self.pri_dut.gattc_lib.bleStartBleScan(scan_filter) 116 tries = 10 117 for i in range(tries): 118 time.sleep(self.bt_scan_poll_timer) 119 scan_res = self.pri_dut.gattc_lib.bleGetDiscoveredDevices( 120 )['result'] 121 for device in scan_res: 122 name, did, connectable = device["name"], device["id"], device[ 123 "connectable"] 124 if (self.target_device_name in name): 125 self.unique_mac_addr_id = did 126 self.log.info( 127 "Successfully found device: name, id: {}, {}".format( 128 name, did)) 129 break 130 if self.unique_mac_addr_id: 131 break 132 self.pri_dut.gattc_lib.bleStopBleScan() 133 134 def _find_unique_id_over_bt_control(self): 135 self.unique_mac_addr_id = None 136 self.bt_control_devices = [] 137 self.pri_dut.bts_lib.requestDiscovery(True) 138 tries = 10 139 for i in range(tries): 140 if self.unique_mac_addr_id: 141 break 142 time.sleep(self.bt_scan_poll_timer) 143 device_list = self.pri_dut.bts_lib.getKnownRemoteDevices( 144 )['result'] 145 for id_dict in device_list: 146 device = device_list[id_dict] 147 self.bt_control_devices.append(device) 148 name = None 149 if device['name'] is not None: 150 name = device['name'] 151 did, address = device['id'], device['address'] 152 153 self.bt_control_ids.append(did) 154 if name is not None: 155 self.bt_control_names.append(name) 156 if self.target_device_name in name: 157 self.unique_mac_addr_id = did 158 self.log.info( 159 "Successfully found device: name, id, address: {}, {}, {}" 160 .format(name, did, address)) 161 break 162 self.pri_dut.bts_lib.requestDiscovery(False) 163 164 def do_tool_take_bt_snoop_log(self, custom_name): 165 """ 166 Description: Takes the bt snoop log from the Fuchsia device. 167 Logs will show up in your config files' logpath directory. 168 169 Input(s): 170 custom_name: Optional. Override the default pcap file name. 171 172 Usage: tool_set_target_device_name new_target_device name 173 Examples: 174 tool_take_bt_snoop_log connection_error 175 tool_take_bt_snoop_log 176 """ 177 self.pri_dut.take_bt_snoop_log(custom_name) 178 179 def do_tool_refresh_unique_id(self, line): 180 """ 181 Description: Refresh command line tool mac unique id. 182 Usage: 183 Examples: 184 tool_refresh_unique_id 185 """ 186 try: 187 self._find_unique_id_over_le() 188 except Exception as err: 189 self.log.error( 190 "Failed to scan or find scan result: {}".format(err)) 191 192 def do_tool_refresh_unique_id_using_bt_control(self, line): 193 """ 194 Description: Refresh command line tool mac unique id. 195 Usage: 196 Examples: 197 tool_refresh_unique_id_using_bt_control 198 """ 199 try: 200 self._find_unique_id_over_bt_control() 201 except Exception as err: 202 self.log.error( 203 "Failed to scan or find scan result: {}".format(err)) 204 205 def do_tool_set_target_device_name(self, line): 206 """ 207 Description: Reset the target device name. 208 Input(s): 209 device_name: Required. The advertising name to connect to. 210 Usage: tool_set_target_device_name new_target_device name 211 Examples: 212 tool_set_target_device_name le_watch 213 """ 214 self.log.info("Setting target_device_name to: {}".format(line)) 215 self.target_device_name = line 216 217 def do_tool_set_unique_mac_addr_id(self, line): 218 """ 219 Description: Sets the unique mac address id (Specific to Fuchsia) 220 Input(s): 221 device_id: Required. The id to set the unique mac address id to 222 Usage: tool_set_unique_mac_addr_id device_id 223 Examples: 224 tool_set_unique_mac_addr_id 7fb2cae53aad9e0d 225 """ 226 self.unique_mac_addr_id = line 227 228 """Begin BLE advertise wrappers""" 229 230 def complete_ble_adv_data_include_name(self, text, line, begidx, endidx): 231 roles = ["true", "false"] 232 if not text: 233 completions = roles 234 else: 235 completions = [s for s in roles if s.startswith(text)] 236 return completions 237 238 def do_ble_adv_data_include_name(self, line): 239 cmd = "Include name in the advertisement." 240 try: 241 self.ble_adv_include_name = self.str_to_bool(line) 242 except Exception as err: 243 self.log.error(FAILURE.format(cmd, err)) 244 245 def do_ble_adv_data_set_name(self, line): 246 cmd = "Set the name to be included in the advertisement." 247 try: 248 self.ble_adv_name = line 249 except Exception as err: 250 self.log.error(FAILURE.format(cmd, err)) 251 252 def complete_ble_adv_data_set_appearance(self, text, line, begidx, endidx): 253 if not text: 254 completions = list(sig_appearance_constants.keys()) 255 else: 256 completions = [ 257 s for s in sig_appearance_constants.keys() 258 if s.startswith(text) 259 ] 260 return completions 261 262 def do_ble_adv_data_set_appearance(self, line): 263 cmd = "Set the appearance to known SIG values." 264 try: 265 self.ble_adv_appearance = line 266 except Exception as err: 267 self.log.error(FAILURE.format(cmd, err)) 268 269 def complete_ble_adv_data_include_tx_power_level(self, text, line, begidx, 270 endidx): 271 options = ['true', 'false'] 272 if not text: 273 completions = list(options)[:] 274 else: 275 completions = [s for s in options if s.startswith(text)] 276 return completions 277 278 def do_ble_adv_data_include_tx_power_level(self, line): 279 """Include the tx_power_level in the advertising data. 280 Description: Adds tx_power_level to the advertisement data to the BLE 281 advertisement. 282 Input(s): 283 value: Required. True or False 284 Usage: ble_adv_data_include_tx_power_level bool_value 285 Examples: 286 ble_adv_data_include_tx_power_level true 287 ble_adv_data_include_tx_power_level false 288 """ 289 cmd = "Include tx_power_level in advertisement." 290 try: 291 self.ble_adv_data_include_tx_power_level = self.str_to_bool(line) 292 except Exception as err: 293 self.log.info(FAILURE.format(cmd, err)) 294 295 def complete_ble_adv_include_scan_response(self, text, line, begidx, 296 endidx): 297 options = ['true', 'false'] 298 if not text: 299 completions = list(options)[:] 300 else: 301 completions = [s for s in options if s.startswith(text)] 302 return completions 303 304 def do_ble_adv_include_scan_response(self, line): 305 """Include scan response in advertisement. inputs: [true|false] 306 Note: Currently just sets the scan response data to the 307 Advertisement data. 308 """ 309 cmd = "Include tx_power_level in advertisement." 310 try: 311 self.ble_adv_include_scan_response = self.str_to_bool(line) 312 except Exception as err: 313 self.log.info(FAILURE.format(cmd, err)) 314 315 def do_ble_adv_data_add_manufacturer_data(self, line): 316 """Include manufacturer id and data to the advertisment 317 Description: Adds manufacturer data to the BLE advertisement. 318 Input(s): 319 id: Required. The int representing the manufacturer id. 320 data: Required. The string representing the data. 321 Usage: ble_adv_data_add_manufacturer_data id data 322 Examples: 323 ble_adv_data_add_manufacturer_data 1 test 324 """ 325 cmd = "Include manufacturer id and data to the advertisment." 326 try: 327 328 info = line.split() 329 if self.ble_adv_data_manufacturer_data is None: 330 self.ble_adv_data_manufacturer_data = [] 331 self.ble_adv_data_manufacturer_data.append({ 332 "id": int(info[0]), 333 "data": info[1] 334 }) 335 except Exception as err: 336 self.log.info(FAILURE.format(cmd, err)) 337 338 def do_ble_adv_data_add_service_data(self, line): 339 """Include service data to the advertisment 340 Description: Adds service data to the BLE advertisement. 341 Input(s): 342 uuid: Required. The string representing the uuid. 343 data: Required. The string representing the data. 344 Usage: ble_adv_data_add_service_data uuid data 345 Examples: 346 ble_adv_data_add_service_data 00001801-0000-1000-8000-00805f9b34fb test 347 """ 348 cmd = "Include manufacturer id and data to the advertisment." 349 try: 350 info = line.split() 351 if self.ble_adv_data_service_data is None: 352 self.ble_adv_data_service_data = [] 353 self.ble_adv_data_service_data.append({ 354 "uuid": info[0], 355 "data": info[1] 356 }) 357 except Exception as err: 358 self.log.info(FAILURE.format(cmd, err)) 359 360 def do_ble_adv_add_service_uuid_list(self, line): 361 """Include a list of service uuids to the advertisment: 362 Description: Adds service uuid list to the BLE advertisement. 363 Input(s): 364 uuid: Required. A list of N string UUIDs to add. 365 Usage: ble_adv_add_service_uuid_list uuid0 uuid1 ... uuidN 366 Examples: 367 ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb 368 ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb 00001802-0000-1000-8000-00805f9b34fb 369 """ 370 cmd = "Include service uuid list to the advertisment data." 371 try: 372 self.ble_adv_data_service_uuid_list = line 373 except Exception as err: 374 self.log.info(FAILURE.format(cmd, err)) 375 376 def do_ble_adv_data_set_uris(self, uris): 377 """Set the URIs of the LE advertisement data: 378 Description: Adds list of String UIRs 379 See (RFC 3986 1.1.2 https://tools.ietf.org/html/rfc3986) 380 Valid URI examples: 381 ftp://ftp.is.co.za/rfc/rfc1808.txt 382 http://www.ietf.org/rfc/rfc2396.txt 383 ldap://[2001:db8::7]/c=GB?objectClass?one 384 mailto:John.Doe@example.com 385 news:comp.infosystems.www.servers.unix 386 tel:+1-816-555-1212 387 telnet://192.0.2.16:80/ 388 urn:oasis:names:specification:docbook:dtd:xml:4.1.2 389 Input(s): 390 uris: Required. A list of URIs to add. 391 Usage: ble_adv_data_set_uris uri0 uri1 ... uriN 392 Examples: 393 ble_adv_data_set_uris telnet://192.0.2.16:80/ 394 ble_adv_data_set_uris tel:+1-816-555-1212 395 """ 396 cmd = "Set the appearance to known SIG values." 397 try: 398 self.ble_adv_data_uris = uris.split() 399 except Exception as err: 400 self.log.error(FAILURE.format(cmd, err)) 401 402 def start_advertisement(self, connectable): 403 """ Handle setting advertising data and the advertisement 404 Note: After advertisement is successful, clears values set for 405 * Manufacturer data 406 * Appearance information 407 * Scan Response 408 * Service UUIDs 409 * URI list 410 Args: 411 connectable: Bool of whether to start a connectable 412 advertisement or not. 413 """ 414 adv_data_name = self.ble_adv_name 415 if not self.ble_adv_include_name: 416 adv_data_name = None 417 418 manufacturer_data = self.ble_adv_data_manufacturer_data 419 420 tx_power_level = None 421 if self.ble_adv_data_include_tx_power_level: 422 tx_power_level = 1 # Not yet implemented so set to 1 423 424 scan_response = self.ble_adv_include_scan_response 425 426 adv_data = { 427 "name": adv_data_name, 428 "appearance": self.ble_adv_appearance, 429 "service_data": self.ble_adv_data_service_data, 430 "tx_power_level": tx_power_level, 431 "service_uuids": self.ble_adv_data_service_uuid_list, 432 "manufacturer_data": manufacturer_data, 433 "uris": self.ble_adv_data_uris, 434 } 435 436 if not self.ble_adv_include_scan_response: 437 scan_response = None 438 else: 439 scan_response = adv_data 440 441 result = self.pri_dut.ble_lib.bleStartBleAdvertising( 442 adv_data, scan_response, self.ble_adv_interval, connectable) 443 self.log.info("Result of starting advertisement: {}".format(result)) 444 self.ble_adv_data_manufacturer_data = None 445 self.ble_adv_appearance = None 446 self.ble_adv_include_scan_response = False 447 self.ble_adv_data_service_uuid_list = None 448 self.ble_adv_data_uris = None 449 self.ble_adv_data_service_data = None 450 451 def do_ble_start_generic_connectable_advertisement(self, line): 452 """ 453 Description: Start a connectable LE advertisement 454 455 Usage: ble_start_generic_connectable_advertisement 456 """ 457 cmd = "Start a connectable LE advertisement" 458 try: 459 connectable = True 460 self.start_advertisement(connectable) 461 except Exception as err: 462 self.log.error(FAILURE.format(cmd, err)) 463 464 def do_ble_start_generic_nonconnectable_advertisement(self, line): 465 """ 466 Description: Start a non-connectable LE advertisement 467 468 Usage: ble_start_generic_nonconnectable_advertisement 469 """ 470 cmd = "Start a nonconnectable LE advertisement" 471 try: 472 connectable = False 473 self.start_advertisement(connectable) 474 except Exception as err: 475 self.log.error(FAILURE.format(cmd, err)) 476 477 def do_ble_stop_advertisement(self, line): 478 """ 479 Description: Stop a BLE advertisement. 480 Usage: ble_stop_advertisement 481 """ 482 cmd = "Stop a connectable LE advertisement" 483 try: 484 self.pri_dut.ble_lib.bleStopBleAdvertising() 485 except Exception as err: 486 self.log.error(FAILURE.format(cmd, err)) 487 488 """End BLE advertise wrappers""" 489 """Begin GATT client wrappers""" 490 491 def complete_gattc_connect_by_id(self, text, line, begidx, endidx): 492 if not text: 493 completions = list(self.le_ids)[:] 494 else: 495 completions = [s for s in self.le_ids if s.startswith(text)] 496 return completions 497 498 def do_gattc_connect_by_id(self, line): 499 """ 500 Description: Connect to a LE peripheral. 501 Input(s): 502 device_id: Required. The unique device ID from Fuchsia 503 discovered devices. 504 Usage: 505 Examples: 506 gattc_connect device_id 507 """ 508 cmd = "Connect to a LE peripheral by input ID." 509 try: 510 511 connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral( 512 line) 513 self.log.info("Connection status: {}".format( 514 pprint.pformat(connection_status))) 515 except Exception as err: 516 self.log.error(FAILURE.format(cmd, err)) 517 518 def do_gattc_connect(self, line): 519 """ 520 Description: Connect to a LE peripheral. 521 Optional input: device_name 522 Input(s): 523 device_name: Optional. The peripheral ID to connect to. 524 Usage: 525 Examples: 526 gattc_connect 527 gattc_connect eddystone_123 528 """ 529 cmd = "Connect to a LE peripheral." 530 try: 531 if len(line) > 0: 532 self.target_device_name = line 533 self.unique_mac_addr_id = None 534 if not self.unique_mac_addr_id: 535 try: 536 self._find_unique_id() 537 except Exception as err: 538 self.log.info("Failed to scan or find device.") 539 return 540 connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral( 541 self.unique_mac_addr_id) 542 self.log.info("Connection status: {}".format( 543 pprint.pformat(connection_status))) 544 except Exception as err: 545 self.log.error(FAILURE.format(cmd, err)) 546 547 def do_gattc_connect_disconnect_iterations(self, line): 548 """ 549 Description: Connect then disconnect to a LE peripheral multiple times. 550 Input(s): 551 iterations: Required. The number of iterations to run. 552 Usage: 553 Examples: 554 gattc_connect_disconnect_iterations 10 555 """ 556 cmd = "Connect to a LE peripheral." 557 try: 558 if not self.unique_mac_addr_id: 559 try: 560 self._find_unique_id() 561 except Exception as err: 562 self.log.info("Failed to scan or find device.") 563 return 564 for i in range(int(line)): 565 self.log.info("Running iteration {}".format(i + 1)) 566 connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral( 567 self.unique_mac_addr_id) 568 self.log.info("Connection status: {}".format( 569 pprint.pformat(connection_status))) 570 time.sleep(4) 571 disc_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral( 572 self.unique_mac_addr_id) 573 self.log.info("Disconnect status: {}".format(disc_status)) 574 time.sleep(3) 575 except Exception as err: 576 self.log.error(FAILURE.format(cmd, err)) 577 578 def do_gattc_disconnect(self, line): 579 """ 580 Description: Disconnect from LE peripheral. 581 Assumptions: Already connected to a peripheral. 582 Usage: 583 Examples: 584 gattc_disconnect 585 """ 586 cmd = "Disconenct from LE peripheral." 587 try: 588 disconnect_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral( 589 self.unique_mac_addr_id) 590 self.log.info("Disconnect status: {}".format(disconnect_status)) 591 except Exception as err: 592 self.log.error(FAILURE.format(cmd, err)) 593 594 def do_gattc_list_services(self, discover_chars): 595 """ 596 Description: List services from LE peripheral. 597 Assumptions: Already connected to a peripheral. 598 Input(s): 599 discover_chars: Optional. An optional input to discover all 600 characteristics on the service. 601 Usage: 602 Examples: 603 gattc_list_services 604 gattc_list_services true 605 """ 606 cmd = "List services from LE peripheral." 607 try: 608 609 services = self.pri_dut.gattc_lib.listServices( 610 self.unique_mac_addr_id) 611 self.log.info("Discovered Services: \n{}".format( 612 pprint.pformat(services))) 613 discover_characteristics = self.str_to_bool(discover_chars) 614 if discover_chars: 615 for service in services.get('result'): 616 self.pri_dut.gattc_lib.connectToService( 617 self.unique_mac_addr_id, service.get('id')) 618 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 619 self.log.info("Discovered chars:\n{}".format( 620 pprint.pformat(chars))) 621 622 except Exception as err: 623 self.log.error(FAILURE.format(cmd, err)) 624 625 def do_gattc_connect_to_service(self, line): 626 """ 627 Description: Connect to Peripheral GATT server service. 628 Assumptions: Already connected to peripheral. 629 Input(s): 630 service_id: Required. The service id reference on the GATT server. 631 Usage: 632 Examples: 633 gattc_connect_to_service service_id 634 """ 635 cmd = "GATT client connect to GATT server service." 636 try: 637 self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id, 638 int(line)) 639 except Exception as err: 640 self.log.error(FAILURE.format(cmd, err)) 641 642 def do_gattc_discover_characteristics(self, line): 643 """ 644 Description: Discover characteristics from a connected service. 645 Assumptions: Already connected to a GATT server service. 646 Usage: 647 Examples: 648 gattc_discover_characteristics 649 """ 650 cmd = "Discover and list characteristics from a GATT server." 651 try: 652 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 653 self.log.info("Discovered chars:\n{}".format( 654 pprint.pformat(chars))) 655 except Exception as err: 656 self.log.error(FAILURE.format(cmd, err)) 657 658 def do_gattc_notify_all_chars(self, line): 659 """ 660 Description: Enable all notifications on all Characteristics on 661 a GATT server. 662 Assumptions: Basic GATT connection made. 663 Usage: 664 Examples: 665 gattc_notify_all_chars 666 """ 667 cmd = "Read all characteristics from the GATT service." 668 try: 669 services = self.pri_dut.gattc_lib.listServices( 670 self.unique_mac_addr_id) 671 for service in services['result']: 672 service_id = service['id'] 673 service_uuid = service['uuid_type'] 674 self.pri_dut.gattc_lib.connectToService( 675 self.unique_mac_addr_id, service_id) 676 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 677 print("Reading chars in service uuid: {}".format(service_uuid)) 678 679 for char in chars['result']: 680 char_id = char['id'] 681 char_uuid = char['uuid_type'] 682 # quick char filter for apple-4 test... remove later 683 print("found uuid {}".format(char_uuid)) 684 try: 685 self.pri_dut.gattc_lib.enableNotifyCharacteristic( 686 char_id) 687 except Exception as err: 688 print("error enabling notification") 689 except Exception as err: 690 self.log.error(FAILURE.format(cmd, err)) 691 692 def do_gattc_read_all_chars(self, line): 693 """ 694 Description: Read all Characteristic values from a GATT server across 695 all services. 696 Assumptions: Basic GATT connection made. 697 Usage: 698 Examples: 699 gattc_read_all_chars 700 """ 701 cmd = "Read all characteristics from the GATT service." 702 try: 703 services = self.pri_dut.gattc_lib.listServices( 704 self.unique_mac_addr_id) 705 for service in services['result']: 706 service_id = service['id'] 707 service_uuid = service['uuid_type'] 708 self.pri_dut.gattc_lib.connectToService( 709 self.unique_mac_addr_id, service_id) 710 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 711 print("Reading chars in service uuid: {}".format(service_uuid)) 712 713 for char in chars['result']: 714 char_id = char['id'] 715 char_uuid = char['uuid_type'] 716 try: 717 read_val = \ 718 self.pri_dut.gattc_lib.readCharacteristicById( 719 char_id) 720 print(" Characteristic uuid / Value: {} / {}".format( 721 char_uuid, read_val['result'])) 722 str_value = "" 723 for val in read_val['result']: 724 str_value += chr(val) 725 print(" str val: {}".format(str_value)) 726 except Exception as err: 727 print(err) 728 pass 729 except Exception as err: 730 self.log.error(FAILURE.format(cmd, err)) 731 732 def do_gattc_read_all_desc(self, line): 733 """ 734 Description: Read all Descriptors values from a GATT server across 735 all services. 736 Assumptions: Basic GATT connection made. 737 Usage: 738 Examples: 739 gattc_read_all_chars 740 """ 741 cmd = "Read all descriptors from the GATT service." 742 try: 743 services = self.pri_dut.gattc_lib.listServices( 744 self.unique_mac_addr_id) 745 for service in services['result']: 746 service_id = service['id'] 747 service_uuid = service['uuid_type'] 748 self.pri_dut.gattc_lib.connectToService( 749 self.unique_mac_addr_id, service_id) 750 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 751 print("Reading descs in service uuid: {}".format(service_uuid)) 752 753 for char in chars['result']: 754 char_id = char['id'] 755 char_uuid = char['uuid_type'] 756 descriptors = char['descriptors'] 757 print(" Reading descs in char uuid: {}".format(char_uuid)) 758 for desc in descriptors: 759 desc_id = desc["id"] 760 desc_uuid = desc["uuid_type"] 761 try: 762 read_val = self.pri_dut.gattc_lib.readDescriptorById( 763 desc_id) 764 print(" Descriptor uuid / Value: {} / {}".format( 765 desc_uuid, read_val['result'])) 766 except Exception as err: 767 pass 768 except Exception as err: 769 self.log.error(FAILURE.format(cmd, err)) 770 771 def do_gattc_write_all_desc(self, line): 772 """ 773 Description: Write a value to all Descriptors on the GATT server. 774 Assumptions: Basic GATT connection made. 775 Input(s): 776 offset: Required. The offset to start writing to. 777 size: Required. The size of bytes to write (value will be generated). 778 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 779 Usage: 780 Examples: 781 gattc_write_all_desc 0 100 782 gattc_write_all_desc 10 2 783 """ 784 cmd = "Read all descriptors from the GATT service." 785 try: 786 args = line.split() 787 if len(args) != 2: 788 self.log.info("2 Arguments required: [Offset] [Size]") 789 return 790 offset = int(args[0]) 791 size = args[1] 792 write_value = [] 793 for i in range(int(size)): 794 write_value.append(i % 256) 795 services = self.pri_dut.gattc_lib.listServices( 796 self.unique_mac_addr_id) 797 for service in services['result']: 798 service_id = service['id'] 799 service_uuid = service['uuid_type'] 800 self.pri_dut.gattc_lib.connectToService( 801 self.unique_mac_addr_id, service_id) 802 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 803 print("Writing descs in service uuid: {}".format(service_uuid)) 804 805 for char in chars['result']: 806 char_id = char['id'] 807 char_uuid = char['uuid_type'] 808 descriptors = char['descriptors'] 809 print(" Reading descs in char uuid: {}".format(char_uuid)) 810 for desc in descriptors: 811 desc_id = desc["id"] 812 desc_uuid = desc["uuid_type"] 813 try: 814 write_val = self.pri_dut.gattc_lib.writeDescriptorById( 815 desc_id, offset, write_value) 816 print(" Descriptor uuid / Result: {} / {}".format( 817 desc_uuid, write_val['result'])) 818 except Exception as err: 819 pass 820 except Exception as err: 821 self.log.error(FAILURE.format(cmd, err)) 822 823 def do_gattc_read_all_long_desc(self, line): 824 """ 825 Description: Read all long Characteristic Descriptors 826 Assumptions: Basic GATT connection made. 827 Input(s): 828 offset: Required. The offset to start reading from. 829 max_bytes: Required. The max size of bytes to return. 830 Usage: 831 Examples: 832 gattc_read_all_long_desc 0 100 833 gattc_read_all_long_desc 10 20 834 """ 835 cmd = "Read all long descriptors from the GATT service." 836 try: 837 args = line.split() 838 if len(args) != 2: 839 self.log.info("2 Arguments required: [Offset] [Size]") 840 return 841 offset = int(args[0]) 842 max_bytes = int(args[1]) 843 services = self.pri_dut.ble_lib.bleListServices( 844 self.unique_mac_addr_id) 845 for service in services['result']: 846 service_id = service['id'] 847 service_uuid = service['uuid_type'] 848 self.pri_dut.gattc_lib.connectToService( 849 self.unique_mac_addr_id, service_id) 850 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 851 print("Reading descs in service uuid: {}".format(service_uuid)) 852 853 for char in chars['result']: 854 char_id = char['id'] 855 char_uuid = char['uuid_type'] 856 descriptors = char['descriptors'] 857 print(" Reading descs in char uuid: {}".format(char_uuid)) 858 for desc in descriptors: 859 desc_id = desc["id"] 860 desc_uuid = desc["uuid_type"] 861 try: 862 read_val = self.pri_dut.gattc_lib.readLongDescriptorById( 863 desc_id, offset, max_bytes) 864 print(" Descriptor uuid / Result: {} / {}".format( 865 desc_uuid, read_val['result'])) 866 except Exception as err: 867 pass 868 except Exception as err: 869 self.log.error(FAILURE.format(cmd, err)) 870 871 def do_gattc_read_all_long_char(self, line): 872 """ 873 Description: Read all long Characteristic 874 Assumptions: Basic GATT connection made. 875 Input(s): 876 offset: Required. The offset to start reading from. 877 max_bytes: Required. The max size of bytes to return. 878 Usage: 879 Examples: 880 gattc_read_all_long_char 0 100 881 gattc_read_all_long_char 10 20 882 """ 883 cmd = "Read all long Characteristics from the GATT service." 884 try: 885 args = line.split() 886 if len(args) != 2: 887 self.log.info("2 Arguments required: [Offset] [Size]") 888 return 889 offset = int(args[0]) 890 max_bytes = int(args[1]) 891 services = self.pri_dut.ble_lib.bleListServices( 892 self.unique_mac_addr_id) 893 for service in services['result']: 894 service_id = service['id'] 895 service_uuid = service['uuid_type'] 896 self.pri_dut.gattc_lib.connectToService( 897 self.unique_mac_addr_id, service_id) 898 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 899 print("Reading chars in service uuid: {}".format(service_uuid)) 900 901 for char in chars['result']: 902 char_id = char['id'] 903 char_uuid = char['uuid_type'] 904 try: 905 read_val = self.pri_dut.gattc_lib.readLongCharacteristicById( 906 char_id, offset, max_bytes) 907 print(" Char uuid / Result: {} / {}".format( 908 char_uuid, read_val['result'])) 909 except Exception as err: 910 pass 911 except Exception as err: 912 self.log.error(FAILURE.format(cmd, err)) 913 914 def do_gattc_write_all_chars(self, line): 915 """ 916 Description: Write all characteristic values from a GATT server across 917 all services. 918 Assumptions: Basic GATT connection made. 919 Input(s): 920 offset: Required. The offset to start writing on. 921 size: The write value size (value will be generated) 922 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 923 Usage: 924 Examples: 925 gattc_write_all_chars 0 10 926 gattc_write_all_chars 10 1 927 """ 928 cmd = "Read all characteristics from the GATT service." 929 try: 930 args = line.split() 931 if len(args) != 2: 932 self.log.info("2 Arguments required: [Offset] [Size]") 933 return 934 offset = int(args[0]) 935 size = int(args[1]) 936 write_value = [] 937 for i in range(size): 938 write_value.append(i % 256) 939 services = self.pri_dut.gattc_lib.listServices( 940 self.unique_mac_addr_id) 941 for service in services['result']: 942 service_id = service['id'] 943 service_uuid = service['uuid_type'] 944 self.pri_dut.gattc_lib.connectToService( 945 self.unique_mac_addr_id, service_id) 946 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 947 print("Writing chars in service uuid: {}".format(service_uuid)) 948 949 for char in chars['result']: 950 char_id = char['id'] 951 char_uuid = char['uuid_type'] 952 try: 953 write_result = self.pri_dut.gattc_lib.writeCharById( 954 char_id, offset, write_value) 955 print(" Characteristic uuid write result: {} / {}". 956 format(char_uuid, write_result['result'])) 957 except Exception as err: 958 print("error writing char {}".format(err)) 959 pass 960 except Exception as err: 961 self.log.error(FAILURE.format(cmd, err)) 962 963 def do_gattc_write_all_chars_without_response(self, line): 964 """ 965 Description: Write all characteristic values from a GATT server across 966 all services. 967 Assumptions: Basic GATT connection made. 968 Input(s): 969 size: The write value size (value will be generated). 970 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 971 Usage: 972 Examples: 973 gattc_write_all_chars_without_response 100 974 """ 975 cmd = "Read all characteristics from the GATT service." 976 try: 977 args = line.split() 978 if len(args) != 1: 979 self.log.info("1 Arguments required: [Size]") 980 return 981 size = int(args[0]) 982 write_value = [] 983 for i in range(size): 984 write_value.append(i % 256) 985 services = self.pri_dut.gattc_lib.listServices( 986 self.unique_mac_addr_id) 987 for service in services['result']: 988 service_id = service['id'] 989 service_uuid = service['uuid_type'] 990 self.pri_dut.gattc_lib.connectToService( 991 self.unique_mac_addr_id, service_id) 992 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 993 print("Reading chars in service uuid: {}".format(service_uuid)) 994 995 for char in chars['result']: 996 char_id = char['id'] 997 char_uuid = char['uuid_type'] 998 try: 999 write_result = \ 1000 self.pri_dut.gattc_lib.writeCharByIdWithoutResponse( 1001 char_id, write_value) 1002 print(" Characteristic uuid write result: {} / {}". 1003 format(char_uuid, write_result['result'])) 1004 except Exception as err: 1005 pass 1006 except Exception as err: 1007 self.log.error(FAILURE.format(cmd, err)) 1008 1009 def do_gattc_write_char_by_id(self, line): 1010 """ 1011 Description: Write char by characteristic id reference. 1012 Assumptions: Already connected to a GATT server service. 1013 Input(s): 1014 characteristic_id: The characteristic id reference on the GATT 1015 service 1016 offset: The offset value to use 1017 size: Function will generate random bytes by input size. 1018 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1019 Usage: 1020 Examples: 1021 gattc_write_char_by_id char_id 0 5 1022 gattc_write_char_by_id char_id 20 1 1023 """ 1024 cmd = "Write to GATT server characteristic ." 1025 try: 1026 args = line.split() 1027 if len(args) != 3: 1028 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1029 return 1030 id = int(args[0], 16) 1031 offset = int(args[1]) 1032 size = int(args[2]) 1033 write_value = [] 1034 for i in range(size): 1035 write_value.append(i % 256) 1036 self.test_dut.gatt_client_write_characteristic_by_handle( 1037 self.unique_mac_addr_id, id, offset, write_value) 1038 except Exception as err: 1039 self.log.error(FAILURE.format(cmd, err)) 1040 1041 def do_gattc_write_long_char_by_id(self, line): 1042 """ 1043 Description: Write long char by characteristic id reference. 1044 Assumptions: Already connected to a GATT server service. 1045 Input(s): 1046 characteristic_id: The characteristic id reference on the GATT 1047 service 1048 offset: The offset value to use 1049 size: Function will generate random bytes by input size. 1050 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1051 reliable_mode: Optional: Reliable writes represented as bool 1052 Usage: 1053 Examples: 1054 gattc_write_long_char_by_id char_id 0 5 1055 gattc_write_long_char_by_id char_id 20 1 1056 gattc_write_long_char_by_id char_id 20 1 true 1057 gattc_write_long_char_by_id char_id 20 1 false 1058 """ 1059 cmd = "Long Write to GATT server characteristic ." 1060 try: 1061 args = line.split() 1062 if len(args) < 3: 1063 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1064 return 1065 id = int(args[0], 16) 1066 offset = int(args[1]) 1067 size = int(args[2]) 1068 reliable_mode = False 1069 if len(args) > 3: 1070 reliable_mode = self.str_to_bool(args[3]) 1071 write_value = [] 1072 for i in range(size): 1073 write_value.append(i % 256) 1074 self.test_dut.gatt_client_write_long_characteristic_by_handle( 1075 self.unique_mac_addr_id, id, offset, write_value, 1076 reliable_mode) 1077 except Exception as err: 1078 self.log.error(FAILURE.format(cmd, err)) 1079 1080 def do_gattc_write_long_desc_by_id(self, line): 1081 """ 1082 Description: Write long char by descrioptor id reference. 1083 Assumptions: Already connected to a GATT server service. 1084 Input(s): 1085 characteristic_id: The characteristic id reference on the GATT 1086 service 1087 offset: The offset value to use 1088 size: Function will generate random bytes by input size. 1089 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1090 Usage: 1091 Examples: 1092 gattc_write_long_desc_by_id char_id 0 5 1093 gattc_write_long_desc_by_id char_id 20 1 1094 """ 1095 cmd = "Long Write to GATT server descriptor ." 1096 try: 1097 args = line.split() 1098 if len(args) != 3: 1099 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1100 return 1101 id = int(args[0], 16) 1102 offset = int(args[1]) 1103 size = int(args[2]) 1104 write_value = [] 1105 for i in range(size): 1106 write_value.append(i % 256) 1107 self.test_dut.gatt_client_write_long_descriptor_by_handle( 1108 self.unique_mac_addr_id, id, offset, write_value) 1109 except Exception as err: 1110 self.log.error(FAILURE.format(cmd, err)) 1111 1112 def do_gattc_write_char_by_id_without_response(self, line): 1113 """ 1114 Description: Write char by characteristic id reference without response. 1115 Assumptions: Already connected to a GATT server service. 1116 Input(s): 1117 characteristic_id: The characteristic id reference on the GATT 1118 service 1119 size: Function will generate random bytes by input size. 1120 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1121 Usage: 1122 Examples: 1123 gattc_write_char_by_id_without_response char_id 5 1124 """ 1125 cmd = "Write characteristic by id without response." 1126 try: 1127 args = line.split() 1128 if len(args) != 2: 1129 self.log.info("2 Arguments required: [Id] [Size]") 1130 return 1131 id = int(args[0], 16) 1132 size = args[1] 1133 write_value = [] 1134 for i in range(int(size)): 1135 write_value.append(i % 256) 1136 self.test_dut.gatt_client_write_characteristic_without_response_by_handle( 1137 self.unique_mac_addr_id, id, write_value) 1138 except Exception as err: 1139 self.log.error(FAILURE.format(cmd, err)) 1140 1141 def do_gattc_enable_notify_char_by_id(self, line): 1142 """ 1143 Description: Enable Characteristic notification on Characteristic ID. 1144 Assumptions: Already connected to a GATT server service. 1145 Input(s): 1146 characteristic_id: The characteristic id reference on the GATT 1147 service 1148 Usage: 1149 Examples: 1150 gattc_enable_notify_char_by_id char_id 1151 """ 1152 cmd = "Enable notifications by Characteristic id." 1153 try: 1154 id = int(line, 16) 1155 self.test_dut.gatt_client_enable_notifiy_characteristic_by_handle( 1156 self.unique_mac_addr_id, id) 1157 except Exception as err: 1158 self.log.error(FAILURE.format(cmd, err)) 1159 1160 def do_gattc_disable_notify_char_by_id(self, line): 1161 """ 1162 Description: Disable Characteristic notification on Characteristic ID. 1163 Assumptions: Already connected to a GATT server service. 1164 Input(s): 1165 characteristic_id: The characteristic id reference on the GATT 1166 service 1167 Usage: 1168 Examples: 1169 gattc_disable_notify_char_by_id char_id 1170 """ 1171 cmd = "Disable notify Characteristic by id." 1172 try: 1173 id = int(line, 16) 1174 self.test_dut.gatt_client_disable_notifiy_characteristic_by_handle( 1175 self.unique_mac_addr_id, id) 1176 except Exception as err: 1177 self.log.error(FAILURE.format(cmd, err)) 1178 1179 def do_gattc_read_char_by_id(self, line): 1180 """ 1181 Description: Read Characteristic by ID. 1182 Assumptions: Already connected to a GATT server service. 1183 Input(s): 1184 characteristic_id: The characteristic id reference on the GATT 1185 service 1186 Usage: 1187 Examples: 1188 gattc_read_char_by_id char_id 1189 """ 1190 cmd = "Read Characteristic value by ID." 1191 try: 1192 id = int(line, 16) 1193 read_val = self.test_dut.gatt_client_read_characteristic_by_handle( 1194 self.unique_mac_addr_id, id) 1195 self.log.info("Characteristic Value with id {}: {}".format( 1196 id, read_val)) 1197 except Exception as err: 1198 self.log.error(FAILURE.format(cmd, err)) 1199 1200 def do_gattc_read_char_by_uuid(self, characteristic_uuid): 1201 """ 1202 Description: Read Characteristic by UUID (read by type). 1203 Assumptions: Already connected to a GATT server service. 1204 Input(s): 1205 characteristic_uuid: The characteristic id reference on the GATT 1206 service 1207 Usage: 1208 Examples: 1209 gattc_read_char_by_id char_id 1210 """ 1211 cmd = "Read Characteristic value by ID." 1212 try: 1213 short_uuid_len = 4 1214 if len(characteristic_uuid) == short_uuid_len: 1215 characteristic_uuid = BASE_UUID.format(characteristic_uuid) 1216 1217 read_val = self.test_dut.gatt_client_read_characteristic_by_uuid( 1218 self.unique_mac_addr_id, characteristic_uuid) 1219 self.log.info("Characteristic Value with id {}: {}".format( 1220 id, read_val)) 1221 except Exception as err: 1222 self.log.error(FAILURE.format(cmd, err)) 1223 1224 def do_gattc_write_desc_by_id(self, line): 1225 """ 1226 Description: Write Descriptor by characteristic id reference. 1227 Assumptions: Already connected to a GATT server service. 1228 Input(s): 1229 descriptor_id: The Descriptor id reference on the GATT service 1230 offset: The offset value to use 1231 size: Function will generate random bytes by input size. 1232 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1233 Usage: 1234 Examples: 1235 gattc_write_desc_by_id desc_id 0 5 1236 gattc_write_desc_by_id desc_id 20 1 1237 """ 1238 cmd = "Write Descriptor by id." 1239 try: 1240 args = line.split() 1241 id = int(args[0], 16) 1242 offset = int(args[1]) 1243 size = args[2] 1244 write_value = [] 1245 for i in range(int(size)): 1246 write_value.append(i % 256) 1247 write_result = self.test_dut.gatt_client_write_descriptor_by_handle( 1248 self.unique_mac_addr_id, id, offset, write_value) 1249 self.log.info("Descriptor Write result {}: {}".format( 1250 id, write_result)) 1251 except Exception as err: 1252 self.log.error(FAILURE.format(cmd, err)) 1253 1254 def do_gattc_read_desc_by_id(self, line): 1255 """ 1256 Description: Read Descriptor by ID. 1257 Assumptions: Already connected to a GATT server service. 1258 Input(s): 1259 descriptor_id: The Descriptor id reference on the GATT service 1260 Usage: 1261 Examples: 1262 gattc_read_desc_by_id desc_id 1263 """ 1264 cmd = "Read Descriptor by ID." 1265 try: 1266 id = int(line, 16) 1267 read_val = self.test_dut.gatt_client_read_descriptor_by_handle( 1268 self.unique_mac_addr_id, id) 1269 self.log.info("Descriptor Value with id {}: {}".format( 1270 id, read_val)) 1271 except Exception as err: 1272 self.log.error(FAILURE.format(cmd, err)) 1273 1274 def do_gattc_read_long_char_by_id(self, line): 1275 """ 1276 Description: Read long Characteristic value by id. 1277 Assumptions: Already connected to a GATT server service. 1278 Input(s): 1279 characteristic_id: The characteristic id reference on the GATT 1280 service 1281 offset: The offset value to use. 1282 max_bytes: The max bytes size to return. 1283 Usage: 1284 Examples: 1285 gattc_read_long_char_by_id char_id 0 10 1286 gattc_read_long_char_by_id char_id 20 1 1287 """ 1288 cmd = "Read long Characteristic value by id." 1289 try: 1290 args = line.split() 1291 if len(args) != 3: 1292 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1293 return 1294 id = int(args[0], 16) 1295 offset = int(args[1]) 1296 max_bytes = int(args[2]) 1297 read_val = self.test_dut.gatt_client_read_long_characteristic_by_handle( 1298 self.unique_mac_addr_id, id, offset, max_bytes) 1299 self.log.info("Characteristic Value with id {}: {}".format( 1300 id, read_val['result'])) 1301 1302 except Exception as err: 1303 self.log.error(FAILURE.format(cmd, err)) 1304 1305 """End GATT client wrappers""" 1306 """Begin LE scan wrappers""" 1307 1308 def _update_scan_results(self, scan_results): 1309 self.le_ids = [] 1310 for scan in scan_results['result']: 1311 self.le_ids.append(scan['id']) 1312 1313 def do_ble_start_scan(self, line): 1314 """ 1315 Description: Perform a BLE scan. 1316 Default filter name: "" 1317 Optional input: filter_device_name 1318 Usage: 1319 Examples: 1320 ble_start_scan 1321 ble_start_scan eddystone 1322 """ 1323 cmd = "Perform a BLE scan and list discovered devices." 1324 try: 1325 scan_filter = {"name_substring": ""} 1326 if line: 1327 scan_filter = {"name_substring": line} 1328 self.pri_dut.gattc_lib.bleStartBleScan(scan_filter) 1329 except Exception as err: 1330 self.log.error(FAILURE.format(cmd, err)) 1331 1332 def do_ble_stop_scan(self, line): 1333 """ 1334 Description: Stops a BLE scan and returns discovered devices. 1335 Usage: 1336 Examples: 1337 ble_stop_scan 1338 """ 1339 cmd = "Stops a BLE scan and returns discovered devices." 1340 try: 1341 scan_results = self.pri_dut.gattc_lib.bleStopBleScan() 1342 self._update_scan_results(scan_results) 1343 self.log.info(pprint.pformat(scan_results)) 1344 except Exception as err: 1345 self.log.error(FAILURE.format(cmd, err)) 1346 1347 def do_ble_get_discovered_devices(self, line): 1348 """ 1349 Description: Get discovered LE devices of an active scan. 1350 Usage: 1351 Examples: 1352 ble_stop_scan 1353 """ 1354 cmd = "Get discovered LE devices of an active scan." 1355 try: 1356 scan_results = self.pri_dut.gattc_lib.bleGetDiscoveredDevices() 1357 self._update_scan_results(scan_results) 1358 self.log.info(pprint.pformat(scan_results)) 1359 except Exception as err: 1360 self.log.error(FAILURE.format(cmd, err)) 1361 1362 """End LE scan wrappers""" 1363 """Begin GATT Server wrappers""" 1364 1365 def do_gatts_close(self, line): 1366 """ 1367 Description: Close active GATT server. 1368 1369 Usage: 1370 Examples: 1371 gatts_close 1372 """ 1373 cmd = "Close active GATT server." 1374 try: 1375 result = self.pri_dut.gatts_lib.closeServer() 1376 self.log.info(result) 1377 except Exception as err: 1378 self.log.error(FAILURE.format(cmd, err)) 1379 1380 def complete_gatts_setup_database(self, text, line, begidx, endidx): 1381 if not text: 1382 completions = list( 1383 gatt_test_database.GATT_SERVER_DB_MAPPING.keys()) 1384 else: 1385 completions = [ 1386 s for s in gatt_test_database.GATT_SERVER_DB_MAPPING.keys() 1387 if s.startswith(text) 1388 ] 1389 return completions 1390 1391 def do_gatts_setup_database(self, line): 1392 """ 1393 Description: Setup a Gatt server database based on pre-defined inputs. 1394 Supports Tab Autocomplete. 1395 Input(s): 1396 descriptor_db_name: The descriptor db name that matches one in 1397 acts_contrib.test_utils.bt.gatt_test_database 1398 Usage: 1399 Examples: 1400 gatts_setup_database LARGE_DB_1 1401 """ 1402 cmd = "Setup GATT Server Database Based of pre-defined dictionaries" 1403 try: 1404 scan_results = self.pri_dut.gatts_lib.publishServer( 1405 gatt_test_database.GATT_SERVER_DB_MAPPING.get(line)) 1406 self.log.info(scan_results) 1407 except Exception as err: 1408 self.log.error(FAILURE.format(cmd, err)) 1409 1410 """End GATT Server wrappers""" 1411 """Begin Bluetooth Controller wrappers""" 1412 1413 def complete_btc_pair(self, text, line, begidx, endidx): 1414 """ Provides auto-complete for btc_pair cmd. 1415 1416 See Cmd module for full description. 1417 """ 1418 arg_completion = len(line.split(" ")) - 1 1419 pairing_security_level_options = ['ENCRYPTED', 'AUTHENTICATED', 'NONE'] 1420 bondable_options = ['BONDABLE', 'NON_BONDABLE', 'NONE'] 1421 transport_options = ['BREDR', 'LE'] 1422 if arg_completion == 1: 1423 if not text: 1424 completions = pairing_security_level_options 1425 else: 1426 completions = [ 1427 s for s in pairing_security_level_options 1428 if s.startswith(text) 1429 ] 1430 return completions 1431 if arg_completion == 2: 1432 if not text: 1433 completions = bondable_options 1434 else: 1435 completions = [ 1436 s for s in bondable_options if s.startswith(text) 1437 ] 1438 return completions 1439 if arg_completion == 3: 1440 if not text: 1441 completions = transport_options 1442 else: 1443 completions = [ 1444 s for s in transport_options if s.startswith(text) 1445 ] 1446 return completions 1447 1448 def do_btc_pair(self, line): 1449 """ 1450 Description: Sends an outgoing pairing request. 1451 1452 Input(s): 1453 pairing security level: ENCRYPTED, AUTHENTICATED, or NONE 1454 bondable: BONDABLE, NON_BONDABLE, or NONE 1455 transport: BREDR or LE 1456 1457 Usage: 1458 Examples: 1459 btc_pair NONE NONE BREDR 1460 btc_pair ENCRYPTED NONE LE 1461 btc_pair AUTHENTICATED NONE LE 1462 btc_pair NONE NON_BONDABLE BREDR 1463 """ 1464 cmd = "Send an outgoing pairing request." 1465 pairing_security_level_mapping = { 1466 "ENCRYPTED": 1, 1467 "AUTHENTICATED": 2, 1468 "NONE": None, 1469 } 1470 1471 bondable_mapping = { 1472 "BONDABLE": True, 1473 "NON_BONDABLE": False, 1474 "NONE": None, 1475 } 1476 1477 transport_mapping = { 1478 "BREDR": 1, 1479 "LE": 2, 1480 } 1481 1482 try: 1483 options = line.split(" ") 1484 result = self.test_dut.init_pair( 1485 self.unique_mac_addr_id, 1486 pairing_security_level_mapping.get(options[0]), 1487 bondable_mapping.get(options[1]), 1488 transport_mapping.get(options[2]), 1489 ) 1490 self.log.info(result) 1491 except Exception as err: 1492 self.log.error(FAILURE.format(cmd, err)) 1493 1494 def complete_btc_accept_pairing(self, text, line, begidx, endidx): 1495 """ Provides auto-complete for btc_set_io_capabilities cmd. 1496 1497 See Cmd module for full description. 1498 """ 1499 arg_completion = len(line.split(" ")) - 1 1500 input_options = ['NONE', 'CONFIRMATION', 'KEYBOARD'] 1501 output_options = ['NONE', 'DISPLAY'] 1502 if arg_completion == 1: 1503 if not text: 1504 completions = input_options 1505 else: 1506 completions = [s for s in input_options if s.startswith(text)] 1507 return completions 1508 if arg_completion == 2: 1509 if not text: 1510 completions = output_options 1511 else: 1512 completions = [s for s in output_options if s.startswith(text)] 1513 return completions 1514 1515 def do_btc_accept_pairing(self, line): 1516 """ 1517 Description: Accept all incoming pairing requests. 1518 1519 Input(s): 1520 input: String - The input I/O capabilities to use 1521 Available Values: 1522 NONE - Input capability type None 1523 CONFIRMATION - Input capability type confirmation 1524 KEYBOARD - Input capability type Keyboard 1525 output: String - The output I/O Capabilities to use 1526 Available Values: 1527 NONE - Output capability type None 1528 DISPLAY - output capability type Display 1529 1530 Usage: 1531 Examples: 1532 btc_accept_pairing 1533 btc_accept_pairing NONE DISPLAY 1534 btc_accept_pairing NONE NONE 1535 btc_accept_pairing KEYBOARD DISPLAY 1536 """ 1537 cmd = "Accept incoming pairing requests" 1538 try: 1539 input_capabilities = "NONE" 1540 output_capabilities = "NONE" 1541 options = line.split(" ") 1542 if len(options) > 1: 1543 input_capabilities = options[0] 1544 output_capabilities = options[1] 1545 result = self.pri_dut.bts_lib.acceptPairing( 1546 input_capabilities, output_capabilities) 1547 self.log.info(result) 1548 except Exception as err: 1549 self.log.error(FAILURE.format(cmd, err)) 1550 1551 def do_btc_forget_device(self, line): 1552 """ 1553 Description: Forget pairing of the current device under test. 1554 Current device under test is the device found by 1555 tool_refresh_unique_id from custom user param. This function 1556 will also perform a clean disconnect if actively connected. 1557 1558 Usage: 1559 Examples: 1560 btc_forget_device 1561 """ 1562 cmd = "For pairing of the current device under test." 1563 try: 1564 self.log.info("Forgetting device id: {}".format( 1565 self.unique_mac_addr_id)) 1566 result = self.pri_dut.bts_lib.forgetDevice(self.unique_mac_addr_id) 1567 self.log.info(result) 1568 except Exception as err: 1569 self.log.error(FAILURE.format(cmd, err)) 1570 1571 def do_btc_set_discoverable(self, discoverable): 1572 """ 1573 Description: Change Bluetooth Controller discoverablility. 1574 Input(s): 1575 discoverable: true to set discoverable 1576 false to set non-discoverable 1577 Usage: 1578 Examples: 1579 btc_set_discoverable true 1580 btc_set_discoverable false 1581 """ 1582 cmd = "Change Bluetooth Controller discoverablility." 1583 try: 1584 result = self.test_dut.set_discoverable( 1585 self.str_to_bool(discoverable)) 1586 self.log.info(result) 1587 except Exception as err: 1588 self.log.error(FAILURE.format(cmd, err)) 1589 1590 def do_btc_set_name(self, name): 1591 """ 1592 Description: Change Bluetooth Controller local name. 1593 Input(s): 1594 name: The name to set the Bluetooth Controller name to. 1595 1596 Usage: 1597 Examples: 1598 btc_set_name fs_test 1599 """ 1600 cmd = "Change Bluetooth Controller local name." 1601 try: 1602 result = self.test_dut.set_bluetooth_local_name(name) 1603 self.log.info(result) 1604 except Exception as err: 1605 self.log.error(FAILURE.format(cmd, err)) 1606 1607 def do_btc_request_discovery(self, discover): 1608 """ 1609 Description: Change whether the Bluetooth Controller is in active. 1610 discovery or not. 1611 Input(s): 1612 discover: true to start discovery 1613 false to end discovery 1614 Usage: 1615 Examples: 1616 btc_request_discovery true 1617 btc_request_discovery false 1618 """ 1619 cmd = "Change whether the Bluetooth Controller is in active." 1620 try: 1621 result = self.pri_dut.bts_lib.requestDiscovery( 1622 self.str_to_bool(discover)) 1623 self.log.info(result) 1624 except Exception as err: 1625 self.log.error(FAILURE.format(cmd, err)) 1626 1627 def do_btc_get_known_remote_devices(self, line): 1628 """ 1629 Description: Get a list of known devices. 1630 1631 Usage: 1632 Examples: 1633 btc_get_known_remote_devices 1634 """ 1635 cmd = "Get a list of known devices." 1636 self.bt_control_devices = [] 1637 try: 1638 device_list = self.pri_dut.bts_lib.getKnownRemoteDevices( 1639 )['result'] 1640 for id_dict in device_list: 1641 device = device_list[id_dict] 1642 self.bt_control_devices.append(device) 1643 self.log.info("Device found {}".format(device)) 1644 1645 except Exception as err: 1646 self.log.error(FAILURE.format(cmd, err)) 1647 1648 def do_btc_forget_all_known_devices(self, line): 1649 """ 1650 Description: Forget all known devices. 1651 1652 Usage: 1653 Examples: 1654 btc_forget_all_known_devices 1655 """ 1656 cmd = "Forget all known devices." 1657 try: 1658 device_list = self.pri_dut.bts_lib.getKnownRemoteDevices( 1659 )['result'] 1660 for device in device_list: 1661 d = device_list[device] 1662 if d['bonded'] or d['connected']: 1663 self.log.info("Unbonding deivce: {}".format(d)) 1664 self.log.info( 1665 self.pri_dut.bts_lib.forgetDevice(d['id'])['result']) 1666 except Exception as err: 1667 self.log.error(FAILURE.format(cmd, err)) 1668 1669 def do_btc_connect_device(self, line): 1670 """ 1671 Description: Connect to device under test. 1672 Device under test is specified by either user params 1673 or 1674 tool_set_target_device_name <name> 1675 do_tool_refresh_unique_id_using_bt_control 1676 1677 Usage: 1678 Examples: 1679 btc_connect_device 1680 """ 1681 cmd = "Connect to device under test." 1682 try: 1683 result = self.pri_dut.bts_lib.connectDevice( 1684 self.unique_mac_addr_id) 1685 self.log.info(result) 1686 except Exception as err: 1687 self.log.error(FAILURE.format(cmd, err)) 1688 1689 def complete_btc_connect_device_by_id(self, text, line, begidx, endidx): 1690 if not text: 1691 completions = list(self.bt_control_ids)[:] 1692 else: 1693 completions = [ 1694 s for s in self.bt_control_ids if s.startswith(text) 1695 ] 1696 return completions 1697 1698 def do_btc_connect_device_by_id(self, device_id): 1699 """ 1700 Description: Connect to device id based on pre-defined inputs. 1701 Supports Tab Autocomplete. 1702 Input(s): 1703 device_id: The device id to connect to. 1704 1705 Usage: 1706 Examples: 1707 btc_connect_device_by_id <device_id> 1708 """ 1709 cmd = "Connect to device id based on pre-defined inputs." 1710 try: 1711 result = self.pri_dut.bts_lib.connectDevice(device_id) 1712 self.log.info(result) 1713 except Exception as err: 1714 self.log.error(FAILURE.format(cmd, err)) 1715 1716 def complete_btc_connect_device_by_name(self, text, line, begidx, endidx): 1717 if not text: 1718 completions = list(self.bt_control_names)[:] 1719 else: 1720 completions = [ 1721 s for s in self.bt_control_names if s.startswith(text) 1722 ] 1723 return completions 1724 1725 def do_btc_connect_device_by_name(self, device_name): 1726 """ 1727 Description: Connect to device id based on pre-defined inputs. 1728 Supports Tab Autocomplete. 1729 Input(s): 1730 device_id: The device id to connect to. 1731 1732 Usage: 1733 Examples: 1734 btc_connect_device_by_name <device_id> 1735 """ 1736 cmd = "Connect to device name based on pre-defined inputs." 1737 try: 1738 for device in self.bt_control_devices: 1739 if device_name is device['name']: 1740 1741 result = self.pri_dut.bts_lib.connectDevice(device['id']) 1742 self.log.info(result) 1743 except Exception as err: 1744 self.log.error(FAILURE.format(cmd, err)) 1745 1746 def do_btc_disconnect_device(self, line): 1747 """ 1748 Description: Disconnect to device under test. 1749 Device under test is specified by either user params 1750 or 1751 tool_set_target_device_name <name> 1752 do_tool_refresh_unique_id_using_bt_control 1753 1754 Usage: 1755 Examples: 1756 btc_disconnect_device 1757 """ 1758 cmd = "Disconnect to device under test." 1759 try: 1760 result = self.pri_dut.bts_lib.disconnectDevice( 1761 self.unique_mac_addr_id) 1762 self.log.info(result) 1763 except Exception as err: 1764 self.log.error(FAILURE.format(cmd, err)) 1765 1766 def do_btc_init_bluetooth_control(self, line): 1767 """ 1768 Description: Initialize the Bluetooth Controller. 1769 1770 Usage: 1771 Examples: 1772 btc_init_bluetooth_control 1773 """ 1774 cmd = "Initialize the Bluetooth Controller." 1775 try: 1776 result = self.test_dut.initialize_bluetooth_controller() 1777 self.log.info(result) 1778 except Exception as err: 1779 self.log.error(FAILURE.format(cmd, err)) 1780 1781 def do_btc_get_local_address(self, line): 1782 """ 1783 Description: Get the local BR/EDR address of the Bluetooth Controller. 1784 1785 Usage: 1786 Examples: 1787 btc_get_local_address 1788 """ 1789 cmd = "Get the local BR/EDR address of the Bluetooth Controller." 1790 try: 1791 result = self.test_dut.get_local_bluetooth_address() 1792 self.log.info(result) 1793 except Exception as err: 1794 self.log.error(FAILURE.format(cmd, err)) 1795 1796 def do_btc_input_pairing_pin(self, line): 1797 """ 1798 Description: Sends a pairing pin to SL4F's Bluetooth Control's 1799 Pairing Delegate. 1800 1801 Usage: 1802 Examples: 1803 btc_input_pairing_pin 123456 1804 """ 1805 cmd = "Input pairing pin to the Fuchsia device." 1806 try: 1807 result = self.pri_dut.bts_lib.inputPairingPin(line)['result'] 1808 self.log.info(result) 1809 except Exception as err: 1810 self.log.error(FAILURE.format(cmd, err)) 1811 1812 def do_btc_get_pairing_pin(self, line): 1813 """ 1814 Description: Gets the pairing pin from SL4F's Bluetooth Control's 1815 Pairing Delegate. 1816 1817 Usage: 1818 Examples: 1819 btc_get_pairing_pin 1820 """ 1821 cmd = "Get the pairing pin from the Fuchsia device." 1822 try: 1823 result = self.pri_dut.bts_lib.getPairingPin()['result'] 1824 self.log.info(result) 1825 except Exception as err: 1826 self.log.error(FAILURE.format(cmd, err)) 1827 1828 """End Bluetooth Control wrappers""" 1829 """Begin Profile Server wrappers""" 1830 1831 def do_sdp_pts_example(self, num_of_records): 1832 """ 1833 Description: An example of how to setup a generic SDP record 1834 and SDP search capabilities. This example will pass a few 1835 SDP tests. 1836 1837 Input(s): 1838 num_of_records: The number of records to add. 1839 1840 Usage: 1841 Examples: 1842 sdp_pts_example 1 1843 sdp pts_example 10 1844 """ 1845 cmd = "Setup SDP for PTS testing." 1846 1847 attributes = [ 1848 bt_attribute_values['ATTR_PROTOCOL_DESCRIPTOR_LIST'], 1849 bt_attribute_values['ATTR_SERVICE_CLASS_ID_LIST'], 1850 bt_attribute_values['ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST'], 1851 bt_attribute_values['ATTR_A2DP_SUPPORTED_FEATURES'], 1852 ] 1853 1854 try: 1855 self.pri_dut.sdp_lib.addSearch( 1856 attributes, int(sig_uuid_constants['AudioSource'], 16)) 1857 self.pri_dut.sdp_lib.addSearch( 1858 attributes, int(sig_uuid_constants['A/V_RemoteControl'], 16)) 1859 self.pri_dut.sdp_lib.addSearch(attributes, 1860 int(sig_uuid_constants['PANU'], 16)) 1861 self.pri_dut.sdp_lib.addSearch( 1862 attributes, int(sig_uuid_constants['SerialPort'], 16)) 1863 self.pri_dut.sdp_lib.addSearch( 1864 attributes, int(sig_uuid_constants['DialupNetworking'], 16)) 1865 self.pri_dut.sdp_lib.addSearch( 1866 attributes, int(sig_uuid_constants['OBEXObjectPush'], 16)) 1867 self.pri_dut.sdp_lib.addSearch( 1868 attributes, int(sig_uuid_constants['OBEXFileTransfer'], 16)) 1869 self.pri_dut.sdp_lib.addSearch( 1870 attributes, int(sig_uuid_constants['Headset'], 16)) 1871 self.pri_dut.sdp_lib.addSearch( 1872 attributes, int(sig_uuid_constants['HandsfreeAudioGateway'], 1873 16)) 1874 self.pri_dut.sdp_lib.addSearch( 1875 attributes, int(sig_uuid_constants['Handsfree'], 16)) 1876 self.pri_dut.sdp_lib.addSearch( 1877 attributes, int(sig_uuid_constants['SIM_Access'], 16)) 1878 for i in range(int(num_of_records)): 1879 result = self.pri_dut.sdp_lib.addService( 1880 sdp_pts_record_list[i]) 1881 self.log.info(result) 1882 except Exception as err: 1883 self.log.error(FAILURE.format(cmd, err)) 1884 1885 def do_sdp_cleanup(self, line): 1886 """ 1887 Description: Cleanup any existing SDP records 1888 1889 Usage: 1890 Examples: 1891 sdp_cleanup 1892 """ 1893 cmd = "Cleanup SDP objects." 1894 try: 1895 result = self.pri_dut.sdp_lib.cleanUp() 1896 self.log.info(result) 1897 except Exception as err: 1898 self.log.error(FAILURE.format(cmd, err)) 1899 1900 def do_sdp_init(self, line): 1901 """ 1902 Description: Init the profile proxy for setting up SDP records 1903 1904 Usage: 1905 Examples: 1906 sdp_init 1907 """ 1908 cmd = "Initialize profile proxy objects for adding SDP records" 1909 try: 1910 result = self.pri_dut.sdp_lib.init() 1911 self.log.info(result) 1912 except Exception as err: 1913 self.log.error(FAILURE.format(cmd, err)) 1914 1915 def do_sdp_connect_l2cap(self, line): 1916 """ 1917 Description: Send an l2cap connection request over an input psm value. 1918 1919 Note: Must be already connected to a peer. 1920 1921 Input(s): 1922 psm: The int hex value to connect over. Available PSMs: 1923 SDP 0x0001 See Bluetooth Service Discovery Protocol (SDP) 1924 RFCOMM 0x0003 See RFCOMM with TS 07.10 1925 TCS-BIN 0x0005 See Bluetooth Telephony Control Specification / 1926 TCS Binary 1927 TCS-BIN-CORDLESS 0x0007 See Bluetooth Telephony Control 1928 Specification / TCS Binary 1929 BNEP 0x000F See Bluetooth Network Encapsulation Protocol 1930 HID_Control 0x0011 See Human Interface Device 1931 HID_Interrupt 0x0013 See Human Interface Device 1932 UPnP 0x0015 See [ESDP] 1933 AVCTP 0x0017 See Audio/Video Control Transport Protocol 1934 AVDTP 0x0019 See Audio/Video Distribution Transport Protocol 1935 AVCTP_Browsing 0x001B See Audio/Video Remote Control Profile 1936 UDI_C-Plane 0x001D See the Unrestricted Digital Information 1937 Profile [UDI] 1938 ATT 0x001F See Bluetooth Core Specification 1939 3DSP 0x0021 See 3D Synchronization Profile. 1940 LE_PSM_IPSP 0x0023 See Internet Protocol Support Profile 1941 (IPSP) 1942 OTS 0x0025 See Object Transfer Service (OTS) 1943 EATT 0x0027 See Bluetooth Core Specification 1944 mode: String - The channel mode to connect to. Available values: 1945 Basic mode: BASIC 1946 Enhanced Retransmission mode: ERTM 1947 1948 Usage: 1949 Examples: 1950 sdp_connect_l2cap 0001 BASIC 1951 sdp_connect_l2cap 0019 ERTM 1952 """ 1953 cmd = "Connect l2cap" 1954 try: 1955 info = line.split() 1956 result = self.pri_dut.sdp_lib.connectL2cap(self.unique_mac_addr_id, 1957 int(info[0], 16), 1958 info[1]) 1959 self.log.info(result) 1960 except Exception as err: 1961 self.log.error(FAILURE.format(cmd, err)) 1962 1963 """End Profile Server wrappers""" 1964 """Begin AVDTP wrappers""" 1965 1966 def do_avdtp_init(self, initiator_delay): 1967 """ 1968 Description: Init the A2DP component start and AVDTP service to 1969 initiate. 1970 1971 Input(s): 1972 initiator_delay: [Optional] The stream initiator delay to set in 1973 milliseconds. 1974 1975 Usage: 1976 Examples: 1977 avdtp_init 0 1978 avdtp_init 2000 1979 avdtp_init 1980 """ 1981 cmd = "Initialize AVDTP proxy" 1982 try: 1983 if not initiator_delay: 1984 initiator_delay = None 1985 result = self.pri_dut.avdtp_lib.init(initiator_delay) 1986 self.log.info(result) 1987 except Exception as err: 1988 self.log.error(FAILURE.format(cmd, err)) 1989 1990 def do_avdtp_kill_a2dp(self, line): 1991 """ 1992 Description: Quickly kill any A2DP components. 1993 1994 Usage: 1995 Examples: 1996 avdtp_kill_a2dp 1997 """ 1998 cmd = "Kill A2DP service" 1999 try: 2000 result = self.pri_dut.control_daemon("bt-a2dp.cmx", "stop") 2001 self.log.info(result) 2002 except Exception as err: 2003 self.log.error(FAILURE.format(cmd, err)) 2004 2005 def do_avdtp_get_connected_peers(self, line): 2006 """ 2007 Description: Get the connected peers for the AVDTP service 2008 2009 Usage: 2010 Examples: 2011 avdtp_get_connected_peers 2012 """ 2013 cmd = "AVDTP get connected peers" 2014 try: 2015 result = self.pri_dut.avdtp_lib.getConnectedPeers() 2016 self.log.info(result) 2017 except Exception as err: 2018 self.log.error(FAILURE.format(cmd, err)) 2019 2020 def do_avdtp_set_configuration(self, peer_id): 2021 """ 2022 Description: Send AVDTP command to connected peer: set configuration 2023 2024 Input(s): 2025 peer_id: The specified peer_id. 2026 2027 Usage: 2028 Examples: 2029 avdtp_set_configuration <peer_id> 2030 """ 2031 cmd = "Send AVDTP set configuration to connected peer" 2032 try: 2033 result = self.pri_dut.avdtp_lib.setConfiguration(int(peer_id)) 2034 self.log.info(result) 2035 except Exception as err: 2036 self.log.error(FAILURE.format(cmd, err)) 2037 2038 def do_avdtp_get_configuration(self, peer_id): 2039 """ 2040 Description: Send AVDTP command to connected peer: get configuration 2041 2042 Input(s): 2043 peer_id: The specified peer_id. 2044 2045 Usage: 2046 Examples: 2047 avdtp_get_configuration <peer_id> 2048 """ 2049 cmd = "Send AVDTP get configuration to connected peer" 2050 try: 2051 result = self.pri_dut.avdtp_lib.getConfiguration(int(peer_id)) 2052 self.log.info(result) 2053 except Exception as err: 2054 self.log.error(FAILURE.format(cmd, err)) 2055 2056 def do_avdtp_get_capabilities(self, peer_id): 2057 """ 2058 Description: Send AVDTP command to connected peer: get capabilities 2059 2060 Input(s): 2061 peer_id: The specified peer_id. 2062 2063 Usage: 2064 Examples: 2065 avdtp_get_capabilities <peer_id> 2066 """ 2067 cmd = "Send AVDTP get capabilities to connected peer" 2068 try: 2069 result = self.pri_dut.avdtp_lib.getCapabilities(int(peer_id)) 2070 self.log.info(result) 2071 except Exception as err: 2072 self.log.error(FAILURE.format(cmd, err)) 2073 2074 def do_avdtp_get_all_capabilities(self, peer_id): 2075 """ 2076 Description: Send AVDTP command to connected peer: get all capabilities 2077 2078 Input(s): 2079 peer_id: The specified peer_id. 2080 2081 Usage: 2082 Examples: 2083 avdtp_get_all_capabilities <peer_id> 2084 """ 2085 cmd = "Send AVDTP get all capabilities to connected peer" 2086 try: 2087 result = self.pri_dut.avdtp_lib.getAllCapabilities(int(peer_id)) 2088 self.log.info(result) 2089 except Exception as err: 2090 self.log.error(FAILURE.format(cmd, err)) 2091 2092 def do_avdtp_reconfigure_stream(self, peer_id): 2093 """ 2094 Description: Send AVDTP command to connected peer: reconfigure stream 2095 2096 Input(s): 2097 peer_id: The specified peer_id. 2098 2099 Usage: 2100 Examples: 2101 avdtp_reconfigure_stream <peer_id> 2102 """ 2103 cmd = "Send AVDTP reconfigure stream to connected peer" 2104 try: 2105 result = self.pri_dut.avdtp_lib.reconfigureStream(int(peer_id)) 2106 self.log.info(result) 2107 except Exception as err: 2108 self.log.error(FAILURE.format(cmd, err)) 2109 2110 def do_avdtp_suspend_stream(self, peer_id): 2111 """ 2112 Description: Send AVDTP command to connected peer: suspend stream 2113 2114 Input(s): 2115 peer_id: The specified peer_id. 2116 2117 Usage: 2118 Examples: 2119 avdtp_suspend_stream <peer_id> 2120 """ 2121 cmd = "Send AVDTP suspend stream to connected peer" 2122 try: 2123 result = self.pri_dut.avdtp_lib.suspendStream(int(peer_id)) 2124 self.log.info(result) 2125 except Exception as err: 2126 self.log.error(FAILURE.format(cmd, err)) 2127 2128 def do_avdtp_suspend_reconfigure(self, peer_id): 2129 """ 2130 Description: Send AVDTP command to connected peer: suspend reconfigure 2131 2132 Input(s): 2133 peer_id: The specified peer_id. 2134 2135 Usage: 2136 Examples: 2137 avdtp_suspend_reconfigure <peer_id> 2138 """ 2139 cmd = "Send AVDTP suspend reconfigure to connected peer" 2140 try: 2141 result = self.pri_dut.avdtp_lib.suspendAndReconfigure(int(peer_id)) 2142 self.log.info(result) 2143 except Exception as err: 2144 self.log.error(FAILURE.format(cmd, err)) 2145 2146 def do_avdtp_release_stream(self, peer_id): 2147 """ 2148 Description: Send AVDTP command to connected peer: release stream 2149 2150 Input(s): 2151 peer_id: The specified peer_id. 2152 2153 Usage: 2154 Examples: 2155 avdtp_release_stream <peer_id> 2156 """ 2157 cmd = "Send AVDTP release stream to connected peer" 2158 try: 2159 result = self.pri_dut.avdtp_lib.releaseStream(int(peer_id)) 2160 self.log.info(result) 2161 except Exception as err: 2162 self.log.error(FAILURE.format(cmd, err)) 2163 2164 def do_avdtp_establish_stream(self, peer_id): 2165 """ 2166 Description: Send AVDTP command to connected peer: establish stream 2167 2168 Input(s): 2169 peer_id: The specified peer_id. 2170 2171 Usage: 2172 Examples: 2173 avdtp_establish_stream <peer_id> 2174 """ 2175 cmd = "Send AVDTP establish stream to connected peer" 2176 try: 2177 result = self.pri_dut.avdtp_lib.establishStream(int(peer_id)) 2178 self.log.info(result) 2179 except Exception as err: 2180 self.log.error(FAILURE.format(cmd, err)) 2181 2182 def do_avdtp_start_stream(self, peer_id): 2183 """ 2184 Description: Send AVDTP command to connected peer: start stream 2185 2186 Input(s): 2187 peer_id: The specified peer_id. 2188 2189 Usage: 2190 Examples: 2191 avdtp_start_stream <peer_id> 2192 """ 2193 cmd = "Send AVDTP start stream to connected peer" 2194 try: 2195 result = self.pri_dut.avdtp_lib.startStream(int(peer_id)) 2196 self.log.info(result) 2197 except Exception as err: 2198 self.log.error(FAILURE.format(cmd, err)) 2199 2200 def do_avdtp_abort_stream(self, peer_id): 2201 """ 2202 Description: Send AVDTP command to connected peer: abort stream 2203 2204 Input(s): 2205 peer_id: The specified peer_id. 2206 2207 Usage: 2208 Examples: 2209 avdtp_abort_stream <peer_id> 2210 """ 2211 cmd = "Send AVDTP abort stream to connected peer" 2212 try: 2213 result = self.pri_dut.avdtp_lib.abortStream(int(peer_id)) 2214 self.log.info(result) 2215 except Exception as err: 2216 self.log.error(FAILURE.format(cmd, err)) 2217 2218 def do_avdtp_remove_service(self, line): 2219 """ 2220 Description: Removes the AVDTP service in use. 2221 2222 Usage: 2223 Examples: 2224 avdtp_establish_stream <peer_id> 2225 """ 2226 cmd = "Remove AVDTP service" 2227 try: 2228 result = self.pri_dut.avdtp_lib.removeService() 2229 self.log.info(result) 2230 except Exception as err: 2231 self.log.error(FAILURE.format(cmd, err)) 2232 2233 """End AVDTP wrappers""" 2234 """Begin Audio wrappers""" 2235 2236 def do_audio_start_output_save(self, line): 2237 """ 2238 Description: Start audio output save 2239 2240 Usage: 2241 Examples: 2242 audio_start_output_save 2243 """ 2244 cmd = "Start audio capture" 2245 try: 2246 result = self.pri_dut.audio_lib.startOutputSave() 2247 self.log.info(result) 2248 except Exception as err: 2249 self.log.error(FAILURE.format(cmd, err)) 2250 2251 def do_audio_stop_output_save(self, line): 2252 """ 2253 Description: Stop audio output save 2254 2255 Usage: 2256 Examples: 2257 audio_stop_output_save 2258 """ 2259 cmd = "Stop audio capture" 2260 try: 2261 result = self.pri_dut.audio_lib.stopOutputSave() 2262 self.log.info(result) 2263 except Exception as err: 2264 self.log.error(FAILURE.format(cmd, err)) 2265 2266 def do_audio_get_output_audio(self, line): 2267 """ 2268 Description: Get the audio output saved to a local file 2269 2270 Usage: 2271 Examples: 2272 audio_get_output_audio 2273 """ 2274 cmd = "Get audio capture" 2275 try: 2276 save_path = "{}/{}".format(self.pri_dut.log_path, "audio.raw") 2277 result = self.pri_dut.audio_lib.getOutputAudio(save_path) 2278 except Exception as err: 2279 self.log.error(FAILURE.format(cmd, err)) 2280 2281 def do_audio_5_min_test(self, line): 2282 """ 2283 Description: Capture and anlyize sine audio waves played from a Bluetooth A2DP 2284 Source device. 2285 2286 Pre steps: 2287 1. Pair A2DP source device 2288 2. Prepare generated SOX file over preferred codec on source device. 2289 Quick way to generate necessary audio files: 2290 sudo apt-get install sox 2291 sox -b 16 -r 48000 -c 2 -n audio_file_2k1k_5_min.wav synth 300 sine 2000 sine 3000 2292 2293 Usage: 2294 Examples: 2295 audio_5_min_test 2296 """ 2297 cmd = "5 min audio capture test" 2298 input("Press Enter once Source device is streaming audio file") 2299 try: 2300 result = self.pri_dut.audio_lib.startOutputSave() 2301 self.log.info(result) 2302 for i in range(5): 2303 print("Minutes left: {}".format(10 - i)) 2304 time.sleep(60) 2305 result = self.pri_dut.audio_lib.stopOutputSave() 2306 log_time = int(time.time()) 2307 save_path = "{}/{}".format(self.pri_dut.log_path, 2308 "{}_audio.raw".format(log_time)) 2309 analysis_path = "{}/{}".format( 2310 self.pri_dut.log_path, 2311 "{}_audio_analysis.txt".format(log_time)) 2312 result = self.pri_dut.audio_lib.getOutputAudio(save_path) 2313 2314 channels = 1 2315 try: 2316 quality_analysis(filename=save_path, 2317 output_file=analysis_path, 2318 bit_width=audio_bits_per_sample_32, 2319 rate=audio_sample_rate_48000, 2320 channel=channels, 2321 spectral_only=False) 2322 2323 except Exception as err: 2324 self.log.error("Failed to analyze raw audio: {}".format(err)) 2325 return False 2326 2327 self.log.info("Analysis output here: {}".format(analysis_path)) 2328 self.log.info("Analysis Results: {}".format( 2329 open(analysis_path).readlines())) 2330 except Exception as err: 2331 self.log.error(FAILURE.format(cmd, err)) 2332 2333 """End Audio wrappers""" 2334 """Begin HFP wrappers""" 2335 2336 def do_hfp_init(self, line): 2337 """ 2338 Description: Init the HFP component initiate. 2339 2340 Usage: 2341 Examples: 2342 hfp_init 2343 """ 2344 cmd = "Initialize HFP proxy" 2345 try: 2346 result = self.pri_dut.hfp_lib.init() 2347 self.log.info(result) 2348 except Exception as err: 2349 self.log.error(FAILURE.format(cmd, err)) 2350 2351 def do_hfp_remove_service(self, line): 2352 """ 2353 Description: Removes the HFP service in use. 2354 2355 Usage: 2356 Examples: 2357 hfp_remove_service 2358 """ 2359 cmd = "Remove HFP service" 2360 try: 2361 result = self.pri_dut.hfp_lib.removeService() 2362 self.log.info(result) 2363 except Exception as err: 2364 self.log.error(FAILURE.format(cmd, err)) 2365 2366 def do_hfp_list_peers(self, line): 2367 """ 2368 Description: List all HFP Hands-Free peers connected to the DUT. 2369 2370 Input(s): 2371 2372 Usage: 2373 Examples: 2374 hfp_list_peers 2375 """ 2376 cmd = "Lists connected peers" 2377 try: 2378 result = self.pri_dut.hfp_lib.listPeers() 2379 self.log.info(pprint.pformat(result)) 2380 except Exception as err: 2381 self.log.error(FAILURE.format(cmd, err)) 2382 2383 def do_hfp_set_active_peer(self, line): 2384 """ 2385 Description: Set the active HFP Hands-Free peer for the DUT. 2386 2387 Input(s): 2388 peer_id: The id of the peer to be set active. 2389 2390 Usage: 2391 Examples: 2392 hfp_set_active_peer <peer_id> 2393 """ 2394 cmd = "Set the active peer" 2395 try: 2396 peer_id = int(line.strip()) 2397 result = self.pri_dut.hfp_lib.setActivePeer(peer_id) 2398 self.log.info(result) 2399 except Exception as err: 2400 self.log.error(FAILURE.format(cmd, err)) 2401 2402 def do_hfp_list_calls(self, line): 2403 """ 2404 Description: List all calls known to the sl4f component on the DUT. 2405 2406 Input(s): 2407 2408 Usage: 2409 Examples: 2410 hfp_list_calls 2411 """ 2412 cmd = "Lists all calls" 2413 try: 2414 result = self.pri_dut.hfp_lib.listCalls() 2415 self.log.info(pprint.pformat(result)) 2416 except Exception as err: 2417 self.log.error(FAILURE.format(cmd, err)) 2418 2419 def do_hfp_new_call(self, line): 2420 """ 2421 Description: Simulate a call on the call manager 2422 2423 Input(s): 2424 remote: The number of the remote party on the simulated call 2425 state: The state of the call. Must be one of "ringing", "waiting", 2426 "dialing", "alerting", "active", "held". 2427 direction: The direction of the call. Must be one of "incoming", "outgoing". 2428 2429 Usage: 2430 Examples: 2431 hfp_new_call <remote> <state> <direction> 2432 hfp_new_call 14085555555 active incoming 2433 hfp_new_call 14085555555 held outgoing 2434 hfp_new_call 14085555555 ringing incoming 2435 hfp_new_call 14085555555 waiting incoming 2436 hfp_new_call 14085555555 alerting outgoing 2437 hfp_new_call 14085555555 dialing outgoing 2438 """ 2439 cmd = "Simulates a call" 2440 try: 2441 info = line.strip().split() 2442 if len(info) != 3: 2443 raise ValueError( 2444 "Exactly three command line arguments required: <remote> <state> <direction>" 2445 ) 2446 remote, state, direction = info[0], info[1], info[2] 2447 result = self.pri_dut.hfp_lib.newCall(remote, state, direction) 2448 self.log.info(result) 2449 except Exception as err: 2450 self.log.error(FAILURE.format(cmd, err)) 2451 2452 def do_hfp_incoming_call(self, line): 2453 """ 2454 Description: Simulate an incoming call on the call manager 2455 2456 Input(s): 2457 remote: The number of the remote party on the incoming call 2458 2459 Usage: 2460 Examples: 2461 hfp_incoming_call <remote> 2462 hfp_incoming_call 14085555555 2463 """ 2464 cmd = "Simulates an incoming call" 2465 try: 2466 remote = line.strip() 2467 result = self.pri_dut.hfp_lib.initiateIncomingCall(remote) 2468 self.log.info(result) 2469 except Exception as err: 2470 self.log.error(FAILURE.format(cmd, err)) 2471 2472 def do_hfp_waiting_call(self, line): 2473 """ 2474 Description: Simulate an incoming call on the call manager when there is 2475 an onging active call already. 2476 2477 Input(s): 2478 remote: The number of the remote party on the incoming call 2479 2480 Usage: 2481 Examples: 2482 hfp_waiting_call <remote> 2483 hfp_waiting_call 14085555555 2484 """ 2485 cmd = "Simulates an incoming call" 2486 try: 2487 remote = line.strip() 2488 result = self.pri_dut.hfp_lib.initiateIncomingWaitingCall(remote) 2489 self.log.info(result) 2490 except Exception as err: 2491 self.log.error(FAILURE.format(cmd, err)) 2492 2493 def do_hfp_outgoing_call(self, line): 2494 """ 2495 Description: Simulate an outgoing call on the call manager 2496 2497 Input(s): 2498 remote: The number of the remote party on the outgoing call 2499 2500 Usage: 2501 Examples: 2502 hfp_outgoing_call <remote> 2503 """ 2504 cmd = "Simulates an outgoing call" 2505 try: 2506 remote = line.strip() 2507 result = self.pri_dut.hfp_lib.initiateOutgoingCall(remote) 2508 self.log.info(result) 2509 except Exception as err: 2510 self.log.error(FAILURE.format(cmd, err)) 2511 2512 def do_hfp_set_call_active(self, line): 2513 """ 2514 Description: Set the specified call to the "OngoingActive" state. 2515 2516 Input(s): 2517 call_id: The unique id of the call. 2518 2519 Usage: 2520 Examples: 2521 hfp_outgoing_call <call_id> 2522 """ 2523 cmd = "Set the specified call to active" 2524 try: 2525 call_id = int(line.strip()) 2526 result = self.pri_dut.hfp_lib.setCallActive(call_id) 2527 self.log.info(result) 2528 except Exception as err: 2529 self.log.error(FAILURE.format(cmd, err)) 2530 2531 def do_hfp_set_call_held(self, line): 2532 """ 2533 Description: Set the specified call to the "OngoingHeld" state. 2534 2535 Input(s): 2536 call_id: The unique id of the call. 2537 2538 Usage: 2539 Examples: 2540 hfp_outgoing_call <call_id> 2541 """ 2542 cmd = "Set the specified call to held" 2543 try: 2544 call_id = int(line.strip()) 2545 result = self.pri_dut.hfp_lib.setCallHeld(call_id) 2546 self.log.info(result) 2547 except Exception as err: 2548 self.log.error(FAILURE.format(cmd, err)) 2549 2550 def do_hfp_set_call_terminated(self, line): 2551 """ 2552 Description: Set the specified call to the "Terminated" state. 2553 2554 Input(s): 2555 call_id: The unique id of the call. 2556 2557 Usage: 2558 Examples: 2559 hfp_outgoing_call <call_id> 2560 """ 2561 cmd = "Set the specified call to terminated" 2562 try: 2563 call_id = int(line.strip()) 2564 result = self.pri_dut.hfp_lib.setCallTerminated(call_id) 2565 self.log.info(result) 2566 except Exception as err: 2567 self.log.error(FAILURE.format(cmd, err)) 2568 2569 def do_hfp_set_call_transferred_to_ag(self, line): 2570 """ 2571 Description: Set the specified call to the "TransferredToAg" state. 2572 2573 Input(s): 2574 call_id: The unique id of the call. 2575 2576 Usage: 2577 Examples: 2578 hfp_outgoing_call <call_id> 2579 """ 2580 cmd = "Set the specified call to TransferredToAg" 2581 try: 2582 call_id = int(line.strip()) 2583 result = self.pri_dut.hfp_lib.setCallTransferredToAg(call_id) 2584 self.log.info(result) 2585 except Exception as err: 2586 self.log.error(FAILURE.format(cmd, err)) 2587 2588 def do_hfp_set_speaker_gain(self, line): 2589 """ 2590 Description: Set the active peer's speaker gain. 2591 2592 Input(s): 2593 value: The gain value to set. Must be between 0-15 inclusive. 2594 2595 Usage: 2596 Examples: 2597 hfp_set_speaker_gain <value> 2598 """ 2599 cmd = "Set the active peer's speaker gain" 2600 try: 2601 value = int(line.strip()) 2602 result = self.pri_dut.hfp_lib.setSpeakerGain(value) 2603 self.log.info(result) 2604 except Exception as err: 2605 self.log.error(FAILURE.format(cmd, err)) 2606 2607 def do_hfp_set_microphone_gain(self, line): 2608 """ 2609 Description: Set the active peer's microphone gain. 2610 2611 Input(s): 2612 value: The gain value to set. Must be between 0-15 inclusive. 2613 2614 Usage: 2615 Examples: 2616 hfp_set_microphone_gain <value> 2617 """ 2618 cmd = "Set the active peer's microphone gain" 2619 try: 2620 value = int(line.strip()) 2621 result = self.pri_dut.hfp_lib.setMicrophoneGain(value) 2622 self.log.info(result) 2623 except Exception as err: 2624 self.log.error(FAILURE.format(cmd, err)) 2625 2626 def do_hfp_set_service_available(self, line): 2627 """ 2628 Description: Sets the simulated network service status reported by the call manager. 2629 2630 Input(s): 2631 value: "true" to set the network connection to available. 2632 2633 Usage: 2634 Examples: 2635 hfp_set_service_available <value> 2636 hfp_set_service_available true 2637 hfp_set_service_available false 2638 """ 2639 cmd = "Sets the simulated network service status reported by the call manager" 2640 try: 2641 value = line.strip() == "true" 2642 result = self.pri_dut.hfp_lib.setServiceAvailable(value) 2643 self.log.info(result) 2644 except Exception as err: 2645 self.log.error(FAILURE.format(cmd, err)) 2646 2647 def do_hfp_set_roaming(self, line): 2648 """ 2649 Description: Sets the simulated roaming status reported by the call manager. 2650 2651 Input(s): 2652 value: "true" to set the network connection to roaming. 2653 2654 Usage: 2655 Examples: 2656 hfp_set_roaming <value> 2657 hfp_set_roaming true 2658 hfp_set_roaming false 2659 """ 2660 cmd = "Sets the simulated roaming status reported by the call manager" 2661 try: 2662 value = line.strip() == "true" 2663 result = self.pri_dut.hfp_lib.setRoaming(value) 2664 self.log.info(result) 2665 except Exception as err: 2666 self.log.error(FAILURE.format(cmd, err)) 2667 2668 def do_hfp_set_signal_strength(self, line): 2669 """ 2670 Description: Sets the simulated signal strength reported by the call manager. 2671 2672 Input(s): 2673 value: The signal strength value to set. Must be between 0-5 inclusive. 2674 2675 Usage: 2676 Examples: 2677 hfp_set_signal_strength <value> 2678 hfp_set_signal_strength 0 2679 hfp_set_signal_strength 3 2680 hfp_set_signal_strength 5 2681 """ 2682 cmd = "Sets the simulated signal strength reported by the call manager" 2683 try: 2684 value = int(line.strip()) 2685 result = self.pri_dut.hfp_lib.setSignalStrength(value) 2686 self.log.info(result) 2687 except Exception as err: 2688 self.log.error(FAILURE.format(cmd, err)) 2689 2690 def do_hfp_set_subscriber_number(self, line): 2691 """ 2692 Description: Sets the subscriber number reported by the call manager. 2693 2694 Input(s): 2695 value: The subscriber number to set. Maximum length 128 characters. 2696 2697 Usage: 2698 Examples: 2699 hfp_set_subscriber_number <value> 2700 hfp_set_subscriber_number 14085555555 2701 """ 2702 cmd = "Sets the subscriber number reported by the call manager" 2703 try: 2704 value = line.strip() 2705 result = self.pri_dut.hfp_lib.setSubscriberNumber(value) 2706 self.log.info(result) 2707 except Exception as err: 2708 self.log.error(FAILURE.format(cmd, err)) 2709 2710 def do_hfp_set_operator(self, line): 2711 """ 2712 Description: Sets the operator value reported by the call manager. 2713 2714 Input(s): 2715 value: The operator value to set. Maximum length 16 characters. 2716 2717 Usage: 2718 Examples: 2719 hfp_set_operator <value> 2720 hfp_set_operator GoogleFi 2721 """ 2722 cmd = "Sets the operator value reported by the call manager" 2723 try: 2724 value = line.strip() 2725 result = self.pri_dut.hfp_lib.setOperator(value) 2726 self.log.info(result) 2727 except Exception as err: 2728 self.log.error(FAILURE.format(cmd, err)) 2729 2730 def do_hfp_set_nrec_support(self, line): 2731 """ 2732 Description: Sets the noise reduction/echo cancelation support reported by the call manager. 2733 2734 Input(s): 2735 value: The nrec support bool. 2736 2737 Usage: 2738 Examples: 2739 hfp_set_nrec_support <value> 2740 hfp_set_nrec_support true 2741 hfp_set_nrec_support false 2742 """ 2743 cmd = "Sets the noise reduction/echo cancelation support reported by the call manager" 2744 try: 2745 value = line.strip() == "true" 2746 result = self.pri_dut.hfp_lib.setNrecSupport(value) 2747 self.log.info(result) 2748 except Exception as err: 2749 self.log.error(FAILURE.format(cmd, err)) 2750 2751 def do_hfp_set_battery_level(self, line): 2752 """ 2753 Description: Sets the battery level reported by the call manager. 2754 2755 Input(s): 2756 value: The integer battery level value. Must be 0-5 inclusive. 2757 2758 Usage: 2759 Examples: 2760 hfp_set_battery_level <value> 2761 hfp_set_battery_level 0 2762 hfp_set_battery_level 3 2763 """ 2764 cmd = "Set the battery level reported by the call manager" 2765 try: 2766 value = int(line.strip()) 2767 result = self.pri_dut.hfp_lib.setBatteryLevel(value) 2768 self.log.info(result) 2769 except Exception as err: 2770 self.log.error(FAILURE.format(cmd, err)) 2771 2772 def do_hfp_set_last_dialed(self, line): 2773 """ 2774 Description: Sets the last dialed number in the call manager. 2775 2776 Input(s): 2777 number: The number of the remote party. 2778 2779 Usage: 2780 Examples: 2781 hfp_set_last_dialed <number> 2782 hfp_set_last_dialed 14085555555 2783 """ 2784 cmd = "Sets the last dialed number in the call manager." 2785 try: 2786 number = line.strip() 2787 result = self.pri_dut.hfp_lib.setLastDialed(number) 2788 self.log.info(result) 2789 except Exception as err: 2790 self.log.error(FAILURE.format(cmd, err)) 2791 2792 def do_hfp_clear_last_dialed(self, line): 2793 """ 2794 Description: Clears the last dialed number in the call manager. 2795 2796 Usage: 2797 Examples: 2798 hfp_clear_last_dialed 2799 """ 2800 cmd = "Clears the last dialed number in the call manager." 2801 try: 2802 result = self.pri_dut.hfp_lib.clearLastDialed() 2803 self.log.info(result) 2804 except Exception as err: 2805 self.log.error(FAILURE.format(cmd, err)) 2806 2807 def do_hfp_set_memory_location(self, line): 2808 """ 2809 Description: Sets a memory location to point to a remote number. 2810 2811 Input(s): 2812 location: The memory location at which to store the number. 2813 number: The number of the remote party to be stored. 2814 2815 Usage: 2816 Examples: 2817 hfp_set_memory_location <location> <number> 2818 hfp_set_memory_location 0 14085555555 2819 """ 2820 cmd = "Sets a memory location to point to a remote number." 2821 try: 2822 info = line.strip().split() 2823 if len(info) != 2: 2824 raise ValueError( 2825 "Exactly two command line arguments required: <location> <number>" 2826 ) 2827 location, number = info[0], info[1] 2828 result = self.pri_dut.hfp_lib.setMemoryLocation(location, number) 2829 self.log.info(result) 2830 except Exception as err: 2831 self.log.error(FAILURE.format(cmd, err)) 2832 2833 def do_hfp_clear_memory_location(self, line): 2834 """ 2835 Description: Sets a memory location to point to a remote number. 2836 2837 Input(s): 2838 localtion: The memory location to clear. 2839 2840 Usage: 2841 Examples: 2842 hfp_clear_memory_location <location> 2843 hfp_clear_memory_location 0 2844 """ 2845 cmd = "Sets a memory location to point to a remote number." 2846 try: 2847 location = line.strip() 2848 result = self.pri_dut.hfp_lib.clearMemoryLocation(location) 2849 self.log.info(result) 2850 except Exception as err: 2851 self.log.error(FAILURE.format(cmd, err)) 2852 2853 def do_hfp_set_dial_result(self, line): 2854 """ 2855 Description: Sets the status result to be returned when the number is dialed. 2856 2857 Input(s): 2858 number: The number of the remote party. 2859 status: The status to be returned when an outgoing call is initiated to the number. 2860 2861 Usage: 2862 Examples: 2863 hfp_set_battery_level <value> 2864 """ 2865 cmd = "Sets the status result to be returned when the number is dialed." 2866 try: 2867 info = line.strip().split() 2868 if len(info) != 2: 2869 raise ValueError( 2870 "Exactly two command line arguments required: <number> <status>" 2871 ) 2872 number, status = info[0], int(info[1]) 2873 result = self.pri_dut.hfp_lib.setDialResult(number, status) 2874 self.log.info(pprint.pformat(result)) 2875 except Exception as err: 2876 self.log.error(FAILURE.format(cmd, err)) 2877 2878 def do_hfp_get_state(self, line): 2879 """ 2880 Description: Get the call manager's complete state 2881 2882 Usage: 2883 Examples: 2884 hfp_get_state 2885 """ 2886 cmd = "Get the call manager's state" 2887 try: 2888 result = self.pri_dut.hfp_lib.getState() 2889 self.log.info(pprint.pformat(result)) 2890 except Exception as err: 2891 self.log.error(FAILURE.format(cmd, err)) 2892 2893 def do_hfp_set_connection_behavior(self, line): 2894 """ 2895 Description: Set the Service Level Connection (SLC) behavior when a new peer connects. 2896 2897 Input(s): 2898 autoconnect: Enable/Disable autoconnection of SLC. 2899 2900 Usage: 2901 Examples: 2902 hfp_set_connection_behavior <autoconnect> 2903 hfp_set_connection_behavior true 2904 hfp_set_connection_behavior false 2905 """ 2906 cmd = "Set the Service Level Connection (SLC) behavior" 2907 try: 2908 autoconnect = line.strip().lower() == "true" 2909 result = self.pri_dut.hfp_lib.setConnectionBehavior(autoconnect) 2910 self.log.info(result) 2911 except Exception as err: 2912 self.log.error(FAILURE.format(cmd, err)) 2913 2914 """End HFP wrappers""" 2915 """Begin RFCOMM wrappers""" 2916 2917 def do_rfcomm_init(self, line): 2918 """ 2919 Description: Initialize the RFCOMM component services. 2920 2921 Usage: 2922 Examples: 2923 rfcomm_init 2924 """ 2925 cmd = "Initialize RFCOMM proxy" 2926 try: 2927 result = self.pri_dut.rfcomm_lib.init() 2928 self.log.info(result) 2929 except Exception as err: 2930 self.log.error(FAILURE.format(cmd, err)) 2931 2932 def do_rfcomm_remove_service(self, line): 2933 """ 2934 Description: Removes the RFCOMM service in use. 2935 2936 Usage: 2937 Examples: 2938 rfcomm_remove_service 2939 """ 2940 cmd = "Remove RFCOMM service" 2941 try: 2942 result = self.pri_dut.rfcomm_lib.removeService() 2943 self.log.info(result) 2944 except Exception as err: 2945 self.log.error(FAILURE.format(cmd, err)) 2946 2947 def do_rfcomm_disconnect_session(self, line): 2948 """ 2949 Description: Closes the RFCOMM Session. 2950 2951 Usage: 2952 Examples: 2953 rfcomm_disconnect_session 2954 rfcomm_disconnect_session 2955 """ 2956 cmd = "Disconnect the RFCOMM Session" 2957 try: 2958 result = self.pri_dut.rfcomm_lib.disconnectSession( 2959 self.unique_mac_addr_id) 2960 self.log.info(result) 2961 except Exception as err: 2962 self.log.error(FAILURE.format(cmd, err)) 2963 2964 def do_rfcomm_connect_rfcomm_channel(self, line): 2965 """ 2966 Description: Make an outgoing RFCOMM connection. 2967 2968 Usage: 2969 Examples: 2970 rfcomm_connect_rfcomm_channel <server_channel_number> 2971 rfcomm_connect_rfcomm_channel 2 2972 """ 2973 cmd = "Make an outgoing RFCOMM connection" 2974 try: 2975 server_channel_number = int(line.strip()) 2976 result = self.pri_dut.rfcomm_lib.connectRfcommChannel( 2977 self.unique_mac_addr_id, server_channel_number) 2978 self.log.info(result) 2979 except Exception as err: 2980 self.log.error(FAILURE.format(cmd, err)) 2981 2982 def do_rfcomm_disconnect_rfcomm_channel(self, line): 2983 """ 2984 Description: Close the RFCOMM connection with the peer 2985 2986 Usage: 2987 Examples: 2988 rfcomm_disconnect_rfcomm_channel <server_channel_number> 2989 rfcomm_disconnect_rfcomm_channel 2 2990 """ 2991 cmd = "Close the RFCOMM channel" 2992 try: 2993 server_channel_number = int(line.strip()) 2994 result = self.pri_dut.rfcomm_lib.disconnectRfcommChannel( 2995 self.unique_mac_addr_id, server_channel_number) 2996 self.log.info(result) 2997 except Exception as err: 2998 self.log.error(FAILURE.format(cmd, err)) 2999 3000 def do_rfcomm_send_remote_line_status(self, line): 3001 """ 3002 Description: Send a remote line status for the RFCOMM channel. 3003 3004 Usage: 3005 Examples: 3006 rfcomm_send_remote_line_status <server_channel_number> 3007 rfcomm_send_remote_line_status 2 3008 """ 3009 cmd = "Send a remote line status update for the RFCOMM channel" 3010 try: 3011 server_channel_number = int(line.strip()) 3012 result = self.pri_dut.rfcomm_lib.sendRemoteLineStatus( 3013 self.unique_mac_addr_id, server_channel_number) 3014 self.log.info(result) 3015 except Exception as err: 3016 self.log.error(FAILURE.format(cmd, err)) 3017 3018 def do_rfcomm_write_rfcomm(self, line): 3019 """ 3020 Description: Send data over the RFCOMM channel. 3021 3022 Usage: 3023 Examples: 3024 rfcomm_write_rfcomm <server_channel_number> <data> 3025 rfcomm_write_rfcomm 2 foobar 3026 """ 3027 cmd = "Send data using the RFCOMM channel" 3028 try: 3029 info = line.strip().split() 3030 if len(info) != 2: 3031 raise ValueError( 3032 "Exactly two command line arguments required: <server_channel_number> <data>" 3033 ) 3034 server_channel_number = int(info[0]) 3035 data = info[1] 3036 result = self.pri_dut.rfcomm_lib.writeRfcomm( 3037 self.unique_mac_addr_id, server_channel_number, data) 3038 self.log.info(result) 3039 except Exception as err: 3040 self.log.error(FAILURE.format(cmd, err)) 3041 3042 """End RFCOMM wrappers""" 3043