• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2021 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Client class to access the Floss adapter interface."""
6
7from __future__ import absolute_import
8from __future__ import division
9from __future__ import print_function
10
11from enum import IntEnum
12from gi.repository import GLib
13import logging
14import math
15import random
16
17from autotest_lib.client.cros.bluetooth.floss.observer_base import ObserverBase
18from autotest_lib.client.cros.bluetooth.floss.utils import (glib_call,
19                                                            glib_callback,
20                                                            PropertySet)
21
22
class BondState(IntEnum):
    """Bluetooth bonding state.

    The integer values match the `state` argument delivered to
    on_bond_state_changed (0 = not bonded, 1 = bonding, 2 = bonded).
    """
    NOT_BONDED = 0
    BONDING = 1
    BONDED = 2
28
29
class Transport(IntEnum):
    """Bluetooth transport type.

    FlossAdapterClient.create_bond converts its `transport` argument with
    int() before sending it over D-Bus; presumably one of these values is
    expected there (confirm against the daemon API).
    """
    AUTO = 0
    BREDR = 1
    LE = 2
35
36
class SspVariant(IntEnum):
    """Bluetooth SSP variant type.

    The values (0-3) correspond to the `variant` argument passed to
    BluetoothCallbacks.on_ssp_request.
    """
    PASSKEY_CONFIRMATION = 0
    PASSKEY_ENTRY = 1
    CONSENT = 2
    PASSKEY_NOTIFICATION = 3
43
44
class BluetoothCallbacks:
    """Observer interface for callbacks from the Adapter Interface.

    Subclass this and override the notifications of interest when exporting
    callbacks via register_callback. Every method here is a no-op that
    returns None.
    """

    def on_address_changed(self, addr):
        """Notification that the adapter address changed.

        @param addr: New address of the adapter.
        """

    def on_device_found(self, remote_device):
        """Notification that a device was found via discovery.

        @param remote_device: Remote device found during discovery session.
        """

    def on_discovering_changed(self, discovering):
        """Notification that the discovering state has changed.

        @param discovering: Whether discovery enabled or disabled.
        """

    def on_ssp_request(self, remote_device, class_of_device, variant, passkey):
        """Simple secure pairing request for the agent to reply to.

        @param remote_device: Remote device that is being paired.
        @param class_of_device: Class of device as described in HCI spec.
        @param variant: SSP variant (0-3).
                        [Confirmation, Entry, Consent, Notification]
        @param passkey: Passkey to display (so user can confirm or type it).
        """

    def on_bond_state_changed(self, status, device_address, state):
        """Notification that bonding/pairing state changed for a device.

        @param status: Success (0) or failure reason for bonding.
        @param device_address: This notification is for this BDADDR.
        @param state: Bonding state. 0 = Not bonded, 1 = Bonding, 2 = Bonded.
        """
90
91
class BluetoothConnectionCallbacks:
    """Observer interface for callbacks from the Device Connection interface.

    Subclass this and override the notifications of interest when exporting
    callbacks via register_connection_callback. Every method here is a no-op
    that returns None.
    """

    def on_device_connected(self, remote_device):
        """Notification that a device has completed HCI connection.

        @param remote_device: Remote device that completed HCI connection.
        """

    def on_device_disconnected(self, remote_device):
        """Notification that a device has completed HCI disconnection.

        @param remote_device: Remote device that completed HCI disconnection.
        """
111
112
class FlossAdapterClient(BluetoothCallbacks, BluetoothConnectionCallbacks):
    """Handles method calls to and callbacks from the Adapter interface."""

    # D-Bus service and interface used by proxy() to reach the adapter.
    ADAPTER_SERVICE = 'org.chromium.bluetooth'
    ADAPTER_INTERFACE = 'org.chromium.bluetooth.Bluetooth'
    # Adapter object path; '{}' is filled with the HCI index in __init__.
    ADAPTER_OBJECT_PATTERN = '/org/chromium/bluetooth/hci{}/adapter'
    # Interface name and object path pattern for the exported adapter
    # callbacks. The path is formatted with (hci index, random number) in
    # register_callbacks.
    ADAPTER_CB_INTF = 'org.chromium.bluetooth.BluetoothCallback'
    ADAPTER_CB_OBJ_PATTERN = '/org/chromium/bluetooth/hci{}/test_adapter_client{}'
    # Same as above, for the exported connection callbacks.
    ADAPTER_CONN_CB_INTF = 'org.chromium.bluetooth.BluetoothConnectionCallback'
    ADAPTER_CONN_CB_OBJ_PATTERN = '/org/chromium/bluetooth/hci{}/test_connection_client{}'
123
124    @staticmethod
125    def parse_dbus_device(remote_device_dbus):
126        """Parse a dbus variant dict as a remote device.
127
128        @param remote_device_dbus: Variant dict with signature a{sv}.
129
130        @return Parsing success, BluetoothDevice tuple
131        """
132        if 'address' in remote_device_dbus and 'name' in remote_device_dbus:
133            return True, (str(remote_device_dbus['address']),
134                          str(remote_device_dbus['name']))
135
136        return False, None
137
138    class ExportedAdapterCallbacks(ObserverBase):
139        """
140        <node>
141            <interface name="org.chromium.bluetooth.BluetoothCallback">
142                <method name="OnAddressChanged">
143                    <arg type="s" name="addr" direction="in" />
144                </method>
145                <method name="OnDeviceFound">
146                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
147                </method>
148                <method name="OnDiscoveringChanged">
149                    <arg type="b" name="discovering" direction="in" />
150                </method>
151                <method name="OnSspRequest">
152                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
153                    <arg type="u" name="class_of_device" direction="in" />
154                    <arg type="u" name="variant" direction="in" />
155                    <arg type="u" name="passkey" direction="in" />
156                </method>
157                <method name="OnBondStateChanged">
158                    <arg type="u" name="status" direction="in" />
159                    <arg type="s" name="address" direction="in" />
160                    <arg type="u" name="state" direction="in" />
161                </method>
162            </interface>
163        </node>
164        """
165
166        def __init__(self):
167            """Construct exported callbacks object.
168            """
169            ObserverBase.__init__(self)
170
171        def OnAddressChanged(self, addr):
172            """Handle address changed callbacks."""
173            for observer in self.observers.values():
174                observer.on_address_changed(addr)
175
176        def OnDeviceFound(self, remote_device_dbus):
177            """Handle device found from discovery."""
178            parsed, remote_device = FlossAdapterClient.parse_dbus_device(
179                    remote_device_dbus)
180            if not parsed:
181                logging.debug('OnDeviceFound parse error: {}'.format(
182                        remote_device_dbus))
183                return
184
185            for observer in self.observers.values():
186                observer.on_device_found(remote_device)
187
188        def OnDiscoveringChanged(self, discovering):
189            """Handle discovering state changed."""
190            for observer in self.observers.values():
191                observer.on_discovering_changed(bool(discovering))
192
193        def OnSspRequest(self, remote_device_dbus, class_of_device, variant,
194                         passkey):
195            """Handle pairing/bonding request to agent."""
196            parsed, remote_device = FlossAdapterClient.parse_dbus_device(
197                    remote_device_dbus)
198            if not parsed:
199                logging.debug('OnSspRequest parse error: {}'.format(
200                        remote_device_dbus))
201                return
202
203            for observer in self.observers.values():
204                observer.on_ssp_request(remote_device, class_of_device,
205                                        variant, passkey)
206
207        def OnBondStateChanged(self, status, address, state):
208            """Handle bond state changed callbacks."""
209            for observer in self.observers.values():
210                observer.on_bond_state_changed(status, address, state)
211
212    class ExportedConnectionCallbacks(ObserverBase):
213        """
214        <node>
215            <interface name="org.chromium.bluetooth.BluetoothConnectionCallback">
216                <method name="OnDeviceConnected">
217                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
218                </method>
219                <method name="OnDeviceDisconnected">
220                    <arg type="a{sv}" name="remote_device_dbus" direction="in" />
221                </method>
222            </interface>
223        </node>
224        """
225
226        def __init__(self, bus, object_path):
227            """Construct exported connection callbacks object.
228            """
229            ObserverBase.__init__(self)
230
231        def OnDeviceConnected(self, remote_device_dbus):
232            """Handle device connected."""
233            parsed, remote_device = FlossAdapterClient.parse_dbus_device(
234                    remote_device_dbus)
235            if not parsed:
236                logging.debug('OnDeviceConnected parse error: {}'.format(
237                        remote_device_dbus))
238                return
239
240            for observer in self.observers.values():
241                observer.on_device_connected(remote_device)
242
243        def OnDeviceDisconnected(self, remote_device_dbus):
244            """Handle device disconnected."""
245            parsed, remote_device = FlossAdapterClient.parse_dbus_device(
246                    remote_device_dbus)
247            if not parsed:
248                logging.debug('OnDeviceDisconnected parse error: {}'.format(
249                        remote_device_dbus))
250                return
251
252            for observer in self.observers.values():
253                observer.on_device_disconnected(remote_device)
254
255    def __init__(self, bus, hci):
256        """Construct the client.
257
258        @param bus: DBus bus over which we'll establish connections.
259        @param hci: HCI adapter index. Get this value from `get_default_adapter`
260                    on FlossManagerClient.
261        """
262        self.bus = bus
263        self.hci = hci
264        self.objpath = self.ADAPTER_OBJECT_PATTERN.format(hci)
265
266        # We don't register callbacks by default.
267        self.callbacks = None
268        self.connection_callbacks = None
269
270        # Locally cached values
271        self.known_devices = {}
272        self.discovering = False
273
274        # Initialize properties when registering callbacks (we know proxy is
275        # valid at this point).
276        self.properties = None
277        self.remote_properties = None
278
279    def __del__(self):
280        """Destructor"""
281        del self.callbacks
282        del self.connection_callbacks
283
284    def _make_device(self,
285                     address,
286                     name,
287                     bond_state=BondState.NOT_BONDED,
288                     connected=False):
289        """Make a device dict."""
290        return {
291                'address': address,
292                'name': name,
293                'bond_state': bond_state,
294                'connected': connected,
295        }
296
297    @glib_callback()
298    def on_device_found(self, remote_device):
299        """Remote device was found as part of discovery."""
300        address, name = remote_device
301
302        # Update a new device
303        if not address in self.known_devices:
304            self.known_devices[address] = self._make_device(address, name)
305        # Update name if previous cached value didn't have a name
306        elif not self.known_devices[address]:
307            self.known_devices[address]['name'] = name
308
309    @glib_callback()
310    def on_discovering_changed(self, discovering):
311        """Discovering state has changed."""
312        # Ignore a no-op
313        if self.discovering == discovering:
314            return
315
316        # Cache the value
317        self.discovering = discovering
318
319        # If we are freshly starting discoveyr, clear all locally cached known
320        # devices (that are not bonded or connected)
321        if discovering:
322            # Filter known devices to currently bonded or connected devices
323            self.known_devices = {
324                    key: value
325                    for key, value in self.known_devices.items()
326                    if value.get('bond_state', 0) > 0
327                    or value.get('connected', False)
328            }
329
330    @glib_callback()
331    def on_bond_state_changed(self, status, address, state):
332        """Bond state has changed."""
333        # You can bond unknown devices if it was previously bonded
334        if not address in self.known_devices:
335            self.known_devices[address] = self._make_device(
336                    address,
337                    '',
338                    bond_state=state,
339            )
340        else:
341            self.known_devices[address]['bond_state'] = state
342
343    @glib_callback()
344    def on_device_connected(self, remote_device):
345        """Remote device connected hci."""
346        address, name = remote_device
347        if not address in self.known_devices:
348            self.known_devices[address] = self._make_device(address,
349                                                            name,
350                                                            connected=True)
351        else:
352            self.known_devices[address]['connected'] = True
353
354    @glib_callback()
355    def on_device_disconnected(self, remote_device):
356        """Remote device disconnected hci."""
357        address, name = remote_device
358        if not address in self.known_devices:
359            self.known_devices[address] = self._make_device(address,
360                                                            name,
361                                                            connected=False)
362        else:
363            self.known_devices[address]['connected'] = False
364
365    def _make_dbus_device(self, address, name):
366        return {
367                'address': GLib.Variant('s', address),
368                'name': GLib.Variant('s', name)
369        }
370
371    @glib_call(False)
372    def has_proxy(self):
373        """Checks whether adapter proxy can be acquired."""
374        return bool(self.proxy())
375
376    def proxy(self):
377        """Gets proxy object to adapter interface for method calls."""
378        return self.bus.get(self.ADAPTER_SERVICE,
379                            self.objpath)[self.ADAPTER_INTERFACE]
380
381    # TODO(b/227405934): Not sure we want GetRemoteRssi on adapter api since
382    #                    it's unlikely to be accurate over time. Use a mock for
383    #                    testing for now.
384    def get_mock_remote_rssi(self, device):
385        """Gets mock value for remote device rssi."""
386        return -50
387
388    def register_properties(self):
389        """Registers a property set for this client."""
390        self.properties = PropertySet({
391                'Address': (self.proxy().GetAddress, None),
392                'Name': (self.proxy().GetName, self.proxy().SetName),
393                'Class': (self.proxy().GetBluetoothClass,
394                          self.proxy().SetBluetoothClass),
395                'Uuids': (self.proxy().GetUuids, None),
396                'Discoverable':
397                (self.proxy().GetDiscoverable, self.proxy().SetDiscoverable),
398        })
399
400        self.remote_properties = PropertySet({
401                'Name': (self.proxy().GetRemoteName, None),
402                'Type': (self.proxy().GetRemoteType, None),
403                'Alias': (self.proxy().GetRemoteAlias, None),
404                'Class': (self.proxy().GetRemoteClass, None),
405                'RSSI': (self.get_mock_remote_rssi, None),
406        })
407
408    @glib_call(False)
409    def register_callbacks(self):
410        """Registers callbacks for this client.
411
412        This will also initialize properties and populate the list of bonded
413        devices since this should be the first thing that gets called after we
414        know that the adapter client has a valid proxy object.
415        """
416        # Make sure properties are registered
417        if not self.properties:
418            self.register_properties()
419
420        # Prevent callback registration multiple times
421        if self.callbacks and self.connection_callbacks:
422            return True
423
424        # Generate a random number between 1-1000
425        rnumber = math.floor(random.random() * 1000 + 1)
426
427        # Reset known devices to just bonded devices and their connection
428        # states.
429        self.known_devices.clear()
430        bonded_devices = self.proxy().GetBondedDevices()
431        for device in bonded_devices:
432            (success, devtuple) = FlossAdapterClient.parse_dbus_device(device)
433            if success:
434                (address, name) = devtuple
435                cstate = self.proxy().GetConnectionState(
436                        self._make_dbus_device(address, name))
437                logging.info('[%s:%s] initially bonded. Connected = %d',
438                             address, name, cstate)
439                self.known_devices[address] = self._make_device(
440                        address,
441                        name,
442                        bond_state=BondState.BONDED,
443                        connected=bool(cstate > 0))
444
445        if not self.callbacks:
446            # Create and publish callbacks
447            self.callbacks = self.ExportedAdapterCallbacks()
448            self.callbacks.add_observer('adapter_client', self)
449            objpath = self.ADAPTER_CB_OBJ_PATTERN.format(self.hci, rnumber)
450            self.bus.register_object(objpath, self.callbacks, None)
451
452            # Register published callback with adapter daemon
453            self.proxy().RegisterCallback(objpath)
454
455        if not self.connection_callbacks:
456            self.connection_callbacks = self.ExportedConnectionCallbacks(
457                    self.bus, objpath)
458            self.connection_callbacks.add_observer('adapter_client', self)
459            objpath = self.ADAPTER_CONN_CB_OBJ_PATTERN.format(
460                    self.hci, rnumber)
461            self.bus.register_object(objpath, self.connection_callbacks, None)
462
463            self.proxy().RegisterConnectionCallback(objpath)
464
465        return True
466
467    def register_callback_observer(self, name, observer):
468        """Add an observer for all callbacks.
469
470        @param name: Name of the observer.
471        @param observer: Observer that implements all callback classes.
472        """
473        if isinstance(observer, BluetoothCallbacks):
474            self.callbacks.add_observer(name, observer)
475
476        if isinstance(observer, BluetoothConnectionCallbacks):
477            self.connection_callbacks.add_observer(name, observer)
478
479    def unregister_callback_observer(self, name, observer):
480        """Remove an observer for all callbacks.
481
482        @param name: Name of the observer.
483        @param observer: Observer that implements all callback classes.
484        """
485        if isinstance(observer, BluetoothCallbacks):
486            self.callbacks.remove_observer(name, observer)
487
488        if isinstance(observer, BluetoothConnectionCallbacks):
489            self.connection_callbacks.remove_observer(name, observer)
490
491    @glib_call('')
492    def get_address(self):
493        """Gets the adapter's current address."""
494        return str(self.proxy().GetAddress())
495
496    @glib_call('')
497    def get_name(self):
498        """Gets the adapter's name."""
499        return str(self.proxy().GetName())
500
501    @glib_call(None)
502    def get_property(self, prop_name):
503        """Gets property by name."""
504        return self.properties.get(prop_name)
505
506    @glib_call(None)
507    def get_remote_property(self, address, prop_name):
508        """Gets remote device property by name."""
509        name = 'Test device'
510        if address in self.known_devices:
511            name = self.known_devices[address]['name']
512
513        remote_device = self._make_dbus_device(address, name)
514        return self.remote_properties.get(prop_name, remote_device)
515
516    @glib_call(None)
517    def set_property(self, prop_name, *args):
518        """Sets property by name."""
519        return self.properties.set(prop_name, *args)
520
521    @glib_call(None)
522    def set_remote_property(self, address, prop_name, *args):
523        """Sets remote property by name."""
524        name = 'Test device'
525        if address in self.known_devices:
526            name = self.known_devices[address]['name']
527
528        remote_device = self._make_dbus_device(address, name)
529        return self.properties.set(prop_name, remote_device, *args)
530
531    @glib_call(False)
532    def start_discovery(self):
533        """Starts discovery session."""
534        return bool(self.proxy().StartDiscovery())
535
536    @glib_call(False)
537    def stop_discovery(self):
538        """Stops discovery session."""
539        return bool(self.proxy().CancelDiscovery())
540
541    @glib_call(False)
542    def is_discovering(self):
543        """Is adapter discovering?"""
544        return bool(self.discovering)
545
546    @glib_call(False)
547    def has_device(self, address):
548        """Checks to see if device with address is known."""
549        return address in self.known_devices
550
551    def is_bonded(self, address):
552        """Checks if the given address is currently fully bonded."""
553        return address in self.known_devices and self.known_devices[
554                address].get('bond_state',
555                             BondState.NOT_BONDED) == BondState.BONDED
556
557    @glib_call(False)
558    def create_bond(self, address, transport):
559        """Creates bond with target address.
560        """
561        name = 'Test bond'
562        if address in self.known_devices:
563            name = self.known_devices[address]['name']
564
565        remote_device = self._make_dbus_device(address, name)
566        return bool(self.proxy().CreateBond(remote_device, int(transport)))
567
568    @glib_call(False)
569    def cancel_bond(self, address):
570        """Call cancel bond with no additional checks. Prefer |forget_device|.
571
572        @param address: Device to cancel bond.
573        @returns Result of |CancelBondProcess|.
574        """
575        name = 'Test bond'
576        if address in self.known_devices:
577            name = self.known_devices[address]['name']
578
579        remote_device = self._make_dbus_device(address, name)
580        return bool(self.proxy().CancelBond(remote_device))
581
582    @glib_call(False)
583    def remove_bond(self, address):
584        """Call remove bond with no additional checks. Prefer |forget_device|.
585
586        @param address: Device to remove bond.
587        @returns Result of |RemoveBond|.
588        """
589        name = 'Test bond'
590        if address in self.known_devices:
591            name = self.known_devices[address]['name']
592
593        remote_device = self._make_dbus_device(address, name)
594        return bool(self.proxy().RemoveBond(remote_device))
595
596    @glib_call(False)
597    def forget_device(self, address):
598        """Forgets device from local cache and removes bonding.
599
600        If a device is currently bonding or bonded, it will cancel or remove the
601        bond to totally remove this device.
602
603        @return
604            True if device was known and was removed.
605            False if device was unknown or removal failed.
606        """
607        if address not in self.known_devices:
608            return False
609
610        # Remove the device from known devices first
611        device = self.known_devices[address]
612        del self.known_devices[address]
613
614        remote_device = self._make_dbus_device(device['address'],
615                                               device['name'])
616
617        # Extra actions if bond state is not NOT_BONDED
618        if device['bond_state'] == BondState.BONDING:
619            return bool(self.proxy().CancelBondProcess(remote_device))
620        elif device['bond_state'] == BondState.BONDED:
621            return bool(self.proxy().RemoveBond(remote_device))
622
623        return True
624
625    @glib_call(False)
626    def set_pairing_confirmation(self, address, accept):
627        """Confirm that a pairing should be completed on a bonding device."""
628        # Device should be known or already `Bonding`
629        if address not in self.known_devices:
630            logging.debug('[%s] Unknown device in set_pairing_confirmation',
631                          address)
632            return False
633
634        device = self.known_devices[address]
635        remote_device = self._make_dbus_device(address, device['name'])
636
637        return bool(self.proxy().SetPairingConfirmation(remote_device, accept))
638
639    def get_connected_devices_count(self):
640        """Gets the number of known, connected devices."""
641        return sum([
642                1 for x in self.known_devices.values()
643                if x.get('connected', False)
644        ])
645
646    def is_connected(self, address):
647        """Checks whether a device is connected."""
648        return address in self.known_devices and self.known_devices[
649                address].get('connected', False)
650
651    @glib_call(False)
652    def connect_all_enabled_profiles(self, address):
653        """Connect all enabled profiles for target address."""
654        device = self._make_dbus_device(
655                address,
656                self.known_devices.get(address, {}).get('name', 'Test device'))
657        return bool(self.proxy().ConnectAllEnabledProfiles(device))
658
659    @glib_call(False)
660    def disconnect_all_enabled_profiles(self, address):
661        """Disconnect all enabled profiles for target address."""
662        device = self._make_dbus_device(
663                address,
664                self.known_devices.get(address, {}).get('name', 'Test device'))
665        return bool(self.proxy().DisconnectAllEnabledProfiles(device))
666