• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import logging, os, tempfile, shutil, stat, time
7from autotest_lib.client.bin import test, utils
8from autotest_lib.client.common_lib import error
9
10# TODO:
11#  - mock out TPM and check all error conditions
12#  - test failure when things aren't mounted correctly
13
class test_checker(object):
    """Collect check results so a test can report all failures together.

    Non-fatal failures are logged and remembered; test_raise() turns the
    remembered failures into one error.TestFail. Fatal failures raise
    error.TestError immediately.
    """

    def __init__(self):
        logging.info("test_checker.__init__")
        # Empty failure list means test passes.
        self._failures = []

    def _passed(self, msg):
        """Log a condition that held."""
        logging.info('ok: %s', msg)

    def _failed(self, msg):
        """Log a non-fatal failure and remember it for test_raise()."""
        logging.error('FAIL: %s', msg)
        self._failures.append(msg)

    def _fatal(self, msg):
        """Log a fatal failure and abort by raising error.TestError."""
        logging.error('FATAL: %s', msg)
        raise error.TestError(msg)

    def check(self, boolean, msg, fatal=False):
        """Record the outcome of one condition.

        @param boolean: result of the condition; any truthy value passes.
        @param msg: description of the condition being checked.
        @param fatal: when True a failed condition raises error.TestError
                right away instead of being queued.
        """
        # Plain truthiness instead of "== True": a truthy non-bool result
        # (e.g. a non-empty list) must not be reported as a failure.
        if boolean:
            self._passed(msg)
            return

        # One-element tuple so a tuple-valued msg cannot break formatting.
        msg = "could not satisfy '%s'" % (msg,)
        if fatal:
            self._fatal(msg)
        else:
            self._failed(msg)

    def test_raise(self):
        """Raise error.TestFail listing every queued non-fatal failure.

        Does nothing when no failure has been recorded.
        """
        if self._failures:
            raise error.TestFail(", ".join(self._failures))


