# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A module containing kernel handler class used by SAFT."""

import hashlib
import os
import re

TMP_FILE_NAME = 'kernel_header_dump'

# Types of kernel modifications.
KERNEL_BODY_MOD = 1
KERNEL_VERSION_MOD = 2
KERNEL_RESIGN_MOD = 3


class KernelHandlerError(Exception):
    """KernelHandler-specific exception."""


class KernelHandler(object):
    """An object to provide ChromeOS kernel related actions.

    Mostly it allows to corrupt and restore a particular kernel partition
    (designated by the partition name, A or B).

    The object is unusable until init() has been called with an OS interface
    object; every method below goes through self.os_if.
    """

    # This value is used to alter contents of a byte in the appropriate kernel
    # image. First added to corrupt the image, then subtracted to restore the
    # image.
    DELTA = 1

    # The maximum kernel size in MB.
    KERNEL_SIZE_MB = 16

    # Number of bytes read from the start of a kernel partition when looking
    # for version information. 16 K should be enough to include headers and
    # keys.
    HEADER_READ_SIZE = 0x4000

    def __init__(self):
        self.os_if = None
        self.dump_file_name = None
        # Maps partition label ('A' or 'B') to a dict with the keys 'device',
        # 'version' and 'datakey_version'.
        self.partition_map = {}
        self.root_dev = None
        # Directory holding the signing keys; set for real by init().
        self.dev_key_path = None

    def _get_version(self, device):
        """Get version of the kernel hosted on the passed in partition."""
        data = self.os_if.read_partition(device, self.HEADER_READ_SIZE)
        return self.os_if.retrieve_body_version(data)

    def _get_datakey_version(self, device):
        """Get datakey version of kernel hosted on the passed in partition."""
        data = self.os_if.read_partition(device, self.HEADER_READ_SIZE)
        return self.os_if.retrieve_datakey_version(data)

    def _get_partition_map(self, internal_disk=True):
        """Scan `cgpt show <device>` output to find kernel devices.

        Every output line mentioning KERN-A or KERN-B adds (or overwrites) an
        entry in self.partition_map keyed by the label letter.

        Args:
            internal_disk - decide whether to use internal kernel disk. When
                false, self.root_dev is scanned instead.
        """
        if internal_disk:
            target_device = self.os_if.get_internal_disk(
                    self.os_if.get_root_part())
        else:
            target_device = self.root_dev

        kernel_partitions = re.compile('KERN-([AB])')
        disk_map = self.os_if.run_shell_command_get_output(
                'cgpt show %s' % target_device)

        for line in disk_map:
            matched_line = kernel_partitions.search(line)
            if not matched_line:
                continue
            label = matched_line.group(1)
            # The third whitespace-separated field of the line is handed to
            # join_part() to build the partition device name (presumably it
            # is the partition number - verify against cgpt output).
            device = self.os_if.join_part(target_device, line.split()[2])
            self.partition_map[label] = {
                'device': device,
                'version': self._get_version(device),
                'datakey_version': self._get_datakey_version(device),
            }

    def dump_kernel(self, section, kernel_path):
        """Dump the specified kernel to a file.

        @param section: The kernel to dump. May be A or B.
        @param kernel_path: The path to the kernel image.
        """
        dev = self.partition_map[section.upper()]['device']
        cmd = 'dd if=%s of=%s bs=%dM count=1' % (dev, kernel_path,
                                                 self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(cmd)

    def write_kernel(self, section, kernel_path):
        """Write a kernel image to the specified section.

        @param section: The kernel to write. May be A or B.
        @param kernel_path: The path to the kernel image to write.
        """
        dev = self.partition_map[section.upper()]['device']
        dd_cmd = 'dd if=%s of=%s bs=%dM count=1' % (kernel_path, dev,
                                                    self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(dd_cmd, modifies_device=True)

    @staticmethod
    def _shift_first_byte(image, delta):
        """Return a copy of image with delta added to its first byte.

        The addition wraps modulo 256, so applying -delta afterwards restores
        the original contents.

        @param image: Kernel dump contents; a byte string (Python 2 str,
                      Python 3 bytes/bytearray) or a text string.
        @param delta: Signed integer added to the first byte.
        @return: The modified contents, of the same kind as the input.
        @raise KernelHandlerError: if image is empty.
        """
        if not image:
            raise KernelHandlerError(
                    'Kernel dump is empty, nothing to modify')
        if isinstance(image, (bytes, bytearray)):
            # bytearray indexing yields integers on both Python 2 and 3, and
            # avoids building a list with one element per byte of the image.
            patched = bytearray(image)
            patched[0] = (patched[0] + delta) % 0x100
            return bytes(patched)
        # Text fallback, same arithmetic as the byte path.
        return '%c' % ((ord(image[0]) + delta) % 0x100) + image[1:]

    def _modify_kernel(self,
                       section,
                       delta,
                       modification_type=KERNEL_BODY_MOD,
                       key_path=None):
        """Modify kernel image on a disk partition.

        The partition is first dumped into self.dump_file_name, then modified
        according to modification_type and written back.

        This method supports three types of kernel modification.
        KERNEL_BODY_MOD just adds the value of delta to the first byte of the
        kernel blob. This might leave the kernel corrupted (as required by
        the test).

        The second type, KERNEL_VERSION_MOD - will use 'delta' as the new
        version number, it will put it in the kernel header, and then will
        resign the kernel blob.

        The third type, KERNEL_RESIGN_MOD - will resign the kernel with keys
        in argument key_path. If key_path is None (or not a directory),
        choose dev_key_path as resign key directory.

        Any other modification_type is ignored: the dump is still taken but
        nothing is written back to the partition.

        @raise KernelHandlerError: if the dump is empty in KERNEL_BODY_MOD.
        """
        self.dump_kernel(section, self.dump_file_name)
        if modification_type == KERNEL_BODY_MOD:
            # Only this mode needs the dump contents in memory; the other
            # modes let vbutil_kernel work on the file directly.
            image = self.os_if.read_file(self.dump_file_name)
            self.os_if.write_file(self.dump_file_name,
                                  self._shift_first_byte(image, delta))
            kernel_to_write = self.dump_file_name
        elif modification_type == KERNEL_VERSION_MOD:
            new_version = delta
            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                    'vbutil_kernel --repack %s --version %d '
                    '--signprivate %s --oldblob %s' %
                    (kernel_to_write, new_version,
                     os.path.join(self.dev_key_path,
                                  'kernel_data_key.vbprivk'),
                     self.dump_file_name))
        elif modification_type == KERNEL_RESIGN_MOD:
            if key_path and self.os_if.is_dir(key_path):
                resign_key_path = key_path
            else:
                resign_key_path = self.dev_key_path

            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                    'vbutil_kernel --repack %s '
                    '--signprivate %s --oldblob %s --keyblock %s' %
                    (kernel_to_write,
                     os.path.join(resign_key_path, 'kernel_data_key.vbprivk'),
                     self.dump_file_name,
                     os.path.join(resign_key_path, 'kernel.keyblock')))
        else:
            return  # Unsupported mode, ignore.
        self.write_kernel(section, kernel_to_write)

    def corrupt_kernel(self, section):
        """Corrupt a kernel section (add DELTA to the first byte)."""
        self._modify_kernel(section.upper(), self.DELTA)

    def restore_kernel(self, section):
        """Restore the previously corrupted kernel."""
        self._modify_kernel(section.upper(), -self.DELTA)

    def get_version(self, section):
        """Return version read from this section blob's header.

        The value is the one cached in partition_map when the partitions
        were scanned; the device is not read again.
        """
        return self.partition_map[section.upper()]['version']

    def get_datakey_version(self, section):
        """Return datakey version read from this section blob's header.

        The value is the one cached in partition_map when the partitions
        were scanned; the device is not read again.
        """
        return self.partition_map[section.upper()]['datakey_version']

    def get_sha(self, section):
        """Return the SHA1 hash of the section blob."""
        s = hashlib.sha1()
        dev = self.partition_map[section.upper()]['device']
        s.update(self.os_if.read_file(dev))
        return s.hexdigest()

    def set_version(self, section, version):
        """Set version of this kernel blob and re-sign it.

        @raise KernelHandlerError: if version is negative.
        """
        if version < 0:
            raise KernelHandlerError('Bad version value %d' % version)
        self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD)

    def resign_kernel(self, section, key_path=None):
        """Resign kernel with original kernel version and keys in key_path."""
        self._modify_kernel(section.upper(), self.get_version(section),
                            KERNEL_RESIGN_MOD, key_path)

    def init(self, os_if, dev_key_path='.', internal_disk=True):
        """Initialize the kernel handler object.

        @param os_if: OS interface object reference used for all device,
                      file and shell access.
        @param dev_key_path: Directory containing the kernel signing keys.
        @param internal_disk: Passed to _get_partition_map() to select which
                              disk is scanned for kernel partitions.
        """
        self.os_if = os_if
        self.dev_key_path = dev_key_path
        self.root_dev = os_if.get_root_dev()
        self.dump_file_name = os_if.state_dir_file(TMP_FILE_NAME)
        self._get_partition_map(internal_disk)