#!/usr/bin/env python2
# Copyright 2020 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import os
import logging

import common
from autotest_lib.client.common_lib import utils as client_utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = client_utils.metrics_mock


class _BaseUpdateServoFw(object):
    """Base class to update firmware on a servo device.

    Subclasses provide the servo type specifics by implementing
    `_check_needs`, `get_board` and `get_serial_number`.
    """

    # Command to update servo device.
    # param 1: servo board (servo_v4|servo_micro)
    # param 2: serial number of main device on the board
    UPDATER = 'servo_updater -b %s -s %s --reboot'
    UPDATER_FORCE = UPDATER + ' --force'

    # Command to read current version on the servo
    # param 1: serial number of main device on the board
    SERVO_VERSION = 'cat $(servodtool device -s %s usb-path)/configuration'

    # Command to read servod config file with extracting value by key
    # param 1: servo port, provided by servo config
    # param 2: required parameter (key) from config file
    SERVOD_CONFIG = 'cat /var/lib/servod/config_%s | grep %s'

    # Command to get PATH to the latest available firmware on the host
    # param 1: servo board (servo_v4|servo_micro)
    LATEST_VERSION_FW = 'realpath /usr/share/servo_updater/firmware/%s.bin'

    # Command to get servo product supported by device
    # param 1: serial number of main device on the board
    SERVO_PRODUCT = 'cat $(servodtool device -s %s usb-path)/product'

    def __init__(self, servo_host):
        """Initialize the updater.

        @param servo_host: host object used to run all commands.
        """
        self._host = servo_host
        # Cached result of check_needs(); None means "not evaluated yet".
        self._supported = None

    def check_needs(self, ignore_version=False):
        """Check if this updater applies to the servo and an update is needed.

        The result is computed on the first call and cached; later calls
        return the cached value regardless of `ignore_version`.

        @param ignore_version: do not check the version on the device.

        @returns: True if the update should be performed, otherwise False.
        """
        if self._supported is None:
            # The checks are evaluated in order and stop at the first
            # failure, so the host is never queried when it is missing.
            if (not self._host
                    or not self._host.is_labstation()
                    or not self._host.servo_serial
                    or not self._check_needs()):
                self._supported = False
            elif ignore_version:
                self._supported = True
            else:
                self._supported = self._is_outdated_version()
        return self._supported

    def update(self, force_update=False, ignore_version=False):
        """Update firmware on the servo.

        Steps:
        1) Verify servo is not updated by checking the versions.
        2) Try to get serial number for the servo.
        3) Updating firmware.

        @param force_update: run updater with force option.
        @param ignore_version: do not check the version on the device.
        """
        if not self.check_needs(ignore_version):
            logging.info('The board %s does not need update or '
                         'not present in the setup.', self.get_board())
            return
        if not self.get_serial_number():
            logging.info('Serial number is not detected. It means no update'
                         ' will be performed on servo.')
            return
        self._update_firmware(force_update)

    def _check_needs(self):
        """Check whether the servo type is supported by this updater."""
        raise NotImplementedError('Please implement method to perform'
                                  ' check of supporting the servo type')

    def get_board(self):
        """Return servo type supported by updater."""
        raise NotImplementedError('Please implement method to return'
                                  ' servo type')

    def get_serial_number(self):
        """Return serial number for main servo device on servo."""
        raise NotImplementedError('Please implement method to return'
                                  ' serial number')

    def _get_updater_cmd(self, force_update):
        """Return command to run firmware updater for the servo device.

        @param force_update: run updater with force option.

        @returns: the updater command line as a string.
        """
        template = self.UPDATER_FORCE if force_update else self.UPDATER
        return template % (self.get_board(), self.get_serial_number())

    def _update_firmware(self, force_update):
        """Execute firmware updater command.

        Method generates a metric to collect statistics of update.

        @param force_update: run updater with force option.
        """
        cmd = self._get_updater_cmd(force_update)
        logging.info('Servo fw update: %s', cmd)
        result = self._host.run(cmd, ignore_status=True).stdout.strip()
        logging.debug('Servo fw update finished; %s', result)
        logging.info('Servo fw update finished')
        # NOTE(review): the command runs with ignore_status=True and its exit
        # status is never inspected, so 'success' is recorded even when the
        # updater fails -- confirm whether that is intended.
        metrics.Counter(
            'chromeos/autotest/audit/servo/fw_update'
        ).increment(fields={'status': 'success'})

    def _get_config_value(self, key):
        """Read a value from the servod config file by the provided key.

        The config is filtered with grep, which may return several lines or
        lines where `key` is only a substring of another key, so only a line
        that starts with exactly `<key>=` is accepted.

        @param key: key from key=value pair in config file.
                    eg: 'HUB' or 'SERVO_MICRO_SERIAL'

        @returns: the value as a string (possibly empty), or None if the
                  key is not present in the config.
        """
        cmd = self.SERVOD_CONFIG % (self._host.servo_port, key)
        output = self._host.run(cmd, ignore_status=True).stdout.strip()
        prefix = key + '='
        for line in output.splitlines():
            line = line.strip()
            if line.startswith(prefix):
                return line[len(prefix):]
        return None

    def _current_version(self):
        """Get current version on servo device."""
        cmd = self.SERVO_VERSION % self.get_serial_number()
        version = self._host.run(cmd, ignore_status=True).stdout.strip()
        logging.debug('Current version: %s', version)
        return version

    def _latest_version(self):
        """Get latest version available on servo-host.

        @returns: the firmware file name without directory and extension,
                  or None if the path could not be resolved.
        """
        cmd = self.LATEST_VERSION_FW % self.get_board()
        filepath = self._host.run(cmd, ignore_status=True).stdout.strip()
        if not filepath:
            return None
        version = os.path.basename(os.path.splitext(filepath)[0]).strip()
        logging.debug('Latest version: %s', version)
        return version

    def _is_outdated_version(self):
        """Compare versions to determine whether the servo needs an update.

        Method generates metrics to collect statistics with version.

        @returns: False only when both versions are known and equal;
                  True otherwise.
        """
        current_version = self._current_version()
        latest_version = self._latest_version()
        if not current_version or not latest_version:
            # Cannot compare, so request the update.
            return True
        if current_version == latest_version:
            return False
        metrics.Counter(
            'chromeos/autotest/audit/servo/fw_need_update'
        ).increment(fields={'version': current_version})
        return True

    def _get_product(self):
        """Get servo product from servo device."""
        cmd = self.SERVO_PRODUCT % self.get_serial_number()
        return self._host.run(cmd, ignore_status=True).stdout.strip()