# Module-wide checker instance used by the classes in this file.
chk = test_checker()
50
51
class EncryptedStateful(object):
    """Helper to set up, mount, inspect and tear down encrypted stateful.

    Constructed without a root, the object builds a throw-away directory
    tree under /mnt/stateful_partition backed by a sparse 10G ext4 image and
    points mount-encrypted at it through MOUNT_ENCRYPTED_ROOT ("simulated"
    mode). Constructed with an explicit root, it only inspects what is
    already there: mount(), umount() and the destructor do nothing.
    """

    def _prepare_simulated_root(self):
        """Create the fake root directories and loop-mount a fake stateful."""
        os.makedirs(self.var)
        os.makedirs(self.chronos)
        os.makedirs(self.stateful)

        # Build fake stateful block device (emulate 10G sda1).
        self.stateful_block = os.path.join(self.root, 'stateful.block')
        utils.system("truncate -s 10G %s" % (self.stateful_block))
        utils.system("mkfs.ext4 -F %s" % (self.stateful_block))
        utils.system("mount -n -t ext4 -o loop,noatime,commit=600 %s %s" %
                     (self.stateful_block, self.stateful))

    def __init__(self, root=None):
        """
        @param root: directory to treat as the filesystem root. When None a
                temporary simulated root is created and later removed by
                the destructor.
        """
        # Assign the state flags first: __del__ also runs on an object whose
        # __init__ raised, and it must be able to read them.
        self.simulated = False
        self.mounted = False

        if root is None:
            self.root = tempfile.mkdtemp(dir='/mnt/stateful_partition',
                                         prefix='.test-enc-stateful-')
            self.simulated = True
        else:
            self.root = root

        self.var = os.path.join(self.root, 'var')
        self.chronos = os.path.join(self.root, 'home', 'chronos')
        self.stateful = os.path.join(self.root, 'mnt', 'stateful_partition')
        self.mount_log = os.path.join(self.stateful, 'mount.log')
        self.key = os.path.join(self.stateful, 'encrypted.key')
        self.needs_finalization = os.path.join(self.stateful,
                                               'encrypted.needs-finalization')
        self.block = os.path.join(self.stateful, 'encrypted.block')
        self.encrypted = os.path.join(self.stateful, 'encrypted')

        if self.simulated:
            try:
                self._prepare_simulated_root()
            except BaseException:
                shutil.rmtree(self.root)
                # The tree is gone; keep __del__ from tearing it down again.
                self.simulated = False
                raise

        # A pre-existing root is treated as already mounted; a simulated one
        # still needs an explicit mount().
        self.mounted = not self.simulated

    def mount(self, args=""):
        """ Mount the encstateful partition """
        if self.mounted or not self.simulated:
            return
        # TODO(keescook): figure out what is killing the resizer and
        # remove the explicit use of "tee" here.
        # Without the pipe to "tee", mount-encrypted's forked resizing
        # process gets killed, even though it is using daemon(). (Is
        # autotest doing something odd here?) This leaves the filesystem
        # unresized. It would be better to have the resizer running in
        # the background, as it is designed, so we can examine its behavior
        # during testing (e.g. "does the filesystem actually grow?").
        utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted --unsafe "
                     "%s 2>&1 | tee %s" % (self.root, args, self.mount_log))
        self.mounted = True

    def umount(self):
        """ Unmount the encstateful partition """
        if not self.mounted or not self.simulated:
            return
        utils.system("MOUNT_ENCRYPTED_ROOT=%s mount-encrypted umount" %
                     (self.root))
        self.mounted = False

    def __del__(self):
        """ Clean up when destroyed. """
        # getattr: the object may be only partially constructed.
        if not getattr(self, 'simulated', False):
            return
        self.umount()
        utils.system("umount -n %s" % (self.stateful))
        shutil.rmtree(self.root)

    def check_sizes(self, finalized=True):
        """
        Perform common post-mount size/owner checks on the filesystem and
        backing files.

        @param finalized: when True the key file is expected to exist and
                the needs-finalization file to be absent; when False the
                expectation is reversed.

        Non-fatal failures are queued on chk and raised together by
        chk.test_raise() at the end.
        """
        # Do we have the expected backing files?
        chk.check(os.path.exists(self.block), "%s exists" % (self.block))
        if finalized:
            keyfile = self.key
            other = self.needs_finalization
        else:
            keyfile = self.needs_finalization
            other = self.key
        chk.check(os.path.exists(keyfile), "%s exists" % (keyfile))
        chk.check(not os.path.exists(other), "%s does not exist" % (other))

        owner_rw = stat.S_IRUSR | stat.S_IWUSR

        # Check the key file stat.
        info = os.stat(keyfile)
        chk.check(stat.S_ISREG(info.st_mode),
                  "%s is regular file" % (keyfile))
        chk.check(info.st_uid == 0, "%s is owned by root" % (keyfile))
        chk.check(info.st_gid == 0, "%s has group root" % (keyfile))
        chk.check(stat.S_IMODE(info.st_mode) == owner_rw,
                  "%s is S_IRUSR | S_IWUSR" % (keyfile))
        chk.check(info.st_size == 48, "%s is 48 bytes" % (keyfile))

        # Check the block file stat.
        info = os.stat(self.block)
        chk.check(stat.S_ISREG(info.st_mode),
                  "%s is regular file" % (self.block))
        chk.check(info.st_uid == 0, "%s is owned by root" % (self.block))
        chk.check(info.st_gid == 0, "%s has group root" % (self.block))
        chk.check(stat.S_IMODE(info.st_mode) == owner_rw,
                  "%s is S_IRUSR | S_IWUSR" % (self.block))
        # Make sure block file is roughly 30% of the size of the root
        # filesystem (within 5%).
        top = os.statvfs(self.stateful)
        backing_size = float(info.st_size)
        expected = top.f_blocks * top.f_frsize * .3
        chk.check(expected * .95 < backing_size < expected * 1.05,
                  "%s is near %d bytes (was %d)" % (self.block, expected,
                                                    info.st_size))

        # Wait for resize manager task to finish: pgrep exits non-zero once
        # no mount-encrypted process is left.
        utils.poll_for_condition(lambda: utils.system("pgrep mount-encrypted",
                                                      ignore_status=True) != 0,
                                 error.TestError('resizer still running'))

        # Verify filesystem is within 5% of the block file size.
        info = os.statvfs(self.encrypted)
        encrypted_size = float(info.f_frsize) * float(info.f_blocks)
        chk.check(encrypted_size / backing_size > 0.95,
                  "%s fs (%d) is nearly the backing device size (%d)" %
                  (self.encrypted, encrypted_size, backing_size))
        # Verify there is a reasonable number of inodes in the encrypted
        # filesystem (near 25% inodes-to-blocks ratio).
        inode_ratio = float(info.f_files) / float(info.f_blocks)
        chk.check(0.20 < inode_ratio < 0.30,
                  "%s has close to 25%% ratio of inodes-to-blocks (%.2f%%)" %
                  (self.encrypted, inode_ratio*100))

        # Raise non-fatal failures now, if they were encountered.
        chk.test_raise()

    def _backing_stabilize(self):
        """Block until the backing file's disk usage stops changing.

        Samples "du -sk" on the backing file every 10 seconds (with a sync
        in between) and returns once two consecutive samples are equal.
        There is no upper bound on the wait.
        """
        start = None
        size = 0
        while True:
            k = int(
                    utils.system_output("du -sk %s" % (self.block),
                                        retain_output=True).split()[0])
            if start is None:
                start = k
            if k == size:
                # Backing file has remained the same size for 10 seconds.
                # Assume the kernel is done with background initialization.
                break
            time.sleep(10)
            utils.system("sync")
            size = k
        # The values go in as separate arguments: a single tuple would be
        # one argument for three format specifiers and the record would
        # fail to format.
        logging.info("%s stabilized at %dK (was %dK)",
                     self.block, size, start)

    def check_reclamation(self):
        """
        Check that the backing file reclaims space when filesystem contents
        are deleted.

        Writes a 200M file through the encrypted mount, removes it again and
        compares the free-block counts of the underlying stateful filesystem
        before, between and after. Failures are raised via chk.test_raise().
        """
        # This test is sensitive to other things happening on the filesystem,
        # so we must wait for background initialization to finish first.
        self._backing_stabilize()

        megs = 200
        data = os.path.join(self.var, "check_reclamation")
        orig = os.statvfs(self.stateful)

        # 200M file added to encrypted filesystem.
        utils.system("dd if=/dev/zero of=%s bs=1M count=%s; sync" % (data,
                                                                     megs))
        # Wait for background allocations to finish.
        self._backing_stabilize()
        filled = os.statvfs(self.stateful)

        # 200M file removed from encrypted filesystem.
        utils.system("rm %s; sync" % (data))
        # Wait for background hole-punching to finish.
        self._backing_stabilize()
        done = os.statvfs(self.stateful)

        # Did the underlying filesystem grow by the size of the test file?
        file_blocks_used = float(megs * 1024 * 1024) / orig.f_frsize
        fs_blocks_used = float(orig.f_bfree - filled.f_bfree)
        # If the free-block count did not drop at all, report a failed check
        # instead of dying on a division by zero.
        chk.check(fs_blocks_used > 0
                  and file_blocks_used / fs_blocks_used > 0.95,
                  "%d file blocks account for most of %d fs blocks" %
                  (file_blocks_used, fs_blocks_used))

        # Did the underlying filesystem shrink on removal?
        fs_blocks_done = float(orig.f_bfree - done.f_bfree)
        chk.check(fs_blocks_done / file_blocks_used < 0.05,
                  "most of %d fs blocks reclaimed (%d fs blocks left over, "
                  "free: %d -> %d -> %d)" %
                  (fs_blocks_used, fs_blocks_done,
                   orig.f_bfree, filled.f_bfree, done.f_bfree))

        # Raise non-fatal failures now, if they were encountered.
        chk.test_raise()
