# Lint as: python2, python3
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utility functions used for PKCS#11 library testing."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import grp
import logging
import os
import pwd
import re
import shutil
import stat
import sys

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error

USER_TOKEN_PREFIX = 'User TPM Token '
TMP_CHAPS_DIR = '/tmp/chaps'
CHAPS_DIR_PERM = 0o750
SYSTEM_TOKEN_NAME = 'System TPM Token'
SYSTEM_TOKEN_DIR = '/var/lib/chaps'
INVALID_SLOT_ID = '100'


def __run_cmd(cmd, ignore_status=False):
    """Runs a command and returns the output from both stdout and stderr.

    @param cmd: Shell command line; ' 2>&1' is appended so stderr is merged
                into the captured output.
    @param ignore_status: Forwarded to utils.system_output.

    @return The captured output with surrounding whitespace stripped.
    """
    return utils.system_output(cmd + ' 2>&1', retain_output=True,
                               ignore_status=ignore_status).strip()


def __get_token_paths(exclude_system_token):
    """Return a list with a path for each PKCS #11 token currently loaded.

    @param exclude_system_token: If true, the entry whose path equals
                                 SYSTEM_TOKEN_DIR is left out.

    @return List of absolute token paths parsed from the
            'chaps_client --list' output.
    """
    token_paths = []
    for line in __run_cmd('chaps_client --list').split('\n'):
        match = re.search(r'Slot \d+: (/.*)$', line)
        if match:
            if exclude_system_token and match.group(1) == SYSTEM_TOKEN_DIR:
                continue
            token_paths.append(match.group(1))
    return token_paths


def __get_pkcs11_file_list(token_path):
    """Return string with PKCS#11 file paths and their associated metadata.

    @param token_path: Directory handed to 'find'.

    @return The raw 'find' output: one "'path', 'user:group', 0mode" entry
            per file.
    """
    find_args = '-printf "\'%p\', \'%u:%g\', 0%m\n"'
    file_list_output = __run_cmd('find %s ' % token_path + find_args)
    return file_list_output


def __get_token_slot_by_path(token_path):
    """Return the slot id whose 'p11_replay --list_tokens' entry matches.

    @param token_path: Literal text expected to start right after the
                       'Slot <n>: ' prefix of a listing line.

    @return The slot number as a string, or INVALID_SLOT_ID when no line
            matches.
    """
    token_list = __run_cmd('p11_replay --list_tokens')
    # Escape the caller-supplied text so that regex metacharacters in it are
    # matched literally instead of being interpreted as a pattern.
    slot_pattern = r'^Slot (\d+): ' + re.escape(token_path)
    for line in token_list.split('\n'):
        match = re.search(slot_pattern, line)
        if not match:
            continue
        return match.group(1)
    return INVALID_SLOT_ID


def __verify_tokenname(token_path):
    """Verify that the TPM token name is correct.

    @param token_path: Path of the user token, as listed by chaps_client.

    @return True if a listed token carries the expected user label, False
            otherwise (including when |token_path| has an unexpected form).
    """
    # The token path is expected to be of the form:
    # /run/daemon-store/chaps/<obfuscated_user_id>
    match = re.search(r'/run/daemon-store/chaps/(.*)', token_path)
    if not match:
        return False
    obfuscated_user = match.group(1)
    # We expect the token label to contain first 16 characters of the
    # obfuscated user id. This is the same value we extracted from
    # |token_path|.
    expected_user_token_label = USER_TOKEN_PREFIX + obfuscated_user[:16]
    # The p11_replay tool will list tokens in the following form:
    # Slot 1: <token label>
    token_list = __run_cmd('p11_replay --list_tokens')
    for line in token_list.split('\n'):
        match = re.search(r'^Slot \d+: (.*)$', line)
        if not match:
            continue
        token_label = match.group(1).rstrip()
        if token_label == expected_user_token_label:
            return True
        # Ignore the system token label.
        if token_label == SYSTEM_TOKEN_NAME:
            continue
        logging.error('Unexpected token label: |%s|', token_label)
    logging.error('Invalid or missing PKCS#11 token label!')
    return False


