• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""This class defines the CrosHost Label class."""
7
8from __future__ import absolute_import
9from __future__ import division
10from __future__ import print_function
11
12import collections
13import logging
14import re
15
16import common
17
18from autotest_lib.client.bin import utils
19from autotest_lib.client.common_lib import global_config
20from autotest_lib.client.cros.audio import cras_utils
21from autotest_lib.server.cros.dynamic_suite import constants as ds_constants
22from autotest_lib.server.hosts import base_label
23from autotest_lib.server.hosts import common_label
24from autotest_lib.server.hosts import servo_constants
25from autotest_lib.site_utils import hwid_lib
26from six.moves import zip
27
# pylint: disable=missing-docstring
# Result of parsing /etc/lsb-release (see _parse_lsb_output): `unibuild` is a
# bool, `board` is the CHROMEOS_RELEASE_BOARD value.
LsbOutput = collections.namedtuple('LsbOutput', ['unibuild', 'board'])

# fallback values if we can't contact the HWID server
# (used by HWIDLabel._hwid_label_names when get_all_labels() yields nothing).
HWID_LABELS_FALLBACK = ['sku', 'phase', 'touchscreen', 'touchpad', 'variant', 'stylus']

# Repair and Deploy taskName
# These are compared against the `task_name` argument of the labels'
# update_for_task() methods.
REPAIR_TASK_NAME = 'repair'
DEPLOY_TASK_NAME = 'deploy'
37
38
def _parse_lsb_output(host):
    """Read /etc/lsb-release on the host and pull out the labeling fields.

    @param host: Host whose `run` method is used to execute the command.
    @returns: LsbOutput whose `unibuild` is True only when
              CHROMEOS_RELEASE_UNIBUILD equals '1' and whose `board` is the
              CHROMEOS_RELEASE_BOARD value.
    @raises KeyError: presumably, when CHROMEOS_RELEASE_BOARD is missing from
                      the parsed output (assumes a dict-like result).
    """
    lsb_release = utils.parse_cmd_output('cat /etc/lsb-release',
                                         run_method=host.run)

    # A missing UNIBUILD key yields None, which compares unequal to '1'.
    is_unibuild = lsb_release.get('CHROMEOS_RELEASE_UNIBUILD') == '1'
    board = lsb_release['CHROMEOS_RELEASE_BOARD']
    return LsbOutput(unibuild=is_unibuild, board=board)
50
51
class DeviceSkuLabel(base_label.StringPrefixLabel):
    """Produce the device_sku label value for a host.

    A value already recorded in the host info store takes precedence;
    otherwise the DUT is queried with `mosys platform sku`.
    """

    _NAME = ds_constants.DEVICE_SKU_LABEL

    def generate_labels(self, host):
        """Return the SKU as a one-element list, or [] when unavailable.

        @param host: Host providing `host_info_store` and `run`.
        @returns: [sku] from the info store or from mosys; [] if the mosys
                  command exits non-zero.
        """
        stored_sku = host.host_info_store.get().device_sku
        if stored_sku:
            return [stored_sku]

        sku_query = host.run(command='mosys platform sku', ignore_status=True)
        if sku_query.exit_status != 0:
            return []
        return [sku_query.stdout.strip()]

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task name.

        @param task_name: Name of the running task; '' when none is given.
        @returns: True for deploy, repair, or an empty task name.
        """
        # This label is stored in the lab config.
        refreshing_tasks = (DEPLOY_TASK_NAME, REPAIR_TASK_NAME, '')
        return task_name in refreshing_tasks
72
73
class BrandCodeLabel(base_label.StringPrefixLabel):
    """Produce the brand_code (aka RLZ-code) label value for a host.

    A value already recorded in the host info store takes precedence;
    otherwise the DUT is queried with `cros_config / brand-code`.
    """

    _NAME = ds_constants.BRAND_CODE_LABEL

    def generate_labels(self, host):
        """Return the brand code as a one-element list, or [] if unavailable.

        @param host: Host providing `host_info_store` and `run`.
        @returns: [brand_code] from the info store or from cros_config; [] if
                  the cros_config command exits non-zero.
        """
        stored_code = host.host_info_store.get().brand_code
        if stored_code:
            return [stored_code]

        code_query = host.run(command='cros_config / brand-code',
                              ignore_status=True)
        if code_query.exit_status != 0:
            return []
        return [code_query.stdout.strip()]
