• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import logging, os, random, re, shutil, string, time
11
12import common
13
14from autotest_lib.client.cros import constants
15from autotest_lib.client.bin import utils
16from autotest_lib.client.common_lib import error
17from autotest_lib.client.common_lib.cros import tpm_utils
18from autotest_lib.client.cros.tpm import *
19
# Command-line clients invoked by the helpers in this file.
ATTESTATION_CMD = '/usr/bin/attestation_client'
CRYPTOHOME_CMD = '/usr/sbin/cryptohome'
TPM_MANAGER_CMD = '/usr/bin/tpm_manager_client'
# User name passed to is_vault_mounted() when checking the guest vault.
GUEST_USER_NAME = '$guest'
# NOTE(review): not referenced in the visible part of this file; presumably
# the message cryptohome prints for an unsupported --action -- confirm.
UNAVAILABLE_ACTION = 'Unknown action or no action given.'
# Maximum number of additional mount attempts made by mount_vault().
MOUNT_RETRY_COUNT = 20
# Path templates; '%s' is filled with the value from get_user_hash().
TEMP_MOUNT_PATTERN = '/home/.shadow/%s/temporary_mount'
VAULT_PATH_PATTERN = '/home/.shadow/%s/vault'

# NOTE(review): not referenced in the visible part of this file; presumably
# the name of a test dependency -- confirm against callers.
DBUS_PROTOS_DEP = 'dbus_protos'

# Key looked up in the dict returned by get_supported_key_policies().
LEC_KEY = 'low_entropy_credentials_supported'
32
33
def get_user_hash(user):
    """Return the obfuscated user hash that cryptohome reports for a user.

    @param user: user name passed as --user to the obfuscate_user action.
    @returns whatever utils.system_output() returns for the command.
    """
    obfuscate_cmd = ['cryptohome', '--action=obfuscate_user']
    obfuscate_cmd.append('--user=%s' % user)
    return utils.system_output(obfuscate_cmd)
38
39
def user_path(user):
    """Return the output of `cryptohome-path user <user>`.

    @param user: user whose user mount point is requested.
    """
    path_cmd = ['cryptohome-path', 'user', user]
    return utils.system_output(path_cmd)
43
44
def system_path(user):
    """Return the output of `cryptohome-path system <user>`.

    @param user: user whose system mount point is requested.
    """
    path_cmd = ['cryptohome-path', 'system', user]
    return utils.system_output(path_cmd)
48
49
def temporary_mount_path(user):
    """Build the temporary mount path used during crypto-migration.

    @param user: user the temporary mount should be for.
    @returns TEMP_MOUNT_PATTERN filled in with the user's hash.
    """
    hashed_user = get_user_hash(user)
    return TEMP_MOUNT_PATTERN % hashed_user
56
57
def vault_path(user):
    """Build the vault path for the given user.

    @param user: the user whose vault path should be returned.
    @returns VAULT_PATH_PATTERN filled in with the user's hash.
    """
    hashed_user = get_user_hash(user)
    return VAULT_PATH_PATTERN % hashed_user
64
65
def ensure_clean_cryptohome_for(user, password=None):
    """Recreate the user's cryptohome from scratch and mount it.

    Unmounts, removes the user's vault, then mounts with create=True.

    @param user: user who needs a brand new cryptohome.
    @param password: password for the new vault; when falsy, six distinct
                     random lowercase letters are used instead.
    """
    if not password:
        letters = random.sample(string.ascii_lowercase, 6)
        password = ''.join(letters)
    unmount_vault(user)
    remove_vault(user)
    mount_vault(user, password, create=True)
77
78
def get_tpm_password():
    """Get the TPM owner password reported by tpm_manager_client.

    Runs `tpm_manager_client status`, extracts the owner_password field and
    decodes it from hex, two digits per character.

    Returns:
        The decoded password, or '' when the status output contains no
        owner_password field or the field is empty.

    Raises:
        ValueError if the captured field contains non-hex characters.
    """
    out = run_cmd(TPM_MANAGER_CMD + ' status')
    # Raw string: '\w' inside a plain literal is an invalid escape sequence.
    match = re.search(r'owner_password: (\w*)', out)
    if not match:
        return ''
    hex_pass = match.group(1)
    # Each pair of hex digits encodes one character of the password.
    return ''.join(
            chr(int(hex_pass[i:i + 2], 16))
            for i in range(0, len(hex_pass), 2))
