• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 - The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""OtaTools class."""
15
16import logging
17import os
18import tempfile
19
20from acloud import errors
21from acloud.internal.lib import utils
22
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name of the subdirectory, under the OTA tools directory, that holds the
# executables.
_BIN_DIR_NAME = "bin"
# File names of the OTA tool executables looked up under _BIN_DIR_NAME.
_LPMAKE = "lpmake"
_BUILD_SUPER_IMAGE = "build_super_image"
_AVBTOOL = "avbtool"
_SGDISK = "sgdisk"
_SIMG2IMG = "simg2img"
_MK_COMBINED_IMG = "mk_combined_img"
_UNPACK_BOOTIMG = "unpack_bootimg"

# Timeouts passed to utils.TimeoutException on the OtaTools methods that run
# the corresponding tools. The names indicate the unit is seconds.
_BUILD_SUPER_IMAGE_TIMEOUT_SECS = 30
_AVBTOOL_TIMEOUT_SECS = 30
_MK_COMBINED_IMG_TIMEOUT_SECS = 180
_UNPACK_BOOTIMG_TIMEOUT_SECS = 30

# Error message template. Format it with a dict that provides "tool_name".
_MISSING_OTA_TOOLS_MSG = ("%(tool_name)s is not found. Run `make otatools` "
                          "in build environment, or set --local-tool to an "
                          "extracted otatools.zip.")
42
43
def FindOtaToolsDir(search_paths):
    """Locate the OTA tools directory among the candidate paths.

    A candidate qualifies when the build_super_image file exists in its bin
    subdirectory. Candidates are checked in order and the first match wins.

    Args:
        search_paths: List of paths, the directories to search for OTA tools.

    Returns:
        The first path in search_paths that contains the OTA tools.

    Raises:
        errors.CheckPathError if none of the paths contains the OTA tools.
    """
    # Relative location of the file whose presence identifies the directory.
    marker = os.path.join(_BIN_DIR_NAME, _BUILD_SUPER_IMAGE)
    for candidate in search_paths:
        if not os.path.isfile(os.path.join(candidate, marker)):
            continue
        return candidate
    message = _MISSING_OTA_TOOLS_MSG % {"tool_name": "OTA tool directory"}
    raise errors.CheckPathError(message)
62
63
def FindOtaTools(search_paths):
    """Create an OtaTools object for the OTA tools found in the search paths.

    Args:
        search_paths: List of paths, the directories to search for OTA tools.

    Returns:
        An OtaTools object bound to the directory found by FindOtaToolsDir.

    Raises:
        errors.CheckPathError if OTA tools are not found.
    """
    ota_tools_dir = FindOtaToolsDir(search_paths)
    return OtaTools(ota_tools_dir)
77
78
def GetImageForPartition(partition_name, image_dir, **image_paths):
    """Resolve the image file that backs a partition.

    This function is used with BuildSuperImage or MkCombinedImg to mix
    image_dir and image_paths into the output file.

    Args:
        partition_name: String, e.g., "system", "product", and "vendor".
        image_dir: String, the directory to search for the images that are not
                   given in image_paths.
        image_paths: Pairs of partition names and image paths. An entry with
                     an empty (falsy) path is treated as not given.

    Returns:
        The path from image_paths if the partition has a non-empty entry
        there. Otherwise, <image_dir>/<partition_name>.img.

    Raises:
        errors.GetLocalImageError if the resolved path is not an existing
        file.
    """
    explicit_path = image_paths.get(partition_name)
    if explicit_path:
        image_path = explicit_path
    else:
        # Fall back to the conventional file name under image_dir.
        image_path = os.path.join(image_dir, partition_name + ".img")

    if os.path.isfile(image_path):
        return image_path
    raise errors.GetLocalImageError(
        "Cannot find image for partition %s" % partition_name)
104
105
class OtaTools:
    """The class that executes OTA tool commands.

    Attributes:
        _ota_tools_dir: String, the absolute path to the OTA tools directory.
                        Executables are looked up in its bin subdirectory.
    """

    def __init__(self, ota_tools_dir):
        """Initialize with the OTA tools directory.

        Args:
            ota_tools_dir: String, the path to the OTA tools directory. It is
                           converted to an absolute path.
        """
        self._ota_tools_dir = os.path.abspath(ota_tools_dir)

    def _GetBinary(self, name):
        """Get an executable file from _ota_tools_dir.

        utils.SetExecutable is applied to the file before its path is
        returned.

        Args:
            name: String, the file name.

        Returns:
            String, the absolute path.

        Raises:
            errors.NoExecuteCmd if the file does not exist.
        """
        path = os.path.join(self._ota_tools_dir, _BIN_DIR_NAME, name)
        if not os.path.isfile(path):
            raise errors.NoExecuteCmd(_MISSING_OTA_TOOLS_MSG %
                                      {"tool_name": name})
        utils.SetExecutable(path)
        return path

    @staticmethod
    def _RewriteMiscInfo(output_file, input_file, lpmake_path, get_image):
        """Rewrite lpmake and image paths in misc_info.txt.

        Misc info consists of multiple lines of <key>=<value>.
        Sample input_file:
        lpmake=lpmake
        dynamic_partition_list= system system_ext product vendor

        Sample output_file:
        lpmake=/path/to/lpmake
        dynamic_partition_list= system system_ext product vendor
        system_image=/path/to/system.img
        system_ext_image=/path/to/system_ext.img
        product_image=/path/to/product.img
        vendor_image=/path/to/vendor.img

        This method replaces lpmake with the specified path, drops every
        existing *_image entry, and sets *_image for every partition in
        dynamic_partition_list. All other lines are copied unchanged, except
        that a line lacking a trailing newline gets one so that the appended
        *_image entries always start on their own line.

        If the input has no (or an empty) dynamic_partition_list, a warning
        is logged and no *_image entry is written.

        Args:
            output_file: The output file object.
            input_file: The input file object.
            lpmake_path: The path to lpmake binary.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        partition_names = ()
        for line in input_file:
            # A line without "=" yields the whole stripped line as the key
            # and an empty value.
            key, _, value = line.strip().partition("=")
            if key == "lpmake":
                output_file.write("lpmake=%s\n" % lpmake_path)
                continue
            if key.endswith("_image"):
                # Stale image paths are dropped; fresh ones are appended
                # after the loop.
                continue
            if key == "dynamic_partition_list":
                partition_names = value.split()
            # The last line of a file may lack a newline; terminate it so the
            # entries appended below do not get glued onto it.
            output_file.write(line if line.endswith("\n") else line + "\n")

        if not partition_names:
            logger.warning("No dynamic partition list in misc info.")

        for partition_name in partition_names:
            output_file.write("%s_image=%s\n" %
                              (partition_name, get_image(partition_name)))

    @utils.TimeExecute(function_description="Build super image")
    @utils.TimeoutException(_BUILD_SUPER_IMAGE_TIMEOUT_SECS)
    def BuildSuperImage(self, output_path, misc_info_path, get_image):
        """Use build_super_image to create a super image.

        A rewritten copy of the misc info (see _RewriteMiscInfo) is stored in
        a temporary file, passed to build_super_image, and removed afterwards
        regardless of the outcome.

        Args:
            output_path: The path to the output super image.
            misc_info_path: The path to the misc info that provides parameters
                            to create the super image.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.

        Raises:
            errors.NoExecuteCmd if build_super_image or lpmake does not exist.
        """
        build_super_image = self._GetBinary(_BUILD_SUPER_IMAGE)
        lpmake = self._GetBinary(_LPMAKE)

        new_misc_info_path = None
        try:
            with open(misc_info_path, "r") as misc_info, \
                    tempfile.NamedTemporaryFile(
                        prefix="misc_info_", suffix=".txt",
                        delete=False, mode="w") as new_misc_info:
                # Record the name first so that the finally clause can clean
                # up even if rewriting fails.
                new_misc_info_path = new_misc_info.name
                self._RewriteMiscInfo(new_misc_info, misc_info, lpmake,
                                      get_image)

            # The temporary file is closed (flushed) before the tool reads it.
            utils.Popen(build_super_image, new_misc_info_path, output_path)
        finally:
            if new_misc_info_path:
                os.remove(new_misc_info_path)

    @utils.TimeExecute(function_description="Make disabled vbmeta image.")
    @utils.TimeoutException(_AVBTOOL_TIMEOUT_SECS)
    def MakeDisabledVbmetaImage(self, output_path):
        """Use avbtool to create a vbmeta image with verification disabled.

        Args:
            output_path: The path to the output vbmeta image.

        Raises:
            errors.NoExecuteCmd if avbtool does not exist.
        """
        avbtool = self._GetBinary(_AVBTOOL)
        utils.Popen(avbtool, "make_vbmeta_image",
                    "--flag", "2",
                    "--padding_size", "4096",
                    "--output", output_path)

    @staticmethod
    def _RewriteSystemQemuConfig(output_file, input_file, get_image):
        """Rewrite image paths in system-qemu-config.txt.

        Sample input_file:
        out/target/product/generic_x86_64/vbmeta.img vbmeta 1
        out/target/product/generic_x86_64/super.img super 2

        Sample output_file:
        /path/to/vbmeta.img vbmeta 1
        /path/to/super.img super 2

        This method replaces the first entry of each three-field line with
        the path returned by get_image for the second entry. Lines that do
        not have exactly three whitespace-separated fields are copied
        unchanged.

        Args:
            output_file: The output file object.
            input_file: The input file object.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.
        """
        for line in input_file:
            fields = line.split()
            if len(fields) != 3:
                output_file.write(line)
                continue
            _, partition_name, last_field = fields
            output_file.write("%s %s %s\n" % (get_image(partition_name),
                                              partition_name,
                                              last_field))

    @utils.TimeExecute(function_description="Make combined image")
    @utils.TimeoutException(_MK_COMBINED_IMG_TIMEOUT_SECS)
    def MkCombinedImg(self, output_path, system_qemu_config_path, get_image):
        """Use mk_combined_img to create a disk image.

        A rewritten copy of the config (see _RewriteSystemQemuConfig) is
        stored in a temporary file, passed to mk_combined_img, and removed
        afterwards regardless of the outcome. The paths to sgdisk and
        simg2img are handed to the tool through the SGDISK and SIMG2IMG
        environment variables.

        Args:
            output_path: The path to the output disk image.
            system_qemu_config_path: The path to the config that provides the
                                     partition information on the disk.
            get_image: A function that takes the partition name as the
                       parameter and returns the image path.

        Raises:
            errors.NoExecuteCmd if mk_combined_img, sgdisk, or simg2img does
            not exist.
        """
        mk_combined_img = self._GetBinary(_MK_COMBINED_IMG)
        sgdisk = self._GetBinary(_SGDISK)
        simg2img = self._GetBinary(_SIMG2IMG)

        new_config_path = None
        try:
            with open(system_qemu_config_path, "r") as config, \
                    tempfile.NamedTemporaryFile(
                        prefix="system-qemu-config_", suffix=".txt",
                        delete=False, mode="w") as new_config:
                # Record the name first so that the finally clause can clean
                # up even if rewriting fails.
                new_config_path = new_config.name
                self._RewriteSystemQemuConfig(new_config, config, get_image)

            mk_combined_img_env = {"SGDISK": sgdisk, "SIMG2IMG": simg2img}
            utils.Popen(mk_combined_img,
                        "-i", new_config_path,
                        "-o", output_path,
                        env=mk_combined_img_env)
        finally:
            if new_config_path:
                os.remove(new_config_path)

    @utils.TimeExecute(function_description="Unpack boot image")
    @utils.TimeoutException(_UNPACK_BOOTIMG_TIMEOUT_SECS)
    def UnpackBootImg(self, out_dir, boot_img):
        """Use unpack_bootimg to unpack a boot image to a directory.

        Args:
            out_dir: The output directory.
            boot_img: The path to the boot image.

        Raises:
            errors.NoExecuteCmd if unpack_bootimg does not exist.
        """
        unpack_bootimg = self._GetBinary(_UNPACK_BOOTIMG)
        utils.Popen(unpack_bootimg,
                    "--out", out_dir,
                    "--boot_img", boot_img)
301