#!/usr/bin/env python3
#
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""
Python script for wrappers to various libraries.

Class CmdInput inherits from the cmd library.

Functions that start with "do_" have a method
signature that doesn't match the actual command
line command and that is intended. This is so the
"help" command knows what to display (in this case
the documentation of the command itself).

For example:
Looking at the function "do_tool_set_target_device_name"
has the inputs self and line which is expected of this type
of method signature. When the "help" command is done on the
method name you get the function documentation as such:

(Cmd) help tool_set_target_device_name

        Description: Reset the target device name.
        Input(s):
            device_name: Required. The advertising name to connect to.
        Usage: tool_set_target_device_name new_target_device name
          Examples:
            tool_set_target_device_name le_watch

This is all to say this documentation pattern is expected.

"""

import cmd
import pprint
import time

import acts.test_utils.bt.gatt_test_database as gatt_test_database

# Various global strings.
CMD_LOG = "CMD {} result: {}"
FAILURE = "CMD {} threw exception: {}"
BASIC_ADV_NAME = "fs_test"


class CmdInput(cmd.Cmd):
    """Interactive command loop wrapping the BLE/GATT libraries of a device.

    Every ``do_*`` method is one shell command. Its docstring is what the
    built-in ``help`` command prints, so those docstrings follow the
    Description / Input(s) / Usage / Examples layout on purpose.

    ``setup_vars`` must be called before any command is run; it provides
    ``pri_dut`` (the device whose ``ble_lib``, ``gattc_lib`` and ``gatts_lib``
    attributes are driven) and ``log``.
    """

    # Passed as the interval argument to bleStartBleAdvertising.
    # NOTE(review): presumably milliseconds -- confirm against ble_lib.
    ble_advertise_interval = 1000
    # Name substring used to pick the peripheral during a scan.
    target_device_name = ""
    # Device ids from the most recent scan results; feeds tab completion.
    # Always rebound (never mutated in place), so the shared default is safe.
    le_ids = []
    # Id of the peripheral selected by _find_unique_id, or None.
    unique_mac_addr_id = None

    def setup_vars(self, fuchsia_devices, target_device_name, log):
        """Bind the devices, target name and logger used by all commands.

        Args:
            fuchsia_devices: Sequence of device objects. The first becomes
                ``pri_dut``; the second, when present, becomes ``sec_dut``.
            target_device_name: Advertising name (substring) to look for.
            log: Logger exposing ``info`` and ``error``.

        Raises:
            IndexError: If ``fuchsia_devices`` is empty.
        """
        self.pri_dut = fuchsia_devices[0]
        # Always define sec_dut so later reads cannot raise AttributeError.
        self.sec_dut = fuchsia_devices[1] if len(fuchsia_devices) > 1 else None
        self.target_device_name = target_device_name
        self.log = log

    def emptyline(self):
        """Do nothing on an empty line (cmd.Cmd would repeat the last one)."""
        pass

    def do_EOF(self, line):
        "End Script"
        return True

    # Useful helper functions and cmd line tooling

    def _find_unique_id(self):
        """Scan for ``target_device_name`` and cache the matching device id.

        Starts a scan filtered on the target name, then polls the discovered
        devices every half second (at most 100 polls) until a device whose
        name contains ``target_device_name`` is seen. On success the id is
        stored in ``unique_mac_addr_id``; otherwise it is left as None. The
        scan is stopped on every exit path once it has been started.
        """
        max_polls = 100
        poll_interval_s = .5
        scan_filter = {"name_substring": self.target_device_name}
        self.unique_mac_addr_id = None
        self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
        try:
            for _ in range(max_polls):
                time.sleep(poll_interval_s)
                scan_res = self.pri_dut.gattc_lib.bleGetDiscoveredDevices()[
                    'result']
                for device in scan_res:
                    name, did = device["name"], device["id"]
                    if not name:
                        continue
                    self.log.info(
                        "Discovered device with name, id: {}, {}".format(
                            name, did))
                    if self.target_device_name in name:
                        self.unique_mac_addr_id = did
                        self.log.info(
                            "Successfully found device: name, id: {}, {}".
                            format(name, did))
                        break
                if self.unique_mac_addr_id:
                    break
        finally:
            # Do not leave the scan running if polling raised.
            self.pri_dut.gattc_lib.bleStopBleScan()

    def _ensure_unique_id(self):
        """Make sure a peripheral id is cached, scanning if necessary.

        Returns:
            True if ``unique_mac_addr_id`` is set afterwards, False (after
            logging the reason) if the scan failed or found nothing.
        """
        if self.unique_mac_addr_id:
            return True
        try:
            self._find_unique_id()
        except Exception as err:
            self.log.info("Failed to scan or find device: {}".format(err))
            return False
        if not self.unique_mac_addr_id:
            self.log.info("Failed to scan or find device.")
            return False
        return True

    def _parse_int_args(self, line, names):
        """Split ``line`` into exactly ``len(names)`` integer arguments.

        Args:
            line: Raw argument string of the command.
            names: Display names of the expected arguments, in order.

        Returns:
            List of ints, or None (after logging the expected arguments) when
            the number of arguments is wrong.

        Raises:
            ValueError: If an argument is not an integer.
        """
        args = line.split()
        if len(args) != len(names):
            self.log.info("{} Arguments required: {}".format(
                len(names), " ".join("[{}]".format(name) for name in names)))
            return None
        return [int(arg) for arg in args]

    @staticmethod
    def _generate_write_value(size):
        """Return the byte pattern [0, 1, 2, ...] of length ``size`` (mod 256)."""
        return [i % 256 for i in range(size)]

    @staticmethod
    def _bytes_to_str(values):
        """Render an iterable of integer code points as a string."""
        return "".join(chr(val) for val in values)

    def _iter_characteristics(self, service_banner):
        """Yield every characteristic of every service on the peripheral.

        For each listed service this connects to the service, discovers its
        characteristics and prints ``service_banner`` formatted with the
        service uuid before yielding the characteristics.

        Args:
            service_banner: Format string with one ``{}`` for the service
                uuid.
        """
        services = self.pri_dut.gattc_lib.listServices(
            self.unique_mac_addr_id)
        for service in services['result']:
            service_id = service['id']
            service_uuid = service['uuid_type']
            self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id,
                                                    service_id)
            chars = self.pri_dut.gattc_lib.discoverCharacteristics()
            print(service_banner.format(service_uuid))
            yield from chars['result']

    def _iter_descriptors(self, service_banner, char_banner):
        """Yield every descriptor of every characteristic on the peripheral.

        Args:
            service_banner: Format string printed once per service (one
                ``{}`` for the service uuid).
            char_banner: Format string printed once per characteristic (one
                ``{}`` for the characteristic uuid).
        """
        for char in self._iter_characteristics(service_banner):
            char_uuid = char['uuid_type']
            descriptors = char['descriptors']
            print(char_banner.format(char_uuid))
            yield from descriptors

    def do_tool_refresh_unique_id(self, line):
        """
        Description: Refresh command line tool mac unique id.
        Usage:
          Examples:
            tool_refresh_unique_id
        """
        try:
            self._find_unique_id()
        except Exception as err:
            self.log.error(
                "Failed to scan or find scan result: {}".format(err))

    def do_tool_refesh_unique_id(self, line):
        """
        Description: Deprecated, misspelled alias of tool_refresh_unique_id.
        Usage:
          Examples:
            tool_refesh_unique_id
        """
        # Kept so existing scripts using the historical spelling still work.
        return self.do_tool_refresh_unique_id(line)

    def do_tool_set_target_device_name(self, line):
        """
        Description: Reset the target device name.
        Input(s):
            device_name: Required. The advertising name to connect to.
        Usage: tool_set_target_device_name new_target_device name
          Examples:
            tool_set_target_device_name le_watch
        """
        self.log.info("Setting target_device_name to: {}".format(line))
        self.target_device_name = line

    # Begin BLE advertise wrappers

    def do_ble_start_generic_connectable_advertisement(self, line):
        """
        Description: Start a connectable LE advertisement
        Usage: ble_start_generic_connectable_advertisement
        """
        cmd_desc = "Start a connectable LE advertisement"
        try:
            adv_data = {"name": BASIC_ADV_NAME}
            self.pri_dut.ble_lib.bleStartBleAdvertising(
                adv_data, self.ble_advertise_interval)
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_ble_start_generic_nonconnectable_advertisement(self, line):
        """
        Description: Start a non-connectable LE advertisement
        Usage: ble_start_generic_nonconnectable_advertisement
        """
        cmd_desc = "Start a nonconnectable LE advertisement"
        try:
            adv_data = {"name": BASIC_ADV_NAME}
            # Third positional argument False requests a non-connectable
            # advertisement (the connectable variant omits it).
            self.pri_dut.ble_lib.bleStartBleAdvertising(
                adv_data, self.ble_advertise_interval, False)
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_ble_stop_advertisement(self, line):
        """
        Description: Stop a BLE advertisement.
        Usage: ble_stop_advertisement
        """
        cmd_desc = "Stop a BLE advertisement"
        try:
            self.pri_dut.ble_lib.bleStopBleAdvertising()
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    # End BLE advertise wrappers
    # Begin GATT client wrappers

    def complete_gattc_connect_by_id(self, text, line, begidx, endidx):
        """Tab-complete gattc_connect_by_id from the last scan's device ids."""
        if not text:
            return list(self.le_ids)
        return [s for s in self.le_ids if s.startswith(text)]

    def do_gattc_connect_by_id(self, line):
        """
        Description: Connect to a LE peripheral.
        Input(s):
            device_id: Required. The unique device ID from Fuchsia
                discovered devices.
        Usage:
          Examples:
            gattc_connect_by_id device_id
        """
        cmd_desc = "Connect to a LE peripheral by input ID."
        try:
            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
                line)
            self.log.info("Connection status: {}".format(
                pprint.pformat(connection_status)))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_connect(self, line):
        """
        Description: Connect to a LE peripheral.
        Optional input: device_name
        Input(s):
            device_name: Optional. The advertising name of the peripheral
                to connect to. Defaults to the current target device name.
        Usage:
          Examples:
            gattc_connect
            gattc_connect eddystone_123
        """
        cmd_desc = "Connect to a LE peripheral."
        try:
            if line:
                # A new name invalidates any previously cached device id.
                self.target_device_name = line
                self.unique_mac_addr_id = None
            if not self._ensure_unique_id():
                return
            connection_status = self.pri_dut.gattc_lib.bleConnectToPeripheral(
                self.unique_mac_addr_id)
            self.log.info("Connection status: {}".format(
                pprint.pformat(connection_status)))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_connect_disconnect_iterations(self, line):
        """
        Description: Connect then disconnect to a LE peripheral multiple times.
        Input(s):
            iterations: Required. The number of iterations to run.
        Usage:
          Examples:
            gattc_connect_disconnect_iterations 10
        """
        cmd_desc = "Connect then disconnect to a LE peripheral."
        try:
            # Validate the argument first so a typo does not cost a full scan.
            iterations = int(line)
            if not self._ensure_unique_id():
                return
            for i in range(iterations):
                self.log.info("Running iteration {}".format(i + 1))
                connection_status = \
                    self.pri_dut.gattc_lib.bleConnectToPeripheral(
                        self.unique_mac_addr_id)
                self.log.info("Connection status: {}".format(
                    pprint.pformat(connection_status)))
                time.sleep(4)
                disc_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
                    self.unique_mac_addr_id)
                self.log.info("Disconnect status: {}".format(disc_status))
                time.sleep(3)
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_disconnect(self, line):
        """
        Description: Disconnect from LE peripheral.
        Assumptions: Already connected to a peripheral.
        Usage:
          Examples:
            gattc_disconnect
        """
        cmd_desc = "Disconnect from LE peripheral."
        try:
            disconnect_status = self.pri_dut.gattc_lib.bleDisconnectPeripheral(
                self.unique_mac_addr_id)
            self.log.info("Disconnect status: {}".format(disconnect_status))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_list_services(self, line):
        """
        Description: List services from LE peripheral.
        Assumptions: Already connected to a peripheral.
        Usage:
          Examples:
            gattc_list_services
        """
        cmd_desc = "List services from LE peripheral."
        try:
            services = self.pri_dut.gattc_lib.listServices(
                self.unique_mac_addr_id)
            self.log.info("Discovered Services: \n{}".format(
                pprint.pformat(services)))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_connect_to_service(self, line):
        """
        Description: Connect to Peripheral GATT server service.
        Assumptions: Already connected to peripheral.
        Input(s):
            service_id: Required. The service id reference on the GATT server.
        Usage:
          Examples:
            gattc_connect_to_service service_id
        """
        cmd_desc = "GATT client connect to GATT server service."
        try:
            self.pri_dut.gattc_lib.connectToService(self.unique_mac_addr_id,
                                                    int(line))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_discover_characteristics(self, line):
        """
        Description: Discover characteristics from a connected service.
        Assumptions: Already connected to a GATT server service.
        Usage:
          Examples:
            gattc_discover_characteristics
        """
        cmd_desc = "Discover and list characteristics from a GATT server."
        try:
            chars = self.pri_dut.gattc_lib.discoverCharacteristics()
            self.log.info("Discovered chars:\n{}".format(
                pprint.pformat(chars)))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_notify_all_chars(self, line):
        """
        Description: Enable all notifications on all Characteristics on
            a GATT server.
        Assumptions: Basic GATT connection made.
        Usage:
          Examples:
            gattc_notify_all_chars
        """
        cmd_desc = "Enable notifications on all characteristics."
        try:
            for char in self._iter_characteristics(
                    "Enabling notifications in service uuid: {}"):
                char_id = char['id']
                char_uuid = char['uuid_type']
                print("found uuid {}".format(char_uuid))
                try:
                    self.pri_dut.gattc_lib.enableNotifyCharacteristic(char_id)
                except Exception as err:
                    # Best effort: not every characteristic can notify.
                    print("error enabling notification: {}".format(err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_read_all_chars(self, line):
        """
        Description: Read all Characteristic values from a GATT server across
            all services.
        Assumptions: Basic GATT connection made.
        Usage:
          Examples:
            gattc_read_all_chars
        """
        cmd_desc = "Read all characteristics from the GATT service."
        try:
            for char in self._iter_characteristics(
                    "Reading chars in service uuid: {}"):
                char_id = char['id']
                char_uuid = char['uuid_type']
                try:
                    read_val = self.pri_dut.gattc_lib.readCharacteristicById(
                        char_id)
                    print(" Characteristic uuid / Value: {} / {}".format(
                        char_uuid, read_val['result']))
                    print(" str val: {}".format(
                        self._bytes_to_str(read_val['result'])))
                except Exception as err:
                    # Best effort: keep reading the remaining characteristics.
                    print(err)
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_read_all_desc(self, line):
        """
        Description: Read all Descriptors values from a GATT server across
            all services.
        Assumptions: Basic GATT connection made.
        Usage:
          Examples:
            gattc_read_all_desc
        """
        cmd_desc = "Read all descriptors from the GATT service."
        try:
            for desc in self._iter_descriptors(
                    "Reading descs in service uuid: {}",
                    " Reading descs in char uuid: {}"):
                desc_id = desc["id"]
                desc_uuid = desc["uuid_type"]
                try:
                    read_val = self.pri_dut.gattc_lib.readDescriptorById(
                        desc_id)
                    print(" Descriptor uuid / Value: {} / {}".format(
                        desc_uuid, read_val['result']))
                except Exception as err:
                    # Best effort: report and move on to the next descriptor.
                    print(" error reading descriptor {}: {}".format(
                        desc_uuid, err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_write_all_desc(self, line):
        """
        Description: Write a value to all Descriptors on the GATT server.
        Assumptions: Basic GATT connection made.
        Input(s):
            offset: Required. The offset to start writing to.
            size: Required. The size of bytes to write (value will be generated).
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
        Usage:
          Examples:
            gattc_write_all_desc 0 100
            gattc_write_all_desc 10 2
        """
        cmd_desc = "Write all descriptors on the GATT service."
        try:
            parsed = self._parse_int_args(line, ("Offset", "Size"))
            if parsed is None:
                return
            offset, size = parsed
            write_value = self._generate_write_value(size)
            for desc in self._iter_descriptors(
                    "Writing descs in service uuid: {}",
                    " Writing descs in char uuid: {}"):
                desc_id = desc["id"]
                desc_uuid = desc["uuid_type"]
                try:
                    write_val = self.pri_dut.gattc_lib.writeDescriptorById(
                        desc_id, offset, write_value)
                    print(" Descriptor uuid / Result: {} / {}".format(
                        desc_uuid, write_val['result']))
                except Exception as err:
                    # Best effort: report and move on to the next descriptor.
                    print(" error writing descriptor {}: {}".format(
                        desc_uuid, err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_read_all_long_desc(self, line):
        """
        Description: Read all long Characteristic Descriptors
        Assumptions: Basic GATT connection made.
        Input(s):
            offset: Required. The offset to start reading from.
            max_bytes: Required. The max size of bytes to return.
        Usage:
          Examples:
            gattc_read_all_long_desc 0 100
            gattc_read_all_long_desc 10 20
        """
        cmd_desc = "Read all long descriptors from the GATT service."
        try:
            parsed = self._parse_int_args(line, ("Offset", "Size"))
            if parsed is None:
                return
            offset, max_bytes = parsed
            # Services are listed through gattc_lib.listServices like every
            # other gattc_* command (previously ble_lib.bleListServices).
            for desc in self._iter_descriptors(
                    "Reading descs in service uuid: {}",
                    " Reading descs in char uuid: {}"):
                desc_id = desc["id"]
                desc_uuid = desc["uuid_type"]
                try:
                    read_val = self.pri_dut.gattc_lib.readLongDescriptorById(
                        desc_id, offset, max_bytes)
                    print(" Descriptor uuid / Result: {} / {}".format(
                        desc_uuid, read_val['result']))
                except Exception as err:
                    # Best effort: report and move on to the next descriptor.
                    print(" error reading long descriptor {}: {}".format(
                        desc_uuid, err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_read_all_long_char(self, line):
        """
        Description: Read all long Characteristic
        Assumptions: Basic GATT connection made.
        Input(s):
            offset: Required. The offset to start reading from.
            max_bytes: Required. The max size of bytes to return.
        Usage:
          Examples:
            gattc_read_all_long_char 0 100
            gattc_read_all_long_char 10 20
        """
        cmd_desc = "Read all long Characteristics from the GATT service."
        try:
            parsed = self._parse_int_args(line, ("Offset", "Size"))
            if parsed is None:
                return
            offset, max_bytes = parsed
            # Services are listed through gattc_lib.listServices like every
            # other gattc_* command (previously ble_lib.bleListServices).
            for char in self._iter_characteristics(
                    "Reading chars in service uuid: {}"):
                char_id = char['id']
                char_uuid = char['uuid_type']
                try:
                    read_val = \
                        self.pri_dut.gattc_lib.readLongCharacteristicById(
                            char_id, offset, max_bytes)
                    print(" Char uuid / Result: {} / {}".format(
                        char_uuid, read_val['result']))
                except Exception as err:
                    # Best effort: report and move on to the next one.
                    print(" error reading long char {}: {}".format(
                        char_uuid, err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_write_all_chars(self, line):
        """
        Description: Write all characteristic values from a GATT server across
            all services.
        Assumptions: Basic GATT connection made.
        Input(s):
            offset: Required. The offset to start writing on.
            size: The write value size (value will be generated)
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
        Usage:
          Examples:
            gattc_write_all_chars 0 10
            gattc_write_all_chars 10 1
        """
        cmd_desc = "Write all characteristics on the GATT service."
        try:
            parsed = self._parse_int_args(line, ("Offset", "Size"))
            if parsed is None:
                return
            offset, size = parsed
            write_value = self._generate_write_value(size)
            for char in self._iter_characteristics(
                    "Writing chars in service uuid: {}"):
                char_id = char['id']
                char_uuid = char['uuid_type']
                try:
                    write_result = self.pri_dut.gattc_lib.writeCharById(
                        char_id, offset, write_value)
                    print(" Characteristic uuid write result: {} / {}".format(
                        char_uuid, write_result['result']))
                except Exception as err:
                    # Best effort: keep writing the remaining characteristics.
                    print("error writing char {}".format(err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_write_all_chars_without_response(self, line):
        """
        Description: Write all characteristic values from a GATT server across
            all services.
        Assumptions: Basic GATT connection made.
        Input(s):
            size: The write value size (value will be generated).
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
        Usage:
          Examples:
            gattc_write_all_chars_without_response 100
        """
        cmd_desc = "Write all characteristics without response."
        try:
            parsed = self._parse_int_args(line, ("Size", ))
            if parsed is None:
                return
            write_value = self._generate_write_value(parsed[0])
            for char in self._iter_characteristics(
                    "Writing chars in service uuid: {}"):
                char_id = char['id']
                char_uuid = char['uuid_type']
                try:
                    write_result = \
                        self.pri_dut.gattc_lib.writeCharByIdWithoutResponse(
                            char_id, write_value)
                    print(" Characteristic uuid write result: {} / {}".format(
                        char_uuid, write_result['result']))
                except Exception as err:
                    # Best effort: keep writing the remaining characteristics.
                    print("error writing char {}".format(err))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_write_char_by_id(self, line):
        """
        Description: Write char by characteristic id reference.
        Assumptions: Already connected to a GATT server service.
        Input(s):
            characteristic_id: The characteristic id reference on the GATT
                service
            offset: The offset value to use
            size: Function will generate random bytes by input size.
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
        Usage:
          Examples:
            gattc_write_char_by_id char_id 0 5
            gattc_write_char_by_id char_id 20 1
        """
        cmd_desc = "Write to GATT server characteristic ."
        try:
            parsed = self._parse_int_args(line, ("Id", "Offset", "Size"))
            if parsed is None:
                return
            char_id, offset, size = parsed
            self.pri_dut.gattc_lib.writeCharById(
                char_id, offset, self._generate_write_value(size))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_write_char_by_id_without_response(self, line):
        """
        Description: Write char by characteristic id reference without response.
        Assumptions: Already connected to a GATT server service.
        Input(s):
            characteristic_id: The characteristic id reference on the GATT
                service
            size: Function will generate random bytes by input size.
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
        Usage:
          Examples:
            gattc_write_char_by_id_without_response char_id 5
        """
        cmd_desc = "Write characteristic by id without response."
        try:
            parsed = self._parse_int_args(line, ("Id", "Size"))
            if parsed is None:
                return
            char_id, size = parsed
            self.pri_dut.gattc_lib.writeCharByIdWithoutResponse(
                char_id, self._generate_write_value(size))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_enable_notify_char_by_id(self, line):
        """
        Description: Enable Characteristic notification on Characteristic ID.
        Assumptions: Already connected to a GATT server service.
        Input(s):
            characteristic_id: The characteristic id reference on the GATT
                service
        Usage:
          Examples:
            gattc_enable_notify_char_by_id char_id
        """
        cmd_desc = "Enable notifications by Characteristic id."
        try:
            char_id = int(line)
            self.pri_dut.gattc_lib.enableNotifyCharacteristic(char_id)
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_disable_notify_char_by_id(self, line):
        """
        Description: Disable Characteristic notification on Characteristic ID.
        Assumptions: Already connected to a GATT server service.
        Input(s):
            characteristic_id: The characteristic id reference on the GATT
                service
        Usage:
          Examples:
            gattc_disable_notify_char_by_id char_id
        """
        cmd_desc = "Disable notify Characteristic by id."
        try:
            char_id = int(line)
            self.pri_dut.gattc_lib.disableNotifyCharacteristic(char_id)
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_read_char_by_id(self, line):
        """
        Description: Read Characteristic by ID.
        Assumptions: Already connected to a GATT server service.
        Input(s):
            characteristic_id: The characteristic id reference on the GATT
                service
        Usage:
          Examples:
            gattc_read_char_by_id char_id
        """
        cmd_desc = "Read Characteristic value by ID."
        try:
            char_id = int(line)
            read_val = self.pri_dut.gattc_lib.readCharacteristicById(char_id)
            self.log.info("Characteristic Value with id {}: {}".format(
                char_id, read_val['result']))
            print(" str val: {}".format(
                self._bytes_to_str(read_val['result'])))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_write_desc_by_id(self, line):
        """
        Description: Write Descriptor by characteristic id reference.
        Assumptions: Already connected to a GATT server service.
        Input(s):
            descriptor_id: The Descriptor id reference on the GATT service
            offset: The offset value to use
            size: Function will generate random bytes by input size.
                IE: Input of 5 will send a byte array of [00, 01, 02, 03, 04]
        Usage:
          Examples:
            gattc_write_desc_by_id desc_id 0 5
            gattc_write_desc_by_id desc_id 20 1
        """
        cmd_desc = "Write Descriptor by id."
        try:
            parsed = self._parse_int_args(line, ("Id", "Offset", "Size"))
            if parsed is None:
                return
            desc_id, offset, size = parsed
            write_result = self.pri_dut.gattc_lib.writeDescriptorById(
                desc_id, offset, self._generate_write_value(size))
            self.log.info("Descriptor Write result {}: {}".format(
                desc_id, write_result['result']))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_read_desc_by_id(self, line):
        """
        Description: Read Descriptor by ID.
        Assumptions: Already connected to a GATT server service.
        Input(s):
            descriptor_id: The Descriptor id reference on the GATT service
        Usage:
          Examples:
            gattc_read_desc_by_id desc_id
        """
        cmd_desc = "Read Descriptor by ID."
        try:
            desc_id = int(line)
            read_val = self.pri_dut.gattc_lib.readDescriptorById(desc_id)
            self.log.info("Descriptor Value with id {}: {}".format(
                desc_id, read_val['result']))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_gattc_read_long_char_by_id(self, line):
        """
        Description: Read long Characteristic value by id.
        Assumptions: Already connected to a GATT server service.
        Input(s):
            characteristic_id: The characteristic id reference on the GATT
                service
            offset: The offset value to use.
            max_bytes: The max bytes size to return.
        Usage:
          Examples:
            gattc_read_long_char_by_id char_id 0 10
            gattc_read_long_char_by_id char_id 20 1
        """
        cmd_desc = "Read long Characteristic value by id."
        try:
            parsed = self._parse_int_args(line, ("Id", "Offset", "Size"))
            if parsed is None:
                return
            char_id, offset, max_bytes = parsed
            read_val = self.pri_dut.gattc_lib.readLongCharacteristicById(
                char_id, offset, max_bytes)
            self.log.info("Characteristic Value with id {}: {}".format(
                char_id, read_val['result']))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    # End GATT client wrappers
    # Begin LE scan wrappers

    def _update_scan_results(self, scan_results):
        """Replace ``le_ids`` with the device ids found in ``scan_results``.

        Args:
            scan_results: Mapping whose ``'result'`` entry is an iterable of
                mappings that each carry an ``'id'`` key.
        """
        self.le_ids = [scan['id'] for scan in scan_results['result']]

    def do_ble_start_scan(self, line):
        """
        Description: Perform a BLE scan.
        Default filter name: ""
        Optional input: filter_device_name
        Usage:
          Examples:
            ble_start_scan
            ble_start_scan eddystone
        """
        cmd_desc = "Perform a BLE scan and list discovered devices."
        try:
            scan_filter = {"name_substring": line or ""}
            self.pri_dut.gattc_lib.bleStartBleScan(scan_filter)
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_ble_stop_scan(self, line):
        """
        Description: Stops a BLE scan and returns discovered devices.
        Usage:
          Examples:
            ble_stop_scan
        """
        cmd_desc = "Stops a BLE scan and returns discovered devices."
        try:
            scan_results = self.pri_dut.gattc_lib.bleStopBleScan()
            self._update_scan_results(scan_results)
            self.log.info(pprint.pformat(scan_results))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    def do_ble_get_discovered_devices(self, line):
        """
        Description: Get discovered LE devices of an active scan.
        Usage:
          Examples:
            ble_get_discovered_devices
        """
        cmd_desc = "Get discovered LE devices of an active scan."
        try:
            scan_results = self.pri_dut.gattc_lib.bleGetDiscoveredDevices()
            self._update_scan_results(scan_results)
            self.log.info(pprint.pformat(scan_results))
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    # End LE scan wrappers
    # Begin GATT Server wrappers

    def complete_gatts_setup_database(self, text, line, begidx, endidx):
        """Tab-complete gatts_setup_database from the known database names."""
        db_names = gatt_test_database.GATT_SERVER_DB_MAPPING
        if not text:
            return list(db_names)
        return [s for s in db_names if s.startswith(text)]

    def do_gatts_setup_database(self, line):
        """
        Description: Setup a Gatt server database based on pre-defined inputs.
            Supports Tab Autocomplete.
        Input(s):
            descriptor_db_name: The descriptor db name that matches one in
                acts.test_utils.bt.gatt_test_database
        Usage:
          Examples:
            gatts_setup_database LARGE_DB_1
        """
        cmd_desc = "Setup GATT Server Database Based of pre-defined dictionaries"
        try:
            database = gatt_test_database.GATT_SERVER_DB_MAPPING.get(line)
            if database is None:
                # Do not publish None for a mistyped or missing name.
                self.log.info(
                    "Unknown GATT database name: {!r}. Valid names: {}".format(
                        line, ", ".join(
                            sorted(
                                gatt_test_database.GATT_SERVER_DB_MAPPING))))
                return
            publish_result = self.pri_dut.gatts_lib.publishServer(database)
            print(publish_result)
        except Exception as err:
            self.log.error(FAILURE.format(cmd_desc, err))

    # End GATT Server wrappers