• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4#
5# Expects to be run in an environment with sudo and no interactive password
6# prompt, such as within the Chromium OS development chroot.
7
8import ast
9import logging
10import os
11import re
12import socket
13import time
14import xmlrpclib
15
16from autotest_lib.client.common_lib import error
17from autotest_lib.client.common_lib import lsbrelease_utils
18from autotest_lib.server import utils as server_utils
19from autotest_lib.server.cros.servo import firmware_programmer
20from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig
21
22# Time to wait when probing for a usb device, it takes on avg 17 seconds
23# to do a full probe.
24_USB_PROBE_TIMEOUT = 40
25
26
27# Regex to match XMLRPC errors due to a servod control not existing.
28NO_CONTROL_RE = re.compile(r'No control named (\w*\.?\w*)')
29
30
31# The minimum voltage on the charger port on servo v4 that is expected. This is
32# to query whether a charger is plugged into servo v4 and thus pd control
33# capabilities can be used.
34V4_CHG_ATTACHED_MIN_VOLTAGE_MV = 4400
35
36class ControlUnavailableError(error.TestFail):
37    """Custom error class to indicate a control is unavailable on servod."""
38    pass
39
40
41def _extract_image_from_tarball(tarball, dest_dir, image_candidates):
42    """Try extracting the image_candidates from the tarball.
43
44    @param tarball: The path of the tarball.
45    @param dest_path: The path of the destination.
46    @param image_candidates: A tuple of the paths of image candidates.
47
48    @return: The first path from the image candidates, which succeeds, or None
49             if all the image candidates fail.
50    """
51
52    # Create the firmware_name subdirectory if it doesn't exist
53    if not os.path.exists(dest_dir):
54        os.mkdir(dest_dir)
55
56    # Generate a list of all tarball files
57    tarball_files = server_utils.system_output(
58        ('tar tf %s' % tarball), timeout=120, ignore_status=True).splitlines()
59
60    # Check if image candidates are in the list of tarball files
61    for image in image_candidates:
62        if image in tarball_files:
63            # Extract and return the first image candidate found
64            status = server_utils.system(
65                ('tar xf %s -C %s %s' % (tarball, dest_dir, image)),
66                timeout=120, ignore_status=True)
67            if status == 0:
68                return image
69    return None
70
71
72class _PowerStateController(object):
73
74    """Class to provide board-specific power operations.
75
76    This class is responsible for "power on" and "power off"
77    operations that can operate without making assumptions in
78    advance about board state.  It offers an interface that
79    abstracts out the different sequences required for different
80    board types.
81
82    """
83    # Constants acceptable to be passed for the `rec_mode` parameter
84    # to power_on().
85    #
86    # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
87    #   SD card.
88    # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.
89
90    REC_ON = 'rec'
91    REC_OFF = 'on'
92    REC_ON_FORCE_MRC = 'rec_force_mrc'
93
94    # Delay in seconds needed between asserting and de-asserting
95    # warm reset.
96    _RESET_HOLD_TIME = 0.5
97
98
99    def __init__(self, servo):
100        """Initialize the power state control.
101
102        @param servo Servo object providing the underlying `set` and `get`
103                     methods for the target controls.
104
105        """
106        self._servo = servo
107        self.supported = self._servo.has_control('power_state')
108        if not self.supported:
109            logging.info('Servo setup does not support power-state operations. '
110                         'All power-state calls will lead to error.TestFail')
111
112    def _check_supported(self):
113        """Throw an error if dts mode control is not supported."""
114        if not self.supported:
115            raise error.TestFail('power_state controls not supported')
116
117    def reset(self):
118        """Force the DUT to reset.
119
120        The DUT is guaranteed to be on at the end of this call,
121        regardless of its previous state, provided that there is
122        working OS software. This also guarantees that the EC has
123        been restarted.
124
125        """
126        self._check_supported()
127        self._servo.set_nocheck('power_state', 'reset')
128
129    def warm_reset(self):
130        """Apply warm reset to the DUT.
131
132        This asserts, then de-asserts the 'warm_reset' signal.
133        Generally, this causes the board to restart.
134
135        """
136        # TODO: warm_reset support has added to power_state.py. Once it
137        # available to labstation remove fallback method.
138        self._check_supported()
139        try:
140            self._servo.set_nocheck('power_state', 'warm_reset')
141        except error.TestFail as err:
142            logging.info("Fallback to warm_reset control method")
143            self._servo.set_get_all(['warm_reset:on',
144                                 'sleep:%.4f' % self._RESET_HOLD_TIME,
145                                 'warm_reset:off'])
146    def power_off(self):
147        """Force the DUT to power off.
148
149        The DUT is guaranteed to be off at the end of this call,
150        regardless of its previous state, provided that there is
151        working EC and boot firmware.  There is no requirement for
152        working OS software.
153
154        """
155        self._check_supported()
156        self._servo.set_nocheck('power_state', 'off')
157
158    def power_on(self, rec_mode=REC_OFF):
159        """Force the DUT to power on.
160
161        Prior to calling this function, the DUT must be powered off,
162        e.g. with a call to `power_off()`.
163
164        At power on, recovery mode is set as specified by the
165        corresponding argument.  When booting with recovery mode on, it
166        is the caller's responsibility to unplug/plug in a bootable
167        external storage device.
168
169        If the DUT requires a delay after powering on but before
170        processing inputs such as USB stick insertion, the delay is
171        handled by this method; the caller is not responsible for such
172        delays.
173
174        @param rec_mode Setting of recovery mode to be applied at
175                        power on. default: REC_OFF aka 'off'
176
177        """
178        self._check_supported()
179        self._servo.set_nocheck('power_state', rec_mode)
180
181
182class _Uart(object):
183    """Class to capture UART streams of CPU, EC, Cr50, etc."""
184    _UartToCapture = ('cpu', 'ec', 'cr50', 'servo_v4', 'servo_micro', 'usbpd')
185
186    def __init__(self, servo):
187        self._servo = servo
188        self._streams = []
189        self.logs_dir = None
190
191    def _start_stop_capture(self, uart, start):
192        """Helper function to start/stop capturing on specified UART.
193
194        @param uart:  The UART name to start/stop capturing.
195        @param start:  True to start capturing, otherwise stop.
196
197        @returns True if the operation completes successfully.
198                 False if the UART capturing is not supported or failed due to
199                 an error.
200        """
201        logging.debug('%s capturing %s UART.', 'Start' if start else 'Stop',
202                      uart)
203        uart_cmd = '%s_uart_capture' % uart
204        target_level = 'on' if start else 'off'
205        level = None
206        if self._servo.has_control(uart_cmd):
207            # Do our own implementation of set() here as not_applicable
208            # should also count as a valid control.
209            logging.debug('Trying to set %s to %s.', uart_cmd, target_level)
210            try:
211                self._servo.set_nocheck(uart_cmd, target_level)
212                level = self._servo.get(uart_cmd)
213            except error.TestFail as e:
214                # Any sort of test failure here should not stop the test. This
215                # is just to capture more output. Log and move on.
216                logging.warning('Failed to set %s to %s. %s. Ignoring.',
217                                uart_cmd, target_level, str(e))
218            if level == target_level:
219              logging.debug('Managed to set %s to %s.', uart_cmd, level)
220            else:
221              logging.debug('Failed to set %s to %s. Got %s.', uart_cmd,
222                            target_level, level)
223        return level == target_level
224
225    def start_capture(self):
226        """Start capturing UART streams."""
227        for uart in self._UartToCapture:
228            if self._start_stop_capture(uart, True):
229                self._streams.append(('%s_uart_stream' % uart, '%s_uart.log' %
230                                      uart))
231
232    def dump(self):
233        """Dump UART streams to log files accordingly."""
234        if not self.logs_dir:
235            return
236
237        for stream, logfile in self._streams:
238            logfile_fullname = os.path.join(self.logs_dir, logfile)
239            try:
240                content = self._servo.get(stream)
241            except Exception as err:
242                logging.warn('Failed to get UART log for %s: %s', stream, err)
243                continue
244
245            if content == 'not_applicable':
246                logging.warn('%s is not applicable', stream)
247                continue
248
249            # The UART stream may contain non-printable characters, and servo
250            # returns it in string representation. We use `ast.leteral_eval`
251            # to revert it back.
252            with open(logfile_fullname, 'a') as fd:
253                try:
254                    fd.write(ast.literal_eval(content))
255                except ValueError:
256                    logging.exception('Invalid value for %s: %r', stream,
257                                      content)
258
259    def stop_capture(self):
260        """Stop capturing UART streams."""
261        for uart in self._UartToCapture:
262            try:
263                self._start_stop_capture(uart, False)
264            except Exception as err:
265                logging.warn('Failed to stop UART logging for %s: %s', uart,
266                             err)
267
268
269class Servo(object):
270
271    """Manages control of a Servo board.
272
273    Servo is a board developed by hardware group to aide in the debug and
274    control of various partner devices. Servo's features include the simulation
275    of pressing the power button, closing the lid, and pressing Ctrl-d. This
276    class manages setting up and communicating with a servo demon (servod)
277    process. It provides both high-level functions for common servo tasks and
278    low-level functions for directly setting and reading gpios.
279
280    """
281
282    # Power button press delays in seconds.
283    #
284    # The EC specification says that 8.0 seconds should be enough
285    # for the long power press.  However, some platforms need a bit
286    # more time.  Empirical testing has found these requirements:
287    #   Alex: 8.2 seconds
288    #   ZGB:  8.5 seconds
289    # The actual value is set to the largest known necessary value.
290    #
291    # TODO(jrbarnette) Being generous is the right thing to do for
292    # existing platforms, but if this code is to be used for
293    # qualification of new hardware, we should be less generous.
294    SHORT_DELAY = 0.1
295
296    # Maximum number of times to re-read power button on release.
297    GET_RETRY_MAX = 10
298
299    # Delays to deal with DUT state transitions.
300    SLEEP_DELAY = 6
301    BOOT_DELAY = 10
302
303    # Default minimum time interval between 'press' and 'release'
304    # keyboard events.
305    SERVO_KEY_PRESS_DELAY = 0.1
306
307    # Time to toggle recovery switch on and off.
308    REC_TOGGLE_DELAY = 0.1
309
310    # Time to toggle development switch on and off.
311    DEV_TOGGLE_DELAY = 0.1
312
313    # Time between an usb disk plugged-in and detected in the system.
314    USB_DETECTION_DELAY = 10
315    # Time to keep USB power off before and after USB mux direction is changed
316    USB_POWEROFF_DELAY = 2
317
318    # Time to wait before timing out on servo initialization.
319    INIT_TIMEOUT_SECS = 10
320
321
322    def __init__(self, servo_host, servo_serial=None):
323        """Sets up the servo communication infrastructure.
324
325        @param servo_host: A ServoHost object representing
326                           the host running servod.
327        @type servo_host: autotest_lib.server.hosts.servo_host.ServoHost
328        @param servo_serial: Serial number of the servo board.
329        """
330        # TODO(fdeng): crbug.com/298379
331        # We should move servo_host object out of servo object
332        # to minimize the dependencies on the rest of Autotest.
333        self._servo_host = servo_host
334        self._servo_serial = servo_serial
335        self._server = servo_host.get_servod_server_proxy()
336        self._servo_type = self.get_servo_version()
337        self._power_state = _PowerStateController(self)
338        self._uart = _Uart(self)
339        self._usb_state = None
340        self._programmer = None
341        self._prev_log_inode = None
342        self._prev_log_size = 0
343
344    @property
345    def servo_serial(self):
346        """Returns the serial number of the servo board."""
347        return self._servo_serial
348
349    def rotate_servod_logs(self, filename=None, directory=None):
350        """Save the latest servod log into a local directory, then rotate logs.
351
352        The files will be <filename>.DEBUG, <filename>.INFO, <filename>.WARNING,
353        or just <filename>.log if not using split level logging.
354
355        @param filename: local filename prefix (no file extension) to use.
356                         If None, rotate log but don't save it.
357        @param directory: local directory to save logs into (if unset, use cwd)
358        """
359        if self.is_localhost():
360            # Local servod usually runs without log-dir, so can't be collected.
361            # TODO(crbug.com/1011516): allow localhost when rotation is enabled
362            return
363
364        log_dir = '/var/log/servod_%s' % self._servo_host.servo_port
365
366        if filename:
367            logging.info("Saving servod logs: %s/%s.*", directory or '.',
368                         filename)
369            # TODO(crrev.com/c/1793030): remove no-level case once CL is pushed
370            for level_name in ('', 'DEBUG', 'INFO', 'WARNING'):
371
372                remote_path = os.path.join(log_dir, 'latest')
373                if level_name:
374                    remote_path += '.%s' % level_name
375
376                local_path = '%s.%s' % (filename, level_name or 'log')
377                if directory:
378                    local_path = os.path.join(directory, local_path)
379
380                try:
381                    self._servo_host.get_file(
382                            remote_path, local_path, try_rsync=False)
383
384                except error.AutoservRunError as e:
385                    result = e.result_obj
386                    if result.exit_status != 0:
387                        stderr = result.stderr.strip()
388
389                        # File not existing is okay, but warn for anything else.
390                        if 'no such' not in stderr.lower():
391                            logging.warn(
392                                    "Couldn't retrieve servod log: %s",
393                                    stderr or '\n%s' % result)
394
395                try:
396                    if os.stat(local_path).st_size == 0:
397                        os.unlink(local_path)
398                except EnvironmentError:
399                    pass
400
401        else:
402            # No filename given, so caller wants to discard the log lines.
403            # Remove the symlinks to prevent old log-dir links from being
404            # picked up multiple times when using servod without log-dir.
405            remote_path = os.path.join(log_dir, 'latest*')
406            self._servo_host.run(
407                    "rm %s" % remote_path,
408                    stderr_tee=None, ignore_status=True)
409
410        # Servod log rotation renames current log, then creates a new file with
411        # the old name: log.<date> -> log.<date>.1.tbz2 -> log.<date>.2.tbz2
412
413        # Must rotate after copying, or the copy would be the new, empty file.
414        try:
415            self.set_nocheck('rotate_servod_logs', 'yes')
416        except ControlUnavailableError as e:
417            # Missing control (possibly old servod)
418            logging.warn("Couldn't rotate servod logs: %s", str(e))
419        except error.TestFail:
420            # Control exists but gave an error; don't let it fail the test.
421            # The error is already logged in set_nocheck().
422            pass
423
424    def get_power_state_controller(self):
425        """Return the power state controller for this Servo.
426
427        The power state controller provides board-independent
428        interfaces for reset, power-on, power-off operations.
429
430        """
431        return self._power_state
432
433
434    def initialize_dut(self, cold_reset=False, enable_main=True):
435        """Initializes a dut for testing purposes.
436
437        This sets various servo signals back to default values
438        appropriate for the target board.  By default, if the DUT
439        is already on, it stays on.  If the DUT is powered off
440        before initialization, its state afterward is unspecified.
441
442        Rationale:  Basic initialization of servo sets the lid open,
443        when there is a lid.  This operation won't affect powered on
444        units; however, setting the lid open may power on a unit
445        that's off, depending on the board type and previous state
446        of the device.
447
448        If `cold_reset` is a true value, the DUT and its EC will be
449        reset, and the DUT rebooted in normal mode.
450
451        @param cold_reset If True, cold reset the device after
452                          initialization.
453        @param enable_main If True, make sure the main servo device has
454                           control of the dut.
455
456        """
457        if enable_main:
458            self.enable_main_servo_device()
459
460        try:
461            self._server.hwinit()
462        except socket.error as e:
463            e.filename = '%s:%s' % (self._servo_host.hostname,
464                                    self._servo_host.servo_port)
465            raise
466        self._usb_state = None
467        if self.has_control('usb_mux_oe1'):
468            self.set('usb_mux_oe1', 'on')
469            self.switch_usbkey('off')
470        else:
471            logging.warning('Servod command \'usb_mux_oe1\' is not available. '
472                            'Any USB drive related servo routines will fail.')
473        self._uart.start_capture()
474        if cold_reset:
475            if not self._power_state.supported:
476                logging.info('Cold-reset for DUT requested, but servo '
477                             'setup does not support power_state. Skipping.')
478            else:
479                self._power_state.reset()
480        logging.debug('Servo initialized, version is %s',
481                      self._server.get_version())
482        if self.has_control('init_keyboard'):
483            # This indicates the servod version does not
484            # have explicit keyboard initialization yet.
485            # Ignore this.
486            # TODO(coconutruben): change this back to set() about a month
487            # after crrev.com/c/1586239 has been merged (or whenever that
488            # logic is in the labstation images).
489            self.set_nocheck('init_keyboard','on')
490
491
492    def is_localhost(self):
493        """Is the servod hosted locally?
494
495        Returns:
496          True if local hosted; otherwise, False.
497        """
498        return self._servo_host.is_localhost()
499
500
501    def get_os_version(self):
502        """Returns the chromeos release version."""
503        lsb_release_content = self.system_output('cat /etc/lsb-release',
504                                                 ignore_status=True)
505        return lsbrelease_utils.get_chromeos_release_builder_path(
506                    lsb_release_content=lsb_release_content)
507
508
509    def get_servod_version(self):
510        """Returns the servod version."""
511        result = self._servo_host.run('servod --version')
512        # TODO: use system_output once servod --version prints to stdout
513        stdout = result.stdout.strip()
514        return stdout if stdout else result.stderr.strip()
515
516
517    def power_long_press(self):
518        """Simulate a long power button press."""
519        # After a long power press, the EC may ignore the next power
520        # button press (at least on Alex).  To guarantee that this
521        # won't happen, we need to allow the EC one second to
522        # collect itself.
523        # long_press is defined as 8.5s in servod
524        self.set_nocheck('power_key', 'long_press')
525
526
527    def power_normal_press(self):
528        """Simulate a normal power button press."""
529        # press is defined as 1.2s in servod
530        self.set_nocheck('power_key', 'press')
531
532
533    def power_short_press(self):
534        """Simulate a short power button press."""
535        # tab is defined as 0.2s in servod
536        self.set_nocheck('power_key', 'tab')
537
538
539    def power_key(self, press_secs='tab'):
540        """Simulate a power button press.
541
542        @param press_secs: int, float, str; time to press key in seconds or
543                           known shorthand: 'tab' 'press' 'long_press'
544        """
545        self.set_nocheck('power_key', press_secs)
546
547
548    def pwr_button(self, action='press'):
549        """Simulate a power button press.
550
551        @param action: str; could be press or could be release.
552        """
553        self.set_nocheck('pwr_button', action)
554
555
556    def lid_open(self):
557        """Simulate opening the lid and raise exception if all attempts fail"""
558        self.set('lid_open', 'yes')
559
560
561    def lid_close(self):
562        """Simulate closing the lid and raise exception if all attempts fail
563
564        Waits 6 seconds to ensure the device is fully asleep before returning.
565        """
566        self.set('lid_open', 'no')
567        time.sleep(Servo.SLEEP_DELAY)
568
569
570    def vbus_power_get(self):
571        """Get current vbus_power."""
572        return self.get('vbus_power')
573
574
575    def volume_up(self, timeout=300):
576        """Simulate pushing the volume down button.
577
578        @param timeout: Timeout for setting the volume.
579        """
580        self.set_get_all(['volume_up:yes',
581                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
582                          'volume_up:no'])
583        # we need to wait for commands to take effect before moving on
584        time_left = float(timeout)
585        while time_left > 0.0:
586            value = self.get('volume_up')
587            if value == 'no':
588                return
589            time.sleep(self.SHORT_DELAY)
590            time_left = time_left - self.SHORT_DELAY
591        raise error.TestFail("Failed setting volume_up to no")
592
593    def volume_down(self, timeout=300):
594        """Simulate pushing the volume down button.
595
596        @param timeout: Timeout for setting the volume.
597        """
598        self.set_get_all(['volume_down:yes',
599                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
600                          'volume_down:no'])
601        # we need to wait for commands to take effect before moving on
602        time_left = float(timeout)
603        while time_left > 0.0:
604            value = self.get('volume_down')
605            if value == 'no':
606                return
607            time.sleep(self.SHORT_DELAY)
608            time_left = time_left - self.SHORT_DELAY
609        raise error.TestFail("Failed setting volume_down to no")
610
611    def ctrl_d(self, press_secs='tab'):
612        """Simulate Ctrl-d simultaneous button presses.
613
614        @param press_secs: int, float, str; time to press key in seconds or
615                           known shorthand: 'tab' 'press' 'long_press'
616        """
617        self.set_nocheck('ctrl_d', press_secs)
618
619
620    def ctrl_u(self, press_secs='tab'):
621        """Simulate Ctrl-u simultaneous button presses.
622
623        @param press_secs: int, float, str; time to press key in seconds or
624                           known shorthand: 'tab' 'press' 'long_press'
625        """
626        self.set_nocheck('ctrl_u', press_secs)
627
628
629    def ctrl_enter(self, press_secs='tab'):
630        """Simulate Ctrl-enter simultaneous button presses.
631
632        @param press_secs: int, float, str; time to press key in seconds or
633                           known shorthand: 'tab' 'press' 'long_press'
634        """
635        self.set_nocheck('ctrl_enter', press_secs)
636
637
638    def ctrl_key(self, press_secs='tab'):
639        """Simulate Enter key button press.
640
641        @param press_secs: int, float, str; time to press key in seconds or
642                           known shorthand: 'tab' 'press' 'long_press'
643        """
644        self.set_nocheck('ctrl_key', press_secs)
645
646
647    def enter_key(self, press_secs='tab'):
648        """Simulate Enter key button press.
649
650        @param press_secs: int, float, str; time to press key in seconds or
651                           known shorthand: 'tab' 'press' 'long_press'
652        """
653        self.set_nocheck('enter_key', press_secs)
654
655
656    def refresh_key(self, press_secs='tab'):
657        """Simulate Refresh key (F3) button press.
658
659        @param press_secs: int, float, str; time to press key in seconds or
660                           known shorthand: 'tab' 'press' 'long_press'
661        """
662        self.set_nocheck('refresh_key', press_secs)
663
664
665    def ctrl_refresh_key(self, press_secs='tab'):
666        """Simulate Ctrl and Refresh (F3) simultaneous press.
667
668        This key combination is an alternative of Space key.
669
670        @param press_secs: int, float, str; time to press key in seconds or
671                           known shorthand: 'tab' 'press' 'long_press'
672        """
673        self.set_nocheck('ctrl_refresh_key', press_secs)
674
675
676    def imaginary_key(self, press_secs='tab'):
677        """Simulate imaginary key button press.
678
679        Maps to a key that doesn't physically exist.
680
681        @param press_secs: int, float, str; time to press key in seconds or
682                           known shorthand: 'tab' 'press' 'long_press'
683        """
684        self.set_nocheck('imaginary_key', press_secs)
685
686
687    def sysrq_x(self, press_secs='tab'):
688        """Simulate Alt VolumeUp X simulataneous press.
689
690        This key combination is the kernel system request (sysrq) X.
691
692        @param press_secs: int, float, str; time to press key in seconds or
693                           known shorthand: 'tab' 'press' 'long_press'
694        """
695        self.set_nocheck('sysrq_x', press_secs)
696
697
698    def toggle_recovery_switch(self):
699        """Toggle recovery switch on and off."""
700        self.enable_recovery_mode()
701        time.sleep(self.REC_TOGGLE_DELAY)
702        self.disable_recovery_mode()
703
704
705    def enable_recovery_mode(self):
706        """Enable recovery mode on device."""
707        self.set('rec_mode', 'on')
708
709
710    def disable_recovery_mode(self):
711        """Disable recovery mode on device."""
712        self.set('rec_mode', 'off')
713
714
715    def toggle_development_switch(self):
716        """Toggle development switch on and off."""
717        self.enable_development_mode()
718        time.sleep(self.DEV_TOGGLE_DELAY)
719        self.disable_development_mode()
720
721
722    def enable_development_mode(self):
723        """Enable development mode on device."""
724        self.set('dev_mode', 'on')
725
726
727    def disable_development_mode(self):
728        """Disable development mode on device."""
729        self.set('dev_mode', 'off')
730
731    def boot_devmode(self):
732        """Boot a dev-mode device that is powered off."""
733        self.power_short_press()
734        self.pass_devmode()
735
736
737    def pass_devmode(self):
738        """Pass through boot screens in dev-mode."""
739        time.sleep(Servo.BOOT_DELAY)
740        self.ctrl_d()
741        time.sleep(Servo.BOOT_DELAY)
742
743
744    def get_board(self):
745        """Get the board connected to servod."""
746        return self._server.get_board()
747
748
749    def get_base_board(self):
750        """Get the board of the base connected to servod."""
751        try:
752            return self._server.get_base_board()
753        except  xmlrpclib.Fault as e:
754            # TODO(waihong): Remove the following compatibility check when
755            # the new versions of hdctools are deployed.
756            if 'not supported' in str(e):
757                logging.warning('The servod is too old that get_base_board '
758                        'not supported.')
759                return ''
760            raise
761
762
763    def get_ec_active_copy(self):
764        """Get the active copy of the EC image."""
765        return self.get('ec_active_copy')
766
767
768    def _get_xmlrpclib_exception(self, xmlexc):
769        """Get meaningful exception string from xmlrpc.
770
771        Args:
772            xmlexc: xmlrpclib.Fault object
773
774        xmlrpclib.Fault.faultString has the following format:
775
776        <type 'exception type'>:'actual error message'
777
778        Parse and return the real exception from the servod side instead of the
779        less meaningful one like,
780           <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
781           attribute 'hw_driver'">
782
783        Returns:
784            string of underlying exception raised in servod.
785        """
786        return re.sub('^.*>:', '', xmlexc.faultString)
787
788    def has_control(self, ctrl_name, prefix=''):
789        """Query servod server to determine if |ctrl_name| is a valid control.
790
791        @param ctrl_name Name of the control.
792        @param prefix: prefix to route control to correct servo device.
793
794        @returns: true if |ctrl_name| is a known control, false otherwise.
795        """
796        cltr_name = self._build_ctrl_name(ctrl_name, prefix)
797        try:
798            # If the control exists, doc() will work.
799            self._server.doc(ctrl_name)
800            return True
801        except xmlrpclib.Fault as e:
802            if re.search('No control %s' % ctrl_name,
803                         self._get_xmlrpclib_exception(e)):
804                return False
805            raise e
806
807    def _build_ctrl_name(self, ctrl_name, prefix):
808        """Helper to build the control name if a prefix is used.
809
810        @param ctrl_name Name of the control.
811        @param prefix: prefix to route control to correct servo device.
812
813        @returns: [|prefix|.]ctrl_name depending on whether prefix is non-empty.
814        """
815        assert ctrl_name
816        if prefix:
817            return '%s.%s' % (prefix, ctrl_name)
818        return ctrl_name
819
820    def get(self, ctrl_name, prefix=''):
821        """Get the value of a gpio from Servod.
822
823        @param ctrl_name Name of the control.
824        @param prefix: prefix to route control to correct servo device.
825
826        @returns: server response to |ctrl_name| request.
827
828        @raise ControlUnavailableError: if |ctrl_name| not a known control.
829        @raise error.TestFail: for all other failures doing get().
830        """
831        cltr_name = self._build_ctrl_name(ctrl_name, prefix)
832        try:
833            return self._server.get(ctrl_name)
834        except  xmlrpclib.Fault as e:
835            err_str = self._get_xmlrpclib_exception(e)
836            err_msg = "Getting '%s' :: %s" % (ctrl_name, err_str)
837            unknown_ctrl = re.findall(NO_CONTROL_RE, err_str)
838            if unknown_ctrl:
839                raise ControlUnavailableError('No control named %r' %
840                                              unknown_ctrl[0])
841            else:
842                logging.error(err_msg)
843                raise error.TestFail(err_msg)
844
845
846    def set(self, ctrl_name, ctrl_value, prefix=''):
847        """Set and check the value of a gpio using Servod.
848
849        @param ctrl_name: Name of the control.
850        @param ctrl_value: New setting for the control.
851        @param prefix: prefix to route control to correct servo device.
852        @raise error.TestFail: if the control value fails to change.
853        """
854        cltr_name = self._build_ctrl_name(ctrl_name, prefix)
855        self.set_nocheck(ctrl_name, ctrl_value)
856        retry_count = Servo.GET_RETRY_MAX
857        actual_value = self.get(ctrl_name)
858        while ctrl_value != actual_value and retry_count:
859            logging.warning("%s != %s, retry %d", ctrl_name, ctrl_value,
860                            retry_count)
861            retry_count -= 1
862            time.sleep(Servo.SHORT_DELAY)
863            actual_value = self.get(ctrl_name)
864
865        if ctrl_value != actual_value:
866            raise error.TestFail(
867                    'Servo failed to set %s to %s. Got %s.'
868                    % (ctrl_name, ctrl_value, actual_value))
869
870
871    def set_nocheck(self, ctrl_name, ctrl_value, prefix=''):
872        """Set the value of a gpio using Servod.
873
874        @param ctrl_name Name of the control.
875        @param ctrl_value New setting for the control.
876        @param prefix: prefix to route control to correct servo device.
877
878        @raise ControlUnavailableError: if |ctrl_name| not a known control.
879        @raise error.TestFail: for all other failures doing set().
880        """
881        cltr_name = self._build_ctrl_name(ctrl_name, prefix)
882        # The real danger here is to pass a None value through the xmlrpc.
883        assert ctrl_value is not None
884        logging.debug('Setting %s to %r', ctrl_name, ctrl_value)
885        try:
886            self._server.set(ctrl_name, ctrl_value)
887        except  xmlrpclib.Fault as e:
888            err_str = self._get_xmlrpclib_exception(e)
889            err_msg = "Setting '%s' :: %s" % (ctrl_name, err_str)
890            unknown_ctrl = re.findall(NO_CONTROL_RE, err_str)
891            if unknown_ctrl:
892                raise ControlUnavailableError('No control named %r' %
893                                              unknown_ctrl[0])
894            else:
895                logging.error(err_msg)
896                raise error.TestFail(err_msg)
897
898
899    def set_get_all(self, controls):
900        """Set &| get one or more control values.
901
902        @param controls: list of strings, controls to set &| get.
903
904        @raise: error.TestError in case error occurs setting/getting values.
905        """
906        rv = []
907        try:
908            logging.debug('Set/get all: %s', str(controls))
909            rv = self._server.set_get_all(controls)
910        except xmlrpclib.Fault as e:
911            # TODO(waihong): Remove the following backward compatibility when
912            # the new versions of hdctools are deployed.
913            if 'not supported' in str(e):
914                logging.warning('The servod is too old that set_get_all '
915                        'not supported. Use set and get instead.')
916                for control in controls:
917                    if ':' in control:
918                        (name, value) = control.split(':')
919                        if name == 'sleep':
920                            time.sleep(float(value))
921                        else:
922                            self.set_nocheck(name, value)
923                        rv.append(True)
924                    else:
925                        rv.append(self.get(name))
926            else:
927                err_msg = "Problem with '%s' :: %s" % \
928                    (controls, self._get_xmlrpclib_exception(e))
929                raise error.TestFail(err_msg)
930        return rv
931
932
933    # TODO(waihong) It may fail if multiple servo's are connected to the same
934    # host. Should look for a better way, like the USB serial name, to identify
935    # the USB device.
936    # TODO(sbasi) Remove this code from autoserv once firmware tests have been
937    # updated.
938    def probe_host_usb_dev(self, timeout=_USB_PROBE_TIMEOUT):
939        """Probe the USB disk device plugged-in the servo from the host side.
940
941        It uses servod to discover if there is a usb device attached to it.
942
943        @param timeout The timeout period when probing for the usb host device.
944
945        @return: String of USB disk path (e.g. '/dev/sdb') or None.
946        """
947        # Set up Servo's usb mux.
948        self.switch_usbkey('host')
949        return self._server.probe_host_usb_dev(timeout) or None
950
951
952    def image_to_servo_usb(self, image_path=None,
953                           make_image_noninteractive=False):
954        """Install an image to the USB key plugged into the servo.
955
956        This method may copy any image to the servo USB key including a
957        recovery image or a test image.  These images are frequently used
958        for test purposes such as restoring a corrupted image or conducting
959        an upgrade of ec/fw/kernel as part of a test of a specific image part.
960
961        @param image_path Path on the host to the recovery image.
962        @param make_image_noninteractive Make the recovery image
963                                   noninteractive, therefore the DUT
964                                   will reboot automatically after
965                                   installation.
966        """
967        # We're about to start plugging/unplugging the USB key.  We
968        # don't know the state of the DUT, or what it might choose
969        # to do to the device after hotplug.  To avoid surprises,
970        # force the DUT to be off.
971        self._server.hwinit()
972        if self.has_control('init_keyboard'):
973            # This indicates the servod version does not
974            # have explicit keyboard initialization yet.
975            # Ignore this.
976            # TODO(coconutruben): change this back to set() about a month
977            # after crrev.com/c/1586239 has been merged (or whenever that
978            # logic is in the labstation images).
979            self.set_nocheck('init_keyboard','on')
980        self._power_state.power_off()
981
982        if image_path:
983            # Set up Servo's usb mux.
984            self.switch_usbkey('host')
985            logging.info('Searching for usb device and copying image to it. '
986                         'Please wait a few minutes...')
987            if not self._server.download_image_to_usb(image_path):
988                logging.error('Failed to transfer requested image to USB. '
989                              'Please take a look at Servo Logs.')
990                raise error.AutotestError('Download image to usb failed.')
991            if make_image_noninteractive:
992                logging.info('Making image noninteractive')
993                if not self._server.make_image_noninteractive():
994                    logging.error('Failed to make image noninteractive. '
995                                  'Please take a look at Servo Logs.')
996
997    def boot_in_recovery_mode(self):
998        """Boot host DUT in recovery mode."""
999        self._power_state.power_on(rec_mode=self._power_state.REC_ON)
1000        self.switch_usbkey('dut')
1001
1002
1003    def install_recovery_image(self, image_path=None,
1004                               make_image_noninteractive=False):
1005        """Install the recovery image specified by the path onto the DUT.
1006
1007        This method uses google recovery mode to install a recovery image
1008        onto a DUT through the use of a USB stick that is mounted on a servo
1009        board specified by the usb_dev.  If no image path is specified
1010        we use the recovery image already on the usb image.
1011
1012        @param image_path: Path on the host to the recovery image.
1013        @param make_image_noninteractive: Make the recovery image
1014                noninteractive, therefore the DUT will reboot automatically
1015                after installation.
1016        """
1017        self.image_to_servo_usb(image_path, make_image_noninteractive)
1018        # Give the DUT some time to power_off if we skip
1019        # download image to usb. (crbug.com/982993)
1020        if not image_path:
1021            time.sleep(10)
1022        self.boot_in_recovery_mode()
1023
1024
1025    def _scp_image(self, image_path):
1026        """Copy image to the servo host.
1027
1028        When programming a firmware image on the DUT, the image must be
1029        located on the host to which the servo device is connected. Sometimes
1030        servo is controlled by a remote host, in this case the image needs to
1031        be transferred to the remote host. This adds the servod port number, to
1032        make sure tests for different DUTs don't trample on each other's files.
1033
1034        @param image_path: a string, name of the firmware image file to be
1035               transferred.
1036        @return: a string, full path name of the copied file on the remote.
1037        """
1038        name = os.path.basename(image_path)
1039        remote_name = 'dut_%s.%s' % (self._servo_host.servo_port, name)
1040        dest_path = os.path.join('/tmp', remote_name)
1041        logging.info('Copying %s to %s', name, dest_path)
1042        self._servo_host.send_file(image_path, dest_path)
1043        return dest_path
1044
1045
1046    def system(self, command, timeout=3600):
1047        """Execute the passed in command on the servod host.
1048
1049        @param command Command to be executed.
1050        @param timeout Maximum number of seconds of runtime allowed. Default to
1051                       1 hour.
1052        """
1053        logging.info('Will execute on servo host: %s', command)
1054        self._servo_host.run(command, timeout=timeout)
1055
1056
1057    def system_output(self, command, timeout=3600,
1058                      ignore_status=False, args=()):
1059        """Execute the passed in command on the servod host, return stdout.
1060
1061        @param command a string, the command to execute
1062        @param timeout an int, max number of seconds to wait til command
1063               execution completes. Default to 1 hour.
1064        @param ignore_status a Boolean, if true - ignore command's nonzero exit
1065               status, otherwise an exception will be thrown
1066        @param args a tuple of strings, each becoming a separate command line
1067               parameter for the command
1068        @return command's stdout as a string.
1069        """
1070        return self._servo_host.run(command, timeout=timeout,
1071                                    ignore_status=ignore_status,
1072                                    args=args).stdout.strip()
1073
1074
1075    def get_servo_version(self, active=False):
1076        """Get the version of the servo, e.g., servo_v2 or servo_v3.
1077
1078        @param active: Only return the servo type with the active device.
1079        @return: The version of the servo.
1080
1081        """
1082        servo_type = self._server.get_version()
1083        if '_and_' not in servo_type or not active:
1084            return servo_type
1085
1086        # If servo v4 is using ccd and servo micro, modify the servo type to
1087        # reflect the active device.
1088        active_device = self.get('active_v4_device')
1089        if active_device in servo_type:
1090            logging.info('%s is active', active_device)
1091            return 'servo_v4_with_' + active_device
1092
1093        logging.warn("%s is active even though it's not in servo type",
1094                     active_device)
1095        return servo_type
1096
1097
1098    def get_main_servo_device(self):
1099        """Return the main servo device"""
1100        return self._servo_type.split('_with_')[-1].split('_and_')[0]
1101
1102
1103    def enable_main_servo_device(self):
1104        """Make sure the main device has control of the dut."""
1105        # Cr50 detects servo using the EC uart. It doesn't work well if the
1106        # board doesn't use EC uart. The lab active_v4_device doesn't handle
1107        # this correctly. Check ec_uart_pty before trying to change the active
1108        # device.
1109        # TODO(crbug.com/1016842): reenable the setting the main device when
1110        # active device works on labstations.
1111        return
1112        if not self.has_control('active_v4_device'):
1113            return
1114        self.set('active_v4_device', self.get_main_servo_device())
1115
1116
1117    def main_device_is_ccd(self):
1118        """Whether the main servo device (no prefixes) is a ccd device."""
1119        servo = self._server.get_version()
1120        return 'ccd_cr50' in servo and 'servo_micro' not in servo
1121
1122
1123    def main_device_is_flex(self):
1124        """Whether the main servo device (no prefixes) is a legacy device."""
1125        return not self.main_device_is_ccd()
1126
1127
1128    def main_device_is_active(self):
1129        """Return whether the main device is the active device.
1130
1131        This is only relevant for a dual setup with ccd and legacy on the same
1132        DUT. The main device is the servo that has no prefix on its controls.
1133        This helper answers the question whether that device is also the
1134        active device or not.
1135        """
1136        # TODO(coconutruben): The current implementation of the dual setup only
1137        # ever has legacy as the main device. Therefore, it suffices to ask
1138        # whether the active device is ccd.
1139        if not self.dts_mode_is_valid():
1140            # Use dts support as a proxy to whether the servo setup could
1141            # support a dual role. Only those setups now support legacy and ccd.
1142            return True
1143        active_device = self.get('active_v4_device')
1144        return 'ccd_cr50' not in active_device
1145
1146    def _initialize_programmer(self, rw_only=False):
1147        """Initialize the firmware programmer.
1148
1149        @param rw_only: True to initialize a programmer which only
1150                        programs the RW portions.
1151        """
1152        if self._programmer:
1153            return
1154        # Initialize firmware programmer
1155        if self._servo_type.startswith('servo_v2'):
1156            self._programmer = firmware_programmer.ProgrammerV2(self)
1157            self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
1158        # Both servo v3 and v4 use the same programming methods so just leverage
1159        # ProgrammerV3 for servo v4 as well.
1160        elif (self._servo_type.startswith('servo_v3') or
1161              self._servo_type.startswith('servo_v4')):
1162            self._programmer = firmware_programmer.ProgrammerV3(self)
1163            self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
1164        else:
1165            raise error.TestError(
1166                    'No firmware programmer for servo version: %s' %
1167                    self._servo_type)
1168
1169
1170    def program_bios(self, image, rw_only=False):
1171        """Program bios on DUT with given image.
1172
1173        @param image: a string, file name of the BIOS image to program
1174                      on the DUT.
1175        @param rw_only: True to only program the RW portion of BIOS.
1176
1177        """
1178        self._initialize_programmer()
1179        if not self.is_localhost():
1180            image = self._scp_image(image)
1181        if rw_only:
1182            self._programmer_rw.program_bios(image)
1183        else:
1184            self._programmer.program_bios(image)
1185
1186
1187    def program_ec(self, image, rw_only=False):
1188        """Program ec on DUT with given image.
1189
1190        @param image: a string, file name of the EC image to program
1191                      on the DUT.
1192        @param rw_only: True to only program the RW portion of EC.
1193
1194        """
1195        self._initialize_programmer()
1196        if not self.is_localhost():
1197            image = self._scp_image(image)
1198        if rw_only:
1199            self._programmer_rw.program_ec(image)
1200        else:
1201            self._programmer.program_ec(image)
1202
1203
1204    def extract_ec_image(self, board, model, tarball_path):
1205        """Helper function to extract EC image from downloaded tarball.
1206
1207        @param board: The DUT board name.
1208        @param model: The DUT model name.
1209        @param tarball_path: The path of the downloaded build tarball.
1210
1211        @return: Path to extracted EC image.
1212        """
1213
1214        # Ignore extracting EC image and re-programming if not a Chrome EC
1215        chrome_ec = FAFTConfig(board).chrome_ec
1216        if not chrome_ec:
1217            logging.info('Not a Chrome EC, ignore re-programming it')
1218            return None
1219
1220        # Best effort; try to retrieve the EC board from the version as
1221        # reported by the EC.
1222        ec_board = None
1223        try:
1224            ec_board = self.get('ec_board')
1225        except Exception as err:
1226            logging.info('Failed to get ec_board value; ignoring')
1227            pass
1228
1229        # Array of candidates for EC image
1230        ec_image_candidates = ['ec.bin',
1231                               '%s/ec.bin' % model,
1232                               '%s/ec.bin' % board]
1233        if ec_board:
1234          ec_image_candidates.append('%s/ec.bin' % ec_board)
1235
1236        # Extract EC image from tarball
1237        dest_dir = os.path.join(os.path.dirname(tarball_path), 'EC')
1238        ec_image = _extract_image_from_tarball(tarball_path, dest_dir,
1239                                               ec_image_candidates)
1240
1241        # Check if EC image was found and return path or raise error
1242        if ec_image:
1243            return os.path.join(dest_dir, ec_image)
1244        else:
1245            raise error.TestError('Failed to extract EC image from %s',
1246                                  tarball_path)
1247
1248
1249    def extract_bios_image(self, board, model, tarball_path):
1250        """Helper function to extract BIOS image from downloaded tarball.
1251
1252        @param board: The DUT board name.
1253        @param model: The DUT model name.
1254        @param tarball_path: The path of the downloaded build tarball.
1255
1256        @return: Path to extracted BIOS image.
1257        """
1258
1259        # Array of candidates for BIOS image
1260        bios_image_candidates = ['image.bin',
1261                                 'image-%s.bin' % model,
1262                                 'image-%s.bin' % board]
1263
1264        # Extract BIOS image from tarball
1265        dest_dir = os.path.join(os.path.dirname(tarball_path), 'BIOS')
1266        bios_image = _extract_image_from_tarball(tarball_path, dest_dir,
1267                                                 bios_image_candidates)
1268
1269        # Check if BIOS image was found and return path or raise error
1270        if bios_image:
1271            return os.path.join(dest_dir, bios_image)
1272        else:
1273            raise error.TestError('Failed to extract BIOS image from %s',
1274                                  tarball_path)
1275
1276
1277    def _switch_usbkey_power(self, power_state, detection_delay=False):
1278        """Switch usbkey power.
1279
1280        This function switches usbkey power by setting the value of
1281        'prtctl4_pwren'. If the power is already in the
1282        requested state, this function simply returns.
1283
1284        @param power_state: A string, 'on' or 'off'.
1285        @param detection_delay: A boolean value, if True, sleep
1286                                for |USB_DETECTION_DELAY| after switching
1287                                the power on.
1288        """
1289        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
1290        # handle beaglebones that haven't yet updated and have the
1291        # safe_switch_usbkey_power RPC.  I'll remove this once all beaglebones
1292        # have been updated and also think about a better way to handle
1293        # situations like this.
1294        try:
1295            self._server.safe_switch_usbkey_power(power_state)
1296        except Exception:
1297            self.set('prtctl4_pwren', power_state)
1298        if power_state == 'off':
1299            time.sleep(self.USB_POWEROFF_DELAY)
1300        elif detection_delay:
1301            time.sleep(self.USB_DETECTION_DELAY)
1302
1303
1304    def switch_usbkey(self, usb_state):
1305        """Connect USB flash stick to either host or DUT, or turn USB port off.
1306
1307        This function switches the servo multiplexer to provide electrical
1308        connection between the USB port J3 and either host or DUT side. It
1309        can also be used to turn the USB port off.
1310
1311        Switching to 'dut' or 'host' is accompanied by powercycling
1312        of the USB stick, because it sometimes gets wedged if the mux
1313        is switched while the stick power is on.
1314
1315        @param usb_state: A string, one of 'dut', 'host', or 'off'.
1316                          'dut' and 'host' indicate which side the
1317                          USB flash device is required to be connected to.
1318                          'off' indicates turning the USB port off.
1319
1320        @raise: error.TestError in case the parameter is not 'dut'
1321                'host', or 'off'.
1322        """
1323        if self.get_usbkey_direction() == usb_state:
1324            return
1325
1326        if usb_state == 'off':
1327            self._switch_usbkey_power('off')
1328            self._usb_state = usb_state
1329            return
1330        elif usb_state == 'host':
1331            mux_direction = 'servo_sees_usbkey'
1332        elif usb_state == 'dut':
1333            mux_direction = 'dut_sees_usbkey'
1334        else:
1335            raise error.TestError('Unknown USB state request: %s' % usb_state)
1336
1337        self._switch_usbkey_power('off')
1338        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
1339        # handle beaglebones that haven't yet updated and have the
1340        # safe_switch_usbkey RPC.  I'll remove this once all beaglebones have
1341        # been updated and also think about a better way to handle situations
1342        # like this.
1343        try:
1344            self._server.safe_switch_usbkey(mux_direction)
1345        except Exception:
1346            self.set('usb_mux_sel1', mux_direction)
1347        time.sleep(self.USB_POWEROFF_DELAY)
1348        self._switch_usbkey_power('on', usb_state == 'host')
1349        self._usb_state = usb_state
1350
1351
1352    def get_usbkey_direction(self):
1353        """Get which side USB is connected to or 'off' if usb power is off.
1354
1355        @return: A string, one of 'dut', 'host', or 'off'.
1356        """
1357        if not self._usb_state:
1358            if self.get('prtctl4_pwren') == 'off':
1359                self._usb_state = 'off'
1360            elif self.get('usb_mux_sel1').startswith('dut'):
1361                self._usb_state = 'dut'
1362            else:
1363                self._usb_state = 'host'
1364        return self._usb_state
1365
1366
1367    def set_servo_v4_role(self, role):
1368        """Set the power role of servo v4, either 'src' or 'snk'.
1369
1370        It does nothing if not a servo v4.
1371
1372        @param role: Power role for DUT port on servo v4, either 'src' or 'snk'.
1373        """
1374        if self._servo_type.startswith('servo_v4'):
1375            value = self.get('servo_v4_role')
1376            if value != role:
1377                self.set_nocheck('servo_v4_role', role)
1378            else:
1379                logging.debug('Already in the role: %s.', role)
1380        else:
1381            logging.debug('Not a servo v4, unable to set role to %s.', role)
1382
1383
1384    def supports_built_in_pd_control(self):
1385        """Return whether the servo type supports pd charging and control."""
1386        if 'servo_v4' not in self._servo_type:
1387            # Only servo v4 supports this feature.
1388            logging.info('%r type does not support pd control.',
1389                         self._servo_type)
1390            return False
1391        # On servo v4, it still needs to be the type-c version.
1392        if not self.get('servo_v4_type') == 'type-c':
1393            logging.info('PD controls require a type-c servo v4.')
1394            return False
1395        # Lastly, one cannot really do anything without a plugged in charger.
1396        chg_port_mv = self.get('ppchg5_mv')
1397        if chg_port_mv < V4_CHG_ATTACHED_MIN_VOLTAGE_MV:
1398            logging.warn('It appears that no charger is plugged into servo v4. '
1399                         'Charger port voltage: %dmV', chg_port_mv)
1400            return False
1401        logging.info('Charger port voltage: %dmV', chg_port_mv)
1402        return True
1403
1404    def dts_mode_is_valid(self):
1405        """Return whether servo setup supports dts mode control for cr50."""
1406        if 'servo_v4' not in self._servo_type:
1407            # Only servo v4 supports this feature.
1408            logging.debug('%r type does not support dts mode control.',
1409                          self._servo_type)
1410            return False
1411        # On servo v4, it still needs ot be the type-c version.
1412        if not 'type-c' == self.get('servo_v4_type'):
1413            logging.info('DTS controls require a type-c servo v4.')
1414            return False
1415        return True
1416
1417    def dts_mode_is_safe(self):
1418        """Return whether servo setup supports dts mode without losing access.
1419
1420        DTS mode control exists but the main device might go through ccd.
1421        In that case, it's only safe to control dts mode if the main device
1422        is legacy as otherwise the connection to the main device cuts out.
1423        """
1424        return self.dts_mode_is_valid() and self.main_device_is_flex()
1425
1426    def get_dts_mode(self):
1427        """Return servo dts mode.
1428
1429        @returns: on/off whether dts is on or off
1430        """
1431        if not self.dts_mode_is_valid():
1432            logging.info('Not a valid servo setup. Unable to get dts mode.')
1433            return
1434        return self.get('servo_v4_dts_mode')
1435
1436    def set_dts_mode(self, state):
1437        """Set servo dts mode to off or on.
1438
1439        It does nothing if not a servo v4. Disable the ccd watchdog if we're
1440        disabling dts mode. CCD will disconnect. The watchdog only allows CCD
1441        to disconnect for 10 seconds until it kills servod. Disable the
1442        watchdog, so CCD can stay disconnected indefinitely.
1443
1444        @param state: Set servo v4 dts mode 'off' or 'on'.
1445        """
1446        if not self.dts_mode_is_valid():
1447            logging.info('Not a valid servo setup. Unable to set dts mode %s.',
1448                         state)
1449            return
1450
1451        # TODO(mruthven): remove watchdog check once the labstation has been
1452        # updated to have support for modifying the watchdog.
1453        set_watchdog = (self.has_control('watchdog') and
1454                        'ccd' in self._servo_type)
1455        enable_watchdog = state == 'on'
1456
1457        if set_watchdog and not enable_watchdog:
1458            self.set_nocheck('watchdog_remove', 'ccd')
1459
1460        self.set_nocheck('servo_v4_dts_mode', state)
1461
1462        if set_watchdog and enable_watchdog:
1463            self.set_nocheck('watchdog_add', 'ccd')
1464
1465
1466    def _get_servo_type_fw_version(self, servo_type, prefix=''):
1467        """Helper to handle fw retrieval for micro/v4 vs ccd.
1468
1469        @param servo_type: one of 'servo_v4', 'servo_micro', 'ccd_cr50'
1470        @param prefix: whether the control has a prefix
1471
1472        @returns: fw version for non-ccd devices, cr50 version for ccd device
1473        """
1474        if servo_type == 'ccd_cr50':
1475            # ccd_cr50 runs on cr50, so need to query the cr50 fw.
1476            servo_type = 'cr50'
1477        cmd = '%s_version' % servo_type
1478        try:
1479            return self.get(cmd, prefix=prefix)
1480        except error.TestFail:
1481            # Do not fail here, simply report the version as unknown.
1482            logging.warn('Unable to query %r to get servo fw version.', cmd)
1483            return 'unknown'
1484
1485
1486    def get_servo_fw_versions(self):
1487        """Retrieve a summary of attached servos and their firmware.
1488
1489        Note: that only the Google firmware owned servos supports this e.g.
1490        micro, v4, etc. For anything else, the dictionary will have no entry.
1491        If no device is has Google owned firmware (e.g. v3) then the result
1492        is an empty dictionary.
1493
1494        @returns: dict, a collection of each attached servo & their firmware.
1495        """
1496        def get_fw_version_tag(tag, dev):
1497            return '%s_version.%s' % (dev, tag)
1498
1499        fw_versions = {}
1500        if 'servo_v4' not in self._servo_type:
1501            return {}
1502        v4_tag = get_fw_version_tag('support', 'servo_v4')
1503        fw_versions[v4_tag] = self._get_servo_type_fw_version('servo_v4')
1504        if 'with' in self._servo_type:
1505            dut_devs = self._servo_type.split('_with_')[1].split('_and_')
1506            main_tag = get_fw_version_tag('main', dut_devs[0])
1507            fw_versions[main_tag] = self._get_servo_type_fw_version(dut_devs[0])
1508            if len(dut_devs) == 2:
1509                # Right now, the only way for this to happen is for a dual setup
1510                # to exist where ccd is attached on top of servo micro. Thus, we
1511                # know that the prefix is ccd_cr50 and the type is ccd_cr50.
1512                # TODO(coconutruben): If the new servod is not deployed by
1513                # the time that there are more cases of '_and_' devices,
1514                # this needs to be reworked.
1515                dual_tag = get_fw_version_tag('ccd_flex_secondary', dut_devs[1])
1516                fw = self._get_servo_type_fw_version(dut_devs[1], 'ccd_cr50')
1517                fw_versions[dual_tag] = fw
1518        return fw_versions
1519
1520    @property
1521    def uart_logs_dir(self):
1522        """Return the directory to save UART logs."""
1523        return self._uart.logs_dir if self._uart else ""
1524
1525
1526    @uart_logs_dir.setter
1527    def uart_logs_dir(self, logs_dir):
1528        """Set directory to save UART logs.
1529
1530        @param logs_dir  String of directory name."""
1531        if self._uart:
1532            self._uart.logs_dir = logs_dir
1533
1534
1535    def close(self):
1536        """Close the servo object."""
1537        if self._uart:
1538            self._uart.stop_capture()
1539            self._uart.dump()
1540            self._uart = None
1541