90
91
class BluetoothPeerLabel(base_label.StringPrefixLabel):
    """Return the Bluetooth peer labels.

    The working_bluetooth_btpeer label is applied if a Raspberry Pi Bluetooth
    peer is detected. There can be up to 4 Bluetooth peers. Labels
    working_bluetooth_btpeer:[1-4] will be assigned depending on the number of
    working peers found.
    """

    _NAME = 'working_bluetooth_btpeer'

    def exists(self, host):
        """Report whether the host has at least one btpeer host configured.

        @param host: Host exposing `_btpeer_host_list`.
        """
        peer_host_count = len(host._btpeer_host_list)
        return peer_host_count > 0

    def generate_labels(self, host):
        """Probe each peer and emit '1'..'k' for the k peers that respond.

        @param host: Host exposing `btpeer_list` and `_btpeer_host_list`.
        @returns: List of consecutive numbers (as strings), one per peer whose
                  HID mouse serial connection check succeeds.
        """
        working_peers = []

        peer_pairs = zip(host.btpeer_list, host._btpeer_host_list)
        for peer, peer_host in peer_pairs:
            try:
                # Initialize one device type to make sure the peer is working
                hid_mouse = peer.get_bluetooth_hid_mouse()
                if hid_mouse.CheckSerialConnection():
                    # Labels are numbered by how many peers passed so far,
                    # not by the peer's position in the list.
                    working_peers.append(str(len(working_peers) + 1))
            except Exception as e:
                logging.error('Error with initializing bt_hid_mouse on '
                              'btpeer %s %s', peer_host.hostname, e)

        logging.info('Bluetooth Peer labels are %s', working_peers)
        return working_peers

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task name.

        @param task_name: Name of the running task; '' when none is given.
        @returns: True for repair or an empty task name.
        """
        # This label is stored in the state config, so only repair tasks update
        # it or when no task name is mentioned.
        refreshing_tasks = (REPAIR_TASK_NAME, '')
        return task_name in refreshing_tasks
130
131
class Cr50Label(base_label.StringPrefixLabel):
    """Label indicating the cr50 image type ('pvt' or 'prepvt').

    exists() runs `gsctool -a -f` and keeps the command result in `self.ver`;
    generate_labels() parses that stored output, so exists() must have been
    called on the same instance beforehand.
    """

    _NAME = 'cr50'

    def __init__(self):
        # Result of the most recent `gsctool -a -f` run; set by exists().
        self.ver = None

    def exists(self, host):
        """Run the gsctool version command and report whether it succeeded.

        @param host: Host used to run the command.
        @returns: True if the command exited with status 0.
        """
        # Make sure the gsctool version command runs ok
        self.ver = host.run('gsctool -a -f', ignore_status=True)
        return self.ver.exit_status == 0

    def _get_version(self, region):
        """Get the version number of the given region.

        @param region: Region name as it appears in the gsctool output,
                       e.g. 'RW'.
        @returns: The 'X.Y.Z' version string that follows the region name.
        @raises AttributeError: if the output has no version for the region
                                (re.search returns None), or if exists() has
                                not populated self.ver yet.
        """
        # Raw string so that \d and \. reach the regex engine untouched rather
        # than being treated as (invalid) string escape sequences.
        pattern = region + r' (\d+\.\d+\.\d+)'
        return re.search(pattern, self.ver.stdout).group(1)

    def generate_labels(self, host):
        """Classify the RW image as 'pvt' or 'prepvt'.

        @param host: Unused; the output captured by exists() is parsed.
        @returns: ['pvt'] when the second version component is odd, otherwise
                  ['prepvt'].
        """
        # Check the major version to determine prePVT vs PVT
        version = self._get_version('RW')
        major_version = int(version.split('.')[1])
        # PVT images have an odd major version, prePVT have even
        return ['pvt' if (major_version % 2) else 'prepvt']

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task name.

        @param task_name: Name of the running task; '' when none is given.
        @returns: True for repair or an empty task name.
        """
        # This label is stored in the state config, so only repair tasks update
        # it or when no task name is mentioned.
        return task_name in (REPAIR_TASK_NAME, '')
