1# Copyright 2016 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""This class defines the CrosHost Label class.""" 6 7import collections 8import logging 9import os 10import re 11 12import common 13 14from autotest_lib.client.bin import utils 15from autotest_lib.client.common_lib import global_config 16from autotest_lib.client.cros.audio import cras_utils 17from autotest_lib.client.cros.video import constants as video_test_constants 18from autotest_lib.server.cros.dynamic_suite import constants as ds_constants 19from autotest_lib.server.hosts import base_label 20from autotest_lib.server.hosts import common_label 21from autotest_lib.server.hosts import servo_host 22from autotest_lib.site_utils import hwid_lib 23 24# pylint: disable=missing-docstring 25LsbOutput = collections.namedtuple('LsbOutput', ['unibuild', 'board']) 26 27# fallback values if we can't contact the HWID server 28HWID_LABELS_FALLBACK = ['sku', 'phase', 'touchscreen', 'touchpad', 'variant', 'stylus'] 29 30# Repair and Deploy taskName 31REPAIR_TASK_NAME = 'repair' 32DEPLOY_TASK_NAME = 'deploy' 33 34 35def _parse_lsb_output(host): 36 """Parses the LSB output and returns key data points for labeling. 37 38 @param host: Host that the command will be executed against 39 @returns: LsbOutput with the result of parsing the /etc/lsb-release output 40 """ 41 release_info = utils.parse_cmd_output('cat /etc/lsb-release', 42 run_method=host.run) 43 44 unibuild = release_info.get('CHROMEOS_RELEASE_UNIBUILD') == '1' 45 return LsbOutput(unibuild, release_info['CHROMEOS_RELEASE_BOARD']) 46 47 48class BoardLabel(base_label.StringPrefixLabel): 49 """Determine the correct board label for the device.""" 50 51 _NAME = ds_constants.BOARD_PREFIX.rstrip(':') 52 53 def generate_labels(self, host): 54 # We only want to apply the board labels once, which is when they get 55 # added to the AFE. That way we don't have to worry about the board 56 # label switching on us if the wrong builds get put on the devices. 57 # crbug.com/624207 records one event of the board label switching 58 # unexpectedly on us. 59 board = host.host_info_store.get().board 60 if board: 61 return [board] 62 for label in host._afe_host.labels: 63 if label.startswith(self._NAME + ':'): 64 return [label.split(':')[-1]] 65 66 return [_parse_lsb_output(host).board] 67 68 69class ModelLabel(base_label.StringPrefixLabel): 70 """Determine the correct model label for the device.""" 71 72 _NAME = ds_constants.MODEL_LABEL 73 74 def generate_labels(self, host): 75 # Based on the issue explained in BoardLabel, return the existing 76 # label if it has already been set once. 77 model = host.host_info_store.get().model 78 if model: 79 return [model] 80 for label in host._afe_host.labels: 81 if label.startswith(self._NAME + ':'): 82 return [label.split(':')[-1]] 83 84 lsb_output = _parse_lsb_output(host) 85 model = None 86 87 if lsb_output.unibuild: 88 test_label_cmd = 'cros_config / test-label' 89 result = host.run(command=test_label_cmd, ignore_status=True) 90 if result.exit_status == 0: 91 model = result.stdout.strip() 92 if not model: 93 mosys_cmd = 'mosys platform model' 94 result = host.run(command=mosys_cmd, ignore_status=True) 95 if result.exit_status == 0: 96 model = result.stdout.strip() 97 98 # We need some sort of backwards compatibility for boards that 99 # are not yet supported with mosys and unified builds. 100 # This is necessary so that we can begin changing cbuildbot to take 101 # advantage of the model/board label differentiations for 102 # scheduling, while still retaining backwards compatibility. 103 return [model or lsb_output.board] 104 105 106class DeviceSkuLabel(base_label.StringPrefixLabel): 107 """Determine the correct device_sku label for the device.""" 108 109 _NAME = ds_constants.DEVICE_SKU_LABEL 110 111 def generate_labels(self, host): 112 device_sku = host.host_info_store.get().device_sku 113 if device_sku: 114 return [device_sku] 115 116 mosys_cmd = 'mosys platform sku' 117 result = host.run(command=mosys_cmd, ignore_status=True) 118 if result.exit_status == 0: 119 return [result.stdout.strip()] 120 121 return [] 122 123 def update_for_task(self, task_name): 124 # This label is stored in the lab config, so only deploy tasks update it 125 # or when no task name is mentioned. 126 return task_name in (DEPLOY_TASK_NAME, '') 127 128 129class BrandCodeLabel(base_label.StringPrefixLabel): 130 """Determine the correct brand_code (aka RLZ-code) for the device.""" 131 132 _NAME = ds_constants.BRAND_CODE_LABEL 133 134 def generate_labels(self, host): 135 brand_code = host.host_info_store.get().brand_code 136 if brand_code: 137 return [brand_code] 138 139 cros_config_cmd = 'cros_config / brand-code' 140 result = host.run(command=cros_config_cmd, ignore_status=True) 141 if result.exit_status == 0: 142 return [result.stdout.strip()] 143 144 return [] 145 146 147class BluetoothLabel(base_label.BaseLabel): 148 """Label indicating if bluetooth is detected.""" 149 150 _NAME = 'bluetooth' 151 152 def exists(self, host): 153 # Based on crbug.com/966219, the label is flipping sometimes. 154 # Potentially this is caused by testing itself. 155 # Making this label permanently sticky. 156 info = host.host_info_store.get() 157 for label in info.labels: 158 if label.startswith(self._NAME): 159 return True 160 161 result = host.run('test -d /sys/class/bluetooth/hci0', 162 ignore_status=True) 163 164 return result.exit_status == 0 165 166 167class ECLabel(base_label.BaseLabel): 168 """Label to determine the type of EC on this host.""" 169 170 _NAME = 'ec:cros' 171 172 def exists(self, host): 173 cmd = 'mosys ec info' 174 # The output should look like these, so that the last field should 175 # match our EC version scheme: 176 # 177 # stm | stm32f100 | snow_v1.3.139-375eb9f 178 # ti | Unknown-10de | peppy_v1.5.114-5d52788 179 # 180 # Non-Chrome OS ECs will look like these: 181 # 182 # ENE | KB932 | 00BE107A00 183 # ite | it8518 | 3.08 184 # 185 # And some systems don't have ECs at all (Lumpy, for example). 186 regexp = r'^.*\|\s*(\S+_v\d+\.\d+\.\d+-[0-9a-f]+)\s*$' 187 188 ecinfo = host.run(command=cmd, ignore_status=True) 189 if ecinfo.exit_status == 0: 190 res = re.search(regexp, ecinfo.stdout) 191 if res: 192 logging.info("EC version is %s", res.groups()[0]) 193 return True 194 logging.info("%s got: %s", cmd, ecinfo.stdout) 195 # Has an EC, but it's not a Chrome OS EC 196 logging.info("%s exited with status %d", cmd, ecinfo.exit_status) 197 return False 198 199 200class Cr50Label(base_label.StringPrefixLabel): 201 """Label indicating the cr50 image type.""" 202 203 _NAME = 'cr50' 204 205 def __init__(self): 206 self.ver = None 207 208 def exists(self, host): 209 # Make sure the gsctool version command runs ok 210 self.ver = host.run('gsctool -a -f', ignore_status=True) 211 return self.ver.exit_status == 0 212 213 def _get_version(self, region): 214 """Get the version number of the given region""" 215 return re.search(region + ' (\d+\.\d+\.\d+)', self.ver.stdout).group(1) 216 217 def generate_labels(self, host): 218 # Check the major version to determine prePVT vs PVT 219 version = self._get_version('RW') 220 major_version = int(version.split('.')[1]) 221 # PVT images have a odd major version prePVT have even 222 return ['pvt' if (major_version % 2) else 'prepvt'] 223 224 225class Cr50RWKeyidLabel(Cr50Label): 226 """Label indicating the cr50 RW version.""" 227 _REGION = 'RW' 228 _NAME = 'cr50-rw-keyid' 229 230 def _get_keyid_info(self, region): 231 """Get the keyid of the given region.""" 232 match = re.search('keyids:.*%s (\S+)' % region, self.ver.stdout) 233 keyid = match.group(1).rstrip(',') 234 is_prod = int(keyid, 16) & (1 << 2) 235 return [keyid, 'prod' if is_prod else 'dev'] 236 237 def generate_labels(self, host): 238 """Get the key type.""" 239 return self._get_keyid_info(self._REGION) 240 241 242class Cr50ROKeyidLabel(Cr50RWKeyidLabel): 243 """Label indicating the RO key type.""" 244 _REGION = 'RO' 245 _NAME = 'cr50-ro-keyid' 246 247 248class Cr50RWVersionLabel(Cr50Label): 249 """Label indicating the cr50 RW version.""" 250 _REGION = 'RW' 251 _NAME = 'cr50-rw-version' 252 253 def generate_labels(self, host): 254 """Get the version and key type""" 255 return [self._get_version(self._REGION)] 256 257 258class Cr50ROVersionLabel(Cr50RWVersionLabel): 259 """Label indicating the RO version.""" 260 _REGION = 'RO' 261 _NAME = 'cr50-ro-version' 262 263 264class AccelsLabel(base_label.BaseLabel): 265 """Determine the type of accelerometers on this host.""" 266 267 _NAME = 'accel:cros-ec' 268 269 def exists(self, host): 270 # Check to make sure we have ectool 271 rv = host.run('which ectool', ignore_status=True) 272 if rv.exit_status: 273 logging.info("No ectool cmd found; assuming no EC accelerometers") 274 return False 275 276 # Check that the EC supports the motionsense command 277 rv = host.run('ectool motionsense', ignore_status=True) 278 if rv.exit_status: 279 logging.info("EC does not support motionsense command; " 280 "assuming no EC accelerometers") 281 return False 282 283 # Check that EC motion sensors are active 284 active = host.run('ectool motionsense active').stdout.split('\n') 285 if active[0] == "0": 286 logging.info("Motion sense inactive; assuming no EC accelerometers") 287 return False 288 289 logging.info("EC accelerometers found") 290 return True 291 292 293class ChameleonLabel(base_label.BaseLabel): 294 """Determine if a Chameleon is connected to this host.""" 295 296 _NAME = 'chameleon' 297 298 def exists(self, host): 299 # See crbug.com/1004500#2 for details. 300 # https://chromium.googlesource.com/chromiumos/third_party/autotest/+ 301 # /refs/heads/master/server/hosts/cros_host.py#335 shows that 302 # _chameleon_host_list is not reliable. 303 has_chameleon = len(host.chameleon_list) > 0 304 # TODO(crbug.com/995900) -- debug why chameleon label is flipping 305 try: 306 logging.info("has_chameleon %s", has_chameleon) 307 logging.info("chameleon_host_list %s", 308 getattr(host, "_chameleon_host_list", "NO_ATTRIBUTE")) 309 logging.info("chameleon_list %s", 310 getattr(host, "chameleon_list", "NO_ATTRIBUTE")) 311 logging.info("multi_chameleon %s", 312 getattr(host, "multi_chameleon", "NO_ATTRIBUTE")) 313 except: 314 pass 315 return has_chameleon 316 317 def update_for_task(self, task_name): 318 # This label is stored in the state config, so only repair tasks update 319 # it or when no task name is mentioned. 320 return task_name in (REPAIR_TASK_NAME, '') 321 322 323class ChameleonConnectionLabel(base_label.StringPrefixLabel): 324 """Return the Chameleon connection label.""" 325 326 _NAME = 'chameleon' 327 328 def exists(self, host): 329 return len(host._chameleon_host_list) > 0 330 331 332 def generate_labels(self, host): 333 return [chameleon.get_label() for chameleon in host.chameleon_list] 334 335 def update_for_task(self, task_name): 336 # This label is stored in the lab config, so only deploy tasks update it 337 # or when no task name is mentioned. 338 return task_name in (DEPLOY_TASK_NAME, '') 339 340 341class ChameleonPeripheralsLabel(base_label.StringPrefixLabel): 342 """Return the Chameleon peripherals labels. 343 344 The 'chameleon:bt_hid' label is applied if the bluetooth 345 classic hid device, i.e, RN-42 emulation kit, is detected. 346 347 Any peripherals plugged into the chameleon board would be 348 detected and applied proper labels in this class. 349 """ 350 351 _NAME = 'chameleon' 352 353 def exists(self, host): 354 return len(host._chameleon_host_list) > 0 355 356 357 def generate_labels(self, host): 358 labels_list = [] 359 360 for chameleon, chameleon_host in \ 361 zip(host.chameleon_list, host._chameleon_host_list): 362 labels = [] 363 try: 364 bt_hid_device = chameleon.get_bluetooth_hid_mouse() 365 if bt_hid_device.CheckSerialConnection(): 366 labels.append('bt_hid') 367 except: 368 logging.error('Error with initializing bt_hid_mouse on ' 369 'chameleon %s', chameleon_host.hostname) 370 371 try: 372 ble_hid_device = chameleon.get_ble_mouse() 373 if ble_hid_device.CheckSerialConnection(): 374 labels.append('bt_ble_hid') 375 except: 376 logging.error('Error with initializing ble_hid_mouse on ' 377 'chameleon %s', chameleon_host.hostname) 378 379 try: 380 bt_a2dp_sink = chameleon.get_bluetooth_a2dp_sink() 381 if bt_a2dp_sink.CheckSerialConnection(): 382 labels.append('bt_a2dp_sink') 383 except: 384 logging.error('Error with initializing bt_a2dp_sink on ' 385 'chameleon %s', chameleon_host.hostname) 386 387 try: 388 bt_base_device = chameleon.get_bluetooth_base() 389 if bt_base_device.IsDetected(): 390 labels.append('bt_base') 391 except: 392 logging.error('Error in detecting bt_base on ' 393 'chameleon %s', chameleon_host.hostname) 394 395 if labels != []: 396 labels.append('bt_peer') 397 398 if host.multi_chameleon: 399 labels_list.append(labels) 400 else: 401 labels_list.extend(labels) 402 403 404 logging.info('Bluetooth labels are %s', labels_list) 405 return labels_list 406 407 def update_for_task(self, task_name): 408 # This label is stored in the lab config, so only deploy tasks update it 409 # or when no task name is mentioned. 410 return task_name in (DEPLOY_TASK_NAME, '') 411 412 413class AudioLoopbackDongleLabel(base_label.BaseLabel): 414 """Return the label if an audio loopback dongle is plugged in.""" 415 416 _NAME = 'audio_loopback_dongle' 417 418 def exists(self, host): 419 # Based on crbug.com/991285, AudioLoopbackDongle sometimes flips. 420 # Ensure that AudioLoopbackDongle.exists returns True 421 # forever, after it returns True *once*. 422 if self._cached_exists(host): 423 # If the current state is True, return it, don't run the command on 424 # the DUT and potentially flip the state. 425 return True 426 # If the current state is not True, run the command on 427 # the DUT. The new state will be set to whatever the command 428 # produces. 429 return self._host_run_exists(host) 430 431 def _cached_exists(self, host): 432 """Get the state of AudioLoopbackDongle in the data store""" 433 info = host.host_info_store.get() 434 for label in info.labels: 435 if label.startswith(self._NAME): 436 return True 437 return False 438 439 def _host_run_exists(self, host): 440 """Detect presence of audio_loopback_dongle by physically 441 running a command on the DUT.""" 442 nodes_info = host.run(command=cras_utils.get_cras_nodes_cmd(), 443 ignore_status=True).stdout 444 if (cras_utils.node_type_is_plugged('HEADPHONE', nodes_info) and 445 cras_utils.node_type_is_plugged('MIC', nodes_info)): 446 return True 447 return False 448 449 def update_for_task(self, task_name): 450 # This label is stored in the state config, so only repair tasks update 451 # it or when no task name is mentioned. 452 return task_name in (REPAIR_TASK_NAME, '') 453 454 455class PowerSupplyLabel(base_label.StringPrefixLabel): 456 """ 457 Return the label describing the power supply type. 458 459 Labels representing this host's power supply. 460 * `power:battery` when the device has a battery intended for 461 extended use 462 * `power:AC_primary` when the device has a battery not intended 463 for extended use (for moving the machine, etc) 464 * `power:AC_only` when the device has no battery at all. 465 """ 466 467 _NAME = 'power' 468 469 def __init__(self): 470 self.psu_cmd_result = None 471 472 473 def exists(self, host): 474 self.psu_cmd_result = host.run(command='mosys psu type', 475 ignore_status=True) 476 return self.psu_cmd_result.stdout.strip() != 'unknown' 477 478 479 def generate_labels(self, host): 480 if self.psu_cmd_result.exit_status: 481 # The psu command for mosys is not included for all platforms. The 482 # assumption is that the device will have a battery if the command 483 # is not found. 484 return ['battery'] 485 return [self.psu_cmd_result.stdout.strip()] 486 487 488class StorageLabel(base_label.StringPrefixLabel): 489 """ 490 Return the label describing the storage type. 491 492 Determine if the internal device is SCSI or dw_mmc device. 493 Then check that it is SSD or HDD or eMMC or something else. 494 495 Labels representing this host's internal device type: 496 * `storage:ssd` when internal device is solid state drive 497 * `storage:hdd` when internal device is hard disk drive 498 * `storage:mmc` when internal device is mmc drive 499 * `storage:nvme` when internal device is NVMe drive 500 * `storage:ufs` when internal device is ufs drive 501 * None When internal device is something else or 502 when we are unable to determine the type 503 """ 504 505 _NAME = 'storage' 506 507 def __init__(self): 508 self.type_str = '' 509 510 511 def exists(self, host): 512 # The output should be /dev/mmcblk* for SD/eMMC or /dev/sd* for scsi 513 rootdev_cmd = ' '.join(['. /usr/sbin/write_gpt.sh;', 514 '. /usr/share/misc/chromeos-common.sh;', 515 'load_base_vars;', 516 'get_fixed_dst_drive']) 517 rootdev = host.run(command=rootdev_cmd, ignore_status=True) 518 if rootdev.exit_status: 519 logging.info("Fail to run %s", rootdev_cmd) 520 return False 521 rootdev_str = rootdev.stdout.strip() 522 523 if not rootdev_str: 524 return False 525 526 rootdev_base = os.path.basename(rootdev_str) 527 528 mmc_pattern = '/dev/mmcblk[0-9]' 529 if re.match(mmc_pattern, rootdev_str): 530 # Use type to determine if the internal device is eMMC or somthing 531 # else. We can assume that MMC is always an internal device. 532 type_cmd = 'cat /sys/block/%s/device/type' % rootdev_base 533 type = host.run(command=type_cmd, ignore_status=True) 534 if type.exit_status: 535 logging.info("Fail to run %s", type_cmd) 536 return False 537 type_str = type.stdout.strip() 538 539 if type_str == 'MMC': 540 self.type_str = 'mmc' 541 return True 542 543 scsi_pattern = '/dev/sd[a-z]+' 544 if re.match(scsi_pattern, rootdev.stdout): 545 # Read symlink for /sys/block/sd* to determine if the internal 546 # device is connected via ata or usb. 547 link_cmd = 'readlink /sys/block/%s' % rootdev_base 548 link = host.run(command=link_cmd, ignore_status=True) 549 if link.exit_status: 550 logging.info("Fail to run %s", link_cmd) 551 return False 552 link_str = link.stdout.strip() 553 if 'usb' in link_str: 554 return False 555 elif 'ufs' in link_str: 556 self.type_str = 'ufs' 557 return True 558 559 # Read rotation to determine if the internal device is ssd or hdd. 560 rotate_cmd = str('cat /sys/block/%s/queue/rotational' 561 % rootdev_base) 562 rotate = host.run(command=rotate_cmd, ignore_status=True) 563 if rotate.exit_status: 564 logging.info("Fail to run %s", rotate_cmd) 565 return False 566 rotate_str = rotate.stdout.strip() 567 568 rotate_dict = {'0':'ssd', '1':'hdd'} 569 self.type_str = rotate_dict.get(rotate_str) 570 return True 571 572 nvme_pattern = '/dev/nvme[0-9]+n[0-9]+' 573 if re.match(nvme_pattern, rootdev_str): 574 self.type_str = 'nvme' 575 return True 576 577 # All other internal device / error case will always fall here 578 return False 579 580 def generate_labels(self, host): 581 return [self.type_str] 582 583 584class ServoLabel(base_label.BaseLabel): 585 """ 586 Label servo is applying if a servo is present. 587 Label servo_state present always. 588 """ 589 590 _NAME_OLD = 'servo' 591 _NAME = 'servo_state' 592 _NAME_WORKING = 'servo_state:WORKING' 593 _NAME_BROKEN = 'servo_state:BROKEN' 594 595 def get(self, host): 596 if self.exists(host): 597 return [self._NAME_OLD, self._NAME_WORKING] 598 return [self._NAME_BROKEN] 599 600 def get_all_labels(self): 601 return set([self._NAME]), set([self._NAME_OLD]) 602 603 def exists(self, host): 604 # Based on crbug.com/995900, Servo sometimes flips. 605 # Ensure that ServoLabel.exists returns True 606 # forever, after it returns True *once*. 607 if self._cached_exists(host): 608 # If the current state is True, return it, don't run the command on 609 # the DUT and potentially flip the state. 610 return True 611 # If the current state is not True, run the command on 612 # the DUT. The new state will be set to whatever the command 613 # produces. 614 return self._host_run_exists(host) 615 616 def _cached_exists(self, host): 617 """Get the state of Servo in the data store""" 618 info = host.host_info_store.get() 619 for label in info.labels: 620 if label.startswith(self._NAME): 621 if label.startswith(self._NAME_WORKING): 622 return True 623 elif label.startswith(self._NAME_OLD): 624 return True 625 return False 626 627 def _host_run_exists(self, host): 628 """ 629 Check if the servo label should apply to the host or not. 630 631 @returns True if a servo host is detected, False otherwise. 632 """ 633 servo_host_hostname = None 634 servo_args = servo_host.get_servo_args_for_host(host) 635 if servo_args: 636 servo_host_hostname = servo_args.get(servo_host.SERVO_HOST_ATTR) 637 return (servo_host_hostname is not None 638 and servo_host.servo_host_is_up(servo_host_hostname)) 639 640 def update_for_task(self, task_name): 641 # This label is stored in the state config, so only repair tasks update 642 # it or when no task name is mentioned. 643 return task_name in (REPAIR_TASK_NAME, '') 644 645 646class ArcLabel(base_label.BaseLabel): 647 """Label indicates if host has ARC support.""" 648 649 _NAME = 'arc' 650 651 @base_label.forever_exists_decorate 652 def exists(self, host): 653 return 0 == host.run( 654 'grep CHROMEOS_ARC_VERSION /etc/lsb-release', 655 ignore_status=True).exit_status 656 657 658class CtsArchLabel(base_label.StringLabel): 659 """Labels to determine the abi of the CTS bundle (arm or x86 only).""" 660 661 _NAME = ['cts_abi_arm', 'cts_abi_x86', 'cts_cpu_arm', 'cts_cpu_x86'] 662 663 def _get_cts_abis(self, arch): 664 """Return supported CTS ABIs. 665 666 @return List of supported CTS bundle ABIs. 667 """ 668 cts_abis = {'x86_64': ['arm', 'x86'], 'arm': ['arm']} 669 return cts_abis.get(arch, []) 670 671 def _get_cts_cpus(self, arch): 672 """Return supported CTS native CPUs. 673 674 This is needed for CTS_Instant scheduling. 675 @return List of supported CTS native CPUs. 676 """ 677 cts_cpus = {'x86_64': ['x86'], 'arm': ['arm']} 678 return cts_cpus.get(arch, []) 679 680 def generate_labels(self, host): 681 cpu_arch = host.get_cpu_arch() 682 abi_labels = ['cts_abi_' + abi for abi in self._get_cts_abis(cpu_arch)] 683 cpu_labels = ['cts_cpu_' + cpu for cpu in self._get_cts_cpus(cpu_arch)] 684 return abi_labels + cpu_labels 685 686 687class VideoGlitchLabel(base_label.BaseLabel): 688 """Label indicates if host supports video glitch detection tests.""" 689 690 _NAME = 'video_glitch_detection' 691 692 def exists(self, host): 693 board = host.get_board().replace(ds_constants.BOARD_PREFIX, '') 694 695 return board in video_test_constants.SUPPORTED_BOARDS 696 697 698class InternalDisplayLabel(base_label.StringLabel): 699 """Label that determines if the device has an internal display.""" 700 701 _NAME = 'internal_display' 702 703 def generate_labels(self, host): 704 from autotest_lib.client.cros.graphics import graphics_utils 705 from autotest_lib.client.common_lib import utils as common_utils 706 707 def __system_output(cmd): 708 return host.run(cmd).stdout 709 710 def __read_file(remote_path): 711 return host.run('cat %s' % remote_path).stdout 712 713 # Hijack the necessary client functions so that we can take advantage 714 # of the client lib here. 715 # FIXME: find a less hacky way than this 716 original_system_output = utils.system_output 717 original_read_file = common_utils.read_file 718 utils.system_output = __system_output 719 common_utils.read_file = __read_file 720 try: 721 return ([self._NAME] 722 if graphics_utils.has_internal_display() 723 else []) 724 finally: 725 utils.system_output = original_system_output 726 common_utils.read_file = original_read_file 727 728 729class LucidSleepLabel(base_label.BaseLabel): 730 """Label that determines if device has support for lucid sleep.""" 731 732 # TODO(kevcheng): See if we can determine if this label is applicable a 733 # better way (crbug.com/592146). 734 _NAME = 'lucidsleep' 735 LUCID_SLEEP_BOARDS = ['nocturne', 'poppy'] 736 737 def exists(self, host): 738 board = host.get_board().replace(ds_constants.BOARD_PREFIX, '') 739 return board in self.LUCID_SLEEP_BOARDS 740 741 742def _parse_hwid_labels(hwid_info_list): 743 if len(hwid_info_list) == 0: 744 return hwid_info_list 745 746 res = [] 747 # See crbug.com/997816#c7 for details of two potential formats of returns 748 # from HWID server. 749 if isinstance(hwid_info_list[0], dict): 750 # Format of hwid_info: 751 # [{u'name': u'sku', u'value': u'xxx'}, ..., ] 752 for hwid_info in hwid_info_list: 753 value = hwid_info.get('value', '') 754 name = hwid_info.get('name', '') 755 # There should always be a name but just in case there is not. 756 if name: 757 new_label = name if not value else '%s:%s' % (name, value) 758 res.append(new_label) 759 else: 760 # Format of hwid_info: 761 # [<DUTLabel name: 'sku' value: u'xxx'>, ..., ] 762 for hwid_info in hwid_info_list: 763 new_label = str(hwid_info) 764 logging.info('processing hwid label: %s', new_label) 765 res.append(new_label) 766 767 return res 768 769 770class HWIDLabel(base_label.StringLabel): 771 """Return all the labels generated from the hwid.""" 772 773 # We leave out _NAME because hwid_lib will generate everything for us. 774 775 def __init__(self): 776 # Grab the key file needed to access the hwid service. 777 self.key_file = global_config.global_config.get_config_value( 778 'CROS', 'HWID_KEY', type=str) 779 780 781 @staticmethod 782 def _merge_hwid_label_lists(new, old): 783 """merge a list of old and new values for hwid_labels. 784 preferring new values if available 785 786 @returns: list of labels""" 787 # TODO(gregorynisbet): what is the appropriate way to merge 788 # old and new information? 789 retained = set(x for x in old) 790 for label in new: 791 key, sep, value = label.partition(':') 792 # If we have a key-value key such as variant:aaa, 793 # then we remove all the old labels with the same key. 794 if sep: 795 retained = set(x for x in retained if (not x.startswith(key + ':'))) 796 return list(sorted(retained.union(new))) 797 798 799 def _hwid_label_names(self): 800 """get the labels that hwid_lib controls. 801 802 @returns: hwid_labels 803 """ 804 all_hwid_labels, _ = self.get_all_labels() 805 # If and only if get_all_labels was unsuccessful, 806 # it will return a falsey value. 807 out = all_hwid_labels or HWID_LABELS_FALLBACK 808 809 # TODO(gregorynisbet): remove this 810 # TODO(crbug.com/999785) 811 if "sku" not in out: 812 logging.info("sku-less label names %s", out) 813 814 return out 815 816 817 def _old_label_values(self, host): 818 """get the hwid_lib labels on previous run 819 820 @returns: hwid_labels""" 821 out = [] 822 info = host.host_info_store.get() 823 for hwid_label in self._hwid_label_names(): 824 for label in info.labels: 825 # NOTE: we want *all* the labels starting 826 # with this prefix. 827 if label.startswith(hwid_label): 828 out.append(label) 829 return out 830 831 832 def generate_labels(self, host): 833 # use previous values as default 834 old_hwid_labels = self._old_label_values(host) 835 logging.info("old_hwid_labels: %r", old_hwid_labels) 836 hwid = host.run_output('crossystem hwid').strip() 837 hwid_info_list = [] 838 try: 839 hwid_info_response = hwid_lib.get_hwid_info( 840 hwid=hwid, 841 info_type=hwid_lib.HWID_INFO_LABEL, 842 key_file=self.key_file, 843 ) 844 logging.info("hwid_info_response: %r", hwid_info_response) 845 hwid_info_list = hwid_info_response.get('labels', []) 846 except hwid_lib.HwIdException as e: 847 logging.info("HwIdException: %s", e) 848 849 new_hwid_labels = _parse_hwid_labels(hwid_info_list) 850 logging.info("new HWID labels: %r", new_hwid_labels) 851 852 return HWIDLabel._merge_hwid_label_lists( 853 old=old_hwid_labels, 854 new=new_hwid_labels, 855 ) 856 857 858 def get_all_labels(self): 859 """We need to try all labels as a prefix and as standalone. 860 861 We don't know for sure which labels are prefix labels and which are 862 standalone so we try all of them as both. 863 """ 864 all_hwid_labels = [] 865 try: 866 all_hwid_labels = hwid_lib.get_all_possible_dut_labels( 867 self.key_file) 868 except IOError: 869 logging.error('Can not open key file: %s', self.key_file) 870 except hwid_lib.HwIdException as e: 871 logging.error('hwid service: %s', e) 872 return all_hwid_labels, all_hwid_labels 873 874 875class DetachableBaseLabel(base_label.BaseLabel): 876 """Label indicating if device has detachable keyboard.""" 877 878 _NAME = 'detachablebase' 879 880 def exists(self, host): 881 return host.run('which hammerd', ignore_status=True).exit_status == 0 882 883 884class FingerprintLabel(base_label.BaseLabel): 885 """Label indicating whether device has fingerprint sensor.""" 886 887 _NAME = 'fingerprint' 888 889 def exists(self, host): 890 return host.run('test -c /dev/cros_fp', 891 ignore_status=True).exit_status == 0 892 893 894class ReferenceDesignLabel(base_label.StringPrefixLabel): 895 """Determine the correct reference design label for the device. """ 896 897 _NAME = 'reference_design' 898 899 def __init__(self): 900 self.response = None 901 902 def exists(self, host): 903 self.response = host.run('mosys platform family', ignore_status=True) 904 return self.response.exit_status == 0 905 906 def generate_labels(self, host): 907 if self.exists(host): 908 return [self.response.stdout.strip()] 909 910 911CROS_LABELS = [ 912 AudioLoopbackDongleLabel(), #STATECONFIG 913 ChameleonConnectionLabel(), #LABCONFIG 914 ChameleonLabel(), #STATECONFIG 915 ChameleonPeripheralsLabel(), #LABCONFIG 916 common_label.OSLabel(), 917 DeviceSkuLabel(), #LABCONFIG 918 HWIDLabel(), 919 ServoLabel(), #STATECONFIG 920] 921 922LABSTATION_LABELS = [ 923 common_label.OSLabel(), 924] 925