# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14"""OtaTools class."""
15
16import logging
17import os
18import tempfile
19
20from acloud import errors
21from acloud.internal.lib import utils
22
logger = logging.getLogger(__name__)

# Sub-directory of the OTA tools directory that holds the executables.
_BIN_DIR_NAME = "bin"

# File names of the executables that are looked up under _BIN_DIR_NAME.
_AVBTOOL = "avbtool"
_BUILD_SUPER_IMAGE = "build_super_image"
_FSCK_EROFS = "fsck.erofs"
_LPMAKE = "lpmake"
_LZ4 = "lz4"
_MK_COMBINED_IMG = "mk_combined_img"
_MKBOOTFS = "mkbootfs"
_SGDISK = "sgdisk"
_SIMG2IMG = "simg2img"
_UNPACK_BOOTIMG = "unpack_bootimg"

# Values handed to utils.TimeoutException on the corresponding OtaTools
# methods.
_AVBTOOL_TIMEOUT_SECS = 30
_BUILD_SUPER_IMAGE_TIMEOUT_SECS = 150
_FSCK_EROFS_TIMEOUT_SECS = 30
_LZ4_TIMEOUT_SECS = 30
_MK_COMBINED_IMG_TIMEOUT_SECS = 180
_MKBOOTFS_TIMEOUT_SECS = 30
_UNPACK_BOOTIMG_TIMEOUT_SECS = 30

# Error message template; format it with a dict that has a "tool_name" key.
_MISSING_OTA_TOOLS_MSG = (
    "%(tool_name)s is not found. "
    "Run `make otatools` in build environment, "
    "or set --local-tool to an extracted otatools.zip.")
48
49
def FindOtaToolsDir(search_paths):
    """Locate the OTA tools directory among the candidate paths.

    A candidate qualifies when the build_super_image file exists in its bin
    sub-directory. Candidates are examined in the given order and the first
    one that qualifies is returned unchanged.

    Args:
        search_paths: List of paths, the directories to search for OTA tools.

    Returns:
        The directory containing OTA tools.

    Raises:
        errors.CheckPathError if none of the directories qualifies.
    """
    # Relative location of the file whose presence identifies the directory.
    marker = os.path.join(_BIN_DIR_NAME, _BUILD_SUPER_IMAGE)
    for candidate in search_paths:
        if os.path.isfile(os.path.join(candidate, marker)):
            return candidate

    message_args = {"tool_name": "OTA tool directory"}
    raise errors.CheckPathError(_MISSING_OTA_TOOLS_MSG % message_args)
68
69
def FindOtaTools(search_paths):
    """Create an OtaTools object for the first usable search path.

    Args:
        search_paths: List of paths, the directories to search for OTA tools.

    Returns:
        An OtaTools object bound to the directory that FindOtaToolsDir
        selects.

    Raises:
        errors.CheckPathError if OTA tools are not found.
    """
    ota_tools_dir = FindOtaToolsDir(search_paths)
    return OtaTools(ota_tools_dir)
83
84
def GetImageForPartition(partition_name, image_dir, **image_paths):
    """Resolve the image file that backs a partition.

    This function is used with BuildSuperImage or MkCombinedImg to mix
    image_dir and image_paths into the output file.

    Args:
        partition_name: String, e.g., "system", "product", and "vendor".
        image_dir: String, the directory to search for the images that are not
                   given in image_paths.
        image_paths: Pairs of partition names and image paths. A missing or
                     empty (e.g., None) value means the partition is not
                     overridden.

    Returns:
        The path from image_paths if the partition has a non-empty entry
        there. Otherwise, <image_dir>/<partition_name>.img.

    Raises:
        errors.GetLocalImageError if the resolved path is not an existing
        file.
    """
    override = image_paths.get(partition_name)
    if override:
        resolved_path = override
    else:
        resolved_path = os.path.join(image_dir, partition_name + ".img")

    if os.path.isfile(resolved_path):
        return resolved_path
    raise errors.GetLocalImageError(
        "Cannot find image for partition %s" % partition_name)