160
161
class Cr50RWKeyidLabel(Cr50Label):
    """Label indicating the cr50 RW keyid and whether it is a prod or dev key.

    Relies on Cr50Label.exists() having stored the gsctool output in
    `self.ver` before generate_labels() is called.
    """
    _REGION = 'RW'
    _NAME = 'cr50-rw-keyid'

    def _get_keyid_info(self, region):
        """Get the keyid of the given region.

        @param region: Region name as it appears on the 'keyids:' line of the
                       gsctool output, e.g. 'RW' or 'RO'.
        @returns: [keyid, 'prod'] when bit 2 of the (hex) keyid is set,
                  otherwise [keyid, 'dev'].
        @raises AttributeError: if the output has no keyid for the region
                                (re.search returns None).
        """
        # Raw string so that \S reaches the regex engine untouched rather than
        # being treated as an (invalid) string escape sequence.
        match = re.search(r'keyids:.*%s (\S+)' % region, self.ver.stdout)
        # The captured token may carry a trailing comma separator.
        keyid = match.group(1).rstrip(',')
        is_prod = int(keyid, 16) & (1 << 2)
        return [keyid, 'prod' if is_prod else 'dev']

    def generate_labels(self, host):
        """Get the keyid and key type for this class's region.

        @param host: Unused; the output captured by exists() is parsed.
        @returns: Two-element list as produced by _get_keyid_info().
        """
        return self._get_keyid_info(self._REGION)
177
178
class Cr50ROKeyidLabel(Cr50RWKeyidLabel):
    """Label indicating the cr50 RO keyid and its key type.

    Reuses the keyid parsing inherited from Cr50RWKeyidLabel, pointed at the
    'RO' region instead of 'RW'.
    """
    _NAME = 'cr50-ro-keyid'
    _REGION = 'RO'
183
184
class ChameleonLabel(base_label.BaseLabel):
    """Determine if a Chameleon is connected to this host."""

    _NAME = 'chameleon'

    def exists(self, host):
        """Report whether the host has a chameleon host object attached.

        @param host: Host exposing `_chameleon_host`.
        @returns: True when `host._chameleon_host` is not None.
        """
        # See crbug.com/1004500#2 for details.
        has_chameleon = host._chameleon_host is not None
        # TODO(crbug.com/995900) -- debug why chameleon label is flipping
        try:
            logging.info("has_chameleon %s", has_chameleon)
            logging.info("_chameleon_host %s",
                         getattr(host, "_chameleon_host", "NO_ATTRIBUTE"))
            logging.info("chameleon %s",
                         getattr(host, "chameleon", "NO_ATTRIBUTE"))
        except Exception as e:
            # The diagnostics above are best-effort and must never affect the
            # label result. Catch Exception rather than using a bare except so
            # KeyboardInterrupt/SystemExit still propagate.
            logging.debug("Failed to log chameleon debug info: %s", e)
        return has_chameleon

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task name.

        @param task_name: Name of the running task; '' when none is given.
        @returns: True for repair or an empty task name.
        """
        # This label is stored in the state config, so only repair tasks update
        # it or when no task name is mentioned.
        return task_name in (REPAIR_TASK_NAME, '')
208
209
class ChameleonConnectionLabel(base_label.StringPrefixLabel):
    """Produce the 'chameleon:<connection>' label for a host."""

    _NAME = 'chameleon'

    def exists(self, host):
        """Report whether the host has a chameleon host object attached.

        @param host: Host exposing `_chameleon_host`.
        """
        chameleon_host = host._chameleon_host
        return chameleon_host is not None

    def generate_labels(self, host):
        """Return the label reported by the host's chameleon.

        @param host: Host exposing `chameleon`.
        @returns: One-element list holding `host.chameleon.get_label()`.
        """
        connection_label = host.chameleon.get_label()
        return [connection_label]

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task name.

        @param task_name: Name of the running task; '' when none is given.
        @returns: True for deploy or an empty task name.
        """
        # This label is stored in the lab config, so only deploy tasks update it
        # or when no task name is mentioned.
        refreshing_tasks = (DEPLOY_TASK_NAME, '')
        return task_name in refreshing_tasks
