• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus
6
7from autotest_lib.client.common_lib import utils
8from autotest_lib.client.common_lib.cros import dbus_send
9from autotest_lib.client.common_lib.cros.cfm.usb import usb_device_collector
10from autotest_lib.client.common_lib.cros.cfm.usb import cfm_usb_devices
11
12ATRUS = cfm_usb_devices.ATRUS
13
def check_dbus_available(host):
    """
    Reports whether the Atrusctl service is registered on the D-Bus bus.

    Asks the D-Bus daemon for its list of names (ListNames) and looks for
    'org.chromium.Atrusctl' in the reply.

    @param host: Host object handed to dbus_send for issuing the call.
    @returns True if 'org.chromium.Atrusctl' is in the reply. False otherwise.
    """
    dbus_daemon = 'org.freedesktop.DBus'
    reply = dbus_send.dbus_send(
            dbus_daemon,
            dbus_daemon,
            '/org/freedesktop/DBus',
            'ListNames',
            args=None,
            host=host,
            timeout_seconds=2,
            tolerate_failures=False)

    registered_names = reply.response
    return 'org.chromium.Atrusctl' in registered_names
32
def force_upgrade_atrus(host):
    """
    Executes a forced upgrade of the Atrus device.

    Calls ForceFirmwareUpgrade on the org.chromium.Atrusctl D-Bus service,
    passing the path of the latest firmware bundle.

    @param host: Host object handed to dbus_send for issuing the call.
    @returns The response of the ForceFirmwareUpgrade call (True if the
             upgrade succeeded). False if the D-Bus call itself failed.
    """
    args = [dbus.String('/lib/firmware/google/atrus-fw-bundle-latest.bin')]
    result = dbus_send.dbus_send(
            'org.chromium.Atrusctl',
            'org.chromium.Atrusctl',
            '/org/chromium/Atrusctl',
            'ForceFirmwareUpgrade',
            args=args,
            host=host,
            timeout_seconds=90,
            tolerate_failures=True)

    # Because tolerate_failures=True, a failed dbus-send call does not raise;
    # dbus_send hands back None instead. Report that as a failed upgrade
    # rather than crashing with AttributeError on result.response.
    if result is None:
        return False

    return result.response
52
def wait_for_atrus_enumeration(host, timeout_seconds=30):
    """
    Polls until at least one Atrus device is found on the host.

    Queries the USB device collector for devices matching the ATRUS spec
    once per second until one is found or the timeout expires.

    @param host: Host object used to construct the USB device collector.
    @param timeout_seconds: Maximum number of seconds to wait.
    """
    collector = usb_device_collector.UsbDeviceCollector(host)

    def _atrus_enumerated():
        """Returns True once one or more devices match the ATRUS spec."""
        matching_devices = collector.get_devices_by_spec(ATRUS)
        return len(matching_devices) >= 1

    utils.poll_for_condition(
            _atrus_enumerated,
            timeout=timeout_seconds,
            sleep_interval=1.0,
            desc='No atrus device enumerated')
67