• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""This class defines the CrosHost Label class."""
6
7import collections
8import logging
9import os
10import re
11
12import common
13
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import global_config
16from autotest_lib.client.cros.audio import cras_utils
17from autotest_lib.client.cros.video import constants as video_test_constants
18from autotest_lib.server.cros.dynamic_suite import constants as ds_constants
19from autotest_lib.server.hosts import base_label
20from autotest_lib.server.hosts import common_label
21from autotest_lib.server.hosts import servo_host
22from autotest_lib.site_utils import hwid_lib
23
24# pylint: disable=missing-docstring
# Result of parsing /etc/lsb-release: the unibuild flag and the board name.
LsbOutput = collections.namedtuple('LsbOutput', ['unibuild', 'board'])

# Label names to fall back on when the HWID server can't be contacted.
HWID_LABELS_FALLBACK = [
    'sku',
    'phase',
    'touchscreen',
    'touchpad',
    'variant',
    'stylus',
]

# Task names checked by the update_for_task() methods below.
REPAIR_TASK_NAME = 'repair'
DEPLOY_TASK_NAME = 'deploy'
33
34
def _parse_lsb_output(host):
    """Read /etc/lsb-release on the host and pick out the labeling data.

    @param host: Host whose run method is used to execute the command.
    @returns: LsbOutput holding the unibuild flag (True only when
              CHROMEOS_RELEASE_UNIBUILD is '1') and the value of
              CHROMEOS_RELEASE_BOARD.
    """
    lsb_values = utils.parse_cmd_output('cat /etc/lsb-release',
                                        run_method=host.run)
    is_unibuild = lsb_values.get('CHROMEOS_RELEASE_UNIBUILD') == '1'
    board_name = lsb_values['CHROMEOS_RELEASE_BOARD']
    return LsbOutput(is_unibuild, board_name)
46
47
class BoardLabel(base_label.StringPrefixLabel):
    """Determine the correct board label for the device."""

    _NAME = ds_constants.BOARD_PREFIX.rstrip(':')

    def generate_labels(self, host):
        """Return the board label value, preferring already-recorded data.

        The board label is meant to be applied only once, when the host is
        added to the AFE, so that it cannot switch if a wrong build lands on
        the device (crbug.com/624207 records one such unexpected switch).
        Lookup order: host info store, existing AFE labels, then the DUT's
        /etc/lsb-release.

        @param host: Host to generate the label for.
        @returns: Single-element list with the board name.
        """
        stored_board = host.host_info_store.get().board
        if stored_board:
            return [stored_board]

        board_prefix = self._NAME + ':'
        for afe_label in host._afe_host.labels:
            if afe_label.startswith(board_prefix):
                return [afe_label.split(':')[-1]]

        # Nothing recorded yet; ask the device itself.
        return [_parse_lsb_output(host).board]
67
68
class ModelLabel(base_label.StringPrefixLabel):
    """Determine the correct model label for the device."""

    _NAME = ds_constants.MODEL_LABEL

    def generate_labels(self, host):
        """Return the model label value, preferring already-recorded data.

        As with BoardLabel, a model that has been set once is returned as-is
        (from the host info store or the AFE labels). Otherwise unibuild
        devices are queried with cros_config and then mosys, and everything
        else falls back to the board name.

        @param host: Host to generate the label for.
        @returns: Single-element list with the model (or board) name.
        """
        stored_model = host.host_info_store.get().model
        if stored_model:
            return [stored_model]

        model_prefix = self._NAME + ':'
        for afe_label in host._afe_host.labels:
            if afe_label.startswith(model_prefix):
                return [afe_label.split(':')[-1]]

        lsb_output = _parse_lsb_output(host)
        model = None

        if lsb_output.unibuild:
            # Try each command in turn until one yields a non-empty model.
            for probe_cmd in ('cros_config / test-label',
                              'mosys platform model'):
                result = host.run(command=probe_cmd, ignore_status=True)
                if result.exit_status == 0:
                    model = result.stdout.strip()
                if model:
                    break

        # We need some sort of backwards compatibility for boards that
        # are not yet supported with mosys and unified builds.
        # This is necessary so that we can begin changing cbuildbot to take
        # advantage of the model/board label differentiations for
        # scheduling, while still retaining backwards compatibility.
        return [model or lsb_output.board]
104
105
class DeviceSkuLabel(base_label.StringPrefixLabel):
    """Determine the correct device_sku label for the device."""

    _NAME = ds_constants.DEVICE_SKU_LABEL

    def generate_labels(self, host):
        """Return the device SKU from the host info store or from mosys.

        @param host: Host to generate the label for.
        @returns: Single-element list with the SKU, or an empty list when
                  nothing is stored and 'mosys platform sku' fails.
        """
        stored_sku = host.host_info_store.get().device_sku
        if stored_sku:
            return [stored_sku]

        sku_result = host.run(command='mosys platform sku',
                              ignore_status=True)
        if sku_result.exit_status != 0:
            return []
        return [sku_result.stdout.strip()]

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the lab config, so only deploy tasks update
        it, or when no task name is mentioned.

        @param task_name: Name of the task being run ('' when unspecified).
        @returns: True for the deploy task or an empty task name.
        """
        return task_name in (DEPLOY_TASK_NAME, '')
