#!/usr/bin/env python2 # Copyright 2020 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import os import logging import common from autotest_lib.client.common_lib import utils as client_utils try: from chromite.lib import metrics except ImportError: metrics = client_utils.metrics_mock class _BaseUpdateServoFw(object): """Base class to update firmware on servo""" # Command to update servo device. # param 1: servo board (servo_v4|servo_micro) # param 2: serial number of main device on the board UPDATER = 'servo_updater -b %s -s %s --reboot' UPDATER_FORCE = UPDATER + ' --force' # Command to read current version on the servo # param 1: serial number of main device on the board SERVO_VERSION = 'cat $(servodtool device -s %s usb-path)/configuration' # Command to read servod config file with extracting value by key # param 1: servo port, provided by servo config # param 2: required parammeter (key) from config file SERVOD_CONFIG = 'cat /var/lib/servod/config_%s | grep %s' # Command to get PATH to the latest available firmware on the host # param 1: servo board (servo_v4|servo_micro) LATEST_VERSION_FW = 'realpath /usr/share/servo_updater/firmware/%s.bin' # Command to get servo product supported by device # param 1: serial number of main device on the board SERVO_PRODUCT = 'cat $(servodtool device -s %s usb-path)/product' def __init__(self, servo_host): self._host = servo_host # keep flag that class support and can run updater self._supported = None def check_needs(self, ignore_version=False): """Check if class supports update for particular servo type. @params ignore_version: do not check the version on the device. """ if self._supported is None: if not self._host: self._supported = False elif not self._host.is_labstation(): self._supported = False elif not self._host.servo_serial: self._supported = False elif not self._check_needs(): self._supported = False elif not ignore_version: self._supported = self._is_outdated_version() else: self._supported = True return self._supported def update(self, force_update=False, ignore_version=False): """Update firmware on the servo. Steps: 1) Verify servo is not updated by checking the versions. 2) Try to get serial number for the servo. 3) Updating firmware. @params force_update: run updater with force option. @params ignore_version: do not check the version on the device. """ if not self.check_needs(ignore_version): logging.info('The board %s does not need update or ' 'not present in the setup.', self.get_board()) return if not self.get_serial_number(): logging.info('Serial number is not detected. It means no update' ' will be performed on servo.') return self._update_firmware(force_update) def _check_needs(self): """Check is servo type supported""" raise NotImplementedError('Please implement method to perform' ' check of supporting the servo type') def get_board(self): """Return servo type supported by updater""" raise NotImplementedError('Please implement method to return' ' servo type') def get_serial_number(self): """Return serial number for main servo device on servo""" raise NotImplementedError('Please implement method to return' ' serial number') def _get_updater_cmd(self, force_update): """Return command to run firmware updater for the servo device. @params force_update: run updater with force option. """ board = self.get_board() serial_number = self.get_serial_number() if force_update: cmd = self.UPDATER_FORCE else: cmd = self.UPDATER return cmd % (board, serial_number) def _update_firmware(self, force_update): """Execute firmware updater command. Method generate a metric to collect statistics of update. @params force_update: run updater with force option. """ cmd = self._get_updater_cmd(force_update) logging.info('Servo fw update: %s', cmd) result = self._host.run(cmd, ignore_status=True).stdout.strip() logging.debug('Servo fw update finished; %s', result) logging.info('Servo fw update finished') metrics.Counter( 'chromeos/autotest/audit/servo/fw_update' ).increment(fields={'status': 'success'}) def _get_config_value(self, key): """Read configuration value by provided key. @param key: key from key=value pair in config file. eg: 'HUB' or 'SERVO_MICRO_SERIAL' """ """Read value from servod config file""" cmd = self.SERVOD_CONFIG % (self._host.servo_port, key) result = self._host.run(cmd, ignore_status=True).stdout.strip() if result: return result[len(key)+1:] return None def _current_version(self): """Get current version on servo device""" cmd = self.SERVO_VERSION % self.get_serial_number() version = self._host.run(cmd, ignore_status=True).stdout.strip() logging.debug('Current version: %s', version) return version def _latest_version(self): """Get latest version available on servo-host""" cmd = self.LATEST_VERSION_FW % self.get_board() filepath = self._host.run(cmd, ignore_status=True).stdout.strip() if not filepath: return None version = os.path.basename(os.path.splitext(filepath)[0]).strip() logging.debug('Latest version: %s', version) return version def _is_outdated_version(self): """Compare version to determine request to update the Servo or not. Method generate metrics to collect statistics with version. """ current_version = self._current_version() latest_version = self._latest_version() if not current_version or not latest_version: return True if current_version == latest_version: return False metrics.Counter( 'chromeos/autotest/audit/servo/fw_need_update' ).increment(fields={'version': current_version}) return True def _get_product(self): """Get servo product from servo device""" cmd = self.SERVO_PRODUCT % self.get_serial_number() return self._host.run(cmd, ignore_status=True).stdout.strip() class UpdateServoV4Fw(_BaseUpdateServoFw): """Servo firmware updater for servo_v4 version. Update firmware will be only if new version present and servo was not updated. """ def get_board(self): """Return servo type supported by updater""" return 'servo_v4' def get_serial_number(self): # serial number of servo_v4 match with device number return self._host.servo_serial def _check_needs(self): """Check if servo is servo_v4. Check servo type. Check access to the serial number. """ if self._get_product() != 'Servo V4': return False if not self.get_serial_number(): return False return True class UpdateServoMicroFw(_BaseUpdateServoFw): """Servo firmware updater for servo_micro version. Update firmware will be only if new version present and servo was not updated. """ def __init__(self, servo_host): super(UpdateServoMicroFw, self).__init__(servo_host) self._serial_number = None def get_board(self): """Return servo type supported by updater""" return 'servo_micro' def get_serial_number(self): # serial number of servo_v4 match with device number if self._serial_number is None: # servo_micro serial number is not match to serial on # the servo device servod is keeping it in config file serial = self._get_config_value('SERVO_MICRO_SERIAL') self._serial_number = serial if serial is not None else '' return self._serial_number def _check_needs(self): """Check if servo is servo_micro. Check servo type. Check access to the serial number. """ if not self.get_serial_number(): # set does not include servo_micro return False if self._get_product() != 'Servo Micro': return False return True # List servo firmware updaters SERVO_UPDATERS = ( UpdateServoV4Fw, UpdateServoMicroFw, ) def update_servo_firmware(host, boards=None, force_update=False, ignore_version=False): """Update firmware on servo devices. @params host: ServoHost instance to run all required commands. @params force_update: run updater with force option. @params ignore_version: do not check the version on the device. """ if boards is None: boards = [] if ignore_version: logging.debug('Running servo_updater with ignore_version=True') # to run updater we need make sure the servod is not running host.stop_servod() # initialize all updaters updaters = [updater(host) for updater in SERVO_UPDATERS] for updater in updaters: board = updater.get_board() if len(boards) > 0 and board not in boards: logging.info('The %s is not requested for update', board) continue logging.info('Try to update board: %s', board) try: updater.update(force_update=force_update, ignore_version=ignore_version) except Exception as e: data = {'host': host.get_dut_hostname() or '', 'board': board} metrics.Counter( 'chromeos/autotest/audit/servo/fw/update/error' ).increment(fields=data) logging.info('Fail update firmware for %s', board) logging.debug('Fail update firmware for %s: %s', board, str(e))