94
95
def get_fwmp(cleared_fwmp=False):
    """Get the firmware management parameters.

    Args:
        cleared_fwmp: True if the space should not exist.

    Returns:
        A dictionary whose values are the strings captured from the
        cryptohome output (they are not converted to numbers):
          * cleared_fwmp False: the FWMP contents, for example
            { 'flags': '<flags text>',
              'developer_key_hash': '<hash text>' }
          * cleared_fwmp True and tpm_utils.FwmpIsAllZero() accepts the
            output: an empty dictionary.
          * cleared_fwmp True otherwise: the reported error, for example
            { 'error':
              'CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS_INVALID' }

    Raises:
         ChromiumOSError if any expected field is not found in the cryptohome
         output. This would typically happen when FWMP state does not match
         'cleared_fwmp'
    """
    out = run_cmd(CRYPTOHOME_CMD +
                  ' --action=get_firmware_management_parameters')

    if cleared_fwmp:
        if tpm_utils.FwmpIsAllZero(out):
            return {}
        fields = ['error']
    else:
        fields = ['flags', 'developer_key_hash']

    status = {}
    for field in fields:
        # Raw string: '\S' and '\s' are invalid escapes in a plain literal.
        match = re.search(r'%s: (\S+)\s' % field, out)
        if not match:
            raise ChromiumOSError('Invalid FWMP field %s: "%s".' %
                                  (field, out))
        # The capture is \S+, so it never carries surrounding whitespace.
        status[field] = match.group(1)
    return status
136
137
def set_fwmp(flags, developer_key_hash=None):
    """Set the firmware management parameter contents.

    Args:
        flags: string appended to '--flags=' on the cryptohome command line.
        developer_key_hash: a string with the developer key hash; the
            option is omitted when this is falsy.

    Raises:
         ChromiumOSError if the cryptohome output does not report success.
    """
    pieces = [
            CRYPTOHOME_CMD,
            ' --action=set_firmware_management_parameters ',
            '--flags=' + flags,
    ]
    if developer_key_hash:
        pieces.append(' --developer_key_hash=' + developer_key_hash)

    result = run_cmd(''.join(pieces))
    if 'SetFirmwareManagementParameters success' in result:
        return
    raise ChromiumOSError('failed to set FWMP: %s' % result)
156
157
def is_tpm_lockout_in_effect():
    """Report whether the TPM dictionary-attack lockout is in effect.

    Returns the 'dictionary_attack_lockout_in_effect' entry of
    get_tpm_da_info(), or None when that entry is missing.
    """
    da_info = get_tpm_da_info()
    return da_info.get('dictionary_attack_lockout_in_effect')
162
163
def get_login_status():
    """Query the login status from cryptohome.

    Returns:
        A login status dictionary containing:
        { 'owner_user_exists': True|False }

    Raises:
        ChromiumOSError if the field is missing from the command output.
    """
    output = run_cmd(CRYPTOHOME_CMD + ' --action=get_login_status')
    field = 'owner_user_exists'
    found = re.search('%s: (true|false)' % field, output)
    if found is None:
        raise ChromiumOSError('Invalid login status: "%s".' % output)
    return {field: found.group(1) == 'true'}
179
180
def get_install_attribute_status():
    """Query the install attribute status.

    Returns:
        The stripped output of the install_attributes_get_status action,
        which could be:
          "UNKNOWN"
          "TPM_NOT_OWNED"
          "FIRST_INSTALL"
          "VALID"
          "INVALID"
    """
    status_cmd = CRYPTOHOME_CMD + ' --action=install_attributes_get_status'
    raw_status = run_cmd(status_cmd)
    return raw_status.strip()
