• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5import logging
6import time
7import StringIO
8import subprocess
9
10from autotest_lib.client.common_lib import error, utils
11from autotest_lib.client.common_lib.cros import tpm_utils
12from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
13
14
class firmware_IntegratedU2F(FirmwareTest):
    """Verify U2F using the on-board cr50 firmware works."""
    version = 1

    # Test binary run on the DUT against the U2F hidraw device.
    U2F_TEST_PATH = '/usr/local/bin/U2FTest'

    # Flag files touched on the DUT before u2fd is restarted.
    U2F_FORCE_PATH = '/var/lib/u2f/force/u2f.force'
    G2F_FORCE_PATH = '/var/lib/u2f/force/g2f.force'
    USER_KEYS_FORCE_PATH = '/var/lib/u2f/force/user_keys.force'

    # Shell command that removes every force flag file.
    CLEAR_FORCE_FLAGS_CMD = 'rm -f /var/lib/u2f/force/*.force'

    # Vendor/product ids used to locate the device under /sys/bus/hid.
    VID = '18D1'
    PID = '502C'
    # Seconds slept between polls of U2FTest and around the button press.
    SHORT_WAIT = 1

    def cleanup(self):
        """Remove *.force files, restart u2fd and clear the TPM owner.

        The base class cleanup always runs, even if one of the host commands
        raises; the original exception still propagates afterwards.
        """
        try:
            self.host.run(self.CLEAR_FORCE_FLAGS_CMD)

            # Restart u2fd so that flag change takes effect.
            self.host.run('restart u2fd')

            # Put the device back to a known state; also restarts the device.
            tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        finally:
            super(firmware_IntegratedU2F, self).cleanup()


    def u2fd_is_running(self):
        """Returns True if 'status u2fd' on the host reports running."""
        return 'running' in self.host.run('status u2fd').stdout


    def cryptohome_ready(self):
        """Return True if 'status cryptohomed' on the host reports running."""
        return 'running' in self.host.run('status cryptohomed').stdout


    def owner_key_exists(self):
        """Return True if /var/lib/whitelist/owner.key exists."""
        logging.info('checking for owner key')
        return self.host.path_exists('/var/lib/whitelist/owner.key')


    def wait_for_policy(self):
        """Wait for cryptohome to be running and for the owner key to exist.

        Raises:
            error.TestError: if cryptohome is not running within 60 seconds or
                the owner key does not exist within 120 seconds.
        """
        # Wait for cryptohome to show the TPM is ready before logging in.
        if not utils.wait_for_value(self.cryptohome_ready, True,
                                    timeout_sec=60):
            raise error.TestError('Cryptohome did not start')

        # Wait for the owner key to exist before trying to start u2fd.
        if not utils.wait_for_value(self.owner_key_exists, True,
                                    timeout_sec=120):
            raise error.TestError('Device did not create owner key')


    def set_u2fd_flags(self, u2f, g2f, user_keys):
        """Replace the u2fd force flag files and restart u2fd.

        All existing *.force files are removed first, so only the flags
        requested by this call are present when u2fd restarts.

        Args:
            u2f: if true, create U2F_FORCE_PATH.
            g2f: if true, create G2F_FORCE_PATH.
            user_keys: if true, create USER_KEYS_FORCE_PATH.

        Raises:
            error.TestFail: if u2fd is not running after the restart.
        """
        # Start by removing all flags.
        self.host.run(self.CLEAR_FORCE_FLAGS_CMD)

        requested_flags = (
            (u2f, self.U2F_FORCE_PATH),
            (g2f, self.G2F_FORCE_PATH),
            (user_keys, self.USER_KEYS_FORCE_PATH),
        )
        for enabled, flag_path in requested_flags:
            if enabled:
                self.host.run('touch %s' % flag_path)

        # Restart u2fd so that flag change takes effect.
        self.host.run('restart u2fd')

        # Make sure it is still running
        if not self.u2fd_is_running():
            raise error.TestFail('could not start u2fd')
        logging.info('u2fd is running')


    def find_u2f_device(self):
        """Find the U2F device

        Lists the hidraw directory of the HID device matching VID:PID and
        stores the stripped output in self.device ('' when the listing fails).

        Returns:
            0 if the device hasn't been found. Non-zero if it has
        """
        self.device = ''
        path = '/sys/bus/hid/devices/*:%s:%s.*/hidraw' % (self.VID, self.PID)
        try:
            self.device = self.host.run('ls ' + path).stdout.strip()
        except error.AutoservRunError:
            # 'ls' failing just means the device is not there (yet).
            logging.info('Could not find device')
        return len(self.device)


    def update_u2f_device_path(self):
        """Get the integrated u2f device and store its path in self.dev_path.

        Polls find_u2f_device for up to 30 seconds.

        Raises:
            error.TestFail: if the device was not found before the timeout.
        """
        start_time = time.time()
        utils.wait_for_value(self.find_u2f_device, max_threshold=1,
                             timeout_sec=30)
        wait_time = int(time.time() - start_time)
        if wait_time:
            logging.info('Took %ss to find device', wait_time)
        if not self.device:
            # Without this check dev_path would silently become '/dev/' and
            # the failure would only surface later as a U2FTest error.
            raise error.TestFail('could not find u2f device')
        self.dev_path = '/dev/' + self.device


    def check_u2ftest_and_press_power_button(self):
        """Check stdout and press the power button if prompted

        Sleeps SHORT_WAIT, appends any new U2FTest output to self.output and,
        when the touch prompt is present, short-presses the power button via
        servo and sends a newline to the U2FTest process.

        Returns:
            True if the process has terminated.
        """
        time.sleep(self.SHORT_WAIT)
        self.output += self.get_u2ftest_output()
        logging.info(self.output)
        if 'Touch device and hit enter..' in self.output:
            # press the power button
            self.servo.power_short_press()
            logging.info('pressed power button')
            time.sleep(self.SHORT_WAIT)
            # send enter to the test process
            self.u2ftest_job.sp.stdin.write('\n')
            logging.info('hit enter')
            # Drop the handled prompt; presumably so the next poll does not
            # react to the same prompt again.
            self.output = ''
        return self.u2ftest_job.sp.poll() is not None


    def get_u2ftest_output(self):
        """Read the new output

        Returns:
            The stripped text written to self.stdout since the previous call.
        """
        self.u2ftest_job.process_output()
        # Only read what was appended after the last recorded length.
        self.stdout.seek(self.last_len)
        output = self.stdout.read().strip()
        self.last_len = self.stdout.len
        return output


    def run_u2ftest(self):
        """Run U2FTest with the U2F device

        Starts U2FTest over ssh as a background job with a stdin pipe, polls
        it for up to 30 seconds (pressing the power button when prompted) and
        always calls close_u2ftest afterwards.

        Raises:
            error.TestFail: if the background job could not be created.
            error.TestError: from close_u2ftest, if U2FTest exits non-zero.
        """
        self.last_len = 0
        self.output = ''

        u2ftest_cmd = utils.sh_escape('%s %s' % (self.U2F_TEST_PATH,
                                                 self.dev_path))
        full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'),
                                        u2ftest_cmd)
        self.stdout = StringIO.StringIO()
        # Start running U2FTest in the background.
        self.u2ftest_job = utils.BgJob(full_ssh_command,
                                       nickname='u2ftest',
                                       stdout_tee=self.stdout,
                                       stderr_tee=utils.TEE_TO_LOGS,
                                       stdin=subprocess.PIPE)
        if self.u2ftest_job is None:
            raise error.TestFail('could not start u2ftest')

        try:
            utils.wait_for_value(self.check_u2ftest_and_press_power_button,
                                 expected_value=True, timeout_sec=30)
        finally:
            self.close_u2ftest()


    def close_u2ftest(self):
        """Terminate the process and check the results.

        Raises:
            error.TestError: if the U2FTest exit status is non-zero.
        """
        exit_status = utils.nuke_subprocess(self.u2ftest_job.sp)

        stdout = self.stdout.getvalue().strip()
        if stdout:
            logging.debug('stdout of U2FTest:\n%s', stdout)
        if exit_status:
            logging.error('stderr of U2FTest:\n%s', self.output)
            raise error.TestError('U2FTest: %s' % self.output)


    def run_once(self, host):
        """Run U2FTest against u2fd for each supported flag combination.

        Args:
            host: the host object for the device under test.

        Raises:
            error.TestNAError: if U2FTest is not installed on the device.
        """
        self.host = host

        if not self.host.path_exists(self.U2F_TEST_PATH):
            raise error.TestNAError('Device does not have U2FTest support')

        # u2fd reads files from the user's home dir, so we need to log in.
        self.host.run('/usr/local/autotest/bin/autologin.py')

        # u2fd needs the policy file to exist.
        self.wait_for_policy()

        # (logged description, u2f, g2f, user_keys)
        flag_combinations = (
            ('--u2f', True, False, False),
            ('--g2f', False, True, False),
            ('--u2f --user_keys', True, False, True),
            ('--g2f --user_keys', False, True, True),
        )
        for description, u2f, g2f, user_keys in flag_combinations:
            logging.info('testing u2fd %s', description)
            self.set_u2fd_flags(u2f, g2f, user_keys)
            # Setting the flags restarts u2fd, which will re-create the u2f
            # device.
            self.update_u2f_device_path()
            self.run_u2ftest()
220