• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python2
2# Copyright 2020 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import print_function
7from __future__ import absolute_import
8from __future__ import unicode_literals
9from __future__ import division
10
11import os
12import copy
13import json
14import base64
15import logging
16
17import common
18from autotest_lib.client.common_lib import hosts
19from autotest_lib.server.cros.servo.topology import topology_constants as stc
20
21
class ServoTopologyError(Exception):
    """Base error type for failures reported by the ServoTopology object."""
27
28
class MissingServoError(ServoTopologyError):
    """Exception to throw when a child servo type is missing.

    @params message:    Description of the failure; its repr() is used as
                        the string form of the exception.
    @params servo_type: Type of the servo which was not found.
    """

    def __init__(self, message, servo_type):
        # Forward the arguments to the base class so `args` is populated.
        # With empty `args` the exception can not be pickled or copied,
        # because re-creating it calls the class with `args`.
        super(MissingServoError, self).__init__(message, servo_type)
        self._servo_type = servo_type
        self.message = message

    def __str__(self):
        return repr(self.message)
40
41
class ServoTopology(object):
    """Class to read, generate and validate servo topology in the lab.

    The class supports detection of servos listed in ST_PRODUCT_TYPES.
    To save servo topology to host-info the data passes two steps:
       - convert to the json
       - encode to base64
    """

    def __init__(self, servo_host):
        """Create topology helper for the servo host.

        @params servo_host: Host object used to reach the servo; the class
                            reads its `servo_serial` and calls its `run()`.
        """
        self._host = servo_host
        # Last topology loaded by read() or created by generate().
        self._topology = None

    def read(self, host_info):
        """Load servo-topology from host-info into the object state.

        Any previously held topology is dropped first, so after the call the
        state is empty if host-info is missing or holds no valid topology.

        @params host_info: Object providing get_label_value(); may be None.
        """
        logging.info('Reading servo topology info...')
        self._topology = None
        if not host_info:
            logging.info('The host_info not provided. Skip reading.')
            return
        b64_val = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX)
        self._topology = _parse_string_as_topology(b64_val)
        logging.debug('Loaded servo topology: %s', self._topology)
        if self._topology:
            logging.info('Servo topology loaded successfully.')

    def save(self, host_info_store):
        """Save held servo-topology to the host-info store.

        Nothing is written when the topology is empty, the store is not
        provided, the conversion to string failed or the stored value is
        already equal to the new one.

        @params host_info_store: Store providing get() and commit(); may be
                                 None.
        """
        if self.is_empty():
            logging.info('Topology is empty. Skip saving.')
            return
        if not host_info_store:
            logging.info('The host_info_store not provided. Skip saving.')
            return
        logging.info('Saving servo topology info...')
        data = _convert_topology_to_string(self._topology)
        if not data:
            logging.info('Servo topology fail to save data.'
                         ' Please file a bug.')
            return
        host_info = host_info_store.get()
        prev_value = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX)
        if prev_value and prev_value == data:
            logging.info('Servo topology was not changed. Skip saving.')
            return
        logging.debug('Previous saved topology: %s', prev_value)
        host_info.set_version_label(stc.SERVO_TOPOLOGY_LABEL_PREFIX, data)
        host_info_store.commit(host_info)
        logging.info('Servo topology saved successfully.')

    def generate(self):
        """Read servo data and create topology.

        Best effort: any error raised while generating is logged and leaves
        the object with an empty topology instead of propagating.
        """
        try:
            self._topology = self._generate()
        except Exception as e:
            self._topology = None
            logging.debug('(Not critical) %s', e)
            logging.info('Fail to generate servo-topology')
        if not self.is_empty():
            logging.info('Servo topology successfully generated.')

    def is_empty(self):
        """Check whether the object holds no topology data.

        @returns: True if topology was not loaded/generated or is empty.
        """
        return not bool(self._topology)

    def validate(self, raise_error=False, dual_set=False, compare=False):
        """Validate topology against expected topology.

        A fresh topology is generated and validated against:
        - set-up expectation: min one child or 2 for DUAL_V4
        - last saved topology: check if any device missed

        @params raise_error: raise error if validate did not pass otherwise
                             return False.
        @params dual_set:    Check if servo expect DUAL_V4 setup.
        @params compare:     Validate against saved topology.

        @returns: True if validation passed (or the held topology can not be
                  used for comparing), False if it failed and raise_error is
                  not set.
        @raises ServoTopologyError: validation failed and raise_error is set.
        @raises MissingServoError:  an expected servo type is missing and
                                    raise_error is set.
        """
        new_st = self._generate()
        if not new_st or not new_st.get(stc.ST_DEVICE_MAIN):
            message = 'Main device is not detected'
            return self._process_error(message, raise_error)
        children = new_st.get(stc.ST_DEVICE_CHILDREN)
        # basic setup has to have minimum one child.
        if not children:
            message = 'Each setup has at least one child'
            return self._process_error(message, raise_error)
        children_types = [c.get(stc.ST_DEVICE_TYPE) for c in children]
        # DUAL_V4 setup has to have cr50 and one more child.
        if dual_set:
            if stc.ST_CR50_TYPE not in children_types:
                return self._missing_servo_error(stc.ST_CR50_TYPE, raise_error)
            if len(children) < 2:
                message = 'Expected two children but have only one'
                return self._process_error(message, raise_error)
        if compare and not self.is_empty():
            main_device = new_st.get(stc.ST_DEVICE_MAIN)
            t = self._topology
            old_main = t.get(stc.ST_DEVICE_MAIN)
            old_children = t.get(stc.ST_DEVICE_CHILDREN)
            # The checks have to short-circuit: old_main may be missing in
            # the old data and then it can not be asked for the hub port.
            old_data_valid = bool(old_children and old_main
                                  and old_main.get(stc.ST_DEVICE_HUB_PORT))
            if not old_data_valid:
                # Old data is invalid for comparing.
                return True
            if not self._equal_item(old_main, main_device):
                message = 'Main servo was changed'
                return self._process_error(message, raise_error)
            for child in old_children:
                old_type = child.get(stc.ST_DEVICE_TYPE)
                if old_type not in children_types:
                    return self._missing_servo_error(old_type, raise_error)
            if len(children) < len(old_children):
                message = 'Some child is missed'
                return self._process_error(message, raise_error)
        logging.info('Servo topology successfully verified.')
        return True

    def _process_error(self, message, raise_error):
        """Report validation failure.

        @params message:     Description of the failure.
        @params raise_error: Raise ServoTopologyError instead of returning.

        @returns: False when raise_error is not set.
        @raises ServoTopologyError: when raise_error is set.
        """
        if not raise_error:
            logging.info('Validate servo topology failed with: %s', message)
            return False
        raise ServoTopologyError(message)

    def _missing_servo_error(self, servo_type, raise_error):
        """Report validation failure caused by a missing servo.

        @params servo_type:  Type of the servo which was not found.
        @params raise_error: Raise MissingServoError instead of returning.

        @returns: False when raise_error is not set.
        @raises MissingServoError: when raise_error is set.
        """
        message = 'Missed servo: %s!' % servo_type
        if not raise_error:
            logging.info('Validate servo topology failed with: %s', message)
            return False
        raise MissingServoError(message, servo_type)

    def _equal_item(self, old, new):
        """Check if two topology items describe the same device.

        Items are compared by fields listed in
        SERVO_TOPOLOGY_ITEM_COMPARE_FIELDS.

        @params old: Topology item from the saved topology.
        @params new: Topology item from the fresh topology.

        @returns: True if all compared fields are equal.
        """
        for field in stc.SERVO_TOPOLOGY_ITEM_COMPARE_FIELDS:
            if old.get(field) != new.get(field):
                return False
        return True

    def _generate(self):
        """Generate and return topology structure.

        Read and generate topology structure without updating the state.

        @returns: dict with the main device and the list of children, or
                  None if the main device can not be detected or described.
        """
        logging.debug('Trying generate a servo-topology')
        core_servo_serial = self._host.servo_serial
        if not core_servo_serial:
            logging.info('Servo serial is not provided.')
            return None
        logging.debug('Getting topology for core servo: %s', core_servo_serial)
        # collect main device info
        cmd_hub = 'servodtool device -s %s usb-path' % core_servo_serial
        servo_path = self._read_line(cmd_hub)
        logging.debug('Device -%s path: %s', core_servo_serial, servo_path)
        if not servo_path:
            logging.info('Core servo not detected.')
            return None
        if not self._is_expected_type(servo_path):
            return None
        main_device = self._read_device_info(servo_path)
        if not main_device:
            logging.debug('Core device missed some data')
            return None
        # collect child device info
        children = []
        # Dropping the last two characters of the device path is used as the
        # path of the hub the core servo is plugged in.
        # NOTE(review): presumably assumes the path ends with a separator and
        # a one character port number - confirm for multi-digit ports.
        hub_path = servo_path[0:-2]
        logging.debug('Core hub path: %s', hub_path)
        devices_cmd = 'find %s/* -name serial' % hub_path
        devices = self._read_multilines(devices_cmd)
        core_device_port = main_device.get(stc.ST_DEVICE_HUB_PORT)
        for device in devices:
            logging.debug('Child device %s', device)
            # `device` is the path to the `serial` file; info is read from
            # the folder which contains it.
            device_dir = os.path.dirname(device)
            if not self._is_expected_type(device_dir):
                # skip not expected device type like USB or hubs
                continue
            child = self._read_device_info(device_dir)
            if not child:
                logging.debug('Child missed some data.')
                continue
            if core_device_port == child.get(stc.ST_DEVICE_HUB_PORT):
                logging.debug('Skip device if match with core device')
                continue
            children.append(child)
        topology = {
                stc.ST_DEVICE_MAIN: main_device,
                stc.ST_DEVICE_CHILDREN: children
        }
        logging.debug('Servo topology: %s', topology)
        return topology

    def _is_expected_type(self, path):
        """Check if device type is known servo type.

        Please update ST_PRODUCT_TYPES to extend more servo types.

        @params path: Path to the device folder with the `product` file.

        @returns: True if the product has a type in ST_PRODUCT_TYPES.
        """
        product = self._read_file(path, 'product')
        if bool(stc.ST_PRODUCT_TYPES.get(product)):
            return True
        logging.info('Unknown product: %s', product)
        return False

    def _read_device_info(self, path):
        """Read device details for topology.

        @params path: Absolute path to the device in FS.

        @returns: topology item, or None if some of the details is missing.
        """
        serial = self._read_file(path, 'serial')
        product = self._read_file(path, 'product')
        hub_path = self._read_file(path, 'devpath')
        stype = stc.ST_PRODUCT_TYPES.get(product)
        return self._create_item(serial, stype, product, hub_path)

    def _create_item(self, servo_serial, servo_type, product, hub_path):
        """Create topology item.

        Return created item only if all details provided.

        @params servo_serial:   Serial number of device.
        @params servo_type:     Product type code of the device.
        @params product:        Product name of the device.
        @params hub_path:       Device enumerated folder name. Show the
                                chain of used ports to connect the device.

        @returns: dict describing the device, or None if any detail is
                  missing.
        """
        item = {
                stc.ST_DEVICE_SERIAL: servo_serial,
                stc.ST_DEVICE_TYPE: servo_type,
                stc.ST_DEVICE_PRODUCT: product,
                stc.ST_DEVICE_HUB_PORT: hub_path
        }
        if not (servo_serial and servo_type and product and hub_path):
            # The item is built before the check only to log what is missing.
            logging.debug('Some data missing: %s', item)
            return None
        return item

    def _read_file(self, path, file_name):
        """Read content of the file and return result as one line.

        If execution finished with error result will be empty string.

        @params path:       Path to the folder where file located.
        @params file_name:  The file name to read.

        @returns: stripped content of the file; empty string if path or
                  file name is not provided or the read failed.
        """
        if not path or not file_name:
            return ''
        f = os.path.join(path, file_name)
        return self._read_line('cat %s' % f)

    def _read_line(self, command):
        """Execute terminal command and return result as one line.

        If execution finished with error result will be empty string.

        @params command:    String to execute.

        @returns: stripped stdout of the command, or empty string.
        """
        r = self._host.run(command, ignore_status=True, timeout=30)
        if r.exit_status == 0:
            return r.stdout.strip()
        return ''

    def _read_multilines(self, command):
        """Execute terminal command and return result as multi-line.

        If execution finished with error result will be an empty array.

        @params command:    String to execute.

        @returns: list of stdout lines of the command, or empty list.
        """
        r = self._host.run(command, ignore_status=True, timeout=30)
        if r.exit_status == 0:
            return r.stdout.splitlines()
        return []
