• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5#
6# Expects to be run in an environment with sudo and no interactive password
7# prompt, such as within the Chromium OS development chroot.
8
9import ast
10import logging
11import os
12import re
13import six
14import six.moves.xmlrpc_client
15import six.moves.http_client
16import time
17
18from autotest_lib.client.common_lib import error
19from autotest_lib.client.common_lib import lsbrelease_utils
20from autotest_lib.client.common_lib import seven
21from autotest_lib.server import utils as server_utils
22from autotest_lib.server.cros.servo import firmware_programmer
23from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig
24
25
26# Regex to match XMLRPC errors due to a servod control not existing.
27# Servod uses both 'No control %s' and 'No control named %s' in exceptions.
28NO_CONTROL_RE = re.compile(r'No control(?: named)? (?P<name>\w*\.?\w*)')
29
30# Please see servo/drv/pty_driver.py for error messages to match.
31
32# This common prefix can apply to all subtypes of console errors.
33# The first portion is an optional qualifier of the type
34# of error that occurred. Each error is or'd.
35CONSOLE_COMMON_RE = (r'((Timeout waiting for response|'
36                     r'Known error [\w\'\".\s]+). )?'
37                     # The second portion is an optional name for the console
38                     # source
39                     r'(\w+\: )?')
40
41# Regex to match XMLRPC errors due to a console being unresponsive.
42NO_CONSOLE_OUTPUT_RE = re.compile(r'%sNo data was sent from the pty\.' %
43                                  CONSOLE_COMMON_RE)
44
45
46# Regex to match XMLRPC errors due to a console control failing, but the
47# underlying Console being responsive.
48CONSOLE_MISMATCH_RE = re.compile(r'%sThere was output:' % CONSOLE_COMMON_RE)
49
50
51# The minimum voltage on the charger port on servo v4 that is expected. This is
52# to query whether a charger is plugged into servo v4 and thus pd control
53# capabilities can be used.
54V4_CHG_ATTACHED_MIN_VOLTAGE_MV = 4400
55
56
57class ControlUnavailableError(error.TestFail):
58    """Custom error class to indicate a control is unavailable on servod."""
59    pass
60
61
62class ConsoleError(error.TestFail):
63    """Common error class for servod console-back control failures."""
64    pass
65
66
67class UnresponsiveConsoleError(ConsoleError):
68    """Error for: A console control fails for lack of console output."""
69    pass
70
71
72class ResponsiveConsoleError(ConsoleError):
73    """Error for: A console control fails but console is responsive."""
74    pass
75
76
77class ServodBadResponse(six.moves.http_client.BadStatusLine):
78    """Indicates a bad HTTP response from servod"""
79
80    def __init__(self, when, line):
81        """
82
83        @param when: Description of the operation being performed (get/set)
84        @param line: The line that came from the server, often an empty string.
85        """
86        super(ServodBadResponse, self).__init__(line)
87        self.when = when
88
89    def __str__(self):
90        """String representation of the exception"""
91        return '%s -- StatusLine=%s' % (self.when, self.line)
92
93
94class ServodEmptyResponse(ServodBadResponse):
95    """Indicates an empty response from servod, possibly because it exited."""
96    pass
97
98
99class ServodConnectionError(seven.SOCKET_ERRORS[0]):
100    """Indicates socket errors seen during communication with servod"""
101
102    def __init__(self, when, errno, strerror, filename):
103        """Instance initializer
104
105        The filename is used to add details to the exception message:
106        [Errno 104] Connection reset by peer: "<Servo 'ipaddr:9999'>"
107
108        @param when: Description of the operation being performed at the time
109        @param errno: errno value, such as ECONNRESET
110        @param strerror: OS-provided description ("connection reset by peer")
111        @param filename: Something to report as a path, such as a socket address
112        """
113        # [Errno 104] [Setting ctrl:val] Connection reset by peer: <Servo...
114        self.when = when
115        super(ServodConnectionError, self).__init__(errno, strerror, filename)
116
117    def __str__(self):
118        """String representation of the exception"""
119        return '%s -- [Errno %d] %s: %r' % (self.when, self.errno,
120                                            self.strerror, self.filename)
121
122
123# TODO: once in python 3, inherit from AbstractContextManager
124class _WrapServoErrors(object):
125    """
126    Wrap an operation, replacing BadStatusLine and socket.error with
127    servo-specific versions, and extracting exception info from xmlrplib.Fault.
128
129    @param servo_name: The servo object, used to add the servo name to errors.
130                       See the ServodConnectionError docstring.
131    @param description:  String to use when describing what was being done
132    @raise ServodBadStatusLine: if exception is a httplib.BadStatusLine
133    @raise ServodSocketError: if exception is a socket.error
134    @raise ControlUnavailableError: if Fault matches NO_CONTROL_RE
135    @raise UnresponsiveConsoleError: if Fault matches NO_CONSOLE_OUTPUT_RE
136    @raise ResponsiveConsoleError: if Fault matches CONSOLE_MISMATCH_RE
137    """
138
139    def __init__(self, servo, description):
140        self.servo_name = str(servo)
141        self.description = description
142
143    @staticmethod
144    def _get_xmlrpclib_exception(xmlexc):
145        """Get meaningful exception string from xmlrpc.
146
147        Args:
148            xmlexc: xmlrpclib.Fault object
149
150        xmlrpclib.Fault.faultString has the following format:
151
152        <type 'exception type'>:'actual error message'
153
154        Parse and return the real exception from the servod side instead of the
155        less meaningful one like,
156           <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
157           attribute 'hw_driver'">
158
159        Returns:
160            string of underlying exception raised in servod.
161        """
162        return re.sub('^.*>:', '', xmlexc.faultString)
163
164    @staticmethod
165    def _log_exception(exc_type, exc_val, exc_tb):
166        """Log exception information"""
167        if exc_val is not None:
168            logging.debug(
169                    'Wrapped exception:', exc_info=(exc_type, exc_val, exc_tb))
170
171    def __enter__(self):
172        """Enter the context"""
173        return self
174
175    def __exit__(self, exc_type, exc_val, exc_tb):
176        """Exit the context, handling the exception if there was one"""
177        try:
178            if isinstance(exc_val, six.moves.http_client.BadStatusLine):
179                self._log_exception(exc_type, exc_val, exc_tb)
180                if exc_val.line in ('', "''"):
181                    err = ServodEmptyResponse(self.description, exc_val.line)
182                else:
183                    err = ServodBadResponse(self.description, exc_val.line)
184                six.reraise(err.__class__, err, exc_tb)
185
186            if isinstance(exc_val, seven.SOCKET_ERRORS):
187                self._log_exception(exc_type, exc_val, exc_tb)
188                err = ServodConnectionError(self.description, exc_val.args[0],
189                                            exc_val.args[1], self.servo_name)
190                six.reraise(err.__class__, err, exc_tb)
191
192            if isinstance(exc_val, six.moves.xmlrpc_client.Fault):
193                err_str = self._get_xmlrpclib_exception(exc_val)
194                err_msg = '%s :: %s' % (self.description, err_str)
195                unknown_ctrl = re.search(NO_CONTROL_RE, err_str)
196                if not unknown_ctrl:
197                    # Log the full text for errors, except unavailable controls.
198                    self._log_exception(exc_type, exc_val, exc_tb)
199                    logging.debug(err_msg)
200                if unknown_ctrl:
201                    # The error message for unavailable controls is huge, since
202                    # it reports all known controls.  Don't log the full text.
203                    unknown_ctrl_name = unknown_ctrl.group('name')
204                    logging.debug('%s :: No control named %r',
205                                  self.description, unknown_ctrl_name)
206                    err = ControlUnavailableError(
207                            'No control named %r' % unknown_ctrl_name)
208                elif re.search(NO_CONSOLE_OUTPUT_RE, err_str):
209                    err = UnresponsiveConsoleError(
210                            'Console not printing output. %s.' %
211                            self.description)
212                elif re.search(CONSOLE_MISMATCH_RE, err_str):
213                    err = ResponsiveConsoleError(
214                            'Control failed but console alive. %s.' %
215                            self.description)
216                else:
217                    err = error.TestFail(err_msg)
218                six.reraise(err.__class__, err, exc_tb)
219        finally:
220            del exc_tb
221
222
223def _extract_image_from_tarball(tarball, dest_dir, image_candidates, timeout):
224    """Try extracting the image_candidates from the tarball.
225
226    @param tarball: The path of the tarball.
227    @param dest_path: The path of the destination.
228    @param image_candidates: A tuple of the paths of image candidates.
229    @param timeout: Time to wait in seconds before timing out.
230
231    @return: The first path from the image candidates, which succeeds, or None
232             if all the image candidates fail.
233    """
234
235    # Create the firmware_name subdirectory if it doesn't exist
236    if not os.path.exists(dest_dir):
237        os.mkdir(dest_dir)
238
239    # Generate a list of all tarball files
240    stdout = server_utils.system_output('tar tf %s' % tarball,
241                                        timeout=timeout,
242                                        ignore_status=True)
243    tarball_files = stdout.splitlines()
244
245    # Check if image candidates are in the list of tarball files
246    for image in image_candidates:
247        if image in tarball_files:
248            # Extract and return the first image candidate found
249            tar_cmd = 'tar xf %s -C %s %s' % (tarball, dest_dir, image)
250            status = server_utils.system(tar_cmd,
251                                         timeout=timeout,
252                                         ignore_status=True)
253            if status == 0:
254                return image
255    return None
256
257
258class _PowerStateController(object):
259
260    """Class to provide board-specific power operations.
261
262    This class is responsible for "power on" and "power off"
263    operations that can operate without making assumptions in
264    advance about board state.  It offers an interface that
265    abstracts out the different sequences required for different
266    board types.
267
268    """
269    # Constants acceptable to be passed for the `rec_mode` parameter
270    # to power_on().
271    #
272    # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
273    #   SD card.
274    # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.
275
276    REC_ON = 'rec'
277    REC_OFF = 'on'
278    REC_ON_FORCE_MRC = 'rec_force_mrc'
279
280    # Delay in seconds needed between asserting and de-asserting
281    # warm reset.
282    _RESET_HOLD_TIME = 0.5
283
284
285    def __init__(self, servo):
286        """Initialize the power state control.
287
288        @param servo Servo object providing the underlying `set` and `get`
289                     methods for the target controls.
290
291        """
292        self._servo = servo
293        self.supported = self._servo.has_control('power_state')
294        self.last_rec_mode = self.REC_OFF
295        if not self.supported:
296            logging.info('Servo setup does not support power-state operations. '
297                         'All power-state calls will lead to error.TestFail')
298
299    def _check_supported(self):
300        """Throw an error if dts mode control is not supported."""
301        if not self.supported:
302            raise error.TestFail('power_state controls not supported')
303
304    def reset(self):
305        """Force the DUT to reset.
306
307        The DUT is guaranteed to be on at the end of this call,
308        regardless of its previous state, provided that there is
309        working OS software. This also guarantees that the EC has
310        been restarted.
311
312        """
313        self._check_supported()
314        self._servo.set_nocheck('power_state', 'reset')
315
316    def cr50_reset(self):
317        """Force the DUT to reset.
318
319        The DUT is guaranteed to be on at the end of this call,
320        regardless of its previous state, provided that there is
321        working OS software. This also guarantees that the EC has
322        been restarted. Works only for ccd connections.
323
324        """
325        self._check_supported()
326        self._servo.set_nocheck('power_state', 'cr50_reset')
327
328    def warm_reset(self):
329        """Apply warm reset to the DUT.
330
331        This asserts, then de-asserts the 'warm_reset' signal.
332        Generally, this causes the board to restart.
333
334        """
335        # TODO: warm_reset support has added to power_state.py. Once it
336        # available to labstation remove fallback method.
337        self._check_supported()
338        try:
339            self._servo.set_nocheck('power_state', 'warm_reset')
340        except error.TestFail as err:
341            logging.info("Fallback to warm_reset control method")
342            self._servo.set_get_all(['warm_reset:on',
343                                 'sleep:%.4f' % self._RESET_HOLD_TIME,
344                                 'warm_reset:off'])
345
346    def power_off(self):
347        """Force the DUT to power off.
348
349        The DUT is guaranteed to be off at the end of this call,
350        regardless of its previous state, provided that there is
351        working EC and boot firmware.  There is no requirement for
352        working OS software.
353
354        """
355        self._check_supported()
356        self._servo.set_nocheck('power_state', 'off')
357
358    def power_on(self, rec_mode=REC_OFF):
359        """Force the DUT to power on.
360
361        Prior to calling this function, the DUT must be powered off,
362        e.g. with a call to `power_off()`.
363
364        At power on, recovery mode is set as specified by the
365        corresponding argument.  When booting with recovery mode on, it
366        is the caller's responsibility to unplug/plug in a bootable
367        external storage device.
368
369        If the DUT requires a delay after powering on but before
370        processing inputs such as USB stick insertion, the delay is
371        handled by this method; the caller is not responsible for such
372        delays.
373
374        @param rec_mode Setting of recovery mode to be applied at
375                        power on. default: REC_OFF aka 'off'
376
377        """
378        self._check_supported()
379        self._servo.set_nocheck('power_state', rec_mode)
380        self.last_rec_mode = rec_mode
381
382    def retry_power_on(self):
383        """Retry powering on the DUT.
384
385        After power_on(...) the system might not come up reliably, although
386        the reasons aren't known yet. This function retries turning on the
387        system again, trying to bring it in the last state that power_on()
388        attempted to reach.
389        """
390        self._check_supported()
391        self._servo.set_nocheck('power_state', self.last_rec_mode)
392
393
394class _Uart(object):
395    """Class to capture UART streams of CPU, EC, Cr50, etc."""
396    _UartToCapture = (
397        'cpu',
398        'cr50',
399        'ec',
400        'servo_micro',
401        'servo_v4',
402        'usbpd',
403        'ccd_cr50.ec',
404        'ccd_cr50.cpu',
405        'ccd_cr50.cr50'
406    )
407
408
409    def __init__(self, servo):
410        self._servo = servo
411        self._streams = []
412        self.logs_dir = None
413
414    def _start_stop_capture(self, uart, start):
415        """Helper function to start/stop capturing on specified UART.
416
417        @param uart:  The UART name to start/stop capturing.
418        @param start:  True to start capturing, otherwise stop.
419
420        @returns True if the operation completes successfully.
421                 False if the UART capturing is not supported or failed due to
422                 an error.
423        """
424        logging.debug('%s capturing %s UART.', 'Start' if start else 'Stop',
425                      uart)
426        uart_cmd = '%s_uart_capture' % uart
427        target_level = 'on' if start else 'off'
428        level = None
429        if self._servo.has_control(uart_cmd):
430            # Do our own implementation of set() here as not_applicable
431            # should also count as a valid control.
432            logging.debug('Trying to set %s to %s.', uart_cmd, target_level)
433            try:
434                self._servo.set_nocheck(uart_cmd, target_level)
435                level = self._servo.get(uart_cmd)
436            except error.TestFail as e:
437                # Any sort of test failure here should not stop the test. This
438                # is just to capture more output. Log and move on.
439                logging.warning('Failed to set %s to %s. %s. Ignoring.',
440                                uart_cmd, target_level, str(e))
441            if level == target_level:
442                logging.debug('Managed to set %s to %s.', uart_cmd, level)
443            else:
444                logging.debug('Failed to set %s to %s. Got %s.', uart_cmd,
445                              target_level, level)
446        return level == target_level
447
448    def start_capture(self):
449        """Start capturing UART streams."""
450        for uart in self._UartToCapture:
451            if self._start_stop_capture(uart, True):
452                self._streams.append(uart)
453
454    def get_logfile(self, uart):
455        """Return the path to the uart logfile or none if logs_dir isn't set."""
456        if not self.logs_dir:
457            return None
458        return os.path.join(self.logs_dir, '%s_uart.txt' % uart)
459
460    def dump(self):
461        """Dump UART streams to log files accordingly."""
462        if not self.logs_dir:
463            return
464
465        for uart in self._streams:
466            logfile_fullname = self.get_logfile(uart)
467            stream = '%s_uart_stream' % uart
468            try:
469                content = self._servo.get(stream)
470            except Exception as err:
471                logging.warn('Failed to get UART log for %s: %s', stream, err)
472                continue
473
474            if content == 'not_applicable':
475                logging.warn('%s is not applicable', stream)
476                continue
477
478            # The UART stream may contain non-printable characters, and servo
479            # returns it in string representation. We use `ast.leteral_eval`
480            # to revert it back.
481            with open(logfile_fullname, 'a') as fd:
482                try:
483                    fd.write(ast.literal_eval(content))
484                except ValueError:
485                    logging.exception('Invalid value for %s: %r', stream,
486                                      content)
487
488    def stop_capture(self):
489        """Stop capturing UART streams."""
490        for uart in self._UartToCapture:
491            try:
492                self._start_stop_capture(uart, False)
493            except Exception as err:
494                logging.warn('Failed to stop UART logging for %s: %s', uart,
495                             err)
496
497
498class Servo(object):
499
500    """Manages control of a Servo board.
501
502    Servo is a board developed by hardware group to aide in the debug and
503    control of various partner devices. Servo's features include the simulation
504    of pressing the power button, closing the lid, and pressing Ctrl-d. This
505    class manages setting up and communicating with a servo demon (servod)
506    process. It provides both high-level functions for common servo tasks and
507    low-level functions for directly setting and reading gpios.
508
509    """
510
511    # Power button press delays in seconds.
512    #
513    # The EC specification says that 8.0 seconds should be enough
514    # for the long power press.  However, some platforms need a bit
515    # more time.  Empirical testing has found these requirements:
516    #   Alex: 8.2 seconds
517    #   ZGB:  8.5 seconds
518    # The actual value is set to the largest known necessary value.
519    #
520    # TODO(jrbarnette) Being generous is the right thing to do for
521    # existing platforms, but if this code is to be used for
522    # qualification of new hardware, we should be less generous.
523    SHORT_DELAY = 0.1
524
525    # Maximum number of times to re-read power button on release.
526    GET_RETRY_MAX = 10
527
528    # Delays to deal with DUT state transitions.
529    SLEEP_DELAY = 6
530    BOOT_DELAY = 10
531
532    # Default minimum time interval between 'press' and 'release'
533    # keyboard events.
534    SERVO_KEY_PRESS_DELAY = 0.1
535
536    # Time to toggle recovery switch on and off.
537    REC_TOGGLE_DELAY = 0.1
538
539    # Time to toggle development switch on and off.
540    DEV_TOGGLE_DELAY = 0.1
541
542    # Time between an usb disk plugged-in and detected in the system.
543    USB_DETECTION_DELAY = 5
544
545    # Time to wait before timing out on servo initialization.
546    INIT_TIMEOUT_SECS = 10
547
548    # Time to wait before timing out when extracting firmware images.
549    #
550    # This was increased from 60 seconds to support boards with very
551    # large (>500MB) firmware archives taking longer than expected to
552    # extract firmware on the lab host machines (b/149419503).
553    EXTRACT_TIMEOUT_SECS = 180
554
555    # The VBUS voltage threshold used to detect if VBUS is supplied
556    VBUS_THRESHOLD = 3000.0
557
558    def __init__(self, servo_host, servo_serial=None):
559        """Sets up the servo communication infrastructure.
560
561        @param servo_host: A ServoHost object representing
562                           the host running servod.
563        @type servo_host: autotest_lib.server.hosts.servo_host.ServoHost
564        @param servo_serial: Serial number of the servo board.
565        """
566        # TODO(fdeng): crbug.com/298379
567        # We should move servo_host object out of servo object
568        # to minimize the dependencies on the rest of Autotest.
569        self._servo_host = servo_host
570        self._servo_serial = servo_serial
571        self._servo_type = self.get_servo_version()
572        self._power_state = _PowerStateController(self)
573        self._uart = _Uart(self)
574        self._programmer = None
575        self._prev_log_inode = None
576        self._prev_log_size = 0
577        self._ccd_watchdog_disabled = False
578
579    def __str__(self):
580        """Description of this object and address, for use in errors"""
581        return "<%s '%s:%s'>" % (
582                type(self).__name__,
583                self._servo_host.hostname,
584                self._servo_host.servo_port)
585
586    @property
587    def _server(self):
588        with _WrapServoErrors(
589                servo=self, description='get_servod_server_proxy()'):
590            return self._servo_host.get_servod_server_proxy()
591
592    @property
593    def servo_serial(self):
594        """Returns the serial number of the servo board."""
595        return self._servo_serial
596
597    def get_power_state_controller(self):
598        """Return the power state controller for this Servo.
599
600        The power state controller provides board-independent
601        interfaces for reset, power-on, power-off operations.
602
603        """
604        return self._power_state
605
606
607    def initialize_dut(self, cold_reset=False, enable_main=True):
608        """Initializes a dut for testing purposes.
609
610        This sets various servo signals back to default values
611        appropriate for the target board.  By default, if the DUT
612        is already on, it stays on.  If the DUT is powered off
613        before initialization, its state afterward is unspecified.
614
615        Rationale:  Basic initialization of servo sets the lid open,
616        when there is a lid.  This operation won't affect powered on
617        units; however, setting the lid open may power on a unit
618        that's off, depending on the board type and previous state
619        of the device.
620
621        If `cold_reset` is a true value, the DUT and its EC will be
622        reset, and the DUT rebooted in normal mode.
623
624        @param cold_reset If True, cold reset the device after
625                          initialization.
626        @param enable_main If True, make sure the main servo device has
627                           control of the dut.
628
629        """
630        if enable_main:
631            self.enable_main_servo_device()
632
633        with _WrapServoErrors(
634                servo=self, description='initialize_dut()->hwinit()'):
635            self._server.hwinit()
636        if self.has_control('usb_mux_oe1'):
637            self.set('usb_mux_oe1', 'on')
638            self.switch_usbkey('off')
639        else:
640            logging.warning('Servod command \'usb_mux_oe1\' is not available. '
641                            'Any USB drive related servo routines will fail.')
642        # Create a record of SBU voltages if this is running support servo (v4,
643        # v4p1).
644        # TODO(coconutruben): eventually, replace this with a metric to track
645        # SBU voltages wrt servo-hw/dut-hw
646        if self.has_control('servo_v4_sbu1_mv'):
647            # Attempt to take a reading of sbu1 and sbu2 multiple times to
648            # account for situations where the two lines exchange hi/lo roles
649            # frequently.
650            for i in range(10):
651                try:
652                    sbu1 = int(self.get('servo_v4_sbu1_mv'))
653                    sbu2 = int(self.get('servo_v4_sbu2_mv'))
654                    logging.info('attempt %d sbu1 %d sbu2 %d', i, sbu1, sbu2)
655                except error.TestFail as e:
656                    # This is a nice to have but if reading this fails, it
657                    # shouldn't interfere with the test.
658                    logging.exception(e)
659        self._uart.start_capture()
660        if cold_reset:
661            if not self._power_state.supported:
662                logging.info('Cold-reset for DUT requested, but servo '
663                             'setup does not support power_state. Skipping.')
664            else:
665                self._power_state.reset()
666        with _WrapServoErrors(
667                servo=self, description='initialize_dut()->get_version()'):
668            version = self._server.get_version()
669        logging.debug('Servo initialized, version is %s', version)
670
671
672    def is_localhost(self):
673        """Is the servod hosted locally?
674
675        Returns:
676          True if local hosted; otherwise, False.
677        """
678        return self._servo_host.is_localhost()
679
680
681    def get_os_version(self):
682        """Returns the chromeos release version."""
683        lsb_release_content = self.system_output('cat /etc/lsb-release',
684                                                 ignore_status=True)
685        return lsbrelease_utils.get_chromeos_release_builder_path(
686                    lsb_release_content=lsb_release_content)
687
688
689    def get_servod_version(self):
690        """Returns the servod version."""
691        # TODO: use system_output once servod --sversion prints to stdout
692        try:
693            result = self._servo_host.run('servod --sversion 2>&1')
694        except error.AutoservRunError as e:
695            if 'command execution error' in str(e):
696                # Fall back to version if sversion is not supported yet.
697                result = self._servo_host.run('servod --version 2>&1')
698                return result.stdout.strip() or result.stderr.strip()
699            # An actually unexpected error occurred, just raise.
700            raise e
701        sversion = result.stdout or result.stderr
702        # The sversion output contains 3 lines:
703        # servod v1.0.816-ff8e966 // the extended version with git hash
704        # 2020-04-08 01:10:29 // the time of the latest commit
705        # chromeos-ci-legacy-us-central1-b-x32-55-u8zc // builder information
706        # For debugging purposes, we mainly care about the version, and the
707        # timestamp.
708        return ' '.join(sversion.split()[1:4])
709
710    def power_long_press(self):
711        """Simulate a long power button press."""
712        # After a long power press, the EC may ignore the next power
713        # button press (at least on Alex).  To guarantee that this
714        # won't happen, we need to allow the EC one second to
715        # collect itself.
716        # long_press is defined as 8.5s in servod
717        self.set_nocheck('power_key', 'long_press')
718
719
720    def power_normal_press(self):
721        """Simulate a normal power button press."""
722        # press is defined as 1.2s in servod
723        self.set_nocheck('power_key', 'press')
724
725
726    def power_short_press(self):
727        """Simulate a short power button press."""
728        # tab is defined as 0.2s in servod
729        self.set_nocheck('power_key', 'tab')
730
731
732    def power_key(self, press_secs='tab'):
733        """Simulate a power button press.
734
735        @param press_secs: int, float, str; time to press key in seconds or
736                           known shorthand: 'tab' 'press' 'long_press'
737        """
738        self.set_nocheck('power_key', press_secs)
739
740
741    def pwr_button(self, action='press'):
742        """Simulate a power button press.
743
744        @param action: str; could be press or could be release.
745        """
746        self.set_nocheck('pwr_button', action)
747
748
749    def lid_open(self):
750        """Simulate opening the lid and raise exception if all attempts fail"""
751        self.set('lid_open', 'yes')
752
753
754    def lid_close(self):
755        """Simulate closing the lid and raise exception if all attempts fail
756
757        Waits 6 seconds to ensure the device is fully asleep before returning.
758        """
759        self.set('lid_open', 'no')
760        time.sleep(Servo.SLEEP_DELAY)
761
762
763    def vbus_power_get(self):
764        """Get current vbus_power."""
765        return self.get('vbus_power')
766
767
768    def volume_up(self, timeout=300):
769        """Simulate pushing the volume down button.
770
771        @param timeout: Timeout for setting the volume.
772        """
773        self.set_get_all(['volume_up:yes',
774                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
775                          'volume_up:no'])
776        # we need to wait for commands to take effect before moving on
777        time_left = float(timeout)
778        while time_left > 0.0:
779            value = self.get('volume_up')
780            if value == 'no':
781                return
782            time.sleep(self.SHORT_DELAY)
783            time_left = time_left - self.SHORT_DELAY
784        raise error.TestFail("Failed setting volume_up to no")
785
786    def volume_down(self, timeout=300):
787        """Simulate pushing the volume down button.
788
789        @param timeout: Timeout for setting the volume.
790        """
791        self.set_get_all(['volume_down:yes',
792                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
793                          'volume_down:no'])
794        # we need to wait for commands to take effect before moving on
795        time_left = float(timeout)
796        while time_left > 0.0:
797            value = self.get('volume_down')
798            if value == 'no':
799                return
800            time.sleep(self.SHORT_DELAY)
801            time_left = time_left - self.SHORT_DELAY
802        raise error.TestFail("Failed setting volume_down to no")
803
804    def arrow_up(self, press_secs='tab'):
805        """Simulate arrow up key presses.
806
807        @param press_secs: int, float, str; time to press key in seconds or
808                           known shorthand: 'tab' 'press' 'long_press'.
809        """
810        # TODO: Remove this check after a lab update to include CL:1913684
811        if not self.has_control('arrow_up'):
812            logging.warning('Control arrow_up ignored. '
813                            'Please update hdctools')
814            return
815        self.set_nocheck('arrow_up', press_secs)
816
817    def arrow_down(self, press_secs='tab'):
818        """Simulate arrow down key presses.
819
820        @param press_secs: int, float, str; time to press key in seconds or
821                           known shorthand: 'tab' 'press' 'long_press'.
822        """
823        # TODO: Remove this check after a lab update to include CL:1913684
824        if not self.has_control('arrow_down'):
825            logging.warning('Control arrow_down ignored. '
826                            'Please update hdctools')
827            return
828        self.set_nocheck('arrow_down', press_secs)
829
830    def ctrl_d(self, press_secs='tab'):
831        """Simulate Ctrl-d simultaneous button presses.
832
833        @param press_secs: int, float, str; time to press key in seconds or
834                           known shorthand: 'tab' 'press' 'long_press'
835        """
836        self.set_nocheck('ctrl_d', press_secs)
837
838
839    def ctrl_s(self, press_secs='tab'):
840        """Simulate Ctrl-s simultaneous button presses.
841
842        @param press_secs: int, float, str; time to press key in seconds or
843                           known shorthand: 'tab' 'press' 'long_press'
844        """
845        self.set_nocheck('ctrl_s', press_secs)
846
847
848    def ctrl_u(self, press_secs='tab'):
849        """Simulate Ctrl-u simultaneous button presses.
850
851        @param press_secs: int, float, str; time to press key in seconds or
852                           known shorthand: 'tab' 'press' 'long_press'
853        """
854        self.set_nocheck('ctrl_u', press_secs)
855
856
857    def ctrl_enter(self, press_secs='tab'):
858        """Simulate Ctrl-enter simultaneous button presses.
859
860        @param press_secs: int, float, str; time to press key in seconds or
861                           known shorthand: 'tab' 'press' 'long_press'
862        """
863        self.set_nocheck('ctrl_enter', press_secs)
864
865
866    def ctrl_key(self, press_secs='tab'):
867        """Simulate Enter key button press.
868
869        @param press_secs: int, float, str; time to press key in seconds or
870                           known shorthand: 'tab' 'press' 'long_press'
871        """
872        self.set_nocheck('ctrl_key', press_secs)
873
874
875    def enter_key(self, press_secs='tab'):
876        """Simulate Enter key button press.
877
878        @param press_secs: int, float, str; time to press key in seconds or
879                           known shorthand: 'tab' 'press' 'long_press'
880        """
881        self.set_nocheck('enter_key', press_secs)
882
883
884    def refresh_key(self, press_secs='tab'):
885        """Simulate Refresh key (F3) button press.
886
887        @param press_secs: int, float, str; time to press key in seconds or
888                           known shorthand: 'tab' 'press' 'long_press'
889        """
890        self.set_nocheck('refresh_key', press_secs)
891
892
893    def ctrl_refresh_key(self, press_secs='tab'):
894        """Simulate Ctrl and Refresh (F3) simultaneous press.
895
896        This key combination is an alternative of Space key.
897
898        @param press_secs: int, float, str; time to press key in seconds or
899                           known shorthand: 'tab' 'press' 'long_press'
900        """
901        self.set_nocheck('ctrl_refresh_key', press_secs)
902
903
904    def imaginary_key(self, press_secs='tab'):
905        """Simulate imaginary key button press.
906
907        Maps to a key that doesn't physically exist.
908
909        @param press_secs: int, float, str; time to press key in seconds or
910                           known shorthand: 'tab' 'press' 'long_press'
911        """
912        self.set_nocheck('imaginary_key', press_secs)
913
914
915    def sysrq_x(self, press_secs='tab'):
916        """Simulate Alt VolumeUp X simulataneous press.
917
918        This key combination is the kernel system request (sysrq) X.
919
920        @param press_secs: int, float, str; time to press key in seconds or
921                           known shorthand: 'tab' 'press' 'long_press'
922        """
923        self.set_nocheck('sysrq_x', press_secs)
924
925
926    def toggle_recovery_switch(self):
927        """Toggle recovery switch on and off."""
928        self.enable_recovery_mode()
929        time.sleep(self.REC_TOGGLE_DELAY)
930        self.disable_recovery_mode()
931
932
933    def enable_recovery_mode(self):
934        """Enable recovery mode on device."""
935        self.set('rec_mode', 'on')
936
937
938    def disable_recovery_mode(self):
939        """Disable recovery mode on device."""
940        self.set('rec_mode', 'off')
941
942
943    def toggle_development_switch(self):
944        """Toggle development switch on and off."""
945        self.enable_development_mode()
946        time.sleep(self.DEV_TOGGLE_DELAY)
947        self.disable_development_mode()
948
949
950    def enable_development_mode(self):
951        """Enable development mode on device."""
952        self.set('dev_mode', 'on')
953
954
955    def disable_development_mode(self):
956        """Disable development mode on device."""
957        self.set('dev_mode', 'off')
958
959    def boot_devmode(self):
960        """Boot a dev-mode device that is powered off."""
961        self.power_short_press()
962        self.pass_devmode()
963
964
965    def pass_devmode(self):
966        """Pass through boot screens in dev-mode."""
967        time.sleep(Servo.BOOT_DELAY)
968        self.ctrl_d()
969        time.sleep(Servo.BOOT_DELAY)
970
971
972    def get_board(self):
973        """Get the board connected to servod."""
974        with _WrapServoErrors(servo=self, description='get_board()'):
975            return self._server.get_board()
976
977
978    def get_base_board(self):
979        """Get the board of the base connected to servod."""
980        try:
981            with _WrapServoErrors(servo=self, description='get_base_board()'):
982                return self._server.get_base_board()
983        except six.moves.xmlrpc_client.Fault as e:
984            # TODO(waihong): Remove the following compatibility check when
985            # the new versions of hdctools are deployed.
986            if 'not supported' in str(e):
987                logging.warning('The servod is too old that get_base_board '
988                                'not supported.')
989                return ''
990            raise
991
992    def get_ec_board(self):
993        """Get the board name from EC."""
994        if self.has_control('active_v4_device'):
995            # If servo v4 is allowing dual_v4 devices, then choose the
996            # active device.
997            active_device = self.get('active_v4_device')
998            if active_device == self.get_main_servo_device():
999                active_device = ''
1000        else:
1001            active_device = ''
1002        return self.get('ec_board', prefix=active_device)
1003
1004    def get_ec_active_copy(self):
1005        """Get the active copy of the EC image."""
1006        return self.get('ec_active_copy')
1007
1008    def has_control(self, ctrl_name, prefix=''):
1009        """Query servod server to determine if |ctrl_name| is a valid control.
1010
1011        @param ctrl_name Name of the control.
1012        @param prefix: prefix to route control to correct servo device.
1013
1014        @returns: true if |ctrl_name| is a known control, false otherwise.
1015        """
1016        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1017        try:
1018            # If the control exists, doc() will work.
1019            with _WrapServoErrors(
1020                    servo=self,
1021                    description='has_control(%s)->doc()' % ctrl_name):
1022                self._server.doc(ctrl_name)
1023            return True
1024        except ControlUnavailableError:
1025            return False
1026
1027    def _build_ctrl_name(self, ctrl_name, prefix):
1028        """Helper to build the control name if a prefix is used.
1029
1030        @param ctrl_name Name of the control.
1031        @param prefix: prefix to route control to correct servo device.
1032
1033        @returns: [|prefix|.]ctrl_name depending on whether prefix is non-empty.
1034        """
1035        assert ctrl_name
1036        if prefix:
1037            return '%s.%s' % (prefix, ctrl_name)
1038        return ctrl_name
1039
1040    def get(self, ctrl_name, prefix=''):
1041        """Get the value of a gpio from Servod.
1042
1043        @param ctrl_name Name of the control.
1044        @param prefix: prefix to route control to correct servo device.
1045
1046        @returns: server response to |ctrl_name| request.
1047
1048        @raise ControlUnavailableError: if |ctrl_name| not a known control.
1049        @raise error.TestFail: for all other failures doing get().
1050        """
1051        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1052        with _WrapServoErrors(
1053                servo=self, description='Getting %s' % ctrl_name):
1054            return self._server.get(ctrl_name)
1055
1056    def set(self, ctrl_name, ctrl_value, prefix=''):
1057        """Set and check the value of a gpio using Servod.
1058
1059        @param ctrl_name: Name of the control.
1060        @param ctrl_value: New setting for the control.
1061        @param prefix: prefix to route control to correct servo device.
1062        @raise error.TestFail: if the control value fails to change.
1063        """
1064        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1065        self.set_nocheck(ctrl_name, ctrl_value)
1066        retry_count = Servo.GET_RETRY_MAX
1067        actual_value = self.get(ctrl_name)
1068        while ctrl_value != actual_value and retry_count:
1069            logging.warning("%s != %s, retry %d", ctrl_name, ctrl_value,
1070                            retry_count)
1071            retry_count -= 1
1072            time.sleep(Servo.SHORT_DELAY)
1073            actual_value = self.get(ctrl_name)
1074
1075        if ctrl_value != actual_value:
1076            raise error.TestFail(
1077                    'Servo failed to set %s to %s. Got %s.'
1078                    % (ctrl_name, ctrl_value, actual_value))
1079
1080    def set_nocheck(self, ctrl_name, ctrl_value, prefix=''):
1081        """Set the value of a gpio using Servod.
1082
1083        @param ctrl_name Name of the control.
1084        @param ctrl_value New setting for the control.
1085        @param prefix: prefix to route control to correct servo device.
1086
1087        @raise ControlUnavailableError: if |ctrl_name| not a known control.
1088        @raise error.TestFail: for all other failures doing set().
1089        """
1090        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
1091        # The real danger here is to pass a None value through the xmlrpc.
1092        assert ctrl_value is not None
1093        description = 'Setting %s to %r' % (ctrl_name, ctrl_value)
1094        logging.debug('%s', description)
1095        with _WrapServoErrors(servo=self, description=description):
1096            self._server.set(ctrl_name, ctrl_value)
1097
1098    def set_get_all(self, controls):
1099        """Set &| get one or more control values.
1100
1101        @param controls: list of strings, controls to set &| get.
1102
1103        @raise: error.TestError in case error occurs setting/getting values.
1104        """
1105        description = 'Set/get all: %s' % str(controls)
1106        logging.debug('%s', description)
1107        with _WrapServoErrors(servo=self, description=description):
1108            return self._server.set_get_all(controls)
1109
1110
1111    def probe_host_usb_dev(self):
1112        """Probe the USB disk device plugged-in the servo from the host side.
1113
1114        It uses servod to discover if there is a usb device attached to it.
1115
1116        @return: String of USB disk path (e.g. '/dev/sdb') or None.
1117        """
1118        # Set up Servo's usb mux.
1119        return self.get('image_usbkey_dev') or None
1120
1121
1122    def image_to_servo_usb(self, image_path=None,
1123                           make_image_noninteractive=False,
1124                           power_off_dut=True):
1125        """Install an image to the USB key plugged into the servo.
1126
1127        This method may copy any image to the servo USB key including a
1128        recovery image or a test image.  These images are frequently used
1129        for test purposes such as restoring a corrupted image or conducting
1130        an upgrade of ec/fw/kernel as part of a test of a specific image part.
1131
1132        @param image_path: Path on the host to the recovery image.
1133        @param make_image_noninteractive: Make the recovery image
1134                                   noninteractive, therefore the DUT
1135                                   will reboot automatically after
1136                                   installation.
1137        @param power_off_dut: To put the DUT in power off mode.
1138        """
1139        # We're about to start plugging/unplugging the USB key.  We
1140        # don't know the state of the DUT, or what it might choose
1141        # to do to the device after hotplug.  To avoid surprises,
1142        # force the DUT to be off.
1143        if power_off_dut:
1144            self._power_state.power_off()
1145
1146        if image_path:
1147            logging.info('Searching for usb device and copying image to it. '
1148                         'Please wait a few minutes...')
1149            # The servod control automatically sets up the host in the host
1150            # direction.
1151            try:
1152                self.set_nocheck('download_image_to_usb_dev', image_path)
1153            except error.TestFail as e:
1154                logging.error('Failed to transfer requested image to USB. %s.'
1155                              'Please take a look at Servo Logs.', str(e))
1156                raise error.AutotestError('Download image to usb failed.')
1157            if make_image_noninteractive:
1158                logging.info('Making image noninteractive')
1159                try:
1160                    dev = self.probe_host_usb_dev()
1161                    if not dev:
1162                        # This is fine but also should never happen: if we
1163                        # successfully download an image but somehow cannot
1164                        # find the stick again, it needs to be investigated.
1165                        raise error.TestFail('No image usb key detected '
1166                                             'after successful download. '
1167                                             'Please investigate.')
1168                    # The modification has to happen on partition 1.
1169                    dev_partition = '%s1' % dev
1170                    self.set_nocheck('make_usb_dev_image_noninteractive',
1171                                     dev_partition)
1172                except error.TestFail as e:
1173                    logging.error('Failed to make image noninteractive. %s.'
1174                                  'Please take a look at Servo Logs.',
1175                                  str(e))
1176
1177    def boot_in_recovery_mode(self, snk_mode=False):
1178        """Boot host DUT in recovery mode.
1179
1180        @param snk_mode: If True, switch servo_v4 role to 'snk' mode before
1181                         boot DUT into recovery mode.
1182        """
1183        # This call has a built-in delay to ensure that we wait a timeout
1184        # for the stick to enumerate and settle on the DUT side.
1185        self.switch_usbkey('dut')
1186        # Switch servo_v4 mode to snk as the DUT won't able to see usb drive
1187        # in recovery mode if the servo is in src mode(see crbug.com/1129165).
1188        if snk_mode:
1189            logging.info('Setting servo_v4 role to snk mode in order to make'
1190                         ' the DUT can see usb drive while in recovery mode.')
1191            self.set_servo_v4_role('snk')
1192
1193        try:
1194            self._power_state.power_on(rec_mode=self._power_state.REC_ON)
1195        except error.TestFail as e:
1196            self.set_servo_v4_role('src')
1197            logging.error('Failed to boot DUT in recovery mode. %s.', str(e))
1198            raise error.AutotestError('Failed to boot DUT in recovery mode.')
1199
1200    def install_recovery_image(self,
1201                               image_path=None,
1202                               make_image_noninteractive=False,
1203                               snk_mode=False):
1204        """Install the recovery image specified by the path onto the DUT.
1205
1206        This method uses google recovery mode to install a recovery image
1207        onto a DUT through the use of a USB stick that is mounted on a servo
1208        board specified by the usb_dev.  If no image path is specified
1209        we use the recovery image already on the usb image.
1210
1211        This method will switch servo_v4 role to 'snk' mode in order to make
1212        the DUT can see the usb drive plugged on servo, the caller should
1213        set servo_v4 role back to 'src' mode one the DUT exit recovery mode.
1214
1215        @param image_path: Path on the host to the recovery image.
1216        @param make_image_noninteractive: Make the recovery image
1217                noninteractive, therefore the DUT will reboot automatically
1218                after installation.
1219        @param snk_mode: If True, switch servo_v4 role to 'snk' mode before
1220                         boot DUT into recovery mode.
1221        """
1222        self.image_to_servo_usb(image_path, make_image_noninteractive)
1223        # Give the DUT some time to power_off if we skip
1224        # download image to usb. (crbug.com/982993)
1225        if not image_path:
1226            time.sleep(10)
1227        self.boot_in_recovery_mode(snk_mode=snk_mode)
1228
1229
1230    def _scp_image(self, image_path):
1231        """Copy image to the servo host.
1232
1233        When programming a firmware image on the DUT, the image must be
1234        located on the host to which the servo device is connected. Sometimes
1235        servo is controlled by a remote host, in this case the image needs to
1236        be transferred to the remote host. This adds the servod port number, to
1237        make sure tests for different DUTs don't trample on each other's files.
1238        Along with the firmware image, any subsidiary files in the same
1239        directory shall be copied to the host as well.
1240
1241        @param image_path: a string, name of the firmware image file to be
1242               transferred.
1243        @return: a string, full path name of the copied file on the remote.
1244        """
1245        src_path = os.path.dirname(image_path)
1246        dest_path = os.path.join('/tmp', 'dut_%d' % self._servo_host.servo_port)
1247        logging.info('Copying %s to %s', src_path, dest_path)
1248        # Copy a directory, src_path to dest_path. send_file() will create a
1249        # directory named basename(src_path) under dest_path, and copy all files
1250        # in src_path to the destination.
1251        self._servo_host.send_file(src_path, dest_path, delete_dest=True)
1252
1253        # Make a image path of the destination.
1254        # e.g. /tmp/dut_9999/EC/ec.bin
1255        rv = os.path.join(dest_path, os.path.basename(src_path))
1256        return os.path.join(rv, os.path.basename(image_path))
1257
1258
1259    def system(self, command, timeout=3600):
1260        """Execute the passed in command on the servod host.
1261
1262        @param command Command to be executed.
1263        @param timeout Maximum number of seconds of runtime allowed. Default to
1264                       1 hour.
1265        """
1266        logging.info('Will execute on servo host: %s', command)
1267        self._servo_host.run(command, timeout=timeout)
1268
1269
1270    def system_output(self, command, timeout=3600,
1271                      ignore_status=False, args=()):
1272        """Execute the passed in command on the servod host, return stdout.
1273
1274        @param command a string, the command to execute
1275        @param timeout an int, max number of seconds to wait til command
1276               execution completes. Default to 1 hour.
1277        @param ignore_status a Boolean, if true - ignore command's nonzero exit
1278               status, otherwise an exception will be thrown
1279        @param args a tuple of strings, each becoming a separate command line
1280               parameter for the command
1281        @return command's stdout as a string.
1282        """
1283        return self._servo_host.run(command, timeout=timeout,
1284                                    ignore_status=ignore_status,
1285                                    args=args).stdout.strip()
1286
1287
1288    def get_servo_version(self, active=False):
1289        """Get the version of the servo, e.g., servo_v2 or servo_v3.
1290
1291        @param active: Only return the servo type with the active device.
1292        @return: The version of the servo.
1293
1294        """
1295        with _WrapServoErrors(
1296                servo=self, description='get_servo_version()->get_version()'):
1297            servo_type = self._server.get_version()
1298        if '_and_' not in servo_type or not active:
1299            return servo_type
1300
1301        # If servo v4 is using ccd and servo micro, modify the servo type to
1302        # reflect the active device.
1303        active_device = self.get('active_v4_device')
1304        if active_device in servo_type:
1305            logging.info('%s is active', active_device)
1306            return 'servo_v4_with_' + active_device
1307
1308        logging.warn("%s is active even though it's not in servo type",
1309                     active_device)
1310        return servo_type
1311
1312
1313    def get_servo_type(self):
1314        return self._servo_type
1315
1316
1317    def get_main_servo_device(self):
1318        """Return the main servo device"""
1319        return self._servo_type.split('_with_')[-1].split('_and_')[0]
1320
1321
1322    def enable_main_servo_device(self):
1323        """Make sure the main device has control of the dut."""
1324        if not self.has_control('active_v4_device'):
1325            return
1326        self.set('active_v4_device', self.get_main_servo_device())
1327
1328
1329    def main_device_is_ccd(self):
1330        """Whether the main servo device (no prefixes) is a ccd device."""
1331        with _WrapServoErrors(
1332                servo=self, description='main_device_is_ccd()->get_version()'):
1333            servo = self._server.get_version()
1334        return 'ccd_cr50' in servo and 'servo_micro' not in servo
1335
1336
1337    def main_device_is_flex(self):
1338        """Whether the main servo device (no prefixes) is a legacy device."""
1339        return not self.main_device_is_ccd()
1340
1341
1342    def main_device_is_active(self):
1343        """Return whether the main device is the active device.
1344
1345        This is only relevant for a dual setup with ccd and legacy on the same
1346        DUT. The main device is the servo that has no prefix on its controls.
1347        This helper answers the question whether that device is also the
1348        active device or not.
1349        """
1350        # TODO(coconutruben): The current implementation of the dual setup only
1351        # ever has legacy as the main device. Therefore, it suffices to ask
1352        # whether the active device is ccd.
1353        if not self.dts_mode_is_valid():
1354            # Use dts support as a proxy to whether the servo setup could
1355            # support a dual role. Only those setups now support legacy and ccd.
1356            return True
1357        active_device = self.get('active_v4_device')
1358        return 'ccd_cr50' not in active_device
1359
1360    def _initialize_programmer(self, rw_only=False):
1361        """Initialize the firmware programmer.
1362
1363        @param rw_only: True to initialize a programmer which only
1364                        programs the RW portions.
1365        """
1366        if self._programmer:
1367            return
1368        # Initialize firmware programmer
1369        if self._servo_type.startswith('servo_v2'):
1370            self._programmer = firmware_programmer.ProgrammerV2(self)
1371            self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
1372        # Both servo v3 and v4 use the same programming methods so just leverage
1373        # ProgrammerV3 for servo v4 as well.
1374        elif (self._servo_type.startswith('servo_v3') or
1375              self._servo_type.startswith('servo_v4')):
1376            self._programmer = firmware_programmer.ProgrammerV3(self)
1377            self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
1378        else:
1379            raise error.TestError(
1380                    'No firmware programmer for servo version: %s' %
1381                    self._servo_type)
1382
1383
1384    def program_bios(self, image, rw_only=False, copy_image=True):
1385        """Program bios on DUT with given image.
1386
1387        @param image: a string, file name of the BIOS image to program
1388                      on the DUT.
1389        @param rw_only: True to only program the RW portion of BIOS.
1390        @param copy_image: True indicates we need scp the image to servohost
1391                           while False means the image file is already on
1392                           servohost.
1393
1394        """
1395        self._initialize_programmer()
1396        # We don't need scp if test runs locally.
1397        if copy_image and not self.is_localhost():
1398            image = self._scp_image(image)
1399        if rw_only:
1400            self._programmer_rw.program_bios(image)
1401        else:
1402            self._programmer.program_bios(image)
1403
1404
1405    def program_ec(self, image, rw_only=False, copy_image=True):
1406        """Program ec on DUT with given image.
1407
1408        @param image: a string, file name of the EC image to program
1409                      on the DUT.
1410        @param rw_only: True to only program the RW portion of EC.
1411        @param copy_image: True indicates we need scp the image to servohost
1412                           while False means the image file is already on
1413                           servohost.
1414        """
1415        self._initialize_programmer()
1416        # We don't need scp if test runs locally.
1417        if copy_image and not self.is_localhost():
1418            image = self._scp_image(image)
1419        if rw_only:
1420            self._programmer_rw.program_ec(image)
1421        else:
1422            self._programmer.program_ec(image)
1423
1424
1425    def extract_ec_image(self, board, model, tarball_path):
1426        """Helper function to extract EC image from downloaded tarball.
1427
1428        @param board: The DUT board name.
1429        @param model: The DUT model name.
1430        @param tarball_path: The path of the downloaded build tarball.
1431
1432        @return: Path to extracted EC image.
1433        """
1434
1435        # Ignore extracting EC image and re-programming if not a Chrome EC
1436        chrome_ec = FAFTConfig(board).chrome_ec
1437        if not chrome_ec:
1438            logging.warn('Not a Chrome EC, ignore re-programming it')
1439            return None
1440
1441        # Try to retrieve firmware build target from the version reported
1442        # by the EC. If this doesn't work, we assume the firmware build
1443        # target is the same as the model name.
1444        try:
1445            fw_target = self.get_ec_board()
1446        except Exception as err:
1447            logging.warn('Failed to get ec_board value; ignoring')
1448            fw_target = model
1449            pass
1450
1451        # Array of candidates for EC image
1452        ec_image_candidates = [
1453                'ec.bin',
1454                '%s/ec.bin' % fw_target,
1455                '%s/ec.bin' % board
1456        ]
1457
1458        # Extract EC image from tarball
1459        dest_dir = os.path.join(os.path.dirname(tarball_path), 'EC')
1460        ec_image = _extract_image_from_tarball(tarball_path,
1461                                               dest_dir,
1462                                               ec_image_candidates,
1463                                               self.EXTRACT_TIMEOUT_SECS)
1464
1465        # Check if EC image was found and return path or raise error
1466        if ec_image:
1467            # Extract subsidiary binaries for EC
1468            # Find a monitor binary for NPCX_UUT chip type, if any.
1469            mon_candidates = [candidate.replace('ec.bin', 'npcx_monitor.bin')
1470                              for candidate in ec_image_candidates]
1471            _extract_image_from_tarball(tarball_path, dest_dir, mon_candidates,
1472                                        self.EXTRACT_TIMEOUT_SECS)
1473
1474            return os.path.join(dest_dir, ec_image)
1475        else:
1476            raise error.TestError('Failed to extract EC image from %s' %
1477                                  tarball_path)
1478
1479
1480    def extract_bios_image(self, board, model, tarball_path):
1481        """Helper function to extract BIOS image from downloaded tarball.
1482
1483        @param board: The DUT board name.
1484        @param model: The DUT model name.
1485        @param tarball_path: The path of the downloaded build tarball.
1486
1487        @return: Path to extracted BIOS image.
1488        """
1489
1490        # Try to retrieve firmware build target from the version reported
1491        # by the EC. If this doesn't work, we assume the firmware build
1492        # target is the same as the model name.
1493        try:
1494            fw_target = self.get_ec_board()
1495        except Exception as err:
1496            logging.warn('Failed to get ec_board value; ignoring')
1497            fw_target = model
1498            pass
1499
1500        # Array of candidates for BIOS image
1501        bios_image_candidates = [
1502                'image.bin',
1503                'image-%s.bin' % fw_target,
1504                'image-%s.bin' % board
1505        ]
1506
1507        # Extract BIOS image from tarball
1508        dest_dir = os.path.join(os.path.dirname(tarball_path), 'BIOS')
1509        bios_image = _extract_image_from_tarball(tarball_path,
1510                                                 dest_dir,
1511                                                 bios_image_candidates,
1512                                                 self.EXTRACT_TIMEOUT_SECS)
1513
1514        # Check if BIOS image was found and return path or raise error
1515        if bios_image:
1516            return os.path.join(dest_dir, bios_image)
1517        else:
1518            raise error.TestError('Failed to extract BIOS image from %s' %
1519                                  tarball_path)
1520
1521
1522    def switch_usbkey(self, usb_state):
1523        """Connect USB flash stick to either host or DUT, or turn USB port off.
1524
1525        This function switches the servo multiplexer to provide electrical
1526        connection between the USB port J3 and either host or DUT side. It
1527        can also be used to turn the USB port off.
1528
1529        @param usb_state: A string, one of 'dut', 'host', or 'off'.
1530                          'dut' and 'host' indicate which side the
1531                          USB flash device is required to be connected to.
1532                          'off' indicates turning the USB port off.
1533
1534        @raise: error.TestError in case the parameter is not 'dut'
1535                'host', or 'off'.
1536        """
1537        if self.get_usbkey_state() == usb_state:
1538            return
1539
1540        if usb_state == 'off':
1541            self.set_nocheck('image_usbkey_pwr', 'off')
1542            return
1543        elif usb_state == 'host':
1544            mux_direction = 'servo_sees_usbkey'
1545        elif usb_state == 'dut':
1546            mux_direction = 'dut_sees_usbkey'
1547        else:
1548            raise error.TestError('Unknown USB state request: %s' % usb_state)
1549        # On the servod side, this control will ensure that
1550        # - the port is power cycled if it is changing directions
1551        # - the port ends up in a powered state after this call
1552        # - if facing the host side, the call only returns once a usb block
1553        #   device is detected, or after a generous timeout (10s)
1554        self.set('image_usbkey_direction', mux_direction)
1555        # As servod makes no guarantees when switching to the dut side,
1556        # add a detection delay here when facing the dut.
1557        if mux_direction == 'dut_sees_usbkey':
1558            time.sleep(self.USB_DETECTION_DELAY)
1559
1560    def get_usbkey_state(self):
1561        """Get which side USB is connected to or 'off' if usb power is off.
1562
1563        @return: A string, one of 'dut', 'host', or 'off'.
1564        """
1565        pwr = self.get('image_usbkey_pwr')
1566        if pwr == 'off':
1567            return pwr
1568        direction = self.get('image_usbkey_direction')
1569        if direction == 'servo_sees_usbkey':
1570            return 'host'
1571        if direction == 'dut_sees_usbkey':
1572            return 'dut'
1573        raise error.TestFail('image_usbkey_direction set an unknown mux '
1574                             'direction: %s' % direction)
1575
1576    def set_servo_v4_role(self, role):
1577        """Set the power role of servo v4, either 'src' or 'snk'.
1578
1579        It does nothing if not a servo v4.
1580
1581        @param role: Power role for DUT port on servo v4, either 'src' or 'snk'.
1582        """
1583        if not self._servo_type.startswith('servo_v4'):
1584            logging.debug('Not a servo v4, unable to set role to %s.', role)
1585            return
1586
1587        if not self.has_control('servo_v4_role'):
1588            logging.debug(
1589                    'Servo does not has servo_v4_role control, unable'
1590                    ' to set role to %s.', role)
1591            return
1592
1593        value = self.get('servo_v4_role')
1594        if value != role:
1595            self.set_nocheck('servo_v4_role', role)
1596        else:
1597            logging.debug('Already in the role: %s.', role)
1598
1599    def get_servo_v4_role(self):
1600        """Get the power role of servo v4, either 'src' or 'snk'.
1601
1602        It returns None if not a servo v4.
1603        """
1604        if not self._servo_type.startswith('servo_v4'):
1605            logging.debug('Not a servo v4, unable to get role')
1606            return None
1607
1608        if not self.has_control('servo_v4_role'):
1609            logging.debug(
1610                    'Servo does not has servo_v4_role control, unable'
1611                    ' to get the role.')
1612            return None
1613
1614        return self.get('servo_v4_role')
1615
1616    def set_servo_v4_pd_comm(self, en):
1617        """Set the PD communication of servo v4, either 'on' or 'off'.
1618
1619        It does nothing if not a servo v4.
1620
1621        @param en: a string of 'on' or 'off' for PD communication.
1622        """
1623        if self._servo_type.startswith('servo_v4'):
1624            self.set_nocheck('servo_v4_pd_comm', en)
1625        else:
1626            logging.debug('Not a servo v4, unable to set PD comm to %s.', en)
1627
1628    def supports_built_in_pd_control(self):
1629        """Return whether the servo type supports pd charging and control."""
1630        if 'servo_v4' not in self._servo_type:
1631            # Only servo v4 supports this feature.
1632            logging.info('%r type does not support pd control.',
1633                         self._servo_type)
1634            return False
1635        # On servo v4, it still needs to be the type-c version.
1636        if not self.get('servo_v4_type') == 'type-c':
1637            logging.info('PD controls require a type-c servo v4.')
1638            return False
1639        # Lastly, one cannot really do anything without a plugged in charger.
1640        chg_port_mv = self.get('ppchg5_mv')
1641        if chg_port_mv < V4_CHG_ATTACHED_MIN_VOLTAGE_MV:
1642            logging.warn('It appears that no charger is plugged into servo v4. '
1643                         'Charger port voltage: %dmV', chg_port_mv)
1644            return False
1645        logging.info('Charger port voltage: %dmV', chg_port_mv)
1646        return True
1647
1648    def dts_mode_is_valid(self):
1649        """Return whether servo setup supports dts mode control for cr50."""
1650        if 'servo_v4' not in self._servo_type:
1651            # Only servo v4 supports this feature.
1652            logging.debug('%r type does not support dts mode control.',
1653                          self._servo_type)
1654            return False
1655        # On servo v4, it still needs ot be the type-c version.
1656        if not 'type-c' == self.get('servo_v4_type'):
1657            logging.info('DTS controls require a type-c servo v4.')
1658            return False
1659        return True
1660
1661    def dts_mode_is_safe(self):
1662        """Return whether servo setup supports dts mode without losing access.
1663
1664        DTS mode control exists but the main device might go through ccd.
1665        In that case, it's only safe to control dts mode if the main device
1666        is legacy as otherwise the connection to the main device cuts out.
1667        """
1668        return self.dts_mode_is_valid() and self.main_device_is_flex()
1669
1670    def get_dts_mode(self):
1671        """Return servo dts mode.
1672
1673        @returns: on/off whether dts is on or off
1674        """
1675        if not self.dts_mode_is_valid():
1676            logging.info('Not a valid servo setup. Unable to get dts mode.')
1677            return
1678        return self.get('servo_v4_dts_mode')
1679
1680    def ccd_watchdog_enable(self, enable):
1681        """Control the ccd watchdog."""
1682        if 'ccd' not in self._servo_type:
1683            return
1684        if self._ccd_watchdog_disabled and enable:
1685            logging.info('CCD watchdog disabled for test')
1686            return
1687        control = 'watchdog_add' if enable else 'watchdog_remove'
1688        self.set_nocheck(control, 'ccd')
1689
1690    def disable_ccd_watchdog_for_test(self):
1691        """Prevent servo from enabling the watchdog."""
1692        self._ccd_watchdog_disabled = True
1693        self.ccd_watchdog_enable(False)
1694
1695    def allow_ccd_watchdog_for_test(self):
1696        """Allow servo to enable the ccd watchdog."""
1697        self._ccd_watchdog_disabled = False
1698        self.ccd_watchdog_enable(True)
1699
1700    def set_dts_mode(self, state):
1701        """Set servo dts mode to off or on.
1702
1703        It does nothing if not a servo v4. Disable the ccd watchdog if we're
1704        disabling dts mode. CCD will disconnect. The watchdog only allows CCD
1705        to disconnect for 10 seconds until it kills servod. Disable the
1706        watchdog, so CCD can stay disconnected indefinitely.
1707
1708        @param state: Set servo v4 dts mode 'off' or 'on'.
1709        """
1710        if not self.dts_mode_is_valid():
1711            logging.info('Not a valid servo setup. Unable to set dts mode %s.',
1712                         state)
1713            return
1714
1715        enable_watchdog = state == 'on'
1716
1717        if not enable_watchdog:
1718            self.ccd_watchdog_enable(False)
1719
1720        self.set_nocheck('servo_v4_dts_mode', state)
1721
1722        if enable_watchdog:
1723            self.ccd_watchdog_enable(True)
1724
1725
1726    def _get_servo_type_fw_version(self, servo_type, prefix=''):
1727        """Helper to handle fw retrieval for micro/v4 vs ccd.
1728
1729        @param servo_type: one of 'servo_v4', 'servo_micro', 'ccd_cr50'
1730        @param prefix: whether the control has a prefix
1731
1732        @returns: fw version for non-ccd devices, cr50 version for ccd device
1733        """
1734        if servo_type == 'ccd_cr50':
1735            # ccd_cr50 runs on cr50, so need to query the cr50 fw.
1736            servo_type = 'cr50'
1737        cmd = '%s_version' % servo_type
1738        try:
1739            return self.get(cmd, prefix=prefix)
1740        except error.TestFail:
1741            # Do not fail here, simply report the version as unknown.
1742            logging.warn('Unable to query %r to get servo fw version.', cmd)
1743            return 'unknown'
1744
1745
1746    def get_servo_fw_versions(self):
1747        """Retrieve a summary of attached servos and their firmware.
1748
1749        Note: that only the Google firmware owned servos supports this e.g.
1750        micro, v4, etc. For anything else, the dictionary will have no entry.
1751        If no device is has Google owned firmware (e.g. v3) then the result
1752        is an empty dictionary.
1753
1754        @returns: dict, a collection of each attached servo & their firmware.
1755        """
1756        def get_fw_version_tag(tag, dev):
1757            return '%s_version.%s' % (dev, tag)
1758
1759        fw_versions = {}
1760        if 'servo_v4' not in self._servo_type:
1761            return {}
1762        v4_tag = get_fw_version_tag('support', 'servo_v4')
1763        fw_versions[v4_tag] = self._get_servo_type_fw_version('servo_v4')
1764        if 'with' in self._servo_type:
1765            dut_devs = self._servo_type.split('_with_')[1].split('_and_')
1766            main_tag = get_fw_version_tag('main', dut_devs[0])
1767            fw_versions[main_tag] = self._get_servo_type_fw_version(dut_devs[0])
1768            if len(dut_devs) == 2:
1769                # Right now, the only way for this to happen is for a dual setup
1770                # to exist where ccd is attached on top of servo micro. Thus, we
1771                # know that the prefix is ccd_cr50 and the type is ccd_cr50.
1772                # TODO(coconutruben): If the new servod is not deployed by
1773                # the time that there are more cases of '_and_' devices,
1774                # this needs to be reworked.
1775                dual_tag = get_fw_version_tag('ccd_flex_secondary', dut_devs[1])
1776                fw = self._get_servo_type_fw_version(dut_devs[1], 'ccd_cr50')
1777                fw_versions[dual_tag] = fw
1778        return fw_versions
1779
1780    @property
1781    def uart_logs_dir(self):
1782        """Return the directory to save UART logs."""
1783        return self._uart.logs_dir if self._uart else ""
1784
1785
1786    @uart_logs_dir.setter
1787    def uart_logs_dir(self, logs_dir):
1788        """Set directory to save UART logs.
1789
1790        @param logs_dir  String of directory name."""
1791        self._uart.logs_dir = logs_dir
1792
1793    def get_uart_logfile(self, uart):
1794        """Return the path to the uart log file."""
1795        return self._uart.get_logfile(uart)
1796
1797    def record_uart_capture(self, outdir=None):
1798        """Save uart stream output."""
1799        if outdir and not self.uart_logs_dir:
1800            self.uart_logs_dir = outdir
1801        self._uart.dump()
1802
1803    def close(self, outdir=None):
1804        """Close the servo object."""
1805        # We want to ensure that servo_v4 is in src mode to avoid DUTs
1806        # left in discharge state after a task.
1807        try:
1808            self.set_servo_v4_role('src')
1809        except Exception as e:
1810            logging.info(
1811                    'Unexpected error while setting servo_v4 role'
1812                    ' to src; %s', e)
1813
1814        self._uart.stop_capture()
1815        self.record_uart_capture(outdir)
1816
1817    def ec_reboot(self):
1818        """Reboot Just the embedded controller."""
1819        self.set_nocheck('ec_uart_flush', 'off')
1820        self.set_nocheck('ec_uart_cmd', 'reboot')
1821        self.set_nocheck('ec_uart_flush', 'on')
1822
1823    def get_vbus_voltage(self):
1824        """Get the voltage of VBUS'.
1825
1826        @returns The voltage of VBUS, if vbus_voltage is supported.
1827                 None               , if vbus_voltage is not supported.
1828        """
1829        if not self.has_control('vbus_voltage'):
1830            logging.debug('Servo does not have vbus_voltage control,'
1831                          'unable to get vbus voltage')
1832            return None
1833
1834        return self.get('vbus_voltage')
1835