• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Generate QEMU options for Trusty test framework"""
2
3import logging
4import os
5import pathlib
6import shutil
7import subprocess
8import tempfile
9
10from qemu_error import RunnerGenericError
11
# Module-level logger named after this module; QemuArm64Options.gen_dtb()
# logs the external commands it runs through it.
logger = logging.getLogger(__name__)
13
def _find_dtc():
    """Locate the ``dtc`` binary in a fixed list of candidate directories.

    The candidate directories are relative paths, so they are resolved
    against the current working directory. They are probed in priority
    order and the first hit wins.

    Returns:
        The path of the first existing ``dtc``, or None if there is none.
    """
    candidate_dirs = (
        "out/host/linux-x86/bin",
        "prebuilts/misc/linux-x86/dtc",
        "bin",
        "linux-build/scripts/dtc",
    )
    candidates = (os.path.join(folder, "dtc") for folder in candidate_dirs)
    return next(
        (candidate for candidate in candidates if os.path.exists(candidate)),
        None,
    )
20
21
class QemuDrive:
    """One raw disk image exposed to the guest as a virtio block device.

    The source image lives at ``<dir_name>/<name><ext>``. Read-only drives
    are attached with ``snapshot=on``; writable drives are attached from a
    copy of the image kept in an instance directory, with ``snapshot=off``.
    """

    def __init__(
        self,
        name: str,
        index: int,
        dir_name: str,
        read_only: bool = True,
        ext: str = ".img",
    ):
        """Record the drive description and check that its image exists.

        Args:
            name: Image file name without its extension.
            index: Drive index, used for ``index=`` and for the ``hdX`` id.
            dir_name: Directory that holds the source image.
            read_only: When False, the drive works on a per-instance copy.
            ext: Image file extension, including the leading dot.

        Raises:
            RunnerGenericError: If the source image does not exist.
        """
        self.name = name
        self.ext = ext
        self.index = index
        self.read_only = read_only
        # path() reads self.name and self.ext, so both are assigned above.
        source_dir = pathlib.Path(dir_name)
        self.img_path = self.path(source_dir)
        image_present = os.path.exists(self.img_path)
        if not image_present:
            raise RunnerGenericError(
                f"image cannot be found at {self.img_path}"
            )

    def index_letter(self):
        """Return the letter for this drive's index (0 -> 'a', 1 -> 'b', ...)."""
        first_letter = ord('a')
        return chr(first_letter + self.index)

    def path(self, directory: os.PathLike):
        """Return the location of this drive's image file inside `directory`."""
        file_name = f"{self.name}{self.ext}"
        return os.path.join(directory, file_name)

    def ensure_image_exists(self, instance_dir: os.PathLike):
        """Copy the source image into `instance_dir` unless it is already there.

        Does nothing for read-only drives.
        """
        if not self.read_only:
            instance_image = self.path(instance_dir)
            if not os.path.exists(instance_image):
                shutil.copy(self.img_path, instance_image)

    def delete_image_changes(self, instance_dir: os.PathLike):
        """Remove this drive's image copy from `instance_dir`, if it exists.

        Does nothing for read-only drives. A copy that is already missing is
        not an error.
        """
        if not self.read_only:
            pathlib.Path(self.path(instance_dir)).unlink(missing_ok=True)

    def args(self, image_dir: os.PathLike, instance_dir: os.PathLike):
        """Return the QEMU ``-drive``/``-device`` arguments for this drive.

        Args:
            image_dir: Directory used to locate the image of a read-only drive.
            instance_dir: Directory used to locate the image of a writable
                drive.
        """
        if self.read_only:
            backing_dir, snapshot = image_dir, "on"
        else:
            backing_dir, snapshot = instance_dir, "off"
        image = self.path(backing_dir)
        drive_id = f"hd{self.index_letter()}"
        drive_spec = (
            f"file={image},index={self.index},if=none,"
            f"id={drive_id},format=raw,snapshot={snapshot}"
        )
        device_spec = f"virtio-blk-device,drive={drive_id}"
        return ["-drive", drive_spec, "-device", device_spec]