194
195
def lock_install_attributes(attrs):
    """Set and lock install attributes for the device.

    Takes TPM ownership and waits for install attributes to be ready before
    setting each attribute and finalizing.

    @param attrs: dict of install attributes.
    @returns False as soon as setting one attribute exits non-zero;
             otherwise whether the finalize action printed
             'InstallAttributesFinalize(): 1'.
    """
    take_tpm_ownership()
    wait_for_install_attributes_ready()

    for name, value in attrs.items():
        set_cmd = ' '.join((
                CRYPTOHOME_CMD,
                '--action=install_attributes_set',
                '--name="%s"' % name,
                '--value="%s"' % value,
        ))
        exit_code = utils.system(set_cmd, ignore_status=True)
        if exit_code != 0:
            return False

    finalize_out = run_cmd(
            CRYPTOHOME_CMD + ' --action=install_attributes_finalize')
    return finalize_out.strip() == 'InstallAttributesFinalize(): 1'
216
217
def wait_for_install_attributes_ready():
    """Poll cryptohome until install attributes report ready.

    Polls with a timeout of 300; error.TestError is handed to
    utils.poll_for_condition as the exception to use on timeout.
    """
    ready_cmd = CRYPTOHOME_CMD + ' --action=install_attributes_is_ready'

    def _is_ready():
        return run_cmd(ready_cmd).strip() == 'InstallAttributesIsReady(): 1'

    on_timeout = error.TestError(
            'Timeout waiting for install attributes to be ready')
    utils.poll_for_condition(_is_ready, timeout=300, exception=on_timeout)
227
228
def get_tpm_attestation_status():
    """Get the TPM attestation status from `attestation_client status`.

    Returns a dict mapping 'prepared_for_enrollment' and 'enrolled' to
    booleans. Raises ChromiumOSError if either field is missing from the
    command output.
    """
    output = run_cmd(ATTESTATION_CMD + ' status')
    parsed = {}
    for name in ('prepared_for_enrollment', 'enrolled'):
        found = re.search('%s: (true|false)' % name, output)
        if found is None:
            raise ChromiumOSError('Invalid attestation status: "%s".' % output)
        parsed[name] = (found.group(1) == 'true')
    return parsed
240
241
def take_tpm_ownership(wait_for_ownership=True):
    """Ask cryptohome to take TPM ownership.

    Args:
        wait_for_ownership: when true, block until get_tpm_status() reports
            'Ready'; error.TestError is passed to the poll as the timeout
            exception (timeout=300).
    """
    run_cmd(CRYPTOHOME_CMD + ' --action=tpm_take_ownership')
    if not wait_for_ownership:
        return

    # Note that waiting for the 'Ready' flag is more correct than waiting
    # for the 'Owned' flag, as the latter is set by cryptohomed before some
    # of the ownership tasks are completed.
    def _tpm_ready():
        return get_tpm_status()['Ready']

    on_timeout = error.TestError('Timeout waiting for TPM ownership')
    utils.poll_for_condition(_tpm_ready, timeout=300, exception=on_timeout)
257
258
def verify_ek():
    """Verify the TPM endorsement key.

    Returns True when the tpm_verify_ek action exits with status 0.
    """
    exit_code = utils.system(CRYPTOHOME_CMD + ' --action=tpm_verify_ek',
                             ignore_status=True)
    return exit_code == 0
266
267
def remove_vault(user):
    """Remove the given user's vault from the shadow directory.

    @param user: user whose vault should be removed.
    @raises ChromiumOSError if the user's shadow directory still exists
            after the remove action ran.
    """
    logging.debug('user is %s', user)
    hashed_user = get_user_hash(user)
    logging.debug('Removing vault for user %s with hash %s', user, hashed_user)
    run_cmd(CRYPTOHOME_CMD + ' --action=remove --force --user=%s' % user)

    # Ensure that the vault does not exist.
    shadow_dir = os.path.join(constants.SHADOW_ROOT, hashed_user)
    if os.path.exists(shadow_dir):
        raise ChromiumOSError("Cryptohome could not remove the user's vault.")
278
279
def remove_all_vaults():
    """Delete every shadow-root entry that contains a 'vault' directory.

    This function must be run with root privileges.
    """
    shadow_root = constants.SHADOW_ROOT
    for entry in os.listdir(shadow_root):
        entry_path = os.path.join(shadow_root, entry)
        if not os.path.isdir(os.path.join(entry_path, 'vault')):
            continue
        logging.debug('Removing vault for user with hash %s', entry)
        shutil.rmtree(entry_path)