313
314
315def _convert_topology_to_string(topology):
316    """Convert topology to the string respresentation.
317
318    Convert topology to json and encode by Base64 for host-info file.
319
320    @params topology: Servo topology data
321    @returns: topology representation in Base64 string
322    """
323    if not topology:
324        return ''
325    try:
326        # generate json similar to golang to avoid extra updates
327        json_string = json.dumps(topology, separators=(',', ':'))
328        logging.debug('Servo topology (json): %s', json_string)
329    except Exception as e:
330        logging.debug('(Not critical) %s', e)
331        logging.info('Failed to convert topology to json')
332        return ''
333    try:
334        # recommended to convert to the bytes for python 3
335        b64_string = base64.b64encode(json_string.encode("utf-8"))
336        logging.debug('Servo topology (b64): %s', b64_string)
337        return b64_string
338    except Exception as e:
339        logging.debug('(Not critical) %s', e)
340        logging.info('Failed to convert topology to base64')
341    return ''
342
343
344def _parse_string_as_topology(src):
345    """Parse and load servo topology from string.
346
347    Decode Base64 and load as json of servo-topology data.
348
349    @params src: topology representation in Base64 string
350    @returns: servo topology data
351    """
352    if not src:
353        logging.debug('Servo topology data not present in host-info.')
354        return None
355    try:
356        json_string = base64.b64decode(src)
357        logging.debug('Servo topology (json) from host-info: %s', json_string)
358        return json.loads(json_string)
359    except Exception as e:
360        logging.debug('(Not critical) %s', e)
361        logging.info('Fail to read servo-topology from host-info.')
362    return None
363