225
226
class AudioLoopbackDongleLabel(base_label.BaseLabel):
    """Return the label if an audio loopback dongle is plugged in."""

    _NAME = 'audio_loopback_dongle'

    def exists(self, host):
        """Report the dongle as present once it has ever been seen.

        Based on crbug.com/991285, AudioLoopbackDongle sometimes flips, so a
        label already recorded in the host info store is trusted and the DUT
        is only probed when no such label exists.

        @param host: Host providing `host_info_store` and `run`.
        @returns: True if the label is already stored; otherwise the result
                  of probing the DUT.
        """
        # Short-circuit: when the stored state is True the command is not run
        # on the DUT, so it cannot flip the state back.
        return self._cached_exists(host) or self._host_run_exists(host)

    def _cached_exists(self, host):
        """Get the state of AudioLoopbackDongle in the data store.

        @returns: True if any stored label starts with this label's name.
        """
        stored_labels = host.host_info_store.get().labels
        return any(label.startswith(self._NAME) for label in stored_labels)

    def _host_run_exists(self, host):
        """Detect presence of audio_loopback_dongle by physically
        running a command on the DUT.

        @returns: True only if both a HEADPHONE and a MIC node are plugged.
        """
        cras_nodes = host.run(command=cras_utils.get_cras_nodes_cmd(),
                              ignore_status=True).stdout
        return bool(
                cras_utils.node_type_is_plugged('HEADPHONE', cras_nodes) and
                cras_utils.node_type_is_plugged('MIC', cras_nodes))

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task name.

        @param task_name: Name of the running task; '' when none is given.
        @returns: True for repair or an empty task name.
        """
        # This label is stored in the state config, so only repair tasks update
        # it or when no task name is mentioned.
        refreshing_tasks = (REPAIR_TASK_NAME, '')
        return task_name in refreshing_tasks
267
268
class ServoTypeLabel(base_label.StringPrefixLabel):
    """Produce the servo type label for a host.

    A servo type already present among the stored labels is reused;
    otherwise the type is read from the host's servo, if there is one.
    """
    _NAME = servo_constants.SERVO_TYPE_LABEL_PREFIX

    def generate_labels(self, host):
        """Return the servo type as a one-element list, or [] if unknown.

        @param host: Host providing `host_info_store` and `servo`.
        @returns: [servo_type] from the stored labels or from the servo;
                  [] when there is no servo, the servo reports an empty
                  type, or reading it raises.
        """
        cached_type = self._get_from_labels(host.host_info_store.get())
        if cached_type != '':
            logging.info("Using servo_type: %s from cache!", cached_type)
            return [cached_type]

        if host.servo is None:
            return []

        try:
            detected_type = host.servo.get_servo_version()
            if detected_type != '':
                return [detected_type]
            logging.warning('Cannot collect servo_type from servo'
            ' by `dut-control servo_type`! Please file a bug'
            ' and inform infra team as we are not expected '
            ' to reach this point.')
        except Exception as e:
            # We don't want fail the label and break DUTs here just
            # because of servo issue.
            logging.error("Failed to update servo_type, %s", str(e))
        return []

    def _get_from_labels(self, info):
        """Extract the servo type from the stored labels.

        @param info: Host info object exposing `labels`.
        @returns: Text after '<_NAME>:' of the first matching label, or ''
                  when no label carries that prefix.
        """
        prefix = self._NAME + ':'
        matches = (label[len(prefix):] for label in info.labels
                   if label.startswith(prefix))
        return next(matches, '')

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task name.

        @param task_name: Name of the running task; '' when none is given.
        @returns: True for deploy or an empty task name.
        """
        # This label is stored in the lab config. As written, it is refreshed
        # for deploy tasks or when no task name is mentioned.
        # NOTE(review): repair tasks are not in this tuple -- confirm that is
        # intended.
        refreshing_tasks = (DEPLOY_TASK_NAME, '')
        return task_name in refreshing_tasks
