1# Copyright 2016 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""This class defines the CrosHost Label class.""" 6 7import collections 8import logging 9import os 10import re 11 12import common 13 14from autotest_lib.client.bin import utils 15from autotest_lib.client.common_lib import global_config 16from autotest_lib.client.cros.audio import cras_utils 17from autotest_lib.client.cros.video import constants as video_test_constants 18from autotest_lib.server.cros.dynamic_suite import constants as ds_constants 19from autotest_lib.server.hosts import base_label 20from autotest_lib.server.hosts import common_label 21from autotest_lib.server.hosts import servo_host 22from autotest_lib.site_utils import hwid_lib 23 24# pylint: disable=missing-docstring 25LsbOutput = collections.namedtuple('LsbOutput', ['unibuild', 'board']) 26 27def _parse_lsb_output(host): 28 """Parses the LSB output and returns key data points for labeling. 29 30 @param host: Host that the command will be executed against 31 @returns: LsbOutput with the result of parsing the /etc/lsb-release output 32 """ 33 release_info = utils.parse_cmd_output('cat /etc/lsb-release', 34 run_method=host.run) 35 36 unibuild = release_info.get('CHROMEOS_RELEASE_UNIBUILD') == '1' 37 return LsbOutput(unibuild, release_info['CHROMEOS_RELEASE_BOARD']) 38 39 40class BoardLabel(base_label.StringPrefixLabel): 41 """Determine the correct board label for the device.""" 42 43 _NAME = ds_constants.BOARD_PREFIX.rstrip(':') 44 45 def generate_labels(self, host): 46 # We only want to apply the board labels once, which is when they get 47 # added to the AFE. That way we don't have to worry about the board 48 # label switching on us if the wrong builds get put on the devices. 49 # crbug.com/624207 records one event of the board label switching 50 # unexpectedly on us. 51 board = host.host_info_store.get().board 52 if board: 53 return [board] 54 for label in host._afe_host.labels: 55 if label.startswith(self._NAME + ':'): 56 return [label.split(':')[-1]] 57 58 return [_parse_lsb_output(host).board] 59 60 61class ModelLabel(base_label.StringPrefixLabel): 62 """Determine the correct model label for the device.""" 63 64 _NAME = ds_constants.MODEL_LABEL 65 66 def generate_labels(self, host): 67 # Based on the issue explained in BoardLabel, return the existing 68 # label if it has already been set once. 69 model = host.host_info_store.get().model 70 if model: 71 return [model] 72 for label in host._afe_host.labels: 73 if label.startswith(self._NAME + ':'): 74 return [label.split(':')[-1]] 75 76 lsb_output = _parse_lsb_output(host) 77 model = None 78 79 if lsb_output.unibuild: 80 test_label_cmd = 'cros_config / test-label' 81 result = host.run(command=test_label_cmd, ignore_status=True) 82 if result.exit_status == 0: 83 model = result.stdout.strip() 84 if not model: 85 mosys_cmd = 'mosys platform model' 86 result = host.run(command=mosys_cmd, ignore_status=True) 87 if result.exit_status == 0: 88 model = result.stdout.strip() 89 90 # We need some sort of backwards compatibility for boards that 91 # are not yet supported with mosys and unified builds. 92 # This is necessary so that we can begin changing cbuildbot to take 93 # advantage of the model/board label differentiations for 94 # scheduling, while still retaining backwards compatibility. 95 return [model or lsb_output.board] 96 97 98class LightSensorLabel(base_label.BaseLabel): 99 """Label indicating if a light sensor is detected.""" 100 101 _NAME = 'lightsensor' 102 _LIGHTSENSOR_SEARCH_DIR = '/sys/bus/iio/devices' 103 _LIGHTSENSOR_FILES = [ 104 "in_illuminance0_input", 105 "in_illuminance_input", 106 "in_illuminance0_raw", 107 "in_illuminance_raw", 108 "illuminance0_input", 109 ] 110 111 def exists(self, host): 112 search_cmd = "find -L %s -maxdepth 4 | egrep '%s'" % ( 113 self._LIGHTSENSOR_SEARCH_DIR, '|'.join(self._LIGHTSENSOR_FILES)) 114 # Run the search cmd following the symlinks. Stderr_tee is set to 115 # None as there can be a symlink loop, but the command will still 116 # execute correctly with a few messages printed to stderr. 117 result = host.run(search_cmd, stdout_tee=None, stderr_tee=None, 118 ignore_status=True) 119 120 return result.exit_status == 0 121 122 123class BluetoothLabel(base_label.BaseLabel): 124 """Label indicating if bluetooth is detected.""" 125 126 _NAME = 'bluetooth' 127 128 def exists(self, host): 129 result = host.run('test -d /sys/class/bluetooth/hci0', 130 ignore_status=True) 131 132 return result.exit_status == 0 133 134 135class ECLabel(base_label.BaseLabel): 136 """Label to determine the type of EC on this host.""" 137 138 _NAME = 'ec:cros' 139 140 def exists(self, host): 141 cmd = 'mosys ec info' 142 # The output should look like these, so that the last field should 143 # match our EC version scheme: 144 # 145 # stm | stm32f100 | snow_v1.3.139-375eb9f 146 # ti | Unknown-10de | peppy_v1.5.114-5d52788 147 # 148 # Non-Chrome OS ECs will look like these: 149 # 150 # ENE | KB932 | 00BE107A00 151 # ite | it8518 | 3.08 152 # 153 # And some systems don't have ECs at all (Lumpy, for example). 154 regexp = r'^.*\|\s*(\S+_v\d+\.\d+\.\d+-[0-9a-f]+)\s*$' 155 156 ecinfo = host.run(command=cmd, ignore_status=True) 157 if ecinfo.exit_status == 0: 158 res = re.search(regexp, ecinfo.stdout) 159 if res: 160 logging.info("EC version is %s", res.groups()[0]) 161 return True 162 logging.info("%s got: %s", cmd, ecinfo.stdout) 163 # Has an EC, but it's not a Chrome OS EC 164 logging.info("%s exited with status %d", cmd, ecinfo.exit_status) 165 return False 166 167 168class Cr50Label(base_label.StringPrefixLabel): 169 """Label indicating the cr50 version.""" 170 171 _NAME = 'cr50' 172 173 def __init__(self): 174 self.ver = None 175 176 177 def exists(self, host): 178 # Make sure the gsctool version command runs ok 179 self.ver = host.run('gsctool -a -f', ignore_status=True) 180 return self.ver.exit_status == 0 181 182 183 def generate_labels(self, host): 184 # Check the major version to determine prePVT vs PVT 185 major_ver = int(re.search('RW \d+\.(\d+)\.\d+[\r\n]', 186 self.ver.stdout).group(1)) 187 # PVT images have a odd major version prePVT have even 188 return ['pvt' if (major_ver % 2) else 'prepvt'] 189 190 191class AccelsLabel(base_label.BaseLabel): 192 """Determine the type of accelerometers on this host.""" 193 194 _NAME = 'accel:cros-ec' 195 196 def exists(self, host): 197 # Check to make sure we have ectool 198 rv = host.run('which ectool', ignore_status=True) 199 if rv.exit_status: 200 logging.info("No ectool cmd found; assuming no EC accelerometers") 201 return False 202 203 # Check that the EC supports the motionsense command 204 rv = host.run('ectool motionsense', ignore_status=True) 205 if rv.exit_status: 206 logging.info("EC does not support motionsense command; " 207 "assuming no EC accelerometers") 208 return False 209 210 # Check that EC motion sensors are active 211 active = host.run('ectool motionsense active').stdout.split('\n') 212 if active[0] == "0": 213 logging.info("Motion sense inactive; assuming no EC accelerometers") 214 return False 215 216 logging.info("EC accelerometers found") 217 return True 218 219 220class ChameleonLabel(base_label.BaseLabel): 221 """Determine if a Chameleon is connected to this host.""" 222 223 _NAME = 'chameleon' 224 225 def exists(self, host): 226 return host._chameleon_host is not None 227 228 229class ChameleonConnectionLabel(base_label.StringPrefixLabel): 230 """Return the Chameleon connection label.""" 231 232 _NAME = 'chameleon' 233 234 def exists(self, host): 235 return host._chameleon_host is not None 236 237 238 def generate_labels(self, host): 239 return [host.chameleon.get_label()] 240 241 242class ChameleonPeripheralsLabel(base_label.StringPrefixLabel): 243 """Return the Chameleon peripherals labels. 244 245 The 'chameleon:bt_hid' label is applied if the bluetooth 246 classic hid device, i.e, RN-42 emulation kit, is detected. 247 248 Any peripherals plugged into the chameleon board would be 249 detected and applied proper labels in this class. 250 """ 251 252 _NAME = 'chameleon' 253 254 def exists(self, host): 255 return host._chameleon_host is not None 256 257 258 def generate_labels(self, host): 259 bt_hid_device = host.chameleon.get_bluetooth_hid_mouse() 260 return ['bt_hid'] if bt_hid_device.CheckSerialConnection() else [] 261 262 263class AudioLoopbackDongleLabel(base_label.BaseLabel): 264 """Return the label if an audio loopback dongle is plugged in.""" 265 266 _NAME = 'audio_loopback_dongle' 267 268 def exists(self, host): 269 nodes_info = host.run(command=cras_utils.get_cras_nodes_cmd(), 270 ignore_status=True).stdout 271 if (cras_utils.node_type_is_plugged('HEADPHONE', nodes_info) and 272 cras_utils.node_type_is_plugged('MIC', nodes_info)): 273 return True 274 return False 275 276 277class PowerSupplyLabel(base_label.StringPrefixLabel): 278 """ 279 Return the label describing the power supply type. 280 281 Labels representing this host's power supply. 282 * `power:battery` when the device has a battery intended for 283 extended use 284 * `power:AC_primary` when the device has a battery not intended 285 for extended use (for moving the machine, etc) 286 * `power:AC_only` when the device has no battery at all. 287 """ 288 289 _NAME = 'power' 290 291 def __init__(self): 292 self.psu_cmd_result = None 293 294 295 def exists(self, host): 296 self.psu_cmd_result = host.run(command='mosys psu type', 297 ignore_status=True) 298 return self.psu_cmd_result.stdout.strip() != 'unknown' 299 300 301 def generate_labels(self, host): 302 if self.psu_cmd_result.exit_status: 303 # The psu command for mosys is not included for all platforms. The 304 # assumption is that the device will have a battery if the command 305 # is not found. 306 return ['battery'] 307 return [self.psu_cmd_result.stdout.strip()] 308 309 310class StorageLabel(base_label.StringPrefixLabel): 311 """ 312 Return the label describing the storage type. 313 314 Determine if the internal device is SCSI or dw_mmc device. 315 Then check that it is SSD or HDD or eMMC or something else. 316 317 Labels representing this host's internal device type: 318 * `storage:ssd` when internal device is solid state drive 319 * `storage:hdd` when internal device is hard disk drive 320 * `storage:mmc` when internal device is mmc drive 321 * `storage:nvme` when internal device is NVMe drive 322 * `storage:ufs` when internal device is ufs drive 323 * None When internal device is something else or 324 when we are unable to determine the type 325 """ 326 327 _NAME = 'storage' 328 329 def __init__(self): 330 self.type_str = '' 331 332 333 def exists(self, host): 334 # The output should be /dev/mmcblk* for SD/eMMC or /dev/sd* for scsi 335 rootdev_cmd = ' '.join(['. /usr/sbin/write_gpt.sh;', 336 '. /usr/share/misc/chromeos-common.sh;', 337 'load_base_vars;', 338 'get_fixed_dst_drive']) 339 rootdev = host.run(command=rootdev_cmd, ignore_status=True) 340 if rootdev.exit_status: 341 logging.info("Fail to run %s", rootdev_cmd) 342 return False 343 rootdev_str = rootdev.stdout.strip() 344 345 if not rootdev_str: 346 return False 347 348 rootdev_base = os.path.basename(rootdev_str) 349 350 mmc_pattern = '/dev/mmcblk[0-9]' 351 if re.match(mmc_pattern, rootdev_str): 352 # Use type to determine if the internal device is eMMC or somthing 353 # else. We can assume that MMC is always an internal device. 354 type_cmd = 'cat /sys/block/%s/device/type' % rootdev_base 355 type = host.run(command=type_cmd, ignore_status=True) 356 if type.exit_status: 357 logging.info("Fail to run %s", type_cmd) 358 return False 359 type_str = type.stdout.strip() 360 361 if type_str == 'MMC': 362 self.type_str = 'mmc' 363 return True 364 365 scsi_pattern = '/dev/sd[a-z]+' 366 if re.match(scsi_pattern, rootdev.stdout): 367 # Read symlink for /sys/block/sd* to determine if the internal 368 # device is connected via ata or usb. 369 link_cmd = 'readlink /sys/block/%s' % rootdev_base 370 link = host.run(command=link_cmd, ignore_status=True) 371 if link.exit_status: 372 logging.info("Fail to run %s", link_cmd) 373 return False 374 link_str = link.stdout.strip() 375 if 'usb' in link_str: 376 return False 377 elif 'ufs' in link_str: 378 self.type_str = 'ufs' 379 return True 380 381 # Read rotation to determine if the internal device is ssd or hdd. 382 rotate_cmd = str('cat /sys/block/%s/queue/rotational' 383 % rootdev_base) 384 rotate = host.run(command=rotate_cmd, ignore_status=True) 385 if rotate.exit_status: 386 logging.info("Fail to run %s", rotate_cmd) 387 return False 388 rotate_str = rotate.stdout.strip() 389 390 rotate_dict = {'0':'ssd', '1':'hdd'} 391 self.type_str = rotate_dict.get(rotate_str) 392 return True 393 394 nvme_pattern = '/dev/nvme[0-9]+n[0-9]+' 395 if re.match(nvme_pattern, rootdev_str): 396 self.type_str = 'nvme' 397 return True 398 399 # All other internal device / error case will always fall here 400 return False 401 402 403 def generate_labels(self, host): 404 return [self.type_str] 405 406 407class ServoLabel(base_label.BaseLabel): 408 """Label to apply if a servo is present.""" 409 410 _NAME = 'servo' 411 412 def exists(self, host): 413 """ 414 Check if the servo label should apply to the host or not. 415 416 @returns True if a servo host is detected, False otherwise. 417 """ 418 servo_host_hostname = None 419 servo_args = servo_host.get_servo_args_for_host(host) 420 if servo_args: 421 servo_host_hostname = servo_args.get(servo_host.SERVO_HOST_ATTR) 422 return (servo_host_hostname is not None 423 and servo_host.servo_host_is_up(servo_host_hostname)) 424 425 426class ArcLabel(base_label.BaseLabel): 427 """Label indicates if host has ARC support.""" 428 429 _NAME = 'arc' 430 431 @base_label.forever_exists_decorate 432 def exists(self, host): 433 return 0 == host.run( 434 'grep CHROMEOS_ARC_VERSION /etc/lsb-release', 435 ignore_status=True).exit_status 436 437 438class CtsArchLabel(base_label.StringLabel): 439 """Labels to determine the abi of the CTS bundle (arm or x86 only).""" 440 441 _NAME = ['cts_abi_arm', 'cts_abi_x86', 'cts_cpu_arm', 'cts_cpu_x86'] 442 443 def _get_cts_abis(self, arch): 444 """Return supported CTS ABIs. 445 446 @return List of supported CTS bundle ABIs. 447 """ 448 cts_abis = {'x86_64': ['arm', 'x86'], 'arm': ['arm']} 449 return cts_abis.get(arch, []) 450 451 def _get_cts_cpus(self, arch): 452 """Return supported CTS native CPUs. 453 454 This is needed for CTS_Instant scheduling. 455 @return List of supported CTS native CPUs. 456 """ 457 cts_cpus = {'x86_64': ['x86'], 'arm': ['arm']} 458 return cts_cpus.get(arch, []) 459 460 def generate_labels(self, host): 461 cpu_arch = host.get_cpu_arch() 462 abi_labels = ['cts_abi_' + abi for abi in self._get_cts_abis(cpu_arch)] 463 cpu_labels = ['cts_cpu_' + cpu for cpu in self._get_cts_cpus(cpu_arch)] 464 return abi_labels + cpu_labels 465 466 467class VideoGlitchLabel(base_label.BaseLabel): 468 """Label indicates if host supports video glitch detection tests.""" 469 470 _NAME = 'video_glitch_detection' 471 472 def exists(self, host): 473 board = host.get_board().replace(ds_constants.BOARD_PREFIX, '') 474 475 return board in video_test_constants.SUPPORTED_BOARDS 476 477 478class InternalDisplayLabel(base_label.StringLabel): 479 """Label that determines if the device has an internal display.""" 480 481 _NAME = 'internal_display' 482 483 def generate_labels(self, host): 484 from autotest_lib.client.cros.graphics import graphics_utils 485 from autotest_lib.client.common_lib import utils as common_utils 486 487 def __system_output(cmd): 488 return host.run(cmd).stdout 489 490 def __read_file(remote_path): 491 return host.run('cat %s' % remote_path).stdout 492 493 # Hijack the necessary client functions so that we can take advantage 494 # of the client lib here. 495 # FIXME: find a less hacky way than this 496 original_system_output = utils.system_output 497 original_read_file = common_utils.read_file 498 utils.system_output = __system_output 499 common_utils.read_file = __read_file 500 try: 501 return ([self._NAME] 502 if graphics_utils.has_internal_display() 503 else []) 504 finally: 505 utils.system_output = original_system_output 506 common_utils.read_file = original_read_file 507 508 509class LucidSleepLabel(base_label.BaseLabel): 510 """Label that determines if device has support for lucid sleep.""" 511 512 # TODO(kevcheng): See if we can determine if this label is applicable a 513 # better way (crbug.com/592146). 514 _NAME = 'lucidsleep' 515 LUCID_SLEEP_BOARDS = ['nocturne', 'poppy'] 516 517 def exists(self, host): 518 board = host.get_board().replace(ds_constants.BOARD_PREFIX, '') 519 return board in self.LUCID_SLEEP_BOARDS 520 521 522class HWIDLabel(base_label.StringLabel): 523 """Return all the labels generated from the hwid.""" 524 525 # We leave out _NAME because hwid_lib will generate everything for us. 526 527 def __init__(self): 528 # Grab the key file needed to access the hwid service. 529 self.key_file = global_config.global_config.get_config_value( 530 'CROS', 'HWID_KEY', type=str) 531 532 533 def generate_labels(self, host): 534 hwid_labels = [] 535 hwid = host.run_output('crossystem hwid').strip() 536 hwid_info_list = hwid_lib.get_hwid_info(hwid, hwid_lib.HWID_INFO_LABEL, 537 self.key_file).get('labels', []) 538 539 for hwid_info in hwid_info_list: 540 # If it's a prefix, we'll have: 541 # {'name': prefix_label, 'value': postfix_label} and create 542 # 'prefix_label:postfix_label'; otherwise it'll just be 543 # {'name': label} which should just be 'label'. 544 value = hwid_info.get('value', '') 545 name = hwid_info.get('name', '') 546 # There should always be a name but just in case there is not. 547 if name: 548 hwid_labels.append(name if not value else 549 '%s:%s' % (name, value)) 550 return hwid_labels 551 552 553 def get_all_labels(self): 554 """We need to try all labels as a prefix and as standalone. 555 556 We don't know for sure which labels are prefix labels and which are 557 standalone so we try all of them as both. 558 """ 559 all_hwid_labels = [] 560 try: 561 all_hwid_labels = hwid_lib.get_all_possible_dut_labels( 562 self.key_file) 563 except IOError: 564 logging.error('Can not open key file: %s', self.key_file) 565 except hwid_lib.HwIdException as e: 566 logging.error('hwid service: %s', e) 567 return all_hwid_labels, all_hwid_labels 568 569 570class DetachableBaseLabel(base_label.BaseLabel): 571 """Label indicating if device has detachable keyboard.""" 572 573 _NAME = 'detachablebase' 574 575 def exists(self, host): 576 return host.run('which hammerd', ignore_status=True).exit_status == 0 577 578 579class FingerprintLabel(base_label.BaseLabel): 580 """Label indicating whether device has fingerprint sensor.""" 581 582 _NAME = 'fingerprint' 583 584 def exists(self, host): 585 return host.run('test -c /dev/cros_fp', 586 ignore_status=True).exit_status == 0 587 588 589class ReferenceDesignLabel(base_label.StringPrefixLabel): 590 """Determine the correct reference design label for the device. """ 591 592 _NAME = 'reference_design' 593 594 def __init__(self): 595 self.response = None 596 597 def exists(self, host): 598 self.response = host.run('mosys platform family', ignore_status=True) 599 return self.response.exit_status == 0 600 601 def generate_labels(self, host): 602 if self.exists(host): 603 return [self.response.stdout.strip()] 604 605 606CROS_LABELS = [ 607 AccelsLabel(), 608 ArcLabel(), 609 AudioLoopbackDongleLabel(), 610 BluetoothLabel(), 611 BoardLabel(), 612 ModelLabel(), 613 ChameleonConnectionLabel(), 614 ChameleonLabel(), 615 ChameleonPeripheralsLabel(), 616 common_label.OSLabel(), 617 Cr50Label(), 618 CtsArchLabel(), 619 DetachableBaseLabel(), 620 ECLabel(), 621 FingerprintLabel(), 622 HWIDLabel(), 623 InternalDisplayLabel(), 624 LightSensorLabel(), 625 LucidSleepLabel(), 626 PowerSupplyLabel(), 627 ReferenceDesignLabel(), 628 ServoLabel(), 629 StorageLabel(), 630 VideoGlitchLabel(), 631] 632