• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3# Copyright 2024, The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Repacks the ramdisk image to add kernel modules.
18
19Unpacks a ramdisk image, extracts and replaces kernel modules from another
20initramfs image, and repacks the ramdisk.
21"""
22
23import argparse
24import enum
25import itertools
26import logging
27import os
28import pathlib
29import shutil
30import subprocess
31import tempfile
32
33logger = logging.getLogger(__name__)
34
35_ANDROID_RAMDISK_DIR = "android_ramdisk"
36_KERNEL_RAMDISK_DIR = "kernel_ramdisk"
37
38_KNOWN_MODULES_FILES = [
39    "modules.alias",
40    "modules.dep",
41    "modules.softdep",
42    "modules.devname",
43]
44
45def _parse_args():
46    """Parse command-line options."""
47    parser = argparse.ArgumentParser(
48        description='Repacks ramdisk image with modules from --kernel-ramdisk',
49    )
50
51    parser.add_argument(
52        '--android-ramdisk',
53        help='filename of input android ramdisk',
54        required=True)
55    parser.add_argument(
56        '--kernel-ramdisk',
57        help='filename of ramdisk to extract kernel modules from, '
58             'or the path of an existing directory containing the modules',
59        required=True)
60    parser.add_argument(
61        '--output-ramdisk',
62        help='filename of repacked ramdisk',
63        required=True)
64    parser.add_argument(
65        '--override-modules-load',
66        help='replace the modules.load file in ramdisk with an override',
67        required=False)
68    parser.add_argument(
69        '--check-modules-order',
70        action='store_true',
71        help='check all the modules.order.* files and '
72             'discard modules not found in them')
73    parser.add_argument(
74        '--extra-modules-order',
75        action='append',
76        help='names of additional modules.order.* files to check')
77
78    return parser.parse_args()
79
80
81class RamdiskFormat(enum.Enum):
82    """Enum class for different ramdisk compression formats."""
83    LZ4 = 1
84    GZIP = 2
85
86
87# Based on system/tools/mkbootimg/repack_bootimg.py
88class RamdiskImage:
89    """A class that supports packing/unpacking a ramdisk."""
90    def __init__(self, ramdisk_img, directory, allow_dir):
91        # The caller gave us a directory instead of an image
92        # Assume it's already been extracted.
93        if os.path.isdir(ramdisk_img):
94            if not allow_dir:
95                raise RuntimeError(
96                    f"Directory not allowed for image {ramdisk_img}")
97
98            self._ramdisk_img = None
99            self._ramdisk_format = None
100            self._ramdisk_dir = ramdisk_img
101            return
102
103        self._ramdisk_img = ramdisk_img
104        self._ramdisk_format = None
105        self._ramdisk_dir = directory
106
107        self._unpack()
108
109    def _unpack(self):
110        """Unpacks the ramdisk."""
111        # The compression format might be in 'lz4' or 'gzip' format,
112        # trying lz4 first.
113        for compression_type, compression_util in [
114            (RamdiskFormat.LZ4, 'lz4'),
115            (RamdiskFormat.GZIP, 'gzip')]:
116
117            # Command arguments:
118            #   -d: decompression
119            #   -c: write to stdout
120            decompression_cmd = [
121                compression_util, '-d', '-c', self._ramdisk_img]
122
123            decompressed_result = subprocess.run(
124                decompression_cmd, check=False, capture_output=True)
125
126            if decompressed_result.returncode == 0:
127                self._ramdisk_format = compression_type
128                break
129
130        if self._ramdisk_format is not None:
131            # toybox cpio arguments:
132            #   -i: extract files from stdin
133            #   -d: create directories if needed
134            #   -u: override existing files
135            cpio_run = subprocess.run(
136                ['toybox', 'cpio', '-idu'], check=False,
137                input=decompressed_result.stdout, cwd=self._ramdisk_dir,
138                capture_output=True)
139            if (cpio_run.returncode != 0 and
140                b"Operation not permitted" not in cpio_run.stderr):
141                raise RuntimeError(f"cpio failed:\n{cpio_run.stderr}")
142
143            print(f"=== Unpacked ramdisk: '{self._ramdisk_img}' at "
144                  f"'{self._ramdisk_dir}' ===")
145        else:
146            raise RuntimeError('Failed to decompress ramdisk.')
147
148    def repack(self, out_ramdisk_file):
149        """Repacks a ramdisk from self._ramdisk_dir.
150
151        Args:
152            out_ramdisk_file: the output ramdisk file to save.
153        """
154        compression_cmd = ['lz4', '-l', '-12', '--favor-decSpeed']
155        if self._ramdisk_format == RamdiskFormat.GZIP:
156            compression_cmd = ['gzip']
157
158        print('Repacking ramdisk, which might take a few seconds ...')
159
160        mkbootfs_result = subprocess.run(
161            ['mkbootfs', self._ramdisk_dir], check=True, capture_output=True)
162
163        with open(out_ramdisk_file, 'wb') as output_fd:
164            subprocess.run(compression_cmd, check=True,
165                           input=mkbootfs_result.stdout, stdout=output_fd)
166
167        print(f"=== Repacked ramdisk: '{out_ramdisk_file}' ===")
168
169    @property
170    def ramdisk_dir(self):
171        """Returns the internal ramdisk dir."""
172        return self._ramdisk_dir
173
174    def get_modules(self):
175        """Returns the list of modules used in this ramdisk."""
176        modules_file_path = os.path.join(
177            self._ramdisk_dir, "lib/modules/modules.load")
178        with open(modules_file_path, "r", encoding="utf-8") as modules_file:
179            return [line.strip() for line in modules_file]
180
181    def write_modules(self, modules):
182        """Writes the list of modules used in this ramdisk."""
183        modules_file_path = os.path.join(
184            self._ramdisk_dir, "lib/modules/modules.load")
185        with open(modules_file_path, "w", encoding="utf-8") as modules_file:
186            for module in modules:
187                modules_file.write(f"{module}\n")
188
189
190def _load_modules_order(src_dir, extra_modules_order):
191    # Concatenate all the modules.order.* files because out-of-tree
192    # modules get their own order file separate from the one for
193    # the kernel build
194    full_modules_order = set()
195    for (dir_path, subdirs, files) in os.walk(src_dir):
196        for file in files:
197            if file == "modules.order" or file in extra_modules_order:
198                modules_order_path = os.path.join(dir_path, file)
199                with open(modules_order_path, "r", encoding="utf-8") as modules_order:
200                    for line in modules_order:
201                        full_modules_order.add(line.strip())
202
203    return full_modules_order
204
205
206def _replace_modules(dest_ramdisk, src_ramdisk, override_modules_load,
207                     check_modules_order, extra_modules_order):
208    """Replace any modules in dest_ramdisk with modules from src_ramdisk"""
209    src_dir = pathlib.Path(src_ramdisk.ramdisk_dir)
210    dest_dir = os.path.join(dest_ramdisk.ramdisk_dir, "lib/modules")
211
212    # Replace the modules.load file with a new one if the caller gave it
213    if override_modules_load:
214        dest_modules_load = os.path.join(dest_dir, "modules.load")
215        shutil.copy(override_modules_load, dest_modules_load)
216
217        # Update the dependency and alias files as well
218        for (dir_path, subdirs, files) in os.walk(src_dir):
219            for file in files:
220                if file == "modules.load":
221                    raise RuntimeError(
222                        "Unexpected modules.load in kernel build")
223
224                if file in _KNOWN_MODULES_FILES:
225                    src_file = os.path.join(dir_path, file)
226                    dest_file = os.path.join(dest_dir, file)
227                    shutil.copy(src_file, dest_file)
228
229    if check_modules_order:
230        modules_order = _load_modules_order(src_dir, extra_modules_order)
231
232    updated_modules = []
233    for module in dest_ramdisk.get_modules():
234        dest_module = os.path.join(dest_dir, module)
235        matches = list(src_dir.glob(f"**/{module}"))
236        if len(matches) > 1:
237            raise RuntimeError(
238                f"Found multiple candidates for module {module}")
239
240        # Ramdisks produced by Android have all the modules under /lib/modules
241        dest_base_module = os.path.join(dest_dir, os.path.basename(module))
242        if os.path.exists(dest_base_module):
243            os.remove(dest_base_module)
244
245        if len(matches) == 0:
246            logger.warning(
247                "Could not find module %s, deleting this module.",
248                module)
249            if os.path.exists(dest_module):
250                os.remove(dest_module)
251            continue
252
253        if check_modules_order and module not in modules_order:
254            logger.warning(
255                "Module %s not in modules.order, deleting this module.",
256                module)
257            if os.path.exists(dest_module):
258                os.remove(dest_module)
259            continue
260
261        os.makedirs(os.path.dirname(dest_module), exist_ok=True)
262        shutil.copy(matches[0], dest_module)
263        updated_modules.append(module)
264
265    dest_ramdisk.write_modules(updated_modules)
266
267
268def main():
269    """Parse arguments and repack ramdisk image."""
270    args = _parse_args()
271    with tempfile.TemporaryDirectory() as tempdir:
272        android_ramdisk = os.path.join(tempdir, _ANDROID_RAMDISK_DIR)
273        os.mkdir(android_ramdisk)
274        kernel_ramdisk = os.path.join(tempdir, _KERNEL_RAMDISK_DIR)
275        os.mkdir(kernel_ramdisk)
276        android_ramdisk = RamdiskImage(
277            args.android_ramdisk, os.path.join(tempdir, _ANDROID_RAMDISK_DIR),
278            allow_dir=False)
279        kernel_ramdisk = RamdiskImage(
280            args.kernel_ramdisk, os.path.join(tempdir, _KERNEL_RAMDISK_DIR),
281            allow_dir=True)
282        _replace_modules(android_ramdisk, kernel_ramdisk, args.override_modules_load,
283                         args.check_modules_order, args.extra_modules_order)
284        android_ramdisk.repack(args.output_ramdisk)
285
286
287if __name__ == '__main__':
288    main()
289