# Lint as: python2, python3 # Copyright 2016 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. from __future__ import absolute_import from __future__ import division from __future__ import print_function import sys import functools import logging import math import time import common from autotest_lib.client.common_lib import error from autotest_lib.client.common_lib import hosts from autotest_lib.client.common_lib import utils from autotest_lib.server.cros.power import servo_charger from autotest_lib.server.cros.servo import servo from autotest_lib.server.hosts import cros_constants from autotest_lib.server.hosts import repair_utils from autotest_lib.server.hosts import servo_constants from autotest_lib.server.cros.servo.topology import servo_topology from autotest_lib.site_utils.admin_audit import servo_updater import six try: from chromite.lib import metrics except ImportError: metrics = utils.metrics_mock # TODO(gregorynisbet): will importing chromite always succeed in all contexts? from chromite.lib import timeout_util def ignore_exception_for_non_cros_host(func): """ Decorator to ignore ControlUnavailableError if servo host is not cros host. When using test_that command on a workstation, this enables usage of additional servo devices such as servo micro and Sweetberry. This shall not change any lab behavior. """ @functools.wraps(func) def wrapper(self, host): """ Wrapper around func. """ try: func(self, host) except servo.ControlUnavailableError as e: if host.is_cros_host(): raise logging.warning("Servo host is not cros host, ignore %s: %s", type(e).__name__, e) return wrapper class _UpdateVerifier(hosts.Verifier): """ Verifier to trigger a servo host update, if necessary. The operation doesn't wait for the update to complete and is considered a success whether or not the servo is currently up-to-date. """ @timeout_util.TimeoutDecorator(cros_constants.LONG_VERIFY_TIMEOUT_SEC) def verify(self, host): # First, only run this verifier if the host is in the physical lab. # Secondly, skip if the test is being run by test_that, because subnet # restrictions can cause the update to fail. try: if host.is_labstation(): logging.info("Skip update check because the host is a" " labstation and labstation update is handled" " by labstation AdminRepair task.") return if host.is_in_lab() and host.job and host.job.in_lab: if ( not host.get_dut_host_info() or not host.get_dut_host_info().servo_cros_stable_version ): logging.info('Servo stable version missed.' ' Skip update check action.') return # We have seen cases that invalid GPT headers/entries block # v3s from been update, so always try to repair here. # See crbug.com/994396, crbug.com/1057302. host.run('cgpt repair /dev/mmcblk0', ignore_status=True) host.update_image() # We don't want failure from update block DUT repair action. # See crbug.com/1029950. except Exception as e: six.reraise(hosts.AutoservNonCriticalVerifyError, str(e), sys.exc_info()[2]) @property def description(self): return 'servo host software is up-to-date' class _ConfigVerifier(hosts.Verifier): """ Base verifier for the servo config file verifiers. """ CONFIG_FILE = '/var/lib/servod/config' ATTR = '' @staticmethod def _get_config_val(host, config_file, attr): """ Get the `attr` for `host` from `config_file`. @param host Host to be checked for `config_file`. @param config_file Path to the config file to be tested. @param attr Attribute to get from config file. @return The attr val as set in the config file, or `None` if the file was absent. """ getboard = ('CONFIG=%s ; [ -f $CONFIG ] && ' '. $CONFIG && echo $%s' % (config_file, attr)) attr_val = host.run(getboard, ignore_status=True).stdout return attr_val.strip('\n') if attr_val else None @staticmethod def _validate_attr(host, val, expected_val, attr, config_file): """ Check that the attr setting is valid for the host. This presupposes that a valid config file was found. Raise an execption if: * There was no attr setting from the file (i.e. the setting is an empty string), or * The attr setting is valid, the attr is known, and the setting doesn't match the DUT. @param host Host to be checked for `config_file`. @param val Value to be tested. @param expected_val Expected value. @param attr Attribute we're validating. @param config_file Path to the config file to be tested. """ if not val: raise hosts.AutoservVerifyError( 'config file %s exists, but %s ' 'is not set' % (attr, config_file)) if expected_val is not None and val != expected_val: raise hosts.AutoservVerifyError( '%s is %s; it should be %s' % (attr, val, expected_val)) def _get_config(self, host): """ Return the config file to check. @param host Host object. @return The config file to check. """ return '%s_%d' % (self.CONFIG_FILE, host.servo_port) @property def description(self): return 'servo %s setting is correct' % self.ATTR class _SerialConfigVerifier(_ConfigVerifier): """ Verifier for the servo SERIAL configuration. """ ATTR = 'SERIAL' @timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC) def verify(self, host): """ Test whether the `host` has a `SERIAL` setting configured. This tests the config file names used by the `servod` upstart job for a valid setting of the `SERIAL` variable. The following conditions raise errors: * The SERIAL setting doesn't match the DUT's entry in the AFE database. * There is no config file. """ if not host.is_cros_host(): return # Not all servo hosts will have a servo serial so don't verify if it's # not set. if host.servo_serial is None: return config = self._get_config(host) serialval = self._get_config_val(host, config, self.ATTR) if serialval is None: raise hosts.AutoservVerifyError( 'Servo serial is unconfigured; should be %s' % host.servo_serial ) self._validate_attr(host, serialval, host.servo_serial, self.ATTR, config) class _BoardConfigVerifier(_ConfigVerifier): """ Verifier for the servo BOARD configuration. """ ATTR = 'BOARD' @timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC) def verify(self, host): """ Test whether the `host` has a `BOARD` setting configured. This tests the config file names used by the `servod` upstart job for a valid setting of the `BOARD` variable. The following conditions raise errors: * A config file exists, but the content contains no setting for BOARD. * The BOARD setting doesn't match the DUT's entry in the AFE database. * There is no config file. """ if not host.is_cros_host(): return config = self._get_config(host) boardval = self._get_config_val(host, config, self.ATTR) if boardval is None: msg = 'Servo board is unconfigured' if host.servo_board is not None: msg += '; should be %s' % host.servo_board raise hosts.AutoservVerifyError(msg) self._validate_attr(host, boardval, host.servo_board, self.ATTR, config) class _ServodJobVerifier(hosts.Verifier): """ Verifier to check that the `servod` upstart job is running. """ @timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC) def verify(self, host): if not host.is_cros_host(): return status_cmd = 'status servod PORT=%d' % host.servo_port job_status = host.run(status_cmd, ignore_status=True).stdout if 'start/running' not in job_status: raise hosts.AutoservVerifyError( 'servod not running on %s port %d' % (host.hostname, host.servo_port)) @property def description(self): return 'servod upstart job is running' class _DiskSpaceVerifier(hosts.Verifier): """ Verifier to make sure there is enough disk space left on servohost. """ @timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC) def verify(self, host): # Check available space of stateful is greater than threshold, in Gib. host.check_diskspace('/mnt/stateful_partition', 0.5) @property def description(self): return 'servohost has enough disk space.' class _ServodConnectionVerifier(hosts.Verifier): """ Verifier to check that we can connect to servod server. If this verifier failed, it most likely servod was crashed or in a crashing loop. For servo_v4 it's usually caused by not able to detect CCD or servo_micro. """ @timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC) def verify(self, host): host.initilize_servo() @property def description(self): return 'servod service is taking calls' class _ServodControlVerifier(hosts.Verifier): """ Verifier to check basic servo control functionality. This tests the connection to the target servod service with a simple method call. As a side-effect, all servo signals are initialized to default values. N.B. Initializing servo signals is necessary because the power button and lid switch verifiers both test against expected initial values. """ @timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC) def verify(self, host): try: host.initialize_dut_for_servo() except Exception as e: six.reraise(hosts.AutoservNonCriticalVerifyError, str(e), sys.exc_info()[2]) @property def description(self): return 'Basic servod control is working' class _Cr50ConsoleVerifier(hosts.Verifier): """Verifier to check if cr50 console is present and working. Validating based by running commands and expect they will not fail. If any command fail then console is not working as expected. """ COMMAND_TO_CHECK_CONSOLE = ( 'cr50_ccd_level', 'cr50_testlab', 'cr50_ccd_state_flags', ) @ignore_exception_for_non_cros_host @timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC) def verify(self, host): try: for command in self.COMMAND_TO_CHECK_CONSOLE: if host.get_servo().has_control(command): # Response of command is not important. host.get_servo().get(command) except Exception as e: six.reraise(hosts.AutoservNonCriticalVerifyError, str(e), sys.exc_info()[2]) def _is_applicable(self, host): # Only when DUT is running through ccd. # TODO(coconutruben): replace with ccd API when available in servo.py return (host.get_servo() and host.get_servo().get_main_servo_device() == 'ccd_cr50') @property def description(self): return 'CR50 console is working' class _CCDTestlabVerifier(hosts.Verifier): """ Verifier to check that ccd testlab is enabled. All DUT connected by ccd has to supported cr50 with enabled testlab to allow manipulation by servo. The flag testlab is sticky and will stay enabled if was set up. The testlab can be enabled when ccd is open. (go/ccd-setup) """ @ignore_exception_for_non_cros_host @timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC) def verify(self, host): if not host.get_servo().has_control('cr50_testlab'): raise hosts.AutoservVerifyError( 'cr50 has to be supported when use servo with ' 'ccd_cr50/type-c connection') status = host.get_servo().get('cr50_testlab') # check by 'on' to fail when get unexpected value if status == 'on': # ccd testlab enabled return raise hosts.AutoservNonCriticalVerifyError( 'The ccd testlab is disabled; DUT requires manual work ' 'to enable it (go/ccd-setup).') def _is_applicable(self, host): # Only when DUT is running through ccd. # TODO(coconutruben): replace with ccd API when available in servo.py return (host.get_servo() and host.get_servo().get_main_servo_device() == 'ccd_cr50') @property def description(self): return 'ccd testlab enabled' class _CCDPowerDeliveryVerifier(hosts.Verifier): """Verifier to check and reset servo_v4_role for servos that support power delivery feature(a.k.a power pass through). There are currently two position of servo_v4_role, src and snk: src -- servo in power delivery mode and passes power to the DUT. snk -- servo in normal mode and not passes power to DUT. We want to ensure that servo_v4_role is set to src. TODO(xianuowang@) Convert it to verifier/repair action pair or remove it once we collected enough metrics. """ # Change to use the constant value in CrosHost if we move it to # verifier/repair pair. CHANGE_SERVO_ROLE_TIMEOUT = 180 @timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC) def verify(self, host): if host.get_servo().get('servo_v4_role') == 'snk': logging.warning('The servo initlized with role snk while' ' supporting power delivery, resetting role' ' to src...') try: logging.info('setting power direction with retries') # do not pass host since host does not inherit from CrosHost. charge_manager = servo_charger.ServoV4ChargeManager( host=None, servo=host.get_servo(), ) attempts = charge_manager.start_charging() logging.info('setting power direction took %d tries', attempts) # if control makes it here, we successfully changed the host # direction result = 'src' except Exception as e: logging.error( 'setting power direction with retries failed %s', str(e), ) finally: time.sleep(self.CHANGE_SERVO_ROLE_TIMEOUT) result = host.get_servo().get('servo_v4_role') logging.debug('Servo_v4 role after reset: %s', result) metrics_data = { 'hostname': host.get_dut_hostname() or 'unknown', 'status': 'success' if result == 'src' else 'failed', 'board': host.servo_board or 'unknown', 'model': host.servo_model or 'unknown' } metrics.Counter( 'chromeos/autotest/repair/verifier/power_delivery3' ).increment(fields=metrics_data) def _is_applicable(self, host): return (host.is_in_lab() and host.get_servo().supports_built_in_pd_control()) @property def description(self): return 'ensure applicable servo is in "src" mode for power delivery' class _BaseDUTConnectionVerifier(hosts.Verifier): """Verifier to check connection between DUT and servo.""" # Bus voltage on ppdut5. Value can be: # - less than 500 - DUT is likely not connected # - between 500 and 4000 - unexpected value # - more than 4000 - DUT is likely connected MAX_PPDUT5_MV_WHEN_NOT_CONNECTED = 500 MIN_PPDUT5_MV_WHEN_CONNECTED = 4000 def _is_usb_hub_connected(self, host): """Checking bus voltage on ppdut5. Supported only on servo_v4 boards. If voltage value is lower than 500 then device is not connected. When value higher 4000 means the device is connected. If value between 500 and 4000 is not expected and will be marked as connected and collected information which DUT has this exception. @returns: bool """ logging.debug('Started check by ppdut5_mv:on') try: val = host.get_servo().get('ppdut5_mv') if val < self.MAX_PPDUT5_MV_WHEN_NOT_CONNECTED: # servo is not connected to the DUT return False if val < self.MIN_PPDUT5_MV_WHEN_CONNECTED: # is unexpected value. # collecting metrics to look case by case # TODO(otabek) for analysis b:163845694 data = host._get_host_metrics_data() metrics.Counter('chromeos/autotest/repair/ppdut5_mv_case' ).increment(fields=data) # else: # servo is physical connected to the DUT except Exception as e: logging.debug('(Not critical) %s', e) return True def _is_ribbon_cable_connected(self, host): """Check if ribbon cable is connected to the DUT. The servo_micro/flex - can be checked by `cold_reset` signal. When `cold_reset` is `on` it commonly indicates that the DUT is disconnected. To avoid mistake of real signal we try switch it off and if is cannot then servo is not connected. @returns: bool """ logging.debug('Started check by cold_reset:on') try: if host.get_servo().get('cold_reset') == 'on': # If cold_reset has is on can be right signal # or caused by missing connection between servo_micro and DUT. # if we can switch it to the off then it was signal. host.get_servo().set('cold_reset', 'off') except error.TestFail: logging.debug('Ribbon cable is not connected to the DUT.') return False except Exception as e: logging.debug('(Not critical) %s', e) return True def _is_dut_power_on(self, host): # DUT is running in normal state. # if EC not supported by board then we expect error try: return host.get_servo().get('ec_system_powerstate') == 'S0' except Exception as e: logging.debug('(Not critical) %s', e) return False def _is_servo_v4_type_a(self, host): return (host.is_labstation() and host.get_servo().has_control('servo_v4_type') and host.get_servo().get('servo_v4_type') == 'type-a') def _is_servo_v4_type_c(self, host): return (host.is_labstation() and host.get_servo().has_control('servo_v4_type') and host.get_servo().get('servo_v4_type') == 'type-c') def _is_servo_v3(self, host): return not host.is_labstation() class _DUTConnectionVerifier(_BaseDUTConnectionVerifier): """Verifier to check connection Servo to the DUT. Servo_v4 type-a connected to the DUT by: 1) servo_micro - checked by `cold_reset`. Servo_v4 type-c connected to the DUT by: 1) ccd - checked by ppdut5_mv. Servo_v3 connected to the DUT by: 1) legacy servo header - can be checked by `cold_reset`. """ @ignore_exception_for_non_cros_host @timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC) def verify(self, host): if self._is_servo_v4_type_a(host): if not self._is_ribbon_cable_connected(host): raise hosts.AutoservVerifyError( 'Servo_micro is likely not connected to the DUT.') elif self._is_servo_v4_type_c(host): logging.info('Skip check for type-c till confirm it in the lab') # TODO(otabek@) block check till verify on the lab # if not self._is_usb_hub_connected(host): # raise hosts.AutoservVerifyError( # 'Servo_v4 is likely not connected to the DUT.') elif self._is_servo_v3(host): if not self._is_ribbon_cable_connected(host): raise hosts.AutoservVerifyError( 'Servo_v3 is likely not connected to the DUT.') else: logging.warn('Unsupported servo type!') def _is_applicable(self, host): if host.is_ec_supported(): return True logging.info('DUT is not support EC.') return False @property def description(self): return 'Ensure the Servo connected to the DUT.' class _ServoHubConnectionVerifier(_BaseDUTConnectionVerifier): """Verifier to check connection ServoHub to DUT. Servo_v4 type-a connected to the DUT by: 1) USB hub - checked by ppdut5_mv. """ @ignore_exception_for_non_cros_host @timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC) def verify(self, host): if self._is_servo_v4_type_a(host): if (self._is_dut_power_on(host) and not self._is_usb_hub_connected(host)): raise hosts.AutoservVerifyError( 'Servo USB hub is likely not connected to the DUT.') def _is_applicable(self, host): if host.is_ec_supported(): return True logging.info('DUT is not support EC.') return False @property def description(self): return 'Ensure the Servo HUB connected to the DUT.' class _TopologyVerifier(hosts.Verifier): """Verifier that all servo component is presented.""" @ignore_exception_for_non_cros_host @timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC) def verify(self, host): topology = servo_topology.ServoTopology(host) topology.read(host.get_dut_host_info()) try: # Linux takes 1 second to detect and enumerate USB device since # 2010 year. We take 10 seconds to be sure as old standard was # 5 seconds. time.sleep(10) topology.validate(raise_error=True, dual_set=host.is_dual_setup(), compare=True) except servo_topology.ServoTopologyError as e: six.reraise(hosts.AutoservVerifyError, str(e), sys.exc_info()[2]) def _is_applicable(self, host): if host.is_localhost(): logging.info('Target servo is not in a lab,' ' action is not applicable.') return False if not host.is_servo_topology_supported(): logging.info('Target servo-topology is not supported,' ' action is not applicable.') return False return True @property def description(self): return 'Ensure all Servo component present.' class _PowerButtonVerifier(hosts.Verifier): """ Verifier to check sanity of the `pwr_button` signal. Tests that the `pwr_button` signal shows the power button has been released. When `pwr_button` is stuck at `press`, it commonly indicates that the ribbon cable is disconnected. """ # TODO (crbug.com/646593) - Remove list below once servo has been updated # with a dummy pwr_button signal. _BOARDS_WO_PWR_BUTTON = ['arkham', 'gale', 'mistral', 'storm', 'whirlwind'] @ignore_exception_for_non_cros_host @timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC) def verify(self, host): if host.servo_board in self._BOARDS_WO_PWR_BUTTON: return try: button = host.get_servo().get('pwr_button') except Exception as e: six.reraise(hosts.AutoservNonCriticalVerifyError, str(e), sys.exc_info()[2]) if button != 'release': raise hosts.AutoservNonCriticalVerifyError( 'Check ribbon cable: \'pwr_button\' is stuck') def _is_applicable(self, host): return (host.get_servo() and host.get_servo().main_device_is_flex()) @property def description(self): return 'pwr_button control is normal' class _BatteryVerifier(hosts.Verifier): """Collect battery info for analysis.""" @ignore_exception_for_non_cros_host @timeout_util.TimeoutDecorator(cros_constants.VERIFY_TIMEOUT_SEC) def verify(self, host): try: servo = host.get_servo() charging = False if servo.has_control('battery_is_charging'): charging = servo.get('battery_is_charging') level = -1 if servo.has_control('battery_charge_percent'): level = servo.get('battery_charge_percent') design_mah = servo.get('battery_full_design_mah') charge_mah = servo.get('battery_full_charge_mah') logging.info('Charging: %s', charging) logging.info('Percentage: %s', level) logging.info('Full charge max: %s', charge_mah) logging.info('Full design max: %s', design_mah) # based on analysis of ratio we can find out what is # the level when we can say that battery is dead ratio = int(math.floor(charge_mah / design_mah * 100.0)) logging.info('Ratio: %s', ratio) data = { 'board': host.servo_board or 'unknown', 'model': host.servo_model or 'unknown', 'ratio': ratio } metrics.Counter('chromeos/autotest/battery/ratio').increment( fields=data) except Exception as e: # Keeping it with info level because we do not expect it. logging.info('(Not critical) %s', e) def _is_applicable(self, host): if not host.is_ec_supported(): logging.info('The board not support EC') return False dut_info = host.get_dut_host_info() if dut_info: host_info = host.get_dut_host_info() if host_info.get_label_value('power') != 'battery': logging.info('The board does not have battery') return False servo = host.get_servo() if (not servo.has_control('battery_full_design_mah') or not servo.has_control('battery_full_charge_mah')): logging.info('The board is not supported battery controls...') return False return True @property def description(self): return 'Logs battery levels' class _LidVerifier(hosts.Verifier): """ Verifier to check sanity of the `lid_open` signal. """ @ignore_exception_for_non_cros_host @timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC) def verify(self, host): try: lid_open = host.get_servo().get('lid_open') except Exception as e: six.reraise(hosts.AutoservNonCriticalVerifyError, str(e), sys.exc_info()[2]) if lid_open != 'yes' and lid_open != 'not_applicable': raise hosts.AutoservNonCriticalVerifyError( 'Check lid switch: lid_open is %s' % lid_open) @property def description(self): return 'lid_open control is normal' class _EcBoardVerifier(hosts.Verifier): """ Verifier response from the 'ec_board' control. """ @ignore_exception_for_non_cros_host @timeout_util.TimeoutDecorator(cros_constants.SHORT_VERIFY_TIMEOUT_SEC) def verify(self, host): if host.is_ec_supported(): ec_board_name = '' try: ec_board_name = host.get_servo().get_ec_board() logging.debug('EC board: %s', ec_board_name) except Exception as e: raise hosts.AutoservNonCriticalVerifyError( '`ec_board` control is not responding; ' 'may be caused of broken EC firmware') else: logging.info('The board not support EC') @property def description(self): return 'Check EC by get `ec_board` control' class _RestartServod(hosts.RepairAction): """Restart `servod` with the proper BOARD setting.""" @timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC) def repair(self, host): if not host.is_cros_host(): raise hosts.AutoservRepairError( 'Can\'t restart servod: not running ' 'embedded Chrome OS.', 'servo_not_applicable_to_non_cros_host') host.restart_servod() @property def description(self): return 'Start servod with the proper config settings.' class _ServoRebootRepair(repair_utils.RebootRepair): """Try repair servo by reboot servohost. This is the same as the standard `RebootRepair`, for servo_v3 it will reboot the beaglebone board immidiately while for labstation it will request a reboot by touch a flag file on its labstation, then labstation reboot will be handled by labstation AdminRepair task as labstation host multiple servos and need do an synchronized reboot. """ @timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC) def repair(self, host): super(_ServoRebootRepair, self).repair(host) # restart servod for v3 after reboot. host.restart_servod() def _is_applicable(self, host): if host.is_localhost() or not host.is_cros_host(): logging.info('Target servo is not in a lab, the reboot repair' ' action is not applicable.') return False if host.is_labstation(): host.request_reboot() logging.info('Reboot labstation requested, it will be handled' ' by labstation AdminRepair task.') return False return True @property def description(self): return 'Reboot the servo host.' class _ToggleCCLineRepair(hosts.RepairAction): """Try repair servod by toggle cc. When cr50 is not enumerated we can try to recover it by toggle cc line. Repair action running from servohost. We using usb_console temporally witch required stop servod. TODO(otabek@) review the logic when b/159755652 implemented """ @timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC) def repair(self, host): host.stop_servod() self._reset_usbc_pigtail_connection(host) host.restart_servod() def _is_applicable(self, host): if host.is_localhost() or not host.is_labstation(): return False if not host.servo_serial: return False return self._is_type_c(host) def _is_type_c(self, host): if host.get_dut_host_info(): servo_type = host.get_dut_host_info().get_label_value( servo_constants.SERVO_TYPE_LABEL_PREFIX) return 'ccd_cr50' in servo_type return False def _reset_usbc_pigtail_connection(self, host): """Reset USBC pigtail connection on servo board. To reset need to run 'cc off' and then 'cc srcdts' in usb_console. """ logging.debug('Starting reset USBC pigtail connection.') def _run_command(cc_command): """Run configuration channel commands. @returns: True if pas successful and False if fail. """ try: cmd = (r"echo 'cc %s' | usb_console -d 18d1:501b -s %s" % (cc_command, host.servo_serial)) resp = host.run(cmd, timeout=host.DEFAULT_TERMINAL_TIMEOUT) return True except Exception as e: logging.info('(Non-critical) %s.', e) return False logging.info('Turn off configuration channel. And wait 5 seconds.') if _run_command('off'): # wait till command will be effected time.sleep(5) logging.info('Turn on configuration channel. ' 'And wait 15 seconds.') if _run_command('srcdts'): # wait till command will be effected time.sleep(15) @property def description(self): return 'Toggle cc lines' class _ECRebootRepair(hosts.RepairAction): """ Reboot EC on DUT from servo. """ def _is_applicable(self, host): return (not host.is_localhost()) and host.is_ec_supported() @timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC) def repair(self, host): host.get_servo().ec_reboot() @property def description(self): return 'Reboot EC' class _DutRebootRepair(hosts.RepairAction): """ Reboot DUT to recover some servo controls depending on EC console. Some servo controls, like lid_open, requires communicating with DUT through EC UART console. Failure of this kinds of controls can be recovered by rebooting the DUT. """ @timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC) def repair(self, host): host.get_servo().get_power_state_controller().reset() # Get the lid_open value which requires EC console. lid_open = host.get_servo().get('lid_open') if lid_open != 'yes' and lid_open != 'not_applicable': raise hosts.AutoservVerifyError( 'Still fail to contact EC console after rebooting DUT') @property def description(self): return 'Reset the DUT via servo' class _DiskCleanupRepair(hosts.RepairAction): """ Remove old logs/metrics/crash_dumps on servohost to free up disk space. """ KEEP_LOGS_MAX_DAYS = 5 FILE_TO_REMOVE = [ '/var/lib/metrics/uma-events', '/var/spool/crash/*', '/var/log/chrome/*', '/var/log/ui/*', '/home/chronos/BrowserMetrics/*' ] @timeout_util.TimeoutDecorator(cros_constants.SHORT_REPAIR_TIMEOUT_SEC) def repair(self, host): if host.is_localhost(): # we don't want to remove anything from local testing. return # Remove old servod logs. host.run('/usr/bin/find /var/log/servod_* -mtime +%d -print -delete' % self.KEEP_LOGS_MAX_DAYS, ignore_status=True) # Remove pre-defined metrics and crash dumps. for path in self.FILE_TO_REMOVE: host.run('rm %s' % path, ignore_status=True) @property def description(self): return 'Clean up old logs/metrics on servohost to free up disk space.' class _ServoMicroFlashRepair(hosts.RepairAction): """ Remove old logs/metrics/crash_dumps on servohost to free up disk space. """ _TARGET_SERVO = 'servo_micro' @timeout_util.TimeoutDecorator(cros_constants.REPAIR_TIMEOUT_SEC) def repair(self, host): if not host.is_cros_host(): raise hosts.AutoservRepairError( 'Can\'t restart servod: not running ' 'embedded Chrome OS.', 'servo_not_applicable_to_non_cros_host') servo = host.get_servo() if not servo or self._TARGET_SERVO not in servo.get_servo_type(): logging.info("Servo-micro is not present on set-up") return try: servo_updater.update_servo_firmware(host, boards=(self._TARGET_SERVO, ), force_update=True, ignore_version=True) except Exception as e: logging.debug("(Not critical) Servo device update error: %s", e) raise hosts.AutoservVerifyError( 'Still fail to contact EC console after rebooting DUT') # Update time when we reflashed the fw on the device dhp = host.get_dut_health_profile() dhp.refresh_servo_miro_fw_update_run_time() host.restart_servod() def is_time_to_try(self, dhp): """Verify that it is time when we can try to re-flash fw on servo_micro. Re-flashing limited to once per 2 weeks to avoid over-flashing the servo device. """ today_time = int(time.time()) last_check = dhp.get_servo_micro_fw_update_time_epoch() can_run = today_time > (last_check + (14 * 24 * 60 * 60)) if not can_run: logging.info("The servo_micro fw updated in las 2 weeks ago.") return can_run def _is_applicable(self, host): return (not host.is_localhost() and host.get_dut_health_profile() and self.is_time_to_try(host.get_dut_health_profile())) @property def description(self): return 'Re-flash servo_micro firmware.' def create_servo_repair_strategy(): """ Return a `RepairStrategy` for a `ServoHost`. """ config = ['brd_config', 'ser_config'] verify_dag = [ (repair_utils.SshVerifier, 'servo_ssh', []), (_DiskSpaceVerifier, 'disk_space', ['servo_ssh']), (_UpdateVerifier, 'update', ['servo_ssh']), (_BoardConfigVerifier, 'brd_config', ['servo_ssh']), (_SerialConfigVerifier, 'ser_config', ['servo_ssh']), (_ServodJobVerifier, 'servod_job', config + ['disk_space']), (_TopologyVerifier, 'servo_topology', ['servod_job']), (_ServodConnectionVerifier, 'servod_connection', ['servod_job']), (_ServodControlVerifier, 'servod_control', ['servod_connection']), (_DUTConnectionVerifier, 'dut_connected', ['servod_connection']), (_ServoHubConnectionVerifier, 'hub_connected', ['dut_connected']), (_PowerButtonVerifier, 'pwr_button', ['hub_connected']), (_BatteryVerifier, 'battery', ['hub_connected']), (_LidVerifier, 'lid_open', ['hub_connected']), (_EcBoardVerifier, 'ec_board', ['dut_connected']), (_Cr50ConsoleVerifier, 'cr50_console', ['dut_connected']), (_CCDTestlabVerifier, 'ccd_testlab', ['cr50_console']), (_CCDPowerDeliveryVerifier, 'power_delivery', ['dut_connected']), ] servod_deps = [ 'servod_job', 'servo_topology', 'servod_connection', 'servod_control', 'dut_connected', 'hub_connected', 'pwr_button', 'cr50_console' ] repair_actions = [ (_DiskCleanupRepair, 'disk_cleanup', ['servo_ssh'], ['disk_space' ]), (_ServoMicroFlashRepair, 'servo_micro_flash', ['servo_ssh', 'servo_topology'], ['dut_connected']), (_RestartServod, 'restart', ['servo_ssh'], config + servod_deps), (_ServoRebootRepair, 'servo_reboot', ['servo_ssh'], servod_deps), (_ToggleCCLineRepair, 'servo_cc', ['servo_ssh'], servod_deps), (_DutRebootRepair, 'dut_reboot', ['servod_connection'], ['servod_control', 'lid_open', 'ec_board']), (_ECRebootRepair, 'ec_reboot', ['servod_connection'], ['servod_control', 'lid_open', 'ec_board']), ] return hosts.RepairStrategy(verify_dag, repair_actions, 'servo')