127
128
class BrandCodeLabel(base_label.StringPrefixLabel):
    """Determine the correct brand_code (aka RLZ-code) for the device."""

    _NAME = ds_constants.BRAND_CODE_LABEL

    def generate_labels(self, host):
        """Return the brand code from the host info store or cros_config.

        @param host: Host to generate the label for.
        @returns: Single-element list with the brand code, or an empty list
                  when nothing is stored and 'cros_config / brand-code' fails.
        """
        stored_code = host.host_info_store.get().brand_code
        if stored_code:
            return [stored_code]

        code_result = host.run(command='cros_config / brand-code',
                               ignore_status=True)
        if code_result.exit_status != 0:
            return []
        return [code_result.stdout.strip()]
145
146
class BluetoothLabel(base_label.BaseLabel):
    """Label indicating if bluetooth is detected."""

    _NAME = 'bluetooth'

    def exists(self, host):
        """Report whether the host has (or ever had) a bluetooth label.

        Based on crbug.com/966219, the label is flipping sometimes,
        potentially caused by testing itself, so the label is permanently
        sticky: once it is in the host info store the DUT is not probed.

        @param host: Host to check.
        @returns: True when a stored label starts with 'bluetooth' or the
                  hci0 device directory is present on the DUT.
        """
        stored_labels = host.host_info_store.get().labels
        if any(label.startswith(self._NAME) for label in stored_labels):
            return True

        probe = host.run('test -d /sys/class/bluetooth/hci0',
                         ignore_status=True)
        return probe.exit_status == 0
165
166
class ECLabel(base_label.BaseLabel):
    """Label to determine the type of EC on this host."""

    _NAME = 'ec:cros'

    # The output of 'mosys ec info' should look like these, so that the last
    # field should match our EC version scheme:
    #
    #   stm | stm32f100 | snow_v1.3.139-375eb9f
    #   ti | Unknown-10de | peppy_v1.5.114-5d52788
    #
    # Non-Chrome OS ECs will look like these:
    #
    #   ENE | KB932 | 00BE107A00
    #   ite | it8518 | 3.08
    #
    # And some systems don't have ECs at all (Lumpy, for example).
    _CROS_EC_REGEXP = r'^.*\|\s*(\S+_v\d+\.\d+\.\d+-[0-9a-f]+)\s*$'

    def exists(self, host):
        """Report whether the host has a Chrome OS EC.

        @param host: Host to run 'mosys ec info' on.
        @returns: True only when the command succeeds and its last field
                  matches the Chrome OS EC version scheme.
        """
        cmd = 'mosys ec info'
        ecinfo = host.run(command=cmd, ignore_status=True)
        if ecinfo.exit_status != 0:
            # Only report the exit status when the command actually failed.
            logging.info("%s exited with status %d", cmd, ecinfo.exit_status)
            return False

        res = re.search(self._CROS_EC_REGEXP, ecinfo.stdout)
        if res:
            logging.info("EC version is %s", res.group(1))
            return True

        # Has an EC, but it's not a Chrome OS EC
        logging.info("%s got: %s", cmd, ecinfo.stdout)
        return False
198
199
class Cr50Label(base_label.StringPrefixLabel):
    """Label indicating the cr50 image type.

    exists() runs 'gsctool -a -f' and keeps the command result in self.ver;
    generate_labels() and _get_version() parse that stored output, so
    exists() must have been called on the instance beforehand.
    """

    _NAME = 'cr50'

    def __init__(self):
        # Result of the last 'gsctool -a -f' run; filled in by exists().
        self.ver = None

    def exists(self, host):
        """Run the gsctool version command and remember its result.

        @param host: Host to run gsctool on.
        @returns: True when the command exits with status 0.
        """
        # Make sure the gsctool version command runs ok
        self.ver = host.run('gsctool -a -f', ignore_status=True)
        return self.ver.exit_status == 0

    def _get_version(self, region):
        """Get the version number of the given region.

        @param region: Region name; used verbatim as the start of the regex
                       searched for in the stored gsctool output.
        @returns: The 'x.y.z' version string following the region name.
        """
        # Raw string so that '\d' and '\.' reach the regex engine untouched
        # instead of being treated as (invalid) string escape sequences.
        pattern = region + r' (\d+\.\d+\.\d+)'
        return re.search(pattern, self.ver.stdout).group(1)

    def generate_labels(self, host):
        """Return 'pvt' or 'prepvt' depending on the RW version.

        @param host: Unused; the output stored by exists() is parsed.
        @returns: Single-element list, ['pvt'] or ['prepvt'].
        """
        # Check the major version to determine prePVT vs PVT
        version = self._get_version('RW')
        major_version = int(version.split('.')[1])
        # PVT images have an odd major version, prePVT have even
        return ['pvt' if (major_version % 2) else 'prepvt']
