1# Copyright 2020 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import print_function 6from __future__ import absolute_import 7from __future__ import unicode_literals 8from __future__ import division 9 10import os 11import json 12import base64 13import logging 14 15import common 16from autotest_lib.server.cros.servo.topology import topology_constants as stc 17 18 19class ServoTopologyError(Exception): 20 """ 21 Generic Exception for failures from ServoTopology object. 22 """ 23 pass 24 25 26class MissingServoError(ServoTopologyError): 27 """ 28 Exception to throw when child servo type is missing. 29 """ 30 31 def __init__(self, message, servo_type): 32 self._servo_type = servo_type 33 self.message = message 34 35 def __str__(self): 36 return repr(self.message) 37 38 39class ServoTopology(object): 40 """Class to read, generate and validate servo topology in the lab. 41 42 The class support detection of servo listed in VID_PID_SERVO_TYPES. 43 To save servo topology to host-info date passed two steps: 44 - convert to the json 45 - encode to base64 46 """ 47 # Command to get usb-path to device 48 SERVOD_TOOL_USB_PATH = 'servodtool device -s %s usb-path' 49 50 # Base folder where all servo devices will be enumerated. 51 SERVOS_BASE_PATH = '/sys/bus/usb/devices' 52 53 # Minimal length of usb-path for servo devices connected to the host. 54 MIN_SERVO_PATH = len(SERVOS_BASE_PATH + '/X') 55 56 def __init__(self, servo_host): 57 self._host = servo_host 58 self.reset() 59 60 def read(self, host_info): 61 """Reading servo-topology info.""" 62 logging.info('Reading servo topology info...') 63 self.reset() 64 if not host_info: 65 logging.info('The host_info not provided. Skip reading.') 66 return 67 b64_val = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX) 68 self._topology = _parse_string_as_topology(b64_val) 69 logging.debug('Loaded servo topology: %s', self._topology) 70 if self._topology: 71 logging.info('Servo topology loaded successfully.') 72 73 def save(self, host_info_store): 74 """Saving servo-topology info.""" 75 if self.is_empty(): 76 logging.info('Topology is empty. Skip saving.') 77 return 78 if not host_info_store: 79 logging.info('The host_info_store not provided. Skip saving.') 80 return 81 logging.info('Saving servo topology info...') 82 data = _convert_topology_to_string(self._topology) 83 if not data: 84 logging.info('Servo topology fail to save data.' 85 ' Please file a bug.') 86 return 87 host_info = host_info_store.get() 88 prev_value = host_info.get_label_value(stc.SERVO_TOPOLOGY_LABEL_PREFIX) 89 if prev_value and prev_value == data: 90 logging.info('Servo topology was not changed. Skip saving.') 91 return 92 logging.debug('Previous saved topology: %s', prev_value) 93 host_info.set_version_label(stc.SERVO_TOPOLOGY_LABEL_PREFIX, data) 94 host_info_store.commit(host_info) 95 logging.info('Servo topology saved successfully.') 96 97 def reset(self): 98 """Reset topology to the initialize state. 99 100 All cash will be reset to empty state. 101 """ 102 self._topology = None 103 104 def generate(self): 105 """Read servo data and create topology.""" 106 self.reset() 107 try: 108 self._topology = self._generate() 109 except Exception as e: 110 logging.debug('(Not critical) %s', e) 111 logging.info('Fail to generate servo-topology') 112 if not self.is_empty(): 113 logging.info('Servo topology successfully generated.') 114 115 def is_empty(self): 116 """If topology data was initialized.""" 117 return not bool(self._topology) 118 119 def validate(self, raise_error=False, dual_set=False, compare=False): 120 """Validate topology against expected topology. 121 122 Validation against: 123 - set-up expectation: min one child or 2 for DUAL_V4 124 - last saved topology: check if any device missed 125 126 Update topology cache if validation passed successfully. 127 128 @params raise_error: raise error if validate did not pass otherwise 129 return False. 130 @params dual_set: Check if servo expect DUAL_V4 setup. 131 @params compare: Validate against saved topology. 132 """ 133 new_st = self._generate() 134 logging.debug("Generate topology: %s", new_st) 135 if not new_st or not new_st.get(stc.ST_DEVICE_MAIN): 136 message = 'Main device is not detected' 137 return self._process_error(message, raise_error) 138 children = new_st.get(stc.ST_DEVICE_CHILDREN) 139 # basic setup has to have minimum one child. 140 if not children or len(children) < 1: 141 message = 'Each setup has at least one child' 142 return self._process_error(message, raise_error) 143 children_types = [c.get(stc.ST_DEVICE_TYPE) for c in children] 144 # DUAL_V4 setup has to have cr50 and one more child. 145 if dual_set: 146 if stc.ST_CR50_TYPE not in children_types: 147 return self._missing_servo_error(stc.ST_CR50_TYPE, raise_error) 148 if len(children) < 2: 149 message = 'Expected two children but have only one' 150 return self._process_error(message, raise_error) 151 if compare and not self.is_empty(): 152 main_device = new_st.get(stc.ST_DEVICE_MAIN) 153 t = self._topology 154 old_main = t.get(stc.ST_DEVICE_MAIN) 155 old_children = t.get(stc.ST_DEVICE_CHILDREN) 156 if not all([ 157 old_children, 158 old_main, 159 old_main.get(stc.ST_DEVICE_HUB_PORT), 160 ]): 161 # Old data is invalid for comparasing 162 return True 163 if not self._equal_item(old_main, main_device): 164 message = 'Main servo was changed' 165 return self._process_error(message, raise_error) 166 for child in old_children: 167 old_type = child.get(stc.ST_DEVICE_TYPE) 168 if old_type not in children_types: 169 return self._missing_servo_error(old_type, raise_error) 170 if len(children) < len(old_children): 171 message = 'Some child is missed' 172 return self._process_error(message, raise_error) 173 logging.info('Servo topology successfully verified.') 174 self._topology = new_st 175 return True 176 177 def is_servo_serial_provided(self): 178 """Verify that root servo serial is provided.""" 179 root_servo_serial = self._host.servo_serial 180 if not root_servo_serial: 181 logging.info('Root servo serial is not provided.') 182 return False 183 logging.debug('Root servo serial: %s', root_servo_serial) 184 return True 185 186 def _process_error(self, message, raise_error): 187 if not raise_error: 188 logging.info('Validate servo topology failed with: %s', message) 189 return False 190 raise ServoTopologyError(message) 191 192 def _missing_servo_error(self, servo_type, raise_error): 193 message = 'Missed servo: %s!' % servo_type 194 if not raise_error: 195 logging.info('Validate servo topology failed with: %s', message) 196 return False 197 raise MissingServoError(message, servo_type) 198 199 def _equal_item(self, old, new): 200 """Servo was replugged to another port""" 201 for field in stc.SERVO_TOPOLOGY_ITEM_COMPARE_FIELDS: 202 if old.get(field) != new.get(field): 203 return False 204 return True 205 206 def _generate(self): 207 """Generate and return topology structure. 208 209 Read and generate topology structure with out update the state. 210 """ 211 logging.debug('Trying generate a servo-topology') 212 if not self.is_servo_serial_provided(): 213 return 214 root_servo_serial = self._host.servo_serial 215 root_servo = None 216 children = [] 217 devices = self.get_list_of_devices() 218 for device in devices: 219 if not device.is_good(): 220 logging.info('Skip %s as missing some data', device) 221 continue 222 if device.get_serial_number() == root_servo_serial: 223 root_servo = device.get_topology_item() 224 else: 225 children.append(device.get_topology_item()) 226 if not root_servo: 227 logging.debug('Root servo missed!') 228 return None 229 topology = { 230 stc.ST_DEVICE_MAIN: root_servo, 231 stc.ST_DEVICE_CHILDREN: children 232 } 233 logging.debug('Servo topology: %s', topology) 234 return topology 235 236 def _get_servo_hub_path(self, servo_serial): 237 """Get path to the servo hub. 238 239 The root servo is connected directly to the servo-hub. To find other 240 servos connected to the hub we need find the path to the servo-hub. 241 The servod-tool always return direct path to the servo, like: 242 /sys/bus/usb/devices/1-3.2.1 243 base path: /sys/bus/usb/devices/ 244 root-servo: 1-3.2.1 245 the alternative path is '/sys/bus/usb/devices/1-3.2/1-3.2.1/' 246 where '1-3.2' is path to servo-hub. To extract path to servo-hub 247 logic parse parse and remove last digit of the port where root servo 248 connected to the servo-hub. 249 base path: /sys/bus/usb/devices/ 250 servo-hub: 1-3.2 251 root-servo: .1 252 After we will join only base path with servo-hub. 253 254 @params servo_serial Serial number of the servo connected to hub 255 @returns: A string representation of fs-path to servo-hub device 256 """ 257 logging.debug('Try to find a hub-path for servo:%s', servo_serial) 258 cmd_hub = self.SERVOD_TOOL_USB_PATH % servo_serial 259 servo_path = self._read_line(cmd_hub) 260 logging.debug('Servo %s path: %s', servo_serial, servo_path) 261 if not servo_path or len(servo_path) < self.MIN_SERVO_PATH: 262 logging.info('Servo not detected.') 263 return None 264 base_path = os.path.dirname(servo_path) 265 root_servo_tail = os.path.basename(servo_path) 266 # Removing last port as 267 servo_hub_tail = '.'.join(root_servo_tail.split('.')[:-1]) 268 return os.path.join(base_path, servo_hub_tail) 269 270 def get_root_servo(self): 271 """Get root servo device. 272 273 @returns: ConnectedServo if device found. 274 """ 275 logging.debug('Try to find a root servo') 276 if not self.is_servo_serial_provided(): 277 return None 278 # Find the path to the servo-hub folder. 279 root_servo_serial = self._host.servo_serial 280 cmd_hub = self.SERVOD_TOOL_USB_PATH % root_servo_serial 281 servo_path = self._read_line(cmd_hub) 282 logging.debug('Servo %s path: %s', root_servo_serial, servo_path) 283 if not servo_path or len(servo_path) < self.MIN_SERVO_PATH: 284 logging.info('Servo not detected.') 285 return None 286 device = self._get_device(servo_path) 287 if device and device.is_good(): 288 return device 289 return None 290 291 def get_root_servo_from_cache(self): 292 """Get root servo device based on topology cache data. 293 294 First we try to find servo based on topology info. 295 296 @returns: ConnectedServo if device found. 297 """ 298 logging.info('Trying to find root device from topology cache!') 299 if (not self._topology or not self._topology.get(stc.ST_DEVICE_MAIN)): 300 logging.info('Topology cache is empty or not present') 301 return None 302 devpath = self._topology.get( 303 stc.ST_DEVICE_MAIN)[stc.ST_DEVICE_HUB_PORT] 304 logging.debug('devpath=%s', devpath) 305 if not devpath: 306 return None 307 # devpath represent sequence of ports used to detect device 308 device_fs_port = '1-%s' % devpath 309 logging.debug('device_fs_port=%s', device_fs_port) 310 device_path = os.path.join(self.SERVOS_BASE_PATH, device_fs_port) 311 device = self._get_device(device_path) 312 logging.info('device=%s', device) 313 if device and device.is_good(): 314 return device 315 logging.debug('Trying to verify present of the hub!') 316 hub_folder = '.'.join(device_fs_port.split('.')[:-1]) 317 logging.debug('servo_hub_folder=%s', hub_folder) 318 hub_product = os.path.join(self.SERVOS_BASE_PATH, hub_folder, 319 'product') 320 logging.debug('hub_product=%s', hub_product) 321 hub_name = self._read_line('cat %s' % hub_product) 322 logging.debug('hub_name=%s', hub_name) 323 if hub_name: 324 raise ServoTopologyError( 325 'Root servo hardware potentially missing!') 326 raise ServoTopologyError( 327 'No USB device on expected port for the servo!') 328 329 def get_list_of_devices(self): 330 """Generate list of devices with serials. 331 332 Logic based on detecting all device enumerated under servo-hub device. 333 334 @returns: Collection of detected device connected to the servo-hub. 335 """ 336 logging.debug('Trying generate device-a servo-topology') 337 if not self.is_servo_serial_provided(): 338 return [] 339 # Find the path to the servo-hub folder. 340 hub_path = self._get_servo_hub_path(self._host.servo_serial) 341 logging.debug('Servo hub path: %s', hub_path) 342 if not hub_path: 343 return [] 344 345 # Find all serial filed of devices under servo-hub. Each device 346 # has to have serial number. 347 devices_cmd = 'find %s/* -name serial' % hub_path 348 devices = self._read_multilines(devices_cmd) 349 children = [] 350 for device in devices: 351 logging.debug('Child device %s', device) 352 device_dir = os.path.dirname(device) 353 child = self._get_device(device_dir) 354 if not child: 355 logging.debug('Child missed some data.') 356 continue 357 children.append(child) 358 logging.debug('Detected devices: %s', len(children)) 359 return children 360 361 def update_servo_version(self, device=None): 362 """Update version of servo device. 363 364 @params device: ConnectedServo instance. 365 """ 366 if not device: 367 logging.debug('Device is not provided') 368 return 369 device._version = self._read_file(device.get_path(), 'configuration') 370 logging.debug('New servo version: %s', device.get_version()) 371 372 def get_list_available_servos(self): 373 """List all servos enumerated on the host.""" 374 logging.debug('Started process to collect all devices on the host.') 375 devices = [] 376 # Looking only devices with Google vendor-id (18d1). 377 cmd = 'grep -s -R "18d1" %s/*/idVendor' % self.SERVOS_BASE_PATH 378 result_paths = self._read_multilines(cmd) 379 for path in result_paths: 380 idVendor_path = path.split(':')[0] 381 if not idVendor_path: 382 logging.debug('Cannot extract path to file from: %s', path) 383 continue 384 base_path = os.path.dirname(idVendor_path) 385 if not base_path: 386 logging.debug('Cannot extract base path from: %s', 387 idVendor_path) 388 continue 389 device = self._get_device(base_path) 390 if not device: 391 logging.debug('Not found device under: %s', base_path) 392 continue 393 devices.append(device) 394 return devices 395 396 def _get_vid_pid(self, path): 397 """Read VID and PID of the device. 398 399 @params path Absolute path to the device in FS. 400 @returns: A string representation VID:PID of device. 401 """ 402 vid = self._read_file(path, 'idVendor') 403 pid = self._read_file(path, 'idProduct') 404 if not vid or not pid: 405 return None 406 vid_pid = '%s:%s' % (vid, pid) 407 logging.debug("VID/PID of device device: '%s'", vid_pid) 408 return vid_pid 409 410 def _get_device(self, path): 411 """Create device representation. 412 413 @params path: Absolute path to the device in FS. 414 @returns: ConnectedServo if VID/PID present. 415 """ 416 vid_pid = self._get_vid_pid(path) 417 if not vid_pid: 418 return None 419 serial = self._read_file(path, 'serial') 420 product = self._read_file(path, 'product') 421 hub_path = self._read_file(path, 'devpath') 422 configuration = self._read_file(path, 'configuration') 423 servo_type = stc.VID_PID_SERVO_TYPES.get(vid_pid) 424 if not servo_type: 425 return None 426 return ConnectedServo(device_path=path, 427 device_product=product, 428 device_serial=serial, 429 device_type=servo_type, 430 device_vid_pid=vid_pid, 431 device_hub_path=hub_path, 432 device_version=configuration) 433 434 def _read_file(self, path, file_name): 435 """Read context of the file and return result as one line. 436 437 If execution finished with error result will be empty string. 438 439 @params path: Path to the folder where file located. 440 @params file_name: The file name to read. 441 """ 442 if not path or not file_name: 443 return '' 444 f = os.path.join(path, file_name) 445 return self._read_line('cat %s' % f) 446 447 def _read_line(self, command): 448 """Execute terminal command and return result as one line. 449 450 If execution finished with error result will be empty string. 451 452 @params command: String to execute. 453 """ 454 r = self._host.run(command, ignore_status=True, timeout=30) 455 if r.exit_status == 0: 456 return r.stdout.strip() 457 return '' 458 459 def _read_multilines(self, command): 460 """Execute terminal command and return result as multi-line. 461 462 If execution finished with error result will be an empty array. 463 464 @params command: String to execute. 465 """ 466 r = self._host.run(command, ignore_status=True, timeout=30) 467 if r.exit_status == 0: 468 return r.stdout.splitlines() 469 return [] 470 471 472class ConnectedServo(object): 473 """Class to hold info about connected detected.""" 474 475 def __init__(self, 476 device_path=None, 477 device_product=None, 478 device_serial=None, 479 device_type=None, 480 device_vid_pid=None, 481 device_hub_path=None, 482 device_version=None): 483 self._path = device_path 484 self._product = device_product 485 self._serial = device_serial 486 self._type = device_type 487 self._vid_pid = device_vid_pid 488 self._hub_path = device_hub_path 489 self._version = device_version 490 491 def get_topology_item(self): 492 """Extract as topology item.""" 493 return { 494 stc.ST_DEVICE_SERIAL: self._serial, 495 stc.ST_DEVICE_TYPE: self._type, 496 stc.ST_DEVICE_PRODUCT: self._product, 497 stc.ST_DEVICE_HUB_PORT: self._hub_path 498 } 499 500 def is_good(self): 501 """Check if minimal data for topology item is present.""" 502 return self._serial and self._type and self._hub_path 503 504 def get_type(self): 505 """Servo type.""" 506 return self._type 507 508 def get_path(self): 509 """Path to servo folder in sysfs.""" 510 return self._path 511 512 def get_serial_number(self): 513 """Servo serial number.""" 514 return self._serial 515 516 def get_version(self): 517 """Get servo version.""" 518 return self._version 519 520 def __str__(self): 521 return ("Device %s:%s (%s, %s) version: %s" % 522 (self._type, self._serial, self._vid_pid, self._hub_path, 523 self._version)) 524 525 526def _convert_topology_to_string(topology): 527 """Convert topology to the string respresentation. 528 529 Convert topology to json and encode by Base64 for host-info file. 530 531 @params topology: Servo topology data 532 @returns: topology representation in Base64 string 533 """ 534 if not topology: 535 return '' 536 try: 537 # generate json similar to golang to avoid extra updates 538 json_string = json.dumps(topology, separators=(',', ':')) 539 logging.debug('Servo topology (json): %s', json_string) 540 except Exception as e: 541 logging.debug('(Not critical) %s', e) 542 logging.info('Failed to convert topology to json') 543 return '' 544 try: 545 # recommended to convert to the bytes for python 3 546 b64_string = base64.b64encode(json_string.encode("utf-8")) 547 logging.debug('Servo topology (b64): %s', b64_string) 548 return b64_string.decode() 549 except Exception as e: 550 logging.debug('(Not critical) %s', e) 551 logging.info('Failed to convert topology to base64') 552 return '' 553 554 555def _parse_string_as_topology(src): 556 """Parse and load servo topology from string. 557 558 Decode Base64 and load as json of servo-topology data. 559 560 @params src: topology representation in Base64 string 561 @returns: servo topology data 562 """ 563 if not src: 564 logging.debug('Servo topology data not present in host-info.') 565 return None 566 try: 567 json_string = base64.b64decode(src) 568 logging.debug('Servo topology (json) from host-info: %s', json_string) 569 return json.loads(json_string) 570 except Exception as e: 571 logging.debug('(Not critical) %s', e) 572 logging.info('Fail to read servo-topology from host-info.') 573 return None 574