# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import shutil
import stat
import tempfile
import time

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error

# TODO:
#  - mock out TPM and check all error conditions
#  - test failure when things aren't mounted correctly


class test_checker(object):
    """Collects non-fatal check failures so they can be raised together."""

    def __init__(self):
        logging.info("test_checker.__init__")
        # Empty failure list means test passes.
        self._failures = []

    def _passed(self, msg):
        """Log a satisfied check."""
        logging.info('ok: %s', msg)

    def _failed(self, msg):
        """Log an unsatisfied check and remember it for test_raise()."""
        logging.error('FAIL: %s', msg)
        self._failures.append(msg)

    def _fatal(self, msg):
        """Log an unsatisfied check and abort immediately with TestError."""
        logging.error('FATAL: %s', msg)
        raise error.TestError(msg)

    def check(self, boolean, msg, fatal=False):
        """Record the outcome of a single check.

        @param boolean: result of the condition being checked.
        @param msg: description of the condition (used in log output).
        @param fatal: if True, an unsatisfied check raises error.TestError
                right away instead of being queued as a failure.
        """
        # Only a value that compares equal to True counts as a pass; a
        # truthy non-bool (e.g. a non-empty string) is reported as a failure.
        if boolean == True:
            self._passed(msg)
            return

        msg = "could not satisfy '%s'" % (msg)
        if fatal:
            self._fatal(msg)
        else:
            self._failed(msg)

    def test_raise(self):
        """Raise error.TestFail listing every queued failure, if any."""
        # Raise a failure if anything unexpected was seen.
        if self._failures:
            raise error.TestFail((", ".join(self._failures)))


chk = test_checker()


