# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A module containing kernel handler class used by SAFT."""

import hashlib
import os
import re

TMP_FILE_NAME = 'kernel_header_dump'

# Types of kernel modifications.
KERNEL_BODY_MOD = 1
KERNEL_VERSION_MOD = 2
KERNEL_RESIGN_MOD = 3


class KernelHandlerError(Exception):
    """KernelHandler-specific exception."""


class KernelHandler(object):
    """An object to provide ChromeOS kernel related actions.

    Mostly it allows to corrupt and restore a particular kernel partition
    (designated by the partition name, A or B).

    All device access goes through the OS interface object handed to the
    constructor; init() must be called before any other method so that the
    partition map and the dump file name are populated.

    @type os_if: autotest_lib.client.cros.faft.utils.os_interface.OSInterface
    """

    # This value is used to alter contents of a byte in the appropriate kernel
    # image. First added to corrupt the image, then subtracted to restore the
    # image.
    DELTA = 1

    # The maximum kernel size in MB. Used as the dd block size (count=1) when
    # dumping and writing kernel partitions.
    KERNEL_SIZE_MB = 16

    def __init__(self, os_if):
        """Store the OS interface; the real setup happens in init().

        @param os_if: OS interface object used for all shell/file access.
        """
        self.os_if = os_if
        self.dump_file_name = None
        self.partition_map = {}
        self.root_dev = None
        # Directory holding the signing keys; assigned by init().
        self.dev_key_path = None
        self.initialized = False

    def _get_version(self, device):
        """Get version of the kernel hosted on the passed in partition.

        @param device: Path of the kernel partition device.
        @return: Whatever os_if.retrieve_body_version() extracts from the
                first 16 KiB of the partition.
        """
        # 16 K should be enough to include headers and keys
        data = self.os_if.read_partition(device, 0x4000)
        return self.os_if.retrieve_body_version(data)

    def _get_datakey_version(self, device):
        """Get datakey version of kernel hosted on the passed in partition.

        @param device: Path of the kernel partition device.
        @return: Whatever os_if.retrieve_datakey_version() extracts from the
                first 16 KiB of the partition.
        """
        # 16 K should be enough to include headers and keys
        data = self.os_if.read_partition(device, 0x4000)
        return self.os_if.retrieve_datakey_version(data)

    def _get_partition_map(self, internal_disk=True):
        """Scan `cgpt show <device>` output to find kernel devices.

        Every output line mentioning KERN-A or KERN-B adds (or replaces) an
        entry in self.partition_map keyed by 'A'/'B', holding the partition
        device path, its kernel version and its datakey version.

        @param internal_disk: If True, scan the internal disk derived from
                the root partition; otherwise scan self.root_dev.
        """
        if internal_disk:
            target_device = self.os_if.get_internal_disk(
                    self.os_if.get_root_part())
        else:
            target_device = self.root_dev

        kernel_partitions = re.compile('KERN-([AB])')
        disk_map = self.os_if.run_shell_command_get_output(
                'cgpt show %s' % target_device)

        for line in disk_map:
            matched_line = kernel_partitions.search(line)
            if not matched_line:
                continue
            label = matched_line.group(1)
            # The third whitespace-separated column of the matching line is
            # used as the partition number.
            device = self.os_if.join_part(target_device, line.split()[2])
            self.partition_map[label] = {
                    'device': device,
                    'version': self._get_version(device),
                    'datakey_version': self._get_datakey_version(device),
            }

    def dump_kernel(self, section, kernel_path):
        """Dump the specified kernel to a file.

        @param section: The kernel to dump. May be A or B.
        @param kernel_path: The path to the kernel image.
        """
        dev = self.partition_map[section.upper()]['device']
        cmd = 'dd if=%s of=%s bs=%dM count=1' % (dev, kernel_path,
                                                 self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(cmd)

    def write_kernel(self, section, kernel_path):
        """Write a kernel image to the specified section.

        @param section: The kernel to write. May be A or B.
        @param kernel_path: The path to the kernel image to write.
        """
        dev = self.partition_map[section.upper()]['device']
        dd_cmd = 'dd if=%s of=%s bs=%dM count=1' % (kernel_path, dev,
                                                    self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(dd_cmd, modifies_device=True)

    @staticmethod
    def _shift_first_byte(blob, delta):
        """Return a copy of blob with delta added (mod 256) to its first byte.

        Binary blobs (bytes/bytearray) are patched through a bytearray so
        that the arithmetic works on interpreters where indexing bytes
        yields integers; text blobs keep the character based path.

        @param blob: Non-empty kernel image contents.
        @param delta: Signed amount to add to the first byte.
        @return: The patched contents (bytes for binary input, a string for
                text input).
        """
        if isinstance(blob, (bytes, bytearray)):
            patched = bytearray(blob)
            patched[0] = (patched[0] + delta) % 0x100
            return bytes(patched)
        return '%c' % ((ord(blob[0]) + delta) % 0x100) + blob[1:]

    def _modify_kernel(self,
                       section,
                       delta,
                       modification_type=KERNEL_BODY_MOD,
                       key_path=None):
        """Modify kernel image on a disk partition.

        The partition is first dumped to self.dump_file_name, the dump is
        modified, and the result is written back to the partition.

        This method supports three types of kernel modification.
        KERNEL_BODY_MOD just adds the value of delta to the first byte of the
        kernel blob. This might leave the kernel corrupted (as required by
        the test).

        The second type, KERNEL_VERSION_MOD - will use 'delta' as the new
        version number, it will put it in the kernel header, and then will
        resign the kernel blob.

        The third type, KERNEL_RESIGN_MOD - will resign the kernel with keys
        in argument key_path. If key_path is None (or is not a directory),
        choose dev_key_path as resign key directory.

        Any other modification type only dumps the kernel and is otherwise
        ignored.

        @param section: The kernel to modify. May be A or B.
        @param delta: Byte delta (body mod) or new version (version mod);
                unused for resign.
        @param modification_type: One of the KERNEL_*_MOD constants.
        @param key_path: Optional directory with the keys used for resign.
        @raise KernelHandlerError: If the dump is empty in body mod mode.
        """
        self.dump_kernel(section, self.dump_file_name)
        if modification_type == KERNEL_BODY_MOD:
            # Only this mode needs the dump contents in memory.
            blob = self.os_if.read_file(self.dump_file_name)
            if not blob:
                raise KernelHandlerError(
                        'Kernel dump %s is empty' % self.dump_file_name)
            self.os_if.write_file(self.dump_file_name,
                                  self._shift_first_byte(blob, delta))
            kernel_to_write = self.dump_file_name
        elif modification_type == KERNEL_VERSION_MOD:
            new_version = delta
            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                    'vbutil_kernel --repack %s --version %d '
                    '--signprivate %s --oldblob %s' %
                    (kernel_to_write, new_version,
                     os.path.join(self.dev_key_path,
                                  'kernel_data_key.vbprivk'),
                     self.dump_file_name))
        elif modification_type == KERNEL_RESIGN_MOD:
            if key_path and self.os_if.is_dir(key_path):
                resign_key_path = key_path
            else:
                resign_key_path = self.dev_key_path

            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                    'vbutil_kernel --repack %s '
                    '--signprivate %s --oldblob %s --keyblock %s' %
                    (kernel_to_write,
                     os.path.join(resign_key_path, 'kernel_data_key.vbprivk'),
                     self.dump_file_name,
                     os.path.join(resign_key_path, 'kernel.keyblock')))
        else:
            return  # Unsupported mode, ignore.
        self.write_kernel(section, kernel_to_write)

    def corrupt_kernel(self, section):
        """Corrupt a kernel section (add DELTA to the first byte).

        @param section: The kernel to corrupt. May be A or B.
        """
        self._modify_kernel(section.upper(), self.DELTA)

    def restore_kernel(self, section):
        """Restore the previously corrupted kernel.

        Subtracts DELTA from the first byte, undoing corrupt_kernel().

        @param section: The kernel to restore. May be A or B.
        """
        self._modify_kernel(section.upper(), -self.DELTA)

    def get_version(self, section):
        """Return version read from this section blob's header.

        The value is the one cached in the partition map when it was scanned.

        @param section: The kernel to query. May be A or B.
        """
        return self.partition_map[section.upper()]['version']

    def get_datakey_version(self, section):
        """Return datakey version read from this section blob's header.

        The value is the one cached in the partition map when it was scanned.

        @param section: The kernel to query. May be A or B.
        """
        return self.partition_map[section.upper()]['datakey_version']

    def get_sha(self, section):
        """Return the SHA1 hash of the section blob.

        @param section: The kernel to hash. May be A or B.
        @return: Hex digest of the data os_if.read_file() returns for the
                partition device.
        """
        s = hashlib.sha1()
        dev = self.partition_map[section.upper()]['device']
        s.update(self.os_if.read_file(dev))
        return s.hexdigest()

    def set_version(self, section, version):
        """Set version of this kernel blob and re-sign it.

        @param section: The kernel to modify. May be A or B.
        @param version: The new, non-negative version number.
        @raise KernelHandlerError: If version is negative.
        """
        if version < 0:
            raise KernelHandlerError('Bad version value %d' % version)
        self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD)

    def resign_kernel(self, section, key_path=None):
        """Resign kernel with original kernel version and keys in key_path.

        @param section: The kernel to resign. May be A or B.
        @param key_path: Optional directory with the signing keys; when it is
                missing or not a directory, dev_key_path is used instead.
        """
        self._modify_kernel(section.upper(), self.get_version(section),
                            KERNEL_RESIGN_MOD, key_path)

    def init(self, dev_key_path='.', internal_disk=True):
        """Initialize the kernel handler object.

        Records the key directory, looks up the root device, picks the dump
        file location and scans the disk for kernel partitions.

        @param dev_key_path: Directory containing the keys used when the
                kernel is re-signed.
        @param internal_disk: If True, look for kernel partitions on the
                internal disk; otherwise on the root device.
        """
        self.dev_key_path = dev_key_path
        self.root_dev = self.os_if.get_root_dev()
        self.dump_file_name = self.os_if.state_dir_file(TMP_FILE_NAME)
        self._get_partition_map(internal_disk)
        self.initialized = True