290
291
def mount_vault(user, password, create=False, key_label=None):
    """Mount the given user's vault.

    Mounts should be created by calling this function with create=True, and
    can be used afterwards with create=False. Only try to mount existing
    vaults created with this function.

    @param user: user whose vault is mounted.
    @param password: password passed to the mount_ex action.
    @param create: when true, pass --create (and default key_label to 'bar').
    @param key_label: optional key label; omitted from the command when None.
    @raises ChromiumOSError if the user's shadow directory does not appear
            after MOUNT_RETRY_COUNT retries, or the vault is not mounted.
    """
    args = [CRYPTOHOME_CMD, '--action=mount_ex', '--user=%s' % user,
            '--password=%s' % password, '--async']
    if create:
        args.append('--create')
        if key_label is None:
            key_label = 'bar'
    if key_label is not None:
        args.append('--key_label=%s' % key_label)
    mount_cmd = ' '.join(args)
    logging.info(run_cmd(mount_cmd))

    # Ensure that the vault exists in the shadow directory.
    user_hash = get_user_hash(user)
    if not os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
        for attempt in range(1, MOUNT_RETRY_COUNT + 1):
            time.sleep(1)
            logging.info("Retry %s", str(attempt))
            run_cmd(mount_cmd)
            # TODO: Remove this additional call to get_user_hash(user) when
            # crbug.com/690994 is fixed
            user_hash = get_user_hash(user)
            if os.path.exists(os.path.join(constants.SHADOW_ROOT, user_hash)):
                break
        else:
            # Every retry ran without the shadow directory showing up.
            raise ChromiumOSError('Cryptohome vault not found after mount.')

    # Ensure that the vault is mounted.
    if not is_permanent_vault_mounted(user=user, allow_fail=True):
        raise ChromiumOSError('Cryptohome created a vault but did not mount.')
327
328
def mount_guest():
    """Mount the guest vault and verify that it is mounted.

    @raises ChromiumOSError if the guest vault is not mounted afterwards.
    """
    guest_cmd = ' '.join((CRYPTOHOME_CMD, '--action=mount_guest_ex'))
    logging.info(run_cmd(guest_cmd))
    # Ensure that the guest vault is mounted.
    if is_guest_vault_mounted(allow_fail=True):
        return
    raise ChromiumOSError('Cryptohome did not mount guest vault.')
336
337
def test_auth(user, password):
    """Check a user's key with the check_key_ex action.

    @param user: user whose key is checked.
    @param password: password to check.
    @returns True if the command output contains 'Key authenticated.'.
    """
    check_cmd = ' '.join((
            CRYPTOHOME_CMD,
            '--action=check_key_ex',
            '--user=%s' % user,
            '--password=%s' % password,
            '--async',
    ))
    result = run_cmd(check_cmd)
    logging.info(result)
    return 'Key authenticated.' in result
345
346
def add_le_key(user, password, new_password, new_key_label):
    """Add a low entropy key via add_key_ex with --key_policy=le.

    @param user: user to add the key for.
    @param password: existing password used to authorize the request.
    @param new_password: password of the key being added.
    @param new_key_label: label of the key being added.
    """
    add_cmd = ' '.join((
            CRYPTOHOME_CMD,
            '--action=add_key_ex',
            '--key_policy=le',
            '--user=%s' % user,
            '--password=%s' % password,
            '--new_key_label=%s' % new_key_label,
            '--new_password=%s' % new_password,
    ))
    logging.info(run_cmd(add_cmd))
354
355
def remove_key(user, password, remove_key_label):
    """Remove a key via the remove_key_ex action.

    @param user: user who owns the key.
    @param password: password used to authorize the request.
    @param remove_key_label: label of the key to remove.
    """
    remove_cmd = ' '.join((
            CRYPTOHOME_CMD,
            '--action=remove_key_ex',
            '--user=%s' % user,
            '--password=%s' % password,
            '--remove_key_label=%s' % remove_key_label,
    ))
    logging.info(run_cmd(remove_cmd))
