1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import dbus, gobject, logging, os, random, re, shutil, string, sys, time 11from dbus.mainloop.glib import DBusGMainLoop 12from six.moves import map 13 14import common 15 16from autotest_lib.client.cros import constants 17from autotest_lib.client.bin import utils 18from autotest_lib.client.common_lib import error 19from autotest_lib.client.cros.cros_disks import DBusClient 20 21ATTESTATION_CMD = '/usr/bin/attestation_client' 22CRYPTOHOME_CMD = '/usr/sbin/cryptohome' 23TPM_MANAGER_CMD = '/usr/bin/tpm_manager_client' 24GUEST_USER_NAME = '$guest' 25UNAVAILABLE_ACTION = 'Unknown action or no action given.' 26MOUNT_RETRY_COUNT = 20 27TEMP_MOUNT_PATTERN = '/home/.shadow/%s/temporary_mount' 28VAULT_PATH_PATTERN = '/home/.shadow/%s/vault' 29 30DBUS_PROTOS_DEP = 'dbus_protos' 31 32 33class ChromiumOSError(error.TestError): 34 """Generic error for ChromiumOS-specific exceptions.""" 35 pass 36 37def __run_cmd(cmd): 38 return utils.system_output(cmd + ' 2>&1', retain_output=True, 39 ignore_status=True).strip() 40 41def get_user_hash(user): 42 """Get the user hash for the given user.""" 43 return utils.system_output(['cryptohome', '--action=obfuscate_user', 44 '--user=%s' % user]) 45 46 47def user_path(user): 48 """Get the user mount point for the given user.""" 49 return utils.system_output(['cryptohome-path', 'user', user]) 50 51 52def system_path(user): 53 """Get the system mount point for the given user.""" 54 return utils.system_output(['cryptohome-path', 'system', user]) 55 56 57def temporary_mount_path(user): 58 """Get the vault mount path used during crypto-migration for the user. 59 60 @param user: user the temporary mount should be for 61 """ 62 return TEMP_MOUNT_PATTERN % (get_user_hash(user)) 63 64 65def vault_path(user): 66 """ Get the vault path for the given user. 67 68 @param user: The user who's vault path should be returned. 69 """ 70 return VAULT_PATH_PATTERN % (get_user_hash(user)) 71 72 73def ensure_clean_cryptohome_for(user, password=None): 74 """Ensure a fresh cryptohome exists for user. 75 76 @param user: user who needs a shiny new cryptohome. 77 @param password: if unset, a random password will be used. 78 """ 79 if not password: 80 password = ''.join(random.sample(string.ascii_lowercase, 6)) 81 unmount_vault(user) 82 remove_vault(user) 83 mount_vault(user, password, create=True) 84 85 86def get_tpm_status(): 87 """Get the TPM status. 88 89 Returns: 90 A TPM status dictionary, for example: 91 { 'Enabled': True, 92 'Owned': True, 93 'Ready': True 94 } 95 """ 96 out = __run_cmd(TPM_MANAGER_CMD + ' status --nonsensitive') 97 status = {} 98 for field in ['is_enabled', 'is_owned']: 99 match = re.search('%s: (true|false)' % field, out) 100 if not match: 101 raise ChromiumOSError('Invalid TPM status: "%s".' % out) 102 status[field] = match.group(1) == 'true' 103 status['Enabled'] = status['is_enabled'] 104 status['Owned'] = status['is_owned'] 105 status['Ready'] = status['is_enabled'] and status['is_owned'] 106 return status 107 108 109def get_tpm_password(): 110 """Get the TPM password. 111 112 Returns: 113 A TPM password 114 """ 115 out = __run_cmd(TPM_MANAGER_CMD + ' status') 116 match = re.search('owner_password: (\w*)', out) 117 password = '' 118 if match: 119 hex_pass = match.group(1).decode("hex") 120 password = ''.join( 121 chr(int(hex_pass[i:i + 2], 16)) 122 for i in range(0, len(hex_pass), 2)) 123 return password 124 125 126def get_tpm_da_info(): 127 """Get the TPM dictionary attack information. 128 Returns: 129 A TPM dictionary attack status dictionary, for example: 130 { 131 'dictionary_attack_counter': 0, 132 'dictionary_attack_threshold': 200, 133 'dictionary_attack_lockout_in_effect': False, 134 'dictionary_attack_lockout_seconds_remaining': 0 135 } 136 """ 137 status = {} 138 out = __run_cmd(TPM_MANAGER_CMD + ' get_da_info') 139 for line in out.splitlines()[1:-1]: 140 items = line.strip().split(':') 141 if len(items) != 2: 142 continue 143 if items[1].strip() == 'false': 144 value = False 145 elif items[1].strip() == 'true': 146 value = True 147 elif items[1].split('(')[0].strip().isdigit(): 148 value = int(items[1].split('(')[0].strip()) 149 else: 150 value = items[1].strip(' "') 151 status[items[0].strip()] = value 152 return status 153 154 155def get_fwmp(cleared_fwmp=False): 156 """Get the firmware management parameters. 157 158 Args: 159 cleared_fwmp: True if the space should not exist. 160 161 Returns: 162 The dictionary with the FWMP contents, for example: 163 { 'flags': 0xbb41, 164 'developer_key_hash': 165 "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\ 166 000\000\000\000\000\000\000\000\000\000\000", 167 } 168 or a dictionary with the Error if the FWMP doesn't exist and 169 cleared_fwmp is True 170 { 'error': 'CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS_INVALID' } 171 172 Raises: 173 ChromiumOSError if any expected field is not found in the cryptohome 174 output. This would typically happen when FWMP state does not match 175 'clreared_fwmp' 176 """ 177 out = __run_cmd(CRYPTOHOME_CMD + 178 ' --action=get_firmware_management_parameters') 179 180 if cleared_fwmp: 181 fields = ['error'] 182 else: 183 fields = ['flags', 'developer_key_hash'] 184 185 status = {} 186 for field in fields: 187 match = re.search('%s: (\S+)\n' % field, out) 188 if not match: 189 raise ChromiumOSError('Invalid FWMP field %s: "%s".' % 190 (field, out)) 191 status[field] = match.group(1) 192 return status 193 194 195def set_fwmp(flags, developer_key_hash=None): 196 """Set the firmware management parameter contents. 197 198 Args: 199 developer_key_hash: a string with the developer key hash 200 201 Raises: 202 ChromiumOSError cryptohome cannot set the FWMP contents 203 """ 204 cmd = (CRYPTOHOME_CMD + 205 ' --action=set_firmware_management_parameters ' 206 '--flags=' + flags) 207 if developer_key_hash: 208 cmd += ' --developer_key_hash=' + developer_key_hash 209 210 out = __run_cmd(cmd) 211 if 'SetFirmwareManagementParameters success' not in out: 212 raise ChromiumOSError('failed to set FWMP: %s' % out) 213 214 215def is_tpm_lockout_in_effect(): 216 """Returns true if the TPM lockout is in effect; false otherwise.""" 217 status = get_tpm_da_info() 218 return status.get('dictionary_attack_lockout_in_effect', None) 219 220 221def get_login_status(): 222 """Query the login status 223 224 Returns: 225 A login status dictionary containing: 226 { 'owner_user_exists': True|False, 227 'boot_lockbox_finalized': True|False 228 } 229 """ 230 out = __run_cmd(CRYPTOHOME_CMD + ' --action=get_login_status') 231 status = {} 232 for field in ['owner_user_exists', 'boot_lockbox_finalized']: 233 match = re.search('%s: (true|false)' % field, out) 234 if not match: 235 raise ChromiumOSError('Invalid login status: "%s".' % out) 236 status[field] = match.group(1) == 'true' 237 return status 238 239 240def get_install_attribute_status(): 241 """Query the install attribute status 242 243 Returns: 244 A status string, which could be: 245 "UNKNOWN" 246 "TPM_NOT_OWNED" 247 "FIRST_INSTALL" 248 "VALID" 249 "INVALID" 250 """ 251 out = __run_cmd(CRYPTOHOME_CMD + ' --action=install_attributes_get_status') 252 return out.strip() 253 254 255def get_tpm_attestation_status(): 256 """Get the TPM attestation status. Works similar to get_tpm_status(). 257 """ 258 out = __run_cmd(ATTESTATION_CMD + ' status') 259 status = {} 260 for field in ['prepared_for_enrollment', 'enrolled']: 261 match = re.search('%s: (true|false)' % field, out) 262 if not match: 263 raise ChromiumOSError('Invalid attestation status: "%s".' % out) 264 status[field] = match.group(1) == 'true' 265 return status 266 267 268def take_tpm_ownership(wait_for_ownership=True): 269 """Take TPM owernship. 270 271 Args: 272 wait_for_ownership: block until TPM is owned if true 273 """ 274 __run_cmd(CRYPTOHOME_CMD + ' --action=tpm_take_ownership') 275 if wait_for_ownership: 276 # Note that waiting for the 'Ready' flag is more correct than waiting 277 # for the 'Owned' flag, as the latter is set by cryptohomed before some 278 # of the ownership tasks are completed. 279 utils.poll_for_condition( 280 lambda: get_tpm_status()['Ready'], 281 timeout=300, 282 exception=error.TestError('Timeout waiting for TPM ownership')) 283 284 285def verify_ek(): 286 """Verify the TPM endorsement key. 287 288 Returns true if EK is valid. 289 """ 290 cmd = CRYPTOHOME_CMD + ' --action=tpm_verify_ek' 291 return (utils.system(cmd, ignore_status=True) == 0) 292 293 294def remove_vault(user): 295 """Remove the given user's vault from the shadow directory.""" 296 logging.debug('user is %s', user) 297 user_hash = get_user_hash(user) 298 logging.debug('Removing vault for user %s with hash %s', user, user_hash) 299 cmd = CRYPTOHOME_CMD + ' --action=remove --force --user=%s' % user 300 __run_cmd(cmd) 301 # Ensure that the vault does not exist. 302 if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 303 raise ChromiumOSError('Cryptohome could not remove the user\'s vault.') 304 305 306def remove_all_vaults(): 307 """Remove any existing vaults from the shadow directory. 308 309 This function must be run with root privileges. 310 """ 311 for item in os.listdir(constants.SHADOW_ROOT): 312 abs_item = os.path.join(constants.SHADOW_ROOT, item) 313 if os.path.isdir(os.path.join(abs_item, 'vault')): 314 logging.debug('Removing vault for user with hash %s', item) 315 shutil.rmtree(abs_item) 316 317 318def mount_vault(user, password, create=False, key_label=None): 319 """Mount the given user's vault. Mounts should be created by calling this 320 function with create=True, and can be used afterwards with create=False. 321 Only try to mount existing vaults created with this function. 322 323 """ 324 args = [CRYPTOHOME_CMD, '--action=mount_ex', '--user=%s' % user, 325 '--password=%s' % password, '--async'] 326 if create: 327 args += ['--create'] 328 if key_label is None: 329 key_label = 'bar' 330 if key_label is not None: 331 args += ['--key_label=%s' % key_label] 332 logging.info(__run_cmd(' '.join(args))) 333 # Ensure that the vault exists in the shadow directory. 334 user_hash = get_user_hash(user) 335 if not os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 336 retry = 0 337 mounted = False 338 while retry < MOUNT_RETRY_COUNT and not mounted: 339 time.sleep(1) 340 logging.info("Retry %s", str(retry + 1)) 341 __run_cmd(' '.join(args)) 342 # TODO: Remove this additional call to get_user_hash(user) when 343 # crbug.com/690994 is fixed 344 user_hash = get_user_hash(user) 345 if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)): 346 mounted = True 347 retry += 1 348 if not mounted: 349 raise ChromiumOSError('Cryptohome vault not found after mount.') 350 # Ensure that the vault is mounted. 351 if not is_permanent_vault_mounted(user=user, allow_fail=True): 352 raise ChromiumOSError('Cryptohome created a vault but did not mount.') 353 354 355def mount_guest(): 356 """Mount the guest vault.""" 357 args = [CRYPTOHOME_CMD, '--action=mount_guest_ex'] 358 logging.info(__run_cmd(' '.join(args))) 359 # Ensure that the guest vault is mounted. 360 if not is_guest_vault_mounted(allow_fail=True): 361 raise ChromiumOSError('Cryptohome did not mount guest vault.') 362 363 364def test_auth(user, password): 365 """Test key auth.""" 366 cmd = [CRYPTOHOME_CMD, '--action=check_key_ex', '--user=%s' % user, 367 '--password=%s' % password, '--async'] 368 out = __run_cmd(' '.join(cmd)) 369 logging.info(out) 370 return 'Key authenticated.' in out 371 372 373def add_le_key(user, password, new_password, new_key_label): 374 """Add low entropy key.""" 375 args = [CRYPTOHOME_CMD, '--action=add_key_ex', '--key_policy=le', 376 '--user=%s' % user, '--password=%s' % password, 377 '--new_key_label=%s' % new_key_label, 378 '--new_password=%s' % new_password] 379 logging.info(__run_cmd(' '.join(args))) 380 381 382def remove_key(user, password, remove_key_label): 383 """Remove a key.""" 384 args = [CRYPTOHOME_CMD, '--action=remove_key_ex', '--user=%s' % user, 385 '--password=%s' % password, 386 '--remove_key_label=%s' % remove_key_label] 387 logging.info(__run_cmd(' '.join(args))) 388 389 390def get_supported_key_policies(): 391 """Get supported key policies.""" 392 args = [CRYPTOHOME_CMD, '--action=get_supported_key_policies'] 393 out = __run_cmd(' '.join(args)) 394 logging.info(out) 395 policies = {} 396 for line in out.splitlines(): 397 match = re.search(' ([^:]+): (true|false)', line) 398 if match: 399 policies[match.group(1)] = match.group(2) == 'true' 400 return policies 401 402 403def unmount_vault(user=None): 404 """Unmount the given user's vault. 405 406 Once unmounting for a specific user is supported, the user parameter will 407 name the target user. See crosbug.com/20778. 408 """ 409 __run_cmd(CRYPTOHOME_CMD + ' --action=unmount') 410 # Ensure that the vault is not mounted. 411 if user is not None and is_vault_mounted(user, allow_fail=True): 412 raise ChromiumOSError('Cryptohome did not unmount the user.') 413 414 415def __get_mount_info(mount_point, allow_fail=False): 416 """Get information about the active mount at a given mount point.""" 417 cryptohomed_path = '/proc/$(pgrep cryptohomed)/mounts' 418 # 'cryptohome-namespace-mounter' is currently only used for Guest sessions. 419 mounter_exe = 'cryptohome-namespace-mounter' 420 mounter_pid = 'pgrep -o -f %s' % mounter_exe 421 mounter_path = '/proc/$(%s)/mounts' % mounter_pid 422 423 status = utils.system(mounter_pid, ignore_status=True) 424 # Only check for these mounts if the mounter executable is running. 425 if status == 0: 426 try: 427 logging.debug('Active %s mounts:\n' % mounter_exe + 428 utils.system_output('cat %s' % mounter_path)) 429 ns_mount_line = utils.system_output( 430 'grep %s %s' % (mount_point, mounter_path), 431 ignore_status=allow_fail) 432 except Exception as e: 433 logging.error(e) 434 raise ChromiumOSError('Could not get info about cryptohome vault ' 435 'through %s. See logs for complete ' 436 'mount-point.' 437 % os.path.dirname(str(mount_point))) 438 return ns_mount_line.split() 439 440 try: 441 logging.debug('Active cryptohome mounts:\n%s', 442 utils.system_output('cat %s' % cryptohomed_path)) 443 mount_line = utils.system_output( 444 'grep %s %s' % (mount_point, cryptohomed_path), 445 ignore_status=allow_fail) 446 except Exception as e: 447 logging.error(e) 448 raise ChromiumOSError('Could not get info about cryptohome vault ' 449 'through %s. See logs for complete mount-point.' 450 % os.path.dirname(str(mount_point))) 451 return mount_line.split() 452 453 454def __get_user_mount_info(user, allow_fail=False): 455 """Get information about the active mounts for a given user. 456 457 Returns the active mounts at the user's user and system mount points. If no 458 user is given, the active mount at the shared mount point is returned 459 (regular users have a bind-mount at this mount point for backwards 460 compatibility; the guest user has a mount at this mount point only). 461 """ 462 return [__get_mount_info(mount_point=user_path(user), 463 allow_fail=allow_fail), 464 __get_mount_info(mount_point=system_path(user), 465 allow_fail=allow_fail)] 466 467def is_vault_mounted(user, regexes=None, allow_fail=False): 468 """Check whether a vault is mounted for the given user. 469 470 user: If no user is given, the shared mount point is checked, determining 471 whether a vault is mounted for any user. 472 regexes: dictionary of regexes to matches against the mount information. 473 The mount filesystem for the user's user and system mounts point must 474 match one of the keys. 475 The mount source point must match the selected device regex. 476 477 In addition, if mounted over ext4, we check the directory is encrypted. 478 """ 479 if regexes is None: 480 regexes = { 481 constants.CRYPTOHOME_FS_REGEX_ANY : 482 constants.CRYPTOHOME_DEV_REGEX_ANY 483 } 484 user_mount_info = __get_user_mount_info(user=user, allow_fail=allow_fail) 485 for mount_info in user_mount_info: 486 # Look at each /proc/../mount lines that match mount point for a given 487 # user user/system mount (/home/user/.... /home/root/...) 488 489 # We should have at least 3 arguments (source, mount, type of mount) 490 if len(mount_info) < 3: 491 return False 492 493 device_regex = None 494 for fs_regex in regexes.keys(): 495 if re.match(fs_regex, mount_info[2]): 496 device_regex = regexes[fs_regex] 497 break 498 499 if not device_regex: 500 # The third argument in not the expected mount point type. 501 return False 502 503 # Check if the mount source match the device regex: it can be loose, 504 # (anything) or stricter if we expect guest filesystem. 505 if not re.match(device_regex, mount_info[0]): 506 return False 507 508 return True 509 510 511def is_guest_vault_mounted(allow_fail=False): 512 """Check whether a vault is mounted for the guest user. 513 It should be a mount of an ext4 partition on a loop device 514 or be backed by tmpfs. 515 """ 516 return is_vault_mounted( 517 user=GUEST_USER_NAME, 518 regexes={ 519 # Remove tmpfs support when it becomes unnecessary as all guest 520 # modes will use ext4 on a loop device. 521 constants.CRYPTOHOME_FS_REGEX_EXT4 : 522 constants.CRYPTOHOME_DEV_REGEX_LOOP_DEVICE, 523 constants.CRYPTOHOME_FS_REGEX_TMPFS : 524 constants.CRYPTOHOME_DEV_REGEX_GUEST, 525 }, 526 allow_fail=allow_fail) 527 528def is_permanent_vault_mounted(user, allow_fail=False): 529 """Check if user is mounted over ecryptfs or ext4 crypto. """ 530 return is_vault_mounted( 531 user=user, 532 regexes={ 533 constants.CRYPTOHOME_FS_REGEX_ECRYPTFS : 534 constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_SHADOW, 535 constants.CRYPTOHOME_FS_REGEX_EXT4 : 536 constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_DEVICE, 537 }, 538 allow_fail=allow_fail) 539 540def get_mounted_vault_path(user, allow_fail=False): 541 """Get the path where the decrypted data for the user is located.""" 542 return os.path.join(constants.SHADOW_ROOT, get_user_hash(user), 'mount') 543 544 545def canonicalize(credential): 546 """Perform basic canonicalization of |email_address|. 547 548 Perform basic canonicalization of |email_address|, taking into account that 549 gmail does not consider '.' or caps inside a username to matter. It also 550 ignores everything after a '+'. For example, 551 c.masone+abc@gmail.com == cMaSone@gmail.com, per 552 http://mail.google.com/support/bin/answer.py?hl=en&ctx=mail&answer=10313 553 """ 554 if not credential: 555 return None 556 557 parts = credential.split('@') 558 if len(parts) != 2: 559 raise error.TestError('Malformed email: ' + credential) 560 561 (name, domain) = parts 562 name = name.partition('+')[0] 563 if (domain == constants.SPECIAL_CASE_DOMAIN): 564 name = name.replace('.', '') 565 return '@'.join([name, domain]).lower() 566 567 568def crash_cryptohomed(): 569 """Let cryptohome crash.""" 570 # Try to kill cryptohomed so we get something to work with. 571 pid = __run_cmd('pgrep cryptohomed') 572 try: 573 pid = int(pid) 574 except ValueError as e: # empty or invalid string 575 raise error.TestError('Cryptohomed was not running') 576 utils.system('kill -ABRT %d' % pid) 577 # CONT just in case cryptohomed had a spurious STOP. 578 utils.system('kill -CONT %d' % pid) 579 utils.poll_for_condition( 580 lambda: utils.system('ps -p %d' % pid, 581 ignore_status=True) != 0, 582 timeout=180, 583 exception=error.TestError( 584 'Timeout waiting for cryptohomed to coredump')) 585 586 587def create_ecryptfs_homedir(user, password): 588 """Creates a new home directory as ecryptfs. 589 590 If a home directory for the user exists already, it will be removed. 591 The resulting home directory will be mounted. 592 593 @param user: Username to create the home directory for. 594 @param password: Password to use when creating the home directory. 595 """ 596 unmount_vault(user) 597 remove_vault(user) 598 args = [ 599 CRYPTOHOME_CMD, 600 '--action=mount_ex', 601 '--user=%s' % user, 602 '--password=%s' % password, 603 '--key_label=foo', 604 '--ecryptfs', 605 '--create'] 606 logging.info(__run_cmd(' '.join(args))) 607 if not is_vault_mounted(user, regexes={ 608 constants.CRYPTOHOME_FS_REGEX_ECRYPTFS : 609 constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_SHADOW 610 }, allow_fail=True): 611 raise ChromiumOSError('Ecryptfs home could not be created') 612 613 614def do_dircrypto_migration(user, password, timeout=600): 615 """Start dircrypto migration for the user. 616 617 @param user: The user to migrate. 618 @param password: The password used to mount the users vault 619 @param timeout: How long in seconds to wait for the migration to finish 620 before failing. 621 """ 622 unmount_vault(user) 623 args = [ 624 CRYPTOHOME_CMD, 625 '--action=mount_ex', 626 '--to_migrate_from_ecryptfs', 627 '--user=%s' % user, 628 '--password=%s' % password] 629 logging.info(__run_cmd(' '.join(args))) 630 if not __get_mount_info(temporary_mount_path(user), allow_fail=True): 631 raise ChromiumOSError('Failed to mount home for migration') 632 args = [CRYPTOHOME_CMD, '--action=migrate_to_dircrypto', '--user=%s' % user] 633 logging.info(__run_cmd(' '.join(args))) 634 utils.poll_for_condition( 635 lambda: not __get_mount_info( 636 temporary_mount_path(user), allow_fail=True), 637 timeout=timeout, 638 exception=error.TestError( 639 'Timeout waiting for dircrypto migration to finish')) 640 641 642def change_password(user, password, new_password): 643 """Change user password.""" 644 args = [ 645 CRYPTOHOME_CMD, 646 '--action=migrate_key_ex', 647 '--user=%s' % user, 648 '--old_password=%s' % password, 649 '--password=%s' % new_password] 650 out = __run_cmd(' '.join(args)) 651 logging.info(out) 652 if 'Key migration succeeded.' not in out: 653 raise ChromiumOSError('Key migration failed.') 654 655 656class CryptohomeProxy(DBusClient): 657 """A DBus proxy client for testing the Cryptohome DBus server. 658 """ 659 CRYPTOHOME_BUS_NAME = 'org.chromium.Cryptohome' 660 CRYPTOHOME_OBJECT_PATH = '/org/chromium/Cryptohome' 661 CRYPTOHOME_INTERFACE = 'org.chromium.CryptohomeInterface' 662 ASYNC_CALL_STATUS_SIGNAL = 'AsyncCallStatus' 663 ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS = ( 664 'async_id', 'return_status', 'return_code' 665 ) 666 DBUS_PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties' 667 668 # Default timeout in seconds for the D-Bus connection. 669 DEFAULT_DBUS_TIMEOUT = 30 670 671 def __init__(self, bus_loop=None, autodir=None, job=None, 672 timeout=DEFAULT_DBUS_TIMEOUT): 673 if autodir and job: 674 # Install D-Bus protos necessary for some methods. 675 dep_dir = os.path.join(autodir, 'deps', DBUS_PROTOS_DEP) 676 job.install_pkg(DBUS_PROTOS_DEP, 'dep', dep_dir) 677 sys.path.append(dep_dir) 678 679 # Set up D-Bus main loop and interface. 680 self.main_loop = gobject.MainLoop() 681 if bus_loop is None: 682 bus_loop = DBusGMainLoop(set_as_default=True) 683 self.bus = dbus.SystemBus(mainloop=bus_loop) 684 super(CryptohomeProxy, self).__init__(self.main_loop, self.bus, 685 self.CRYPTOHOME_BUS_NAME, 686 self.CRYPTOHOME_OBJECT_PATH, 687 timeout) 688 self.iface = dbus.Interface(self.proxy_object, 689 self.CRYPTOHOME_INTERFACE) 690 self.properties = dbus.Interface(self.proxy_object, 691 self.DBUS_PROPERTIES_INTERFACE) 692 self.handle_signal(self.CRYPTOHOME_INTERFACE, 693 self.ASYNC_CALL_STATUS_SIGNAL, 694 self.ASYNC_CALL_STATUS_SIGNAL_ARGUMENTS) 695 696 697 # Wrap all proxied calls to catch cryptohomed failures. 698 def __call(self, method, *args): 699 try: 700 return method(*args, timeout=180) 701 except dbus.exceptions.DBusException as e: 702 if e.get_dbus_name() == 'org.freedesktop.DBus.Error.NoReply': 703 logging.error('Cryptohome is not responding. Sending ABRT') 704 crash_cryptohomed() 705 raise ChromiumOSError('cryptohomed aborted. Check crashes!') 706 raise e 707 708 709 def __wait_for_specific_signal(self, signal, data): 710 """Wait for the |signal| with matching |data| 711 Returns the resulting dict on success or {} on error. 712 """ 713 # Do not bubble up the timeout here, just return {}. 714 result = {} 715 try: 716 result = self.wait_for_signal(signal) 717 except utils.TimeoutError: 718 return {} 719 for k in data.keys(): 720 if k not in result or result[k] != data[k]: 721 return {} 722 return result 723 724 725 # Perform a data-less async call. 726 # TODO(wad) Add __async_data_call. 727 def __async_call(self, method, *args): 728 # Clear out any superfluous async call signals. 729 self.clear_signal_content(self.ASYNC_CALL_STATUS_SIGNAL) 730 out = self.__call(method, *args) 731 logging.debug('Issued call ' + str(method) + 732 ' with async_id ' + str(out)) 733 result = {} 734 try: 735 # __wait_for_specific_signal has a 10s timeout 736 result = utils.poll_for_condition( 737 lambda: self.__wait_for_specific_signal( 738 self.ASYNC_CALL_STATUS_SIGNAL, {'async_id' : out}), 739 timeout=180, 740 desc='matching %s signal' % self.ASYNC_CALL_STATUS_SIGNAL) 741 except utils.TimeoutError as e: 742 logging.error('Cryptohome timed out. Sending ABRT.') 743 crash_cryptohomed() 744 raise ChromiumOSError('cryptohomed aborted. Check crashes!') 745 return result 746 747 748 def mount(self, user, password, create=False, key_label='bar'): 749 """Mounts a cryptohome. 750 751 Returns True if the mount succeeds or False otherwise. 752 """ 753 import rpc_pb2 754 755 acc = rpc_pb2.AccountIdentifier() 756 acc.account_id = user 757 758 auth = rpc_pb2.AuthorizationRequest() 759 auth.key.secret = password 760 auth.key.data.label = key_label 761 762 mount_req = rpc_pb2.MountRequest() 763 if create: 764 mount_req.create.copy_authorization_key = True 765 766 out = self.__call(self.iface.MountEx, acc.SerializeToString(), 767 auth.SerializeToString(), mount_req.SerializeToString()) 768 parsed_out = rpc_pb2.BaseReply() 769 parsed_out.ParseFromString(''.join(map(chr, out))) 770 return parsed_out.error == rpc_pb2.CRYPTOHOME_ERROR_NOT_SET 771 772 773 def unmount(self, user): 774 """Unmounts a cryptohome. 775 776 Returns True if the unmount suceeds or false otherwise. 777 """ 778 import rpc_pb2 779 780 req = rpc_pb2.UnmountRequest() 781 782 out = self.__call(self.iface.UnmountEx, req.SerializeToString()) 783 parsed_out = rpc_pb2.BaseReply() 784 parsed_out.ParseFromString(''.join(map(chr, out))) 785 return parsed_out.error == rpc_pb2.CRYPTOHOME_ERROR_NOT_SET 786 787 788 def is_mounted(self, user): 789 """Tests whether a user's cryptohome is mounted.""" 790 return (utils.is_mountpoint(user_path(user)) 791 and utils.is_mountpoint(system_path(user))) 792 793 794 def require_mounted(self, user): 795 """Raises a test failure if a user's cryptohome is not mounted.""" 796 utils.require_mountpoint(user_path(user)) 797 utils.require_mountpoint(system_path(user)) 798 799 800 def remove(self, user): 801 """Removes a users cryptohome. 802 803 Returns True if the operation succeeds or False otherwise. 804 """ 805 import rpc_pb2 806 807 acc = rpc_pb2.AccountIdentifier() 808 acc.account_id = user 809 810 out = self.__call(self.iface.RemoveEx, acc.SerializeToString()) 811 parsed_out = rpc_pb2.BaseReply() 812 parsed_out.ParseFromString(''.join(map(chr, out))) 813 return parsed_out.error == rpc_pb2.CRYPTOHOME_ERROR_NOT_SET 814 815 816 def ensure_clean_cryptohome_for(self, user, password=None): 817 """Ensure a fresh cryptohome exists for user. 818 819 @param user: user who needs a shiny new cryptohome. 820 @param password: if unset, a random password will be used. 821 """ 822 if not password: 823 password = ''.join(random.sample(string.ascii_lowercase, 6)) 824 self.remove(user) 825 self.mount(user, password, create=True) 826 827 def lock_install_attributes(self, attrs): 828 """Set and lock install attributes for the device. 829 830 @param attrs: dict of install attributes. 831 """ 832 take_tpm_ownership() 833 self.wait_for_install_attributes_ready() 834 for key, value in attrs.items(): 835 if not self.__call(self.iface.InstallAttributesSet, key, 836 dbus.ByteArray(value + '\0')): 837 return False 838 return self.__call(self.iface.InstallAttributesFinalize) 839 840 def wait_for_install_attributes_ready(self): 841 """Wait until install attributes are ready. 842 """ 843 utils.poll_for_condition( 844 lambda: self.__call(self.iface.InstallAttributesIsReady), 845 timeout=300, 846 exception=error.TestError( 847 'Timeout waiting for install attributes are ready')) 848