def __verify_permissions(token_path):
    """Verify that the permissions on the initialized token dir are correct.

    @param token_path: Path of the token directory to inspect.

    @return True if every expected path exists with the expected owner,
            group and mode; False (after logging the mismatch) otherwise.
    """
    # List of 3-tuples consisting of (path, user:group, octal permissions).
    # Can be generated (for example), by:
    #   find <token_path>/chaps -printf "'%p', '%u:%g', 0%m\n"
    expected_permissions = [
        (token_path, 'chaps:chronos-access', CHAPS_DIR_PERM),
        ('%s/database' % token_path, 'chaps:chronos-access', CHAPS_DIR_PERM)]
    for path, owner, perms in expected_permissions:
        (user, group) = owner.split(':')
        # os.lstat never returns a falsy value; it raises OSError when the
        # path cannot be stat'ed, so that is what has to be handled here.
        try:
            stat_buf = os.lstat(path)
        except OSError:
            logging.error('Could not stat %s while checking for permissions.',
                          path)
            return False
        # Check ownership.
        path_user = pwd.getpwuid(stat_buf.st_uid).pw_name
        path_group = grp.getgrgid(stat_buf.st_gid).gr_name
        if path_user != user or path_group != group:
            logging.error('Ownership of %s does not match! Got = (%s, %s)'
                          ', Expected = (%s, %s)', path, path_user, path_group,
                          user, group)
            return False

        # Check permissions.
        path_perms = stat.S_IMODE(stat_buf.st_mode)
        if path_perms != perms:
            logging.error('Permissions for %s do not match! (Got = %s'
                          ', Expected = %s)', path, oct(path_perms), oct(perms))
            return False

    return True


def verify_pkcs11_initialized():
    """Checks if the PKCS#11 token is initialized properly.

    @return True if exactly one user token is loaded and both its label and
            its directory permissions check out, False otherwise.
    """
    token_path_list = __get_token_paths(exclude_system_token=True)
    if len(token_path_list) != 1:
        logging.error('Expecting a single signed-in user with a token.')
        return False

    # The output is not inspected; the command is run with the default
    # ignore_status=False.
    # NOTE(review): presumably utils.system_output raises on a non-zero exit,
    # which would make this call the actual check - confirm.
    verify_cmd = ('cryptohome --action=pkcs11_is_user_token_ok')
    __run_cmd(verify_cmd)

    verify_result = True
    # Do additional sanity tests.
    if not __verify_tokenname(token_path_list[0]):
        logging.error('Verification of token name failed!')
        verify_result = False
    if not __verify_permissions(token_path_list[0]):
        logging.error('PKCS#11 file list:\n%s',
                      __get_pkcs11_file_list(token_path_list[0]))
        logging.error(
            'Verification of PKCS#11 subsystem and token permissions failed!')
        verify_result = False
    return verify_result


def load_p11_test_token(auth_data='1234'):
    """Loads the test token onto a slot.

    @param auth_data: The authorization data to use for the token.
    """
    utils.system('sudo chaps_client --load --path=%s --auth="%s"' %
                 (TMP_CHAPS_DIR, auth_data))


def change_p11_test_token_auth_data(auth_data, new_auth_data):
    """Changes authorization data for the test token.

    @param auth_data: The current authorization data.
    @param new_auth_data: The new authorization data.
    """
    utils.system('sudo chaps_client --change_auth --path=%s --auth="%s" '
                 '--new_auth="%s"' % (TMP_CHAPS_DIR, auth_data, new_auth_data))


def unload_p11_test_token():
    """Unloads a loaded test token."""
    utils.system('sudo chaps_client --unload --path=%s' % TMP_CHAPS_DIR)


def copytree_with_ownership(src, dst):
    """Like shutil.copytree but also copies owner and group attributes.

    Implemented by shelling out to 'cp -rp'.

    @param src: Source directory.
    @param dst: Destination directory.
    """
    utils.system('cp -rp %s %s' % (src, dst))


