• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""A module containing kernel handler class used by SAFT."""
5
6import hashlib
7import os
8import re
9
# Name handed to os_if.state_dir_file() to obtain the scratch file that
# kernel images are dumped into before being modified.
TMP_FILE_NAME = 'kernel_header_dump'

# Types of kernel modifications understood by KernelHandler._modify_kernel():
#   KERNEL_BODY_MOD    - alter the first byte of the kernel blob.
#   KERNEL_VERSION_MOD - set a new version number and re-sign the blob.
#   KERNEL_RESIGN_MOD  - re-sign the blob with the keys of a key directory.
KERNEL_BODY_MOD, KERNEL_VERSION_MOD, KERNEL_RESIGN_MOD = 1, 2, 3
16
17
class KernelHandlerError(Exception):
    """Error type raised by KernelHandler operations."""
21
22
class KernelHandler(object):
    """An object to provide ChromeOS kernel related actions.

    Mostly it allows to corrupt and restore a particular kernel partition
    (designated by the partition name, A or B).

    A freshly constructed object is not usable yet: init() must be called
    with an OS interface object before any other method.
    """

    # This value is used to alter contents of a byte in the appropriate kernel
    # image. First added to corrupt the image, then subtracted to restore the
    # image.
    DELTA = 1

    # The maximum kernel size in MB.
    KERNEL_SIZE_MB = 16

    def __init__(self):
        # All attributes are placeholders until init() fills them in.
        self.os_if = None
        self.dump_file_name = None
        self.dev_key_path = None
        self.partition_map = {}
        self.root_dev = None

    def _get_version(self, device):
        """Get version of the kernel hosted on the passed in partition."""
        # 16 K should be enough to include headers and keys
        data = self.os_if.read_partition(device, 0x4000)
        return self.os_if.retrieve_body_version(data)

    def _get_datakey_version(self, device):
        """Get datakey version of kernel hosted on the passed in partition."""
        # 16 K should be enough to include headers and keys
        data = self.os_if.read_partition(device, 0x4000)
        return self.os_if.retrieve_datakey_version(data)

    def _get_partition_map(self, internal_disk=True):
        """Scan `cgpt show <device>` output to find kernel devices.

        Every output line containing a KERN-A or KERN-B label adds (or
        replaces) an entry in self.partition_map, keyed by the label letter
        and holding the 'device', 'version' and 'datakey_version' values.

        Args:
          internal_disk - decide whether to use internal kernel disk.
        """
        if internal_disk:
            target_device = self.os_if.get_internal_disk(
                    self.os_if.get_root_part())
        else:
            target_device = self.root_dev

        kernel_partitions = re.compile('KERN-([AB])')
        disk_map = self.os_if.run_shell_command_get_output(
                'cgpt show %s' % target_device)

        for line in disk_map:
            matched_line = kernel_partitions.search(line)
            if not matched_line:
                continue
            label = matched_line.group(1)
            # The third whitespace-separated field of the line is combined
            # with the disk name; presumably it is the partition number --
            # confirm against the cgpt output format.
            device = self.os_if.join_part(target_device, line.split()[2])
            self.partition_map[label] = {
                    'device': device,
                    'version': self._get_version(device),
                    'datakey_version': self._get_datakey_version(device),
            }

    def dump_kernel(self, section, kernel_path):
        """Dump the specified kernel to a file.

        @param section: The kernel to dump. May be A or B.
        @param kernel_path: The path to the kernel image.
        """
        dev = self.partition_map[section.upper()]['device']
        cmd = 'dd if=%s of=%s bs=%dM count=1' % (dev, kernel_path,
                                                 self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(cmd)

    def write_kernel(self, section, kernel_path):
        """Write a kernel image to the specified section.

        @param section: The kernel to write. May be A or B.
        @param kernel_path: The path to the kernel image to write.
        """
        dev = self.partition_map[section.upper()]['device']
        dd_cmd = 'dd if=%s of=%s bs=%dM count=1' % (kernel_path, dev,
                                                    self.KERNEL_SIZE_MB)
        self.os_if.run_shell_command(dd_cmd, modifies_device=True)

    def _alter_first_byte(self, path, delta):
        """Add delta (modulo 256) to the first byte of the file at path.

        The file is read and written back through the OS interface. Content
        returned as bytes is written back as bytes and content returned as a
        text string is written back as a text string.

        @param path: The file to alter in place.
        @param delta: The value to add to the first byte; may be negative.
        @raise KernelHandlerError: if the file has no content.
        """
        data = self.os_if.read_file(path)
        if not data:
            raise KernelHandlerError('Kernel dump %s is empty' % path)
        if isinstance(data, (bytes, bytearray)):
            # Indexing a bytearray yields ints, so no ord()/chr() round trip
            # is needed and the image is copied once rather than exploded
            # into a list of one-character strings.
            image = bytearray(data)
            image[0] = (image[0] + delta) % 0x100
            altered = bytes(image)
        else:
            altered = chr((ord(data[0]) + delta) % 0x100) + data[1:]
        self.os_if.write_file(path, altered)

    def _modify_kernel(self,
                       section,
                       delta,
                       modification_type=KERNEL_BODY_MOD,
                       key_path=None):
        """Modify kernel image on a disk partition.

        The kernel is first dumped to self.dump_file_name, then modified, and
        the result is written back to the same partition.

        This method supports three types of kernel modification.
        KERNEL_BODY_MOD just adds the value of delta to the first byte of the
        kernel blob. This might leave the kernel corrupted (as required by the
        test).

        The second type, KERNEL_VERSION_MOD - will use 'delta' as the new
        version number, it will put it in the kernel header, and then will
        resign the kernel blob.

        The third type, KERNEL_RESIGN_MOD - will resign the kernel with keys in
        argument key_path. If key_path is None (or is not a directory), choose
        dev_key_path as resign key directory.

        Any other modification type is ignored: the kernel is still dumped,
        but nothing is written back to the partition.

        @param section: The kernel to modify. May be A or B.
        @param delta: Byte increment or new version, depending on the type.
        @param modification_type: One of the KERNEL_*_MOD constants.
        @param key_path: Optional key directory, used by KERNEL_RESIGN_MOD.
        @raise KernelHandlerError: if the dumped image is empty
                (KERNEL_BODY_MOD only).
        """
        self.dump_kernel(section, self.dump_file_name)
        if modification_type == KERNEL_BODY_MOD:
            # Only this mode needs the image contents in memory.
            self._alter_first_byte(self.dump_file_name, delta)
            kernel_to_write = self.dump_file_name
        elif modification_type == KERNEL_VERSION_MOD:
            new_version = delta
            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                    'vbutil_kernel --repack %s --version %d '
                    '--signprivate %s --oldblob %s' %
                    (kernel_to_write, new_version,
                     os.path.join(self.dev_key_path,
                                  'kernel_data_key.vbprivk'),
                     self.dump_file_name))
        elif modification_type == KERNEL_RESIGN_MOD:
            if key_path and self.os_if.is_dir(key_path):
                resign_key_path = key_path
            else:
                resign_key_path = self.dev_key_path

            kernel_to_write = self.dump_file_name + '.new'
            self.os_if.run_shell_command(
                    'vbutil_kernel --repack %s '
                    '--signprivate %s --oldblob %s --keyblock %s' %
                    (kernel_to_write,
                     os.path.join(resign_key_path, 'kernel_data_key.vbprivk'),
                     self.dump_file_name,
                     os.path.join(resign_key_path, 'kernel.keyblock')))
        else:
            return  # Unsupported mode, ignore.
        self.write_kernel(section, kernel_to_write)

    def corrupt_kernel(self, section):
        """Corrupt a kernel section (add DELTA to the first byte)."""
        self._modify_kernel(section.upper(), self.DELTA)

    def restore_kernel(self, section):
        """Restore the previously corrupted kernel.

        This subtracts DELTA from the first byte, undoing corrupt_kernel().
        """
        self._modify_kernel(section.upper(), -self.DELTA)

    def get_version(self, section):
        """Return version read from this section blob's header.

        The value is the one recorded when the partition map was built; the
        partition is not read again.
        """
        return self.partition_map[section.upper()]['version']

    def get_datakey_version(self, section):
        """Return datakey version read from this section blob's header.

        The value is the one recorded when the partition map was built; the
        partition is not read again.
        """
        return self.partition_map[section.upper()]['datakey_version']

    def get_sha(self, section):
        """Return the SHA1 hash of the section blob.

        The digest is computed over everything the OS interface returns when
        reading the section's device, and is returned as a hex string.
        """
        s = hashlib.sha1()
        dev = self.partition_map[section.upper()]['device']
        s.update(self.os_if.read_file(dev))
        return s.hexdigest()

    def set_version(self, section, version):
        """Set version of this kernel blob and re-sign it.

        @param section: The kernel to modify. May be A or B.
        @param version: The new version number; must not be negative.
        @raise KernelHandlerError: if version is negative.
        """
        if version < 0:
            raise KernelHandlerError('Bad version value %d' % version)
        self._modify_kernel(section.upper(), version, KERNEL_VERSION_MOD)

    def resign_kernel(self, section, key_path=None):
        """Resign kernel with original kernel version and keys in key_path."""
        self._modify_kernel(section.upper(), self.get_version(section),
                            KERNEL_RESIGN_MOD, key_path)

    def init(self, os_if, dev_key_path='.', internal_disk=True):
        """Initialize the kernel handler object.

        Stores the OS interface and key directory, asks the OS interface for
        the root device and for the dump file location, and builds the
        partition map.

        @param os_if: An OS interface object reference.
        @param dev_key_path: Directory holding the keys used for re-signing.
        @param internal_disk: Passed on to _get_partition_map() to choose
                between the internal disk and the root device.
        """
        self.os_if = os_if
        self.dev_key_path = dev_key_path
        self.root_dev = os_if.get_root_dev()
        self.dump_file_name = os_if.state_dir_file(TMP_FILE_NAME)
        self._get_partition_map(internal_disk)
203