1#!/usr/bin/env python3 2# 3# Copyright 2019 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import inspect 18import logging 19 20from queue import Empty 21 22from acts.controllers.android_device import AndroidDevice 23from acts.controllers.fuchsia_device import FuchsiaDevice 24from acts_contrib.test_utils.bt.bt_constants import ble_scan_settings_modes 25from acts_contrib.test_utils.bt.bt_constants import gatt_cb_strings 26from acts_contrib.test_utils.bt.bt_constants import gatt_event 27from acts_contrib.test_utils.bt.bt_constants import scan_result 28from acts_contrib.test_utils.bt.bt_gatt_utils import GattTestUtilsError 29from acts_contrib.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection 30from acts_contrib.test_utils.bt.bt_gatt_utils import setup_gatt_connection 31from acts_contrib.test_utils.fuchsia.bt_test_utils import le_scan_for_device_by_name 32 33import acts_contrib.test_utils.bt.bt_test_utils as bt_test_utils 34 35 36def create_bluetooth_device(hardware_device): 37 """Creates a generic Bluetooth device based on type of device that is sent 38 to the functions. 39 40 Args: 41 hardware_device: A Bluetooth hardware device that is supported by ACTS. 42 """ 43 if isinstance(hardware_device, FuchsiaDevice): 44 return FuchsiaBluetoothDevice(hardware_device) 45 elif isinstance(hardware_device, AndroidDevice): 46 return AndroidBluetoothDevice(hardware_device) 47 else: 48 raise ValueError('Unable to create BluetoothDevice for type %s' % 49 type(hardware_device)) 50 51 52class BluetoothDevice(object): 53 """Class representing a generic Bluetooth device. 54 55 Each object of this class represents a generic Bluetooth device. 56 Android device and Fuchsia devices are the currently supported devices. 57 58 Attributes: 59 device: A generic Bluetooth device. 60 """ 61 def __init__(self, device): 62 self.device = device 63 self.log = logging 64 65 def a2dp_initiate_open_stream(self): 66 """Base generic Bluetooth interface. Only called if not overridden by 67 another supported device. 68 """ 69 raise NotImplementedError("{} must be defined.".format( 70 inspect.currentframe().f_code.co_name)) 71 72 def start_profile_a2dp_sink(self): 73 """Base generic Bluetooth interface. Only called if not overridden by 74 another supported device. 75 """ 76 raise NotImplementedError("{} must be defined.".format( 77 inspect.currentframe().f_code.co_name)) 78 79 def stop_profile_a2dp_sink(self): 80 """Base generic Bluetooth interface. Only called if not overridden by 81 another supported device. 82 """ 83 raise NotImplementedError("{} must be defined.".format( 84 inspect.currentframe().f_code.co_name)) 85 86 def start_pairing_helper(self): 87 """Base generic Bluetooth interface. Only called if not overridden by 88 another supported device. 89 """ 90 raise NotImplementedError("{} must be defined.".format( 91 inspect.currentframe().f_code.co_name)) 92 93 def set_discoverable(self, is_discoverable): 94 """Base generic Bluetooth interface. Only called if not overridden by 95 another supported device. 96 """ 97 raise NotImplementedError("{} must be defined.".format( 98 inspect.currentframe().f_code.co_name)) 99 100 def bluetooth_toggle_state(self, state): 101 """Base generic Bluetooth interface. Only called if not overridden by 102 another supported device. 103 """ 104 raise NotImplementedError("{} must be defined.".format( 105 inspect.currentframe().f_code.co_name)) 106 107 def gatt_client_discover_characteristic_by_uuid(self, peer_identifier, 108 uuid): 109 """Base generic Bluetooth interface. Only called if not overridden by 110 another supported device. 111 """ 112 raise NotImplementedError("{} must be defined.".format( 113 inspect.currentframe().f_code.co_name)) 114 115 def initialize_bluetooth_controller(self): 116 """Base generic Bluetooth interface. Only called if not overridden by 117 another supported device. 118 """ 119 raise NotImplementedError("{} must be defined.".format( 120 inspect.currentframe().f_code.co_name)) 121 122 def get_pairing_pin(self): 123 """Base generic Bluetooth interface. Only called if not overridden by 124 another supported device. 125 """ 126 raise NotImplementedError("{} must be defined.".format( 127 inspect.currentframe().f_code.co_name)) 128 129 def input_pairing_pin(self, pin): 130 """Base generic Bluetooth interface. Only called if not overridden by 131 another supported device. 132 """ 133 raise NotImplementedError("{} must be defined.".format( 134 inspect.currentframe().f_code.co_name)) 135 136 def get_bluetooth_local_address(self): 137 """Base generic Bluetooth interface. Only called if not overridden by 138 another supported device. 139 """ 140 raise NotImplementedError("{} must be defined.".format( 141 inspect.currentframe().f_code.co_name)) 142 143 def gatt_connect(self, peer_identifier, transport, autoconnect): 144 """Base generic Bluetooth interface. Only called if not overridden by 145 another supported device. 146 """ 147 raise NotImplementedError("{} must be defined.".format( 148 inspect.currentframe().f_code.co_name)) 149 150 def gatt_client_write_characteristic_without_response_by_handle( 151 self, peer_identifier, handle, value): 152 """Base generic Bluetooth interface. Only called if not overridden by 153 another supported device. 154 """ 155 raise NotImplementedError("{} must be defined.".format( 156 inspect.currentframe().f_code.co_name)) 157 158 def gatt_client_write_characteristic_by_handle(self, peer_identifier, 159 handle, offset, value): 160 """Base generic Bluetooth interface. Only called if not overridden by 161 another supported device. 162 """ 163 raise NotImplementedError("{} must be defined.".format( 164 inspect.currentframe().f_code.co_name)) 165 166 def gatt_client_read_characteristic_by_handle(self, peer_identifier, 167 handle): 168 """Base generic Bluetooth interface. Only called if not overridden by 169 another supported device. 170 """ 171 raise NotImplementedError("{} must be defined.".format( 172 inspect.currentframe().f_code.co_name)) 173 174 def gatt_client_read_characteristic_by_uuid(self, peer_identifier, uuid): 175 """Base generic Bluetooth interface. Only called if not overridden by 176 another supported device. 177 """ 178 raise NotImplementedError("{} must be defined.".format( 179 inspect.currentframe().f_code.co_name)) 180 181 def gatt_client_read_long_characteristic_by_handle(self, peer_identifier, 182 handle, offset, 183 max_bytes): 184 """Base generic Bluetooth interface. Only called if not overridden by 185 another supported device. 186 """ 187 raise NotImplementedError("{} must be defined.".format( 188 inspect.currentframe().f_code.co_name)) 189 190 def gatt_client_enable_notifiy_characteristic_by_handle( 191 self, peer_identifier, handle): 192 """Base generic Bluetooth interface. Only called if not overridden by 193 another supported device. 194 """ 195 raise NotImplementedError("{} must be defined.".format( 196 inspect.currentframe().f_code.co_name)) 197 198 def gatt_client_disable_notifiy_characteristic_by_handle( 199 self, peer_identifier, handle): 200 """Base generic Bluetooth interface. Only called if not overridden by 201 another supported device. 202 """ 203 raise NotImplementedError("{} must be defined.".format( 204 inspect.currentframe().f_code.co_name)) 205 206 def gatt_client_read_descriptor_by_handle(self, peer_identifier, handle): 207 """Base generic Bluetooth interface. Only called if not overridden by 208 another supported device. 209 """ 210 raise NotImplementedError("{} must be defined.".format( 211 inspect.currentframe().f_code.co_name)) 212 213 def gatt_client_write_descriptor_by_handle(self, peer_identifier, handle, 214 offset, value): 215 """Base generic Bluetooth interface. Only called if not overridden by 216 another supported device. 217 """ 218 raise NotImplementedError("{} must be defined.".format( 219 inspect.currentframe().f_code.co_name)) 220 221 def gatt_client_long_read_descriptor_by_handle(self, peer_identifier, 222 handle, offset, max_bytes): 223 """Base generic Bluetooth interface. Only called if not overridden by 224 another supported device. 225 """ 226 raise NotImplementedError("{} must be defined.".format( 227 inspect.currentframe().f_code.co_name)) 228 229 def gatt_disconnect(self, peer_identifier): 230 """Base generic Bluetooth interface. Only called if not overridden by 231 another supported device. 232 """ 233 raise NotImplementedError("{} must be defined.".format( 234 inspect.currentframe().f_code.co_name)) 235 236 def gatt_client_refresh(self, peer_identifier): 237 """Base generic Bluetooth interface. Only called if not overridden by 238 another supported device. 239 """ 240 raise NotImplementedError("{} must be defined.".format( 241 inspect.currentframe().f_code.co_name)) 242 243 def le_scan_with_name_filter(self, name, timeout): 244 """Base generic Bluetooth interface. Only called if not overridden by 245 another supported device. 246 """ 247 raise NotImplementedError("{} must be defined.".format( 248 inspect.currentframe().f_code.co_name)) 249 250 def log_info(self, log): 251 """Base generic Bluetooth interface. Only called if not overridden by 252 another supported device. 253 """ 254 raise NotImplementedError("{} must be defined.".format( 255 inspect.currentframe().f_code.co_name)) 256 257 def reset_bluetooth(self): 258 """Base generic Bluetooth interface. Only called if not overridden by 259 another supported device. 260 """ 261 raise NotImplementedError("{} must be defined.".format( 262 inspect.currentframe().f_code.co_name)) 263 264 def sdp_add_search(self, attribute_list, profile_id): 265 """Base generic Bluetooth interface. Only called if not overridden by 266 another supported device. 267 """ 268 raise NotImplementedError("{} must be defined.".format( 269 inspect.currentframe().f_code.co_name)) 270 271 def sdp_add_service(self, sdp_record): 272 """Base generic Bluetooth interface. Only called if not overridden by 273 another supported device. 274 """ 275 raise NotImplementedError("{} must be defined.".format( 276 inspect.currentframe().f_code.co_name)) 277 278 def sdp_clean_up(self): 279 """Base generic Bluetooth interface. Only called if not overridden by 280 another supported device. 281 """ 282 raise NotImplementedError("{} must be defined.".format( 283 inspect.currentframe().f_code.co_name)) 284 285 def sdp_init(self): 286 """Base generic Bluetooth interface. Only called if not overridden by 287 another supported device. 288 """ 289 raise NotImplementedError("{} must be defined.".format( 290 inspect.currentframe().f_code.co_name)) 291 292 def sdp_remove_service(self, service_id): 293 """Base generic Bluetooth interface. Only called if not overridden by 294 another supported device. 295 """ 296 raise NotImplementedError("{} must be defined.".format( 297 inspect.currentframe().f_code.co_name)) 298 299 def start_le_advertisement(self, adv_data, scan_response, adv_interval, 300 connectable): 301 """Base generic Bluetooth interface. Only called if not overridden by 302 another supported device. 303 """ 304 raise NotImplementedError("{} must be defined.".format( 305 inspect.currentframe().f_code.co_name)) 306 307 def stop_le_advertisement(self): 308 """Base generic Bluetooth interface. Only called if not overridden by 309 another supported device. 310 """ 311 raise NotImplementedError("{} must be defined.".format( 312 inspect.currentframe().f_code.co_name)) 313 314 def set_bluetooth_local_name(self, name): 315 """Base generic Bluetooth interface. Only called if not overridden by 316 another supported device. 317 """ 318 raise NotImplementedError("{} must be defined.".format( 319 inspect.currentframe().f_code.co_name)) 320 321 def setup_gatt_server(self, database): 322 """Base generic Bluetooth interface. Only called if not overridden by 323 another supported device. 324 """ 325 raise NotImplementedError("{} must be defined.".format( 326 inspect.currentframe().f_code.co_name)) 327 328 def close_gatt_server(self): 329 """Base generic Bluetooth interface. Only called if not overridden by 330 another supported device. 331 """ 332 raise NotImplementedError("{} must be defined.".format( 333 inspect.currentframe().f_code.co_name)) 334 335 def unbond_device(self, peer_identifier): 336 """Base generic Bluetooth interface. Only called if not overridden by 337 another supported device. 338 """ 339 raise NotImplementedError("{} must be defined.".format( 340 inspect.currentframe().f_code.co_name)) 341 342 def unbond_all_known_devices(self): 343 """Base generic Bluetooth interface. Only called if not overridden by 344 another supported device. 345 """ 346 raise NotImplementedError("{} must be defined.".format( 347 inspect.currentframe().f_code.co_name)) 348 349 def init_pair(self, peer_identifier, security_level, non_bondable, 350 transport): 351 """Base generic Bluetooth interface. Only called if not overridden by 352 another supported device. 353 """ 354 raise NotImplementedError("{} must be defined.".format( 355 inspect.currentframe().f_code.co_name)) 356 357 358class AndroidBluetoothDevice(BluetoothDevice): 359 """Class wrapper for an Android Bluetooth device. 360 361 Each object of this class represents a generic Bluetooth device. 362 Android device and Fuchsia devices are the currently supported devices/ 363 364 Attributes: 365 android_device: An Android Bluetooth device. 366 """ 367 def __init__(self, android_device): 368 super().__init__(android_device) 369 self.gatt_timeout = 10 370 self.peer_mapping = {} 371 self.discovered_services_index = None 372 373 def _client_wait(self, gatt_event, gatt_callback): 374 return self._timed_pop(gatt_event, gatt_callback) 375 376 def _timed_pop(self, gatt_event, gatt_callback): 377 expected_event = gatt_event["evt"].format(gatt_callback) 378 try: 379 return self.device.ed.pop_event(expected_event, self.gatt_timeout) 380 except Empty as emp: 381 raise AssertionError(gatt_event["err"].format(expected_event)) 382 383 def _setup_discovered_services_index(self, bluetooth_gatt): 384 """ Sets the discovered services index for the gatt connection 385 related to the Bluetooth GATT callback object. 386 387 Args: 388 bluetooth_gatt: The BluetoothGatt callback id 389 """ 390 if not self.discovered_services_index: 391 self.device.droid.gattClientDiscoverServices(bluetooth_gatt) 392 expected_event = gatt_cb_strings['gatt_serv_disc'].format( 393 self.gatt_callback) 394 event = self.dut.ed.pop_event(expected_event, self.gatt_timeout) 395 self.discovered_services_index = event['data']['ServicesIndex'] 396 397 def a2dp_initiate_open_stream(self): 398 raise NotImplementedError("{} not yet implemented.".format( 399 inspect.currentframe().f_code.co_name)) 400 401 def start_profile_a2dp_sink(self): 402 raise NotImplementedError("{} not yet implemented.".format( 403 inspect.currentframe().f_code.co_name)) 404 405 def stop_profile_a2dp_sink(self): 406 raise NotImplementedError("{} not yet implemented.".format( 407 inspect.currentframe().f_code.co_name)) 408 409 def bluetooth_toggle_state(self, state): 410 self.device.droid.bluetoothToggleState(state) 411 412 def set_discoverable(self, is_discoverable): 413 """ Sets the device's discoverability. 414 415 Args: 416 is_discoverable: True if discoverable, false if not discoverable 417 """ 418 if is_discoverable: 419 self.device.droid.bluetoothMakeDiscoverable() 420 else: 421 self.device.droid.bluetoothMakeUndiscoverable() 422 423 def initialize_bluetooth_controller(self): 424 """ Just pass for Android as there is no concept of initializing 425 a Bluetooth controller. 426 """ 427 pass 428 429 def start_pairing_helper(self): 430 """ Starts the Android pairing helper. 431 """ 432 self.device.droid.bluetoothStartPairingHelper(True) 433 434 def gatt_client_write_characteristic_without_response_by_handle( 435 self, peer_identifier, handle, value): 436 """ Perform a GATT Client write Characteristic without response to 437 remote peer GATT server database. 438 439 Args: 440 peer_identifier: The mac address associated with the GATT connection 441 handle: The characteristic handle (or instance id). 442 value: The list of bytes to write. 443 Returns: 444 True if success, False if failure. 445 """ 446 peer_info = self.peer_mapping.get(peer_identifier) 447 if not peer_info: 448 self.log.error( 449 "Peer idenifier {} not currently connected or unknown.".format( 450 peer_identifier)) 451 return False 452 self._setup_discovered_services_index() 453 self.device.droid.gattClientWriteCharacteristicByInstanceId( 454 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 455 handle, value) 456 try: 457 event = self._client_wait(gatt_event['char_write'], 458 peer_info.get('gatt_callback')) 459 except AssertionError as err: 460 self.log.error("Failed to write Characteristic: {}".format(err)) 461 return True 462 463 def gatt_client_write_characteristic_by_handle(self, peer_identifier, 464 handle, offset, value): 465 """ Perform a GATT Client write Characteristic without response to 466 remote peer GATT server database. 467 468 Args: 469 peer_identifier: The mac address associated with the GATT connection 470 handle: The characteristic handle (or instance id). 471 offset: Not used yet. 472 value: The list of bytes to write. 473 Returns: 474 True if success, False if failure. 475 """ 476 peer_info = self.peer_mapping.get(peer_identifier) 477 if not peer_info: 478 self.log.error( 479 "Peer idenifier {} not currently connected or unknown.".format( 480 peer_identifier)) 481 return False 482 self._setup_discovered_services_index() 483 self.device.droid.gattClientWriteCharacteristicByInstanceId( 484 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 485 handle, value) 486 try: 487 event = self._client_wait(gatt_event['char_write'], 488 peer_info.get('gatt_callback')) 489 except AssertionError as err: 490 self.log.error("Failed to write Characteristic: {}".format(err)) 491 return True 492 493 def gatt_client_read_characteristic_by_handle(self, peer_identifier, 494 handle): 495 """ Perform a GATT Client read Characteristic to remote peer GATT 496 server database. 497 498 Args: 499 peer_identifier: The mac address associated with the GATT connection 500 handle: The characteristic handle (or instance id). 501 Returns: 502 Value of Characteristic if success, None if failure. 503 """ 504 peer_info = self.peer_mapping.get(peer_identifier) 505 if not peer_info: 506 self.log.error( 507 "Peer idenifier {} not currently connected or unknown.".format( 508 peer_identifier)) 509 return False 510 self._setup_discovered_services_index() 511 self.dut.droid.gattClientReadCharacteristicByInstanceId( 512 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 513 handle) 514 try: 515 event = self._client_wait(gatt_event['char_read'], 516 peer_info.get('gatt_callback')) 517 except AssertionError as err: 518 self.log.error("Failed to read Characteristic: {}".format(err)) 519 520 return event['data']['CharacteristicValue'] 521 522 def gatt_client_read_long_characteristic_by_handle(self, peer_identifier, 523 handle, offset, 524 max_bytes): 525 """ Perform a GATT Client read Characteristic to remote peer GATT 526 server database. 527 528 Args: 529 peer_identifier: The mac address associated with the GATT connection 530 offset: Not used yet. 531 handle: The characteristic handle (or instance id). 532 max_bytes: Not used yet. 533 Returns: 534 Value of Characteristic if success, None if failure. 535 """ 536 peer_info = self.peer_mapping.get(peer_identifier) 537 if not peer_info: 538 self.log.error( 539 "Peer idenifier {} not currently connected or unknown.".format( 540 peer_identifier)) 541 return False 542 self._setup_discovered_services_index() 543 self.dut.droid.gattClientReadCharacteristicByInstanceId( 544 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 545 handle) 546 try: 547 event = self._client_wait(gatt_event['char_read'], 548 peer_info.get('gatt_callback')) 549 except AssertionError as err: 550 self.log.error("Failed to read Characteristic: {}".format(err)) 551 552 return event['data']['CharacteristicValue'] 553 554 def gatt_client_enable_notifiy_characteristic_by_handle( 555 self, peer_identifier, handle): 556 """ Perform a GATT Client enable Characteristic notification to remote 557 peer GATT server database. 558 559 Args: 560 peer_identifier: The mac address associated with the GATT connection 561 handle: The characteristic handle. 562 Returns: 563 True is success, False if failure. 564 """ 565 raise NotImplementedError("{} not yet implemented.".format( 566 inspect.currentframe().f_code.co_name)) 567 568 def gatt_client_disable_notifiy_characteristic_by_handle( 569 self, peer_identifier, handle): 570 """ Perform a GATT Client disable Characteristic notification to remote 571 peer GATT server database. 572 573 Args: 574 peer_identifier: The mac address associated with the GATT connection 575 handle: The characteristic handle. 576 Returns: 577 True is success, False if failure. 578 """ 579 raise NotImplementedError("{} not yet implemented.".format( 580 inspect.currentframe().f_code.co_name)) 581 582 def gatt_client_read_descriptor_by_handle(self, peer_identifier, handle): 583 """ Perform a GATT Client read Descriptor to remote peer GATT 584 server database. 585 586 Args: 587 peer_identifier: The mac address associated with the GATT connection 588 handle: The Descriptor handle (or instance id). 589 Returns: 590 Value of Descriptor if success, None if failure. 591 """ 592 peer_info = self.peer_mapping.get(peer_identifier) 593 if not peer_info: 594 self.log.error( 595 "Peer idenifier {} not currently connected or unknown.".format( 596 peer_identifier)) 597 return False 598 self._setup_discovered_services_index() 599 self.dut.droid.gattClientReadDescriptorByInstanceId( 600 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 601 handle) 602 try: 603 event = self._client_wait(gatt_event['desc_read'], 604 peer_info.get('gatt_callback')) 605 except AssertionError as err: 606 self.log.error("Failed to read Descriptor: {}".format(err)) 607 # TODO: Implement sending Descriptor value in SL4A such that the data 608 # can be represented by: event['data']['DescriptorValue'] 609 return "" 610 611 def gatt_client_write_descriptor_by_handle(self, peer_identifier, handle, 612 offset, value): 613 """ Perform a GATT Client write Descriptor to the remote peer GATT 614 server database. 615 616 Args: 617 peer_identifier: The mac address associated with the GATT connection 618 handle: The Descriptor handle (or instance id). 619 offset: Not used yet 620 value: The list of bytes to write. 621 Returns: 622 True if success, False if failure. 623 """ 624 peer_info = self.peer_mapping.get(peer_identifier) 625 if not peer_info: 626 self.log.error( 627 "Peer idenifier {} not currently connected or unknown.".format( 628 peer_identifier)) 629 return False 630 self._setup_discovered_services_index() 631 self.device.droid.gattClientWriteDescriptorByInstanceId( 632 peer_info.get('bluetooth_gatt'), self.discovered_services_index, 633 handle, value) 634 try: 635 event = self._client_wait(gatt_event['desc_write'], 636 peer_info.get('gatt_callback')) 637 except AssertionError as err: 638 self.log.error("Failed to write Characteristic: {}".format(err)) 639 return True 640 641 def gatt_connect(self, peer_identifier, transport, autoconnect=False): 642 """ Perform a GATT connection to a perihperal. 643 644 Args: 645 peer_identifier: The mac address to connect to. 646 transport: Which transport to use. 647 autoconnect: Set autocnnect to True or False. 648 Returns: 649 True if success, False if failure. 650 """ 651 try: 652 bluetooth_gatt, gatt_callback = setup_gatt_connection( 653 self.device, peer_identifier, autoconnect, transport) 654 self.peer_mapping[peer_identifier] = { 655 "bluetooth_gatt": bluetooth_gatt, 656 "gatt_callback": gatt_callback 657 } 658 except GattTestUtilsError as err: 659 self.log.error(err) 660 return False 661 return True 662 663 def gatt_disconnect(self, peer_identifier): 664 """ Perform a GATT disconnect from a perihperal. 665 666 Args: 667 peer_identifier: The peer to disconnect from. 668 Returns: 669 True if success, False if failure. 670 """ 671 peer_info = self.peer_mapping.get(peer_identifier) 672 if not peer_info: 673 self.log.error( 674 "No previous connections made to {}".format(peer_identifier)) 675 return False 676 677 try: 678 disconnect_gatt_connection(self.device, 679 peer_info.get("bluetooth_gatt"), 680 peer_info.get("gatt_callback")) 681 self.device.droid.gattClientClose(peer_info.get("bluetooth_gatt")) 682 except GattTestUtilsError as err: 683 self.log.error(err) 684 return False 685 self.device.droid.gattClientClose(peer_info.get("bluetooth_gatt")) 686 687 def gatt_client_refresh(self, peer_identifier): 688 """ Perform a GATT Client Refresh of a perihperal. 689 690 Clears the internal cache and forces a refresh of the services from the 691 remote device. 692 693 Args: 694 peer_identifier: The peer to refresh. 695 """ 696 peer_info = self.peer_mapping.get(peer_identifier) 697 if not peer_info: 698 self.log.error( 699 "No previous connections made to {}".format(peer_identifier)) 700 return False 701 self.device.droid.gattClientRefresh(peer_info["bluetooth_gatt"]) 702 703 def le_scan_with_name_filter(self, name, timeout): 704 """ Scan over LE for a specific device name. 705 706 Args: 707 name: The name filter to set. 708 timeout: The timeout to wait to find the advertisement. 709 Returns: 710 Discovered mac address or None 711 """ 712 self.device.droid.bleSetScanSettingsScanMode( 713 ble_scan_settings_modes['low_latency']) 714 filter_list = self.device.droid.bleGenFilterList() 715 scan_settings = self.device.droid.bleBuildScanSetting() 716 scan_callback = self.device.droid.bleGenScanCallback() 717 self.device.droid.bleSetScanFilterDeviceName(name) 718 self.device.droid.bleBuildScanFilter(filter_list) 719 self.device.droid.bleSetScanFilterDeviceName(self.name) 720 self.device.droid.bleStartBleScan(filter_list, scan_settings, 721 scan_callback) 722 try: 723 event = self.device.ed.pop_event(scan_result.format(scan_callback), 724 timeout) 725 return event['data']['Result']['deviceInfo']['address'] 726 except Empty as err: 727 self.log.info("Scanner did not find advertisement {}".format(err)) 728 return None 729 730 def log_info(self, log): 731 """ Log directly onto the device. 732 733 Args: 734 log: The informative log. 735 """ 736 self.device.droid.log.logI(log) 737 738 def set_bluetooth_local_name(self, name): 739 """ Sets the Bluetooth controller's local name 740 Args: 741 name: The name to set. 742 """ 743 self.device.droid.bluetoothSetLocalName(name) 744 745 def get_local_bluetooth_address(self): 746 """ Returns the Bluetooth local address. 747 """ 748 return self.device.droid.bluetoothGetLocalAddress() 749 750 def reset_bluetooth(self): 751 """ Resets Bluetooth on the Android Device. 752 """ 753 bt_test_utils.reset_bluetooth([self.device]) 754 755 def sdp_add_search(self, attribute_list, profile_id): 756 """Adds an SDP search record. 757 Args: 758 attribute_list: The list of attributes to set 759 profile_id: The profile ID to set. 760 """ 761 # Android devices currently have no hooks to modify the SDP record. 762 pass 763 764 def sdp_add_service(self, sdp_record): 765 """Adds an SDP service record. 766 Args: 767 sdp_record: The dictionary representing the search record to add. 768 Returns: 769 service_id: The service id to track the service record published. 770 None if failed. 771 """ 772 # Android devices currently have no hooks to modify the SDP record. 773 pass 774 775 def sdp_clean_up(self): 776 """Cleans up all objects related to SDP. 777 """ 778 self.device.sdp_lib.cleanUp() 779 780 def sdp_init(self): 781 """Initializes SDP on the device. 782 """ 783 # Android devices currently have no hooks to modify the SDP record. 784 pass 785 786 def sdp_remove_service(self, service_id): 787 """Removes a service based on an input id. 788 Args: 789 service_id: The service ID to remove. 790 """ 791 # Android devices currently have no hooks to modify the SDP record. 792 pass 793 794 def unbond_all_known_devices(self): 795 """ Unbond all known remote devices. 796 """ 797 self.device.droid.bluetoothFactoryReset() 798 799 def unbond_device(self, peer_identifier): 800 """ Unbond peer identifier. 801 802 Args: 803 peer_identifier: The mac address for the peer to unbond. 804 805 """ 806 self.device.droid.bluetoothUnbond(peer_identifier) 807 808 def init_pair(self, peer_identifier, security_level, non_bondable, 809 transport): 810 """ Send an outgoing pairing request the input peer_identifier. 811 812 Android currently does not support setting various security levels or 813 bondable modes. Making them available for other bluetooth_device 814 variants. Depending on the Address type, Android will figure out the 815 transport to pair automatically. 816 817 Args: 818 peer_identifier: A string representing the device id. 819 security_level: Not yet implemented. See Fuchsia device impl. 820 non_bondable: Not yet implemented. See Fuchsia device impl. 821 transport: Not yet implemented. See Fuchsia device impl. 822 823 """ 824 self.dut.droid.bluetoothBond(self.peer_identifier) 825 826 827class FuchsiaBluetoothDevice(BluetoothDevice): 828 """Class wrapper for an Fuchsia Bluetooth device. 829 830 Each object of this class represents a generic luetooth device. 831 Android device and Fuchsia devices are the currently supported devices/ 832 833 Attributes: 834 fuchsia_device: A Fuchsia Bluetooth device. 835 """ 836 def __init__(self, fuchsia_device): 837 super().__init__(fuchsia_device) 838 839 def a2dp_initiate_open_stream(self): 840 raise NotImplementedError("{} not yet implemented.".format( 841 inspect.currentframe().f_code.co_name)) 842 843 def start_profile_a2dp_sink(self): 844 """ Starts the A2DP sink profile. 845 """ 846 self.device.control_daemon("bt-a2dp-sink.cmx", "start") 847 848 def stop_profile_a2dp_sink(self): 849 """ Stops the A2DP sink profile. 850 """ 851 self.device.control_daemon("bt-a2dp-sink.cmx", "stop") 852 853 def start_pairing_helper(self): 854 self.device.bts_lib.acceptPairing() 855 856 def bluetooth_toggle_state(self, state): 857 """Stub for Fuchsia implementation.""" 858 pass 859 860 def set_discoverable(self, is_discoverable): 861 """ Sets the device's discoverability. 862 863 Args: 864 is_discoverable: True if discoverable, false if not discoverable 865 """ 866 self.device.bts_lib.setDiscoverable(is_discoverable) 867 868 def get_pairing_pin(self): 869 """ Get the pairing pin from the active pairing delegate. 870 """ 871 return self.device.bts_lib.getPairingPin()['result'] 872 873 def input_pairing_pin(self, pin): 874 """ Input pairing pin to active pairing delegate. 875 876 Args: 877 pin: The pin to input. 878 """ 879 self.device.bts_lib.inputPairingPin(pin) 880 881 def initialize_bluetooth_controller(self): 882 """ Initialize Bluetooth controller for first time use. 883 """ 884 self.device.bts_lib.initBluetoothSys() 885 886 def get_local_bluetooth_address(self): 887 """ Returns the Bluetooth local address. 888 """ 889 return self.device.bts_lib.getActiveAdapterAddress().get("result") 890 891 def set_bluetooth_local_name(self, name): 892 """ Sets the Bluetooth controller's local name 893 Args: 894 name: The name to set. 895 """ 896 self.device.bts_lib.setName(name) 897 898 def gatt_client_write_characteristic_without_response_by_handle( 899 self, peer_identifier, handle, value): 900 """ Perform a GATT Client write Characteristic without response to 901 remote peer GATT server database. 902 903 Args: 904 peer_identifier: The peer to connect to. 905 handle: The characteristic handle. 906 value: The list of bytes to write. 907 Returns: 908 True if success, False if failure. 909 """ 910 if (not self._find_service_id_and_connect_to_service_for_handle( 911 peer_identifier, handle)): 912 self.log.warn( 913 "Unable to find handle {} in GATT server db.".format(handle)) 914 result = self.device.gattc_lib.writeCharByIdWithoutResponse( 915 handle, value) 916 if result.get("error") is not None: 917 self.log.error( 918 "Failed to write characteristic handle {} with err: {}".format( 919 handle, result.get("error"))) 920 return False 921 return True 922 923 def gatt_client_write_characteristic_by_handle(self, peer_identifier, 924 handle, offset, value): 925 """ Perform a GATT Client write Characteristic to remote peer GATT 926 server database. 927 928 Args: 929 peer_identifier: The peer to connect to. 930 handle: The characteristic handle. 931 offset: The offset to start writing to. 932 value: The list of bytes to write. 933 Returns: 934 True if success, False if failure. 935 """ 936 if (not self._find_service_id_and_connect_to_service_for_handle( 937 peer_identifier, handle)): 938 self.log.warn( 939 "Unable to find handle {} in GATT server db.".format(handle)) 940 result = self.device.gattc_lib.writeCharById(handle, offset, value) 941 if result.get("error") is not None: 942 self.log.error( 943 "Failed to write characteristic handle {} with err: {}".format( 944 handle, result.get("error"))) 945 return False 946 return True 947 948 def gatt_client_write_long_characteristic_by_handle( 949 self, peer_identifier, handle, offset, value, reliable_mode=False): 950 """ Perform a GATT Client write long Characteristic to remote peer GATT 951 server database. 952 953 Args: 954 peer_identifier: The peer to connect to. 955 handle: The characteristic handle. 956 offset: The offset to start writing to. 957 value: The list of bytes to write. 958 reliable_mode: A bool value representing a reliable write or not. 959 Returns: 960 True if success, False if failure. 961 """ 962 if (not self._find_service_id_and_connect_to_service_for_handle( 963 peer_identifier, handle)): 964 self.log.error( 965 "Unable to find handle {} in GATT server db.".format(handle)) 966 return False 967 result = self.device.gattc_lib.writeLongCharById( 968 handle, offset, value, reliable_mode) 969 if result.get("error") is not None: 970 self.log.error( 971 "Failed to write long characteristic handle {} with err: {}". 972 format(peer_identifier, result.get("error"))) 973 return False 974 return True 975 976 def gatt_client_write_long_descriptor_by_handle(self, peer_identifier, 977 handle, offset, value): 978 """ Perform a GATT Client write long Descriptor to remote peer GATT 979 server database. 980 981 Args: 982 peer_identifier: The peer to connect to. 983 handle: The descriptor handle. 984 offset: The offset to start writing to. 985 value: The list of bytes to write. 986 Returns: 987 True if success, False if failure. 988 """ 989 if (not self._find_service_id_and_connect_to_service_for_handle( 990 peer_identifier, handle)): 991 self.log.error( 992 "Unable to find handle {} in GATT server db.".format(handle)) 993 return False 994 result = self.device.gattc_lib.writeLongDescById(handle, offset, value) 995 if result.get("error") is not None: 996 self.log.error( 997 "Failed to write long descriptor handle {} with err: {}". 998 format(peer_identifier, result.get("error"))) 999 return False 1000 return True 1001 1002 def gatt_client_read_characteristic_by_handle(self, peer_identifier, 1003 handle): 1004 """ Perform a GATT Client read Characteristic to remote peer GATT 1005 server database. 1006 1007 Args: 1008 peer_identifier: The peer to connect to. 1009 handle: The characteristic handle. 1010 Returns: 1011 Value of Characteristic if success, None if failure. 1012 """ 1013 if (not self._find_service_id_and_connect_to_service_for_handle( 1014 peer_identifier, handle)): 1015 self.log.warn( 1016 "Unable to find handle {} in GATT server db.".format(handle)) 1017 result = self.device.gattc_lib.readCharacteristicById(handle) 1018 if result.get("error") is not None: 1019 self.log.error( 1020 "Failed to read characteristic handle {} with err: {}".format( 1021 handle, result.get("error"))) 1022 return None 1023 return result.get("result") 1024 1025 def gatt_client_read_characteristic_by_uuid(self, peer_identifier, uuid): 1026 """ Perform a GATT Client read Characteristic by uuid to remote peer GATT 1027 server database. 1028 1029 Args: 1030 peer_identifier: The peer to connect to. 1031 uuid: The characteristic uuid. 1032 Returns: 1033 Value of Characteristic if success, None if failure. 1034 """ 1035 if (not self._find_service_id_and_connect_to_service_for_handle( 1036 peer_identifier, uuid, uuid=True)): 1037 self.log.warn( 1038 "Unable to find uuid {} in GATT server db.".format(uuid)) 1039 result = self.device.gattc_lib.readCharacteristicByType(uuid) 1040 if result.get("error") is not None: 1041 self.log.error( 1042 "Failed to read characteristic uuid {} with err: {}".format( 1043 uuid, result.get("error"))) 1044 return None 1045 return result.get("result") 1046 1047 def gatt_client_read_long_characteristic_by_handle(self, peer_identifier, 1048 handle, offset, 1049 max_bytes): 1050 """ Perform a GATT Client read Characteristic to remote peer GATT 1051 server database. 1052 1053 Args: 1054 peer_identifier: The peer to connect to. 1055 handle: The characteristic handle. 1056 offset: The offset to start reading. 1057 max_bytes: The max bytes to return for each read. 1058 Returns: 1059 Value of Characteristic if success, None if failure. 1060 """ 1061 if (not self._find_service_id_and_connect_to_service_for_handle( 1062 peer_identifier, handle)): 1063 self.log.warn( 1064 "Unable to find handle {} in GATT server db.".format(handle)) 1065 result = self.device.gattc_lib.readLongCharacteristicById( 1066 handle, offset, max_bytes) 1067 if result.get("error") is not None: 1068 self.log.error( 1069 "Failed to read characteristic handle {} with err: {}".format( 1070 handle, result.get("error"))) 1071 return None 1072 return result.get("result") 1073 1074 def gatt_client_enable_notifiy_characteristic_by_handle( 1075 self, peer_identifier, handle): 1076 """ Perform a GATT Client enable Characteristic notification to remote 1077 peer GATT server database. 1078 1079 Args: 1080 peer_identifier: The peer to connect to. 1081 handle: The characteristic handle. 1082 Returns: 1083 True is success, False if failure. 1084 """ 1085 if (not self._find_service_id_and_connect_to_service_for_handle( 1086 peer_identifier, handle)): 1087 self.log.warn( 1088 "Unable to find handle {} in GATT server db.".format(handle)) 1089 result = self.device.gattc_lib.enableNotifyCharacteristic(handle) 1090 if result.get("error") is not None: 1091 self.log.error( 1092 "Failed to enable characteristic notifications for handle {} " 1093 "with err: {}".format(handle, result.get("error"))) 1094 return None 1095 return result.get("result") 1096 1097 def gatt_client_disable_notifiy_characteristic_by_handle( 1098 self, peer_identifier, handle): 1099 """ Perform a GATT Client disable Characteristic notification to remote 1100 peer GATT server database. 1101 1102 Args: 1103 peer_identifier: The peer to connect to. 1104 handle: The characteristic handle. 1105 Returns: 1106 True is success, False if failure. 1107 """ 1108 if (not self._find_service_id_and_connect_to_service_for_handle( 1109 peer_identifier, handle)): 1110 self.log.warn( 1111 "Unable to find handle {} in GATT server db.".format(handle)) 1112 result = self.device.gattc_lib.disableNotifyCharacteristic(handle) 1113 if result.get("error") is not None: 1114 self.log.error( 1115 "Failed to disable characteristic notifications for handle {} " 1116 "with err: {}".format(peer_identifier, result.get("error"))) 1117 return None 1118 return result.get("result") 1119 1120 def gatt_client_read_descriptor_by_handle(self, peer_identifier, handle): 1121 """ Perform a GATT Client read Descriptor to remote peer GATT server 1122 database. 1123 1124 Args: 1125 peer_identifier: The peer to connect to. 1126 handle: The Descriptor handle. 1127 Returns: 1128 Value of Descriptor if success, None if failure. 1129 """ 1130 if (not self._find_service_id_and_connect_to_service_for_handle( 1131 peer_identifier, handle)): 1132 self.log.warn( 1133 "Unable to find handle {} in GATT server db.".format(handle)) 1134 result = self.device.gattc_lib.readDescriptorById(handle) 1135 if result.get("error") is not None: 1136 self.log.error( 1137 "Failed to read descriptor for handle {} with err: {}".format( 1138 peer_identifier, result.get("error"))) 1139 return None 1140 return result.get("result") 1141 1142 def gatt_client_write_descriptor_by_handle(self, peer_identifier, handle, 1143 offset, value): 1144 """ Perform a GATT Client write Descriptor to remote peer GATT server 1145 database. 1146 1147 Args: 1148 peer_identifier: The peer to connect to. 1149 handle: The Descriptor handle. 1150 offset: The offset to start writing at. 1151 value: The list of bytes to write. 1152 Returns: 1153 True if success, False if failure. 1154 """ 1155 if (not self._find_service_id_and_connect_to_service_for_handle( 1156 peer_identifier, handle)): 1157 self.log.warn( 1158 "Unable to find handle {} in GATT server db.".format(handle)) 1159 result = self.device.gattc_lib.writeDescriptorById( 1160 handle, offset, value) 1161 if result.get("error") is not None: 1162 self.log.error( 1163 "Failed to write descriptor for handle {} with err: {}".format( 1164 peer_identifier, result.get("error"))) 1165 return None 1166 return True 1167 1168 def gatt_connect(self, peer_identifier, transport, autoconnect): 1169 """ Perform a GATT connection to a perihperal. 1170 1171 Args: 1172 peer_identifier: The peer to connect to. 1173 transport: Not implemented. 1174 autoconnect: Not implemented. 1175 Returns: 1176 True if success, False if failure. 1177 """ 1178 connection_result = self.device.gattc_lib.bleConnectToPeripheral( 1179 peer_identifier) 1180 if connection_result.get("error") is not None: 1181 self.log.error("Failed to connect to peer id {}: {}".format( 1182 peer_identifier, connection_result.get("error"))) 1183 return False 1184 return True 1185 1186 def gatt_client_refresh(self, peer_identifier): 1187 """ Perform a GATT Client Refresh of a perihperal. 1188 1189 Clears the internal cache and forces a refresh of the services from the 1190 remote device. In Fuchsia there is no FIDL api to automatically do this 1191 yet. Therefore just read all Characteristics which satisfies the same 1192 requirements. 1193 1194 Args: 1195 peer_identifier: The peer to refresh. 1196 """ 1197 self._read_all_characteristics(peer_identifier) 1198 1199 def gatt_client_discover_characteristic_by_uuid(self, peer_identifier, 1200 uuid): 1201 """ Perform a GATT Client Refresh of a perihperal. 1202 1203 Clears the internal cache and forces a refresh of the services from the 1204 remote device. In Fuchsia there is no FIDL api to automatically do this 1205 yet. Therefore just read all Characteristics which satisfies the same 1206 requirements. 1207 1208 Args: 1209 peer_identifier: The peer to refresh. 1210 """ 1211 self._read_all_characteristics(peer_identifier, uuid) 1212 1213 def gatt_disconnect(self, peer_identifier): 1214 """ Perform a GATT disconnect from a perihperal. 1215 1216 Args: 1217 peer_identifier: The peer to disconnect from. 1218 Returns: 1219 True if success, False if failure. 1220 """ 1221 disconnect_result = self.device.gattc_lib.bleDisconnectPeripheral( 1222 peer_identifier) 1223 if disconnect_result.get("error") is not None: 1224 self.log.error("Failed to disconnect from peer id {}: {}".format( 1225 peer_identifier, disconnect_result.get("error"))) 1226 return False 1227 return True 1228 1229 def reset_bluetooth(self): 1230 """Stub for Fuchsia implementation.""" 1231 pass 1232 1233 def sdp_add_search(self, attribute_list, profile_id): 1234 """Adds an SDP search record. 1235 Args: 1236 attribute_list: The list of attributes to set 1237 profile_id: The profile ID to set. 1238 """ 1239 return self.device.sdp_lib.addSearch(attribute_list, profile_id) 1240 1241 def sdp_add_service(self, sdp_record): 1242 """Adds an SDP service record. 1243 Args: 1244 sdp_record: The dictionary representing the search record to add. 1245 """ 1246 return self.device.sdp_lib.addService(sdp_record) 1247 1248 def sdp_clean_up(self): 1249 """Cleans up all objects related to SDP. 1250 """ 1251 return self.device.sdp_lib.cleanUp() 1252 1253 def sdp_init(self): 1254 """Initializes SDP on the device. 1255 """ 1256 return self.device.sdp_lib.init() 1257 1258 def sdp_remove_service(self, service_id): 1259 """Removes a service based on an input id. 1260 Args: 1261 service_id: The service ID to remove. 1262 """ 1263 return self.device.sdp_lib.init() 1264 1265 def start_le_advertisement(self, adv_data, scan_response, adv_interval, 1266 connectable): 1267 """ Starts an LE advertisement 1268 1269 Args: 1270 adv_data: Advertisement data. 1271 adv_interval: Advertisement interval. 1272 """ 1273 self.device.ble_lib.bleStartBleAdvertising(adv_data, scan_response, 1274 adv_interval, connectable) 1275 1276 def stop_le_advertisement(self): 1277 """ Stop active LE advertisement. 1278 """ 1279 self.device.ble_lib.bleStopBleAdvertising() 1280 1281 def setup_gatt_server(self, database): 1282 """ Sets up an input GATT server. 1283 1284 Args: 1285 database: A dictionary representing the GATT database to setup. 1286 """ 1287 self.device.gatts_lib.publishServer(database) 1288 1289 def close_gatt_server(self): 1290 """ Closes an existing GATT server. 1291 """ 1292 self.device.gatts_lib.closeServer() 1293 1294 def le_scan_with_name_filter(self, name, timeout): 1295 """ Scan over LE for a specific device name. 1296 1297 Args: 1298 name: The name filter to set. 1299 timeout: The timeout to wait to find the advertisement. 1300 Returns: 1301 Discovered device id or None 1302 """ 1303 partial_match = True 1304 return le_scan_for_device_by_name(self.device, self.device.log, name, 1305 timeout, partial_match) 1306 1307 def log_info(self, log): 1308 """ Log directly onto the device. 1309 1310 Args: 1311 log: The informative log. 1312 """ 1313 self.device.logging_lib.logI(log) 1314 pass 1315 1316 def unbond_all_known_devices(self): 1317 """ Unbond all known remote devices. 1318 """ 1319 try: 1320 device_list = self.device.bts_lib.getKnownRemoteDevices()['result'] 1321 for device_info in device_list: 1322 device = device_list[device_info] 1323 if device['bonded']: 1324 self.device.bts_lib.forgetDevice(device['id']) 1325 except Exception as err: 1326 self.log.err("Unable to unbond all devices: {}".format(err)) 1327 1328 def unbond_device(self, peer_identifier): 1329 """ Unbond peer identifier. 1330 1331 Args: 1332 peer_identifier: The peer identifier for the peer to unbond. 1333 1334 """ 1335 self.device.bts_lib.forgetDevice(peer_identifier) 1336 1337 def _find_service_id_and_connect_to_service_for_handle( 1338 self, peer_identifier, handle, uuid=False): 1339 fail_err = "Failed to find handle {} in Peer database." 1340 if uuid: 1341 handle = handle.lower() 1342 try: 1343 services = self.device.gattc_lib.listServices(peer_identifier) 1344 for service in services['result']: 1345 service_id = service['id'] 1346 self.device.gattc_lib.connectToService(peer_identifier, 1347 service_id) 1348 chars = self.device.gattc_lib.discoverCharacteristics() 1349 1350 for char in chars['result']: 1351 char_id = char['id'] 1352 if uuid: 1353 char_id = char['uuid_type'] 1354 if handle == char_id: 1355 return True 1356 descriptors = char['descriptors'] 1357 for desc in descriptors: 1358 desc_id = desc["id"] 1359 if uuid: 1360 desc_id = desc['uuid_type'] 1361 if handle == desc_id: 1362 return True 1363 except Exception as err: 1364 self.log.error(fail_err.format(err)) 1365 return False 1366 1367 def _read_all_characteristics(self, peer_identifier, uuid=None): 1368 fail_err = "Failed to read all characteristics with: {}" 1369 try: 1370 services = self.device.gattc_lib.listServices(peer_identifier) 1371 for service in services['result']: 1372 service_id = service['id'] 1373 service_uuid = service['uuid_type'] 1374 self.device.gattc_lib.connectToService(peer_identifier, 1375 service_id) 1376 chars = self.device.gattc_lib.discoverCharacteristics() 1377 self.log.info( 1378 "Reading chars in service uuid: {}".format(service_uuid)) 1379 1380 for char in chars['result']: 1381 char_id = char['id'] 1382 char_uuid = char['uuid_type'] 1383 if uuid and uuid.lower() not in char_uuid.lower(): 1384 continue 1385 try: 1386 read_val = \ 1387 self.device.gattc_lib.readCharacteristicById( 1388 char_id) 1389 self.log.info( 1390 "\tCharacteristic uuid / Value: {} / {}".format( 1391 char_uuid, read_val['result'])) 1392 str_value = "" 1393 for val in read_val['result']: 1394 str_value += chr(val) 1395 self.log.info("\t\tstr val: {}".format(str_value)) 1396 except Exception as err: 1397 self.log.error(err) 1398 pass 1399 except Exception as err: 1400 self.log.error(fail_err.forma(err)) 1401 1402 def _perform_read_all_descriptors(self, peer_identifier): 1403 fail_err = "Failed to read all characteristics with: {}" 1404 try: 1405 services = self.device.gattc_lib.listServices(peer_identifier) 1406 for service in services['result']: 1407 service_id = service['id'] 1408 service_uuid = service['uuid_type'] 1409 self.device.gattc_lib.connectToService(peer_identifier, 1410 service_id) 1411 chars = self.device.gattc_lib.discoverCharacteristics() 1412 self.log.info( 1413 "Reading descs in service uuid: {}".format(service_uuid)) 1414 1415 for char in chars['result']: 1416 char_id = char['id'] 1417 char_uuid = char['uuid_type'] 1418 descriptors = char['descriptors'] 1419 self.log.info( 1420 "\tReading descs in char uuid: {}".format(char_uuid)) 1421 for desc in descriptors: 1422 desc_id = desc["id"] 1423 desc_uuid = desc["uuid_type"] 1424 try: 1425 read_val = self.device.gattc_lib.readDescriptorById( 1426 desc_id) 1427 self.log.info( 1428 "\t\tDescriptor uuid / Value: {} / {}".format( 1429 desc_uuid, read_val['result'])) 1430 except Exception as err: 1431 pass 1432 except Exception as err: 1433 self.log.error(fail_err.format(err)) 1434 1435 def init_pair(self, peer_identifier, security_level, non_bondable, 1436 transport): 1437 """ Send an outgoing pairing request the input peer_identifier. 1438 1439 Android currently does not support setting various security levels or 1440 bondable modes. Making them available for other bluetooth_device 1441 variants. Depending on the Address type, Android will figure out the 1442 transport to pair automatically. 1443 1444 Args: 1445 peer_identifier: A string representing the device id. 1446 security_level: The security level required for this pairing request 1447 represented as a u64. (Only for LE pairing) 1448 Available Values 1449 1 - ENCRYPTED: Encrypted without MITM protection 1450 (unauthenticated) 1451 2 - AUTHENTICATED: Encrypted with MITM protection 1452 (authenticated) 1453 None: No pairing security level. 1454 non_bondable: A bool representing whether the pairing mode is 1455 bondable or not. None is also accepted. False if bondable, True 1456 if non-bondable 1457 transport: A u64 representing the transport type. 1458 Available Values 1459 1 - BREDR: Classic BR/EDR transport 1460 2 - LE: Bluetooth Low Energy Transport 1461 Returns: 1462 True if successful, False if failed. 1463 """ 1464 try: 1465 self.device.bts_lib.pair(peer_identifier, security_level, 1466 non_bondable, transport) 1467 return True 1468 except Exception as err: 1469 fail_err = "Failed to pair to peer_identifier {} with: {}".format( 1470 peer_identifier) 1471 self.log.error(fail_err.format(err)) 1472