• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import re
8import time
9
10from autotest_lib.client.common_lib import error
11from autotest_lib.server import test
12import parse
13
14GUADO_GPIO = 218  # For Front-Left USB port
15POWER_RECYCLE_WAIT_TIME = 1  # sec
16
17
18class enterprise_CFM_HuddlyUpdater(test.test):
19    """Tests the firmware updatability of HuddlyGo camera.
20
21    An event to trigger the firmware update is to power recycle of a USB port
22    which the HuddlyGo camera is attached to. The power recycle emulates
23    the power recycle of the ChromeBox or a reconnection of the peripheral
24    to the ChromeBox.
25
26    The test scenario involves the power recycling of a specific USB port
27    of the Guado ChromeBox: Front-left one. This imposes a restriction in the
28    testbed setup. This limitation is to be alleviated after the development
29    of full-fledged usb power recycle code. TODO(frankhu).
30    """
31
32    version = 1
33    _failed_test_list = []
34
35    UPDATER_WAIT_TIME = 60  # sec
36
37    FIRMWARE_PKG_ORG = 'huddly'
38    FIRMWARE_PKG_TO_TEST = 'huddly052'
39    FIRMWARE_PKG_BACKUP = 'huddly.backup'
40
41    DUT_FIRMWARE_BASE = '/lib/firmware/'
42    DUT_FIRMWARE_SRC = os.path.join(DUT_FIRMWARE_BASE, FIRMWARE_PKG_ORG)
43    DUT_FIRMWARE_SRC_BACKUP = os.path.join(DUT_FIRMWARE_BASE,
44                                           FIRMWARE_PKG_BACKUP)
45    DUT_FIRMWARE_SRC_TEST = os.path.join(DUT_FIRMWARE_BASE,
46                                         FIRMWARE_PKG_TO_TEST)
47
48    def initialize(self):
49        """initialize is a stub function."""
50        # Placeholder.
51        pass
52
53    def ls(self):
54        """ls tracks the directories of interest."""
55        cmd = 'ls -l /lib/firmware/ | grep huddly'
56        result = self._shcmd(cmd)
57
58    def cleanup(self):
59        """Bring the originally bundled firmware package back."""
60        cmd = '[ -f {} ] && rm -rf {}'.format(self.DUT_FIRMWARE_SRC,
61                                              self.DUT_FIRMWARE_SRC)
62        self._shcmd(cmd)
63
64        cmd = 'mv {} {} && rm -rf {}'.format(self.DUT_FIRMWARE_SRC_BACKUP,
65                                             self.DUT_FIRMWARE_SRC,
66                                             self.DUT_FIRMWARE_SRC_TEST)
67        self._shcmd(cmd)
68
69    def _shcmd(self, cmd):
70        """A simple wrapper for remote shell command execution."""
71        logging.info('CMD: [%s]', cmd)
72        result = self._client.run(cmd)
73
74        # result is an object with following attributes:
75        # ['__class__', '__delattr__', '__dict__', '__doc__', '__eq__',
76        # '__format__', '__getattribute__', '__hash__', '__init__',
77        # '__module__', '__new__', '__reduce__', '__reduce_ex__',
78        # '__repr__', '__setattr__', '__sizeof__', '__str__',
79        # '__subclasshook__', '__weakref__', 'command', 'duration',
80        # 'exit_status', 'stderr', 'stdout']
81        try:
82            result = self._client.run(cmd)
83        except:
84            pass
85
86        if result.stderr:
87            logging.info('CMD ERR:\n' + result.stderr)
88        logging.info('CMD OUT:\n' + result.stdout)
89        return result
90
91    def copy_firmware(self):
92        """Copy test firmware package from server to the DUT."""
93        current_dir = os.path.dirname(os.path.realpath(__file__))
94        src_firmware_path = os.path.join(current_dir, self.FIRMWARE_PKG_TO_TEST)
95        dst_firmware_path = self.DUT_FIRMWARE_BASE
96
97        msg = 'copy firmware from {} to {}'.format(src_firmware_path,
98                                                   dst_firmware_path)
99        logging.info(msg)
100        self._client.send_file(
101            src_firmware_path, dst_firmware_path, delete_dest=True)
102
103    def update_firmware(self, firmware_pkg):
104        """Update the peripheral's firmware with the specified package.
105
106        @param firmware_pkg: A string of package name specified by the leaf
107                directory name in /lib/firmware/. See class constants
108                DUT_FIRMWARE_SRC*.
109        """
110        # Set up the firmware package to test with
111        firmware_path = os.path.join(self.DUT_FIRMWARE_BASE, firmware_pkg)
112        cmd = 'ln -sfn {} {}'.format(firmware_path, self.DUT_FIRMWARE_SRC)
113        self._shcmd(cmd)
114
115        ver_dic = self.get_fw_vers()
116        had = ver_dic.get('peripheral', {}).get('app', '')
117        want = ver_dic.get('package', {}).get('app', '')
118
119        msg = 'Update plan: from {} to {} with package: {}'.format(
120            had, want, firmware_pkg)
121        logging.info(msg)
122
123        logging.info('Recycle the power to the USB port '
124                     'to which HuddlyGo is attached.')
125        self.usb_power_recycle()
126        time.sleep(self.UPDATER_WAIT_TIME)
127
128        got = self.get_fw_vers().get('peripheral', {}).get('app', '')
129
130        msg = 'Update result: had {} want {} got {}'.format(
131            had, want, got)
132        logging.info(msg)
133
134        if want != got:
135            self._failed_test_list.append(
136                'update_firmware({})'.format(firmware_pkg))
137
138    def run_once(self, host=None):
139        """Update two times. First with test package, second with the original.
140
141        Test scenario:
142          1. Copy test firmware from the server to the DUT.
143          2. Update with the test package. Wait about 50 sec till completion.
144             Confirm if the peripheral is updated with the test version.
145          3. Update with the original package. Wait about 50 sec.
146             Confirm if the peripheral is updated with the original version.
147        """
148        self._client = host
149
150        if not self.is_filesystem_readwrite():
151            # Make the file system read-writable, reboot, and continue the test
152            logging.info('DUT root file system is not read-writable. '
153                         'Converting it read-wriable...')
154            self.convert_rootfs_writable()
155        else:
156            logging.info('DUT is read-writable')
157
158
159        try:
160            self.ls()
161            cmd = 'mv {} {}'.format(self.DUT_FIRMWARE_SRC,
162                                    self.DUT_FIRMWARE_SRC_BACKUP)
163            self._shcmd(cmd)
164
165            self.ls()
166            self.copy_firmware()
167            self.ls()
168            self.update_firmware(self.FIRMWARE_PKG_TO_TEST)
169            self.ls()
170            self.update_firmware(self.FIRMWARE_PKG_BACKUP)
171
172            if self._failed_test_list:
173              msg = 'Test failed in {}'.format(
174                  ', '.join(map(str, self._failed_test_list)))
175              raise error.TestFail(msg)
176        except:
177            pass
178        finally:
179            self.cleanup()
180
181    def convert_rootfs_writable(self):
182        """Remove rootfs verification on DUT, reboot,
183        and remount the filesystem read-writable"""
184
185        logging.info('Disabling rootfs verification...')
186        self.remove_rootfs_verification()
187
188        logging.info('Rebooting...')
189        self.reboot()
190
191        logging.info('Remounting..')
192        cmd = 'mount -o remount,rw /'
193        self._shcmd(cmd)
194
195    def remove_rootfs_verification(self):
196        """Remove rootfs verification."""
197        # 2 & 4 are default partitions, and the system boots from one of them.
198        # Code from chromite/scripts/deploy_chrome.py
199        KERNEL_A_PARTITION = 2
200        KERNEL_B_PARTITION = 4
201
202        cmd_template = ('/usr/share/vboot/bin/make_dev_ssd.sh --partitions %d '
203               '--remove_rootfs_verification --force')
204        for partition in (KERNEL_A_PARTITION, KERNEL_B_PARTITION):
205            cmd = cmd_template % partition
206            self._client.run(cmd)
207
208    def reboot(self):
209        """Reboots the DUT."""
210        self._client.reboot()
211
212    def get_fw_vers(self):
213        """Queries the firmware versions.
214
215        Utilizes the output of the command 'huddly-updater --info'.
216        It queries and parses the firmware versions of app and bootloader of
217        firmware package and the peripheral's running firmwares, respectively.
218
219        @returns a dictionary hierachically storing the firmware versions.
220        """
221
222        # TODO(porce): The updater's output is to stdout, but Auto test
223        # command output comes to stderr. Investigate.
224        cmd = 'huddly-updater --info --log_to=stdout'
225        result = self._shcmd(cmd).stderr
226        ver_dic = parse.parse_fw_vers(result)
227        return ver_dic
228
229    def usb_power_recycle(self):
230        """Recycle the power to a USB port.
231
232        # TODO(frankhu): This code supports Guado, at a specific test
233        # configuration. Develop an independent tool to perform this task
234        # with minimal dependency.
235        """
236
237        try:
238            # Ignorant handling of GPIO export.
239            cmd = 'echo {} > /sys/class/gpio/export'.format(GUADO_GPIO)
240            self._shcmd(cmd)
241        except error.AutoservRunError:
242            pass
243
244        cmd = 'echo out > /sys/class/gpio/gpio{}/direction'.format(GUADO_GPIO)
245        self._shcmd(cmd)
246        cmd = 'echo 0 > /sys/class/gpio/gpio{}/value'.format(GUADO_GPIO)
247        self._shcmd(cmd)
248
249        # Wait for 1 second to avoid too fast removal and reconnection.
250        time.sleep(POWER_RECYCLE_WAIT_TIME)
251        cmd = 'echo 1 > /sys/class/gpio/gpio{}/value'.format(GUADO_GPIO)
252        self._shcmd(cmd)
253
254    def is_filesystem_readwrite(self):
255        """Check if the root file system is read-writable.
256
257        Query the DUT's filesystem /dev/root, often manifested as /dev/dm-0
258        or  is mounted as read-only or not.
259
260        @returns True if the /dev/root is read-writable. False otherwise.
261        """
262
263        cmd = 'cat /proc/mounts | grep "/dev/root"'
264        result = self._shcmd(cmd).stdout
265        fields = re.split(' |,', result)
266        return True if fields.__len__() >= 4 and fields[3] == 'rw' else False
267