1# Lint as: python2, python3 2# Copyright 2018 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Helper class for managing charging the DUT with Servo v4.""" 7 8from __future__ import absolute_import 9from __future__ import division 10from __future__ import print_function 11 12import logging 13from six.moves import range 14import time 15 16from autotest_lib.client.common_lib import error 17from autotest_lib.client.common_lib.cros import retry 18 19# Base delay time in seconds for Servo role change and PD negotiation. 20_DELAY_SEC = 0.1 21# Total delay time in minutes for Servo role change and PD negotiation. 22_TIMEOUT_MIN = 0.3 23# Exponential backoff for Servo role change and PD negotiation. 24_BACKOFF = 2 25# Number of attempts to recover Servo v4. 26_RETRYS = 3 27# Seconds to wait after resetting the role on a recovery attempt 28# before trying to set it to the intended role again. 29_RECOVERY_WAIT_SEC = 1 30# Delay to wait before polling whether the role as been changed successfully. 31_ROLE_SETTLING_DELAY_SEC = 1 32# Timeout in minutes to attempt checking AC information over ssh. 33# Ethernet connection through the v4 flickers on role change. The usb 34# adapter needs to reenumerate and the DUT reconnect before information can be 35# queried. This delay has proven sufficient to overcome this in the current 36# implementation. 37_ETH_REENUMERATE_TIMEOUT_MIN = 1 38 39 40def _invert_role(role): 41 """Helper to invert the role. 42 43 @param role: role to invert 44 45 @returns: 46 'src' if |role| is 'snk' 47 'snk' if |role| is 'src' 48 """ 49 return 'src' if role == 'snk' else 'snk' 50 51class ServoV4ChargeManager(object): 52 """A helper class for managing charging the DUT with Servo v4.""" 53 54 def __init__(self, host, servo): 55 """Check for correct Servo setup. 56 57 Make sure that Servo is v4 and can manage charging. Make sure that DUT 58 responds to Servo charging commands. Restore Servo v4 power role after 59 sanity check. 60 61 @param host: CrosHost object representing the DUT or None. 62 If host is None, then the is_ac_connected check on the 63 host object is skipped. 64 @param servo: a proxy for servod. 65 """ 66 super(ServoV4ChargeManager, self).__init__() 67 self._host = host 68 self._servo = servo 69 if not self._servo.supports_built_in_pd_control(): 70 raise error.TestNAError('Servo setup does not support PD control. ' 71 'Check logs for details.') 72 73 self._original_role = self._servo.get('servo_v4_role') 74 if self._original_role == 'snk': 75 self.start_charging() 76 self.stop_charging() 77 elif self._original_role == 'src': 78 self.stop_charging() 79 self.start_charging() 80 else: 81 raise error.TestNAError('Unrecognized Servo v4 power role: %s.' % 82 self._original_role) 83 84 # TODO(b/129882930): once both sides are stable, remove the _retry_wrapper 85 # wrappers as they aren't needed anymore. The current motivation for the 86 # retry loop in the autotest framework is to have a 'stable' library i.e. 87 # retries but also a mechanism and and easy to remove bridge once the bug 88 # is fixed, and we don't require the bandaid anymore. 89 90 def _retry_wrapper(self, role, verify): 91 """Try up to |_RETRYS| times to set the v4 to |role|. 92 93 @param role: string 'src' or 'snk'. If 'src' connect DUT to AC power; if 94 'snk' disconnect DUT from AC power. 95 @param verify: bool to verify that charging started/stopped. 96 97 @returns: number of retries needed for success 98 """ 99 for retry in range(_RETRYS): 100 try: 101 self._change_role(role, verify) 102 return retry 103 except error.TestError as e: 104 if retry < _RETRYS - 1: 105 # Ensure this retry loop and logging isn't run on the 106 # last iteration. 107 logging.warning('Failed to set to %s %d times. %s ' 108 'Trying to cycle through %s to ' 109 'recover.', role, retry + 1, str(e), 110 _invert_role(role)) 111 # Cycle through the other state before retrying. Do not 112 # verify as this is strictly a recovery mechanism - sleep 113 # instead. 114 self._change_role(_invert_role(role), verify=False) 115 time.sleep(_RECOVERY_WAIT_SEC) 116 logging.error('Giving up on %s.', role) 117 raise e 118 119 def stop_charging(self, verify=True): 120 """Cut off AC power supply to DUT with Servo. 121 122 @param verify: whether to verify that charging stopped. 123 124 @returns: number of retries needed for success 125 """ 126 return self._retry_wrapper('snk', verify) 127 128 def start_charging(self, verify=True): 129 """Connect AC power supply to DUT with Servo. 130 131 @param verify: whether to verify that charging started. 132 133 @returns: number of retries needed for success 134 """ 135 return self._retry_wrapper('src', verify) 136 137 def restore_original_setting(self, verify=True): 138 """Restore Servo to original charging setting. 139 140 @param verify: whether to verify that original role was restored. 141 """ 142 self._retry_wrapper(self._original_role, verify) 143 144 def _change_role(self, role, verify=True): 145 """Change Servo PD role and check if DUT responded accordingly. 146 147 @param role: string 'src' or 'snk'. If 'src' connect DUT to AC power; if 148 'snk' disconnect DUT from AC power. 149 @param verify: bool to verify that charging started/stopped. 150 151 @raises error.TestError: if the role did not change successfully. 152 """ 153 self._servo.set_nocheck('servo_v4_role', role) 154 # Sometimes the role reverts quickly. Add a short delay to let the new 155 # role stabilize. 156 time.sleep(_ROLE_SETTLING_DELAY_SEC) 157 158 if not verify: 159 return 160 161 @retry.retry(error.TestError, timeout_min=_TIMEOUT_MIN, 162 delay_sec=_DELAY_SEC, backoff=_BACKOFF) 163 def check_servo_role(role): 164 """Check if servo role is as expected, if not, retry.""" 165 if self._servo.get('servo_v4_role') != role: 166 raise error.TestError('Servo v4 failed to set its PD role to ' 167 '%s.' % role) 168 check_servo_role(role) 169 170 connected = True if role == 'src' else False 171 172 @retry.retry(error.TestError, timeout_min=_TIMEOUT_MIN, 173 delay_sec=_DELAY_SEC, backoff=_BACKOFF) 174 def check_ac_connected(connected): 175 """Check if the EC believes an AC charger is connected.""" 176 if not self._servo.has_control('charger_connected'): 177 # TODO(coconutruben): remove this check once labs have the 178 # latest hdctools with the required control. 179 logging.warn('Could not verify %r control as the ' 180 'control is not available on servod.', 181 'charger_connected') 182 return 183 ec_opinion = self._servo.get('charger_connected') 184 if ec_opinion != connected: 185 str_lookup = {True: 'connected', False: 'disconnected'} 186 msg = ('EC thinks charger is %s but it should be %s.' 187 % (str_lookup[ec_opinion], 188 str_lookup[connected])) 189 raise error.TestError(msg) 190 191 check_ac_connected(connected) 192 193 @retry.retry(error.TestError, timeout_min=_ETH_REENUMERATE_TIMEOUT_MIN, 194 delay_sec=_DELAY_SEC, backoff=_BACKOFF) 195 def check_host_ac(connected): 196 """Check if DUT AC power is as expected, if not, retry.""" 197 if self._host.is_ac_connected() != connected: 198 intent = 'connect' if connected else 'disconnect' 199 raise error.TestError('DUT failed to %s AC power.'% intent) 200 201 if self._host and self._host.is_up_fast(): 202 # If the DUT has been charging in S3/S5/G3, cannot verify. 203 check_host_ac(connected) 204