223
224
class Cr50RWKeyidLabel(Cr50Label):
    """Label indicating the cr50 RW keyid and key type."""
    _REGION = 'RW'
    _NAME = 'cr50-rw-keyid'

    def _get_keyid_info(self, region):
        """Get the keyid of the given region.

        @param region: Region name to look for after 'keyids:' in the
                       gsctool output stored in self.ver.
        @returns: Two-element list: the keyid string (trailing comma
                  removed) and 'prod' or 'dev' depending on bit 2 of the
                  keyid parsed as hex.
        """
        # Raw string so that '\S' is passed to the regex engine untouched
        # instead of being treated as an (invalid) string escape sequence.
        match = re.search(r'keyids:.*%s (\S+)' % region, self.ver.stdout)
        keyid = match.group(1).rstrip(',')
        is_prod = int(keyid, 16) & (1 << 2)
        return [keyid, 'prod' if is_prod else 'dev']

    def generate_labels(self, host):
        """Get the keyid and key type for this class's region.

        @param host: Unused; the stored gsctool output is parsed.
        @returns: The list produced by _get_keyid_info for _REGION.
        """
        return self._get_keyid_info(self._REGION)
240
241
class Cr50ROKeyidLabel(Cr50RWKeyidLabel):
    """Same as Cr50RWKeyidLabel, but for the RO region."""
    _NAME = 'cr50-ro-keyid'
    _REGION = 'RO'
246
247
class Cr50RWVersionLabel(Cr50Label):
    """Label carrying the cr50 RW version number."""
    _NAME = 'cr50-rw-version'
    _REGION = 'RW'

    def generate_labels(self, host):
        """Return the version string of this class's region.

        @param host: Unused; the stored gsctool output is parsed.
        @returns: Single-element list with the region's version.
        """
        region_version = self._get_version(self._REGION)
        return [region_version]
256
257
class Cr50ROVersionLabel(Cr50RWVersionLabel):
    """Same as Cr50RWVersionLabel, but for the RO region."""
    _NAME = 'cr50-ro-version'
    _REGION = 'RO'
262
263
class AccelsLabel(base_label.BaseLabel):
    """Determine the type of accelerometers on this host."""

    _NAME = 'accel:cros-ec'

    def exists(self, host):
        """Report whether EC accelerometers appear to be present.

        Three checks run in order: ectool is installed, the EC accepts the
        motionsense command, and 'ectool motionsense active' does not print
        "0" on its first line.

        @param host: Host to probe.
        @returns: True when all three checks pass.
        """
        # Check to make sure we have ectool
        if host.run('which ectool', ignore_status=True).exit_status:
            logging.info("No ectool cmd found; assuming no EC accelerometers")
            return False

        # Check that the EC supports the motionsense command
        if host.run('ectool motionsense', ignore_status=True).exit_status:
            logging.info("EC does not support motionsense command; "
                         "assuming no EC accelerometers")
            return False

        # Check that EC motion sensors are active
        active_output = host.run('ectool motionsense active').stdout
        first_line = active_output.split('\n')[0]
        if first_line == "0":
            logging.info("Motion sense inactive; assuming no EC accelerometers")
            return False

        logging.info("EC accelerometers found")
        return True
291
292
class ChameleonLabel(base_label.BaseLabel):
    """Determine if a Chameleon is connected to this host."""

    _NAME = 'chameleon'

    def exists(self, host):
        """Report whether the host has at least one chameleon.

        @param host: Host to check.
        @returns: True when host.chameleon_list is non-empty.
        """
        # See crbug.com/1004500#2 for details.
        # https://chromium.googlesource.com/chromiumos/third_party/autotest/+
        # /refs/heads/master/server/hosts/cros_host.py#335 shows that
        # _chameleon_host_list is not reliable.
        has_chameleon = len(host.chameleon_list) > 0
        # TODO(crbug.com/995900) -- debug why chameleon label is flipping
        try:
            logging.info("has_chameleon %s", has_chameleon)
            logging.info("chameleon_host_list %s",
                         getattr(host, "_chameleon_host_list", "NO_ATTRIBUTE"))
            logging.info("chameleon_list %s",
                         getattr(host, "chameleon_list", "NO_ATTRIBUTE"))
            logging.info("multi_chameleon %s",
                         getattr(host, "multi_chameleon", "NO_ATTRIBUTE"))
        except Exception:
            # The debug logging is best effort and must never affect the
            # label, but KeyboardInterrupt/SystemExit should still propagate,
            # hence Exception rather than a bare except.
            logging.debug("Failed to log chameleon debug info", exc_info=True)
        return has_chameleon

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the state config, so only repair tasks
        update it, or when no task name is mentioned.

        @param task_name: Name of the task being run ('' when unspecified).
        @returns: True for the repair task or an empty task name.
        """
        return task_name in (REPAIR_TASK_NAME, '')
321
322
class ChameleonConnectionLabel(base_label.StringPrefixLabel):
    """Return the Chameleon connection label."""

    _NAME = 'chameleon'

    def exists(self, host):
        """Report whether the host has any chameleon host configured.

        @param host: Host to check.
        @returns: True when host._chameleon_host_list is non-empty.
        """
        chameleon_hosts = host._chameleon_host_list
        return len(chameleon_hosts) > 0

    def generate_labels(self, host):
        """Collect the label reported by each chameleon of the host.

        @param host: Host whose chameleon_list is queried.
        @returns: List with one get_label() result per chameleon.
        """
        connection_labels = []
        for chameleon in host.chameleon_list:
            connection_labels.append(chameleon.get_label())
        return connection_labels

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the lab config, so only deploy tasks update
        it, or when no task name is mentioned.

        @param task_name: Name of the task being run ('' when unspecified).
        @returns: True for the deploy task or an empty task name.
        """
        return task_name in (DEPLOY_TASK_NAME, '')