class EncryptedStateful(object):
    """Wraps either the live encrypted stateful setup or a simulated one.

    With no root given, a temporary root containing a loop-mounted fake
    stateful partition is built and torn down again when the object is
    destroyed. With an explicit root, the existing tree is only examined.
    """

    def _prepare_simulated_root(self):
        """Create the directory skeleton and fake stateful block device."""
        os.makedirs(self.var)
        os.makedirs(self.chronos)
        os.makedirs(self.stateful)

        # Build fake stateful block device (emulate 10G sda1).
        self.stateful_block = os.path.join(self.root, 'stateful.block')
        utils.system("truncate -s 10G %s" % (self.stateful_block))
        utils.system("mkfs.ext4 -F %s" % (self.stateful_block))
        utils.system("mount -n -t ext4 -o loop,noatime,commit=600 %s %s" %
                     (self.stateful_block, self.stateful))

    def __init__(self, root=None):
        """Set up path attributes and, when simulated, the fake root.

        @param root: root directory to operate on; None builds a simulated
                root in a fresh temporary directory under
                /mnt/stateful_partition.
        """
        # Set before anything can raise so that __del__ stays safe on a
        # partially-constructed object.
        self._needs_cleanup = False

        if root is None:
            self.root = tempfile.mkdtemp(dir='/mnt/stateful_partition',
                                         prefix='.test-enc-stateful-')
            self.simulated = True
        else:
            self.root = root
            self.simulated = False

        self.var = os.path.join(self.root, 'var')
        self.chronos = os.path.join(self.root, 'home', 'chronos')
        self.stateful = os.path.join(self.root, 'mnt', 'stateful_partition')
        self.mount_log = os.path.join(self.stateful, 'mount.log')
        self.key = os.path.join(self.stateful, 'encrypted.key')
        self.needs_finalization = os.path.join(self.stateful,
                                               'encrypted.needs-finalization')
        self.block = os.path.join(self.stateful, 'encrypted.block')
        self.encrypted = os.path.join(self.stateful, 'encrypted')

        # A non-simulated root is treated as already mounted.
        self.mounted = not self.simulated

        if self.simulated:
            try:
                self._prepare_simulated_root()
            except:
                # Bare except is intentional: remove the temporary root on
                # any failure, then re-raise the original exception.
                shutil.rmtree(self.root)
                raise
            # Only a fully prepared simulated root needs teardown later.
            self._needs_cleanup = True

    def mount(self, args=""):
        """Run mount-encrypted against the simulated root.

        Does nothing if already mounted or if the root is not simulated.

        @param args: extra command-line arguments for mount-encrypted.
        """
        if self.mounted or not self.simulated:
            return
        # TODO(keescook): figure out what is killing the resizer and
        # remove the explicit use of "tee" here.
        # Without the pipe to "tee", mount-encrypted's forked resizing
        # process gets killed, even though it is using daemon(). (Is
        # autotest doing something odd here?) This leaves the filesystem
        # unresized. It would be better to have the resizer running in
        # the background, as it is designed, so we can examine its behavior
        # during testing (e.g. "does the filesystem actually grow?").
        utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted %s 2>&1 "
                     "| tee %s" % (self.root, args, self.mount_log))
        self.mounted = True

    def umount(self):
        """Run "mount-encrypted umount" against the simulated root.

        Does nothing if not mounted or if the root is not simulated.
        """
        if not self.mounted or not self.simulated:
            return
        utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted umount" %
                     (self.root))
        self.mounted = False

    # Clean up when destroyed.
    def __del__(self):
        # getattr() guards against __init__ having raised before the flag
        # was assigned; the flag itself is only True once the simulated
        # root was completely set up, so a failed (and already removed)
        # root is never unmounted or deleted a second time.
        if not getattr(self, '_needs_cleanup', False):
            return
        self.umount()
        utils.system("umount -n %s" % (self.stateful))
        shutil.rmtree(self.root)

    # Perform common post-mount size/owner checks on the filesystem and
    # backing files.
    def check_sizes(self, finalized=True):
        """Check backing files, their ownership/mode and filesystem sizes.

        @param finalized: True if the key file is expected to exist (and
                the needs-finalization file not to); False for the reverse.
        @raises error.TestFail: if any non-fatal check was not satisfied.
        """
        # Do we have the expected backing files?
        chk.check(os.path.exists(self.block), "%s exists" % (self.block))
        if finalized:
            keyfile = self.key
            other = self.needs_finalization
        else:
            keyfile = self.needs_finalization
            other = self.key
        chk.check(os.path.exists(keyfile), "%s exists" % (keyfile))
        chk.check(not os.path.exists(other), "%s does not exist" % (other))

        # Sanity check the key file stat.
        info = os.stat(keyfile)
        chk.check(stat.S_ISREG(info.st_mode),
                  "%s is regular file" % (keyfile))
        chk.check(info.st_uid == 0, "%s is owned by root" % (keyfile))
        chk.check(info.st_gid == 0, "%s has group root" % (keyfile))
        chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR),
                  "%s is S_IRUSR | S_IWUSR" % (keyfile))
        chk.check(info.st_size == 48, "%s is 48 bytes" % (keyfile))

        # Sanity check the block file stat.
        info = os.stat(self.block)
        chk.check(stat.S_ISREG(info.st_mode),
                  "%s is regular file" % (self.block))
        chk.check(info.st_uid == 0, "%s is owned by root" % (self.block))
        chk.check(info.st_gid == 0, "%s has group root" % (self.block))
        chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR),
                  "%s is S_IRUSR | S_IWUSR" % (self.block))

        # Make sure block file is roughly 30% of the size of the root
        # filesystem (within 5%).
        top = os.statvfs(self.stateful)
        backing_size = float(info.st_size)
        expected_size = top.f_blocks * top.f_frsize * .3
        chk.check(backing_size > (expected_size * .95) and
                  backing_size < (expected_size * 1.05),
                  "%s is near %d bytes (was %d)" %
                  (self.block, expected_size, info.st_size))

        # Wait for resize manager task to finish.
        utils.poll_for_condition(lambda: utils.system("pgrep mount-encrypted",
                                                      ignore_status=True) != 0,
                                 error.TestError('resizer still running'))

        # Verify filesystem is within 5% of the block file size.
        info = os.statvfs(self.encrypted)
        encrypted_size = float(info.f_frsize) * float(info.f_blocks)
        # An empty backing file is reported as a failed check rather than
        # aborting with ZeroDivisionError.
        chk.check(backing_size > 0 and
                  encrypted_size / backing_size > 0.95,
                  "%s fs (%d) is nearly the backing device size (%d)" %
                  (self.encrypted, encrypted_size, backing_size))

        # Verify there is a reasonable number of inodes in the encrypted
        # filesystem (near 25% inodes-to-blocks ratio).
        if info.f_blocks:
            inode_ratio = float(info.f_files) / float(info.f_blocks)
        else:
            # No blocks at all: report a ratio that fails the check below
            # instead of dividing by zero.
            inode_ratio = 0.0
        chk.check(inode_ratio > 0.20 and inode_ratio < 0.30,
                  "%s has close to 25%% ratio of inodes-to-blocks (%.2f%%)" %
                  (self.encrypted, inode_ratio*100))

        # Raise non-fatal failures now, if they were encountered.
        chk.test_raise()

    # Wait for kernel background writing to finish.
    def _backing_stabilize(self):
        """Block until "du -sk" of the backing file stops changing.

        Polls every 10 seconds (with a sync in between) and returns once
        two consecutive samples report the same size.
        """
        start = None
        size = 0
        while True:
            # int() rather than long(): equivalent on Python 2 (values are
            # promoted automatically) and also valid on Python 3.
            k = int(utils.system_output("du -sk %s" % (self.block),
                                        retain_output=True).split()[0])
            if start is None:
                start = k
            if k == size:
                # Backing file has remained the same size for 10 seconds.
                # Assume the kernel is done with background initialization.
                break
            time.sleep(10)
            utils.system("sync")
            size = k
        logging.info("%s stabilized at %dK (was %dK)",
                     self.block, size, start)

    # Check that the backing file reclaims space when filesystem contents
    # are deleted.
    def check_reclamation(self):
        """Write and delete a 200M file, checking backing-store usage.

        @raises error.TestFail: if any non-fatal check was not satisfied.
        """
        # This test is sensitive to other things happening on the filesystem,
        # so we must wait for background initialization to finish first.
        self._backing_stabilize()

        megs = 200
        data = os.path.join(self.var, "check_reclamation")
        orig = os.statvfs(self.stateful)

        # 200M file added to encrypted filesystem.
        utils.system("dd if=/dev/zero of=%s bs=1M count=%s; sync" % (data,
                                                                     megs))
        # Wait for background allocations to finish.
        self._backing_stabilize()
        filled = os.statvfs(self.stateful)

        # 200M file removed from encrypted filesystem.
        utils.system("rm %s; sync" % (data))
        # Wait for background hole-punching to finish.
        self._backing_stabilize()
        done = os.statvfs(self.stateful)

        # Did the underlying filesystem grow by the size of the test file?
        # Floor division keeps the whole-block count identical on Python 2
        # and Python 3.
        file_blocks_used = float((megs * 1024 * 1024) // orig.f_frsize)
        fs_blocks_used = float(orig.f_bfree - filled.f_bfree)
        # If free space did not drop at all, record a failed check instead
        # of aborting with ZeroDivisionError.
        chk.check(fs_blocks_used > 0 and
                  file_blocks_used / fs_blocks_used > 0.95,
                  "%d file blocks account for most of %d fs blocks" %
                  (file_blocks_used, fs_blocks_used))

        # Did the underlying filesystem shrink on removal?
        fs_blocks_done = float(orig.f_bfree - done.f_bfree)
        chk.check(fs_blocks_done / file_blocks_used < 0.05,
                  "most of %d fs blocks reclaimed (%d fs blocks left over, "
                  "free: %d -> %d -> %d)" %
                  (fs_blocks_used, fs_blocks_done,
                   orig.f_bfree, filled.f_bfree, done.f_bfree))

        # Raise non-fatal failures now, if they were encountered.
        chk.test_raise()


class platform_EncryptedStateful(test.test):
    """Autotest exercising the encrypted stateful partition."""
    version = 1

    # With b/80549098, PUNCH_HOLE was disabled for all kernel trees
    # before v4.4. This means that the reclamation check will only work
    # with kernels that support PUNCH_HOLE.
    def is_punch_hole_supported(self):
        """Return True if the running kernel release is at least 4.4."""
        kernel_ver = os.uname()[2]
        return utils.compare_versions(kernel_ver, "4.4") >= 0

    def existing_partition(self):
        """Run the size/owner checks against the system's own root."""
        # Examine the existing encrypted partition.
        encstate = EncryptedStateful("/")
        # Perform post-mount sanity checks (and handle unfinalized devices).
        encstate.check_sizes(finalized=os.path.exists(encstate.key))

    def no_tpm(self):
        """Mount a simulated root with the TPM device node moved away."""
        encstate = EncryptedStateful()

        # Make sure we haven't run here before.
        chk.check(not os.path.exists(encstate.key),
                  "%s does not exist" % (encstate.key))
        chk.check(not os.path.exists(encstate.block),
                  "%s does not exist" % (encstate.block))

        # Relocate the TPM device during mount.
        tpm = "/dev/tpm0"
        off = "%s.off" % (tpm)
        try:
            if os.path.exists(tpm):
                utils.system("mv %s %s" % (tpm, off))
            # Mount without a TPM.
            encstate.mount()
        finally:
            # Always put the device node back, even if the mount failed.
            if os.path.exists(off):
                utils.system("mv %s %s" % (off, tpm))

        # Perform post-mount sanity checks.
        encstate.check_sizes(finalized=False)

        # Check disk reclamation for kernels that support PUNCH_HOLE.
        if self.is_punch_hole_supported():
            encstate.check_reclamation()

        # Check explicit umount.
        encstate.umount()

    def run_once(self):
        """Run both the existing-partition and the no-TPM scenarios."""
        # Do a no-write test of system's existing encrypted partition.
        self.existing_partition()

        # Do a no-write, no-TPM test with sanity checks. Also do a reclamation
        # check against the encrypted stateful partition.
        self.no_tpm()