# Lint as: python2, python3
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Client class to access the Floss adapter interface."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from enum import IntEnum
from gi.repository import GLib
import logging
import math
import random

from autotest_lib.client.cros.bluetooth.floss.observer_base import ObserverBase
from autotest_lib.client.cros.bluetooth.floss.utils import (glib_call,
                                                            glib_callback,
                                                            PropertySet)


class BondState(IntEnum):
    """Bluetooth bonding state."""
    NOT_BONDED = 0
    BONDING = 1
    BONDED = 2


class Transport(IntEnum):
    """Bluetooth transport type."""
    AUTO = 0
    BREDR = 1
    LE = 2


class SspVariant(IntEnum):
    """Bluetooth SSP variant type."""
    PASSKEY_CONFIRMATION = 0
    PASSKEY_ENTRY = 1
    CONSENT = 2
    PASSKEY_NOTIFICATION = 3


class BluetoothCallbacks:
    """Callbacks for the Adapter Interface.

    Implement this to observe these callbacks when exporting callbacks via
    register_callback.
    """
    def on_address_changed(self, addr):
        """Adapter address changed.

        @param addr: New address of the adapter.
        """
        pass

    def on_device_found(self, remote_device):
        """Device found via discovery.

        @param remote_device: Remote device found during discovery session.
        """
        pass

    def on_discovering_changed(self, discovering):
        """Discovering state has changed.

        @param discovering: Whether discovery enabled or disabled.
        """
        pass

    def on_ssp_request(self, remote_device, class_of_device, variant, passkey):
        """Simple secure pairing request for agent to reply.

        @param remote_device: Remote device that is being paired.
        @param class_of_device: Class of device as described in HCI spec.
        @param variant: SSP variant (0-3). [Confirmation, Entry, Consent,
                        Notification]
        @param passkey: Passkey to display (so user can confirm or type it).
        """
        pass

    def on_bond_state_changed(self, status, device_address, state):
        """Bonding/Pairing state has changed for a device.

        @param status: Success (0) or failure reason for bonding.
        @param device_address: This notification is for this BDADDR.
        @param state: Bonding state. 0 = Not bonded, 1 = Bonding, 2 = Bonded.
        """
        pass


class BluetoothConnectionCallbacks:
    """Callbacks for the Device Connection interface.

    Implement this to observe these callbacks when exporting callbacks via
    register_connection_callback
    """
    def on_device_connected(self, remote_device):
        """Notification that a device has completed HCI connection.

        @param remote_device: Remote device that completed HCI connection.
        """
        pass

    def on_device_disconnected(self, remote_device):
        """Notification that a device has completed HCI disconnection.

        @param remote_device: Remote device that completed HCI disconnection.
        """
        pass