339
340
class ChameleonPeripheralsLabel(base_label.StringPrefixLabel):
    """Return the Chameleon peripherals labels.

    The 'chameleon:bt_hid' label is applied if the bluetooth
    classic hid device, i.e, RN-42 emulation kit, is detected.

    Any peripherals plugged into the chameleon board would be
    detected and applied proper labels in this class.
    """

    _NAME = 'chameleon'

    # One entry per detectable peripheral: (label, probe, error message).
    # The probe takes a chameleon and returns a truthy value when the
    # peripheral is detected; the message is logged with the chameleon
    # hostname when the probe raises.
    _PERIPHERAL_PROBES = (
        ('bt_hid',
         lambda c: c.get_bluetooth_hid_mouse().CheckSerialConnection(),
         'Error with initializing bt_hid_mouse on chameleon %s'),
        ('bt_ble_hid',
         lambda c: c.get_ble_mouse().CheckSerialConnection(),
         'Error with initializing ble_hid_mouse on chameleon %s'),
        ('bt_a2dp_sink',
         lambda c: c.get_bluetooth_a2dp_sink().CheckSerialConnection(),
         'Error with initializing bt_a2dp_sink on chameleon %s'),
        ('bt_base',
         lambda c: c.get_bluetooth_base().IsDetected(),
         'Error in detecting bt_base on chameleon %s'),
    )

    def exists(self, host):
        """Report whether the host has any chameleon host configured.

        @param host: Host to check.
        @returns: True when host._chameleon_host_list is non-empty.
        """
        return len(host._chameleon_host_list) > 0

    def _detect_peripherals(self, chameleon, chameleon_host):
        """Probe one chameleon for every known peripheral.

        @param chameleon: Chameleon object to probe.
        @param chameleon_host: Matching chameleon host; only its hostname
                               is used, for error logging.
        @returns: List of labels of the detected peripherals, followed by
                  'bt_peer' when at least one was detected.
        """
        labels = []
        for label, probe, error_msg in self._PERIPHERAL_PROBES:
            try:
                detected = probe(chameleon)
            except Exception:
                # A failing probe only means that this peripheral gets no
                # label; keep checking the remaining ones.
                logging.error(error_msg, chameleon_host.hostname)
                continue
            if detected:
                labels.append(label)

        if labels:
            labels.append('bt_peer')
        return labels

    def generate_labels(self, host):
        """Return the peripheral labels of all chameleons of the host.

        @param host: Host whose chameleons are probed.
        @returns: When host.multi_chameleon is set, a list with one list of
                  labels per chameleon; otherwise a flat list of labels.
        """
        labels_list = []

        for chameleon, chameleon_host in zip(host.chameleon_list,
                                             host._chameleon_host_list):
            labels = self._detect_peripherals(chameleon, chameleon_host)
            if host.multi_chameleon:
                labels_list.append(labels)
            else:
                labels_list.extend(labels)

        logging.info('Bluetooth labels are %s', labels_list)
        return labels_list

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the lab config, so only deploy tasks update
        it, or when no task name is mentioned.

        @param task_name: Name of the task being run ('' when unspecified).
        @returns: True for the deploy task or an empty task name.
        """
        return task_name in (DEPLOY_TASK_NAME, '')
411
412
class AudioLoopbackDongleLabel(base_label.BaseLabel):
    """Return the label if an audio loopback dongle is plugged in."""

    _NAME = 'audio_loopback_dongle'

    def exists(self, host):
        """Report whether the dongle label applies, stickily.

        Based on crbug.com/991285, AudioLoopbackDongle sometimes flips, so
        once the label is recorded this keeps returning True without running
        anything on the DUT. Only when the label is not recorded yet is the
        DUT probed, and the probe result becomes the new state.

        @param host: Host to check.
        @returns: True when the label is stored or the DUT probe succeeds.
        """
        return self._cached_exists(host) or self._host_run_exists(host)

    def _cached_exists(self, host):
        """Get the state of AudioLoopbackDongle in the data store.

        @param host: Host whose host info store is read.
        @returns: True when any stored label starts with this label's name.
        """
        stored_labels = host.host_info_store.get().labels
        return any(label.startswith(self._NAME) for label in stored_labels)

    def _host_run_exists(self, host):
        """Detect the audio_loopback_dongle by running a command on the DUT.

        @param host: Host to run the cras nodes command on.
        @returns: True when both a HEADPHONE and a MIC node are plugged.
        """
        nodes_cmd = cras_utils.get_cras_nodes_cmd()
        nodes_info = host.run(command=nodes_cmd, ignore_status=True).stdout
        return bool(
                cras_utils.node_type_is_plugged('HEADPHONE', nodes_info)
                and cras_utils.node_type_is_plugged('MIC', nodes_info))

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the state config, so only repair tasks
        update it, or when no task name is mentioned.

        @param task_name: Name of the task being run ('' when unspecified).
        @returns: True for the repair task or an empty task name.
        """
        return task_name in (REPAIR_TASK_NAME, '')