def setup_p11_test_token(unload_user_tokens, auth_data='1234'):
    """Configures a PKCS #11 token for testing.

    Any existing test token will be automatically cleaned up.

    @param unload_user_tokens: Whether to unload all user tokens.
    @param auth_data: Initial token authorization data.
    """
    cleanup_p11_test_token()
    if unload_user_tokens:
        for path in __get_token_paths(exclude_system_token=False):
            utils.system('sudo chaps_client --unload --path=%s' % path)
    os.makedirs(TMP_CHAPS_DIR)
    uid = pwd.getpwnam('chaps')[2]
    gid = grp.getgrnam('chronos-access')[2]
    os.chown(TMP_CHAPS_DIR, uid, gid)
    os.chmod(TMP_CHAPS_DIR, CHAPS_DIR_PERM)
    # Load and immediately unload the token, then snapshot the directory to
    # '<TMP_CHAPS_DIR>_bak', which restore_p11_test_token() copies back.
    load_p11_test_token(auth_data)
    unload_p11_test_token()
    copytree_with_ownership(TMP_CHAPS_DIR, '%s_bak' % TMP_CHAPS_DIR)


def restore_p11_test_token():
    """Restores a PKCS #11 test token to its initial state."""
    shutil.rmtree(TMP_CHAPS_DIR)
    copytree_with_ownership('%s_bak' % TMP_CHAPS_DIR, TMP_CHAPS_DIR)


def get_p11_test_token_db_path():
    """Returns the test token database path."""
    return '%s/database' % TMP_CHAPS_DIR


def verify_p11_test_token():
    """Verifies that a test token is working and persistent.

    @return True if signing succeeds both before and after the token is
            unloaded and reloaded, False otherwise (the p11_replay output is
            printed to stderr on failure).
    """
    output = __run_cmd('p11_replay --generate --replay_wifi',
                       ignore_status=True)
    if not re.search('Sign: CKR_OK', output):
        print(output, file=sys.stderr)
        return False
    # Reload the token before replaying again to check persistence.
    unload_p11_test_token()
    load_p11_test_token()
    output = __run_cmd('p11_replay --replay_wifi --cleanup',
                       ignore_status=True)
    if not re.search('Sign: CKR_OK', output):
        print(output, file=sys.stderr)
        return False
    return True


def cleanup_p11_test_token():
    """Deletes the test token."""
    unload_p11_test_token()
    shutil.rmtree(TMP_CHAPS_DIR, ignore_errors=True)
    shutil.rmtree('%s_bak' % TMP_CHAPS_DIR, ignore_errors=True)


def wait_for_pkcs11_token():
    """Waits for the PKCS #11 token to be available.

    This should be called only after a login and is typically called
    immediately after a login.

    Returns:
        True if the token is available, False if polling timed out.
    """
    try:
        utils.poll_for_condition(
            lambda: utils.system('cryptohome --action=pkcs11_is_user_token_ok',
                                 ignore_status=True) == 0,
            desc='PKCS #11 token.',
            timeout=300)
    except utils.TimeoutError:
        return False
    return True


def __p11_replay_on_user_token(extra_args=''):
    """Executes a typical command replay on the current user token.

    Args:
        extra_args: Additional arguments to pass to p11_replay.

    Returns:
        The command output.

    Raises:
        error.TestError: If the token does not become available in time.
    """
    if not wait_for_pkcs11_token():
        raise error.TestError('Timeout while waiting for pkcs11 token')
    # The slot is looked up by the user token label prefix.
    return __run_cmd('p11_replay --slot=%s %s'
                     % (__get_token_slot_by_path(USER_TOKEN_PREFIX),
                        extra_args),
                     ignore_status=True)


def inject_and_test_key():
    """Injects a key into a PKCS #11 token and tests that it can sign.

    Returns:
        A regex match object (truthy) if signing succeeded, None otherwise.
    """
    output = __p11_replay_on_user_token('--replay_wifi --inject')
    return re.search('Sign: CKR_OK', output)


def test_and_cleanup_key():
    """Tests a PKCS #11 key before deleting it.

    Returns:
        A regex match object (truthy) if signing succeeded, None otherwise.
    """
    output = __p11_replay_on_user_token('--replay_wifi --cleanup')
    return re.search('Sign: CKR_OK', output)


def generate_user_key():
    """Generates a key in the current user token.

    Returns:
        A regex match object (truthy) if signing succeeded, None otherwise.
    """
    output = __p11_replay_on_user_token('--replay_wifi --generate')
    return re.search('Sign: CKR_OK', output)