class UpdateServoV4Fw(_BaseUpdateServoFw):
    """Servo firmware updater for servo_v4 version.

    Update firmware will be only if new version present and servo
    was not updated.
    """

    def get_board(self):
        """Return servo type supported by updater."""
        return 'servo_v4'

    def get_serial_number(self):
        """Return the servo_v4 serial number."""
        # serial number of servo_v4 match with device number
        return self._host.servo_serial

    def _check_needs(self):
        """Check if servo is servo_v4.

        Check servo type.
        Check access to the serial number.
        """
        if self._get_product() != 'Servo V4':
            return False
        return bool(self.get_serial_number())


class UpdateServoMicroFw(_BaseUpdateServoFw):
    """Servo firmware updater for servo_micro version.

    Update firmware will be only if new version present and servo
    was not updated.
    """

    def __init__(self, servo_host):
        """Initialize the updater.

        @param servo_host: host object used to run all commands.
        """
        super(UpdateServoMicroFw, self).__init__(servo_host)
        # Cached serial number; None means "not read yet".
        self._serial_number = None

    def get_board(self):
        """Return servo type supported by updater."""
        return 'servo_micro'

    def get_serial_number(self):
        """Return the servo_micro serial number, or '' if not configured."""
        if self._serial_number is None:
            # The servo_micro serial number differs from the serial of the
            # servo device; it is read from the servod config file.
            serial = self._get_config_value('SERVO_MICRO_SERIAL')
            self._serial_number = serial if serial is not None else ''
        return self._serial_number

    def _check_needs(self):
        """Check if servo is servo_micro.

        Check servo type.
        Check access to the serial number.
        """
        if not self.get_serial_number():
            # set does not include servo_micro
            return False
        return self._get_product() == 'Servo Micro'


# List servo firmware updaters
SERVO_UPDATERS = (
    UpdateServoV4Fw,
    UpdateServoMicroFw,
)


def update_servo_firmware(host,
                          boards=None,
                          force_update=False,
                          ignore_version=False):
    """Update firmware on servo devices.

    A failure of one updater is logged and counted in metrics, and does not
    prevent the remaining updaters from running.

    @param host: ServoHost instance to run all required commands.
    @param boards: collection of board names to update (values returned by
                   the updaters' get_board()). When None or empty, every
                   updater in SERVO_UPDATERS is tried.
    @param force_update: run updater with force option.
    @param ignore_version: do not check the version on the device.
    """
    if ignore_version:
        logging.debug('Running servo_updater with ignore_version=True')

    # to run updater we need make sure the servod is not running
    host.stop_servod()
    # initialize all updaters
    updaters = [updater(host) for updater in SERVO_UPDATERS]

    for updater in updaters:
        board = updater.get_board()
        if boards and board not in boards:
            logging.info('The %s is not requested for update', board)
            continue
        logging.info('Try to update board: %s', board)
        try:
            updater.update(force_update=force_update,
                           ignore_version=ignore_version)
        except Exception as e:
            # Best effort: record the failure and continue with the next
            # updater.
            data = {'host': host.get_dut_hostname() or '',
                    'board': board}
            metrics.Counter(
                'chromeos/autotest/audit/servo/fw/update/error'
            ).increment(fields=data)
            logging.info('Fail update firmware for %s', board)
            logging.debug('Fail update firmware for %s: %s', board, e)