453
454
class PowerSupplyLabel(base_label.StringPrefixLabel):
    """
    Return the label describing the power supply type.

    Labels representing this host's power supply.
         * `power:battery` when the device has a battery intended for
                extended use
         * `power:AC_primary` when the device has a battery not intended
                for extended use (for moving the machine, etc)
         * `power:AC_only` when the device has no battery at all.
    """

    _NAME = 'power'

    def __init__(self):
        # Result of the last 'mosys psu type' run; filled in by exists().
        self.psu_cmd_result = None

    def exists(self, host):
        """Run 'mosys psu type' and remember its result.

        @param host: Host to run the command on.
        @returns: False only when the stripped output is 'unknown'.
        """
        self.psu_cmd_result = host.run(command='mosys psu type',
                                       ignore_status=True)
        reported_type = self.psu_cmd_result.stdout.strip()
        return reported_type != 'unknown'

    def generate_labels(self, host):
        """Return the power supply type recorded by exists().

        @param host: Unused; the stored command result is read.
        @returns: Single-element list with the reported type, or
                  ['battery'] when the command exited non-zero.
        """
        psu_result = self.psu_cmd_result
        if not psu_result.exit_status:
            return [psu_result.stdout.strip()]
        # The psu command for mosys is not included for all platforms. The
        # assumption is that the device will have a battery if the command
        # is not found.
        return ['battery']
486
487
class StorageLabel(base_label.StringPrefixLabel):
    """
    Return the label describing the storage type.

    Determine if the internal device is SCSI or dw_mmc device.
    Then check that it is SSD or HDD or eMMC or something else.

    Labels representing this host's internal device type:
             * `storage:ssd` when internal device is solid state drive
             * `storage:hdd` when internal device is hard disk drive
             * `storage:mmc` when internal device is mmc drive
             * `storage:nvme` when internal device is NVMe drive
             * `storage:ufs` when internal device is ufs drive
             * None          When internal device is something else or
                             when we are unable to determine the type

    exists() does the detection and stores the result in self.type_str;
    generate_labels() just reports that stored value.
    """

    _NAME = 'storage'

    # Maps the content of /sys/block/<dev>/queue/rotational to a type.
    _ROTATIONAL_TYPES = {'0': 'ssd', '1': 'hdd'}

    def __init__(self):
        # Storage type found by the last successful exists() call.
        self.type_str = ''

    def exists(self, host):
        """Detect the type of the host's fixed internal drive.

        @param host: Host to probe.
        @returns: True when the type was determined (self.type_str is then
                  set), False for USB or unrecognized devices and whenever
                  one of the probing commands fails.
        """
        # The output should be /dev/mmcblk* for SD/eMMC or /dev/sd* for scsi
        rootdev_cmd = ' '.join(['. /usr/sbin/write_gpt.sh;',
                                '. /usr/share/misc/chromeos-common.sh;',
                                'load_base_vars;',
                                'get_fixed_dst_drive'])
        rootdev = host.run(command=rootdev_cmd, ignore_status=True)
        if rootdev.exit_status:
            logging.info("Fail to run %s", rootdev_cmd)
            return False
        rootdev_str = rootdev.stdout.strip()

        if not rootdev_str:
            return False

        rootdev_base = os.path.basename(rootdev_str)

        mmc_pattern = '/dev/mmcblk[0-9]'
        if re.match(mmc_pattern, rootdev_str):
            # Use type to determine if the internal device is eMMC or
            # something else. We can assume that MMC is always an internal
            # device.
            type_cmd = 'cat /sys/block/%s/device/type' % rootdev_base
            type_result = host.run(command=type_cmd, ignore_status=True)
            if type_result.exit_status:
                logging.info("Fail to run %s", type_cmd)
                return False

            if type_result.stdout.strip() == 'MMC':
                self.type_str = 'mmc'
                return True

        scsi_pattern = '/dev/sd[a-z]+'
        # Match against the stripped device path, like the other patterns,
        # so that leading whitespace in the raw output cannot break it.
        if re.match(scsi_pattern, rootdev_str):
            # Read symlink for /sys/block/sd* to determine if the internal
            # device is connected via ata or usb.
            link_cmd = 'readlink /sys/block/%s' % rootdev_base
            link = host.run(command=link_cmd, ignore_status=True)
            if link.exit_status:
                logging.info("Fail to run %s", link_cmd)
                return False
            link_str = link.stdout.strip()
            if 'usb' in link_str:
                return False
            if 'ufs' in link_str:
                self.type_str = 'ufs'
                return True

            # Read rotation to determine if the internal device is ssd or hdd.
            rotate_cmd = 'cat /sys/block/%s/queue/rotational' % rootdev_base
            rotate = host.run(command=rotate_cmd, ignore_status=True)
            if rotate.exit_status:
                logging.info("Fail to run %s", rotate_cmd)
                return False
            rotate_str = rotate.stdout.strip()

            storage_type = self._ROTATIONAL_TYPES.get(rotate_str)
            if storage_type is None:
                # Unable to determine the type: report no label rather than
                # claiming the label exists with None as its value.
                logging.info("Unexpected output %r from %s",
                             rotate_str, rotate_cmd)
                return False
            self.type_str = storage_type
            return True

        nvme_pattern = '/dev/nvme[0-9]+n[0-9]+'
        if re.match(nvme_pattern, rootdev_str):
            self.type_str = 'nvme'
            return True

        # All other internal device / error case will always fall here
        return False

    def generate_labels(self, host):
        """Return the storage type found by exists().

        @param host: Unused; the stored type is reported.
        @returns: Single-element list with self.type_str.
        """
        return [self.type_str]
