1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import logging, os, random, re, shutil, string, time 11 12import common 13 14from autotest_lib.client.cros import constants 15from autotest_lib.client.bin import utils 16from autotest_lib.client.common_lib import error 17from autotest_lib.client.common_lib.cros import tpm_utils 18from autotest_lib.client.cros.tpm import * 19 20ATTESTATION_CMD = '/usr/bin/attestation_client' 21CRYPTOHOME_CMD = '/usr/sbin/cryptohome' 22TPM_MANAGER_CMD = '/usr/bin/tpm_manager_client' 23GUEST_USER_NAME = '$guest' 24UNAVAILABLE_ACTION = 'Unknown action or no action given.' 25MOUNT_RETRY_COUNT = 20 26TEMP_MOUNT_PATTERN = '/home/.shadow/%s/temporary_mount' 27VAULT_PATH_PATTERN = '/home/.shadow/%s/vault' 28 29DBUS_PROTOS_DEP = 'dbus_protos' 30 31LEC_KEY = 'low_entropy_credentials_supported' 32 33 34def get_user_hash(user): 35 """Get the user hash for the given user.""" 36 return utils.system_output(['cryptohome', '--action=obfuscate_user', 37 '--user=%s' % user]) 38 39 40def user_path(user): 41 """Get the user mount point for the given user.""" 42 return utils.system_output(['cryptohome-path', 'user', user]) 43 44 45def system_path(user): 46 """Get the system mount point for the given user.""" 47 return utils.system_output(['cryptohome-path', 'system', user]) 48 49 50def temporary_mount_path(user): 51 """Get the vault mount path used during crypto-migration for the user. 52 53 @param user: user the temporary mount should be for 54 """ 55 return TEMP_MOUNT_PATTERN % (get_user_hash(user)) 56 57 58def vault_path(user): 59 """ Get the vault path for the given user. 60 61 @param user: The user who's vault path should be returned. 62 """ 63 return VAULT_PATH_PATTERN % (get_user_hash(user)) 64 65 66def ensure_clean_cryptohome_for(user, password=None): 67 """Ensure a fresh cryptohome exists for user. 68 69 @param user: user who needs a shiny new cryptohome. 70 @param password: if unset, a random password will be used. 71 """ 72 if not password: 73 password = ''.join(random.sample(string.ascii_lowercase, 6)) 74 unmount_vault(user) 75 remove_vault(user) 76 mount_vault(user, password, create=True) 77 78 79def get_tpm_password(): 80 """Get the TPM password. 81 82 Returns: 83 A TPM password 84 """ 85 out = run_cmd(TPM_MANAGER_CMD + ' status') 86 match = re.search('owner_password: (\w*)', out) 87 password = '' 88 if match: 89 hex_pass = match.group(1) 90 password = ''.join( 91 chr(int(hex_pass[i:i + 2], 16)) 92 for i in range(0, len(hex_pass), 2)) 93 return password 94 95 96def get_fwmp(cleared_fwmp=False): 97 """Get the firmware management parameters. 98 99 Args: 100 cleared_fwmp: True if the space should not exist. 101 102 Returns: 103 The dictionary with the FWMP contents, for example: 104 { 'flags': 0xbb41, 105 'developer_key_hash': 106 "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\ 107 000\000\000\000\000\000\000\000\000\000\000", 108 } 109 or a dictionary with the Error if the FWMP doesn't exist and 110 cleared_fwmp is True 111 { 'error': 'CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS_INVALID' } 112 113 Raises: 114 ChromiumOSError if any expected field is not found in the cryptohome 115 output. This would typically happen when FWMP state does not match 116 'cleared_fwmp' 117 """ 118 out = run_cmd(CRYPTOHOME_CMD + 119 ' --action=get_firmware_management_parameters') 120 121 if cleared_fwmp: 122 if tpm_utils.FwmpIsAllZero(out): 123 return {} 124 fields = ['error'] 125 else: 126 fields = ['flags', 'developer_key_hash'] 127 128 status = {} 129 for field in fields: 130 match = re.search('%s: (\S+)\s' % field, out) 131 if not match: 132 raise ChromiumOSError('Invalid FWMP field %s: "%s".' % 133 (field, out)) 134 status[field] = match.group(1).strip() 135 return status 136 137 138def set_fwmp(flags, developer_key_hash=None): 139 """Set the firmware management parameter contents. 140 141 Args: 142 developer_key_hash: a string with the developer key hash 143 144 Raises: 145 ChromiumOSError cryptohome cannot set the FWMP contents 146 """ 147 cmd = (CRYPTOHOME_CMD + 148 ' --action=set_firmware_management_parameters ' 149 '--flags=' + flags) 150 if developer_key_hash: 151 cmd += ' --developer_key_hash=' + developer_key_hash 152 153 out = run_cmd(cmd) 154 if 'SetFirmwareManagementParameters success' not in out: 155 raise ChromiumOSError('failed to set FWMP: %s' % out) 156 157 158def is_tpm_lockout_in_effect(): 159 """Returns true if the TPM lockout is in effect; false otherwise.""" 160 status = get_tpm_da_info() 161 return status.get('dictionary_attack_lockout_in_effect', None) 162 163 164def get_login_status(): 165 """Query the login status 166 167 Returns: 168 A login status dictionary containing: 169 { 'owner_user_exists': True|False } 170 """ 171 out = run_cmd(CRYPTOHOME_CMD + ' --action=get_login_status') 172 status = {} 173 for field in ['owner_user_exists']: 174 match = re.search('%s: (true|false)' % field, out) 175 if not match: 176 raise ChromiumOSError('Invalid login status: "%s".' % out) 177 status[field] = match.group(1) == 'true' 178 return status 179 180 181def get_install_attribute_status(): 182 """Query the install attribute status 183 184 Returns: 185 A status string, which could be: 186 "UNKNOWN" 187 "TPM_NOT_OWNED" 188 "FIRST_INSTALL" 189 "VALID" 190 "INVALID" 191 """ 192 out = run_cmd(CRYPTOHOME_CMD + ' --action=install_attributes_get_status') 193 return out.strip() 194 195 196def lock_install_attributes(attrs): 197 """Set and lock install attributes for the device. 198 199 @param attrs: dict of install attributes. 200 """ 201 202 take_tpm_ownership() 203 wait_for_install_attributes_ready() 204 for name, value in attrs.items(): 205 args = [ 206 CRYPTOHOME_CMD, '--action=install_attributes_set', 207 '--name="%s"' % name, 208 '--value="%s"' % value 209 ] 210 cmd = ' '.join(args) 211 if (utils.system(cmd, ignore_status=True) != 0): 212 return False 213 214 out = run_cmd(CRYPTOHOME_CMD + ' --action=install_attributes_finalize') 215 return (out.strip() == 'InstallAttributesFinalize(): 1') 216 217 218def wait_for_install_attributes_ready(): 219 """Wait until install attributes are ready. 220 """ 221 cmd = CRYPTOHOME_CMD + ' --action=install_attributes_is_ready' 222 utils.poll_for_condition( 223 lambda: run_cmd(cmd).strip() == 'InstallAttributesIsReady(): 1', 224 timeout=300, 225 exception=error.TestError( 226 'Timeout waiting for install attributes to be ready')) 227 228 229def get_tpm_attestation_status(): 230 """Get the TPM attestation status. Works similar to get_tpm_status(). 231 """ 232 out = run_cmd(ATTESTATION_CMD + ' status') 233 status = {} 234 for field in ['prepared_for_enrollment', 'enrolled']: 235 match = re.search('%s: (true|false)' % field, out) 236 if not match: 237 raise ChromiumOSError('Invalid attestation status: "%s".' % out) 238 status[field] = match.group(1) == 'true' 239 return status 240 241 242def take_tpm_ownership(wait_for_ownership=True): 243 """Take TPM ownership. 244 245 Args: 246 wait_for_ownership: block until TPM is owned if true 247 """ 248 run_cmd(CRYPTOHOME_CMD + ' --action=tpm_take_ownership') 249 if wait_for_ownership: 250 # Note that waiting for the 'Ready' flag is more correct than waiting 251 # for the 'Owned' flag, as the latter is set by cryptohomed before some 252 # of the ownership tasks are completed. 253 utils.poll_for_condition( 254 lambda: get_tpm_status()['Ready'], 255 timeout=300, 256 exception=error.TestError('Timeout waiting for TPM ownership')) 257 258 259def verify_ek(): 260 """Verify the TPM endorsement key. 261 262 Returns true if EK is valid. 263 """ 264 cmd = CRYPTOHOME_CMD + ' --action=tpm_verify_ek' 265 return (utils.system(cmd, ignore_status=True) == 0) 266 267 268def remove_vault(user): 269 """Remove the given user's vault from the shadow directory.""" 270 logging.debug('user is %s', user) 271 user_hash = get_user_hash(user) 272 logging.debug('Removing vault for user %s with hash %s', user, user_hash) 273 cmd = CRYPTOHOME_CMD + ' --action=remove --force --user=%s' % user 274 run_cmd(cmd) 275 # Ensure that the vault does not exist. 276 if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 277 raise ChromiumOSError('Cryptohome could not remove the user\'s vault.') 278 279 280def remove_all_vaults(): 281 """Remove any existing vaults from the shadow directory. 282 283 This function must be run with root privileges. 284 """ 285 for item in os.listdir(constants.SHADOW_ROOT): 286 abs_item = os.path.join(constants.SHADOW_ROOT, item) 287 if os.path.isdir(os.path.join(abs_item, 'vault')): 288 logging.debug('Removing vault for user with hash %s', item) 289 shutil.rmtree(abs_item) 290 291 292def mount_vault(user, password, create=False, key_label=None): 293 """Mount the given user's vault. Mounts should be created by calling this 294 function with create=True, and can be used afterwards with create=False. 295 Only try to mount existing vaults created with this function. 296 297 """ 298 args = [CRYPTOHOME_CMD, '--action=mount_ex', '--user=%s' % user, 299 '--password=%s' % password, '--async'] 300 if create: 301 args += ['--create'] 302 if key_label is None: 303 key_label = 'bar' 304 if key_label is not None: 305 args += ['--key_label=%s' % key_label] 306 logging.info(run_cmd(' '.join(args))) 307 # Ensure that the vault exists in the shadow directory. 308 user_hash = get_user_hash(user) 309 if not os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 310 retry = 0 311 mounted = False 312 while retry < MOUNT_RETRY_COUNT and not mounted: 313 time.sleep(1) 314 logging.info("Retry %s", str(retry + 1)) 315 run_cmd(' '.join(args)) 316 # TODO: Remove this additional call to get_user_hash(user) when 317 # crbug.com/690994 is fixed 318 user_hash = get_user_hash(user) 319 if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 320 mounted = True 321 retry += 1 322 if not mounted: 323 raise ChromiumOSError('Cryptohome vault not found after mount.') 324 # Ensure that the vault is mounted. 325 if not is_permanent_vault_mounted(user=user, allow_fail=True): 326 raise ChromiumOSError('Cryptohome created a vault but did not mount.') 327 328 329def mount_guest(): 330 """Mount the guest vault.""" 331 args = [CRYPTOHOME_CMD, '--action=mount_guest_ex'] 332 logging.info(run_cmd(' '.join(args))) 333 # Ensure that the guest vault is mounted. 334 if not is_guest_vault_mounted(allow_fail=True): 335 raise ChromiumOSError('Cryptohome did not mount guest vault.') 336 337 338def test_auth(user, password): 339 """Test key auth.""" 340 cmd = [CRYPTOHOME_CMD, '--action=check_key_ex', '--user=%s' % user, 341 '--password=%s' % password, '--async'] 342 out = run_cmd(' '.join(cmd)) 343 logging.info(out) 344 return 'Key authenticated.' in out 345 346 347def add_le_key(user, password, new_password, new_key_label): 348 """Add low entropy key.""" 349 args = [CRYPTOHOME_CMD, '--action=add_key_ex', '--key_policy=le', 350 '--user=%s' % user, '--password=%s' % password, 351 '--new_key_label=%s' % new_key_label, 352 '--new_password=%s' % new_password] 353 logging.info(run_cmd(' '.join(args))) 354 355 356def remove_key(user, password, remove_key_label): 357 """Remove a key.""" 358 args = [CRYPTOHOME_CMD, '--action=remove_key_ex', '--user=%s' % user, 359 '--password=%s' % password, 360 '--remove_key_label=%s' % remove_key_label] 361 logging.info(run_cmd(' '.join(args))) 362 363 364def get_supported_key_policies(host=None): 365 """Get supported key policies.""" 366 args = [CRYPTOHOME_CMD, '--action=get_supported_key_policies'] 367 if host is not None: 368 out = host.run(args).stdout 369 else: 370 out = run_cmd(' '.join(args)) 371 logging.info(out) 372 policies = {} 373 for line in out.splitlines(): 374 match = re.search('([^:]+): (true|false)', line.strip()) 375 if match: 376 policies[match.group(1)] = match.group(2) == 'true' 377 return policies 378 379 380def is_low_entropy_credentials_supported(host=None): 381 """ Returns True if low entropy credentials are supported.""" 382 key_policies = get_supported_key_policies(host) 383 return LEC_KEY in key_policies and key_policies[LEC_KEY] 384 385 386def unmount_vault(user=None): 387 """Unmount the given user's vault. 388 389 Once unmounting for a specific user is supported, the user parameter will 390 name the target user. See crosbug.com/20778. 391 """ 392 run_cmd(CRYPTOHOME_CMD + ' --action=unmount') 393 # Ensure that the vault is not mounted. 394 if user is not None and is_vault_mounted(user, allow_fail=True): 395 raise ChromiumOSError('Cryptohome did not unmount the user.') 396 397 398def __get_mount_info(mount_point, allow_fail=False): 399 """Get information about the active mount at a given mount point.""" 400 cryptohomed_path = '/proc/$(pgrep cryptohomed)/mounts' 401 # 'cryptohome-namespace-mounter' is currently only used for Guest sessions. 402 mounter_exe = 'cryptohome-namespace-mounter' 403 mounter_pid = 'pgrep -o -f %s' % mounter_exe 404 mounter_path = '/proc/$(%s)/mounts' % mounter_pid 405 406 status = utils.system(mounter_pid, ignore_status=True) 407 # Only check for these mounts if the mounter executable is running. 408 if status == 0: 409 try: 410 logging.debug('Active %s mounts:\n' % mounter_exe + 411 utils.system_output('cat %s' % mounter_path)) 412 ns_mount_line = utils.system_output( 413 'grep %s %s' % (mount_point, mounter_path), 414 ignore_status=allow_fail) 415 except Exception as e: 416 logging.error(e) 417 raise ChromiumOSError('Could not get info about cryptohome vault ' 418 'through %s. See logs for complete ' 419 'mount-point.' 420 % os.path.dirname(str(mount_point))) 421 return ns_mount_line.split() 422 423 try: 424 logging.debug('Active cryptohome mounts:\n%s', 425 utils.system_output('cat %s' % cryptohomed_path)) 426 mount_line = utils.system_output( 427 'grep %s %s' % (mount_point, cryptohomed_path), 428 ignore_status=allow_fail) 429 except Exception as e: 430 logging.error(e) 431 raise ChromiumOSError('Could not get info about cryptohome vault ' 432 'through %s. See logs for complete mount-point.' 433 % os.path.dirname(str(mount_point))) 434 return mount_line.split() 435 436 437def __get_user_mount_info(user, allow_fail=False): 438 """Get information about the active mounts for a given user. 439 440 Returns the active mounts at the user's user and system mount points. If no 441 user is given, the active mount at the shared mount point is returned 442 (regular users have a bind-mount at this mount point for backwards 443 compatibility; the guest user has a mount at this mount point only). 444 """ 445 return [__get_mount_info(mount_point=user_path(user), 446 allow_fail=allow_fail), 447 __get_mount_info(mount_point=system_path(user), 448 allow_fail=allow_fail)] 449 450 451def is_vault_mounted(user, regexes=None, allow_fail=False): 452 """Check whether a vault is mounted for the given user. 453 454 user: If no user is given, the shared mount point is checked, determining 455 whether a vault is mounted for any user. 456 regexes: dictionary of regexes to matches against the mount information. 457 The mount filesystem for the user's user and system mounts point must 458 match one of the keys. 459 The mount source point must match the selected device regex. 460 461 In addition, if mounted over ext4, we check the directory is encrypted. 462 """ 463 if regexes is None: 464 regexes = { 465 constants.CRYPTOHOME_FS_REGEX_ANY : 466 constants.CRYPTOHOME_DEV_REGEX_ANY 467 } 468 user_mount_info = __get_user_mount_info(user=user, allow_fail=allow_fail) 469 for mount_info in user_mount_info: 470 # Look at each /proc/../mount lines that match mount point for a given 471 # user user/system mount (/home/user/.... /home/root/...) 472 473 # We should have at least 3 arguments (source, mount, type of mount) 474 if len(mount_info) < 3: 475 return False 476 477 device_regex = None 478 for fs_regex in regexes.keys(): 479 if re.match(fs_regex, mount_info[2]): 480 device_regex = regexes[fs_regex] 481 break 482 483 if not device_regex: 484 # The third argument in not the expected mount point type. 485 return False 486 487 # Check if the mount source match the device regex: it can be loose, 488 # (anything) or stricter if we expect guest filesystem. 489 if not re.match(device_regex, mount_info[0]): 490 return False 491 492 return True 493 494 495def is_guest_vault_mounted(allow_fail=False): 496 """Check whether a vault is mounted for the guest user. 497 It should be a mount of an ext4 partition on a loop device 498 or be backed by tmpfs. 499 """ 500 return is_vault_mounted( 501 user=GUEST_USER_NAME, 502 regexes={ 503 # Remove tmpfs support when it becomes unnecessary as all guest 504 # modes will use ext4 on a loop device. 505 constants.CRYPTOHOME_FS_REGEX_EXT4: 506 constants.CRYPTOHOME_DEV_REGEX_LOOP_DEVICE, 507 constants.CRYPTOHOME_FS_REGEX_TMPFS: 508 constants.CRYPTOHOME_DEV_REGEX_GUEST, 509 }, 510 allow_fail=allow_fail) 511 512 513def is_permanent_vault_mounted(user, allow_fail=False): 514 """Check if user is mounted over ecryptfs or ext4 crypto. """ 515 return is_vault_mounted( 516 user=user, 517 regexes={ 518 constants.CRYPTOHOME_FS_REGEX_ECRYPTFS: 519 constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_SHADOW, 520 constants.CRYPTOHOME_FS_REGEX_EXT4: 521 constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_DEVICE, 522 }, 523 allow_fail=allow_fail) 524 525 526def get_mounted_vault_path(user, allow_fail=False): 527 """Get the path where the decrypted data for the user is located.""" 528 return os.path.join(constants.SHADOW_ROOT, get_user_hash(user), 'mount') 529 530 531def canonicalize(credential): 532 """Perform basic canonicalization of |email_address|. 533 534 Perform basic canonicalization of |email_address|, taking into account that 535 gmail does not consider '.' or caps inside a username to matter. It also 536 ignores everything after a '+'. For example, 537 c.masone+abc@gmail.com == cMaSone@gmail.com, per 538 http://mail.google.com/support/bin/answer.py?hl=en&ctx=mail&answer=10313 539 """ 540 if not credential: 541 return None 542 543 parts = credential.split('@') 544 if len(parts) != 2: 545 raise error.TestError('Malformed email: ' + credential) 546 547 (name, domain) = parts 548 name = name.partition('+')[0] 549 if (domain == constants.SPECIAL_CASE_DOMAIN): 550 name = name.replace('.', '') 551 return '@'.join([name, domain]).lower() 552 553 554def crash_cryptohomed(): 555 """Let cryptohome crash.""" 556 # Try to kill cryptohomed so we get something to work with. 557 pid = run_cmd('pgrep cryptohomed') 558 try: 559 pid = int(pid) 560 except ValueError as e: # empty or invalid string 561 raise error.TestError('Cryptohomed was not running') 562 utils.system('kill -ABRT %d' % pid) 563 # CONT just in case cryptohomed had a spurious STOP. 564 utils.system('kill -CONT %d' % pid) 565 utils.poll_for_condition( 566 lambda: utils.system('ps -p %d' % pid, 567 ignore_status=True) != 0, 568 timeout=180, 569 exception=error.TestError( 570 'Timeout waiting for cryptohomed to coredump')) 571 572 573def create_ecryptfs_homedir(user, password): 574 """Creates a new home directory as ecryptfs. 575 576 If a home directory for the user exists already, it will be removed. 577 The resulting home directory will be mounted. 578 579 @param user: Username to create the home directory for. 580 @param password: Password to use when creating the home directory. 581 """ 582 unmount_vault(user) 583 remove_vault(user) 584 args = [ 585 CRYPTOHOME_CMD, 586 '--action=mount_ex', 587 '--user=%s' % user, 588 '--password=%s' % password, 589 '--key_label=foo', 590 '--ecryptfs', 591 '--create'] 592 logging.info(run_cmd(' '.join(args))) 593 if not is_vault_mounted( 594 user, 595 regexes={ 596 constants.CRYPTOHOME_FS_REGEX_ECRYPTFS: 597 constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_SHADOW 598 }, 599 allow_fail=True): 600 raise ChromiumOSError('Ecryptfs home could not be created') 601 602 603def do_dircrypto_migration(user, password, timeout=600): 604 """Start dircrypto migration for the user. 605 606 @param user: The user to migrate. 607 @param password: The password used to mount the users vault 608 @param timeout: How long in seconds to wait for the migration to finish 609 before failing. 610 """ 611 unmount_vault(user) 612 args = [ 613 CRYPTOHOME_CMD, 614 '--action=mount_ex', 615 '--to_migrate_from_ecryptfs', 616 '--user=%s' % user, 617 '--password=%s' % password] 618 logging.info(run_cmd(' '.join(args))) 619 if not __get_mount_info(temporary_mount_path(user), allow_fail=True): 620 raise ChromiumOSError('Failed to mount home for migration') 621 args = [CRYPTOHOME_CMD, '--action=migrate_to_dircrypto', '--user=%s' % user] 622 logging.info(run_cmd(' '.join(args))) 623 utils.poll_for_condition( 624 lambda: not __get_mount_info( 625 temporary_mount_path(user), allow_fail=True), 626 timeout=timeout, 627 exception=error.TestError( 628 'Timeout waiting for dircrypto migration to finish')) 629 630 631def change_password(user, password, new_password): 632 """Change user password.""" 633 args = [ 634 CRYPTOHOME_CMD, 635 '--action=migrate_key_ex', 636 '--user=%s' % user, 637 '--old_password=%s' % password, 638 '--password=%s' % new_password] 639 out = run_cmd(' '.join(args)) 640 logging.info(out) 641 if 'Key migration succeeded.' not in out: 642 raise ChromiumOSError('Key migration failed.') 643