362
363
def get_supported_key_policies(host=None):
    """Get supported key policies.

    @param host: optional object whose run(args).stdout supplies the output;
                 when None the command is executed through run_cmd().
    @returns dict mapping each '<name>: true|false' line of the output to a
             boolean; lines that do not match are ignored.
    """
    args = [CRYPTOHOME_CMD, '--action=get_supported_key_policies']
    out = run_cmd(' '.join(args)) if host is None else host.run(args).stdout
    logging.info(out)

    policy_re = re.compile('([^:]+): (true|false)')
    found = (policy_re.search(line.strip()) for line in out.splitlines())
    return dict((m.group(1), m.group(2) == 'true') for m in found if m)
378
379
def is_low_entropy_credentials_supported(host=None):
    """Return True if low entropy credentials are supported.

    @param host: forwarded to get_supported_key_policies().
    @returns the LEC_KEY entry of the supported policies, or False when the
             entry is absent.
    """
    supported = get_supported_key_policies(host)
    return supported.get(LEC_KEY, False)
384
385
def unmount_vault(user=None):
    """Unmount via the cryptohome unmount action.

    The unmount command itself is not user specific. Once unmounting for a
    specific user is supported, the user parameter will name the target
    user. See crosbug.com/20778.

    @param user: when not None, verify afterwards that this user's vault is
                 no longer mounted.
    @raises ChromiumOSError if that user's vault is still mounted.
    """
    run_cmd(CRYPTOHOME_CMD + ' --action=unmount')
    if user is None:
        return
    # Ensure that the vault is not mounted.
    if is_vault_mounted(user, allow_fail=True):
        raise ChromiumOSError('Cryptohome did not unmount the user.')
396
397
def __get_mount_info(mount_point, allow_fail=False):
    """Get information about the active mount at a given mount point.

    If a cryptohome-namespace-mounter process is found, only its mounts file
    is searched; otherwise the mounts file of cryptohomed is searched.

    @param mount_point: pattern handed to grep to select the mount line.
    @param allow_fail: passed as ignore_status to the grep invocation.
    @returns the matching grep output split on whitespace.
    @raises ChromiumOSError if listing or searching the mounts raises.
    """
    # 'cryptohome-namespace-mounter' is currently only used for Guest sessions.
    ns_mounter = 'cryptohome-namespace-mounter'
    find_ns_mounter = 'pgrep -o -f %s' % ns_mounter

    # Only check for these mounts if the mounter executable is running.
    if utils.system(find_ns_mounter, ignore_status=True) == 0:
        ns_mounts = '/proc/$(%s)/mounts' % find_ns_mounter
        try:
            logging.debug('Active %s mounts:\n' % ns_mounter +
                          utils.system_output('cat %s' % ns_mounts))
            matched = utils.system_output(
                    'grep %s %s' % (mount_point, ns_mounts),
                    ignore_status=allow_fail)
        except Exception as e:
            logging.error(e)
            raise ChromiumOSError('Could not get info about cryptohome vault '
                                  'through %s. See logs for complete '
                                  'mount-point.'
                                  % os.path.dirname(str(mount_point)))
        return matched.split()

    daemon_mounts = '/proc/$(pgrep cryptohomed)/mounts'
    try:
        logging.debug('Active cryptohome mounts:\n%s',
                      utils.system_output('cat %s' % daemon_mounts))
        matched = utils.system_output(
                'grep %s %s' % (mount_point, daemon_mounts),
                ignore_status=allow_fail)
    except Exception as e:
        logging.error(e)
        raise ChromiumOSError('Could not get info about cryptohome vault '
                              'through %s. See logs for complete mount-point.'
                              % os.path.dirname(str(mount_point)))
    return matched.split()
435
436
def __get_user_mount_info(user, allow_fail=False):
    """Get information about the active mounts for a given user.

    @param user: user whose mount points are looked up.
    @param allow_fail: forwarded to __get_mount_info().
    @returns a two-element list: the mount info for user_path(user) followed
             by the mount info for system_path(user).
    """
    return [
            __get_mount_info(mount_point=locate(user), allow_fail=allow_fail)
            for locate in (user_path, system_path)
    ]
