• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import print_function
6from __future__ import absolute_import
7from __future__ import unicode_literals
8from __future__ import division
9
10import os
11import json
12import base64
13import logging
14
15import common
16from autotest_lib.server.cros.servo.topology import topology_constants as stc
17
18
19class ServoTopologyError(Exception):
20    """
21    Generic Exception for failures from ServoTopology object.
22    """
23    pass
24
25
26class MissingServoError(ServoTopologyError):
27    """
28    Exception to throw when child servo type is missing.
29    """
30
31    def __init__(self, message, servo_type):
32        self._servo_type = servo_type
33        self.message = message
34
35    def __str__(self):
36        return repr(self.message)
37
38
39class ServoTopology(object):
40    """Class to read, generate and validate servo topology in the lab.
41
42    The class support detection of servo listed in VID_PID_SERVO_TYPES.
43    To save servo topology to host-info date passed two steps:
44       - convert to the json
45       - encode to base64
46    """
47    # Command to get usb-path to device
48    SERVOD_TOOL_USB_PATH = 'servodtool device -s %s usb-path'
49
50    # Base folder where all servo devices will be enumerated.
51    SERVOS_BASE_PATH = '/sys/bus/usb/devices'
52
53    # Minimal length of usb-path for servo devices connected to the host.
54    MIN_SERVO_PATH = len(SERVOS_BASE_PATH + '/X')
55
56    def __init__(self, servo_host):
57        self._host = servo_host
58        self.reset()
59
60    def read(self, host_info):
61        """Reading servo-topology info."""
62        logging.info('Reading servo topology info...')
63        self.reset()
64        if not host_info:
65            logging.info('The host_info not provided. Skip reading.')
66            return
67        b64_val = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX)
68        self._topology = _parse_string_as_topology(b64_val)
69        logging.debug('Loaded servo topology: %s', self._topology)
70        if self._topology:
71            logging.info('Servo topology loaded successfully.')
72
73    def save(self, host_info_store):
74        """Saving servo-topology info."""
75        if self.is_empty():
76            logging.info('Topology is empty. Skip saving.')
77            return
78        if not host_info_store:
79            logging.info('The host_info_store not provided. Skip saving.')
80            return
81        logging.info('Saving servo topology info...')
82        data = _convert_topology_to_string(self._topology)
83        if not data:
84            logging.info('Servo topology fail to save data.'
85                         ' Please file a bug.')
86            return
87        host_info = host_info_store.get()
88        prev_value = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX)
89        if prev_value and prev_value == data:
90            logging.info('Servo topology was not changed. Skip saving.')
91            return
92        logging.debug('Previous saved topology: %s', prev_value)
93        host_info.set_version_label(stc.SERVO_TOPOLOGY_LABEL_PREFIX, data)
94        host_info_store.commit(host_info)
95        logging.info('Servo topology saved successfully.')
96
97    def reset(self):
98        """Reset topology to the initialize state.
99
100        All cash will be reset to empty state.
101        """
102        self._topology = None
103
104    def generate(self):
105        """Read servo data and create topology."""
106        self.reset()
107        try:
108            self._topology = self._generate()
109        except Exception as e:
110            logging.debug('(Not critical) %s', e)
111            logging.info('Fail to generate servo-topology')
112        if not self.is_empty():
113            logging.info('Servo topology successfully generated.')
114
115    def is_empty(self):
116        """If topology data was initialized."""
117        return not bool(self._topology)
118
119    def validate(self, raise_error=False, dual_set=False, compare=False):
120        """Validate topology against expected topology.
121
122        Validation against:
123        - set-up expectation: min one child or 2 for DUAL_V4
124        - last saved topology: check if any device missed
125
126        Update topology cache if validation passed successfully.
127
128        @params raise_error: raise error if validate did not pass otherwise
129                             return False.
130        @params dual_set:    Check if servo expect DUAL_V4 setup.
131        @params compare:     Validate against saved topology.
132        """
133        new_st = self._generate()
134        logging.debug("Generate topology: %s", new_st)
135        if not new_st or not new_st.get(stc.ST_DEVICE_MAIN):
136            message = 'Main device is not detected'
137            return self._process_error(message, raise_error)
138        children = new_st.get(stc.ST_DEVICE_CHILDREN)
139        # basic setup has to have minimum one child.
140        if not children or len(children) < 1:
141            message = 'Each setup has at least one child'
142            return self._process_error(message, raise_error)
143        children_types = [c.get(stc.ST_DEVICE_TYPE) for c in children]
144        # DUAL_V4 setup has to have cr50 and one more child.
145        if dual_set:
146            if stc.ST_CR50_TYPE not in children_types:
147                return self._missing_servo_error(stc.ST_CR50_TYPE, raise_error)
148            if len(children) < 2:
149                message = 'Expected two children but have only one'
150                return self._process_error(message, raise_error)
151        if compare and not self.is_empty():
152            main_device = new_st.get(stc.ST_DEVICE_MAIN)
153            t = self._topology
154            old_main = t.get(stc.ST_DEVICE_MAIN)
155            old_children = t.get(stc.ST_DEVICE_CHILDREN)
156            if not all([
157                    old_children,
158                    old_main,
159                    old_main.get(stc.ST_DEVICE_HUB_PORT),
160            ]):
161                # Old data is invalid for comparasing
162                return True
163            if not self._equal_item(old_main, main_device):
164                message = 'Main servo was changed'
165                return self._process_error(message, raise_error)
166            for child in old_children:
167                old_type = child.get(stc.ST_DEVICE_TYPE)
168                if old_type not in children_types:
169                    return self._missing_servo_error(old_type, raise_error)
170            if len(children) < len(old_children):
171                message = 'Some child is missed'
172                return self._process_error(message, raise_error)
173        logging.info('Servo topology successfully verified.')
174        self._topology = new_st
175        return True
176
177    def is_servo_serial_provided(self):
178        """Verify that root servo serial is provided."""
179        root_servo_serial = self._host.servo_serial
180        if not root_servo_serial:
181            logging.info('Root servo serial is not provided.')
182            return False
183        logging.debug('Root servo serial: %s', root_servo_serial)
184        return True
185
186    def _process_error(self, message, raise_error):
187        if not raise_error:
188            logging.info('Validate servo topology failed with: %s', message)
189            return False
190        raise ServoTopologyError(message)
191
192    def _missing_servo_error(self, servo_type, raise_error):
193        message = 'Missed servo: %s!' % servo_type
194        if not raise_error:
195            logging.info('Validate servo topology failed with: %s', message)
196            return False
197        raise MissingServoError(message, servo_type)
198
199    def _equal_item(self, old, new):
200        """Servo was replugged to another port"""
201        for field in stc.SERVO_TOPOLOGY_ITEM_COMPARE_FIELDS:
202            if old.get(field) != new.get(field):
203                return False
204        return True
205
206    def _generate(self):
207        """Generate and return topology structure.
208
209        Read and generate topology structure with out update the state.
210        """
211        logging.debug('Trying generate a servo-topology')
212        if not self.is_servo_serial_provided():
213            return
214        root_servo_serial = self._host.servo_serial
215        root_servo = None
216        children = []
217        devices = self.get_list_of_devices()
218        for device in devices:
219            if not device.is_good():
220                logging.info('Skip %s as missing some data', device)
221                continue
222            if device.get_serial_number() == root_servo_serial:
223                root_servo = device.get_topology_item()
224            else:
225                children.append(device.get_topology_item())
226        if not root_servo:
227            logging.debug('Root servo missed!')
228            return None
229        topology = {
230                stc.ST_DEVICE_MAIN: root_servo,
231                stc.ST_DEVICE_CHILDREN: children
232        }
233        logging.debug('Servo topology: %s', topology)
234        return topology
235
236    def _get_servo_hub_path(self, servo_serial):
237        """Get path to the servo hub.
238
239        The root servo is connected directly to the servo-hub. To find other
240        servos connected to the hub we need find the path to the servo-hub.
241        The servod-tool always return direct path to the servo, like:
242            /sys/bus/usb/devices/1-3.2.1
243            base path:  /sys/bus/usb/devices/
244            root-servo:  1-3.2.1
245        the alternative path is '/sys/bus/usb/devices/1-3.2/1-3.2.1/'
246        where '1-3.2' is path to servo-hub. To extract path to servo-hub
247        logic parse parse and remove last digit of the port where root servo
248        connected to the servo-hub.
249            base path:  /sys/bus/usb/devices/
250            servo-hub:  1-3.2
251            root-servo: .1
252        After we will join only base path with servo-hub.
253
254        @params servo_serial    Serial number of the servo connected to hub
255        @returns: A string representation of fs-path to servo-hub device
256        """
257        logging.debug('Try to find a hub-path for servo:%s', servo_serial)
258        cmd_hub = self.SERVOD_TOOL_USB_PATH % servo_serial
259        servo_path = self._read_line(cmd_hub)
260        logging.debug('Servo %s path: %s', servo_serial, servo_path)
261        if not servo_path or len(servo_path) < self.MIN_SERVO_PATH:
262            logging.info('Servo not detected.')
263            return None
264        base_path = os.path.dirname(servo_path)
265        root_servo_tail = os.path.basename(servo_path)
266        # Removing last port as
267        servo_hub_tail = '.'.join(root_servo_tail.split('.')[:-1])
268        return os.path.join(base_path, servo_hub_tail)
269
270    def get_root_servo(self):
271        """Get root servo device.
272
273        @returns: ConnectedServo if device found.
274        """
275        logging.debug('Try to find a root servo')
276        if not self.is_servo_serial_provided():
277            return None
278        # Find the path to the servo-hub folder.
279        root_servo_serial = self._host.servo_serial
280        cmd_hub = self.SERVOD_TOOL_USB_PATH % root_servo_serial
281        servo_path = self._read_line(cmd_hub)
282        logging.debug('Servo %s path: %s', root_servo_serial, servo_path)
283        if not servo_path or len(servo_path) < self.MIN_SERVO_PATH:
284            logging.info('Servo not detected.')
285            return None
286        device = self._get_device(servo_path)
287        if device and device.is_good():
288            return device
289        return None
290
291    def get_root_servo_from_cache(self):
292        """Get root servo device based on topology cache data.
293
294        First we try to find servo based on topology info.
295
296        @returns: ConnectedServo if device found.
297        """
298        logging.info('Trying to find root device from topology cache!')
299        if (not self._topology or not self._topology.get(stc.ST_DEVICE_MAIN)):
300            logging.info('Topology cache is empty or not present')
301            return None
302        devpath = self._topology.get(
303                stc.ST_DEVICE_MAIN)[stc.ST_DEVICE_HUB_PORT]
304        logging.debug('devpath=%s', devpath)
305        if not devpath:
306            return None
307        # devpath represent sequence of ports used to detect device
308        device_fs_port = '1-%s' % devpath
309        logging.debug('device_fs_port=%s', device_fs_port)
310        device_path = os.path.join(self.SERVOS_BASE_PATH, device_fs_port)
311        device = self._get_device(device_path)
312        logging.info('device=%s', device)
313        if device and device.is_good():
314            return device
315        logging.debug('Trying to verify present of the hub!')
316        hub_folder = '.'.join(device_fs_port.split('.')[:-1])
317        logging.debug('servo_hub_folder=%s', hub_folder)
318        hub_product = os.path.join(self.SERVOS_BASE_PATH, hub_folder,
319                                   'product')
320        logging.debug('hub_product=%s', hub_product)
321        hub_name = self._read_line('cat %s' % hub_product)
322        logging.debug('hub_name=%s', hub_name)
323        if hub_name:
324            raise ServoTopologyError(
325                    'Root servo hardware potentially missing!')
326        raise ServoTopologyError(
327                'No USB device on expected port for the servo!')
328
329    def get_list_of_devices(self):
330        """Generate list of devices with serials.
331
332        Logic based on detecting all device enumerated under servo-hub device.
333
334        @returns: Collection of detected device connected to the servo-hub.
335        """
336        logging.debug('Trying generate device-a servo-topology')
337        if not self.is_servo_serial_provided():
338            return []
339        # Find the path to the servo-hub folder.
340        hub_path = self._get_servo_hub_path(self._host.servo_serial)
341        logging.debug('Servo hub path: %s', hub_path)
342        if not hub_path:
343            return []
344
345        # Find all serial filed of devices under servo-hub. Each device
346        # has to have serial number.
347        devices_cmd = 'find %s/* -name serial' % hub_path
348        devices = self._read_multilines(devices_cmd)
349        children = []
350        for device in devices:
351            logging.debug('Child device %s', device)
352            device_dir = os.path.dirname(device)
353            child = self._get_device(device_dir)
354            if not child:
355                logging.debug('Child missed some data.')
356                continue
357            children.append(child)
358        logging.debug('Detected devices: %s', len(children))
359        return children
360
361    def update_servo_version(self, device=None):
362        """Update version of servo device.
363
364        @params device: ConnectedServo instance.
365        """
366        if not device:
367            logging.debug('Device is not provided')
368            return
369        device._version = self._read_file(device.get_path(), 'configuration')
370        logging.debug('New servo version: %s', device.get_version())
371
372    def get_list_available_servos(self):
373        """List all servos enumerated on the host."""
374        logging.debug('Started process to collect all devices on the host.')
375        devices = []
376        # Looking only devices with Google vendor-id (18d1).
377        cmd = 'grep -s  -R "18d1" %s/*/idVendor' % self.SERVOS_BASE_PATH
378        result_paths = self._read_multilines(cmd)
379        for path in result_paths:
380            idVendor_path = path.split(':')[0]
381            if not idVendor_path:
382                logging.debug('Cannot extract path to file from: %s', path)
383                continue
384            base_path = os.path.dirname(idVendor_path)
385            if not base_path:
386                logging.debug('Cannot extract base path from: %s',
387                              idVendor_path)
388                continue
389            device = self._get_device(base_path)
390            if not device:
391                logging.debug('Not found device under: %s', base_path)
392                continue
393            devices.append(device)
394        return devices
395
396    def _get_vid_pid(self, path):
397        """Read VID and PID of the device.
398
399        @params path    Absolute path to the device in FS.
400        @returns: A string representation VID:PID of device.
401        """
402        vid = self._read_file(path, 'idVendor')
403        pid = self._read_file(path, 'idProduct')
404        if not vid or not pid:
405            return None
406        vid_pid = '%s:%s' % (vid, pid)
407        logging.debug("VID/PID of device device: '%s'", vid_pid)
408        return vid_pid
409
410    def _get_device(self, path):
411        """Create device representation.
412
413        @params path:   Absolute path to the device in FS.
414        @returns: ConnectedServo if VID/PID present.
415        """
416        vid_pid = self._get_vid_pid(path)
417        if not vid_pid:
418            return None
419        serial = self._read_file(path, 'serial')
420        product = self._read_file(path, 'product')
421        hub_path = self._read_file(path, 'devpath')
422        configuration = self._read_file(path, 'configuration')
423        servo_type = stc.VID_PID_SERVO_TYPES.get(vid_pid)
424        if not servo_type:
425            return None
426        return ConnectedServo(device_path=path,
427                              device_product=product,
428                              device_serial=serial,
429                              device_type=servo_type,
430                              device_vid_pid=vid_pid,
431                              device_hub_path=hub_path,
432                              device_version=configuration)
433
434    def _read_file(self, path, file_name):
435        """Read context of the file and return result as one line.
436
437        If execution finished with error result will be empty string.
438
439        @params path:       Path to the folder where file located.
440        @params file_name:  The file name to read.
441        """
442        if not path or not file_name:
443            return ''
444        f = os.path.join(path, file_name)
445        return self._read_line('cat %s' % f)
446
447    def _read_line(self, command):
448        """Execute terminal command and return result as one line.
449
450        If execution finished with error result will be empty string.
451
452        @params command:    String to execute.
453        """
454        r = self._host.run(command, ignore_status=True, timeout=30)
455        if r.exit_status == 0:
456            return r.stdout.strip()
457        return ''
458
459    def _read_multilines(self, command):
460        """Execute terminal command and return result as multi-line.
461
462        If execution finished with error result will be an empty array.
463
464        @params command:    String to execute.
465        """
466        r = self._host.run(command, ignore_status=True, timeout=30)
467        if r.exit_status == 0:
468            return r.stdout.splitlines()
469        return []
470
471
472class ConnectedServo(object):
473    """Class to hold info about connected detected."""
474
475    def __init__(self,
476                 device_path=None,
477                 device_product=None,
478                 device_serial=None,
479                 device_type=None,
480                 device_vid_pid=None,
481                 device_hub_path=None,
482                 device_version=None):
483        self._path = device_path
484        self._product = device_product
485        self._serial = device_serial
486        self._type = device_type
487        self._vid_pid = device_vid_pid
488        self._hub_path = device_hub_path
489        self._version = device_version
490
491    def get_topology_item(self):
492        """Extract as topology item."""
493        return {
494                stc.ST_DEVICE_SERIAL: self._serial,
495                stc.ST_DEVICE_TYPE: self._type,
496                stc.ST_DEVICE_PRODUCT: self._product,
497                stc.ST_DEVICE_HUB_PORT: self._hub_path
498        }
499
500    def is_good(self):
501        """Check if minimal data for topology item is present."""
502        return self._serial and self._type and self._hub_path
503
504    def get_type(self):
505        """Servo type."""
506        return self._type
507
508    def get_path(self):
509        """Path to servo folder in sysfs."""
510        return self._path
511
512    def get_serial_number(self):
513        """Servo serial number."""
514        return self._serial
515
516    def get_version(self):
517        """Get servo version."""
518        return self._version
519
520    def __str__(self):
521        return ("Device %s:%s (%s, %s) version: %s" %
522                (self._type, self._serial, self._vid_pid, self._hub_path,
523                 self._version))
524
525
526def _convert_topology_to_string(topology):
527    """Convert topology to the string respresentation.
528
529    Convert topology to json and encode by Base64 for host-info file.
530
531    @params topology: Servo topology data
532    @returns: topology representation in Base64 string
533    """
534    if not topology:
535        return ''
536    try:
537        # generate json similar to golang to avoid extra updates
538        json_string = json.dumps(topology, separators=(',', ':'))
539        logging.debug('Servo topology (json): %s', json_string)
540    except Exception as e:
541        logging.debug('(Not critical) %s', e)
542        logging.info('Failed to convert topology to json')
543        return ''
544    try:
545        # recommended to convert to the bytes for python 3
546        b64_string = base64.b64encode(json_string.encode("utf-8"))
547        logging.debug('Servo topology (b64): %s', b64_string)
548        return b64_string.decode()
549    except Exception as e:
550        logging.debug('(Not critical) %s', e)
551        logging.info('Failed to convert topology to base64')
552    return ''
553
554
555def _parse_string_as_topology(src):
556    """Parse and load servo topology from string.
557
558    Decode Base64 and load as json of servo-topology data.
559
560    @params src: topology representation in Base64 string
561    @returns: servo topology data
562    """
563    if not src:
564        logging.debug('Servo topology data not present in host-info.')
565        return None
566    try:
567        json_string = base64.b64decode(src)
568        logging.debug('Servo topology (json) from host-info: %s', json_string)
569        return json.loads(json_string)
570    except Exception as e:
571        logging.debug('(Not critical) %s', e)
572        logging.info('Fail to read servo-topology from host-info.')
573    return None
574