• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17"""
18Class definition of B29 device for controlling the device.
19
20B29 is an engineering device with serial capabilities. It is almost like
21b20 except it has additional features that allow sending commands
22to b10 via one-wire and to pull logs from b10 via one-wire.
23
24Please see https://docs.google.com/document/d/17yJeJRNWxv5E9
25fBvw0sXkgwCBkshU_l4SxWkKgAxVmk/edit for details about available operations.
26"""
27
28import os
29import re
30import time
31
32from acts import tracelogger
33from acts import utils
34from logging import Logger
35
# Module-wide trace logger. Note that this name shadows the standard
# library ``logging`` module inside this file.
logging = tracelogger.TakoTraceLogger(Logger(__file__))

# Pattern applied by get_b29_devices() to each line of the
# ``ls -l /dev/serial/by-id/`` listing. Named groups:
#   device_serial - upper-case alphanumeric serial embedded in the link name
#   interface     - interface token that follows the serial
#   port          - link target name found after the two '../' components
DEVICE_REGEX = (
    r'_(?P<device_serial>[A-Z0-9]+)'
    r'-(?P<interface>\w+)'
    r'\s->\s(\.\./){2}'
    r'(?P<port>\w+)'
)

# Location of the apollo debug bridge executable.
# TODO: automate getting the latest version from x20
DEBUG_BRIDGE = (
    '/google/data/ro/teams/wearables/apollo/ota/'
    'jenkins-presubmit/ovyalov/master/apollo-sw/'
    'CL14060_v2-build13686/v13686/automation/'
    'apollo_debug_bridge/linux2/apollo_debug_bridge'
)

# Substring of the /dev/serial/by-id entry names used to find B29 devices.
B29_CHIP = 'Cypress_Semiconductor_USBUART'
46
47
48# TODO:
49# as the need arises, additional functionalities of debug_bridge should be
50# integrated
51# TODO:
52# https://docs.google.com/document/d/17yJeJRNWxv5E9fBvw0sXkgwCBkshU_
53# l4SxWkKgAxVmk/edit
54
class B29Error(Exception):
    """Error raised by this module for B29 related failures.

    Used, for example, when the requested B29 serial number cannot be found
    or when an invalid component name is passed to a ping request.
    """
57
58
def get_b29_devices():
    """ Get all available B29 devices.

    Lists the entries of /dev/serial/by-id/ whose name contains B29_CHIP and
    parses every line of the listing with DEVICE_REGEX.

    Returns:
      (list) One dict per device found, or an empty list if none found.
      Each dict has the keys:
        'commander_port': device node path (ex: '/dev/ttyACM4')
        'log_port': always None
        'serial_number': serial number parsed from the by-id entry name
    """
    # Use the pipe as a context manager so it is closed once it is read.
    with os.popen('ls -l /dev/serial/by-id/*%s*' % B29_CHIP) as listing:
        result = listing.read()

    devices = []
    for line in result.splitlines():
        match = re.search(DEVICE_REGEX, line)
        if match is None:
            # Not a B29 symlink line; skip it instead of failing on
            # match.group() of None.
            continue
        devices.append({
            'commander_port': '/dev/' + match.group('port'),
            'log_port': None,
            'serial_number': match.group('device_serial'),
        })
    return devices
