# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import time
import StringIO
import subprocess

from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.common_lib.cros import tpm_utils
from autotest_lib.server.cros.faft.firmware_test import FirmwareTest


class firmware_IntegratedU2F(FirmwareTest):
    """Verify U2F using the on-board cr50 firmware works."""
    version = 1

    # Test binary that is run on the host against the U2F hidraw device.
    U2F_TEST_PATH = '/usr/local/bin/U2FTest'

    # Flag files created on the host to select the u2fd mode under test.
    U2F_FORCE_PATH = '/var/lib/u2f/force/u2f.force'
    G2F_FORCE_PATH = '/var/lib/u2f/force/g2f.force'
    USER_KEYS_FORCE_PATH = '/var/lib/u2f/force/user_keys.force'

    # Vendor and product id used to locate the device under /sys/bus/hid.
    VID = '18D1'
    PID = '502C'
    # Delay in seconds between polls of the U2FTest output.
    SHORT_WAIT = 1

    def cleanup(self):
        """Remove *.force files, restart u2fd and clear the TPM owner.

        The base class cleanup is always invoked, even when one of the host
        commands raises, so a failure here does not skip it.
        """
        try:
            self.host.run('rm -f /var/lib/u2f/force/*.force')

            # Restart u2fd so that flag change takes effect.
            self.host.run('restart u2fd')

            # Put the device back to a known state; also restarts the device.
            tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        finally:
            super(firmware_IntegratedU2F, self).cleanup()


    def u2fd_is_running(self):
        """Returns True if u2fd is running on the host"""
        return 'running' in self.host.run('status u2fd').stdout


    def cryptohome_ready(self):
        """Return True if cryptohome is running."""
        return 'running' in self.host.run('status cryptohomed').stdout


    def owner_key_exists(self):
        """Return True if /var/lib/whitelist/owner.key exists."""
        logging.info('checking for owner key')
        return self.host.path_exists('/var/lib/whitelist/owner.key')


    def wait_for_policy(self):
        """Wait for cryptohome to run and for the owner key to exist.

        Raises:
            error.TestError: if cryptohome is not running or the owner key
                    has not been created once the respective wait is over.
        """

        # Wait for cryptohome to show the TPM is ready before logging in.
        if not utils.wait_for_value(self.cryptohome_ready, True,
                                    timeout_sec=60):
            raise error.TestError('Cryptohome did not start')

        # Wait for the owner key to exist before trying to start u2fd.
        if not utils.wait_for_value(self.owner_key_exists, True,
                                    timeout_sec=120):
            raise error.TestError('Device did not create owner key')


    def set_u2fd_flags(self, u2f, g2f, user_keys):
        """Recreate the requested *.force files and restart u2fd.

        Args:
            u2f: if true, create the u2f force file.
            g2f: if true, create the g2f force file.
            user_keys: if true, create the user_keys force file.

        Raises:
            error.TestFail: if u2fd is not running after the restart.
        """
        # Start by removing all flags.
        self.host.run('rm -f /var/lib/u2f/force/*.force')

        if u2f:
            self.host.run('touch %s' % self.U2F_FORCE_PATH)

        if g2f:
            self.host.run('touch %s' % self.G2F_FORCE_PATH)

        if user_keys:
            self.host.run('touch %s' % self.USER_KEYS_FORCE_PATH)

        # Restart u2fd so that flag change takes effect.
        self.host.run('restart u2fd')

        # Make sure it is still running
        if not self.u2fd_is_running():
            raise error.TestFail('could not start u2fd')
        logging.info('u2fd is running')


    def find_u2f_device(self):
        """Find the U2F device

        Stores the stripped output of listing the device's hidraw directory
        in self.device, or an empty string when the listing fails.

        Returns:
            0 if the device hasn't been found. Non-zero if it has
        """
        self.device = ''
        path = '/sys/bus/hid/devices/*:%s:%s.*/hidraw' % (self.VID, self.PID)
        try:
            self.device = self.host.run('ls ' + path).stdout.strip()
        except error.AutoservRunError:
            # ls fails when the glob does not match; report it as not found.
            logging.info('Could not find device')
        return len(self.device)


    def update_u2f_device_path(self):
        """Get the integrated u2f device.

        Polls find_u2f_device and stores the resulting device node path in
        self.dev_path.

        Raises:
            error.TestError: if no device was found by the end of the wait.
        """
        start_time = time.time()
        utils.wait_for_value(self.find_u2f_device, max_threshold=1,
                             timeout_sec=30)
        wait_time = int(time.time() - start_time)
        if wait_time:
            logging.info('Took %ss to find device', wait_time)
        # Fail with a clear message instead of handing '/dev/' to U2FTest.
        if not self.device:
            raise error.TestError('Could not find the integrated u2f device')
        self.dev_path = '/dev/' + self.device


    def check_u2ftest_and_press_power_button(self):
        """Check stdout and press the power button if prompted

        Returns:
            True if the process has terminated.
        """
        time.sleep(self.SHORT_WAIT)
        self.output += self.get_u2ftest_output()
        logging.info(self.output)
        if 'Touch device and hit enter..' in self.output:
            # press the power button
            self.servo.power_short_press()
            logging.info('pressed power button')
            time.sleep(self.SHORT_WAIT)
            # send enter to the test process
            self.u2ftest_job.sp.stdin.write('\n')
            logging.info('hit enter')
            # Drop the handled prompt so it is not matched again.
            self.output = ''
        return self.u2ftest_job.sp.poll() is not None


    def get_u2ftest_output(self):
        """Read the new output

        Returns:
            The stripped text added to self.stdout since the previous call.
        """
        self.u2ftest_job.process_output()
        # Only read what was appended after the last recorded position.
        self.stdout.seek(self.last_len)
        output = self.stdout.read().strip()
        self.last_len = self.stdout.len
        return output


    def run_u2ftest(self):
        """Run U2FTest with the U2F device

        Starts U2FTest over ssh as a background job, answers its touch
        prompts until it exits or the wait is over, then closes the job.

        Raises:
            error.TestFail: if the background job could not be created.
            error.TestError: from close_u2ftest if U2FTest did not exit
                    cleanly.
        """
        self.last_len = 0
        self.output = ''

        u2ftest_cmd = utils.sh_escape('%s %s' % (self.U2F_TEST_PATH,
                                                 self.dev_path))
        full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'),
                                        u2ftest_cmd)
        self.stdout = StringIO.StringIO()
        # Start running U2FTest in the background.
        self.u2ftest_job = utils.BgJob(full_ssh_command,
                                       nickname='u2ftest',
                                       stdout_tee=self.stdout,
                                       stderr_tee=utils.TEE_TO_LOGS,
                                       stdin=subprocess.PIPE)
        if self.u2ftest_job is None:
            raise error.TestFail('could not start u2ftest')

        try:
            utils.wait_for_value(self.check_u2ftest_and_press_power_button,
                                 expected_value=True, timeout_sec=30)
        finally:
            # Always reap the process, even if polling raised.
            self.close_u2ftest()


    def close_u2ftest(self):
        """Terminate the process and check the results.

        Raises:
            error.TestError: if the exit status of the process is non-zero.
        """
        exit_status = utils.nuke_subprocess(self.u2ftest_job.sp)

        stdout = self.stdout.getvalue().strip()
        if stdout:
            logging.debug('stdout of U2FTest:\n%s', stdout)
        if exit_status:
            logging.error('stderr of U2FTest:\n%s', self.output)
            raise error.TestError('U2FTest: %s' % self.output)


    def run_once(self, host):
        """Run U2FTest

        Runs U2FTest once for each supported combination of u2fd flags.

        Args:
            host: the host object for the device under test.

        Raises:
            error.TestNAError: if the U2FTest binary is not on the host.
        """
        self.host = host

        if not self.host.path_exists(self.U2F_TEST_PATH):
            raise error.TestNAError('Device does not have U2FTest support')

        # u2fd reads files from the user's home dir, so we need to log in.
        self.host.run('/usr/local/autotest/bin/autologin.py')

        # u2fd needs the policy file to exist.
        self.wait_for_policy()

        # (log message, u2f, g2f, user_keys) for each mode to exercise.
        flag_combinations = (
            ("testing u2fd --u2f", True, False, False),
            ("testing u2fd --g2f", False, True, False),
            ("testing u2fd --u2f --user_keys", True, False, True),
            ("testing u2fd --g2f --user_keys", False, True, True),
        )
        for message, u2f, g2f, user_keys in flag_combinations:
            logging.info(message)
            self.set_u2fd_flags(u2f, g2f, user_keys)
            # Setting the flags restarts u2fd, which will re-create the u2f
            # device.
            self.update_u2f_device_path()
            self.run_u2ftest()