# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A module containing kernel handler class used by SAFT."""

import hashlib
import os
import re

TMP_FILE_NAME = 'kernel_header_dump'

# Types of kernel modifications.
KERNEL_BODY_MOD = 1
KERNEL_VERSION_MOD = 2
KERNEL_RESIGN_MOD = 3


class KernelHandlerError(Exception):
    """Raised by KernelHandler when a request cannot be carried out."""


class KernelHandler(object):
    """An object to provide ChromeOS kernel related actions.

    Mostly it allows to corrupt and restore a particular kernel partition
    (designated by the partition name, A or B).

    All access to the system goes through the OS interface object passed to
    init(); the object is unusable until init() has been called.
    """

    # This value is used to alter contents of a byte in the appropriate kernel
    # image. First added to corrupt the image, then subtracted to restore the
    # image.
    DELTA = 1

    # The maximum kernel size in MB.
    KERNEL_SIZE_MB = 16

    def __init__(self):
        """Create an uninitialized handler; init() fills in the state."""
        self.os_if = None
        self.dev_key_path = None
        self.dump_file_name = None
        # Maps partition label ('A' or 'B') to a dictionary with the keys
        # 'device', 'version' and 'datakey_version'.
        self.partition_map = {}
        self.root_dev = None

    def _get_version(self, device):
        """Get version of the kernel hosted on the passed in partition."""
        # 16 K should be enough to include headers and keys
        data = self.os_if.read_partition(device, 0x4000)
        return self.os_if.retrieve_body_version(data)

    def _get_datakey_version(self, device):
        """Get datakey version of kernel hosted on the passed in partition."""
        # 16 K should be enough to include headers and keys
        data = self.os_if.read_partition(device, 0x4000)
        return self.os_if.retrieve_datakey_version(data)

    def _get_partition_map(self, internal_disk=True):
        """Scan `cgpt show <device>` output to find kernel devices.

        Every output line mentioning KERN-A or KERN-B adds (or replaces) an
        entry in self.partition_map keyed by the label letter.

        Args:
            internal_disk - decide whether to use internal kernel disk.
                When false, the root device recorded by init() is scanned
                instead.
        """
        if internal_disk:
            target_device = self.os_if.get_internal_disk(
                self.os_if.get_root_part())
        else:
            target_device = self.root_dev

        kernel_partitions = re.compile('KERN-([AB])')
        disk_map = self.os_if.run_shell_command_get_output(
            'cgpt show %s' % target_device)

        for line in disk_map:
            matched_line = kernel_partitions.search(line)
            if not matched_line:
                continue
            label = matched_line.group(1)
            # The third whitespace separated field of a matching line is
            # handed to join_part() together with the disk name; presumably
            # it is the partition number column of the cgpt output.
            device = self.os_if.join_part(target_device, line.split()[2])
            self.partition_map[label] = {
                'device': device,
                'version': self._get_version(device),
                'datakey_version': self._get_datakey_version(device),
            }

    def dump_kernel(self, section, kernel_path):
        """Dump the specified kernel to a file.

        @param section: The kernel to dump. May be A or B.
        @param kernel_path: The path to the kernel image.
        """
        dev = self.partition_map[section.upper()]['device']
        cmd = 'dd if=%s of=%s bs=%dM count=1' % (
            dev, kernel_path, self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(cmd)

    def write_kernel(self, section, kernel_path):
        """Write a kernel image to the specified section.

        @param section: The kernel to write. May be A or B.
        @param kernel_path: The path to the kernel image to write.
        """
        dev = self.partition_map[section.upper()]['device']
        cmd = 'dd if=%s of=%s bs=%dM count=1' % (
            kernel_path, dev, self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(cmd)

    @staticmethod
    def _shift_first_byte(data, delta):
        """Return a copy of data with delta added to its first byte.

        The sum wraps around modulo 256. The result has the same type as the
        input (byte string or text string), so it can be handed back to the
        same file interface the data came from.

        Raises:
            KernelHandlerError: data is empty.
        """
        if not data:
            raise KernelHandlerError('Kernel dump is empty, nothing to modify')
        # Slicing (rather than indexing) yields a one element string for
        # both text and byte strings, which is what ord() accepts.
        value = (ord(data[:1]) + delta) % 0x100
        if isinstance(data, bytes):
            head = bytes(bytearray((value,)))
        else:
            head = '%c' % value
        return head + data[1:]

    def _modify_kernel(self, section,
                       delta,
                       modification_type=KERNEL_BODY_MOD,
                       key_path=None):
        """Modify kernel image on a disk partition.

        This method supports three types of kernel modification.
        KERNEL_BODY_MOD just adds the value of delta to the first byte of the
        kernel blob. This might leave the kernel corrupted (as required by
        the test).

        The second type, KERNEL_VERSION_MOD - will use 'delta' as the new
        version number, it will put it in the kernel header, and then will
        resign the kernel blob.

        The third type, KERNEL_RESIGN_MOD - will resign the kernel with keys
        in argument key_path. If key_path is None (or is not a directory),
        choose dev_key_path as resign key directory.

        In every case the partition is first dumped into the temporary dump
        file; any other modification type leaves the partition untouched.

        Raises:
            KernelHandlerError: KERNEL_BODY_MOD was requested but the dump
                turned out to be empty.
        """
        self.dump_kernel(section, self.dump_file_name)
        if modification_type == KERNEL_BODY_MOD:
            # Only this mode needs the dump contents, and only the first
            # byte of it, so the blob is never expanded into a list.
            data = self.os_if.read_file(self.dump_file_name)
            self.os_if.write_file(self.dump_file_name,
                                  self._shift_first_byte(data, delta))
            kernel_to_write = self.dump_file_name
        elif modification_type == KERNEL_VERSION_MOD:
            new_version = delta
            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                'vbutil_kernel --repack %s --version %d '
                '--signprivate %s --oldblob %s' % (
                    kernel_to_write, new_version,
                    os.path.join(self.dev_key_path,
                                 'kernel_data_key.vbprivk'),
                    self.dump_file_name))
        elif modification_type == KERNEL_RESIGN_MOD:
            if key_path and self.os_if.is_dir(key_path):
                resign_key_path = key_path
            else:
                resign_key_path = self.dev_key_path

            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                'vbutil_kernel --repack %s '
                '--signprivate %s --oldblob %s --keyblock %s' % (
                    kernel_to_write,
                    os.path.join(resign_key_path, 'kernel_data_key.vbprivk'),
                    self.dump_file_name,
                    os.path.join(resign_key_path, 'kernel.keyblock')))
        else:
            return  # Unsupported mode, ignore.
        self.write_kernel(section, kernel_to_write)

    def corrupt_kernel(self, section):
        """Corrupt a kernel section (add DELTA to the first byte)."""
        self._modify_kernel(section.upper(), self.DELTA)

    def restore_kernel(self, section):
        """Restore the previously corrupted kernel."""
        self._modify_kernel(section.upper(), -self.DELTA)

    def get_version(self, section):
        """Return version read from this section blob's header.

        The value is the one cached when the partition map was built.
        """
        return self.partition_map[section.upper()]['version']

    def get_datakey_version(self, section):
        """Return datakey version read from this section blob's header.

        The value is the one cached when the partition map was built.
        """
        return self.partition_map[section.upper()]['datakey_version']

    def get_sha(self, section):
        """Return the SHA1 hash of the section blob."""
        s = hashlib.sha1()
        dev = self.partition_map[section.upper()]['device']
        s.update(self.os_if.read_file(dev))
        return s.hexdigest()

    def set_version(self, section, version):
        """Set version of this kernel blob and re-sign it.

        Raises:
            KernelHandlerError: version is negative.
        """
        if version < 0:
            raise KernelHandlerError('Bad version value %d' % version)
        self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD)

    def resign_kernel(self, section, key_path=None):
        """Resign kernel with original kernel version and keys in key_path."""
        self._modify_kernel(section.upper(),
                            self.get_version(section),
                            KERNEL_RESIGN_MOD,
                            key_path)

    def init(self, os_if, dev_key_path='.', internal_disk=True):
        """Initialize the kernel handler object.

        Args:
            os_if - an OS interface object reference; every shell command
                and file access of this class goes through it.
            dev_key_path - directory holding the keys used when re-signing.
            internal_disk - passed on to the partition scan, see
                _get_partition_map().
        """
        self.os_if = os_if
        self.dev_key_path = dev_key_path
        self.root_dev = os_if.get_root_dev()
        self.dump_file_name = os_if.state_dir_file(TMP_FILE_NAME)
        self._get_partition_map(internal_disk)