449
450
def is_vault_mounted(user, regexes=None, allow_fail=False):
    """Check whether a vault is mounted for the given user.

    user: the user whose user and system mount points are inspected.
    regexes: dictionary mapping a filesystem-type regex to a device regex.
      For each of the two mounts, the filesystem type must match one of the
      keys and the mount source must match the device regex of the first
      matching key. Defaults to the "any" regexes from constants.
    allow_fail: forwarded to the mount lookup.

    Returns True only if both mounts satisfy the checks above.
    """
    if regexes is None:
        regexes = {
            constants.CRYPTOHOME_FS_REGEX_ANY :
               constants.CRYPTOHOME_DEV_REGEX_ANY
        }

    # Each entry is a /proc/../mounts line matching one of the user's
    # user/system mount points (/home/user/.... /home/root/...).
    for fields in __get_user_mount_info(user=user, allow_fail=allow_fail):
        # We should have at least 3 fields (source, mount, type of mount).
        if len(fields) < 3:
            return False
        source, fs_type = fields[0], fields[2]

        device_regex = next(
                (dev for fs, dev in regexes.items() if re.match(fs, fs_type)),
                None)
        if not device_regex:
            # The third field is not an expected filesystem type.
            return False

        # The mount source must match the device regex: it can be loose
        # (anything) or stricter if we expect a guest filesystem.
        if not re.match(device_regex, source):
            return False

    return True
493
494
def is_guest_vault_mounted(allow_fail=False):
    """Check whether a vault is mounted for the guest user.

    It should be a mount of an ext4 partition on a loop device
    or be backed by tmpfs.

    @param allow_fail: forwarded to is_vault_mounted().
    """
    # Remove tmpfs support when it becomes unnecessary as all guest
    # modes will use ext4 on a loop device.
    guest_regexes = {
            constants.CRYPTOHOME_FS_REGEX_EXT4:
            constants.CRYPTOHOME_DEV_REGEX_LOOP_DEVICE,
            constants.CRYPTOHOME_FS_REGEX_TMPFS:
            constants.CRYPTOHOME_DEV_REGEX_GUEST,
    }
    return is_vault_mounted(user=GUEST_USER_NAME,
                            regexes=guest_regexes,
                            allow_fail=allow_fail)
511
512
def is_permanent_vault_mounted(user, allow_fail=False):
    """Check if user is mounted over ecryptfs or ext4 crypto.

    @param user: user whose mounts are checked.
    @param allow_fail: forwarded to is_vault_mounted().
    """
    permanent_regexes = {
            constants.CRYPTOHOME_FS_REGEX_ECRYPTFS:
            constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_SHADOW,
            constants.CRYPTOHOME_FS_REGEX_EXT4:
            constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_DEVICE,
    }
    return is_vault_mounted(user=user,
                            regexes=permanent_regexes,
                            allow_fail=allow_fail)
524
525
def get_mounted_vault_path(user, allow_fail=False):
    """Get the path where the decrypted data for the user is located.

    @param user: user whose mount directory path is built.
    @param allow_fail: accepted but not used by this function.
    @returns <SHADOW_ROOT>/<user hash>/mount.
    """
    shadow_root = constants.SHADOW_ROOT
    hashed_user = get_user_hash(user)
    return os.path.join(shadow_root, hashed_user, 'mount')
529
530
def canonicalize(credential):
    """Perform basic canonicalization of the email address |credential|.

    Everything from the first '+' in the user name onwards is dropped, the
    result is lower-cased, and for constants.SPECIAL_CASE_DOMAIN the '.'
    characters in the user name are removed as well. For example,
    c.masone+abc@gmail.com == cMaSone@gmail.com, per
    http://mail.google.com/support/bin/answer.py?hl=en&ctx=mail&answer=10313

    @param credential: email address of the form name@domain.
    @returns the canonical address, or None if |credential| is empty.
    @raises error.TestError if |credential| does not contain exactly one '@'.
    """
    if not credential:
        return None

    parts = credential.split('@')
    if len(parts) != 2:
        raise error.TestError('Malformed email: ' + credential)

    name, domain = parts
    name = name.partition('+')[0]
    # Domain names are case-insensitive, so compare without regard to case;
    # otherwise 'a.b@GMAIL.COM' and 'a.b@gmail.com' canonicalize differently.
    if domain.lower() == constants.SPECIAL_CASE_DOMAIN.lower():
        name = name.replace('.', '')
    return '@'.join([name, domain]).lower()