80
81
class B29Device(object):
    """Class to control B29 device.

    Every operation starts the apollo debug bridge executable (DEBUG_BRIDGE)
    as a subprocess against the serial port of the B29, scans the stderr of
    that process for an expected line and then kills the process.

    Attributes:
        serial: serial number the instance was created with.
        port: commander port of the matching device (ex: '/dev/ttyACM4').
        ping_match: success regex per pingable component.
        fw_version: firmware version string, or None if it was not reported.
        app_version: app version string, or None if it was not reported.
    """

    # Regexes extracting the version number from the debug bridge output,
    # keyed by the ``type`` argument of _get_version().
    _VERSION_PATTERNS = {
        'fw': re.compile(r'CHARGER app version: version=([\d]*)'),
        'app': re.compile(r'APP VERSION: ([\d]*)'),
    }

    def __init__(self, b29_serial):
        """ Locate the B29 with the given serial number and read its versions.

        Args:
            b29_serial: String type of serial number
                (ex: 'D96045152F121B00')
        Raises:
            B29Error: if no attached B29 has the given serial number.
        """
        self.serial = b29_serial
        b29_port = [d['commander_port'] for d in get_b29_devices() if
                    d['serial_number'] == b29_serial]
        if not b29_port:
            logging.error("unable to find b29 with serial number %s" %
                          b29_serial)
            raise B29Error(
                "Recovery failed because b29_serial specified in device "
                "manifest file is not found or invalid")
        self.port = b29_port[0]
        # The rx/received part of each pattern needs a non-zero digit
        # followed by another digit, i.e. a count of at least 10.
        self.ping_match = {'psoc': r'Pings: tx=[\d]* rx=[1-9][0-9]',
                           'csr': r'count=100, sent=[\d]*, received=[1-9][0-9]',
                           'charger': r'Pings: tx=[\d]* rx=[1-9][0-9]'}
        self.fw_version = self._get_version('fw')
        self.app_version = self._get_version('app')

    def _get_version(self, type='fw'):
        """ Method to get version of B29

        Args:
            type: 'fw' for the firmware version or 'app' for the app version.
        Returns:
            String version if found (ex: '0006'), None otherwise
        Raises:
            B29Error: if type is neither 'fw' nor 'app'.
        """
        # Validate before spawning anything so that a bad argument neither
        # leaks a process nor fails later with an unbound local variable.
        version_match = self._VERSION_PATTERNS.get(type)
        if version_match is None:
            raise B29Error('specified parameter for type is not valid')

        command = '--serial={}'.format(self.port)
        debug_bridge_process = self._send_command(command=command)
        try:
            version_str = self._parse_output_of_running_process(
                debug_bridge_process, version_match)
        finally:
            # Always stop the bridge, even if reading its output failed.
            debug_bridge_process.kill()
        if not version_str:
            return None
        return version_match.search(version_str).group(1)

    def _parse_output_of_running_process(self, subprocess, match, timeout=30):
        """ Parses the logs from subprocess objects and checks to see if a
        match is found within the allotted time

        Args:
            subprocess: object returned by _send_command; only its ``stderr``
                stream is read, one line at a time.
            match: regex to look for, either a pattern string or an object
                returned by re.compile(r'<regex>').
            timeout: int - time in seconds to keep retrying before bailing.
                The deadline is checked between reads only, so a read that
                blocks waiting for a line is not interrupted.
        Returns:
            The first matching line as a string, or False if no line matched
            before the timeout expired.
        """
        deadline = time.monotonic() + timeout
        success_match = re.compile(match)
        while time.monotonic() < deadline:
            out = subprocess.stderr.readline()
            if isinstance(out, bytes):
                # Pipes that are not opened in text mode yield bytes, which
                # cannot be searched with a str pattern.
                out = out.decode('utf-8', errors='replace')
            if not out:
                # Nothing to read right now: back off before retrying.
                time.sleep(.5)
                continue
            # No sleep between lines that are already available, otherwise
            # only about two lines per second would ever be scanned.
            if success_match.search(out):
                return out
        return False

    def _send_command(self, command):
        """ Send command to b29 using apollo debug bridge

        Args:
          command: The command line arguments for apollo debug bridge to
            execute; they are appended after '--rpc_port=-1'.
        Returns:
          subprocess object returned by utils.start_standing_subprocess
        """
        return utils.start_standing_subprocess(
            '{} {} {}'.format(DEBUG_BRIDGE, '--rpc_port=-1', command),
            shell=True)

    def restore_golden_image(self):
        """ Start a subprocess that calls the debug-bridge executable with
        options that restores golden image of b10 attached to the b29. The
        recovery restores the 'golden image' which is available in b10
        partition 8.

        The output is watched for the 'successfully initiated' line for up to
        the default timeout of _parse_output_of_running_process (30 seconds),
        after which the process is killed.

        Returns:
            True if the DFU was reported as initiated, False otherwise.
        """
        # TODO:
        # because we are accessing x20, we need to capture error resulting from
        #  expired prodaccess and report it explicitly
        # TODO:
        # possibly file not found error?

        logging.info('Restoring golden image...')
        command = '--serial=%s --debug_spi=dfu --sqif_partition=8' % self.port
        debug_bridge_process = self._send_command(command=command)
        success_match = re.compile('DFU on partition #8 successfully initiated')
        try:
            restored = self._parse_output_of_running_process(
                debug_bridge_process, success_match)
        finally:
            # Always stop the bridge, even if reading its output failed.
            debug_bridge_process.kill()
        if restored:
            logging.info('Golden image restored successfully')
            return True
        logging.warning('Failed to restore golden image')
        return False

    def ping_component(self, component, timeout=30):
        """ Send ping to the specified component via B29

        Args:
            component: 'csr' or 'psoc' or 'charger'
            timeout: seconds to wait for the success line in the output.
        Returns:
            True if successful and False otherwise
        Raises:
            B29Error: if component is not one of the supported names.
        """
        if component not in ('csr', 'psoc', 'charger'):
            raise B29Error('specified parameter for component is not valid')
        logging.info('Pinging %s via B29...' % component)
        command = '--serial={} --ping={}'.format(self.port, component)
        debug_bridge_process = self._send_command(command=command)
        try:
            ping_ok = self._parse_output_of_running_process(
                debug_bridge_process, self.ping_match[component], timeout)
        finally:
            # Always stop the bridge, even if reading its output failed.
            debug_bridge_process.kill()
        if ping_ok:
            logging.info('Ping passes')
            return True
        logging.warning('Ping failed')
        return False

    def reset_charger(self):
        """ Send reset command to B29

        The reset is only attempted when the firmware version is at least 6;
        otherwise (including when the version is unknown) a warning is
        logged and nothing is sent.

        Raises: TimeoutError (lib.utils.TimeoutError) if the device does not
        come back within 120 seconds
        NOTE(review): the exception comes from utils.wait_until, which is
        not visible here - confirm its type.
        """
        try:
            fw_version = int(self.fw_version)
        except (TypeError, ValueError):
            # fw_version is None when the version could not be read.
            fw_version = None

        if fw_version is None or fw_version < 6:
            logging.warning('B29 firmware version %s does not support '
                            'charger_reset argument' % self.fw_version)
            return

        # --charger_reset
        logging.info('Resetting B29')
        command = '--serial={} --charger_reset'.format(self.port)
        reset_charger_process = self._send_command(command=command)
        # Give the bridge a moment to deliver the reset before killing it.
        time.sleep(2)
        reset_charger_process.kill()
        logging.info('Waiting for B29 to become available..')
        utils.wait_until(lambda: self.ping_component('charger'), 120)
222