# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A module containing rootfs handler class."""

import os
import re

TMP_FILE_NAME = 'kernel_dump'

# Partition numbers (as strings) of the kernel and rootfs for each section.
_KERNEL_MAP = {'A': '2', 'B': '4'}
_ROOTFS_MAP = {'A': '3', 'B': '5'}
_DM_DEVICE = 'verifyroot'
_DM_DEV_PATH = os.path.join('/dev/mapper', _DM_DEVICE)

# Placeholder in the kernel's dm table that is replaced by the real rootfs
# partition path before the table is handed to dmsetup.
_ROOTFS_PLACEHOLDER = 'PARTUUID=%U/PARTNROFF=1'

# Extracts the dm table (group 1) and its length field (group 2) from the
# output of 'vbutil_kernel --verify ... --verbose'. Compiled once at import
# time instead of on every verify_rootfs() call.
_DM_REGEXP = re.compile(r'dm="(?:1 )?vroot none ro(?: 1)?,(0 (\d+) .+)"')

# The table length field is multiplied by this to get the expected device
# size in bytes (device-mapper table lengths are presumably expressed in
# 512-byte sectors).
_SECTOR_SIZE = 512


class RootfsHandler(object):
    """An object to provide ChromeOS root FS related actions.

    It provides functions to verify the integrity of the root FS.

    init() must be called before verify_rootfs(): it is what populates
    root_dev and kernel_dump_file.

    @type os_if: autotest_lib.client.cros.faft.utils.os_interface.OSInterface
    """

    def __init__(self, os_if):
        self.os_if = os_if
        # Filled in by init().
        self.root_dev = None
        self.kernel_dump_file = None
        self.initialized = False

    def verify_rootfs(self, section):
        """Verifies the integrity of the root FS.

        Dumps the kernel partition of the section to a file, extracts the
        dm table from the 'vbutil_kernel --verify' output, creates a
        read-only device-mapper device from that table on top of the rootfs
        partition and compares the size of the mapped device with the size
        announced in the table.

        @param section: The rootfs to verify. May be A or B (any case).
        @return: True if the mapped device has the expected size. False if
                 the dm table cannot be found in the vbutil_kernel output,
                 the table lacks the rootfs placeholder, the sizes differ,
                 or reading the device size fails.
        @raise KeyError: If section is neither A nor B.
        @raise AssertionError: If the dm device still exists after the
                 removal attempt, or does not exist after creation.
        """
        part = section.upper()
        kernel_path = self.os_if.join_part(self.root_dev, _KERNEL_MAP[part])
        rootfs_path = self.os_if.join_part(self.root_dev, _ROOTFS_MAP[part])

        # vbutil_kernel won't operate on a device, only a file.
        self.os_if.run_shell_command(
                'dd if=%s of=%s' % (kernel_path, self.kernel_dump_file))
        vbutil_kernel = self.os_if.run_shell_command_get_output(
                'vbutil_kernel --verify %s --verbose' % self.kernel_dump_file)
        match = _DM_REGEXP.search('\n'.join(vbutil_kernel))
        if not match:
            return False

        table = match.group(1)
        partition_size = int(match.group(2)) * _SECTOR_SIZE

        if _ROOTFS_PLACEHOLDER not in table:
            return False
        table = table.replace(_ROOTFS_PLACEHOLDER, rootfs_path)
        # Cause I/O error on invalid bytes
        table += ' error_behavior=eio'

        # Start from a clean state in case a previous run left the device.
        self._remove_mapper()
        assert not self.os_if.path_exists(_DM_DEV_PATH), (
                '%s still exists after removal' % _DM_DEV_PATH)
        self.os_if.run_shell_command(
                "dmsetup create -r %s --table '%s'" % (_DM_DEVICE, table))
        assert self.os_if.path_exists(_DM_DEV_PATH), (
                '%s was not created' % _DM_DEV_PATH)
        try:
            count = self.os_if.get_file_size(_DM_DEV_PATH)
            return count == partition_size
        except Exception:
            # Best effort: any ordinary failure while reading the device is
            # reported as a failed verification. KeyboardInterrupt and
            # SystemExit are deliberately not caught here.
            return False
        finally:
            # Always tear the mapping down, whatever the outcome.
            self._remove_mapper()

    def _remove_mapper(self):
        """Removes the dm device mapper used by this class."""
        if self.os_if.path_exists(_DM_DEV_PATH):
            self.os_if.run_shell_command_get_output(
                    'dmsetup remove %s' % _DM_DEVICE)

    def init(self):
        """Initialize the rootfs handler object."""
        self.root_dev = self.os_if.get_root_dev()
        self.kernel_dump_file = self.os_if.state_dir_file(TMP_FILE_NAME)
        self.initialized = True