74
class QemuArm64Options:
    """Builds QEMU command-line options for an arm64 Trusty test run.

    The option methods return lists of argument strings. The paths they
    contain come from the run configuration object and from the instance
    directory given to the constructor.
    """

    MACHINE = "virt,secure=on,virtualization=on"

    # -no-reboot keeps the log free of Trusty starting a second time when the
    # system exits uncleanly, e.g., due to a boot or test failure.
    BASIC_ARGS = [
        "-nographic", "-no-reboot",
        "-cpu", "max,sve=off,pauth=off",
        "-smp", "4",
        "-m", "1024",
        "-d", "unimp",
        "-semihosting-config", "enable=on,target=native",
        "-machine", "acpi=off",
    ]

    # Kernel command line handed to "-append" by linux_options().
    LINUX_ARGS = (
        "earlyprintk console=ttyAMA0,38400 keep_bootcon "
        "root=/dev/ram0 init=/init androidboot.hardware=qemu_trusty "
        "androidboot.hypervisor.version=1.0.0 "
        "androidboot.hypervisor.vm.supported=1 "
        "androidboot.hypervisor.protected_vm.supported=1 "
        "trusty-log.log_ratelimit_interval=0 trusty-log.log_to_dmesg=always")

    def __init__(self, config, instance_dir):
        """Store the run configuration and the per-instance directory.

        Args:
            config: Run configuration; this class reads its `atf`, `qemu`,
                `linux`, `linux_arch`, `initrd` and `android_image_dir`
                attributes.
            instance_dir: Directory holding the mutable copies of images.
        """
        self.config = config
        self.instance_dir = instance_dir
        self.args = []

    def create_rpmb_data(self):
        """Seed the instance RPMB data image from the snapshot if missing.

        Creates the instance directory first when it does not exist.
        """
        os.makedirs(self.instance_dir, exist_ok=True)
        instance_image = self.rpmb_data_path()
        if os.path.exists(instance_image):
            return
        shutil.copy(self.rpmb_data_snapshot_path(), instance_image)

    def delete_rpmb_data(self):
        """Remove the instance RPMB data image; a missing file is not an error."""
        pathlib.Path(self.rpmb_data_path()).unlink(missing_ok=True)

    def rpmb_data_snapshot_path(self):
        """Return the path of the RPMB data snapshot under `config.atf`."""
        return f"{self.config.atf}/RPMB_DATA"

    def rpmb_data_path(self):
        """Return the path of the RPMB data image in the instance directory."""
        return f"{self.instance_dir}/RPMB_DATA"

    def rpmb_options(self, sock):
        """Return the options for a virtio serial port backed by socket `sock`."""
        chardev_id = "rpmb0"
        port_spec = f"virtserialport,chardev={chardev_id},name={chardev_id}"
        socket_spec = f"socket,id={chardev_id},path={sock}"
        return [
            "-device", "virtio-serial",
            "-device", port_spec,
            "-chardev", socket_spec,
        ]

    def get_initrd_filename(self):
        """Return the initrd to boot with.

        `config.initrd` is used when it is set and names an existing file;
        otherwise the result is `ramdisk.img` in `config.android_image_dir`.
        """
        configured = self.config.initrd
        if configured and pathlib.Path(configured).is_file():
            return configured
        return self.config.android_image_dir + "/ramdisk.img"

    def initrd_dts(self):
        """Return a device tree source fragment with the initrd address range.

        The range starts at a fixed address and spans the size of the file
        returned by get_initrd_filename().
        """
        initrd_size = os.stat(self.get_initrd_filename()).st_size
        start_addr = 0x48000000
        end_addr = start_addr + initrd_size

        # The literal below is emitted verbatim; its whitespace is part of
        # the returned string.
        return f"""/ {{
        chosen {{
            linux,initrd-start = <0x0 0x{start_addr:08x}>;
            linux,initrd-end = <0x0 0x{end_addr:08x}>;
        }};
    }};
        """

    def gen_dtb(self, args, dtb_tmp_file):
        """Generate the device tree blob for the run and return its option.

        The device tree is dumped by QEMU, decompiled with dtc, extended with
        the fragment from initrd_dts(), and compiled back into
        `dtb_tmp_file`.

        Args:
            args: QEMU arguments appended to the dump command; any "-S"
                entry is left out.
            dtb_tmp_file: Open file that receives the compiled blob; its
                `name` attribute is used in the returned option.

        Returns:
            The list ["-dtb", <name of dtb_tmp_file>].

        Raises:
            RunnerGenericError: If dtc cannot be found or any of the three
                external commands exits with a non-zero status.
        """
        dtc = _find_dtc()
        if dtc is None:
            raise RunnerGenericError("Could not find dtc tool")

        with tempfile.NamedTemporaryFile() as dtb_gen:
            # Step 1: have QEMU write its device tree to the temporary file.
            dump_dtb_cmd = [
                self.config.qemu,
                "-machine",
                f"{self.MACHINE},dumpdtb={dtb_gen.name}",
                *(arg for arg in args if arg != "-S"),
            ]
            logger.info("dump dtb command: %s", " ".join(dump_dtb_cmd))
            returncode = subprocess.call(dump_dtb_cmd)
            if returncode != 0:
                raise RunnerGenericError(
                    f"dumping dtb failed with {returncode}")

            # Step 2: decompile the dumped blob into source text.
            dtb_to_dts_cmd = [dtc, "-q", "-O", "dts", dtb_gen.name]
            logger.info("dtb to dts command: %s", " ".join(dtb_to_dts_cmd))
            with subprocess.Popen(dtb_to_dts_cmd,
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True) as dtb_to_dts:
                dts, _ = dtb_to_dts.communicate()
                if dtb_to_dts.returncode != 0:
                    raise RunnerGenericError(
                        f"dtb_to_dts failed with {dtb_to_dts.returncode}")

        # Step 3: append the initrd fragment and compile everything back.
        dts += self.initrd_dts()

        # The blob goes to the caller-provided file. Per the original
        # author's note, the subprocess closes dtb, so it must not be a file
        # that auto-deletes on close.
        dtb = dtb_tmp_file
        dts_to_dtb_cmd = [dtc, "-q", "-O", "dtb"]
        with subprocess.Popen(dts_to_dtb_cmd,
                              stdin=subprocess.PIPE,
                              stdout=dtb,
                              universal_newlines=True) as dts_to_dtb:
            # communicate() feeds the source and waits for dtc to finish, so
            # the exit status is available afterwards.
            dts_to_dtb.communicate(dts)
            dts_to_dtb_ret = dts_to_dtb.returncode

        if dts_to_dtb_ret:
            raise RunnerGenericError(f"dts_to_dtb failed with {dts_to_dtb_ret}")
        return ["-dtb", dtb.name]

    def drives(self) -> list[QemuDrive]:
        """Return freshly constructed drive descriptions for the guest.

        Constructing a QemuDrive checks that its source image exists, so
        this raises RunnerGenericError when one of the images is missing.
        """
        return [
            QemuDrive("metadata", 3, self.config.atf, read_only=False),
            QemuDrive(
                "userdata", 2, self.config.android_image_dir, read_only=False
            ),
            QemuDrive("vendor", 1, self.config.android_image_dir),
            QemuDrive("system", 0, self.config.android_image_dir),
        ]

    def create_drives_data(self):
        """Create the instance copies of writable drive images if missing."""
        os.makedirs(self.instance_dir, exist_ok=True)
        for qemu_drive in self.drives():
            qemu_drive.ensure_image_exists(self.instance_dir)

    def delete_drives_data(self):
        """Delete the instance copies of writable drive images."""
        for qemu_drive in self.drives():
            qemu_drive.delete_image_changes(self.instance_dir)

    def android_drives_args(self):
        """Return the QEMU arguments that attach every drive from drives()."""
        drive_args = []
        # NOTE: the original author flags this as order sensitive (due to
        # using e.g. root=/dev/vda), so the order of drives() is kept.
        for qemu_drive in self.drives():
            drive_args.extend(
                qemu_drive.args(
                    self.config.android_image_dir, self.instance_dir
                )
            )
        return drive_args

    def machine_options(self):
        """Return the "-machine" option selecting MACHINE."""
        return ["-machine", self.MACHINE]

    def basic_options(self):
        """Return a new list holding the entries of BASIC_ARGS."""
        return [*self.BASIC_ARGS]

    def bios_options(self):
        """Return the "-bios" option pointing at bl1.bin under `config.atf`."""
        bl1_image = f"{self.config.atf}/bl1.bin"
        return ["-bios", bl1_image]

    def find_kernel_image(self):
        """Return the kernel image path, or None when it cannot be found.

        `config.linux` is returned as is when it names a file; otherwise it
        is treated as a directory and arch/<linux_arch>/boot/Image is looked
        up inside it.
        """
        linux = self.config.linux
        if pathlib.Path(linux).is_file():
            return linux
        image = f"{linux}/arch/{self.config.linux_arch}/boot/Image"
        return image if os.path.exists(image) else None

    def linux_options(self):
        """Return the kernel, initrd and command-line options.

        Raises:
            RunnerGenericError: If no kernel image can be found.
        """
        kernel = self.find_kernel_image()
        if kernel is None:
            raise RunnerGenericError("Could not find kernel image")
        initrd = self.get_initrd_filename()
        return [
            "-kernel", kernel,
            "-initrd", initrd,
            "-append", self.LINUX_ARGS,
        ]

    def android_trusty_user_data(self):
        """Return the path of the "data" entry in `config.android_image_dir`."""
        image_dir = self.config.android_image_dir
        return os.path.join(image_dir, "data")
248