582
583
class ServoLabel(base_label.BaseLabel):
    """
    Servo labels of the host.

    The 'servo' label is applied only when a servo is present, whereas a
    'servo_state' label (WORKING or BROKEN) is always reported.
    """

    _NAME_OLD = 'servo'
    _NAME = 'servo_state'
    _NAME_WORKING = 'servo_state:WORKING'
    _NAME_BROKEN = 'servo_state:BROKEN'

    def get(self, host):
        """Return the servo labels that apply to the host.

        @param host: Host to check.
        @returns: ['servo', 'servo_state:WORKING'] when exists() is True,
                  ['servo_state:BROKEN'] otherwise.
        """
        if not self.exists(host):
            return [self._NAME_BROKEN]
        return [self._NAME_OLD, self._NAME_WORKING]

    def get_all_labels(self):
        """Return the label names handled by this class.

        @returns: Tuple of two sets: {'servo_state'} and {'servo'}.
        """
        return {self._NAME}, {self._NAME_OLD}

    def exists(self, host):
        """Report whether the servo label applies, stickily.

        Based on crbug.com/995900, Servo sometimes flips, so once the
        stored labels say the servo is there this keeps returning True
        without probing. Only otherwise is the servo host checked, and the
        result of that check becomes the new state.

        @param host: Host to check.
        @returns: True when recorded as present or the servo host is up.
        """
        return self._cached_exists(host) or self._host_run_exists(host)

    def _cached_exists(self, host):
        """Get the state of Servo in the data store.

        @param host: Host whose host info store is read.
        @returns: True when a stored label starts with
                  'servo_state:WORKING', or starts with 'servo' without
                  starting with 'servo_state'.
        """
        for label in host.host_info_store.get().labels:
            if label.startswith(self._NAME):
                # A servo_state label only counts when it says WORKING.
                if label.startswith(self._NAME_WORKING):
                    return True
                continue
            if label.startswith(self._NAME_OLD):
                return True
        return False

    def _host_run_exists(self, host):
        """
        Check if the servo label should apply to the host or not.

        @returns True if a servo host is detected, False otherwise.
        """
        servo_args = servo_host.get_servo_args_for_host(host)
        if servo_args:
            servo_hostname = servo_args.get(servo_host.SERVO_HOST_ATTR)
        else:
            servo_hostname = None

        if servo_hostname is None:
            return False
        return servo_host.servo_host_is_up(servo_hostname)

    def update_for_task(self, task_name):
        """Tell whether this label is refreshed for the given task.

        This label is stored in the state config, so only repair tasks
        update it, or when no task name is mentioned.

        @param task_name: Name of the task being run ('' when unspecified).
        @returns: True for the repair task or an empty task name.
        """
        return task_name in (REPAIR_TASK_NAME, '')
644
645
class ArcLabel(base_label.BaseLabel):
    """Label indicates if host has ARC support."""

    _NAME = 'arc'

    @base_label.forever_exists_decorate
    def exists(self, host):
        """Report whether /etc/lsb-release mentions CHROMEOS_ARC_VERSION.

        @param host: Host to run grep on.
        @returns: True when grep exits with status 0.
        """
        grep_result = host.run('grep CHROMEOS_ARC_VERSION /etc/lsb-release',
                               ignore_status=True)
        return 0 == grep_result.exit_status
656
657
class CtsArchLabel(base_label.StringLabel):
    """Labels to determine the abi of the CTS bundle (arm or x86 only)."""

    _NAME = ['cts_abi_arm', 'cts_abi_x86', 'cts_cpu_arm', 'cts_cpu_x86']

    def _get_cts_abis(self, arch):
        """Return supported CTS ABIs.

        @param arch: CPU architecture name of the host.
        @return List of supported CTS bundle ABIs; empty when the
                architecture is neither 'x86_64' nor 'arm'.
        """
        abis_by_arch = {'x86_64': ['arm', 'x86'], 'arm': ['arm']}
        try:
            return abis_by_arch[arch]
        except KeyError:
            return []

    def _get_cts_cpus(self, arch):
        """Return supported CTS native CPUs.

        This is needed for CTS_Instant scheduling.
        @param arch: CPU architecture name of the host.
        @return List of supported CTS native CPUs; empty when the
                architecture is neither 'x86_64' nor 'arm'.
        """
        cpus_by_arch = {'x86_64': ['x86'], 'arm': ['arm']}
        try:
            return cpus_by_arch[arch]
        except KeyError:
            return []

    def generate_labels(self, host):
        """Build the cts_abi_* and cts_cpu_* labels for the host.

        @param host: Host whose CPU architecture is queried.
        @returns: The ABI labels followed by the CPU labels.
        """
        arch = host.get_cpu_arch()
        labels = []
        for abi in self._get_cts_abis(arch):
            labels.append('cts_abi_' + abi)
        for cpu in self._get_cts_cpus(arch):
            labels.append('cts_cpu_' + cpu)
        return labels