class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks):
    """Handles method calls to and callbacks from the Adapter interface.

    The client keeps a local cache, |known_devices|, keyed by device address.
    Each entry is the dict produced by |_make_device| with the keys 'address',
    'name', 'bond_state' and 'connected'. The cache is updated from the
    callbacks this class observes once |register_callbacks| has been called.

    NOTE(review): methods wrapped in glib_call(<value>) presumably return
    <value> when the wrapped call fails -- confirm against floss.utils.
    """

    ADAPTER_SERVICE = 'org.chromium.bluetooth'
    ADAPTER_INTERFACE = 'org.chromium.bluetooth.Bluetooth'
    ADAPTER_OBJECT_PATTERN = '/org/chromium/bluetooth/hci{}/adapter'
    ADAPTER_CB_INTF = 'org.chromium.bluetooth.BluetoothCallback'
    ADAPTER_CB_OBJ_PATTERN = '/org/chromium/bluetooth/hci{}/test_adapter_client{}'
    ADAPTER_CONN_CB_INTF = 'org.chromium.bluetooth.BluetoothConnectionCallback'
    ADAPTER_CONN_CB_OBJ_PATTERN = '/org/chromium/bluetooth/hci{}/test_connection_client{}'

    @staticmethod
    def parse_dbus_device(remote_device_dbus):
        """Parse a dbus variant dict as a remote device.

        @param remote_device_dbus: Variant dict with signature a{sv}.

        @return Parsing success, BluetoothDevice tuple. The tuple is
                (address, name) as strings, or None when either key is
                missing.
        """
        if 'address' in remote_device_dbus and 'name' in remote_device_dbus:
            return True, (str(remote_device_dbus['address']),
                          str(remote_device_dbus['name']))

        return False, None

    # NOTE(review): the class docstring below is D-Bus introspection XML, not
    # prose. It is presumably consumed when the object is registered on the
    # bus (register_object is called with None for the node info) -- do not
    # add free text to it.
    class ExportedAdapterCallbacks(ObserverBase):
        """
        <node>
            <interface name="org.chromium.bluetooth.BluetoothCallback">
                <method name="OnAddressChanged">
                    <arg type="s" name="addr" direction="in" />
                </method>
                <method name="OnDeviceFound">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                </method>
                <method name="OnDiscoveringChanged">
                    <arg type="b" name="discovering" direction="in" />
                </method>
                <method name="OnSspRequest">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                    <arg type="u" name="class_of_device" direction="in" />
                    <arg type="u" name="variant" direction="in" />
                    <arg type="u" name="passkey" direction="in" />
                </method>
                <method name="OnBondStateChanged">
                    <arg type="u" name="status" direction="in" />
                    <arg type="s" name="address" direction="in" />
                    <arg type="u" name="state" direction="in" />
                </method>
            </interface>
        </node>
        """

        def __init__(self):
            """Construct exported callbacks object.
            """
            ObserverBase.__init__(self)

        def OnAddressChanged(self, addr):
            """Handle address changed callbacks."""
            for observer in self.observers.values():
                observer.on_address_changed(addr)

        def OnDeviceFound(self, remote_device_dbus):
            """Handle device found from discovery."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(
                    remote_device_dbus)
            if not parsed:
                logging.debug('OnDeviceFound parse error: {}'.format(
                        remote_device_dbus))
                return

            for observer in self.observers.values():
                observer.on_device_found(remote_device)

        def OnDiscoveringChanged(self, discovering):
            """Handle discovering state changed."""
            for observer in self.observers.values():
                observer.on_discovering_changed(bool(discovering))

        def OnSspRequest(self, remote_device_dbus, class_of_device, variant,
                         passkey):
            """Handle pairing/bonding request to agent."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(
                    remote_device_dbus)
            if not parsed:
                logging.debug('OnSspRequest parse error: {}'.format(
                        remote_device_dbus))
                return

            for observer in self.observers.values():
                observer.on_ssp_request(remote_device, class_of_device,
                                        variant, passkey)

        def OnBondStateChanged(self, status, address, state):
            """Handle bond state changed callbacks."""
            for observer in self.observers.values():
                observer.on_bond_state_changed(status, address, state)

    # NOTE(review): as above, the class docstring is D-Bus introspection XML
    # and is presumably read at registration time -- keep it free of prose.
    class ExportedConnectionCallbacks(ObserverBase):
        """
        <node>
            <interface name="org.chromium.bluetooth.BluetoothConnectionCallback">
                <method name="OnDeviceConnected">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                </method>
                <method name="OnDeviceDisconnected">
                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
                </method>
            </interface>
        </node>
        """

        def __init__(self, bus, object_path):
            """Construct exported connection callbacks object.

            @param bus: DBus bus. Accepted but not used by this constructor.
            @param object_path: Object path for these callbacks. Accepted but
                                not used by this constructor.
            """
            ObserverBase.__init__(self)

        def OnDeviceConnected(self, remote_device_dbus):
            """Handle device connected."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(
                    remote_device_dbus)
            if not parsed:
                logging.debug('OnDeviceConnected parse error: {}'.format(
                        remote_device_dbus))
                return

            for observer in self.observers.values():
                observer.on_device_connected(remote_device)

        def OnDeviceDisconnected(self, remote_device_dbus):
            """Handle device disconnected."""
            parsed, remote_device = FlossAdapterClient.parse_dbus_device(
                    remote_device_dbus)
            if not parsed:
                logging.debug('OnDeviceDisconnected parse error: {}'.format(
                        remote_device_dbus))
                return

            for observer in self.observers.values():
                observer.on_device_disconnected(remote_device)

    def __init__(self, bus, hci):
        """Construct the client.

        @param bus: DBus bus over which we'll establish connections.
        @param hci: HCI adapter index. Get this value from `get_default_adapter`
                    on FlossManagerClient.
        """
        self.bus = bus
        self.hci = hci
        self.objpath = self.ADAPTER_OBJECT_PATTERN.format(hci)

        # We don't register callbacks by default.
        self.callbacks = None
        self.connection_callbacks = None

        # Locally cached values
        self.known_devices = {}
        self.discovering = False

        # Initialize properties when registering callbacks (we know proxy is
        # valid at this point).
        self.properties = None
        self.remote_properties = None

    def __del__(self):
        """Destructor"""
        del self.callbacks
        del self.connection_callbacks

    def _make_device(self,
                     address,
                     name,
                     bond_state=BondState.NOT_BONDED,
                     connected=False):
        """Make a device dict.

        @param address: Device address; also used as the |known_devices| key.
        @param name: Device name (may be an empty string).
        @param bond_state: Bonding state to record for the device.
        @param connected: Whether the device is currently connected.

        @return Dict with keys 'address', 'name', 'bond_state', 'connected'.
        """
        return {
                'address': address,
                'name': name,
                'bond_state': bond_state,
                'connected': connected,
        }

    @glib_callback()
    def on_device_found(self, remote_device):
        """Remote device was found as part of discovery.

        @param remote_device: (address, name) tuple of the found device.
        """
        address, name = remote_device

        # Update a new device
        if address not in self.known_devices:
            self.known_devices[address] = self._make_device(address, name)
        # Update name if previous cached value didn't have a name. The check
        # must look at the cached 'name' field: the device dict itself is
        # never empty, so testing the dict would never take this branch.
        elif not self.known_devices[address]['name']:
            self.known_devices[address]['name'] = name

    @glib_callback()
    def on_discovering_changed(self, discovering):
        """Discovering state has changed.

        @param discovering: Whether discovery is now enabled.
        """
        # Ignore a no-op
        if self.discovering == discovering:
            return

        # Cache the value
        self.discovering = discovering

        # If we are freshly starting discovery, clear all locally cached known
        # devices (that are not bonded or connected)
        if discovering:
            # Filter known devices to currently bonded or connected devices
            self.known_devices = {
                    key: value
                    for key, value in self.known_devices.items()
                    if value.get('bond_state', BondState.NOT_BONDED) >
                    BondState.NOT_BONDED or value.get('connected', False)
            }

    @glib_callback()
    def on_bond_state_changed(self, status, address, state):
        """Bond state has changed.

        @param status: Success (0) or failure reason; not used here.
        @param address: Address of the device whose bond state changed.
        @param state: New bonding state to cache for the device.
        """
        # You can bond unknown devices if it was previously bonded
        if address not in self.known_devices:
            self.known_devices[address] = self._make_device(
                    address,
                    '',
                    bond_state=state,
            )
        else:
            self.known_devices[address]['bond_state'] = state

    @glib_callback()
    def on_device_connected(self, remote_device):
        """Remote device connected hci.

        @param remote_device: (address, name) tuple of the connected device.
        """
        address, name = remote_device
        if address not in self.known_devices:
            self.known_devices[address] = self._make_device(address,
                                                            name,
                                                            connected=True)
        else:
            self.known_devices[address]['connected'] = True

    @glib_callback()
    def on_device_disconnected(self, remote_device):
        """Remote device disconnected hci.

        @param remote_device: (address, name) tuple of the disconnected device.
        """
        address, name = remote_device
        if address not in self.known_devices:
            self.known_devices[address] = self._make_device(address,
                                                            name,
                                                            connected=False)
        else:
            self.known_devices[address]['connected'] = False

    def _make_dbus_device(self, address, name):
        """Make the variant dict used to describe a device over D-Bus.

        @param address: Device address.
        @param name: Device name.

        @return Dict with 'address' and 'name' wrapped as string variants.
        """
        return {
                'address': GLib.Variant('s', address),
                'name': GLib.Variant('s', name)
        }

    def _lookup_dbus_device(self, address, default_name):
        """Make a D-Bus device for |address| using the cached name if known.

        @param address: Device address.
        @param default_name: Name to use when the device is not cached.

        @return Variant dict as built by |_make_dbus_device|.
        """
        name = self.known_devices.get(address, {}).get('name', default_name)
        return self._make_dbus_device(address, name)

    @glib_call(False)
    def has_proxy(self):
        """Checks whether adapter proxy can be acquired."""
        return bool(self.proxy())

    def proxy(self):
        """Gets proxy object to adapter interface for method calls."""
        return self.bus.get(self.ADAPTER_SERVICE,
                            self.objpath)[self.ADAPTER_INTERFACE]

    # TODO(b/227405934): Not sure we want GetRemoteRssi on adapter api since
    #                    it's unlikely to be accurate over time. Use a mock for
    #                    testing for now.
    def get_mock_remote_rssi(self, device):
        """Gets mock value for remote device rssi.

        @param device: Remote device; ignored by the mock.

        @return Always -50.
        """
        return -50

    def register_properties(self):
        """Registers a property set for this client.

        Each entry maps a property name to a (getter, setter) pair; a setter
        of None is registered for properties this client does not write.
        """
        self.properties = PropertySet({
                'Address': (self.proxy().GetAddress, None),
                'Name': (self.proxy().GetName, self.proxy().SetName),
                'Class': (self.proxy().GetBluetoothClass,
                          self.proxy().SetBluetoothClass),
                'Uuids': (self.proxy().GetUuids, None),
                'Discoverable':
                (self.proxy().GetDiscoverable, self.proxy().SetDiscoverable),
        })

        self.remote_properties = PropertySet({
                'Name': (self.proxy().GetRemoteName, None),
                'Type': (self.proxy().GetRemoteType, None),
                'Alias': (self.proxy().GetRemoteAlias, None),
                'Class': (self.proxy().GetRemoteClass, None),
                'RSSI': (self.get_mock_remote_rssi, None),
        })

    @glib_call(False)
    def register_callbacks(self):
        """Registers callbacks for this client.

        This will also initialize properties and populate the list of bonded
        devices since this should be the first thing that gets called after we
        know that the adapter client has a valid proxy object.

        @return True once both callback objects are registered.
        """
        # Make sure properties are registered
        if not self.properties:
            self.register_properties()

        # Prevent callback registration multiple times
        if self.callbacks and self.connection_callbacks:
            return True

        # Generate a random integer between 1-1000. This must be an int: it is
        # formatted into D-Bus object paths, and a float (which math.floor
        # returns on python2) would render as e.g. '123.0'.
        rnumber = random.randint(1, 1000)

        # Reset known devices to just bonded devices and their connection
        # states.
        self.known_devices.clear()
        bonded_devices = self.proxy().GetBondedDevices()
        for device in bonded_devices:
            (success, devtuple) = FlossAdapterClient.parse_dbus_device(device)
            if success:
                (address, name) = devtuple
                cstate = self.proxy().GetConnectionState(
                        self._make_dbus_device(address, name))
                logging.info('[%s:%s] initially bonded. Connected = %d',
                             address, name, cstate)
                self.known_devices[address] = self._make_device(
                        address,
                        name,
                        bond_state=BondState.BONDED,
                        connected=bool(cstate > 0))

        if not self.callbacks:
            # Create and publish callbacks
            objpath = self.ADAPTER_CB_OBJ_PATTERN.format(self.hci, rnumber)
            self.callbacks = self.ExportedAdapterCallbacks()
            self.callbacks.add_observer('adapter_client', self)
            self.bus.register_object(objpath, self.callbacks, None)

            # Register published callback with adapter daemon
            self.proxy().RegisterCallback(objpath)

        if not self.connection_callbacks:
            # Compute the connection callback path before constructing the
            # object so this branch does not depend on a variable that only
            # exists when the adapter callbacks branch above was taken.
            conn_objpath = self.ADAPTER_CONN_CB_OBJ_PATTERN.format(
                    self.hci, rnumber)
            self.connection_callbacks = self.ExportedConnectionCallbacks(
                    self.bus, conn_objpath)
            self.connection_callbacks.add_observer('adapter_client', self)
            self.bus.register_object(conn_objpath, self.connection_callbacks,
                                     None)

            self.proxy().RegisterConnectionCallback(conn_objpath)

        return True

    def register_callback_observer(self, name, observer):
        """Add an observer for all callbacks.

        The exported callback objects are only created by
        |register_callbacks|, so that must have been called first.

        @param name: Name of the observer.
        @param observer: Observer that implements all callback classes.
        """
        if isinstance(observer, BluetoothCallbacks):
            self.callbacks.add_observer(name, observer)

        if isinstance(observer, BluetoothConnectionCallbacks):
            self.connection_callbacks.add_observer(name, observer)

    def unregister_callback_observer(self, name, observer):
        """Remove an observer for all callbacks.

        The exported callback objects are only created by
        |register_callbacks|, so that must have been called first.

        @param name: Name of the observer.
        @param observer: Observer that implements all callback classes.
        """
        if isinstance(observer, BluetoothCallbacks):
            self.callbacks.remove_observer(name, observer)

        if isinstance(observer, BluetoothConnectionCallbacks):
            self.connection_callbacks.remove_observer(name, observer)

    @glib_call('')
    def get_address(self):
        """Gets the adapter's current address."""
        return str(self.proxy().GetAddress())

    @glib_call('')
    def get_name(self):
        """Gets the adapter's name."""
        return str(self.proxy().GetName())

    @glib_call(None)
    def get_property(self, prop_name):
        """Gets property by name.

        @param prop_name: Key registered in |register_properties|.
        """
        return self.properties.get(prop_name)

    @glib_call(None)
    def get_remote_property(self, address, prop_name):
        """Gets remote device property by name.

        @param address: Address of the remote device.
        @param prop_name: Key registered for remote devices in
                          |register_properties|.
        """
        remote_device = self._lookup_dbus_device(address, 'Test device')
        return self.remote_properties.get(prop_name, remote_device)

    @glib_call(None)
    def set_property(self, prop_name, *args):
        """Sets property by name.

        @param prop_name: Key registered in |register_properties|.
        @param args: Values forwarded to the property's setter.
        """
        return self.properties.set(prop_name, *args)

    @glib_call(None)
    def set_remote_property(self, address, prop_name, *args):
        """Sets remote property by name.

        @param address: Address of the remote device.
        @param prop_name: Key registered for remote devices in
                          |register_properties|.
        @param args: Values forwarded to the property's setter.
        """
        remote_device = self._lookup_dbus_device(address, 'Test device')
        # Remote properties live in their own property set; going through the
        # adapter's |properties| would hand a remote device to a local adapter
        # setter. Note that every remote property is currently registered
        # without a setter.
        return self.remote_properties.set(prop_name, remote_device, *args)

    @glib_call(False)
    def start_discovery(self):
        """Starts discovery session."""
        return bool(self.proxy().StartDiscovery())

    @glib_call(False)
    def stop_discovery(self):
        """Stops discovery session."""
        return bool(self.proxy().CancelDiscovery())

    @glib_call(False)
    def is_discovering(self):
        """Is adapter discovering?"""
        return bool(self.discovering)

    @glib_call(False)
    def has_device(self, address):
        """Checks to see if device with address is known."""
        return address in self.known_devices

    def is_bonded(self, address):
        """Checks if the given address is currently fully bonded."""
        return address in self.known_devices and self.known_devices[
                address].get('bond_state',
                             BondState.NOT_BONDED) == BondState.BONDED

    @glib_call(False)
    def create_bond(self, address, transport):
        """Creates bond with target address.

        @param address: Device to bond with.
        @param transport: Transport to bond over; converted with int() before
                          being sent, so a |Transport| member is accepted.
        @returns Result of |CreateBond|.
        """
        remote_device = self._lookup_dbus_device(address, 'Test bond')
        return bool(self.proxy().CreateBond(remote_device, int(transport)))

    @glib_call(False)
    def cancel_bond(self, address):
        """Call cancel bond with no additional checks. Prefer |forget_device|.

        @param address: Device to cancel bond.
        @returns Result of |CancelBondProcess|.
        """
        remote_device = self._lookup_dbus_device(address, 'Test bond')
        # Use the same adapter method as |forget_device|.
        return bool(self.proxy().CancelBondProcess(remote_device))

    @glib_call(False)
    def remove_bond(self, address):
        """Call remove bond with no additional checks. Prefer |forget_device|.

        @param address: Device to remove bond.
        @returns Result of |RemoveBond|.
        """
        remote_device = self._lookup_dbus_device(address, 'Test bond')
        return bool(self.proxy().RemoveBond(remote_device))

    @glib_call(False)
    def forget_device(self, address):
        """Forgets device from local cache and removes bonding.

        If a device is currently bonding or bonded, it will cancel or remove the
        bond to totally remove this device.

        @param address: Device to forget.

        @return
            True if device was known and was removed.
            False if device was unknown or removal failed.
        """
        if address not in self.known_devices:
            return False

        # Remove the device from known devices first
        device = self.known_devices.pop(address)

        remote_device = self._make_dbus_device(device['address'],
                                               device['name'])

        # Extra actions if bond state is not NOT_BONDED
        if device['bond_state'] == BondState.BONDING:
            return bool(self.proxy().CancelBondProcess(remote_device))
        elif device['bond_state'] == BondState.BONDED:
            return bool(self.proxy().RemoveBond(remote_device))

        return True

    @glib_call(False)
    def set_pairing_confirmation(self, address, accept):
        """Confirm that a pairing should be completed on a bonding device.

        @param address: Device being paired; must already be known.
        @param accept: Whether to accept the pairing.
        @returns False if the device is unknown, otherwise the result of
                 |SetPairingConfirmation|.
        """
        # Device should be known or already `Bonding`
        if address not in self.known_devices:
            logging.debug('[%s] Unknown device in set_pairing_confirmation',
                          address)
            return False

        device = self.known_devices[address]
        remote_device = self._make_dbus_device(address, device['name'])

        return bool(self.proxy().SetPairingConfirmation(remote_device, accept))

    def get_connected_devices_count(self):
        """Gets the number of known, connected devices."""
        return sum(1 for x in self.known_devices.values()
                   if x.get('connected', False))

    def is_connected(self, address):
        """Checks whether a device is connected."""
        return address in self.known_devices and self.known_devices[
                address].get('connected', False)

    @glib_call(False)
    def connect_all_enabled_profiles(self, address):
        """Connect all enabled profiles for target address."""
        device = self._lookup_dbus_device(address, 'Test device')
        return bool(self.proxy().ConnectAllEnabledProfiles(device))

    @glib_call(False)
    def disconnect_all_enabled_profiles(self, address):
        """Disconnect all enabled profiles for target address."""
        device = self._lookup_dbus_device(address, 'Test device')
        return bool(self.proxy().DisconnectAllEnabledProfiles(device))