1# Lint as: python2, python3 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""This class defines the CrosHost Label class.""" 7 8from __future__ import absolute_import 9from __future__ import division 10from __future__ import print_function 11 12import collections 13import logging 14import re 15 16import common 17 18from autotest_lib.client.bin import utils 19from autotest_lib.client.common_lib import global_config 20from autotest_lib.client.cros.audio import cras_utils 21from autotest_lib.server.cros.dynamic_suite import constants as ds_constants 22from autotest_lib.server.hosts import base_label 23from autotest_lib.server.hosts import common_label 24from autotest_lib.server.hosts import servo_constants 25from autotest_lib.site_utils import hwid_lib 26from six.moves import zip 27 28# pylint: disable=missing-docstring 29LsbOutput = collections.namedtuple('LsbOutput', ['unibuild', 'board']) 30 31# fallback values if we can't contact the HWID server 32HWID_LABELS_FALLBACK = ['sku', 'phase', 'touchscreen', 'touchpad', 'variant', 'stylus'] 33 34# Repair and Deploy taskName 35REPAIR_TASK_NAME = 'repair' 36DEPLOY_TASK_NAME = 'deploy' 37 38 39def _parse_lsb_output(host): 40 """Parses the LSB output and returns key data points for labeling. 41 42 @param host: Host that the command will be executed against 43 @returns: LsbOutput with the result of parsing the /etc/lsb-release output 44 """ 45 release_info = utils.parse_cmd_output('cat /etc/lsb-release', 46 run_method=host.run) 47 48 unibuild = release_info.get('CHROMEOS_RELEASE_UNIBUILD') == '1' 49 return LsbOutput(unibuild, release_info['CHROMEOS_RELEASE_BOARD']) 50 51 52class DeviceSkuLabel(base_label.StringPrefixLabel): 53 """Determine the correct device_sku label for the device.""" 54 55 _NAME = ds_constants.DEVICE_SKU_LABEL 56 57 def generate_labels(self, host): 58 device_sku = host.host_info_store.get().device_sku 59 if device_sku: 60 return [device_sku] 61 62 mosys_cmd = 'mosys platform sku' 63 result = host.run(command=mosys_cmd, ignore_status=True) 64 if result.exit_status == 0: 65 return [result.stdout.strip()] 66 67 return [] 68 69 def update_for_task(self, task_name): 70 # This label is stored in the lab config. 71 return task_name in (DEPLOY_TASK_NAME, REPAIR_TASK_NAME, '') 72 73 74class BrandCodeLabel(base_label.StringPrefixLabel): 75 """Determine the correct brand_code (aka RLZ-code) for the device.""" 76 77 _NAME = ds_constants.BRAND_CODE_LABEL 78 79 def generate_labels(self, host): 80 brand_code = host.host_info_store.get().brand_code 81 if brand_code: 82 return [brand_code] 83 84 cros_config_cmd = 'cros_config / brand-code' 85 result = host.run(command=cros_config_cmd, ignore_status=True) 86 if result.exit_status == 0: 87 return [result.stdout.strip()] 88 89 return [] 90 91 92class BluetoothPeerLabel(base_label.StringPrefixLabel): 93 """Return the Bluetooth peer labels. 94 95 working_bluetooth_btpeer label is applied if a Raspberry Pi Bluetooth peer 96 is detected.There can be up to 4 Bluetooth peers. Labels 97 working_bluetooth_btpeer:[1-4] will be assigned depending on the number of 98 peers present. 99 100 """ 101 102 _NAME = 'working_bluetooth_btpeer' 103 104 def exists(self, host): 105 return len(host._btpeer_host_list) > 0 106 107 def generate_labels(self, host): 108 labels_list = [] 109 count = 1 110 111 for (btpeer, btpeer_host) in \ 112 zip(host.btpeer_list, host._btpeer_host_list): 113 try: 114 # Initialize one device type to make sure the peer is working 115 bt_hid_device = btpeer.get_bluetooth_hid_mouse() 116 if bt_hid_device.CheckSerialConnection(): 117 labels_list.append(str(count)) 118 count += 1 119 except Exception as e: 120 logging.error('Error with initializing bt_hid_mouse on ' 121 'btpeer %s %s', btpeer_host.hostname, e) 122 123 logging.info('Bluetooth Peer labels are %s', labels_list) 124 return labels_list 125 126 def update_for_task(self, task_name): 127 # This label is stored in the state config, so only repair tasks update 128 # it or when no task name is mentioned. 129 return task_name in (REPAIR_TASK_NAME, '') 130 131 132class Cr50Label(base_label.StringPrefixLabel): 133 """Label indicating the cr50 image type.""" 134 135 _NAME = 'cr50' 136 137 def __init__(self): 138 self.ver = None 139 140 def exists(self, host): 141 # Make sure the gsctool version command runs ok 142 self.ver = host.run('gsctool -a -f', ignore_status=True) 143 return self.ver.exit_status == 0 144 145 def _get_version(self, region): 146 """Get the version number of the given region""" 147 return re.search(region + ' (\d+\.\d+\.\d+)', self.ver.stdout).group(1) 148 149 def generate_labels(self, host): 150 # Check the major version to determine prePVT vs PVT 151 version = self._get_version('RW') 152 major_version = int(version.split('.')[1]) 153 # PVT images have a odd major version prePVT have even 154 return ['pvt' if (major_version % 2) else 'prepvt'] 155 156 def update_for_task(self, task_name): 157 # This label is stored in the state config, so only repair tasks update 158 # it or when no task name is mentioned. 159 return task_name in (REPAIR_TASK_NAME, '') 160 161 162class Cr50RWKeyidLabel(Cr50Label): 163 """Label indicating the cr50 RW version.""" 164 _REGION = 'RW' 165 _NAME = 'cr50-rw-keyid' 166 167 def _get_keyid_info(self, region): 168 """Get the keyid of the given region.""" 169 match = re.search('keyids:.*%s (\S+)' % region, self.ver.stdout) 170 keyid = match.group(1).rstrip(',') 171 is_prod = int(keyid, 16) & (1 << 2) 172 return [keyid, 'prod' if is_prod else 'dev'] 173 174 def generate_labels(self, host): 175 """Get the key type.""" 176 return self._get_keyid_info(self._REGION) 177 178 179class Cr50ROKeyidLabel(Cr50RWKeyidLabel): 180 """Label indicating the RO key type.""" 181 _REGION = 'RO' 182 _NAME = 'cr50-ro-keyid' 183 184 185class ChameleonLabel(base_label.BaseLabel): 186 """Determine if a Chameleon is connected to this host.""" 187 188 _NAME = 'chameleon' 189 190 def exists(self, host): 191 # See crbug.com/1004500#2 for details. 192 has_chameleon = host._chameleon_host is not None 193 # TODO(crbug.com/995900) -- debug why chameleon label is flipping 194 try: 195 logging.info("has_chameleon %s", has_chameleon) 196 logging.info("_chameleon_host %s", 197 getattr(host, "_chameleon_host", "NO_ATTRIBUTE")) 198 logging.info("chameleon %s", 199 getattr(host, "chameleon", "NO_ATTRIBUTE")) 200 except: 201 pass 202 return has_chameleon 203 204 def update_for_task(self, task_name): 205 # This label is stored in the state config, so only repair tasks update 206 # it or when no task name is mentioned. 207 return task_name in (REPAIR_TASK_NAME, '') 208 209 210class ChameleonConnectionLabel(base_label.StringPrefixLabel): 211 """Return the Chameleon connection label.""" 212 213 _NAME = 'chameleon' 214 215 def exists(self, host): 216 return host._chameleon_host is not None 217 218 def generate_labels(self, host): 219 return [host.chameleon.get_label()] 220 221 def update_for_task(self, task_name): 222 # This label is stored in the lab config, so only deploy tasks update it 223 # or when no task name is mentioned. 224 return task_name in (DEPLOY_TASK_NAME, '') 225 226 227class AudioLoopbackDongleLabel(base_label.BaseLabel): 228 """Return the label if an audio loopback dongle is plugged in.""" 229 230 _NAME = 'audio_loopback_dongle' 231 232 def exists(self, host): 233 # Based on crbug.com/991285, AudioLoopbackDongle sometimes flips. 234 # Ensure that AudioLoopbackDongle.exists returns True 235 # forever, after it returns True *once*. 236 if self._cached_exists(host): 237 # If the current state is True, return it, don't run the command on 238 # the DUT and potentially flip the state. 239 return True 240 # If the current state is not True, run the command on 241 # the DUT. The new state will be set to whatever the command 242 # produces. 243 return self._host_run_exists(host) 244 245 def _cached_exists(self, host): 246 """Get the state of AudioLoopbackDongle in the data store""" 247 info = host.host_info_store.get() 248 for label in info.labels: 249 if label.startswith(self._NAME): 250 return True 251 return False 252 253 def _host_run_exists(self, host): 254 """Detect presence of audio_loopback_dongle by physically 255 running a command on the DUT.""" 256 nodes_info = host.run(command=cras_utils.get_cras_nodes_cmd(), 257 ignore_status=True).stdout 258 if (cras_utils.node_type_is_plugged('HEADPHONE', nodes_info) and 259 cras_utils.node_type_is_plugged('MIC', nodes_info)): 260 return True 261 return False 262 263 def update_for_task(self, task_name): 264 # This label is stored in the state config, so only repair tasks update 265 # it or when no task name is mentioned. 266 return task_name in (REPAIR_TASK_NAME, '') 267 268 269class ServoTypeLabel(base_label.StringPrefixLabel): 270 _NAME = servo_constants.SERVO_TYPE_LABEL_PREFIX 271 272 def generate_labels(self, host): 273 info = host.host_info_store.get() 274 275 servo_type = self._get_from_labels(info) 276 if servo_type != '': 277 logging.info("Using servo_type: %s from cache!", servo_type) 278 return [servo_type] 279 280 if host.servo is not None: 281 try: 282 servo_type = host.servo.get_servo_version() 283 if servo_type != '': 284 return [servo_type] 285 logging.warning('Cannot collect servo_type from servo' 286 ' by `dut-control servo_type`! Please file a bug' 287 ' and inform infra team as we are not expected ' 288 ' to reach this point.') 289 except Exception as e: 290 # We don't want fail the label and break DUTs here just 291 # because of servo issue. 292 logging.error("Failed to update servo_type, %s", str(e)) 293 return [] 294 295 def _get_from_labels(self, info): 296 prefix = self._NAME + ':' 297 for label in info.labels: 298 if label.startswith(prefix): 299 suffix_length = len(prefix) 300 return label[suffix_length:] 301 return '' 302 303 def update_for_task(self, task_name): 304 # This label is stored in the lab config, 305 # only deploy and repair tasks update it 306 # or when no task name is mentioned. 307 return task_name in (DEPLOY_TASK_NAME, '') 308 309 310def _parse_hwid_labels(hwid_info_list): 311 if len(hwid_info_list) == 0: 312 return hwid_info_list 313 314 res = [] 315 # See crbug.com/997816#c7 for details of two potential formats of returns 316 # from HWID server. 317 if isinstance(hwid_info_list[0], dict): 318 # Format of hwid_info: 319 # [{u'name': u'sku', u'value': u'xxx'}, ..., ] 320 for hwid_info in hwid_info_list: 321 value = hwid_info.get('value', '') 322 name = hwid_info.get('name', '') 323 # There should always be a name but just in case there is not. 324 if name: 325 new_label = name if not value else '%s:%s' % (name, value) 326 res.append(new_label) 327 else: 328 # Format of hwid_info: 329 # [<DUTLabel name: 'sku' value: u'xxx'>, ..., ] 330 for hwid_info in hwid_info_list: 331 new_label = str(hwid_info) 332 logging.info('processing hwid label: %s', new_label) 333 res.append(new_label) 334 335 return res 336 337 338class HWIDLabel(base_label.StringLabel): 339 """Return all the labels generated from the hwid.""" 340 341 # We leave out _NAME because hwid_lib will generate everything for us. 342 343 def __init__(self): 344 # Grab the key file needed to access the hwid service. 345 self.key_file = global_config.global_config.get_config_value( 346 'CROS', 'HWID_KEY', type=str) 347 348 349 @staticmethod 350 def _merge_hwid_label_lists(new, old): 351 """merge a list of old and new values for hwid_labels. 352 preferring new values if available 353 354 @returns: list of labels""" 355 # TODO(gregorynisbet): what is the appropriate way to merge 356 # old and new information? 357 retained = set(x for x in old) 358 for label in new: 359 key, sep, value = label.partition(':') 360 # If we have a key-value key such as variant:aaa, 361 # then we remove all the old labels with the same key. 362 if sep: 363 retained = set(x for x in retained if (not x.startswith(key + ':'))) 364 return list(sorted(retained.union(new))) 365 366 367 def _hwid_label_names(self): 368 """get the labels that hwid_lib controls. 369 370 @returns: hwid_labels 371 """ 372 all_hwid_labels, _ = self.get_all_labels() 373 # If and only if get_all_labels was unsuccessful, 374 # it will return a falsey value. 375 out = all_hwid_labels or HWID_LABELS_FALLBACK 376 377 # TODO(gregorynisbet): remove this 378 # TODO(crbug.com/999785) 379 if "sku" not in out: 380 logging.info("sku-less label names %s", out) 381 382 return out 383 384 385 def _old_label_values(self, host): 386 """get the hwid_lib labels on previous run 387 388 @returns: hwid_labels""" 389 out = [] 390 info = host.host_info_store.get() 391 for hwid_label in self._hwid_label_names(): 392 for label in info.labels: 393 # NOTE: we want *all* the labels starting 394 # with this prefix. 395 if label.startswith(hwid_label): 396 out.append(label) 397 return out 398 399 400 def generate_labels(self, host): 401 # use previous values as default 402 old_hwid_labels = self._old_label_values(host) 403 logging.info("old_hwid_labels: %r", old_hwid_labels) 404 hwid = host.run_output('crossystem hwid').strip() 405 hwid_info_list = [] 406 try: 407 hwid_info_response = hwid_lib.get_hwid_info( 408 hwid=hwid, 409 info_type=hwid_lib.HWID_INFO_LABEL, 410 key_file=self.key_file, 411 ) 412 logging.info("hwid_info_response: %r", hwid_info_response) 413 hwid_info_list = hwid_info_response.get('labels', []) 414 except hwid_lib.HwIdException as e: 415 logging.info("HwIdException: %s", e) 416 417 new_hwid_labels = _parse_hwid_labels(hwid_info_list) 418 logging.info("new HWID labels: %r", new_hwid_labels) 419 420 return HWIDLabel._merge_hwid_label_lists( 421 old=old_hwid_labels, 422 new=new_hwid_labels, 423 ) 424 425 426 def get_all_labels(self): 427 """We need to try all labels as a prefix and as standalone. 428 429 We don't know for sure which labels are prefix labels and which are 430 standalone so we try all of them as both. 431 """ 432 all_hwid_labels = [] 433 try: 434 all_hwid_labels = hwid_lib.get_all_possible_dut_labels( 435 self.key_file) 436 except IOError: 437 logging.error('Can not open key file: %s', self.key_file) 438 except hwid_lib.HwIdException as e: 439 logging.error('hwid service: %s', e) 440 return all_hwid_labels, all_hwid_labels 441 442 443CROS_LABELS = [ 444 AudioLoopbackDongleLabel(), #STATECONFIG 445 BluetoothPeerLabel(), #STATECONFIG 446 ChameleonConnectionLabel(), #LABCONFIG 447 ChameleonLabel(), #STATECONFIG 448 common_label.OSLabel(), 449 DeviceSkuLabel(), #LABCONFIG 450 HWIDLabel(), 451 ServoTypeLabel(), #LABCONFIG 452 # Temporarily add back as there's no way to reference cr50 configs. 453 # See crbug.com/1057145 for the root cause. 454 # See crbug.com/1057719 for future tracking. 455 Cr50Label(), 456 Cr50ROKeyidLabel(), 457] 458 459LABSTATION_LABELS = [ 460 common_label.OSLabel(), 461] 462