685
686
class VideoGlitchLabel(base_label.BaseLabel):
    """Label indicates if host supports video glitch detection tests."""

    _NAME = 'video_glitch_detection'

    def exists(self, host):
        """Report whether the host's board is in the supported list.

        @param host: Host whose board is looked up.
        @returns: True when the board name (board prefix removed) is in
                  video_test_constants.SUPPORTED_BOARDS.
        """
        board_name = host.get_board().replace(ds_constants.BOARD_PREFIX, '')
        supported_boards = video_test_constants.SUPPORTED_BOARDS
        return board_name in supported_boards
696
697
class InternalDisplayLabel(base_label.StringLabel):
    """Label that determines if the device has an internal display."""

    _NAME = 'internal_display'

    def generate_labels(self, host):
        """Return the label when graphics_utils reports an internal display.

        The client-side graphics_utils helper is reused by temporarily
        replacing utils.system_output and common_utils.read_file with
        versions that run on the given host; the originals are always
        restored afterwards.

        @param host: Host the patched helpers run their commands on.
        @returns: ['internal_display'] or an empty list.
        """
        from autotest_lib.client.cros.graphics import graphics_utils
        from autotest_lib.client.common_lib import utils as common_utils

        def _remote_system_output(cmd):
            return host.run(cmd).stdout

        def _remote_read_file(remote_path):
            return host.run('cat %s' % remote_path).stdout

        # Hijack the necessary client functions so that we can take advantage
        # of the client lib here.
        # FIXME: find a less hacky way than this
        saved_system_output = utils.system_output
        saved_read_file = common_utils.read_file
        utils.system_output = _remote_system_output
        common_utils.read_file = _remote_read_file
        try:
            if graphics_utils.has_internal_display():
                return [self._NAME]
            return []
        finally:
            utils.system_output = saved_system_output
            common_utils.read_file = saved_read_file
727
728
class LucidSleepLabel(base_label.BaseLabel):
    """Label that determines if device has support for lucid sleep."""

    # TODO(kevcheng): See if we can determine if this label is applicable a
    # better way (crbug.com/592146).
    _NAME = 'lucidsleep'
    LUCID_SLEEP_BOARDS = ['nocturne', 'poppy']

    def exists(self, host):
        """Report whether the host's board is listed as lucid sleep capable.

        @param host: Host whose board is looked up.
        @returns: True when the board name (board prefix removed) is in
                  LUCID_SLEEP_BOARDS.
        """
        board_name = host.get_board().replace(ds_constants.BOARD_PREFIX, '')
        return board_name in self.LUCID_SLEEP_BOARDS
