1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import logging, os, tempfile, shutil, stat, time 7from autotest_lib.client.bin import test, utils 8from autotest_lib.client.common_lib import error 9 10# TODO: 11# - mock out TPM and check all error conditions 12# - test failure when things aren't mounted correctly 13 14class test_checker(object): 15 """ A helper for test result """ 16 17 def __init__(self): 18 logging.info("test_checker.__init__") 19 """ Empty failure list means test passes. """ 20 self._failures = [] 21 22 def _passed(self, msg): 23 logging.info('ok: %s', (msg)) 24 25 def _failed(self, msg): 26 logging.error('FAIL: %s', (msg)) 27 self._failures.append(msg) 28 29 def _fatal(self, msg): 30 logging.error('FATAL: %s', (msg)) 31 raise error.TestError(msg) 32 33 def check(self, boolean, msg, fatal=False): 34 """ Check the result log error """ 35 if boolean == True: 36 self._passed(msg) 37 else: 38 msg = "could not satisfy '%s'" % (msg) 39 if fatal: 40 self._fatal(msg) 41 else: 42 self._failed(msg) 43 44 def test_raise(self): 45 """ Raise a failure if anything unexpected was seen. """ 46 if len(self._failures): 47 raise error.TestFail((", ".join(self._failures))) 48 49chk = test_checker() 50 51 52class EncryptedStateful(object): 53 """ A helper to operate the encrypted stateful. """ 54 55 def _prepare_simulated_root(self): 56 os.makedirs(self.var) 57 os.makedirs(self.chronos) 58 os.makedirs(self.stateful) 59 60 # Build fake stateful block device (emulate 10G sda1). 61 self.stateful_block = os.path.join(self.root, 'stateful.block') 62 utils.system("truncate -s 10G %s" % (self.stateful_block)) 63 utils.system("mkfs.ext4 -F %s" % (self.stateful_block)) 64 utils.system("mount -n -t ext4 -o loop,noatime,commit=600 %s %s" % 65 (self.stateful_block, self.stateful)) 66 67 def __init__(self, root=None): 68 if root == None: 69 self.root = tempfile.mkdtemp(dir='/mnt/stateful_partition', 70 prefix='.test-enc-stateful-') 71 self.simulated = True 72 else: 73 self.root = root 74 self.simulated = False 75 76 self.var = os.path.join(self.root, 'var') 77 self.chronos = os.path.join(self.root, 'home', 'chronos') 78 self.stateful = os.path.join(self.root, 'mnt', 'stateful_partition') 79 self.mount_log = os.path.join(self.stateful, 'mount.log') 80 self.key = os.path.join(self.stateful, 'encrypted.key') 81 self.needs_finalization = os.path.join(self.stateful, 82 'encrypted.needs-finalization') 83 self.block = os.path.join(self.stateful, 'encrypted.block') 84 self.encrypted = os.path.join(self.stateful, 'encrypted') 85 86 if self.simulated: 87 try: 88 self._prepare_simulated_root() 89 except: 90 shutil.rmtree(self.root) 91 raise 92 93 self.mounted = not self.simulated 94 95 def mount(self, args=""): 96 """ Mount the encstateful partition """ 97 if self.mounted or not self.simulated: 98 return 99 # TODO(keescook): figure out what is killing the resizer and 100 # remove the explicit use of "tee" here. 101 # Without the pipe to "tee", mount-encrypted's forked resizing 102 # process gets killed, even though it is using daemon(). (Is 103 # autotest doing something odd here?) This leaves the filesystem 104 # unresized. It would be better to have the resizer running in 105 # the background, as it is designed, so we can examine its behavior 106 # during testing (e.g. "does the filesystem actually grow?"). 107 utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted --unsafe " 108 "%s 2>&1 | tee %s" % (self.root, args, self.mount_log)) 109 self.mounted = True 110 111 def umount(self): 112 """ Unmount the encstateful partition """ 113 if not self.mounted or not self.simulated: 114 return 115 utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted umount" % 116 (self.root)) 117 self.mounted = False 118 119 def __del__(self): 120 """ Clean up when destroyed. """ 121 if self.simulated: 122 self.umount() 123 utils.system("umount -n %s" % (self.stateful)) 124 shutil.rmtree(self.root) 125 126 def check_sizes(self, finalized=True): 127 """ 128 Perform common post-mount size/owner checks on the filesystem and 129 backing files. 130 """ 131 # Do we have the expected backing files? 132 chk.check(os.path.exists(self.block), "%s exists" % (self.block)) 133 if finalized: 134 keyfile = self.key 135 other = self.needs_finalization 136 else: 137 keyfile = self.needs_finalization 138 other = self.key 139 chk.check(os.path.exists(keyfile), "%s exists" % (keyfile)) 140 chk.check(not os.path.exists(other), "%s does not exist" % (other)) 141 142 # Check the key file stat. 143 info = os.stat(keyfile) 144 chk.check(stat.S_ISREG(info.st_mode), 145 "%s is regular file" % (keyfile)) 146 chk.check(info.st_uid == 0, "%s is owned by root" % (keyfile)) 147 chk.check(info.st_gid == 0, "%s has group root" % (keyfile)) 148 chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR), 149 "%s is S_IRUSR | S_IWUSR" % (keyfile)) 150 chk.check(info.st_size == 48, "%s is 48 bytes" % (keyfile)) 151 152 # Check the block file stat. 153 info = os.stat(self.block) 154 chk.check(stat.S_ISREG(info.st_mode), 155 "%s is regular file" % (self.block)) 156 chk.check(info.st_uid == 0, "%s is owned by root" % (self.block)) 157 chk.check(info.st_gid == 0, "%s has group root" % (self.block)) 158 chk.check(stat.S_IMODE(info.st_mode) == (stat.S_IRUSR | stat.S_IWUSR), 159 "%s is S_IRUSR | S_IWUSR" % (self.block)) 160 # Make sure block file is roughly a third of the size of the root 161 # filesystem (within 5%). 162 top = os.statvfs(self.stateful) 163 backing_size = float(info.st_size) 164 third = top.f_blocks * top.f_frsize * .3 165 chk.check(backing_size > (third * .95) 166 and backing_size < (third * 1.05), 167 "%s is near %d bytes (was %d)" % (self.block, third, 168 info.st_size)) 169 170 # Wait for resize manager task to finish. 171 utils.poll_for_condition(lambda: utils.system("pgrep mount-encrypted", 172 ignore_status=True) != 0, 173 error.TestError('resizer still running')) 174 175 # Verify filesystem is within 5% of the block file size. 176 info = os.statvfs(self.encrypted) 177 encrypted_size = float(info.f_frsize) * float(info.f_blocks) 178 chk.check(encrypted_size / backing_size > 0.95, 179 "%s fs (%d) is nearly the backing device size (%d)" % 180 (self.encrypted, encrypted_size, backing_size)) 181 # Verify there is a reasonable number of inodes in the encrypted 182 # filesystem (near 25% inodes-to-blocks ratio). 183 inode_ratio = float(info.f_files) / float(info.f_blocks) 184 chk.check(inode_ratio > 0.20 and inode_ratio < 0.30, 185 "%s has close to 25%% ratio of inodes-to-blocks (%.2f%%)" % 186 (self.encrypted, inode_ratio*100)) 187 188 # Raise non-fatal failures now, if they were encountered. 189 chk.test_raise() 190 191 # Wait for kernel background writing to finish. 192 def _backing_stabilize(self): 193 start = None 194 size = 0 195 while True: 196 k = int( 197 utils.system_output("du -sk %s" % (self.block), 198 retain_output=True).split()[0]) 199 if start == None: 200 start = k 201 if k == size: 202 # Backing file has remained the same size for 10 seconds. 203 # Assume the kernel is done with background initialization. 204 break 205 time.sleep(10) 206 utils.system("sync") 207 size = k 208 logging.info("%s stabilized at %dK (was %dK)", 209 (self.block, size, start)) 210 211 def check_reclamation(self): 212 """ 213 Check that the backing file reclaims space when filesystem contents 214 are deleted. 215 """ 216 # This test is sensitive to other things happening on the filesystem, 217 # so we must wait for background initialization to finish first. 218 self._backing_stabilize() 219 220 megs = 200 221 data = os.path.join(self.var, "check_reclamation") 222 orig = os.statvfs(self.stateful) 223 224 # 200M file added to encrypted filesystem. 225 utils.system("dd if=/dev/zero of=%s bs=1M count=%s; sync" % (data, 226 megs)) 227 # Wait for background allocations to finish. 228 self._backing_stabilize() 229 filled = os.statvfs(self.stateful) 230 231 # 200M file removed from encrypted filesystem. 232 utils.system("rm %s; sync" % (data)) 233 # Wait for background hole-punching to finish. 234 self._backing_stabilize() 235 done = os.statvfs(self.stateful) 236 237 # Did the underlying filesystem grow by the size of the test file? 238 file_blocks_used = float((megs * 1024 * 1024) / orig.f_frsize) 239 fs_blocks_used = float(orig.f_bfree - filled.f_bfree) 240 chk.check(file_blocks_used / fs_blocks_used > 0.95, 241 "%d file blocks account for most of %d fs blocks" % 242 (file_blocks_used, fs_blocks_used)) 243 244 # Did the underlying filesystem shrink on removal? 245 fs_blocks_done = float(orig.f_bfree - done.f_bfree) 246 chk.check(fs_blocks_done / file_blocks_used < 0.05, 247 "most of %d fs blocks reclaimed (%d fs blocks left over, " 248 "free: %d -> %d -> %d)" % 249 (fs_blocks_used, fs_blocks_done, 250 orig.f_bfree, filled.f_bfree, done.f_bfree)) 251 252 # Raise non-fatal failures now, if they were encountered. 253 chk.test_raise() 254 255 256class platform_EncryptedStateful(test.test): 257 """ Test encrypted stateful partition.""" 258 version = 1 259 260 def is_punch_hole_supported(self): 261 """ 262 With b/80549098, PUNCH_HOLE was disabled for all kernel trees 263 before v4.4. This means that the reclamation check will only work 264 with kernels that support PUNCH_HOLE. 265 """ 266 kernel_ver = os.uname()[2] 267 if utils.compare_versions(kernel_ver, "4.4") < 0 : 268 return False 269 return True 270 271 def existing_partition(self): 272 """ Do a no-write test of system's existing encrypted partition. """ 273 # Examine the existing encrypted partition. 274 encstate = EncryptedStateful("/") 275 276 # Perform post-mount confidence check (and handle unfinalized devices). 277 encstate.check_sizes(finalized=os.path.exists(encstate.key)) 278 279 def no_tpm(self): 280 """ 281 Do a no-write, no-TPM test with confidence checks. Also do a 282 reclamation check against the encrypted stateful partition. 283 """ 284 encstate = EncryptedStateful() 285 286 # Make sure we haven't run here before. 287 chk.check(not os.path.exists(encstate.key), 288 "%s does not exist" % (encstate.key)) 289 chk.check(not os.path.exists(encstate.block), 290 "%s does not exist" % (encstate.block)) 291 292 # Relocate the TPM device during mount. 293 tpm = "/dev/tpm0" 294 off = "%s.off" % (tpm) 295 try: 296 if os.path.exists(tpm): 297 utils.system("mv %s %s" % (tpm, off)) 298 # Mount without a TPM. 299 encstate.mount() 300 finally: 301 if os.path.exists(off): 302 utils.system("mv %s %s" % (off, tpm)) 303 304 # Perform post-mount confidence checks. 305 encstate.check_sizes(finalized=True) 306 307 # Check disk reclamation for kernels that support PUNCH_HOLE. 308 if self.is_punch_hole_supported(): 309 encstate.check_reclamation() 310 311 # Check explicit umount. 312 encstate.umount() 313 314 def run_once(self): 315 """ Primary autotest function. """ 316 # Do a no-write test of system's existing encrypted partition. 317 self.existing_partition() 318 319 # Do a no-write, no-TPM test with confidence checks. Also do a 320 # reclamation check against the encrypted stateful partition. 321 self.no_tpm() 322