• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5#
6# Expects to be run in an environment with sudo and no interactive password
7# prompt, such as within the Chromium OS development chroot.
8
9import ast
10import logging
11import os
12import re
13import six
14import six.moves.xmlrpc_client
15import six.moves.http_client
16import time
17
18from autotest_lib.client.common_lib import error
19from autotest_lib.client.common_lib import lsbrelease_utils
20from autotest_lib.client.common_lib import seven
21from autotest_lib.server import utils as server_utils
22from autotest_lib.server.cros.servo import firmware_programmer
23from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig
24
25
26# Regex to match XMLRPC errors due to a servod control not existing.
27# Servod uses both 'No control %s' and 'No control named %s' in exceptions.
28NO_CONTROL_RE = re.compile(r'No control(?: named)? (?P<name>\w*\.?\w*)')
29
30# Please see servo/drv/pty_driver.py for error messages to match.
31
32# This common prefix can apply to all subtypes of console errors.
33# The first portion is an optional qualifier of the type
34# of error that occurred. Each error is or'd.
35CONSOLE_COMMON_RE = (r'((Timeout waiting for response|'
36                     r'Known error [\w\'\".\s]+). )?'
37                     # The second portion is an optional name for the console
38                     # source
39                     r'(\w+\: )?')
40
41# Regex to match XMLRPC errors due to a console being unresponsive.
42NO_CONSOLE_OUTPUT_RE = re.compile(r'%sNo data was sent from the pty\.' %
43                                  CONSOLE_COMMON_RE)
44
45
46# Regex to match XMLRPC errors due to a console control failing, but the
47# underlying Console being responsive.
48CONSOLE_MISMATCH_RE = re.compile(r'%sThere was output:' % CONSOLE_COMMON_RE)
49
50
51# The minimum voltage on the charger port on servo v4 that is expected. This is
52# to query whether a charger is plugged into servo v4 and thus pd control
53# capabilities can be used.
54V4_CHG_ATTACHED_MIN_VOLTAGE_MV = 4400
55
56
57class ControlUnavailableError(error.TestFail):
58    """Custom error class to indicate a control is unavailable on servod."""
59    pass
60
61
62class ConsoleError(error.TestFail):
63    """Common error class for servod console-back control failures."""
64    pass
65
66
67class UnresponsiveConsoleError(ConsoleError):
68    """Error for: A console control fails for lack of console output."""
69    pass
70
71
72class ResponsiveConsoleError(ConsoleError):
73    """Error for: A console control fails but console is responsive."""
74    pass
75
76
77class ServodBadResponse(six.moves.http_client.BadStatusLine):
78    """Indicates a bad HTTP response from servod"""
79
80    def __init__(self, when, line):
81        """
82
83        @param when: Description of the operation being performed (get/set)
84        @param line: The line that came from the server, often an empty string.
85        """
86        super(ServodBadResponse, self).__init__(line)
87        self.when = when
88
89    def __str__(self):
90        """String representation of the exception"""
91        return '%s -- StatusLine=%s' % (self.when, self.line)
92
93
94class ServodEmptyResponse(ServodBadResponse):
95    """Indicates an empty response from servod, possibly because it exited."""
96    pass
97
98
99class ServodConnectionError(seven.SOCKET_ERRORS[0]):
100    """Indicates socket errors seen during communication with servod"""
101
102    def __init__(self, when, errno, strerror, filename):
103        """Instance initializer
104
105        The filename is used to add details to the exception message:
106        [Errno 104] Connection reset by peer: "<Servo 'ipaddr:9999'>"
107
108        @param when: Description of the operation being performed at the time
109        @param errno: errno value, such as ECONNRESET
110        @param strerror: OS-provided description ("connection reset by peer")
111        @param filename: Something to report as a path, such as a socket address
112        """
113        # [Errno 104] [Setting ctrl:val] Connection reset by peer: <Servo...
114        self.when = when
115        super(ServodConnectionError, self).__init__(errno, strerror, filename)
116
117    def __str__(self):
118        """String representation of the exception"""
119        msgv = [self.when]
120        if self.errno is not None or self.strerror is not None:
121            msgv.append('--')
122        if self.errno is not None:
123            msgv.append('[Errno %d]' % self.errno)
124        if self.strerror is not None:
125            msgv.append(self.strerror)
126        return '%s: %r' % (' '.join(msgv), self.filename)
127
128
129# TODO: once in python 3, inherit from AbstractContextManager
130class _WrapServoErrors(object):
131    """
132    Wrap an operation, replacing BadStatusLine and socket.error with
133    servo-specific versions, and extracting exception info from xmlrplib.Fault.
134
135    @param servo_name: The servo object, used to add the servo name to errors.
136                       See the ServodConnectionError docstring.
137    @param description:  String to use when describing what was being done
138    @raise ServodBadStatusLine: if exception is a httplib.BadStatusLine
139    @raise ServodSocketError: if exception is a socket.error
140    @raise ControlUnavailableError: if Fault matches NO_CONTROL_RE
141    @raise UnresponsiveConsoleError: if Fault matches NO_CONSOLE_OUTPUT_RE
142    @raise ResponsiveConsoleError: if Fault matches CONSOLE_MISMATCH_RE
143    """
144
145    def __init__(self, servo, description):
146        self.servo_name = str(servo)
147        self.description = description
148
149    @staticmethod
150    def _get_xmlrpclib_exception(xmlexc):
151        """Get meaningful exception string from xmlrpc.
152
153        Args:
154            xmlexc: xmlrpclib.Fault object
155
156        xmlrpclib.Fault.faultString has the following format:
157
158        <type 'exception type'>:'actual error message'
159
160        Parse and return the real exception from the servod side instead of the
161        less meaningful one like,
162           <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
163           attribute 'hw_driver'">
164
165        Returns:
166            string of underlying exception raised in servod.
167        """
168        return re.sub('^.*>:', '', xmlexc.faultString)
169
170    @staticmethod
171    def _log_exception(exc_type, exc_val, exc_tb):
172        """Log exception information"""
173        if exc_val is not None:
174            logging.debug(
175                    'Wrapped exception:', exc_info=(exc_type, exc_val, exc_tb))
176
177    def __enter__(self):
178        """Enter the context"""
179        return self
180
181    def __exit__(self, exc_type, exc_val, exc_tb):
182        """Exit the context, handling the exception if there was one"""
183        try:
184            if isinstance(exc_val, six.moves.http_client.BadStatusLine):
185                self._log_exception(exc_type, exc_val, exc_tb)
186                if exc_val.line in ('', "''"):
187                    err = ServodEmptyResponse(self.description, exc_val.line)
188                else:
189                    err = ServodBadResponse(self.description, exc_val.line)
190                six.reraise(err.__class__, err, exc_tb)
191
192            if isinstance(exc_val, seven.SOCKET_ERRORS):
193                self._log_exception(exc_type, exc_val, exc_tb)
194                if len(exc_val.args) == 0:
195                    errno = None
196                    strerror = None
197                elif len(exc_val.args) == 1:
198                    errno = None
199                    strerror = exc_val.args[0]
200                else:
201                    errno = exc_val.args[0]
202                    strerror = exc_val.args[1]
203                err = ServodConnectionError(self.description, errno, strerror,
204                                            self.servo_name)
205                six.reraise(err.__class__, err, exc_tb)
206
207            if isinstance(exc_val, six.moves.xmlrpc_client.Fault):
208                err_str = self._get_xmlrpclib_exception(exc_val)
209                err_msg = '%s :: %s' % (self.description, err_str)
210                unknown_ctrl = re.search(NO_CONTROL_RE, err_str)
211                if not unknown_ctrl:
212                    # Log the full text for errors, except unavailable controls.
213                    self._log_exception(exc_type, exc_val, exc_tb)
214                    logging.debug(err_msg)
215                if unknown_ctrl:
216                    # The error message for unavailable controls is huge, since
217                    # it reports all known controls.  Don't log the full text.
218                    unknown_ctrl_name = unknown_ctrl.group('name')
219                    logging.debug('%s :: No control named %r',
220                                  self.description, unknown_ctrl_name)
221                    err = ControlUnavailableError(
222                            'No control named %r' % unknown_ctrl_name)
223                elif re.search(NO_CONSOLE_OUTPUT_RE, err_str):
224                    err = UnresponsiveConsoleError(
225                            'Console not printing output. %s.' %
226                            self.description)
227                elif re.search(CONSOLE_MISMATCH_RE, err_str):
228                    err = ResponsiveConsoleError(
229                            'Control failed but console alive. %s.' %
230                            self.description)
231                else:
232                    err = error.TestFail(err_msg)
233                six.reraise(err.__class__, err, exc_tb)
234        finally:
235            del exc_tb
236
237
238def _extract_image_from_tarball(tarball, dest_dir, image_candidates, timeout):
239    """Try extracting the image_candidates from the tarball.
240
241    @param tarball: The path of the tarball.
242    @param dest_path: The path of the destination.
243    @param image_candidates: A tuple of the paths of image candidates.
244    @param timeout: Time to wait in seconds before timing out.
245
246    @return: The first path from the image candidates, which succeeds, or None
247             if all the image candidates fail.
248    """
249
250    # Create the firmware_name subdirectory if it doesn't exist
251    if not os.path.exists(dest_dir):
252        os.mkdir(dest_dir)
253
254    # Generate a list of all tarball files
255    stdout = server_utils.system_output('tar tf %s' % tarball,
256                                        timeout=timeout,
257                                        ignore_status=True,
258                                        args=image_candidates)
259    tarball_files = stdout.splitlines()
260
261    # Check if image candidates are in the list of tarball files
262    for image in image_candidates:
263        logging.debug("Trying to extract %s (autotest)", image)
264        if image in tarball_files:
265            # Extract and return the first image candidate found
266            tar_cmd = 'tar xf %s -C %s %s' % (tarball, dest_dir, image)
267            status = server_utils.system(tar_cmd,
268                                         timeout=timeout,
269                                         ignore_status=True)
270            if status == 0:
271                return image
272    return None
273
274
275class _PowerStateController(object):
276
277    """Class to provide board-specific power operations.
278
279    This class is responsible for "power on" and "power off"
280    operations that can operate without making assumptions in
281    advance about board state.  It offers an interface that
282    abstracts out the different sequences required for different
283    board types.
284
285    """
286    # Constants acceptable to be passed for the `rec_mode` parameter
287    # to power_on().
288    #
289    # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
290    #   SD card.
291    # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.
292
293    REC_ON = 'rec'
294    REC_OFF = 'on'
295    REC_ON_FORCE_MRC = 'rec_force_mrc'
296
297    # Delay in seconds needed between asserting and de-asserting
298    # warm reset.
299    _RESET_HOLD_TIME = 0.5
300
301
302    def __init__(self, servo):
303        """Initialize the power state control.
304
305        @param servo Servo object providing the underlying `set` and `get`
306                     methods for the target controls.
307
308        """
309        self._servo = servo
310        self.supported = self._servo.has_control('power_state')
311        self.last_rec_mode = self.REC_OFF
312        if not self.supported:
313            logging.info('Servo setup does not support power-state operations. '
314                         'All power-state calls will lead to error.TestFail')
315
316    def _check_supported(self):
317        """Throw an error if dts mode control is not supported."""
318        if not self.supported:
319            raise error.TestFail('power_state controls not supported')
320
321    def reset(self):
322        """Force the DUT to reset.
323
324        The DUT is guaranteed to be on at the end of this call,
325        regardless of its previous state, provided that there is
326        working OS software. This also guarantees that the EC has
327        been restarted.
328
329        """
330        self._check_supported()
331        self._servo.set_nocheck('power_state', 'reset')
332
333    def cr50_reset(self):
334        """Force the DUT to reset.
335
336        The DUT is guaranteed to be on at the end of this call,
337        regardless of its previous state, provided that there is
338        working OS software. This also guarantees that the EC has
339        been restarted. Works only for ccd connections.
340
341        """
342        self._check_supported()
343        self._servo.set_nocheck('power_state', 'cr50_reset')
344
345    def warm_reset(self):
346        """Apply warm reset to the DUT.
347
348        This asserts, then de-asserts the 'warm_reset' signal.
349        Generally, this causes the board to restart.
350
351        """
352        # TODO: warm_reset support has added to power_state.py. Once it
353        # available to labstation remove fallback method.
354        self._check_supported()
355        try:
356            self._servo.set_nocheck('power_state', 'warm_reset')
357        except error.TestFail as err:
358            logging.info("Fallback to warm_reset control method")
359            self._servo.set_get_all(['warm_reset:on',
360                                 'sleep:%.4f' % self._RESET_HOLD_TIME,
361                                 'warm_reset:off'])
362
363    def power_off(self):
364        """Force the DUT to power off.
365
366        The DUT is guaranteed to be off at the end of this call,
367        regardless of its previous state, provided that there is
368        working EC and boot firmware.  There is no requirement for
369        working OS software.
370
371        """
372        self._check_supported()
373        self._servo.set_nocheck('power_state', 'off')
374
375    def power_on(self, rec_mode=REC_OFF):
376        """Force the DUT to power on.
377
378        Prior to calling this function, the DUT must be powered off,
379        e.g. with a call to `power_off()`.
380
381        At power on, recovery mode is set as specified by the
382        corresponding argument.  When booting with recovery mode on, it
383        is the caller's responsibility to unplug/plug in a bootable
384        external storage device.
385
386        If the DUT requires a delay after powering on but before
387        processing inputs such as USB stick insertion, the delay is
388        handled by this method; the caller is not responsible for such
389        delays.
390
391        @param rec_mode Setting of recovery mode to be applied at
392                        power on. default: REC_OFF aka 'off'
393
394        """
395        self._check_supported()
396        self._servo.set_nocheck('power_state', rec_mode)
397        self.last_rec_mode = rec_mode
398
399    def retry_power_on(self):
400        """Retry powering on the DUT.
401
402        After power_on(...) the system might not come up reliably, although
403        the reasons aren't known yet. This function retries turning on the
404        system again, trying to bring it in the last state that power_on()
405        attempted to reach.
406        """
407        self._check_supported()
408        self._servo.set_nocheck('power_state', self.last_rec_mode)
409
410
411class _Uart(object):
412    """Class to capture UART streams of CPU, EC, Cr50, etc."""
413    _UartToCapture = ('cpu', 'cr50', 'ec', 'servo_micro', 'servo_v4', 'usbpd',
414                      'ccd_cr50.ec', 'ccd_cr50.cpu', 'ccd_cr50.cr50'
415                      'ccd_gsc.ec', 'ccd_gsc.cpu', 'ccd_gsc.cr50')
416
417
418    def __init__(self, servo):
419        self._servo = servo
420        self._streams = []
421        self.logs_dir = None
422
423    def _start_stop_capture(self, uart, start):
424        """Helper function to start/stop capturing on specified UART.
425
426        @param uart:  The UART name to start/stop capturing.
427        @param start:  True to start capturing, otherwise stop.
428
429        @returns True if the operation completes successfully.
430                 False if the UART capturing is not supported or failed due to
431                 an error.
432        """
433        logging.debug('%s capturing %s UART.', 'Start' if start else 'Stop',
434                      uart)
435        uart_cmd = '%s_uart_capture' % uart
436        target_level = 'on' if start else 'off'
437        level = None
438        if self._servo.has_control(uart_cmd):
439            # Do our own implementation of set() here as not_applicable
440            # should also count as a valid control.
441            logging.debug('Trying to set %s to %s.', uart_cmd, target_level)
442            try:
443                self._servo.set_nocheck(uart_cmd, target_level)
444                level = self._servo.get(uart_cmd)
445            except error.TestFail as e:
446                # Any sort of test failure here should not stop the test. This
447                # is just to capture more output. Log and move on.
448                logging.warning('Failed to set %s to %s. %s. Ignoring.',
449                                uart_cmd, target_level, str(e))
450            if level == target_level:
451                logging.debug('Managed to set %s to %s.', uart_cmd, level)
452            else:
453                logging.debug('Failed to set %s to %s. Got %s.', uart_cmd,
454                              target_level, level)
455        return level == target_level
456
457    def start_capture(self):
458        """Start capturing UART streams."""
459        for uart in self._UartToCapture:
460            # Always try to start the uart. Only add it to _streams if it's not
461            # in the list.
462            if (self._start_stop_capture(uart, True)
463                        and uart not in self._streams):
464                self._streams.append(uart)
465
466    def get_logfile(self, uart):
467        """Return the path to the uart logfile or none if logs_dir isn't set."""
468        if not self.logs_dir:
469            return None
470        return os.path.join(self.logs_dir, '%s_uart.txt' % uart)
471
472    def dump(self):
473        """Dump UART streams to log files accordingly."""
474        if not self.logs_dir:
475            return
476
477        for uart in self._streams:
478            logfile_fullname = self.get_logfile(uart)
479            stream = '%s_uart_stream' % uart
480            try:
481                content = self._servo.get(stream)
482            except Exception as err:
483                logging.warning('Failed to get UART log for %s: %s', stream, err)
484                continue
485
486            if content == 'not_applicable':
487                logging.warning('%s is not applicable', stream)
488                continue
489
490            # The UART stream may contain non-printable characters, and servo
491            # returns it in string representation. We use `ast.leteral_eval`
492            # to revert it back.
493            with open(logfile_fullname, 'a') as fd:
494                try:
495                    fd.write(ast.literal_eval(content))
496                except ValueError:
497                    logging.exception('Invalid value for %s: %r', stream,
498                                      content)
499
500    def stop_capture(self):
501        """Stop capturing UART streams."""
502        for uart in self._UartToCapture:
503            try:
504                self._start_stop_capture(uart, False)
505            except Exception as err:
506                logging.warning('Failed to stop UART logging for %s: %s', uart,
507                             err)
508
509
510class Servo(object):
511
512    """Manages control of a Servo board.
513
514    Servo is a board developed by hardware group to aide in the debug and
515    control of various partner devices. Servo's features include the simulation
516    of pressing the power button, closing the lid, and pressing Ctrl-d. This
517    class manages setting up and communicating with a servo demon (servod)
518    process. It provides both high-level functions for common servo tasks and
519    low-level functions for directly setting and reading gpios.
520
521    """
522
523    # Power button press delays in seconds.
524    #
525    # The EC specification says that 8.0 seconds should be enough
526    # for the long power press.  However, some platforms need a bit
527    # more time.  Empirical testing has found these requirements:
528    #   Alex: 8.2 seconds
529    #   ZGB:  8.5 seconds
530    # The actual value is set to the largest known necessary value.
531    #
532    # TODO(jrbarnette) Being generous is the right thing to do for
533    # existing platforms, but if this code is to be used for
534    # qualification of new hardware, we should be less generous.
535    SHORT_DELAY = 0.1
536
537    # Maximum number of times to re-read power button on release.
538    GET_RETRY_MAX = 10
539
540    # Delays to deal with DUT state transitions.
541    SLEEP_DELAY = 6
542    BOOT_DELAY = 10
543
544    # Default minimum time interval between 'press' and 'release'
545    # keyboard events.
546    SERVO_KEY_PRESS_DELAY = 0.1
547
548    # Time to toggle recovery switch on and off.
549    REC_TOGGLE_DELAY = 0.1
550
551    # Time to toggle development switch on and off.
552    DEV_TOGGLE_DELAY = 0.1
553
554    # Time between an usb disk plugged-in and detected in the system.
555    USB_DETECTION_DELAY = 5
556
557    # Time to wait before timing out on servo initialization.
558    INIT_TIMEOUT_SECS = 10
559
560    # Time to wait before timing out when extracting firmware images.
561    #
562    # This was increased from 60 seconds to support boards with very
563    # large (>500MB) firmware archives taking longer than expected to
564    # extract firmware on the lab host machines (b/149419503).
565    EXTRACT_TIMEOUT_SECS = 900
566
567    # The VBUS voltage threshold used to detect if VBUS is supplied
568    VBUS_THRESHOLD = 3000.0
569
570    # List of servos that connect to a debug header on the board.
571    FLEX_SERVOS = ['c2d2', 'servo_micro', 'servo_v3']
572
573    # List of servos that rely on gsc commands for some part of dut control.
574    GSC_DRV_SERVOS = ['c2d2', 'ccd_gsc', 'ccd_cr50']
575
576    CCD_PREFIX = 'ccd_'
577
578    def __init__(self, servo_host, servo_serial=None, delay_init=False):
579        """Sets up the servo communication infrastructure.
580
581        @param servo_host: A ServoHost object representing
582                           the host running servod.
583        @type servo_host: autotest_lib.server.hosts.servo_host.ServoHost
584        @param servo_serial: Serial number of the servo board.
585        @param delay_init:  Delay cache servo_type and power_state to prevent
586                            attempt to connect to the servod.
587        """
588        # TODO(fdeng): crbug.com/298379
589        # We should move servo_host object out of servo object
590        # to minimize the dependencies on the rest of Autotest.
591        self._servo_host = servo_host
592        self._servo_serial = servo_serial
593        self._servo_type = None
594        self._power_state = None
595        self._programmer = None
596        self._prev_log_inode = None
597        self._prev_log_size = 0
598        self._ccd_watchdog_disabled = False
599        if not delay_init:
600            self._servo_type = self.get_servo_version()
601            self._power_state = _PowerStateController(self)
602        self._uart = _Uart(self)
603
604    def __str__(self):
605        """Description of this object and address, for use in errors"""
606        return "<%s '%s:%s'>" % (
607                type(self).__name__,
608                self._servo_host.hostname,
609                self._servo_host.servo_port)
610
611    @property
612    def _server(self):
613        with _WrapServoErrors(
614                servo=self, description='get_servod_server_proxy()'):
615            return self._servo_host.get_servod_server_proxy()
616
617    @property
618    def servo_serial(self):
619        """Returns the serial number of the servo board."""
620        return self._servo_serial
621
622    def get_power_state_controller(self):
623        """Return the power state controller for this Servo.
624
625        The power state controller provides board-independent
626        interfaces for reset, power-on, power-off operations.
627
628        """
629        if self._power_state is None:
630            self._power_state = _PowerStateController(self)
631        return self._power_state
632
633
634    def initialize_dut(self, cold_reset=False, enable_main=True):
635        """Initializes a dut for testing purposes.
636
637        This sets various servo signals back to default values
638        appropriate for the target board.  By default, if the DUT
639        is already on, it stays on.  If the DUT is powered off
640        before initialization, its state afterward is unspecified.
641
642        Rationale:  Basic initialization of servo sets the lid open,
643        when there is a lid.  This operation won't affect powered on
644        units; however, setting the lid open may power on a unit
645        that's off, depending on the board type and previous state
646        of the device.
647
648        If `cold_reset` is a true value, the DUT and its EC will be
649        reset, and the DUT rebooted in normal mode.
650
651        @param cold_reset If True, cold reset the device after
652                          initialization.
653        @param enable_main If True, make sure the main servo device has
654                           control of the dut.
655
656        """
657        if enable_main:
658            self.enable_main_servo_device()
659
660        with _WrapServoErrors(
661                servo=self, description='initialize_dut()->hwinit()'):
662            self._server.hwinit()
663        if self.has_control('usb_mux_oe1'):
664            self.set('usb_mux_oe1', 'on')
665            self.switch_usbkey('off')
666        else:
667            logging.warning('Servod command \'usb_mux_oe1\' is not available. '
668                            'Any USB drive related servo routines will fail.')
669        # Create a record of SBU voltages if this is running support servo (v4,
670        # v4p1).
671        # TODO(coconutruben): eventually, replace this with a metric to track
672        # SBU voltages wrt servo-hw/dut-hw
673        if self.has_control('servo_dut_sbu1_mv'):
674            # Attempt to take a reading of sbu1 and sbu2 multiple times to
675            # account for situations where the two lines exchange hi/lo roles
676            # frequently.
677            for i in range(10):
678                try:
679                    sbu1 = int(self.get('servo_dut_sbu1_mv'))
680                    sbu2 = int(self.get('servo_dut_sbu2_mv'))
681                    logging.info('attempt %d sbu1 %d sbu2 %d', i, sbu1, sbu2)
682                except error.TestFail as e:
683                    # This is a nice to have but if reading this fails, it
684                    # shouldn't interfere with the test.
685                    logging.exception(e)
686        self._uart.start_capture()
687        # Run testlab open if servo relies on ccd to control the dut.
688        if self.main_device_uses_gsc_drv():
689            self.set_nocheck('cr50_testlab', 'open')
690        if cold_reset:
691            if not self.get_power_state_controller().supported:
692                logging.info('Cold-reset for DUT requested, but servo '
693                             'setup does not support power_state. Skipping.')
694            else:
695                self.get_power_state_controller().reset()
696        with _WrapServoErrors(
697                servo=self, description='initialize_dut()->get_version()'):
698            version = self._server.get_version()
699        logging.debug('Servo initialized, version is %s', version)
700
701
702    def is_localhost(self):
703        """Is the servod hosted locally?
704
705        Returns:
706          True if local hosted; otherwise, False.
707        """
708        return self._servo_host.is_localhost()
709
710
711    def get_os_version(self):
712        """Returns the chromeos release version."""
713        lsb_release_content = self.system_output('cat /etc/lsb-release',
714                                                 ignore_status=True)
715        return lsbrelease_utils.get_chromeos_release_builder_path(
716                    lsb_release_content=lsb_release_content)
717
718
719    def get_servod_version(self):
720        """Returns the servod version."""
721        # TODO: use system_output once servod --sversion prints to stdout
722        try:
723            result = self._servo_host.run('servod --sversion 2>&1')
724        except error.AutoservRunError as e:
725            if 'command execution error' in str(e):
726                # Fall back to version if sversion is not supported yet.
727                result = self._servo_host.run('servod --version 2>&1')
728                return result.stdout.strip() or result.stderr.strip()
729            # An actually unexpected error occurred, just raise.
730            raise e
731        sversion = result.stdout or result.stderr
732        # The sversion output contains 3 lines:
733        # servod v1.0.816-ff8e966 // the extended version with git hash
734        # 2020-04-08 01:10:29 // the time of the latest commit
735        # chromeos-ci-legacy-us-central1-b-x32-55-u8zc // builder information
736        # For debugging purposes, we mainly care about the version, and the
737        # timestamp.
738        if type(sversion) == type(b' '):
739            sversion = sversion.decode("utf-8")
740        return ' '.join(sversion.split()[1:4])
741
742
743    def power_long_press(self):
744        """Simulate a long power button press."""
745        # After a long power press, the EC may ignore the next power
746        # button press (at least on Alex).  To guarantee that this
747        # won't happen, we need to allow the EC one second to
748        # collect itself.
749        # long_press is defined as 8.5s in servod
750        self.power_key('long_press')
751
752
753    def power_normal_press(self):
754        """Simulate a normal power button press."""
755        # press is defined as 1.2s in servod
756        self.power_key('press')
757
758
759    def power_short_press(self):
760        """Simulate a short power button press."""
761        # tab is defined as 0.2s in servod
762        self.power_key('tab')
763
764
765    def power_key(self, press_secs='tab'):
766        """Simulate a power button press.
767
768        @param press_secs: int, float, str; time to press key in seconds or
769                           known shorthand: 'tab' 'press' 'long_press'
770        """
771        # TODO(b/224804060): use the power_key control for all servo types when
772        # c2d2 has a defined power_key driver.
773        if 'c2d2' not in self.get_servo_type():
774            self.set_nocheck('power_key', press_secs)
775            return
776        if isinstance(press_secs, str):
777            if press_secs == 'tab':
778                press_secs = 0.2
779            elif press_secs == 'press':
780                press_secs = 1.2
781            elif press_secs == 'long_press':
782                press_secs = 8.5
783            else:
784                raise error.TestError('Invalid press %r' % press_secs)
785        logging.info('Manual power button press for %ds', press_secs)
786        self.set_nocheck('pwr_button', 'press')
787        time.sleep(press_secs)
788        self.set_nocheck('pwr_button', 'release')
789
790
791    def pwr_button(self, action='press'):
792        """Simulate a power button press.
793
794        @param action: str; could be press or could be release.
795        """
796        self.set_nocheck('pwr_button', action)
797
798
799    def lid_open(self):
800        """Simulate opening the lid and raise exception if all attempts fail"""
801        self.set('lid_open', 'yes')
802
803
804    def lid_close(self):
805        """Simulate closing the lid and raise exception if all attempts fail
806
807        Waits 6 seconds to ensure the device is fully asleep before returning.
808        """
809        self.set('lid_open', 'no')
810        time.sleep(Servo.SLEEP_DELAY)
811
812
813    def vbus_power_get(self):
814        """Get current vbus_power."""
815        return self.get('vbus_power')
816
817
818    def volume_up(self, timeout=300):
819        """Simulate pushing the volume down button.
820
821        @param timeout: Timeout for setting the volume.
822        """
823        self.set_get_all(['volume_up:yes',
824                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
825                          'volume_up:no'])
826        # we need to wait for commands to take effect before moving on
827        time_left = float(timeout)
828        while time_left > 0.0:
829            value = self.get('volume_up')
830            if value == 'no':
831                return
832            time.sleep(self.SHORT_DELAY)
833            time_left = time_left - self.SHORT_DELAY
834        raise error.TestFail("Failed setting volume_up to no")
835
836    def volume_down(self, timeout=300):
837        """Simulate pushing the volume down button.
838
839        @param timeout: Timeout for setting the volume.
840        """
841        self.set_get_all(['volume_down:yes',
842                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
843                          'volume_down:no'])
844        # we need to wait for commands to take effect before moving on
845        time_left = float(timeout)
846        while time_left > 0.0:
847            value = self.get('volume_down')
848            if value == 'no':
849                return
850            time.sleep(self.SHORT_DELAY)
851            time_left = time_left - self.SHORT_DELAY
852        raise error.TestFail("Failed setting volume_down to no")
853
854    def arrow_up(self, press_secs='tab'):
855        """Simulate arrow up key presses.
856
857        @param press_secs: int, float, str; time to press key in seconds or
858                           known shorthand: 'tab' 'press' 'long_press'.
859        """
860        # TODO: Remove this check after a lab update to include CL:1913684
861        if not self.has_control('arrow_up'):
862            logging.warning('Control arrow_up ignored. '
863                            'Please update hdctools')
864            return
865        self.set_nocheck('arrow_up', press_secs)
866
867    def arrow_down(self, press_secs='tab'):
868        """Simulate arrow down key presses.
869
870        @param press_secs: int, float, str; time to press key in seconds or
871                           known shorthand: 'tab' 'press' 'long_press'.
872        """
873        # TODO: Remove this check after a lab update to include CL:1913684
874        if not self.has_control('arrow_down'):
875            logging.warning('Control arrow_down ignored. '
876                            'Please update hdctools')
877            return
878        self.set_nocheck('arrow_down', press_secs)
879
880    def ctrl_d(self, press_secs='tab'):
881        """Simulate Ctrl-d simultaneous button presses.
882
883        @param press_secs: int, float, str; time to press key in seconds or
884                           known shorthand: 'tab' 'press' 'long_press'
885        """
886        self.set_nocheck('ctrl_d', press_secs)
887
888
889    def ctrl_r(self, press_secs='tab'):
890        """Simulate Ctrl-r simultaneous button presses.
891
892        @param press_secs: int, float, str; time to press key in seconds or
893                           known shorthand: 'tab' 'press' 'long_press'
894        """
895        self.set_nocheck('ctrl_r', press_secs)
896
897    def ctrl_s(self, press_secs='tab'):
898        """Simulate Ctrl-s simultaneous button presses.
899
900        @param press_secs: int, float, str; time to press key in seconds or
901                           known shorthand: 'tab' 'press' 'long_press'
902        """
903        self.set_nocheck('ctrl_s', press_secs)
904
905
906    def ctrl_u(self, press_secs='tab'):
907        """Simulate Ctrl-u simultaneous button presses.
908
909        @param press_secs: int, float, str; time to press key in seconds or
910                           known shorthand: 'tab' 'press' 'long_press'
911        """
912        self.set_nocheck('ctrl_u', press_secs)
913
914
915    def ctrl_enter(self, press_secs='tab'):
916        """Simulate Ctrl-enter simultaneous button presses.
917
918        @param press_secs: int, float, str; time to press key in seconds or
919                           known shorthand: 'tab' 'press' 'long_press'
920        """
921        self.set_nocheck('ctrl_enter', press_secs)
922
923
924    def ctrl_key(self, press_secs='tab'):
925        """Simulate Enter key button press.
926
927        @param press_secs: int, float, str; time to press key in seconds or
928                           known shorthand: 'tab' 'press' 'long_press'
929        """
930        self.set_nocheck('ctrl_key', press_secs)
931
932
933    def enter_key(self, press_secs='tab'):
934        """Simulate Enter key button press.
935
936        @param press_secs: int, float, str; time to press key in seconds or
937                           known shorthand: 'tab' 'press' 'long_press'
938        """
939        self.set_nocheck('enter_key', press_secs)
940
941
942    def refresh_key(self, press_secs='tab'):
943        """Simulate Refresh key (F3) button press.
944
945        @param press_secs: int, float, str; time to press key in seconds or
946                           known shorthand: 'tab' 'press' 'long_press'
947        """
948        self.set_nocheck('refresh_key', press_secs)
949
950
951    def ctrl_refresh_key(self, press_secs='tab'):
952        """Simulate Ctrl and Refresh (F3) simultaneous press.
953
954        This key combination is an alternative of Space key.
955
956        @param press_secs: int, float, str; time to press key in seconds or
957                           known shorthand: 'tab' 'press' 'long_press'
958        """
959        self.set_nocheck('ctrl_refresh_key', press_secs)
960
961
962    def imaginary_key(self, press_secs='tab'):
963        """Simulate imaginary key button press.
964
965        Maps to a key that doesn't physically exist.
966
967        @param press_secs: int, float, str; time to press key in seconds or
968                           known shorthand: 'tab' 'press' 'long_press'
969        """
970        self.set_nocheck('imaginary_key', press_secs)
971
972
973    def sysrq_x(self, press_secs='tab'):
974        """Simulate Alt VolumeUp X simulataneous press.
975
976        This key combination is the kernel system request (sysrq) X.
977
978        @param press_secs: int, float, str; time to press key in seconds or
979                           known shorthand: 'tab' 'press' 'long_press'
980        """
981        self.set_nocheck('sysrq_x', press_secs)
982
983
984    def toggle_recovery_switch(self):
985        """Toggle recovery switch on and off."""
986        self.enable_recovery_mode()
987        time.sleep(self.REC_TOGGLE_DELAY)
988        self.disable_recovery_mode()
989
990
991    def enable_recovery_mode(self):
992        """Enable recovery mode on device."""
993        self.set('rec_mode', 'on')
994
995
996    def disable_recovery_mode(self):
997        """Disable recovery mode on device."""
998        self.set('rec_mode', 'off')
999
1000
1001    def toggle_development_switch(self):
1002        """Toggle development switch on and off."""
1003        self.enable_development_mode()
1004        time.sleep(self.DEV_TOGGLE_DELAY)
1005        self.disable_development_mode()
1006
1007
1008    def enable_development_mode(self):
1009        """Enable development mode on device."""
1010        self.set('dev_mode', 'on')
1011
1012
1013    def disable_development_mode(self):
1014        """Disable development mode on device."""
1015        self.set('dev_mode', 'off')
1016
1017    def boot_devmode(self):
1018        """Boot a dev-mode device that is powered off."""
1019        self.power_short_press()
1020        self.pass_devmode()
1021
1022
1023    def pass_devmode(self):
1024        """Pass through boot screens in dev-mode."""
1025        time.sleep(Servo.BOOT_DELAY)
1026        self.ctrl_d()
1027        time.sleep(Servo.BOOT_DELAY)
1028
1029
1030    def get_board(self):
1031        """Get the board connected to servod."""
1032        with _WrapServoErrors(servo=self, description='get_board()'):
1033            return self._server.get_board()
1034
1035
1036    def get_base_board(self):
1037        """Get the board of the base connected to servod."""
1038        try:
1039            with _WrapServoErrors(servo=self, description='get_base_board()'):
1040                return self._server.get_base_board()
1041        except six.moves.xmlrpc_client.Fault as e:
1042            # TODO(waihong): Remove the following compatibility check when
1043            # the new versions of hdctools are deployed.
1044            if 'not supported' in str(e):
1045                logging.warning('The servod is too old that get_base_board '
1046                                'not supported.')
1047                return ''
1048            raise
1049
1050    def can_set_active_device(self):
1051        """Returns True if the servo setup supports setting the active device
1052
1053        Servo can only change the active device if there are multiple devices
1054        and servo has the active_dut_controller control.
1055        """
1056        return ('_and_' in self.get_servo_type()
1057                and self.has_control('active_dut_controller'))
1058
1059    def get_active_device_prefix(self):
1060        """Return ccd_(gsc|cr50) or '' if the main device is active."""
1061        active_device = ''
1062        if self.can_set_active_device():
1063            # If servo v4 is allowing dual_v4 devices, then choose the
1064            # active device.
1065            active_device = self.get('active_dut_controller')
1066            if active_device == self.get_main_servo_device():
1067                active_device = ''
1068        return active_device
1069
1070    def get_ec_board(self):
1071        """Get the board name from EC."""
1072
1073        return self.get('ec_board', prefix=self.get_active_device_prefix())
1074
1075    def get_ec_active_copy(self):
1076        """Get the active copy of the EC image."""
1077        return self.get('ec_active_copy')
1078
1079    def has_control(self, ctrl_name, prefix=''):
1080        """Query servod server to determine if |ctrl_name| is a valid control.
1081
1082        @param ctrl_name Name of the control.
1083        @param prefix: prefix to route control to correct servo device.
1084
1085        @returns: true if |ctrl_name| is a known control, false otherwise.
1086        """
1087        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1088        try:
1089            # If the control exists, doc() will work.
1090            with _WrapServoErrors(
1091                    servo=self,
1092                    description='has_control(%s)->doc()' % ctrl_name):
1093                self._server.doc(ctrl_name)
1094            return True
1095        except ControlUnavailableError:
1096            return False
1097
1098    def _build_ctrl_name(self, ctrl_name, prefix):
1099        """Helper to build the control name if a prefix is used.
1100
1101        @param ctrl_name Name of the control.
1102        @param prefix: prefix to route control to correct servo device.
1103
1104        @returns: [|prefix|.]ctrl_name depending on whether prefix is non-empty.
1105        """
1106        assert ctrl_name
1107        if prefix:
1108            return '%s.%s' % (prefix, ctrl_name)
1109        return ctrl_name
1110
1111    def get(self, ctrl_name, prefix=''):
1112        """Get the value of a gpio from Servod.
1113
1114        @param ctrl_name Name of the control.
1115        @param prefix: prefix to route control to correct servo device.
1116
1117        @returns: server response to |ctrl_name| request.
1118
1119        @raise ControlUnavailableError: if |ctrl_name| not a known control.
1120        @raise error.TestFail: for all other failures doing get().
1121        """
1122        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1123        with _WrapServoErrors(
1124                servo=self, description='Getting %s' % ctrl_name):
1125            return self._server.get(ctrl_name)
1126
1127    def set(self, ctrl_name, ctrl_value, prefix=''):
1128        """Set and check the value of a gpio using Servod.
1129
1130        @param ctrl_name: Name of the control.
1131        @param ctrl_value: New setting for the control.
1132        @param prefix: prefix to route control to correct servo device.
1133        @raise error.TestFail: if the control value fails to change.
1134        """
1135        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1136        self.set_nocheck(ctrl_name, ctrl_value)
1137        retry_count = Servo.GET_RETRY_MAX
1138        actual_value = self.get(ctrl_name)
1139        while ctrl_value != actual_value and retry_count:
1140            logging.warning("%s != %s, retry %d", ctrl_name, ctrl_value,
1141                            retry_count)
1142            retry_count -= 1
1143            time.sleep(Servo.SHORT_DELAY)
1144            actual_value = self.get(ctrl_name)
1145
1146        if ctrl_value != actual_value:
1147            raise error.TestFail(
1148                    'Servo failed to set %s to %s. Got %s.'
1149                    % (ctrl_name, ctrl_value, actual_value))
1150
1151    def set_nocheck(self, ctrl_name, ctrl_value, prefix=''):
1152        """Set the value of a gpio using Servod.
1153
1154        @param ctrl_name Name of the control.
1155        @param ctrl_value New setting for the control.
1156        @param prefix: prefix to route control to correct servo device.
1157
1158        @raise ControlUnavailableError: if |ctrl_name| not a known control.
1159        @raise error.TestFail: for all other failures doing set().
1160        """
1161        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1162        # The real danger here is to pass a None value through the xmlrpc.
1163        assert ctrl_value is not None
1164        description = 'Setting %s to %r' % (ctrl_name, ctrl_value)
1165        logging.debug('%s', description)
1166        with _WrapServoErrors(servo=self, description=description):
1167            self._server.set(ctrl_name, ctrl_value)
1168
1169    def set_get_all(self, controls):
1170        """Set &| get one or more control values.
1171
1172        @param controls: list of strings, controls to set &| get.
1173
1174        @raise: error.TestError in case error occurs setting/getting values.
1175        """
1176        description = 'Set/get all: %s' % str(controls)
1177        logging.debug('%s', description)
1178        with _WrapServoErrors(servo=self, description=description):
1179            return self._server.set_get_all(controls)
1180
1181
1182    def probe_host_usb_dev(self):
1183        """Probe the USB disk device plugged-in the servo from the host side.
1184
1185        It uses servod to discover if there is a usb device attached to it.
1186
1187        @return: String of USB disk path (e.g. '/dev/sdb') or None.
1188        """
1189        # Set up Servo's usb mux.
1190        return self.get('image_usbkey_dev') or None
1191
1192
1193    def image_to_servo_usb(self, image_path=None,
1194                           make_image_noninteractive=False,
1195                           power_off_dut=True):
1196        """Install an image to the USB key plugged into the servo.
1197
1198        This method may copy any image to the servo USB key including a
1199        recovery image or a test image.  These images are frequently used
1200        for test purposes such as restoring a corrupted image or conducting
1201        an upgrade of ec/fw/kernel as part of a test of a specific image part.
1202
1203        @param image_path: Path on the host to the recovery image.
1204        @param make_image_noninteractive: Make the recovery image
1205                                   noninteractive, therefore the DUT
1206                                   will reboot automatically after
1207                                   installation.
1208        @param power_off_dut: To put the DUT in power off mode.
1209        """
1210        # We're about to start plugging/unplugging the USB key.  We
1211        # don't know the state of the DUT, or what it might choose
1212        # to do to the device after hotplug.  To avoid surprises,
1213        # force the DUT to be off.
1214        if power_off_dut:
1215            self.get_power_state_controller().power_off()
1216
1217        if image_path:
1218            logging.info('Searching for usb device and copying image to it. '
1219                         'Please wait a few minutes...')
1220            # The servod control automatically sets up the host in the host
1221            # direction.
1222            try:
1223                self.set_nocheck('download_image_to_usb_dev', image_path)
1224            except error.TestFail as e:
1225                logging.error('Failed to transfer requested image to USB. %s.'
1226                              'Please take a look at Servo Logs.', str(e))
1227                raise error.AutotestError('Download image to usb failed.')
1228            if make_image_noninteractive:
1229                logging.info('Making image noninteractive')
1230                try:
1231                    dev = self.probe_host_usb_dev()
1232                    if not dev:
1233                        # This is fine but also should never happen: if we
1234                        # successfully download an image but somehow cannot
1235                        # find the stick again, it needs to be investigated.
1236                        raise error.TestFail('No image usb key detected '
1237                                             'after successful download. '
1238                                             'Please investigate.')
1239                    # The modification has to happen on partition 1.
1240                    dev_partition = '%s1' % dev
1241                    self.set_nocheck('make_usb_dev_image_noninteractive',
1242                                     dev_partition)
1243                except error.TestFail as e:
1244                    logging.error('Failed to make image noninteractive. %s.'
1245                                  'Please take a look at Servo Logs.',
1246                                  str(e))
1247
1248    def boot_in_recovery_mode(self, snk_mode=False):
1249        """Boot host DUT in recovery mode.
1250
1251        @param snk_mode: If True, switch servo_v4 role to 'snk' mode before
1252                         boot DUT into recovery mode.
1253        """
1254        # This call has a built-in delay to ensure that we wait a timeout
1255        # for the stick to enumerate and settle on the DUT side.
1256        self.switch_usbkey('dut')
1257        # Switch servo_v4 mode to snk as the DUT won't able to see usb drive
1258        # in recovery mode if the servo is in src mode(see crbug.com/1129165).
1259        if snk_mode:
1260            logging.info('Setting servo_v4 role to snk mode in order to make'
1261                         ' the DUT can see usb drive while in recovery mode.')
1262            self.set_servo_v4_role('snk')
1263
1264        try:
1265            power_state = self.get_power_state_controller()
1266            power_state.power_on(rec_mode=power_state.REC_ON)
1267        except error.TestFail as e:
1268            self.set_servo_v4_role('src')
1269            logging.error('Failed to boot DUT in recovery mode. %s.', str(e))
1270            raise error.AutotestError('Failed to boot DUT in recovery mode.')
1271
1272    def install_recovery_image(self,
1273                               image_path=None,
1274                               make_image_noninteractive=False,
1275                               snk_mode=False):
1276        """Install the recovery image specified by the path onto the DUT.
1277
1278        This method uses google recovery mode to install a recovery image
1279        onto a DUT through the use of a USB stick that is mounted on a servo
1280        board specified by the usb_dev.  If no image path is specified
1281        we use the recovery image already on the usb image.
1282
1283        This method will switch servo_v4 role to 'snk' mode in order to make
1284        the DUT can see the usb drive plugged on servo, the caller should
1285        set servo_v4 role back to 'src' mode one the DUT exit recovery mode.
1286
1287        @param image_path: Path on the host to the recovery image.
1288        @param make_image_noninteractive: Make the recovery image
1289                noninteractive, therefore the DUT will reboot automatically
1290                after installation.
1291        @param snk_mode: If True, switch servo_v4 role to 'snk' mode before
1292                         boot DUT into recovery mode.
1293        """
1294        self.image_to_servo_usb(image_path, make_image_noninteractive)
1295        # Give the DUT some time to power_off if we skip
1296        # download image to usb. (crbug.com/982993)
1297        if not image_path:
1298            time.sleep(10)
1299        self.boot_in_recovery_mode(snk_mode=snk_mode)
1300
1301
1302    def _scp_image(self, image_path):
1303        """Copy image to the servo host.
1304
1305        When programming a firmware image on the DUT, the image must be
1306        located on the host to which the servo device is connected. Sometimes
1307        servo is controlled by a remote host, in this case the image needs to
1308        be transferred to the remote host. This adds the servod port number, to
1309        make sure tests for different DUTs don't trample on each other's files.
1310        Along with the firmware image, any subsidiary files in the same
1311        directory shall be copied to the host as well.
1312
1313        @param image_path: a string, name of the firmware image file to be
1314               transferred.
1315        @return: a string, full path name of the copied file on the remote.
1316        """
1317        src_path = os.path.dirname(image_path)
1318        dest_path = os.path.join('/tmp', 'dut_%d' % self._servo_host.servo_port)
1319        logging.info('Copying %s to %s', src_path, dest_path)
1320        # Copy a directory, src_path to dest_path. send_file() will create a
1321        # directory named basename(src_path) under dest_path, and copy all files
1322        # in src_path to the destination.
1323        self._servo_host.send_file(src_path, dest_path, delete_dest=True)
1324
1325        # Make a image path of the destination.
1326        # e.g. /tmp/dut_9999/EC/ec.bin
1327        rv = os.path.join(dest_path, os.path.basename(src_path))
1328        return os.path.join(rv, os.path.basename(image_path))
1329
1330
1331    def system(self, command, timeout=3600):
1332        """Execute the passed in command on the servod host.
1333
1334        @param command Command to be executed.
1335        @param timeout Maximum number of seconds of runtime allowed. Default to
1336                       1 hour.
1337        """
1338        logging.info('Will execute on servo host: %s', command)
1339        self._servo_host.run(command, timeout=timeout)
1340
1341
1342    def system_output(self, command, timeout=3600,
1343                      ignore_status=False, args=()):
1344        """Execute the passed in command on the servod host, return stdout.
1345
1346        @param command a string, the command to execute
1347        @param timeout an int, max number of seconds to wait til command
1348               execution completes. Default to 1 hour.
1349        @param ignore_status a Boolean, if true - ignore command's nonzero exit
1350               status, otherwise an exception will be thrown
1351        @param args a tuple of strings, each becoming a separate command line
1352               parameter for the command
1353        @return command's stdout as a string.
1354        """
1355        return self._servo_host.run(command, timeout=timeout,
1356                                    ignore_status=ignore_status,
1357                                    args=args).stdout.strip()
1358
1359
1360    def get_servo_version(self, active=False):
1361        """Get the version of the servo, e.g., servo_v2 or servo_v3.
1362
1363        @param active: Only return the servo type with the active device.
1364        @return: The version of the servo.
1365
1366        """
1367        with _WrapServoErrors(
1368                servo=self, description='get_servo_version()->get_version()'):
1369            servo_type = self._server.get_version()
1370        if '_and_' not in servo_type or not active:
1371            return servo_type
1372
1373        # If servo v4 is using ccd and servo micro, modify the servo type to
1374        # reflect the active device.
1375        active_device = self.get('active_dut_controller')
1376        if active_device in servo_type:
1377            logging.info('%s is active', active_device)
1378            return 'servo_v4_with_' + active_device
1379
1380        logging.warning("%s is active even though it's not in servo type",
1381                     active_device)
1382        return servo_type
1383
1384
1385    def get_servo_type(self):
1386        if self._servo_type is None:
1387            self._servo_type = self.get_servo_version()
1388        return self._servo_type
1389
1390    def get_servo_v4_type(self):
1391        """Return the servo_v4_type (such as 'type-c'), or None if not v4."""
1392        if not hasattr(self, '_servo_v4_type'):
1393            if 'servo_v4' in self.get_servo_type():
1394                self._servo_v4_type = self.get('root.dut_connection_type')
1395            else:
1396                self._servo_v4_type = None
1397        return self._servo_v4_type
1398
1399    def is_servo_v4_type_a(self):
1400        """True if the servo is v4 and type-a, else False."""
1401        return self.get_servo_v4_type() == 'type-a'
1402
1403    def is_servo_v4_type_c(self):
1404        """True if the servo is v4 and type-c, else False."""
1405        return self.get_servo_v4_type() == 'type-c'
1406
1407    def get_main_servo_device(self):
1408        """Return the main servo device"""
1409        return self.get_servo_type().split('_with_')[-1].split('_and_')[0]
1410
1411
1412    def enable_main_servo_device(self):
1413        """Make sure the main device has control of the dut."""
1414        if not self.can_set_active_device():
1415            return
1416        self.set('active_dut_controller', self.get_main_servo_device())
1417
1418    def get_ccd_servo_device(self):
1419        """Return the ccd servo device or '' if no ccd devices are connected."""
1420        servo_type = self.get_servo_type()
1421        if 'ccd' not in servo_type:
1422            return ''
1423        return servo_type.split('_with_')[-1].split('_and_')[-1]
1424
1425    def active_device_is_ccd(self):
1426        """Returns True if a ccd device is active."""
1427        return 'ccd' in self.get_servo_version(active=True)
1428
1429    def enable_ccd_servo_device(self):
1430        """Make sure the ccd device has control of the dut.
1431
1432        Returns True if the ccd device is in control of the dut.
1433        """
1434        if self.active_device_is_ccd():
1435            return True
1436        ccd_device = self.get_ccd_servo_device()
1437        if not self.can_set_active_device() or not ccd_device:
1438            return False
1439        self.set('active_dut_controller', ccd_device)
1440        return True
1441
1442    def main_device_is_ccd(self):
1443        """Whether the main servo device (no prefixes) is a ccd device."""
1444        servo = self.get_servo_type()
1445        return 'ccd' in servo and not self.main_device_is_flex()
1446
1447    def main_device_is_flex(self):
1448        """Whether the main servo device (no prefixes) is a legacy device."""
1449        servo = self.get_servo_type()
1450        return any([flex in servo for flex in self.FLEX_SERVOS])
1451
1452    def main_device_uses_gsc_drv(self):
1453        """Whether the main servo device uses gsc drivers.
1454
1455        Servo may use gsc wp or console commands to control the dut. These
1456        get restricted with ccd capabilities. This returns true if some of
1457        the servo functionality will be disabled if ccd is restricted.
1458        """
1459        return self.get_main_servo_device() in self.GSC_DRV_SERVOS
1460
1461    def _initialize_programmer(self, rw_only=False):
1462        """Initialize the firmware programmer.
1463
1464        @param rw_only: True to initialize a programmer which only
1465                        programs the RW portions.
1466        """
1467        if self._programmer:
1468            return
1469        # Initialize firmware programmer
1470        servo_type = self.get_servo_type()
1471        if servo_type.startswith('servo_v2'):
1472            self._programmer = firmware_programmer.ProgrammerV2(self)
1473            self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
1474        # Both servo v3 and v4 use the same programming methods so just leverage
1475        # ProgrammerV3 for servo v4 as well.
1476        elif (servo_type.startswith('servo_v3')
1477              or servo_type.startswith('servo_v4')):
1478            self._programmer = firmware_programmer.ProgrammerV3(self)
1479            self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
1480        else:
1481            raise error.TestError(
1482                    'No firmware programmer for servo version: %s' %
1483                    self.get_servo_type())
1484
1485
1486    def program_bios(self, image, rw_only=False, copy_image=True):
1487        """Program bios on DUT with given image.
1488
1489        @param image: a string, file name of the BIOS image to program
1490                      on the DUT.
1491        @param rw_only: True to only program the RW portion of BIOS.
1492        @param copy_image: True indicates we need scp the image to servohost
1493                           while False means the image file is already on
1494                           servohost.
1495        @return: a string, full path name of the copied file on the remote.
1496        """
1497        self._initialize_programmer()
1498        # We don't need scp if test runs locally.
1499        if copy_image and not self.is_localhost():
1500            image = self._scp_image(image)
1501        if rw_only:
1502            self._programmer_rw.program_bios(image)
1503        else:
1504            self._programmer.program_bios(image)
1505        return image
1506
1507
1508    def program_ec(self, image, rw_only=False, copy_image=True):
1509        """Program ec on DUT with given image.
1510
1511        @param image: a string, file name of the EC image to program
1512                      on the DUT.
1513        @param rw_only: True to only program the RW portion of EC.
1514        @param copy_image: True indicates we need scp the image to servohost
1515                           while False means the image file is already on
1516                           servohost.
1517        @return: a string, full path name of the copied file on the remote.
1518        """
1519        self._initialize_programmer()
1520        # We don't need scp if test runs locally.
1521        if copy_image and not self.is_localhost():
1522            image = self._scp_image(image)
1523        if rw_only:
1524            self._programmer_rw.program_ec(image)
1525        else:
1526            self._programmer.program_ec(image)
1527        return image
1528
1529
1530    def extract_ec_image(self, board, model, tarball_path, fake_image=False):
1531        """Helper function to extract EC image from downloaded tarball.
1532
1533        @param board: The DUT board name.
1534        @param model: The DUT model name.
1535        @param tarball_path: The path of the downloaded build tarball.
1536        @param fake_image: True to return a fake zero-filled image instead.
1537
1538        @return: Path to extracted EC image.
1539        """
1540
1541        # Ignore extracting EC image and re-programming if not a Chrome EC
1542        chrome_ec = FAFTConfig(board).chrome_ec
1543        if not chrome_ec:
1544            logging.warning('Not a Chrome EC, ignore re-programming it')
1545            return None
1546
1547        # Most boards use the model name as the ec directory.
1548        ec_image_candidates = ['%s/ec.bin' % model]
1549
1550        if model == "dragonair":
1551            ec_image_candidates.append('dratini/ec.bin')
1552
1553        # If that isn't found try the name from the EC RO version.
1554        try:
1555            fw_target = self.get_ec_board().lower()
1556            ec_image_candidates.append('%s/ec.bin' % fw_target)
1557        except Exception as err:
1558            logging.warning('Failed to get ec_board value; ignoring')
1559
1560        # Fallback to the name of the board, and then a bare ec.bin.
1561        ec_image_candidates.append('%s/ec.bin' % board)
1562        ec_image_candidates.append('ec.bin')
1563
1564        # Extract EC image from tarball
1565        dest_dir = os.path.join(os.path.dirname(tarball_path), 'EC')
1566        ec_image = _extract_image_from_tarball(tarball_path,
1567                                               dest_dir,
1568                                               ec_image_candidates,
1569                                               self.EXTRACT_TIMEOUT_SECS)
1570
1571        # Check if EC image was found and return path or raise error
1572        if ec_image:
1573            # Extract subsidiary binaries for EC
1574            # Find a monitor binary for NPCX_UUT chip type, if any.
1575            mon_candidates = [candidate.replace('ec.bin', 'npcx_monitor.bin')
1576                              for candidate in ec_image_candidates]
1577            _extract_image_from_tarball(tarball_path, dest_dir, mon_candidates,
1578                                        self.EXTRACT_TIMEOUT_SECS)
1579
1580            if fake_image:
1581                # Create a small (25% of original size) zero-filled binary to
1582                # replace the real ec_image
1583                file_size = os.path.getsize(ec_image) / 4
1584                ec_image = os.path.join(os.path.dirname(ec_image),
1585                                        "zero_ec.bin")
1586                dump_cmd = 'dd if=/dev/zero of=%s bs=4096 count=%d' % (
1587                        os.path.join(dest_dir, ec_image), file_size / 4096)
1588                if server_utils.system(dump_cmd, ignore_status=True) != 0:
1589                    return None
1590
1591            return os.path.join(dest_dir, ec_image)
1592        else:
1593            raise error.TestError('Failed to extract EC image from %s' %
1594                                  tarball_path)
1595
1596
1597    def extract_bios_image(self, board, model, tarball_path):
1598        """Helper function to extract BIOS image from downloaded tarball.
1599
1600        @param board: The DUT board name.
1601        @param model: The DUT model name.
1602        @param tarball_path: The path of the downloaded build tarball.
1603
1604        @return: Path to extracted BIOS image.
1605        """
1606
1607        # Most boards use the model name as the image filename.
1608        bios_image_candidates = [
1609                'image-%s.bin' % model,
1610        ]
1611
1612        if model == "dragonair":
1613            bios_image_candidates.append('image-dratini.bin')
1614
1615        # If that isn't found try the name from the EC RO version.
1616        try:
1617            fw_target = self.get_ec_board().lower()
1618            bios_image_candidates.append('image-%s.bin' % fw_target)
1619        except Exception as err:
1620            logging.warning('Failed to get ec_board value; ignoring')
1621
1622        # Fallback to the name of the board, and then a bare image.bin.
1623        bios_image_candidates.append('image-%s.bin' % board)
1624        bios_image_candidates.append('image.bin')
1625
1626        # Extract BIOS image from tarball
1627        dest_dir = os.path.join(os.path.dirname(tarball_path), 'BIOS')
1628        bios_image = _extract_image_from_tarball(tarball_path,
1629                                                 dest_dir,
1630                                                 bios_image_candidates,
1631                                                 self.EXTRACT_TIMEOUT_SECS)
1632
1633        # Check if BIOS image was found and return path or raise error
1634        if bios_image:
1635            return os.path.join(dest_dir, bios_image)
1636        else:
1637            raise error.TestError('Failed to extract BIOS image from %s' %
1638                                  tarball_path)
1639
1640
1641    def switch_usbkey(self, usb_state):
1642        """Connect USB flash stick to either host or DUT, or turn USB port off.
1643
1644        This function switches the servo multiplexer to provide electrical
1645        connection between the USB port J3 and either host or DUT side. It
1646        can also be used to turn the USB port off.
1647
1648        @param usb_state: A string, one of 'dut', 'host', or 'off'.
1649                          'dut' and 'host' indicate which side the
1650                          USB flash device is required to be connected to.
1651                          'off' indicates turning the USB port off.
1652
1653        @raise: error.TestError in case the parameter is not 'dut'
1654                'host', or 'off'.
1655        """
1656        if self.get_usbkey_state() == usb_state:
1657            return
1658
1659        if usb_state == 'off':
1660            self.set_nocheck('image_usbkey_pwr', 'off')
1661            return
1662        elif usb_state == 'host':
1663            mux_direction = 'servo_sees_usbkey'
1664        elif usb_state == 'dut':
1665            mux_direction = 'dut_sees_usbkey'
1666        else:
1667            raise error.TestError('Unknown USB state request: %s' % usb_state)
1668        # On the servod side, this control will ensure that
1669        # - the port is power cycled if it is changing directions
1670        # - the port ends up in a powered state after this call
1671        # - if facing the host side, the call only returns once a usb block
1672        #   device is detected, or after a generous timeout (10s)
1673        self.set('image_usbkey_direction', mux_direction)
1674        # As servod makes no guarantees when switching to the dut side,
1675        # add a detection delay here when facing the dut.
1676        if mux_direction == 'dut_sees_usbkey':
1677            time.sleep(self.USB_DETECTION_DELAY)
1678
1679    def get_usbkey_state(self):
1680        """Get which side USB is connected to or 'off' if usb power is off.
1681
1682        @return: A string, one of 'dut', 'host', or 'off'.
1683        """
1684        pwr = self.get('image_usbkey_pwr')
1685        if pwr == 'off':
1686            return pwr
1687        direction = self.get('image_usbkey_direction')
1688        if direction == 'servo_sees_usbkey':
1689            return 'host'
1690        if direction == 'dut_sees_usbkey':
1691            return 'dut'
1692        raise error.TestFail('image_usbkey_direction set an unknown mux '
1693                             'direction: %s' % direction)
1694
1695    def set_servo_v4_role(self, role):
1696        """Set the power role of servo v4, either 'src' or 'snk'.
1697
1698        It does nothing if not a servo v4.
1699
1700        @param role: Power role for DUT port on servo v4, either 'src' or 'snk'.
1701        """
1702        if not self.get_servo_type().startswith('servo_v4'):
1703            logging.debug('Not a servo v4, unable to set role to %s.', role)
1704            return
1705
1706        if not self.has_control('servo_pd_role'):
1707            logging.debug(
1708                    'Servo does not has servo_v4_role control, unable'
1709                    ' to set role to %s.', role)
1710            return
1711
1712        value = self.get('servo_pd_role')
1713        if value != role:
1714            self.set_nocheck('servo_pd_role', role)
1715        else:
1716            logging.debug('Already in the role: %s.', role)
1717
1718    def get_servo_v4_role(self):
1719        """Get the power role of servo v4, either 'src' or 'snk'.
1720
1721        It returns None if not a servo v4.
1722        """
1723        if not self.get_servo_type().startswith('servo_v4'):
1724            logging.debug('Not a servo v4, unable to get role')
1725            return None
1726
1727        if not self.has_control('servo_pd_role'):
1728            logging.debug(
1729                    'Servo does not has servo_v4_role control, unable'
1730                    ' to get the role.')
1731            return None
1732
1733        return self.get('servo_pd_role')
1734
1735    def set_servo_v4_pd_comm(self, en):
1736        """Set the PD communication of servo v4, either 'on' or 'off'.
1737
1738        It does nothing if not a servo v4.
1739
1740        @param en: a string of 'on' or 'off' for PD communication.
1741        """
1742        if self.get_servo_type().startswith('servo_v4'):
1743            self.set_nocheck('servo_pd_comm', en)
1744        else:
1745            logging.debug('Not a servo v4, unable to set PD comm to %s.', en)
1746
1747    def supports_built_in_pd_control(self):
1748        """Return whether the servo type supports pd charging and control."""
1749        # Only servo v4 type-c supports this feature.
1750        if not self.is_servo_v4_type_c():
1751            logging.info('PD controls require a servo v4 type-c.')
1752            return False
1753        # Lastly, one cannot really do anything without a plugged in charger.
1754        chg_port_mv = self.get('ppchg5_mv')
1755        if chg_port_mv < V4_CHG_ATTACHED_MIN_VOLTAGE_MV:
1756            logging.info(
1757                    'It appears that no charger is plugged into servo v4. '
1758                    'Charger port voltage: %dmV', chg_port_mv)
1759            return False
1760        logging.info('Charger port voltage: %dmV', chg_port_mv)
1761        return True
1762
1763    def dts_mode_is_valid(self):
1764        """Return whether servo setup supports dts mode control for cr50."""
1765        # Only servo v4 type-c supports this feature.
1766        return self.is_servo_v4_type_c()
1767
1768    def dts_mode_is_safe(self):
1769        """Return whether servo setup supports dts mode without losing access.
1770
1771        DTS mode control exists but the main device might go through ccd.
1772        In that case, it's only safe to control dts mode if the main device
1773        is legacy as otherwise the connection to the main device cuts out.
1774        """
1775        return self.dts_mode_is_valid() and self.main_device_is_flex()
1776
1777    def get_dts_mode(self):
1778        """Return servo dts mode.
1779
1780        @returns: on/off whether dts is on or off
1781        """
1782        if not self.dts_mode_is_valid():
1783            logging.info('Not a valid servo setup. Unable to get dts mode.')
1784            return
1785        return self.get('servo_dts_mode')
1786
1787    def ccd_watchdog_enable(self, enable):
1788        """Control the ccd watchdog."""
1789        if 'ccd' not in self.get_servo_type():
1790            return
1791        if self._ccd_watchdog_disabled and enable:
1792            logging.info('CCD watchdog disabled for test')
1793            return
1794        control = 'watchdog_add' if enable else 'watchdog_remove'
1795        self.set_nocheck(control, 'ccd')
1796
1797    def disable_ccd_watchdog_for_test(self):
1798        """Prevent servo from enabling the watchdog."""
1799        self._ccd_watchdog_disabled = True
1800        self.ccd_watchdog_enable(False)
1801
1802    def allow_ccd_watchdog_for_test(self):
1803        """Allow servo to enable the ccd watchdog."""
1804        self._ccd_watchdog_disabled = False
1805        self.ccd_watchdog_enable(True)
1806
1807    def set_dts_mode(self, state):
1808        """Set servo dts mode to off or on.
1809
1810        It does nothing if not a servo v4. Disable the ccd watchdog if we're
1811        disabling dts mode. CCD will disconnect. The watchdog only allows CCD
1812        to disconnect for 10 seconds until it kills servod. Disable the
1813        watchdog, so CCD can stay disconnected indefinitely.
1814
1815        @param state: Set servo v4 dts mode 'off' or 'on'.
1816        """
1817        if not self.dts_mode_is_valid():
1818            logging.info('Not a valid servo setup. Unable to set dts mode %s.',
1819                         state)
1820            return
1821
1822        enable_watchdog = state == 'on'
1823
1824        if not enable_watchdog:
1825            self.ccd_watchdog_enable(False)
1826
1827        self.set_nocheck('servo_dts_mode', state)
1828
1829        if enable_watchdog:
1830            self.ccd_watchdog_enable(True)
1831
1832
1833    def _get_servo_type_fw_version(self, servo_type, prefix=''):
1834        """Helper to handle fw retrieval for micro/v4 vs ccd.
1835
1836        @param servo_type: one of 'servo_v4', 'servo_micro', 'c2d2',
1837                           'ccd_cr50', or 'ccd_gsc'
1838        @param prefix: whether the control has a prefix
1839
1840        @returns: fw version for non-ccd devices, cr50 version for ccd device
1841        """
1842        # If it's a ccd device, remove the 'ccd_' prefix to find the firmware
1843        # name.
1844        if servo_type.startswith(self.CCD_PREFIX):
1845            servo_type = servo_type[len(self.CCD_PREFIX)::]
1846        cmd = '%s_version' % servo_type
1847        try:
1848            return self.get(cmd, prefix=prefix)
1849        except error.TestFail:
1850            # Do not fail here, simply report the version as unknown.
1851            logging.warning('Unable to query %r to get servo fw version.', cmd)
1852            return 'unknown'
1853
1854
1855    def get_servo_fw_versions(self):
1856        """Retrieve a summary of attached servos and their firmware.
1857
1858        Note: that only the Google firmware owned servos supports this e.g.
1859        micro, v4, etc. For anything else, the dictionary will have no entry.
1860        If no device is has Google owned firmware (e.g. v3) then the result
1861        is an empty dictionary.
1862
1863        @returns: dict, a collection of each attached servo & their firmware.
1864        """
1865        def get_fw_version_tag(tag, dev):
1866            return '%s_version.%s' % (dev, tag)
1867
1868        fw_versions = {}
1869        # Note, this works because v4p1 starts with v4 as well.
1870        # TODO(coconutruben): make this more robust so that it can work on
1871        # a future v-whatever as well.
1872        if 'servo_v4' not in self.get_servo_type():
1873            return {}
1874        # v4 or v4p1
1875        v4_flavor = self.get_servo_type().split('_with_')[0]
1876        v4_tag = get_fw_version_tag('root', v4_flavor)
1877        fw_versions[v4_tag] = self._get_servo_type_fw_version('servo_fw',
1878                                                              prefix='root')
1879        if 'with' in self.get_servo_type():
1880            dut_devs = self.get_servo_type().split('_with_')[1].split('_and_')
1881            main_tag = get_fw_version_tag('main', dut_devs[0])
1882            fw_versions[main_tag] = self._get_servo_type_fw_version(dut_devs[0])
1883            if len(dut_devs) == 2:
1884                # Right now, the only way for this to happen is for a dual setup
1885                # to exist where ccd is attached on top of servo micro. Thus, we
1886                # know that the prefix is ccd_cr50 and the type is ccd_cr50.
1887                # TODO(coconutruben): If the new servod is not deployed by
1888                # the time that there are more cases of '_and_' devices,
1889                # this needs to be reworked.
1890                dual_tag = get_fw_version_tag('ccd_flex_secondary', dut_devs[1])
1891                fw = self._get_servo_type_fw_version(dut_devs[1], dut_devs[1])
1892                fw_versions[dual_tag] = fw
1893        return fw_versions
1894
1895    @property
1896    def uart_logs_dir(self):
1897        """Return the directory to save UART logs."""
1898        return self._uart.logs_dir if self._uart else ""
1899
1900
1901    @uart_logs_dir.setter
1902    def uart_logs_dir(self, logs_dir):
1903        """Set directory to save UART logs.
1904
1905        @param logs_dir  String of directory name."""
1906        self._uart.logs_dir = logs_dir
1907
1908    def get_uart_logfile(self, uart):
1909        """Return the path to the uart log file."""
1910        return self._uart.get_logfile(uart)
1911
1912    def record_uart_capture(self, outdir=None):
1913        """Save uart stream output."""
1914        if outdir and not self.uart_logs_dir:
1915            self.uart_logs_dir = outdir
1916        self._uart.dump()
1917
1918    def close(self, outdir=None):
1919        """Close the servo object."""
1920        # We want to ensure that servo_v4 is in src mode to avoid DUTs
1921        # left in discharge state after a task.
1922        try:
1923            self.set_servo_v4_role('src')
1924        except Exception as e:
1925            logging.info(
1926                    'Unexpected error while setting servo_v4 role'
1927                    ' to src; %s', e)
1928
1929        self._uart.stop_capture()
1930        self.record_uart_capture(outdir)
1931
1932    def ec_reboot(self):
1933        """Reboot Just the embedded controller."""
1934        self.set_nocheck('ec_uart_flush', 'off')
1935        self.set_nocheck('ec_uart_cmd', 'reboot')
1936        self.set_nocheck('ec_uart_flush', 'on')
1937
1938    def get_vbus_voltage(self):
1939        """Get the voltage of VBUS'.
1940
1941        @returns The voltage of VBUS, if vbus_voltage is supported.
1942                 None               , if vbus_voltage is not supported.
1943        """
1944        if not self.has_control('vbus_voltage'):
1945            logging.debug('Servo does not have vbus_voltage control,'
1946                          'unable to get vbus voltage')
1947            return None
1948
1949        return self.get('vbus_voltage')
1950
1951    def supports_eth_power_control(self):
1952        """True if servo supports power management for ethernet dongle."""
1953        return self.has_control('dut_eth_pwr_en')
1954
1955    def set_eth_power(self, state):
1956        """Set ethernet dongle power state, either 'on' or 'off'.
1957
1958        Note: this functionality is supported only on servo v4p1.
1959
1960        @param state: a string of 'on' or 'off'.
1961        """
1962        if state != 'off' and state != 'on':
1963            raise error.TestError('Unknown ethernet power state request: %s' %
1964                                  state)
1965
1966        if not self.supports_eth_power_control():
1967            logging.info('Not a supported servo setup. Unable to set ethernet'
1968                         'dongle power state %s.', state)
1969            return
1970
1971        self.set_nocheck('dut_eth_pwr_en', state)
1972
1973    def eth_power_reset(self):
1974        """Reset ethernet dongle power state if supported'.
1975
1976        It does nothing if servo setup does not support power management for
1977        the etherent dongle, only log information about this.
1978        """
1979        if self.supports_eth_power_control():
1980            logging.info("Resetting servo's Ethernet controller...")
1981            self.set_eth_power('off')
1982            time.sleep(1)
1983            self.set_eth_power('on')
1984        else:
1985            logging.info("Trying to reset servo's Ethernet controller, but"
1986                         "this feature is not supported on used servo setup.")
1987