308
309
310def _parse_hwid_labels(hwid_info_list):
311    if len(hwid_info_list) == 0:
312        return hwid_info_list
313
314    res = []
315    # See crbug.com/997816#c7 for details of two potential formats of returns
316    # from HWID server.
317    if isinstance(hwid_info_list[0], dict):
318        # Format of hwid_info:
319        # [{u'name': u'sku', u'value': u'xxx'}, ..., ]
320        for hwid_info in hwid_info_list:
321            value = hwid_info.get('value', '')
322            name = hwid_info.get('name', '')
323            # There should always be a name but just in case there is not.
324            if name:
325                new_label = name if not value else '%s:%s' % (name, value)
326                res.append(new_label)
327    else:
328        # Format of hwid_info:
329        # [<DUTLabel name: 'sku' value: u'xxx'>, ..., ]
330        for hwid_info in hwid_info_list:
331            new_label = str(hwid_info)
332            logging.info('processing hwid label: %s', new_label)
333            res.append(new_label)
334
335    return res
336
337
class HWIDLabel(base_label.StringLabel):
    """Return all the labels generated from the hwid."""

    # We leave out _NAME because hwid_lib will generate everything for us.

    def __init__(self):
        # Grab the key file needed to access the hwid service.
        self.key_file = global_config.global_config.get_config_value(
                'CROS', 'HWID_KEY', type=str)


    @staticmethod
    def _merge_hwid_label_lists(new, old):
        """Merge old and new hwid label values, preferring the new ones.

        Every old label is kept unless a new 'key:value' label shares its
        'key:' prefix; new labels without a ':' displace nothing.

        @param new: Labels obtained on this run.
        @param old: Labels recorded on a previous run.
        @returns: Sorted list of the surviving old labels plus all new ones.
        """
        # TODO(gregorynisbet): what is the appropriate way to merge
        # old and new information?
        kept = set(old)
        for label in new:
            key, sep, _ = label.partition(':')
            if not sep:
                continue
            # If we have a key-value key such as variant:aaa,
            # then we remove all the old labels with the same key.
            key_prefix = key + ':'
            kept = {item for item in kept if not item.startswith(key_prefix)}
        return sorted(kept.union(new))


    def _hwid_label_names(self):
        """Get the label names that hwid_lib controls.

        @returns: Names from get_all_labels(), or HWID_LABELS_FALLBACK when
                  that result is falsey.
        """
        # If and only if get_all_labels was unsuccessful,
        # it will return a falsey value.
        label_names = self.get_all_labels()[0] or HWID_LABELS_FALLBACK

        # TODO(gregorynisbet): remove this
        # TODO(crbug.com/999785)
        if "sku" not in label_names:
            logging.info("sku-less label names %s", label_names)

        return label_names


    def _old_label_values(self, host):
        """Get the hwid_lib labels recorded on a previous run.

        @param host: Host providing `host_info_store`.
        @returns: Stored labels that start with any hwid label name.
        """
        info = host.host_info_store.get()
        # NOTE: we want *all* the labels starting with each prefix.
        return [label
                for hwid_label in self._hwid_label_names()
                for label in info.labels
                if label.startswith(hwid_label)]


    def generate_labels(self, host):
        """Query the HWID service and merge the result with stored labels.

        @param host: Host providing `host_info_store` and `run_output`.
        @returns: Merged, sorted list of hwid labels. When the service
                  raises HwIdException only the previous labels are
                  returned.
        """
        # use previous values as default
        previous_labels = self._old_label_values(host)
        logging.info("old_hwid_labels: %r", previous_labels)
        hwid = host.run_output('crossystem hwid').strip()
        try:
            response = hwid_lib.get_hwid_info(
                hwid=hwid,
                info_type=hwid_lib.HWID_INFO_LABEL,
                key_file=self.key_file,
            )
            logging.info("hwid_info_response: %r", response)
            raw_labels = response.get('labels', [])
        except hwid_lib.HwIdException as e:
            logging.info("HwIdException: %s", e)
            raw_labels = []

        fresh_labels = _parse_hwid_labels(raw_labels)
        logging.info("new HWID labels: %r", fresh_labels)

        return HWIDLabel._merge_hwid_label_lists(
            old=previous_labels,
            new=fresh_labels,
        )


    def get_all_labels(self):
        """We need to try all labels as a prefix and as standalone.

        We don't know for sure which labels are prefix labels and which are
        standalone so we try all of them as both.

        @returns: A pair holding the same list twice; the list is empty when
                  the key file cannot be opened or the hwid service fails.
        """
        try:
            possible_labels = hwid_lib.get_all_possible_dut_labels(
                    self.key_file)
        except IOError:
            logging.error('Can not open key file: %s', self.key_file)
            possible_labels = []
        except hwid_lib.HwIdException as e:
            logging.error('hwid service: %s', e)
            possible_labels = []
        return possible_labels, possible_labels
441
442
# Label instances for CrOS hosts (presumably consumed by the host labeling
# code -- the consumer is not visible in this file). The trailing
# STATECONFIG / LABCONFIG markers mirror the "state config" / "lab config"
# notes in each class's update_for_task().
CROS_LABELS = [
    AudioLoopbackDongleLabel(), #STATECONFIG
    BluetoothPeerLabel(), #STATECONFIG
    ChameleonConnectionLabel(), #LABCONFIG
    ChameleonLabel(), #STATECONFIG
    common_label.OSLabel(),
    DeviceSkuLabel(), #LABCONFIG
    HWIDLabel(),
    ServoTypeLabel(), #LABCONFIG
    # Temporarily add back as there's no way to reference cr50 configs.
    # See crbug.com/1057145 for the root cause.
    # See crbug.com/1057719 for future tracking.
    Cr50Label(),
    Cr50ROKeyidLabel(),
]
458
# Label instances for labstations (presumably; inferred from the name): only
# the OS label is included.
LABSTATION_LABELS = [
    common_label.OSLabel(),
]
462