1#!/usr/bin/env python3 2# 3# Copyright (C) 2019 The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may not 6# use this file except in compliance with the License. You may obtain a copy of 7# the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations under 15# the License. 16""" 17Python script for wrappers to various libraries. 18 19Class CmdInput inherts from the cmd library. 20 21Functions that start with "do_" have a method 22signature that doesn't match the actual command 23line command and that is intended. This is so the 24"help" command knows what to display (in this case 25the documentation of the command itself). 26 27For example: 28Looking at the function "do_tool_set_target_device_name" 29has the inputs self and line which is expected of this type 30of method signature. When the "help" command is done on the 31method name you get the function documentation as such: 32 33(Cmd) help tool_set_target_device_name 34 35 Description: Reset the target device name. 36 Input(s): 37 device_name: Required. The advertising name to connect to. 38 Usage: tool_set_target_device_name new_target_device name 39 Examples: 40 tool_set_target_device_name le_watch 41 42This is all to say this documentation pattern is expected. 43 44""" 45 46from acts_contrib.test_utils.abstract_devices.bluetooth_device import create_bluetooth_device 47from acts_contrib.test_utils.bt.bt_constants import bt_attribute_values 48from acts_contrib.test_utils.bt.bt_constants import sig_appearance_constants 49from acts_contrib.test_utils.bt.bt_constants import sig_uuid_constants 50from acts_contrib.test_utils.fuchsia.sdp_records import sdp_pts_record_list 51 52import acts_contrib.test_utils.bt.gatt_test_database as gatt_test_database 53 54import cmd 55import pprint 56import time 57"""Various Global Strings""" 58BASE_UUID = sig_uuid_constants['BASE_UUID'] 59CMD_LOG = "CMD {} result: {}" 60FAILURE = "CMD {} threw exception: {}" 61BASIC_ADV_NAME = "fs_test" 62 63 64class CommandInput(cmd.Cmd): 65 ble_adv_interval = 1000 66 ble_adv_appearance = None 67 ble_adv_data_include_tx_power_level = False 68 ble_adv_include_name = True 69 ble_adv_include_scan_response = False 70 ble_adv_name = "fs_test" 71 ble_adv_data_manufacturer_data = None 72 ble_adv_data_service_data = None 73 ble_adv_data_service_uuid_list = None 74 ble_adv_data_uris = None 75 76 bt_control_ids = [] 77 bt_control_names = [] 78 bt_control_devices = [] 79 bt_scan_poll_timer = 0.5 80 target_device_name = "" 81 le_ids = [] 82 unique_mac_addr_id = None 83 84 def setup_vars(self, dut, target_device_name, log): 85 self.pri_dut = dut 86 # Note: test_dut is the start of a slow conversion from a Fuchsia specific 87 # Tool to an abstract_device tool. Only commands that use test_dut will work 88 # Otherwise this tool is primarially targeted at Fuchsia devices. 89 self.test_dut = create_bluetooth_device(self.pri_dut) 90 self.test_dut.initialize_bluetooth_controller() 91 self.target_device_name = target_device_name 92 self.log = log 93 94 def emptyline(self): 95 pass 96 97 def do_EOF(self, line): 98 "End Script" 99 return True 100 101 """ Useful Helper functions and cmd line tooling """ 102 103 def str_to_bool(self, s): 104 if s.lower() == 'true': 105 return True 106 elif s.lower() == 'false': 107 return False 108 109 def _find_unique_id_over_le(self): 110 scan_filter = {"name_substring": self.target_device_name} 111 self.unique_mac_addr_id = None 112 self.pri_dut.gattc_lib.bleStartBleScan(scan_filter) 113 tries = 10 114 for i in range(tries): 115 time.sleep(self.bt_scan_poll_timer) 116 scan_res = self.pri_dut.gattc_lib.bleGetDiscoveredDevices( 117 )['result'] 118 for device in scan_res: 119 name, did, connectable = device["name"], device["id"], device[ 120 "connectable"] 121 if (self.target_device_name in name): 122 self.unique_mac_addr_id = did 123 self.log.info( 124 "Successfully found device: name, id: {}, {}".format( 125 name, did)) 126 break 127 if self.unique_mac_addr_id: 128 break 129 self.pri_dut.gattc_lib.bleStopBleScan() 130 131 def _find_unique_id_over_bt_control(self): 132 self.unique_mac_addr_id = None 133 self.bt_control_devices = [] 134 self.pri_dut.bts_lib.requestDiscovery(True) 135 tries = 10 136 for i in range(tries): 137 if self.unique_mac_addr_id: 138 break 139 time.sleep(self.bt_scan_poll_timer) 140 device_list = self.pri_dut.bts_lib.getKnownRemoteDevices( 141 )['result'] 142 for id_dict in device_list: 143 device = device_list[id_dict] 144 self.bt_control_devices.append(device) 145 name = None 146 if device['name'] is not None: 147 name = device['name'] 148 did, address = device['id'], device['address'] 149 150 self.bt_control_ids.append(did) 151 if name is not None: 152 self.bt_control_names.append(name) 153 if self.target_device_name in name: 154 self.unique_mac_addr_id = did 155 self.log.info( 156 "Successfully found device: name, id, address: {}, {}, {}" 157 .format(name, did, address)) 158 break 159 self.pri_dut.bts_lib.requestDiscovery(False) 160 161 def do_tool_take_bt_snoop_log(self, custom_name): 162 """ 163 Description: Takes the bt snoop log from the Fuchsia device. 164 Logs will show up in your config files' logpath directory. 165 166 Input(s): 167 custom_name: Optional. Override the default pcap file name. 168 169 Usage: tool_set_target_device_name new_target_device name 170 Examples: 171 tool_take_bt_snoop_log connection_error 172 tool_take_bt_snoop_log 173 """ 174 self.pri_dut.take_bt_snoop_log(custom_name) 175 176 def do_tool_refresh_unique_id(self, line): 177 """ 178 Description: Refresh command line tool mac unique id. 179 Usage: 180 Examples: 181 tool_refresh_unique_id 182 """ 183 try: 184 self._find_unique_id_over_le() 185 except Exception as err: 186 self.log.error( 187 "Failed to scan or find scan result: {}".format(err)) 188 189 def do_tool_refresh_unique_id_using_bt_control(self, line): 190 """ 191 Description: Refresh command line tool mac unique id. 192 Usage: 193 Examples: 194 tool_refresh_unique_id_using_bt_control 195 """ 196 try: 197 self._find_unique_id_over_bt_control() 198 except Exception as err: 199 self.log.error( 200 "Failed to scan or find scan result: {}".format(err)) 201 202 def do_tool_set_target_device_name(self, line): 203 """ 204 Description: Reset the target device name. 205 Input(s): 206 device_name: Required. The advertising name to connect to. 207 Usage: tool_set_target_device_name new_target_device name 208 Examples: 209 tool_set_target_device_name le_watch 210 """ 211 self.log.info("Setting target_device_name to: {}".format(line)) 212 self.target_device_name = line 213 214 def do_tool_set_unique_mac_addr_id(self, line): 215 """ 216 Description: Sets the unique mac address id (Specific to Fuchsia) 217 Input(s): 218 device_id: Required. The id to set the unique mac address id to 219 Usage: tool_set_unique_mac_addr_id device_id 220 Examples: 221 tool_set_unique_mac_addr_id 7fb2cae53aad9e0d 222 """ 223 self.unique_mac_addr_id = line 224 225 """Begin BLE advertise wrappers""" 226 227 def complete_ble_adv_data_include_name(self, text, line, begidx, endidx): 228 roles = ["true", "false"] 229 if not text: 230 completions = roles 231 else: 232 completions = [s for s in roles if s.startswith(text)] 233 return completions 234 235 def do_ble_adv_data_include_name(self, line): 236 cmd = "Include name in the advertisement." 237 try: 238 self.ble_adv_include_name = self.str_to_bool(line) 239 except Exception as err: 240 self.log.error(FAILURE.format(cmd, err)) 241 242 def do_ble_adv_data_set_name(self, line): 243 cmd = "Set the name to be included in the advertisement." 244 try: 245 self.ble_adv_name = line 246 except Exception as err: 247 self.log.error(FAILURE.format(cmd, err)) 248 249 def complete_ble_adv_data_set_appearance(self, text, line, begidx, endidx): 250 if not text: 251 completions = list(sig_appearance_constants.keys()) 252 else: 253 completions = [ 254 s for s in sig_appearance_constants.keys() 255 if s.startswith(text) 256 ] 257 return completions 258 259 def do_ble_adv_data_set_appearance(self, line): 260 cmd = "Set the appearance to known SIG values." 261 try: 262 self.ble_adv_appearance = line 263 except Exception as err: 264 self.log.error(FAILURE.format(cmd, err)) 265 266 def complete_ble_adv_data_include_tx_power_level(self, text, line, begidx, 267 endidx): 268 options = ['true', 'false'] 269 if not text: 270 completions = list(options)[:] 271 else: 272 completions = [s for s in options if s.startswith(text)] 273 return completions 274 275 def do_ble_adv_data_include_tx_power_level(self, line): 276 """Include the tx_power_level in the advertising data. 277 Description: Adds tx_power_level to the advertisement data to the BLE 278 advertisement. 279 Input(s): 280 value: Required. True or False 281 Usage: ble_adv_data_include_tx_power_level bool_value 282 Examples: 283 ble_adv_data_include_tx_power_level true 284 ble_adv_data_include_tx_power_level false 285 """ 286 cmd = "Include tx_power_level in advertisement." 287 try: 288 self.ble_adv_data_include_tx_power_level = self.str_to_bool(line) 289 except Exception as err: 290 self.log.info(FAILURE.format(cmd, err)) 291 292 def complete_ble_adv_include_scan_response(self, text, line, begidx, 293 endidx): 294 options = ['true', 'false'] 295 if not text: 296 completions = list(options)[:] 297 else: 298 completions = [s for s in options if s.startswith(text)] 299 return completions 300 301 def do_ble_adv_include_scan_response(self, line): 302 """Include scan response in advertisement. inputs: [true|false] 303 Note: Currently just sets the scan response data to the 304 Advertisement data. 305 """ 306 cmd = "Include tx_power_level in advertisement." 307 try: 308 self.ble_adv_include_scan_response = self.str_to_bool(line) 309 except Exception as err: 310 self.log.info(FAILURE.format(cmd, err)) 311 312 def do_ble_adv_data_add_manufacturer_data(self, line): 313 """Include manufacturer id and data to the advertisment 314 Description: Adds manufacturer data to the BLE advertisement. 315 Input(s): 316 id: Required. The int representing the manufacturer id. 317 data: Required. The string representing the data. 318 Usage: ble_adv_data_add_manufacturer_data id data 319 Examples: 320 ble_adv_data_add_manufacturer_data 1 test 321 """ 322 cmd = "Include manufacturer id and data to the advertisment." 323 try: 324 325 info = line.split() 326 if self.ble_adv_data_manufacturer_data is None: 327 self.ble_adv_data_manufacturer_data = [] 328 self.ble_adv_data_manufacturer_data.append({ 329 "id": int(info[0]), 330 "data": info[1] 331 }) 332 except Exception as err: 333 self.log.info(FAILURE.format(cmd, err)) 334 335 def do_ble_adv_data_add_service_data(self, line): 336 """Include service data to the advertisment 337 Description: Adds service data to the BLE advertisement. 338 Input(s): 339 uuid: Required. The string representing the uuid. 340 data: Required. The string representing the data. 341 Usage: ble_adv_data_add_service_data uuid data 342 Examples: 343 ble_adv_data_add_service_data 00001801-0000-1000-8000-00805f9b34fb test 344 """ 345 cmd = "Include manufacturer id and data to the advertisment." 346 try: 347 info = line.split() 348 if self.ble_adv_data_service_data is None: 349 self.ble_adv_data_service_data = [] 350 self.ble_adv_data_service_data.append({ 351 "uuid": info[0], 352 "data": info[1] 353 }) 354 except Exception as err: 355 self.log.info(FAILURE.format(cmd, err)) 356 357 def do_ble_adv_add_service_uuid_list(self, line): 358 """Include a list of service uuids to the advertisment: 359 Description: Adds service uuid list to the BLE advertisement. 360 Input(s): 361 uuid: Required. A list of N string UUIDs to add. 362 Usage: ble_adv_add_service_uuid_list uuid0 uuid1 ... uuidN 363 Examples: 364 ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb 365 ble_adv_add_service_uuid_list 00001801-0000-1000-8000-00805f9b34fb 00001802-0000-1000-8000-00805f9b34fb 366 """ 367 cmd = "Include service uuid list to the advertisment data." 368 try: 369 self.ble_adv_data_service_uuid_list = line 370 except Exception as err: 371 self.log.info(FAILURE.format(cmd, err)) 372 373 def do_ble_adv_data_set_uris(self, uris): 374 """Set the URIs of the LE advertisement data: 375 Description: Adds list of String UIRs 376 See (RFC 3986 1.1.2 https://tools.ietf.org/html/rfc3986) 377 Valid URI examples: 378 ftp://ftp.is.co.za/rfc/rfc1808.txt 379 http://www.ietf.org/rfc/rfc2396.txt 380 ldap://[2001:db8::7]/c=GB?objectClass?one 381 mailto:John.Doe@example.com 382 news:comp.infosystems.www.servers.unix 383 tel:+1-816-555-1212 384 telnet://192.0.2.16:80/ 385 urn:oasis:names:specification:docbook:dtd:xml:4.1.2 386 Input(s): 387 uris: Required. A list of URIs to add. 388 Usage: ble_adv_data_set_uris uri0 uri1 ... uriN 389 Examples: 390 ble_adv_data_set_uris telnet://192.0.2.16:80/ 391 ble_adv_data_set_uris tel:+1-816-555-1212 392 """ 393 cmd = "Set the appearance to known SIG values." 394 try: 395 self.ble_adv_data_uris = uris.split() 396 except Exception as err: 397 self.log.error(FAILURE.format(cmd, err)) 398 399 def start_advertisement(self, connectable): 400 """ Handle setting advertising data and the advertisement 401 Note: After advertisement is successful, clears values set for 402 * Manufacturer data 403 * Appearance information 404 * Scan Response 405 * Service UUIDs 406 * URI list 407 Args: 408 connectable: Bool of whether to start a connectable 409 advertisement or not. 410 """ 411 adv_data_name = self.ble_adv_name 412 if not self.ble_adv_include_name: 413 adv_data_name = None 414 415 manufacturer_data = self.ble_adv_data_manufacturer_data 416 417 tx_power_level = None 418 if self.ble_adv_data_include_tx_power_level: 419 tx_power_level = 1 # Not yet implemented so set to 1 420 421 scan_response = self.ble_adv_include_scan_response 422 423 adv_data = { 424 "name": adv_data_name, 425 "appearance": self.ble_adv_appearance, 426 "service_data": self.ble_adv_data_service_data, 427 "tx_power_level": tx_power_level, 428 "service_uuids": self.ble_adv_data_service_uuid_list, 429 "manufacturer_data": manufacturer_data, 430 "uris": self.ble_adv_data_uris, 431 } 432 433 if not self.ble_adv_include_scan_response: 434 scan_response = None 435 else: 436 scan_response = adv_data 437 438 result = self.pri_dut.ble_lib.bleStartBleAdvertising( 439 adv_data, scan_response, self.ble_adv_interval, connectable) 440 self.log.info("Result of starting advertisement: {}".format(result)) 441 self.ble_adv_data_manufacturer_data = None 442 self.ble_adv_appearance = None 443 self.ble_adv_include_scan_response = False 444 self.ble_adv_data_service_uuid_list = None 445 self.ble_adv_data_uris = None 446 self.ble_adv_data_service_data = None 447 448 def do_ble_start_generic_connectable_advertisement(self, line): 449 """ 450 Description: Start a connectable LE advertisement 451 452 Usage: ble_start_generic_connectable_advertisement 453 """ 454 cmd = "Start a connectable LE advertisement" 455 try: 456 connectable = True 457 self.start_advertisement(connectable) 458 except Exception as err: 459 self.log.error(FAILURE.format(cmd, err)) 460 461 def do_ble_start_generic_nonconnectable_advertisement(self, line): 462 """ 463 Description: Start a non-connectable LE advertisement 464 465 Usage: ble_start_generic_nonconnectable_advertisement 466 """ 467 cmd = "Start a nonconnectable LE advertisement" 468 try: 469 connectable = False 470 self.start_advertisement(connectable) 471 except Exception as err: 472 self.log.error(FAILURE.format(cmd, err)) 473 474 def do_ble_stop_advertisement(self, line): 475 """ 476 Description: Stop a BLE advertisement. 477 Usage: ble_stop_advertisement 478 """ 479 cmd = "Stop a connectable LE advertisement" 480 try: 481 self.pri_dut.ble_lib.bleStopBleAdvertising() 482 except Exception as err: 483 self.log.error(FAILURE.format(cmd, err)) 484 485 """End BLE advertise wrappers""" 486 """Begin GATT client wrappers""" 487 488 def complete_gattc_connect_by_id(self, text, line, begidx, endidx): 489 if not text: 490 completions = list(self.le_ids)[:] 491 else: 492 completions = [s for s in self.le_ids if s.startswith(text)] 493 return completions 494 495 def do_gattc_connect_by_id(self, line): 496 """ 497 Description: Connect to a LE peripheral. 498 Input(s): 499 device_id: Required. The unique device ID from Fuchsia 500 discovered devices. 501 Usage: 502 Examples: 503 gattc_connect device_id 504 """ 505 cmd = "Connect to a LE peripheral by input ID." 506 try: 507 508 connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral( 509 line) 510 self.log.info("Connection status: {}".format( 511 pprint.pformat(connection_status))) 512 except Exception as err: 513 self.log.error(FAILURE.format(cmd, err)) 514 515 def do_gattc_connect(self, line): 516 """ 517 Description: Connect to a LE peripheral. 518 Optional input: device_name 519 Input(s): 520 device_name: Optional. The peripheral ID to connect to. 521 Usage: 522 Examples: 523 gattc_connect 524 gattc_connect eddystone_123 525 """ 526 cmd = "Connect to a LE peripheral." 527 try: 528 if len(line) > 0: 529 self.target_device_name = line 530 self.unique_mac_addr_id = None 531 if not self.unique_mac_addr_id: 532 try: 533 self._find_unique_id() 534 except Exception as err: 535 self.log.info("Failed to scan or find device.") 536 return 537 connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral( 538 self.unique_mac_addr_id) 539 self.log.info("Connection status: {}".format( 540 pprint.pformat(connection_status))) 541 except Exception as err: 542 self.log.error(FAILURE.format(cmd, err)) 543 544 def do_gattc_connect_disconnect_iterations(self, line): 545 """ 546 Description: Connect then disconnect to a LE peripheral multiple times. 547 Input(s): 548 iterations: Required. The number of iterations to run. 549 Usage: 550 Examples: 551 gattc_connect_disconnect_iterations 10 552 """ 553 cmd = "Connect to a LE peripheral." 554 try: 555 if not self.unique_mac_addr_id: 556 try: 557 self._find_unique_id() 558 except Exception as err: 559 self.log.info("Failed to scan or find device.") 560 return 561 for i in range(int(line)): 562 self.log.info("Running iteration {}".format(i + 1)) 563 connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral( 564 self.unique_mac_addr_id) 565 self.log.info("Connection status: {}".format( 566 pprint.pformat(connection_status))) 567 time.sleep(4) 568 disc_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral( 569 self.unique_mac_addr_id) 570 self.log.info("Disconnect status: {}".format(disc_status)) 571 time.sleep(3) 572 except Exception as err: 573 self.log.error(FAILURE.format(cmd, err)) 574 575 def do_gattc_disconnect(self, line): 576 """ 577 Description: Disconnect from LE peripheral. 578 Assumptions: Already connected to a peripheral. 579 Usage: 580 Examples: 581 gattc_disconnect 582 """ 583 cmd = "Disconenct from LE peripheral." 584 try: 585 disconnect_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral( 586 self.unique_mac_addr_id) 587 self.log.info("Disconnect status: {}".format(disconnect_status)) 588 except Exception as err: 589 self.log.error(FAILURE.format(cmd, err)) 590 591 def do_gattc_list_services(self, discover_chars): 592 """ 593 Description: List services from LE peripheral. 594 Assumptions: Already connected to a peripheral. 595 Input(s): 596 discover_chars: Optional. An optional input to discover all 597 characteristics on the service. 598 Usage: 599 Examples: 600 gattc_list_services 601 gattc_list_services true 602 """ 603 cmd = "List services from LE peripheral." 604 try: 605 606 services = self.pri_dut.gattc_lib.listServices( 607 self.unique_mac_addr_id) 608 self.log.info("Discovered Services: \n{}".format( 609 pprint.pformat(services))) 610 discover_characteristics = self.str_to_bool(discover_chars) 611 if discover_chars: 612 for service in services.get('result'): 613 self.pri_dut.gattc_lib.connectToService( 614 self.unique_mac_addr_id, service.get('id')) 615 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 616 self.log.info("Discovered chars:\n{}".format( 617 pprint.pformat(chars))) 618 619 except Exception as err: 620 self.log.error(FAILURE.format(cmd, err)) 621 622 def do_gattc_connect_to_service(self, line): 623 """ 624 Description: Connect to Peripheral GATT server service. 625 Assumptions: Already connected to peripheral. 626 Input(s): 627 service_id: Required. The service id reference on the GATT server. 628 Usage: 629 Examples: 630 gattc_connect_to_service service_id 631 """ 632 cmd = "GATT client connect to GATT server service." 633 try: 634 self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id, 635 int(line)) 636 except Exception as err: 637 self.log.error(FAILURE.format(cmd, err)) 638 639 def do_gattc_discover_characteristics(self, line): 640 """ 641 Description: Discover characteristics from a connected service. 642 Assumptions: Already connected to a GATT server service. 643 Usage: 644 Examples: 645 gattc_discover_characteristics 646 """ 647 cmd = "Discover and list characteristics from a GATT server." 648 try: 649 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 650 self.log.info("Discovered chars:\n{}".format( 651 pprint.pformat(chars))) 652 except Exception as err: 653 self.log.error(FAILURE.format(cmd, err)) 654 655 def do_gattc_notify_all_chars(self, line): 656 """ 657 Description: Enable all notifications on all Characteristics on 658 a GATT server. 659 Assumptions: Basic GATT connection made. 660 Usage: 661 Examples: 662 gattc_notify_all_chars 663 """ 664 cmd = "Read all characteristics from the GATT service." 665 try: 666 services = self.pri_dut.gattc_lib.listServices( 667 self.unique_mac_addr_id) 668 for service in services['result']: 669 service_id = service['id'] 670 service_uuid = service['uuid_type'] 671 self.pri_dut.gattc_lib.connectToService( 672 self.unique_mac_addr_id, service_id) 673 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 674 print("Reading chars in service uuid: {}".format(service_uuid)) 675 676 for char in chars['result']: 677 char_id = char['id'] 678 char_uuid = char['uuid_type'] 679 # quick char filter for apple-4 test... remove later 680 print("found uuid {}".format(char_uuid)) 681 try: 682 self.pri_dut.gattc_lib.enableNotifyCharacteristic( 683 char_id) 684 except Exception as err: 685 print("error enabling notification") 686 except Exception as err: 687 self.log.error(FAILURE.format(cmd, err)) 688 689 def do_gattc_read_all_chars(self, line): 690 """ 691 Description: Read all Characteristic values from a GATT server across 692 all services. 693 Assumptions: Basic GATT connection made. 694 Usage: 695 Examples: 696 gattc_read_all_chars 697 """ 698 cmd = "Read all characteristics from the GATT service." 699 try: 700 services = self.pri_dut.gattc_lib.listServices( 701 self.unique_mac_addr_id) 702 for service in services['result']: 703 service_id = service['id'] 704 service_uuid = service['uuid_type'] 705 self.pri_dut.gattc_lib.connectToService( 706 self.unique_mac_addr_id, service_id) 707 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 708 print("Reading chars in service uuid: {}".format(service_uuid)) 709 710 for char in chars['result']: 711 char_id = char['id'] 712 char_uuid = char['uuid_type'] 713 try: 714 read_val = \ 715 self.pri_dut.gattc_lib.readCharacteristicById( 716 char_id) 717 print(" Characteristic uuid / Value: {} / {}".format( 718 char_uuid, read_val['result'])) 719 str_value = "" 720 for val in read_val['result']: 721 str_value += chr(val) 722 print(" str val: {}".format(str_value)) 723 except Exception as err: 724 print(err) 725 pass 726 except Exception as err: 727 self.log.error(FAILURE.format(cmd, err)) 728 729 def do_gattc_read_all_desc(self, line): 730 """ 731 Description: Read all Descriptors values from a GATT server across 732 all services. 733 Assumptions: Basic GATT connection made. 734 Usage: 735 Examples: 736 gattc_read_all_chars 737 """ 738 cmd = "Read all descriptors from the GATT service." 739 try: 740 services = self.pri_dut.gattc_lib.listServices( 741 self.unique_mac_addr_id) 742 for service in services['result']: 743 service_id = service['id'] 744 service_uuid = service['uuid_type'] 745 self.pri_dut.gattc_lib.connectToService( 746 self.unique_mac_addr_id, service_id) 747 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 748 print("Reading descs in service uuid: {}".format(service_uuid)) 749 750 for char in chars['result']: 751 char_id = char['id'] 752 char_uuid = char['uuid_type'] 753 descriptors = char['descriptors'] 754 print(" Reading descs in char uuid: {}".format(char_uuid)) 755 for desc in descriptors: 756 desc_id = desc["id"] 757 desc_uuid = desc["uuid_type"] 758 try: 759 read_val = self.pri_dut.gattc_lib.readDescriptorById( 760 desc_id) 761 print(" Descriptor uuid / Value: {} / {}".format( 762 desc_uuid, read_val['result'])) 763 except Exception as err: 764 pass 765 except Exception as err: 766 self.log.error(FAILURE.format(cmd, err)) 767 768 def do_gattc_write_all_desc(self, line): 769 """ 770 Description: Write a value to all Descriptors on the GATT server. 771 Assumptions: Basic GATT connection made. 772 Input(s): 773 offset: Required. The offset to start writing to. 774 size: Required. The size of bytes to write (value will be generated). 775 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 776 Usage: 777 Examples: 778 gattc_write_all_desc 0 100 779 gattc_write_all_desc 10 2 780 """ 781 cmd = "Read all descriptors from the GATT service." 782 try: 783 args = line.split() 784 if len(args) != 2: 785 self.log.info("2 Arguments required: [Offset] [Size]") 786 return 787 offset = int(args[0]) 788 size = args[1] 789 write_value = [] 790 for i in range(int(size)): 791 write_value.append(i % 256) 792 services = self.pri_dut.gattc_lib.listServices( 793 self.unique_mac_addr_id) 794 for service in services['result']: 795 service_id = service['id'] 796 service_uuid = service['uuid_type'] 797 self.pri_dut.gattc_lib.connectToService( 798 self.unique_mac_addr_id, service_id) 799 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 800 print("Writing descs in service uuid: {}".format(service_uuid)) 801 802 for char in chars['result']: 803 char_id = char['id'] 804 char_uuid = char['uuid_type'] 805 descriptors = char['descriptors'] 806 print(" Reading descs in char uuid: {}".format(char_uuid)) 807 for desc in descriptors: 808 desc_id = desc["id"] 809 desc_uuid = desc["uuid_type"] 810 try: 811 write_val = self.pri_dut.gattc_lib.writeDescriptorById( 812 desc_id, offset, write_value) 813 print(" Descriptor uuid / Result: {} / {}".format( 814 desc_uuid, write_val['result'])) 815 except Exception as err: 816 pass 817 except Exception as err: 818 self.log.error(FAILURE.format(cmd, err)) 819 820 def do_gattc_read_all_long_desc(self, line): 821 """ 822 Description: Read all long Characteristic Descriptors 823 Assumptions: Basic GATT connection made. 824 Input(s): 825 offset: Required. The offset to start reading from. 826 max_bytes: Required. The max size of bytes to return. 827 Usage: 828 Examples: 829 gattc_read_all_long_desc 0 100 830 gattc_read_all_long_desc 10 20 831 """ 832 cmd = "Read all long descriptors from the GATT service." 833 try: 834 args = line.split() 835 if len(args) != 2: 836 self.log.info("2 Arguments required: [Offset] [Size]") 837 return 838 offset = int(args[0]) 839 max_bytes = int(args[1]) 840 services = self.pri_dut.ble_lib.bleListServices( 841 self.unique_mac_addr_id) 842 for service in services['result']: 843 service_id = service['id'] 844 service_uuid = service['uuid_type'] 845 self.pri_dut.gattc_lib.connectToService( 846 self.unique_mac_addr_id, service_id) 847 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 848 print("Reading descs in service uuid: {}".format(service_uuid)) 849 850 for char in chars['result']: 851 char_id = char['id'] 852 char_uuid = char['uuid_type'] 853 descriptors = char['descriptors'] 854 print(" Reading descs in char uuid: {}".format(char_uuid)) 855 for desc in descriptors: 856 desc_id = desc["id"] 857 desc_uuid = desc["uuid_type"] 858 try: 859 read_val = self.pri_dut.gattc_lib.readLongDescriptorById( 860 desc_id, offset, max_bytes) 861 print(" Descriptor uuid / Result: {} / {}".format( 862 desc_uuid, read_val['result'])) 863 except Exception as err: 864 pass 865 except Exception as err: 866 self.log.error(FAILURE.format(cmd, err)) 867 868 def do_gattc_read_all_long_char(self, line): 869 """ 870 Description: Read all long Characteristic 871 Assumptions: Basic GATT connection made. 872 Input(s): 873 offset: Required. The offset to start reading from. 874 max_bytes: Required. The max size of bytes to return. 875 Usage: 876 Examples: 877 gattc_read_all_long_char 0 100 878 gattc_read_all_long_char 10 20 879 """ 880 cmd = "Read all long Characteristics from the GATT service." 881 try: 882 args = line.split() 883 if len(args) != 2: 884 self.log.info("2 Arguments required: [Offset] [Size]") 885 return 886 offset = int(args[0]) 887 max_bytes = int(args[1]) 888 services = self.pri_dut.ble_lib.bleListServices( 889 self.unique_mac_addr_id) 890 for service in services['result']: 891 service_id = service['id'] 892 service_uuid = service['uuid_type'] 893 self.pri_dut.gattc_lib.connectToService( 894 self.unique_mac_addr_id, service_id) 895 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 896 print("Reading chars in service uuid: {}".format(service_uuid)) 897 898 for char in chars['result']: 899 char_id = char['id'] 900 char_uuid = char['uuid_type'] 901 try: 902 read_val = self.pri_dut.gattc_lib.readLongCharacteristicById( 903 char_id, offset, max_bytes) 904 print(" Char uuid / Result: {} / {}".format( 905 char_uuid, read_val['result'])) 906 except Exception as err: 907 pass 908 except Exception as err: 909 self.log.error(FAILURE.format(cmd, err)) 910 911 def do_gattc_write_all_chars(self, line): 912 """ 913 Description: Write all characteristic values from a GATT server across 914 all services. 915 Assumptions: Basic GATT connection made. 916 Input(s): 917 offset: Required. The offset to start writing on. 918 size: The write value size (value will be generated) 919 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 920 Usage: 921 Examples: 922 gattc_write_all_chars 0 10 923 gattc_write_all_chars 10 1 924 """ 925 cmd = "Read all characteristics from the GATT service." 926 try: 927 args = line.split() 928 if len(args) != 2: 929 self.log.info("2 Arguments required: [Offset] [Size]") 930 return 931 offset = int(args[0]) 932 size = int(args[1]) 933 write_value = [] 934 for i in range(size): 935 write_value.append(i % 256) 936 services = self.pri_dut.gattc_lib.listServices( 937 self.unique_mac_addr_id) 938 for service in services['result']: 939 service_id = service['id'] 940 service_uuid = service['uuid_type'] 941 self.pri_dut.gattc_lib.connectToService( 942 self.unique_mac_addr_id, service_id) 943 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 944 print("Writing chars in service uuid: {}".format(service_uuid)) 945 946 for char in chars['result']: 947 char_id = char['id'] 948 char_uuid = char['uuid_type'] 949 try: 950 write_result = self.pri_dut.gattc_lib.writeCharById( 951 char_id, offset, write_value) 952 print(" Characteristic uuid write result: {} / {}". 953 format(char_uuid, write_result['result'])) 954 except Exception as err: 955 print("error writing char {}".format(err)) 956 pass 957 except Exception as err: 958 self.log.error(FAILURE.format(cmd, err)) 959 960 def do_gattc_write_all_chars_without_response(self, line): 961 """ 962 Description: Write all characteristic values from a GATT server across 963 all services. 964 Assumptions: Basic GATT connection made. 965 Input(s): 966 size: The write value size (value will be generated). 967 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 968 Usage: 969 Examples: 970 gattc_write_all_chars_without_response 100 971 """ 972 cmd = "Read all characteristics from the GATT service." 973 try: 974 args = line.split() 975 if len(args) != 1: 976 self.log.info("1 Arguments required: [Size]") 977 return 978 size = int(args[0]) 979 write_value = [] 980 for i in range(size): 981 write_value.append(i % 256) 982 services = self.pri_dut.gattc_lib.listServices( 983 self.unique_mac_addr_id) 984 for service in services['result']: 985 service_id = service['id'] 986 service_uuid = service['uuid_type'] 987 self.pri_dut.gattc_lib.connectToService( 988 self.unique_mac_addr_id, service_id) 989 chars = self.pri_dut.gattc_lib.discoverCharacteristics() 990 print("Reading chars in service uuid: {}".format(service_uuid)) 991 992 for char in chars['result']: 993 char_id = char['id'] 994 char_uuid = char['uuid_type'] 995 try: 996 write_result = \ 997 self.pri_dut.gattc_lib.writeCharByIdWithoutResponse( 998 char_id, write_value) 999 print(" Characteristic uuid write result: {} / {}". 1000 format(char_uuid, write_result['result'])) 1001 except Exception as err: 1002 pass 1003 except Exception as err: 1004 self.log.error(FAILURE.format(cmd, err)) 1005 1006 def do_gattc_write_char_by_id(self, line): 1007 """ 1008 Description: Write char by characteristic id reference. 1009 Assumptions: Already connected to a GATT server service. 1010 Input(s): 1011 characteristic_id: The characteristic id reference on the GATT 1012 service 1013 offset: The offset value to use 1014 size: Function will generate random bytes by input size. 1015 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1016 Usage: 1017 Examples: 1018 gattc_write_char_by_id char_id 0 5 1019 gattc_write_char_by_id char_id 20 1 1020 """ 1021 cmd = "Write to GATT server characteristic ." 1022 try: 1023 args = line.split() 1024 if len(args) != 3: 1025 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1026 return 1027 id = int(args[0], 16) 1028 offset = int(args[1]) 1029 size = int(args[2]) 1030 write_value = [] 1031 for i in range(size): 1032 write_value.append(i % 256) 1033 self.test_dut.gatt_client_write_characteristic_by_handle( 1034 self.unique_mac_addr_id, id, offset, write_value) 1035 except Exception as err: 1036 self.log.error(FAILURE.format(cmd, err)) 1037 1038 def do_gattc_write_long_char_by_id(self, line): 1039 """ 1040 Description: Write long char by characteristic id reference. 1041 Assumptions: Already connected to a GATT server service. 1042 Input(s): 1043 characteristic_id: The characteristic id reference on the GATT 1044 service 1045 offset: The offset value to use 1046 size: Function will generate random bytes by input size. 1047 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1048 reliable_mode: Optional: Reliable writes represented as bool 1049 Usage: 1050 Examples: 1051 gattc_write_long_char_by_id char_id 0 5 1052 gattc_write_long_char_by_id char_id 20 1 1053 gattc_write_long_char_by_id char_id 20 1 true 1054 gattc_write_long_char_by_id char_id 20 1 false 1055 """ 1056 cmd = "Long Write to GATT server characteristic ." 1057 try: 1058 args = line.split() 1059 if len(args) < 3: 1060 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1061 return 1062 id = int(args[0], 16) 1063 offset = int(args[1]) 1064 size = int(args[2]) 1065 reliable_mode = False 1066 if len(args) > 3: 1067 reliable_mode = self.str_to_bool(args[3]) 1068 write_value = [] 1069 for i in range(size): 1070 write_value.append(i % 256) 1071 self.test_dut.gatt_client_write_long_characteristic_by_handle( 1072 self.unique_mac_addr_id, id, offset, write_value, 1073 reliable_mode) 1074 except Exception as err: 1075 self.log.error(FAILURE.format(cmd, err)) 1076 1077 def do_gattc_write_long_desc_by_id(self, line): 1078 """ 1079 Description: Write long char by descrioptor id reference. 1080 Assumptions: Already connected to a GATT server service. 1081 Input(s): 1082 characteristic_id: The characteristic id reference on the GATT 1083 service 1084 offset: The offset value to use 1085 size: Function will generate random bytes by input size. 1086 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1087 Usage: 1088 Examples: 1089 gattc_write_long_desc_by_id char_id 0 5 1090 gattc_write_long_desc_by_id char_id 20 1 1091 """ 1092 cmd = "Long Write to GATT server descriptor ." 1093 try: 1094 args = line.split() 1095 if len(args) != 3: 1096 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1097 return 1098 id = int(args[0], 16) 1099 offset = int(args[1]) 1100 size = int(args[2]) 1101 write_value = [] 1102 for i in range(size): 1103 write_value.append(i % 256) 1104 self.test_dut.gatt_client_write_long_descriptor_by_handle( 1105 self.unique_mac_addr_id, id, offset, write_value) 1106 except Exception as err: 1107 self.log.error(FAILURE.format(cmd, err)) 1108 1109 def do_gattc_write_char_by_id_without_response(self, line): 1110 """ 1111 Description: Write char by characteristic id reference without response. 1112 Assumptions: Already connected to a GATT server service. 1113 Input(s): 1114 characteristic_id: The characteristic id reference on the GATT 1115 service 1116 size: Function will generate random bytes by input size. 1117 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1118 Usage: 1119 Examples: 1120 gattc_write_char_by_id_without_response char_id 5 1121 """ 1122 cmd = "Write characteristic by id without response." 1123 try: 1124 args = line.split() 1125 if len(args) != 2: 1126 self.log.info("2 Arguments required: [Id] [Size]") 1127 return 1128 id = int(args[0], 16) 1129 size = args[1] 1130 write_value = [] 1131 for i in range(int(size)): 1132 write_value.append(i % 256) 1133 self.test_dut.gatt_client_write_characteristic_without_response_by_handle( 1134 self.unique_mac_addr_id, id, write_value) 1135 except Exception as err: 1136 self.log.error(FAILURE.format(cmd, err)) 1137 1138 def do_gattc_enable_notify_char_by_id(self, line): 1139 """ 1140 Description: Enable Characteristic notification on Characteristic ID. 1141 Assumptions: Already connected to a GATT server service. 1142 Input(s): 1143 characteristic_id: The characteristic id reference on the GATT 1144 service 1145 Usage: 1146 Examples: 1147 gattc_enable_notify_char_by_id char_id 1148 """ 1149 cmd = "Enable notifications by Characteristic id." 1150 try: 1151 id = int(line, 16) 1152 self.test_dut.gatt_client_enable_notifiy_characteristic_by_handle( 1153 self.unique_mac_addr_id, id) 1154 except Exception as err: 1155 self.log.error(FAILURE.format(cmd, err)) 1156 1157 def do_gattc_disable_notify_char_by_id(self, line): 1158 """ 1159 Description: Disable Characteristic notification on Characteristic ID. 1160 Assumptions: Already connected to a GATT server service. 1161 Input(s): 1162 characteristic_id: The characteristic id reference on the GATT 1163 service 1164 Usage: 1165 Examples: 1166 gattc_disable_notify_char_by_id char_id 1167 """ 1168 cmd = "Disable notify Characteristic by id." 1169 try: 1170 id = int(line, 16) 1171 self.test_dut.gatt_client_disable_notifiy_characteristic_by_handle( 1172 self.unique_mac_addr_id, id) 1173 except Exception as err: 1174 self.log.error(FAILURE.format(cmd, err)) 1175 1176 def do_gattc_read_char_by_id(self, line): 1177 """ 1178 Description: Read Characteristic by ID. 1179 Assumptions: Already connected to a GATT server service. 1180 Input(s): 1181 characteristic_id: The characteristic id reference on the GATT 1182 service 1183 Usage: 1184 Examples: 1185 gattc_read_char_by_id char_id 1186 """ 1187 cmd = "Read Characteristic value by ID." 1188 try: 1189 id = int(line, 16) 1190 read_val = self.test_dut.gatt_client_read_characteristic_by_handle( 1191 self.unique_mac_addr_id, id) 1192 self.log.info("Characteristic Value with id {}: {}".format( 1193 id, read_val)) 1194 except Exception as err: 1195 self.log.error(FAILURE.format(cmd, err)) 1196 1197 def do_gattc_read_char_by_uuid(self, characteristic_uuid): 1198 """ 1199 Description: Read Characteristic by UUID (read by type). 1200 Assumptions: Already connected to a GATT server service. 1201 Input(s): 1202 characteristic_uuid: The characteristic id reference on the GATT 1203 service 1204 Usage: 1205 Examples: 1206 gattc_read_char_by_id char_id 1207 """ 1208 cmd = "Read Characteristic value by ID." 1209 try: 1210 short_uuid_len = 4 1211 if len(characteristic_uuid) == short_uuid_len: 1212 characteristic_uuid = BASE_UUID.format(characteristic_uuid) 1213 1214 read_val = self.test_dut.gatt_client_read_characteristic_by_uuid( 1215 self.unique_mac_addr_id, characteristic_uuid) 1216 self.log.info("Characteristic Value with id {}: {}".format( 1217 id, read_val)) 1218 except Exception as err: 1219 self.log.error(FAILURE.format(cmd, err)) 1220 1221 def do_gattc_write_desc_by_id(self, line): 1222 """ 1223 Description: Write Descriptor by characteristic id reference. 1224 Assumptions: Already connected to a GATT server service. 1225 Input(s): 1226 descriptor_id: The Descriptor id reference on the GATT service 1227 offset: The offset value to use 1228 size: Function will generate random bytes by input size. 1229 IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04] 1230 Usage: 1231 Examples: 1232 gattc_write_desc_by_id desc_id 0 5 1233 gattc_write_desc_by_id desc_id 20 1 1234 """ 1235 cmd = "Write Descriptor by id." 1236 try: 1237 args = line.split() 1238 id = int(args[0], 16) 1239 offset = int(args[1]) 1240 size = args[2] 1241 write_value = [] 1242 for i in range(int(size)): 1243 write_value.append(i % 256) 1244 write_result = self.test_dut.gatt_client_write_descriptor_by_handle( 1245 self.unique_mac_addr_id, id, offset, write_value) 1246 self.log.info("Descriptor Write result {}: {}".format( 1247 id, write_result)) 1248 except Exception as err: 1249 self.log.error(FAILURE.format(cmd, err)) 1250 1251 def do_gattc_read_desc_by_id(self, line): 1252 """ 1253 Description: Read Descriptor by ID. 1254 Assumptions: Already connected to a GATT server service. 1255 Input(s): 1256 descriptor_id: The Descriptor id reference on the GATT service 1257 Usage: 1258 Examples: 1259 gattc_read_desc_by_id desc_id 1260 """ 1261 cmd = "Read Descriptor by ID." 1262 try: 1263 id = int(line, 16) 1264 read_val = self.test_dut.gatt_client_read_descriptor_by_handle( 1265 self.unique_mac_addr_id, id) 1266 self.log.info("Descriptor Value with id {}: {}".format( 1267 id, read_val)) 1268 except Exception as err: 1269 self.log.error(FAILURE.format(cmd, err)) 1270 1271 def do_gattc_read_long_char_by_id(self, line): 1272 """ 1273 Description: Read long Characteristic value by id. 1274 Assumptions: Already connected to a GATT server service. 1275 Input(s): 1276 characteristic_id: The characteristic id reference on the GATT 1277 service 1278 offset: The offset value to use. 1279 max_bytes: The max bytes size to return. 1280 Usage: 1281 Examples: 1282 gattc_read_long_char_by_id char_id 0 10 1283 gattc_read_long_char_by_id char_id 20 1 1284 """ 1285 cmd = "Read long Characteristic value by id." 1286 try: 1287 args = line.split() 1288 if len(args) != 3: 1289 self.log.info("3 Arguments required: [Id] [Offset] [Size]") 1290 return 1291 id = int(args[0], 16) 1292 offset = int(args[1]) 1293 max_bytes = int(args[2]) 1294 read_val = self.test_dut.gatt_client_read_long_characteristic_by_handle( 1295 self.unique_mac_addr_id, id, offset, max_bytes) 1296 self.log.info("Characteristic Value with id {}: {}".format( 1297 id, read_val['result'])) 1298 1299 except Exception as err: 1300 self.log.error(FAILURE.format(cmd, err)) 1301 1302 """End GATT client wrappers""" 1303 """Begin LE scan wrappers""" 1304 1305 def _update_scan_results(self, scan_results): 1306 self.le_ids = [] 1307 for scan in scan_results['result']: 1308 self.le_ids.append(scan['id']) 1309 1310 def do_ble_start_scan(self, line): 1311 """ 1312 Description: Perform a BLE scan. 1313 Default filter name: "" 1314 Optional input: filter_device_name 1315 Usage: 1316 Examples: 1317 ble_start_scan 1318 ble_start_scan eddystone 1319 """ 1320 cmd = "Perform a BLE scan and list discovered devices." 1321 try: 1322 scan_filter = {"name_substring": ""} 1323 if line: 1324 scan_filter = {"name_substring": line} 1325 self.pri_dut.gattc_lib.bleStartBleScan(scan_filter) 1326 except Exception as err: 1327 self.log.error(FAILURE.format(cmd, err)) 1328 1329 def do_ble_stop_scan(self, line): 1330 """ 1331 Description: Stops a BLE scan and returns discovered devices. 1332 Usage: 1333 Examples: 1334 ble_stop_scan 1335 """ 1336 cmd = "Stops a BLE scan and returns discovered devices." 1337 try: 1338 scan_results = self.pri_dut.gattc_lib.bleStopBleScan() 1339 self._update_scan_results(scan_results) 1340 self.log.info(pprint.pformat(scan_results)) 1341 except Exception as err: 1342 self.log.error(FAILURE.format(cmd, err)) 1343 1344 def do_ble_get_discovered_devices(self, line): 1345 """ 1346 Description: Get discovered LE devices of an active scan. 1347 Usage: 1348 Examples: 1349 ble_stop_scan 1350 """ 1351 cmd = "Get discovered LE devices of an active scan." 1352 try: 1353 scan_results = self.pri_dut.gattc_lib.bleGetDiscoveredDevices() 1354 self._update_scan_results(scan_results) 1355 self.log.info(pprint.pformat(scan_results)) 1356 except Exception as err: 1357 self.log.error(FAILURE.format(cmd, err)) 1358 1359 """End LE scan wrappers""" 1360 """Begin GATT Server wrappers""" 1361 1362 def do_gatts_close(self, line): 1363 """ 1364 Description: Close active GATT server. 1365 1366 Usage: 1367 Examples: 1368 gatts_close 1369 """ 1370 cmd = "Close active GATT server." 1371 try: 1372 result = self.pri_dut.gatts_lib.closeServer() 1373 self.log.info(result) 1374 except Exception as err: 1375 self.log.error(FAILURE.format(cmd, err)) 1376 1377 def complete_gatts_setup_database(self, text, line, begidx, endidx): 1378 if not text: 1379 completions = list( 1380 gatt_test_database.GATT_SERVER_DB_MAPPING.keys()) 1381 else: 1382 completions = [ 1383 s for s in gatt_test_database.GATT_SERVER_DB_MAPPING.keys() 1384 if s.startswith(text) 1385 ] 1386 return completions 1387 1388 def do_gatts_setup_database(self, line): 1389 """ 1390 Description: Setup a Gatt server database based on pre-defined inputs. 1391 Supports Tab Autocomplete. 1392 Input(s): 1393 descriptor_db_name: The descriptor db name that matches one in 1394 acts_contrib.test_utils.bt.gatt_test_database 1395 Usage: 1396 Examples: 1397 gatts_setup_database LARGE_DB_1 1398 """ 1399 cmd = "Setup GATT Server Database Based of pre-defined dictionaries" 1400 try: 1401 scan_results = self.pri_dut.gatts_lib.publishServer( 1402 gatt_test_database.GATT_SERVER_DB_MAPPING.get(line)) 1403 self.log.info(scan_results) 1404 except Exception as err: 1405 self.log.error(FAILURE.format(cmd, err)) 1406 1407 """End GATT Server wrappers""" 1408 """Begin Bluetooth Controller wrappers""" 1409 1410 def complete_btc_pair(self, text, line, begidx, endidx): 1411 """ Provides auto-complete for btc_pair cmd. 1412 1413 See Cmd module for full description. 1414 """ 1415 arg_completion = len(line.split(" ")) - 1 1416 pairing_security_level_options = ['ENCRYPTED', 'AUTHENTICATED', 'NONE'] 1417 bondable_options = ['BONDABLE', 'NON_BONDABLE', 'NONE'] 1418 transport_options = ['BREDR', 'LE'] 1419 if arg_completion == 1: 1420 if not text: 1421 completions = pairing_security_level_options 1422 else: 1423 completions = [ 1424 s for s in pairing_security_level_options 1425 if s.startswith(text) 1426 ] 1427 return completions 1428 if arg_completion == 2: 1429 if not text: 1430 completions = bondable_options 1431 else: 1432 completions = [ 1433 s for s in bondable_options if s.startswith(text) 1434 ] 1435 return completions 1436 if arg_completion == 3: 1437 if not text: 1438 completions = transport_options 1439 else: 1440 completions = [ 1441 s for s in transport_options if s.startswith(text) 1442 ] 1443 return completions 1444 1445 def do_btc_pair(self, line): 1446 """ 1447 Description: Sends an outgoing pairing request. 1448 1449 Input(s): 1450 pairing security level: ENCRYPTED, AUTHENTICATED, or NONE 1451 bondable: BONDABLE, NON_BONDABLE, or NONE 1452 transport: BREDR or LE 1453 1454 Usage: 1455 Examples: 1456 btc_pair NONE NONE BREDR 1457 btc_pair ENCRYPTED NONE LE 1458 btc_pair AUTHENTICATED NONE LE 1459 btc_pair NONE NON_BONDABLE BREDR 1460 """ 1461 cmd = "Send an outgoing pairing request." 1462 pairing_security_level_mapping = { 1463 "ENCRYPTED": 1, 1464 "AUTHENTICATED": 2, 1465 "NONE": None, 1466 } 1467 1468 bondable_mapping = { 1469 "BONDABLE": True, 1470 "NON_BONDABLE": False, 1471 "NONE": None, 1472 } 1473 1474 transport_mapping = { 1475 "BREDR": 1, 1476 "LE": 2, 1477 } 1478 1479 try: 1480 options = line.split(" ") 1481 result = self.test_dut.init_pair( 1482 self.unique_mac_addr_id, 1483 pairing_security_level_mapping.get(options[0]), 1484 bondable_mapping.get(options[1]), 1485 transport_mapping.get(options[2]), 1486 ) 1487 self.log.info(result) 1488 except Exception as err: 1489 self.log.error(FAILURE.format(cmd, err)) 1490 1491 def complete_btc_accept_pairing(self, text, line, begidx, endidx): 1492 """ Provides auto-complete for btc_set_io_capabilities cmd. 1493 1494 See Cmd module for full description. 1495 """ 1496 arg_completion = len(line.split(" ")) - 1 1497 input_options = ['NONE', 'CONFIRMATION', 'KEYBOARD'] 1498 output_options = ['NONE', 'DISPLAY'] 1499 if arg_completion == 1: 1500 if not text: 1501 completions = input_options 1502 else: 1503 completions = [s for s in input_options if s.startswith(text)] 1504 return completions 1505 if arg_completion == 2: 1506 if not text: 1507 completions = output_options 1508 else: 1509 completions = [s for s in output_options if s.startswith(text)] 1510 return completions 1511 1512 def do_btc_accept_pairing(self, line): 1513 """ 1514 Description: Accept all incoming pairing requests. 1515 1516 Input(s): 1517 input: String - The input I/O capabilities to use 1518 Available Values: 1519 NONE - Input capability type None 1520 CONFIRMATION - Input capability type confirmation 1521 KEYBOARD - Input capability type Keyboard 1522 output: String - The output I/O Capabilities to use 1523 Available Values: 1524 NONE - Output capability type None 1525 DISPLAY - output capability type Display 1526 1527 Usage: 1528 Examples: 1529 btc_accept_pairing 1530 btc_accept_pairing NONE DISPLAY 1531 btc_accept_pairing NONE NONE 1532 btc_accept_pairing KEYBOARD DISPLAY 1533 """ 1534 cmd = "Accept incoming pairing requests" 1535 try: 1536 input_capabilities = "NONE" 1537 output_capabilities = "NONE" 1538 options = line.split(" ") 1539 if len(options) > 1: 1540 input_capabilities = options[0] 1541 output_capabilities = options[1] 1542 result = self.pri_dut.bts_lib.acceptPairing( 1543 input_capabilities, output_capabilities) 1544 self.log.info(result) 1545 except Exception as err: 1546 self.log.error(FAILURE.format(cmd, err)) 1547 1548 def do_btc_forget_device(self, line): 1549 """ 1550 Description: Forget pairing of the current device under test. 1551 Current device under test is the device found by 1552 tool_refresh_unique_id from custom user param. This function 1553 will also perform a clean disconnect if actively connected. 1554 1555 Usage: 1556 Examples: 1557 btc_forget_device 1558 """ 1559 cmd = "For pairing of the current device under test." 1560 try: 1561 self.log.info("Forgetting device id: {}".format( 1562 self.unique_mac_addr_id)) 1563 result = self.pri_dut.bts_lib.forgetDevice(self.unique_mac_addr_id) 1564 self.log.info(result) 1565 except Exception as err: 1566 self.log.error(FAILURE.format(cmd, err)) 1567 1568 def do_btc_set_discoverable(self, discoverable): 1569 """ 1570 Description: Change Bluetooth Controller discoverablility. 1571 Input(s): 1572 discoverable: true to set discoverable 1573 false to set non-discoverable 1574 Usage: 1575 Examples: 1576 btc_set_discoverable true 1577 btc_set_discoverable false 1578 """ 1579 cmd = "Change Bluetooth Controller discoverablility." 1580 try: 1581 result = self.test_dut.set_discoverable( 1582 self.str_to_bool(discoverable)) 1583 self.log.info(result) 1584 except Exception as err: 1585 self.log.error(FAILURE.format(cmd, err)) 1586 1587 def do_btc_set_name(self, name): 1588 """ 1589 Description: Change Bluetooth Controller local name. 1590 Input(s): 1591 name: The name to set the Bluetooth Controller name to. 1592 1593 Usage: 1594 Examples: 1595 btc_set_name fs_test 1596 """ 1597 cmd = "Change Bluetooth Controller local name." 1598 try: 1599 result = self.test_dut.set_bluetooth_local_name(name) 1600 self.log.info(result) 1601 except Exception as err: 1602 self.log.error(FAILURE.format(cmd, err)) 1603 1604 def do_btc_request_discovery(self, discover): 1605 """ 1606 Description: Change whether the Bluetooth Controller is in active. 1607 discovery or not. 1608 Input(s): 1609 discover: true to start discovery 1610 false to end discovery 1611 Usage: 1612 Examples: 1613 btc_request_discovery true 1614 btc_request_discovery false 1615 """ 1616 cmd = "Change whether the Bluetooth Controller is in active." 1617 try: 1618 result = self.pri_dut.bts_lib.requestDiscovery( 1619 self.str_to_bool(discover)) 1620 self.log.info(result) 1621 except Exception as err: 1622 self.log.error(FAILURE.format(cmd, err)) 1623 1624 def do_btc_get_known_remote_devices(self, line): 1625 """ 1626 Description: Get a list of known devices. 1627 1628 Usage: 1629 Examples: 1630 btc_get_known_remote_devices 1631 """ 1632 cmd = "Get a list of known devices." 1633 self.bt_control_devices = [] 1634 try: 1635 device_list = self.pri_dut.bts_lib.getKnownRemoteDevices( 1636 )['result'] 1637 for id_dict in device_list: 1638 device = device_list[id_dict] 1639 self.bt_control_devices.append(device) 1640 self.log.info("Device found {}".format(device)) 1641 1642 except Exception as err: 1643 self.log.error(FAILURE.format(cmd, err)) 1644 1645 def do_btc_forget_all_known_devices(self, line): 1646 """ 1647 Description: Forget all known devices. 1648 1649 Usage: 1650 Examples: 1651 btc_forget_all_known_devices 1652 """ 1653 cmd = "Forget all known devices." 1654 try: 1655 device_list = self.pri_dut.bts_lib.getKnownRemoteDevices( 1656 )['result'] 1657 for device in device_list: 1658 d = device_list[device] 1659 if d['bonded'] or d['connected']: 1660 self.log.info("Unbonding deivce: {}".format(d)) 1661 self.log.info( 1662 self.pri_dut.bts_lib.forgetDevice(d['id'])['result']) 1663 except Exception as err: 1664 self.log.error(FAILURE.format(cmd, err)) 1665 1666 def do_btc_connect_device(self, line): 1667 """ 1668 Description: Connect to device under test. 1669 Device under test is specified by either user params 1670 or 1671 tool_set_target_device_name <name> 1672 do_tool_refresh_unique_id_using_bt_control 1673 1674 Usage: 1675 Examples: 1676 btc_connect_device 1677 """ 1678 cmd = "Connect to device under test." 1679 try: 1680 result = self.pri_dut.bts_lib.connectDevice( 1681 self.unique_mac_addr_id) 1682 self.log.info(result) 1683 except Exception as err: 1684 self.log.error(FAILURE.format(cmd, err)) 1685 1686 def complete_btc_connect_device_by_id(self, text, line, begidx, endidx): 1687 if not text: 1688 completions = list(self.bt_control_ids)[:] 1689 else: 1690 completions = [ 1691 s for s in self.bt_control_ids if s.startswith(text) 1692 ] 1693 return completions 1694 1695 def do_btc_connect_device_by_id(self, device_id): 1696 """ 1697 Description: Connect to device id based on pre-defined inputs. 1698 Supports Tab Autocomplete. 1699 Input(s): 1700 device_id: The device id to connect to. 1701 1702 Usage: 1703 Examples: 1704 btc_connect_device_by_id <device_id> 1705 """ 1706 cmd = "Connect to device id based on pre-defined inputs." 1707 try: 1708 result = self.pri_dut.bts_lib.connectDevice(device_id) 1709 self.log.info(result) 1710 except Exception as err: 1711 self.log.error(FAILURE.format(cmd, err)) 1712 1713 def complete_btc_connect_device_by_name(self, text, line, begidx, endidx): 1714 if not text: 1715 completions = list(self.bt_control_names)[:] 1716 else: 1717 completions = [ 1718 s for s in self.bt_control_names if s.startswith(text) 1719 ] 1720 return completions 1721 1722 def do_btc_connect_device_by_name(self, device_name): 1723 """ 1724 Description: Connect to device id based on pre-defined inputs. 1725 Supports Tab Autocomplete. 1726 Input(s): 1727 device_id: The device id to connect to. 1728 1729 Usage: 1730 Examples: 1731 btc_connect_device_by_name <device_id> 1732 """ 1733 cmd = "Connect to device name based on pre-defined inputs." 1734 try: 1735 for device in self.bt_control_devices: 1736 if device_name is device['name']: 1737 1738 result = self.pri_dut.bts_lib.connectDevice(device['id']) 1739 self.log.info(result) 1740 except Exception as err: 1741 self.log.error(FAILURE.format(cmd, err)) 1742 1743 def do_btc_disconnect_device(self, line): 1744 """ 1745 Description: Disconnect to device under test. 1746 Device under test is specified by either user params 1747 or 1748 tool_set_target_device_name <name> 1749 do_tool_refresh_unique_id_using_bt_control 1750 1751 Usage: 1752 Examples: 1753 btc_disconnect_device 1754 """ 1755 cmd = "Disconnect to device under test." 1756 try: 1757 result = self.pri_dut.bts_lib.disconnectDevice( 1758 self.unique_mac_addr_id) 1759 self.log.info(result) 1760 except Exception as err: 1761 self.log.error(FAILURE.format(cmd, err)) 1762 1763 def do_btc_init_bluetooth_control(self, line): 1764 """ 1765 Description: Initialize the Bluetooth Controller. 1766 1767 Usage: 1768 Examples: 1769 btc_init_bluetooth_control 1770 """ 1771 cmd = "Initialize the Bluetooth Controller." 1772 try: 1773 result = self.test_dut.initialize_bluetooth_controller() 1774 self.log.info(result) 1775 except Exception as err: 1776 self.log.error(FAILURE.format(cmd, err)) 1777 1778 def do_btc_get_local_address(self, line): 1779 """ 1780 Description: Get the local BR/EDR address of the Bluetooth Controller. 1781 1782 Usage: 1783 Examples: 1784 btc_get_local_address 1785 """ 1786 cmd = "Get the local BR/EDR address of the Bluetooth Controller." 1787 try: 1788 result = self.test_dut.get_local_bluetooth_address() 1789 self.log.info(result) 1790 except Exception as err: 1791 self.log.error(FAILURE.format(cmd, err)) 1792 1793 def do_btc_input_pairing_pin(self, line): 1794 """ 1795 Description: Sends a pairing pin to SL4F's Bluetooth Control's 1796 Pairing Delegate. 1797 1798 Usage: 1799 Examples: 1800 btc_input_pairing_pin 123456 1801 """ 1802 cmd = "Input pairing pin to the Fuchsia device." 1803 try: 1804 result = self.pri_dut.bts_lib.inputPairingPin(line)['result'] 1805 self.log.info(result) 1806 except Exception as err: 1807 self.log.error(FAILURE.format(cmd, err)) 1808 1809 def do_btc_get_pairing_pin(self, line): 1810 """ 1811 Description: Gets the pairing pin from SL4F's Bluetooth Control's 1812 Pairing Delegate. 1813 1814 Usage: 1815 Examples: 1816 btc_get_pairing_pin 1817 """ 1818 cmd = "Get the pairing pin from the Fuchsia device." 1819 try: 1820 result = self.pri_dut.bts_lib.getPairingPin()['result'] 1821 self.log.info(result) 1822 except Exception as err: 1823 self.log.error(FAILURE.format(cmd, err)) 1824 1825 """End Bluetooth Control wrappers""" 1826 """Begin Profile Server wrappers""" 1827 1828 def do_sdp_pts_example(self, num_of_records): 1829 """ 1830 Description: An example of how to setup a generic SDP record 1831 and SDP search capabilities. This example will pass a few 1832 SDP tests. 1833 1834 Input(s): 1835 num_of_records: The number of records to add. 1836 1837 Usage: 1838 Examples: 1839 sdp_pts_example 1 1840 sdp pts_example 10 1841 """ 1842 cmd = "Setup SDP for PTS testing." 1843 1844 attributes = [ 1845 bt_attribute_values['ATTR_PROTOCOL_DESCRIPTOR_LIST'], 1846 bt_attribute_values['ATTR_SERVICE_CLASS_ID_LIST'], 1847 bt_attribute_values['ATTR_BLUETOOTH_PROFILE_DESCRIPTOR_LIST'], 1848 bt_attribute_values['ATTR_A2DP_SUPPORTED_FEATURES'], 1849 ] 1850 1851 try: 1852 self.pri_dut.sdp_lib.addSearch( 1853 attributes, int(sig_uuid_constants['AudioSource'], 16)) 1854 self.pri_dut.sdp_lib.addSearch( 1855 attributes, int(sig_uuid_constants['A/V_RemoteControl'], 16)) 1856 self.pri_dut.sdp_lib.addSearch(attributes, 1857 int(sig_uuid_constants['PANU'], 16)) 1858 self.pri_dut.sdp_lib.addSearch( 1859 attributes, int(sig_uuid_constants['SerialPort'], 16)) 1860 self.pri_dut.sdp_lib.addSearch( 1861 attributes, int(sig_uuid_constants['DialupNetworking'], 16)) 1862 self.pri_dut.sdp_lib.addSearch( 1863 attributes, int(sig_uuid_constants['OBEXObjectPush'], 16)) 1864 self.pri_dut.sdp_lib.addSearch( 1865 attributes, int(sig_uuid_constants['OBEXFileTransfer'], 16)) 1866 self.pri_dut.sdp_lib.addSearch( 1867 attributes, int(sig_uuid_constants['Headset'], 16)) 1868 self.pri_dut.sdp_lib.addSearch( 1869 attributes, int(sig_uuid_constants['HandsfreeAudioGateway'], 1870 16)) 1871 self.pri_dut.sdp_lib.addSearch( 1872 attributes, int(sig_uuid_constants['Handsfree'], 16)) 1873 self.pri_dut.sdp_lib.addSearch( 1874 attributes, int(sig_uuid_constants['SIM_Access'], 16)) 1875 for i in range(int(num_of_records)): 1876 result = self.pri_dut.sdp_lib.addService( 1877 sdp_pts_record_list[i]) 1878 self.log.info(result) 1879 except Exception as err: 1880 self.log.error(FAILURE.format(cmd, err)) 1881 1882 def do_sdp_cleanup(self, line): 1883 """ 1884 Description: Cleanup any existing SDP records 1885 1886 Usage: 1887 Examples: 1888 sdp_cleanup 1889 """ 1890 cmd = "Cleanup SDP objects." 1891 try: 1892 result = self.pri_dut.sdp_lib.cleanUp() 1893 self.log.info(result) 1894 except Exception as err: 1895 self.log.error(FAILURE.format(cmd, err)) 1896 1897 def do_sdp_init(self, line): 1898 """ 1899 Description: Init the profile proxy for setting up SDP records 1900 1901 Usage: 1902 Examples: 1903 sdp_init 1904 """ 1905 cmd = "Initialize profile proxy objects for adding SDP records" 1906 try: 1907 result = self.pri_dut.sdp_lib.init() 1908 self.log.info(result) 1909 except Exception as err: 1910 self.log.error(FAILURE.format(cmd, err)) 1911 1912 def do_sdp_connect_l2cap(self, line): 1913 """ 1914 Description: Send an l2cap connection request over an input psm value. 1915 1916 Note: Must be already connected to a peer. 1917 1918 Input(s): 1919 psm: The int hex value to connect over. Available PSMs: 1920 SDP 0x0001 See Bluetooth Service Discovery Protocol (SDP) 1921 RFCOMM 0x0003 See RFCOMM with TS 07.10 1922 TCS-BIN 0x0005 See Bluetooth Telephony Control Specification / 1923 TCS Binary 1924 TCS-BIN-CORDLESS 0x0007 See Bluetooth Telephony Control 1925 Specification / TCS Binary 1926 BNEP 0x000F See Bluetooth Network Encapsulation Protocol 1927 HID_Control 0x0011 See Human Interface Device 1928 HID_Interrupt 0x0013 See Human Interface Device 1929 UPnP 0x0015 See [ESDP] 1930 AVCTP 0x0017 See Audio/Video Control Transport Protocol 1931 AVDTP 0x0019 See Audio/Video Distribution Transport Protocol 1932 AVCTP_Browsing 0x001B See Audio/Video Remote Control Profile 1933 UDI_C-Plane 0x001D See the Unrestricted Digital Information 1934 Profile [UDI] 1935 ATT 0x001F See Bluetooth Core Specification 1936 3DSP 0x0021 See 3D Synchronization Profile. 1937 LE_PSM_IPSP 0x0023 See Internet Protocol Support Profile 1938 (IPSP) 1939 OTS 0x0025 See Object Transfer Service (OTS) 1940 EATT 0x0027 See Bluetooth Core Specification 1941 mode: String - The channel mode to connect to. Available values: 1942 Basic mode: BASIC 1943 Enhanced Retransmission mode: ERTM 1944 1945 Usage: 1946 Examples: 1947 sdp_connect_l2cap 0001 BASIC 1948 sdp_connect_l2cap 0019 ERTM 1949 """ 1950 cmd = "Connect l2cap" 1951 try: 1952 info = line.split() 1953 result = self.pri_dut.sdp_lib.connectL2cap(self.unique_mac_addr_id, 1954 int(info[0], 16), 1955 info[1]) 1956 self.log.info(result) 1957 except Exception as err: 1958 self.log.error(FAILURE.format(cmd, err)) 1959 1960 """End Profile Server wrappers""" 1961 """Begin AVDTP wrappers""" 1962 1963 def do_avdtp_init(self, initiator_delay): 1964 """ 1965 Description: Init the A2DP component start and AVDTP service to 1966 initiate. 1967 1968 Input(s): 1969 initiator_delay: [Optional] The stream initiator delay to set in 1970 milliseconds. 1971 1972 Usage: 1973 Examples: 1974 avdtp_init 0 1975 avdtp_init 2000 1976 avdtp_init 1977 """ 1978 cmd = "Initialize AVDTP proxy" 1979 try: 1980 if not initiator_delay: 1981 initiator_delay = None 1982 result = self.pri_dut.avdtp_lib.init(initiator_delay) 1983 self.log.info(result) 1984 except Exception as err: 1985 self.log.error(FAILURE.format(cmd, err)) 1986 1987 def do_avdtp_kill_a2dp(self, line): 1988 """ 1989 Description: Quickly kill any A2DP components. 1990 1991 Usage: 1992 Examples: 1993 avdtp_kill_a2dp 1994 """ 1995 cmd = "Kill A2DP service" 1996 try: 1997 result = self.pri_dut.control_daemon("bt-a2dp.cmx", "stop") 1998 self.log.info(result) 1999 except Exception as err: 2000 self.log.error(FAILURE.format(cmd, err)) 2001 2002 def do_avdtp_get_connected_peers(self, line): 2003 """ 2004 Description: Get the connected peers for the AVDTP service 2005 2006 Usage: 2007 Examples: 2008 avdtp_get_connected_peers 2009 """ 2010 cmd = "AVDTP get connected peers" 2011 try: 2012 result = self.pri_dut.avdtp_lib.getConnectedPeers() 2013 self.log.info(result) 2014 except Exception as err: 2015 self.log.error(FAILURE.format(cmd, err)) 2016 2017 def do_avdtp_set_configuration(self, peer_id): 2018 """ 2019 Description: Send AVDTP command to connected peer: set configuration 2020 2021 Input(s): 2022 peer_id: The specified peer_id. 2023 2024 Usage: 2025 Examples: 2026 avdtp_set_configuration <peer_id> 2027 """ 2028 cmd = "Send AVDTP set configuration to connected peer" 2029 try: 2030 result = self.pri_dut.avdtp_lib.setConfiguration(int(peer_id)) 2031 self.log.info(result) 2032 except Exception as err: 2033 self.log.error(FAILURE.format(cmd, err)) 2034 2035 def do_avdtp_get_configuration(self, peer_id): 2036 """ 2037 Description: Send AVDTP command to connected peer: get configuration 2038 2039 Input(s): 2040 peer_id: The specified peer_id. 2041 2042 Usage: 2043 Examples: 2044 avdtp_get_configuration <peer_id> 2045 """ 2046 cmd = "Send AVDTP get configuration to connected peer" 2047 try: 2048 result = self.pri_dut.avdtp_lib.getConfiguration(int(peer_id)) 2049 self.log.info(result) 2050 except Exception as err: 2051 self.log.error(FAILURE.format(cmd, err)) 2052 2053 def do_avdtp_get_capabilities(self, peer_id): 2054 """ 2055 Description: Send AVDTP command to connected peer: get capabilities 2056 2057 Input(s): 2058 peer_id: The specified peer_id. 2059 2060 Usage: 2061 Examples: 2062 avdtp_get_capabilities <peer_id> 2063 """ 2064 cmd = "Send AVDTP get capabilities to connected peer" 2065 try: 2066 result = self.pri_dut.avdtp_lib.getCapabilities(int(peer_id)) 2067 self.log.info(result) 2068 except Exception as err: 2069 self.log.error(FAILURE.format(cmd, err)) 2070 2071 def do_avdtp_get_all_capabilities(self, peer_id): 2072 """ 2073 Description: Send AVDTP command to connected peer: get all capabilities 2074 2075 Input(s): 2076 peer_id: The specified peer_id. 2077 2078 Usage: 2079 Examples: 2080 avdtp_get_all_capabilities <peer_id> 2081 """ 2082 cmd = "Send AVDTP get all capabilities to connected peer" 2083 try: 2084 result = self.pri_dut.avdtp_lib.getAllCapabilities(int(peer_id)) 2085 self.log.info(result) 2086 except Exception as err: 2087 self.log.error(FAILURE.format(cmd, err)) 2088 2089 def do_avdtp_reconfigure_stream(self, peer_id): 2090 """ 2091 Description: Send AVDTP command to connected peer: reconfigure stream 2092 2093 Input(s): 2094 peer_id: The specified peer_id. 2095 2096 Usage: 2097 Examples: 2098 avdtp_reconfigure_stream <peer_id> 2099 """ 2100 cmd = "Send AVDTP reconfigure stream to connected peer" 2101 try: 2102 result = self.pri_dut.avdtp_lib.reconfigureStream(int(peer_id)) 2103 self.log.info(result) 2104 except Exception as err: 2105 self.log.error(FAILURE.format(cmd, err)) 2106 2107 def do_avdtp_suspend_stream(self, peer_id): 2108 """ 2109 Description: Send AVDTP command to connected peer: suspend stream 2110 2111 Input(s): 2112 peer_id: The specified peer_id. 2113 2114 Usage: 2115 Examples: 2116 avdtp_suspend_stream <peer_id> 2117 """ 2118 cmd = "Send AVDTP suspend stream to connected peer" 2119 try: 2120 result = self.pri_dut.avdtp_lib.suspendStream(int(peer_id)) 2121 self.log.info(result) 2122 except Exception as err: 2123 self.log.error(FAILURE.format(cmd, err)) 2124 2125 def do_avdtp_suspend_reconfigure(self, peer_id): 2126 """ 2127 Description: Send AVDTP command to connected peer: suspend reconfigure 2128 2129 Input(s): 2130 peer_id: The specified peer_id. 2131 2132 Usage: 2133 Examples: 2134 avdtp_suspend_reconfigure <peer_id> 2135 """ 2136 cmd = "Send AVDTP suspend reconfigure to connected peer" 2137 try: 2138 result = self.pri_dut.avdtp_lib.suspendAndReconfigure(int(peer_id)) 2139 self.log.info(result) 2140 except Exception as err: 2141 self.log.error(FAILURE.format(cmd, err)) 2142 2143 def do_avdtp_release_stream(self, peer_id): 2144 """ 2145 Description: Send AVDTP command to connected peer: release stream 2146 2147 Input(s): 2148 peer_id: The specified peer_id. 2149 2150 Usage: 2151 Examples: 2152 avdtp_release_stream <peer_id> 2153 """ 2154 cmd = "Send AVDTP release stream to connected peer" 2155 try: 2156 result = self.pri_dut.avdtp_lib.releaseStream(int(peer_id)) 2157 self.log.info(result) 2158 except Exception as err: 2159 self.log.error(FAILURE.format(cmd, err)) 2160 2161 def do_avdtp_establish_stream(self, peer_id): 2162 """ 2163 Description: Send AVDTP command to connected peer: establish stream 2164 2165 Input(s): 2166 peer_id: The specified peer_id. 2167 2168 Usage: 2169 Examples: 2170 avdtp_establish_stream <peer_id> 2171 """ 2172 cmd = "Send AVDTP establish stream to connected peer" 2173 try: 2174 result = self.pri_dut.avdtp_lib.establishStream(int(peer_id)) 2175 self.log.info(result) 2176 except Exception as err: 2177 self.log.error(FAILURE.format(cmd, err)) 2178 2179 def do_avdtp_start_stream(self, peer_id): 2180 """ 2181 Description: Send AVDTP command to connected peer: start stream 2182 2183 Input(s): 2184 peer_id: The specified peer_id. 2185 2186 Usage: 2187 Examples: 2188 avdtp_start_stream <peer_id> 2189 """ 2190 cmd = "Send AVDTP start stream to connected peer" 2191 try: 2192 result = self.pri_dut.avdtp_lib.startStream(int(peer_id)) 2193 self.log.info(result) 2194 except Exception as err: 2195 self.log.error(FAILURE.format(cmd, err)) 2196 2197 def do_avdtp_abort_stream(self, peer_id): 2198 """ 2199 Description: Send AVDTP command to connected peer: abort stream 2200 2201 Input(s): 2202 peer_id: The specified peer_id. 2203 2204 Usage: 2205 Examples: 2206 avdtp_abort_stream <peer_id> 2207 """ 2208 cmd = "Send AVDTP abort stream to connected peer" 2209 try: 2210 result = self.pri_dut.avdtp_lib.abortStream(int(peer_id)) 2211 self.log.info(result) 2212 except Exception as err: 2213 self.log.error(FAILURE.format(cmd, err)) 2214 2215 def do_avdtp_remove_service(self, line): 2216 """ 2217 Description: Removes the AVDTP service in use. 2218 2219 Usage: 2220 Examples: 2221 avdtp_establish_stream <peer_id> 2222 """ 2223 cmd = "Remove AVDTP service" 2224 try: 2225 result = self.pri_dut.avdtp_lib.removeService() 2226 self.log.info(result) 2227 except Exception as err: 2228 self.log.error(FAILURE.format(cmd, err)) 2229 2230 """End AVDTP wrappers""" 2231 """Begin Audio wrappers""" 2232 2233 def do_audio_start_output_save(self, line): 2234 """ 2235 Description: Start audio output save 2236 2237 Usage: 2238 Examples: 2239 audio_start_output_save 2240 """ 2241 cmd = "Start audio capture" 2242 try: 2243 result = self.pri_dut.audio_lib.startOutputSave() 2244 self.log.info(result) 2245 except Exception as err: 2246 self.log.error(FAILURE.format(cmd, err)) 2247 2248 def do_audio_stop_output_save(self, line): 2249 """ 2250 Description: Stop audio output save 2251 2252 Usage: 2253 Examples: 2254 audio_stop_output_save 2255 """ 2256 cmd = "Stop audio capture" 2257 try: 2258 result = self.pri_dut.audio_lib.stopOutputSave() 2259 self.log.info(result) 2260 except Exception as err: 2261 self.log.error(FAILURE.format(cmd, err)) 2262 2263 def do_audio_get_output_audio(self, line): 2264 """ 2265 Description: Get the audio output saved to a local file 2266 2267 Usage: 2268 Examples: 2269 audio_get_output_audio 2270 """ 2271 cmd = "Get audio capture" 2272 try: 2273 save_path = "{}/{}".format(self.pri_dut.log_path, "audio.raw") 2274 result = self.pri_dut.audio_lib.getOutputAudio(save_path) 2275 except Exception as err: 2276 self.log.error(FAILURE.format(cmd, err)) 2277 2278 """End Audio wrappers""" 2279 """Begin HFP wrappers""" 2280 2281 def do_hfp_init(self, line): 2282 """ 2283 Description: Init the HFP component initiate. 2284 2285 Usage: 2286 Examples: 2287 hfp_init 2288 """ 2289 cmd = "Initialize HFP proxy" 2290 try: 2291 result = self.pri_dut.hfp_lib.init() 2292 self.log.info(result) 2293 except Exception as err: 2294 self.log.error(FAILURE.format(cmd, err)) 2295 2296 def do_hfp_remove_service(self, line): 2297 """ 2298 Description: Removes the HFP service in use. 2299 2300 Usage: 2301 Examples: 2302 hfp_remove_service 2303 """ 2304 cmd = "Remove HFP service" 2305 try: 2306 result = self.pri_dut.hfp_lib.removeService() 2307 self.log.info(result) 2308 except Exception as err: 2309 self.log.error(FAILURE.format(cmd, err)) 2310 2311 def do_hfp_list_peers(self, line): 2312 """ 2313 Description: List all HFP Hands-Free peers connected to the DUT. 2314 2315 Input(s): 2316 2317 Usage: 2318 Examples: 2319 hfp_list_peers 2320 """ 2321 cmd = "Lists connected peers" 2322 try: 2323 result = self.pri_dut.hfp_lib.listPeers() 2324 self.log.info(result) 2325 except Exception as err: 2326 self.log.error(FAILURE.format(cmd, err)) 2327 2328 def do_hfp_set_active_peer(self, line): 2329 """ 2330 Description: Set the active HFP Hands-Free peer for the DUT. 2331 2332 Input(s): 2333 peer_id: The id of the peer to be set active. 2334 2335 Usage: 2336 Examples: 2337 hfp_set_active_peer <peer_id> 2338 """ 2339 cmd = "Set the active peer" 2340 try: 2341 peer_id = int(line.strip()) 2342 result = self.pri_dut.hfp_lib.setActivePeer(peer_id) 2343 self.log.info(result) 2344 except Exception as err: 2345 self.log.error(FAILURE.format(cmd, err)) 2346 2347 def do_hfp_list_calls(self, line): 2348 """ 2349 Description: List all calls known to the sl4f component on the DUT. 2350 2351 Input(s): 2352 2353 Usage: 2354 Examples: 2355 hfp_list_calls 2356 """ 2357 cmd = "Lists all calls" 2358 try: 2359 result = self.pri_dut.hfp_lib.listCalls() 2360 self.log.info(result) 2361 except Exception as err: 2362 self.log.error(FAILURE.format(cmd, err)) 2363 2364 def do_hfp_new_call(self, line): 2365 """ 2366 Description: Simulate a call on the call manager 2367 2368 Input(s): 2369 remote: The number of the remote party on the simulated call 2370 state: The state of the call. Must be one of "ringing", "waiting", 2371 "dialing", "alerting", "active", "held". 2372 2373 Usage: 2374 Examples: 2375 hfp_new_call <remote> <state> 2376 hfp_new_call 14085555555 active 2377 hfp_new_call 14085555555 held 2378 hfp_new_call 14085555555 ringing 2379 hfp_new_call 14085555555 alerting 2380 hfp_new_call 14085555555 dialing 2381 """ 2382 cmd = "Simulates a call" 2383 try: 2384 info = line.strip().split() 2385 if len(info) != 2: 2386 raise ValueError("Exactly two command line arguments required: <remote> <state>") 2387 remote, state = info[0], info[1] 2388 result = self.pri_dut.hfp_lib.newCall(remote, state) 2389 self.log.info(result) 2390 except Exception as err: 2391 self.log.error(FAILURE.format(cmd, err)) 2392 2393 def do_hfp_incoming_call(self, line): 2394 """ 2395 Description: Simulate an incoming call on the call manager 2396 2397 Input(s): 2398 remote: The number of the remote party on the incoming call 2399 2400 Usage: 2401 Examples: 2402 hfp_incoming_call <remote> 2403 hfp_incoming_call 14085555555 2404 """ 2405 cmd = "Simulates an incoming call" 2406 try: 2407 remote = line.strip() 2408 result = self.pri_dut.hfp_lib.initiateIncomingCall(remote) 2409 self.log.info(result) 2410 except Exception as err: 2411 self.log.error(FAILURE.format(cmd, err)) 2412 2413 def do_hfp_outgoing_call(self, line): 2414 """ 2415 Description: Simulate an outgoing call on the call manager 2416 2417 Input(s): 2418 remote: The number of the remote party on the outgoing call 2419 2420 Usage: 2421 Examples: 2422 hfp_outgoing_call <remote> 2423 """ 2424 cmd = "Simulates an outgoing call" 2425 try: 2426 remote = line.strip() 2427 result = self.pri_dut.hfp_lib.initiateOutgoingCall(remote) 2428 self.log.info(result) 2429 except Exception as err: 2430 self.log.error(FAILURE.format(cmd, err)) 2431 2432 def do_hfp_set_call_active(self, line): 2433 """ 2434 Description: Set the specified call to the "OngoingActive" state. 2435 2436 Input(s): 2437 call_id: The unique id of the call. 2438 2439 Usage: 2440 Examples: 2441 hfp_outgoing_call <call_id> 2442 """ 2443 cmd = "Set the specified call to active" 2444 try: 2445 call_id = int(line.strip()) 2446 result = self.pri_dut.hfp_lib.setCallActive(call_id) 2447 self.log.info(result) 2448 except Exception as err: 2449 self.log.error(FAILURE.format(cmd, err)) 2450 2451 def do_hfp_set_call_held(self, line): 2452 """ 2453 Description: Set the specified call to the "OngoingHeld" state. 2454 2455 Input(s): 2456 call_id: The unique id of the call. 2457 2458 Usage: 2459 Examples: 2460 hfp_outgoing_call <call_id> 2461 """ 2462 cmd = "Set the specified call to held" 2463 try: 2464 call_id = int(line.strip()) 2465 result = self.pri_dut.hfp_lib.setCallHeld(call_id) 2466 self.log.info(result) 2467 except Exception as err: 2468 self.log.error(FAILURE.format(cmd, err)) 2469 2470 def do_hfp_set_call_terminated(self, line): 2471 """ 2472 Description: Set the specified call to the "Terminated" state. 2473 2474 Input(s): 2475 call_id: The unique id of the call. 2476 2477 Usage: 2478 Examples: 2479 hfp_outgoing_call <call_id> 2480 """ 2481 cmd = "Set the specified call to terminated" 2482 try: 2483 call_id = int(line.strip()) 2484 result = self.pri_dut.hfp_lib.setCallTerminated(call_id) 2485 self.log.info(result) 2486 except Exception as err: 2487 self.log.error(FAILURE.format(cmd, err)) 2488 2489 def do_hfp_set_call_transferred_to_ag(self, line): 2490 """ 2491 Description: Set the specified call to the "TransferredToAg" state. 2492 2493 Input(s): 2494 call_id: The unique id of the call. 2495 2496 Usage: 2497 Examples: 2498 hfp_outgoing_call <call_id> 2499 """ 2500 cmd = "Set the specified call to TransferredToAg" 2501 try: 2502 call_id = int(line.strip()) 2503 result = self.pri_dut.hfp_lib.setCallTransferredToAg(call_id) 2504 self.log.info(result) 2505 except Exception as err: 2506 self.log.error(FAILURE.format(cmd, err)) 2507 2508 def do_hfp_set_speaker_gain(self, line): 2509 """ 2510 Description: Set the active peer's speaker gain. 2511 2512 Input(s): 2513 value: The gain value to set. Must be between 0-15 inclusive. 2514 2515 Usage: 2516 Examples: 2517 hfp_set_speaker_gain <value> 2518 """ 2519 cmd = "Set the active peer's speaker gain" 2520 try: 2521 value = int(line.strip()) 2522 result = self.pri_dut.hfp_lib.setSpeakerGain(value) 2523 self.log.info(result) 2524 except Exception as err: 2525 self.log.error(FAILURE.format(cmd, err)) 2526 2527 def do_hfp_set_microphone_gain(self, line): 2528 """ 2529 Description: Set the active peer's microphone gain. 2530 2531 Input(s): 2532 value: The gain value to set. Must be between 0-15 inclusive. 2533 2534 Usage: 2535 Examples: 2536 hfp_set_microphone_gain <value> 2537 """ 2538 cmd = "Set the active peer's microphone gain" 2539 try: 2540 value = int(line.strip()) 2541 result = self.pri_dut.hfp_lib.setMicrophoneGain(value) 2542 self.log.info(result) 2543 except Exception as err: 2544 self.log.error(FAILURE.format(cmd, err)) 2545 2546 def do_hfp_set_service_available(self, line): 2547 """ 2548 Description: Sets the simulated network service status reported by the call manager. 2549 2550 Input(s): 2551 value: "true" to set the network connection to available. 2552 2553 Usage: 2554 Examples: 2555 hfp_set_service_available <value> 2556 hfp_set_service_available true 2557 hfp_set_service_available false 2558 """ 2559 cmd = "Sets the simulated network service status reported by the call manager" 2560 try: 2561 value = line.strip() == "true" 2562 result = self.pri_dut.hfp_lib.setServiceAvailable(value) 2563 self.log.info(result) 2564 except Exception as err: 2565 self.log.error(FAILURE.format(cmd, err)) 2566 2567 def do_hfp_set_roaming(self, line): 2568 """ 2569 Description: Sets the simulated roaming status reported by the call manager. 2570 2571 Input(s): 2572 value: "true" to set the network connection to roaming. 2573 2574 Usage: 2575 Examples: 2576 hfp_set_roaming <value> 2577 hfp_set_roaming true 2578 hfp_set_roaming false 2579 """ 2580 cmd = "Sets the simulated roaming status reported by the call manager" 2581 try: 2582 value = line.strip() == "true" 2583 result = self.pri_dut.hfp_lib.setRoaming(value) 2584 self.log.info(result) 2585 except Exception as err: 2586 self.log.error(FAILURE.format(cmd, err)) 2587 2588 def do_hfp_set_signal_strength(self, line): 2589 """ 2590 Description: Sets the simulated signal strength reported by the call manager. 2591 2592 Input(s): 2593 value: The signal strength value to set. Must be between 0-5 inclusive. 2594 2595 Usage: 2596 Examples: 2597 hfp_set_signal_strength <value> 2598 hfp_set_signal_strength 0 2599 hfp_set_signal_strength 3 2600 hfp_set_signal_strength 5 2601 """ 2602 cmd = "Sets the simulated signal strength reported by the call manager" 2603 try: 2604 value = int(line.strip()) 2605 result = self.pri_dut.hfp_lib.setSignalStrength(value) 2606 self.log.info(result) 2607 except Exception as err: 2608 self.log.error(FAILURE.format(cmd, err)) 2609 2610 def do_hfp_set_subscriber_number(self, line): 2611 """ 2612 Description: Sets the subscriber number reported by the call manager. 2613 2614 Input(s): 2615 value: The subscriber number to set. Maximum length 128 characters. 2616 2617 Usage: 2618 Examples: 2619 hfp_set_subscriber_number <value> 2620 hfp_set_subscriber_number 14085555555 2621 """ 2622 cmd = "Sets the subscriber number reported by the call manager" 2623 try: 2624 value = line.strip() 2625 result = self.pri_dut.hfp_lib.setSubscriberNumber(value) 2626 self.log.info(result) 2627 except Exception as err: 2628 self.log.error(FAILURE.format(cmd, err)) 2629 2630 def do_hfp_set_operator(self, line): 2631 """ 2632 Description: Sets the operator value reported by the call manager. 2633 2634 Input(s): 2635 value: The operator value to set. Maximum length 16 characters. 2636 2637 Usage: 2638 Examples: 2639 hfp_set_operator <value> 2640 hfp_set_operator GoogleFi 2641 """ 2642 cmd = "Sets the operator value reported by the call manager" 2643 try: 2644 value = line.strip() 2645 result = self.pri_dut.hfp_lib.setOperator(value) 2646 self.log.info(result) 2647 except Exception as err: 2648 self.log.error(FAILURE.format(cmd, err)) 2649 2650 def do_hfp_set_nrec_support(self, line): 2651 """ 2652 Description: Sets the noise reduction/echo cancelation support reported by the call manager. 2653 2654 Input(s): 2655 value: The nrec support bool. 2656 2657 Usage: 2658 Examples: 2659 hfp_set_nrec_support <value> 2660 hfp_set_nrec_support true 2661 hfp_set_nrec_support false 2662 """ 2663 cmd = "Sets the noise reduction/echo cancelation support reported by the call manager" 2664 try: 2665 value = line.strip() == "true" 2666 result = self.pri_dut.hfp_lib.setNrecSupport(value) 2667 self.log.info(result) 2668 except Exception as err: 2669 self.log.error(FAILURE.format(cmd, err)) 2670 2671 def do_hfp_set_battery_level(self, line): 2672 """ 2673 Description: Sets the battery level reported by the call manager. 2674 2675 Input(s): 2676 value: The integer battery level value. Must be 0-5 inclusive. 2677 2678 Usage: 2679 Examples: 2680 hfp_set_battery_level <value> 2681 hfp_set_battery_level 0 2682 hfp_set_battery_level 3 2683 """ 2684 cmd = "Set the battery level reported by the call manager" 2685 try: 2686 value = int(line.strip()) 2687 result = self.pri_dut.hfp_lib.setBatteryLevel(value) 2688 self.log.info(result) 2689 except Exception as err: 2690 self.log.error(FAILURE.format(cmd, err)) 2691 2692 def do_hfp_set_last_dialed(self, line): 2693 """ 2694 Description: Sets the last dialed number in the call manager. 2695 2696 Input(s): 2697 number: The number of the remote party. 2698 2699 Usage: 2700 Examples: 2701 hfp_set_last_dialed <number> 2702 hfp_set_last_dialed 14085555555 2703 """ 2704 cmd = "Sets the last dialed number in the call manager." 2705 try: 2706 number = line.strip() 2707 result = self.pri_dut.hfp_lib.setLastDialed(number) 2708 self.log.info(result) 2709 except Exception as err: 2710 self.log.error(FAILURE.format(cmd, err)) 2711 2712 def do_hfp_clear_last_dialed(self, line): 2713 """ 2714 Description: Clears the last dialed number in the call manager. 2715 2716 Usage: 2717 Examples: 2718 hfp_clear_last_dialed 2719 """ 2720 cmd = "Clears the last dialed number in the call manager." 2721 try: 2722 result = self.pri_dut.hfp_lib.clearLastDialed() 2723 self.log.info(result) 2724 except Exception as err: 2725 self.log.error(FAILURE.format(cmd, err)) 2726 2727 def do_hfp_set_memory_location(self, line): 2728 """ 2729 Description: Sets a memory location to point to a remote number. 2730 2731 Input(s): 2732 location: The memory location at which to store the number. 2733 number: The number of the remote party to be stored. 2734 2735 Usage: 2736 Examples: 2737 hfp_set_memory_location <location> <number> 2738 hfp_set_memory_location 0 14085555555 2739 """ 2740 cmd = "Sets a memory location to point to a remote number." 2741 try: 2742 info = line.strip().split() 2743 if len(info) != 2: 2744 raise ValueError("Exactly two command line arguments required: <location> <number>") 2745 location, number = info[0], info[1] 2746 result = self.pri_dut.hfp_lib.setMemoryLocation(location, number) 2747 self.log.info(result) 2748 except Exception as err: 2749 self.log.error(FAILURE.format(cmd, err)) 2750 2751 def do_hfp_clear_memory_location(self, line): 2752 """ 2753 Description: Sets a memory location to point to a remote number. 2754 2755 Input(s): 2756 localtion: The memory location to clear. 2757 2758 Usage: 2759 Examples: 2760 hfp_clear_memory_location <location> 2761 hfp_clear_memory_location 0 2762 """ 2763 cmd = "Sets a memory location to point to a remote number." 2764 try: 2765 location = line.strip() 2766 result = self.pri_dut.hfp_lib.clearMemoryLocation(location) 2767 self.log.info(result) 2768 except Exception as err: 2769 self.log.error(FAILURE.format(cmd, err)) 2770 2771 def do_hfp_set_dial_result(self, line): 2772 """ 2773 Description: Sets the status result to be returned when the number is dialed. 2774 2775 Input(s): 2776 number: The number of the remote party. 2777 status: The status to be returned when an outgoing call is initiated to the number. 2778 2779 Usage: 2780 Examples: 2781 hfp_set_battery_level <value> 2782 """ 2783 cmd = "Sets the status result to be returned when the number is dialed." 2784 try: 2785 info = line.strip().split() 2786 if len(info) != 2: 2787 raise ValueError("Exactly two command line arguments required: <number> <status>") 2788 number, status = info[0], int(info[1]) 2789 result = self.pri_dut.hfp_lib.setDialResult(number, status) 2790 self.log.info(result) 2791 except Exception as err: 2792 self.log.error(FAILURE.format(cmd, err)) 2793 2794 def do_hfp_get_state(self, line): 2795 """ 2796 Description: Get the call manager's complete state 2797 2798 Usage: 2799 Examples: 2800 hfp_get_state 2801 """ 2802 cmd = "Get the call manager's state" 2803 try: 2804 result = self.pri_dut.hfp_lib.getState() 2805 self.log.info(result) 2806 except Exception as err: 2807 self.log.error(FAILURE.format(cmd, err)) 2808 """End HFP wrappers""" 2809