# Lint as: python2, python3
# Copyright 2021 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Client class to access the Floss adapter interface."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from enum import IntEnum
from gi.repository import GLib
import logging
import math
import random
from autotest_lib.client.cros.bluetooth.floss.observer_base import ObserverBase
from autotest_lib.client.cros.bluetooth.floss.utils import (glib_call,
glib_callback,
PropertySet)
class BondState(IntEnum):
"""Bluetooth bonding state."""
NOT_BONDED = 0
BONDING = 1
BONDED = 2
class Transport(IntEnum):
"""Bluetooth transport type."""
AUTO = 0
BREDR = 1
LE = 2
class SspVariant(IntEnum):
"""Bluetooth SSP variant type."""
PASSKEY_CONFIRMATION = 0
PASSKEY_ENTRY = 1
CONSENT = 2
PASSKEY_NOTIFICATION = 3
class BluetoothCallbacks:
"""Callbacks for the Adapter Interface.
Implement this to observe these callbacks when exporting callbacks via
register_callback.
"""
def on_address_changed(self, addr):
"""Adapter address changed.
@param addr: New address of the adapter.
"""
pass
def on_device_found(self, remote_device):
"""Device found via discovery.
@param remote_device: Remove device found during discovery session.
"""
pass
def on_discovering_changed(self, discovering):
"""Discovering state has changed.
@param discovering: Whether discovery enabled or disabled.
"""
pass
def on_ssp_request(self, remote_device, class_of_device, variant, passkey):
"""Simple secure pairing request for agent to reply.
@param remote_device: Remote device that is being paired.
@param class_of_device: Class of device as described in HCI spec.
@param variant: SSP variant (0-3). [Confirmation, Entry, Consent, Notification]
@param passkey: Passkey to display (so user can confirm or type it).
"""
pass
def on_bond_state_changed(self, status, device_address, state):
"""Bonding/Pairing state has changed for a device.
@param status: Success (0) or failure reason for bonding.
@param device_address: This notification is for this BDADDR.
@param state: Bonding state. 0 = Not bonded, 1 = Bonding, 2 = Bonded.
"""
pass
class BluetoothConnectionCallbacks:
"""Callbacks for the Device Connection interface.
Implement this to observe these callbacks when exporting callbacks via
register_connection_callback
"""
def on_device_connected(self, remote_device):
"""Notification that a device has completed HCI connection.
@param remote_device: Remote device that completed HCI connection.
"""
pass
def on_device_disconnected(self, remote_device):
"""Notification that a device has completed HCI disconnection.
@param remote_device: Remote device that completed HCI disconnection.
"""
pass
class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks):
"""Handles method calls to and callbacks from the Adapter interface."""
ADAPTER_SERVICE = 'org.chromium.bluetooth'
ADAPTER_INTERFACE = 'org.chromium.bluetooth.Bluetooth'
ADAPTER_OBJECT_PATTERN = '/org/chromium/bluetooth/hci{}/adapter'
ADAPTER_CB_INTF = 'org.chromium.bluetooth.BluetoothCallback'
ADAPTER_CB_OBJ_PATTERN = '/org/chromium/bluetooth/hci{}/test_adapter_client{}'
ADAPTER_CONN_CB_INTF = 'org.chromium.bluetooth.BluetoothConnectionCallback'
ADAPTER_CONN_CB_OBJ_PATTERN = '/org/chromium/bluetooth/hci{}/test_connection_client{}'
@staticmethod
def parse_dbus_device(remote_device_dbus):
"""Parse a dbus variant dict as a remote device.
@param remote_device_dbus: Variant dict with signature a{sv}.
@return Parsing success, BluetoothDevice tuple
"""
if 'address' in remote_device_dbus and 'name' in remote_device_dbus:
return True, (str(remote_device_dbus['address']),
str(remote_device_dbus['name']))
return False, None
class ExportedAdapterCallbacks(ObserverBase):
"""
"""
def __init__(self):
"""Construct exported callbacks object.
"""
ObserverBase.__init__(self)
def OnAddressChanged(self, addr):
"""Handle address changed callbacks."""
for observer in self.observers.values():
observer.on_address_changed(addr)
def OnDeviceFound(self, remote_device_dbus):
"""Handle device found from discovery."""
parsed, remote_device = FlossAdapterClient.parse_dbus_device(
remote_device_dbus)
if not parsed:
logging.debug('OnDeviceFound parse error: {}'.format(
remote_device_dbus))
return
for observer in self.observers.values():
observer.on_device_found(remote_device)
def OnDiscoveringChanged(self, discovering):
"""Handle discovering state changed."""
for observer in self.observers.values():
observer.on_discovering_changed(bool(discovering))
def OnSspRequest(self, remote_device_dbus, class_of_device, variant,
passkey):
"""Handle pairing/bonding request to agent."""
parsed, remote_device = FlossAdapterClient.parse_dbus_device(
remote_device_dbus)
if not parsed:
logging.debug('OnSspRequest parse error: {}'.format(
remote_device_dbus))
return
for observer in self.observers.values():
observer.on_ssp_request(remote_device, class_of_device,
variant, passkey)
def OnBondStateChanged(self, status, address, state):
"""Handle bond state changed callbacks."""
for observer in self.observers.values():
observer.on_bond_state_changed(status, address, state)
class ExportedConnectionCallbacks(ObserverBase):
"""
"""
def __init__(self, bus, object_path):
"""Construct exported connection callbacks object.
"""
ObserverBase.__init__(self)
def OnDeviceConnected(self, remote_device_dbus):
"""Handle device connected."""
parsed, remote_device = FlossAdapterClient.parse_dbus_device(
remote_device_dbus)
if not parsed:
logging.debug('OnDeviceConnected parse error: {}'.format(
remote_device_dbus))
return
for observer in self.observers.values():
observer.on_device_connected(remote_device)
def OnDeviceDisconnected(self, remote_device_dbus):
"""Handle device disconnected."""
parsed, remote_device = FlossAdapterClient.parse_dbus_device(
remote_device_dbus)
if not parsed:
logging.debug('OnDeviceDisconnected parse error: {}'.format(
remote_device_dbus))
return
for observer in self.observers.values():
observer.on_device_disconnected(remote_device)
def __init__(self, bus, hci):
"""Construct the client.
@param bus: DBus bus over which we'll establish connections.
@param hci: HCI adapter index. Get this value from `get_default_adapter`
on FlossManagerClient.
"""
self.bus = bus
self.hci = hci
self.objpath = self.ADAPTER_OBJECT_PATTERN.format(hci)
# We don't register callbacks by default.
self.callbacks = None
self.connection_callbacks = None
# Locally cached values
self.known_devices = {}
self.discovering = False
# Initialize properties when registering callbacks (we know proxy is
# valid at this point).
self.properties = None
self.remote_properties = None
def __del__(self):
"""Destructor"""
del self.callbacks
del self.connection_callbacks
def _make_device(self,
address,
name,
bond_state=BondState.NOT_BONDED,
connected=False):
"""Make a device dict."""
return {
'address': address,
'name': name,
'bond_state': bond_state,
'connected': connected,
}
@glib_callback()
def on_device_found(self, remote_device):
"""Remote device was found as part of discovery."""
address, name = remote_device
# Update a new device
if not address in self.known_devices:
self.known_devices[address] = self._make_device(address, name)
# Update name if previous cached value didn't have a name
elif not self.known_devices[address]:
self.known_devices[address]['name'] = name
@glib_callback()
def on_discovering_changed(self, discovering):
"""Discovering state has changed."""
# Ignore a no-op
if self.discovering == discovering:
return
# Cache the value
self.discovering = discovering
# If we are freshly starting discoveyr, clear all locally cached known
# devices (that are not bonded or connected)
if discovering:
# Filter known devices to currently bonded or connected devices
self.known_devices = {
key: value
for key, value in self.known_devices.items()
if value.get('bond_state', 0) > 0
or value.get('connected', False)
}
@glib_callback()
def on_bond_state_changed(self, status, address, state):
"""Bond state has changed."""
# You can bond unknown devices if it was previously bonded
if not address in self.known_devices:
self.known_devices[address] = self._make_device(
address,
'',
bond_state=state,
)
else:
self.known_devices[address]['bond_state'] = state
@glib_callback()
def on_device_connected(self, remote_device):
"""Remote device connected hci."""
address, name = remote_device
if not address in self.known_devices:
self.known_devices[address] = self._make_device(address,
name,
connected=True)
else:
self.known_devices[address]['connected'] = True
@glib_callback()
def on_device_disconnected(self, remote_device):
"""Remote device disconnected hci."""
address, name = remote_device
if not address in self.known_devices:
self.known_devices[address] = self._make_device(address,
name,
connected=False)
else:
self.known_devices[address]['connected'] = False
def _make_dbus_device(self, address, name):
return {
'address': GLib.Variant('s', address),
'name': GLib.Variant('s', name)
}
@glib_call(False)
def has_proxy(self):
"""Checks whether adapter proxy can be acquired."""
return bool(self.proxy())
def proxy(self):
"""Gets proxy object to adapter interface for method calls."""
return self.bus.get(self.ADAPTER_SERVICE,
self.objpath)[self.ADAPTER_INTERFACE]
# TODO(b/227405934): Not sure we want GetRemoteRssi on adapter api since
# it's unlikely to be accurate over time. Use a mock for
# testing for now.
def get_mock_remote_rssi(self, device):
"""Gets mock value for remote device rssi."""
return -50
def register_properties(self):
"""Registers a property set for this client."""
self.properties = PropertySet({
'Address': (self.proxy().GetAddress, None),
'Name': (self.proxy().GetName, self.proxy().SetName),
'Class': (self.proxy().GetBluetoothClass,
self.proxy().SetBluetoothClass),
'Uuids': (self.proxy().GetUuids, None),
'Discoverable':
(self.proxy().GetDiscoverable, self.proxy().SetDiscoverable),
})
self.remote_properties = PropertySet({
'Name': (self.proxy().GetRemoteName, None),
'Type': (self.proxy().GetRemoteType, None),
'Alias': (self.proxy().GetRemoteAlias, None),
'Class': (self.proxy().GetRemoteClass, None),
'RSSI': (self.get_mock_remote_rssi, None),
})
@glib_call(False)
def register_callbacks(self):
"""Registers callbacks for this client.
This will also initialize properties and populate the list of bonded
devices since this should be the first thing that gets called after we
know that the adapter client has a valid proxy object.
"""
# Make sure properties are registered
if not self.properties:
self.register_properties()
# Prevent callback registration multiple times
if self.callbacks and self.connection_callbacks:
return True
# Generate a random number between 1-1000
rnumber = math.floor(random.random() * 1000 + 1)
# Reset known devices to just bonded devices and their connection
# states.
self.known_devices.clear()
bonded_devices = self.proxy().GetBondedDevices()
for device in bonded_devices:
(success, devtuple) = FlossAdapterClient.parse_dbus_device(device)
if success:
(address, name) = devtuple
cstate = self.proxy().GetConnectionState(
self._make_dbus_device(address, name))
logging.info('[%s:%s] initially bonded. Connected = %d',
address, name, cstate)
self.known_devices[address] = self._make_device(
address,
name,
bond_state=BondState.BONDED,
connected=bool(cstate > 0))
if not self.callbacks:
# Create and publish callbacks
self.callbacks = self.ExportedAdapterCallbacks()
self.callbacks.add_observer('adapter_client', self)
objpath = self.ADAPTER_CB_OBJ_PATTERN.format(self.hci, rnumber)
self.bus.register_object(objpath, self.callbacks, None)
# Register published callback with adapter daemon
self.proxy().RegisterCallback(objpath)
if not self.connection_callbacks:
self.connection_callbacks = self.ExportedConnectionCallbacks(
self.bus, objpath)
self.connection_callbacks.add_observer('adapter_client', self)
objpath = self.ADAPTER_CONN_CB_OBJ_PATTERN.format(
self.hci, rnumber)
self.bus.register_object(objpath, self.connection_callbacks, None)
self.proxy().RegisterConnectionCallback(objpath)
return True
def register_callback_observer(self, name, observer):
"""Add an observer for all callbacks.
@param name: Name of the observer.
@param observer: Observer that implements all callback classes.
"""
if isinstance(observer, BluetoothCallbacks):
self.callbacks.add_observer(name, observer)
if isinstance(observer, BluetoothConnectionCallbacks):
self.connection_callbacks.add_observer(name, observer)
def unregister_callback_observer(self, name, observer):
"""Remove an observer for all callbacks.
@param name: Name of the observer.
@param observer: Observer that implements all callback classes.
"""
if isinstance(observer, BluetoothCallbacks):
self.callbacks.remove_observer(name, observer)
if isinstance(observer, BluetoothConnectionCallbacks):
self.connection_callbacks.remove_observer(name, observer)
@glib_call('')
def get_address(self):
"""Gets the adapter's current address."""
return str(self.proxy().GetAddress())
@glib_call('')
def get_name(self):
"""Gets the adapter's name."""
return str(self.proxy().GetName())
@glib_call(None)
def get_property(self, prop_name):
"""Gets property by name."""
return self.properties.get(prop_name)
@glib_call(None)
def get_remote_property(self, address, prop_name):
"""Gets remote device property by name."""
name = 'Test device'
if address in self.known_devices:
name = self.known_devices[address]['name']
remote_device = self._make_dbus_device(address, name)
return self.remote_properties.get(prop_name, remote_device)
@glib_call(None)
def set_property(self, prop_name, *args):
"""Sets property by name."""
return self.properties.set(prop_name, *args)
@glib_call(None)
def set_remote_property(self, address, prop_name, *args):
"""Sets remote property by name."""
name = 'Test device'
if address in self.known_devices:
name = self.known_devices[address]['name']
remote_device = self._make_dbus_device(address, name)
return self.properties.set(prop_name, remote_device, *args)
@glib_call(False)
def start_discovery(self):
"""Starts discovery session."""
return bool(self.proxy().StartDiscovery())
@glib_call(False)
def stop_discovery(self):
"""Stops discovery session."""
return bool(self.proxy().CancelDiscovery())
@glib_call(False)
def is_discovering(self):
"""Is adapter discovering?"""
return bool(self.discovering)
@glib_call(False)
def has_device(self, address):
"""Checks to see if device with address is known."""
return address in self.known_devices
def is_bonded(self, address):
"""Checks if the given address is currently fully bonded."""
return address in self.known_devices and self.known_devices[
address].get('bond_state',
BondState.NOT_BONDED) == BondState.BONDED
@glib_call(False)
def create_bond(self, address, transport):
"""Creates bond with target address.
"""
name = 'Test bond'
if address in self.known_devices:
name = self.known_devices[address]['name']
remote_device = self._make_dbus_device(address, name)
return bool(self.proxy().CreateBond(remote_device, int(transport)))
@glib_call(False)
def cancel_bond(self, address):
"""Call cancel bond with no additional checks. Prefer |forget_device|.
@param address: Device to cancel bond.
@returns Result of |CancelBondProcess|.
"""
name = 'Test bond'
if address in self.known_devices:
name = self.known_devices[address]['name']
remote_device = self._make_dbus_device(address, name)
return bool(self.proxy().CancelBond(remote_device))
@glib_call(False)
def remove_bond(self, address):
"""Call remove bond with no additional checks. Prefer |forget_device|.
@param address: Device to remove bond.
@returns Result of |RemoveBond|.
"""
name = 'Test bond'
if address in self.known_devices:
name = self.known_devices[address]['name']
remote_device = self._make_dbus_device(address, name)
return bool(self.proxy().RemoveBond(remote_device))
@glib_call(False)
def forget_device(self, address):
"""Forgets device from local cache and removes bonding.
If a device is currently bonding or bonded, it will cancel or remove the
bond to totally remove this device.
@return
True if device was known and was removed.
False if device was unknown or removal failed.
"""
if address not in self.known_devices:
return False
# Remove the device from known devices first
device = self.known_devices[address]
del self.known_devices[address]
remote_device = self._make_dbus_device(device['address'],
device['name'])
# Extra actions if bond state is not NOT_BONDED
if device['bond_state'] == BondState.BONDING:
return bool(self.proxy().CancelBondProcess(remote_device))
elif device['bond_state'] == BondState.BONDED:
return bool(self.proxy().RemoveBond(remote_device))
return True
@glib_call(False)
def set_pairing_confirmation(self, address, accept):
"""Confirm that a pairing should be completed on a bonding device."""
# Device should be known or already `Bonding`
if address not in self.known_devices:
logging.debug('[%s] Unknown device in set_pairing_confirmation',
address)
return False
device = self.known_devices[address]
remote_device = self._make_dbus_device(address, device['name'])
return bool(self.proxy().SetPairingConfirmation(remote_device, accept))
def get_connected_devices_count(self):
"""Gets the number of known, connected devices."""
return sum([
1 for x in self.known_devices.values()
if x.get('connected', False)
])
def is_connected(self, address):
"""Checks whether a device is connected."""
return address in self.known_devices and self.known_devices[
address].get('connected', False)
@glib_call(False)
def connect_all_enabled_profiles(self, address):
"""Connect all enabled profiles for target address."""
device = self._make_dbus_device(
address,
self.known_devices.get(address, {}).get('name', 'Test device'))
return bool(self.proxy().ConnectAllEnabledProfiles(device))
@glib_call(False)
def disconnect_all_enabled_profiles(self, address):
"""Disconnect all enabled profiles for target address."""
device = self._make_dbus_device(
address,
self.known_devices.get(address, {}).get('name', 'Test device'))
return bool(self.proxy().DisconnectAllEnabledProfiles(device))