• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python2
2# Copyright 2020 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import os
7import logging
8
9import common
10from autotest_lib.client.common_lib import utils as client_utils
11
12try:
13    from chromite.lib import metrics
14except ImportError:
15    metrics = client_utils.metrics_mock
16
17
18class _BaseUpdateServoFw(object):
19    """Base class to update firmware on servo"""
20
21    # Command to update servo device.
22    # param 1: servo board (servo_v4|servo_micro)
23    # param 2: serial number of main device on the board
24    UPDATER = 'servo_updater -b %s -s %s --reboot'
25    UPDATER_FORCE = UPDATER + ' --force'
26
27    # Command to read current version on the servo
28    # param 1: serial number of main device on the board
29    SERVO_VERSION = 'cat $(servodtool device -s %s usb-path)/configuration'
30
31    # Command to read servod config file with extracting value by key
32    # param 1: servo port, provided by servo config
33    # param 2: required parammeter (key) from config file
34    SERVOD_CONFIG = 'cat /var/lib/servod/config_%s | grep %s'
35
36    # Command to get PATH to the latest available firmware on the host
37    # param 1: servo board (servo_v4|servo_micro)
38    LATEST_VERSION_FW = 'realpath /usr/share/servo_updater/firmware/%s.bin'
39
40    # Command to get servo product supported by device
41    # param 1: serial number of main device on the board
42    SERVO_PRODUCT = 'cat $(servodtool device -s %s usb-path)/product'
43
44    def __init__(self, servo_host):
45        self._host = servo_host
46        # keep flag that class support and can run updater
47        self._supported = None
48
49    def check_needs(self, ignore_version=False):
50        """Check if class supports update for particular servo type.
51
52        @params ignore_version: do not check the version on the device.
53        """
54        if self._supported is None:
55            if not self._host:
56                self._supported = False
57            elif not self._host.is_labstation():
58                self._supported = False
59            elif not self._host.servo_serial:
60                self._supported = False
61            elif not self._check_needs():
62                self._supported = False
63            elif not ignore_version:
64                self._supported = self._is_outdated_version()
65            else:
66                self._supported = True
67        return self._supported
68
69    def update(self, force_update=False, ignore_version=False):
70        """Update firmware on the servo.
71
72        Steps:
73        1) Verify servo is not updated by checking the versions.
74        2) Try to get serial number for the servo.
75        3) Updating firmware.
76
77        @params force_update: run updater with force option.
78        @params ignore_version: do not check the version on the device.
79        """
80        if not self.check_needs(ignore_version):
81            logging.info('The board %s does not need update or '
82                         'not present in the setup.', self.get_board())
83            return
84        if not self.get_serial_number():
85            logging.info('Serial number is not detected. It means no update'
86                         ' will be performed on servo.')
87            return
88        self._update_firmware(force_update)
89
90    def _check_needs(self):
91        """Check is servo type supported"""
92        raise NotImplementedError('Please implement method to perform'
93                                  ' check of supporting the servo type')
94
95    def get_board(self):
96        """Return servo type supported by updater"""
97        raise NotImplementedError('Please implement method to return'
98                                  ' servo type')
99
100    def get_serial_number(self):
101        """Return serial number for main servo device on servo"""
102        raise NotImplementedError('Please implement method to return'
103                                  ' serial number')
104
105    def _get_updater_cmd(self, force_update):
106        """Return command to run firmware updater for the servo device.
107
108        @params force_update: run updater with force option.
109        """
110        board = self.get_board()
111        serial_number = self.get_serial_number()
112        if force_update:
113            cmd = self.UPDATER_FORCE
114        else:
115            cmd = self.UPDATER
116        return cmd % (board, serial_number)
117
118    def _update_firmware(self, force_update):
119        """Execute firmware updater command.
120
121        Method generate a metric to collect statistics of update.
122        @params force_update: run updater with force option.
123        """
124        cmd = self._get_updater_cmd(force_update)
125        logging.info('Servo fw update: %s', cmd)
126        result = self._host.run(cmd, ignore_status=True).stdout.strip()
127        logging.debug('Servo fw update finished; %s', result)
128        logging.info('Servo fw update finished')
129        metrics.Counter(
130            'chromeos/autotest/audit/servo/fw_update'
131            ).increment(fields={'status': 'success'})
132
133    def _get_config_value(self, key):
134        """Read configuration value by provided key.
135
136        @param key: key from key=value pair in config file.
137                    eg: 'HUB' or 'SERVO_MICRO_SERIAL'
138        """
139        """Read value from servod config file"""
140        cmd = self.SERVOD_CONFIG % (self._host.servo_port, key)
141        result = self._host.run(cmd, ignore_status=True).stdout.strip()
142        if result:
143            return result[len(key)+1:]
144        return None
145
146    def _current_version(self):
147        """Get current version on servo device"""
148        cmd = self.SERVO_VERSION % self.get_serial_number()
149        version = self._host.run(cmd, ignore_status=True).stdout.strip()
150        logging.debug('Current version: %s', version)
151        return version
152
153    def _latest_version(self):
154        """Get latest version available on servo-host"""
155        cmd = self.LATEST_VERSION_FW % self.get_board()
156        filepath = self._host.run(cmd, ignore_status=True).stdout.strip()
157        if not filepath:
158            return None
159        version = os.path.basename(os.path.splitext(filepath)[0]).strip()
160        logging.debug('Latest version: %s', version)
161        return version
162
163    def _is_outdated_version(self):
164        """Compare version to determine request to update the Servo or not.
165
166        Method generate metrics to collect statistics with version.
167        """
168        current_version = self._current_version()
169        latest_version = self._latest_version()
170        if not current_version or not latest_version:
171            return True
172        if current_version == latest_version:
173            return False
174        metrics.Counter(
175            'chromeos/autotest/audit/servo/fw_need_update'
176            ).increment(fields={'version': current_version})
177        return True
178
179    def _get_product(self):
180        """Get servo product from servo device"""
181        cmd = self.SERVO_PRODUCT % self.get_serial_number()
182        return self._host.run(cmd, ignore_status=True).stdout.strip()
183
184
185class UpdateServoV4Fw(_BaseUpdateServoFw):
186    """Servo firmware updater for servo_v4 version.
187
188    Update firmware will be only if new version present and servo
189    was not updated.
190    """
191    def get_board(self):
192        """Return servo type supported by updater"""
193        return 'servo_v4'
194
195    def get_serial_number(self):
196        # serial number of servo_v4 match with device number
197        return self._host.servo_serial
198
199    def _check_needs(self):
200        """Check if servo is servo_v4.
201
202        Check servo type.
203        Check access to the serial number.
204        """
205        if self._get_product() != 'Servo V4':
206            return False
207        if not self.get_serial_number():
208            return False
209        return True
210
211
212class UpdateServoMicroFw(_BaseUpdateServoFw):
213    """Servo firmware updater for servo_micro version.
214
215    Update firmware will be only if new version present and servo
216    was not updated.
217    """
218    def __init__(self, servo_host):
219        super(UpdateServoMicroFw, self).__init__(servo_host)
220        self._serial_number = None
221
222    def get_board(self):
223        """Return servo type supported by updater"""
224        return 'servo_micro'
225
226    def get_serial_number(self):
227        # serial number of servo_v4 match with device number
228        if self._serial_number is None:
229            # servo_micro serial number is not match to serial on
230            # the servo device servod is keeping it in config file
231            serial = self._get_config_value('SERVO_MICRO_SERIAL')
232            self._serial_number = serial if serial is not None else ''
233        return self._serial_number
234
235    def _check_needs(self):
236        """Check if servo is servo_micro.
237
238        Check servo type.
239        Check access to the serial number.
240        """
241        if not self.get_serial_number():
242            # set does not include servo_micro
243            return False
244        if self._get_product() != 'Servo Micro':
245            return False
246        return True
247
248
249# List servo firmware updaters
250SERVO_UPDATERS = (
251    UpdateServoV4Fw,
252    UpdateServoMicroFw,
253)
254
255
256def update_servo_firmware(host,
257                          boards=None,
258                          force_update=False,
259                          ignore_version=False):
260    """Update firmware on servo devices.
261
262    @params host: ServoHost instance to run all required commands.
263    @params force_update: run updater with force option.
264    @params ignore_version: do not check the version on the device.
265    """
266    if boards is None:
267        boards = []
268    if ignore_version:
269        logging.debug('Running servo_updater with ignore_version=True')
270
271    # to run updater we need make sure the servod is not running
272    host.stop_servod()
273    # initialize all updaters
274    updaters = [updater(host) for updater in SERVO_UPDATERS]
275
276    for updater in updaters:
277        board = updater.get_board()
278        if len(boards) > 0 and board not in boards:
279            logging.info('The %s is not requested for update', board)
280            continue
281        logging.info('Try to update board: %s', board)
282        try:
283            updater.update(force_update=force_update,
284                           ignore_version=ignore_version)
285        except Exception as e:
286            data = {'host': host.get_dut_hostname() or '',
287                    'board': board}
288            metrics.Counter(
289                'chromeos/autotest/audit/servo/fw/update/error'
290                ).increment(fields=data)
291            logging.info('Fail update firmware for %s', board)
292            logging.debug('Fail update firmware for %s: %s', board, str(e))
293