552
553
def crash_cryptohomed():
    """Send SIGABRT to cryptohomed and wait for the process to go away.

    @raises error.TestError if the pgrep output is not a single integer pid;
            error.TestError is also handed to the poll as the timeout
            exception (timeout=180).
    """
    # Try to kill cryptohomed so we get something to work with.
    pgrep_out = run_cmd('pgrep cryptohomed')
    try:
        pid = int(pgrep_out)
    except ValueError:  # empty or invalid string
        raise error.TestError('Cryptohomed was not running')

    utils.system('kill -ABRT %d' % pid)
    # CONT just in case cryptohomed had a spurious STOP.
    utils.system('kill -CONT %d' % pid)

    def _process_gone():
        return utils.system('ps -p %d' % pid, ignore_status=True) != 0

    on_timeout = error.TestError(
            'Timeout waiting for cryptohomed to coredump')
    utils.poll_for_condition(_process_gone, timeout=180, exception=on_timeout)
571
572
def create_ecryptfs_homedir(user, password):
    """Creates a new home directory as ecryptfs.

    If a home directory for the user exists already, it will be removed.
    The resulting home directory will be mounted.

    @param user: Username to create the home directory for.
    @param password: Password to use when creating the home directory.
    @raises ChromiumOSError if no ecryptfs mount is found afterwards.
    """
    unmount_vault(user)
    remove_vault(user)

    create_cmd = ' '.join((
            CRYPTOHOME_CMD,
            '--action=mount_ex',
            '--user=%s' % user,
            '--password=%s' % password,
            '--key_label=foo',
            '--ecryptfs',
            '--create',
    ))
    logging.info(run_cmd(create_cmd))

    ecryptfs_only = {
            constants.CRYPTOHOME_FS_REGEX_ECRYPTFS:
            constants.CRYPTOHOME_DEV_REGEX_REGULAR_USER_SHADOW
    }
    if is_vault_mounted(user, regexes=ecryptfs_only, allow_fail=True):
        return
    raise ChromiumOSError('Ecryptfs home could not be created')
601
602
def do_dircrypto_migration(user, password, timeout=600):
    """Start dircrypto migration for the user and wait for it to finish.

    The migration is considered finished once no mount is found at the
    user's temporary mount path any more.

    @param user: The user to migrate.
    @param password: The password used to mount the users vault
    @param timeout: How long in seconds to wait for the migration to finish
    before failing.
    @raises ChromiumOSError if the temporary mount is missing after the
            initial mount command.
    """
    unmount_vault(user)

    mount_cmd = ' '.join((
            CRYPTOHOME_CMD,
            '--action=mount_ex',
            '--to_migrate_from_ecryptfs',
            '--user=%s' % user,
            '--password=%s' % password,
    ))
    logging.info(run_cmd(mount_cmd))
    if not __get_mount_info(temporary_mount_path(user), allow_fail=True):
        raise ChromiumOSError('Failed to mount home for migration')

    migrate_cmd = ' '.join((
            CRYPTOHOME_CMD,
            '--action=migrate_to_dircrypto',
            '--user=%s' % user,
    ))
    logging.info(run_cmd(migrate_cmd))

    def _temporary_mount_gone():
        return not __get_mount_info(
                temporary_mount_path(user), allow_fail=True)

    on_timeout = error.TestError(
            'Timeout waiting for dircrypto migration to finish')
    utils.poll_for_condition(_temporary_mount_gone,
                             timeout=timeout,
                             exception=on_timeout)
629
630
def change_password(user, password, new_password):
    """Change a user's password with the migrate_key_ex action.

    @param user: user whose password is changed.
    @param password: current password (sent as --old_password).
    @param new_password: new password (sent as --password).
    @raises ChromiumOSError if the output lacks 'Key migration succeeded.'.
    """
    migrate_cmd = ' '.join((
            CRYPTOHOME_CMD,
            '--action=migrate_key_ex',
            '--user=%s' % user,
            '--old_password=%s' % password,
            '--password=%s' % new_password,
    ))
    result = run_cmd(migrate_cmd)
    logging.info(result)
    if 'Key migration succeeded.' in result:
        return
    raise ChromiumOSError('Key migration failed.')
643