110
111
class OtaTools:
    """The class that executes OTA tool commands.

    Each public method looks up the executable it needs in the bin
    sub-directory of the OTA tools directory and launches it through
    utils.Popen.
    """

    def __init__(self, ota_tools_dir):
        """Initialize.

        Args:
            ota_tools_dir: String, the directory that contains the bin
                           sub-directory with the OTA tool executables. It is
                           stored as an absolute path.
        """
        self._ota_tools_dir = os.path.abspath(ota_tools_dir)

    def _GetBinary(self, name):
        """Get an executable file from _ota_tools_dir.

        Args:
            name: String, the file name.

        Returns:
            String, the absolute path.

        Raises:
            errors.NoExecuteCmd if the file does not exist.
        """
        path = os.path.join(self._ota_tools_dir, _BIN_DIR_NAME, name)
        if not os.path.isfile(path):
            raise errors.NoExecuteCmd(_MISSING_OTA_TOOLS_MSG %
                                      {"tool_name": name})
        # NOTE(review): presumably sets the executable permission so that
        # files from an extracted zip can be run; confirm in utils.
        utils.SetExecutable(path)
        return path

    @staticmethod
    def _RewriteMiscInfo(output_file, input_file, lpmake_path, get_image):
        """Rewrite lpmake and image paths in misc_info.txt.

        Misc info consists of multiple lines of <key>=<value>.
        Sample input_file:
        lpmake=lpmake
        dynamic_partition_list= system system_ext product vendor

        Sample output_file:
        lpmake=/path/to/lpmake
        dynamic_partition_list= system system_ext product vendor
        system_image=/path/to/system.img
        system_ext_image=/path/to/system_ext.img
        product_image=/path/to/product.img
        vendor_image=/path/to/vendor.img

        This method replaces lpmake with the specified path, drops every
        existing *_image entry, and appends *_image for every partition in
        dynamic_partition_list. All other lines are copied.

        Args:
            output_file: The output file object.
            input_file: The input file object.
            lpmake_path: The path to lpmake binary.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        partition_names = ()
        for line in input_file:
            # A line without "=" yields the whole line as the key and an
            # empty value.
            key, _, value = line.strip().partition("=")
            if key == "lpmake":
                output_file.write("lpmake=%s\n" % lpmake_path)
                continue
            if key.endswith("_image"):
                # Stale image entries are replaced by the ones appended below.
                continue
            if key == "dynamic_partition_list":
                partition_names = value.split()
            # The last line of the input may lack a line terminator. Add one
            # so that the entries appended below start on their own line.
            output_file.write(line if line.endswith("\n") else line + "\n")

        if not partition_names:
            logger.warning("No dynamic partition list in misc info.")

        for partition_name in partition_names:
            output_file.write("%s_image=%s\n" %
                              (partition_name, get_image(partition_name)))

    @utils.TimeExecute(function_description="Build super image")
    @utils.TimeoutException(_BUILD_SUPER_IMAGE_TIMEOUT_SECS)
    def BuildSuperImage(self, output_path, misc_info_path, get_image):
        """Use build_super_image to create a super image.

        The misc info is rewritten into a temporary file, which is passed to
        build_super_image and removed afterwards.

        Args:
            output_path: The path to the output super image.
            misc_info_path: The path to the misc info that provides parameters
                            to create the super image.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.

        Raises:
            errors.NoExecuteCmd if build_super_image or lpmake does not exist.
        """
        build_super_image = self._GetBinary(_BUILD_SUPER_IMAGE)
        lpmake = self._GetBinary(_LPMAKE)

        new_misc_info_path = None
        try:
            # delete=False keeps the file after it is closed so that
            # build_super_image can read it; the finally clause removes it.
            with open(misc_info_path, "r") as misc_info, \
                    tempfile.NamedTemporaryFile(
                        prefix="misc_info_", suffix=".txt",
                        delete=False, mode="w") as new_misc_info:
                new_misc_info_path = new_misc_info.name
                self._RewriteMiscInfo(new_misc_info, misc_info, lpmake,
                                      get_image)

            utils.Popen(build_super_image, new_misc_info_path, output_path)
        finally:
            if new_misc_info_path:
                os.remove(new_misc_info_path)

    @utils.TimeExecute(function_description="Extract EROFS image")
    @utils.TimeoutException(_FSCK_EROFS_TIMEOUT_SECS)
    def ExtractErofsImage(self, output_dir, image_path):
        """Use fsck.erofs to extract an image.

        Args:
            output_dir: The path to the output files.
            image_path: The path to the EROFS image.

        Raises:
            errors.NoExecuteCmd if fsck.erofs does not exist.
        """
        fsck_erofs = self._GetBinary(_FSCK_EROFS)
        utils.Popen(fsck_erofs, "--extract=" + output_dir, image_path)

    @utils.TimeExecute(function_description="lz4")
    @utils.TimeoutException(_LZ4_TIMEOUT_SECS)
    def Lz4(self, output_path, input_path):
        """Compress a file into lz4.

        Args:
            output_path: The path to the output file.
            input_path: The path to the input file.

        Raises:
            errors.NoExecuteCmd if lz4 does not exist.
        """
        lz4 = self._GetBinary(_LZ4)
        # -l is the legacy format for Linux kernel.
        utils.Popen(lz4, "-l", "-f", input_path, output_path)

    @utils.TimeExecute(function_description="Make disabled vbmeta image.")
    @utils.TimeoutException(_AVBTOOL_TIMEOUT_SECS)
    def MakeDisabledVbmetaImage(self, output_path):
        """Use avbtool to create a vbmeta image with verification disabled.

        Args:
            output_path: The path to the output vbmeta image.

        Raises:
            errors.NoExecuteCmd if avbtool does not exist.
        """
        avbtool = self._GetBinary(_AVBTOOL)
        utils.Popen(avbtool, "make_vbmeta_image",
                    "--flag", "2",
                    "--padding_size", "4096",
                    "--output", output_path)

    @utils.TimeExecute(function_description="mkbootfs")
    @utils.TimeoutException(_MKBOOTFS_TIMEOUT_SECS)
    def MkBootFs(self, output_path, input_dir):
        """Use mkbootfs to create a cpio file.

        Args:
            output_path: The path to the output file.
            input_dir: The path to the input directory.

        Raises:
            errors.NoExecuteCmd if mkbootfs does not exist.
        """
        mkbootfs = self._GetBinary(_MKBOOTFS)
        # The output file is handed to utils.Popen as stdout rather than as a
        # command line argument.
        with open(output_path, "wb") as output_file:
            utils.Popen(mkbootfs, input_dir, stdout=output_file)

    @staticmethod
    def _RewriteSystemQemuConfig(output_file, input_file, get_image):
        """Rewrite image paths in system-qemu-config.txt.

        Sample input_file:
        out/target/product/generic_x86_64/vbmeta.img vbmeta 1
        out/target/product/generic_x86_64/super.img super 2

        Sample output_file:
        /path/to/vbmeta.img vbmeta 1
        /path/to/super.img super 2

        This method replaces the first entry of each three-field line with the
        path returned by get_image for the second entry. Lines with any other
        number of fields are copied unchanged.

        Args:
            output_file: The output file object.
            input_file: The input file object.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        for line in input_file:
            fields = line.split()
            if len(fields) != 3:
                output_file.write(line)
                continue
            _, partition_name, partition_index = fields
            output_file.write("%s %s %s\n" % (get_image(partition_name),
                                              partition_name,
                                              partition_index))

    @utils.TimeExecute(function_description="Make combined image")
    @utils.TimeoutException(_MK_COMBINED_IMG_TIMEOUT_SECS)
    def MkCombinedImg(self, output_path, system_qemu_config_path, get_image):
        """Use mk_combined_img to create a disk image.

        The config is rewritten into a temporary file, which is passed to
        mk_combined_img and removed afterwards.

        Args:
            output_path: The path to the output disk image.
            system_qemu_config_path: The path to the config that provides the
                                     partition information on the disk.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.

        Raises:
            errors.NoExecuteCmd if mk_combined_img, sgdisk, or simg2img does
            not exist.
        """
        mk_combined_img = self._GetBinary(_MK_COMBINED_IMG)
        sgdisk = self._GetBinary(_SGDISK)
        simg2img = self._GetBinary(_SIMG2IMG)

        new_config_path = None
        try:
            # delete=False keeps the file after it is closed so that
            # mk_combined_img can read it; the finally clause removes it.
            with open(system_qemu_config_path, "r") as config, \
                    tempfile.NamedTemporaryFile(
                        prefix="system-qemu-config_", suffix=".txt",
                        delete=False, mode="w") as new_config:
                new_config_path = new_config.name
                self._RewriteSystemQemuConfig(new_config, config,
                                              get_image)

            # The paths of the helper executables are passed through
            # environment variables.
            # NOTE(review): if utils.Popen forwards env unchanged, the child
            # process sees only these two variables; confirm this is intended.
            mk_combined_img_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}
            utils.Popen(mk_combined_img,
                        "-i", new_config_path,
                        "-o", output_path,
                        env=mk_combined_img_env)
        finally:
            if new_config_path:
                os.remove(new_config_path)

    @utils.TimeExecute(function_description="Unpack boot image")
    @utils.TimeoutException(_UNPACK_BOOTIMG_TIMEOUT_SECS)
    def UnpackBootImg(self, out_dir, boot_img):
        """Use unpack_bootimg to unpack a boot image to a directory.

        Args:
            out_dir: The output directory.
            boot_img: The path to the boot image.

        Raises:
            errors.NoExecuteCmd if unpack_bootimg does not exist.
        """
        unpack_bootimg = self._GetBinary(_UNPACK_BOOTIMG)
        utils.Popen(unpack_bootimg,
                    "--out", out_dir,
                    "--boot_img", boot_img)

    def MixSuperImage(self, super_image, misc_info, image_dir,
                      system_image=None, system_ext_image=None,
                      product_image=None, system_dlkm_image=None,
                      vendor_image=None, vendor_dlkm_image=None,
                      odm_image=None, odm_dlkm_image=None):
        """Create mixed super image from device images and given partition
        images.

        Args:
            super_image: Path to the output super image.
            misc_info: Path to the misc_info.txt.
            image_dir: Path to the directory that provides the image of every
                       partition that is not given by the arguments below.
            system_image: Path to the system image.
            system_ext_image: Path to the system_ext image.
            product_image: Path to the product image.
            system_dlkm_image: Path to the system_dlkm image.
            vendor_image: Path to the vendor image.
            vendor_dlkm_image: Path to the vendor_dlkm image.
            odm_image: Path to the odm image.
            odm_dlkm_image: Path to the odm_dlkm image.
        """
        # Arguments left as None fall back to <image_dir>/<partition>.img in
        # GetImageForPartition.
        self.BuildSuperImage(
            super_image, misc_info,
            lambda partition: GetImageForPartition(
                partition, image_dir,
                system=system_image,
                system_ext=system_ext_image,
                product=product_image,
                system_dlkm=system_dlkm_image,
                vendor=vendor_image,
                vendor_dlkm=vendor_dlkm_image,
                odm=odm_image,
                odm_dlkm=odm_dlkm_image))
379