740
741
742def _parse_hwid_labels(hwid_info_list):
743    if len(hwid_info_list) == 0:
744        return hwid_info_list
745
746    res = []
747    # See crbug.com/997816#c7 for details of two potential formats of returns
748    # from HWID server.
749    if isinstance(hwid_info_list[0], dict):
750        # Format of hwid_info:
751        # [{u'name': u'sku', u'value': u'xxx'}, ..., ]
752        for hwid_info in hwid_info_list:
753            value = hwid_info.get('value', '')
754            name = hwid_info.get('name', '')
755            # There should always be a name but just in case there is not.
756            if name:
757                new_label = name if not value else '%s:%s' % (name, value)
758                res.append(new_label)
759    else:
760        # Format of hwid_info:
761        # [<DUTLabel name: 'sku' value: u'xxx'>, ..., ]
762        for hwid_info in hwid_info_list:
763            new_label = str(hwid_info)
764            logging.info('processing hwid label: %s', new_label)
765            res.append(new_label)
766
767    return res
768
769
class HWIDLabel(base_label.StringLabel):
    """Return all the labels generated from the hwid."""

    # We leave out _NAME because hwid_lib will generate everything for us.

    def __init__(self):
        # Grab the key file needed to access the hwid service.
        self.key_file = global_config.global_config.get_config_value(
                'CROS', 'HWID_KEY', type=str)

    @staticmethod
    def _merge_hwid_label_lists(new, old):
        """Merge old and new hwid label values, preferring the new ones.

        @param new: Labels obtained on this run.
        @param old: Labels recorded on a previous run.
        @returns: Sorted list of the merged labels, without duplicates.
        """
        # TODO(gregorynisbet): what is the appropriate way to merge
        # old and new information?
        kept = set(old)
        for fresh_label in new:
            key, sep, _ = fresh_label.partition(':')
            if not sep:
                continue
            # For a key-value label such as variant:aaa, drop every old
            # label that has the same key.
            stale_prefix = key + ':'
            kept = {x for x in kept if not x.startswith(stale_prefix)}
        return sorted(kept.union(new))

    def _hwid_label_names(self):
        """Get the labels that hwid_lib controls.

        @returns: The label names from get_all_labels(), or
                  HWID_LABELS_FALLBACK when that result is falsey.
        """
        known_labels, _ = self.get_all_labels()
        # If and only if get_all_labels was unsuccessful,
        # it will return a falsey value.
        names = known_labels or HWID_LABELS_FALLBACK

        # TODO(gregorynisbet): remove this
        # TODO(crbug.com/999785)
        if "sku" not in names:
            logging.info("sku-less label names %s", names)

        return names

    def _old_label_values(self, host):
        """Get the hwid_lib labels recorded on a previous run.

        @param host: Host whose host info store is read.
        @returns: Every stored label that starts with one of the hwid label
                  names.
        """
        previous = []
        info = host.host_info_store.get()
        for hwid_label in self._hwid_label_names():
            # NOTE: we want *all* the labels starting
            # with this prefix.
            previous.extend(label for label in info.labels
                            if label.startswith(hwid_label))
        return previous

    def generate_labels(self, host):
        """Query the HWID service and merge its labels with the stored ones.

        @param host: Host to read the hwid and the stored labels from.
        @returns: Sorted list of labels; when the HWID service raises
                  HwIdException only the previously stored labels remain.
        """
        # use previous values as default
        old_hwid_labels = self._old_label_values(host)
        logging.info("old_hwid_labels: %r", old_hwid_labels)
        hwid = host.run_output('crossystem hwid').strip()
        try:
            hwid_info_response = hwid_lib.get_hwid_info(
                hwid=hwid,
                info_type=hwid_lib.HWID_INFO_LABEL,
                key_file=self.key_file,
            )
            logging.info("hwid_info_response: %r", hwid_info_response)
            hwid_info_list = hwid_info_response.get('labels', [])
        except hwid_lib.HwIdException as e:
            logging.info("HwIdException: %s", e)
            hwid_info_list = []

        new_hwid_labels = _parse_hwid_labels(hwid_info_list)
        logging.info("new HWID labels: %r", new_hwid_labels)

        return HWIDLabel._merge_hwid_label_lists(
            new=new_hwid_labels,
            old=old_hwid_labels,
        )

    def get_all_labels(self):
        """We need to try all labels as a prefix and as standalone.

        We don't know for sure which labels are prefix labels and which are
        standalone so we try all of them as both.

        @returns: Tuple holding the same list of label names twice; the
                  list is empty when the key file can't be opened or the
                  hwid service reports an error.
        """
        try:
            all_hwid_labels = hwid_lib.get_all_possible_dut_labels(
                    self.key_file)
        except IOError:
            logging.error('Can not open key file: %s', self.key_file)
            all_hwid_labels = []
        except hwid_lib.HwIdException as e:
            logging.error('hwid service: %s', e)
            all_hwid_labels = []
        return all_hwid_labels, all_hwid_labels
873
874
class DetachableBaseLabel(base_label.BaseLabel):
    """Label indicating if device has detachable keyboard."""

    _NAME = 'detachablebase'

    def exists(self, host):
        """Report whether the hammerd binary can be found on the host.

        @param host: Host to run 'which hammerd' on.
        @returns: True when the command exits with status 0.
        """
        which_result = host.run('which hammerd', ignore_status=True)
        return which_result.exit_status == 0
882
883
class FingerprintLabel(base_label.BaseLabel):
    """Label indicating whether device has fingerprint sensor."""

    _NAME = 'fingerprint'

    def exists(self, host):
        """Report whether the /dev/cros_fp character device is present.

        @param host: Host to run the test command on.
        @returns: True when 'test -c /dev/cros_fp' exits with status 0.
        """
        probe = host.run('test -c /dev/cros_fp', ignore_status=True)
        return probe.exit_status == 0
892
893
class ReferenceDesignLabel(base_label.StringPrefixLabel):
    """Determine the correct reference design label for the device."""

    _NAME = 'reference_design'

    def __init__(self):
        # Result of the last 'mosys platform family' run; set by exists().
        self.response = None

    def exists(self, host):
        """Run 'mosys platform family' and remember its result.

        @param host: Host to run the command on.
        @returns: True when the command exits with status 0.
        """
        self.response = host.run('mosys platform family', ignore_status=True)
        return self.response.exit_status == 0

    def generate_labels(self, host):
        """Return the platform family reported by mosys.

        @param host: Host to query; the command is re-run through exists().
        @returns: Single-element list with the stripped command output, or
                  an empty list when the command fails.
        """
        if self.exists(host):
            return [self.response.stdout.strip()]
        # Always hand back a list so that callers can iterate the result.
        return []
909
910
# Label instances grouped into the CROS_LABELS list.
# The STATECONFIG / LABCONFIG tags mirror the comments in the corresponding
# update_for_task() methods in this file: labels described there as stored in
# the state config are refreshed by repair tasks, labels stored in the lab
# config by deploy tasks (and both when no task name is given).
CROS_LABELS = [
    AudioLoopbackDongleLabel(), #STATECONFIG
    ChameleonConnectionLabel(), #LABCONFIG
    ChameleonLabel(), #STATECONFIG
    ChameleonPeripheralsLabel(), #LABCONFIG
    common_label.OSLabel(),
    DeviceSkuLabel(), #LABCONFIG
    HWIDLabel(),
    ServoLabel(), #STATECONFIG
]

# Label instances grouped into the LABSTATION_LABELS list: only the OS label.
LABSTATION_LABELS = [
    common_label.OSLabel(),
]
925