254
255
class platform_EncryptedStateful(test.test):
    """ Autotest covering the encrypted stateful partition. """
    version = 1

    def is_punch_hole_supported(self):
        """
        Tell whether the running kernel is new enough for PUNCH_HOLE.

        With b/80549098, PUNCH_HOLE was disabled for all kernel trees
        before v4.4, so the reclamation check is only meaningful on
        kernels at or above that version.
        """
        running_release = os.uname()[2]
        too_old = utils.compare_versions(running_release, "4.4") < 0
        return not too_old

    def existing_partition(self):
        """ Inspect the system's own encrypted partition without writing. """
        # Point the helper at the real root rather than a simulated one.
        live = EncryptedStateful("/")

        # Presence of the key file decides whether the device is expected
        # to be finalized or still awaiting finalization.
        is_finalized = os.path.exists(live.key)
        live.check_sizes(finalized=is_finalized)

    def no_tpm(self):
        """
        Mount a simulated encrypted stateful with the TPM device node moved
        out of the way, run the size/owner checks, and run the reclamation
        check when the kernel supports PUNCH_HOLE.
        """
        sim = EncryptedStateful()

        # A fresh simulated root must not contain leftovers from a prior run.
        for leftover in (sim.key, sim.block):
            chk.check(not os.path.exists(leftover),
                      "%s does not exist" % (leftover))

        # Hide the TPM device node for the duration of the mount.
        tpm_node = "/dev/tpm0"
        parked_node = "%s.off" % (tpm_node)
        try:
            if os.path.exists(tpm_node):
                utils.system("mv %s %s" % (tpm_node, parked_node))
            # Mount without a TPM.
            sim.mount()
        finally:
            # Put the device node back whether or not the mount succeeded.
            if os.path.exists(parked_node):
                utils.system("mv %s %s" % (parked_node, tpm_node))

        # Post-mount confidence checks.
        sim.check_sizes(finalized=True)

        # Disk reclamation can only be verified where PUNCH_HOLE works.
        if self.is_punch_hole_supported():
            sim.check_reclamation()

        # Exercise the explicit umount path.
        sim.umount()

    def run_once(self):
        """ Primary autotest function. """
        # First the read-only look at the existing encrypted partition...
        self.existing_partition()

        # ...then the simulated, TPM-less mount with confidence and
        # reclamation checks.
        self.no_tpm()
322